Scalable Example Workflow¶
This demo is presented as a sequence of small steps so you can follow and run each stage independently.
The workflow demonstrates how Scalable coordinates a small integrated modeling pipeline across multiple software environments. GCAM runs first to generate a model database, a lightweight extraction step reads the needed time series from that database, and Stitches uses the extracted trajectory to build gridded climate outputs. Each stage is submitted as a normal Python function, but Scalable decides which container profile should execute it and passes results between stages as Dask futures.
Step 1: Imports and logging¶
The first step imports Scalable’s public API and a few modules used by the
example functions. from scalable import * provides the cluster, client,
caching decorators, and type helpers used later in the workflow. os is used
inside worker functions for path handling, and gcam_config represents a
project-specific module that describes how GCAM configuration files should be
hashed for caching.
Logging is set to DEBUG so the example prints more information about worker
startup, task submission, container selection, and any errors that occur. During
development this is useful because HPC failures can otherwise appear as delayed
or silent worker timeouts.
from scalable import *
import logging
import os
import gcam_config
logging.basicConfig(level=logging.DEBUG)
Step 2: Create the cluster¶
The SlurmCluster object describes how Scalable should request resources from
the HPC scheduler. The queue, walltime, and account values map to the
same concepts you would normally provide in a Slurm batch script. This object
does not yet start model work; it records the scheduler settings that will be
used later when workers are added.
silence_logs=False keeps worker and scheduler output visible. That is
helpful for a tutorial because it exposes what Scalable is doing behind the
scenes, including whether workers were accepted by Slurm and whether they
connected back to the Dask scheduler.
cluster = SlurmCluster(
queue='short',
walltime='02:00:00',
account='GCIMS',
silence_logs=False,
)
Step 3: Register container profiles¶
Use one container profile per software environment. A container profile tells
Scalable which image target to use, how many CPUs and how much memory each
worker should reserve, and which host directories should be mounted inside the
container. Tags such as "gcam" and "stitches" become routing labels: when
a task is submitted with a matching tag, Scalable sends it to workers running
that environment.
Directory mappings are written as host_path: container_path. Functions that
run inside the container should use the container paths, not the original host
paths. This keeps code consistent even if the host filesystem layout differs
between local setup and the HPC system.
# GCAM can use multiple threads, so this profile reserves 6 CPUs.
cluster.add_container(
tag="gcam",
cpus=6,
memory="20G",
dirs={
"/path/to/gcam-core/exe": "/gcam-core/exe",
"/path/to/gcam-core/input": "/gcam-core/input",
"/path/to/gcam-core/output": "/gcam-core/output",
"/path/to/shared/data": "/data",
},
)
# Stitches is typically single-threaded in this workflow.
cluster.add_container(
tag="stitches",
cpus=1,
memory="50G",
dirs={"/path/to/shared/data": "/data", "/path/to/archive/data": "/archive"},
)
# Xanthos profile (not used below, shown as an additional profile example).
cluster.add_container(
tag="xanthos",
cpus=1,
memory="20G",
dirs={
"/path/to/shared/data": "/data",
"/path/to/archive/data": "/archive",
"/path/to/project/scratch": "/scratch",
},
)
Step 4: Define worker functions¶
Define each function once, then submit them in dependency order. These functions are normal Python functions, but they should import large or container-specific dependencies inside the function body. Doing so ensures imports happen on the worker inside the correct container rather than on the login node or client process.
The @cacheable decorator marks expensive deterministic steps whose outputs
can be reused. When Scalable sees the same function inputs and compatible type
metadata, it can avoid repeating work and return the cached output instead. The
return_type and custom type hints, such as config_file=gcam_config.GcamConfig,
help Scalable hash non-trivial inputs and outputs reliably.
The first function runs GCAM for a requested model period and returns the path to
the generated database directory. get_worker().id is used in the database
name so concurrent GCAM workers do not overwrite one another’s output.
@cacheable(return_type=DirType, config_file=gcam_config.GcamConfig)
def run_gcam(config_file, period):
import gcamwrapper as gw
from dask.distributed import get_worker
g = gw.Gcam(os.path.basename(config_file), "/gcam-core/exe")
g.run_period(g.convert_year_to_period(period))
dbname = "/gcam-core/output/" + get_worker().id + "database"
return g.print_xmldb(dbname)
The database reader runs after GCAM completes. It receives the database path
returned by run_gcam, opens it with gcamreader, selects one query from a
batch-query XML file, and returns the query result as tabular data. Because the
input may be a future, Scalable waits for the upstream GCAM task before running
this function.
def readdb(db_path):
import gcamreader
import os
conn = gcamreader.LocalDBConn(os.path.dirname(db_path), os.path.basename(db_path))
query = gcamreader.parse_batch_query("/path/to/sample-queries.xml")[2]
return conn.runQuery(query)
The interpolation helper converts sparse model-year output into an annual time series. Stitches expects a continuous trajectory, so this helper fills the years between GCAM time points with linearly interpolated values.
def interp(years, values):
import numpy as np
min_year = min(years)
max_year = max(years)
new_years = np.arange(min_year, max_year + 1)
new_values = np.zeros(len(new_years))
for index, year in enumerate(new_years):
if np.isin(year, years):
new_values[index] = values[year == years][0]
else:
less_year = max(years[year > years])
more_year = min(years[year < years])
less_value = values[np.where(less_year == years)[0][0]]
more_value = values[np.where(more_year == years)[0][0]]
p = (year - less_year) / (more_year - less_year)
new_values[index] = p * more_value + (1 - p) * less_value
return new_years, new_values
stitch_prep transforms the GCAM query result into the target format expected
by Stitches. It loads a matching archive, selects candidate climate-model data,
interpolates the GCAM trajectory, normalizes the values relative to the
1975–2014 baseline, and builds a Stitches recipe describing which archive
segments should be combined to match the target trajectory.
This function is also cacheable because recipe generation can be expensive and is deterministic for the same input data and archive. If the GCAM output has not changed, rerunning the workflow can skip this preparation step.
@cacheable
def stitch_prep(df):
import numpy as np
import pandas as pd
import pkg_resources
import stitches
path = pkg_resources.resource_filename('stitches', 'data/matching_archive_staggered.csv')
data = pd.read_csv(path)
end_yr_vector = np.arange(2100, 1800, -9)
data = stitches.fx_processing.subset_archive(
staggered_archive=data,
end_yr_vector=end_yr_vector,
)
model_data = data[(data["model"] == "CanESM5") & (data["experiment"].str.contains('ssp585'))]
years, values = interp(np.array(df["Year"]), np.array(df["value"]))
df = pd.DataFrame({"year": years, "value": values})
df['variable'] = 'tas'
df['model'] = ''
df['ensemble'] = ''
df['experiment'] = 'GCAM7-Ref'
df['unit'] = 'degC change from avg over 1975~2014'
df.value = df.value - np.mean(df.value[(df.year <= 2014) & (df.year >= 1975)])
df = df[['variable', 'experiment', 'ensemble', 'model', 'year', 'value', 'unit']]
target_chunk = stitches.fx_processing.chunk_ts(df, n=9)
target_data = stitches.fx_processing.get_chunk_info(target_chunk)
stitches_recipe = None
for _ in range(10):
stitches_recipe = stitches.make_recipe(
target_data,
model_data,
tol=0.0,
N_matches=1,
res='day',
non_tas_variables=['tasmin', 'pr', 'hurs', 'sfcWind', 'rsds', 'rlds'],
)
last_period_length = (
stitches_recipe['target_end_yr'].values[-1] - stitches_recipe['target_start_yr'].values[-1]
)
asy = stitches_recipe['archive_start_yr'].values
asy[-1] = stitches_recipe['archive_end_yr'].values[-1] - last_period_length
stitches_recipe['archive_start_yr'] = asy.copy()
return stitches_recipe
run_stitches consumes the recipe and writes gridded stitched outputs to the
requested output directory. The function sets Dask’s internal scheduler to
"synchronous" while calling Stitches because Stitches may create its own Dask
work internally. Keeping that work local to the selected worker avoids common
pickling and cross-environment issues when the broader Scalable cluster contains
workers with different containers.
@cacheable
def run_stitches(recipe, output_path):
import dask
import stitches
# Synchronous mode avoids common pickling issues in mixed environments.
with dask.config.set(scheduler="synchronous"):
return stitches.gridded_stitching(output_path, recipe)
Step 5: Start initial workers and client¶
This step starts only the workers needed for the first stage of the workflow: two GCAM workers. Starting a small number of workers up front keeps the resource request focused on the immediate work instead of reserving every possible container at once.
The ScalableClient connects to the cluster and is used for all subsequent
task submission, dependency tracking, and result retrieval. It behaves like a
Dask client with additional Scalable routing options such as tag and n.
cluster.add_workers(n=2, tag="gcam")
sc_client = ScalableClient(cluster)
Step 6: Submit GCAM and database extraction tasks¶
The first submitted task runs GCAM in the "gcam" container. The n=1 value
means the task needs one worker slot, while tag="gcam" restricts execution
to workers created from the GCAM profile. The call returns immediately with a
future instead of blocking until GCAM finishes.
The second task uses future1 as its input. Passing futures directly is how
the workflow graph is constructed: Scalable knows readdb depends on
run_gcam and will not execute the database query until GCAM has produced its
database path. This allows you to describe the pipeline without manually polling
for completion.
future1 = sc_client.submit(
run_gcam,
"/path/to/gcam-core/exe/configuration_ref.xml",
2100,
n=1,
tag="gcam",
)
future2 = sc_client.submit(readdb, future1, n=1, tag="gcam")
Step 7: Add stitches worker and prepare stitching inputs¶
After the GCAM extraction stage is in the graph, the workflow adds a worker for
the "stitches" profile. This demonstrates dynamic scaling: the Stitches
environment is started only when the workflow is about to need it.
stitch_prep is submitted to the Stitches worker and depends on future2.
Once the database extraction returns tabular data, this task converts that data
into a Stitches recipe. The task runs in the Stitches container because the
function imports and calls the Stitches package.
cluster.add_workers(n=1, tag="stitches")
future3 = sc_client.submit(stitch_prep, future2, n=1, tag="stitches")
Step 8: Scale down unused GCAM workers¶
Only remove workers after downstream tasks no longer depend on them. At this point, the GCAM run and database extraction have already been submitted, and the remaining work is handled by the Stitches profile. Removing the GCAM workers frees scheduler resources and reduces the chance of idle workers occupying an allocation that another job could use.
In a production workflow, remove workers only after you are confident no queued or future downstream work needs that container. If later tasks still need GCAM libraries or files that are only available in the GCAM environment, keep those workers alive until those tasks are submitted and completed.
cluster.remove_workers(n=2, tag="gcam")
Step 9: Run stitches and fetch final output¶
The final computational task passes the Stitches recipe future into
run_stitches. As with earlier steps, using a future as an argument creates a
dependency edge: Stitches will not start until recipe preparation finishes. The
output path is a container-visible directory where Stitches should write final
products.
future4.result() blocks the client process until the final task completes,
then returns the value produced by run_stitches. In small examples printing
the result is convenient; in a larger workflow you might gather multiple futures,
write a manifest, or submit additional post-processing tasks instead.
future4 = sc_client.submit(
run_stitches,
future3,
'/path/to/output/directory/',
n=1,
tag='stitches',
)
print(future4.result())
Step 10: Close the cluster¶
Closing the cluster shuts down the Dask client, scheduler, and any remaining workers. This cleanup step is important on shared HPC systems because it releases Slurm allocations and prevents orphaned worker jobs from continuing to consume resources after the workflow has finished.
cluster.close()
This demo shows a complete multi-stage workflow where each stage runs in the appropriate container profile and passes futures to downstream steps. If you run into issues, open one at https://github.com/JGCRI/scalable/issues.