Python Multiprocessing Using Queues and Redis Lists

20 Aug 2016

The Python standard library's Queue class is a convenient data structure for a message queue or job queue. Check out its source code if you are interested in how to implement such a data structure in Python.
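To make the job-queue pattern concrete, here is a minimal sketch using Python 3's queue module with a few threads. The worker/main names, the None sentinel and the squaring "job" are illustrative choices, not anything taken from the Queue source.

```python
import queue
import threading


def worker(jobs: queue.Queue, results: list, lock: threading.Lock) -> None:
    """Consume jobs until the None sentinel arrives."""
    while True:
        item = jobs.get()              # blocks until an item is available
        try:
            if item is None:           # sentinel: no more work for this thread
                return
            with lock:
                results.append(item * item)
        finally:
            jobs.task_done()           # every get() is matched by task_done()


def main(n_workers: int = 4) -> list:
    jobs: queue.Queue = queue.Queue()
    results: list = []
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(jobs, results, lock))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for i in range(10):
        jobs.put(i)
    for _ in threads:
        jobs.put(None)                 # one sentinel per worker
    jobs.join()                        # waits until all items are marked done
    for t in threads:
        t.join()
    return sorted(results)


if __name__ == "__main__":
    print(main())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The sentinel-per-worker convention is one common way to shut workers down; Queue itself does not prescribe it.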

Message queues are useful because they offer a simple pattern for communication among multiple processes or threads. Python's multiprocessing package also provides data structures with a similarly convenient interface for passing messages between processes.

Some time ago I tried using the multiprocessing.JoinableQueue class in one of my personal projects. JoinableQueue is a subclass of multiprocessing.Queue, and both are modelled on the standard library's queue.Queue class (Queue.Queue in Python 2). While it was straightforward to work with, the program I built sometimes got stuck or ran painfully slowly.
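For comparison, here is a minimal sketch of the JoinableQueue pattern with worker processes. It assumes a POSIX system where the "fork" start method is available (mp.get_context("fork")); the worker and run names, the None sentinel and the squaring task are hypothetical. Note that the results queue is drained before the worker processes are joined.

```python
import multiprocessing as mp

# Assumption: a POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")


def worker(tasks, results) -> None:
    """Read numbers from `tasks` and put their squares on `results`."""
    while True:
        n = tasks.get()
        if n is None:                  # sentinel: stop this worker
            tasks.task_done()
            return
        results.put(n * n)
        tasks.task_done()              # one task_done() per get()


def run(numbers, n_workers: int = 2) -> list:
    numbers = list(numbers)
    tasks = ctx.JoinableQueue()
    results = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(tasks, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for n in numbers:
        tasks.put(n)
    for _ in procs:
        tasks.put(None)                # one sentinel per worker
    tasks.join()                       # returns once every task is marked done
    squares = [results.get() for _ in numbers]  # drain results before joining
    for p in procs:
        p.join()
    tasks.close()                      # stop this process's feeder thread
    tasks.join_thread()
    return sorted(squares)


if __name__ == "__main__":
    print(run(range(10)))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```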

I did a bit of research and found a limitation of using JoinableQueue. Now let’s get into the details of what could go wrong.

First, let's read the source code of the multiprocessing package (cpython/Lib/multiprocessing/) to figure out how multiprocessing.Queue and multiprocessing.JoinableQueue work.

File cpython/Lib/multiprocessing/queues.py contains the implementations of three classes:

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

Queue is the base class of JoinableQueue, and most of the important inner workings are implemented in Queue. The first few lines of the Queue class are as follows:

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):

    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)

        ...

There are two important things to pay attention to here:

  1. The comment mentions that the Queue class uses a pipe, a buffer and a thread underneath. The buffer is a deque (collections.deque), and the thread (a dedicated feeder thread) transfers data from the deque to the pipe, which brings us to the next point …
  2. The pipe, created by the connection.Pipe() call, is a data structure with a reading end and a writing end that together form a channel for interprocess communication.
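To make the pipe + buffer + thread design easier to picture, here is a toy, single-process model of it. ToyPipeQueue is hypothetical and much simpler than CPython's Queue (no locks for multiple readers or writers, no process support, no shutdown); it only shows how put() can return immediately while a feeder thread moves pickled objects from a deque into an OS pipe, where the write may block.

```python
import collections
import os
import pickle
import struct
import threading


class ToyPipeQueue:
    """Hypothetical toy model of the "pipe, buffer and thread" design.

    Not CPython's implementation: single process, a single reader, no
    shutdown handling, and the pipe file descriptors are never closed.
    """

    def __init__(self) -> None:
        self._read_fd, self._write_fd = os.pipe()   # one-way OS pipe
        self._buffer = collections.deque()          # put() only appends here
        self._not_empty = threading.Condition()
        self._feeder = threading.Thread(target=self._feed, daemon=True)
        self._feeder.start()

    def put(self, obj) -> None:
        # Never touches the pipe, so it never blocks on a full pipe.
        with self._not_empty:
            self._buffer.append(obj)
            self._not_empty.notify()

    def get(self):
        # Blocks until a complete length-prefixed frame has been read.
        (size,) = struct.unpack("!I", self._read_exact(4))
        return pickle.loads(self._read_exact(size))

    def _feed(self) -> None:
        # Feeder thread: move pickled objects from the deque into the pipe.
        while True:
            with self._not_empty:
                while not self._buffer:
                    self._not_empty.wait()
                obj = self._buffer.popleft()
            data = pickle.dumps(obj)
            # os.write() blocks here whenever the pipe is full.
            self._write_all(struct.pack("!I", len(data)) + data)

    def _write_all(self, data: bytes) -> None:
        view = memoryview(data)
        while view:
            written = os.write(self._write_fd, view)
            view = view[written:]

    def _read_exact(self, n: int) -> bytes:
        chunks = []
        while n:
            chunk = os.read(self._read_fd, n)
            if not chunk:
                raise EOFError("pipe closed")
            chunks.append(chunk)
            n -= len(chunk)
        return b"".join(chunks)


if __name__ == "__main__":
    q = ToyPipeQueue()
    q.put("x" * 200_000)      # larger than a typical pipe, yet returns at once
    print(len(q.get()))       # 200000
```

Because put() only touches the deque, putting an object larger than the pipe's capacity still returns immediately; it is the feeder thread that blocks until get() reads from the other end.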

On non-Windows platforms, the pipe is either an OS pipe or a Unix domain socket, depending on the value of the keyword argument duplex passed to Pipe():

# An excerpt from cpython/Lib/multiprocessing/connection.py

if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2
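The one-way behaviour of Pipe(duplex=False) can also be seen from the public multiprocessing API. demo is a hypothetical helper; it relies on Connection.send() raising OSError on a read-only end, which is what CPython does, although the exact error message may differ between versions.

```python
from multiprocessing import Pipe


def demo():
    # duplex=False: the first connection can only receive and the second can
    # only send (on POSIX the pair wraps the two ends of an os.pipe()).
    reader, writer = Pipe(duplex=False)
    try:
        writer.send({"task": 1})
        received = reader.recv()
        try:
            reader.send("reply")         # the read end is not writable
            read_end_can_send = True
        except OSError:
            read_end_can_send = False
        return received, read_end_can_send
    finally:
        reader.close()
        writer.close()


if __name__ == "__main__":
    print(demo())  # ({'task': 1}, False)
```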

Since Queue calls Pipe(duplex=False), the source code above shows that on these platforms the pipe is an OS pipe created by os.pipe(), and thus a unidirectional channel between processes. The remaining implementation details of Queue are irrelevant to this discussion … The KEY POINT here is that an OS pipe has limited capacity (see man 7 pipe), and the same applies to sockets! If a pipe is full, a process or thread trying to write to it (using the write system call) will block until enough data has been read from the other end.
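The limited capacity is easy to observe. The sketch below, assuming a POSIX system, makes the write end of a fresh pipe non-blocking and writes until the OS refuses with BlockingIOError; with a blocking descriptor, that same write would simply hang. pipe_capacity is a hypothetical helper. On Linux the default capacity is 65,536 bytes (man 7 pipe); other systems report different numbers.

```python
import os


def pipe_capacity(chunk_size: int = 1024) -> int:
    """Fill a fresh OS pipe that nobody reads and count the bytes accepted."""
    read_fd, write_fd = os.pipe()
    try:
        # Non-blocking, so a full pipe raises BlockingIOError instead of
        # blocking this thread forever (which a blocking write would do).
        os.set_blocking(write_fd, False)
        total = 0
        chunk = b"\0" * chunk_size
        while True:
            try:
                total += os.write(write_fd, chunk)
            except BlockingIOError:
                return total
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print(pipe_capacity(), "bytes")
```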

With this in mind, it is easy to see how a deadlock can occur if no other thread or process removes items from the queue before the producer processes are joined. A process that has put items on a queue will not terminate until its feeder thread has written all the buffered items to the underlying pipe/socket, and the feeder thread hangs once the pipe/socket is full, so join() never returns. In short, this is a limitation of the operating system, not a bug in the library, and you need to understand it to use the library correctly and effectively.
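The hang can be reproduced in a few lines, mirroring the "Joining processes that use queues" guideline in the multiprocessing documentation. The sketch assumes the POSIX "fork" start method; producer and demo are hypothetical names, and the 1,000,000-character item is simply something far larger than a typical pipe buffer. Instead of hanging forever, it joins with a timeout to show that the child is stuck, then drains the queue so the child can exit.

```python
import multiprocessing as mp

# Assumption: a POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")


def producer(q) -> None:
    # One item far larger than a typical pipe buffer.
    q.put("x" * 1_000_000)
    # producer() returns at once, but the child process cannot exit until
    # the queue's feeder thread has written the whole item into the pipe.


def demo(timeout: float = 1.0):
    q = ctx.Queue()
    p = ctx.Process(target=producer, args=(q,))
    p.start()
    p.join(timeout)            # joining before draining: the child hangs
    stuck = p.is_alive()
    item = q.get()             # reading frees the pipe, unblocking the feeder
    p.join()                   # now the child exits normally
    return stuck, len(item), p.exitcode


if __name__ == "__main__":
    print(demo())
```

Swapping the order (calling q.get() before p.join()) is the fix the documentation recommends.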

In my personal project something similar happened, although the setup was more complicated: there were several JoinableQueues, and separate groups of processes handled the tasks on each queue. Group 1 processes dequeued from queue 1 and enqueued onto queue 2; group 2 processes dequeued from queue 2 and enqueued onto queue 3. The problem occurred when group 1 processes were joined before group 2 processes had popped all the items from queue 2 and the underlying pipe for queue 2 was full. The same thing also happened to group 2 processes. This slowed down the entire pipeline, because a congested pipe for one queue caused congestion in several queues.
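Here is a sketch of a safer ordering for a multi-stage setup like this one. It assumes the POSIX "fork" start method and one worker process per stage, which also keeps the output in order; the stage, double and add_one names and the None sentinel are hypothetical. The main process feeds the first queue, reads the last queue until the shutdown sentinel arrives, and only then joins the workers, so no stage is joined while its output is still waiting in a pipe.

```python
import multiprocessing as mp

# Assumption: a POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")
SENTINEL = None                    # hypothetical shutdown marker


def double(x: int) -> int:
    return 2 * x


def add_one(x: int) -> int:
    return x + 1


def stage(inbox, outbox, fn) -> None:
    """One pipeline stage: read from inbox, apply fn, write to outbox."""
    while True:
        item = inbox.get()
        if item is SENTINEL:
            outbox.put(SENTINEL)   # pass the shutdown signal downstream
            return
        outbox.put(fn(item))


def run_pipeline(items) -> list:
    q1, q2, q3 = ctx.Queue(), ctx.Queue(), ctx.Queue()
    workers = [
        ctx.Process(target=stage, args=(q1, q2, double)),   # stands in for group 1
        ctx.Process(target=stage, args=(q2, q3, add_one)),  # stands in for group 2
    ]
    for w in workers:
        w.start()
    for item in items:
        q1.put(item)
    q1.put(SENTINEL)
    results = []
    # Drain the LAST queue completely before joining any worker ...
    while (item := q3.get()) is not SENTINEL:
        results.append(item)
    # ... so every stage's output has already been consumed when we join.
    for w in workers:
        w.join()
    for q in (q1, q2, q3):
        q.close()                  # stop any feeder thread in this process
        q.join_thread()
    return results


if __name__ == "__main__":
    print(run_pipeline(range(5)))  # [1, 3, 5, 7, 9]
```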

Of course, this is not a good architectural design, and the problem can be avoided by using a single task queue and joining the processes only after all the tasks have been processed. However, instead of fixing it with the tools provided by the standard library, I replaced the entire queuing infrastructure with Redis Lists.

The only reason I wanted to use Redis was to play with this NoSQL database and see how it works. It turned out to be fairly easy to use and a perfect drop-in replacement for the queuing infrastructure. I do not know the details of how Redis Lists are implemented yet, but they did not seem to suffer from the pipe-congestion problem at all, and the program ran fast. Here is an example of how to use Redis Lists as a message queue.