new-site/scripts/workers/icc_ingester.py

"""ICC revenue ingester.
Consumes ``icc_ingestion_uploads`` rows with ``status='pending'`` — for
each:
1. Mark the upload ``status='parsing'``
2. Download the raw object from MinIO (falling back to a local
filesystem path when the MinIO client isn't available, e.g. in
dev/CI)
3. Resolve the adapter from ``source_format`` and iterate the file
4. Bulk-insert rows via ``psycopg2.extras.execute_values`` with
``ON CONFLICT DO NOTHING`` on
``(profile_id, reporting_year, reporting_quarter, natural_key_hash)``
5. Update the upload with accepted/rejected counts, parsed_at, and a
summary JSON blob containing counterparties_count,
total_revenue_cents, and by_category breakdown
6. On any exception during adapter dispatch or iteration, mark the
upload ``failed`` with ``error_message=str(exc)``
Usage:
python -m scripts.workers.icc_ingester # drain pending queue
python -m scripts.workers.icc_ingester --upload-id=42
python -m scripts.workers.icc_ingester --limit=10
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import tempfile
from collections import defaultdict
from datetime import datetime
from typing import Iterable, List, Optional

import psycopg2
import psycopg2.extras

from scripts.workers.icc_adapters import get_adapter
from scripts.workers.icc_adapters.common import IccRevenueLine, ValidationError

log = logging.getLogger("icc_ingester")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

DATABASE_URL = os.environ.get("DATABASE_URL", "")
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")
BATCH_SIZE = 500


# ─── MinIO helper (optional) ──────────────────────────────────────────────

def _minio():
    """Return a configured MinIO client, or ``None`` if the lib isn't installed.

    When ``None`` is returned, the ingester falls back to treating
    ``raw_minio_path`` as a direct filesystem path — useful for dev and
    for tests where uploads are written straight to ``/tmp``.
    """
    try:
        from minio import Minio
    except ImportError:
        log.warning("minio client not installed — will attempt local-filesystem fallback")
        return None
    return Minio(
        os.environ.get("MINIO_ENDPOINT", "minio:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
        secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
        secure=os.environ.get("MINIO_SECURE", "false").lower() == "true",
    )
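
# Typical environment for the MinIO path, per the lookups above (values here
# are illustrative placeholders, not defaults from the repo):
#   MINIO_ENDPOINT=minio:9000
#   MINIO_ACCESS_KEY=...  MINIO_SECRET_KEY=...
#   MINIO_SECURE=true              # plain HTTP when unset or "false"
#   MINIO_BUCKET=performancewest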


def _fetch_object(raw_minio_path: str, suffix_hint: str = "") -> str:
    """Download ``raw_minio_path`` to a temp file; return the local path.

    Falls back to treating the path as a local file when the MinIO client
    is unavailable or the path doesn't look MinIO-shaped.
    """
    # Dev fallback — path exists on disk
    if os.path.isfile(raw_minio_path):
        return raw_minio_path
    client = _minio()
    if client is None:
        raise FileNotFoundError(
            f"MinIO client unavailable and {raw_minio_path!r} does not exist on disk"
        )
    obj_key = raw_minio_path
    if MINIO_BUCKET in raw_minio_path:
        obj_key = raw_minio_path.split(f"{MINIO_BUCKET}/", 1)[-1]
    suffix = suffix_hint or os.path.splitext(obj_key)[1] or ".dat"
    tmp = tempfile.NamedTemporaryFile("wb", suffix=suffix, delete=False)
    tmp.close()
    client.fget_object(MINIO_BUCKET, obj_key, tmp.name)
    return tmp.name
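
# Illustrative path resolution (hypothetical inputs):
#   "performancewest/uploads/42/invoice.edi"  -> object key "uploads/42/invoice.edi",
#        downloaded to a NamedTemporaryFile whose path is returned
#   "/tmp/test-fixtures/invoice.edi"          -> returned as-is (local dev/CI fallback)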


# ─── Ingester core ────────────────────────────────────────────────────────

def _fetch_paid_years(conn, profile_id: int) -> set[int]:
    """Return the set of reporting_years this profile has paid access for.

    Used by the year-paywall gate. An empty set means no grants on file —
    the caller treats that as "don't filter" (legacy behavior) rather
    than "drop everything" to avoid silently losing data on first run.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT reporting_year FROM cdr_study_access_grants WHERE profile_id = %s",
                (profile_id,),
            )
            rows = cur.fetchall()
            return {int(r[0]) for r in rows} if rows else set()
    except Exception as exc:
        log.warning("paid_years lookup failed for profile %s: %s", profile_id, exc)
        return set()


def _create_paywall_notice_todo(
    *, upload_id: int, profile_id: int, dropped_counts: dict,
) -> None:
    """Create an admin ToDo so the ops team knows the customer has upload
    data sitting behind the paywall that they haven't paid for.

    Customers see the notice client-side via ``summary_json.customer_notice``.
    """
    try:
        from scripts.workers.erpnext_client import ERPNextClient

        years = ", ".join(f"{y} ({n} rows)" for y, n in sorted(dropped_counts.items()))
        ERPNextClient().create_resource(
            "ToDo",
            {
                "description": (
                    f"[icc-paywall] Upload {upload_id} / profile {profile_id}: "
                    f"dropped rows for unpaid years — {years}. Customer was "
                    "notified on the ICC import step. Follow up if they want "
                    "to purchase the prior-year filing(s)."
                ),
                "priority": "Low",
                "role": "Accounting Advisor",
            },
        )
    except Exception as exc:
        log.debug("paywall ToDo creation failed: %s", exc)


def _extract_reporting_year(summary_json, created_at: datetime) -> int:
    """Pick the ``reporting_year`` for this upload.

    Preference order:

    1. ``summary_json.reporting_year`` if the uploader set it
    2. ``created_at.year - 1`` (typical 499-A filing convention — the
       April filing covers the prior calendar year)
    3. the prior UTC calendar year as a last-ditch fallback
    """
    if isinstance(summary_json, dict):
        y = summary_json.get("reporting_year")
        if isinstance(y, int) and 2000 < y < 2100:
            return y
        if isinstance(y, str) and y.strip().isdigit():
            return int(y.strip())
    if isinstance(created_at, datetime):
        return max(2000, created_at.year - 1)
    return datetime.utcnow().year - 1
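
# Illustrative precedence (hypothetical inputs):
#   _extract_reporting_year({"reporting_year": 2024}, created_at)  -> 2024
#   _extract_reporting_year({}, datetime(2025, 4, 15))             -> 2024
#   _extract_reporting_year(None, None)                            -> prior UTC year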


def _bulk_insert(
    conn,
    upload_id: int,
    rows: Iterable[IccRevenueLine],
) -> tuple[int, int, dict]:
    """Insert rows in batches; return ``(accepted, total_attempted, summary)``."""
    accepted = 0
    total_attempted = 0
    counterparties: set[str] = set()
    total_revenue_cents = 0
    by_category: dict[str, dict] = defaultdict(
        lambda: {"count": 0, "revenue_cents": 0, "minutes_of_use": 0}
    )
    batch: List[tuple] = []

    def _flush(cur) -> int:
        if not batch:
            return 0
        psycopg2.extras.execute_values(
            cur,
            """
            INSERT INTO icc_revenue_lines (
                profile_id, reporting_year, reporting_quarter,
                icc_category, counterparty_legal_name, counterparty_ocn,
                counterparty_country, revenue_cents, minutes_of_use,
                source_upload_id, source_line_no, natural_key_hash, raw_row
            ) VALUES %s
            ON CONFLICT (profile_id, reporting_year, reporting_quarter, natural_key_hash)
            DO NOTHING
            """,
            batch,
            template=(
                "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb)"
            ),
            # execute_values splits argslist into pages (default 100 rows) and
            # cur.rowcount reflects only the final statement; send the whole
            # batch as one page so the dedup-aware insert count is accurate.
            page_size=len(batch),
        )
        inserted = cur.rowcount or 0
        batch.clear()
        return inserted

    with conn.cursor() as cur:
        for row in rows:
            total_attempted += 1
            counterparties.add(
                (row.counterparty_ocn or row.counterparty_legal_name or "").upper()
            )
            total_revenue_cents += row.revenue_cents
            bucket = by_category[row.icc_category]
            bucket["count"] += 1
            bucket["revenue_cents"] += row.revenue_cents
            if row.minutes_of_use:
                bucket["minutes_of_use"] += row.minutes_of_use
            batch.append((
                row.profile_id,
                row.reporting_year,
                row.reporting_quarter,
                row.icc_category,
                row.counterparty_legal_name,
                row.counterparty_ocn,
                row.counterparty_country,
                row.revenue_cents,
                row.minutes_of_use,
                upload_id,
                row.source_line_no,
                row.natural_key_hash(),
                json.dumps(row.raw_row, default=str),
            ))
            if len(batch) >= BATCH_SIZE:
                accepted += _flush(cur)
        accepted += _flush(cur)
    conn.commit()
    summary = {
        "counterparties_count": len({c for c in counterparties if c}),
        "total_revenue_cents": total_revenue_cents,
        "by_category": {k: dict(v) for k, v in by_category.items()},
        "rows_attempted": total_attempted,
    }
    return accepted, total_attempted, summary
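
# Dedup note (behavior implied by the ON CONFLICT clause above): a re-uploaded
# row carrying the same (profile_id, reporting_year, reporting_quarter,
# natural_key_hash) inserts nothing, and the miss surfaces downstream as
# summary_json["dedup_collisions"] rather than as a rejection.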


def _safe_iter(
    adapter, path: str,
) -> Iterable[tuple[Optional[IccRevenueLine], Optional[ValidationError]]]:
    """Wrap ``adapter.iter_rows`` so per-row ValidationErrors don't halt.

    Yields ``(row, None)`` for a clean row and ``(None, error)`` for a
    rejected row.
    """
    try:
        # Normalize to an iterator in case the adapter returns a plain iterable.
        iterator = iter(adapter.iter_rows(path))
    except NotImplementedError:
        raise  # propagate to outer handler
    except Exception as exc:  # adapter init failure
        log.error("ICC adapter init failed: %s", exc)
        raise
    while True:
        try:
            row = next(iterator)
        except StopIteration:
            return
        except ValidationError as ve:
            yield (None, ve)
            continue
        except NotImplementedError:
            raise
        except Exception as exc:
            yield (None, ValidationError("unparseable_row", str(exc)))
            continue
        yield (row, None)


def process_upload(conn, upload_id: int) -> dict:
    """Ingest a single upload; returns a summary dict."""
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            """
            UPDATE icc_ingestion_uploads
            SET status='parsing'
            WHERE id=%s AND status IN ('pending','failed')
            RETURNING id, profile_id, customer_id, source_format,
                      raw_minio_path, raw_sha256, summary_json, created_at
            """,
            (upload_id,),
        )
        up = cur.fetchone()
        if not up:
            conn.rollback()  # release the no-op transaction
            return {"upload_id": upload_id, "skipped": True, "reason": "not pending"}
    conn.commit()

    source_format = up["source_format"]
    profile_id = up["profile_id"]
    reporting_year = _extract_reporting_year(up.get("summary_json"), up.get("created_at"))
    local_path: Optional[str] = None
    try:
        adapter_cls = get_adapter(source_format)
        if adapter_cls is None:
            raise ValueError(f"no adapter registered for source_format={source_format!r}")
        suffix_hint = {
            "cabs_bos": ".txt",
            "edi_810": ".edi",
            "8yy_qry": ".xml",
            "itu_tas": ".tsv",
            "icss": ".tsv",
            "wholesale_sip_csv": ".csv",
            "carrier_invoice_pdf": ".pdf",
        }.get(source_format, "")
        local_path = _fetch_object(up["raw_minio_path"], suffix_hint)
        adapter = adapter_cls(profile_id=profile_id, reporting_year=reporting_year)
        rows_accepted = 0
        rows_rejected = 0
        clean_rows: list[IccRevenueLine] = []

        # Paywall gate: only ingest rows for reporting years the customer
        # has paid for (cdr_study_access_grants). Drop everything else and
        # surface it to the customer via summary_json so they know what was
        # dropped and can purchase access if they want it retained.
        paid_years = _fetch_paid_years(conn, profile_id)
        rows_dropped_unpaid = 0
        dropped_year_counts: dict[int, int] = {}

        for row, err in _safe_iter(adapter, local_path):
            if err is not None:
                rows_rejected += 1
                log.warning(
                    "Upload %s row rejected (%s): %s",
                    upload_id, err.reason_code, err.detail,
                )
                continue
            # Year paywall: if the row's reporting_year isn't in the
            # customer's paid grant list, drop it.
            row_year = getattr(row, "reporting_year", reporting_year)
            if paid_years and row_year not in paid_years:
                rows_dropped_unpaid += 1
                dropped_year_counts[row_year] = dropped_year_counts.get(row_year, 0) + 1
                continue
            clean_rows.append(row)

        inserted, attempted, summary = _bulk_insert(conn, upload_id, clean_rows)
        rows_accepted = inserted
        # Rows attempted but not inserted due to dedup count as neither
        # accepted nor rejected.
        dedup_collisions = max(0, attempted - inserted)
        summary.update({
            "rows_accepted": rows_accepted,
            "rows_rejected": rows_rejected,
            "rows_dropped_unpaid_years": rows_dropped_unpaid,
            "dropped_by_year": dropped_year_counts,
            "paid_years": sorted(paid_years) if paid_years else [],
            "dedup_collisions": dedup_collisions,
            "reporting_year": reporting_year,
            "source_format": source_format,
        })

        # Notify the customer if anything was dropped. The UI surfaces
        # summary_json on the IccImportStep — add a friendly message.
        if rows_dropped_unpaid > 0:
            years_list = ", ".join(str(y) for y in sorted(dropped_year_counts))
            summary["customer_notice"] = (
                f"{rows_dropped_unpaid} row{'s' if rows_dropped_unpaid != 1 else ''} from "
                f"year{'s' if len(dropped_year_counts) != 1 else ''} {years_list} "
                f"{'were' if rows_dropped_unpaid != 1 else 'was'} NOT imported because "
                "you haven't purchased 499-A filing access for "
                f"{'those years' if len(dropped_year_counts) != 1 else 'that year'}. "
                "Order the corresponding 499-A filing service(s) and re-upload to include them."
            )
            _create_paywall_notice_todo(
                upload_id=upload_id, profile_id=profile_id,
                dropped_counts=dropped_year_counts,
            )

        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE icc_ingestion_uploads SET
                    status='complete',
                    rows_accepted=%s,
                    rows_rejected=%s,
                    summary_json=%s::jsonb,
                    parsed_at=NOW(),
                    error_message=NULL
                WHERE id=%s
                """,
                (rows_accepted, rows_rejected, psycopg2.extras.Json(summary), upload_id),
            )
        conn.commit()
        log.info("Upload %s complete: %s", upload_id, summary)
        return {"upload_id": upload_id, "status": "complete", **summary}
    except Exception as exc:
        conn.rollback()
        log.exception("Upload %s failed", upload_id)
        _mark_failed(conn, upload_id, str(exc))
        return {"upload_id": upload_id, "status": "failed", "error": str(exc)}
    finally:
        # Clean up the temp file when we pulled from MinIO (not when the path
        # was a user-provided local file).
        if local_path and local_path != up["raw_minio_path"]:
            try:
                os.unlink(local_path)
            except OSError:
                pass


def _mark_failed(conn, upload_id: int, reason: str) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE icc_ingestion_uploads
            SET status='failed',
                error_message=%s,
                parsed_at=NOW()
            WHERE id=%s
            """,
            (reason[:2000], upload_id),
        )
    conn.commit()


def run_queue_once(limit: int = 50) -> dict:
    """Drain up to ``limit`` pending uploads; return an aggregate summary."""
    conn = psycopg2.connect(DATABASE_URL)
    summaries = []
    try:
        while len(summaries) < limit:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT id FROM icc_ingestion_uploads
                    WHERE status='pending'
                    ORDER BY id
                    LIMIT 1
                    """,
                )
                row = cur.fetchone()
            if not row:
                break
            summaries.append(process_upload(conn, row[0]))
    finally:
        conn.close()
    return {"processed": len(summaries), "summaries": summaries}


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--upload-id", type=int, help="Process a specific upload only.")
    parser.add_argument("--limit", type=int, default=50, help="Max uploads per run.")
    args = parser.parse_args()
    if args.upload_id:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            print(json.dumps(process_upload(conn, args.upload_id), default=str, indent=2))
        finally:
            conn.close()
    else:
        print(json.dumps(run_queue_once(limit=args.limit), default=str, indent=2))


if __name__ == "__main__":
    main()