"""CDR Analysis Handler — rolls CDRs for a reporting period into a signed traffic study PDF+XLSX and returns the files for delivery. Triggered by a paid `cdr-analysis` service order (or when the admin re-runs the aggregation after more data lands). Assumes ingestion has already happened via cdr_ingester — this handler only reads `cdr_calls`. The computation is revenue-first, minutes as the cross-check. If the profile has `minutes_only_estimation_enabled = TRUE` and no revenue data, we compute minutes-only and label the study accordingly. """ from __future__ import annotations import logging import os from collections import defaultdict from datetime import datetime from decimal import Decimal from typing import Optional import psycopg2 import psycopg2.extras from .base_handler import BaseServiceHandler logger = logging.getLogger(__name__) class CDRAnalysisHandler(BaseServiceHandler): SERVICE_SLUG = "cdr-analysis" SERVICE_NAME = "CDR Traffic Study" REQUIRES_LLM = False async def process(self, order_data: dict) -> list[str]: work_dir = self._make_work_dir() order_number = order_data["name"] entity = order_data.get("entity", {}) intake = order_data.get("intake_data") or {} # Resolve profile + reporting period reporting_year = int(intake.get("reporting_year") or datetime.utcnow().year - 1) reporting_period = intake.get("reporting_period", "ANNUAL") profile = self._load_profile(entity.get("id")) if profile is None: logger.warning( "CDRAnalysisHandler: no cdr_ingestion_profile for entity %s", entity.get("id"), ) return [] # ── Aggregate the classified calls ───────────────────────── stats = self._aggregate( profile_id=profile["id"], year=reporting_year, period=reporting_period, minutes_only=profile.get("minutes_only_estimation_enabled", False), ) # ── Persist the study ────────────────────────────────────── study_id = self._upsert_study(profile_id=profile["id"], year=reporting_year, period=reporting_period, stats=stats) # ── Render PDF + XLSX ────────────────────────────────────── from scripts.document_gen.templates.cdr_traffic_study_generator import ( generate_traffic_study_docx, generate_traffic_study_xlsx, ) date_str = datetime.now().strftime("%Y%m%d") docx_path = os.path.join( work_dir, f"traffic_study_{order_number}_{reporting_year}_{reporting_period}_{date_str}.docx", ) xlsx_path = os.path.join( work_dir, f"traffic_study_{order_number}_{reporting_year}_{reporting_period}_{date_str}.xlsx", ) study_row = dict(stats, reporting_year=reporting_year, reporting_period=reporting_period) generated: list[str] = [] try: result_docx = generate_traffic_study_docx( study=study_row, entity_name=entity.get("legal_name", ""), frn=entity.get("frn", ""), filer_id_499=entity.get("filer_id_499", ""), output_path=docx_path, ) if result_docx: generated.append(result_docx) try: generated.append(self._convert_to_pdf(result_docx)) except Exception as exc: logger.warning("Traffic study PDF conversion failed: %s", exc) except Exception as exc: logger.warning("DOCX generation failed: %s", exc) try: result_xlsx = generate_traffic_study_xlsx( study=study_row, entity_name=entity.get("legal_name", ""), output_path=xlsx_path, ) if result_xlsx: generated.append(result_xlsx) except Exception as exc: logger.warning("XLSX generation failed: %s", exc) # ── Update the traffic_study row with MinIO paths (after upload) ── # (actual MinIO upload is done by job_server.py after this returns) logger.info( "CDRAnalysisHandler: study #%s generated for profile %s (%s %s)", study_id, profile["id"], reporting_year, reporting_period, ) return generated # ------------------------------------------------------------------ # DB helpers # ------------------------------------------------------------------ def _connect(self): return psycopg2.connect(os.environ.get("DATABASE_URL", "")) def _load_profile(self, entity_id: Optional[int]) -> Optional[dict]: if not entity_id: return None try: conn = self._connect() except Exception as exc: logger.warning("CDRAnalysisHandler: PG connect failed: %s", exc) return None try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( "SELECT * FROM cdr_ingestion_profiles WHERE telecom_entity_id=%s", (entity_id,), ) row = cur.fetchone() return dict(row) if row else None finally: conn.close() _PERIOD_MONTHS = { "Q1": (1, 3), "Q2": (4, 6), "Q3": (7, 9), "Q4": (10, 12), "ANNUAL": (1, 12), } def _aggregate(self, profile_id: int, year: int, period: str, minutes_only: bool) -> dict: start_m, end_m = self._PERIOD_MONTHS[period] start_dt = datetime(year, start_m, 1) end_dt = ( datetime(year + 1, 1, 1) if end_m == 12 else datetime(year, end_m + 1, 1) ) conn = self._connect() try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """ SELECT jurisdiction, customer_type, orig_state_region, billing_state_region, COALESCE(SUM(duration_sec), 0) AS total_secs, COALESCE(SUM(billed_amount_cents), 0) AS total_cents, COUNT(*) AS call_count FROM cdr_calls WHERE profile_id=%s AND start_time >= %s AND start_time < %s GROUP BY GROUPING SETS ( (jurisdiction), (customer_type), (orig_state_region), (billing_state_region) ) """, (profile_id, start_dt, end_dt), ) rollups = cur.fetchall() cur.execute( """ SELECT COALESCE(SUM(duration_sec), 0) AS total_secs, COALESCE(SUM(billed_amount_cents), 0) AS total_cents, COUNT(*) AS total_calls FROM cdr_calls WHERE profile_id=%s AND start_time >= %s AND start_time < %s """, (profile_id, start_dt, end_dt), ) totals = cur.fetchone() finally: conn.close() total_secs = totals["total_secs"] or 0 total_cents = totals["total_cents"] or 0 total_calls = totals["total_calls"] or 0 # Prefer revenue denominator unless minutes_only opt-in use_revenue = total_cents > 0 and not minutes_only juris_secs: dict[str, int] = defaultdict(int) juris_cents: dict[str, int] = defaultdict(int) bucket_secs: dict[str, int] = defaultdict(int) orig_secs: dict[str, int] = defaultdict(int) billing_secs: dict[str, int] = defaultdict(int) for row in rollups: secs = row["total_secs"] or 0 cents = row["total_cents"] or 0 if row["jurisdiction"] is not None: juris_secs[row["jurisdiction"]] = secs juris_cents[row["jurisdiction"]] = cents if row["customer_type"] is not None: bucket_secs[row["customer_type"]] = secs if row["orig_state_region"] is not None: orig_secs[row["orig_state_region"]] = secs if row["billing_state_region"] is not None: billing_secs[row["billing_state_region"]] = secs def pct_from_revenue(jur: str) -> Optional[float]: if total_cents <= 0: return None return round((juris_cents.get(jur, 0) / total_cents) * 100, 4) def pct_from_minutes(jur: str) -> Optional[float]: if total_secs <= 0: return None return round((juris_secs.get(jur, 0) / total_secs) * 100, 4) def region_pcts(src: dict[str, int]) -> dict[str, float]: if total_secs <= 0: return {} return { name: round((secs / total_secs) * 100, 4) for name, secs in src.items() if name } return { "total_calls": total_calls, "total_minutes": int(total_secs / 60), "total_revenue_cents": total_cents, "interstate_pct": pct_from_revenue("interstate") if use_revenue else None, "intrastate_pct": pct_from_revenue("intrastate") if use_revenue else None, "international_pct": pct_from_revenue("international") if use_revenue else None, "indeterminate_pct": pct_from_revenue("indeterminate") if use_revenue else None, "interstate_pct_minutes": pct_from_minutes("interstate"), "intrastate_pct_minutes": pct_from_minutes("intrastate"), "international_pct_minutes": pct_from_minutes("international"), "indeterminate_pct_minutes": pct_from_minutes("indeterminate"), "wholesale_minutes": bucket_secs.get("wholesale", 0), "retail_minutes": bucket_secs.get("retail", 0), "orig_state_regions_json": region_pcts(orig_secs), "billing_state_regions_json": region_pcts(billing_secs), "methodology": ( "Percentages computed from per-call billed revenue; minutes " "figures shown as cross-check." if use_revenue else "Percentages computed from billed minutes — per-call revenue " "not available; profile has minutes_only_estimation_enabled = TRUE." ), } def _upsert_study(self, *, profile_id: int, year: int, period: str, stats: dict) -> int: conn = self._connect() try: with conn.cursor() as cur: cur.execute( """ INSERT INTO cdr_traffic_studies ( profile_id, reporting_year, reporting_period, total_calls, total_minutes, total_revenue_cents, interstate_pct, intrastate_pct, international_pct, indeterminate_pct, interstate_pct_minutes, intrastate_pct_minutes, international_pct_minutes, indeterminate_pct_minutes, wholesale_minutes, retail_minutes, orig_state_regions_json, billing_state_regions_json, methodology, generated_at ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb, %s::jsonb, %s, NOW() ) ON CONFLICT (profile_id, reporting_year, reporting_period) DO UPDATE SET total_calls=EXCLUDED.total_calls, total_minutes=EXCLUDED.total_minutes, total_revenue_cents=EXCLUDED.total_revenue_cents, interstate_pct=EXCLUDED.interstate_pct, intrastate_pct=EXCLUDED.intrastate_pct, international_pct=EXCLUDED.international_pct, indeterminate_pct=EXCLUDED.indeterminate_pct, interstate_pct_minutes=EXCLUDED.interstate_pct_minutes, intrastate_pct_minutes=EXCLUDED.intrastate_pct_minutes, international_pct_minutes=EXCLUDED.international_pct_minutes, indeterminate_pct_minutes=EXCLUDED.indeterminate_pct_minutes, wholesale_minutes=EXCLUDED.wholesale_minutes, retail_minutes=EXCLUDED.retail_minutes, orig_state_regions_json=EXCLUDED.orig_state_regions_json, billing_state_regions_json=EXCLUDED.billing_state_regions_json, methodology=EXCLUDED.methodology, generated_at=NOW() RETURNING id """, ( profile_id, year, period, stats["total_calls"], stats["total_minutes"], stats["total_revenue_cents"], stats["interstate_pct"], stats["intrastate_pct"], stats["international_pct"], stats["indeterminate_pct"], stats["interstate_pct_minutes"], stats["intrastate_pct_minutes"], stats["international_pct_minutes"], stats["indeterminate_pct_minutes"], stats["wholesale_minutes"], stats["retail_minutes"], psycopg2.extras.Json(stats["orig_state_regions_json"]), psycopg2.extras.Json(stats["billing_state_regions_json"]), stats["methodology"], ), ) new_id = cur.fetchone()[0] conn.commit() return new_id finally: conn.close()