Submitting Functions
- class scalable.ScalableClient(cluster: Any, *args: Any, **kwargs: Any)
Bases: distributed.Client
Client for submitting tasks to a Dask cluster. Inherits from the Dask distributed.Client class.
- Parameters:
cluster (Cluster) – The cluster object to connect to for submitting tasks.
- __init__(cluster: Any, *args: Any, **kwargs: Any) → None
Initialize a client bound to an existing cluster/scheduler.
- scalable.ScalableClient.submit(self, func: Any, *args: Any, tag: str | None = None, n: int = 1, **kwargs: Any) → Any
Submit a function to be run by workers in the cluster.
- Parameters:
func (function) – Function to be scheduled for execution.
*args (tuple) – Optional positional arguments to pass to the function.
tag (str (optional)) – User-defined tag for the container that can run func. If not provided, func is assigned to run on a random container.
n (int (default 1)) – Number of workers needed to run this task. Meant to be used with tag. Multiple workers can be useful for application-level distributed computing.
**kwargs (dict (optional)) – Optional key-value pairs to be passed to the function.
Examples
>>> c = client.submit(add, a, b)
- Returns:
Returns the future object that runs the function.
- Return type:
Future
- Raises:
TypeError – If ‘func’ is not callable, a TypeError is raised.
ValueError – If ‘allow_other_workers’ is True and ‘workers’ is None, a ValueError is raised.
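submit returns a Future right away while the work runs elsewhere, and the result is collected later. The sketch below shows that submit-then-collect pattern using the standard library's concurrent.futures as a purely local stand-in; it does not use Scalable or Dask, and tag and n have no equivalent here.

```python
from concurrent.futures import ThreadPoolExecutor


def add(a, b):
    """Toy task: return the sum of two numbers."""
    return a + b


# A small local thread pool stands in for the cluster (this is not ScalableClient).
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(add, 1, 2)  # returns a Future immediately
    result = future.result()         # blocks until the task has finished

print(result)  # 3
```

With ScalableClient the first call would instead be client.submit(add, 1, 2), optionally with a tag, and either future.result() or client.gather(future) retrieves the value.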
- scalable.ScalableClient.map(self, func: Any, *parameters: Iterable[Any], tag: str | None = None, n: int = 1, **kwargs: Any) → Any
Map a function on multiple sets of arguments to run the function multiple times with different inputs.
- Parameters:
func (function) – Function to be scheduled for execution.
parameters (list of lists) – Lists of parameters to be passed to the function. The first list should have the first parameter values, the second list should have the second parameter values, and so on. The lists should be of the same length.
tag (str (optional)) – User-defined tag for the container that can run func. If not provided, func is assigned to run on a random container.
n (int (default 1)) – Number of workers needed to run this task. Meant to be used with tag. Multiple workers can be useful for application-level distributed computing.
*args (tuple) – Positional arguments to pass to dask client’s map method.
**kwargs (dict) – Keyword arguments to pass to dask client’s map method.
Examples
>>> def add(a, b): ...
>>> L = client.map(add, [[1, 2, 3], [4, 5, 6]])
- Returns:
Returns a list of future objects, each for a separate run of the function with the given parameters.
- Return type:
List of futures
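The parameters convention pairs the i-th element of each list into one call: the first list supplies the first argument, the second list the second argument, and so on. The sketch below illustrates only that pairing rule with plain Python, where Python's built-in map follows the same convention; here each call runs sequentially in-process instead of as a future on the cluster.

```python
def add(a, b):
    return a + b


first_params = [1, 2, 3]   # values for parameter a
second_params = [4, 5, 6]  # values for parameter b

# Each call takes one element from each list at the same index.
calls = list(zip(first_params, second_params))
results = [add(*args) for args in calls]

print(calls)    # [(1, 4), (2, 5), (3, 6)]
print(results)  # [5, 7, 9]
```

Because the lists are consumed index by index, lists of different lengths would leave some values unpaired, which is why the lists should all be the same length.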
- scalable.ScalableClient.get_versions(self, check: bool = False, packages: list[str] | None = None) → Any
Return version info for the scheduler, all workers and myself.
- Parameters:
check (bool) – Raise ValueError if all required & optional packages do not match. Default is False.
packages (list) – Extra package names to check.
Examples
>>> c.get_versions()
>>> c.get_versions(packages=['sklearn', 'geopandas'])
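The check=True option guards against the client, scheduler and workers running different package versions. The sketch below shows the underlying comparison on a hypothetical, simplified layout of version tables; the helper find_version_mismatches and the input shape are assumptions for illustration and do not reproduce the structure Dask actually returns. get_versions(check=True) itself raises ValueError instead of returning a report.

```python
def find_version_mismatches(versions):
    """Return {package: sorted versions} for packages reported at more than one version.

    `versions` uses a hypothetical, simplified layout (not Dask's actual output):
    {"client": {pkg: ver}, "scheduler": {pkg: ver}, "workers": {address: {pkg: ver}}}
    """
    # Collect the package tables of every process: client, scheduler, each worker.
    tables = [versions["client"], versions["scheduler"], *versions["workers"].values()]
    seen = {}
    for table in tables:
        for package, version in table.items():
            seen.setdefault(package, set()).add(version)
    # Keep only packages with more than one distinct version; packages missing
    # on some process are not flagged by this simplified check.
    return {pkg: sorted(vers) for pkg, vers in seen.items() if len(vers) > 1}


# Made-up version numbers, for illustration only.
example = {
    "client": {"dask": "2024.1.0", "numpy": "1.26.4"},
    "scheduler": {"dask": "2024.1.0", "numpy": "1.26.4"},
    "workers": {"tcp://10.0.0.5:40001": {"dask": "2024.1.0", "numpy": "1.25.2"}},
}

mismatches = find_version_mismatches(example)
print(mismatches)  # {'numpy': ['1.25.2', '1.26.4']}
```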
- scalable.ScalableClient.cancel(self, futures: Any, *args: Any, **kwargs: Any) → Any
Cancel running futures. This stops future tasks from being scheduled if they have not yet run and deletes them if they have already run. After calling, this result and all dependent results will no longer be accessible.
- Parameters:
futures (Future | list of Future) – One or more futures to cancel, given as a single future or a list.
*args (tuple) – Positional arguments to pass to dask client’s cancel method.
**kwargs (dict) – Keyword arguments to pass to dask client’s cancel method.
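For a task that has not started yet, cancelling it prevents it from ever running. The standard-library sketch below shows only that part of the behavior, as a local analogy: a single-thread pool keeps the second task queued behind a blocked first task, so it can still be cancelled. Unlike Dask, concurrent.futures cannot cancel a task that is already running and has no notion of deleting results that already exist.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()


def blocker():
    # Occupies the only worker thread until `release` is set.
    release.wait()
    return "done"


def never_runs():
    return "ran"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocker)    # picked up by the single worker thread
    queued = pool.submit(never_runs)  # waits in the queue behind `blocker`
    was_cancelled = queued.cancel()   # succeeds because the task has not started
    release.set()                     # let the first task finish
    first_result = running.result()

print(was_cancelled, queued.cancelled(), first_result)  # True True done
```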
- scalable.ScalableClient.close(self, timeout: Any = <no_default>) → Any
Close this client.
Clients will also close automatically when your Python session ends.
- Parameters:
timeout (number) – Time in seconds after which to raise a dask.distributed.TimeoutError.
Inherited Dask Client API
ScalableClient extends distributed.Client and keeps core Dask client
behavior unchanged for methods not overridden by Scalable.
Note
Methods such as gather are inherited directly from
distributed.Client.
- distributed.Client.gather(self, futures, errors='raise', direct=None, asynchronous=None)
Gather futures from distributed memory.
Accepts a future, nested container of futures, iterator, or queue. The return type will match the input type.
- Parameters:
futures (Collection of futures) – This can be a possibly nested collection of Future objects. Collections can be lists, sets, or dictionaries.
errors (string) – Either ‘raise’ or ‘skip’: whether to raise an exception if a future has erred, or to skip that future and leave it out of the output collection.
direct (boolean) – Whether or not to connect directly to the workers, or to ask the scheduler to serve as intermediary. This can also be set when creating the Client.
asynchronous (bool) – If True the client is in asynchronous mode
- Returns:
results – A collection of the same type as the input, but now with gathered results rather than futures.
Examples
>>> from operator import add
>>> c = Client('127.0.0.1:8787')
>>> x = c.submit(add, 1, 2)
>>> c.gather(x)
3
>>> c.gather([x, [x], x])  # support lists and dicts
[3, [3], 3]
See also
Client.scatter – Send data out to cluster.
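The rule that gather returns a collection of the same type as its input can be sketched as a small recursive walk. The helper gather_nested below is hypothetical and uses concurrent.futures futures as a local stand-in; it is not how distributed.Client.gather is implemented, which fetches results from cluster memory and also handles the errors, direct and asynchronous options.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from operator import add


def gather_nested(obj):
    """Replace every Future in a nested list/tuple/set/dict with its result.

    Containers keep their type, mirroring the rule that the output
    matches the input structure. Local illustration only.
    """
    if isinstance(obj, Future):
        return obj.result()
    if isinstance(obj, dict):
        return {key: gather_nested(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        # Plain lists, tuples and sets are rebuilt with their original type.
        return type(obj)(gather_nested(item) for item in obj)
    return obj  # plain values pass through unchanged


with ThreadPoolExecutor(max_workers=2) as pool:
    x = pool.submit(add, 1, 2)
    single = gather_nested(x)
    nested = gather_nested([x, [x], x])
    mapping = gather_nested({"a": x, "b": (x,)})

print(single)   # 3
print(nested)   # [3, [3], 3]
print(mapping)  # {'a': 3, 'b': (3,)}
```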