fiber.queues

Queues and pipes in Fiber behave the same as in multiprocessing. The difference is that queues and pipes are now shared by multiple processes running on different machines. Two processes can read from and write to the same pipe. Furthermore, queues can be shared between many processes on different machines and each process can send to or receive from the same queue at the same time. Fiber's queue is implemented with Nanomsg, a high-performance asynchronous message queue system.

We implemented two version of SimpleQueue: SimpleQueuePush and SimpleQueuePull. To decide which queue to use, you can set fiber.config.use_push_queue to True to use SimpleQueuePush or False to use SimpleQueuePull.

The difference between SimpleQueuePush and SimpleQueuePull is whether data is push from master process to sub-processes or data is requested by sub-processes. When SimpleQueuePull is used, sub-processes will send an message to the master process to notify master process that it is ready. And the master process will send the data to sub-processes. In this way, data in the queue are load-balanced between different sub-processes due to the workload of them. When SimpleQueuePush is used, all the data are push to the sub-processes as soon as possible and the data are load-balanced in a round-robin manner. This way, there is less overhead for sending the data to the client, but there may be uneven load between each of the sub-processes.

Example:

q = fiber.SimpleQueue()
q.put(42)

# By default, `SimpleQueue` is `SimpleQueuePull`, now we switch to
# `SimpleQueuePush`
fiber.config.use_push_queue = True
q = fiber.SimpleQueue()

SimpleQueuePush

SimpleQueuePush(self)

A queue build on top of Fiber socket. It uses "w" - ("r" - "w") - "r" socket combination. Messages are pushed from one end of the queue to the other end without explicitly pulling.

get

SimpleQueuePush.get()

Get an element from this Queue.

Returns:

An element from this queue. If there is no element in the queue, this method will block.

put

SimpleQueuePush.put(obj)

Put an element into the Queue.

Arguments:

  • obj: Any picklable Python object.

    Pipe

Pipe(duplex=True)

Return a pair of connected ZConnection objects.

Arguments:

  • duplex: if duplex, then both read and write are allowed on each of the returned connection object. Otherwise, the first returned connection will be read-only and the second connection will be write-only. By default, duplex is enabled.

    ZConnection

ZConnection(self, handle, readable=True, writable=True)

A Connection class implemented with Fiber socket.

It takes a (sock_type, dest_addr) tuple as input, creates a new connection which can send and recv Python objects. This ZConnection class can be serialized by pickle and then send to another remote process. Fiber socket will be reconnected after unpickle.

Arguments:

  • handle: a (sock_type, dest_addr) tuple. sock_type should be a Fiber socket type. dest_addr should be in format like "tcp://127.0.0.1:9000".

note: ZConnection's fileno method returns a Fiber socket.