"""NetSapiens CDRv2 adapter (NDJSON / JSON array).

NetSapiens emits CDRs either as a JSON array (via the ``/cdr`` REST endpoint
with paginated pages) or as newline-delimited JSON (via streaming export).
Both shapes use the same record schema; this adapter accepts either, with or
without a leading UTF-8 byte-order mark.

Key fields (NetSapiens CDRv2) and how this adapter uses them:

    orig_from_uri, orig_to_uri  -> caller / called SIP URIs
                                   (fallbacks: from_uri, to_uri)
    orig_callid, term_callid    -> the two call legs (we key on term_callid
                                   where present)
    time_start                  -> call start timestamp (ISO-8601;
                                   fallback: start_time)
    duration                    -> seconds on the billed leg
    charge / cost / total_charge -> per-call revenue (first non-empty wins)
    orig_sub, term_sub          -> subscriber identifiers
    orig_carrier, term_carrier  -> trunk/carrier IDs
    release_code                -> disposition
    direction                   -> call direction

Natural key: NetSapiens distinguishes orig and term call legs. We use
``term_callid`` when present (the billed leg), falling back to
``orig_callid``. That dedups the two-leg SBC emission cleanly. When neither
is present, a synthetic ``caller|called|start|duration`` key is used.
"""

from __future__ import annotations

import json
import logging
from typing import Iterator, TextIO

from .base import BaseCDRAdapter, CDRRow, ValidationError

logger = logging.getLogger(__name__)

# URI schemes stripped by _extract_uri_number (matched case-insensitively).
_URI_SCHEMES = ("sip:", "sips:", "tel:")

# Revenue columns, in priority order; the first one that parses wins.
_AMOUNT_COLUMNS = ("charge", "cost", "total_charge")


class NetSapiensAdapter(BaseCDRAdapter):
    """Adapter for NetSapiens CDRv2 exports (JSON array or NDJSON)."""

    FORMAT_SLUG = "netsapiens"

    def iter_rows(self, local_path: str) -> Iterator[CDRRow]:
        """Yield one validated :class:`CDRRow` per CDR record in *local_path*.

        The shape is detected from the first non-whitespace character: ``[``
        means a JSON array (loaded fully into memory by ``json.load``);
        anything else is treated as NDJSON and decoded lazily, one non-blank
        line per record.

        ``source_row`` is the 1-based record index: the array position for a
        JSON array, or the count of non-blank lines seen so far for NDJSON.

        Raises:
            ValidationError: ``"unparseable_row"`` when a decoded record
                cannot be mapped onto a row; anything ``validate_row`` raises
                as a ``ValidationError`` is propagated unchanged.
            json.JSONDecodeError: for malformed JSON (an invalid array
                document or a malformed NDJSON line). It propagates
                unwrapped, as before.
        """
        # "utf-8-sig" drops an optional BOM. Plain "utf-8" would leave
        # "\ufeff" as the first character, which defeats the "[" sniff and
        # makes json.loads reject the first NDJSON line.
        with open(local_path, "r", encoding="utf-8-sig", errors="replace") as fh:
            is_array = _first_significant_char(fh) == "["
            fh.seek(0)
            if is_array:
                records = json.load(fh)
            else:
                records = (json.loads(line) for line in fh if line.strip())
            for row_no, record in enumerate(records, start=1):
                try:
                    row = self._build_row(record, local_path, row_no)
                    self.validate_row(row)
                except ValidationError:
                    raise
                except Exception as exc:
                    raise ValidationError("unparseable_row", str(exc)) from exc
                # Yield outside the try so exceptions thrown into this
                # generator by the consumer are not relabelled as bad rows.
                yield row

    def _build_row(self, record: object, local_path: str, row_no: int) -> CDRRow:
        """Map one decoded NetSapiens record onto an (unvalidated) CDRRow.

        Raises:
            ValidationError: ``"unparseable_row"`` if *record* is not a JSON
                object. Other parse failures surface as ordinary exceptions
                and are wrapped by :meth:`iter_rows`.
        """
        if not isinstance(record, dict):
            raise ValidationError(
                "unparseable_row",
                f"expected a JSON object, got {type(record).__name__}",
            )

        start = self.parse_ts(record.get("time_start") or record.get("start_time"))
        duration = self.parse_duration(record.get("duration") or 0)
        caller = _extract_uri_number(record.get("orig_from_uri") or record.get("from_uri"))
        called = _extract_uri_number(record.get("orig_to_uri") or record.get("to_uri"))

        billed = None
        for col in _AMOUNT_COLUMNS:
            if record.get(col) not in (None, ""):
                billed = self.parse_cents(record[col])
                if billed is not None:
                    break

        # Prefer term_callid (billed leg) as the natural key: it collapses
        # the ingress and egress legs of a single call into one row.
        nkey = (
            _first_text(record, "term_callid", "orig_callid")
            or f"{caller}|{called}|{start.isoformat()}|{duration}"
        )

        return CDRRow(
            start_time=start,
            caller_number=caller,
            called_number=called,
            duration_sec=duration,
            billed_amount_cents=billed,
            # NOTE(review): currency is hard-coded to USD whenever an amount
            # is present; no currency field is read from the record.
            billed_currency="USD" if billed is not None else None,
            trunk_group_id=_first_text(record, "term_carrier", "orig_carrier"),
            customer_account_id=_first_text(record, "orig_sub", "term_sub"),
            disposition=_lowered(_first_text(record, "release_code")),
            call_direction=_lowered(_first_text(record, "direction")),
            natural_key=nkey,
            source_file=local_path,
            source_row=row_no,
            raw=record,
        )


def _first_significant_char(fh: TextIO) -> str:
    """Return the first non-whitespace character read from *fh* ("" at EOF)."""
    while True:
        ch = fh.read(1)
        if not ch or not ch.isspace():
            return ch


def _first_text(record: dict, *keys: str) -> str | None:
    """Return the first non-blank value among *keys*, stripped, as text.

    Falsy values (``None``, ``""``, ``0``) count as missing, matching the
    ``a or b`` chains this helper replaces. Whitespace-only strings also
    count as missing, so they fall through to the next key. Numbers are
    converted with ``str`` (JSON exports may carry numeric IDs and codes).

    Raises:
        TypeError: if a present value is not a string or number.
    """
    for key in keys:
        value = record.get(key)
        if not value:
            continue
        if isinstance(value, bool) or not isinstance(value, (str, int, float)):
            raise TypeError(
                f"{key}: expected a string or number, got {type(value).__name__}"
            )
        text = str(value).strip()
        if text:
            return text
    return None


def _lowered(text: str | None) -> str | None:
    """Lower-case *text*, passing ``None`` through unchanged."""
    return text.lower() if text is not None else None


def _extract_uri_number(uri: str | None) -> str:
    """Pull the user portion out of a SIP/TEL URI.

    Handles these input forms:

    * name-addr values such as ``"Alice" <sip:100@pbx>``: the bracketed URI
      is used;
    * a ``sip:``/``sips:``/``tel:`` scheme, matched case-insensitively: the
      scheme is stripped;
    * an ``@host`` part: it is dropped;
    * ``;param`` suffixes such as ``;user=phone``: they are dropped.

    Values that are not URIs (e.g. a bare ``+15551234567``) are returned
    stripped. Empty or missing input yields ``""``.
    """
    if not uri:
        return ""
    s = str(uri).strip()
    # name-addr form: use only the URI between the angle brackets.
    lt = s.find("<")
    if lt != -1:
        gt = s.find(">", lt + 1)
        if gt != -1:
            s = s[lt + 1 : gt].strip()
    if s.lower().startswith(_URI_SCHEMES):
        s = s.split(":", 1)[1]
    s = s.split("@", 1)[0]
    # Strip parameters like ";user=phone" or ";npdi".
    s = s.split(";", 1)[0]
    return s.strip()