Tutorial 7: Error Handling and Resilience Patterns
What You Will Learn
By the end of this tutorial you will:
Understand how Scalable propagates and records errors across distributed workers.
Implement retry strategies for transient failures.
Use the telemetry failure log to diagnose root causes.
Handle worker crashes, timeouts, and preemption gracefully.
Build fault-tolerant workflows with partial-result recovery.
Use the AI diagnostic assistant to analyze failures.
Prerequisites
Completed Tutorial 1: Getting Started with Scalable and Tutorial 6: Monitoring and Observability with Telemetry.
Scalable installed (pip install scalable).
For AI diagnosis: pip install scalable[ai].
Scenario
Your production pipeline runs 200 energy demand scenarios overnight. Some scenarios fail due to transient issues (network timeouts pulling data, OOM on edge-case inputs, worker preemption on shared HPC clusters). You need a workflow that tolerates partial failures, recovers what it can, and provides clear diagnostics for what went wrong.
Step 1: Understanding Error Propagation
When a function submitted to Scalable raises an exception, the error is:
Captured by the Dask worker.
Serialized and transmitted back to the client.
Recorded in telemetry (failures.jsonl).
Re-raised when you call .result() or client.gather().
from scalable import ScalableSession

def flaky_simulation(scenario_id: int) -> dict:
    """Simulates a function that sometimes fails."""
    if scenario_id % 7 == 0:
        raise RuntimeError(f"OOM: scenario {scenario_id} exceeded memory limit")
    return {"scenario": scenario_id, "result": scenario_id * 42}

session = ScalableSession.from_yaml("./scalable.yaml", target="local")
client = session.start()
futures = [client.submit(flaky_simulation, i, tag="analysis") for i in range(20)]

# This will raise on the first failed future:
try:
    results = client.gather(futures)
except RuntimeError as e:
    print(f"Workflow failed: {e}")
finally:
    session.close()  # finalize telemetry (see Step 7)
Step 2: Gathering with Error Tolerance
Instead of failing on the first error, collect results and errors separately:
from distributed import as_completed

session = ScalableSession.from_yaml("./scalable.yaml", target="local")
client = session.start()
futures = [client.submit(flaky_simulation, i, tag="analysis") for i in range(20)]

succeeded = []
failed = []
for future in as_completed(futures):
    try:
        result = future.result()
        succeeded.append(result)
    except Exception as e:
        failed.append({
            "error": str(e),
            "type": type(e).__name__,
            "key": future.key,
        })

print(f"Succeeded: {len(succeeded)}, Failed: {len(failed)}")
for f in failed:
    print(f" [{f['type']}] {f['error']}")
session.close()
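Expected output (as_completed yields futures in completion order, so the failures may be listed in a different order):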
Succeeded: 17, Failed: 3
[RuntimeError] OOM: scenario 0 exceeded memory limit
[RuntimeError] OOM: scenario 7 exceeded memory limit
[RuntimeError] OOM: scenario 14 exceeded memory limit
Pattern: Partial Success. This is the recommended approach for batch workflows. Gather all results, log failures, and decide whether to proceed with partial data or abort.
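One way to make the proceed-or-abort decision explicit is a failure-rate threshold. The sketch below is illustrative only: should_proceed and its 20% default are hypothetical choices, not part of Scalable.

```python
def should_proceed(n_succeeded: int, n_failed: int, max_failure_rate: float = 0.2) -> bool:
    """Return True if the failure rate is within the tolerated limit.

    Hypothetical helper: the 20% default is an arbitrary illustration.
    """
    total = n_succeeded + n_failed
    if total == 0:
        return False  # nothing ran, so there is no partial data to use
    return n_failed / total <= max_failure_rate


# With the Step 2 numbers: 3 failures out of 20 tasks is a 15% failure rate.
print(should_proceed(17, 3))   # True
print(should_proceed(10, 10))  # False
```

A threshold like this keeps the abort rule in one place, so it can be tuned per pipeline without touching the gathering loop.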
Step 3: Implementing Retry Logic
For transient failures (network issues, preempted workers), retries often succeed. Implement exponential backoff:
import time
from distributed import as_completed

def submit_with_retry(client, func, *args, tag, max_retries=3, backoff=2.0):
    """Submit a function with exponential backoff retry."""
    last_error = None
    for attempt in range(max_retries + 1):
        future = client.submit(func, *args, tag=tag)
        try:
            return future.result(timeout=300)  # 5-minute timeout
        except Exception as e:
            last_error = e
            if attempt < max_retries:
                wait = backoff ** attempt
                print(f" Attempt {attempt + 1} failed: {e}. Retrying in {wait}s...")
                time.sleep(wait)
            else:
                raise last_error

# Usage
session = ScalableSession.from_yaml("./scalable.yaml", target="local")
client = session.start()
results = []
permanent_failures = []
for scenario_id in range(20):
    try:
        result = submit_with_retry(
            client, flaky_simulation, scenario_id,
            tag="analysis", max_retries=3
        )
        results.append(result)
    except Exception as e:
        permanent_failures.append({"scenario": scenario_id, "error": str(e)})
print(f"Completed: {len(results)}, Permanent failures: {len(permanent_failures)}")
session.close()
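With backoff=2.0 and max_retries=3, submit_with_retry waits 1 s, 2 s, and 4 s between attempts. The sketch below computes that schedule and adds two common refinements, a cap and random jitter. The names backoff_delays, cap, and jitter are hypothetical and are not Scalable parameters.

```python
import random

def backoff_delays(max_retries=3, backoff=2.0, cap=60.0, jitter=0.0, rng=None):
    """Return the wait in seconds before each retry: backoff**attempt, capped.

    With jitter=0.25 each delay is scaled by a random factor in [0.75, 1.25],
    so many clients retrying at once do not all hit a service together.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(backoff ** attempt, cap)
        if jitter:
            delay *= 1 + rng.uniform(-jitter, jitter)
        delays.append(delay)
    return delays

print(backoff_delays())        # [1.0, 2.0, 4.0]
print(sum(backoff_delays()))   # 7.0 seconds of waiting in the worst case
print(backoff_delays(max_retries=8, cap=30.0))  # later delays level off at 30.0
```

The cap matters for the "unlimited" retry strategies: without it, the tenth retry alone would wait 512 seconds.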
When to retry vs. fail fast:
| Failure Type | Strategy | Rationale |
|---|---|---|
| Network timeout | Retry (3x, exponential) | Transient; usually resolves |
| OOM (out of memory) | Fail fast or retry with more resources | Persistent; same inputs will fail again |
| Worker preemption | Retry (unlimited, with backoff) | External; will succeed when rescheduled |
| Input validation error | Fail fast | Bug in data; retrying won’t help |
| Dependency import error | Fail fast | Container/environment issue |
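The table can be turned into a small classifier that a retry loop consults before resubmitting. This is a sketch under assumptions: choose_strategy is a hypothetical helper, the mapping from exception types to table rows is a guess about how these failures surface in your code, and KilledWorker is matched by class name so the example runs without importing distributed.

```python
def choose_strategy(exc: BaseException) -> str:
    """Map an exception to "retry" or "fail_fast", following the table."""
    # Matched by name so this sketch has no dependency on distributed.
    if type(exc).__name__ == "KilledWorker":
        return "retry"  # worker preemption or lost worker
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return "retry"  # transient network problems
    if isinstance(exc, MemoryError):
        return "fail_fast"  # same inputs will exhaust memory again
    if isinstance(exc, (ValueError, TypeError, ImportError)):
        return "fail_fast"  # bad input or broken environment
    return "fail_fast"  # unknown errors: surface them rather than loop


print(choose_strategy(ConnectionResetError("peer reset")))  # retry
print(choose_strategy(ValueError("negative demand")))       # fail_fast
```

Defaulting unknown errors to fail fast is a deliberate choice here: an unclassified failure is more likely a bug than a transient fault, and retrying it only delays the diagnosis.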
Step 4: Timeout Management
Long-running tasks need timeouts to prevent runaway processes:
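# distributed re-exports the timeout exception that future.result() raises;
# before Python 3.11 it is a different class from concurrent.futures.TimeoutError.
from distributed import TimeoutError

future = client.submit(expensive_simulation, params, tag="demeter")
try:
    result = future.result(timeout=3600)  # 1-hour timeout
except TimeoutError:
    print("Task exceeded 1-hour timeout")
    future.cancel()
    # Log and continue with remaining tasks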
For Slurm-backed workers, walltime provides a hard ceiling:
targets:
  hpc:
    provider: slurm
    walltime: "04:00:00"  # Workers killed after 4 hours
If a worker hits its walltime, Slurm terminates the process. Dask detects the
lost worker and reschedules its tasks; a task that keeps losing its workers
(more than the scheduler's allowed-failures limit, 3 by default) fails with a
KilledWorker exception. Your error-handling code should treat this as a
retryable failure.
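A per-task timeout is most useful when it fires before the walltime does, so the client sees a timeout instead of a lost worker. The sketch below derives one from the walltime string. The helper names and the 10-minute margin are hypothetical, and only the "HH:MM:SS" form used in the config above is handled.

```python
def walltime_to_seconds(walltime: str) -> int:
    """Convert an "HH:MM:SS" walltime string to seconds."""
    hours, minutes, seconds = (int(part) for part in walltime.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def task_timeout(walltime: str, margin_seconds: int = 600) -> int:
    """Leave headroom below the walltime (the 10-minute margin is arbitrary)."""
    return max(walltime_to_seconds(walltime) - margin_seconds, 0)


print(walltime_to_seconds("04:00:00"))  # 14400
print(task_timeout("04:00:00"))         # 13800
```

The result could then be passed as the timeout argument of future.result().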
Step 5: Telemetry Failure Records
Every failure is recorded in failures.jsonl:
{
  "failure_class": "RuntimeError",
  "message": "OOM: scenario 7 exceeded memory limit",
  "timestamp": "2026-05-20T04:15:30Z",
  "details": {
    "phase": "task_execution",
    "task_id": "run_demeter_scenario-7",
    "worker_id": "worker-demeter-2",
    "traceback": "Traceback (most recent call last):\n ..."
  }
}
Analyze failure patterns:
import json
from pathlib import Path
from collections import Counter

run_dir = Path(".scalable/runs/run-20260520T.../")
failures = []
with open(run_dir / "failures.jsonl") as f:
    for line in f:
        failures.append(json.loads(line))

# Group by failure class
by_class = Counter(f["failure_class"] for f in failures)
print("Failures by type:")
for cls, count in by_class.most_common():
    print(f" {cls}: {count}")

# Find the most common error message pattern
by_message = Counter(f["message"].split(":")[0] for f in failures)
print("\nTop error patterns:")
for msg, count in by_message.most_common(5):
    print(f" {msg}: {count}")
Expected output:
Failures by type:
RuntimeError: 8
MemoryError: 3
TimeoutError: 2
Top error patterns:
OOM: 8
MemoryError: 3
TimeoutError: 2
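Grouping by worker as well as by class can show whether failures cluster on a single machine. The sketch below uses the record shape from the failures.jsonl example; the sample records are made up for illustration and failures_by_worker is a hypothetical helper.

```python
import json
from collections import Counter

# Inline sample records in the failures.jsonl shape (values are invented).
sample_lines = [
    '{"failure_class": "RuntimeError", "message": "OOM: scenario 7 exceeded memory limit", "details": {"worker_id": "worker-demeter-2"}}',
    '{"failure_class": "RuntimeError", "message": "OOM: scenario 14 exceeded memory limit", "details": {"worker_id": "worker-demeter-2"}}',
    '{"failure_class": "TimeoutError", "message": "request timed out", "details": {"worker_id": "worker-demeter-5"}}',
]


def failures_by_worker(lines):
    """Count failures per (worker_id, failure_class) pair."""
    counts = Counter()
    for line in lines:
        record = json.loads(line)
        worker = record.get("details", {}).get("worker_id", "unknown")
        counts[(worker, record["failure_class"])] += 1
    return counts


for (worker, cls), count in failures_by_worker(sample_lines).most_common():
    print(f"{worker} {cls}: {count}")
```

If one worker dominates the counts, the cause is more likely the node (memory, local disk, network) than the inputs.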
Step 6: AI-Assisted Diagnosis
When failures are complex, the AI diagnostic assistant (scalable[ai])
analyzes telemetry and provides human-readable explanations. The example
here passes --no-ai, which uses heuristic analysis and requires no LLM:
scalable diagnose --latest --no-ai
Diagnosis for run-20260520T...-demeter-lulcc-a1b2c3d4:
⚠ 13 failures detected across 3 categories:
1. RuntimeError (OOM) — 8 occurrences
Pattern: Scenarios with large input datasets (>2GB) exceed the 16G
memory allocation for gcam workers.
Suggestion: Increase component memory to 32G or chunk large inputs.
2. MemoryError — 3 occurrences
Pattern: Worker process exhausted system memory during pandas concat.
Suggestion: Use chunked processing or increase max_workers to spread load.
3. TimeoutError — 2 occurrences
Pattern: Network calls to external data API timed out after 300s.
Suggestion: Increase timeout or add retry logic for external calls.
Programmatic API:
from scalable.ai import diagnose_run

result = diagnose_run(
    run_dir=".scalable/runs/run-20260520T.../",
    no_ai=True,  # Use heuristic analysis (no LLM required)
)
print(result.summary)
for finding in result.findings:
    print(f" [{finding.severity}] {finding.category}: {finding.suggestion}")
Step 7: Graceful Session Shutdown
Proper shutdown ensures telemetry is finalized even when errors occur:
from distributed import as_completed
from scalable import ScalableSession

session = ScalableSession.from_yaml("./scalable.yaml", target="local")
try:
    client = session.start()
    # `process` stands for your own task function
    futures = [client.submit(process, i, tag="analysis") for i in range(100)]
    results = []
    for future in as_completed(futures):
        try:
            results.append(future.result())
        except Exception as e:
            print(f"Task failed: {e}")
except Exception as e:
    print(f"Fatal error: {e}")
finally:
    # ALWAYS close the session; this finalizes telemetry
    session.close()
The session.close() method:
Shuts down the Dask client.
Records the final run status (completed or failed).
Writes summary statistics to run.json.
Generates Parquet snapshots if enabled.
Resets the telemetry context.
If you skip session.close(): Telemetry files remain valid (JSONL is
append-safe) but the run status stays running and summary stats won’t be
computed.
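Because the only requirement is that close() always runs, the try/finally can also be written with the standard library's contextlib.closing. The sketch below uses a stand-in class so it runs without a cluster. FakeSession is hypothetical, and whether ScalableSession itself supports the with statement is not stated in this tutorial.

```python
from contextlib import closing


class FakeSession:
    """Stand-in for ScalableSession so the sketch runs without a cluster."""

    def __init__(self):
        self.closed = False

    def start(self):
        return self  # the real start() returns a client

    def close(self):
        self.closed = True


session = FakeSession()
try:
    with closing(session):
        session.start()
        raise RuntimeError("simulated fatal error")
except RuntimeError as e:
    print(f"Fatal error: {e}")

print(session.closed)  # True: close() ran even though the body raised
```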
Step 8: Fault-Tolerant Pipeline Pattern
For production pipelines, combine all resilience patterns:
"""Fault-tolerant pipeline with retry, partial success, and diagnostics."""
from scalable import ScalableSession, cacheable
from distributed import as_completed
import time

@cacheable(return_type=dict, scenario_id=int)
def run_scenario(scenario_id: int) -> dict:
    """Cached computation: won't re-run on retry if previously succeeded."""
    # ... expensive computation ...
    return {"scenario": scenario_id, "result": scenario_id * 42}

def run_pipeline():
    session = ScalableSession.from_yaml("./scalable.yaml", target="local")
    try:
        client = session.start()
        scenarios = list(range(200))

        # Phase 1: Submit all with retry
        succeeded = {}
        failed = {}
        retry_queue = [(s, 0) for s in scenarios]  # (scenario, attempt)
        while retry_queue:
            batch = retry_queue[:50]  # Process in batches of 50
            retry_queue = retry_queue[50:]
            futures = {
                client.submit(run_scenario, s, tag="analysis"): (s, attempt)
                for s, attempt in batch
            }
            for future in as_completed(futures):
                scenario_id, attempt = futures[future]
                try:
                    result = future.result(timeout=600)
                    succeeded[scenario_id] = result
                except Exception as e:
                    if attempt < 3:
                        # Retry with backoff
                        time.sleep(2 ** attempt)
                        retry_queue.append((scenario_id, attempt + 1))
                    else:
                        failed[scenario_id] = str(e)

        # Phase 2: Report results
        print(f"Pipeline complete: {len(succeeded)} succeeded, {len(failed)} failed")
        if failed:
            print("Permanent failures:")
            for s, err in sorted(failed.items()):
                print(f" Scenario {s}: {err}")
        return succeeded
    finally:
        session.close()

if __name__ == "__main__":
    results = run_pipeline()
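The retry-queue logic can be exercised locally before it is pointed at a cluster. The sketch below swaps the Scalable client for the standard library's ThreadPoolExecutor, which is a stand-in and not how Scalable schedules work. The flaky function, its failure rules, and the batch size are invented for the test, and the backoff sleep is left out to keep the run fast.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_RETRIES = 3
attempts_seen = {}  # scenario_id -> number of calls so far


def flaky(scenario_id):
    """Fail once for multiples of 5; always fail for scenario 13."""
    attempts_seen[scenario_id] = attempts_seen.get(scenario_id, 0) + 1
    if scenario_id == 13:
        raise ValueError("bad input")
    if scenario_id % 5 == 0 and attempts_seen[scenario_id] == 1:
        raise TimeoutError("transient")
    return scenario_id * 42


def run_with_retry_queue(scenarios, batch_size=10):
    succeeded, failed = {}, {}
    queue = [(s, 0) for s in scenarios]  # (scenario, attempt)
    with ThreadPoolExecutor(max_workers=4) as pool:
        while queue:
            batch, queue = queue[:batch_size], queue[batch_size:]
            futures = {pool.submit(flaky, s): (s, attempt) for s, attempt in batch}
            for future in as_completed(futures):
                s, attempt = futures[future]
                try:
                    succeeded[s] = future.result()
                except Exception as e:
                    if attempt < MAX_RETRIES:
                        queue.append((s, attempt + 1))  # re-queue for a later batch
                    else:
                        failed[s] = str(e)
    return succeeded, failed


succeeded, failed = run_with_retry_queue(range(20))
print(len(succeeded), failed)  # 19 {13: 'bad input'}
```

Scenario 13 is attempted four times (the first try plus three retries) before it is recorded as a permanent failure, which matches the attempt < 3 check in the pipeline.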
Step 9: Worker Health Monitoring
Detect and respond to unhealthy workers:
# Check worker status during long-running workflows
info = client.scheduler_info()
workers = info.get("workers", {})
for addr, worker_info in workers.items():
    memory_used = worker_info.get("metrics", {}).get("memory", 0)
    memory_limit = worker_info.get("memory_limit", 1)
    utilization = memory_used / memory_limit
    if utilization > 0.9:
        print(f"WARNING: Worker {addr} at {utilization*100:.0f}% memory")
        # Consider scaling up or migrating tasks
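The same check can be factored into a pure function that takes the scheduler_info() dictionary, which makes it testable without a cluster. This is a sketch: find_hot_workers is a hypothetical helper, the sample dictionary mimics only the keys used in the loop, and the guard for a missing or zero memory limit is an added assumption about how unlimited workers may be reported.

```python
def find_hot_workers(scheduler_info: dict, threshold: float = 0.9) -> dict:
    """Return {address: utilization} for workers above the memory threshold."""
    hot = {}
    for addr, worker in scheduler_info.get("workers", {}).items():
        used = worker.get("metrics", {}).get("memory", 0)
        limit = worker.get("memory_limit")
        if not limit:
            continue  # no limit reported (missing, None, or 0): skip, do not divide
        utilization = used / limit
        if utilization > threshold:
            hot[addr] = utilization
    return hot


sample_info = {
    "workers": {
        "tcp://10.0.0.1:4000": {"memory_limit": 16_000_000_000, "metrics": {"memory": 15_200_000_000}},
        "tcp://10.0.0.2:4000": {"memory_limit": 16_000_000_000, "metrics": {"memory": 4_000_000_000}},
        "tcp://10.0.0.3:4000": {"memory_limit": 0, "metrics": {"memory": 1_000_000_000}},
    }
}
print(find_hot_workers(sample_info))  # {'tcp://10.0.0.1:4000': 0.95}
```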
Step 10: Post-Failure Recovery
After a failed run, use caching and telemetry to resume efficiently:
"""Resume a pipeline from where it left off."""
import json
from pathlib import Path

# Find what succeeded in the previous run
prev_run = Path(".scalable/runs/run-20260519T.../")
prev_tasks = []
with open(prev_run / "tasks.jsonl") as f:
    for line in f:
        prev_tasks.append(json.loads(line))

# Task IDs are strings, so map them back to integer scenario IDs before
# comparing with all_scenarios. ASSUMPTION: IDs end in "-<scenario_id>", as in
# the "run_demeter_scenario-7" example from Step 5; adjust to your ID scheme.
completed_scenarios = {
    int(t["task_id"].rsplit("-", 1)[-1])
    for t in prev_tasks
    if t.get("state") == "succeeded"
}

# Only run what failed or wasn't attempted
all_scenarios = set(range(200))
remaining = all_scenarios - completed_scenarios
print(f"Resuming: {len(remaining)} scenarios remaining (skipping {len(completed_scenarios)} cached)")

# The @cacheable decorator handles this automatically: even without
# explicit resume logic, cached scenarios will return instantly.
# This pattern is useful when you want explicit control.
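If the previous run crashed while writing, the last line of a JSONL file may be incomplete, and json.loads would then abort the whole resume script. That is an assumption about crash behavior and is not something this tutorial states. The sketch below is a tolerant reader that skips such lines and reports how many it dropped. read_jsonl is a hypothetical helper and the sample file is written to a temporary directory.

```python
import json
import tempfile
from pathlib import Path


def read_jsonl(path):
    """Return (records, n_skipped), ignoring blank and unparseable lines."""
    records, skipped = [], 0
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                skipped += 1
    return records, skipped


# Demo: a temporary file whose last line was cut off mid-write.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "tasks.jsonl"
    path.write_text(
        '{"task_id": "run_scenario-1", "state": "succeeded"}\n'
        '{"task_id": "run_scenario-2", "state": "failed"}\n'
        '{"task_id": "run_scen',
        encoding="utf-8",
    )
    records, skipped = read_jsonl(path)

print(len(records), skipped)  # 2 1
```

Returning the skip count rather than silently dropping lines lets the caller decide whether a damaged file is acceptable for a resume.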
Troubleshooting
- “KilledWorker” exception but task should have succeeded
The Slurm job hit its walltime or was preempted. Increase walltime in the target or reduce per-task computation time by splitting into smaller chunks.
- Retry logic causes duplicate computation
If using @cacheable, resubmitted tasks that already succeeded hit the cache (they won’t recompute); only tasks that failed run again. Without caching, retries execute the function again. For idempotent functions this is safe; for functions with side effects, add deduplication logic.
- “Cannot serialize” errors on exception propagation
Some custom exception classes aren’t serializable. Dask workers must serialize exceptions to send them back to the client. Keep exception classes simple (inherit from built-in exceptions, avoid unpicklable attributes).
- Session status shows “running” after crash
If the process crashes before session.close() runs, the run status stays running. The telemetry data is still valid; inspect it manually or run scalable diagnose --run-id <id> to analyze.
Next Steps
Tutorial 8: Deployment Workflows with Kubernetes — Handle pod evictions and node failures in Kubernetes deployments.
Tutorial 4: Performance Optimization and Caching — Use caching to make retries free after partial completion.
Tutorial 9: ML-Driven Resource Advising and Scaling — Let ML-driven advising predict and prevent resource-related failures.