A very simple persistent consumer-producer queue

This is my take on implementing a very simple persistent consumer-producer style queue using a database table. Producers insert records into the table and consumers process them. There can be many producers and/or consumers working simultaneously, as separate processes or threads. There is no explicit locking.

First, a disclaimer: if you want a robust, stable and efficient queue, you should probably use a full-fledged queue mechanism. The following code works with Ruby on Rails and MySQL.

The queue table should contain two additional columns that will be used by the queue mechanism: STATUS and PID. The STATUS column holds the processing status of each record. We need at least three values:

class Thing < ActiveRecord::Base
  STATUS_READY       = 1
  STATUS_IN_PROGRESS = 2
  STATUS_DONE        = 3
  ...

STATUS_READY means that the record is ready for processing by a consumer, STATUS_IN_PROGRESS is set while a consumer is working on the record, and STATUS_DONE is set once the work is finished. Alternatively, the consumer could just remove the record; it doesn’t matter for the algorithm.

The PID column is the key to this concept. I assume that each consumer can be identified with a unique number. If you use separate processes, this can be the operating system’s PID ($$ in Ruby). For threads, you can use Thread.current.object_id. If neither of these is available, you can assign any number to each consumer, as long as your scheme guarantees that each number is unique across all simultaneously working consumers.
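Here is a minimal sketch of the two identifier sources mentioned above. The helper names consumer_id_for_process and consumer_id_for_thread are made up for illustration; only Process.pid (the same value as $$) and Thread.current.object_id come from Ruby itself.

```ruby
# Hypothetical helpers (names are assumptions, not part of the queue code).

# For consumers running as separate processes: the operating system's PID.
def consumer_id_for_process
  Process.pid # same value as $$
end

# For consumers running as threads inside one process.
def consumer_id_for_thread
  Thread.current.object_id
end

# Three threads alive at the same time report three different identifiers.
thread_ids = 3.times.map { Thread.new { consumer_id_for_thread } }.map(&:value)
```

Note that each scheme is only unique within its own scope: thread object ids are unique inside one process, so if you mix several processes that each run several threads, you would need to combine both values or pick another scheme.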

The producers’ part in this algorithm is trivial: they just have to insert records with STATUS_READY.

The queue mechanism must ensure that each record will be processed by only one consumer. That’s what the PID is used for:

class Thing < ActiveRecord::Base
  ...
  def self.get_for_processing pid
    update_all(
      { :status => STATUS_IN_PROGRESS,
           :pid => pid },
      { :status => STATUS_READY },
      { :limit  => 1 })
    find_by_status_and_pid STATUS_IN_PROGRESS, pid
  end
  ...

The above method first updates one record from STATUS_READY to STATUS_IN_PROGRESS and sets the PID column to the given value. This means that the record is reserved for the consumer identified by this PID. Then the reserved record is returned by the find_by_status_and_pid call. This is equivalent to issuing the following SQL queries:

UPDATE things
   SET status = <STATUS_IN_PROGRESS>,
       pid    = <pid>
 WHERE status = <STATUS_READY>
 LIMIT 1;

SELECT *
  FROM things
 WHERE status = <STATUS_IN_PROGRESS>
   AND pid    = <pid>;

The UPDATE is atomic so there should be no collisions. It reserves one record if there are any available. The reserved record will not be reserved by any other consumer.
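To illustrate why one atomic reservation step is enough, here is a small in-memory model of the two queries. It is only a sketch and not the ActiveRecord code: the InMemoryThings class is hypothetical, and a Mutex stands in for the atomicity that the database gives the UPDATE statement.

```ruby
STATUS_READY       = 1
STATUS_IN_PROGRESS = 2
STATUS_DONE        = 3

# Hypothetical in-memory stand-in for the things table.
class InMemoryThings
  def initialize(count)
    @lock = Mutex.new
    @rows = (1..count).map { |id| { :id => id, :status => STATUS_READY, :pid => nil } }
  end

  def get_for_processing(pid)
    # Models the UPDATE: reserve at most one ready row, atomically.
    @lock.synchronize do
      ready = @rows.find { |r| r[:status] == STATUS_READY }
      if ready
        ready[:status] = STATUS_IN_PROGRESS
        ready[:pid]    = pid
      end
    end
    # Models the SELECT: return the row reserved for this pid, or nil.
    @lock.synchronize do
      @rows.find { |r| r[:status] == STATUS_IN_PROGRESS && r[:pid] == pid }
    end
  end

  def mark_done(row)
    @lock.synchronize { row[:status] = STATUS_DONE }
  end
end

things = InMemoryThings.new(20)

# Four consumer threads, each with its own identifier (1..4).
consumers = 4.times.map do |n|
  Thread.new do
    pid = n + 1
    claimed = []
    while (row = things.get_for_processing(pid))
      claimed << row[:id]
      things.mark_done(row) # finish the row before asking for another one
    end
    claimed
  end
end

claimed_ids = consumers.map(&:value).flatten
```

After all threads finish, claimed_ids contains every id from 1 to 20 exactly once, in whatever order the threads happened to run.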

One important thing: each consumer must change the status of the processed record to STATUS_DONE (or remove it) before it asks for another one. Otherwise find_by_status_and_pid will return the same record over and over again (although update_all will reserve another record with each call).

When there are no more records to be processed, get_for_processing returns nil. It’s up to the consumer to decide what to do then (wait, raise an exception, etc.).
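One possible way to handle the nil case is a polling loop that sleeps when the queue is empty and gives up after a few empty polls. The sketch below is an assumption about how a consumer might be written: consume, fetch, idle_sleep and max_idle_polls are made-up names, and a plain array stands in for the table so the example runs on its own.

```ruby
# Hypothetical polling loop. `fetch` is any callable that returns the next
# record or nil; the block receives each record for processing.
def consume(fetch, idle_sleep, max_idle_polls)
  idle_polls = 0
  while idle_polls < max_idle_polls
    record = fetch.call
    if record
      idle_polls = 0
      yield record
    else
      # Nothing to do right now: wait a bit and try again.
      idle_polls += 1
      sleep idle_sleep
    end
  end
end

# Stand-in queue: Array#shift returns nil once the array is empty.
pending = [:a, :b, :c]
handled = []
consume(lambda { pending.shift }, 0.01, 2) { |record| handled << record }
```

With the model from this post, fetch would presumably be something like lambda { Thing.get_for_processing($$) }, and the block would have to set the record's status to STATUS_DONE (or remove the record) before returning.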

If you want the records processed in an order other than the default (which is typically lowest-id-first for MySQL), you can use the :order clause, for example to implement a LIFO queue:

class Thing < ActiveRecord::Base
  ...
  def self.get_for_processing pid
    update_all(
      { :status => STATUS_IN_PROGRESS,
           :pid => pid },
      { :status => STATUS_READY },
      { :limit  => 1,
        :order  => 'created_at desc' })
    find_by_status_and_pid STATUS_IN_PROGRESS, pid
  end
  ...

~ by szeryf on 2009-06-17.
