fiber.pool

Pools are supported by Fiber. They allow the user to manage a pool of worker processes. Fiber extends pools with job-backed processes so that it can manage thousands of (remote) workers per pool. Users can also create multiple pools at the same time.

Fiber implements two different versions of Pool: ZPool and ResilientZPool. Both have the same API as multiprocessing.Pool. ZPool is a pool based on "r"/"w" socket pairs. ResilientZPool is ZPool plus error handling: failed tasks are resubmitted to the pool and worked on by other pool workers.

By default, ResilientZPool is exposed as fiber.Pool.

Example:

import math
import fiber
pool = fiber.Pool(processes=4)
pool.map(math.sqrt, range(10))

ApplyResult

ApplyResult(self, seq, inventory)

An object that is returned by asynchronous methods of Pool. It represents a handle that can be used to get the actual result.

get

ApplyResult.get()

Get the actual result represented by this object.

Returns:

Actual result. This method will block if the actual result is not ready.
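
A minimal sketch of how an ApplyResult is typically used (the built-in pow is used here so the function can be pickled and sent to a worker):

import fiber

pool = fiber.Pool(processes=2)
res = pool.apply_async(pow, (2, 10))  # returns an ApplyResult immediately
print(res.get())                      # blocks until the worker returns 1024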

MapResult

MapResult(self, seq, inventory)

ZPool

ZPool(self,
      processes=None,
      initializer=None,
      initargs=(),
      maxtasksperchild=None,
      cluster=None,
      master_sock_type='w')

A Pool implementation based on Fiber sockets.

ZPool directly uses Fiber sockets instead of SimpleQueue for task and result handling. This makes it faster.
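
A minimal construction sketch, assuming ZPool is imported directly from fiber.pool (most programs use fiber.Pool instead):

import math
from fiber.pool import ZPool

pool = ZPool(processes=4)             # start 4 job-backed workers
print(pool.map(math.sqrt, range(4)))  # [0.0, 1.0, 1.414..., 1.732...]
pool.close()
pool.join()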

apply_async

ZPool.apply_async(func,
                  args=(),
                  kwds={},
                  callback=None,
                  error_callback=None)

Run function func with arguments args and keyword arguments kwds on a remote Pool worker. This is an asynchronous version of apply.

Arguments:

  • func: target function to run.
  • args: positional arguments that need to be passed to func.
  • kwds: keyword arguments that need to be passed to func.
  • callback: Currently not supported. A callback function that will be called when the result is ready.
  • error_callback: Currently not supported. A callback function that will be called when an error occurs.

Returns:

An ApplyResult object which has a method .get() to get the actual results.
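
A short usage sketch with the built-in round, passing both positional and keyword arguments (callbacks are omitted because they are not yet supported):

import math
from fiber.pool import ZPool

pool = ZPool(processes=2)
handle = pool.apply_async(round, args=(math.pi,), kwds={"ndigits": 3})
print(handle.get())  # 3.142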

map_async

ZPool.map_async(func,
                iterable,
                chunksize=None,
                callback=None,
                error_callback=None)

For each element e in iterable, run func(e). The workload is distributed between all the Pool workers. This is an asynchronous version of map.

Arguments:

  • func: target function to run.
  • iterable: an iterable object to be mapped.
  • chunksize: if set, elements in iterable will be put into chunks whose size is decided by chunksize. These chunks will be sent to Pool workers instead of individual elements in iterable. If not set, the chunk size is decided automatically.
  • callback: Currently not supported. A callback function that will be called when the result is ready.
  • error_callback: Currently not supported. A callback function that will be called when an error occurs.

Returns:

A MapResult object which has a method .get() to get the actual results.
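
A short usage sketch with the built-in abs:

from fiber.pool import ZPool

pool = ZPool(processes=4)
result = pool.map_async(abs, [-1, -2, -3])  # returns a MapResult immediately
print(result.get())                         # [1, 2, 3]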

apply

ZPool.apply(func, args=(), kwds={})

Run function func with arguments args and keyword arguments kwds on a remote Pool worker.

Arguments:

  • func: target function to run.
  • args: positional arguments that need to be passed to func.
  • kwds: keyword arguments that need to be passed to func.

Returns:

The return value of func(*args, **kwds).
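
A short usage sketch with the built-in divmod; unlike apply_async, the call blocks until the worker returns:

from fiber.pool import ZPool

pool = ZPool(processes=2)
print(pool.apply(divmod, (13, 4)))  # (3, 1)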

map

ZPool.map(func, iterable, chunksize=None)

For each element e in iterable, run func(e). The workload is distributed between all the Pool workers.

Arguments:

  • func: target function to run.
  • iterable: an iterable object to be mapped.
  • chunksize: if set, elements in iterable will be put into chunks whose size is decided by chunksize. These chunks will be sent to Pool workers instead of individual elements in iterable. If not set, the chunk size is decided automatically.

Returns:

A list of results equivalent to calling [func(x) for x in iterable].
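
A short usage sketch with math.factorial:

import math
from fiber.pool import ZPool

pool = ZPool(processes=4)
print(pool.map(math.factorial, range(6)))  # [1, 1, 2, 6, 24, 120]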

imap

ZPool.imap(func, iterable, chunksize=1)

For each element e in iterable, run func(e). The workload is distributed between all the Pool workers. This function returns an iterator that the user can iterate over to get results.

Arguments:

  • func: target function to run.
  • iterable: an iterable object to be mapped.
  • chunksize: if set, elements in iterable will be put into chunks whose size is decided by chunksize. These chunks will be sent to Pool workers instead of individual elements in iterable. Defaults to 1.

Returns:

An iterator that the user can use to get results.
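
A short usage sketch; results are yielded in the same order as the input:

from fiber.pool import ZPool

pool = ZPool(processes=4)
for value in pool.imap(hex, range(4)):
    print(value)  # '0x0', '0x1', '0x2', '0x3'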

imap_unordered

ZPool.imap_unordered(func, iterable, chunksize=1)

For each element e in iterable, run func(e). The workload is distributed between all the Pool workers. This function returns an unordered iterator that the user can iterate over to get results. This means that the order of the results may not match the order of the iterable.

Arguments:

  • func: target function to run.
  • iterable: an iterable object to be mapped.
  • chunksize: if set, elements in iterable will be put into chunks whose size is decided by chunksize. These chunks will be sent to Pool workers instead of individual elements in iterable. Defaults to 1.

Returns:

An unordered iterator that the user can use to get results.
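
A short usage sketch; results are yielded as workers finish, so the order may differ from the input:

from fiber.pool import ZPool

pool = ZPool(processes=4)
for value in pool.imap_unordered(str, range(8)):
    print(value)  # e.g. '3', '0', '1', ...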

starmap_async

ZPool.starmap_async(func,
                    iterable,
                    chunksize=None,
                    callback=None,
                    error_callback=None)

For each element args in iterable, run func(*args). The workload is distributed between all the Pool workers. This is an asynchronous version of starmap.

For example, starmap_async(func, [(1, 2, 3), (4, 5, 6)]) will result in calling func(1, 2, 3) and func(4, 5, 6) on a remote host.

Arguments:

  • func: target function to run.
  • iterable: an iterable object to be mapped.
  • chunksize: if set, elements in iterable will be put into chunks whose size is decided by chunksize. These chunks will be sent to Pool workers instead of individual elements in iterable. If not set, the chunk size is decided automatically.
  • callback: Currently not supported. A callback function that will be called when the result is ready.
  • error_callback: Currently not supported. A callback function that will be called when an error occurs.

Returns:

A MapResult object which has a method .get() to get the actual results.
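
A short usage sketch with the built-in pow; each tuple is unpacked into the function's arguments:

from fiber.pool import ZPool

pool = ZPool(processes=2)
result = pool.starmap_async(pow, [(2, 3), (3, 2)])
print(result.get())  # [8, 9]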

starmap

ZPool.starmap(func, iterable, chunksize=None)

For each element args in iterable, run func(*args). The workload is distributed between all the Pool workers.

For example, starmap(func, [(1, 2, 3), (4, 5, 6)]) will result in calling func(1, 2, 3) and func(4, 5, 6) on a remote host.

Arguments:

  • func: target function to run.
  • iterable: an iterable object to be mapped.
  • chunksize: if set, elements in iterable will be put into chunks whose size is decided by chunksize. These chunks will be sent to Pool workers instead of individual elements in iterable. If not set, the chunk size is decided automatically.

Returns:

A list of results equivalent to calling [func(*args) for args in iterable].
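
A short usage sketch with the built-in max:

from fiber.pool import ZPool

pool = ZPool(processes=2)
print(pool.starmap(max, [(1, 5, 3), (7, 2, 4)]))  # [5, 7]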

close

ZPool.close()

Close this Pool. This means the current pool will be put into a closing state and it will not accept new tasks. Existing workers will continue to work on tasks that have been dispatched to them and exit when all the tasks are done.

terminate

ZPool.terminate()

Terminate this pool. This means that this pool will be terminated and all its pool workers will also be terminated. Tasks that have been dispatched will be discarded.

join

ZPool.join()

Wait for all the pool workers of this pool to exit. This should be used after terminate() or close() are called on this pool.
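
A minimal shutdown sketch showing the usual close()/join() sequence:

import math
from fiber.pool import ZPool

pool = ZPool(processes=4)
print(pool.map(math.sqrt, range(16)))
pool.close()  # stop accepting new tasks
pool.join()   # wait for all workers to exit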

ResilientZPool

ResilientZPool(self,
               processes=None,
               initializer=None,
               initargs=(),
               maxtasksperchild=None,
               cluster=None)

ZPool with error handling. The differences are:

  • The master socket is a ROUTER socket instead of a DEALER socket.
  • A pending table is added.
  • When a dead worker is detected, its jobs are resubmitted to the work queue in addition to restarting that worker.

The API of ResilientZPool is the same as ZPool's. One difference is that if the processes argument is not set, its default value is 1.
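
Because ResilientZPool is exposed as fiber.Pool by default, most programs use it through fiber.Pool; it can also be constructed directly (a minimal sketch, assuming it is importable from fiber.pool):

import math
from fiber.pool import ResilientZPool

pool = ResilientZPool(processes=4)
# same API as ZPool; tasks from failed workers are resubmitted automatically
print(pool.map(math.sqrt, range(8)))
pool.close()
pool.join()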