"""Pipeline canary — short-window smoke test before each full sort.
A multi-hour KS2 / KS4 sort can fail at the MEX / preprocessing stage
hours into the run because of an architecture incompatibility, missing
CUDA kernel, broken Docker image, or any other "first-time" failure
that only manifests once the sorter actually starts processing data.
Failures that can be detected from the configured environment alone are
the job of the preflight in :mod:`.guards._preflight`; the canary covers
the residual: failures that only show themselves once the data and the
code paths actually run end-to-end.
Operation
---------
When ``ExecutionConfig.canary_first_n_s > 0``,
:func:`run_canary` clones the live config, restricts the recording
window to ``[0, canary_first_n_s]`` seconds, turns off post-sort
exporters and figure generation, relaxes curation so a low-yield
window does not falsely fail, and runs the same backend on the same
recording into ``<inter_path>/_canary_<pid>/``. The MEX compile, model
loads, Docker image start, and the sorter's first preprocessing pass
all execute under realistic conditions — that is the point.
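
The canary's artefacts land in a per-pid sub-folder of the recording's
intermediate folder (layout sketch; exact contents depend on the backend)::

    <inter_path>/
        _canary_<pid>/
            inter/      # the canary's intermediate artefacts
            results/    # the canary's results (discarded)
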
Failure handling
----------------
* **Classified failure** (``InsufficientActivityError``,
``BiologicalSortFailure``, ``EnvironmentSortFailure``,
``ResourceSortFailure``): returned to the caller so the full sort
is aborted before launch and the canary's exception is propagated
as the recording's classified result.
* **Unexpected failure** (anything else, including the canary itself
hitting OOM or running out of disk): logged but **not** propagated —
the canary is a smoke test, not a hard gate. The full sort proceeds
and the live watchdogs handle resource-shaped issues at runtime.
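
A minimal call-site sketch of this contract (``launch_full_sort`` and the
bare ``raise`` are illustrative, not the actual pipeline code)::

    err = run_canary(config, recording, rec_path, inter_path)
    if err is not None:
        # Classified failure: the full sort would hit the same wall.
        raise err
    launch_full_sort()
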
Edge cases
----------
* Recording shorter than ``canary_first_n_s`` → skip is performed by
the call site in :mod:`spikelab.spike_sorting.pipeline` (where the
recording duration is known), not inside :func:`run_canary`. If the
caller invokes :func:`run_canary` on a too-short recording, the
backend's slicing clamps to the actual duration and the canary
effectively runs against the whole recording.
* The canary leaves a small amount of intermediate state under
  ``<inter_path>/_canary_<pid>/``; cleanup on exit is best-effort.
"""
from __future__ import annotations
import logging
import math
import os
import shutil
import time
from pathlib import Path
from typing import Any, Optional
import numpy as np
from ._exceptions import (
CLASSIFIED_FAILURES as _CLASSIFIED_FAILURES,
EnvironmentSortFailure,
)
_logger = logging.getLogger(__name__)
def _build_canary_config(config: Any, canary_window_s: float) -> Any:
"""Return a config clone tuned for a short smoke-test sort.
The clone preserves everything that affects whether the sort can
*start* (sorter selection, sorter params, MEX compile, Docker
image, model paths, recording loader settings) and turns off
everything that only matters for the *output* of a real sort
(post-sort exporters, figures, the per-recording report, the
second preflight pass that already ran).
Curation is disabled because a tiny window legitimately yields
too few units, and a curation-driven empty result would surface
as a false-positive canary failure.
Parameters:
config (SortingPipelineConfig): The original pipeline config.
canary_window_s (float): Seconds of the recording the canary
should sort.
Returns:
canary_config (SortingPipelineConfig): Deep copy with canary
overrides applied.
"""
# Scale the inactivity baseline with the canary window: a 30s
# smoke test should not wait the same 5 min as a multi-hour sort
# before flagging a hang. The floor (120s) absorbs Docker / MEX
# cold-start; the ceiling (300s) caps the timeout at the original
# full-sort default for unusually large windows.
canary_inactivity_base_s = min(300.0, max(120.0, 4.0 * canary_window_s))
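    # Worked examples: a 30 s window gives min(300, max(120, 120)) = 120 s;
    # a 90 s window gives min(300, max(120, 360)) = 300 s, the full-sort
    # default ceiling.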
overrides = {
# Restrict to the leading window; clear any rec_chunks that
# would otherwise force the loader into a multi-segment path.
# Note: ``start_time_s`` / ``end_time_s`` are allowed to
# coexist with the per-file rec_chunks that directory
# concatenation auto-populates — the loader treats time
# slicing as an explicit override of those auto-populated
# boundaries (see ``_load_recording_with_state`` in
# ``recording_io.py`` for the precedence rules).
"start_time_s": 0.0,
"end_time_s": float(canary_window_s),
"rec_chunks": [],
"rec_chunks_s": [],
# Skip curation — too few units in a 30 s window is normal.
"curate_first": False,
"curate_second": False,
# Skip post-sort exporters — canary outputs are discarded.
"compile_single_recording": False,
"compile_to_mat": False,
"compile_to_npz": False,
"compile_waveforms": False,
"save_raw_pkl": False,
"save_dl_data": False,
# Skip figures and the human-readable report.
"create_figures": False,
"create_unit_figures": False,
"generate_sorting_report": False,
        # Avoid double-running preflight (the outer caller already
        # ran it).
        "preflight": False,
        # ``tee_log_policy="keep"`` preserves the canary's tee log so a
        # failure is debuggable. The tee log lives outside the per-pid
        # ``_canary_<pid>/`` folder, so ``_wipe_canary_folder`` does not
        # clean it up; operators should sweep stale canary tee logs from
        # the parent results folder periodically.
        "tee_log_policy": "keep",
"sorter_inactivity_base_s": canary_inactivity_base_s,
}
return config.override(**overrides)
def _wipe_canary_folder(folder: Path, *, strict: bool = False) -> None:
"""Best-effort cleanup of the canary's intermediate folder.
Parameters:
folder (Path): Folder to wipe.
strict (bool): When True, raises if the wipe fails. Used at
entry-time so a permission-denied wipe is surfaced
rather than silently running the canary against a
partially-cleaned folder. Defaults to False (best-effort
cleanup) for the post-canary cleanup call site.
"""
try:
if folder.exists():
if strict:
# ignore_errors=False — any failure raises so the
# caller can decide how to surface it.
shutil.rmtree(folder)
else:
shutil.rmtree(folder, ignore_errors=True)
except Exception as exc:
if strict:
raise
_logger.warning("cleanup of %s failed: %r", folder, exc)
def run_canary(
config: Any,
recording: Any,
rec_path: Any,
inter_path: Any,
*,
sorter_name: Optional[str] = None,
rec_name: str = "canary",
rng: Any = None,
) -> Optional[BaseException]:
"""Run a short-window smoke test of the configured backend.
Builds a canary clone of *config* (see :func:`_build_canary_config`),
spins up a fresh backend instance against that clone, and invokes
:func:`spikelab.spike_sorting.pipeline.process_recording` against
    a per-pid ``<inter_path>/_canary_<pid>/`` subdirectory.
Parameters:
config (SortingPipelineConfig): Live pipeline configuration.
Read but never mutated.
recording: Pre-loaded ``BaseRecording`` for the canary, or
``None`` when only a path is available.
rec_path: Path to the recording on disk. Used by the backend
loader when *recording* is ``None``.
inter_path: The recording's intermediate folder. The canary
            writes under a ``_canary_<pid>`` sub-folder so the real sort's
artefacts are untouched.
sorter_name (str or None): Override the sorter resolved from
``config.sorter.sorter_name``. Mostly used by tests.
rec_name (str): Short identifier for the canary in log
output.
rng (np.random.Generator or None): Optional RNG passed
through to ``process_recording`` for reproducibility.
Returns:
result (BaseException or None): A classified exception when
the canary discovered a failure the full sort would also
have hit; ``None`` when the canary succeeded *or* when
the canary itself hit an unexpected non-classified
failure (which the live watchdogs are responsible for
during the real run).
"""
canary_window_s = float(getattr(config.execution, "canary_first_n_s", 0.0))
# NaN comparisons are always False, so a NaN window would skip the
# ``<= 0`` guard and proceed to build a meaningless canary with
# ``end_time_s=NaN``. Treat NaN the same as "disabled".
if math.isnan(canary_window_s) or canary_window_s <= 0:
return None
# The canary backend is constructed below with a derived clone of
# *config*; the full sort retains its own config reference, so no
# state is shared between the two runs. (The pre-refactor design
# mutated ``_globals.*`` to apply canary overrides and needed a
# snapshot/restore wrapper here; that bridge was removed in Phase 5
# of ``iat/TO_IMPLEMENT.md`` when the global state was deleted.)
canary_config = _build_canary_config(config, canary_window_s)
# Per-pid subfolder so two direct callers of run_canary against
# the same inter_path cannot race on the wipe + mkdir. The
# standard pipeline flow already serialises via acquire_sort_lock
# on inter_path, but run_canary is also exposed as a public
# function and direct callers have no such protection.
canary_root = Path(inter_path) / f"_canary_{os.getpid()}"
# Strict wipe at entry — running the canary against a partially-
# cleaned folder could mask sorter behaviour. The cleanup-phase
# wipe at exit stays best-effort.
_wipe_canary_folder(canary_root, strict=True)
canary_root.mkdir(parents=True, exist_ok=True)
canary_inter = canary_root / "inter"
canary_results = canary_root / "results"
sorter = sorter_name or getattr(config.sorter, "sorter_name", "")
_logger.info(
"running %.1fs smoke test for %s via %s",
canary_window_s,
rec_name,
sorter,
)
started_t = time.monotonic()
try:
from .backends import get_backend_class, list_sorters
from .pipeline import process_recording
known_sorters = list_sorters()
if sorter not in known_sorters:
raise EnvironmentSortFailure(
f"unknown sorter name: {sorter!r}. "
f"Known sorters: {sorted(known_sorters)}"
)
backend_cls = get_backend_class(sorter)
canary_backend = backend_cls(canary_config)
result = process_recording(
canary_backend,
canary_config,
rec_name,
rec_path,
canary_inter,
canary_results,
rec_loaded=recording,
rec_chunks=None,
rec_chunk_names=None,
rng=rng,
)
except _CLASSIFIED_FAILURES as exc:
_logger.warning("classified failure: %s: %s", type(exc).__name__, exc)
_wipe_canary_folder(canary_root)
return exc
except (KeyboardInterrupt, SystemExit):
# User abort or watchdog interrupt — never swallow. Clean up
# the canary folder and let the interrupt propagate so the
# outer pipeline tears down promptly.
_wipe_canary_folder(canary_root)
raise
except Exception as exc:
# Unexpected failure — the canary is a smoke test, not a hard
# gate. Log and let the full sort proceed; live watchdogs
# handle resource-shaped issues at runtime.
_logger.warning(
"non-classified failure (%s: %s); proceeding with the full sort.",
type(exc).__name__,
exc,
)
_wipe_canary_folder(canary_root)
return None
if isinstance(result, _CLASSIFIED_FAILURES):
_logger.warning("classified failure: %s: %s", type(result).__name__, result)
_wipe_canary_folder(canary_root)
return result
if isinstance(result, (KeyboardInterrupt, SystemExit)):
# process_recording returned the interrupt as a value rather
# than raising; surface it the same way as the raised path.
_wipe_canary_folder(canary_root)
raise result
if isinstance(result, BaseException):
_logger.warning(
"non-classified failure (%s: %s); proceeding with the full sort.",
type(result).__name__,
result,
)
_wipe_canary_folder(canary_root)
return None
elapsed_s = time.monotonic() - started_t
n_units = _extract_unit_count(result)
if n_units is None:
_logger.info(
"passed in %.1fs; proceeding with the full sort.",
elapsed_s,
)
else:
_logger.info(
"passed: produced %d unit(s) in %.1fs; proceeding with the full sort.",
n_units,
elapsed_s,
)
_wipe_canary_folder(canary_root)
return None
def _extract_unit_count(result: Any) -> Optional[int]:
"""Best-effort unit count from a ``process_recording`` success result.
``process_recording`` returns either a single ``SpikeData`` or a
``(sd, sd_curated)`` tuple depending on
``ExecutionConfig.save_raw_pkl``. When neither shape matches, or
when the object lacks ``N`` (number of units), returns ``None``
so the caller falls back to a unit-count-less log line.
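
    Example (illustrative; ``_Stub`` stands in for any object exposing an
    integer ``N``)::

        class _Stub:
            N = 3

        _extract_unit_count(_Stub())             # -> 3
        _extract_unit_count((_Stub(), _Stub()))  # -> 3 (last entry wins)
        _extract_unit_count(object())            # -> None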
"""
candidate = result
if isinstance(result, tuple) and result:
# Prefer the curated SpikeData if present (last entry).
candidate = result[-1]
n = getattr(candidate, "N", None)
# Accept numpy integer types (np.int64, etc.) as well as Python int.
# SpikeData.N is sometimes assigned from numpy operations such as
# np.unique(...).size, which returns a numpy scalar.
if isinstance(n, (int, np.integer)):
return int(n)
return None