"""46Labs preset — Peering / NOVA platform REST API.""" from __future__ import annotations import json import urllib.parse import urllib.request from datetime import datetime, timedelta from typing import Iterable, Optional from .base import BasePreset, CredentialField, FetchedFile class FortySixLabsPreset(BasePreset): PRESET_SLUG = "fortysix_labs" LABEL = "46Labs (Peering / NOVA)" CDR_FORMAT = "generic_csv" TRANSPORT_METHOD = "api" DEFAULT_CRON = "0 2 * * *" CREDENTIAL_FIELDS = ( CredentialField("api_host", "46Labs API host", "text", help="e.g. https://api.46labs.com"), CredentialField("account_id", "Account ID", "text"), CredentialField("api_key", "API key", "password", sensitive=True), ) FORMAT_CONFIG = { "start_time": "start_time", "caller_number": "ani", "called_number": "dnis", "duration_sec": "billable_duration", "billed_amount": "call_cost", "call_id": "cdr_id", "trunk_group": "ingress_trunk", "disposition": "release_cause", "ts_format": "%Y-%m-%dT%H:%M:%SZ", } def _request(self, url: str, api_key: str) -> bytes: req = urllib.request.Request( url, headers={"Authorization": f"Bearer {api_key}", "Accept": "application/json"}, ) with urllib.request.urlopen(req, timeout=60) as resp: return resp.read() def validate(self, profile_config: dict, secrets: dict) -> tuple[bool, str]: try: base = profile_config["api_host"].rstrip("/") acct = profile_config["account_id"] self._request( f"{base}/v1/accounts/{acct}", api_key=secrets["api_key"], ) return True, "46Labs account reachable" except Exception as exc: return False, f"46Labs validate failed: {exc}" def fetch( self, profile_config: dict, secrets: dict, since: Optional[datetime], ) -> Iterable[FetchedFile]: base = profile_config["api_host"].rstrip("/") acct = profile_config["account_id"] since = since or (datetime.utcnow() - timedelta(days=1)) end = datetime.utcnow() records: list[dict] = [] cursor: Optional[str] = None while True: qs = urllib.parse.urlencode({ "start_time": since.strftime("%Y-%m-%dT%H:%M:%SZ"), "end_time": end.strftime("%Y-%m-%dT%H:%M:%SZ"), "limit": 1000, **({"cursor": cursor} if cursor else {}), }) body = self._request( f"{base}/v1/accounts/{acct}/cdr/search?{qs}", api_key=secrets["api_key"], ) payload = json.loads(body) items = payload.get("cdrs", []) or payload.get("data", []) records.extend(items) cursor = payload.get("next_cursor") if not cursor or not items: break if not records: return ndjson = "\n".join(json.dumps(r) for r in records).encode("utf-8") yield FetchedFile( remote_path=f"fortysix_labs_cdrs_{end:%Y%m%dT%H%M%SZ}.ndjson", mtime=end, content=ndjson, size_bytes=len(ndjson), )