"""ICC revenue ingester. Consumes ``icc_ingestion_uploads`` rows with ``status='pending'`` — for each: 1. Mark the upload ``status='parsing'`` 2. Download the raw object from MinIO (falling back to a local filesystem path when the MinIO client isn't available, e.g. in dev/CI) 3. Resolve the adapter from ``source_format`` and iterate the file 4. Bulk-insert rows via ``psycopg2.extras.execute_values`` with ``ON CONFLICT DO NOTHING`` on ``(profile_id, reporting_year, reporting_quarter, natural_key_hash)`` 5. Update the upload with accepted/rejected counts, parsed_at, and a summary JSON blob containing counterparties_count, total_revenue_cents, and by_category breakdown 6. On any exception during adapter dispatch or iteration, mark the upload ``failed`` with ``error_message=str(exc)`` Usage: python -m scripts.workers.icc_ingester # drain pending queue python -m scripts.workers.icc_ingester --upload-id=42 python -m scripts.workers.icc_ingester --limit=10 """ from __future__ import annotations import argparse import json import logging import os import sys import tempfile from collections import defaultdict from dataclasses import asdict from datetime import datetime from typing import Iterable, List, Optional import psycopg2 import psycopg2.extras from scripts.workers.icc_adapters import ADAPTERS, get_adapter from scripts.workers.icc_adapters.common import IccRevenueLine, ValidationError log = logging.getLogger("icc_ingester") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) DATABASE_URL = os.environ.get("DATABASE_URL", "") MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest") BATCH_SIZE = 500 # ─── MinIO helper (optional) ────────────────────────────────────────────── def _minio(): """Return a configured MinIO client, or ``None`` if the lib isn't installed. When ``None`` is returned, the ingester falls back to treating ``raw_minio_path`` as a direct filesystem path — useful for dev and for tests where uploads are written straight to ``/tmp``. """ try: from minio import Minio except ImportError: log.warning("minio client not installed — will attempt local-filesystem fallback") return None return Minio( os.environ.get("MINIO_ENDPOINT", "minio:9000"), access_key=os.environ.get("MINIO_ACCESS_KEY", ""), secret_key=os.environ.get("MINIO_SECRET_KEY", ""), secure=os.environ.get("MINIO_SECURE", "false").lower() == "true", ) def _fetch_object(raw_minio_path: str, suffix_hint: str = "") -> str: """Download ``raw_minio_path`` to a temp file; return the local path. Falls back to treating the path as a local file when the MinIO client is unavailable or the path doesn't look MinIO-shaped. """ # Dev fallback — path exists on disk if os.path.isfile(raw_minio_path): return raw_minio_path client = _minio() if client is None: raise FileNotFoundError( f"MinIO client unavailable and {raw_minio_path!r} does not exist on disk" ) obj_key = raw_minio_path if MINIO_BUCKET in raw_minio_path: obj_key = raw_minio_path.split(f"{MINIO_BUCKET}/", 1)[-1] suffix = suffix_hint or os.path.splitext(obj_key)[1] or ".dat" tmp = tempfile.NamedTemporaryFile("wb", suffix=suffix, delete=False) tmp.close() client.fget_object(MINIO_BUCKET, obj_key, tmp.name) return tmp.name # ─── Ingester core ──────────────────────────────────────────────────────── def _fetch_paid_years(conn, profile_id: int) -> set[int]: """Return the set of reporting_years this profile has paid access for. Used by the year-paywall gate. An empty set means no grants on file — the caller treats that as "don't filter" (legacy behavior) rather than "drop everything" to avoid silently losing data on first run. """ try: with conn.cursor() as cur: cur.execute( "SELECT reporting_year FROM cdr_study_access_grants WHERE profile_id = %s", (profile_id,), ) rows = cur.fetchall() return {int(r[0]) for r in rows} if rows else set() except Exception as exc: log.warning("paid_years lookup failed for profile %s: %s", profile_id, exc) return set() def _create_paywall_notice_todo( *, upload_id: int, profile_id: int, dropped_counts: dict, ) -> None: """Create an admin ToDo so the ops team knows the customer has unlocked data sitting behind the paywall that they haven't paid for. Customers see the notice client-side via summary_json.customer_notice. """ try: from scripts.workers.erpnext_client import ERPNextClient years = ", ".join(f"{y} ({n} rows)" for y, n in sorted(dropped_counts.items())) ERPNextClient().create_resource( "ToDo", { "description": ( f"[icc-paywall] Upload {upload_id} / profile {profile_id}: " f"dropped rows for unpaid years — {years}. Customer was " "notified on the ICC import step. Follow up if they want " "to purchase the prior-year filing(s)." ), "priority": "Low", "role": "Accounting Advisor", }, ) except Exception as exc: log.debug("paywall ToDo creation failed: %s", exc) def _extract_reporting_year(summary_json, created_at: datetime) -> int: """Pick the ``reporting_year`` for this upload. Preference order: 1. ``summary_json.reporting_year`` if the uploader set it 2. ``created_at.year - 1`` (typical 499-A filing convention — the April filing covers the prior calendar year) 3. current UTC year as a last-ditch fallback """ if isinstance(summary_json, dict): y = summary_json.get("reporting_year") if isinstance(y, int) and 2000 < y < 2100: return y if isinstance(y, str) and y.strip().isdigit(): return int(y.strip()) if isinstance(created_at, datetime): return max(2000, created_at.year - 1) return datetime.utcnow().year - 1 def _bulk_insert( conn, upload_id: int, rows: Iterable[IccRevenueLine], ) -> tuple[int, int, dict]: """Insert rows in batches; return (accepted, total_attempted, summary).""" accepted = 0 total_attempted = 0 counterparties: set[str] = set() total_revenue_cents = 0 by_category: dict[str, dict] = defaultdict( lambda: {"count": 0, "revenue_cents": 0, "minutes_of_use": 0} ) batch: List[tuple] = [] def _flush(cur) -> int: if not batch: return 0 psycopg2.extras.execute_values( cur, """ INSERT INTO icc_revenue_lines ( profile_id, reporting_year, reporting_quarter, icc_category, counterparty_legal_name, counterparty_ocn, counterparty_country, revenue_cents, minutes_of_use, source_upload_id, source_line_no, natural_key_hash, raw_row ) VALUES %s ON CONFLICT (profile_id, reporting_year, reporting_quarter, natural_key_hash) DO NOTHING """, batch, template=( "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb)" ), ) inserted = cur.rowcount or 0 batch.clear() return inserted with conn.cursor() as cur: for row in rows: total_attempted += 1 counterparties.add( (row.counterparty_ocn or row.counterparty_legal_name or "").upper() ) total_revenue_cents += row.revenue_cents bucket = by_category[row.icc_category] bucket["count"] += 1 bucket["revenue_cents"] += row.revenue_cents if row.minutes_of_use: bucket["minutes_of_use"] += row.minutes_of_use batch.append(( row.profile_id, row.reporting_year, row.reporting_quarter, row.icc_category, row.counterparty_legal_name, row.counterparty_ocn, row.counterparty_country, row.revenue_cents, row.minutes_of_use, upload_id, row.source_line_no, row.natural_key_hash(), json.dumps(row.raw_row, default=str), )) if len(batch) >= BATCH_SIZE: accepted += _flush(cur) accepted += _flush(cur) conn.commit() summary = { "counterparties_count": len({c for c in counterparties if c}), "total_revenue_cents": total_revenue_cents, "by_category": {k: dict(v) for k, v in by_category.items()}, "rows_attempted": total_attempted, } return accepted, total_attempted, summary def _safe_iter(adapter, path: str) -> Iterable[IccRevenueLine]: """Wrap adapter.iter_rows so per-row ValidationErrors don't halt. Yields ``(row, None)`` for a clean row and ``(None, error)`` for a rejected row. """ try: iterator = adapter.iter_rows(path) except NotImplementedError: raise # propagate to outer handler except Exception as exc: # adapter init failure log.error("ICC adapter init failed: %s", exc) raise while True: try: row = next(iterator) except StopIteration: return except ValidationError as ve: yield (None, ve) continue except NotImplementedError: raise except Exception as exc: yield (None, ValidationError("unparseable_row", str(exc))) continue yield (row, None) def process_upload(conn, upload_id: int) -> dict: """Ingest a single upload; returns summary dict.""" with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """ UPDATE icc_ingestion_uploads SET status='parsing' WHERE id=%s AND status IN ('pending','failed') RETURNING id, profile_id, customer_id, source_format, raw_minio_path, raw_sha256, summary_json, created_at """, (upload_id,), ) up = cur.fetchone() if not up: return {"upload_id": upload_id, "skipped": True, "reason": "not pending"} conn.commit() source_format = up["source_format"] profile_id = up["profile_id"] reporting_year = _extract_reporting_year(up.get("summary_json"), up.get("created_at")) local_path: Optional[str] = None try: adapter_cls = get_adapter(source_format) if adapter_cls is None: raise ValueError(f"no adapter registered for source_format={source_format!r}") suffix_hint = "" if source_format == "cabs_bos": suffix_hint = ".txt" elif source_format == "edi_810": suffix_hint = ".edi" elif source_format == "8yy_qry": suffix_hint = ".xml" elif source_format in ("itu_tas", "icss"): suffix_hint = ".tsv" elif source_format == "wholesale_sip_csv": suffix_hint = ".csv" elif source_format == "carrier_invoice_pdf": suffix_hint = ".pdf" local_path = _fetch_object(up["raw_minio_path"], suffix_hint) adapter = adapter_cls(profile_id=profile_id, reporting_year=reporting_year) rows_accepted = 0 rows_rejected = 0 clean_rows: list[IccRevenueLine] = [] # Paywall-gate: only ingest rows for reporting years the customer # has paid for (cdr_study_access_grants). Drop everything else and # surface to the customer via summary_json so they know what was # dropped and can purchase access if they want it retained. paid_years = _fetch_paid_years(conn, profile_id) rows_dropped_unpaid = 0 dropped_year_counts: dict[int, int] = {} for row, err in _safe_iter(adapter, local_path): if err is not None: rows_rejected += 1 log.warning( "Upload %s line %s rejected (%s): %s", upload_id, getattr(err, "reason_code", "?"), err.reason_code, err.detail, ) continue # Year-paywall: if the row's reporting_year isn't in the # customer's paid grant list, drop it. row_year = getattr(row, "reporting_year", reporting_year) if paid_years and row_year not in paid_years: rows_dropped_unpaid += 1 dropped_year_counts[row_year] = dropped_year_counts.get(row_year, 0) + 1 continue clean_rows.append(row) inserted, attempted, summary = _bulk_insert(conn, upload_id, clean_rows) rows_accepted = inserted # Rows attempted but not inserted due to dedup → neither accepted nor rejected dedup_collisions = max(0, attempted - inserted) summary.update({ "rows_accepted": rows_accepted, "rows_rejected": rows_rejected, "rows_dropped_unpaid_years": rows_dropped_unpaid, "dropped_by_year": dropped_year_counts, "paid_years": sorted(paid_years) if paid_years else [], "dedup_collisions": dedup_collisions, "reporting_year": reporting_year, "source_format": source_format, }) # Notify the customer if anything was dropped. The UI surfaces # summary_json on the IccImportStep — add a friendly message. if rows_dropped_unpaid > 0: years_list = ", ".join(str(y) for y in sorted(dropped_year_counts)) summary["customer_notice"] = ( f"{rows_dropped_unpaid} row{'s' if rows_dropped_unpaid != 1 else ''} from " f"year{'s' if len(dropped_year_counts) != 1 else ''} {years_list} were NOT " "imported because you haven't purchased 499-A filing access for " f"{'those years' if len(dropped_year_counts) != 1 else 'that year'}. " "Order the corresponding 499-A filing service(s) and re-upload to include them." ) _create_paywall_notice_todo( upload_id=upload_id, profile_id=profile_id, dropped_counts=dropped_year_counts, ) with conn.cursor() as cur: cur.execute( """ UPDATE icc_ingestion_uploads SET status='complete', rows_accepted=%s, rows_rejected=%s, summary_json=%s::jsonb, parsed_at=NOW(), error_message=NULL WHERE id=%s """, (rows_accepted, rows_rejected, psycopg2.extras.Json(summary), upload_id), ) conn.commit() log.info("Upload %s complete: %s", upload_id, summary) return {"upload_id": upload_id, "status": "complete", **summary} except Exception as exc: conn.rollback() log.exception("Upload %s failed", upload_id) _mark_failed(conn, upload_id, str(exc)) return {"upload_id": upload_id, "status": "failed", "error": str(exc)} finally: # Clean up temp file when we pulled from MinIO (not when path was a # user-provided local file) if local_path and local_path != up["raw_minio_path"]: try: os.unlink(local_path) except OSError: pass def _mark_failed(conn, upload_id: int, reason: str) -> None: with conn.cursor() as cur: cur.execute( """ UPDATE icc_ingestion_uploads SET status='failed', error_message=%s, parsed_at=NOW() WHERE id=%s """, (reason[:2000], upload_id), ) conn.commit() def run_queue_once(limit: int = 50) -> dict: """Drain up to ``limit`` pending uploads; return an aggregate summary.""" conn = psycopg2.connect(DATABASE_URL) summaries = [] try: while len(summaries) < limit: with conn.cursor() as cur: cur.execute( """ SELECT id FROM icc_ingestion_uploads WHERE status='pending' ORDER BY id LIMIT 1 """, ) row = cur.fetchone() if not row: break summaries.append(process_upload(conn, row[0])) finally: conn.close() return {"processed": len(summaries), "summaries": summaries} def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--upload-id", type=int, help="Process a specific upload only.") parser.add_argument("--limit", type=int, default=50, help="Max uploads per run.") args = parser.parse_args() if args.upload_id: conn = psycopg2.connect(DATABASE_URL) try: print(json.dumps(process_upload(conn, args.upload_id), default=str, indent=2)) finally: conn.close() else: print(json.dumps(run_queue_once(limit=args.limit), default=str, indent=2)) if __name__ == "__main__": main()