Source code for spikelab.data_loaders.s3_utils

"""
Utilities for handling S3-backed inputs.

These helpers support:
- Detecting S3 URLs (`s3://...` and common `https://...amazonaws.com/...` forms)
- Parsing bucket/key pairs from S3 URLs
- Downloading S3 objects to local temporary files for downstream processing
- Treating local paths and S3 URLs uniformly (`ensure_local_file`)

This module intentionally has **no** dependency on the MCP server implementation
so it can be reused by the core analysis package and other integrations.
"""

from __future__ import annotations

import os
import tempfile
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse

try:
    import boto3
    from botocore.exceptions import ClientError, NoCredentialsError
except ImportError:  # pragma: no cover
    boto3 = None
    ClientError = Exception
    NoCredentialsError = Exception


def _build_s3_kwargs(
    aws_access_key_id=None,
    aws_secret_access_key=None,
    aws_session_token=None,
    region_name=None,
):
    """Build boto3 client kwargs from optional credential parameters."""
    kwargs = {}
    if aws_access_key_id:
        kwargs["aws_access_key_id"] = aws_access_key_id
    if aws_secret_access_key:
        kwargs["aws_secret_access_key"] = aws_secret_access_key
    if aws_session_token:
        kwargs["aws_session_token"] = aws_session_token
    if region_name:
        kwargs["region_name"] = region_name
    return kwargs


__all__ = [
    "is_s3_url",
    "parse_s3_url",
    "download_from_s3",
    "upload_to_s3",
    "ensure_local_file",
]


[docs] def is_s3_url(url: str) -> bool: """Return True if url looks like an S3 URL (s3:// or https://...amazonaws.com). Parameters: url (str): URL string to check. Surrounding whitespace is stripped and the scheme prefix is matched case-insensitively (so ``S3://``, `` s3://... `` etc. are all recognised). Returns: is_s3 (bool): True if the URL matches an S3 pattern. """ url = url.strip() lower = url.lower() if lower.startswith("s3://"): return True if lower.startswith("https://") or lower.startswith("http://"): parsed = urlparse(url) host = parsed.netloc.lower() # Anchor the host check to the right-hand side so a hostile # host like ``s3.evil.amazonaws.com.attacker.example`` (which # passes a naive ``in`` substring test) does not slip # through. Accept any host that ENDS with ``.amazonaws.com`` # and either starts with ``s3`` (path-style endpoints like # ``s3.amazonaws.com``, ``s3.us-west-2.amazonaws.com``) or # contains ``.s3`` followed by either ``.amazonaws.com`` or # ``.<region>.amazonaws.com`` (virtual-hosted-style like # ``my-bucket.s3.amazonaws.com``). if not host.endswith(".amazonaws.com") and host != "amazonaws.com": return False if host.startswith("s3.") or host == "s3.amazonaws.com": return True if ".s3." in host or host.endswith(".s3.amazonaws.com"): return True return False return False
[docs] def parse_s3_url(url: str) -> Tuple[str, str]: """Parse an S3 URL into (bucket, key). Supported forms include s3://bucket/key, path-style HTTPS (s3.amazonaws.com/bucket/key), and virtual-hosted-style HTTPS (bucket.s3.amazonaws.com/key), with optional region subdomains. Parameters: url (str): S3 URL to parse. Returns: bucket_key (tuple[str, str]): The (bucket, key) pair extracted from the URL. Raises: ValueError: If the URL format is not recognised or has no object key. """ if url.startswith("s3://"): path = url[5:] parts = path.split("/", 1) bucket = parts[0] key = parts[1] if len(parts) > 1 else "" # Strip a single trailing slash so ``s3://bucket/key/`` is # treated the same as ``s3://bucket/key`` and reaches a real # object instead of falling through to boto3 as an empty # prefix (which produces a cryptic NoSuchKey). key = key.rstrip("/") if not key: raise ValueError( f"S3 URL '{url}' has no object key. " "A bucket-only URL cannot identify a downloadable object." ) return bucket, key if url.startswith("https://") or url.startswith("http://"): parsed = urlparse(url) host = parsed.netloc path = parsed.path.lstrip("/") # Path-style: https://s3.../bucket/key if host.startswith("s3") and "amazonaws.com" in host: parts = path.split("/", 1) if not parts or parts[0] == "": raise ValueError(f"Invalid S3 URL format: {url}") bucket = parts[0] key = parts[1] if len(parts) > 1 else "" if not key or key == "/": raise ValueError( f"S3 URL '{url}' has no object key. " "A bucket-only URL cannot identify a downloadable object." ) return bucket, key # Virtual-hosted-style: https://bucket.s3.../key if ".s3" in host and "amazonaws.com" in host: bucket = host.split(".s3", 1)[0] key = path if not key or key == "/": raise ValueError( f"S3 URL '{url}' has no object key. " "A bucket-only URL cannot identify a downloadable object." ) return bucket, key raise ValueError(f"Invalid S3 URL format: {url}") raise ValueError(f"Not an S3 URL: {url}")
[docs] def download_from_s3( url: str, local_path: Optional[str] = None, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, ) -> str: """Download a single S3 object to a local file and return the local path. Parameters: url (str): S3 URL of the object to download. local_path (str | None): Destination file path. If None, a temporary file is created. aws_access_key_id (str | None): AWS access key ID. aws_secret_access_key (str | None): AWS secret access key. aws_session_token (str | None): AWS session token for temporary credentials. region_name (str | None): AWS region name. Returns: local_path (str): Path to the downloaded local file. Raises: ImportError: If boto3 is not installed. ValueError: If the URL is not an S3 URL or the bucket/key is not found. PermissionError: If access to the S3 object is denied. RuntimeError: If the download fails for another reason. """ if boto3 is None: raise ImportError( "boto3 is required for S3 downloads. Install it with: pip install boto3" ) if not is_s3_url(url): raise ValueError(f"Not an S3 URL: {url}") bucket, key = parse_s3_url(url) s3_kwargs = _build_s3_kwargs( aws_access_key_id, aws_secret_access_key, aws_session_token, region_name ) s3_client = boto3.client("s3", **s3_kwargs) # Track whether we allocated the temp file ourselves so we can # clean it up on failure (the previous path always left the empty # NamedTemporaryFile on disk after a download exception, even when # the caller never saw the path). created_temp = local_path is None if local_path is None: suffix = Path(key).suffix if key else ".tmp" temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix) local_path = temp_file.name temp_file.close() dirpath = os.path.dirname(local_path) if dirpath: os.makedirs(dirpath, exist_ok=True) download_succeeded = False try: s3_client.download_file(bucket, key, local_path) download_succeeded = True return local_path except ClientError as e: error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "") if error_code == "NoSuchBucket": raise ValueError(f"S3 bucket not found: {bucket}") from e if error_code == "NoSuchKey": raise ValueError(f"S3 key not found: {key} in bucket {bucket}") from e if error_code in ("AccessDenied", "Forbidden"): raise PermissionError(f"Access denied to s3://{bucket}/{key}") from e raise RuntimeError(f"Error downloading from S3: {e}") from e except NoCredentialsError as e: raise RuntimeError( "AWS credentials not found. Set AWS_ACCESS_KEY_ID and " "AWS_SECRET_ACCESS_KEY environment variables or configure AWS credentials." ) from e finally: # Tidy up the auto-created temp file when download_file # raised (ClientError, NoCredentialsError, or any other # exception including KeyboardInterrupt). Caller-supplied # ``local_path`` is left untouched — the caller owns it. if not download_succeeded and created_temp: try: os.remove(local_path) except OSError: pass
[docs] def upload_to_s3( local_path: str, s3_url: str, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, ) -> str: """Upload a local file to S3 and return the S3 URL. Parameters: local_path (str): Path to the local file to upload. s3_url (str): Destination S3 URL (s3://bucket/key). aws_access_key_id (str | None): AWS access key ID. aws_secret_access_key (str | None): AWS secret access key. aws_session_token (str | None): AWS session token for temporary credentials. region_name (str | None): AWS region name. Returns: s3_url (str): The S3 URL the file was uploaded to. Raises: ImportError: If boto3 is not installed. FileNotFoundError: If the local file does not exist. ValueError: If the URL is not an S3 URL or the bucket is not found. PermissionError: If access to the S3 bucket is denied. RuntimeError: If the upload fails for another reason. """ if not is_s3_url(s3_url): raise ValueError(f"Not an S3 URL: {s3_url}") if not os.path.exists(local_path): raise FileNotFoundError(f"Local file not found: {local_path}") if boto3 is None: raise ImportError( "boto3 is required for S3 uploads. Install it with: pip install boto3" ) bucket, key = parse_s3_url(s3_url) s3_kwargs = _build_s3_kwargs( aws_access_key_id, aws_secret_access_key, aws_session_token, region_name ) s3_client = boto3.client("s3", **s3_kwargs) try: s3_client.upload_file(local_path, bucket, key) return s3_url except ClientError as e: error_code = getattr(e, "response", {}).get("Error", {}).get("Code", "") if error_code == "NoSuchBucket": raise ValueError(f"S3 bucket not found: {bucket}") from e if error_code in ("AccessDenied", "Forbidden"): raise PermissionError(f"Access denied to s3://{bucket}/{key}") from e raise RuntimeError(f"Error uploading to S3: {e}") from e except NoCredentialsError as e: raise RuntimeError( "AWS credentials not found. Set AWS_ACCESS_KEY_ID and " "AWS_SECRET_ACCESS_KEY environment variables or configure AWS credentials." ) from e
[docs] def ensure_local_file( file_path_or_url: str, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, ) -> Tuple[str, bool]: """Return (local_path, is_temporary) for a local path or S3 URL. If the input is an S3 URL, the object is downloaded to a temporary file. If it is a local path, it is returned as-is. Parameters: file_path_or_url (str): Local file path or S3 URL. aws_access_key_id (str | None): AWS access key ID. aws_secret_access_key (str | None): AWS secret access key. aws_session_token (str | None): AWS session token for temporary credentials. region_name (str | None): AWS region name. Returns: result (tuple[str, bool]): A (local_path, is_temporary) pair. is_temporary is True when the file was downloaded from S3 and the caller should delete it after use. Raises: FileNotFoundError: If a local path is given and the file does not exist. """ if is_s3_url(file_path_or_url): local_path = download_from_s3( file_path_or_url, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token, region_name=region_name, ) return local_path, True if not os.path.exists(file_path_or_url): raise FileNotFoundError(f"File not found: {file_path_or_url}") return file_path_or_url, False