Why bother with threading in jruby? Because it’s easy.
March 11, 2010 at 10:48 pmCategory:Uncategorized
[Edit 2011-July-1: I've written a jruby_specific threach that takes advantage of better underlying java libraries called jruby_threach that is a much better option if you're running jruby]
Lately on the #code4lib IRC channel, several of us have been knocking around different versions (in several programming languages) of programs to read in a ginormous file and do some processing on each line. I noted some speedups related to multi-threading, and someone (maybe rsinger?) said, basically, that to bother with threading for a one-off simple program was a waste.
Well, it turns out I’ve been trying to figure out how to deal with threading in jruby anyway. And I think I have a pretty elegant solution — a generic “threaded each” I’m calling threach.
- enumerable_object.threach(number_of_threads, :which_iterator) do |i|
- do_something_threadsafe(i)
- end
Some examples
- # You like #each? You'll love…err..probably like #threach
- load 'threach.rb'
- # Process with 2 threads. It assumes you want 'each'
- # as your iterator.
- (1..10).threach(2) {|i| puts i.to_s}
- # You can also specify the iterator
- File.open('mybigfile') do |f|
- f.threach(2, :each_line) do |line|
- processLine(line)
- end
- end
- # threach does not care what the arity of your block is
- # as long as it matches the iterator you ask for
- ('A'..'Z').threach(3, :each_with_index) do |letter, index|
- puts "#{index}: #{letter}"
- end
- # Or with a hash
- h = {'a' => 1, 'b'=>2, 'c'=>3}
- h.threach(2) do |letter, i|
- puts "#{i}: #{letter}"
- end
threach.rb adds to the Enumerable module to provide a threaded
version of whatever enumerator you throw at it (each by default).
How does it work?
How about I just put the source here. It’s short.
- require 'thread'
- module Enumerable
- def threach(threads=0, iterator=:each, &blk)
- if threads == 0
- # Just call the iterator itself
- self.send(iterator, &blk)
- else
- bq = SizedQueue.new(threads * 4)
- consumers = []
- threads.times do |i|
- consumers << Thread.new do
- until (a = bq.pop) === :end_of_data
- blk.call(*a)
- end
- end
- end
- # The producer
- count = 0
- self.send(iterator) do |*x|
- bq.push x
- count += 1
- end
- # Now end it
- threads.times do
- bq << :end_of_data
- end
- # Do the join
- consumers.each {|t| t.join}
- end
- end
- end
That’s it. If threads=0, just use the iterator itself. If not:
- Create a SizedQueue. It is thread-safe by definition and acts as the glue between the consumers and the main-thread producer.
- Start a set of consumer threads that basically just pull an item out of the queue and then run the given block on it. Bail when you see the
end_of_datatoken. These consumer threads all immediately block because there’s nothing in the SizedQueue yet. - Populate the SizedQueue. When you run out of stuff to add, push on an
end_of_datatoken for each consumer thread. - Call
joinon the threads to keep the main program around when one of them exits.
Why use it?
Well, if you’re using stock ruby — you probably shouldn’t. It’ll just slow things down. But if you’re using a ruby implementation that has real threads, like JRuby, this will give you relatively painless multi-threading.
You can always do something like:
- if defined? JRUBY_VERSION
- numthreads = 3
- else
- numthreads = 0
- end
- my_enumerable.threach(numthreads) {|i| …}
Note the “relatively” up there. The block you pass still has to be thread-safe, and there are many data structures you’ll encounter that are not thread-safe. Scalars, arrays, and hashes are, though, under JRuby, and that’ll get you pretty far.
Nice. You wrote that one? Ruby’s pretty sweet, huh?
What’s the purpose of using a SizedQueue instead of an ordinary Queue? What if the producer produces so much faster than the consumers consume, that the threads*4 size is exhausted, what happens? Does the producer just block waiting for there to be room to enqueue?
The assumption is that the producer is faster than the consumer (otherwise, why bother to have multiple consumers). A regular Queue (not sized) would grow without bound based on the speed difference between consumption and production. We don’t, for example, want 10K lines in memory while we’re waiting for consumers to turn them into MARC objects or whatnot.
A SizedQueue will block on both enqueue (if it’s full) and dequeue (if there’s nothing in it), so it’s exactly what we need for this kind of thing.
Nice.
Just call the iterator itself
self.send(iterator) do |*args| blk.call *args end
could be
self.send(iterator, &blk)
Thanks — I’m obviously still translating from Perl in my head :-). Changed.
For what it’s worth, there’s a gem called “peach” (for “parallel each”) that basically does this same thing. It actually shook loose a few bugs in our Enumerable logic, where the iteration structures were not thread-safe (that’s long since been fixed).
Nice example either way. JRuby + threads can really kick some ass :)
Yeah, I saw peach (which, among other things, left me scrambling for a different name), but was dissatisfied with its monkey-patching of only Array. My use cases mostly involve pulling stuff out of a file, so I wanted to hit up Enumerable directly.
Is there a list somewhere of what’s thread-safe in JRuby?