Message queues are useful in a sense that they facilitate a simple way/pattern for the communications among
multiple processes/threads. Python’s
multiprocessing package also provides data structures with more
convenient interface to facilitate message passing among processes.
I tried using
multiprocessing.JoinableQueue class in one of my personal projects some time ago.
is a subclass of
multiprocessing.Queue class and both are modelled on the
Queue.Queue class in the standard
library. While it was straightforward to work with, the program I created got stuck and ran painfully slowly
I did a bit of research and found a limitation of using
JoinableQueue. Now let’s get into the details of what
could go wrong.
First of all, Let’s check the source code of
multiprocessing package (
cpython/Lib/multiprocessing/) to figure
cpython/Lib/multiprocessing/queues.py contains the implementations of three classes:
__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
Queue is the base class of
JoinableQueue, most of the important inner workings are implemented in
Queue. The first few lines of
Queue class is as follows:
# # Queue type using a pipe, buffer and thread # class Queue(object): def __init__(self, maxsize=0, *, ctx): if maxsize <= 0: # Can raise ImportError (see issues #3770 and #23400) from .synchronize import SEM_VALUE_MAX as maxsize self._maxsize = maxsize self._reader, self._writer = connection.Pipe(duplex=False) ...
There are two important things to pay attention here:
- The comment mentioned that Queue class uses a pipe, buffer and thread underneath. The buffer is
actually a deque (
collections.deque); and the thread (dedicated feeder thread) transfers data from the deque to the pipe, which brings us to the next point …
- The pipe, created via
connection.Pipe()call, is a data structure having a reading and writing ends forming a chanel for interprocess communication.
The pipe is either an OS pipe or Unix Domain Socket depends on the value of the keywoard argument
passed to the
# An excerpt from cpython/Lib/multiprocessing/connection.py if sys.platform != 'win32': def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) c1 = Connection(s1.detach()) c2 = Connection(s2.detach()) else: fd1, fd2 = os.pipe() c1 = Connection(fd1, writable=False) c2 = Connection(fd2, readable=False) return c1, c2
According to the source code above, it is clear that the pipe is actually an OS pipe and thus an
unidirectional channel for processes. The remaining implementation details of
Queue is irrelevant
in this discussion … The KEY POINT here is that an OS pipe has limited capacity (
man 7 pipe)
(same applied to sockets)! If a pipe is full, a process/thread trying to write (using system call
write) to the pipe will block!
Having this concept in mind, it is natural to understand that deadlock will occur if no other concurrent
threads/processes remove items on the queue before joining the producer processes; since
join() will wait
until the feeder thread has managed to write all the items to the underlying pipe/socket, and this thread
will hang if the pipe/socket is full. In conclusion, this is a system (OS) limitation, not a bug in the
library. And you need to understand this limitation to use the library correctly and effectively.
In the case of my personal project, something similar happened, although more complicated: there were several
JoinableQueues created and separate groups of processes handling tasks on those different queues. Group 1 processes
dequeue from queue 1 and enqueue queue 2; group 2 processes dequeue from queue 2 and enqueue queue 3. The
problem occurs when group 1 processes are joined before all the items in queue 2 are popped by group 2 processes
and the underlying pipes for queue 2 is full. The same thing also happened to group 2 processes. This slows
down the entire processing as congested pipes for one queue causes congestion for multiple queues.
Of course, this is not a good design regarding the architecture and can be avoided by using only one task queue and only joining processes until all the tasks are processed. However, instead of adapting it with the same tools provided by the standard library, I used Redis List to replace the entire queuing infrastructure.
The reason I wanted to use Redis was solely because I want to play with this NOSQL database and see how it works. It turned out to be fairly easy to use and a drop-in replacement for the queueing infrastructure perfectly. I do not know the details of how Redis Lists are implemented yet, but it looks like it did not suffer from pipe-congestion problems at all and run fast. Here is an example of how to use Redis Lists as a message queue.