Tutorial 6: Monitoring and Observability with Telemetry
What You Will Learn
By the end of this tutorial you will:
- Understand Scalable’s telemetry data model and event types.
- Read and analyze JSONL telemetry files programmatically.
- Generate reports from the CLI and Python API.
- Build custom dashboards from telemetry data.
- Use telemetry history to inform resource recommendations.
- Configure telemetry persistence and export (Parquet, remote storage).
Prerequisites
- Completed Tutorial 1: Getting Started with Scalable.
- At least one completed Scalable run (to have telemetry data).
- `pandas` installed (part of Scalable’s core dependencies).
Scenario
Your team runs the energy forecasting pipeline multiple times per week. You need to track performance trends, identify slow tasks, monitor resource utilization, and justify cloud spending to stakeholders. Scalable’s built-in telemetry provides all this data without external observability infrastructure.
Step 1: Telemetry Architecture
Every manifest-driven run (via `ScalableSession` or `scalable run`) automatically records structured events to disk:
```text
.scalable/runs/
└── run-20260520T035200Z-demeter-lulcc-a1b2c3d4/
    ├── run.json          # Run metadata (start time, target, manifest lock)
    ├── manifest.yaml     # Snapshot of the manifest used
    ├── plan.json         # Execution plan snapshot
    ├── tasks.jsonl       # Task lifecycle events
    ├── resources.jsonl   # Resource utilization snapshots
    ├── workers.jsonl     # Worker lifecycle events
    ├── cache.jsonl       # Cache hit/miss events
    ├── failures.jsonl    # Error/failure records
    ├── artifacts.jsonl   # Artifact store operations
    └── cost.jsonl        # Cost tracking events
```
Each `.jsonl` file contains one JSON object per line, a format optimized for append-only writes and streaming reads.
Design rationale: JSONL was chosen over SQLite or a time-series database because it requires no external dependencies, survives process crashes (each line is independently valid), and can be trivially loaded into pandas, jq, or any JSON-capable tool.
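Because each line is a complete JSON document, a reader can recover everything written before a crash by skipping a partially written line. The sketch below shows that property in practice using only the standard library; `read_jsonl_tolerant` is a hypothetical helper, not part of Scalable's API.

```python
import json
from pathlib import Path


def read_jsonl_tolerant(path):
    """Return (records, skipped) for a JSONL file.

    Blank lines are ignored. A line that fails to parse (for example a
    partial write left behind by a crashed process) is counted and skipped
    instead of aborting the whole read.
    """
    records = []
    skipped = 0
    with Path(path).open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError:
                skipped += 1
    return records, skipped
```

In normal operation only the final line of a file can be truncated; skipping any malformed line is a deliberate choice here so that one bad record does not hide the rest of the run.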
Step 2: Run Metadata
The run.json file contains the run’s identity and configuration:
```json
{
  "run_id": "run-20260520T035200Z-demeter-lulcc-a1b2c3d4",
  "project_name": "demeter-lulcc",
  "target_name": "local",
  "provider_name": "local",
  "manifest_lock": "sha256:a3b8f1...",
  "started_at": "2026-05-20T03:52:00Z",
  "status": "completed",
  "ended_at": "2026-05-20T03:58:30Z"
}
```
Key fields:
- `manifest_lock` — content hash identifying the exact configuration that produced this run. Two runs with identical locks were produced from the same configuration.
- `status` — one of `running`, `completed`, `failed`, `cancelled`.
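The `started_at`/`ended_at` pair gives a run's wall-clock duration, and `manifest_lock` lets you group runs that shared a configuration. A minimal standard-library sketch, assuming timestamps are ISO 8601 UTC strings with a trailing `Z` as in the example above; `run_duration_seconds` and `group_by_manifest_lock` are hypothetical helpers.

```python
import json
from collections import defaultdict
from datetime import datetime
from pathlib import Path


def parse_utc(ts):
    # fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so normalize it to an explicit UTC offset first.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def run_duration_seconds(meta):
    """Wall-clock duration of a run, or None if it has no end time yet."""
    if not meta.get("ended_at"):
        return None
    return (parse_utc(meta["ended_at"]) - parse_utc(meta["started_at"])).total_seconds()


def group_by_manifest_lock(run_dirs):
    """Map each manifest lock to the run IDs that used that configuration."""
    groups = defaultdict(list)
    for run_dir in run_dirs:
        meta = json.loads((Path(run_dir) / "run.json").read_text())
        groups[meta["manifest_lock"]].append(meta["run_id"])
    return dict(groups)
```

For the `run.json` shown above, `run_duration_seconds` returns `390.0` (03:52:00 to 03:58:30).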
Step 3: Task Events
tasks.jsonl records the full lifecycle of every submitted task:
```json
{"task_id": "run_demeter_scenario-0", "task_name": "run_demeter_scenario", "state": "submitted", "timestamp": "2026-05-20T03:52:01Z", "tag": "gcam"}
{"task_id": "run_demeter_scenario-0", "task_name": "run_demeter_scenario", "state": "running", "timestamp": "2026-05-20T03:52:02Z", "worker_id": "worker-demeter-0"}
{"task_id": "run_demeter_scenario-0", "task_name": "run_demeter_scenario", "state": "succeeded", "timestamp": "2026-05-20T03:55:30Z", "duration_s": 208.5}
```
States: submitted → running → succeeded | failed | cancelled
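Because every task follows this state sequence, you can sanity-check a `tasks.jsonl` file by replaying each task's events through a small state machine. A minimal standard-library sketch; the transition table is read off the state line above, and `find_invalid_transitions` is a hypothetical helper that assumes a task's events appear in file order.

```python
from collections import defaultdict

# Allowed next states, from: submitted → running → succeeded | failed | cancelled.
# Terminal states allow nothing further. The state line does not say whether a
# task can be cancelled before it starts running, so this table does not allow it.
TRANSITIONS = {
    None: {"submitted"},
    "submitted": {"running"},
    "running": {"succeeded", "failed", "cancelled"},
    "succeeded": set(),
    "failed": set(),
    "cancelled": set(),
}


def find_invalid_transitions(events):
    """Return (task_id, previous_state, new_state) for every illegal step."""
    current = defaultdict(lambda: None)  # task_id -> last seen state
    problems = []
    for event in events:
        task_id, state = event["task_id"], event["state"]
        prev = current[task_id]
        if state not in TRANSITIONS.get(prev, set()):
            problems.append((task_id, prev, state))
        current[task_id] = state
    return problems
```

An empty result means every task followed the documented lifecycle; anything else points at a missing or out-of-order event worth investigating.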
Analyzing task durations:
```python
import json
from pathlib import Path

import pandas as pd

run_dir = Path(".scalable/runs/run-20260520T035200Z-demeter-lulcc-a1b2c3d4")

# Load one event per line, skipping blank lines
tasks = []
with open(run_dir / "tasks.jsonl") as f:
    for line in f:
        if line.strip():
            tasks.append(json.loads(line))

df = pd.DataFrame(tasks)

# duration_s is recorded on the "succeeded" event
completed = df[df["state"] == "succeeded"]
print(f"Succeeded tasks: {len(completed)}")
print(f"Mean duration: {completed['duration_s'].mean():.1f}s")
print(f"Max duration: {completed['duration_s'].max():.1f}s")
print(f"P95 duration: {completed['duration_s'].quantile(0.95):.1f}s")
```
Expected output:
```text
Succeeded tasks: 50
Mean duration: 185.3s
Max duration: 312.7s
P95 duration: 280.1s
```
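Queue wait, the gap between a task's `submitted` and `running` timestamps, is not stored as a field but can be derived from the same events. A minimal standard-library sketch; `queue_wait_seconds` is a hypothetical helper that assumes each task has at most one event per state.

```python
from datetime import datetime


def parse_utc(ts):
    # Normalize the trailing "Z" so fromisoformat() works before Python 3.11.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def queue_wait_seconds(events):
    """Map task_id -> seconds between its 'submitted' and 'running' events."""
    submitted, running = {}, {}
    for event in events:
        if event["state"] == "submitted":
            submitted[event["task_id"]] = parse_utc(event["timestamp"])
        elif event["state"] == "running":
            running[event["task_id"]] = parse_utc(event["timestamp"])
    # Tasks that never started running are left out
    return {
        task_id: (running[task_id] - start).total_seconds()
        for task_id, start in submitted.items()
        if task_id in running
    }
```

For the three events shown earlier, `run_demeter_scenario-0` waited 1.0 second (03:52:01 to 03:52:02). Consistently long waits suggest the cluster has too few workers for the submitted load.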
Step 4: Resource Utilization Events
resources.jsonl tracks CPU and memory usage per task and per worker:
```json
{"entity_type": "task", "entity_id": "run_demeter_scenario-0", "cpu_percent": 78.5, "memory_mb": 14200, "timestamp": "2026-05-20T03:53:00Z"}
{"entity_type": "worker", "entity_id": "worker-demeter-0", "cpu_percent": 82.1, "memory_mb": 15800, "timestamp": "2026-05-20T03:53:00Z"}
```
Build a utilization timeline:
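```python
import json
from pathlib import Path

import pandas as pd

# Same run directory as in Step 3
run_dir = Path(".scalable/runs/run-20260520T035200Z-demeter-lulcc-a1b2c3d4")

resources = []
with open(run_dir / "resources.jsonl") as f:
    for line in f:
        if line.strip():
            resources.append(json.loads(line))

res_df = pd.DataFrame(resources)
res_df["timestamp"] = pd.to_datetime(res_df["timestamp"])

# Mean CPU and memory across all workers, in 30-second buckets
worker_resources = res_df[res_df["entity_type"] == "worker"]
timeline = worker_resources.groupby(
    pd.Grouper(key="timestamp", freq="30s")
).agg({"cpu_percent": "mean", "memory_mb": "mean"})
print(timeline.head(10))
```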
This data helps identify:
- Under-provisioned workers — consistently >90% CPU means you need more workers or larger instance types.
- Over-provisioned workers — consistently <30% CPU means you’re paying for unused capacity.
- Memory pressure — memory approaching the limit suggests increasing the component’s memory allocation.
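These rules of thumb can be applied mechanically to the worker snapshots. A minimal standard-library sketch; `classify_workers` is a hypothetical helper, the 90% and 30% thresholds come from the list above, and "consistently" is approximated by the mean of each worker's CPU samples (a stricter check could use a low percentile instead).

```python
from collections import defaultdict
from statistics import mean


def classify_workers(snapshots, high=90.0, low=30.0):
    """Label each worker by its mean CPU across resources.jsonl snapshots."""
    cpu_by_worker = defaultdict(list)
    for snap in snapshots:
        # Task-level snapshots are ignored; only workers are classified
        if snap["entity_type"] == "worker":
            cpu_by_worker[snap["entity_id"]].append(snap["cpu_percent"])

    labels = {}
    for worker_id, samples in cpu_by_worker.items():
        avg = mean(samples)
        if avg > high:
            labels[worker_id] = "under-provisioned"
        elif avg < low:
            labels[worker_id] = "over-provisioned"
        else:
            labels[worker_id] = "ok"
    return labels
```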
Step 5: Worker Lifecycle Events
workers.jsonl records when workers start, become idle, and terminate:
```json
{"worker_id": "worker-demeter-0", "event": "started", "tag": "gcam", "timestamp": "2026-05-20T03:52:00Z"}
{"worker_id": "worker-demeter-0", "event": "task_assigned", "task_id": "run_demeter_scenario-0", "timestamp": "2026-05-20T03:52:01Z"}
{"worker_id": "worker-demeter-0", "event": "idle", "timestamp": "2026-05-20T03:55:30Z"}
{"worker_id": "worker-demeter-0", "event": "removed", "timestamp": "2026-05-20T03:58:00Z", "reason": "scale_down"}
```
This lets you calculate:
- Worker utilization — fraction of time each worker spent executing vs. idle.
- Scale efficiency — whether adaptive scaling decisions were timely.
- Cold-start overhead — time between `started` and the first `task_assigned`.
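A minimal standard-library sketch of the first and third metrics for one worker. `worker_metrics` is a hypothetical helper; it assumes a worker is busy from a `task_assigned` event until the next `idle` or `removed` event, and that its lifetime runs from `started` to `removed`.

```python
from datetime import datetime


def parse_utc(ts):
    # Normalize the trailing "Z" so fromisoformat() works before Python 3.11.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def worker_metrics(events):
    """Busy seconds, utilization, and cold start for one worker's events."""
    events = sorted(events, key=lambda e: parse_utc(e["timestamp"]))
    started = removed = first_assigned = busy_since = None
    busy = 0.0
    for event in events:
        ts = parse_utc(event["timestamp"])
        kind = event["event"]
        if kind == "started":
            started = ts
        elif kind == "task_assigned":
            if first_assigned is None:
                first_assigned = ts
            if busy_since is None:
                busy_since = ts  # open a busy interval
        elif kind in ("idle", "removed"):
            if busy_since is not None:
                busy += (ts - busy_since).total_seconds()
                busy_since = None  # close the busy interval
            if kind == "removed":
                removed = ts
    # An interval still open at the end of the log is not counted
    lifetime = (
        (removed - started).total_seconds()
        if started is not None and removed is not None
        else None
    )
    cold_start = (
        (first_assigned - started).total_seconds()
        if started is not None and first_assigned is not None
        else None
    )
    return {
        "busy_s": busy,
        "utilization": busy / lifetime if lifetime else None,
        "cold_start_s": cold_start,
    }
```

For the four events above this gives 209 busy seconds out of a 360-second lifetime (utilization of about 0.58) and a 1-second cold start.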
Step 6: CLI Reports
The quickest way to review a run:
```bash
# Latest run summary
scalable report --latest
```
```text
═══════════════════════════════════════════════════════════
Run Report: run-20260520T035200Z-demeter-lulcc-a1b2c3d4
═══════════════════════════════════════════════════════════
Status: completed
Target: local (provider: local)
Duration: 6m 30s
Manifest lock: sha256:a3b8f1...

Tasks:
  Submitted: 50
  Succeeded: 50
  Failed: 0
  Cache hits: 12

Workers:
  demeter: 4 started, 0 failed
  postprocess: 2 started, 0 failed

Resource Usage (mean):
  CPU: 72.4%
  Memory: 11.2 GiB / 16.0 GiB (70%)
```
Export as JSON for downstream processing:
```bash
scalable report --latest --format json --output report.json
```
```json
{
  "run_id": "run-20260520T035200Z-demeter-lulcc-a1b2c3d4",
  "status": "completed",
  "duration_seconds": 390,
  "tasks": {"submitted": 50, "succeeded": 50, "failed": 0},
  "cache": {"hits": 12, "misses": 38},
  "cost_estimate": {"total": 0.0, "compute": 0.0}
}
```
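The JSON export is convenient for CI checks or dashboard feeds. A minimal standard-library sketch that reads the `report.json` written by the command above and derives a few headline numbers; `load_report_metrics` is a hypothetical helper and relies only on the keys shown in the example output.

```python
import json
from pathlib import Path


def load_report_metrics(path):
    """Derive dashboard-friendly numbers from a `scalable report` JSON export."""
    report = json.loads(Path(path).read_text())
    hits = report["cache"]["hits"]
    lookups = hits + report["cache"]["misses"]
    return {
        "run_id": report["run_id"],
        "status": report["status"],
        "duration_min": report["duration_seconds"] / 60,
        "cache_hit_rate": hits / lookups if lookups else 0.0,
        "failed_tasks": report["tasks"]["failed"],
    }
```

For the report above this yields a cache hit rate of 0.24 (12 of 50 lookups) and a duration of 6.5 minutes; a CI job could, for example, fail whenever `failed_tasks` is non-zero.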
Step 7: Programmatic Report Access
Use the telemetry collectors for rich programmatic analysis:
```python
from pathlib import Path

from scalable.telemetry.collectors import iter_run_dirs, summarize_run

# Run IDs start with a UTC timestamp (run-YYYYMMDDTHHMMSSZ-...),
# so sorting by directory name orders runs chronologically
runs_dir = Path(".scalable/runs")
run_dirs = sorted(iter_run_dirs(runs_dir))
if not run_dirs:
    raise SystemExit(f"No runs found under {runs_dir}")
latest = run_dirs[-1]

# Generate summary
summary = summarize_run(latest)
print(f"Run: {summary['run_id']}")
print(f"Duration: {summary['duration_seconds']:.0f}s")
print(f"Tasks succeeded: {summary['tasks_succeeded']}")
print(f"Tasks failed: {summary['tasks_failed']}")
```
Step 8: Historical Trend Analysis
Compare performance across multiple runs:
```python
from pathlib import Path

import pandas as pd
from scalable.telemetry.collectors import iter_run_dirs, read_jsonl

runs_dir = Path(".scalable/runs")
run_summaries = []
for run_dir in iter_run_dirs(runs_dir):
    run_json = run_dir / "run.json"
    if not run_json.exists():
        continue  # skip directories without run metadata
    meta = pd.read_json(run_json, typ="series")
    tasks = read_jsonl(run_dir / "tasks.jsonl")
    succeeded = [t for t in tasks if t.get("state") == "succeeded"]
    run_summaries.append({
        "run_id": meta.get("run_id"),
        "started_at": meta.get("started_at"),
        "target": meta.get("target_name"),
        "total_tasks": len(succeeded),
        "mean_duration": (
            sum(t.get("duration_s", 0) for t in succeeded) / len(succeeded)
            if succeeded else 0
        ),
    })

history = pd.DataFrame(run_summaries)
history["started_at"] = pd.to_datetime(history["started_at"])
history = history.sort_values("started_at")
print("Performance trend (last 10 runs):")
print(history[["started_at", "target", "total_tasks", "mean_duration"]].tail(10))
```
Expected output:
```text
Performance trend (last 10 runs):
           started_at target  total_tasks  mean_duration
  2026-05-10 14:00:00  local           50          210.5
  2026-05-12 09:30:00  local           50          205.2
  2026-05-14 16:00:00    hpc           50           45.8
  2026-05-15 10:00:00    hpc           50           44.1
  2026-05-18 08:00:00    aws          100           38.2
  ...
```
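Mean durations are only comparable between runs on the same target (in the output above, local runs average about 205 to 210 s per task while hpc runs average about 45 s), so a simple regression check compares each run with earlier runs on its own target. A minimal standard-library sketch; `flag_regressions` is a hypothetical helper, and the median baseline and 20% tolerance are illustrative choices, not Scalable defaults.

```python
from statistics import median


def flag_regressions(history, tolerance=0.20, min_baseline=2):
    """Flag runs whose mean task duration exceeds the target's prior median.

    `history` is an iterable of dicts with "run_id", "target", and
    "mean_duration" keys, sorted by start time. Returns a list of
    (run_id, mean_duration, baseline_median) tuples.
    """
    previous = {}  # target -> mean durations of earlier runs
    flagged = []
    for row in history:
        earlier = previous.setdefault(row["target"], [])
        # Only judge a run once the target has enough history
        if len(earlier) >= min_baseline:
            baseline = median(earlier)
            if row["mean_duration"] > baseline * (1 + tolerance):
                flagged.append((row["run_id"], row["mean_duration"], baseline))
        earlier.append(row["mean_duration"])
    return flagged
```

With the `history` DataFrame from the previous example, `flag_regressions(history.to_dict("records"))` lists the runs worth a closer look.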
Step 9: Parquet Export for Analytics
For large-scale analysis or integration with data warehouses, enable Parquet snapshots:
```bash
export SCALABLE_TELEMETRY_PARQUET=1
python workflow.py
```
This writes columnar Parquet files alongside the JSONL:
```text
.scalable/runs/run-.../
├── tasks.jsonl
├── tasks.parquet        # ← Parquet snapshot
├── resources.jsonl
├── resources.parquet    # ← Parquet snapshot
└── ...
```
Load directly into pandas or any Parquet-compatible tool:
```python
import pandas as pd

# Requires a Parquet engine (pyarrow or fastparquet) to be installed
df = pd.read_parquet(".scalable/runs/run-.../tasks.parquet")
print(df.describe())
```
Step 10: Telemetry Configuration
| Variable | Default | Effect |
|---|---|---|
| | | Set to |
| | | Set to |
| | | Base directory for run telemetry. |
When to disable telemetry: Unit tests, benchmarking micro-operations, or environments where disk I/O is constrained. For production runs, always leave telemetry enabled — the overhead is negligible (<1% of total runtime) and the data is invaluable for debugging.
Troubleshooting
- No telemetry data after a run
  Ensure you are using the Session API (`ScalableSession`) or the `scalable run` CLI. The legacy imperative API (using `SlurmCluster` directly) does not record telemetry automatically unless you manually configure a `TelemetryStore`.
- “FileNotFoundError: .scalable/runs”
  The runs directory is created automatically on first run. If you’re querying before any run has completed, the directory won’t exist yet.
- Parquet files not generated
  Set `SCALABLE_TELEMETRY_PARQUET=1` before starting the session. The setting is read at session creation time.
- Report shows “0 tasks” but workflow completed
  The telemetry store must be active when tasks are submitted. If you create a `ScalableClient` outside a session (e.g., connecting to an existing cluster), telemetry won’t be recorded unless explicitly configured.
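To keep analysis scripts from failing with the `FileNotFoundError` above, check for the runs directory before iterating. A minimal standard-library sketch; `list_run_dirs` is a hypothetical helper (whether Scalable's own `iter_run_dirs` already tolerates a missing directory is not covered here), and it assumes run directories use the `run-` prefix shown in Step 1.

```python
from pathlib import Path


def list_run_dirs(runs_dir=".scalable/runs"):
    """Return run directories oldest-first, or [] if no run has written telemetry."""
    root = Path(runs_dir)
    if not root.is_dir():
        return []
    # Names embed a UTC timestamp (run-YYYYMMDDTHHMMSSZ-...), so a
    # lexicographic sort is also a chronological one.
    return sorted(p for p in root.iterdir() if p.is_dir() and p.name.startswith("run-"))
```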
Next Steps
- Tutorial 7: Error Handling and Resilience Patterns — Use failure events to diagnose and recover from errors.
- Tutorial 9: ML-Driven Resource Advising and Scaling — Feed telemetry history into the ML advisor for predictive resource recommendations.
- Tutorial 5: Cloud Integration with AWS and GCP — Monitor cloud costs through telemetry cost events.