"""ICC revenue ingester.
|
|
|
|
Consumes ``icc_ingestion_uploads`` rows with ``status='pending'`` — for
|
|
each:
|
|
|
|
1. Mark the upload ``status='parsing'``
|
|
2. Download the raw object from MinIO (falling back to a local
|
|
filesystem path when the MinIO client isn't available, e.g. in
|
|
dev/CI)
|
|
3. Resolve the adapter from ``source_format`` and iterate the file
|
|
4. Bulk-insert rows via ``psycopg2.extras.execute_values`` with
|
|
``ON CONFLICT DO NOTHING`` on
|
|
``(profile_id, reporting_year, reporting_quarter, natural_key_hash)``
|
|
5. Update the upload with accepted/rejected counts, parsed_at, and a
|
|
summary JSON blob containing counterparties_count,
|
|
total_revenue_cents, and by_category breakdown
|
|
6. On any exception during adapter dispatch or iteration, mark the
|
|
upload ``failed`` with ``error_message=str(exc)``
|
|
|
|
Usage:
|
|
|
|
python -m scripts.workers.icc_ingester # drain pending queue
|
|
python -m scripts.workers.icc_ingester --upload-id=42
|
|
python -m scripts.workers.icc_ingester --limit=10
|
|
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import tempfile
from collections import defaultdict
from datetime import datetime, timezone
from typing import Iterable, List, Optional

import psycopg2
import psycopg2.extras

from scripts.workers.icc_adapters import ADAPTERS, get_adapter  # noqa: F401 (ADAPTERS re-exported)
from scripts.workers.icc_adapters.common import IccRevenueLine, ValidationError

log = logging.getLogger("icc_ingester")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

DATABASE_URL = os.environ.get("DATABASE_URL", "")
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")

BATCH_SIZE = 500


# ─── MinIO helper (optional) ──────────────────────────────────────────────

def _minio():
    """Return a configured MinIO client, or ``None`` if the lib isn't installed.

    When ``None`` is returned, the ingester falls back to treating
    ``raw_minio_path`` as a direct filesystem path — useful for dev and
    for tests where uploads are written straight to ``/tmp``.
    """
    try:
        from minio import Minio
    except ImportError:
        log.warning("minio client not installed — will attempt local-filesystem fallback")
        return None
    return Minio(
        os.environ.get("MINIO_ENDPOINT", "minio:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
        secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
        secure=os.environ.get("MINIO_SECURE", "false").lower() == "true",
    )


def _fetch_object(raw_minio_path: str, suffix_hint: str = "") -> str:
    """Download ``raw_minio_path`` to a temp file; return the local path.

    Falls back to treating the path as a local file when the MinIO client
    is unavailable or the path doesn't look MinIO-shaped.
    """
    # Dev fallback — path exists on disk
    if os.path.isfile(raw_minio_path):
        return raw_minio_path

    client = _minio()
    if client is None:
        raise FileNotFoundError(
            f"MinIO client unavailable and {raw_minio_path!r} does not exist on disk"
        )

    obj_key = raw_minio_path
    if MINIO_BUCKET in raw_minio_path:
        obj_key = raw_minio_path.split(f"{MINIO_BUCKET}/", 1)[-1]
    suffix = suffix_hint or os.path.splitext(obj_key)[1] or ".dat"
    tmp = tempfile.NamedTemporaryFile("wb", suffix=suffix, delete=False)
    tmp.close()
    client.fget_object(MINIO_BUCKET, obj_key, tmp.name)
    return tmp.name
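
# Illustrative key derivation (hypothetical path): a stored path like
# "performancewest/icc/uploads/42/revenue.csv" yields the object key
# "icc/uploads/42/revenue.csv" once the leading bucket prefix is stripped.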


# ─── Ingester core ────────────────────────────────────────────────────────

def _fetch_paid_years(conn, profile_id: int) -> set[int]:
    """Return the set of reporting_years this profile has paid access for.

    Used by the year-paywall gate. An empty set means no grants on file —
    the caller treats that as "don't filter" (legacy behavior) rather
    than "drop everything", to avoid silently losing data on first run.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT reporting_year FROM cdr_study_access_grants WHERE profile_id = %s",
                (profile_id,),
            )
            rows = cur.fetchall()
            return {int(r[0]) for r in rows} if rows else set()
    except Exception as exc:
        log.warning("paid_years lookup failed for profile %s: %s", profile_id, exc)
        return set()
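
# Gate semantics (illustrative): paid_years == {2022, 2023} drops any row
# whose reporting_year falls outside that set (e.g. 2021), while
# paid_years == set() disables the filter entirely, per the docstring above.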


def _create_paywall_notice_todo(
    *, upload_id: int, profile_id: int, dropped_counts: dict,
) -> None:
    """Create an admin ToDo so the ops team knows the customer has
    uploaded data sitting behind the paywall that they haven't paid for.

    Customers see the notice client-side via summary_json.customer_notice.
    """
    try:
        from scripts.workers.erpnext_client import ERPNextClient

        years = ", ".join(f"{y} ({n} rows)" for y, n in sorted(dropped_counts.items()))
        ERPNextClient().create_resource(
            "ToDo",
            {
                "description": (
                    f"[icc-paywall] Upload {upload_id} / profile {profile_id}: "
                    f"dropped rows for unpaid years — {years}. Customer was "
                    "notified on the ICC import step. Follow up if they want "
                    "to purchase the prior-year filing(s)."
                ),
                "priority": "Low",
                "role": "Accounting Advisor",
            },
        )
    except Exception as exc:
        log.debug("paywall ToDo creation failed: %s", exc)


def _extract_reporting_year(summary_json, created_at: datetime) -> int:
    """Pick the ``reporting_year`` for this upload.

    Preference order:
    1. ``summary_json.reporting_year`` if the uploader set it
    2. ``created_at.year - 1`` (typical 499-A filing convention — the
       April filing covers the prior calendar year)
    3. the prior UTC calendar year as a last-ditch fallback
    """
    if isinstance(summary_json, dict):
        y = summary_json.get("reporting_year")
        if isinstance(y, int) and 2000 < y < 2100:
            return y
        if isinstance(y, str) and y.strip().isdigit():
            return int(y.strip())
    if isinstance(created_at, datetime):
        return max(2000, created_at.year - 1)
    return datetime.now(timezone.utc).year - 1
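
# Precedence sketch (hypothetical inputs):
#   _extract_reporting_year({"reporting_year": 2023}, datetime(2024, 4, 12)) -> 2023
#   _extract_reporting_year({}, datetime(2024, 4, 12))                       -> 2023
#   _extract_reporting_year(None, None)                                      -> prior UTC year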


def _bulk_insert(
    conn,
    upload_id: int,
    rows: Iterable[IccRevenueLine],
) -> tuple[int, int, dict]:
    """Insert rows in batches; return (accepted, total_attempted, summary)."""
    accepted = 0
    total_attempted = 0
    counterparties: set[str] = set()
    total_revenue_cents = 0
    by_category: dict[str, dict] = defaultdict(
        lambda: {"count": 0, "revenue_cents": 0, "minutes_of_use": 0}
    )

    batch: List[tuple] = []

    def _flush(cur) -> int:
        if not batch:
            return 0
        psycopg2.extras.execute_values(
            cur,
            """
            INSERT INTO icc_revenue_lines (
                profile_id, reporting_year, reporting_quarter,
                icc_category, counterparty_legal_name, counterparty_ocn,
                counterparty_country, revenue_cents, minutes_of_use,
                source_upload_id, source_line_no, natural_key_hash, raw_row
            ) VALUES %s
            ON CONFLICT (profile_id, reporting_year, reporting_quarter, natural_key_hash)
            DO NOTHING
            """,
            batch,
            template=(
                "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb)"
            ),
            # Send the whole batch as one statement: cur.rowcount only reflects
            # the last page execute_values runs, so the default page_size of 100
            # would undercount inserts for batches larger than 100 rows.
            page_size=len(batch),
        )
        inserted = cur.rowcount or 0
        batch.clear()
        return inserted
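
    # The ON CONFLICT target above assumes a unique index along these lines
    # (sketch; the actual DDL lives in the schema migrations, not shown here):
    #   CREATE UNIQUE INDEX ... ON icc_revenue_lines
    #       (profile_id, reporting_year, reporting_quarter, natural_key_hash);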

    with conn.cursor() as cur:
        for row in rows:
            total_attempted += 1
            counterparties.add(
                (row.counterparty_ocn or row.counterparty_legal_name or "").upper()
            )
            total_revenue_cents += row.revenue_cents
            bucket = by_category[row.icc_category]
            bucket["count"] += 1
            bucket["revenue_cents"] += row.revenue_cents
            if row.minutes_of_use:
                bucket["minutes_of_use"] += row.minutes_of_use
            batch.append((
                row.profile_id,
                row.reporting_year,
                row.reporting_quarter,
                row.icc_category,
                row.counterparty_legal_name,
                row.counterparty_ocn,
                row.counterparty_country,
                row.revenue_cents,
                row.minutes_of_use,
                upload_id,
                row.source_line_no,
                row.natural_key_hash(),
                json.dumps(row.raw_row, default=str),
            ))
            if len(batch) >= BATCH_SIZE:
                accepted += _flush(cur)
        accepted += _flush(cur)
    conn.commit()

    summary = {
        "counterparties_count": len({c for c in counterparties if c}),
        "total_revenue_cents": total_revenue_cents,
        "by_category": {k: dict(v) for k, v in by_category.items()},
        "rows_attempted": total_attempted,
    }
    return accepted, total_attempted, summary
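
# Example summary shape at this point (illustrative values; "switched_access"
# is a hypothetical category — process_upload merges in more keys later):
#   {"counterparties_count": 2, "total_revenue_cents": 125000,
#    "by_category": {"switched_access": {"count": 10, "revenue_cents": 125000,
#                                        "minutes_of_use": 4800}},
#    "rows_attempted": 12}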


def _safe_iter(
    adapter, path: str,
) -> Iterable[tuple[Optional[IccRevenueLine], Optional[ValidationError]]]:
    """Wrap adapter.iter_rows so per-row ValidationErrors don't halt.

    Yields ``(row, None)`` for a clean row and ``(None, error)`` for a
    rejected row.
    """
    try:
        iterator = adapter.iter_rows(path)
    except NotImplementedError:
        raise  # propagate to outer handler
    except Exception as exc:  # adapter init failure
        log.error("ICC adapter init failed: %s", exc)
        raise

    while True:
        try:
            row = next(iterator)
        except StopIteration:
            return
        except ValidationError as ve:
            yield (None, ve)
            continue
        except NotImplementedError:
            raise
        except Exception as exc:
            yield (None, ValidationError("unparseable_row", str(exc)))
            continue
        yield (row, None)


def process_upload(conn, upload_id: int) -> dict:
    """Ingest a single upload; return a summary dict."""
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            """
            UPDATE icc_ingestion_uploads
            SET status='parsing'
            WHERE id=%s AND status IN ('pending','failed')
            RETURNING id, profile_id, customer_id, source_format,
                      raw_minio_path, raw_sha256, summary_json, created_at
            """,
            (upload_id,),
        )
        up = cur.fetchone()
        if not up:
            return {"upload_id": upload_id, "skipped": True, "reason": "not pending"}
    conn.commit()

    source_format = up["source_format"]
    profile_id = up["profile_id"]
    reporting_year = _extract_reporting_year(up.get("summary_json"), up.get("created_at"))

    local_path: Optional[str] = None
    try:
        adapter_cls = get_adapter(source_format)
        if adapter_cls is None:
            raise ValueError(f"no adapter registered for source_format={source_format!r}")

        suffix_hint = {
            "cabs_bos": ".txt",
            "edi_810": ".edi",
            "8yy_qry": ".xml",
            "itu_tas": ".tsv",
            "icss": ".tsv",
            "wholesale_sip_csv": ".csv",
            "carrier_invoice_pdf": ".pdf",
        }.get(source_format, "")
        local_path = _fetch_object(up["raw_minio_path"], suffix_hint)

        adapter = adapter_cls(profile_id=profile_id, reporting_year=reporting_year)

        rows_accepted = 0
        rows_rejected = 0
        clean_rows: list[IccRevenueLine] = []

        # Paywall gate: only ingest rows for reporting years the customer
        # has paid for (cdr_study_access_grants). Drop everything else and
        # surface it to the customer via summary_json so they know what was
        # dropped and can purchase access if they want it retained.
        paid_years = _fetch_paid_years(conn, profile_id)
        rows_dropped_unpaid = 0
        dropped_year_counts: dict[int, int] = {}

        for row, err in _safe_iter(adapter, local_path):
            if err is not None:
                rows_rejected += 1
                log.warning(
                    "Upload %s row rejected (%s): %s",
                    upload_id,
                    getattr(err, "reason_code", "?"),
                    getattr(err, "detail", err),
                )
                continue
            # Year paywall: if the row's reporting_year isn't in the
            # customer's paid grant list, drop it.
            row_year = getattr(row, "reporting_year", reporting_year)
            if paid_years and row_year not in paid_years:
                rows_dropped_unpaid += 1
                dropped_year_counts[row_year] = dropped_year_counts.get(row_year, 0) + 1
                continue
            clean_rows.append(row)

        inserted, attempted, summary = _bulk_insert(conn, upload_id, clean_rows)
        rows_accepted = inserted
        # Rows attempted but not inserted due to dedup → neither accepted nor rejected
        dedup_collisions = max(0, attempted - inserted)

        summary.update({
            "rows_accepted": rows_accepted,
            "rows_rejected": rows_rejected,
            "rows_dropped_unpaid_years": rows_dropped_unpaid,
            "dropped_by_year": dropped_year_counts,
            "paid_years": sorted(paid_years) if paid_years else [],
            "dedup_collisions": dedup_collisions,
            "reporting_year": reporting_year,
            "source_format": source_format,
        })

        # Notify the customer if anything was dropped. The UI surfaces
        # summary_json on the IccImportStep — add a friendly message.
        if rows_dropped_unpaid > 0:
            years_list = ", ".join(str(y) for y in sorted(dropped_year_counts))
            summary["customer_notice"] = (
                f"{rows_dropped_unpaid} row{'s' if rows_dropped_unpaid != 1 else ''} from "
                f"year{'s' if len(dropped_year_counts) != 1 else ''} {years_list} "
                f"{'were' if rows_dropped_unpaid != 1 else 'was'} NOT imported because "
                "you haven't purchased 499-A filing access for "
                f"{'those years' if len(dropped_year_counts) != 1 else 'that year'}. "
                "Order the corresponding 499-A filing service(s) and re-upload to include them."
            )
            _create_paywall_notice_todo(
                upload_id=upload_id, profile_id=profile_id,
                dropped_counts=dropped_year_counts,
            )

        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE icc_ingestion_uploads SET
                    status='complete',
                    rows_accepted=%s,
                    rows_rejected=%s,
                    summary_json=%s::jsonb,
                    parsed_at=NOW(),
                    error_message=NULL
                WHERE id=%s
                """,
                (rows_accepted, rows_rejected, psycopg2.extras.Json(summary), upload_id),
            )
        conn.commit()

        log.info("Upload %s complete: %s", upload_id, summary)
        return {"upload_id": upload_id, "status": "complete", **summary}

    except Exception as exc:
        conn.rollback()
        log.exception("Upload %s failed", upload_id)
        _mark_failed(conn, upload_id, str(exc))
        return {"upload_id": upload_id, "status": "failed", "error": str(exc)}
    finally:
        # Clean up the temp file when we pulled from MinIO (not when the
        # path was a user-provided local file)
        if local_path and local_path != up["raw_minio_path"]:
            try:
                os.unlink(local_path)
            except OSError:
                pass
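
# Retry note: the claiming UPDATE above accepts status IN ('pending','failed'),
# so a failed upload can be re-run one-off via
#   python -m scripts.workers.icc_ingester --upload-id=<id>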


def _mark_failed(conn, upload_id: int, reason: str) -> None:
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE icc_ingestion_uploads
            SET status='failed',
                error_message=%s,
                parsed_at=NOW()
            WHERE id=%s
            """,
            (reason[:2000], upload_id),
        )
    conn.commit()


def run_queue_once(limit: int = 50) -> dict:
    """Drain up to ``limit`` pending uploads; return an aggregate summary."""
    conn = psycopg2.connect(DATABASE_URL)
    summaries = []
    try:
        while len(summaries) < limit:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT id FROM icc_ingestion_uploads
                    WHERE status='pending'
                    ORDER BY id
                    LIMIT 1
                    """,
                )
                row = cur.fetchone()
            if not row:
                break
            summaries.append(process_upload(conn, row[0]))
    finally:
        conn.close()
    return {"processed": len(summaries), "summaries": summaries}


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--upload-id", type=int, help="Process a specific upload only.")
    parser.add_argument("--limit", type=int, default=50, help="Max uploads per run.")
    args = parser.parse_args()
    if args.upload_id:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            print(json.dumps(process_upload(conn, args.upload_id), default=str, indent=2))
        finally:
            conn.close()
    else:
        print(json.dumps(run_queue_once(limit=args.limit), default=str, indent=2))


if __name__ == "__main__":
    main()