Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
77 lines
2.6 KiB
Python
77 lines
2.6 KiB
Python
"""Base class for CDR pull transports.
|
|
|
|
Every transport implements three methods:
  * validate()                   — test the credentials + reachability
  * list_since(since)            — list RemoteFile entries whose mtime is newer than since
  * fetch(remote_path) -> bytes  — download a single file

The cdr_puller worker orchestrates: validate on config save, list on each
poll, fetch + stream to MinIO for each new file.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import fnmatch
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Iterable, Optional
|
|
|
|
|
|
@dataclass
class RemoteFile:
    """One file entry reported by a transport listing.

    Instances are what ``BaseTransport.list_since`` is declared to yield.
    """

    # Remote key identifying the file, e.g. "/var/log/cdr/2026-04-15.csv".
    path: str
    # Timestamp of the entry; list_since() documents filtering on
    # ``mtime > since``.
    mtime: datetime
    # Size in bytes. Defaults to 0 when the caller does not supply one
    # (NOTE(review): presumably 0 means "unknown" — confirm with transports).
    size_bytes: int = 0
|
|
|
|
|
|
class TransportError(Exception):
    """Signal a non-fatal failure of a transport operation.

    A single occurrence is not grounds for marking the profile permanently
    failed: callers either retry or surface the problem to the admin, and
    only treat the profile as permanently failed after multiple consecutive
    attempts have raised.
    """
|
|
|
|
|
|
class BaseTransport:
    """Abstract transport. Subclasses implement the three methods below.

    The base class only stores the connection settings and offers the
    ``_match_glob`` helper; ``validate``, ``list_since`` and ``fetch`` raise
    ``NotImplementedError`` until a subclass overrides them.
    """

    # Identifier for the transport kind; empty on the base class.
    # NOTE(review): presumably each concrete transport sets its own slug.
    TRANSPORT_SLUG = ""

    def __init__(
        self,
        *,
        host: str,
        port: Optional[int] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        private_key: Optional[str] = None,
        remote_glob: str = "*",
        timeout: int = 30,
        extra: Optional[dict] = None,
    ):
        """Store the connection settings on the instance.

        All arguments are keyword-only. No connection is opened here.

        Args:
            host: Remote host to pull from.
            port: Remote port, or None when not specified.
            username: Login name, if any.
            password: Login password, if any.
            private_key: Private key material or reference, if any
                (NOTE(review): exact format is transport-specific — confirm).
            remote_glob: fnmatch-style pattern used by ``_match_glob``.
            timeout: Timeout value handed to the transport
                (NOTE(review): presumably seconds — confirm).
            extra: Optional transport-specific options. A falsy value is
                replaced by a fresh empty dict; a non-empty dict is stored
                as-is (not copied).
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.private_key = private_key
        self.remote_glob = remote_glob
        self.timeout = timeout
        self.extra = extra or {}

    # ──────────────────────────────────────────────────────────────────

    def validate(self) -> tuple[bool, str]:
        """Attempt to connect + list the target directory. Returns (ok, detail)."""
        raise NotImplementedError

    def list_since(self, since: Optional[datetime]) -> Iterable[RemoteFile]:
        """Yield RemoteFile entries with mtime > since that match remote_glob."""
        raise NotImplementedError

    def fetch(self, remote_path: str) -> bytes:
        """Download and return the file bytes."""
        raise NotImplementedError

    # ── Helpers ────────────────────────────────────────────────────────

    def _match_glob(self, name: str) -> bool:
        """Return True when *name* matches ``self.remote_glob``.

        Uses ``fnmatch.fnmatchcase`` rather than ``fnmatch.fnmatch``: the
        latter case-normalises both arguments with the *local*
        ``os.path.normcase``, so whether a remote file matched would depend
        on the operating system the worker runs on. Remote names are matched
        exactly as given, identically on every platform.
        """
        return fnmatch.fnmatchcase(name, self.remote_glob)
|