Batch Jobs

The spikelab.batch_jobs sub-package provides Kubernetes batch-job launching helpers for submitting SpikeLab analysis workloads to a cluster. It requires the batch-jobs optional dependency group:

pip install spikelab[batch-jobs]

See the Batch Jobs guide for usage examples.

Models

Pydantic models defining job specifications, cluster profiles, and related configuration.

Typed models used by the batch job launcher.

class spikelab.batch_jobs.models.ContainerSpec(*args, **kwargs)[source]

Bases: BaseModel

Container runtime details for a single-job pod.

image_pull_policy: Literal['Always', 'IfNotPresent', 'Never'] = 'IfNotPresent'
class spikelab.batch_jobs.models.ResourceSpec(*args, **kwargs)[source]

Bases: BaseModel

Resource requests/limits for a job container.

requests_cpu: str = '1'
requests_memory: str = '2Gi'
limits_cpu: str = '1'
limits_memory: str = '2Gi'
class spikelab.batch_jobs.models.VolumeMountSpec(*args, **kwargs)[source]

Bases: BaseModel

Pod volume + mount target information.

sub_path: str | None = None
secret_name: str | None = None
pvc_name: str | None = None
read_only: bool = True
class spikelab.batch_jobs.models.NamespaceHookSpec(*args, **kwargs)[source]

Bases: BaseModel

Per-namespace overrides applied when a job targets a specific namespace.

image_pull_policy: Literal['Always', 'IfNotPresent', 'Never'] | None = None
class spikelab.batch_jobs.models.StoragePathTemplates(*args, **kwargs)[source]

Bases: BaseModel

Python format-string templates for S3 artifact paths.

Available placeholders: {prefix}, {run_id}, {filename}.

inputs: str = '{prefix}inputs/{run_id}/{filename}'
outputs: str = '{prefix}outputs/{run_id}/'
logs: str = '{prefix}logs/{run_id}/'
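
As a rough illustration of how these templates expand, the sketch below fills the three default templates with plain str.format. The bucket prefix, run id, and the helper name expand are hypothetical and not part of spikelab. Note that the defaults concatenate {prefix} directly with the category, so this sketch assumes the prefix already ends in a slash; whether the library normalises the prefix itself is not stated here.

```python
# Default templates copied from the StoragePathTemplates fields above.
TEMPLATES = {
    "inputs": "{prefix}inputs/{run_id}/{filename}",
    "outputs": "{prefix}outputs/{run_id}/",
    "logs": "{prefix}logs/{run_id}/",
}


def expand(category: str, *, prefix: str, run_id: str, filename: str = "") -> str:
    """Fill one template; str.format ignores keyword arguments a template does not use."""
    return TEMPLATES[category].format(prefix=prefix, run_id=run_id, filename=filename)


# Hypothetical prefix and run id, chosen only for illustration.
input_uri = expand("inputs", prefix="s3://my-bucket/lab/", run_id="abc123", filename="bundle.zip")
output_prefix = expand("outputs", prefix="s3://my-bucket/lab/", run_id="abc123")
```
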
class spikelab.batch_jobs.models.PolicyConfig(*args, **kwargs)[source]

Bases: BaseModel

Configurable thresholds for the cluster policy engine.

block_sleep_infinity: bool = True
warn_request_limit_mismatch: bool = True
class spikelab.batch_jobs.models.JobSpec(*args, **kwargs)[source]

Bases: BaseModel

High-level description of a Kubernetes batch job.

Single-container assumption: container is a single ContainerSpec, not a list. The rendered job.yaml.j2 template targets one container per pod (named analysis). Multi-container patterns (sidecars for log shipping, init containers for fetch) are not supported by the current template.

name_prefix: str = 'analysis-job'
namespace: str = 'default'
container: ContainerSpec
resources: ResourceSpec
class spikelab.batch_jobs.models.ClusterProfile(*args, **kwargs)[source]

Bases: BaseModel

Cluster defaults that can be merged with a JobSpec.

All organisation-specific configuration (images, secrets, S3 buckets, namespace hooks) belongs in profile YAML files, not in Python source.

name: str
namespace: str = 'default'
default_s3_prefix: str | None = None
endpoint_url: str | None = None
region_name: str | None = None
class spikelab.batch_jobs.models.SubmitResult(*args, **kwargs)[source]

Bases: BaseModel

Result returned by job submission methods.

job_name: str
manifest_yaml: str
run_id: str
uploaded_input_uri: str
output_prefix: str
logs_prefix: str
job_type: Literal['workspace', 'sorting', 'prepared']

Policy Engine

Cluster policy preflight checks for job specs, using profile-driven thresholds.

Thresholds are read from the active ClusterProfile so that different clusters can enforce different rules.

class spikelab.batch_jobs.policy.PolicyFinding(code, level, message)[source]

Bases: object

code: str
level: Literal['PASS', 'WARN', 'BLOCK']
message: str
__init__(code, level, message)
spikelab.batch_jobs.policy.evaluate_policy(job_spec, profile, *, include_passes=False)[source]

Evaluate policy checks using profile-driven thresholds.

Returns a list of PolicyFinding entries describing the outcome of each policy check.

Parameters:
  • job_spec (JobSpec) – The job to evaluate.

  • profile (ClusterProfile) – Cluster profile carrying the PolicyConfig thresholds.

  • include_passes (bool) – When True, each passing check also emits a PolicyFinding(level="PASS", ...) entry alongside the WARN / BLOCK entries. This is useful for compliance audit tooling that needs a record that each check ran and passed. Defaults to False (Tier L-C4): the common case (CLI preflight, job submission) gets terse output containing only WARN / BLOCK entries, which removes roughly four PASS lines of log noise per compliant submission.

Returns:

Per-check outcomes.

Return type:

findings (list[PolicyFinding])
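
The effect of include_passes can be sketched with a stand-in finding type. Everything below is an assumption made for illustration: the Finding dataclass mirrors the three PolicyFinding fields, the check logic and the code "request-limit-mismatch" are hypothetical, and the real checks are driven by PolicyConfig rather than by this simple comparison.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Finding:
    """Stand-in for PolicyFinding with the same three fields."""
    code: str
    level: str  # "PASS", "WARN", or "BLOCK"
    message: str


def check_request_limit_mismatch(requests_cpu: str, limits_cpu: str,
                                 requests_memory: str, limits_memory: str) -> Finding:
    """Hypothetical check: warn when requests and limits differ."""
    if (requests_cpu, requests_memory) != (limits_cpu, limits_memory):
        return Finding("request-limit-mismatch", "WARN", "requests and limits differ")
    return Finding("request-limit-mismatch", "PASS", "requests match limits")


def filter_findings(findings: List[Finding], *, include_passes: bool = False) -> List[Finding]:
    """Drop PASS entries unless the caller asks for them."""
    if include_passes:
        return list(findings)
    return [f for f in findings if f.level != "PASS"]


raw = [
    check_request_limit_mismatch("1", "1", "2Gi", "2Gi"),  # compliant
    check_request_limit_mismatch("1", "2", "2Gi", "4Gi"),  # mismatch
]
terse = filter_findings(raw)                       # WARN / BLOCK only
audit = filter_findings(raw, include_passes=True)  # full audit trail
```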

spikelab.batch_jobs.policy.summarize_preflight(findings)[source]

Return aggregate level and text summary.

Empty input: when findings is an empty iterable the function returns ("PASS", ""). Callers that distinguish “policy ran and everything passed” from “policy did not run / produced no findings” must inspect the source iterable’s length rather than relying on the returned level alone.

Return type:

tuple[Literal['PASS', 'WARN', 'BLOCK'], str]
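
The empty-input caveat can be illustrated with a small stand-in aggregator. This is a sketch under assumptions, not the library's implementation: findings are modelled as (code, level, message) tuples, and the summary line format is invented. Only the ("PASS", "") result for empty input is taken from the description above.

```python
from typing import Iterable, List, Tuple

SEVERITY = {"PASS": 0, "WARN": 1, "BLOCK": 2}
FindingTuple = Tuple[str, str, str]  # (code, level, message)


def summarize(findings: Iterable[FindingTuple]) -> Tuple[str, str]:
    """Return the most severe level seen plus one summary line per finding."""
    items: List[FindingTuple] = list(findings)
    level = "PASS"
    for _, item_level, _ in items:
        if SEVERITY[item_level] > SEVERITY[level]:
            level = item_level
    text = "\n".join(f"[{lvl}] {code}: {msg}" for code, lvl, msg in items)
    return level, text


def describe(findings: List[FindingTuple]) -> str:
    """Separate 'no findings' from 'everything passed' by checking the length."""
    if not findings:
        return "no findings recorded"
    level, _ = summarize(findings)
    return f"aggregate level: {level}"
```

A caller that treats an empty list the same as a clean pass would skip the length check in describe and rely on the level alone.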

Profile Loading

Load cluster profile presets for job execution.

spikelab.batch_jobs.profiles.load_cluster_profile(path)[source]

Load a profile from an explicit YAML path.

Return type:

ClusterProfile

spikelab.batch_jobs.profiles.load_profile_from_name(name)[source]

Load one of the built-in profile files by name.

Unknown names fall back to defaults.yaml for backward compatibility, but emit a UserWarning listing the recognised profile aliases so a typo doesn’t silently land on the wrong profile.

Return type:

ClusterProfile

Run Session

High-level orchestration for packaging, uploading, and job submission.

class spikelab.batch_jobs.session.RunSession(*, profile, backend, storage_client, credentials=None)[source]

Bases: object

Coordinates artifact packaging, job submission, and result retrieval.

__init__(*, profile, backend, storage_client, credentials=None)[source]
render_manifest(*, job_name, job_spec, run_id)[source]

Render a Kubernetes Job manifest from a spec and profile.

Return type:

str

submit_workspace_job(*, workspace, script, job_spec, allow_policy_risk=False, bundle_input_paths=None, metadata=None)[source]

Save a workspace, bundle it with a script, and submit a job.

Parameters:
  • workspace (Any) – An AnalysisWorkspace instance or a str path to an existing workspace base path (without extension).

  • script (str) – Path to the analysis script to run inside the container.

  • job_spec (JobSpec) – Kubernetes job specification.

  • allow_policy_risk (bool) – Override policy BLOCK findings.

  • bundle_input_paths (iterable[str] | None) – Extra files to include in the bundle.

  • metadata (dict | None) – Arbitrary metadata written into the bundle manifest.

Returns:

Submission details including the output prefix where the updated workspace will appear.

Return type:

result (SubmitResult)

submit_sorting_job(*, recording_paths, config=None, config_overrides=None, job_spec, allow_policy_risk=False, metadata=None)[source]

Bundle recording files with a sorting config and submit a job.

Parameters:
  • recording_paths (list[str]) – Paths to recording files.

  • config (Any) – A SortingPipelineConfig instance, a preset name string (e.g. "kilosort4"), or None for defaults.

  • config_overrides (dict | None) – Flat keyword overrides applied to the config via config.override().

  • job_spec (JobSpec) – Kubernetes job specification.

  • allow_policy_risk (bool) – Override policy BLOCK findings.

  • metadata (dict | None) – Arbitrary metadata written into the bundle manifest.

Returns:

Submission details including the output prefix where sorted results will appear.

Return type:

result (SubmitResult)

submit_prepared_job(*, job_spec, run_id=None, allow_policy_risk=False)[source]

Submit a job without generating bundle artifacts.

Parameters:
  • job_spec (JobSpec) – K8s job spec.

  • run_id (str | None) – Optional explicit run identifier. Must be a single path component — no /, \, or .. segments. Defaults to a random UUID hex when None.

  • allow_policy_risk (bool) – Bypass policy preflight BLOCK findings.

Returns:

The submitted job descriptor.

Return type:

result (SubmitResult)

Notes

  • Unlike submit_workspace_job / submit_sorting_job, this path skips package_analysis_bundle (which has its own traversal guard on run_id). The same traversal check is applied here so an operator-supplied run_id like "../escape" cannot escape the storage prefix.
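
A minimal sketch of the single-path-component rule described for run_id is shown below. The function name validate_run_id is hypothetical, and the real guard may reject more inputs than these three cases.

```python
def validate_run_id(run_id: str) -> str:
    """Accept only a single path component; raise ValueError otherwise."""
    if "/" in run_id or "\\" in run_id:
        raise ValueError(f"run_id must not contain path separators: {run_id!r}")
    if run_id == "..":
        raise ValueError("run_id must not be a parent-directory segment")
    return run_id


# "../escape" is rejected by the separator check before it can reach a storage path.
try:
    validate_run_id("../escape")
    rejected = False
except ValueError:
    rejected = True
```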

retrieve_result(submit_result, local_dir)[source]

Download job outputs and return an AnalysisWorkspace.

Parameters:
  • submit_result (SubmitResult) – The result from a prior submit_workspace_job or submit_sorting_job call.

  • local_dir (str) – Local directory to download outputs into.

Returns:

The workspace produced by the job. For workspace jobs this is the updated workspace; for sorting jobs it contains per-recording namespaces with SpikeData at key "spikedata".

Return type:

workspace (AnalysisWorkspace)

Notes

  • Call wait_for_completion before calling this method to ensure the job has finished.

wait_for_completion(*, job_name, max_wait_seconds=3600, poll_interval_seconds=10, max_consecutive_failures=5, max_backoff_seconds=300)[source]

Poll until completion/failure or timeout and return final state.

Transient backend errors (kubectl network blips, K8s API server restarts, urllib3 timeouts) do not abort the wait. Each poll is wrapped in try/except: consecutive failures accumulate, the sleep doubles per failure (exponential backoff capped at max_backoff_seconds), and the loop gives up only when one of the following happens:

  • max_consecutive_failures polls in a row raise — the backend is considered unavailable; return "Backend unavailable";

  • the kubernetes-client path raises ApiException(404) — the job was deleted out-of-band, no completion can be confirmed; return "Failed";

  • max_wait_seconds elapses — return "Timeout".

A successful poll resets both the consecutive-failure counter and the backoff sleep to poll_interval_seconds.

Parameters:
  • job_name (str) – K8s job identifier.

  • max_wait_seconds (int) – Overall wall-clock budget.

  • poll_interval_seconds (int) – Base sleep between polls.

  • max_consecutive_failures (int) – How many polls may fail back-to-back before declaring the backend unavailable.

  • max_backoff_seconds (int) – Cap on the exponential-backoff sleep so a long blip doesn’t extend the next poll arbitrarily.

Returns:

One of "Complete", "Failed", "Timeout", "Backend unavailable".

Return type:

state (str)
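
The polling behaviour described above can be sketched as a small loop. This is an illustration under assumptions rather than the library's code: wait_for_state, its poll callable, and the injectable sleep and clock parameters are hypothetical, and the ApiException(404) branch is left out because it depends on the kubernetes client.

```python
import time
from typing import Callable


def wait_for_state(
    poll: Callable[[], str],
    *,
    max_wait_seconds: float = 3600,
    poll_interval_seconds: float = 10,
    max_consecutive_failures: int = 5,
    max_backoff_seconds: float = 300,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until a terminal state, a timeout, or too many consecutive errors."""
    deadline = clock() + max_wait_seconds
    failures = 0
    delay = poll_interval_seconds
    while clock() < deadline:
        try:
            state = poll()
        except Exception:
            failures += 1
            if failures >= max_consecutive_failures:
                return "Backend unavailable"
            # Double the sleep after each failure, up to the cap.
            delay = min(delay * 2, max_backoff_seconds)
        else:
            if state in ("Complete", "Failed"):
                return state
            # A successful poll resets the failure counter and the sleep.
            failures = 0
            delay = poll_interval_seconds
        sleep(delay)
    return "Timeout"
```

Passing sleep and clock as parameters is a choice made for this sketch so the loop can be exercised without real waiting; the documented method exposes only the five keyword arguments listed above.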

Kubernetes Backend

Kubernetes backend for batch job submission and monitoring.

class spikelab.batch_jobs.backend_k8s.KubernetesBatchJobBackend(namespace='default', kubeconfig=None, use_kubectl_fallback=True)[source]

Bases: object

Backend wrapper around Kubernetes client with kubectl fallback.

__init__(namespace='default', kubeconfig=None, use_kubectl_fallback=True)[source]
apply_manifest(manifest_path_or_str)[source]

Apply a job manifest by YAML file path or raw YAML string.

Returns the job’s metadata.name (consistent across both paths). Previously the kubectl-fallback path returned the raw stdout of kubectl apply (e.g. "job.batch/myjob created\n") while the Python-client path returned the clean name — callers had no portable way to extract the identifier without sniffing the backend.

Raises ValueError if the manifest’s metadata.namespace is set and disagrees with the backend’s self.namespace — this would otherwise silently deploy into the backend’s namespace, contrary to the rendered manifest. Manifests with no metadata.namespace are accepted and assigned the backend’s namespace as before.

Return type:

str
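
The namespace rule and the name lookup can be sketched on an already-parsed manifest. The helper names below are hypothetical, and the sketch assumes the YAML has been loaded into a plain dict; it does not show how the backend parses or applies the manifest.

```python
from typing import Any, Dict


def resolve_namespace(manifest: Dict[str, Any], backend_namespace: str) -> str:
    """Return the namespace to deploy into, or raise on a disagreement."""
    declared = (manifest.get("metadata") or {}).get("namespace")
    if declared is not None and declared != backend_namespace:
        raise ValueError(
            f"manifest namespace {declared!r} does not match "
            f"backend namespace {backend_namespace!r}"
        )
    # No declared namespace: the backend's namespace applies.
    return backend_namespace


def job_name(manifest: Dict[str, Any]) -> str:
    """Read metadata.name from the manifest rather than from kubectl output."""
    return manifest["metadata"]["name"]


# Hypothetical manifest with no metadata.namespace.
manifest = {"kind": "Job", "metadata": {"name": "myjob"}}
target = resolve_namespace(manifest, "lab")
name = job_name(manifest)
```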

delete_job(name)[source]

Delete a job and its pods. Idempotent: missing jobs are a no-op.

Matches the kubectl --ignore-not-found=true semantic on the fallback path so the two delete paths behave the same way for the missing-job case. Previously the Python kubernetes-client path propagated ApiException(404) verbatim while the kubectl path exited cleanly.

Return type:

None

job_status(name)[source]

Return one of Pending/Running/Complete/Failed/Unknown.

Return type:

str

pods_for_job(job_name)[source]

Return pod names associated with a job.

Return type:

List[str]

stream_logs(pod_name, follow=True)[source]

Yield log lines from a pod.

Return type:

Iterator[str]

S3 Storage

S3-compatible storage helpers for batch job artifacts.

class spikelab.batch_jobs.storage_s3.S3StorageClient(*, prefix=None, path_templates=None, endpoint_url=None, region_name=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)[source]

Bases: object

Small wrapper around boto3 for upload/download URI handling.

Path layout is controlled by path_templates (a StoragePathTemplates instance loaded from the active profile).

__init__(*, prefix=None, path_templates=None, endpoint_url=None, region_name=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)[source]
build_uri(*, run_id, filename, category='inputs')[source]

Build an S3 URI for a file using the active path templates.

category should be one of the keys defined on StoragePathTemplates ("inputs", "outputs", "logs"). An unknown category falls back to the inputs template and emits a UserWarning, so typos (“input”, “logs/”, etc.) don’t quietly land in the wrong S3 prefix.

Return type:

str
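
The fallback for an unknown category might look roughly like the sketch below. The function build_uri_sketch and its explicit prefix argument are hypothetical (the real method reads the prefix and templates from the client), and the warning text is invented.

```python
import warnings

TEMPLATES = {
    "inputs": "{prefix}inputs/{run_id}/{filename}",
    "outputs": "{prefix}outputs/{run_id}/",
    "logs": "{prefix}logs/{run_id}/",
}


def build_uri_sketch(*, prefix: str, run_id: str, filename: str, category: str = "inputs") -> str:
    """Expand the template for a category, warning on an unrecognised name."""
    if category not in TEMPLATES:
        warnings.warn(
            f"Unknown category {category!r}; using 'inputs'. Known: {sorted(TEMPLATES)}",
            UserWarning,
            stacklevel=2,
        )
        category = "inputs"
    return TEMPLATES[category].format(prefix=prefix, run_id=run_id, filename=filename)
```

Callers that want a typo to fail loudly can promote the warning to an error with warnings.simplefilter("error", UserWarning) in their test suite.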

upload_file(*, local_path, s3_uri)[source]

Upload a local file to S3 and return the URI.

Raises FileNotFoundError if local_path does not exist rather than deferring to boto3’s less informative error.

Return type:

str

upload_bundle(*, local_zip, run_id)[source]

Upload a zip bundle to S3 under the inputs path template.

Return type:

str

output_prefix_for_run(run_id)[source]

Return the S3 prefix for a run’s output files.

Return type:

str

logs_prefix_for_run(run_id)[source]

Return the S3 prefix for a run’s log files.

Return type:

str

download_file(*, s3_uri, local_path)[source]

Download a single file from S3.

Parameters:
  • s3_uri (str) – Full s3://bucket/key URI.

  • local_path (str) – Destination path on disk.

Returns:

The same local_path for convenience.

Return type:

local_path (str)

download_output(*, run_id, filename, local_dir)[source]

Download a file from the output prefix of a run.

Parameters:
  • run_id (str) – Run identifier.

  • filename (str) – Name of the file within the output prefix. .. segments are rejected to prevent path traversal outside local_dir.

  • local_dir (str) – Local directory to save the file into.

Returns:

Absolute path of the downloaded file.

Return type:

local_path (str)

DEFAULT_LIST_OUTPUT_LIMIT = 10000
list_output_files(run_id, *, max_keys=None)[source]

List object keys under the output prefix of a run.

Parameters:
  • run_id (str) – Run identifier.

  • max_keys (int | None) – Cap on the number of keys returned. Defaults to DEFAULT_LIST_OUTPUT_LIMIT (10000) to guard against unbounded memory use on long-running jobs that produced thousands of intermediate files (QC figures, per-recording reports, etc.). Pass an explicit larger value if the caller really needs the full list; exceeding the cap raises ValueError rather than silently truncating.

Returns:

S3 object keys found under the output prefix.

Return type:

keys (list[str])

Raises:

ValueError – When more than max_keys objects exist under the prefix.
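
The raise-instead-of-truncate behaviour can be sketched over any iterable of keys. The helper collect_keys is hypothetical and stands in for the S3 listing; only the default limit and the ValueError on overflow come from the description above.

```python
from itertools import islice
from typing import Iterable, List, Optional

DEFAULT_LIST_OUTPUT_LIMIT = 10000


def collect_keys(keys: Iterable[str], *, max_keys: Optional[int] = None) -> List[str]:
    """Collect at most max_keys keys; raise if the source holds more."""
    limit = DEFAULT_LIST_OUTPUT_LIMIT if max_keys is None else max_keys
    # Read one key past the limit so overflow is detected without listing everything.
    collected = list(islice(keys, limit + 1))
    if len(collected) > limit:
        raise ValueError(f"more than {limit} objects under the output prefix")
    return collected
```

Reading only one key beyond the cap keeps memory bounded even when the prefix holds far more objects than the caller expected.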

Artifact Packager

Create uploadable analysis bundles for batch job execution.

spikelab.batch_jobs.artifact_packager.package_analysis_bundle(*, input_paths, run_id, output_dir, output_format, metadata=None, recording_files=None)[source]

Create a run zip bundle and return its absolute path.

Parameters:
  • input_paths (Iterable[str]) – Paths to copy into the bundle.

  • run_id (str) – RFC-1123-style identifier for the bundle.

  • output_dir (str) – Directory the zip is written into.

  • output_format (str) – One of "workspace", "sorting", "custom". Controls which entrypoint the container will dispatch to.

  • metadata (dict, optional) – Arbitrary JSON-serialisable metadata included in the manifest.

  • recording_files (Iterable[str], optional) – When output_format='sorting', the basenames of the input files that the pod-side entrypoint should treat as recordings. Tier L-C2: replaces the name-blacklist heuristic in entrypoints/sorting.main with a declared whitelist. Ignored for "workspace" and "custom" formats. Required for "sorting" — omitting it raises ValueError.

Return type:

str

Credentials

Credential resolution and redaction utilities.

class spikelab.batch_jobs.credentials.ResolvedCredentials(kubeconfig, aws_access_key_id, aws_secret_access_key, aws_session_token)[source]

Bases: object

kubeconfig: str | None
aws_access_key_id: str | None
aws_secret_access_key: str | None
aws_session_token: str | None
__init__(kubeconfig, aws_access_key_id, aws_secret_access_key, aws_session_token)
spikelab.batch_jobs.credentials.resolve_credentials(*, kubeconfig=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)[source]

Resolve credentials with explicit args first, then environment.

Return type:

ResolvedCredentials
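
The "explicit args first, then environment" order can be sketched for a single field as follows. The helper first_configured is hypothetical, and the environment variable names used in the example (KUBECONFIG, AWS_ACCESS_KEY_ID) are the conventional ones for kubectl and AWS tooling; which variables resolve_credentials actually reads is an assumption here.

```python
import os
from typing import Mapping, Optional


def first_configured(
    explicit: Optional[str],
    env_name: str,
    env: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Prefer the explicit argument; otherwise look the name up in the environment."""
    if explicit is not None:
        return explicit
    source = os.environ if env is None else env
    return source.get(env_name)


# Hypothetical environment, passed in explicitly so the example is reproducible.
fake_env = {"KUBECONFIG": "/home/user/.kube/config"}
kubeconfig = first_configured(None, "KUBECONFIG", fake_env)
access_key = first_configured(None, "AWS_ACCESS_KEY_ID", fake_env)
```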

spikelab.batch_jobs.credentials.redact_sensitive_map(values)[source]

Redact common secret values before logging.

Return type:

Dict[str, str]

Notes

  • Keys are matched against word-boundary patterns for SECRET, TOKEN, and PASSWORD. Previously the substring check redacted SECRETS_PATH (and similar non-secret keys that happened to contain SECRET) as a false positive — the value of SECRETS_PATH is a filesystem path, not a credential.

  • None inputs render as the literal "<unset>" so the audit log can distinguish “credential is not configured” from “credential is configured but empty” (the prior implementation collapsed both into "").
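
The two notes above can be illustrated with a short sketch. It is not the library's implementation: the function name redact, the "***" placeholder, and case-insensitive matching are assumptions. Because the regex \b treats an underscore as a word character, the sketch spells the boundary out as "no letter or digit on either side", which lets AWS_SECRET_ACCESS_KEY match while SECRETS_PATH does not.

```python
import re
from typing import Dict, Mapping, Optional

# The keyword must not touch a letter or digit on either side.
_SENSITIVE = re.compile(
    r"(?<![A-Za-z0-9])(SECRET|TOKEN|PASSWORD)(?![A-Za-z0-9])",
    re.IGNORECASE,
)


def redact(values: Mapping[str, Optional[str]]) -> Dict[str, str]:
    """Mask values under sensitive keys and mark unset values explicitly."""
    redacted: Dict[str, str] = {}
    for key, value in values.items():
        if value is None:
            redacted[key] = "<unset>"  # not configured at all
        elif _SENSITIVE.search(key):
            redacted[key] = "***"      # placeholder chosen for this sketch
        else:
            redacted[key] = value
    return redacted


safe = redact({
    "AWS_SECRET_ACCESS_KEY": "abc",
    "SECRETS_PATH": "/run/secrets",
    "AWS_SESSION_TOKEN": None,
    "REGION": "us-west-2",
})
```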

Validation

Validation helpers for CLI/API job inputs.

spikelab.batch_jobs.validation.validate_job_spec(payload)[source]

Parse and validate a raw job spec payload.

Return type:

JobSpec

spikelab.batch_jobs.validation.summarize_validation_error(exc)[source]

Return a human-readable validation summary.

Each pydantic issue lands on its own line under an "Invalid job config:" header so multi-issue errors stay scannable. Previously the issues were semicolon-joined into a single dense line, which became hard to read once nested locations appeared.

Return type:

str
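
The one-issue-per-line layout can be sketched from the list of dicts that pydantic's ValidationError.errors() returns, each carrying a loc tuple and a msg string. The function summarize_issues and the exact line format are assumptions; only the "Invalid job config:" header comes from the description above.

```python
from typing import Any, Dict, Iterable, List


def summarize_issues(issues: Iterable[Dict[str, Any]]) -> str:
    """Render one line per issue under a fixed header."""
    lines: List[str] = ["Invalid job config:"]
    for issue in issues:
        # Nested locations such as ("resources", "limits_cpu") become dotted paths.
        location = ".".join(str(part) for part in issue.get("loc", ())) or "<root>"
        lines.append(f"  - {location}: {issue.get('msg', '')}")
    return "\n".join(lines)


# Hypothetical issues shaped like entries from ValidationError.errors().
summary = summarize_issues([
    {"loc": ("container",), "msg": "Field required"},
    {"loc": ("resources", "limits_cpu"), "msg": "Input should be a valid string"},
])
```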