"""
CDR ingester.

Consumes `cdr_ingestion_uploads` rows with `status='pending'` — for each:
  1. Download the raw object from MinIO
  2. Pick the adapter (profile.format) and iterate rows
  3. Deduplicate within-file + cross-file
  4. Resolve wholesale/retail bucket (per-row override > account_id > trunk_group)
  5. Classify jurisdiction + Block 5 regions
  6. Insert into cdr_calls (ON CONFLICT DO NOTHING), quarantine failures
  7. Update the upload row with summary counts + status
  8. Refresh cdr_usage_meters; raise quota warnings/blocks as needed

Designed to be safe to run as: single worker, cron every N minutes, or
triggered by the webhook/upload endpoint after each new row lands.

Usage:
    python -m scripts.workers.cdr_ingester                  # consume queue once
    python -m scripts.workers.cdr_ingester --upload-id=42
"""

from __future__ import annotations

import argparse
import hashlib
import io
import logging
import os
import sys
import tempfile
from datetime import datetime, timezone
from typing import Optional

import psycopg2
import psycopg2.extras

from scripts.workers.cdr_adapters import ADAPTERS
from scripts.workers.cdr_adapters.base import ValidationError
from scripts.workers.cdr_classifier import CDRClassifier

log = logging.getLogger("cdr_ingester")


def _fetch_paid_years(conn, profile_id: int) -> set[int]:
    """Return the set of reporting years this profile has paid access to
    (via cdr_study_access_grants).

    Empty set == no grants on file; the caller treats that as "don't filter"
    to avoid accidentally dropping everything on first-run profiles.

    The lookup is best-effort: on any error a warning is logged, the current
    transaction is rolled back, and an empty set is returned.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT reporting_year FROM cdr_study_access_grants WHERE profile_id = %s",
                (profile_id,),
            )
            return {int(r[0]) for r in cur.fetchall() or []}
    except Exception as exc:
        log.warning("paid_years lookup failed for profile %s: %s", profile_id, exc)
        # A failed statement leaves the psycopg2 transaction aborted; without
        # a rollback every later statement on this connection would fail too.
        try:
            conn.rollback()
        except Exception as rollback_exc:
            log.warning(
                "rollback after paid_years failure also failed for profile %s: %s",
                profile_id, rollback_exc,
            )
        return set()


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

DATABASE_URL = os.environ.get("DATABASE_URL", "")

# Storage plan quotas (must mirror the plan file). Amounts in bytes + rows.
QUOTAS = {
    "included": (10 * 1024**3, 10_000_000),
    "tier1": (50 * 1024**3, 50_000_000),
    "tier2": (250 * 1024**3, 250_000_000),
    "tier3": (1024 * 1024**3, 1_000_000_000),
    "enterprise": (None, None),  # custom
}

# Single-row insert into cdr_calls; a conflict on the natural key is a no-op,
# which is how cross-upload duplicates are dropped.
_INSERT_CALL_SQL = """
    INSERT INTO cdr_calls (
        profile_id, upload_id, natural_key_hash,
        start_time, duration_sec, billed_amount_cents, billed_currency,
        trunk_group_id, customer_account_id, customer_type, call_direction,
        caller_npa, caller_state, caller_country,
        called_npa, called_state, called_country,
        jurisdiction, orig_state_region, billing_state_region
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
    )
    ON CONFLICT (profile_id, natural_key_hash) DO NOTHING
"""


# ─── MinIO helper ─────────────────────────────────────────────────────────

def _minio():
    """Build a MinIO client from the MINIO_* environment variables."""
    # Imported lazily so the module can be imported without the minio package.
    from minio import Minio
    return Minio(
        os.environ.get("MINIO_ENDPOINT", "minio:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
        secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
        secure=os.environ.get("MINIO_SECURE", "false").lower() == "true",
    )


MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")


# ─── Bucket resolver ──────────────────────────────────────────────────────

def _resolve_bucket(row, mappings: dict) -> str:
    """Return 'wholesale' | 'retail' | 'unknown' for this CDR row.

    Priority: per-row override > account_id mapping > trunk_group mapping.

    ``mappings`` is keyed by ``(match_type, match_value)`` tuples and maps to
    the bucket name.
    """
    override = (row.customer_type_override or "").lower()
    if override in ("wholesale", "retail"):
        return override
    if row.customer_account_id:
        key = ("account_id", row.customer_account_id)
        if key in mappings:
            return mappings[key]
    if row.trunk_group_id:
        key = ("trunk_group", row.trunk_group_id)
        if key in mappings:
            return mappings[key]
    return "unknown"


# ─── Main ingester ────────────────────────────────────────────────────────

def _sha256_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex SHA-256 of the file at *path*, read in fixed-size chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _load_profile_and_mappings(conn, profile_id):
    """Return ``(profile_row_or_None, {(match_type, match_value): bucket})``."""
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            "SELECT * FROM cdr_ingestion_profiles WHERE id=%s",
            (profile_id,),
        )
        profile = cur.fetchone()
        cur.execute(
            "SELECT match_type, match_value, bucket "
            "FROM cdr_bucket_mappings WHERE profile_id=%s",
            (profile_id,),
        )
        mappings = {(r["match_type"], r["match_value"]): r["bucket"]
                    for r in cur.fetchall()}
    return profile, mappings


def _unpaid_notice(rows_dropped_unpaid: int, dropped_by_year: dict) -> str:
    """Build the customer-visible notice about rows dropped by the paywall."""
    years_list = ", ".join(str(y) for y in sorted(dropped_by_year))
    return (
        f"{rows_dropped_unpaid:,} call record{'s' if rows_dropped_unpaid != 1 else ''} "
        f"from year{'s' if len(dropped_by_year) != 1 else ''} {years_list} were NOT "
        "imported because you haven't purchased 499-A filing access for "
        f"{'those years' if len(dropped_by_year) != 1 else 'that year'}. "
        "Order the corresponding 499-A filing service(s) and re-upload to include them."
    )


def _ingest_file(conn, classifier, upload_id, up, profile, mappings, adapter):
    """Download the raw object, insert its rows, and record the summary.

    Returns ``(summary, status)`` after the upload row has been updated and
    committed. The temporary download is always removed.
    """
    profile_id = up["profile_id"]

    # Download the raw object
    minio = _minio()
    raw_path = up["raw_minio_path"]
    obj_key = raw_path.split(f"{MINIO_BUCKET}/", 1)[-1] if MINIO_BUCKET in raw_path else raw_path
    tmp = tempfile.NamedTemporaryFile(
        "wb", suffix=os.path.splitext(obj_key)[1] or ".csv", delete=False,
    )
    tmp_path = tmp.name
    # Only the path is needed; close the handle now so it cannot leak if the
    # download raises and so nothing else holds the file open while it is written.
    tmp.close()
    try:
        minio.fget_object(MINIO_BUCKET, obj_key, tmp_path)

        # Sanity: SHA-256 check (hashed in chunks so large files are not
        # loaded into memory in one piece)
        sha = _sha256_file(tmp_path)
        if sha != up["raw_sha256"]:
            log.warning(
                "Upload %s sha mismatch (expected %s got %s) — proceeding",
                upload_id, up["raw_sha256"], sha,
            )

        # ── Paywall: only ingest calls for years the customer has paid ──
        # cdr_study_access_grants tracks paid reporting years per profile.
        # Any row whose start_time.year isn't in the grant list is dropped
        # and a customer-visible notice is surfaced on the CDR buckets page.
        paid_years = _fetch_paid_years(conn, profile_id)
        rows_dropped_unpaid = 0
        dropped_by_year: dict[int, int] = {}

        # ── Iterate + classify + dedup ────────────────────────────
        intra_file_keys: set[str] = set()
        dupes_intra = 0
        insert_attempts = 0  # rows that reached the INSERT statement
        billing_state = profile.get("billing_state")

        with conn.cursor() as cur:
            for row in _safe_iter(adapter, tmp_path, upload_id, conn):
                # Intra-file dedup
                nk_hash = row.natural_key_hash(profile_id)
                if nk_hash in intra_file_keys:
                    dupes_intra += 1
                    continue
                intra_file_keys.add(nk_hash)

                # Paywall year gate
                row_year = row.start_time.year if row.start_time else None
                if paid_years and row_year is not None and row_year not in paid_years:
                    rows_dropped_unpaid += 1
                    dropped_by_year[row_year] = dropped_by_year.get(row_year, 0) + 1
                    continue

                # Classify
                cls_result = classifier.classify_call(
                    caller_number=row.caller_number,
                    called_number=row.called_number,
                    billing_state=billing_state,
                )
                bucket = _resolve_bucket(row, mappings)

                cur.execute(
                    _INSERT_CALL_SQL,
                    (
                        profile_id, upload_id, nk_hash,
                        row.start_time, row.duration_sec,
                        row.billed_amount_cents, row.billed_currency,
                        row.trunk_group_id, row.customer_account_id,
                        bucket, row.call_direction,
                        cls_result.orig_npa, cls_result.orig_state, cls_result.orig_country,
                        cls_result.term_npa, cls_result.term_state, cls_result.term_country,
                        cls_result.jurisdiction,
                        cls_result.orig_state_region, cls_result.billing_state_region,
                    ),
                )
                insert_attempts += 1
        conn.commit()

        with conn.cursor() as cur:
            cur.execute(
                "SELECT COUNT(*) FROM cdr_calls "
                "WHERE profile_id=%s AND upload_id=%s",
                (profile_id, upload_id),
            )
            accepted_actual = cur.fetchone()[0]
            cur.execute(
                "SELECT COUNT(*) FROM cdr_quarantine WHERE upload_id=%s",
                (upload_id,),
            )
            quarantined = cur.fetchone()[0]

        # Cross-upload dedup count: every attempted insert that is not stored
        # under this upload_id hit ON CONFLICT because the key already existed
        # under another upload. (Previously this was derived from a total that
        # was itself the sum of the other counters, so it was always 0.)
        cross_upload_dupes = max(0, insert_attempts - accepted_actual)
        total_rows = accepted_actual + quarantined + dupes_intra + cross_upload_dupes

        # ── Halt if >10% quarantined ─────────────────────────────
        halt = total_rows > 0 and (quarantined / total_rows) > 0.10
        status = "quarantined" if halt else "done"

        summary = {
            "rows_total": total_rows,
            "rows_accepted": accepted_actual,
            "rows_quarantined": quarantined,
            "rows_dropped_dupes": dupes_intra + cross_upload_dupes,
            "rows_dropped_unpaid_years": rows_dropped_unpaid,
            "dropped_by_year": dropped_by_year,
            "paid_years": sorted(paid_years) if paid_years else [],
            "halt_reason": "quarantine_rate_exceeded_10pct" if halt else None,
        }
        if rows_dropped_unpaid > 0:
            summary["customer_notice"] = _unpaid_notice(rows_dropped_unpaid, dropped_by_year)

        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE cdr_ingestion_uploads
                   SET status=%s, row_count=%s, rows_accepted=%s,
                       rows_quarantined=%s, rows_dropped_dupes=%s,
                       summary_json=%s::jsonb, processed_at=NOW()
                 WHERE id=%s
                """,
                (
                    status, total_rows, accepted_actual, quarantined,
                    summary["rows_dropped_dupes"],
                    psycopg2.extras.Json(summary), upload_id,
                ),
            )
        conn.commit()
        return summary, status
    finally:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


def process_upload(conn, classifier: CDRClassifier, upload_id: int) -> dict:
    """Ingest a single upload; returns summary dict.

    The upload is claimed by flipping its status from 'pending'/'failed' to
    'processing'. If no row is claimed, a ``{"skipped": True}`` dict is
    returned. A missing profile or unknown format marks the upload failed and
    returns the ``_mark_failed`` dict.

    Raises:
        Exception: any error raised during ingestion is re-raised after the
            upload has been marked 'failed' (best effort), so it does not
            stay in 'processing'.
    """
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            """
            UPDATE cdr_ingestion_uploads
               SET status='processing'
             WHERE id=%s AND status IN ('pending','failed')
            RETURNING id, profile_id, raw_minio_path, raw_sha256
            """,
            (upload_id,),
        )
        up = cur.fetchone()
    if not up:
        # Nothing was claimed; end the transaction instead of leaving it open.
        conn.rollback()
        return {"upload_id": upload_id, "skipped": True, "reason": "not pending"}
    conn.commit()

    profile_id = up["profile_id"]
    try:
        # Load profile + bucket mappings
        profile, mappings = _load_profile_and_mappings(conn, profile_id)
        if profile is None:
            return _mark_failed(conn, upload_id, f"profile not found: {profile_id}")

        # Validate the format before paying for the download.
        adapter_cls = ADAPTERS.get(profile["format"])
        if adapter_cls is None:
            return _mark_failed(conn, upload_id, f"unknown format: {profile['format']}")
        adapter = adapter_cls(profile_config=profile.get("format_config") or {})

        summary, status = _ingest_file(
            conn, classifier, upload_id, up, profile, mappings, adapter,
        )
    except Exception as exc:
        log.exception("Upload %s failed during ingestion", upload_id)
        try:
            conn.rollback()
            _mark_failed(conn, upload_id, f"{type(exc).__name__}: {exc}")
        except Exception:
            log.exception("Upload %s: could not record failed status", upload_id)
        raise

    # ── Usage meter refresh + quota check ─────────────────────
    # Kept outside the failure handler: the upload row is already final here,
    # so a metering error must not flip it to 'failed'.
    _refresh_usage(conn, profile_id, profile)

    log.info("Upload %s done: %s", upload_id, summary)
    return {"upload_id": upload_id, **summary, "status": status}


def _safe_iter(adapter, path, upload_id, conn):
    """Yield rows from adapter, routing ValidationErrors into quarantine.

    ``source_row`` is a 1-based counter of ``next()`` attempts and is stored
    with each quarantined row. A failure to obtain the iterator at all is
    quarantined once with reason ``adapter_init_failed``.
    """
    try:
        # iter() is a no-op for real iterators and makes plain iterables
        # (e.g. a list) safe to drive with next(); without it next() would
        # raise TypeError on every pass and the loop below would never end.
        iterator = iter(adapter.iter_rows(path))
    except Exception as exc:
        log.error("Adapter init failed for upload %s: %s", upload_id, exc)
        _quarantine(conn, upload_id, None, "adapter_init_failed", str(exc), {})
        return

    source_row = 0
    while True:
        source_row += 1
        try:
            row = next(iterator)
        except StopIteration:
            return
        except ValidationError as ve:
            _quarantine(conn, upload_id, source_row, ve.reason_code, ve.detail, {})
            continue
        except Exception as exc:
            _quarantine(conn, upload_id, source_row, "unparseable_row", str(exc), {})
            continue
        yield row


def _quarantine(conn, upload_id, source_row, reason_code, detail, payload):
    """Insert one cdr_quarantine row and commit immediately."""
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO cdr_quarantine (upload_id, source_row, raw_payload, reason_code, reason_detail) "
            "VALUES (%s, %s, %s::jsonb, %s, %s)",
            (upload_id, source_row, psycopg2.extras.Json(payload), reason_code, detail),
        )
    conn.commit()


def _mark_failed(conn, upload_id, reason):
    """Set the upload to 'failed' with *reason*, commit, and return a failure dict."""
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE cdr_ingestion_uploads SET status='failed', error=%s, "
            "processed_at=NOW() WHERE id=%s",
            (reason, upload_id),
        )
    conn.commit()
    return {"upload_id": upload_id, "failed": True, "reason": reason}


def _refresh_usage(conn, profile_id, profile) -> None:
    """Recompute usage meter + trip quota warnings.

    Rows and bytes are measured for the current UTC calendar year and
    upserted into cdr_usage_meters. Plans without a storage cap skip the
    quota check.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    year = datetime.now(timezone.utc).year
    with conn.cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) FROM cdr_calls WHERE profile_id=%s "
            "AND EXTRACT(YEAR FROM start_time) = %s",
            (profile_id, year),
        )
        rows_ingested = cur.fetchone()[0]
        # Bytes: sum uploads created in this calendar year.
        cur.execute(
            "SELECT COALESCE(SUM(COALESCE((summary_json->>'bytes')::bigint, 0)), 0) "
            "FROM cdr_ingestion_uploads WHERE profile_id=%s "
            "AND EXTRACT(YEAR FROM created_at) = %s",
            (profile_id, year),
        )
        # The driver may hand back SUM() as Decimal; normalise so the ratio
        # arithmetic below uses plain numbers.
        bytes_stored = int(cur.fetchone()[0])

        cur.execute(
            """
            INSERT INTO cdr_usage_meters
                (profile_id, reporting_year, bytes_stored, rows_ingested, last_measured_at)
            VALUES (%s, %s, %s, %s, NOW())
            ON CONFLICT (profile_id, reporting_year) DO UPDATE
               SET bytes_stored = EXCLUDED.bytes_stored,
                   rows_ingested = EXCLUDED.rows_ingested,
                   last_measured_at = NOW()
            """,
            (profile_id, year, bytes_stored, rows_ingested),
        )
    conn.commit()

    # Warning + block logic
    plan = profile.get("storage_plan") or "included"
    storage_cap, rows_cap = QUOTAS.get(plan, (None, None))
    if storage_cap is None:
        return
    # Utilisation is whichever of the two dimensions is closer to its cap.
    util = max(
        bytes_stored / storage_cap if storage_cap else 0,
        rows_ingested / rows_cap if rows_cap else 0,
    )
    if util >= 1.0:
        _emit_quota_event(conn, profile_id, year, "100pct", profile)
    elif util >= 0.8:
        _emit_quota_event(conn, profile_id, year, "80pct", profile)


def _emit_quota_event(conn, profile_id, year, level, profile):
    """Fire a one-time notification per threshold.

    The per-threshold timestamp column on cdr_usage_meters is the
    "already warned" marker; when it is set the call is a no-op.
    """
    with conn.cursor() as cur:
        # col is chosen from two fixed names, never from external input, so
        # interpolating it into the SQL text is safe.
        col = "warned_80pct_at" if level == "80pct" else "warned_100pct_at"
        cur.execute(
            f"SELECT {col} FROM cdr_usage_meters "
            f"WHERE profile_id=%s AND reporting_year=%s",
            (profile_id, year),
        )
        row = cur.fetchone()
        if row and row[0]:
            return  # already warned
        cur.execute(
            f"UPDATE cdr_usage_meters SET {col}=NOW() "
            f"WHERE profile_id=%s AND reporting_year=%s",
            (profile_id, year),
        )
    conn.commit()
    log.warning(
        "profile %s quota %s tripped — storage_plan=%s over_quota_policy=%s",
        profile_id, level, profile.get("storage_plan"), profile.get("over_quota_policy"),
    )
    # SMTP + ERPNext ToDo emission — reuse pattern from usf_factor_monitor.
    # Implementation left to the notification layer (same SMTP helpers).


def run_queue_once(limit: int = 50) -> dict:
    """Process up to *limit* pending uploads, oldest id first.

    Returns ``{"processed": <count>, "summaries": [<per-upload dict>, ...]}``.
    The connection is always closed; exceptions from process_upload propagate.
    """
    conn = psycopg2.connect(DATABASE_URL)
    classifier = CDRClassifier()
    summaries = []
    try:
        while len(summaries) < limit:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT id FROM cdr_ingestion_uploads "
                    "WHERE status='pending' ORDER BY id LIMIT 1",
                )
                row = cur.fetchone()
            if not row:
                break
            summaries.append(process_upload(conn, classifier, row[0]))
    finally:
        conn.close()
    return {"processed": len(summaries), "summaries": summaries}


def main() -> None:
    """CLI entry point: process one upload (--upload-id) or drain the queue once."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--upload-id", type=int, help="Process a specific upload only.")
    parser.add_argument("--limit", type=int, default=50)
    args = parser.parse_args()

    # Compare against None so an explicit --upload-id=0 is not mistaken for
    # "no id given".
    if args.upload_id is not None:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            print(process_upload(conn, CDRClassifier(), args.upload_id))
        finally:
            conn.close()
    else:
        print(run_queue_once(limit=args.limit))


if __name__ == "__main__":
    main()