Extending the Runner

The runner is static, not plugin-based: stages are registered in code rather than discovered at run time. To add a stage or integrate custom status reporting, extend the existing runner surfaces instead of bypassing them.

Stage contract

A stage function has this contract:

def run_some_stage(context: PipelineContext) -> dict[str, str] | None:
    ...

Return values:

  • return a dict[str, str] of artefact labels to root-relative paths when the stage succeeds
  • return None when the stage is intentionally skipped
  • raise an exception when the stage fails and should abort the run

The runner handles manifest updates around the stage call. Do not update context.manifest directly from a normal stage implementation unless you are changing core runner behaviour.
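The return-value contract above can be checked in a unit test before a stage is wired into the runner. The helper below is a minimal sketch, not part of the runner: check_stage_result is a hypothetical name, and the absolute-path check is an assumption drawn from the requirement that artefact paths be root-relative.

```python
from __future__ import annotations

from pathlib import PurePosixPath, PureWindowsPath


def check_stage_result(result: object) -> dict[str, str] | None:
    """Hypothetical test helper: validate what a stage function returned."""
    if result is None:
        # None means the stage was intentionally skipped.
        return None
    if not isinstance(result, dict):
        raise TypeError(f"expected dict or None, got {type(result).__name__}")
    for label, path in result.items():
        if not isinstance(label, str) or not isinstance(path, str):
            raise TypeError("artefact labels and paths must both be str")
        # Reject absolute paths in either POSIX or Windows form.
        if PurePosixPath(path).is_absolute() or PureWindowsPath(path).is_absolute():
            raise ValueError(f"artefact path for {label!r} is absolute: {path}")
    return result
```

Calling check_stage_result(run_some_stage(context)) in a test would then surface a wrongly typed or absolute path before the manifest is written.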

What is available in PipelineContext

PipelineContext gives each stage access to:

Field          Use it for
run_config     Runtime settings such as output root, seeds, and curve sample counts
raw_cfg        The loaded YAML config as a mutable mapping
X, y           Loaded dataset inputs
paths          Stage directories and manifest path
manifest       Current run manifest
model_kwargs   Effective sampler overrides passed into model build
model          Built PanelMMM, available after Stage 00

Artefact helpers

Use the helpers in abacus/pipeline/artifacts.py:

  • write_json(...)
  • write_dataframe(...)
  • write_dataset(...)
  • write_idata(...)
  • write_text(...)
  • save_figure(...)
  • copy_file(...)

Use context.paths.relative(path) when building the artefact mapping that the stage returns. The manifest expects root-relative paths, not absolute paths.
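To illustrate what "root-relative" means here, the sketch below uses pathlib as a stand-in for context.paths.relative(path). The function to_root_relative, the output root, and the stage directory name 60_custom are all hypothetical, and whether the real helper returns a str in POSIX form is an assumption.

```python
from pathlib import Path


def to_root_relative(output_root: Path, path: Path) -> str:
    """Hypothetical stand-in for context.paths.relative(path)."""
    # relative_to raises ValueError if path is not under output_root,
    # which catches artefacts written outside the run directory.
    return path.relative_to(output_root).as_posix()


root = Path("/runs/example_run")  # hypothetical output root
artefact = root / "60_custom" / "custom_summary.csv"
print(to_root_relative(root, artefact))  # 60_custom/custom_summary.csv
```

Storing relative paths keeps the manifest valid when the whole output directory is moved or archived.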

Adding a new stage

To add a new built-in stage, update these places:

  1. abacus/pipeline/artifacts.py: add the stage directory name to STAGE_DIRECTORIES.
  2. abacus/pipeline/runner.py: add a PipelineStageSpec to PIPELINE_STAGE_SPECS.
  3. abacus/pipeline/runner.py: add the stage function to the stage_functions mapping inside run_pipeline(...).
  4. abacus/pipeline/stages/__init__.py: export the new stage helper if you want it available from the stage package.

Minimal stage example

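from abacus.pipeline.artifacts import write_dataframe


def run_custom_stage(context):
    # The model is built in Stage 00, so fail loudly if this stage runs too early.
    if context.model is None:
        raise ValueError("Model has not been initialized before the custom stage.")

    # Assumes a "custom" stage directory has been registered for this stage.
    stage_dir = context.paths.stage_dirs["custom"]
    output_path = stage_dir / "custom_summary.csv"

    frame = context.model.summary.total_contribution(output_format="pandas")
    write_dataframe(output_path, frame)

    # Return artefact labels mapped to root-relative paths, never absolute ones.
    return {
        "custom_summary": context.paths.relative(output_path),
    }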

Optional stage pattern

If a stage should only run when a config block is present, follow the same pattern as Stage 70:

def run_optional_stage(context):
    cfg = context.raw_cfg.get("my_optional_block")
    if cfg is None:
        return None
    ...

Returning None is what marks the stage as skipped in the manifest.

Failure semantics

If your stage raises an exception:

  • the stage is marked failed
  • the run is marked failed
  • later pending stages are marked not_reached
  • run_pipeline(...) re-raises the exception

Stage code should therefore catch exceptions only when it can recover locally and still produce a valid artefact set.
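The failure semantics above can be pictured with a toy loop. This is a sketch of the described behaviour, not the real run_pipeline(...): run_stages, the manifest layout, the stage names, and the "running" and "succeeded" labels are assumptions made for illustration.

```python
from typing import Callable, Optional

StageFn = Callable[[dict], Optional[dict]]


def run_stages(stages: dict[str, StageFn], context: dict, manifest: dict) -> None:
    """Toy stand-in for the runner loop; NOT the real run_pipeline(...)."""
    manifest["status"] = "running"  # assumed run-level label
    manifest["stages"] = {name: "pending" for name in stages}
    for name, stage_fn in stages.items():
        try:
            result = stage_fn(context)
        except Exception:
            manifest["stages"][name] = "failed"
            manifest["status"] = "failed"
            # Every stage still pending can no longer run.
            for other in stages:
                if manifest["stages"][other] == "pending":
                    manifest["stages"][other] = "not_reached"
            raise  # the caller still sees the original exception
        # "succeeded" is an assumed label; None marks a skipped stage.
        manifest["stages"][name] = "skipped" if result is None else "succeeded"
    manifest["status"] = "succeeded"


def build(ctx):
    return {"model": "00_build/model.nc"}


def explode(ctx):
    raise RuntimeError("boom")


def report(ctx):
    return {"report": "90_report/report.html"}


manifest: dict = {}
try:
    run_stages({"build": build, "explode": explode, "report": report}, {}, manifest)
except RuntimeError:
    print(manifest["stages"])
# {'build': 'succeeded', 'explode': 'failed', 'report': 'not_reached'}
```

Because the exception propagates, a stage that swallows its own errors would be recorded as successful even if it wrote nothing useful.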

Adding structured reporting

If you want progress callbacks without changing the core stage code, implement a PipelineReporter and pass it to run_pipeline(...).

The reporter protocol methods are:

  • on_pipeline_start(...)
  • on_stage_start(...)
  • on_stage_end(...)
  • on_pipeline_end(...)
  • on_pipeline_error(...)

This is the right extension point for:

  • notebooks or dashboards that want progress updates
  • lightweight orchestration wrappers
  • structured logging around pipeline runs
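As a rough sketch of the structured-logging case, the class below records each callback as an event. The argument lists of the protocol methods are elided in this document, so the sketch accepts *args and **kwargs rather than guessing them; EventLogReporter and the event dictionary layout are hypothetical. Check the real PipelineReporter protocol for the actual signatures and for how the reporter is passed to run_pipeline(...).

```python
import time
from typing import Any


class EventLogReporter:
    """Hypothetical reporter that records every callback as an event dict."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    def _record(self, event: str, args: tuple, kwargs: dict) -> None:
        # Keep the raw arguments so nothing the runner passes is lost.
        self.events.append(
            {"event": event, "time": time.time(), "args": args, "kwargs": kwargs}
        )

    def on_pipeline_start(self, *args: Any, **kwargs: Any) -> None:
        self._record("pipeline_start", args, kwargs)

    def on_stage_start(self, *args: Any, **kwargs: Any) -> None:
        self._record("stage_start", args, kwargs)

    def on_stage_end(self, *args: Any, **kwargs: Any) -> None:
        self._record("stage_end", args, kwargs)

    def on_pipeline_end(self, *args: Any, **kwargs: Any) -> None:
        self._record("pipeline_end", args, kwargs)

    def on_pipeline_error(self, *args: Any, **kwargs: Any) -> None:
        self._record("pipeline_error", args, kwargs)


reporter = EventLogReporter()
reporter.on_stage_start("example_stage")  # argument is illustrative only
print(reporter.events[0]["event"])  # stage_start
```

Once the real signatures are known, the catch-all parameters can be replaced with named ones and the events forwarded to a logger or a progress bar.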

Consuming the manifest programmatically

The manifest is written after every stage transition, so external tools can poll run_manifest.json during execution.

Typical uses:

  • check whether the optimisation stage was skipped
  • discover stage artefact paths without hard-coding filenames
  • detect the first failed stage and its error message

See Output Directory Schema for the manifest fields and status values.
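A polling tool might look roughly like the sketch below. The manifest layout used here (a "stages" list of objects with "name", "status", and "error" keys) and the stage names are assumptions for illustration only; the real field names are defined in Output Directory Schema. Whether the manifest is written atomically is not stated, so the loader treats an unreadable file as "try again later".

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def load_manifest(path: Path) -> Optional[dict]:
    """Return the parsed manifest, or None if it is missing or not valid JSON yet."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError):
        # A poller may race with the writer; the caller can simply retry.
        return None


def first_failed_stage(manifest: dict) -> Optional[tuple[str, str]]:
    """Return (stage name, error message) for the first failed stage, if any."""
    # ASSUMED layout: {"stages": [{"name": ..., "status": ..., "error": ...}]}
    for stage in manifest.get("stages", []):
        if stage.get("status") == "failed":
            return stage.get("name", ""), stage.get("error", "")
    return None


# Demo against a hand-written manifest in the assumed layout.
with tempfile.TemporaryDirectory() as tmp:
    manifest_path = Path(tmp) / "run_manifest.json"
    example = {
        "stages": [
            {"name": "00_build", "status": "succeeded"},
            {"name": "10_fit", "status": "failed", "error": "sampler diverged"},
            {"name": "20_report", "status": "not_reached"},
        ]
    }
    manifest_path.write_text(json.dumps(example), encoding="utf-8")
    loaded = load_manifest(manifest_path)

print(first_failed_stage(loaded))  # ('10_fit', 'sampler diverged')
```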