Tutorial 3: Scaling Strategies with Providers
What You Will Learn
By the end of this tutorial you will:
Understand Scalable’s provider architecture and how it abstracts execution backends.
Configure and use the Local, Slurm, and Cloud providers.
Choose appropriate scaling strategies for different workload profiles.
Implement manual scaling, adaptive scaling, and objective-driven planning.
Monitor scaling decisions through the Session API.
Prerequisites
Completed Tutorial 1: Getting Started with Scalable and Tutorial 2: Mastering the Manifest System.
For HPC sections: access to a Slurm cluster (or follow along conceptually).
For cloud sections: pip install scalable[cloud] (or follow along conceptually).
Scenario
Your energy forecasting pipeline has grown. Development happens locally with 2–4 workers. Production runs on an HPC cluster with 64+ workers. Burst capacity uses cloud auto-scaling. You need a unified scaling approach that works across all three environments.
Step 1: The Provider Architecture
Scalable separates what runs from where it runs through the
DeploymentProvider protocol:
┌──────────────────┐     ┌────────────────────┐     ┌─────────────┐
│     Manifest     │────▶│   DeploymentSpec   │────▶│  Provider   │
│ (scalable.yaml)  │     │ (provider-neutral) │     │  (backend)  │
└──────────────────┘     └────────────────────┘     └──────┬──────┘
                                                           │
                                    ┌──────────────────────┼──────────────┐
                                    │                      │              │
                              ┌─────▼──────┐        ┌──────▼──────┐   ┌───▼────────┐
                              │   Local    │        │    Slurm    │   │Cloud / K8s │
                              │  Provider  │        │  Provider   │   │  Provider  │
                              └────────────┘        └─────────────┘   └────────────┘
Every provider implements the same interface:
from typing import Protocol

class DeploymentProvider(Protocol):
    name: str

    def validate(self, spec: DeploymentSpec) -> ValidationReport: ...
    def build_cluster(self, spec: DeploymentSpec) -> ClusterHandle: ...
    def scale(self, cluster: ClusterHandle, plan: ScalePlan) -> None: ...
    def estimate_cost(self, spec: DeploymentSpec, plan: ScalePlan) -> CostEstimate | None: ...
This means your workflow code is provider-agnostic — the same
client.submit(func, arg, tag="demeter") call works identically whether the
cluster is local threads, Slurm jobs, or Kubernetes pods.
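To make the provider-agnostic idea concrete, here is a minimal, self-contained sketch of structural typing with `typing.Protocol`. It does not import Scalable: `ToySpec`, `ToyProvider`, `ToyLocalProvider`, `ToySlurmProvider`, and `launch` are hypothetical stand-ins, and the protocol is reduced to two of the four methods shown above.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass
class ToySpec:
    """Hypothetical stand-in for a provider-neutral deployment spec."""
    component: str
    workers: int


class ToyProvider(Protocol):
    """Reduced, illustrative version of the interface shown above."""
    name: str

    def validate(self, spec: ToySpec) -> list[str]: ...
    def build_cluster(self, spec: ToySpec) -> str: ...


class ToyLocalProvider:
    name = "local"

    def validate(self, spec: ToySpec) -> list[str]:
        return [] if spec.workers > 0 else ["workers must be positive"]

    def build_cluster(self, spec: ToySpec) -> str:
        return f"local cluster: {spec.workers} x {spec.component}"


class ToySlurmProvider:
    name = "slurm"

    def validate(self, spec: ToySpec) -> list[str]:
        return [] if spec.workers > 0 else ["workers must be positive"]

    def build_cluster(self, spec: ToySpec) -> str:
        return f"slurm jobs: {spec.workers} x {spec.component}"


def launch(provider: ToyProvider, spec: ToySpec) -> str:
    """Workflow code depends only on the protocol, never on a backend."""
    problems = provider.validate(spec)
    if problems:
        raise ValueError("; ".join(problems))
    return provider.build_cluster(spec)


spec = ToySpec(component="demeter", workers=4)
descriptions = [launch(p, spec) for p in (ToyLocalProvider(), ToySlurmProvider())]
```

Neither toy provider inherits from `ToyProvider`; matching the method signatures is enough, which is why swapping backends does not require touching `launch`.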
Step 2: Local Provider: Development & CI
The LocalProvider wraps Dask’s
LocalCluster. It is the fastest way to iterate:
targets:
  local:
    provider: local
    max_workers: 4
    threads_per_worker: 2
    processes: false
    containers: none
Key options:
| Option | Default | Behavior |
|---|---|---|
| max_workers | 1 | Total worker count across all component groups. |
| threads_per_worker | 1 | Threads per Dask worker process/thread. |
| processes | | |
| containers | | |
When to use processes vs threads:
Threads (processes: false): Best for I/O-bound tasks, quick iteration, and CI where startup speed matters. All workers share one process, so a memory leak in one affects all.
Processes (processes: true): Best for CPU-bound tasks or tasks that hold the GIL (e.g., calling C extensions that don’t release it). Each worker is isolated but has serialization overhead.
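The "all workers share one process" point can be illustrated with the standard library alone. This sketch uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for thread-based workers; it is not Dask or Scalable code, and `shared_cache` and `task` are hypothetical names.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor

shared_cache = []              # module-level state visible to every thread
cache_lock = threading.Lock()


def task(i):
    # Every thread appends to the same list because they share one address space.
    with cache_lock:
        shared_cache.append(i)
    return os.getpid()


with ThreadPoolExecutor(max_workers=4) as pool:
    pids = list(pool.map(task, range(8)))

# All tasks report the same process ID, so state (and leaks) are shared.
same_process = len(set(pids)) == 1
```

With process-based workers, each worker would report its own PID and hold its own copy of any module-level state, and arguments and results would be serialized between processes.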
Running with the local provider:
from scalable import ScalableSession
session = ScalableSession.from_yaml("./scalable.yaml", target="local")
plan = session.plan(dry_run=True)
print(f"Scale plan: {plan.scale_plan}")
# {'analysis': ResourceRequest(cpus=1, memory='1G'), count=4}
client = session.start(plan)
# ... submit work ...
session.close()
Step 3: Slurm Provider: HPC Scaling
The SlurmProvider submits Dask workers as
Slurm batch jobs. Each job runs inside a container (via Apptainer) on allocated
HPC nodes:
targets:
  hpc:
    provider: slurm
    queue: batch
    account: GCIMS
    walltime: "04:00:00"
    interface: ib0
components:
  demeter:
    image: ghcr.io/jgcri/demeter:2.0.1
    cpus: 10
    memory: 20G
    mounts:
      /qfs/people/user/work/gcam-core: /gcam-core
      /rcfs: /rcfs
The Slurm provider:
Generates sbatch scripts for each worker (one job per worker).
Passes resource requests (CPUs, memory, walltime) to the scheduler.
Launches workers inside Apptainer containers with the specified mounts.
Connects workers back to the Dask scheduler on the host via the network interface (ib0 for InfiniBand, eth0 for Ethernet).
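The steps above can be sketched as a small script generator. This is only an illustration of the idea: the tutorial does not show the script Scalable actually generates, so `render_worker_script`, the `demeter.sif` image path, and the scheduler address are hypothetical. The `#SBATCH` directives, `apptainer exec --bind`, and the `dask worker` options used are standard for those tools.

```python
def render_worker_script(*, queue, account, walltime, cpus, memory,
                         image, mounts, scheduler_address, interface):
    """Build an illustrative sbatch script for one containerized Dask worker."""
    binds = ",".join(f"{src}:{dst}" for src, dst in mounts.items())
    lines = [
        "#!/bin/bash",
        f"#SBATCH --partition={queue}",
        f"#SBATCH --account={account}",
        f"#SBATCH --time={walltime}",
        f"#SBATCH --cpus-per-task={cpus}",
        f"#SBATCH --mem={memory}",
        # Run the worker inside the container with the requested bind mounts.
        f"apptainer exec --bind {binds} {image} "
        f"dask worker {scheduler_address} --interface {interface} --nthreads {cpus}",
    ]
    return "\n".join(lines) + "\n"


script = render_worker_script(
    queue="batch",
    account="GCIMS",
    walltime="04:00:00",
    cpus=10,
    memory="20G",
    image="demeter.sif",                        # hypothetical local image file
    mounts={"/qfs/people/user/work/gcam-core": "/gcam-core", "/rcfs": "/rcfs"},
    scheduler_address="tcp://10.0.0.1:8786",    # hypothetical scheduler address
    interface="ib0",
)
```

Generating one script per worker keeps each Slurm job independent, so cancelling a job removes exactly one worker.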
Scaling Slurm workers manually:
from scalable import SlurmCluster, ScalableClient

cluster = SlurmCluster(
    queue="batch",
    account="GCIMS",
    walltime="04:00:00",
    interface="ib0",
)

# Register component profiles
cluster.add_container(
    tag="demeter",
    cpus=10,
    memory="20G",
    dirs={"/qfs/people/user/work/gcam-core": "/gcam-core"},
)

# Scale up: submits 5 Slurm jobs
cluster.add_workers(n=5, tag="demeter")

# Submit work (run_demeter_scenario and scenarios are defined by your workflow)
client = ScalableClient(cluster)
futures = [client.submit(run_demeter_scenario, scenario, tag="demeter") for scenario in scenarios]
results = client.gather(futures)

# Scale down: cancels 3 Slurm jobs
cluster.remove_workers(n=3, tag="demeter")
Why explicit tag-based scaling? HPC jobs are expensive. Unlike cloud auto-scaling where you can spin up instances in seconds, Slurm jobs may wait in queue for minutes or hours. Scalable gives you explicit control over how many workers to allocate per component, so you can match your budget and queue availability.
Step 4: Session-Based Scaling with Objectives
The Session API supports objective-driven planning that automatically determines worker counts:
from scalable import ScalableSession

session = ScalableSession.from_yaml("./scalable.yaml", target="hpc")

# Minimize cost: fewest workers that finish within walltime
plan = session.plan(
    objective="minimize cost",
    policy="safe",
)
print(f"Workers: {plan.scale_plan}")
# Might allocate 3 workers × demeter

# Minimize time: maximum parallelism within resource limits
plan = session.plan(
    objective="minimize time",
    policy="aggressive",
)
print(f"Workers: {plan.scale_plan}")
# Might allocate 16 workers × demeter

# Balance: cost-time Pareto front midpoint
plan = session.plan(
    objective="balance",
    policy="safe",
)
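How an objective and a policy could combine into a worker count can be sketched with a toy planner. This is not Scalable's planner, whose internals this tutorial does not show: `plan_workers` and all of its parameters are hypothetical, the 20% headroom for "safe" follows the policy description in this step, and "aggressive" is assumed to add no headroom.

```python
def plan_workers(objective, policy, *, n_tasks, task_seconds,
                 walltime_seconds, max_workers, manifest_workers):
    """Toy planner: map an objective and a policy to a worker count."""
    if policy == "manual":
        return manifest_workers

    # Fewest workers that can finish n_tasks within the walltime.
    tasks_per_worker = walltime_seconds // task_seconds
    if tasks_per_worker == 0:
        raise ValueError("a single task does not fit in the walltime")
    fewest = -(-n_tasks // tasks_per_worker)      # ceiling division
    most = min(max_workers, n_tasks)              # extra workers would idle

    if objective == "minimize cost":
        count = fewest
    elif objective == "minimize time":
        count = most
    elif objective == "balance":
        count = (fewest + most + 1) // 2          # midpoint, rounded up
    else:
        raise ValueError(f"unknown objective: {objective!r}")

    if policy == "safe":
        count = -(-count * 120 // 100)            # assumed 20% headroom, rounded up
    return max(1, min(count, max_workers))


# Hypothetical workload: 100 ten-minute tasks, 4-hour walltime, 16-worker cap.
params = dict(n_tasks=100, task_seconds=600, walltime_seconds=4 * 3600,
              max_workers=16, manifest_workers=5)
cheap = plan_workers("minimize cost", "safe", **params)
fast = plan_workers("minimize time", "aggressive", **params)
balanced = plan_workers("balance", "safe", **params)
```

Integer ceiling division is used throughout so the headroom calculation never depends on floating-point rounding.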
Objectives:
"minimize cost": Fewest workers that keep total runtime within walltime.
"minimize time": Maximum workers within resource bounds.
"balance": Midpoint between the two extremes.
Policies:
"safe": Add headroom (20% over predicted requirements). Prefer fewer scaling decisions.
"aggressive": Pack tightly. Scale immediately on threshold.
"manual": Use exactly the worker counts from the manifest (no adjustment).
Step 5: Adaptive Scaling at Runtime
For long-running workflows where task load varies, the
AdaptiveScaler monitors queue depth
and adjusts workers in real time:
from scalable import AdaptiveScaler, ScalableSession

session = ScalableSession.from_yaml("./scalable.yaml", target="aws")
client = session.start()

scaler = AdaptiveScaler(
    min_workers={"demeter": 2, "postprocess": 1},
    max_workers={"demeter": 20, "postprocess": 10},
    scale_up_threshold=0.8,    # Scale up when 80% of workers are busy
    scale_down_threshold=0.2,  # Scale down when <20% utilization
    cooldown_seconds=120,      # Wait 2 min between decisions
)

# In your task submission loop:
for batch in scenario_batches:
    futures = [client.submit(run_demeter_scenario, s, tag="demeter") for s in batch]

    # Evaluate scaling after each batch
    decision = scaler.evaluate(
        pending_tasks=[{"tag": "demeter"} for _ in range(len(batch))],
        active_workers={"demeter": client.worker_count("demeter")},
    )
    if decision.has_changes:
        print(f"Scaling: +{decision.workers_to_add} -{decision.workers_to_remove}")
        print(f"Reason: {decision.reasoning}")
        # Apply the decision (provider-specific)
        # ...
The AdaptiveScaler returns a ScaleDecision with:
workers_to_add: dict mapping tag → count to add.
workers_to_remove: dict mapping tag → count to remove.
reasoning: human-readable explanation of the decision.
confidence: model confidence (0.0–1.0) in the recommendation.
predicted_completion_time: estimated seconds to finish remaining tasks.
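The threshold-and-cooldown logic can be sketched without Scalable. `ToyScaler` and `ToyDecision` below are hypothetical and much simpler than the real classes: "load" is approximated as queued tasks per active worker, the caller supplies the current time so the example is deterministic, and `confidence` and `predicted_completion_time` are omitted.

```python
import math
from dataclasses import dataclass, field


@dataclass
class ToyDecision:
    workers_to_add: dict = field(default_factory=dict)
    workers_to_remove: dict = field(default_factory=dict)
    reasoning: str = ""

    @property
    def has_changes(self):
        return bool(self.workers_to_add or self.workers_to_remove)


@dataclass
class ToyScaler:
    min_workers: dict
    max_workers: dict
    scale_up_threshold: float = 0.8
    scale_down_threshold: float = 0.2
    cooldown_seconds: float = 120
    _last_change: float = -math.inf

    def evaluate(self, pending, active, now):
        """pending/active map tag -> count; now is a timestamp in seconds."""
        if now - self._last_change < self.cooldown_seconds:
            return ToyDecision(reasoning="cooldown active")
        decision = ToyDecision()
        for tag, n_active in active.items():
            queued = pending.get(tag, 0)
            load = queued / max(n_active, 1)      # queued tasks per worker
            low, high = self.min_workers[tag], self.max_workers[tag]
            if load >= self.scale_up_threshold and n_active < high:
                target = min(high, max(n_active + 1, queued))
                decision.workers_to_add[tag] = target - n_active
            elif load <= self.scale_down_threshold and n_active > low:
                decision.workers_to_remove[tag] = n_active - max(low, queued)
        if decision.has_changes:
            self._last_change = now
            decision.reasoning = "load crossed a threshold"
        else:
            decision.reasoning = "load within thresholds"
        return decision


scaler = ToyScaler(min_workers={"demeter": 2}, max_workers={"demeter": 20})
up = scaler.evaluate({"demeter": 12}, {"demeter": 4}, now=0)
held = scaler.evaluate({"demeter": 40}, {"demeter": 12}, now=60)
down = scaler.evaluate({"demeter": 1}, {"demeter": 12}, now=200)
```

The second call arrives inside the 120-second cooldown, so it returns no changes even though the queue has grown; this is the behavior that prevents rapid scale-up and scale-down oscillation.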
Step 6: Cloud Provider Auto-Scaling
Cloud providers (AWS, GCP) support declarative adaptive scaling via manifest configuration:
targets:
  aws:
    provider: aws
    region: us-east-1
    cluster_type: fargate
    adaptive:
      minimum: 2
      maximum: 20
The cloud provider handles scale-up/down automatically based on the Dask
scheduler’s task backlog. The minimum and maximum set hard bounds:
Minimum workers are always running (reduces cold-start latency).
Maximum caps costs during burst periods.
Cost-performance trade-off:
┌────────────────────────────────────────────────────────┐
│ Aggressive (max workers)                               │
│ ├── Fastest completion                                 │
│ ├── Highest cost                                       │
│ └── Risk: idle workers during low-load phases          │
│                                                        │
│ Conservative (min workers)                             │
│ ├── Lowest cost                                        │
│ ├── Slowest completion                                 │
│ └── Risk: queue buildup during bursts                  │
│                                                        │
│ Adaptive (dynamic scaling)                             │
│ ├── Best cost-performance ratio                        │
│ ├── Requires cooldown tuning                           │
│ └── Latency: scale-up takes 30–90s for cloud           │
└────────────────────────────────────────────────────────┘
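The trade-off can be made concrete with a toy simulation of a bursty workload. Everything here is an assumption for illustration: the phase sizes are made up, each task is taken to cost one worker-minute, phases run one after another, and scale-up latency is ignored (which flatters the adaptive case).

```python
def simulate(phases, workers_for_phase):
    """Return (minutes elapsed, worker-minutes billed) for sequential phases.

    Each task is assumed to take one worker-minute; startup latency is ignored.
    """
    elapsed = billed = 0
    for tasks in phases:
        workers = workers_for_phase(tasks)
        duration = -(-tasks // workers)   # ceiling division
        elapsed += duration
        billed += workers * duration
    return elapsed, billed


MINIMUM, MAXIMUM = 2, 20
phases = [20, 2, 2, 20]                   # hypothetical bursty workload

aggressive = simulate(phases, lambda tasks: MAXIMUM)
conservative = simulate(phases, lambda tasks: MINIMUM)
adaptive = simulate(phases, lambda tasks: max(MINIMUM, min(MAXIMUM, tasks)))
```

In this idealized run the adaptive strategy finishes as fast as the aggressive one (4 minutes) while billing as little as the conservative one (44 worker-minutes versus 80). Real scale-up latency of 30–90 seconds narrows that gap, which is why cooldown tuning matters.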
Step 7: Heterogeneous Worker Pools
Real workflows often need different resource profiles running simultaneously. Scalable supports heterogeneous pools via multiple components:
components:
  demeter:
    cpus: 8
    memory: 32G
    tags: [compute-heavy]
  postprocess:
    cpus: 2
    memory: 4G
    tags: [io-bound]
tasks:
  simulate:
    component: demeter
  analyze:
    component: postprocess
In your workflow, you submit to each pool independently:
# Compute-heavy tasks go to demeter workers
sim_futures = [
    client.submit(run_simulation, params, tag="demeter")
    for params in simulation_params
]

# Wait for simulations, then post-process on lighter workers
sim_results = client.gather(sim_futures)
analysis_futures = [
    client.submit(aggregate, result, tag="postprocess")
    for result in sim_results
]
final = client.gather(analysis_futures)
This pattern avoids over-provisioning: expensive 32 GB workers handle the heavy lifting while cheap 4 GB workers handle aggregation.
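The routing idea behind the `tasks` mapping can be sketched with standard-library executors standing in for the two worker pools. None of this is Scalable code: `pools`, `task_to_component`, `submit`, and the toy `run_simulation` and `aggregate` functions are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins: one executor per component, sized differently.
pools = {
    "demeter": ThreadPoolExecutor(max_workers=2),
    "postprocess": ThreadPoolExecutor(max_workers=4),
}
task_to_component = {"simulate": "demeter", "analyze": "postprocess"}


def submit(task_name, func, *args):
    """Route a task to the pool of the component it is mapped to."""
    return pools[task_to_component[task_name]].submit(func, *args)


def run_simulation(x):
    return x * x


def aggregate(value):
    return value + 1


# Stage 1 runs on the "demeter" pool, stage 2 on the "postprocess" pool.
sim_futures = [submit("simulate", run_simulation, x) for x in range(4)]
sim_results = [f.result() for f in sim_futures]
final = [submit("analyze", aggregate, r).result() for r in sim_results]

for pool in pools.values():
    pool.shutdown()
```

Keeping the task-to-component mapping in one place means resizing or replacing a pool does not require changes at the call sites.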
Step 8: Scaling Decision Monitoring
Track all scaling decisions via telemetry:
# After workflow completes
session.close()

# Review scaling history
for decision in scaler.decision_history:
    print(
        f"[{decision.timestamp}] "
        f"+{decision.workers_to_add} "
        f"-{decision.workers_to_remove} "
        f"({decision.reasoning})"
    )
Telemetry also records worker lifecycle events:
scalable report --latest
Workers:
  demeter: 5 started, 3 removed, 2 final
  postprocess: 2 started, 0 removed, 2 final
Scaling events: 3 scale-up, 1 scale-down
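A decision history can also be summarized in code. The sketch below is a hypothetical illustration of that aggregation: `DecisionRecord` mirrors the fields printed in the loop above, and `summarize` and the sample history are made up rather than taken from Scalable's telemetry.

```python
from dataclasses import dataclass


@dataclass
class DecisionRecord:
    """Hypothetical record with the same fields printed above."""
    timestamp: float
    workers_to_add: dict
    workers_to_remove: dict
    reasoning: str


def summarize(history):
    """Count scale-up/scale-down events and the net worker change per tag."""
    summary = {"scale_up": 0, "scale_down": 0, "net": {}}
    for record in history:
        if record.workers_to_add:
            summary["scale_up"] += 1
        if record.workers_to_remove:
            summary["scale_down"] += 1
        for tag, n in record.workers_to_add.items():
            summary["net"][tag] = summary["net"].get(tag, 0) + n
        for tag, n in record.workers_to_remove.items():
            summary["net"][tag] = summary["net"].get(tag, 0) - n
    return summary


history = [
    DecisionRecord(0, {"demeter": 2}, {}, "initial burst"),
    DecisionRecord(180, {"demeter": 3}, {}, "queue growing"),
    DecisionRecord(900, {}, {"demeter": 3}, "queue drained"),
]
report = summarize(history)
```

A summary like this makes it easy to spot oscillation: many scale-up and scale-down events with a small net change suggest the cooldown is too short.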
Troubleshooting
- Slurm workers never connect
  Check that the interface option matches your cluster’s high-speed network (ib0, eth0, etc.). Workers must reach the scheduler host on this interface. Also ensure firewall rules allow the Dask scheduler port (default 8786).
- Cloud workers take too long to start
  Fargate cold starts can take 30–90 seconds. Set adaptive.minimum to at least 1–2 so warm workers are always available. For EC2-backed clusters, pre-warmed AMIs reduce startup time.
- “max_workers must be a positive integer”
  This validation error means max_workers was set to 0, a negative number, or a non-integer type. Check for YAML parsing issues (e.g., quoting numbers as strings).
- Workers idle but no tasks are submitted
  If using adaptive scaling with a high minimum, workers persist even when no work is available. Lower adaptive.minimum or add a cooldown_seconds of at least 60 to the AdaptiveScaler.
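For the “max_workers must be a positive integer” case, the check can be reproduced locally before submitting a manifest. This sketch mirrors the described rule but is not Scalable's validator; `check_max_workers` and `problems` are hypothetical helpers operating on a target section that has already been loaded into a dict.

```python
def check_max_workers(value):
    """Return value if it is a positive int, else raise ValueError."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
        raise ValueError(
            f"max_workers must be a positive integer, got {value!r} "
            f"({type(value).__name__})"
        )
    return value


def problems(target):
    """Collect validation messages for one target section."""
    try:
        check_max_workers(target.get("max_workers"))
    except ValueError as exc:
        return [str(exc)]
    return []


ok = problems({"max_workers": 4})
quoted = problems({"max_workers": "4"})   # what YAML yields for max_workers: "4"
zero = problems({"max_workers": 0})
```

Including the offending value and its type in the message makes the quoted-number case obvious at a glance.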
Next Steps
Tutorial 4: Performance Optimization and Caching — Reduce redundant computation when scaling means re-running failed tasks.
Tutorial 5: Cloud Integration with AWS and GCP — Full AWS and GCP deployment walkthrough.
Tutorial 6: Monitoring and Observability with Telemetry — Use telemetry data to inform scaling decisions.