new-site/scripts/workers/cdr_presets/asterisk.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

66 lines
2.5 KiB
Python

"""Asterisk preset — SFTP pull from the Asterisk Master.csv location."""
from __future__ import annotations
from datetime import datetime
from typing import Iterable, Optional
from .base import BasePreset, CredentialField, FetchedFile
from ..cdr_transports.sftp_transport import SFTPTransport
class AsteriskPreset(BasePreset):
PRESET_SLUG = "asterisk"
LABEL = "Asterisk / AsteriskNOW / FreePBX"
CDR_FORMAT = "asterisk"
TRANSPORT_METHOD = "sftp"
DEFAULT_CRON = "0 2 * * *"
CREDENTIAL_FIELDS = (
CredentialField("host", "Asterisk host / IP", "text"),
CredentialField("port", "SSH port", "number", required=False),
CredentialField("username", "SSH username", "text"),
CredentialField("password", "SSH password", "password",
required=False, sensitive=True,
help="Leave blank if using SSH key auth."),
CredentialField("private_key", "SSH private key", "ssh_key",
required=False, sensitive=True),
CredentialField("remote_dir", "Asterisk CDR directory", "text",
help="Typically /var/log/asterisk/cdr-csv/"),
CredentialField("file_glob", "File pattern", "text",
required=False,
help="Default Master.csv; or *.csv for rotating logs."),
)
def _transport(self, cfg: dict, secrets: dict) -> SFTPTransport:
remote_dir = cfg.get("remote_dir", "/var/log/asterisk/cdr-csv")
glob = cfg.get("file_glob", "Master.csv")
return SFTPTransport(
host=cfg["host"],
port=int(cfg.get("port") or 22),
username=cfg["username"],
password=secrets.get("password"),
private_key=secrets.get("private_key"),
remote_glob=f"{remote_dir.rstrip('/')}/{glob}",
)
def validate(self, profile_config: dict, secrets: dict) -> tuple[bool, str]:
try:
return self._transport(profile_config, secrets).validate()
except Exception as exc:
return False, f"{exc}"
def fetch(
self,
profile_config: dict,
secrets: dict,
since: Optional[datetime],
) -> Iterable[FetchedFile]:
transport = self._transport(profile_config, secrets)
for remote in transport.list_since(since):
yield FetchedFile(
remote_path=remote.path,
mtime=remote.mtime,
content=transport.fetch(remote.path),
size_bytes=remote.size_bytes,
)