
Even better, even simpler multithreading with JRuby

[Yes, another post about ruby code; I’ll get back to library stuff soon.]

Quite a while ago, I released a little gem called threach (for “threaded #each”). It allows you to easily process a block with multiple threads.

    # Process a CSV file with three threads
    File.open('data.csv').threach(3, :each_line) {|line| send_to_db(line)}

Nice, right?

The problem is that I could never figure out a way to deal with a break or an Exception raised inside the block. The core problem is that once a thread is blocked trying to push to or pop from a Ruby SizedQueue, there’s no way (that I could find) to tell it to wake up and see if there’s an error from another thread floating around that needs to be addressed.

So, I got into a pattern of running my code with each for a while, debugging, and eventually doing the production run under threach. Which is just dumb. Then I’d try to rewrite threach to deal with this stuff using a different approach (mutexes, lightweight events), quickly (or not so quickly) fail, give up, and start again.

So…let’s not worry about MRI for the moment. I run all my big jobs under JRuby these days anyway, and there I can take advantage of Java’s blocking queues, whose operations can time out. When a queue operation times out, I can check to see if there’s been a break or an exception thrown in the meantime and behave appropriately.
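To make the idea concrete, here is a minimal sketch of the "time out, then check for trouble" pattern. It is not the code inside jruby_threach. Every name in it (sketch_poll, sketch_offer, sketch_threach, TIMED_OUT, DONE) is made up for illustration, and it fakes the timeouts with non-blocking queue calls plus a short sleep so that it runs on any Ruby. Under JRuby the same roles would be played by the timed offer and poll methods on a java.util.concurrent blocking queue. It only deals with exceptions; break is left out.

```ruby
require 'thread'

TIMED_OUT = Object.new # sentinel: the queue operation gave up waiting
DONE      = Object.new # sentinel: the producer has no more items

def sketch_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Try to pop for up to `timeout` seconds; return TIMED_OUT if nothing arrived.
def sketch_poll(queue, timeout)
  deadline = sketch_now + timeout
  begin
    queue.pop(true) # non-blocking pop raises ThreadError when empty
  rescue ThreadError
    return TIMED_OUT if sketch_now >= deadline
    sleep 0.005
    retry
  end
end

# Try to push for up to `timeout` seconds; return false if the queue stayed full.
def sketch_offer(queue, item, timeout)
  deadline = sketch_now + timeout
  begin
    queue.push(item, true) # non-blocking push raises ThreadError when full
    true
  rescue ThreadError
    return false if sketch_now >= deadline
    sleep 0.005
    retry
  end
end

# Run the block over enum with nthreads consumers; re-raise the first
# exception that escapes the block once all consumers have stopped.
def sketch_threach(enum, nthreads, timeout = 0.05, &blk)
  queue = SizedQueue.new(nthreads * 2)
  lock  = Mutex.new
  error = nil

  consumers = Array.new(nthreads) do
    Thread.new do
      until error
        item = sketch_poll(queue, timeout)
        next  if item.equal?(TIMED_OUT) # nothing yet: loop and re-check error
        break if item.equal?(DONE)
        begin
          blk.call(item)
        rescue Exception => e
          lock.synchronize { error ||= e } # remember only the first failure
        end
      end
    end
  end

  # Producer side: never block forever, so a consumer error is noticed.
  enum.each do |item|
    pushed = false
    pushed = sketch_offer(queue, item, timeout) until pushed || error
    break if error
  end

  # Tell each consumer to stop (pointless if they already stopped on error).
  nthreads.times do
    pushed = false
    pushed = sketch_offer(queue, DONE, timeout) until pushed || error
  end

  consumers.each(&:join)
  raise error if error
end
```

The point is that no thread ever waits indefinitely: every wait is bounded by the timeout, so the error flag gets looked at regularly on both the producing and the consuming side.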

The result is the gem jruby_threach. It works just like threach, except that, you know, it actually works the way I’d like it to.

    require 'jruby_threach'
    File.open('data.csv').threach(3, :each_line) {|line| send_to_db(line)}

Looks familiar, doesn’t it?

But you can also break out of the loop.

    myarray.threach(2) do |item|
      break if item_indicates_to_break(item)
      if item == :really_bad_value
        raise RuntimeError, "Something's really wrong"
      end
      process_item(item)
    end

Any exceptions that are rescued within the block are handled there and don’t cause processing to stop. Any that are not handled within the block are noticed by threach, cause the processing to stop, and are then re-raised so you can deal with them outside of threach.

    reader = SpecializedFileReader.new(filename)

    begin
      reader.threach(2) do |item|
        process_item(item)
      end
    rescue SpecializedFileReaderError
      # deal with the fact that the reader failed
    rescue Exception
      # deal with the problem processing the item
    end

Dealing with the underlying Java data structures makes life a lot easier. So much easier that I added an enhancement: threading production as well.

    # Use two threads to read lines from files, and another three threads
    # to process the data that comes out of those files.
    Dir.glob("*.csv").map{|f| File.open(f)}.mthreach(2,3) do |item|
      send_item_to_database(item)
    end

mthreach basically allows you to treat an array of Enumerables as a single logical entity, multithreading both the producer and consumer sides of the operation. There aren’t a whole lot of obvious use cases, but it can certainly come in handy.

You can also access the underlying class that aggregates multiple enumerables directly.

    require 'jruby_threach'
    me = Threach::MultiEnum.new(
      [enum1, enum2, enum3], # enumerables
      threads,               # How many threads to use to
      :each_with_index,      # the iterator to call on the enumerables
      size                   # size of the under-the-hood queue
    )

    # Note that like threach, calling #each against a MultiEnum actually
    # calls the iterator you sent in (in this case, #each_with_index)
    me.each {|item| process_item(item)}
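If you're wondering what "treating an array of Enumerables as a single logical entity" might look like under the hood, here's a rough plain-Ruby sketch of the idea. It is not the gem's implementation: SketchMultiEnum and everything inside it are invented for illustration, its constructor arguments only mirror the ones shown above, and it skips the break and exception handling that is the whole point of jruby_threach. A few producer threads pull enumerables off a work list and push whatever they yield onto one shared, size-limited queue, and #each drains that queue.

```ruby
require 'thread'

# Hypothetical stand-in for the idea behind a multi-enumerable aggregator.
class SketchMultiEnum
  include Enumerable

  DONE = Object.new # sentinel: one producer thread has finished

  def initialize(enums, threads = 2, iterator = :each, size = 10)
    @enums    = enums
    @threads  = threads
    @iterator = iterator
    @size     = size
  end

  def each
    return enum_for(:each) unless block_given?

    work = Queue.new
    @enums.each { |e| work << e }
    out = SizedQueue.new(@size) # bounded, so producers can't race far ahead

    producers = Array.new(@threads) do
      Thread.new do
        begin
          while (enum = next_enum(work))
            # Call the chosen iterator; multi-argument yields become arrays.
            enum.send(@iterator) do |*args|
              out << (args.size == 1 ? args.first : args)
            end
          end
        ensure
          out << DONE
        end
      end
    end

    # Consumer side: keep yielding until every producer has signed off.
    finished = 0
    while finished < @threads
      item = out.pop
      if item.equal?(DONE)
        finished += 1
      else
        yield item
      end
    end

    producers.each(&:join)
    self
  end

  private

  # Grab the next enumerable without blocking; nil means no work is left.
  def next_enum(work)
    work.pop(true)
  rescue ThreadError
    nil
  end
end
```

Items come out in whatever order the producer threads happen to deliver them, so nothing here preserves the order of the original enumerables. Leaving the loop early would also strand producers that are blocked on a full queue, which is exactly the kind of problem the timed queue operations are there to solve.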