Indexing data into Solr via JRuby (with threads!)

[Note: in this post I'm just going to focus on the "get stuff into Solr" part. My normal focus -- MARC data -- will make an appearance in the next post when I talk about using this in addition to / instead of solrmarc.]

Working with Solr

I love me the Solr. I love everything about it except that the best way to interact with it is via Java. I don’t so much love me the Java.

So…taking Erik Hatcher’s lead and advice, as I will do whenever he offers either, I wrote some code to work within JRuby to deal with Solr.

Getting the code

I’ve added the gems to gemcutter, if you want to play along at home:

  • jruby_producer_consumer (github, rdoc.info) Ruby syntax for threaded operations under JRuby
  • jruby_streaming_update_solr_server (github, rdoc.info) Ruby syntax on top of the Java class of the same name
  • marc4j4r (github, rdoc.info) Ruby syntax on top of the marc4j Java library

WARNING: None of these gems have a 1.0 version tag on them, and that means that the API may change a titch in the future. Also, the fact that they’re released as gems means that it’s easy to release gems, not that I’m not an idiot.

The basics: Using SolrInputDocument and StreamingUpdateSolrServer

OK, with the disclaimer out of the way, let’s look at some code.

    require 'rubygems'
    require 'jruby_streaming_update_solr_server'

    solrurl = 'http://your.solr.server:port/solr'
    sussqueuesize = 24 # how many items to buffer on their way to solr
    sussthreads = 1    # how many threads to use to send stuff to solr

    suss = StreamingUpdateSolrServer.new(solrurl, sussqueuesize, sussthreads)

    # Let's add a simple document via a hash: A title, three authors, and a year

    h = {
      :title => "Never been deader",
      :author => ['Bill', 'Mike', 'Molly'],
      :year => 2003
    }
    suss << h
    suss.commit

    # YEA! You just added a document to solr and committed it.
    # Have a cookie!

    # We can also use a document object to do the same thing

    doc = SolrInputDocument.new
    # Add the title
    doc << ['title', 'Never been deader']

    # Add the first author
    doc << [:author, 'Bill']

    # Add more. Re-used keys mean you're adding additional values
    # Note values can be scalars or arrays

    doc << [:author, ['Mike', 'Molly']]

    # Add the wrong year using [] syntax
    doc[:year] = 2001

    # Oops! fix it. []= overwrites existing value(s)

    doc[:year] = 2003

    # Finally, we can merge a hash (or anything else that responds to
    # 'each_pair' with key-value pairs) into an existing doc.
    # The parentheses matter: without them Ruby parses { ... } as a block.

    doc.merge!('author' => 'Ringo Starrre', 'publisher' => 'Vainity Books')

    # Add it

    suss << doc

    # Commit and optimize if you'd like

    suss.commit
    suss.optimize # if you want

Nothing really fancy in there — just a few things worth noting:

  • A suss object will take a hash (again, anything that responds to #each_pair) or a SolrInputDocument
  • You can use either strings or symbols to represent Solr field names
  • Values can be either a single value, or an array of multiple values

And there are three ways to get data into a doc:

  • Via << [field, value(s)] (additive)
  • Via doc.merge! hash (additive)
  • Via doc[field] = value (replaces)
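
If the additive-versus-replace distinction is easier to see in plain Ruby, here is a minimal sketch. ToyDoc is a made-up stand-in, not the gem's SolrInputDocument, and it assumes nothing beyond the three behaviours listed above; it runs under any Ruby without Solr or the gems.

```ruby
# Toy stand-in for a multi-valued document. NOT the gem's SolrInputDocument;
# it only mimics the add/replace semantics described in the post.
class ToyDoc
  def initialize
    # every field holds an array of values
    @fields = Hash.new { |hash, key| hash[key] = [] }
  end

  # doc << [field, value_or_values] appends to whatever is already there
  def <<(pair)
    field, values = pair
    @fields[field.to_s].concat(Array(values))
    self
  end

  # doc[field] = value_or_values throws away any existing values first
  def []=(field, values)
    @fields[field.to_s] = Array(values)
  end

  # strings and symbols name the same field
  def [](field)
    @fields[field.to_s]
  end

  # merge! is additive: it feeds each key-value pair through <<
  def merge!(pairs)
    pairs.each_pair { |field, values| self << [field, values] }
    self
  end
end

doc = ToyDoc.new
doc << ['title', 'Never been deader']
doc << [:author, 'Bill']
doc << [:author, ['Mike', 'Molly']]      # additive: now three authors
doc[:year] = 2001
doc[:year] = 2003                        # replaces: only 2003 remains
doc.merge!('author' => 'Ringo Starrre')  # additive again: four authors
```

After this runs, doc[:author] holds all four names while doc[:year] holds only 2003.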

Adding Threads

I also went down the garden path of threading things. There are an awful lot of operations that are not thread-safe (e.g., reading a line from a file) but once you’ve got a bunch of records to work with, turning them into Solr documents is usually thread-safe.

My model is that there’s a producer (usually the method #each) from an underlying data object. A thread takes whatever that method yields and sticks the values into a Java BlockingQueue awaiting consumption. You then use ProducerConsumer#threaded_each (or ProducerConsumer#threaded_each_with_index) to pull items out of the queue and do something useful with them.

I extracted stuff into a library (jruby_producer_consumer) for your viewing pleasure.
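
To make the model concrete, here is a rough sketch of the same idea in plain Ruby. It is not the library's code: it swaps Ruby's built-in SizedQueue in for the Java BlockingQueue, and toy_threaded_each is a name I made up for illustration. It also assumes the consumer block doesn't raise; a real implementation would need to handle that.

```ruby
require 'thread' # SizedQueue and Queue; already loaded on modern Rubies

# One producer thread walks source.each and pushes items into a bounded
# queue; several consumer threads pop items and hand them to the block.
def toy_threaded_each(source, num_consumers, queue_size = 5)
  queue = SizedQueue.new(queue_size) # push blocks while the buffer is full
  done  = Object.new                 # sentinel telling a consumer to stop

  producer = Thread.new do
    source.each { |item| queue.push(item) }
    num_consumers.times { queue.push(done) } # one sentinel per consumer
  end

  consumers = Array.new(num_consumers) do
    Thread.new do
      loop do
        item = queue.pop               # blocks until something is available
        break if item.equal?(done)
        yield item                     # may run out of order across threads
      end
    end
  end

  (consumers << producer).each(&:join)
end

# Usage: double some numbers on three consumer threads
results = Queue.new # thread-safe collector
toy_threaded_each(1..20, 3) { |n| results << n * 2 }

doubled = []
doubled << results.pop until results.empty?
```

The bounded queue is the whole point: the producer can only run a few items ahead of the consumers, so memory stays flat no matter how big the input is.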

CONFUSION ALERT: It’s perhaps unfortunate that the object you send to ProducerConsumer.new(obj) must implement #each and that the ProducerConsumer method #threaded_each calls that underlying #each…well there’s a lot of #each’s floating around. Keep them straight.

So…let’s look at some code to work with consumer threads.

    # Start off the same as before
    require 'rubygems'
    require 'jruby_streaming_update_solr_server'
    require 'jruby_producer_consumer'
    require 'marc4j4r'

    solrurl = 'http://your.solr.server:port/solr'
    sussqueuesize = 24 # how many items to buffer on their way to solr
    sussthreads = 2    # how many threads to use to send stuff to solr

    suss = StreamingUpdateSolrServer.new(solrurl, sussqueuesize, sussthreads)

    # I'll go ahead and use a MARC file as my example, but won't talk about the
    # MARC parts of it. All you need to know is that the reader object
    # implements #each

    reader = MARC4J4R.reader('test.xml', :marcxml)

    # Get a producer/consumer object with the reader at its base, using
    # the default method #each to get stuff out of it, and with the assumption
    # that we only need to keep the default 5 items in memory at a time to
    # keep up with consumption

    pc = ProducerConsumer.new(reader)

    # Get three threads to actually consume the things, turn them into solr
    # documents, and send them to solr (potentially out of order)

    numconsumerthreads = 3
    pc.threaded_each(numconsumerthreads).each do |r|
      suss << turn_marc_record_into_a_hash_or_solrdoc(r)
    end
    suss.commit

Again, not a lot happening here.

  • The “producer” is always one thread, because so little is thread-safe at the ‘each’ level. In this case, there’s a single thread pulling data out of the file and turning it into MARC records, which are added to the internal BlockingQueue. I buffer 5 of these at a pop (the default) so the consumer threads don’t starve. I presume that producing items is cheaper than consuming them, or else this library won’t help you much.
  • ProducerConsumer#threaded_each calls the #each method of the underlying object. You can substitute anything that yields, though, as in this example where I call #each_line instead of the default #each:

        queuesize = 5
        pc = ProducerConsumer.new(File.new('myfile.txt'), queuesize, :each_line)

  • Keep track of your threads. In the MARC example above, there is one thread getting MARC records and putting them into the PC buffer (no way to change that), three threads consuming those records and sticking them into the suss object, and another two pulling stuff out of the suss object and sending things to Solr. That's six threads before you count anything else running on the computer. Experiment and figure out what works best for your hardware.
  • See the docs for how to mess with what goes into a ProducerConsumer object. It’s entirely possible to use, say, #each_slice. There’s also a convenience method #threaded_each_with_index, but it does not call the underlying #each_with_index; it produces its own index as things are read.
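
The last two ideas (swapping in a different iterator method, and numbering items yourself rather than leaning on #each_with_index) can be sketched without any threads at all. This is plain Ruby for illustration only; toy_each_with_index is a hypothetical helper, not part of the library, and starting the count at zero is my assumption.

```ruby
require 'stringio'

# Call whatever iterator method the caller names, and number the items
# ourselves as they arrive instead of relying on #each_with_index.
def toy_each_with_index(source, iterator = :each, *args)
  index = 0
  source.public_send(iterator, *args) do |item|
    yield item, index
    index += 1
  end
end

# Anything that yields will do: here #each_line on an IO-like object...
lines = []
toy_each_with_index(StringIO.new("one\ntwo\nthree\n"), :each_line) do |line, i|
  lines << [i, line.chomp]
end

# ...and here #each_slice, which needs an argument of its own
slices = []
toy_each_with_index(1..5, :each_slice, 2) { |slice, i| slices << [i, slice] }
```

Because the counter lives in the wrapper, it works for sources that have no #each_with_index of their own, and it counts slices rather than individual items in the second call.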

Feedback not only welcome but necessary!

I’ve done a lot of messing around with Ruby in the last 10 days or so, but I’m still basically converting from Perl in my head. Any comments, bug reports, or whatnot are definitely welcome!
