Lightweight concurrency with Ruby and EventMachine
EventMachine describes itself as a “fast, simple event-processing library for Ruby programs.” It includes a module called Deferrable that allows easy, lightweight concurrency. Deferrable makes it simple to start a blocking or long-running operation, push it to the background, and execute any number of code blocks (callbacks) when it completes.
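To make the callback mechanism concrete, here is a minimal stdlib-only sketch of the pattern. TinyDeferrable and Lookup are hypothetical names, and this is a simplified stand-in rather than EventMachine's actual implementation; it only mirrors the callback, errback, succeed, and fail methods used in this post.

```ruby
# Simplified stand-in for the deferrable pattern (not EventMachine's code).
module TinyDeferrable
  # Register a block to run on success; run it at once if already succeeded.
  def callback(&block)
    if @deferred_status == :succeeded
      block.call(*@deferred_args)
    else
      (@callbacks ||= []) << block
    end
    self
  end

  # Register a block to run on failure; run it at once if already failed.
  def errback(&block)
    if @deferred_status == :failed
      block.call(*@deferred_args)
    else
      (@errbacks ||= []) << block
    end
    self
  end

  # Mark the operation as succeeded and fire the callbacks in order.
  def succeed(*args)
    settle(:succeeded, @callbacks, args)
  end

  # Mark the operation as failed and fire the errbacks in order.
  def fail(*args)
    settle(:failed, @errbacks, args)
  end

  private

  def settle(status, blocks, args)
    @deferred_status = status
    @deferred_args = args
    (blocks || []).each { |b| b.call(*args) }
    @callbacks = @errbacks = nil
  end
end

# Hypothetical class that mixes the module in.
class Lookup
  include TinyDeferrable
end

log = []

ok = Lookup.new
ok.callback { |v| log << "first: #{v}" }
ok.callback { |v| log << "second: #{v}" }
ok.errback  { |v| log << "error: #{v}" }
ok.succeed("done")

bad = Lookup.new
bad.errback { |msg| log << "error: #{msg}" }
bad.fail("No Data Found")

puts log
```

Both callbacks on the first object fire in the order they were attached, and only the errback fires on the second. Any number of blocks can be attached before the operation finishes.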
Below, I’ve written a sample application that uses the Deferrable module and EventMachine’s event loop to parallelize HTTP API calls to whoismyrepresentative.com.
require 'httparty'
require 'eventmachine'

class Request
  include EM::Deferrable

  # Pool of requests waiting to be sent.
  @@requests = []

  attr_reader :method, :params

  # Send every pooled request on its own thread inside the EventMachine loop.
  def self.run
    return if @@requests.empty?

    EM.run do
      @@requests.each do |request|
        Thread.new do
          begin
            request.send_request
          rescue => e
            puts e.backtrace
            request.fail({"result" => {"message" => e}})
          ensure
            @@requests.delete(request)
          end
        end
      end

      # Spin until every thread has removed its request, then stop the loop.
      until @@requests.empty?; end
      EM.stop
    end
  end

  # method is the URL to fetch; params are the options passed to HTTParty.get.
  def initialize(method, params, &callback)
    @method, @params = method, params

    self.callback(&callback)

    @@requests << self
  end

  # Fire the callbacks on success, or the errbacks if the API returned a message.
  def send_request
    @response = HTTParty.get(@method, @params)

    if @response["result"]["message"].nil?
      succeed(@response)
    else
      fail(@response)
    end
  end
end

whoismyrep = 'http://whoismyrepresentative.com/whoismyrep.php'
%w(91108 94026 90274 07620 10014 90210 10065 94920 10012 93108 11568 11013 92067 91302 94022 92661 11976 94024 11932 94010 11765 10003 21056 90265 10021).each do |zip|
  req = Request.new(whoismyrep, :query => { :zip => zip }) do |response|
    reps = response["result"]["rep"]
    if reps.is_a?(Array)
      reps.each do |rep|
        print "#{zip}: #{rep["name"]} => District #{rep["district"]}, #{rep["phone"]}\n"
      end
    else
      print "#{zip}: #{reps["name"]} => District #{reps["district"]}, #{reps["phone"]}\n"
    end
  end

  req.callback do |response|
    rep = response["result"]["rep"]
    print "#{zip} is in #{(rep.is_a?(Array) ? rep.first : rep)["state"]}\n"
  end

  req.errback do |response|
    print "#{zip}: #{response["result"]["message"]}\n"
  end
end

Request.run
As each request is created, it is placed in a pool with all the other requests, which are then spawned and executed by a call to Request.run. As the GET calls come back, each response is checked for an error message, and the appropriate callbacks attached to the request are called. In this case, if the call succeeds, each representative for that zip code is printed along with a district and phone number, followed by the state. If it fails, the error message is printed. As the output below shows, the calls run in parallel, and the callbacks execute as soon as each call returns.
10014: Jerrold Nadler => District 8, (202) 225-5635
10014 is in NY
91108: David Dreier => District 26, (202) 225-2305
91108: Adam B. Schiff => District 29, (202) 225-4176
91108 is in CA
90210: Henry A. Waxman => District 30, (202) 225-3976
90210: Howard L. Berman => District 28, (202) 225-4695
90210 is in CA
10065: No Data Found...
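The dispatch step described above, one thread per pending request with the response routed to a success or failure handler, can be sketched without any gems. This is only an illustration under stated assumptions: FAKE_API is made-up placeholder data standing in for the HTTP call, dispatch is a hypothetical helper, and the threads are joined rather than polled.

```ruby
# Placeholder responses standing in for the remote API (made-up data).
# The shape mirrors the article: a "rep" entry on success, a "message" on error.
FAKE_API = {
  "10014" => { "result" => { "rep" => { "name" => "Rep A", "state" => "NY" } } },
  "10065" => { "result" => { "message" => "No Data Found" } }
}.freeze

# Hypothetical helper: run one thread per zip code and route each response.
def dispatch(zips, on_success:, on_failure:)
  threads = zips.map do |zip|
    Thread.new do
      response = FAKE_API.fetch(zip) # stand-in for HTTParty.get
      if response["result"]["message"].nil?
        on_success.call(zip, response)
      else
        on_failure.call(zip, response)
      end
    end
  end
  threads.each(&:join) # block until every worker has finished
end

lines = Queue.new # thread-safe collector for handler output

dispatch(
  %w[10014 10065],
  on_success: ->(zip, r) { lines << "#{zip} is in #{r["result"]["rep"]["state"]}" },
  on_failure: ->(zip, r) { lines << "#{zip}: #{r["result"]["message"]}" }
)

output = []
output << lines.pop until lines.empty?
puts output.sort
```

Joining the threads is one alternative to polling the pool in a loop: the main thread simply sleeps until the workers are done. The Queue keeps the handlers from writing to a shared array from several threads at once.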
As the number of requests increases, so does the performance benefit of using this model. Example timings for 25 requests done serially and in parallel are below.
$ time ruby request_serial.rb > serial
real 0m7.150s
user 0m0.423s
sys 0m0.146s
$ time ruby request.rb > parallel
real 0m2.274s
user 0m1.997s
sys 0m0.157s
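For readers who want to reproduce the shape of this comparison without hitting the network, here is a rough sketch that simulates request latency with sleep. LATENCY, COUNT, and fake_request are assumptions for illustration only; this is not the request_serial.rb script used for the timings above.

```ruby
require 'benchmark'

LATENCY = 0.05 # seconds; hypothetical per-request delay
COUNT = 5      # number of simulated requests

# Stand-in for a blocking HTTP call.
def fake_request
  sleep LATENCY
end

# Run the simulated requests one after another.
serial = Benchmark.realtime { COUNT.times { fake_request } }

# Run the same requests on one thread each and wait for all of them.
threaded = Benchmark.realtime do
  COUNT.times.map { Thread.new { fake_request } }.each(&:join)
end

printf("serial:   %.3fs\n", serial)
printf("threaded: %.3fs\n", threaded)
```

Because the simulated requests spend their time waiting, the threaded run should take roughly one latency period while the serial run takes roughly COUNT of them. Real HTTP calls will vary with the network and the remote server.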





