Batch Jobs
The spikelab.batch_jobs sub-package provides Kubernetes batch-job
launching helpers for submitting SpikeLab analysis workloads to a cluster.
It requires the batch-jobs optional dependency group:
pip install spikelab[batch-jobs]
See the Batch Jobs guide for usage examples.
Models
Pydantic models defining job specifications, cluster profiles, and related configuration used by the batch job launcher.
- class spikelab.batch_jobs.models.ContainerSpec(*args, **kwargs)
Bases: BaseModel
Container runtime details for a single-job pod.
- class spikelab.batch_jobs.models.ResourceSpec(*args, **kwargs)
Bases: BaseModel
Resource requests/limits for a job container.
- class spikelab.batch_jobs.models.VolumeMountSpec(*args, **kwargs)
Bases: BaseModel
Pod volume and mount target information.
- class spikelab.batch_jobs.models.NamespaceHookSpec(*args, **kwargs)
Bases: BaseModel
Per-namespace overrides applied when a job targets a specific namespace.
- class spikelab.batch_jobs.models.StoragePathTemplates(*args, **kwargs)
Bases: BaseModel
Python format-string templates for S3 artifact paths.
Available placeholders: {prefix}, {run_id}, {filename}.
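Because the templates are ordinary Python format strings, filling one is a single str.format call. The sketch below assumes hypothetical template strings and a hypothetical render_path helper; only the three placeholder names and the "inputs" / "outputs" / "logs" categories come from this reference, and the shipped profile defaults may look different.

```python
# Hypothetical template strings; the real ones come from the active cluster profile.
TEMPLATES = {
    "inputs": "{prefix}/{run_id}/inputs/{filename}",
    "outputs": "{prefix}/{run_id}/outputs/{filename}",
    "logs": "{prefix}/{run_id}/logs/{filename}",
}


def render_path(template: str, *, prefix: str, run_id: str, filename: str) -> str:
    """Fill the {prefix}, {run_id} and {filename} placeholders of one template."""
    return template.format(prefix=prefix, run_id=run_id, filename=filename)


key = render_path(
    TEMPLATES["outputs"],
    prefix="example-bucket/spikelab",
    run_id="run-001",
    filename="workspace.zip",
)
print(key)  # example-bucket/spikelab/run-001/outputs/workspace.zip
```

str.format ignores unused keyword arguments, so a template is free to omit any of the three placeholders.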
- class spikelab.batch_jobs.models.PolicyConfig(*args, **kwargs)
Bases: BaseModel
Configurable thresholds for the cluster policy engine.
- class spikelab.batch_jobs.models.JobSpec(*args, **kwargs)
Bases: BaseModel
High-level description of a Kubernetes batch job.
Single-container assumption: container is a single ContainerSpec, not a list. The rendered job.yaml.j2 template targets one container per pod (named analysis). Multi-container patterns (sidecars for log shipping, init containers for fetching data) are not supported by the current template.
- container: ContainerSpec
- resources: ResourceSpec
Policy Engine
Cluster policy preflight checks for job specs, using profile-driven thresholds.
Thresholds are read from the active ClusterProfile so that
different clusters can enforce different rules.
- class spikelab.batch_jobs.policy.PolicyFinding(code, level, message)
Bases: object
- __init__(code, level, message)
- spikelab.batch_jobs.policy.evaluate_policy(job_spec, profile, *, include_passes=False)
Evaluate policy checks using profile-driven thresholds.
Returns a list of PolicyFinding entries describing the outcomes of the policy checks. PASS outcomes are included only when include_passes is True.
- Parameters:
job_spec (JobSpec) – The job to evaluate.
profile (ClusterProfile) – Cluster profile carrying the PolicyConfig thresholds.
include_passes (bool) – When True, every check that passes emits a PolicyFinding(level="PASS", ...) entry alongside the WARN / BLOCK entries. This is useful for compliance audit tooling that needs a record that each check ran and passed. Default False (Tier L-C4): the common case (CLI preflight, job submission) gets terse output containing only WARN / BLOCK entries, which removes the roughly four PASS lines of log noise per compliant submission.
- Returns:
Per-check outcomes.
- Return type:
findings (list[PolicyFinding])
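The effect of include_passes can be pictured with a small stand-alone sketch. It mirrors the PolicyFinding(code, level, message) shape, but run_checks, the check functions, the spec dictionary, and the thresholds are all hypothetical and do not reproduce evaluate_policy's real checks.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class PolicyFinding:
    code: str
    level: str  # "PASS", "WARN" or "BLOCK"
    message: str


# A check inspects a spec and returns a WARN/BLOCK finding, or None when it passes.
Check = Callable[[dict], Optional[PolicyFinding]]


def run_checks(spec: dict, checks: Dict[str, Check], *, include_passes: bool = False) -> List[PolicyFinding]:
    findings: List[PolicyFinding] = []
    for code, check in checks.items():
        finding = check(spec)
        if finding is not None:
            findings.append(finding)  # WARN / BLOCK are always reported
        elif include_passes:
            findings.append(PolicyFinding(code, "PASS", "check passed"))
    return findings


# Hypothetical checks and thresholds, for illustration only.
checks: Dict[str, Check] = {
    "max_gpus": lambda s: PolicyFinding("max_gpus", "BLOCK", "too many GPUs") if s["gpus"] > 4 else None,
    "max_memory": lambda s: PolicyFinding("max_memory", "WARN", "large memory request") if s["memory_gi"] > 64 else None,
}
spec = {"gpus": 1, "memory_gi": 128}
print([f.level for f in run_checks(spec, checks)])                       # ['WARN']
print([f.level for f in run_checks(spec, checks, include_passes=True)])  # ['PASS', 'WARN']
```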
- spikelab.batch_jobs.policy.summarize_preflight(findings)
Return the aggregate level and a text summary.
Empty input: when findings is an empty iterable, the function returns ("PASS", ""). Callers that need to distinguish “policy ran and everything passed” from “policy did not run / produced no findings” must inspect the length of the source iterable rather than rely on the returned level alone.
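The empty-input caveat is easiest to see in a sketch. The summarize function below is a hypothetical stand-in: the BLOCK > WARN > PASS ranking and the one-line-per-finding text format are assumptions, and only the ("PASS", "") result for empty input is documented above.

```python
from dataclasses import dataclass
from typing import Iterable, Tuple


@dataclass
class PolicyFinding:
    code: str
    level: str
    message: str


# Assumed severity ranking and summary format; neither is specified above.
_SEVERITY = {"PASS": 0, "WARN": 1, "BLOCK": 2}


def summarize(findings: Iterable[PolicyFinding]) -> Tuple[str, str]:
    items = list(findings)
    if not items:
        return "PASS", ""  # documented result for empty input
    level = max((f.level for f in items), key=_SEVERITY.__getitem__)
    text = "\n".join(f"{f.level} {f.code}: {f.message}" for f in items)
    return level, text


findings = []  # e.g. the list returned by a policy evaluation
level, text = summarize(findings)
policy_produced_findings = len(findings) > 0  # check the source, not just `level`
```

Here level is "PASS" even though nothing was evaluated, which is why the length check on the source list is the only reliable signal.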
Profile Loading
Load cluster profile presets for job execution.
- spikelab.batch_jobs.profiles.load_cluster_profile(path)
Load a profile from an explicit YAML path.
- Return type:
- spikelab.batch_jobs.profiles.load_profile_from_name(name)
Load one of the built-in profile files by name.
Unknown names fall back to defaults.yaml for backward compatibility, but emit a UserWarning listing the recognised profile aliases so that a typo doesn’t silently land on the wrong profile.
- Return type:
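A minimal sketch of the warn-and-fall-back lookup, using only the standard warnings module. PROFILE_DIR, KNOWN_PROFILES, and resolve_profile_path are hypothetical; the built-in profile names are not listed in this reference.

```python
import warnings
from pathlib import Path

# Hypothetical directory and alias table; the real built-in names are not listed here.
PROFILE_DIR = Path("profiles")
KNOWN_PROFILES = {"defaults": "defaults.yaml", "example-cluster": "example-cluster.yaml"}


def resolve_profile_path(name: str) -> Path:
    """Map a profile name to its YAML file, falling back to defaults.yaml with a warning."""
    filename = KNOWN_PROFILES.get(name)
    if filename is None:
        warnings.warn(
            f"Unknown profile {name!r}; falling back to defaults.yaml. "
            f"Known profiles: {', '.join(sorted(KNOWN_PROFILES))}",
            UserWarning,
            stacklevel=2,
        )
        filename = "defaults.yaml"
    return PROFILE_DIR / filename
```

In a test suite, warnings.simplefilter("error", UserWarning) turns such a fallback into an exception, so a misspelled profile name fails fast instead of running against the defaults.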
Run Session
High-level orchestration for packaging, uploading, and job submission.
- class spikelab.batch_jobs.session.RunSession(*, profile, backend, storage_client, credentials=None)
Bases: object
Coordinates artifact packaging, job submission, and result retrieval.
- render_manifest(*, job_name, job_spec, run_id)
Render a Kubernetes Job manifest from a spec and profile.
- Return type:
- submit_workspace_job(*, workspace, script, job_spec, allow_policy_risk=False, bundle_input_paths=None, metadata=None)
Save a workspace, bundle it with a script, and submit a job.
- Parameters:
workspace (Any) – An AnalysisWorkspace instance, or a str giving the base path (without extension) of an existing workspace.
script (str) – Path to the analysis script to run inside the container.
job_spec (JobSpec) – Kubernetes job specification.
allow_policy_risk (bool) – Override policy BLOCK findings.
bundle_input_paths (iterable[str] | None) – Extra files to include in the bundle.
metadata (dict | None) – Arbitrary metadata written into the bundle manifest.
- Returns:
Submission details, including the output prefix where the updated workspace will appear.
- Return type:
result (SubmitResult)
- submit_sorting_job(*, recording_paths, config=None, config_overrides=None, job_spec, allow_policy_risk=False, metadata=None)
Bundle recording files with a sorting config and submit a job.
- Parameters:
config (Any) – A SortingPipelineConfig instance, a preset name string (e.g. "kilosort4"), or None for defaults.
config_overrides (dict | None) – Flat keyword overrides applied to the config via config.override().
job_spec (JobSpec) – Kubernetes job specification.
allow_policy_risk (bool) – Override policy BLOCK findings.
metadata (dict | None) – Arbitrary metadata written into the bundle manifest.
- Returns:
Submission details, including the output prefix where sorted results will appear.
- Return type:
result (SubmitResult)
- submit_prepared_job(*, job_spec, run_id=None, allow_policy_risk=False)
Submit a job without generating bundle artifacts.
- Parameters:
- Returns:
The submitted job descriptor.
- Return type:
result (SubmitResult)
Notes
Unlike submit_workspace_job / submit_sorting_job, this path skips package_analysis_bundle (which has its own traversal guard on run_id). The same traversal check is applied here so that an operator-supplied run_id such as "../escape" cannot escape the storage prefix.
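The exact traversal check is not shown here. As a sketch, assuming run IDs must be RFC 1123 labels (package_analysis_bundle describes run_id as RFC-1123-style), a guard could combine a pattern match with a normalised-path containment test; checked_run_prefix is a hypothetical helper.

```python
import posixpath
import re

# Assumption: run IDs are RFC 1123 labels (lowercase alphanumerics and "-",
# alphanumeric at both ends, at most 63 characters).
_RUN_ID = re.compile(r"[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?")


def checked_run_prefix(prefix: str, run_id: str) -> str:
    """Return prefix/run_id, refusing run IDs that could leave the storage prefix."""
    if _RUN_ID.fullmatch(run_id) is None:
        raise ValueError(f"invalid run_id {run_id!r}")
    base = prefix.rstrip("/")
    joined = posixpath.normpath(posixpath.join(base, run_id))
    # Defence in depth: the normalised path must still sit under the prefix.
    if base and not joined.startswith(base + "/"):
        raise ValueError(f"run_id {run_id!r} escapes prefix {prefix!r}")
    return joined
```

The pattern alone already rejects "/" and "..", so the containment test only matters if the pattern is ever loosened.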
- retrieve_result(submit_result, local_dir)
Download job outputs and return an AnalysisWorkspace.
- Parameters:
submit_result (SubmitResult) – The result from a prior submit_workspace_job or submit_sorting_job call.
local_dir (str) – Local directory to download outputs into.
- Returns:
The workspace produced by the job. For workspace jobs this is the updated workspace; for sorting jobs it contains per-recording namespaces with SpikeData at key "spikedata".
- Return type:
workspace (AnalysisWorkspace)
Notes
Call wait_for_completion before calling this method to ensure the job has finished.
- wait_for_completion(*, job_name, max_wait_seconds=3600, poll_interval_seconds=10, max_consecutive_failures=5, max_backoff_seconds=300)
Poll until the job completes, fails, or times out, and return the final state.
Transient backend errors (kubectl network blips, Kubernetes API server restarts, urllib3 timeouts) do not abort the wait. Each poll is wrapped in try/except; consecutive failures accumulate, the sleep doubles with each failure (exponential backoff capped at max_backoff_seconds), and the loop gives up only when one of the following happens:
- max_consecutive_failures polls in a row raise: the backend is considered unavailable, and the method returns "Backend unavailable".
- The kubernetes-client path raises ApiException(404): the job was deleted out-of-band, so completion cannot be confirmed, and the method returns "Failed".
- max_wait_seconds elapses: the method returns "Timeout".
A successful poll resets both the consecutive-failure counter and the backoff sleep to poll_interval_seconds.
- Parameters:
job_name (str) – K8s job identifier.
max_wait_seconds (int) – Overall wall-clock budget.
poll_interval_seconds (int) – Base sleep between polls.
max_consecutive_failures (int) – How many polls may fail back-to-back before the backend is declared unavailable.
max_backoff_seconds (int) – Cap on the exponential-backoff sleep, so that a long blip doesn’t stretch the next poll interval arbitrarily.
- Returns:
One of "Complete", "Failed", "Timeout", "Backend unavailable".
- Return type:
state (str)
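The retry policy described above can be sketched as a stand-alone loop. Here poll stands in for the backend status query, JobNotFound for ApiException(404), and wait_for_state for the method itself. All three names are hypothetical, poll is assumed to return "Complete", "Failed", or None while the job runs, and the sleep and clock parameters exist only so the loop can be exercised without real waiting.

```python
import time
from typing import Callable, Optional


class JobNotFound(Exception):
    """Hypothetical stand-in for the kubernetes client's ApiException with status 404."""


def wait_for_state(
    poll: Callable[[], Optional[str]],
    *,
    max_wait_seconds: float = 3600,
    poll_interval_seconds: float = 10,
    max_consecutive_failures: int = 5,
    max_backoff_seconds: float = 300,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until poll() reports "Complete" or "Failed" (None means still running)."""
    deadline = clock() + max_wait_seconds
    failures = 0
    delay = poll_interval_seconds
    while clock() < deadline:
        try:
            state = poll()
        except JobNotFound:
            return "Failed"  # deleted out-of-band: completion cannot be confirmed
        except Exception:
            failures += 1
            if failures >= max_consecutive_failures:
                return "Backend unavailable"
            delay = min(delay * 2, max_backoff_seconds)  # exponential backoff, capped
        else:
            if state in ("Complete", "Failed"):
                return state
            failures = 0  # a successful poll resets the failure counter...
            delay = poll_interval_seconds  # ...and the backoff sleep
        sleep(delay)
    return "Timeout"
```

Injecting the clock and sleep functions keeps the backoff schedule testable: a fake clock that advances by each requested sleep replays hours of polling instantly.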
Kubernetes Backend
Kubernetes backend for batch job submission and monitoring.
- class spikelab.batch_jobs.backend_k8s.KubernetesBatchJobBackend(namespace='default', kubeconfig=None, use_kubectl_fallback=True)
Bases: object
Backend wrapper around the Kubernetes client with a kubectl fallback.
- apply_manifest(manifest_path_or_str)
Apply a job manifest given as a YAML file path or a raw YAML string.
Returns the job’s metadata.name (consistent across both paths). Previously the kubectl-fallback path returned the raw stdout of kubectl apply (e.g. "job.batch/myjob created\n") while the Python-client path returned the clean name, so callers had no portable way to extract the identifier without sniffing the backend.
Raises ValueError if the manifest’s metadata.namespace is set and disagrees with the backend’s self.namespace; otherwise the job would silently deploy into the backend’s namespace, contrary to the rendered manifest. Manifests with no metadata.namespace are accepted and assigned the backend’s namespace as before.
- Return type:
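The namespace rule can be sketched as a pure function over a manifest that has already been parsed into a dict (for example with PyYAML's yaml.safe_load). reconcile_namespace is a hypothetical helper, not part of the backend's API.

```python
from typing import Any, Dict


def reconcile_namespace(manifest: Dict[str, Any], backend_namespace: str) -> str:
    """Apply the namespace rule above to a parsed Job manifest and return metadata.name."""
    metadata = manifest.setdefault("metadata", {})
    declared = metadata.get("namespace")
    if declared is not None and declared != backend_namespace:
        raise ValueError(
            f"manifest namespace {declared!r} does not match backend namespace {backend_namespace!r}"
        )
    metadata["namespace"] = backend_namespace  # unset namespaces adopt the backend's
    return metadata["name"]
```

Returning metadata.name from the parsed manifest, rather than from command output, is what makes the result identical on both the client and kubectl paths.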
- delete_job(name)
Delete a job and its pods. Idempotent: missing jobs are a no-op.
This matches the kubectl --ignore-not-found=true semantics of the fallback path, so both delete paths behave the same way when the job is missing. Previously the Python kubernetes-client path propagated ApiException(404) verbatim while the kubectl path exited cleanly.
- Return type:
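A minimal sketch of the idempotent pattern: swallow only the 404 and re-raise everything else. ApiError is a hypothetical stand-in for the kubernetes client's ApiException (which carries an HTTP status attribute), delete is any callable that deletes by name, and the boolean return value is an illustration choice, not the documented return type.

```python
from typing import Callable


class ApiError(Exception):
    """Hypothetical stand-in for kubernetes.client.exceptions.ApiException."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def delete_ignoring_missing(delete: Callable[[str], None], name: str) -> bool:
    """Call delete(name); return False instead of raising when the job is already gone."""
    try:
        delete(name)
    except ApiError as exc:
        if exc.status == 404:
            return False  # same outcome as kubectl --ignore-not-found=true
        raise  # other API errors (403, 500, ...) still surface
    return True
```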
S3 Storage
S3-compatible storage helpers for batch job artifacts.
- class spikelab.batch_jobs.storage_s3.S3StorageClient(*, prefix=None, path_templates=None, endpoint_url=None, region_name=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)
Bases: object
Small wrapper around boto3 for upload/download URI handling.
Path layout is controlled by path_templates (a StoragePathTemplates instance loaded from the active profile).
- __init__(*, prefix=None, path_templates=None, endpoint_url=None, region_name=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)
- build_uri(*, run_id, filename, category='inputs')
Build an S3 URI for a file using the active path templates.
category should be one of the keys defined on StoragePathTemplates ("inputs", "outputs", "logs"). An unknown category falls back to the inputs template and emits a UserWarning, so typos (“input”, “logs/”, etc.) don’t quietly land in the wrong S3 prefix.
- Return type:
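The category fallback can be sketched as a dictionary lookup plus a warning. The template strings, the assumption that prefix holds "bucket/key-prefix", and the build_uri_sketch name are all hypothetical; only the three category names and the warn-then-use-inputs behaviour come from the description above.

```python
import warnings

# Hypothetical templates; real ones come from the profile's StoragePathTemplates.
TEMPLATES = {
    "inputs": "{prefix}/{run_id}/inputs/{filename}",
    "outputs": "{prefix}/{run_id}/outputs/{filename}",
    "logs": "{prefix}/{run_id}/logs/{filename}",
}


def build_uri_sketch(prefix: str, run_id: str, filename: str, category: str = "inputs") -> str:
    template = TEMPLATES.get(category)
    if template is None:
        warnings.warn(
            f"Unknown storage category {category!r}; using the 'inputs' template. "
            f"Expected one of {sorted(TEMPLATES)}",
            UserWarning,
            stacklevel=2,
        )
        template = TEMPLATES["inputs"]
    # Assumption: prefix holds "<bucket>/<key prefix>", so the URI is s3:// + path.
    return "s3://" + template.format(prefix=prefix, run_id=run_id, filename=filename)
```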
- upload_file(*, local_path, s3_uri)
Upload a local file to S3 and return the URI.
Raises FileNotFoundError if local_path does not exist, rather than deferring to boto3’s less informative error.
- Return type:
- upload_bundle(*, local_zip, run_id)
Upload a zip bundle to S3 under the inputs path template.
- Return type:
- download_output(*, run_id, filename, local_dir)
Download a file from the output prefix of a run.
- Parameters:
- Returns:
Absolute path of the downloaded file.
- Return type:
local_path (str)
- DEFAULT_LIST_OUTPUT_LIMIT = 10000
- list_output_files(run_id, *, max_keys=None)
List object keys under the output prefix of a run.
- Parameters:
run_id (str) – Run identifier.
max_keys (int | None) – Cap on the number of keys returned. Defaults to DEFAULT_LIST_OUTPUT_LIMIT (10000) to guard against unbounded memory use on long-running jobs that produced thousands of intermediate files (QC figures, per-recording reports, etc.). Pass an explicit larger value if the caller really needs the full list; exceeding the cap raises ValueError rather than silently truncating.
- Returns:
S3 object keys found under the output prefix.
- Return type:
- Raises:
ValueError – When more than max_keys objects exist under the prefix.
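The raise-instead-of-truncate behaviour can be sketched independently of boto3 by consuming any iterable of keys. collect_keys is a hypothetical helper; it stops reading as soon as one key beyond the cap appears, so memory stays bounded by max_keys.

```python
from typing import Iterable, List, Optional

DEFAULT_LIST_OUTPUT_LIMIT = 10000


def collect_keys(keys: Iterable[str], max_keys: Optional[int] = None) -> List[str]:
    """Collect keys, raising instead of silently truncating when the cap is exceeded."""
    limit = DEFAULT_LIST_OUTPUT_LIMIT if max_keys is None else max_keys
    collected: List[str] = []
    for key in keys:
        if len(collected) >= limit:
            raise ValueError(f"more than {limit} objects under prefix; pass a larger max_keys")
        collected.append(key)
    return collected
```

With boto3, the keys could be streamed lazily from client.get_paginator("list_objects_v2").paginate(Bucket=..., Prefix=...), yielding obj["Key"] for each entry in page.get("Contents", []); because pages are fetched on iteration, no further list requests are made once the cap is exceeded.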
Artifact Packager
Create uploadable analysis bundles for batch job execution.
- spikelab.batch_jobs.artifact_packager.package_analysis_bundle(*, input_paths, run_id, output_dir, output_format, metadata=None, recording_files=None)
Create a run zip bundle and return its absolute path.
- Parameters:
input_paths (Iterable[str]) – Paths to copy into the bundle.
run_id (str) – RFC-1123-style identifier for the bundle.
output_dir (str) – Directory the zip is written into.
output_format (str) – One of "workspace", "sorting", "custom". Controls which entrypoint the container will dispatch to.
metadata (dict, optional) – Arbitrary JSON-serialisable metadata included in the manifest.
recording_files (Iterable[str], optional) – When output_format='sorting', the basenames of the input files that the pod-side entrypoint should treat as recordings. Tier L-C2: replaces the name-blacklist heuristic in entrypoints/sorting.main with a declared whitelist. Ignored for the "workspace" and "custom" formats. Required for "sorting"; omitting it raises ValueError.
- Return type:
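A sketch of the bundling contract using only zipfile and json. The manifest filename (manifest.json), its field names, the "{run_id}.zip" naming, and the flat archive layout are assumptions; the format check and the required recording_files whitelist for "sorting" follow the parameter descriptions above. package_bundle_sketch is a hypothetical name.

```python
import json
import os
import zipfile
from typing import Iterable, Optional

_FORMATS = ("workspace", "sorting", "custom")


def package_bundle_sketch(
    *,
    input_paths: Iterable[str],
    run_id: str,
    output_dir: str,
    output_format: str,
    metadata: Optional[dict] = None,
    recording_files: Optional[Iterable[str]] = None,
) -> str:
    """Zip the inputs plus a JSON manifest and return the zip's absolute path."""
    if output_format not in _FORMATS:
        raise ValueError(f"output_format must be one of {_FORMATS}, got {output_format!r}")
    manifest = {"run_id": run_id, "output_format": output_format, "metadata": metadata or {}}
    if output_format == "sorting":
        if recording_files is None:
            raise ValueError("recording_files is required when output_format='sorting'")
        manifest["recording_files"] = sorted(recording_files)  # declared whitelist
    zip_path = os.path.abspath(os.path.join(output_dir, f"{run_id}.zip"))
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for path in input_paths:
            zf.write(path, arcname=os.path.basename(path))  # flat layout (assumed)
        zf.writestr("manifest.json", json.dumps(manifest, indent=2))
    return zip_path
```

Declaring recordings in the manifest means the pod-side entrypoint can read the list directly instead of guessing from file names.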
Credentials
Credential resolution and redaction utilities.
- class spikelab.batch_jobs.credentials.ResolvedCredentials(kubeconfig, aws_access_key_id, aws_secret_access_key, aws_session_token)
Bases: object
- __init__(kubeconfig, aws_access_key_id, aws_secret_access_key, aws_session_token)
- spikelab.batch_jobs.credentials.resolve_credentials(*, kubeconfig=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None)
Resolve credentials from explicit arguments first, then from the environment.
- Return type:
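A sketch of the explicit-then-environment precedence. The environment variable names are the conventional ones read by kubectl and the AWS SDKs; whether resolve_credentials reads exactly these is an assumption, as are the resolve_credentials_sketch name and the env parameter (added only so the lookup can be tested without touching os.environ).

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ResolvedCredentials:
    kubeconfig: Optional[str]
    aws_access_key_id: Optional[str]
    aws_secret_access_key: Optional[str]
    aws_session_token: Optional[str]


def resolve_credentials_sketch(
    *,
    kubeconfig: Optional[str] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> ResolvedCredentials:
    source = os.environ if env is None else env

    def pick(explicit: Optional[str], var: str) -> Optional[str]:
        # An explicit argument wins; otherwise read the environment (None if unset).
        return explicit if explicit is not None else source.get(var)

    return ResolvedCredentials(
        kubeconfig=pick(kubeconfig, "KUBECONFIG"),
        aws_access_key_id=pick(aws_access_key_id, "AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=pick(aws_secret_access_key, "AWS_SECRET_ACCESS_KEY"),
        aws_session_token=pick(aws_session_token, "AWS_SESSION_TOKEN"),
    )
```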
- spikelab.batch_jobs.credentials.redact_sensitive_map(values)
Redact common secret values before logging.
Notes
Keys are matched against word-boundary patterns for SECRET, TOKEN, and PASSWORD. Previously a substring check redacted SECRETS_PATH (and similar non-secret keys that happen to contain SECRET) as a false positive, even though the value of SECRETS_PATH is a filesystem path, not a credential.
None inputs render as the literal "<unset>", so the audit log can distinguish “credential is not configured” from “credential is configured but empty” (the prior implementation collapsed both into "").
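A sketch of the matching rule. One subtlety: in Python regular expressions "_" is a word character, so a plain \bSECRET\b would not match AWS_SECRET_ACCESS_KEY. The sketch therefore assumes that any non-alphanumeric character counts as a boundary; the real patterns, the "***" placeholder, and the redact_sketch name are assumptions.

```python
import re
from typing import Dict, Mapping, Optional

# Assumption: any non-alphanumeric character (including "_") separates words, so
# AWS_SECRET_ACCESS_KEY matches but SECRETS_PATH does not.
_SENSITIVE = re.compile(r"(?<![A-Za-z0-9])(SECRET|TOKEN|PASSWORD)(?![A-Za-z0-9])", re.IGNORECASE)
REDACTED = "***"  # hypothetical placeholder


def redact_sketch(values: Mapping[str, Optional[str]]) -> Dict[str, str]:
    out: Dict[str, str] = {}
    for key, value in values.items():
        if value is None:
            out[key] = "<unset>"  # not configured, distinct from an empty string
        elif _SENSITIVE.search(key):
            out[key] = REDACTED
        else:
            out[key] = value
    return out
```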
Validation
Validation helpers for CLI/API job inputs.
- spikelab.batch_jobs.validation.validate_job_spec(payload)
Parse and validate a raw job spec payload.
- Return type:
- spikelab.batch_jobs.validation.summarize_validation_error(exc)
Return a human-readable validation summary.
Each pydantic issue lands on its own line under an "Invalid job config:" header, so multi-issue errors stay scannable. Previously the issues were joined with semicolons into a single dense line, which became hard to read once nested locations appeared.
- Return type:
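pydantic's ValidationError.errors() returns a list of dicts that include "loc" (a tuple of path parts) and "msg" keys, so the one-issue-per-line layout can be sketched over that list without importing pydantic. format_validation_issues is a hypothetical helper, and the indented "- location: message" line format is an assumption; only the header text comes from the description above.

```python
from typing import Any, Iterable, Mapping


def format_validation_issues(errors: Iterable[Mapping[str, Any]]) -> str:
    """Render pydantic-style error dicts (with "loc" and "msg") one issue per line."""
    lines = ["Invalid job config:"]
    for err in errors:
        location = ".".join(str(part) for part in err.get("loc", ())) or "<root>"
        lines.append(f"  - {location}: {err['msg']}")
    return "\n".join(lines)
```

With pydantic installed, the input would typically be exc.errors() from a caught ValidationError.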