new-site/scripts/workers/cdr_transports/https_transport.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

129 lines
4.9 KiB
Python

"""HTTPS transport — for switches that expose a REST or file-download API.
The remote endpoint is expected to be one of:
(a) a directory listing in JSON (array of objects with name + mtime + size), or
(b) a single file endpoint that is polled directly (treat as a named file
with current-timestamp mtime and an If-Modified-Since header).
Authentication options:
* Bearer token (``extra["bearer_token"]``)
* Basic auth (``username``/``password``)
* Custom header(s) (``extra["headers"]`` — dict)
"""
from __future__ import annotations

import base64
import json
import logging
import posixpath
import urllib.error
import urllib.request
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Iterable, Optional
from urllib.parse import urljoin

from .base import BaseTransport, RemoteFile, TransportError
# Module-level logger; records are emitted under this module's dotted name.
logger = logging.getLogger(__name__)
class HTTPSTransport(BaseTransport):
    """Transport that lists and downloads CDR files over HTTP(S).

    NOTE(review): the attributes read here (``host``, ``port``, ``username``,
    ``password``, ``timeout``, ``remote_glob`` and the ``extra`` dict) are
    presumably populated by ``BaseTransport`` — confirm against the base class.
    """

    TRANSPORT_SLUG = "https"

    # Ports that may be omitted from the URL, keyed by the scheme they belong to.
    _DEFAULT_PORTS = {"http": 80, "https": 443}

    # HEAD responses that mean "method unsupported", for which GET is retried.
    _HEAD_UNSUPPORTED = (405, 501)

    def _headers(self) -> dict:
        """Build the request headers, including authentication.

        Basic auth is used when both ``username`` and ``password`` are set;
        otherwise ``extra["bearer_token"]`` is used if present. Entries from
        ``extra["headers"]`` are applied last and may override anything above.
        """
        headers = {"User-Agent": "PerformanceWest-CDR-Puller/1.0"}
        if self.username and self.password:
            token = base64.b64encode(
                f"{self.username}:{self.password}".encode("utf-8")
            ).decode("ascii")
            headers["Authorization"] = f"Basic {token}"
        elif self.extra.get("bearer_token"):
            headers["Authorization"] = f"Bearer {self.extra['bearer_token']}"
        headers.update(self.extra.get("headers") or {})
        return headers

    def _base_url(self) -> str:
        """Return ``scheme://host[:port]/`` for the configured endpoint.

        The port is omitted only when it is the default port *of the selected
        scheme*; e.g. ``https`` on port 80 keeps the explicit ``:80``.
        """
        scheme = self.extra.get("scheme", "https")
        default_port = self._DEFAULT_PORTS.get(str(scheme).lower())
        port = f":{self.port}" if self.port and self.port != default_port else ""
        return f"{scheme}://{self.host}{port}/"

    def _request(self, url: str, timeout: Optional[int] = None) -> bytes:
        """GET *url* and return the response body.

        Args:
            url: Absolute URL to fetch.
            timeout: Per-request timeout; falls back to ``self.timeout``.

        Raises:
            TransportError: On an HTTP error status or any other failure.
        """
        req = urllib.request.Request(url, headers=self._headers())
        try:
            with urllib.request.urlopen(req, timeout=timeout or self.timeout) as resp:
                return resp.read()
        except urllib.error.HTTPError as exc:
            raise TransportError(f"HTTP {exc.code}: {exc.reason}") from exc
        except Exception as exc:
            raise TransportError(str(exc)) from exc

    def validate(self) -> tuple[bool, str]:
        """Probe the endpoint and report whether it is reachable.

        A HEAD request is tried first; if the server rejects the method
        (405 or 501) a GET is issued instead.

        Returns:
            ``(ok, message)`` — never raises; failures are reported in the
            message.
        """
        try:
            url = urljoin(self._base_url(), self.remote_glob.lstrip("/"))
            req = urllib.request.Request(url, headers=self._headers(), method="HEAD")
            try:
                with urllib.request.urlopen(req, timeout=self.timeout) as resp:
                    return True, f"HTTP {resp.status}"
            except urllib.error.HTTPError as exc:
                if exc.code in self._HEAD_UNSUPPORTED:
                    # HEAD unsupported → confirm with a GET; a failure raises
                    # TransportError and is reported by the outer handler.
                    self._request(url)
                    return True, "GET ok"
                return False, f"HTTP {exc.code}: {exc.reason}"
        except Exception as exc:
            return False, f"HTTPS validate failed: {exc}"

    def list_since(self, since: Optional[datetime]) -> Iterable[RemoteFile]:
        """Yield files from the JSON listing that are newer than *since*.

        The listing URL is ``extra["listing_url"]`` or, failing that,
        ``remote_glob``. The response may be a JSON array of objects or an
        object with a ``"files"`` array. Each entry needs ``name`` (or
        ``path``); ``modified``/``mtime``/``last_modified`` and ``size`` are
        optional.

        Listing failures are logged and produce an empty result rather than
        raising. Malformed entries are skipped.

        Args:
            since: Only files with a strictly later mtime are yielded;
                ``None`` yields everything. A timezone-aware value is
                converted to naive UTC before comparison.
        """
        listing_url = urljoin(
            self._base_url(),
            (self.extra.get("listing_url") or self.remote_glob).lstrip("/"),
        )
        try:
            body = self._request(listing_url)
            parsed = json.loads(body.decode("utf-8"))
        except Exception as exc:
            logger.warning("HTTPS listing failed: %s", exc)
            return

        if isinstance(parsed, dict) and "files" in parsed:
            parsed = parsed["files"]
        if not isinstance(parsed, list):
            logger.warning("HTTPS listing did not return an array")
            return

        # Parsed mtimes are naive; an aware `since` would make the comparison
        # below raise TypeError, so normalise it to naive UTC once up front.
        if since is not None and since.utcoffset() is not None:
            since = since.astimezone(timezone.utc).replace(tzinfo=None)

        for entry in parsed:
            if not isinstance(entry, dict):
                # Not an object with name/mtime/size — nothing usable here.
                continue
            name = entry.get("name") or entry.get("path")
            if not name:
                continue

            mtime_raw = (
                entry.get("modified")
                or entry.get("mtime")
                or entry.get("last_modified")
            )
            if mtime_raw:
                mtime = _parse_timestamp(mtime_raw)
            else:
                # No timestamp supplied: treat the file as modified "now".
                mtime = datetime.now(timezone.utc).replace(tzinfo=None)
            if since is not None and mtime <= since:
                continue

            try:
                size_bytes = int(entry.get("size") or 0)
            except (TypeError, ValueError):
                # A non-numeric size must not abort the whole listing.
                size_bytes = 0

            yield RemoteFile(path=name, mtime=mtime, size_bytes=size_bytes)

    def fetch(self, remote_path: str) -> bytes:
        """Download *remote_path* (resolved against the base URL).

        Raises:
            TransportError: If the request fails.
        """
        # NOTE(review): urljoin returns an absolute `remote_path` unchanged, so
        # a listing entry holding a full URL is fetched from that URL with the
        # same auth headers — confirm this is intended.
        url = urljoin(self._base_url(), remote_path.lstrip("/"))
        return self._request(url)
def _parse_timestamp(value) -> datetime:
    """Parse a listing timestamp into a naive UTC ``datetime``.

    Accepts a Unix epoch (int, float, or numeric string), an ISO-8601 string
    (a trailing ``Z`` is read as UTC), or an RFC 2822 date string. Values that
    carry a UTC offset are converted to UTC *before* the offset is dropped, so
    the result is comparable with other naive-UTC datetimes.

    Args:
        value: Raw timestamp taken from a listing entry.

    Returns:
        A naive ``datetime`` expressed in UTC. If *value* cannot be parsed
        (or is an out-of-range epoch), the current UTC time is returned as a
        best-effort fallback.
    """

    def _naive_utc(dt: datetime) -> datetime:
        # Shift aware values to UTC first; naive values are taken as UTC.
        if dt.utcoffset() is not None:
            dt = dt.astimezone(timezone.utc)
        return dt.replace(tzinfo=None)

    def _from_epoch(seconds) -> Optional[datetime]:
        try:
            return _naive_utc(datetime.fromtimestamp(seconds, tz=timezone.utc))
        except (OverflowError, OSError, ValueError):
            # Out of the platform's / datetime's representable range.
            return None

    parsed: Optional[datetime] = None

    if isinstance(value, (int, float)):
        parsed = _from_epoch(value)
    else:
        s = str(value).strip()
        if s.isdigit():
            parsed = _from_epoch(int(s))
        else:
            # Fractional / signed epoch strings such as "1700000000.5".
            try:
                parsed = _from_epoch(float(s))
            except ValueError:
                parsed = None

        if parsed is None:
            iso = s[:-1] + "+00:00" if s[-1:] in ("Z", "z") else s
            try:
                parsed = _naive_utc(datetime.fromisoformat(iso))
            except ValueError:
                parsed = None

        if parsed is None:
            try:
                parsed = _naive_utc(parsedate_to_datetime(s))
            except (TypeError, ValueError):
                parsed = None

    if parsed is None:
        # Unparseable: fall back to "now" so the file is treated as fresh.
        parsed = datetime.now(timezone.utc).replace(tzinfo=None)
    return parsed