JBrisbin.com

Map/Reduce in Groovy

27 Sep


I'm halfway through revamping my RabbitMQ/AMQP Groovy-based Map/Reduce framework, and I'd like to share what I'm thinking and how I'm going about it. This is part thinking out loud, part request for comments from folks who likely understand Map/Reduce and async/parallel programming better than I do.

I started by reading Google's brief but informative paper on Map/Reduce. I'm not going to implement my Groovy Map/Reduce exactly as that paper describes, but it gives good background on Google's own implementation, and it helped me understand how to schedule Map/Reduce jobs and how the Map and Reduce components work together.

My own implementation of Map/Reduce is being integrated with the cloud job scheduling utility I've written. I've talked about these utilities before. I've since rewritten them using Spring AMQP, which let me vastly simplify the core messaging code and extend the original software with better functionality. One of the features I built into this job scheduler is a Groovy-based plugin mechanism.

There are several components to my job scheduler that we use for different purposes. There's an "Exe" handler that executes shell scripts and (if the consumer is running on a Windows machine) executables that leverage the library of Windows-based code our C++/MFC programmers have written over the last 10 years. There's also a "batch" message handler that makes it easier for an intermittently connected remote location to bundle all the messages it wants to send to the broker into a single zip file. The batch handler unzips this file, publishes all the messages inside it, aggregates the results into a response zip file, and sends that compressed archive back to the requestor.
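A minimal sketch of the batch handler's unzip/publish/aggregate cycle may make the flow easier to picture. It assumes two stand-ins. A Hash of entry name to message body plays the role of the zip archive, since Ruby's standard library has no zip support. "publish" is any callable that sends one message and returns its reply. BatchSketch and both stand-ins are hypothetical, not the scheduler's actual classes.

```ruby
require 'json'

# Hypothetical stand-in for the batch handler: a Hash of entry name => body
# plays the role of the zip archive, and `publish` is any callable that
# sends one message and returns its reply. Not the scheduler's real API.
class BatchSketch
  def initialize(publish)
    @publish = publish
  end

  # Unpack every entry, publish it, and collect each reply under the
  # entry's name so the replies can be re-archived for the requestor.
  def handle(archive)
    archive.each_with_object({}) do |(name, body), replies|
      replies[name] = @publish.call(name, body)
    end
  end
end

# Stand-in publisher that just echoes back the SQL it was asked to run.
echo = ->(_name, body) { JSON.dump({ "ran" => JSON.parse(body)["sql"] }) }

batch = BatchSketch.new(echo)
result = batch.handle({ "job1" => JSON.dump({ "sql" => "select now()" }),
                        "job2" => JSON.dump({ "sql" => "select 1" }) })
puts result["job2"]   # prints {"ran":"select 1"}
```

Keying replies by entry name means the requestor can match each result to the message that produced it once the reply archive comes back.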

Here's a snippet of the Ruby code I'm using to generate some test messages to send into this (Spring/Java-based) job scheduler:

require 'rubygems'
require 'zipruby'   # in-memory zip archives
require 'json'
require 'mq'        # MQ API from the amqp gem (EventMachine-based)

buff = ''

# Build a zip archive in memory; each entry is one message for the
# batch handler to unpack and publish.
Zip::Archive.open_buffer(buff, Zip::CREATE) do |z|
  z.add_buffer('jobsched:jobsched.sql:batch.test:SECURITY_KEY',
               JSON.dump({"datasource" => "postgres",
                          "sql" => "plugin:classpath:plugins/sql_handler_mapreduce.groovy?sql=select+now()",
                          "params" => []}))
end
puts "Created zip archive"

# Run the EventMachine reactor in a background thread.
t = Thread.new { EM.run }

AMQP.start(:host => "localhost",
           :user => "guest",
           :pass => "guest",
           :vhost => "/") do

  # Listen on the reply queue; the response is itself a zip archive.
  MQ.queue('batch.test').subscribe do |hdrs, body|
    Zip::Archive.open_buffer(body) do |ar|
      ar.each do |entry|
        puts "#{entry.name}: #{entry.read}"
      end
    end
    EM.stop_event_loop
  end

  # Send the batch to the job scheduler's topic exchange.
  exch = MQ.topic('jobsched', :durable => true, :auto_delete => false)
  puts "Publishing message..."
  exch.publish(buff,
               :correlation_id => 'batch.test',
               :routing_key => 'jobsched.batch',
               :content_type => 'application/zip',
               :reply_to => 'batch.test',
               :headers => {
                   'security.key' => 'SECURITY_KEY'
               })
  puts "Published message."

end

# Give the round trip up to 5 seconds to complete.
t.join(5)

And the response...

Created zip archive
Publishing message...
Published message.
batch.test: {"data":[["2010-09-27 10:20:07.10-05"]],"errors":null,"columnNames":["now"],"totalRows":1}

The component I've been focusing on extending at the moment is the SQL handler. We have code throughout the enterprise (and cloud) that may or may not be able to connect to our databases. Rather than manage 10,000 installations of a special driver and/or database configuration, we often save ourselves the headache by embedding data access in a library of code that depends on the OS for little more than user input. Besides executing arbitrary SQL on behalf of requesting code, the SQL handler is extensible via Groovy-language plugins. I've chosen this as the initial entry point into my Groovy Map/Reduce implementation.
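The test message earlier in this post selects a plugin through its "sql" field: "plugin:classpath:plugins/sql_handler_mapreduce.groovy?sql=select+now()". Here is a minimal sketch of how a handler could tell plain SQL from a plugin spec and pull out the plugin location and its parameters. It assumes the format is exactly what that one example shows: a "plugin:" prefix, a location, then URL-encoded query parameters. parse_sql_spec and PluginSpec are hypothetical names, not the SQL handler's real code.

```ruby
require 'uri'

# Hypothetical: split a "plugin:" SQL spec into the plugin location and
# its query parameters. The format is inferred from a single example.
PluginSpec = Struct.new(:location, :params)

def parse_sql_spec(sql)
  return nil unless sql.start_with?("plugin:")   # plain SQL: run it directly
  location, query = sql.delete_prefix("plugin:").split("?", 2)
  params = URI.decode_www_form(query.to_s).to_h  # "+" decodes to a space
  PluginSpec.new(location, params)
end

spec = parse_sql_spec("plugin:classpath:plugins/sql_handler_mapreduce.groovy?sql=select+now()")
puts spec.location        # prints classpath:plugins/sql_handler_mapreduce.groovy
puts spec.params["sql"]   # prints select now()
```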

In their paper, Google explains how their Map/Reduce software tries to schedule each Map task on (or near) a machine that holds a replica of that task's input data. Our data comes from a SQL database rather than a distributed file system like GFS, so we have to integrate with the code that actually extracts the data from the database. To accomplish this, I simply implemented a plugin for my SQL handler that executes the SQL I give it but sends each key/value pair through the AMQP server (RabbitMQ) to a special handler (also running in the same job scheduler system) that runs my Groovy Map/Reduce.
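As a rough illustration of the plugin's per-row step, the sketch below turns each row of an ordered result set into a key/value message and passes it to a publish callable. Several pieces are assumptions made for illustration: the JSON layout, the "mapreduce.map" routing key, and the name stream_rows. In the real system, publishing would go through RabbitMQ rather than a lambda.

```ruby
require 'json'

# Hypothetical per-row step of the SQL plugin: each row becomes one
# key/value message. Message layout and routing key are assumptions.
def stream_rows(rows, key_column, publish)
  rows.each do |row|
    key = row.fetch(key_column)
    value = row.reject { |col, _| col == key_column }   # everything but the key
    publish.call("mapreduce.map", JSON.dump({ "key" => key, "value" => value }))
  end
  rows.size
end

sent = []
rows = [{ "store" => "A", "total" => 5 }, { "store" => "B", "total" => 7 }]
stream_rows(rows, "store", ->(routing_key, body) { sent << [routing_key, body] })
puts sent.first.last   # prints {"key":"A","value":{"total":5}}
```

Nothing is buffered here. Each row leaves as soon as it is read, which is what lets the Map side start working while the query is still being iterated.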

One of the peculiarities of an AMQP-based Map/Reduce system is that the Map and Reduce objects need a lifetime long enough to process all the data for a given key. Since the "map" and "reduce" closures will be called multiple times, the Script object in which they are defined needs to stay in memory long enough to receive all the data coming in for that key. An added complication: since this system is asynchronous, I can't rely on the key changing to know when I'm finished with the Map phase for a given key. The only code that knows when a key in the original ordered set has changed is the SQL handler that's iterating over the result set. It seems necessary, then, to have two types of messages controlling Map/Reduce asynchronously: one for data and one for "control". I use a topic exchange and named queues for the Map and Reduce handlers because the broker will load-balance messages across the consumers of a queue for me automagically. But those workers also need to listen to a fanout exchange so they can receive notifications from the cloud "master" (in this case, the SQL handler). When a key changes or the end of the result set is reached, the master sends out a special message alerting all the Map workers to the state of the result set.
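A minimal sketch of the data/control split is below. A single ordered log stands in for both exchanges: :data entries would go out on the topic exchange and :control entries on the fanout exchange. A worker keeps per-key state alive until the master's control message closes that key. The message shapes, run_master and MapWorker are hypothetical. The log also preserves the master's ordering, which a real broker would not guarantee across two separate queues; this sketch does not address that.

```ruby
# Hypothetical master: only it walks the ordered result set, so only it
# can see a key change. Returns the messages it would publish, in order.
def run_master(rows)
  log = []
  current = nil
  rows.each do |key, value|
    log << [:control, { type: :key_done, key: current }] if current && key != current
    current = key
    log << [:data, { key: key, value: value }]
  end
  log << [:control, { type: :key_done, key: current }] if current
  log << [:control, { type: :end_of_results }]
  log
end

# Hypothetical worker: state for a key outlives any single message and is
# closed only by the master's control message for that key.
class MapWorker
  attr_reader :finished

  def initialize
    @open = Hash.new(0)   # key => records seen so far
    @finished = {}
    @done = false
  end

  def done?
    @done
  end

  def deliver(channel, msg)
    if channel == :data
      @open[msg[:key]] += 1
    elsif msg[:type] == :key_done
      @finished[msg[:key]] = @open.delete(msg[:key]) || 0
    else
      @done = true   # :end_of_results
    end
  end
end

worker = MapWorker.new
run_master([["A", 1], ["A", 2], ["B", 3]]).each { |channel, msg| worker.deliver(channel, msg) }
```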

Within a map closure, emit() might be called multiple times, depending on the job requirements. Each time data is emitted from a map closure, it is forwarded through AMQP to the Reduce handler for that key. In a more "traditional" Map/Reduce approach, the results of a Map operation are buffered until either the entire set of data for that key has passed through the Map operation or some size limit is reached, and the Reduce phase then receives the Map output as a list of values. Our situation differs a little, because an emit() doesn't actually need to keep anything in memory. Take calculating a SUM of values, for instance. If the entire data set for a store is passed into a Reduce as an array of values, the sum function will iterate over those values and return a single result. But this is functionally equivalent to passing each data point into a reduce closure as soon as it is available and adding it to a single running total that persists throughout the Reduce handler's lifetime (until the control message marking the end of processing for that key is received). The equivalence holds for operations like SUM, COUNT, MIN and MAX, whose result doesn't depend on the order in which values arrive. This gives us a couple of advantages I can see right off, not least that I never have to keep very much data in memory at any point in time. I'm interested in passing this data through my Map/Reduce system as fast as possible, with as little overhead as possible. That means "streaming" the data rather than aggregating arrays of values in memory. If a Reduce handler needs access to all the data for a key at once (to calculate a median, for example), it can keep the values passed to it in its own data structures. If it didn't, the Map handler would have to do that identical bit of work. So it made sense to me to let the application choose whether to operate on arrays of data at once or to handle streaming data by responding to each record of a Reduce phase's input.
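The streaming argument can be checked with a small sketch. emit() hands each value straight to the reducer for its key, and a running SUM ends up equal to summing the full list. A reducer that genuinely needs every value, such as a median, keeps its own buffer. StreamingSum, BufferingMedian, emit and map_closure are illustrative names, not the framework's API.

```ruby
# Hypothetical streaming reducer: constant memory per key.
class StreamingSum
  def initialize
    @total = 0
  end

  def reduce(value)
    @total += value   # fold each value in as soon as it arrives
  end

  def finish
    @total
  end
end

# Hypothetical reducer that really needs every value keeps its own buffer.
class BufferingMedian
  def initialize
    @values = []
  end

  def reduce(value)
    @values << value
  end

  def finish
    sorted = @values.sort
    mid = sorted.size / 2
    sorted.size.odd? ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2.0
  end
end

reducers = Hash.new { |h, key| h[key] = StreamingSum.new }   # created on first emit
emit = ->(key, value) { reducers[key].reduce(value) }

# A map closure may call emit any number of times for one input record.
map_closure = ->(record) { record[:items].each { |qty| emit.call(record[:store], qty) } }

[{ store: "A", items: [1, 2] }, { store: "A", items: [3] }, { store: "B", items: [4] }].each(&map_closure)
puts reducers["A"].finish   # prints 6, the same as [1, 2, 3].sum
```

Because addition doesn't care about arrival order, the running total matches the batch result no matter how the broker interleaves the emits.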

Since we're operating asynchronously here, the Map and Reduce phases are rolled up from method calls into object lifetimes. By that I mean that, in this implementation of Map/Reduce, the Map phase is the entire cycle of work performed during the Map handler's lifetime. That lifetime runs from the first time a key value is encountered until a new key value is encountered. The same goes for the Reduce handler. Once a Map operation starts emitting data for a given key, Reduce handlers need to be instantiated for that key if they don't already exist, and the emitted data is sent to them by calling the reduce closure with the output of the Map operation. Once the Map phase is complete for a key and the Map handler has notified the Reduce handlers that it has fully processed the data for that key, the Reduce handlers have two options. They can reply to the original requestor with their partial results, or they can "rereduce" the results down to a single value by calling the "rereduce" closure. There is only ever a single rereduce listener for a given key (a first-come, first-served exclusive reply queue based on the key hash). If rereduce is called, it is expected to return a reply message, which is then handled like any other message in a Spring AMQP application.
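Here is a sketch of that Reduce-side lifecycle, under stated assumptions. State for a key is created the first time data for it arrives. It is dropped when the "key done" control message comes in. Each handler's partial result then goes to one queue whose name is derived from the key's hash, so every partial for a key meets at a single rereduce listener. ReduceHandler, the "rereduce." queue prefix and the choice of SHA-1 are all hypothetical; the post doesn't specify how the queue name is built.

```ruby
require 'digest/sha1'

# Hypothetical Reduce handler: per-key state exists only while the key is open.
class ReduceHandler
  def initialize(reduce)
    @reduce = reduce   # the "reduce closure": (accumulator, value) -> accumulator
    @partials = {}
  end

  # Data emitted by a Map handler; state for the key is created on first use.
  def on_emit(key, value)
    @partials[key] = @reduce.call(@partials.fetch(key, 0), value)
  end

  # Control message: the Map side has finished this key. Return the partial
  # result addressed to the single rereduce queue derived from the key.
  def on_key_done(key)
    return nil unless @partials.key?(key)
    [ReduceHandler.rereduce_queue(key), @partials.delete(key)]
  end

  def self.rereduce_queue(key)
    "rereduce.#{Digest::SHA1.hexdigest(key.to_s)}"
  end
end

sum = ->(acc, v) { acc + v }
handlers = [ReduceHandler.new(sum), ReduceHandler.new(sum)]
# The broker would spread emits across handlers; here we alternate by hand.
handlers[0].on_emit("A", 1)
handlers[1].on_emit("A", 2)
handlers[0].on_emit("A", 3)

# Each handler forwards its partial; one listener on that queue rereduces them.
partials = handlers.map { |h| h.on_key_done("A") }.compact
queues = partials.map(&:first).uniq
total = partials.map(&:last).sum
puts total   # prints 6
```

Deriving the queue name from the key, rather than from the handler, is what guarantees that every partial result for that key reaches the same listener.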

Without code examples, I realize much of this can be pretty academic and hard to follow. Some of this design is not implemented yet, since I'm not finished writing the guts of the code that actually does what I'm laying out here. But I've written enough of the basics, between the job scheduler, the Groovy plugin, and the Groovy Map/Reduce framework, to feel comfortable this will actually work when fully implemented. :)

I'd love to hear what you think, though! Leave a comment or send me an email. I'll also be tweeting my progress: @j_brisbin
