Continuing from yesterday’s talks here are my summary of the second and last day of NoSQL East.
Pig—Kevin Weil—Twitter
Data is getting big. NYSE produces 1TB of data every day, Facebook produces 20TB+ of compressed data each day, and CERN produces 40TB each day (15PB each year). This creates a need for multiple machines and horizontal scalability.
Hadoop is two things: a distributed file system and a map/reduce framework for parallel computation. The file system is fault-tolerant, replicated, and handles node failure. If it’s hard to store a PB it’s even harder to compute a PB. Yahoo is the largest user of Hadoop.
Map/reduce at twitter: how many tweets per user, given tweets table?
Input: key=row, value=tweet_info. Map: output key=user_id, value=1. Shuffle: sort by user_id (so that one can use more than one reducer). Reduce: for each user_id, sum. Output: user_id, tweet count.
With 2x machines this job runs just about 2x faster.
The problem with Hadoop is that analysis is typically written in Java. It’s single-input, two-stage data flow (map, then reduce) is rigid. Joins (distributed) are also hard to do in Hadoop. Java is simply to verbose for writing map/reduce jobs. Lastly, rapid prototyping and exploration is hindered by compilation.
Pig is a high level language that can be easily read. You think about data as transformations on sets of records. You do one transformation at a time so that you just have to think about the next step. This leads to fewer errors. Pig helps Twitter understand their business faster.
|
Pig makes it easier for non programmers to do large-scale data analysis. It’s easily learnable for people who know SQL. An example analytic script from Twitter is 5% the code and took 5% of the time of writing compared to straight Hadoop. Execution time is roughly 50% of Hadoop.
At Twitter they run Cloudera’s free distribution of Hadoop 0.20.1, with a heavily modified Scribe installation for log collection straight to HDFS, and heavily modified LZO code for fast and splittable data compression. Data is then stored as either LZO-compressed flat files or serialized, LZO-compressed protocol buffers.
Semi structured data at Twitter are: apache logs, search logs, RoR logs, MySQL logs, rate limiter logs, per-application logs. Structured data are: tweets, users, block notifications, phones, saved searches, retweets, authentication, SMS usage, third party clients, followings. Entangled data: the social graph (doesn’t really lend itself to be map/reduced).
Pig is good at counting big data with standard counts, min, max, std dev. “What get measured gets improved”. Examples at Twitter are:
- How many request do we serve each day.
- What is the average latency? 95% latency?
- Grouped by response code: what is the hourly distribution?
- How many searches happen each day at Twitter?
- Where do they come from?
- How many unique queries?
- How many unique users?
- Geographic distribution?
Correlating big data with probabilities, covariance, influence. Examples at Twitter are:
- How does usage differ for mobile users?
- How does usage differ for 3rd party desktop client users?
- Cohort analysis: all users who signed up on the same day—then see how they differ over time.
- Site problems: what goes wrong at the same time?
- Which features get users hooked?
- Which features do successful users use often?
- Search corrections and suggestions (not done now at Twitter, but coming in the feature).
- A/B testing.
Research on Big data: prediction, graph analysis, natural language. Examples at Twitter are:
- What can web tell about a user from their tweets?
- What can we tell about you from the tweets of those you follow?
- What can we tell about you from the tweets of your followers?
- What can we tell about you from the ratio of your followers/following?
- What graph structures lead to successful networks? (Twitter’s graph structure is interesting since it’s not two-way)
- Sentiment analysis (you can predict the success of a movie by following its mentions on Twitter the first days after its release).
- What features get a tweet retweeted?
- When a tweet is retweeted, how deep is the corresponding retweet three?
- Long-term duplicate detection (short term for abuse and stopping spammers)
- Machine learning. About not quite knowing the right questions to ask at first. How do we cluster users?
- Language detection (contact mobile providers to get SMS deals for users—focusing on the most popular countries at first).
- How can we detect bots and other non-human tweeters?
Cascading—Chris Curtin—Silverpop
Cascading is another layer on top of Hadoop. “Pig makes the easy things really easy. Cascading makes the hard stuff possible”. Unlike Pig you write Cascading jobs in Java.
The challenge when starting with map/reduce was to find any real examples of its use. Secondly, when do you map and when do you reduce? Thirdly, what do you do if you need more than one map/reduce pair? Lastly, what do you do with non-trivial business logic that you can’t rewrite?
Cascading has four main concepts:
- Tuple: a single row of that is being processed where each column is named (can be accessed by name or position).
- Operations: defines what you do on the data. Each (for each tuple), Group (like group by from SQL), and Every (for every key in the Group). The output of an operation is a new tuple.
- Pipes: ties the operations together, a tuple stream. The Pipes can be split so that Operations can be executed in parallel.
- Flows: reusable combinations of Taps, Pipes, and Operations. A way to build libraries of collected functions.
|
Once one have defined all the Flows and Cascades the Cascading scheduler looks for dependencies. It tells Hadoop what map, reduce, or shuffle operations to start. Cascading also allows the use of dynamic Flows which can be created at run time. The same dynamicallity applies to tuple definitions. Cascading allows the use of regular Java code in between Flows so that you can call out to databases or write intermediate files.
Pros of Cascading:
- Great to mix Java between the map/reduce steps.
- You don’t have to worry about when to map and when to reduce.
- You don’t have to think about dependencies.
- The data definitions can change on the fly.
Cons of Cascading:
- The level above Hadoop is sometimes like black magic. Hard to debug at times.
- Data should be outside of a database to get high concurrency.
Neo4j—Emil Eifrem—Neo Technology
Why is 2009 the year when the NoSQL “movement” started?
- Trend 1: Increase in data set size. In 1997 40 exabytes of new unique electronic data was generated that year. In 2010 it’s estimated that 988 exabytes will be created.
- Trend 2: Higher degree of connectedness. From text documents, to hypertext, to RSS, to Wikis, to blogs, to tagging, to user generated content, to RDF, to folksonomies, to Ontologies.
- Trend 3: Semi-structure. Individualization of content.
- Trend 4: Architecture. From databases as an integration hub for shared data, to decoupled services with its own backend.
What is the NoSQL space? NoSQL is not “Never SQL” but “Not Only SQL”. There are four emerging NoSQL categories:
- Key value stores which are based on the Dynamo paper (Dynomite, Voldemort, Tokyo). Poor data model but they scale well for data size.
- BigTable clones with a slightly more sophisticated data model with column families (Hbase, Hypertable). These scales slightly less to size than pure key/value stores.
- Document databases inspired by Lotus Notes (at least in theory). Key value stores with documents as values. Scales fairly well to data size and data complexity.
- Graph databases which have nodes and relationships and arbitrary key/value pairs which can connect to those. Scales well for data complexity.
Neo4j is a graph database which uses nodes, and relationships between the nodes to represent data. Graph databases has three core abstractions:
- Nodes.
- Relationships between nodes (can be both directed and bidirectional).
- Data on nodes and relationships (arbitrary number of key/value pairs).
One first defines a node, then its data value, and its relationship to other nodes, and optionally the data value of these relationships. Every operation in Neo4j must happen inside an transaction.
Graph databases are white board friendly. Domain experts seem to draw graphs when they explain their model. It also maps cleanly to the cognitive model of programmers.
How you do get stuff out? By traversal. One can traverse on certain relation types, and go breath or depth first until one reaches the end of the graph or a certain depth. One can also define to follow relationships that goes in a certain direction.
How do you then write the domain model on top of a graph database? Each domain model wraps a node. This is possible since Neo4j is an embedded database (no read/write step).
Ne4j is disk based with a custom binary disk format. It’s also transactional with JTA/JTS, XA, 2PC, Tx recovery, deadlock detection, MVCC.
Scales up to one box to several billions of nodes/relationships/props on a single JVM. If you hit this limit you’ll have to start partitioning your data.
Neo4j is robust and has been deployed in production since 2003.
Social network pathExists() in MySQL for 1000 people takes 2000ms.
The same lookup takes 2ms in Neo4j. 1 000 000 people are processed in
2ms as well.
Pros compared to RDBMS:
- No O/R impedance mismatch.
- Can easily evolve schemas (organically grow the graph).
- Can represent semi structured info
- Can represent graphs/networks (with performance).
Cons:
- Lacks in tool and framework support.
- Few other implementations means a potential for lock in.
Just recently implemented SPARQL support.
Less then 500k jar file. All the richness are componentized as addons.
Working on replication support for scaling out. Today one can use the backup utility with low increments to get almost synchronous replication. Will be master-slave based, but all slaves can write. Synchronously replicated between slave and master. This can handle billions of entries, but not 100 billion.
Sharding is possible today, but you have to do manual work. Working on automatic sharding by using clustering algorithm to figure out which parts of the graph could be sharded out.
AGPL, but below 1M primitives is free for all.
Redis—Kevin Smith—Hypothetical Labs
Redis is a lot like memcached with some interesting twists. It’s a key/value store backed by an in-memory database. This makes it really fast. Can do 19,600 gets and 13,900 sets a second on a MacBook Pro. Redis is single threaded, but it uses non-blocking IO. The client libraries tends to implement consistent hashing which makes horizontal scaling easy.
Redis has a simple protocol and easy to understand data model. It does not have a sophisticated persistence engine, asynchronously writes to disk which somewhat makes it more like a cache than a database. It does have very basic master slave replication support, but more advanced replication is in the works.
The data types of Redis are Strings (but can be used as incremented counters), lists, sets, and skip lists (coming soon). All operations on these data types are atomic.
With strings you can build general purpose caches and primitive message queues. Using lists one can create more advanced message queues. Lists are very efficient compared to storing serialized values with delimiters in an traditional key/value store without specific value types (mostly due to reduced network I/O).
The Redis asynchronous persistence performs a background save every X changes in Y seconds. One can also use the SAVE/BGSAVE commands for explicitly saving to disk. Internally all Redis is doing is dumping memory to a file descriptor.
Replication is just dumping to another file descriptor. Can be configured at boot up or activated at run time. It’s important to only write to the master in a master-slave replicated setup.
Redis is good for caches, statistics, work queues, social meta data like click tracking—where it’s not to bad if you loose some of the data.
Sherpa—John Corwin—Yahoo!
Sherpa is a collaboration between Yahoo’s cloud computing and research group. Yahoo has over 100 properties. 11% of all time people spend on the Internet is spent on some of Yahoo’s properties.
Sherpa is a distributed key/value store which is a bit different than the Dynamo style solutions. It does geographical distribution with low latency local access. It scales to thousands of servers and it’s easy to add new servers (elastic). It automatically recover from failures and can serve reads and writes if servers fail. You can access your data as a hash table or ordered table.
The data values are JSON objects of fields and meta data (eg. seq ID, expiration). The API is RESTful with GET, SET, and DELETE in addition to a scan functions (table and range).
The Sherpa architecture consists of a Tablet controller which maps from database.table.key to storage unit. It also provides load balancing. There are a set of routers which caches the mappings from the Tablet controller. At the bottom of the stack sits a bunch of storage nodes. At a global level sits a messaging layer which sends requests to the correct data center.
Sherpa has been in production for 6 months now. Yahoo! Mail for instance uses it in its spam protection utility. The cloud group would really like to open source Sherpa, but unfortunately it’s built on other internal Yahoo! architecture. They are going to open source it piece by piece.
The core of the system is in C++, with glue scripts in PHP, Perl, and Python.

