[Edit 2011-July-1: I’ve written a jruby_specific threach that takes advantage of better underlying java libraries called jruby_threach that is a much better option if you’re running jruby]
Lately on the #code4lib IRC channel, several of us have been knocking around different versions (in several programming languages) of programs to read in a ginormous file and do some processing on each line. I noted some speedups related to multi-threading, and someone (maybe rsinger?) said, basically, that to bother with threading for a one-off simple program was a waste.
Well, it turns out I’ve been trying to figure out how to deal with threading in jruby anyway. And I think I have a pretty elegant solution — a generic “threaded each” I’m calling threach
.
enumerable_object.threach(number_of_threads, :which_iterator) do |i| do_something_threadsafe(i) end
Some examples
# You like #each? You'll love...err..probably like #threach load 'threach.rb' # Process with 2 threads. It assumes you want 'each' # as your iterator. (1..10).threach(2) {|i| puts i.to_s} # You can also specify the iterator File.open('mybigfile') do |f| f.threach(2, :each_line) do |line| processLine(line) end end # threach does not care what the arity of your block is # as long as it matches the iterator you ask for ('A'..'Z').threach(3, :each_with_index) do |letter, index| puts "#{index}: #{letter}" end # Or with a hash h = {'a' => 1, 'b'=>2, 'c'=>3} h.threach(2) do |letter, i| puts "#{i}: #{letter}" end
threach.rb adds to the Enumerable module to provide a threaded version of whatever enumerator you throw at it (each
by default).
How does it work?
How about I just put the source here. It’s short.
require 'thread' module Enumerable def threach(threads=0, iterator=:each, &blk) if threads == 0 # Just call the iterator itself self.send(iterator, &blk) else bq = SizedQueue.new(threads * 4) consumers = [] threads.times do |i| consumers << Thread.new do until (a = bq.pop) === :end_of_data blk.call(*a) end end end # The producer count = 0 self.send(iterator) do |*x| bq.push x count += 1 end # Now end it threads.times do bq << :end_of_data end # Do the join consumers.each {|t| t.join} end end end
That’s it. If threads=0, just use the iterator itself. If not:
- Create a SizedQueue. It is thread-safe by definition and acts as the glue between the consumers and the main-thread producer.
- Start a set of consumer threads that basically just pull an item out of the queue and then run the given block on it. Bail when you see the
end_of_data
token. These consumer threads all immediately block because there’s nothing in the SizedQueue yet. - Populate the SizedQueue. When you run out of stuff to add, push on an
end_of_data
token for each consumer thread. - Call
join
on the threads to keep the main program around when one of them exits.
Why use it?
Well, if you’re using stock ruby — you probably shouldn’t. It’ll just slow things down. But if you’re using a ruby implementation that has real threads, like JRuby, this will give you relatively painless multi-threading.
You can always do something like:
if defined? JRUBY_VERSION numthreads = 3 else numthreads = 0 end my_enumerable.threach(numthreads) {|i| ...}
Note the “relatively” up there. The block you pass still has to be thread-safe, and there are many data structures you’ll encounter that are not thread-safe. Scalars, arrays, and hashes are, though, under JRuby, and that’ll get you pretty far.