Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
93 lines
3.5 KiB
Python
93 lines
3.5 KiB
Python
"""S3 transport — pulls CDRs from a customer-provided S3 (or S3-compatible) bucket.

Credentials:

* username / password → AWS access_key / secret_key
* extra["endpoint_url"] (optional — set for S3-compatible backends like
  Wasabi, MinIO, Backblaze)
* extra["region"] (default "us-east-1")
* extra["bucket"] (falls back to the transport host if unset)
* remote_glob — object-key prefix or glob (e.g. "cdrs/2026/*.csv.gz")
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import fnmatch
|
|
import logging
|
|
import posixpath
|
|
from datetime import datetime, timezone
|
|
from typing import Iterable, Optional
|
|
|
|
from .base import BaseTransport, RemoteFile, TransportError
|
|
|
|
# Module-level logger named after this module, per stdlib logging convention.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class S3Transport(BaseTransport):
    """Transport that lists and fetches CDR objects from an S3 bucket.

    Credential mapping: ``username`` → ``aws_access_key_id``,
    ``password`` → ``aws_secret_access_key``.  Optional ``extra`` keys:
    ``endpoint_url`` (for S3-compatible backends such as Wasabi, MinIO,
    Backblaze), ``region`` (default ``"us-east-1"``), and ``bucket``
    (falls back to ``self.host`` when unset).  ``remote_glob`` is either
    an object-key prefix or a glob pattern, e.g. ``"cdrs/2026/*.csv.gz"``.
    """

    TRANSPORT_SLUG = "s3"

    # Wildcard characters recognized by fnmatch; used both to decide
    # whether remote_glob is a pattern and to find the prefix split point.
    _WILDCARDS = "*?["

    def _client(self):
        """Build and return a boto3 S3 client from the stored credentials.

        boto3 is imported lazily so the module loads even when the
        dependency is absent (other transports remain usable).

        Raises:
            TransportError: if boto3 is not installed.
        """
        try:
            import boto3
        except ImportError as exc:
            raise TransportError("boto3 not installed") from exc
        kwargs = {
            "aws_access_key_id": self.username,
            "aws_secret_access_key": self.password,
            "region_name": self.extra.get("region", "us-east-1"),
        }
        # Only pass endpoint_url when set; boto3 then targets AWS proper.
        if self.extra.get("endpoint_url"):
            kwargs["endpoint_url"] = self.extra["endpoint_url"]
        return boto3.client("s3", **kwargs)

    def _bucket(self) -> str:
        """Return the bucket name from extra["bucket"], falling back to host.

        Raises:
            TransportError: if neither is configured.
        """
        bucket = self.extra.get("bucket") or self.host
        if not bucket:
            raise TransportError("S3 bucket not configured (extra.bucket)")
        return bucket

    def _prefix_and_pattern(self) -> tuple[str, str]:
        """Split remote_glob into (key-prefix, glob-pattern).

        For a pattern like ``"cdrs/2026/*.csv.gz"`` the prefix is
        everything before the first wildcard (``"cdrs/2026/"``) and the
        pattern is the full glob.  A wildcard-free value is treated as a
        directory prefix with a match-all pattern; an empty value means
        the whole bucket.
        """
        glob = self.remote_glob
        # Fix: the original only tested for "*" and "?", so a glob using
        # just a character class (e.g. "cdrs/file[12].csv") was wrongly
        # treated as a plain prefix even though fnmatch honors "[...]".
        # Checking the same _WILDCARDS set used below also guarantees
        # min() never sees an empty sequence.
        if any(c in glob for c in self._WILDCARDS):
            first_wild = min(glob.find(c) for c in self._WILDCARDS if c in glob)
            return glob[:first_wild], glob
        if not glob:
            # Fix: empty remote_glob used to produce the prefix "/",
            # which matches no ordinary S3 keys; "" lists the bucket.
            return "", "*"
        return glob.rstrip("/") + "/", "*"

    def validate(self) -> tuple[bool, str]:
        """Cheap reachability probe: list at most one object under the prefix.

        Returns (ok, human-readable detail); never raises — any failure
        (missing boto3, bad credentials, unknown bucket) is reported as
        (False, message).  Broad Exception catch is deliberate: this is a
        diagnostic boundary, and boto3 raises many exception types.
        """
        try:
            client = self._client()
            prefix, _ = self._prefix_and_pattern()
            resp = client.list_objects_v2(
                Bucket=self._bucket(), Prefix=prefix, MaxKeys=1,
            )
            return True, f"bucket reachable (KeyCount={resp.get('KeyCount', 0)})"
        except Exception as exc:
            return False, f"S3 validate failed: {exc}"

    def list_since(self, since: Optional[datetime]) -> Iterable[RemoteFile]:
        """Yield RemoteFile entries newer than *since* that match remote_glob.

        Paginates the full listing under the computed prefix, then filters
        client-side by glob pattern and modification time.  Timestamps are
        normalized to naive UTC, so *since* is assumed to be naive UTC as
        well — NOTE(review): confirm callers pass naive UTC datetimes.
        """
        client = self._client()
        bucket = self._bucket()
        prefix, pattern = self._prefix_and_pattern()
        paginator = client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            # "Contents" is absent on empty pages; guard with `or []`.
            for obj in page.get("Contents", []) or []:
                key = obj["Key"]
                if pattern != "*" and not fnmatch.fnmatch(key, pattern):
                    continue
                mtime = obj["LastModified"]
                # boto3 returns aware datetimes; normalize to naive UTC
                # so comparisons against `since` are apples-to-apples.
                if mtime.tzinfo is not None:
                    mtime = mtime.astimezone(timezone.utc).replace(tzinfo=None)
                # Strict ">" — objects stamped exactly at `since` were
                # seen in the previous sweep and are skipped.
                if since and mtime <= since:
                    continue
                yield RemoteFile(path=key, mtime=mtime, size_bytes=obj.get("Size", 0))

    def fetch(self, remote_path: str) -> bytes:
        """Download one object and return its full body as bytes."""
        client = self._client()
        resp = client.get_object(Bucket=self._bucket(), Key=remote_path)
        return resp["Body"].read()
|