Tutorial 3: Scaling Strategies with Providers

What You Will Learn

By the end of this tutorial you will:

  • Understand Scalable’s provider architecture and how it abstracts execution backends.

  • Configure and use the Local, Slurm, and Cloud providers.

  • Choose appropriate scaling strategies for different workload profiles.

  • Implement manual scaling, adaptive scaling, and objective-driven planning.

  • Monitor scaling decisions through the Session API.

Prerequisites

Scenario

Your energy forecasting pipeline has grown. Development happens locally with 2–4 workers. Production runs on an HPC cluster with 64+ workers. Burst capacity uses cloud auto-scaling. You need a unified scaling approach that works across all three environments.

Step 1: The Provider Architecture

Scalable separates what runs from where it runs through the DeploymentProvider protocol:

┌──────────────────┐     ┌────────────────────┐     ┌─────────────┐
│     Manifest     │────▶│   DeploymentSpec   │────▶│  Provider   │
│ (scalable.yaml)  │     │ (provider-neutral) │     │  (backend)  │
└──────────────────┘     └────────────────────┘     └──────┬──────┘
                                                           │
                       ┌─────────────────┬─────────────────┤
                ┌──────▼──────┐   ┌──────▼──────┐   ┌──────▼──────┐
                │    Local    │   │    Slurm    │   │ Cloud / K8s │
                │  Provider   │   │  Provider   │   │  Provider   │
                └─────────────┘   └─────────────┘   └─────────────┘

Every provider implements the same interface:

class DeploymentProvider(Protocol):
    name: str

    def validate(self, spec: DeploymentSpec) -> ValidationReport: ...
    def build_cluster(self, spec: DeploymentSpec) -> ClusterHandle: ...
    def scale(self, cluster: ClusterHandle, plan: ScalePlan) -> None: ...
    def estimate_cost(self, spec: DeploymentSpec, plan: ScalePlan) -> CostEstimate | None: ...

This means your workflow code is provider-agnostic — the same client.submit(func, arg, tag="demeter") call works identically whether the cluster is local threads, Slurm jobs, or Kubernetes pods.
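
To make the protocol idea concrete, here is a minimal, self-contained sketch. The dataclasses are simplified stand-ins for Scalable's real types, and InMemoryProvider and deploy are hypothetical names invented for this illustration. Only two of the four protocol methods are reproduced to keep it short.

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


# Stand-in types for illustration only; the real Scalable types are richer.
@dataclass
class DeploymentSpec:
    workers: dict[str, int]  # component tag -> requested worker count


@dataclass
class ScalePlan:
    workers: dict[str, int]


@dataclass
class ClusterHandle:
    provider: str
    workers: dict[str, int] = field(default_factory=dict)


@runtime_checkable
class DeploymentProvider(Protocol):
    name: str

    def build_cluster(self, spec: DeploymentSpec) -> ClusterHandle: ...
    def scale(self, cluster: ClusterHandle, plan: ScalePlan) -> None: ...


class InMemoryProvider:
    """Hypothetical backend that only records worker counts."""

    name = "in-memory"

    def build_cluster(self, spec: DeploymentSpec) -> ClusterHandle:
        return ClusterHandle(provider=self.name, workers=dict(spec.workers))

    def scale(self, cluster: ClusterHandle, plan: ScalePlan) -> None:
        cluster.workers.update(plan.workers)


def deploy(provider: DeploymentProvider, spec: DeploymentSpec) -> ClusterHandle:
    # Caller code depends only on the protocol, never on a concrete backend.
    return provider.build_cluster(spec)


provider = InMemoryProvider()
cluster = deploy(provider, DeploymentSpec(workers={"demeter": 2}))
provider.scale(cluster, ScalePlan(workers={"demeter": 5}))
print(isinstance(provider, DeploymentProvider), cluster.workers)
```

Because InMemoryProvider never inherits from DeploymentProvider, the example shows structural typing at work: any class with matching members satisfies the protocol.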

Step 2: Local Provider — Development & CI

The LocalProvider wraps Dask’s LocalCluster. It is the fastest way to iterate:

targets:
  local:
    provider: local
    max_workers: 4
    threads_per_worker: 2
    processes: false
    containers: none

Key options:

Option              Default  Behavior
------------------  -------  --------------------------------------------------
max_workers         1        Total worker count across all component groups.
threads_per_worker  1        Threads per Dask worker process/thread.
processes           false    true → each worker is a separate process (memory
                             isolation). false → threaded workers (faster
                             startup, shared memory).
containers          none     none = bare-metal; docker = future container
                             support.

When to use processes vs threads:

  • Threads (processes: false): Best for I/O-bound tasks, quick iteration, and CI where startup speed matters. All workers share one process, so a memory leak in one affects all.

  • Processes (processes: true): Best for CPU-bound tasks or tasks that hold the GIL (e.g., calling C extensions that don’t release it). Each worker is isolated but has serialization overhead.
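
The shared-memory behavior of threaded workers can be seen with the Python standard library alone. This sketch is an analogy, not Scalable or Dask code: it uses concurrent.futures, and the cache dictionary and square function are made up for the demonstration.

```python
from concurrent.futures import ThreadPoolExecutor

cache: dict[int, int] = {}  # lives in the one process that all threads share


def square(n: int) -> int:
    # Every thread writes into the same dictionary object.
    cache[n] = n * n
    return cache[n]


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(square, range(8)))

print(results)
print(len(cache))  # all eight entries are visible to the main thread
```

With a process pool, each worker would fill its own copy of cache and the parent's copy would stay empty. That isolation is what processes: true buys, at the price of serializing arguments and results.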

Running with the local provider:

from scalable import ScalableSession

session = ScalableSession.from_yaml("./scalable.yaml", target="local")
plan = session.plan(dry_run=True)
print(f"Scale plan: {plan.scale_plan}")
# Example: 4 workers for 'analysis', each ResourceRequest(cpus=1, memory='1G')

client = session.start(plan)
# ... submit work ...
session.close()

Step 3: Slurm Provider — HPC Scaling

The SlurmProvider submits Dask workers as Slurm batch jobs. Each job runs inside a container (via Apptainer) on allocated HPC nodes:

targets:
  hpc:
    provider: slurm
    queue: batch
    account: GCIMS
    walltime: "04:00:00"
    interface: ib0

components:
  demeter:
    image: ghcr.io/jgcri/demeter:2.0.1
    cpus: 10
    memory: 20G
    mounts:
      /qfs/people/user/work/gcam-core: /gcam-core
      /rcfs: /rcfs

The Slurm provider:

  1. Generates sbatch scripts for each worker (one job per worker).

  2. Passes resource requests (CPUs, memory, walltime) to the scheduler.

  3. Launches workers inside Apptainer containers with the specified mounts.

  4. Connects workers back to the Dask scheduler on the host over the configured network interface (ib0 for InfiniBand, eth0 for Ethernet).
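
The four steps above can be sketched as a small script generator. This is an illustration of the general shape of such a job script, not the text Scalable actually emits: render_worker_script is a hypothetical helper, the scheduler address is a placeholder, and the sketch assumes the dask worker command line is available inside the image.

```python
def render_worker_script(
    queue: str,
    account: str,
    walltime: str,
    cpus: int,
    memory: str,
    image: str,
    mounts: dict[str, str],
    scheduler_address: str,
    interface: str,
) -> str:
    """Render one sbatch script that starts one containerized Dask worker."""
    command = ["apptainer", "exec"]
    if mounts:
        # Apptainer accepts a comma-separated list of host:container pairs.
        binds = ",".join(f"{src}:{dst}" for src, dst in mounts.items())
        command += ["--bind", binds]
    command += [
        f"docker://{image}",
        "dask", "worker", scheduler_address,
        "--nthreads", str(cpus),
        "--interface", interface,
    ]
    lines = [
        "#!/bin/bash",
        f"#SBATCH --partition={queue}",
        f"#SBATCH --account={account}",
        f"#SBATCH --time={walltime}",
        f"#SBATCH --cpus-per-task={cpus}",
        f"#SBATCH --mem={memory}",
        " ".join(command),
    ]
    return "\n".join(lines) + "\n"


script = render_worker_script(
    queue="batch",
    account="GCIMS",
    walltime="04:00:00",
    cpus=10,
    memory="20G",
    image="ghcr.io/jgcri/demeter:2.0.1",
    mounts={"/qfs/people/user/work/gcam-core": "/gcam-core", "/rcfs": "/rcfs"},
    scheduler_address="tcp://10.0.0.1:8786",  # placeholder address
    interface="ib0",
)
print(script)
```

Printing the script before submitting anything is a cheap way to check that the resource requests and bind mounts match the manifest.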

Scaling Slurm workers manually:

from scalable import SlurmCluster, ScalableClient

cluster = SlurmCluster(
    queue="batch",
    account="GCIMS",
    walltime="04:00:00",
    interface="ib0",
)

# Register component profiles
cluster.add_container(
    tag="demeter",
    cpus=10,
    memory="20G",
    dirs={"/qfs/people/user/work/gcam-core": "/gcam-core"},
)

# Scale up — submits 5 Slurm jobs
cluster.add_workers(n=5, tag="demeter")

# Submit work
client = ScalableClient(cluster)
futures = [client.submit(run_demeter_scenario, scenario, tag="demeter") for scenario in scenarios]
results = client.gather(futures)

# Scale down — cancels 3 Slurm jobs
cluster.remove_workers(n=3, tag="demeter")

Why explicit tag-based scaling? HPC jobs are expensive. Unlike cloud auto-scaling, where new instances typically start within a minute or two, Slurm jobs may wait in the queue for minutes or hours. Scalable gives you explicit control over how many workers to allocate per component, so you can match your budget and queue availability.
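
One way to turn a budget into a worker count is simple core-hour arithmetic. The helper below is a hypothetical sketch, and the 200 core-hour budget is a made-up figure; it assumes every worker is charged for its full walltime.

```python
import math


def affordable_workers(budget_core_hours: float, cpus_per_worker: int, walltime: str) -> int:
    """Largest worker count whose full-walltime allocation fits the budget."""
    hours, minutes, seconds = (int(part) for part in walltime.split(":"))
    walltime_hours = hours + minutes / 60 + seconds / 3600
    core_hours_per_worker = cpus_per_worker * walltime_hours
    return math.floor(budget_core_hours / core_hours_per_worker)


# demeter profile from above: 10 CPUs per worker, 4-hour walltime.
# Each worker therefore costs 40 core-hours.
n = affordable_workers(200, cpus_per_worker=10, walltime="04:00:00")
print(n)  # 5
```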

Step 4: Session-Based Scaling with Objectives

The Session API supports objective-driven planning that automatically determines worker counts:

from scalable import ScalableSession

session = ScalableSession.from_yaml("./scalable.yaml", target="hpc")

# Minimize cost: fewest workers that finish within walltime
plan = session.plan(
    objective="minimize cost",
    policy="safe",
)
print(f"Workers: {plan.scale_plan}")
# Might allocate 3 workers × demeter

# Minimize time: maximum parallelism within resource limits
plan = session.plan(
    objective="minimize time",
    policy="aggressive",
)
print(f"Workers: {plan.scale_plan}")
# Might allocate 16 workers × demeter

# Balance: cost-time Pareto front midpoint
plan = session.plan(
    objective="balance",
    policy="safe",
)

Objectives:

  • "minimize cost" — Fewest workers that keep total runtime within walltime.

  • "minimize time" — Maximum workers within resource bounds.

  • "balance" — Midpoint between the two extremes.

Policies:

  • "safe" — Add headroom (20% over predicted requirements). Prefer fewer scaling decisions.

  • "aggressive" — Pack tightly. Scale immediately on threshold.

  • "manual" — Use exactly the worker counts from the manifest (no adjustment).
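
The interaction between objectives and policies can be sketched with a toy model. This is not Scalable's planner: plan_workers and its parameters are hypothetical, tasks are assumed to take a uniform, known time, and the "manual" policy is left out because it bypasses planning altogether.

```python
import math


def plan_workers(
    n_tasks: int,
    task_hours: float,
    walltime_hours: float,
    max_workers: int,
    objective: str,
    policy: str,
) -> int:
    # Each worker can run this many tasks back to back before walltime expires.
    tasks_per_worker = max(1, math.floor(walltime_hours / task_hours))
    fastest = min(max_workers, n_tasks)
    cheapest = min(fastest, math.ceil(n_tasks / tasks_per_worker))

    if objective == "minimize cost":
        workers = cheapest
    elif objective == "minimize time":
        workers = fastest
    elif objective == "balance":
        workers = math.ceil((cheapest + fastest) / 2)
    else:
        raise ValueError(f"unknown objective: {objective!r}")

    if policy == "safe":
        workers = math.ceil(workers * 12 / 10)  # 20% headroom
    elif policy != "aggressive":
        raise ValueError(f"unsupported policy in this sketch: {policy!r}")
    return min(workers, max_workers)


# 24 half-hour tasks, 4-hour walltime, at most 16 workers.
for objective, policy in [
    ("minimize cost", "aggressive"),
    ("minimize time", "aggressive"),
    ("balance", "safe"),
]:
    print(objective, policy, plan_workers(24, 0.5, 4, 16, objective, policy))
```

In this model the objective picks a point between the cheapest and fastest worker counts, and the policy then decides how much slack to add on top.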

Step 5: Adaptive Scaling at Runtime

For long-running workflows where task load varies, the AdaptiveScaler monitors queue depth and adjusts workers in real time:

from scalable import AdaptiveScaler, ScalableSession

session = ScalableSession.from_yaml("./scalable.yaml", target="aws")
client = session.start()

scaler = AdaptiveScaler(
    min_workers={"demeter": 2, "postprocess": 1},
    max_workers={"demeter": 20, "postprocess": 10},
    scale_up_threshold=0.8,    # Scale up when 80% of workers are busy
    scale_down_threshold=0.2,  # Scale down when <20% utilization
    cooldown_seconds=120,      # Wait 2 min between decisions
)

# In your task submission loop:
for batch in scenario_batches:
    futures = [client.submit(run_demeter_scenario, s, tag="demeter") for s in batch]

    # Evaluate scaling after each batch
    decision = scaler.evaluate(
        pending_tasks=[{"tag": "demeter"} for _ in batch],
        active_workers={"demeter": client.worker_count("demeter")},
    )

    if decision.has_changes:
        print(f"Scaling: +{decision.workers_to_add} -{decision.workers_to_remove}")
        print(f"Reason: {decision.reasoning}")
        # Apply the decision (provider-specific)
        # ...

The AdaptiveScaler returns a ScaleDecision with:

  • workers_to_add: dict mapping tag → count to add.

  • workers_to_remove: dict mapping tag → count to remove.

  • reasoning: human-readable explanation of the decision.

  • confidence: model confidence (0.0–1.0) in the recommendation.

  • predicted_completion_time: estimated seconds to finish remaining tasks.
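
To show how thresholds and a cooldown combine into a decision, here is a toy single-tag scaler. It is a hypothetical sketch, not the AdaptiveScaler implementation: Decision mirrors only three of the fields listed above, and the doubling and halving rule is an assumption chosen for simplicity.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Decision:
    """Hypothetical stand-in that mirrors three of the ScaleDecision fields."""

    workers_to_add: dict[str, int] = field(default_factory=dict)
    workers_to_remove: dict[str, int] = field(default_factory=dict)
    reasoning: str = "no change needed"


class ThresholdScaler:
    """Toy single-tag scaler: double on high load, halve on low load."""

    def __init__(self, tag, min_workers, max_workers,
                 up=0.8, down=0.2, cooldown_seconds=120.0):
        self.tag = tag
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.up = up
        self.down = down
        self.cooldown_seconds = cooldown_seconds
        self._last_change = float("-inf")

    def evaluate(self, busy: int, active: int, now: Optional[float] = None) -> Decision:
        now = time.monotonic() if now is None else now
        if now - self._last_change < self.cooldown_seconds:
            return Decision(reasoning="cooldown active")

        utilization = busy / active if active else 1.0
        if utilization >= self.up:
            target = min(self.max_workers, max(1, active * 2))
        elif utilization < self.down:
            target = max(self.min_workers, active // 2)
        else:
            target = active

        if target == active:
            return Decision()

        self._last_change = now  # only real changes restart the cooldown
        if target > active:
            return Decision(workers_to_add={self.tag: target - active},
                            reasoning=f"utilization {utilization:.0%} at or above {self.up:.0%}")
        return Decision(workers_to_remove={self.tag: active - target},
                        reasoning=f"utilization {utilization:.0%} below {self.down:.0%}")


scaler = ThresholdScaler("demeter", min_workers=2, max_workers=20)
first = scaler.evaluate(busy=4, active=4, now=0.0)     # fully busy
second = scaler.evaluate(busy=8, active=8, now=60.0)   # inside the cooldown
third = scaler.evaluate(busy=1, active=8, now=200.0)   # mostly idle
print(first.workers_to_add, second.reasoning, third.workers_to_remove)
```

The cooldown keeps the scaler from flapping: the second call sees full utilization but makes no change because the last change was only 60 seconds earlier.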

Step 6: Cloud Provider Auto-Scaling

Cloud providers (AWS, GCP) support declarative adaptive scaling via manifest configuration:

targets:
  aws:
    provider: aws
    region: us-east-1
    cluster_type: fargate
    adaptive:
      minimum: 2
      maximum: 20

The cloud provider handles scale-up/down automatically based on the Dask scheduler’s task backlog. The minimum and maximum set hard bounds:

  • Minimum workers are always running (reduces cold-start latency).

  • Maximum caps costs during burst periods.
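
The effect of the two bounds amounts to clamping a backlog-derived target. The function below is a hypothetical sketch; the tasks_per_worker ratio is an assumed tuning knob, not a documented Scalable or Dask setting.

```python
import math


def target_workers(backlog: int, tasks_per_worker: int, minimum: int, maximum: int) -> int:
    """Worker count wanted for the backlog, clamped to [minimum, maximum]."""
    wanted = math.ceil(backlog / tasks_per_worker)
    return max(minimum, min(maximum, wanted))


# Bounds taken from the manifest above: minimum 2, maximum 20.
for backlog in (0, 30, 500):
    print(backlog, target_workers(backlog, tasks_per_worker=5, minimum=2, maximum=20))
```

An empty queue still keeps 2 workers warm, a backlog of 30 asks for 6, and a burst of 500 is capped at 20.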

Cost-performance trade-off:

┌────────────────────────────────────────────────────────┐
│  Aggressive (max workers)                              │
│  ├── Fastest completion                                │
│  ├── Highest cost                                      │
│  └── Risk: idle workers during low-load phases         │
│                                                        │
│  Conservative (min workers)                            │
│  ├── Lowest cost                                       │
│  ├── Slowest completion                                │
│  └── Risk: queue buildup during bursts                 │
│                                                        │
│  Adaptive (dynamic scaling)                            │
│  ├── Best cost-performance ratio                       │
│  ├── Requires cooldown tuning                          │
│  └── Latency: scale-up takes 30–90s for cloud          │
└────────────────────────────────────────────────────────┘

Step 7: Heterogeneous Worker Pools

Real workflows often need different resource profiles running simultaneously. Scalable supports heterogeneous pools via multiple components:

components:
  demeter:
    cpus: 8
    memory: 32G
    tags: [compute-heavy]

  postprocess:
    cpus: 2
    memory: 4G
    tags: [io-bound]

tasks:
  simulate:
    component: demeter
  analyze:
    component: postprocess

In your workflow, you submit to each pool independently:

# Compute-heavy tasks go to demeter workers
sim_futures = [
    client.submit(run_simulation, params, tag="demeter")
    for params in simulation_params
]

# Wait for simulations, then post-process on lighter workers
sim_results = client.gather(sim_futures)

analysis_futures = [
    client.submit(aggregate, result, tag="postprocess")
    for result in sim_results
]

final = client.gather(analysis_futures)

This pattern avoids over-provisioning: expensive 32 GB workers handle the heavy lifting while cheap 4 GB workers handle aggregation.
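
A quick calculation shows the size of the saving. The worker counts here are illustrative assumptions; only the 32 GB and 4 GB sizes come from the manifest above, and pool_memory_gb is a hypothetical helper.

```python
def pool_memory_gb(pools: dict[str, tuple[int, int]]) -> int:
    """Total memory for pools given as tag -> (worker_count, gb_per_worker)."""
    return sum(count * gb for count, gb in pools.values())


# Four heavy and four light workers, versus eight workers all sized
# for the heaviest task.
heterogeneous = pool_memory_gb({"demeter": (4, 32), "postprocess": (4, 4)})
uniform = pool_memory_gb({"all-purpose": (8, 32)})
print(heterogeneous, uniform)  # 144 256
```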

Step 8: Scaling Decision Monitoring

Track all scaling decisions via telemetry:

# After workflow completes
session.close()

# Review scaling history
for decision in scaler.decision_history:
    print(
        f"[{decision.timestamp}] "
        f"+{decision.workers_to_add} "
        f"-{decision.workers_to_remove} "
        f"({decision.reasoning})"
    )

Telemetry also records worker lifecycle events:

scalable report --latest
Workers:
  demeter: 5 started, 3 removed, 2 final
  postprocess: 2 started, 0 removed, 2 final
Scaling events: 3 scale-up, 1 scale-down
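
The per-component totals in a report like this can be derived from a plain list of scaling events. The sketch below uses a hypothetical (tag, delta) event format invented for illustration, not Scalable's telemetry schema.

```python
from collections import Counter

# Hypothetical event records: positive deltas add workers, negative remove them.
events = [("demeter", 2), ("demeter", 3), ("postprocess", 2), ("demeter", -3)]


def summarize(events):
    """Return tag -> (started, removed, final) worker counts."""
    started, removed = Counter(), Counter()
    for tag, delta in events:
        if delta > 0:
            started[tag] += delta
        else:
            removed[tag] += -delta
    tags = sorted(set(started) | set(removed))
    return {tag: (started[tag], removed[tag], started[tag] - removed[tag]) for tag in tags}


summary = summarize(events)
for tag, (n_started, n_removed, n_final) in summary.items():
    print(f"{tag}: {n_started} started, {n_removed} removed, {n_final} final")

scale_ups = sum(1 for _, delta in events if delta > 0)
scale_downs = sum(1 for _, delta in events if delta < 0)
print(f"Scaling events: {scale_ups} scale-up, {scale_downs} scale-down")
```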

Troubleshooting

Slurm workers never connect

Check that the interface option matches your cluster’s high-speed network (ib0, eth0, etc.). Workers must reach the scheduler host on this interface. Also ensure firewall rules allow the Dask scheduler port (default 8786).
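
A quick reachability check from a compute node can rule out firewall problems. This sketch uses only the Python standard library; can_reach is a hypothetical helper, and you supply your own scheduler host name.

```python
import socket


def can_reach(host: str, port: int = 8786, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

Run it inside an interactive Slurm allocation, passing the host name of the node where the scheduler runs. A False result points to routing or firewall rules, not to Scalable itself.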

Cloud workers take too long to start

Fargate cold starts can take 30–90 seconds. Set adaptive.minimum to 1 or 2 so that warm workers are always available. For EC2-backed clusters, pre-warmed AMIs reduce startup time.

“max_workers must be a positive integer”

This validation error means max_workers was set to 0, a negative number, or a non-integer type. Check for YAML parsing issues (e.g., quoting numbers as strings).
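
The check behind this message can be approximated in a few lines. This is a sketch of how such a validator might behave, not Scalable's actual code; validate_max_workers is a hypothetical name. In YAML, max_workers: 4 loads as an integer, while max_workers: "4" loads as a string.

```python
def validate_max_workers(value: object) -> int:
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
        raise ValueError(f"max_workers must be a positive integer, got {value!r}")
    return value


for candidate in (4, "4", 0, -1, 2.0):
    try:
        print(repr(candidate), "->", validate_max_workers(candidate))
    except ValueError as exc:
        print(repr(candidate), "->", exc)
```

Only the first candidate passes. The quoted "4" fails even though it looks numeric, which is the usual cause of this error in hand-edited manifests.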

Workers idle but no tasks are submitted

If you use adaptive scaling with a high minimum, workers persist even when no work is available. Lower adaptive.minimum, or set cooldown_seconds on the AdaptiveScaler to at least 60.

Next Steps