A very simple persistent consumer-producer queue

This is my take on implementing a very simple persistent consumer-producer style queue using a database table. The producers insert records into the table and consumers process them. There can be many producers and/or consumers working simultaneously, as separate processes or as threads. There is no explicit locking.

First, a disclaimer: if you want a robust, stable and efficient queue, you should probably use a full-fledged queue mechanism. The following code works with Ruby on Rails and MySQL.

The queue table should contain two additional columns that will be used by the queue mechanism: STATUS and PID. The STATUS column holds the processing status of each record; we need at least three values:

class Thing < ActiveRecord::Base
  STATUS_READY       = 1  # waiting for a consumer
  STATUS_IN_PROGRESS = 2  # reserved by the consumer whose id is in PID
  STATUS_DONE        = 3  # processed
  ...
end

STATUS_READY means that the record is ready for processing by a consumer, STATUS_IN_PROGRESS is set while a consumer is working on the record, and after that it becomes STATUS_DONE. Alternatively, the consumer could just remove the record; it doesn't matter for the algorithm.

The PID column is the key to this concept. I assume that each consumer can be identified by a unique number. If you use separate processes, this can be the operating system's PID ($$ in Ruby). For threads, you can use Thread.current.object_id. If neither of these is available, you can assign any number to each consumer, as long as your scheme guarantees that each number is unique across all simultaneously working consumers.
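If the same table is consumed by several processes that each run several threads, neither $$ nor Thread.current.object_id is unique on its own. A minimal sketch of one possible scheme combines both into a string; the consumer_id helper is a hypothetical name, and it assumes the PID column can hold a string (for example a VARCHAR) rather than an integer.

```ruby
# Hypothetical helper: identifies the calling consumer by process *and* thread.
# $$ alone is shared by all threads of a process, and Thread.current.object_id
# alone is only unique inside one process; together they are unique across
# all consumers running simultaneously on one machine.
def consumer_id
  "#{$$}-#{Thread.current.object_id}"
end

# Each thread gets a different id; the process part is the same for all.
threads = 3.times.map { Thread.new { consumer_id } }
ids = threads.map(&:value)
p ids
```

Across several machines you would additionally need something host-specific in the id; that case is outside what this scheme covers.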

The producers' part in this algorithm is trivial: they just have to insert records with STATUS_READY.

The queue mechanism must ensure that each record will be processed by only one consumer. That's what the PID is used for:


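class Thing < ActiveRecord::Base
  ...
  # Reserves one READY record for the consumer identified by pid and returns
  # it, or nil when nothing is available (provided the previous one is done).
  def self.get_for_processing(pid)
    update_all(
      { :status => STATUS_IN_PROGRESS,  # SET status = IN_PROGRESS,
        :pid    => pid },               #     pid = <pid>
      { :status => STATUS_READY },      # WHERE status = READY
      { :limit  => 1 })                 # LIMIT 1
    # Fetch the record that was just reserved for this pid.
    find_by_status_and_pid STATUS_IN_PROGRESS, pid
  end
  ...
end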

The above method first updates one record from STATUS_READY to STATUS_IN_PROGRESS and sets the PID column to the given value. This means that the record is reserved for the consumer identified by this PID. The reserved record is then returned by the find_by_status_and_pid call. This is equivalent to issuing the following SQL queries:

UPDATE things
   SET status = <STATUS_IN_PROGRESS>,
       pid    = <pid>
 WHERE status = <STATUS_READY>
 LIMIT 1;
 
SELECT *
  FROM things
 WHERE status = <STATUS_IN_PROGRESS>
   AND pid    = <pid>;

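The UPDATE is atomic, so there should be no collisions: it reserves at most one record, and only if one is available. Once a record's status has been changed to STATUS_IN_PROGRESS, it no longer matches WHERE status = <STATUS_READY>, so no other consumer can reserve it.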
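The reserve-then-select idea can be exercised without a database. The sketch below is a simulation, not the Rails code: it assumes an in-memory array in place of the things table and a Mutex standing in for the atomicity MySQL gives a single UPDATE statement. FakeThings, reserve and mark_done are hypothetical names; get_for_processing and find_by_status_and_pid mirror the two steps described above.

```ruby
# Hypothetical in-memory stand-in for the things table. The Mutex models the
# atomicity of a single UPDATE statement.
STATUS_READY       = 1
STATUS_IN_PROGRESS = 2
STATUS_DONE        = 3

class FakeThings
  Row = Struct.new(:id, :status, :pid)

  def initialize
    @rows = []
    @lock = Mutex.new
  end

  # Producer: insert a row with STATUS_READY.
  def insert
    @lock.synchronize { @rows << Row.new(@rows.size + 1, STATUS_READY, nil) }
  end

  # UPDATE things SET status = IN_PROGRESS, pid = ? WHERE status = READY LIMIT 1
  def reserve(pid)
    @lock.synchronize do
      row = @rows.find { |r| r.status == STATUS_READY }
      if row
        row.status = STATUS_IN_PROGRESS
        row.pid    = pid
      end
    end
  end

  # SELECT * FROM things WHERE status = IN_PROGRESS AND pid = ?
  def find_by_status_and_pid(status, pid)
    @lock.synchronize { @rows.find { |r| r.status == status && r.pid == pid } }
  end

  def get_for_processing(pid)
    reserve(pid)
    find_by_status_and_pid(STATUS_IN_PROGRESS, pid)
  end

  def mark_done(row)
    @lock.synchronize { row.status = STATUS_DONE }
  end
end

things = FakeThings.new
100.times { things.insert }

# Four consumer threads with hypothetical ids 1000..1003; each returns the
# ids of the records it processed.
per_consumer = (1000..1003).map do |pid|
  Thread.new do
    seen = []
    while (row = things.get_for_processing(pid))
      seen << row.id
      things.mark_done(row)
    end
    seen
  end
end.map(&:value)

all_ids = per_consumer.flatten
puts all_ids.size == all_ids.uniq.size  # true if no record was handed out twice
```

Because each consumer marks its record done before asking again, every record ends up handled by exactly one thread, however the threads interleave.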

One important thing: each consumer must change the status of the processed record to STATUS_DONE (or remove it) before it asks for another one. Otherwise find_by_status_and_pid will return the same record over and over again, even though update_all reserves yet another record with each call.
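This failure mode is easy to reproduce with a tiny single-threaded model. It assumes an array of hashes in place of the table and a hypothetical lambda that performs the same two steps as get_for_processing.

```ruby
STATUS_READY, STATUS_IN_PROGRESS = 1, 2

# Hypothetical in-memory table with three READY rows.
rows = (1..3).map { |id| { id: id, status: STATUS_READY, pid: nil } }

# Same two steps as get_for_processing: reserve one READY row, then look up
# the first row that is IN_PROGRESS for this pid.
get_for_processing = lambda do |pid|
  ready = rows.find { |r| r[:status] == STATUS_READY }
  ready.update(status: STATUS_IN_PROGRESS, pid: pid) if ready
  rows.find { |r| r[:status] == STATUS_IN_PROGRESS && r[:pid] == pid }
end

# A consumer that forgets to set STATUS_DONE before asking again:
first  = get_for_processing.call(42)
second = get_for_processing.call(42)

p first[:id], second[:id]  # prints 1 twice: the same record comes back
p rows.count { |r| r[:status] == STATUS_IN_PROGRESS && r[:pid] == 42 }
# prints 2: row 2 is now reserved for pid 42 too, though it was never returned
```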

When there are no more records to be processed, get_for_processing returns nil. It’s up to the consumer to decide what to do then (wait, raise an exception, etc.).
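One common choice is to poll: sleep while the queue is empty and give up after several empty polls in a row. The sketch below assumes hypothetical fetch and handle callables (in the Rails version, fetch would wrap Thing.get_for_processing(pid) and handle would do the work and set STATUS_DONE); the default timings are arbitrary.

```ruby
# Hypothetical polling consumer loop. fetch returns the next reserved record
# or nil; handle processes it and must mark it done (or delete it).
def consume(fetch:, handle:, idle_sleep: 0.5, max_idle_polls: 3)
  processed  = 0
  idle_polls = 0
  loop do
    record = fetch.call
    if record.nil?
      # Nothing to do: wait, and stop after max_idle_polls empty polls in a row.
      idle_polls += 1
      break if idle_polls >= max_idle_polls
      sleep idle_sleep
      next
    end
    idle_polls = 0
    handle.call(record)
    processed += 1
  end
  processed
end

# Usage with an in-memory list standing in for the table:
pending = [:a, :b, :c]
count = consume(fetch: -> { pending.shift },
                handle: ->(r) { puts "processing #{r}" },
                idle_sleep: 0.01)
puts count  # records handled before the consumer gave up
```

A long-running worker would typically use a much larger max_idle_polls (or none at all) and raise or exit only when that matches its deployment.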

If you want the records processed in an order other than the default (in practice usually lowest-id-first with MySQL, although without an ORDER BY this is not guaranteed), you can add an :order option, for example to implement a LIFO queue:

class Thing < ActiveRecord::Base
  ...
  def self.get_for_processing(pid)
    update_all(
      { :status => STATUS_IN_PROGRESS,
        :pid    => pid },
      { :status => STATUS_READY },
      { :limit  => 1,
        :order  => 'created_at desc' })  # newest READY record first (LIFO)
    find_by_status_and_pid STATUS_IN_PROGRESS, pid
  end
  ...
end
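To see what the :order option changes, here is a small in-memory model of the reservation step. The Row struct, the reserve helper and the integer created_at ticks are assumptions for illustration only: with 'created_at asc' the oldest READY record is reserved first (FIFO), with 'created_at desc' the newest (LIFO).

```ruby
# Hypothetical in-memory rows; created_at is a plain integer tick.
Row = Struct.new(:id, :status, :pid, :created_at)
STATUS_READY       = 1
STATUS_IN_PROGRESS = 2

# Mirrors update_all(..., :limit => 1, :order => "created_at #{direction}"):
# reserve the first READY row in the requested order for pid.
def reserve(rows, pid, direction)
  ready = rows.select { |r| r.status == STATUS_READY }.sort_by(&:created_at)
  ready.reverse! if direction == :desc
  row = ready.first
  if row
    row.status = STATUS_IN_PROGRESS
    row.pid    = pid
  end
  row
end

def fresh_rows
  # Stored out of order on purpose; created_at decides, not array position.
  [2, 3, 1].map { |t| Row.new(t, STATUS_READY, nil, t) }
end

fifo = fresh_rows
lifo = fresh_rows
fifo_order = 3.times.map { reserve(fifo, 7, :asc).id }
lifo_order = 3.times.map { reserve(lifo, 7, :desc).id }
p fifo_order  # → [1, 2, 3] (oldest first)
p lifo_order  # → [3, 2, 1] (newest first)
```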
