Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
249 lines
8.6 KiB
Python
249 lines
8.6 KiB
Python
"""Daily access-stimulation monitor.
|
|
|
|
Implements the FCC 47 CFR 61.3(bbb) "access stimulation" detection rule:
|
|
when a local-exchange carrier's terminating interstate traffic exceeds
|
|
originating interstate traffic by a 3:1 ratio over a rolling 6-month
|
|
window, the carrier is deemed an "access-stimulating" LEC and must
|
|
either benchmark its terminating switched-access rate to the other LEC's
|
|
or refund the differential. For a CLEC reseller, tripping this threshold
|
|
is a compliance red-flag: the rate-cap disclosure must appear on the
|
|
499-A filing's ``term_switched_access`` revenue line, and an admin
|
|
review is required before the Q1 filing goes out.
|
|
|
|
Flow (for each active ``cdr_ingestion_profiles`` row):
|
|
|
|
1. Aggregate 6 months of ``cdr_traffic_studies`` into:
|
|
term_minutes = SUM(retail_minutes)
|
|
orig_minutes = SUM(wholesale_minutes)
|
|
(Retail traffic → terminating proxy; wholesale traffic →
|
|
originating proxy. Per the 499-A plan spec: this is a conservative
|
|
proxy until direction-tagged CDRs are mandatory.)
|
|
2. If ``orig_minutes > 0`` and ``term/orig > 3.0`` *and* the profile
|
|
hasn't been flagged in the last 30 days:
|
|
- Stamp ``access_stim_flagged_at=NOW()`` on the profile
|
|
- Persist evidence payload to ``access_stim_evidence_json``
|
|
- Create an admin ToDo via ERPNext for the Accounting Advisor role
|
|
|
|
Usage:
|
|
|
|
python -m scripts.workers.access_stim_monitor
|
|
python -m scripts.workers.access_stim_monitor --dry-run
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
log = logging.getLogger("access_stim_monitor")
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
|
handlers=[logging.StreamHandler(sys.stdout)],
|
|
)
|
|
|
|
DATABASE_URL = os.environ.get("DATABASE_URL", "")
|
|
|
|
ACCESS_STIM_RATIO_THRESHOLD = 3.0
|
|
RE_FLAG_COOLDOWN_DAYS = 30
|
|
WINDOW_LABEL = "6m"
|
|
|
|
|
|
# ─── Admin ToDo helper ────────────────────────────────────────────────────
|
|
|
|
def _create_admin_todo(profile_id: int, description: str) -> None:
|
|
"""Create an ERPNext ToDo for the Accounting Advisor role.
|
|
|
|
Mirrors the pattern in
|
|
``scripts.workers.services.cores_frn_registration.CORESFRNRegistrationHandler._create_admin_todo``.
|
|
Failures are swallowed (logged) to avoid blocking the monitor's
|
|
evidence-persistence path.
|
|
"""
|
|
try:
|
|
from scripts.workers.erpnext_client import ERPNextClient
|
|
ERPNextClient().create_resource(
|
|
"ToDo",
|
|
{
|
|
"description": f"[access-stim-monitor] profile_id={profile_id}\n\n{description}",
|
|
"priority": "High",
|
|
"role": "Accounting Advisor",
|
|
},
|
|
)
|
|
except Exception as exc: # pragma: no cover
|
|
log.error("Could not create admin ToDo for profile %s: %s", profile_id, exc)
|
|
|
|
|
|
# ─── Monitor core ─────────────────────────────────────────────────────────
|
|
|
|
def evaluate_profile(
|
|
conn,
|
|
profile_id: int,
|
|
*,
|
|
dry_run: bool = False,
|
|
) -> dict:
|
|
"""Evaluate one profile. Returns a summary dict."""
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT COALESCE(SUM(COALESCE(retail_minutes, 0) + COALESCE(wholesale_minutes, 0)), 0)
|
|
AS total_minutes,
|
|
COALESCE(SUM(COALESCE(retail_minutes, 0)), 0) AS term_minutes,
|
|
COALESCE(SUM(COALESCE(wholesale_minutes, 0)), 0) AS orig_minutes
|
|
FROM cdr_traffic_studies
|
|
WHERE profile_id = %s
|
|
AND generated_at >= NOW() - INTERVAL '6 months'
|
|
""",
|
|
(profile_id,),
|
|
)
|
|
agg = cur.fetchone() or {}
|
|
term_minutes = int(agg.get("term_minutes") or 0)
|
|
orig_minutes = int(agg.get("orig_minutes") or 0)
|
|
|
|
if orig_minutes <= 0:
|
|
return {
|
|
"profile_id": profile_id,
|
|
"flagged": False,
|
|
"reason": "no_orig_traffic",
|
|
"term_minutes": term_minutes,
|
|
"orig_minutes": orig_minutes,
|
|
}
|
|
ratio = term_minutes / orig_minutes
|
|
if ratio <= ACCESS_STIM_RATIO_THRESHOLD:
|
|
return {
|
|
"profile_id": profile_id,
|
|
"flagged": False,
|
|
"reason": "below_threshold",
|
|
"ratio": round(ratio, 3),
|
|
"term_minutes": term_minutes,
|
|
"orig_minutes": orig_minutes,
|
|
}
|
|
|
|
# Threshold exceeded — check cooldown
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT access_stim_flagged_at
|
|
FROM cdr_ingestion_profiles
|
|
WHERE id = %s
|
|
""",
|
|
(profile_id,),
|
|
)
|
|
row = cur.fetchone() or {}
|
|
last_flagged = row.get("access_stim_flagged_at")
|
|
if last_flagged is not None:
|
|
now = datetime.now(tz=last_flagged.tzinfo) if last_flagged.tzinfo else datetime.utcnow()
|
|
delta = now - last_flagged
|
|
if delta.days < RE_FLAG_COOLDOWN_DAYS:
|
|
return {
|
|
"profile_id": profile_id,
|
|
"flagged": False,
|
|
"reason": "cooldown",
|
|
"ratio": round(ratio, 3),
|
|
"term_minutes": term_minutes,
|
|
"orig_minutes": orig_minutes,
|
|
"last_flagged_at": last_flagged.isoformat(),
|
|
}
|
|
|
|
evidence = {
|
|
"ratio": round(ratio, 3),
|
|
"term_minutes": term_minutes,
|
|
"orig_minutes": orig_minutes,
|
|
"threshold": ACCESS_STIM_RATIO_THRESHOLD,
|
|
"window": WINDOW_LABEL,
|
|
"evaluated_at": datetime.now(tz=timezone.utc).isoformat(),
|
|
}
|
|
|
|
if dry_run:
|
|
log.info(
|
|
"[dry-run] profile %s would be flagged: ratio=%.2f:1 (term=%d, orig=%d)",
|
|
profile_id, ratio, term_minutes, orig_minutes,
|
|
)
|
|
return {
|
|
"profile_id": profile_id,
|
|
"flagged": True,
|
|
"dry_run": True,
|
|
"evidence": evidence,
|
|
}
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE cdr_ingestion_profiles
|
|
SET access_stim_flagged_at = NOW(),
|
|
access_stim_evidence_json = %s::jsonb
|
|
WHERE id = %s
|
|
""",
|
|
(json.dumps(evidence), profile_id),
|
|
)
|
|
conn.commit()
|
|
|
|
description = (
|
|
f"Access stimulation 3:1 threshold exceeded for profile {profile_id} "
|
|
f"(ratio {ratio:.2f}:1). Revenue reported on term_switched_access "
|
|
f"should include 47 CFR 61.3(bbb) rate-cap disclosure.\n\n"
|
|
f"Evidence: term_minutes={term_minutes:,}, orig_minutes={orig_minutes:,}, "
|
|
f"window={WINDOW_LABEL}, evaluated_at={evidence['evaluated_at']}."
|
|
)
|
|
_create_admin_todo(profile_id, description)
|
|
log.warning(
|
|
"profile %s flagged access-stim ratio=%.2f:1 (term=%d, orig=%d)",
|
|
profile_id, ratio, term_minutes, orig_minutes,
|
|
)
|
|
return {
|
|
"profile_id": profile_id,
|
|
"flagged": True,
|
|
"dry_run": False,
|
|
"evidence": evidence,
|
|
}
|
|
|
|
|
|
def run_once(*, dry_run: bool = False) -> dict:
|
|
conn = psycopg2.connect(DATABASE_URL)
|
|
summaries: list[dict] = []
|
|
try:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT id
|
|
FROM cdr_ingestion_profiles
|
|
WHERE active = TRUE
|
|
ORDER BY id
|
|
""",
|
|
)
|
|
profiles = cur.fetchall()
|
|
for p in profiles:
|
|
summaries.append(evaluate_profile(conn, p["id"], dry_run=dry_run))
|
|
finally:
|
|
conn.close()
|
|
flagged = sum(1 for s in summaries if s.get("flagged") and not s.get("dry_run"))
|
|
dry_flagged = sum(1 for s in summaries if s.get("flagged") and s.get("dry_run"))
|
|
return {
|
|
"evaluated": len(summaries),
|
|
"flagged": flagged,
|
|
"dry_flagged": dry_flagged,
|
|
"summaries": summaries,
|
|
}
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
"--dry-run",
|
|
action="store_true",
|
|
help="Evaluate but do not update profiles or create ToDos.",
|
|
)
|
|
args = parser.parse_args()
|
|
result = run_once(dry_run=args.dry_run)
|
|
print(json.dumps(result, default=str, indent=2))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|