new-site/scripts/workers/cdr_classifier.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

242 lines
8.8 KiB
Python

"""
CDR classifier — jurisdictional classification + Block 5 regional bucketing.
Given a caller number and called number, produces:
* jurisdiction: 'interstate' | 'intrastate' | 'international' | 'indeterminate'
* orig_country / orig_state / orig_npa
* term_country / term_state / term_npa
* orig_state_region: FCC Block 5 region based on ORIGINATING state
* billing_state_region: FCC Block 5 region based on the carrier's
billing-address state (passed in — same for every call on the profile)
Classification rules (`classify_call`):
if caller.country != 'US' or called.country != 'US':
-> international
elif not caller.state or not called.state:
-> indeterminate
elif caller.state == called.state:
-> intrastate (v1 does NOT split out 'local' — requires LATA/LERG)
else:
-> interstate
Reads `nanpa_area_codes` + `fcc_block5_regions` from Postgres. Both tables
are small (~600 rows + 60 rows), so we cache them in-process on first call.
Usage:
from scripts.workers.cdr_classifier import CDRClassifier, ClassificationResult
cls = CDRClassifier()
result = cls.classify_call(
caller_number='+14155551212',
called_number='+12125559999',
billing_state='FL',
)
# -> ClassificationResult(jurisdiction='interstate',
# orig_state='CA', term_state='NY', orig_state_region='West Coast',
# billing_state_region='Southeast', ...)
"""
from __future__ import annotations
import logging
import os
import re
from dataclasses import dataclass
from functools import lru_cache
from typing import Optional
import psycopg2
import psycopg2.extras
# Module-level logger; the name is a fixed string rather than __name__.
log = logging.getLogger("cdr_classifier")
# ── E.164 / NANP number parsing ──────────────────────────────────────────
# NANP shape: optional "+", optional leading "1", then exactly ten digits
# split 3-3-4. Group 1 is the area code (NPA). The pattern does not
# range-check any digit, so it also accepts NPAs that start with 0 or 1.
_NANP_NUMBER_RE = re.compile(r"^\+?1?(\d{3})(\d{3})(\d{4})$")
# Generic "+<digits>" shape: group 1 greedily captures up to three leading
# digits as a *candidate* country calling code; group 2 is the remainder
# and must contain at least one digit.
_E164_RE = re.compile(r"^\+(\d{1,3})(\d+)$")
def _normalize_e164(number: str) -> str:
"""Strip everything except digits and a leading +."""
if not number:
return ""
number = str(number).strip()
# Common noise in CDRs: "tel:", "sip:user@...", quoted display name
if "@" in number:
number = number.split("@", 1)[0]
if ":" in number:
number = number.split(":", 1)[-1]
number = re.sub(r"[^\d+]", "", number)
if number.startswith("00"):
number = "+" + number[2:]
elif number.startswith("+"):
pass
elif len(number) == 10:
number = "+1" + number
elif len(number) == 11 and number.startswith("1"):
number = "+" + number
# else: leave alone — will fall through to indeterminate
return number
def _extract_npa(number: str) -> Optional[str]:
"""Return the NANP area code if this is a recognizable NANP number."""
if not number:
return None
m = _NANP_NUMBER_RE.match(number.lstrip("+"))
if m:
return m.group(1)
return None
def _extract_country_code(number: str) -> Optional[str]:
"""Return the ITU country calling code (e.g. '1', '44', '33')."""
if not number.startswith("+"):
return None
m = _E164_RE.match(number)
if not m:
return None
cc = m.group(1)
# ITU codes are 1, 7, or 3-digit. Match greedy-then-back-off.
if cc.startswith("1"):
return "1"
if cc.startswith("7") and cc[0] == "7":
return "7"
# 3-digit country codes are the common case
return cc[:3] if len(cc) >= 3 else cc
# ── Classification result ────────────────────────────────────────────────
@dataclass
class ClassificationResult:
    """Outcome of classifying one call.

    Only ``jurisdiction`` is required; every geography field defaults to
    ``None`` and stays ``None`` when it could not be resolved.
    """

    # 'interstate' | 'intrastate' | 'international' | 'indeterminate'
    jurisdiction: str
    # Originating side, resolved from the caller number. For NANP numbers
    # the country comes from the NPA table; for other numbers it is the
    # calling-code digits.
    orig_country: Optional[str] = None
    orig_state: Optional[str] = None
    orig_npa: Optional[str] = None
    # Terminating side, resolved the same way from the called number.
    term_country: Optional[str] = None
    term_state: Optional[str] = None
    term_npa: Optional[str] = None
    # Block 5 region looked up from the originating state.
    orig_state_region: Optional[str] = None
    # Block 5 region looked up from the billing state passed by the caller.
    billing_state_region: Optional[str] = None
# ── Classifier (caches reference data) ───────────────────────────────────
class CDRClassifier:
    """Jurisdiction classifier backed by two lazily loaded lookup tables.

    Create one instance per worker process. The NPA table and the Block 5
    region table are read from Postgres the first time a lookup is made;
    from then on `classify_call` only consults the in-memory dicts.
    """

    def __init__(self, database_url: Optional[str] = None):
        """Store the connection string; no database access happens here.

        Args:
            database_url: Postgres connection string. When falsy, the
                ``DATABASE_URL`` environment variable is used (empty string
                if that is unset too).
        """
        if not database_url:
            database_url = os.environ.get("DATABASE_URL", "")
        self._database_url = database_url
        self._npa_cache: dict[str, tuple[str, Optional[str]]] = {}  # npa -> (country, state)
        self._region_cache: dict[str, str] = {}  # state -> region
        self._cache_loaded = False

    # ── Cache loading ────────────────────────────────────────────────
    def _ensure_cache(self) -> None:
        """Populate both lookup tables once; later calls return immediately.

        With no connection string configured the tables stay empty and a
        warning is logged, which leaves the classifier unable to resolve
        any state or region.
        """
        if self._cache_loaded:
            return
        if self._database_url:
            connection = psycopg2.connect(self._database_url)
            try:
                with connection.cursor() as cursor:
                    cursor.execute("SELECT npa, country, state FROM nanpa_area_codes")
                    # Rows with no country are treated as US.
                    self._npa_cache.update(
                        (npa, (country or "US", state))
                        for npa, country, state in cursor.fetchall()
                    )
                    cursor.execute("SELECT state_code, region_name FROM fcc_block5_regions")
                    self._region_cache.update(
                        (state_code, region_name)
                        for state_code, region_name in cursor.fetchall()
                    )
            finally:
                connection.close()
            self._cache_loaded = True
            log.info(
                "CDRClassifier: cached %d NPAs + %d Block 5 regions",
                len(self._npa_cache), len(self._region_cache),
            )
        else:
            log.warning("CDRClassifier: no DATABASE_URL — classifier will be no-op")
            self._cache_loaded = True

    # ── Number → geography ───────────────────────────────────────────
    def resolve_number(self, raw_number: str) -> dict:
        """Map a raw CDR number string to ``{country, state, npa}``.

        Every key is always present; values that cannot be determined are
        ``None``. Non-NANP numbers only get ``country`` (the calling code).
        A NANP number whose NPA is not in the table keeps its ``npa`` but
        has no country or state.
        """
        self._ensure_cache()
        geo = {"country": None, "state": None, "npa": None}

        normalized = _normalize_e164(raw_number)
        if not normalized:
            return geo

        calling_code = _extract_country_code(normalized)
        if calling_code != "1":
            # Outside the NANP the calling code alone decides jurisdiction;
            # no state is needed.
            geo["country"] = calling_code or None
            return geo

        area_code = _extract_npa(normalized)
        if not area_code:
            return geo

        geo["npa"] = area_code
        entry = self._npa_cache.get(area_code)
        if entry:
            geo["country"], geo["state"] = entry
        # An unknown NPA is left without country/state rather than guessed.
        return geo

    # ── Main entry point ────────────────────────────────────────────
    def classify_call(
        self,
        *,
        caller_number: str,
        called_number: str,
        billing_state: Optional[str] = None,
    ) -> ClassificationResult:
        """Classify one call by jurisdiction and attach Block 5 regions.

        Args:
            caller_number: Originating number as it appears in the CDR.
            called_number: Terminating number as it appears in the CDR.
            billing_state: State used for the billing-address region lookup.

        Returns:
            A `ClassificationResult` carrying the jurisdiction, the resolved
            geography of both ends, and the two region lookups.
        """
        self._ensure_cache()
        origin = self.resolve_number(caller_number)
        destination = self.resolve_number(called_number)

        origin_state = origin.get("state")
        destination_state = destination.get("state")

        # A known, non-US country on either end makes the call international.
        crosses_border = any(
            country and country != "US"
            for country in (origin.get("country"), destination.get("country"))
        )
        if crosses_border:
            jurisdiction = "international"
        elif not (origin_state and destination_state):
            jurisdiction = "indeterminate"
        elif origin_state == destination_state:
            # 'local' is not split from 'intrastate toll' in v1 — that would
            # need LATA/LERG data. 499-A reports both as intrastate.
            jurisdiction = "intrastate"
        else:
            jurisdiction = "interstate"

        # Block 5 regional mappings
        origin_region = None
        if origin_state:
            origin_region = self._region_cache.get(origin_state)
        billing_region = None
        if billing_state:
            billing_region = self._region_cache.get(billing_state)

        return ClassificationResult(
            jurisdiction=jurisdiction,
            orig_country=origin.get("country"),
            orig_state=origin_state,
            orig_npa=origin.get("npa"),
            term_country=destination.get("country"),
            term_state=destination_state,
            term_npa=destination.get("npa"),
            orig_state_region=origin_region,
            billing_state_region=billing_region,
        )