There are so many APIs to choose from…
Features of the space:
Automatic data placement: It’s a hard problem, and it’s been hard for a long time – but technology is changing: networks are fast relative to the size of memory. You can move a significant fraction of a cluster’s memory in relatively little time. “Disk is the new tape”: we want to work in-memory for that 1000x speedup over disk, but this requires loading data into memory spread across many little slices of the cluster – which implies data placement. Start with random placement – while it’s never perfect, it’s also rarely “perfectly wrong” – you get consistently decent access patterns. Then add local caching to catch the hot common read blocks, and local caching of hot or streaming writes. For H2O, this is one of the reasons for our K/V store; we get full JMM semantics and exact consistency, but also raw speed: 150ns for a cache-hitting read or write. A cache miss is typically a few msec (one network hop there and back).
Map/Reduce: It’s a simple paradigm shown to scale well. Much of Big Data involves some kind of structure (log files, bit/byte streams, up to well-organized SQL/Hive/DB files). Map is very generic, and allows an arbitrary function on a unit of work/data to be easily scaled. Reduce brings Big down to Small in a logarithmic number of steps. Together, they handle a lot of problems. The key to making this work is simplicity in the API: Map converts an A to a B, and Reduce combines two B’s into one B – for any arbitrary A and B (a minimal sketch of this contract follows this feature list). No resource management, no counting Map or Reduce slots, no shuffle, no keys. Just a nice clean Map and Reduce. For H2O this means: Map reads your data directly (type A) and produces results in a Plain Olde Java Object (POJO, type B) – which is also the Map’s “this” pointer. Results are returned directly in “this”. “This is Not Your Father’s Map Reduce.”
Correct By Default: No multi-threaded access questions. Synchronization, if needed, is provided already. No figuring out sharding or data placement; replication (caching) will happen as needed. No resource management, other than Xmx (the Java heap size). Like synchronization, resource management is notoriously hard to get right, so we don’t require people to do it. For 0xdata, this means we use very fine-grained parallelism (no Map is too small; Doug Lea’s Fork/Join under the hood), and very fine-grained Reduces (so all Big Data shrinks as rapidly as possible).
Fast By Default: Use of the default Map/Reduce API will produce programs that run in parallel and distributed across the cluster, at memory-bandwidth speeds for both reads and writes. Other clustered/parallel paradigms are available, but are not guaranteed to go fast. The API has a simple, obvious design, and all calls have a “cost model” associated with them (these calls are guaranteed fast, these calls are only fast in these situations, these calls will work but may be slow, etc.). For 0xdata, code that accesses any number of columns at once (e.g. a single row) – but independent rows – will run at memory-bandwidth speeds. The same holds for writing any number of columns, including writing subsets (filtering) of rows. Reductions will happen every so many Maps, in a log-tree fashion. All filter results are guaranteed to be strongly ordered as well (despite the distributed & parallel execution).
Easy / Multiple APIs – Not all APIs are for all users! Java & Map/Reduce are good for Java programmers – but people do math in R, Python, and a host of other tools. For 0xdata, we are a team of language implementers as well as mathematicians and systems engineers. We have implemented a generic REST/JSON API and can drive it from R, Python, bash, and Excel – with the ability to add more clients easily. From inside the JVM, we can drive the system using Scala, or a simple REPL with an R-like syntax.
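To make the “A to B” shape concrete before diving into H2O specifics, here is a minimal conceptual sketch in plain Java. This is not the H2O API (that comes below) – it is just the whole contract the paradigm asks of you, with hypothetical type parameters A and B:
// Conceptual sketch only -- not H2O classes. The entire user-visible contract:
// Map turns one unit of work (type A) into a small result (type B), and
// Reduce combines two B's into one B. Placement, slots, shuffles and keys
// are deliberately not part of the contract.
interface MapReduce<A, B> {
  B map( A unitOfWork );           // arbitrary function over one unit of data
  B reduce( B left, B right );     // must be able to merge any two partial results
}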
Let’s get a little more concrete here, and bring out the jargon –
Primitives – At the bottom level, each datum is a Java primitive – be it a byte, char, long or double. Or at least that’s the presentation. Under the hood we compress aggressively, often seeing 2x to 4x better compression than the GZIP’d file on disk – and we can do math on this compressed form, typically at memory-bandwidth speeds (i.e., the work to decompress is hidden in the overhead of pulling the data in from memory). We also support the notion of “missing data” – a crucial notion for data scientists. It’s similar to Double.NaN, but for all data types.
A Chunk – The basic unit of parallel work, typically holding 1e3 to 1e6 (compressed) primitives. This data unit is completely hidden, unless you are writing batch-Map calls (where the batching is for efficiency). It’s big enough to hide control overheads when launching parallel tasks, and small enough to be the unit of caching. We promise that Chunks from Vecs being worked on together will be co-located on the same machine.
A Vec – A Distributed Vector. Just like a Java array, but it can hold more than 2^31 elements – limited only by available memory. Usually used to hold data of a single conceptual type, such as a person’s age, IP address, name, last-click-time, or balance. This is the main conceptual holder of Big Data, and collections of these will typically make up all your data. Access will be parallel & distributed, and at memory-bandwidth speeds.
A Frame – A collection of Vecs. Patterned after R’s data.frame (it’s worked very well for more than 20 years!). While Vecs might be Big Data (and thus can be expensive to touch all of), Frames are mere pointers to Vecs. We add & drop columns and reorganize them willy-nilly. The data-munging side of things has a lot of convenience functions here.
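To make the jargon concrete, here is a hedged sketch of touching one column the slow, single-threaded way. Only vec.at(long) and at0 are named in this post; the vec(int), length() and isNA(long) accessors below are my assumptions about the surrounding API and may differ between H2O versions. The real work belongs in the Map/Reduce walkthrough that follows:
// Hedged sketch: walk one column of an already-parsed Frame.
Frame people = myParsedFrame;              // hypothetical: a Frame you imported earlier
Vec age = people.vec(0);                   // Frames are just cheap pointers to Vecs
double sum = 0;
for( long i = 0; i < age.length(); i++ )   // plain loop, for clarity only;
  if( !age.isNA(i) )                       //   skip "missing data" entries
    sum += age.at(i);                      // at(long): Vec-relative read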
(1) Make a subclass of MRTask2, with POJO Java fields that inherit from Iced, or are primitives, or arrays of either. Why subclassed from Iced? Because the bytecode weaver will inject code to do a number of things, including serialization & JSON display, and code & loop optimizations.
class Calc extends MRTask2<Calc> {
(2) Break out the small-data Inputs to the Map, and initialize them in an instance of your subclass. “Small data” will be replicated across the cluster, and available as read-only copies everywhere. Inputs need to be read-only, as they will be shared on each node in the cluster. “Small” needs to fit in memory; my example uses doubles, but megabyte-sized small data is cheap & commonly done.
final double mean; // Read-only, shared, distributed
final int maxHisto; // Read-only, shared, distributed
Calc( double mean, int maxHisto ) { this.mean = mean; this.maxHisto = maxHisto; }
(3) Break out the small-data Outputs from your map, and initialize them in the Map call. Because they are initialized in the Map, they are guaranteed thread-local. Because you get a new one for every Map call, they need to be rolled-up in a matching Reduce.
long histogram[];
double sumError;
void map( ... ) {
histogram = new long[maxHisto]; // New private histogram[] object
(4) Break out the Big Data (inputs or outputs). This will be passed to a doAll() call, and every Chunk of the Big Data will get a private cloned instance of the Calc object, distributed across the cluster:
new Calc(mean,(int)vec.max()+1).doAll(myBigVector /*or a Frame or Vec[] or ...*/); // max()+1 so the largest value still indexes into the histogram
(5) Implement your Map. Here we show a Batching-Map, which typically does a plain “for” loop over all the elements in the Chunk. Access your data with the “at0” (Chunk-relative addressing) call – which is the fastest accessor but requires a chunk (and takes an “int” index). Plain Vec-relative addressing is a little slower, but takes a full “long” index: “vec.at(long idx)”.
void map( Chunk chk ) {
  histogram = new long[maxHisto];    // thread-local output; rolled up in reduce()
  for( int i=0; i<chk.len; i++ ) {   // plain for-loop over this Chunk's elements
    histogram[(int)chk.at0(i)]++;    // at0: fast Chunk-relative read
    double err = chk.at0(i)-mean;
    sumError += err*err;             // squared-error accumulator
  }
}
(6) Implement a Reduce for any output fields. Note that Reduce has a “type B” in the “this” pointer, and is passed a 2nd one:
void reduce( Calc that ) {
  sumError += that.sumError;
  // Element-wise add of the two histogram arrays, via a simple utility.
  histogram = Utils.add(histogram,that.histogram);
}
(7) That’s it! Results are in your original Calc object:
Calc results = new Calc(mean,(int)vec.max()+1).doAll(myBigVector);
System.out.println(results.sumError+" "+Arrays.toString(results.histogram));
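Before moving on: the “any number of columns at memory-bandwidth speed” claim above extends this same pattern to several Vecs at once. Here is a hedged sketch – not code from this walkthrough – assuming MRTask2 also accepts a two-Chunk map overload (only the single-Chunk form is shown above); the accessors and the Vec[] form of doAll are the ones already mentioned:
// Hedged sketch: a dot-product over two co-located columns.
class DotProd extends MRTask2<DotProd> {
  double sumXY;                            // output, rolled up in reduce()
  void map( Chunk xs, Chunk ys ) {         // assumed two-Chunk overload
    for( int i=0; i<xs.len; i++ )
      sumXY += xs.at0(i) * ys.at0(i);      // Chunks worked on together are
  }                                        //   co-located: both reads are local
  void reduce( DotProd that ) { sumXY += that.sumXY; }
}
// usage, with hypothetical Vecs x and y:
double dot = new DotProd().doAll(new Vec[]{x,y}).sumXY;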
You have to get the data in there – and we’ll import from HDFS, S3, NFS, local disk, or through your browser’s upload. You can drive data upload from Java, but more typically from R, Python, REST/JSON, or Excel. The same goes for outputting Big Data results: we’ll write back Big Data to any store, driven by any of the above languages. If you build a predictive model, you’ll eventually want to use the model in production. You can use it in-memory as-is, scoring new datasets on the model – for example, constantly streaming new data through the model while, at the same time, constantly churning out new models for that stream to flow through. Or you can get a Java version of any model, suitable for dropping into your production environment.
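For the REST/JSON route, any language with an HTTP client will do – including Java itself. A rough sketch is below; the endpoint path and parameter name are illustrative placeholders, not the documented H2O routes (check your version’s REST API reference), though 54321 is the usual default port:
// Hedged sketch: poke the cluster's REST/JSON API from plain Java.
import java.io.*;
import java.net.*;

public class RestSketch {
  public static void main(String[] args) throws IOException {
    // Placeholder route & parameter -- substitute the real import endpoint.
    URL url = new URL("http://localhost:54321/ImportFiles?path=" +
                      URLEncoder.encode("hdfs://namenode/data/logs.csv", "UTF-8"));
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader in = new BufferedReader(
             new InputStreamReader(conn.getInputStream()))) {
      for( String line; (line = in.readLine()) != null; )
        System.out.println(line);          // the JSON response body
    }
  }
}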
And that’s the end of my whirlwind tour of the H2O Distributed Computing API. Hope you like it!
Comments & suggestions welcome.
Cliff