I am very happy to announce that Google's MapReduce is now available for Ruby (via
gem install starfish). MapReduce is the technique used by Google to do monstrous distributed programming over 30 terabyte files. I have been
reading about MapReduce recently and thought that it was very exciting for Google to have laid out the ideas that ran Google. I also wondered how they could be applied to everyday applications.
Recently, I gave a talk on
Ridiculously easy ways to distribute processor intensive tasks using Rinda and DRb. This talk came from my work with Rinda recently at
MOG. We use distributed programming to handle real-time processor intensive needs for over 1 million requests a day. We also use it to make large changes or clean up our database. I realized that the plumbing I wrote in Rinda to accomplish these tasks could be abstracted and easily conform to the MapReduce technique.
Before I move on, I will provide a little more background of Google's MapReduce. MapReduce is a C++ library written by Google. There are about 12 MapReduce programs used to create the inverted index of the www that Google uses for searching. The term MapReduce itself refers to map and reduce functions. Joel recently
wrote an article that explains what map a reduce do, so I will refrain from repeating him. One of the parts Joel unfortunately messed up on was this sentence though:
[...] you only have to get one supergenius to write the hard code to run map and reduce on a global massively parallel array of computers, and all the old code that used to work fine when you just ran a loop still works only it's a zillion times faster which means it can be used to tackle huge problems in an instant [...]
Google, nor anyone I know, has written a map function that will "replace" your existing calls to map, like a plugin. In fact, here is some real world MapReduce example code that is used to provide a word count on an arbitrarily sized document:
#include "mapreduce/mapreduce.h"
// User's map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
// User's reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into "spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class("Adder");
// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// Done: 'result' structure contains info
// about counters, time taken, number of
// machines used, etc.
return 0;
}
MapReduce takes a large data set (in this case a large file), divides the file into many different pieces, and lets 2000 machines each count words and aggregate statistics for a small part of that file, aggregating the result together in the end.
One of the parts that stood out to me is how there is a clear separation of how to do the call to map and how to do the call to reduce. The other part is all the set calls like
spec.set_machines(2000);. I love the simplicity: you tell the system how to map, you tell it how to reduce, you set some options, and run it. Notice specifically that you are not writing network code... this is obviously a very network intensive task, but that is all hidden behind
#include "mapreduce/mapreduce.h". This is much like Rinda for Ruby where you do not have to write any network code to distribute objects over the network. You do however have to learn an API to use either Rinda or DRb. MapReduce feels much less like an API and more like a layout, a template that you fill in.
I took the lessons from MapReduce, injected my background of Ruby and came up with what I call
Starfish. The backend implementation of Starfish is vastly different than Google's MapReduce: MapReduce is highly optimized for speed and best use of 2000 computer resources at a time, Starfish is highly optimized for speed of development and ease of use. That said, the goal of Starfish is the same as MapReduce.
Starfish takes a large data set (in this case a database), divides the table into many different sections, and lets machines each do work on sections of the database in parallel, aggregating the result together in the end.
Here is some example code:
class Item < ActiveRecord::Base; end
server do |map_reduce|
map_reduce.type = Item
end
client do |item|
logger.info item.some_processor_intensive_task
end
You will notice a few major differences quite quickly. First, you do not need to require any libraries, if this file was called item.rb you would run
starfish item.rb
on the command line on as many servers as you want and it will do everything it needed to start working and distributing the work. Next, you do not specify map and reduce functions, rather you specify a client and a server. I loved the simplicity and clarity of defining the two most important parts to Google's MapReduce, but in Ruby it would have been silly to do so because it is not C++ and mapping and reducing is too easy. So I gave it some thought and came up with what I thought was the most important part of distributed programming: what does the server serve and how do the client process the served objects.
Aside from the differences, you will notice the similarity, in the server you are setting options, setting
map_reduce.type = Item much like
input->set_format("text"); in MapReduce. In the near future, you will be able to tell Starfish that the type is File and let Starfish process files the same way we saw MapReduce do it in the example. Also, logger.info sends some information back to the server that logs it to a file much the same way that
out->set_filebase("/gfs/test/freq"); works.
However the biggest major difference is that Starfish is open-source and easy to use. Performing distributed tasks is now a ridiculously easy reality for programmers that may not have been steeped enough in CORBA or some other library to accomplish before.
I hope that you find this library helpful, please tell me how you use it and how I can make it work better for you. There any many options I didn't cover, so if you do use it, please
read the documentation.
UPDATE: I wrote an example of
how I sent emails 10x faster than before using Starfish.