new-site/scripts/workers/cdr_transports/sftp_transport.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

99 lines
3.1 KiB
Python

"""SFTP transport (paramiko)."""
from __future__ import annotations
import io
import logging
import os
import posixpath
from datetime import datetime
from typing import Iterable, Optional
from .base import BaseTransport, RemoteFile, TransportError
logger = logging.getLogger(__name__)
class SFTPTransport(BaseTransport):
TRANSPORT_SLUG = "sftp"
def _connect(self):
try:
import paramiko
except ImportError as exc:
raise TransportError("paramiko not installed") from exc
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
kwargs: dict = {
"hostname": self.host,
"port": self.port or 22,
"username": self.username,
"timeout": self.timeout,
"allow_agent": False,
"look_for_keys": False,
}
if self.private_key:
pkey = paramiko.RSAKey.from_private_key(io.StringIO(self.private_key))
kwargs["pkey"] = pkey
else:
kwargs["password"] = self.password
client.connect(**kwargs)
return client
def _remote_dir(self) -> str:
# If remote_glob is a path+glob ("/var/log/cdr/*.csv"), split dir off
if "/" in self.remote_glob:
return posixpath.dirname(self.remote_glob) or "/"
return self.extra.get("remote_dir", ".")
def _glob_pattern(self) -> str:
if "/" in self.remote_glob:
return posixpath.basename(self.remote_glob)
return self.remote_glob
def validate(self) -> tuple[bool, str]:
try:
client = self._connect()
try:
sftp = client.open_sftp()
sftp.listdir(self._remote_dir())
return True, "connected + listed remote directory"
finally:
client.close()
except Exception as exc:
return False, f"SFTP validate failed: {exc}"
def list_since(self, since: Optional[datetime]) -> Iterable[RemoteFile]:
client = self._connect()
try:
sftp = client.open_sftp()
remote_dir = self._remote_dir()
pattern = self._glob_pattern()
for attr in sftp.listdir_attr(remote_dir):
if not self._match_glob_basename(attr.filename, pattern):
continue
mtime = datetime.utcfromtimestamp(attr.st_mtime or 0)
if since and mtime <= since:
continue
yield RemoteFile(
path=posixpath.join(remote_dir, attr.filename),
mtime=mtime,
size_bytes=attr.st_size or 0,
)
finally:
client.close()
def fetch(self, remote_path: str) -> bytes:
client = self._connect()
try:
sftp = client.open_sftp()
with sftp.open(remote_path, "rb") as fh:
return fh.read()
finally:
client.close()
@staticmethod
def _match_glob_basename(name: str, pattern: str) -> bool:
import fnmatch
return fnmatch.fnmatch(name, pattern) if pattern else True