new-site/scripts/workers/cdr_ingester.py

"""
CDR ingester.
Consumes `cdr_ingestion_uploads` rows with `status='pending'` — for each:
1. Download the raw object from MinIO
2. Pick the adapter (profile.format) and iterate rows
3. Deduplicate within-file + cross-file
4. Resolve wholesale/retail bucket (per-row override > account_id > trunk_group)
5. Classify jurisdiction + Block 5 regions
6. Insert into cdr_calls (ON CONFLICT DO NOTHING), quarantine failures
7. Update the upload row with summary counts + status
8. Refresh cdr_usage_meters; raise quota warnings/blocks as needed
Designed to be safe to run as: single worker, cron every N minutes, or
triggered by the webhook/upload endpoint after each new row lands.
Usage:
python -m scripts.workers.cdr_ingester # consume queue once
python -m scripts.workers.cdr_ingester --upload-id=42
"""
from __future__ import annotations
import argparse
import hashlib
import io
import logging
import os
import sys
import tempfile
from datetime import datetime, timezone
from typing import Optional
import psycopg2
import psycopg2.extras
from scripts.workers.cdr_adapters import ADAPTERS
from scripts.workers.cdr_adapters.base import ValidationError
from scripts.workers.cdr_classifier import CDRClassifier
log = logging.getLogger("cdr_ingester")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

DATABASE_URL = os.environ.get("DATABASE_URL", "")


def _fetch_paid_years(conn, profile_id: int) -> set[int]:
    """Return the set of reporting years this profile has paid access to
    (via cdr_study_access_grants). Empty set == no grants on file; the
    caller treats that as "don't filter" to avoid accidentally dropping
    everything on first-run profiles.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT reporting_year FROM cdr_study_access_grants WHERE profile_id = %s",
                (profile_id,),
            )
            return {int(r[0]) for r in cur.fetchall() or []}
    except Exception as exc:
        log.warning("paid_years lookup failed for profile %s: %s", profile_id, exc)
        return set()
# Storage plan quotas (must mirror the plan file). Amounts in bytes + rows.
QUOTAS = {
"included": (10 * 1024**3, 10_000_000),
"tier1": (50 * 1024**3, 50_000_000),
"tier2": (250 * 1024**3, 250_000_000),
"tier3": (1024 * 1024**3, 1_000_000_000),
"enterprise": (None, None), # custom
}
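# Either dimension can trip a plan: _refresh_usage() below takes the max of the
# byte and row utilization ratios. Worked example (numbers illustrative): a
# "tier1" profile holding 45 GiB and 20,000,000 rows sits at
# max(45/50, 20/50) = 90% utilization, past the 80% warning threshold but still
# below the 100% threshold.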
# ─── MinIO helper ─────────────────────────────────────────────────────────
def _minio():
from minio import Minio
return Minio(
os.environ.get("MINIO_ENDPOINT", "minio:9000"),
access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
secure=os.environ.get("MINIO_SECURE", "false").lower() == "true",
)
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")
# ─── Bucket resolver ──────────────────────────────────────────────────────
def _resolve_bucket(row, mappings: dict) -> str:
"""Return 'wholesale' | 'retail' | 'unknown' for this CDR row.
Priority: per-row override > account_id mapping > trunk_group mapping.
"""
override = (row.customer_type_override or "").lower()
if override in ("wholesale", "retail"):
return override
if row.customer_account_id:
key = ("account_id", row.customer_account_id)
if key in mappings:
return mappings[key]
if row.trunk_group_id:
key = ("trunk_group", row.trunk_group_id)
if key in mappings:
return mappings[key]
return "unknown"
# ─── Main ingester ────────────────────────────────────────────────────────
def process_upload(conn, classifier: CDRClassifier, upload_id: int) -> dict:
"""Ingest a single upload; returns summary dict."""
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
UPDATE cdr_ingestion_uploads
SET status='processing'
WHERE id=%s AND status IN ('pending','failed')
RETURNING id, profile_id, raw_minio_path, raw_sha256
""",
(upload_id,),
)
up = cur.fetchone()
if not up:
return {"upload_id": upload_id, "skipped": True, "reason": "not pending"}
conn.commit()
profile_id = up["profile_id"]
# Load profile + bucket mappings
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"SELECT * FROM cdr_ingestion_profiles WHERE id=%s", (profile_id,),
)
profile = cur.fetchone()
cur.execute(
"SELECT match_type, match_value, bucket "
"FROM cdr_bucket_mappings WHERE profile_id=%s",
(profile_id,),
)
mappings = {(r["match_type"], r["match_value"]): r["bucket"]
for r in cur.fetchall()}
# Download the raw object
minio = _minio()
raw_path = up["raw_minio_path"]
obj_key = raw_path.split(f"{MINIO_BUCKET}/", 1)[-1] if MINIO_BUCKET in raw_path else raw_path
tmp = tempfile.NamedTemporaryFile(
"wb", suffix=os.path.splitext(obj_key)[1] or ".csv", delete=False,
)
try:
minio.fget_object(MINIO_BUCKET, obj_key, tmp.name)
tmp.close()
# Sanity: SHA-256 check
with open(tmp.name, "rb") as fh:
sha = hashlib.sha256(fh.read()).hexdigest()
if sha != up["raw_sha256"]:
log.warning(
"Upload %s sha mismatch (expected %s got %s) — proceeding",
upload_id, up["raw_sha256"], sha,
)
adapter_cls = ADAPTERS.get(profile["format"])
if adapter_cls is None:
return _mark_failed(conn, upload_id, f"unknown format: {profile['format']}")
adapter = adapter_cls(profile_config=profile.get("format_config") or {})
# ── Paywall: only ingest calls for years the customer has paid ──
# cdr_study_access_grants tracks paid reporting years per profile.
# Any row whose start_time.year isn't in the grant list is dropped
# and a customer-visible notice is surfaced on the CDR buckets page.
paid_years = _fetch_paid_years(conn, profile_id)
rows_dropped_unpaid = 0
dropped_by_year: dict[int, int] = {}
# ── Iterate + classify + dedup ────────────────────────────
intra_file_keys: set[str] = set()
        accepted = dupes_intra = dupes_cross = 0
quarantined = 0
billing_state = profile.get("billing_state")
with conn.cursor() as cur:
iterator = _safe_iter(adapter, tmp.name, upload_id, conn)
for row in iterator:
# Intra-file dedup
nk_hash = row.natural_key_hash(profile_id)
if nk_hash in intra_file_keys:
dupes_intra += 1
continue
intra_file_keys.add(nk_hash)
# Paywall year gate
row_year = row.start_time.year if row.start_time else None
if paid_years and row_year is not None and row_year not in paid_years:
rows_dropped_unpaid += 1
dropped_by_year[row_year] = dropped_by_year.get(row_year, 0) + 1
continue
# Classify
cls_result = classifier.classify_call(
caller_number=row.caller_number,
called_number=row.called_number,
billing_state=billing_state,
)
bucket = _resolve_bucket(row, mappings)
cur.execute(
"""
INSERT INTO cdr_calls (
profile_id, upload_id, natural_key_hash,
start_time, duration_sec,
billed_amount_cents, billed_currency,
trunk_group_id, customer_account_id, customer_type,
call_direction, caller_npa, caller_state, caller_country,
called_npa, called_state, called_country,
jurisdiction, orig_state_region, billing_state_region
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
ON CONFLICT (profile_id, natural_key_hash) DO NOTHING
""",
(
profile_id, upload_id, nk_hash,
row.start_time, row.duration_sec,
row.billed_amount_cents, row.billed_currency,
row.trunk_group_id, row.customer_account_id, bucket,
row.call_direction,
cls_result.orig_npa, cls_result.orig_state, cls_result.orig_country,
cls_result.term_npa, cls_result.term_state, cls_result.term_country,
cls_result.jurisdiction,
cls_result.orig_state_region, cls_result.billing_state_region,
),
)
if cur.rowcount:
accepted += 1
                else:
                    # Natural key already present in cdr_calls from an earlier
                    # upload (ON CONFLICT DO NOTHING inserted nothing).
                    dupes_cross += 1
conn.commit()
        # Pull authoritative counts from the database: rows actually inserted for
        # this upload, plus rows routed into quarantine during iteration.
with conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM cdr_calls "
"WHERE profile_id=%s AND upload_id=%s",
(profile_id, upload_id),
)
accepted_actual = cur.fetchone()[0]
cur.execute(
"SELECT COUNT(*) FROM cdr_quarantine WHERE upload_id=%s",
(upload_id,),
)
quarantined = cur.fetchone()[0]
        total_rows = accepted_actual + quarantined + dupes_intra + dupes_cross
        cross_upload_dupes = dupes_cross
# ── Halt if >10% quarantined ─────────────────────────────
halt = total_rows > 0 and (quarantined / total_rows) > 0.10
status = "quarantined" if halt else "done"
summary = {
"rows_total": total_rows,
"rows_accepted": accepted_actual,
"rows_quarantined": quarantined,
"rows_dropped_dupes": dupes_intra + cross_upload_dupes,
"rows_dropped_unpaid_years": rows_dropped_unpaid,
"dropped_by_year": dropped_by_year,
"paid_years": sorted(paid_years) if paid_years else [],
"halt_reason": "quarantine_rate_exceeded_10pct" if halt else None,
}
if rows_dropped_unpaid > 0:
years_list = ", ".join(str(y) for y in sorted(dropped_by_year))
summary["customer_notice"] = (
f"{rows_dropped_unpaid:,} call record{'s' if rows_dropped_unpaid != 1 else ''} "
f"from year{'s' if len(dropped_by_year) != 1 else ''} {years_list} were NOT "
"imported because you haven't purchased 499-A filing access for "
f"{'those years' if len(dropped_by_year) != 1 else 'that year'}. "
"Order the corresponding 499-A filing service(s) and re-upload to include them."
)
with conn.cursor() as cur:
cur.execute(
"""
UPDATE cdr_ingestion_uploads SET
status=%s, row_count=%s, rows_accepted=%s,
rows_quarantined=%s, rows_dropped_dupes=%s,
                -- merge (not overwrite) so keys written at upload time,
                -- e.g. summary_json->>'bytes' read by _refresh_usage, survive
                summary_json = COALESCE(summary_json, '{}'::jsonb) || %s::jsonb, processed_at=NOW()
WHERE id=%s
""",
(
status, total_rows, accepted_actual, quarantined,
summary["rows_dropped_dupes"],
psycopg2.extras.Json(summary), upload_id,
),
)
conn.commit()
# ── Usage meter refresh + quota check ─────────────────────
_refresh_usage(conn, profile_id, profile)
log.info("Upload %s done: %s", upload_id, summary)
return {"upload_id": upload_id, **summary, "status": status}
finally:
try:
os.unlink(tmp.name)
except OSError:
pass
def _safe_iter(adapter, path, upload_id, conn):
"""Yield rows from adapter, routing ValidationErrors into quarantine."""
try:
iterator = adapter.iter_rows(path)
except Exception as exc:
log.error("Adapter init failed for upload %s: %s", upload_id, exc)
_quarantine(conn, upload_id, None, "adapter_init_failed", str(exc), {})
return
source_row = 0
while True:
source_row += 1
try:
row = next(iterator)
except StopIteration:
return
except ValidationError as ve:
_quarantine(conn, upload_id, source_row, ve.reason_code, ve.detail, {})
continue
except Exception as exc:
_quarantine(conn, upload_id, source_row, "unparseable_row", str(exc), {})
continue
yield row
def _quarantine(conn, upload_id, source_row, reason_code, detail, payload):
with conn.cursor() as cur:
cur.execute(
"INSERT INTO cdr_quarantine (upload_id, source_row, raw_payload, reason_code, reason_detail) "
"VALUES (%s, %s, %s::jsonb, %s, %s)",
(upload_id, source_row, psycopg2.extras.Json(payload), reason_code, detail),
)
conn.commit()
def _mark_failed(conn, upload_id, reason):
with conn.cursor() as cur:
cur.execute(
"UPDATE cdr_ingestion_uploads SET status='failed', error=%s, "
"processed_at=NOW() WHERE id=%s",
(reason, upload_id),
)
conn.commit()
return {"upload_id": upload_id, "failed": True, "reason": reason}
def _refresh_usage(conn, profile_id, profile) -> None:
"""Recompute usage meter + trip quota warnings."""
    year = datetime.now(timezone.utc).year
with conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM cdr_calls WHERE profile_id=%s "
"AND EXTRACT(YEAR FROM start_time) = %s",
(profile_id, year),
)
rows_ingested = cur.fetchone()[0]
# Bytes: sum uploads created in this calendar year.
cur.execute(
"SELECT COALESCE(SUM(COALESCE((summary_json->>'bytes')::bigint, 0)), 0) "
"FROM cdr_ingestion_uploads WHERE profile_id=%s "
"AND EXTRACT(YEAR FROM created_at) = %s",
(profile_id, year),
)
bytes_stored = cur.fetchone()[0]
cur.execute(
"""
INSERT INTO cdr_usage_meters (profile_id, reporting_year, bytes_stored, rows_ingested, last_measured_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (profile_id, reporting_year) DO UPDATE SET
bytes_stored = EXCLUDED.bytes_stored,
rows_ingested = EXCLUDED.rows_ingested,
last_measured_at = NOW()
""",
(profile_id, year, bytes_stored, rows_ingested),
)
conn.commit()
# Warning + block logic
plan = profile.get("storage_plan") or "included"
storage_cap, rows_cap = QUOTAS.get(plan, (None, None))
if storage_cap is None:
return
util = max(
bytes_stored / storage_cap if storage_cap else 0,
rows_ingested / rows_cap if rows_cap else 0,
)
if util >= 1.0:
_emit_quota_event(conn, profile_id, year, "100pct", profile)
elif util >= 0.8:
_emit_quota_event(conn, profile_id, year, "80pct", profile)
def _emit_quota_event(conn, profile_id, year, level, profile):
"""Fire a one-time notification per threshold."""
with conn.cursor() as cur:
col = "warned_80pct_at" if level == "80pct" else "warned_100pct_at"
cur.execute(
f"SELECT {col} FROM cdr_usage_meters "
f"WHERE profile_id=%s AND reporting_year=%s",
(profile_id, year),
)
row = cur.fetchone()
if row and row[0]:
return # already warned
cur.execute(
f"UPDATE cdr_usage_meters SET {col}=NOW() "
f"WHERE profile_id=%s AND reporting_year=%s",
(profile_id, year),
)
conn.commit()
log.warning(
"profile %s quota %s tripped — storage_plan=%s over_quota_policy=%s",
profile_id, level, profile.get("storage_plan"),
profile.get("over_quota_policy"),
)
# SMTP + ERPNext ToDo emission — reuse pattern from usf_factor_monitor.
# Implementation left to the notification layer (same SMTP helpers).
def run_queue_once(limit: int = 50) -> dict:
conn = psycopg2.connect(DATABASE_URL)
classifier = CDRClassifier()
summaries = []
try:
while len(summaries) < limit:
with conn.cursor() as cur:
cur.execute(
"SELECT id FROM cdr_ingestion_uploads "
"WHERE status='pending' ORDER BY id LIMIT 1",
)
row = cur.fetchone()
if not row:
break
summaries.append(process_upload(conn, classifier, row[0]))
finally:
conn.close()
return {"processed": len(summaries), "summaries": summaries}
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--upload-id", type=int, help="Process a specific upload only.")
parser.add_argument("--limit", type=int, default=50)
args = parser.parse_args()
if args.upload_id:
conn = psycopg2.connect(DATABASE_URL)
try:
print(process_upload(conn, CDRClassifier(), args.upload_id))
finally:
conn.close()
else:
print(run_queue_once(limit=args.limit))
if __name__ == "__main__":
main()
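# Illustrative return value (all figures invented) from a single-upload run:
#   $ python -m scripts.workers.cdr_ingester --upload-id=42
#   {'upload_id': 42, 'rows_total': 1200, 'rows_accepted': 1150,
#    'rows_quarantined': 8, 'rows_dropped_dupes': 42,
#    'rows_dropped_unpaid_years': 0, 'dropped_by_year': {},
#    'paid_years': [2025], 'halt_reason': None, 'status': 'done'}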