Submitting Functions

class scalable.ScalableClient(cluster: Any, *args: Any, **kwargs: Any)

Bases: Client

Client for submitting tasks to a Dask cluster. Inherits from the Dask Client class.

Parameters:

cluster (Cluster) – The cluster object to connect to for submitting tasks.

__init__(cluster: Any, *args: Any, **kwargs: Any) None

Initialize a client bound to an existing cluster/scheduler.

scalable.ScalableClient.submit(self, func: Any, *args: Any, tag: str | None = None, n: int = 1, **kwargs: Any) Any

Submit a function to be run by workers in the cluster.

Parameters:
  • func (function) – Function to be scheduled for execution.

  • *args (tuple) – Optional positional arguments to pass to the function.

  • tag (str (optional)) – User-defined tag for the container that can run func. If not provided, func is assigned to run on a random container.

  • n (int (default 1)) – Number of workers needed to run this task. Meant to be used with tag. Multiple workers can be useful for application-level distributed computing.

  • **kwargs (dict (optional)) – Optional key-value pairs to be passed to the function.

Examples

>>> c = client.submit(add, a, b)

Returns:

The future object for the submitted function call.

Return type:

Future

Raises:
  • TypeError – If ‘func’ is not callable.

  • ValueError – If ‘allow_other_workers’ is True and ‘workers’ is None.
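
The call pattern for submit can be sketched as follows. Because a real ScalableClient needs a running cluster, this sketch uses a hypothetical stand-in class, LocalStandInClient, that only mimics the documented submit signature and runs tasks on local threads. The stand-in, the tag value "python", and the add function are assumptions made for illustration and are not part of scalable.

```python
from concurrent.futures import ThreadPoolExecutor


class LocalStandInClient:
    """Hypothetical stand-in that mimics the documented submit() signature.

    It ignores tag and n and runs tasks on local threads, so the sketch can
    be executed without a cluster. It is not part of scalable.
    """

    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=2)

    def submit(self, func, *args, tag=None, n=1, **kwargs):
        # Mirror the documented TypeError for non-callable inputs.
        if not callable(func):
            raise TypeError("func must be callable")
        return self._pool.submit(func, *args, **kwargs)

    def close(self):
        # Release the local threads once all work is finished.
        self._pool.shutdown(wait=True)


def add(a, b):
    return a + b


client = LocalStandInClient()
# With a real ScalableClient, tag would name the container that can run the
# function and n the number of workers requested for the task.
future = client.submit(add, 1, 2, tag="python", n=1)
result = future.result()  # block until the task has finished
client.close()
```

With a real cluster, the same submit call would be made on a ScalableClient instance, and the returned future would be resolved with its result method or with gather.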

scalable.ScalableClient.map(self, func: Any, *parameters: Iterable[Any], tag: str | None = None, n: int = 1, **kwargs: Any) Any

Map a function over multiple sets of arguments, running it once for each set of inputs.

Parameters:
  • func (function) – Function to be scheduled for execution.

  • parameters (list of lists) – Lists of parameters to be passed to the function. The first list should have the first parameter values, the second list should have the second parameter values, and so on. The lists should be of the same length.

  • tag (str (optional)) – User-defined tag for the container that can run func. If not provided, func is assigned to run on a random container.

  • n (int (default 1)) – Number of workers needed to run this task. Meant to be used with tag. Multiple workers can be useful for application-level distributed computing.

  • *args (tuple) – Positional arguments to pass to dask client’s map method.

  • **kwargs (dict) – Keyword arguments to pass to dask client’s map method.

Examples

>>> def add(a, b): ...
>>> L = client.map(add, [[1, 2, 3], [4, 5, 6]])

Returns:

A list of future objects, one for each run of the function with the corresponding parameters.

Return type:

List of futures
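
The way the parameter lists line up with individual calls can be shown without a cluster. The sketch below is plain Python and is only an illustration of the pairing rule described for parameters (first list holds the first argument of each call, second list the second argument, and so on). It is not the library's implementation, and the add function is an assumed example.

```python
def add(a, b):
    return a + b


# First list: values for parameter a. Second list: values for parameter b.
parameters = [[1, 2, 3], [4, 5, 6]]

# The lists are expected to have the same length.
if len({len(values) for values in parameters}) != 1:
    raise ValueError("all parameter lists must have the same length")

# Each call takes one value from every list, position by position.
calls = list(zip(*parameters))

# Run locally what the cluster would run as one task per call.
results = [add(*call) for call in calls]
```

Here calls holds the argument tuples (1, 4), (2, 5), and (3, 6), so a map over these parameters corresponds to three separate runs of add.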

scalable.ScalableClient.get_versions(self, check: bool = False, packages: list[str] | None = None) Any

Return version information for the scheduler, all workers, and this client.

Parameters:
  • check (bool) – If True, raise a ValueError when the required and optional package versions do not all match. Default is False.

  • packages (list) – Extra package names to check.

Examples

>>> c.get_versions()
>>> c.get_versions(packages=['sklearn', 'geopandas'])

scalable.ScalableClient.cancel(self, futures: Any, *args: Any, **kwargs: Any) Any

Cancel running futures. This stops future tasks from being scheduled if they have not yet run and deletes them if they have already run. After calling, this result and all dependent results will no longer be accessible.

Parameters:
  • futures (Future | list of Futures) – One or more futures to cancel. Pass multiple futures as a list.

  • *args (tuple) – Positional arguments to pass to dask client’s cancel method.

  • **kwargs (dict) – Keyword arguments to pass to dask client’s cancel method.

scalable.ScalableClient.close(self, timeout: Any = <no_default>) Any

Close this client.

Clients will also close automatically when your Python session ends.

Parameters:

timeout (number) – Time in seconds after which to raise a dask.distributed.TimeoutError

Inherited Dask Client API

ScalableClient extends distributed.Client and keeps core Dask client behavior unchanged for methods not overridden by Scalable.

Note

Methods such as gather are inherited directly from distributed.Client.

distributed.Client.gather(self, futures, errors='raise', direct=None, asynchronous=None)

Gather futures from distributed memory.

Accepts a future, nested container of futures, iterator, or queue. The return type will match the input type.

Parameters:
  • futures (Collection of futures) – This can be a possibly nested collection of Future objects. Collections can be lists, sets, or dictionaries

  • errors (string) – Either ‘raise’ or ‘skip’. With ‘raise’, an exception is raised if any future has erred; with ‘skip’, erred futures are left out of the output collection.

  • direct (boolean) – Whether or not to connect directly to the workers, or to ask the scheduler to serve as intermediary. This can also be set when creating the Client.

  • asynchronous (bool) – If True the client is in asynchronous mode

Returns:

results – a collection of the same type as the input, but now with gathered results rather than futures

Examples

>>> from operator import add
>>> from distributed import Client
>>> c = Client('127.0.0.1:8787')
>>> x = c.submit(add, 1, 2)
>>> c.gather(x)
3
>>> c.gather([x, [x], x])  # support lists and dicts
[3, [3], 3]
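
The statement that the return type matches the input type can be illustrated with a small local sketch. The helper gather_like below is hypothetical and works on concurrent.futures futures rather than Dask futures. It only demonstrates the structure-preserving idea (each future in a nested container is replaced by its result) and is not how distributed.Client.gather is implemented.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from operator import add


def gather_like(obj):
    """Replace every future in a nested container with its result.

    Illustrative only: handles concurrent.futures.Future, not Dask futures.
    """
    if isinstance(obj, Future):
        return obj.result()
    if isinstance(obj, dict):
        return {key: gather_like(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        # Rebuild the same container type around the gathered items.
        return type(obj)(gather_like(item) for item in obj)
    return obj  # anything that is not a future is passed through unchanged


with ThreadPoolExecutor(max_workers=1) as pool:
    x = pool.submit(add, 1, 2)
    single = gather_like(x)
    nested = gather_like([x, [x], x])
    mapping = gather_like({"total": x})
```

The nested list keeps its shape, mirroring the c.gather([x, [x], x]) example above, and the dictionary keeps its keys.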

See also

Client.scatter

Send data out to cluster