new-site/scripts/workers/access_stim_monitor.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

249 lines
8.6 KiB
Python

"""Daily access-stimulation monitor.
Implements the FCC 47 CFR 61.3(bbb) "access stimulation" detection rule:
when a local-exchange carrier's terminating interstate traffic exceeds
originating interstate traffic by a 3:1 ratio over a rolling 6-month
window, the carrier is deemed an "access-stimulating" LEC and must
either benchmark its terminating switched-access rate to the other LEC's
or refund the differential. For a CLEC reseller, tripping this threshold
is a compliance red-flag: the rate-cap disclosure must appear on the
499-A filing's ``term_switched_access`` revenue line, and an admin
review is required before the Q1 filing goes out.
Flow (for each active ``cdr_ingestion_profiles`` row):
1. Aggregate 6 months of ``cdr_traffic_studies`` into:
term_minutes = SUM(retail_minutes)
orig_minutes = SUM(wholesale_minutes)
(Retail traffic → terminating proxy; wholesale traffic →
originating proxy. Per the 499-A plan spec: this is a conservative
proxy until direction-tagged CDRs are mandatory.)
2. If ``orig_minutes > 0`` and ``term/orig > 3.0`` *and* the profile
hasn't been flagged in the last 30 days:
- Stamp ``access_stim_flagged_at=NOW()`` on the profile
- Persist evidence payload to ``access_stim_evidence_json``
- Create an admin ToDo via ERPNext for the Accounting Advisor role
Usage:
python -m scripts.workers.access_stim_monitor
python -m scripts.workers.access_stim_monitor --dry-run
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
from datetime import datetime, timezone
from typing import Optional
import psycopg2
import psycopg2.extras
# Module-level logger; basicConfig routes all records to stdout so a cron
# wrapper / journald can capture the worker's output.
log = logging.getLogger("access_stim_monitor")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
# Postgres DSN; an empty string lets psycopg2/libpq fall back to PG* env vars.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# 47 CFR 61.3(bbb): terminating:originating ratio that defines access stimulation.
ACCESS_STIM_RATIO_THRESHOLD = 3.0
# Minimum days between flaggings of the same profile (avoids ToDo spam).
RE_FLAG_COOLDOWN_DAYS = 30
# Label recorded in the evidence payload describing the aggregation window.
WINDOW_LABEL = "6m"
# ─── Admin ToDo helper ────────────────────────────────────────────────────
def _create_admin_todo(profile_id: int, description: str) -> None:
    """Best-effort: open an ERPNext ToDo for the Accounting Advisor role.

    Mirrors the pattern in
    ``scripts.workers.services.cores_frn_registration.CORESFRNRegistrationHandler._create_admin_todo``.
    Any failure (import error, network, API rejection) is logged and
    swallowed so the monitor's evidence-persistence path is never blocked.
    """
    try:
        from scripts.workers.erpnext_client import ERPNextClient

        todo_payload = {
            "description": f"[access-stim-monitor] profile_id={profile_id}\n\n{description}",
            "priority": "High",
            "role": "Accounting Advisor",
        }
        ERPNextClient().create_resource("ToDo", todo_payload)
    except Exception as exc:  # pragma: no cover
        # Same logger instance as the module-level ``log``.
        logging.getLogger("access_stim_monitor").error(
            "Could not create admin ToDo for profile %s: %s", profile_id, exc
        )
# ─── Monitor core ─────────────────────────────────────────────────────────
def evaluate_profile(
    conn,
    profile_id: int,
    *,
    dry_run: bool = False,
) -> dict:
    """Evaluate one ingestion profile against the 3:1 access-stim rule.

    Aggregates the profile's last 6 months of ``cdr_traffic_studies``
    (retail minutes → terminating proxy, wholesale minutes → originating
    proxy per the 499-A plan spec), and when term/orig exceeds
    ``ACCESS_STIM_RATIO_THRESHOLD`` outside the
    ``RE_FLAG_COOLDOWN_DAYS`` cooldown: stamps
    ``access_stim_flagged_at``, persists the evidence JSON, and opens an
    admin ToDo.

    Args:
        conn: Open psycopg2 connection. Committed here when a real
            (non-dry-run) flag is written.
        profile_id: ``cdr_ingestion_profiles.id`` to evaluate.
        dry_run: When True, report what would happen without writing to
            the database or creating a ToDo.

    Returns:
        Summary dict always containing ``profile_id`` and ``flagged``;
        non-flagged results carry a ``reason`` and the minute totals,
        flagged results carry the full ``evidence`` payload.
    """
    # 1. Aggregate the rolling 6-month traffic window.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            """
            SELECT COALESCE(SUM(COALESCE(retail_minutes, 0) + COALESCE(wholesale_minutes, 0)), 0)
                       AS total_minutes,
                   COALESCE(SUM(COALESCE(retail_minutes, 0)), 0) AS term_minutes,
                   COALESCE(SUM(COALESCE(wholesale_minutes, 0)), 0) AS orig_minutes
              FROM cdr_traffic_studies
             WHERE profile_id = %s
               AND generated_at >= NOW() - INTERVAL '6 months'
            """,
            (profile_id,),
        )
        agg = cur.fetchone() or {}
    term_minutes = int(agg.get("term_minutes") or 0)
    orig_minutes = int(agg.get("orig_minutes") or 0)

    # No originating traffic: the ratio is undefined, so never flag.
    if orig_minutes <= 0:
        return {
            "profile_id": profile_id,
            "flagged": False,
            "reason": "no_orig_traffic",
            "term_minutes": term_minutes,
            "orig_minutes": orig_minutes,
        }

    ratio = term_minutes / orig_minutes
    if ratio <= ACCESS_STIM_RATIO_THRESHOLD:
        return {
            "profile_id": profile_id,
            "flagged": False,
            "reason": "below_threshold",
            "ratio": round(ratio, 3),
            "term_minutes": term_minutes,
            "orig_minutes": orig_minutes,
        }

    # 2. Threshold exceeded — honour the re-flag cooldown.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            """
            SELECT access_stim_flagged_at
              FROM cdr_ingestion_profiles
             WHERE id = %s
            """,
            (profile_id,),
        )
        row = cur.fetchone() or {}
    last_flagged = row.get("access_stim_flagged_at")
    if last_flagged is not None:
        # Compare like with like: aware "now" when the DB timestamp is
        # tz-aware, naive UTC when it is naive. datetime.utcnow() is
        # deprecated since Python 3.12, so the naive value is derived
        # from an aware-UTC now instead.
        if last_flagged.tzinfo is not None:
            now = datetime.now(tz=last_flagged.tzinfo)
        else:
            # NOTE(review): assumes naive DB timestamps are UTC — confirm
            # the column type / server timezone.
            now = datetime.now(timezone.utc).replace(tzinfo=None)
        if (now - last_flagged).days < RE_FLAG_COOLDOWN_DAYS:
            return {
                "profile_id": profile_id,
                "flagged": False,
                "reason": "cooldown",
                "ratio": round(ratio, 3),
                "term_minutes": term_minutes,
                "orig_minutes": orig_minutes,
                "last_flagged_at": last_flagged.isoformat(),
            }

    evidence = {
        "ratio": round(ratio, 3),
        "term_minutes": term_minutes,
        "orig_minutes": orig_minutes,
        "threshold": ACCESS_STIM_RATIO_THRESHOLD,
        "window": WINDOW_LABEL,
        "evaluated_at": datetime.now(tz=timezone.utc).isoformat(),
    }

    if dry_run:
        log.info(
            "[dry-run] profile %s would be flagged: ratio=%.2f:1 (term=%d, orig=%d)",
            profile_id, ratio, term_minutes, orig_minutes,
        )
        return {
            "profile_id": profile_id,
            "flagged": True,
            "dry_run": True,
            "evidence": evidence,
        }

    # 3. Persist the flag + evidence, then notify (ToDo is best-effort).
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE cdr_ingestion_profiles
               SET access_stim_flagged_at = NOW(),
                   access_stim_evidence_json = %s::jsonb
             WHERE id = %s
            """,
            (json.dumps(evidence), profile_id),
        )
    conn.commit()

    description = (
        f"Access stimulation 3:1 threshold exceeded for profile {profile_id} "
        f"(ratio {ratio:.2f}:1). Revenue reported on term_switched_access "
        f"should include 47 CFR 61.3(bbb) rate-cap disclosure.\n\n"
        f"Evidence: term_minutes={term_minutes:,}, orig_minutes={orig_minutes:,}, "
        f"window={WINDOW_LABEL}, evaluated_at={evidence['evaluated_at']}."
    )
    _create_admin_todo(profile_id, description)
    log.warning(
        "profile %s flagged access-stim ratio=%.2f:1 (term=%d, orig=%d)",
        profile_id, ratio, term_minutes, orig_minutes,
    )
    return {
        "profile_id": profile_id,
        "flagged": True,
        "dry_run": False,
        "evidence": evidence,
    }
def run_once(*, dry_run: bool = False) -> dict:
    """Run one full monitoring pass over every active ingestion profile.

    Opens a fresh connection from ``DATABASE_URL``, evaluates each
    active ``cdr_ingestion_profiles`` row in id order, and returns a
    roll-up dict: ``evaluated``, ``flagged`` (real writes),
    ``dry_flagged`` (dry-run hits), and the per-profile ``summaries``.
    """
    connection = psycopg2.connect(DATABASE_URL)
    try:
        with connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                """
                SELECT id
                  FROM cdr_ingestion_profiles
                 WHERE active = TRUE
                 ORDER BY id
                """,
            )
            active_rows = cur.fetchall()
        summaries = [
            evaluate_profile(connection, row["id"], dry_run=dry_run)
            for row in active_rows
        ]
    finally:
        connection.close()

    flagged_real = 0
    flagged_dry = 0
    for item in summaries:
        if item.get("flagged"):
            if item.get("dry_run"):
                flagged_dry += 1
            else:
                flagged_real += 1

    return {
        "evaluated": len(summaries),
        "flagged": flagged_real,
        "dry_flagged": flagged_dry,
        "summaries": summaries,
    }
def main() -> None:
    """CLI entry point: run one monitoring pass, print JSON to stdout."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Evaluate but do not update profiles or create ToDos.",
    )
    options = arg_parser.parse_args()
    summary = run_once(dry_run=options.dry_run)
    # default=str keeps datetime values in summaries serializable.
    print(json.dumps(summary, default=str, indent=2))


if __name__ == "__main__":
    main()