I am Mr. Rourke...er...Bill Dueber, your host.

Even better, even simpler multithreading with JRuby



[Yes, another post about ruby code; I’ll get back to library stuff soon.]

Quite a while ago, I released a little gem called threach (for “threaded #each”). It allows you to easily process a block with multiple threads.


  # Process a CSV file with three threads
  File.open('data.csv').threach(3, :each_line) {|line| send_to_db(line)}

Nice, right?

The problem is that I could never figure out a way to deal with a break or an Exception raised inside the block. The core problem is that once a thread is blocked trying to push to or pop from a Ruby SizedQueue, there’s no way (that I could find) to tell it to wake up and see if there’s an error from another thread floating around that needs to be addressed.

So, I got into a pattern of running my code with each for a while, debugging, and eventually doing the production run under threach. Which is just dumb. Then I’d try to rewrite threach to deal with this stuff using a different approach (mutexes, lightweight events), quickly (or not so quickly) fail, give up, and start again.

So…let’s not worry about MRI for the moment. I run all my big jobs under JRuby these days anyway, and there I can take advantage of Java’s blocking queues, which have timeouts. When a queue operation times out, I can check to see if there’s been a break or an exception thrown in the meantime and behave appropriately.
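A rough sketch of that "time out, then check for trouble" loop is below. It is not the gem's code. It is written in plain Ruby so it runs on any interpreter, and `TimedQueue`, `consume`, `DONE`, and the `failure` hash are hypothetical names invented for the illustration. `TimedQueue#pop` stands in for a timed poll on a Java blocking queue.

```ruby
# Hypothetical stand-in for a Java blocking queue: pop gives up after `timeout` seconds.
class TimedQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @items << item
      @cond.signal
    end
  end

  # Returns [true, item] on success, or [false, nil] if the wait timed out.
  def pop(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return [false, nil] if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      [true, @items.shift]
    end
  end
end

DONE = Object.new # sentinel meaning "no more items"

# Consumer loop: never waits longer than 50ms without re-checking the shared
# failure flag (assumed to be set by whichever thread hit an error).
def consume(queue, failure)
  results = []
  loop do
    break if failure[:error]      # someone else failed, so stop waiting
    ok, item = queue.pop(0.05)
    next unless ok                # timed out: go around and check again
    break if item.equal?(DONE)
    results << item
  end
  results
end

queue   = TimedQueue.new
failure = {}
worker  = Thread.new { consume(queue, failure) }
queue.push(:a)
failure[:error] = RuntimeError.new("boom") # simulate a failure elsewhere
worker.join                                # returns promptly instead of hanging
```

The point is only that the consumer is never stuck for longer than one timeout interval, so an error raised elsewhere gets noticed quickly.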

The result is the gem jruby_threach. It works just like threach, except that, you know, it actually works the way I’d like it to.


require 'jruby_threach'
File.open('data.csv').threach(3, :each_line) {|line| send_to_db(line)}

Looks familiar, doesn’t it?

But you can also break out of the loop.


myarray.threach(2) do |item|
  break if item_indicates_to_break(item)
  if item == :really_bad_value
    raise RuntimeError, "Something's really wrong"
  end
  process_item(item)
end

Any exceptions that are rescued within the block are handled internally and don’t cause processing to stop. Any that are not handled within the block are noticed by threach, cause the processing to stop, and are then re-raised so you can deal with them outside of threach.



reader = SpecializedFileReader.new(filename)

begin
  reader.threach(2) do |item|
    process_item(item)
  end
rescue SpecializedFileReaderError
  # deal with the fact that the reader failed
rescue Exception
  # deal with the problem processing the item
end
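The "notice, stop, re-raise" behavior can be sketched in plain Ruby roughly as follows. This is an illustration of the idea, not how jruby_threach is implemented. `threaded_each` is a hypothetical helper, and it assumes the items fit in memory so they can be loaded into a queue up front.

```ruby
# Hypothetical helper: run the block over `items` using `thread_count` worker
# threads, stop at the first unhandled exception, and re-raise it in the
# calling thread.
def threaded_each(items, thread_count)
  queue = Queue.new
  items.each { |item| queue << item }
  lock  = Mutex.new
  error = nil

  workers = Array.new(thread_count) do
    Thread.new do
      loop do
        break if lock.synchronize { error } # another worker already failed
        begin
          item = queue.pop(true)            # non-blocking pop
        rescue ThreadError
          break                             # queue is empty: this worker is done
        end
        begin
          yield item
        rescue Exception => e
          lock.synchronize { error ||= e }  # remember only the first failure
          break
        end
      end
    end
  end

  workers.each(&:join)
  raise error if error                      # surface the failure to the caller
end

begin
  threaded_each([1, 2, :really_bad_value, 3], 2) do |item|
    raise ArgumentError, "bad item" if item == :really_bad_value
  end
rescue ArgumentError => e
  warn "caught outside the threads: #{e.message}"
end
```

This sketch does not handle `break` inside the block. It only shows how an exception can travel from a worker thread back to the caller.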


Dealing with the underlying Java data structures makes life a lot easier, to the point that I added an enhancement: threading the producer side as well.


  # Use two threads to read lines from files, and another three threads
  # to process the data that comes out of those files.
  Dir.glob("*.csv").map{|f| File.open(f)}.mthreach(2,3) do |item|
    send_item_to_database(item)
  end

mthreach basically allows you to treat an array of Enumerables as a single logical entity, multithreading both the producer and consumer sides of the operation. There aren’t a whole lot of obvious use cases, but it can certainly come in handy.
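To make the "single logical entity" idea concrete, here is a minimal sketch in plain Ruby. It is a simplification and not the gem's implementation: `merged_each` is a hypothetical name, it starts one producer thread per enumerable rather than a fixed number of producers, and it consumes on the calling thread only.

```ruby
# Hypothetical helper: drain several enumerables through one bounded queue so
# the caller sees a single stream of items.
def merged_each(enumerables, queue_size = 10)
  queue = SizedQueue.new(queue_size)
  done  = Object.new # sentinel pushed once by each producer when it finishes

  producers = enumerables.map do |enum|
    Thread.new do
      enum.each { |item| queue << item }
      queue << done
    end
  end

  remaining = producers.size
  while remaining > 0
    item = queue.pop
    if item.equal?(done)
      remaining -= 1
    else
      yield item
    end
  end
  producers.each(&:join)
end

seen = []
merged_each([(1..3), %w[a b]]) { |item| seen << item }
# `seen` now holds all five items; their relative order depends on thread scheduling.
```

Each enumerable keeps its own internal order, but items from different enumerables interleave unpredictably, which is worth remembering if downstream code cares about ordering.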

You can also access the underlying class that aggregates multiple enumerables directly.


require 'jruby_threach'
me = Threach::MultiEnum.new(
  [enum1, enum2, enum3], # enumerables
  threads,               # how many threads to use
  :each_with_index,      # the iterator to call on the enumerables
  size                   # size of the under-the-hood queue
)

# Note that, like threach, calling #each against a MultiEnum actually
# calls the iterator you sent in (in this case, #each_with_index)
me.each {|item| process_item(item)}


