fiber.pool
Pools are supported by Fiber. They allow the user to manage a pool of worker processes. Fiber extends pools with job-backed processes so that it can manage thousands of (remote) workers per pool. Users can also create multiple pools at the same time.
Fiber implements two different versions of `Pool`: `ZPool` and `ResilientZPool`. Both have the same API as `multiprocessing.Pool`. `ZPool` is a pool based on "r"/"w" socket pairs. `ResilientZPool` is `ZPool` plus error handling: failed tasks are resubmitted to the pool and worked on by other pool workers.
By default, `ResilientZPool` is exposed as `fiber.Pool`.
Example:
```python
import math

import fiber

pool = fiber.Pool(processes=4)
pool.map(math.sqrt, range(10))
```
ApplyResult
ApplyResult(self, seq, inventory)
An object that is returned by asynchronous methods of `Pool`. It represents a handle that can be used to get the actual result.
get
ApplyResult.get()
Get the actual result represented by this object.
Returns:
Actual result. This method will block if the actual result is not ready.
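A minimal sketch of how an `ApplyResult` handle is typically used, assuming `fiber.Pool` follows the `multiprocessing.Pool`-style semantics described above:

```python
import math

import fiber

pool = fiber.Pool(processes=2)

# apply_async returns an ApplyResult handle immediately instead of blocking
handle = pool.apply_async(math.sqrt, (16,))

# get() blocks until the worker has produced the result
print(handle.get())  # 4.0

pool.close()
pool.join()
```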
MapResult
MapResult(self, seq, inventory)
ZPool
ZPool(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, cluster=None, master_sock_type='w')
A Pool implementation based on Fiber sockets.
`ZPool` directly uses Fiber sockets instead of `SimpleQueue` for task and result handling. This makes it faster.
apply_async
ZPool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
Run function `func` with arguments `args` and keyword arguments `kwds` on a remote Pool worker. This is an asynchronous version of `apply`.
Arguments:
- `func`: target function to run.
- `args`: positional arguments that need to be passed to `func`.
- `kwds`: keyword arguments that need to be passed to `func`.
- `callback`: Currently not supported. A callback function that will be called when the result is ready.
- `error_callback`: Currently not supported. A callback function that will be called when an error occurs.
Returns:
An `ApplyResult` object which has a method `.get()` to get the actual result.
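For example, a sketch of dispatching several asynchronous calls and collecting their results later, assuming the `multiprocessing.Pool`-compatible behavior described above (callbacks are omitted since they are not yet supported):

```python
import math

import fiber

pool = fiber.Pool(processes=4)

# Dispatch work without blocking; each call returns an ApplyResult
handles = [pool.apply_async(math.factorial, (n,)) for n in range(5)]

# Collect results; each get() blocks until that task finishes
results = [h.get() for h in handles]
print(results)  # [1, 1, 2, 6, 24]

pool.close()
pool.join()
```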
map_async
ZPool.map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
For each element `e` in `iterable`, run `func(e)`. The workload is distributed between all the Pool workers. This is an asynchronous version of `map`.
Arguments:
- `func`: target function to run.
- `iterable`: an iterable object to be mapped.
- `chunksize`: if set, elements in `iterable` will be put into chunks whose size is decided by `chunksize`. These chunks will be sent to Pool workers instead of each element in `iterable`. If not set, the chunksize is decided automatically.
- `callback`: Currently not supported. A callback function that will be called when the result is ready.
- `error_callback`: Currently not supported. A callback function that will be called when an error occurs.
Returns:
A `MapResult` object which has a method `.get()` to get the actual results.
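A sketch of the asynchronous map pattern, assuming the `MapResult.get()` behavior described above:

```python
import math

import fiber

pool = fiber.Pool(processes=4)

# map_async returns a MapResult handle right away
result = pool.map_async(math.sqrt, range(10), chunksize=2)

# ... other work can happen here while the workers compute ...

print(result.get())  # blocks until all 10 results are ready

pool.close()
pool.join()
```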
apply
ZPool.apply(func, args=(), kwds={})
Run function `func` with arguments `args` and keyword arguments `kwds` on a remote Pool worker.
Arguments:
- `func`: target function to run.
- `args`: positional arguments that need to be passed to `func`.
- `kwds`: keyword arguments that need to be passed to `func`.
Returns:
The return value of `func(*args, **kwds)`.
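For comparison, a sketch of this blocking form, which runs a single call on a remote worker and waits for its return value:

```python
import math

import fiber

pool = fiber.Pool(processes=2)

# apply() blocks until the remote worker returns
value = pool.apply(math.pow, (2, 10))
print(value)  # 1024.0

pool.close()
pool.join()
```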
map
ZPool.map(func, iterable, chunksize=None)
For each element `e` in `iterable`, run `func(e)`. The workload is distributed between all the Pool workers.
Arguments:
- `func`: target function to run.
- `iterable`: an iterable object to be mapped.
- `chunksize`: if set, elements in `iterable` will be put into chunks whose size is decided by `chunksize`. These chunks will be sent to Pool workers instead of each element in `iterable`. If not set, the chunksize is decided automatically.
Returns:
A list of results equivalent to calling `[func(x) for x in iterable]`.
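A sketch showing `map` with an explicit `chunksize`, which batches elements before sending them to workers (the value 5 here is only illustrative):

```python
import math

import fiber

pool = fiber.Pool(processes=4)

# With chunksize=5, the 100 inputs are shipped to workers in batches of 5
roots = pool.map(math.sqrt, range(100), chunksize=5)
print(roots[:3])  # [0.0, 1.0, 1.4142135623730951]

pool.close()
pool.join()
```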
imap
ZPool.imap(func, iterable, chunksize=1)
For each element `e` in `iterable`, run `func(e)`. The workload is distributed between all the Pool workers. This function returns an iterator which the user can iterate over to get results.
Arguments:
- `func`: target function to run.
- `iterable`: an iterable object to be mapped.
- `chunksize`: if set, elements in `iterable` will be put into chunks whose size is decided by `chunksize`. These chunks will be sent to Pool workers instead of each element in `iterable`. If not set, the chunksize is decided automatically.
Returns:
An iterator which the user can use to get results.
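A sketch of the lazy, ordered iteration that `imap` provides, assuming the semantics match `multiprocessing.Pool.imap` as stated above:

```python
import math

import fiber

pool = fiber.Pool(processes=4)

# Results are yielded one at a time, in the same order as the input
for value in pool.imap(math.sqrt, range(5)):
    print(value)

pool.close()
pool.join()
```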
imap_unordered
ZPool.imap_unordered(func, iterable, chunksize=1)
For each element `e` in `iterable`, run `func(e)`. The workload is distributed between all the Pool workers. This function returns an unordered iterator which the user can iterate over to get results. This means that the order of the results may not match the order of the `iterable`.
Arguments:
- `func`: target function to run.
- `iterable`: an iterable object to be mapped.
- `chunksize`: if set, elements in `iterable` will be put into chunks whose size is decided by `chunksize`. These chunks will be sent to Pool workers instead of each element in `iterable`. If not set, the chunksize is decided automatically.
Returns:
An unordered iterator which the user can use to get results.
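A sketch of `imap_unordered`, where results arrive as soon as any worker finishes, so the output order may differ from the input order:

```python
import math

import fiber

pool = fiber.Pool(processes=4)

# The order of the printed values is not guaranteed to match range(5)
for value in pool.imap_unordered(math.sqrt, range(5)):
    print(value)

pool.close()
pool.join()
```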
starmap_async
ZPool.starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)
For each element `args` in `iterable`, run `func(*args)`. The workload is distributed between all the Pool workers. This is an asynchronous version of `starmap`.
For example, `starmap_async(func, [(1, 2, 3), (4, 5, 6)])` will result in calling `func(1, 2, 3)` and `func(4, 5, 6)` on a remote host.
Arguments:
- `func`: target function to run.
- `iterable`: an iterable object to be mapped.
- `chunksize`: if set, elements in `iterable` will be put into chunks whose size is decided by `chunksize`. These chunks will be sent to Pool workers instead of each element in `iterable`. If not set, the chunksize is decided automatically.
- `callback`: Currently not supported. A callback function that will be called when the result is ready.
- `error_callback`: Currently not supported. A callback function that will be called when an error occurs.
Returns:
A `MapResult` object which has a method `.get()` to get the actual results.
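A sketch of `starmap_async`, assuming the `MapResult` behavior described above; `math.pow` is just a placeholder two-argument function:

```python
import math

import fiber

pool = fiber.Pool(processes=2)

# Each tuple is unpacked into the arguments of math.pow
result = pool.starmap_async(math.pow, [(2, 3), (3, 2), (10, 0)])
print(result.get())  # [8.0, 9.0, 1.0]

pool.close()
pool.join()
```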
starmap
ZPool.starmap(func, iterable, chunksize=None)
For each element `args` in `iterable`, run `func(*args)`. The workload is distributed between all the Pool workers.
For example, `starmap(func, [(1, 2, 3), (4, 5, 6)])` will result in calling `func(1, 2, 3)` and `func(4, 5, 6)` on a remote host.
Arguments:
- `func`: target function to run.
- `iterable`: an iterable object to be mapped.
- `chunksize`: if set, elements in `iterable` will be put into chunks whose size is decided by `chunksize`. These chunks will be sent to Pool workers instead of each element in `iterable`. If not set, the chunksize is decided automatically.
Returns:
A list of results equivalent to calling `[func(*arg) for arg in iterable]`.
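The blocking counterpart, sketched with the same placeholder function as above:

```python
import math

import fiber

pool = fiber.Pool(processes=2)

# Equivalent to [math.pow(*args) for args in the iterable], but run on workers
values = pool.starmap(math.pow, [(2, 3), (3, 2), (10, 0)])
print(values)  # [8.0, 9.0, 1.0]

pool.close()
pool.join()
```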
close
ZPool.close()
Close this Pool. This means the current pool will be put into a closing state and it will not accept new tasks. Existing workers will continue to work on tasks that have been dispatched to them and exit when all the tasks are done.
terminate
ZPool.terminate()
Terminate this pool. This means that this pool will be terminated and all its pool workers will also be terminated. Tasks that have been dispatched will be discarded.
join
ZPool.join()
Wait for all the pool workers of this pool to exit. This should be used after `terminate()` or `close()` has been called on this pool.
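A sketch of the two shutdown paths described above: a graceful `close()` followed by `join()`, versus an immediate `terminate()`:

```python
import math

import fiber

# Graceful shutdown: let dispatched tasks finish, then wait for workers to exit
pool = fiber.Pool(processes=2)
pool.map(math.sqrt, range(10))
pool.close()   # no new tasks are accepted after this
pool.join()    # blocks until all workers have exited

# Forceful shutdown: dispatched but unfinished tasks are discarded
pool2 = fiber.Pool(processes=2)
pool2.terminate()
pool2.join()
```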
ResilientZPool
ResilientZPool(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, cluster=None)
`ZPool` with error handling. The differences are:
- The master socket is a ROUTER socket instead of a DEALER socket.
- A pending table is added.
- When a dead worker is detected, its jobs are resubmitted to the work queue in addition to restarting that worker.
The API of `ResilientZPool` is the same as `ZPool`. One difference is that if the `processes` argument is not set, its default value is 1.
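For illustration, a sketch that constructs the pool classes directly from `fiber.pool` (assuming they are importable from this module as documented above); note that `ResilientZPool` defaults to a single worker when `processes` is not given:

```python
import math

from fiber.pool import ResilientZPool, ZPool

# Plain ZPool: faster path, no task resubmission when a worker fails
zpool = ZPool(processes=4)
print(zpool.map(math.sqrt, range(4)))
zpool.close()
zpool.join()

# ResilientZPool (also exposed as fiber.Pool): failed tasks are resubmitted
rpool = ResilientZPool()  # processes defaults to 1 here
print(rpool.map(math.sqrt, range(4)))
rpool.close()
rpool.join()
```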