"""Daily access-stimulation monitor. Implements the FCC 47 CFR 61.3(bbb) "access stimulation" detection rule: when a local-exchange carrier's terminating interstate traffic exceeds originating interstate traffic by a 3:1 ratio over a rolling 6-month window, the carrier is deemed an "access-stimulating" LEC and must either benchmark its terminating switched-access rate to the other LEC's or refund the differential. For a CLEC reseller, tripping this threshold is a compliance red-flag: the rate-cap disclosure must appear on the 499-A filing's ``term_switched_access`` revenue line, and an admin review is required before the Q1 filing goes out. Flow (for each active ``cdr_ingestion_profiles`` row): 1. Aggregate 6 months of ``cdr_traffic_studies`` into: term_minutes = SUM(retail_minutes) orig_minutes = SUM(wholesale_minutes) (Retail traffic → terminating proxy; wholesale traffic → originating proxy. Per the 499-A plan spec: this is a conservative proxy until direction-tagged CDRs are mandatory.) 2. If ``orig_minutes > 0`` and ``term/orig > 3.0`` *and* the profile hasn't been flagged in the last 30 days: - Stamp ``access_stim_flagged_at=NOW()`` on the profile - Persist evidence payload to ``access_stim_evidence_json`` - Create an admin ToDo via ERPNext for the Accounting Advisor role Usage: python -m scripts.workers.access_stim_monitor python -m scripts.workers.access_stim_monitor --dry-run """ from __future__ import annotations import argparse import json import logging import os import sys from datetime import datetime, timezone from typing import Optional import psycopg2 import psycopg2.extras log = logging.getLogger("access_stim_monitor") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) DATABASE_URL = os.environ.get("DATABASE_URL", "") ACCESS_STIM_RATIO_THRESHOLD = 3.0 RE_FLAG_COOLDOWN_DAYS = 30 WINDOW_LABEL = "6m" # ─── Admin ToDo helper ──────────────────────────────────────────────────── def _create_admin_todo(profile_id: int, description: str) -> None: """Create an ERPNext ToDo for the Accounting Advisor role. Mirrors the pattern in ``scripts.workers.services.cores_frn_registration.CORESFRNRegistrationHandler._create_admin_todo``. Failures are swallowed (logged) to avoid blocking the monitor's evidence-persistence path. """ try: from scripts.workers.erpnext_client import ERPNextClient ERPNextClient().create_resource( "ToDo", { "description": f"[access-stim-monitor] profile_id={profile_id}\n\n{description}", "priority": "High", "role": "Accounting Advisor", }, ) except Exception as exc: # pragma: no cover log.error("Could not create admin ToDo for profile %s: %s", profile_id, exc) # ─── Monitor core ───────────────────────────────────────────────────────── def evaluate_profile( conn, profile_id: int, *, dry_run: bool = False, ) -> dict: """Evaluate one profile. Returns a summary dict.""" with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """ SELECT COALESCE(SUM(COALESCE(retail_minutes, 0) + COALESCE(wholesale_minutes, 0)), 0) AS total_minutes, COALESCE(SUM(COALESCE(retail_minutes, 0)), 0) AS term_minutes, COALESCE(SUM(COALESCE(wholesale_minutes, 0)), 0) AS orig_minutes FROM cdr_traffic_studies WHERE profile_id = %s AND generated_at >= NOW() - INTERVAL '6 months' """, (profile_id,), ) agg = cur.fetchone() or {} term_minutes = int(agg.get("term_minutes") or 0) orig_minutes = int(agg.get("orig_minutes") or 0) if orig_minutes <= 0: return { "profile_id": profile_id, "flagged": False, "reason": "no_orig_traffic", "term_minutes": term_minutes, "orig_minutes": orig_minutes, } ratio = term_minutes / orig_minutes if ratio <= ACCESS_STIM_RATIO_THRESHOLD: return { "profile_id": profile_id, "flagged": False, "reason": "below_threshold", "ratio": round(ratio, 3), "term_minutes": term_minutes, "orig_minutes": orig_minutes, } # Threshold exceeded — check cooldown with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """ SELECT access_stim_flagged_at FROM cdr_ingestion_profiles WHERE id = %s """, (profile_id,), ) row = cur.fetchone() or {} last_flagged = row.get("access_stim_flagged_at") if last_flagged is not None: now = datetime.now(tz=last_flagged.tzinfo) if last_flagged.tzinfo else datetime.utcnow() delta = now - last_flagged if delta.days < RE_FLAG_COOLDOWN_DAYS: return { "profile_id": profile_id, "flagged": False, "reason": "cooldown", "ratio": round(ratio, 3), "term_minutes": term_minutes, "orig_minutes": orig_minutes, "last_flagged_at": last_flagged.isoformat(), } evidence = { "ratio": round(ratio, 3), "term_minutes": term_minutes, "orig_minutes": orig_minutes, "threshold": ACCESS_STIM_RATIO_THRESHOLD, "window": WINDOW_LABEL, "evaluated_at": datetime.now(tz=timezone.utc).isoformat(), } if dry_run: log.info( "[dry-run] profile %s would be flagged: ratio=%.2f:1 (term=%d, orig=%d)", profile_id, ratio, term_minutes, orig_minutes, ) return { "profile_id": profile_id, "flagged": True, "dry_run": True, "evidence": evidence, } with conn.cursor() as cur: cur.execute( """ UPDATE cdr_ingestion_profiles SET access_stim_flagged_at = NOW(), access_stim_evidence_json = %s::jsonb WHERE id = %s """, (json.dumps(evidence), profile_id), ) conn.commit() description = ( f"Access stimulation 3:1 threshold exceeded for profile {profile_id} " f"(ratio {ratio:.2f}:1). Revenue reported on term_switched_access " f"should include 47 CFR 61.3(bbb) rate-cap disclosure.\n\n" f"Evidence: term_minutes={term_minutes:,}, orig_minutes={orig_minutes:,}, " f"window={WINDOW_LABEL}, evaluated_at={evidence['evaluated_at']}." ) _create_admin_todo(profile_id, description) log.warning( "profile %s flagged access-stim ratio=%.2f:1 (term=%d, orig=%d)", profile_id, ratio, term_minutes, orig_minutes, ) return { "profile_id": profile_id, "flagged": True, "dry_run": False, "evidence": evidence, } def run_once(*, dry_run: bool = False) -> dict: conn = psycopg2.connect(DATABASE_URL) summaries: list[dict] = [] try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """ SELECT id FROM cdr_ingestion_profiles WHERE active = TRUE ORDER BY id """, ) profiles = cur.fetchall() for p in profiles: summaries.append(evaluate_profile(conn, p["id"], dry_run=dry_run)) finally: conn.close() flagged = sum(1 for s in summaries if s.get("flagged") and not s.get("dry_run")) dry_flagged = sum(1 for s in summaries if s.get("flagged") and s.get("dry_run")) return { "evaluated": len(summaries), "flagged": flagged, "dry_flagged": dry_flagged, "summaries": summaries, } def main() -> None: parser = argparse.ArgumentParser() parser.add_argument( "--dry-run", action="store_true", help="Evaluate but do not update profiles or create ToDos.", ) args = parser.parse_args() result = run_once(dry_run=args.dry_run) print(json.dumps(result, default=str, indent=2)) if __name__ == "__main__": main()