"""
|
|
CDR ingester.
|
|
|
|
Consumes `cdr_ingestion_uploads` rows with `status='pending'` — for each:
|
|
1. Download the raw object from MinIO
|
|
2. Pick the adapter (profile.format) and iterate rows
|
|
3. Deduplicate within-file + cross-file
|
|
4. Resolve wholesale/retail bucket (per-row override > account_id > trunk_group)
|
|
5. Classify jurisdiction + Block 5 regions
|
|
6. Insert into cdr_calls (ON CONFLICT DO NOTHING), quarantine failures
|
|
7. Update the upload row with summary counts + status
|
|
8. Refresh cdr_usage_meters; raise quota warnings/blocks as needed
|
|
|
|
Designed to be safe to run as: single worker, cron every N minutes, or
|
|
triggered by the webhook/upload endpoint after each new row lands.
|
|
|
|
Usage:
|
|
python -m scripts.workers.cdr_ingester # consume queue once
|
|
python -m scripts.workers.cdr_ingester --upload-id=42
|
|
"""

from __future__ import annotations

import argparse
import hashlib
import logging
import os
import sys
import tempfile
from datetime import datetime, timezone

import psycopg2
import psycopg2.extras

from scripts.workers.cdr_adapters import ADAPTERS
from scripts.workers.cdr_adapters.base import ValidationError
from scripts.workers.cdr_classifier import CDRClassifier

log = logging.getLogger("cdr_ingester")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)


def _fetch_paid_years(conn, profile_id: int) -> set[int]:
    """Return the set of reporting years this profile has paid access to
    (via cdr_study_access_grants). An empty set means no grants on file;
    the caller treats that as "don't filter" to avoid accidentally
    dropping everything on first-run profiles.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT reporting_year FROM cdr_study_access_grants WHERE profile_id = %s",
                (profile_id,),
            )
            return {int(r[0]) for r in cur.fetchall() or []}
    except Exception as exc:
        log.warning("paid_years lookup failed for profile %s: %s", profile_id, exc)
        return set()
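
# Example: grants for reporting years {2023, 2024} mean a call whose
# start_time falls in 2022 is dropped and reported under
# rows_dropped_unpaid_years, while an empty grant set disables the filter
# entirely (see process_upload).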

DATABASE_URL = os.environ.get("DATABASE_URL", "")

# Storage plan quotas (must mirror the plan file). Amounts in bytes + rows.
QUOTAS = {
    "included": (10 * 1024**3, 10_000_000),
    "tier1": (50 * 1024**3, 50_000_000),
    "tier2": (250 * 1024**3, 250_000_000),
    "tier3": (1024 * 1024**3, 1_000_000_000),
    "enterprise": (None, None),  # custom
}
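
# Example: a "tier1" profile holding 45 GiB and 10M rows sits at
# max(45/50, 10/50) = 90% utilisation, so the 80% warning fires (see
# _refresh_usage). The (None, None) enterprise entry skips quota checks.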


# ─── MinIO helper ─────────────────────────────────────────────────────────

def _minio():
    from minio import Minio
    return Minio(
        os.environ.get("MINIO_ENDPOINT", "minio:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
        secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
        secure=os.environ.get("MINIO_SECURE", "false").lower() == "true",
    )


MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")


# ─── Bucket resolver ──────────────────────────────────────────────────────

def _resolve_bucket(row, mappings: dict) -> str:
    """Return 'wholesale' | 'retail' | 'unknown' for this CDR row.

    Priority: per-row override > account_id mapping > trunk_group mapping.
    """
    override = (row.customer_type_override or "").lower()
    if override in ("wholesale", "retail"):
        return override
    if row.customer_account_id:
        key = ("account_id", row.customer_account_id)
        if key in mappings:
            return mappings[key]
    if row.trunk_group_id:
        key = ("trunk_group", row.trunk_group_id)
        if key in mappings:
            return mappings[key]
    return "unknown"


# ─── Main ingester ────────────────────────────────────────────────────────

def process_upload(conn, classifier: CDRClassifier, upload_id: int) -> dict:
    """Ingest a single upload; returns summary dict."""
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            """
            UPDATE cdr_ingestion_uploads
            SET status='processing'
            WHERE id=%s AND status IN ('pending','failed')
            RETURNING id, profile_id, raw_minio_path, raw_sha256
            """,
            (upload_id,),
        )
        up = cur.fetchone()
        if not up:
            return {"upload_id": upload_id, "skipped": True, "reason": "not pending"}
    conn.commit()
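    # The UPDATE above doubles as an atomic claim: if several workers race
    # on the same upload, only one gets the row back and the others return
    # early, so concurrent cron runs cannot double-ingest a file.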

    profile_id = up["profile_id"]

    # Load profile + bucket mappings
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            "SELECT * FROM cdr_ingestion_profiles WHERE id=%s", (profile_id,),
        )
        profile = cur.fetchone()
        cur.execute(
            "SELECT match_type, match_value, bucket "
            "FROM cdr_bucket_mappings WHERE profile_id=%s",
            (profile_id,),
        )
        mappings = {(r["match_type"], r["match_value"]): r["bucket"]
                    for r in cur.fetchall()}

    # Download the raw object
    minio = _minio()
    raw_path = up["raw_minio_path"]
    obj_key = raw_path.split(f"{MINIO_BUCKET}/", 1)[-1] if MINIO_BUCKET in raw_path else raw_path
    tmp = tempfile.NamedTemporaryFile(
        "wb", suffix=os.path.splitext(obj_key)[1] or ".csv", delete=False,
    )
    try:
        minio.fget_object(MINIO_BUCKET, obj_key, tmp.name)
        tmp.close()

        # Sanity: SHA-256 check (streamed in chunks so large files don't
        # have to fit in memory)
        digest = hashlib.sha256()
        with open(tmp.name, "rb") as fh:
            for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                digest.update(chunk)
        sha = digest.hexdigest()
        if sha != up["raw_sha256"]:
            log.warning(
                "Upload %s sha mismatch (expected %s got %s) — proceeding",
                upload_id, up["raw_sha256"], sha,
            )

        adapter_cls = ADAPTERS.get(profile["format"])
        if adapter_cls is None:
            return _mark_failed(conn, upload_id, f"unknown format: {profile['format']}")

        adapter = adapter_cls(profile_config=profile.get("format_config") or {})

        # ── Paywall: only ingest calls for years the customer has paid ──
        # cdr_study_access_grants tracks paid reporting years per profile.
        # Any row whose start_time.year isn't in the grant list is dropped
        # and a customer-visible notice is surfaced on the CDR buckets page.
        paid_years = _fetch_paid_years(conn, profile_id)
        rows_dropped_unpaid = 0
        dropped_by_year: dict[int, int] = {}

        # ── Iterate + classify + dedup ────────────────────────────
        intra_file_keys: set[str] = set()
        dupes_intra = 0  # duplicates within this file
        dupes_cross = 0  # rows already present from an earlier upload
        quarantined = 0

        billing_state = profile.get("billing_state")

        with conn.cursor() as cur:
            iterator = _safe_iter(adapter, tmp.name, upload_id, conn)
            for row in iterator:
                # Intra-file dedup
                nk_hash = row.natural_key_hash(profile_id)
                if nk_hash in intra_file_keys:
                    dupes_intra += 1
                    continue
                intra_file_keys.add(nk_hash)

                # Paywall year gate
                row_year = row.start_time.year if row.start_time else None
                if paid_years and row_year is not None and row_year not in paid_years:
                    rows_dropped_unpaid += 1
                    dropped_by_year[row_year] = dropped_by_year.get(row_year, 0) + 1
                    continue

                # Classify
                cls_result = classifier.classify_call(
                    caller_number=row.caller_number,
                    called_number=row.called_number,
                    billing_state=billing_state,
                )

                bucket = _resolve_bucket(row, mappings)

                cur.execute(
                    """
                    INSERT INTO cdr_calls (
                        profile_id, upload_id, natural_key_hash,
                        start_time, duration_sec,
                        billed_amount_cents, billed_currency,
                        trunk_group_id, customer_account_id, customer_type,
                        call_direction, caller_npa, caller_state, caller_country,
                        called_npa, called_state, called_country,
                        jurisdiction, orig_state_region, billing_state_region
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
                    )
                    ON CONFLICT (profile_id, natural_key_hash) DO NOTHING
                    """,
                    (
                        profile_id, upload_id, nk_hash,
                        row.start_time, row.duration_sec,
                        row.billed_amount_cents, row.billed_currency,
                        row.trunk_group_id, row.customer_account_id, bucket,
                        row.call_direction,
                        cls_result.orig_npa, cls_result.orig_state, cls_result.orig_country,
                        cls_result.term_npa, cls_result.term_state, cls_result.term_country,
                        cls_result.jurisdiction,
                        cls_result.orig_state_region, cls_result.billing_state_region,
                    ),
                )
                if not cur.rowcount:
                    # ON CONFLICT hit: this row already landed via an earlier
                    # upload, so count it as a cross-upload duplicate.
                    dupes_cross += 1
        conn.commit()
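        # Note: _quarantine() commits on the shared connection as it goes, so
        # a crash mid-file can leave a partial batch in cdr_calls. That is
        # safe: re-running the upload is idempotent thanks to the
        # ON CONFLICT (profile_id, natural_key_hash) guard above.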

        # Authoritative counts from the database: rows that actually landed
        # in cdr_calls for this upload, plus quarantine entries written by
        # _safe_iter while iterating.
        with conn.cursor() as cur:
            cur.execute(
                "SELECT COUNT(*) FROM cdr_calls "
                "WHERE profile_id=%s AND upload_id=%s",
                (profile_id, upload_id),
            )
            accepted_actual = cur.fetchone()[0]
            cur.execute(
                "SELECT COUNT(*) FROM cdr_quarantine WHERE upload_id=%s",
                (upload_id,),
            )
            quarantined = cur.fetchone()[0]

        total_rows = (accepted_actual + quarantined + dupes_intra
                      + dupes_cross + rows_dropped_unpaid)

        # ── Halt if >10% quarantined ─────────────────────────────
        halt = total_rows > 0 and (quarantined / total_rows) > 0.10
        status = "quarantined" if halt else "done"

        summary = {
            "rows_total": total_rows,
            "rows_accepted": accepted_actual,
            "rows_quarantined": quarantined,
            "rows_dropped_dupes": dupes_intra + dupes_cross,
            "rows_dropped_unpaid_years": rows_dropped_unpaid,
            "dropped_by_year": dropped_by_year,
            "paid_years": sorted(paid_years) if paid_years else [],
            "halt_reason": "quarantine_rate_exceeded_10pct" if halt else None,
        }
        if rows_dropped_unpaid > 0:
            years_list = ", ".join(str(y) for y in sorted(dropped_by_year))
            summary["customer_notice"] = (
                f"{rows_dropped_unpaid:,} call record{'s' if rows_dropped_unpaid != 1 else ''} "
                f"from year{'s' if len(dropped_by_year) != 1 else ''} {years_list} "
                f"{'were' if rows_dropped_unpaid != 1 else 'was'} NOT imported because you "
                "haven't purchased 499-A filing access for "
                f"{'those years' if len(dropped_by_year) != 1 else 'that year'}. "
                "Order the corresponding 499-A filing service(s) and re-upload to include them."
            )

        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE cdr_ingestion_uploads SET
                    status=%s, row_count=%s, rows_accepted=%s,
                    rows_quarantined=%s, rows_dropped_dupes=%s,
                    summary_json=%s::jsonb, processed_at=NOW()
                WHERE id=%s
                """,
                (
                    status, total_rows, accepted_actual, quarantined,
                    summary["rows_dropped_dupes"],
                    psycopg2.extras.Json(summary), upload_id,
                ),
            )
        conn.commit()

        # ── Usage meter refresh + quota check ─────────────────────
        _refresh_usage(conn, profile_id, profile)

        log.info("Upload %s done: %s", upload_id, summary)
        return {"upload_id": upload_id, **summary, "status": status}
    finally:
        try:
            os.unlink(tmp.name)
        except OSError:
            pass


def _safe_iter(adapter, path, upload_id, conn):
    """Yield rows from adapter, routing ValidationErrors into quarantine."""
    try:
        iterator = adapter.iter_rows(path)
    except Exception as exc:
        log.error("Adapter init failed for upload %s: %s", upload_id, exc)
        _quarantine(conn, upload_id, None, "adapter_init_failed", str(exc), {})
        return

    source_row = 0
    while True:
        source_row += 1
        try:
            row = next(iterator)
        except StopIteration:
            return
        except ValidationError as ve:
            _quarantine(conn, upload_id, source_row, ve.reason_code, ve.detail, {})
            continue
        except Exception as exc:
            _quarantine(conn, upload_id, source_row, "unparseable_row", str(exc), {})
            continue
        yield row
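
# Note on row numbering: _safe_iter increments source_row before every
# next() attempt, so cdr_quarantine.source_row is the 1-based position of
# the failing record as the adapter yields it (header handling, if any, is
# presumably inside adapter.iter_rows).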


def _quarantine(conn, upload_id, source_row, reason_code, detail, payload):
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO cdr_quarantine (upload_id, source_row, raw_payload, reason_code, reason_detail) "
            "VALUES (%s, %s, %s::jsonb, %s, %s)",
            (upload_id, source_row, psycopg2.extras.Json(payload), reason_code, detail),
        )
    conn.commit()


def _mark_failed(conn, upload_id, reason):
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE cdr_ingestion_uploads SET status='failed', error=%s, "
            "processed_at=NOW() WHERE id=%s",
            (reason, upload_id),
        )
    conn.commit()
    return {"upload_id": upload_id, "failed": True, "reason": reason}


def _refresh_usage(conn, profile_id, profile) -> None:
    """Recompute usage meter + trip quota warnings."""
    year = datetime.now(timezone.utc).year
    with conn.cursor() as cur:
        cur.execute(
            "SELECT COUNT(*) FROM cdr_calls WHERE profile_id=%s "
            "AND EXTRACT(YEAR FROM start_time) = %s",
            (profile_id, year),
        )
        rows_ingested = cur.fetchone()[0]
        # Bytes: sum uploads created in this calendar year.
        cur.execute(
            "SELECT COALESCE(SUM(COALESCE((summary_json->>'bytes')::bigint, 0)), 0) "
            "FROM cdr_ingestion_uploads WHERE profile_id=%s "
            "AND EXTRACT(YEAR FROM created_at) = %s",
            (profile_id, year),
        )
        bytes_stored = cur.fetchone()[0]

        cur.execute(
            """
            INSERT INTO cdr_usage_meters (profile_id, reporting_year, bytes_stored, rows_ingested, last_measured_at)
            VALUES (%s, %s, %s, %s, NOW())
            ON CONFLICT (profile_id, reporting_year) DO UPDATE SET
                bytes_stored = EXCLUDED.bytes_stored,
                rows_ingested = EXCLUDED.rows_ingested,
                last_measured_at = NOW()
            """,
            (profile_id, year, bytes_stored, rows_ingested),
        )
    conn.commit()
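    # The upsert keeps exactly one meter row per (profile, reporting_year).
    # bytes_stored comes from each upload's summary_json->>'bytes', which is
    # presumably recorded by the upload endpoint at upload time.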

    # Warning + block logic
    plan = profile.get("storage_plan") or "included"
    storage_cap, rows_cap = QUOTAS.get(plan, (None, None))
    if storage_cap is None:
        return
    util = max(
        bytes_stored / storage_cap if storage_cap else 0,
        rows_ingested / rows_cap if rows_cap else 0,
    )
    if util >= 1.0:
        _emit_quota_event(conn, profile_id, year, "100pct", profile)
    elif util >= 0.8:
        _emit_quota_event(conn, profile_id, year, "80pct", profile)


def _emit_quota_event(conn, profile_id, year, level, profile):
    """Fire a one-time notification per threshold."""
    with conn.cursor() as cur:
        # col comes from a fixed two-value whitelist, so the interpolation
        # below is not an injection surface.
        col = "warned_80pct_at" if level == "80pct" else "warned_100pct_at"
        cur.execute(
            f"SELECT {col} FROM cdr_usage_meters "
            "WHERE profile_id=%s AND reporting_year=%s",
            (profile_id, year),
        )
        row = cur.fetchone()
        if row and row[0]:
            return  # already warned
        cur.execute(
            f"UPDATE cdr_usage_meters SET {col}=NOW() "
            "WHERE profile_id=%s AND reporting_year=%s",
            (profile_id, year),
        )
    conn.commit()
    log.warning(
        "profile %s quota %s tripped — storage_plan=%s over_quota_policy=%s",
        profile_id, level, profile.get("storage_plan"),
        profile.get("over_quota_policy"),
    )
    # SMTP + ERPNext ToDo emission — reuse pattern from usf_factor_monitor.
    # Implementation left to the notification layer (same SMTP helpers).


def run_queue_once(limit: int = 50) -> dict:
    conn = psycopg2.connect(DATABASE_URL)
    classifier = CDRClassifier()
    summaries = []
    try:
        while len(summaries) < limit:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT id FROM cdr_ingestion_uploads "
                    "WHERE status='pending' ORDER BY id LIMIT 1",
                )
                row = cur.fetchone()
            if not row:
                break
            summaries.append(process_upload(conn, classifier, row[0]))
    finally:
        conn.close()
    return {"processed": len(summaries), "summaries": summaries}


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--upload-id", type=int, help="Process a specific upload only.")
    parser.add_argument("--limit", type=int, default=50)
    args = parser.parse_args()
    if args.upload_id is not None:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            print(process_upload(conn, CDRClassifier(), args.upload_id))
        finally:
            conn.close()
    else:
        print(run_queue_once(limit=args.limit))


if __name__ == "__main__":
    main()