Tutorial 6: Monitoring and Observability with Telemetry

What You Will Learn

By the end of this tutorial you will:

  • Understand Scalable’s telemetry data model and event types.

  • Read and analyze JSONL telemetry files programmatically.

  • Generate reports from the CLI and Python API.

  • Build custom dashboards from telemetry data.

  • Use telemetry history to inform resource recommendations.

  • Configure telemetry persistence and export (Parquet, remote storage).

Prerequisites

Scenario

Your team runs the energy forecasting pipeline multiple times per week. You need to track performance trends, identify slow tasks, monitor resource utilization, and justify cloud spending to stakeholders. Scalable’s built-in telemetry provides all this data without external observability infrastructure.

Step 1: Telemetry Architecture

Every manifest-driven run (via ScalableSession or scalable run) automatically records structured events to disk:

.scalable/runs/
└── run-20260520T035200Z-demeter-lulcc-a1b2c3d4/
    ├── run.json           # Run metadata (start time, target, manifest lock)
    ├── manifest.yaml      # Snapshot of the manifest used
    ├── plan.json          # Execution plan snapshot
    ├── tasks.jsonl        # Task lifecycle events
    ├── resources.jsonl    # Resource utilization snapshots
    ├── workers.jsonl      # Worker lifecycle events
    ├── cache.jsonl        # Cache hit/miss events
    ├── failures.jsonl     # Error/failure records
    ├── artifacts.jsonl    # Artifact store operations
    └── cost.jsonl         # Cost tracking events

Each .jsonl file contains one JSON object per line — a format optimized for append-only writes and streaming reads.

Design rationale: JSONL was chosen over SQLite or a time-series database because it requires no external dependencies, survives process crashes (each line is independently valid), and can be trivially loaded into pandas, jq, or any JSON-capable tool.

Step 2: Run Metadata

The run.json file contains the run’s identity and configuration:

{
  "run_id": "run-20260520T035200Z-demeter-lulcc-a1b2c3d4",
  "project_name": "demeter-lulcc",
  "target_name": "local",
  "provider_name": "local",
  "manifest_lock": "sha256:a3b8f1...",
  "started_at": "2026-05-20T03:52:00Z",
  "status": "completed",
  "ended_at": "2026-05-20T03:58:30Z"
}

Key fields:

  • manifest_lock — content hash proving which exact configuration produced this run. Two runs with identical locks are reproducible.

  • status — one of running, completed, failed, cancelled.

Step 3: Task Events

tasks.jsonl records the full lifecycle of every submitted task:

{"task_id": "run_demeter_scenario-0", "task_name": "run_demeter_scenario", "state": "submitted", "timestamp": "2026-05-20T03:52:01Z", "tag": "gcam"}
{"task_id": "run_demeter_scenario-0", "task_name": "run_demeter_scenario", "state": "running", "timestamp": "2026-05-20T03:52:02Z", "worker_id": "worker-demeter-0"}
{"task_id": "run_demeter_scenario-0", "task_name": "run_demeter_scenario", "state": "succeeded", "timestamp": "2026-05-20T03:55:30Z", "duration_s": 208.5}

States: submittedrunningsucceeded | failed | cancelled

Analyzing task durations:

import json
import pandas as pd
from pathlib import Path

run_dir = Path(".scalable/runs/run-20260520T035200Z-demeter-lulcc-a1b2c3d4")

tasks = []
with open(run_dir / "tasks.jsonl") as f:
    for line in f:
        tasks.append(json.loads(line))

df = pd.DataFrame(tasks)
completed = df[df["state"] == "succeeded"]

print(f"Total tasks: {len(completed)}")
print(f"Mean duration: {completed['duration_s'].mean():.1f}s")
print(f"Max duration: {completed['duration_s'].max():.1f}s")
print(f"P95 duration: {completed['duration_s'].quantile(0.95):.1f}s")

Expected output:

Total tasks: 50
Mean duration: 185.3s
Max duration: 312.7s
P95 duration: 280.1s

Step 4: Resource Utilization Events

resources.jsonl tracks CPU and memory usage per task and per worker:

{"entity_type": "task", "entity_id": "run_demeter_scenario-0", "cpu_percent": 78.5, "memory_mb": 14200, "timestamp": "2026-05-20T03:53:00Z"}
{"entity_type": "worker", "entity_id": "worker-demeter-0", "cpu_percent": 82.1, "memory_mb": 15800, "timestamp": "2026-05-20T03:53:00Z"}

Build a utilization timeline:

resources = []
with open(run_dir / "resources.jsonl") as f:
    for line in f:
        resources.append(json.loads(line))

res_df = pd.DataFrame(resources)
res_df["timestamp"] = pd.to_datetime(res_df["timestamp"])

# Average CPU utilization over time
worker_resources = res_df[res_df["entity_type"] == "worker"]
timeline = worker_resources.groupby(
    pd.Grouper(key="timestamp", freq="30s")
).agg({"cpu_percent": "mean", "memory_mb": "mean"})

print(timeline.head(10))

This data helps identify:

  • Under-provisioned workers — consistently >90% CPU means you need more workers or larger instance types.

  • Over-provisioned workers — consistently <30% CPU means you’re paying for unused capacity.

  • Memory pressure — memory approaching the limit suggests increasing the component’s memory allocation.

Step 5: Worker Lifecycle Events

workers.jsonl records when workers start, become idle, and terminate:

{"worker_id": "worker-demeter-0", "event": "started", "tag": "gcam", "timestamp": "2026-05-20T03:52:00Z"}
{"worker_id": "worker-demeter-0", "event": "task_assigned", "task_id": "run_demeter_scenario-0", "timestamp": "2026-05-20T03:52:01Z"}
{"worker_id": "worker-demeter-0", "event": "idle", "timestamp": "2026-05-20T03:55:30Z"}
{"worker_id": "worker-demeter-0", "event": "removed", "timestamp": "2026-05-20T03:58:00Z", "reason": "scale_down"}

This lets you calculate:

  • Worker utilization — fraction of time each worker spent executing vs idle.

  • Scale efficiency — whether adaptive scaling decisions were timely.

  • Cold-start overhead — time between started and first task_assigned.

Step 6: CLI Reports

The quickest way to review a run:

# Latest run summary
scalable report --latest
═══════════════════════════════════════════════════════════
Run Report: run-20260520T035200Z-demeter-lulcc-a1b2c3d4
═══════════════════════════════════════════════════════════
Status: completed
Target: local (provider: local)
Duration: 6m 30s
Manifest lock: sha256:a3b8f1...

Tasks:
  Submitted: 50
  Succeeded: 50
  Failed: 0
  Cache hits: 12

Workers:
  demeter: 4 started, 0 failed
  postprocess: 2 started, 0 failed

Resource Usage (mean):
  CPU: 72.4%
  Memory: 11.2 GiB / 16.0 GiB (70%)

Export as JSON for downstream processing:

scalable report --latest --format json --output report.json
{
  "run_id": "run-20260520T035200Z-demeter-lulcc-a1b2c3d4",
  "status": "completed",
  "duration_seconds": 390,
  "tasks": {"submitted": 50, "succeeded": 50, "failed": 0},
  "cache": {"hits": 12, "misses": 38},
  "cost_estimate": {"total": 0.0, "compute": 0.0}
}

Step 7: Programmatic Report Access

Use the telemetry collectors for rich programmatic analysis:

from scalable.telemetry.collectors import summarize_run, iter_run_dirs
from pathlib import Path

# Get the latest run directory
runs_dir = Path(".scalable/runs")
run_dirs = sorted(iter_run_dirs(runs_dir))
latest = run_dirs[-1]

# Generate summary
summary = summarize_run(latest)
print(f"Run: {summary['run_id']}")
print(f"Duration: {summary['duration_seconds']:.0f}s")
print(f"Tasks succeeded: {summary['tasks_succeeded']}")
print(f"Tasks failed: {summary['tasks_failed']}")

Step 8: Historical Trend Analysis

Compare performance across multiple runs:

from scalable.telemetry.collectors import iter_run_dirs, read_jsonl
from pathlib import Path
import pandas as pd

runs_dir = Path(".scalable/runs")
run_summaries = []

for run_dir in iter_run_dirs(runs_dir):
    run_json = run_dir / "run.json"
    if not run_json.exists():
        continue

    meta = pd.read_json(run_json, typ="series")
    tasks = read_jsonl(run_dir / "tasks.jsonl")
    succeeded = [t for t in tasks if t.get("state") == "succeeded"]

    run_summaries.append({
        "run_id": meta.get("run_id"),
        "started_at": meta.get("started_at"),
        "target": meta.get("target_name"),
        "total_tasks": len(succeeded),
        "mean_duration": (
            sum(t.get("duration_s", 0) for t in succeeded) / len(succeeded)
            if succeeded else 0
        ),
    })

history = pd.DataFrame(run_summaries)
history["started_at"] = pd.to_datetime(history["started_at"])
history = history.sort_values("started_at")

print("Performance trend (last 10 runs):")
print(history[["started_at", "target", "total_tasks", "mean_duration"]].tail(10))

Expected output:

Performance trend (last 10 runs):
  started_at           target  total_tasks  mean_duration
  2026-05-10 14:00:00  local   50           210.5
  2026-05-12 09:30:00  local   50           205.2
  2026-05-14 16:00:00  hpc     50           45.8
  2026-05-15 10:00:00  hpc     50           44.1
  2026-05-18 08:00:00  aws     100          38.2
  ...

Step 9: Parquet Export for Analytics

For large-scale analysis or integration with data warehouses, enable Parquet snapshots:

export SCALABLE_TELEMETRY_PARQUET=1
python workflow.py

This writes columnar Parquet files alongside the JSONL:

.scalable/runs/run-.../
├── tasks.jsonl
├── tasks.parquet      # ← Parquet snapshot
├── resources.jsonl
├── resources.parquet  # ← Parquet snapshot
└── ...

Load directly into pandas or any Parquet-compatible tool:

import pandas as pd
df = pd.read_parquet(".scalable/runs/run-.../tasks.parquet")
print(df.describe())

Step 10: Telemetry Configuration

Variable

Default

Effect

SCALABLE_TELEMETRY

1

Set to 0 to disable all telemetry recording.

SCALABLE_TELEMETRY_PARQUET

0

Set to 1 to emit Parquet snapshots at run close.

SCALABLE_RUNS_DIR

.scalable/runs

Base directory for run telemetry.

When to disable telemetry: Unit tests, benchmarking micro-operations, or environments where disk I/O is constrained. For production runs, always leave telemetry enabled — the overhead is negligible (<1% of total runtime) and the data is invaluable for debugging.

Troubleshooting

No telemetry data after a run

Ensure you are using the Session API (ScalableSession) or the scalable run CLI. The legacy imperative API (SlurmCluster directly) does not automatically record telemetry unless you manually configure a TelemetryStore.

“FileNotFoundError: .scalable/runs”

The runs directory is created automatically on first run. If you’re querying before any run has completed, the directory won’t exist yet.

Parquet files not generated

Set SCALABLE_TELEMETRY_PARQUET=1 before starting the session. The setting is read at session creation time.

Report shows “0 tasks” but workflow completed

The telemetry store must be active when tasks are submitted. If you create a ScalableClient outside a session (e.g., connecting to an existing cluster), telemetry won’t be recorded unless explicitly configured.

Next Steps