Lately I’ve been working on scaling up the algorithm of a Master-Slave architecture and given the amount of patterns I’m using I figured I’d write a series of blog posts on Ruby concurrency and its patterns.
Ruby has concurrency primitives built in its standard library. Today we start with the fundamental blocks and we will move to more complex patterns later on. The Ruby thread
standard library contains the Thread
and Queue
objects as well as many others that we will see in other posts.
Today we start with a simple design where a producer queues jobs to be processed by a consumer. The producer could be a web server queuing email notifications to be sent out, a main thread in a crawler that queues urls to downloader components… the list could be long.
The first tool we are going to need is a queue. A queue is thread-safe which means we can build a lot of concurrency patterns with just that.
We will use this queue to communicate between the producer and the consumer.
Then we can flesh out a basic producer which will run in a loop, will produce some jobs and will push them on to the queue we just created.
We implement the producer as a thread so that we can move the control of the program to the next block.
Queue
has plenty of aliases for inserting and removing items: here we just used <<
for insert but we could have used enq
or push
. Similarly, for removing items we can use shift
, deq
or pop
. As of writing this post I personally prefer the <<
operator for inserting messages as it’s graphically expressive, then either pop
or deq
for pulling from the queue. In my opinion enq
and deq
are very similar in appearance to be used together (I generally prefer to use different looking terms to amplify the constrast) and push
and pop
are a bit misleading as highly used for stack behavior Last-In-First-Out.
Let’s continue our journey towards the consumer side. Here we now have to implent the consumer which is going to pull the next job from the queue and do some work with it.
Producer and consumer are not depending on each other. We could entirely redesign the consumer without impacting the producer and vice-versa. This because we have enstablished a communication protocol between the two components.
If we were to run our program right now we would see it exiting straight away despite having defined the two threads. This is because we havent told the main thread to wait for the other threads to finish.
This has a similar effect to running a Unix command with &
at the end, which will run the command in the background. We’ll fix it by running the Thread#join
method on each thread created.
When running our program we’ll see that it’s all working correctly except the fact that the process never ends - we will tackle this later on.
If you had notice from before, I’ve intentionally added sleep 1
on the producer to simulate some work on its side and sleep 2
on the consumer to simulate a long running job. In short I’ve made the consumer slower than the produer. Observing the output we can see that by the time we consumer processes job #6 the producer has already enqueued 10 jobs.
If the produer creates constantly jobs at such pace the consumer would never be able to catch up. On a production applicaton this would not be acceptable. But we do care about performance and so we decide to deploy another consumer that pulls jobs from the same queue so that both consumers can keep up the pace with the producer.
We modify a little the consumer block to be an array of threads. A nice side effect is that we also have a variable that we can control to scale up/down the number of consumers.
Running the script now we notice that produer and consumers are running all at the same pace because one consumer is picking up a new job while the other one is still processing the previous job.
Exit strategy
For now our program will run forever and that’s not a reasonable thing to test. If we were to write tests for this program we would probably have the producer queuing a finite number of jobs and then we would assert that the consumer handles them.
Lets modify the producer to only queue a finite number of jobs.
If we run the program we see the consumers pulling jobs but then an unusual error occurs:
What is this error about?
The Ruby runtime realized that there are 2 other threads ready to pull from the queue (and in waiting state) but no other thread will enqueue new jobs because the consumer thread exited after queuing 5 jobs.
What we need is a mechanism to tell the consumers that no further jobs will be enqueued and that they can safely exit.
We can enqueue a :done
symbol as a signal for the consumer that will read it. As we have 2 consumers we do it twice.
On the consumer side we have to check whether the job is the special case :done
and interrupt the loop if so.
With this small change our script is now able to exit graceully because there are no more threads hanging around.
EDIT: Thanks to Jeremy for pointing our that since Ruby 2.3.0 we can now use Queue#close
as exit strategy.
@fabiopitino wrt to your Threading posts (which I have enjoyed): Have you seen `Queue#close` from 2.3.0? We no longer need `<< :done` - https://t.co/nS3ghUroHM
— Jeremy Hinegardner (@copiousfreetime) November 21, 2017
Regardless whether we have a producer that enqueues a finite number of messages or that runs in a loops and produces messages forever, having an exit strategy for the consumers is always a good practice. For example we could catch the Ctrl-C
signal and instead of exit immediately we could notify the consumers that we are closing the queue and no more work will be added, then do any teardown or cleanup to exit gracefully.