"""HTTPS transport — for switches that expose a REST or file-download API. The remote endpoint is expected to be one of: (a) a directory listing in JSON (array of objects with name + mtime + size), or (b) a single file endpoint that is polled directly (treat as a named file with current-timestamp mtime and an If-Modified-Since header). Authentication options: * Bearer token (``extra["bearer_token"]``) * Basic auth (``username``/``password``) * Custom header(s) (``extra["headers"]`` — dict) """ from __future__ import annotations import base64 import json import logging import posixpath from datetime import datetime from email.utils import parsedate_to_datetime from typing import Iterable, Optional from urllib.parse import urljoin import urllib.request import urllib.error from .base import BaseTransport, RemoteFile, TransportError logger = logging.getLogger(__name__) class HTTPSTransport(BaseTransport): TRANSPORT_SLUG = "https" def _headers(self) -> dict: headers = {"User-Agent": "PerformanceWest-CDR-Puller/1.0"} if self.username and self.password: token = base64.b64encode( f"{self.username}:{self.password}".encode("utf-8") ).decode("ascii") headers["Authorization"] = f"Basic {token}" elif self.extra.get("bearer_token"): headers["Authorization"] = f"Bearer {self.extra['bearer_token']}" for k, v in (self.extra.get("headers") or {}).items(): headers[k] = v return headers def _base_url(self) -> str: scheme = self.extra.get("scheme", "https") port = f":{self.port}" if self.port and self.port not in (80, 443) else "" return f"{scheme}://{self.host}{port}/" def _request(self, url: str, timeout: Optional[int] = None) -> bytes: req = urllib.request.Request(url, headers=self._headers()) try: with urllib.request.urlopen(req, timeout=timeout or self.timeout) as resp: return resp.read() except urllib.error.HTTPError as exc: raise TransportError(f"HTTP {exc.code}: {exc.reason}") from exc except Exception as exc: raise TransportError(str(exc)) from exc def validate(self) -> tuple[bool, str]: try: url = urljoin(self._base_url(), self.remote_glob.lstrip("/")) # Use HEAD if the endpoint supports it, fall back to GET. req = urllib.request.Request(url, headers=self._headers(), method="HEAD") try: with urllib.request.urlopen(req, timeout=self.timeout) as resp: return True, f"HTTP {resp.status}" except urllib.error.HTTPError as exc: if exc.code == 405: # method not allowed → fallback to GET self._request(url) return True, "GET ok" return False, f"HTTP {exc.code}: {exc.reason}" except Exception as exc: return False, f"HTTPS validate failed: {exc}" def list_since(self, since: Optional[datetime]) -> Iterable[RemoteFile]: """Query the JSON listing endpoint (extra['listing_url'] or remote_glob).""" listing_url = urljoin( self._base_url(), (self.extra.get("listing_url") or self.remote_glob).lstrip("/"), ) try: body = self._request(listing_url) parsed = json.loads(body.decode("utf-8")) except Exception as exc: logger.warning("HTTPS listing failed: %s", exc) return if isinstance(parsed, dict) and "files" in parsed: parsed = parsed["files"] if not isinstance(parsed, list): logger.warning("HTTPS listing did not return an array") return for entry in parsed: name = entry.get("name") or entry.get("path") if not name: continue mtime_raw = entry.get("modified") or entry.get("mtime") or entry.get("last_modified") mtime = _parse_timestamp(mtime_raw) if mtime_raw else datetime.utcnow() if since and mtime <= since: continue yield RemoteFile( path=name, mtime=mtime, size_bytes=int(entry.get("size", 0) or 0), ) def fetch(self, remote_path: str) -> bytes: url = urljoin(self._base_url(), remote_path.lstrip("/")) return self._request(url) def _parse_timestamp(value) -> datetime: """Accept ISO-8601, RFC 2822, or Unix epoch.""" if isinstance(value, (int, float)): return datetime.utcfromtimestamp(value) s = str(value).strip() if s.isdigit(): return datetime.utcfromtimestamp(int(s)) try: return datetime.fromisoformat(s.replace("Z", "+00:00")).replace(tzinfo=None) except ValueError: pass try: return parsedate_to_datetime(s).replace(tzinfo=None) except (TypeError, ValueError): return datetime.utcnow()