"""FreeSWITCH preset — SFTP pull from mod_cdr_csv output directory."""
from __future__ import annotations

from datetime import datetime
from typing import Iterable, Optional

from .base import BasePreset, CredentialField, FetchedFile
from ..cdr_transports.sftp_transport import SFTPTransport


class FreeSWITCHPreset(BasePreset):
    """Preset that pulls FreeSWITCH ``mod_cdr_csv`` CSV files over SFTP.

    Connection settings are read from the profile config (``host``,
    ``port``, ``username``, ``remote_dir``) and from the secrets mapping
    (``password``, ``private_key``).
    """

    # Preset metadata. How these are consumed is defined by BasePreset /
    # the surrounding framework, which is not visible from this module.
    PRESET_SLUG = "freeswitch"
    LABEL = "FreeSWITCH (mod_cdr_csv)"
    CDR_FORMAT = "freeswitch"
    TRANSPORT_METHOD = "sftp"
    # Cron expression; in standard cron syntax this is daily at 02:00.
    DEFAULT_CRON = "0 2 * * *"

    CREDENTIAL_FIELDS = (
        CredentialField(
            "host",
            "FreeSWITCH host / IP",
            "text",
            help="Hostname or IP reachable via SSH.",
        ),
        CredentialField(
            "port",
            "SSH port",
            "number",
            required=False,
            help="Default 22.",
        ),
        CredentialField("username", "SSH username", "text"),
        CredentialField(
            "password",
            "SSH password",
            "password",
            required=False,
            sensitive=True,
            help="Leave blank if using SSH key auth.",
        ),
        CredentialField(
            "private_key",
            "SSH private key",
            "ssh_key",
            required=False,
            sensitive=True,
            help="PEM-format private key. Preferred over password.",
        ),
        CredentialField(
            "remote_dir",
            "CDR directory on the server",
            "text",
            help="Typically /var/log/freeswitch/cdr-csv/",
        ),
    )

    def _transport(self, cfg: dict, secrets: dict) -> SFTPTransport:
        """Build the SFTP transport for the given profile config and secrets.

        Args:
            cfg: Profile configuration; must contain ``host`` and
                ``username``. ``port`` and ``remote_dir`` are optional.
            secrets: Mapping that may contain ``password`` and/or
                ``private_key``; missing entries are passed as ``None``.

        Returns:
            An ``SFTPTransport`` whose glob matches ``*.csv`` directly
            inside the configured directory.

        Raises:
            KeyError: If ``host`` or ``username`` is missing from ``cfg``.
        """
        # Treat a blank/None value the same as a missing key, mirroring the
        # ``port`` handling below. ``dict.get`` with a default only covers a
        # missing key, so a stored "" would otherwise glob "/*.csv" and a
        # stored None would crash on ``.rstrip``.
        remote_dir = cfg.get("remote_dir") or "/var/log/freeswitch/cdr-csv"
        return SFTPTransport(
            host=cfg["host"],
            port=int(cfg.get("port") or 22),
            username=cfg["username"],
            password=secrets.get("password"),
            private_key=secrets.get("private_key"),
            # Strip trailing slashes so the glob never contains "//".
            remote_glob=f"{remote_dir.rstrip('/')}/*.csv",
        )

    def validate(self, profile_config: dict, secrets: dict) -> tuple[bool, str]:
        """Check the configuration by delegating to the transport's ``validate``.

        Returns:
            The transport's own result on success; otherwise ``(False,
            message)`` where ``message`` describes the exception raised while
            building or validating the transport.
        """
        try:
            return self._transport(profile_config, secrets).validate()
        except Exception as exc:
            # Deliberate catch-all: any failure is reported to the caller as
            # a result rather than raised. Exceptions created without a
            # message stringify to "", so fall back to the class name to
            # avoid returning an empty error.
            return False, str(exc) or type(exc).__name__

    def fetch(
        self,
        profile_config: dict,
        secrets: dict,
        since: Optional[datetime],
    ) -> Iterable[FetchedFile]:
        """Yield one ``FetchedFile`` per remote entry the transport lists.

        This is a generator: the transport is built and queried only once
        iteration starts, and each file's content is fetched as it is yielded.

        Args:
            profile_config: Profile configuration (see ``_transport``).
            secrets: Secret values (see ``_transport``).
            since: Passed through unchanged to ``transport.list_since``.
                NOTE(review): presumably ``None`` means "no lower bound" —
                confirm against ``SFTPTransport.list_since``.
        """
        transport = self._transport(profile_config, secrets)
        for remote in transport.list_since(since):
            yield FetchedFile(
                remote_path=remote.path,
                mtime=remote.mtime,
                content=transport.fetch(remote.path),
                size_bytes=remote.size_bytes,
            )