"""Cluster policy preflight checks for job specs.
Thresholds are read from the active :class:`ClusterProfile` so that
different clusters can enforce different rules.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Iterable, List, Literal, Sequence
from .models import ClusterProfile, JobSpec, PolicyConfig
# Severity of a policy finding, listed from least to most severe;
# ``summarize_preflight`` aggregates findings to the most severe level present.
Level = Literal["PASS", "WARN", "BLOCK"]
[docs]
@dataclass()
class PolicyFinding:
    """Outcome of a single policy check.

    Attributes:
        code: Identifier of the check that produced this finding
            (e.g. ``"sleep_in_batch_job"``).
        level: Severity of the outcome: ``"PASS"``, ``"WARN"`` or ``"BLOCK"``.
        message: Human-readable explanation of the outcome.
    """

    code: str  # which check produced the finding
    level: Level  # PASS / WARN / BLOCK
    message: str  # text shown to the user
_SLEEP_THRESHOLD_DEFAULT = 86_400 # 24 hours in seconds — backstop when no
# PolicyConfig is supplied (kept as a
# module constant for direct callers).
def _contains_disallowed_sleep(
command: Sequence[str],
args: Sequence[str],
*,
threshold_s: int = _SLEEP_THRESHOLD_DEFAULT,
) -> bool:
"""Detect idle-placeholder sleep patterns in batch job commands.
This is a best-effort heuristic, not a security boundary. It catches
common idle patterns (``sleep infinity``, ``sleep inf``, bare
``sleep``, and ``sleep <large_number>``) but cannot detect arbitrary
constructs like ``while true; do sleep 60; done`` or obfuscated
variants. The goal is to flag accidental misuse, not to prevent
determined circumvention.
Parameters:
command (Sequence[str]): The container's command tokens.
args (Sequence[str]): The container's arg tokens.
threshold_s (int): Cap (in seconds) above which a bare
``sleep <number>`` is considered idle. Defaults to 24h.
Pulled from ``PolicyConfig.sleep_duration_threshold_s`` by
``evaluate_policy``.
Notes:
- The bare-``sleep`` check fires only when ``sleep`` is the sole
token across ``command + args``. A trailing token that
happens to be the literal string ``"sleep"`` (e.g.
``["python", "-c", "sleep"]``, where ``"sleep"`` is a Python
snippet, not a shell command) is intentionally NOT flagged.
This avoids false positives on commands that legitimately
pass the string ``"sleep"`` as an argument. The downside is
that a determined operator can sneak in a real
``exec("sleep infinity")``-style payload; this is documented
as out-of-scope for the heuristic.
"""
# Flatten multi-word tokens (e.g., ["sleep infinity"] from sh -c)
# into individual words for consistent token-pair matching.
all_tokens = []
for tok in [*command, *args]:
all_tokens.extend(tok.split())
# Bare "sleep" as the sole command (no duration argument)
if len(all_tokens) == 1 and all_tokens[0].lower() == "sleep":
return True
# Check token pairs: "sleep infinity", "sleep inf", "sleep <large_number>"
for i, tok in enumerate(all_tokens):
if tok.lower() != "sleep" or i + 1 >= len(all_tokens):
continue
next_tok = all_tokens[i + 1].lower()
if next_tok in ("infinity", "inf"):
return True
try:
duration = float(next_tok)
# Flag any non-finite duration (NaN, +inf, -inf) as suspicious —
# the actual sleep binary rejects these, and a job spec with
# such a token is almost certainly a bug or an obfuscation
# attempt around the literal "inf" / "infinity" check above.
if not math.isfinite(duration) or duration >= threshold_s:
return True
except (ValueError, IndexError):
pass
return False
[docs]
def evaluate_policy(
    job_spec: JobSpec,
    profile: ClusterProfile,
    *,
    include_passes: bool = False,
) -> List[PolicyFinding]:
    """Evaluate policy checks using profile-driven thresholds.

    Runs four checks in a fixed order: ``interactive_gpu_limit``,
    ``sleep_in_batch_job``, ``request_limit_mismatch`` and
    ``long_runtime``. Each check yields exactly one outcome. WARN and
    BLOCK outcomes are always returned; PASS outcomes are returned only
    when *include_passes* is True.

    Parameters:
        job_spec (JobSpec): The job to evaluate.
        profile (ClusterProfile): Cluster profile carrying the
            :class:`PolicyConfig` thresholds.
        include_passes (bool): When True, every check emits a
            ``PolicyFinding(level="PASS", ...)`` entry alongside the
            WARN / BLOCK entries — useful for compliance audit
            tooling that needs a record that each check ran and
            passed. Default False (Tier L-C4): the common case
            (CLI preflight, job submission) gets terse output
            containing only WARN / BLOCK entries, eliminating the
            ~4 PASS-line log noise per compliant submission.

    Returns:
        findings (list[PolicyFinding]): Per-check outcomes, in check order.
    """
    cfg: PolicyConfig = profile.policy
    outcomes = (
        _check_interactive_gpus(job_spec, cfg),
        _check_sleep_patterns(job_spec, cfg),
        _check_request_limit_alignment(job_spec, cfg),
        _check_runtime(job_spec, cfg),
    )
    # PASS outcomes are audit-only; drop them unless explicitly requested.
    return [
        finding
        for finding in outcomes
        if include_passes or finding.level != "PASS"
    ]


def _check_interactive_gpus(job_spec: JobSpec, cfg: PolicyConfig) -> PolicyFinding:
    """WARN when the GPU request exceeds the interactive guidance, else PASS."""
    if job_spec.resources.requests_gpu > cfg.max_interactive_gpus:
        return PolicyFinding(
            "interactive_gpu_limit",
            "WARN",
            f"Requested GPUs exceed interactive limit guidance "
            f"({cfg.max_interactive_gpus} GPUs).",
        )
    return PolicyFinding(
        "interactive_gpu_limit",
        "PASS",
        "GPU request is within interactive guidance.",
    )


def _check_sleep_patterns(job_spec: JobSpec, cfg: PolicyConfig) -> PolicyFinding:
    """BLOCK (or WARN if blocking is disabled) on idle sleep patterns, else PASS."""
    sleep_present = _contains_disallowed_sleep(
        job_spec.container.command,
        job_spec.container.args,
        threshold_s=cfg.sleep_duration_threshold_s,
    )
    if not sleep_present:
        return PolicyFinding(
            "sleep_in_batch_job",
            "PASS",
            "No forbidden sleep patterns detected in command/args.",
        )
    if cfg.block_sleep_infinity:
        # The message mirrors what the detector actually flags; a trailing
        # literal "sleep" token is deliberately NOT flagged by it.
        return PolicyFinding(
            "sleep_in_batch_job",
            "BLOCK",
            f"Batch jobs containing 'sleep infinity', a bare 'sleep', or a "
            f"sleep of {cfg.sleep_duration_threshold_s}s or longer "
            f"are disallowed.",
        )
    # ``block_sleep_infinity`` is disabled in the profile, but a sleep
    # pattern *was* detected. WARN keeps the audit trail honest, and WARN
    # is always emitted regardless of include_passes.
    return PolicyFinding(
        "sleep_in_batch_job",
        "WARN",
        "Sleep pattern detected but block_sleep_infinity is "
        "disabled in this profile; the job will be permitted "
        "but the pattern is recorded for audit.",
    )


def _check_request_limit_alignment(
    job_spec: JobSpec, cfg: PolicyConfig
) -> PolicyFinding:
    """WARN on CPU/memory request/limit mismatch when enabled, else PASS."""
    res = job_spec.resources
    mismatched = (
        res.requests_cpu != res.limits_cpu
        or res.requests_memory != res.limits_memory
    )
    if not mismatched:
        return PolicyFinding(
            "request_limit_mismatch",
            "PASS",
            "CPU/memory requests and limits are aligned.",
        )
    if cfg.warn_request_limit_mismatch:
        return PolicyFinding(
            "request_limit_mismatch",
            "WARN",
            "Cluster recommends requests close to limits; tune with monitoring.",
        )
    # The warning is disabled in this profile, so this is still a PASS, but
    # the audit record must not claim the values are aligned when they differ.
    return PolicyFinding(
        "request_limit_mismatch",
        "PASS",
        "CPU/memory requests and limits differ, but "
        "warn_request_limit_mismatch is disabled in this profile.",
    )


def _check_runtime(job_spec: JobSpec, cfg: PolicyConfig) -> PolicyFinding:
    """WARN when active_deadline_seconds exceeds the configured maximum, else PASS."""
    deadline = job_spec.active_deadline_seconds
    if not deadline:
        # No deadline set (None or 0); the cluster's own max applies via
        # Kubernetes.
        return PolicyFinding(
            "long_runtime",
            "PASS",
            "No active_deadline_seconds set; cluster default applies.",
        )
    if deadline > cfg.max_runtime_seconds:
        return PolicyFinding(
            "long_runtime",
            "WARN",
            f"Runtime ({deadline}s) exceeds "
            f"configured maximum ({cfg.max_runtime_seconds}s).",
        )
    return PolicyFinding(
        "long_runtime",
        "PASS",
        f"Runtime ({deadline}s) within "
        f"configured maximum ({cfg.max_runtime_seconds}s).",
    )
[docs]
def summarize_preflight(findings: Iterable[PolicyFinding]) -> tuple[Level, str]:
    """Return aggregate level and text summary.

    The aggregate level is the most severe level present
    (BLOCK > WARN > PASS). The text has one ``[LEVEL] code: message`` line
    per finding, in input order.

    *findings* may be any iterable, including a one-shot iterator or
    generator; it is consumed exactly once.

    Empty input: when *findings* is empty the function returns
    ``("PASS", "")``. Callers that distinguish "policy ran and
    everything passed" from "policy did not run / produced no
    findings" must check whether *findings* was empty themselves
    rather than relying on the returned level alone.
    """
    # Materialise once: the findings are read twice below (levels, then
    # text), and a generator would be exhausted by the first pass, leaving
    # the summary text silently empty.
    items = list(findings)
    levels = {finding.level for finding in items}
    status: Level
    if "BLOCK" in levels:
        status = "BLOCK"
    elif "WARN" in levels:
        status = "WARN"
    else:
        status = "PASS"
    text = "\n".join(
        f"[{finding.level}] {finding.code}: {finding.message}" for finding in items
    )
    return status, text