Often, when you need to execute multiple tasks, it pays off to run them in parallel, especially when they involve blocking I/O operations. However, it may not always be possible to spin up as many threads as there are tasks: think of the maximum number of connections allowed to a database server, or of threads that each use large chunks of memory.
The Worker Pool is a simple and efficient solution that improves the performance of an application without consuming tons of resources, and it powers many concurrency patterns.
There are many open-source projects that implement this pattern; Celluloid and Concurrent Ruby are probably the best known. Nonetheless, I thought it would be fun to implement one from scratch to better understand its dynamics.
The smallest unit: the worker
A worker is simply a component that pulls work from a queue and fulfills it.
The work to be fulfilled could be anything: a Job class with a perform method, as in the popular Delayed Job library, a Ruby block, or any arbitrary object. For simplicity, here we implement a worker that expects jobs to be callable objects, which are simply objects that implement the call method. In our case we can enqueue lambdas, which are perfectly legit callable objects.
We start, using TDD, by defining how we would like the worker to behave. When creating multiple workers it is a good idea to give each one a name or ID attribute, which will ease debugging.
Then the obvious feature: as we queue jobs, we expect them to be performed by the worker. To ensure that the jobs have been executed, we can collect the results in an array to be used in the assertions. Also, as we saw in the previous post, we need to tell the worker when to stop waiting for jobs: we will enqueue the symbol :done and add an exit condition for it.
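The original spec isn't reproduced here, so below is a minimal sketch of what such a test could look like with Minitest; the Worker constructor arguments and the test name are assumptions, while the :done sentinel follows the prose above.

```ruby
require "minitest/autorun"

class WorkerTest < Minitest::Test
  def test_worker_performs_queued_jobs
    results = []
    queue   = Queue.new
    worker  = Worker.new(name: "worker_1", queue: queue)  # assumed signature

    # Enqueue callable jobs (lambdas) that record their results.
    3.times { |i| queue << -> { results << i * 2 } }
    queue << :done   # exit condition: tell the worker to stop waiting
    worker.join      # wait for the worker to drain the queue

    assert_equal [0, 2, 4], results.sort
  end
end
```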
To satisfy the tests we create a Worker class that accepts a name and a queue to listen on for jobs. As we initialize the Worker, we run it straight away so that it starts listening for jobs. This part is important! Make sure all workers are running before you start queuing jobs.
When we initialize the Worker, it also runs on a separate Thread. At this point the Worker simply pauses on the while loop until new items are available in the queue. It then pulls jobs and executes them until it meets the exit condition.
Then, with the join method, we wait for the Worker to complete.
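A minimal implementation along those lines could look like this; it is a sketch, since the original listing isn't reproduced here.

```ruby
class Worker
  def initialize(name:, queue:)
    @name  = name
    @queue = queue
    run    # start listening as soon as the worker is created
  end

  # Wait for the worker's thread to finish.
  def join
    @thread.join
  end

  private

  def run
    @thread = Thread.new do
      # Queue#pop blocks until an item is available, so the loop
      # pauses here whenever the queue is empty.
      while (job = @queue.pop) != :done
        job.call
      end
    end
  end
end
```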
Managing workers: the worker pool
Now we can move up the abstraction ladder and assemble multiple workers that share the same queue. The worker pool can be considered a very thin layer on top of an array of workers.
As usual we start by writing the tests first. We’ll use the Fibonacci sequence so we can let the worker pool churn through the calculations.
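Here is a sketch of what such a test could look like; the WorkerPool API (the worker count argument, << and join) is inferred from the description that follows.

```ruby
require "minitest/autorun"

class WorkerPoolTest < Minitest::Test
  def test_pool_processes_all_jobs
    results = {}
    pool = WorkerPool.new(worker_count: 10)  # assumed signature

    # Queue 30 lambdas; each stores the result of fib in a Hash.
    # (Plain Hash writes are fine for this demo under MRI's GVL.)
    30.times do |i|
      pool << -> { results[i] = fib(i) }
    end

    pool << :done   # no more work will be queued
    pool.join       # wait for the workers to fully process the queue

    assert_equal 30, results.size
    assert_equal 55, results[10]   # fib(10) == 55
  end
end
```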
In the test above we defined a WorkerPool that initializes 10 workers and we queued 30 lambdas to it. Once each lambda is executed by a worker, it pushes the result of fib to a Hash which is asserted at the end. Then we tell the pool that no more work will be queued and we wait for the workers to fully process the queue.
And for the sake of completeness we define fib as below:
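A classic naive recursive definition is enough, and its slowness is a feature here, since it gives the workers something to churn through:

```ruby
# Naive recursive Fibonacci, deliberately unoptimized.
def fib(n)
  n < 2 ? n : fib(n - 1) + fib(n - 2)
end
```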
Now we can satisfy the test. Notice the << method: when we signal that no more jobs will be queued, the WorkerPool in turn signals each worker; otherwise, the job is put into the queue.
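A sketch of the pool along those lines; details such as the keyword argument name are assumptions.

```ruby
class WorkerPool
  def initialize(worker_count:)
    @queue   = Queue.new
    @workers = Array.new(worker_count) do |i|
      Worker.new(name: "worker_#{i}", queue: @queue)
    end
  end

  # Either signal termination to every worker or queue the job.
  def <<(job)
    if job == :done
      @workers.size.times { @queue << :done }  # one :done per worker
    else
      @queue << job
    end
  end

  # Wait for every worker to finish.
  def join
    @workers.each(&:join)
  end
end
```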
When running the tests we can see that they all pass and that the system schedules the threads in no particular order, so it is not guaranteed that worker_0 will take the first job.
That’s really it! This is a simple and flexible implementation that can help you speed up the parts of your code base that perform heavy I/O operations.
Further improvements
The implementation of the WorkerPool above has a slight bug. If the producer creates jobs faster than the workers process them, there is a risk that the program runs out of memory.
We could fix this by limiting the number of jobs that the queue can hold. Enter the SizedQueue!
The SizedQueue is initialized with an Integer that represents the maximum number of items it can contain. When the maximum is reached, the statement that pushes jobs onto the queue is paused until an item is pulled from the queue; at that point the queue wakes up the previously paused statement.
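A tiny example makes the blocking behaviour visible; the size and the sleep are arbitrary.

```ruby
queue = SizedQueue.new(2)   # holds at most 2 items

producer = Thread.new do
  5.times do |i|
    queue << i              # blocks while the queue is full
    puts "queued #{i}"
  end
end

consumer = Thread.new do
  5.times do
    sleep 0.1               # a slow consumer...
    queue.pop               # ...whose pops wake the paused producer
  end
end

[producer, consumer].each(&:join)
```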
We can now edit the WorkerPool initializer, limiting the number of items in the queue to the number of available workers. This way we ensure that the producer and the worker pool as a whole maintain the same pace. The WorkerPool, in fact, will act as a traffic light: if there are too many jobs it will temporarily block any threads that push onto the queue until there is space available again.
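Against the pool sketched earlier, the change is a one-liner:

```ruby
class WorkerPool
  def initialize(worker_count:)
    # Cap the queue at the number of workers: a producer that runs
    # ahead of the pool is paused instead of exhausting memory.
    @queue   = SizedQueue.new(worker_count)
    @workers = Array.new(worker_count) do |i|
      Worker.new(name: "worker_#{i}", queue: @queue)
    end
  end

  # <<, join and the rest stay exactly as before.
end
```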
Playing with scheduling algorithms
So far our WorkerPool has been delegating most of the responsibility to the internal queue. An alternative approach could be to have the WorkerPool play a more active role by being responsible for the scheduling. This means that instead of having all workers pull from the same queue, each worker will have its own queue.
In this section we are going to explore some variations to solve different problems.
Least-Busy First
We modify the previous Worker implementation by removing the queue argument from the initializer; instead, the Worker defines a queue internally. This new dedicated queue will be the “inbox” of the Worker.
Now we need to allow the Worker to receive jobs from the outside - remember that previously the worker was pulling from a shared queue. We define a << method that delegates to the inbox queue, and a jobs_count method to let the WorkerPool inspect the load of each worker and schedule jobs accordingly.
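Putting those changes together, the reworked Worker could look like this; the method names come from the prose, the rest is a sketch.

```ruby
class Worker
  def initialize(name:)
    @name  = name
    @inbox = Queue.new   # the worker's own dedicated queue
    run
  end

  # Receive a job from the outside by delegating to the inbox.
  def <<(job)
    @inbox << job
  end

  # Expose the current load so a scheduler can compare workers.
  def jobs_count
    @inbox.size
  end

  def join
    @thread.join
  end

  private

  def run
    @thread = Thread.new do
      while (job = @inbox.pop) != :done
        job.call
      end
    end
  end
end
```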
The WorkerPool code will then need to change so that workers are initialized without the shared queue. Given that we want to implement different scheduling algorithms, we can isolate the scheduling responsibility from the WorkerPool by creating a new Scheduler class that the WorkerPool delegates to for dispatching jobs.
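A least-busy scheduler then needs very little code; the class and method names here are my own guesses.

```ruby
class LeastBusyScheduler
  def initialize(workers)
    @workers = workers
  end

  # Dispatch the job to the worker with the fewest queued jobs.
  def schedule(job)
    @workers.min_by(&:jobs_count) << job
  end
end
```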
That’s it for this scheduler. A nice, small class with a single responsibility!
We only need to inject it into the WorkerPool constructor so that we can easily switch strategies if we want to. Notice that, as the WorkerPool creates the array of workers, we can pass in a factory object (just the class, in this case) that will be used to initialize the concrete scheduler.
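A sketch of the revised pool, with the scheduler class injected as a factory:

```ruby
class WorkerPool
  def initialize(worker_count:, scheduler: LeastBusyScheduler)
    @workers = Array.new(worker_count) do |i|
      Worker.new(name: "worker_#{i}")
    end
    # The injected class acts as a factory: the pool builds the
    # concrete scheduler once it knows its workers.
    @scheduler = scheduler.new(@workers)
  end

  def <<(job)
    if job == :done
      @workers.each { |worker| worker << :done }
    else
      @scheduler.schedule(job)
    end
  end

  def join
    @workers.each(&:join)
  end
end
```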
There you have it! A pool of workers that schedules jobs to the least busy worker.
Round-Robin
Now that we have laid the groundwork for the WorkerPool, we can define a new Scheduler that uses the Round-Robin algorithm.
We can use the Enumerable#cycle method, which generates an Enumerator that cycles through the elements of the array and restarts when it reaches the end.
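With that, a round-robin scheduler reduces to a couple of lines (with the same caveat on names):

```ruby
class RoundRobinScheduler
  def initialize(workers)
    @cycle = workers.cycle   # endless Enumerator over the workers
  end

  # Hand the job to the next worker in the circle.
  def schedule(job)
    @cycle.next << job
  end
end
```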
Topic Scheduler
Similar to a Pub-Sub system, we have workers that can only perform specific jobs. This type of scheduler could be useful if you want to segregate the workers by responsibility - for example: mailer workers, file download workers, database workers, etc.
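One possible shape for such a scheduler, assuming jobs are dispatched together with a topic and that the topics-to-workers mapping is provided up front (both assumptions on my part):

```ruby
class TopicScheduler
  # topics maps a topic to its subscribed workers,
  # e.g. { mailer: [w1], downloads: [w2, w3] } (an assumed shape).
  def initialize(topics)
    @topics = topics
  end

  # Dispatch to the least busy worker subscribed to the topic;
  # fetch raises a KeyError for unknown topics.
  def schedule(topic, job)
    @topics.fetch(topic).min_by(&:jobs_count) << job
  end
end
```

Note that this schedule method takes a topic as well as a job, so the WorkerPool's << would need to pass one along.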
Conclusions
Learning how to implement a pool of workers is, in my opinion, like learning any other design pattern: only when you practice it do you really understand the mechanics and can implement the algorithm that best suits your application’s needs.
If you are learning a web framework, you will often hear that the first time around it’s better to build the authentication layer from scratch instead of relying on additional libraries. Similarly, if you are learning a programming language, the standard library is your friend. Use complex libraries to solve complex problems, but only after you have learned the fundamentals.