new-site/scripts/workers/compliance_alert_list.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

405 lines
14 KiB
Python

"""
compliance_alert_list.py — Build a Listmonk mailing list of FCC carriers
who are behind on at least one filing.
Identifies non-compliant carriers from LOCAL database tables (no external
API calls) and upserts them into Listmonk list 6 "FCC Carriers - Compliance
Alert". Designed to run weekly as a systemd timer.
Compliance signals detected:
- rmd_missing: 499 filer with no RMD filing (no service-type filter is applied)
- rmd_stale: RMD certification > 13 months old (annual due Mar 1)
- rmd_removed: Carrier removed from RMD for non-compliance
- cores_red_light: CORES red-light status (debt to FCC)
- recert_unknown: RMD entry whose last recertification date is unknown
Usage:
python -m scripts.workers.compliance_alert_list
python -m scripts.workers.compliance_alert_list --dry-run
Environment variables:
DATABASE_URL PostgreSQL DSN
LISTMONK_URL e.g. https://lists.performancewest.net
LISTMONK_USER API username
LISTMONK_PASSWORD API token
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
import time
from collections import defaultdict
from typing import Optional
import psycopg2
import psycopg2.extras
import requests
LOG = logging.getLogger("workers.compliance_alert_list")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

# Connection settings, all overridable through the environment.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
LISTMONK_URL = os.environ.get(
    "LISTMONK_URL", "https://lists.performancewest.net"
).rstrip("/")
LISTMONK_USER = os.environ.get("LISTMONK_USER", "api")
# SECURITY: the API token must be supplied via LISTMONK_PASSWORD. A literal
# fallback token used to live here; because it was committed to version
# control it should be treated as compromised and rotated in Listmonk.
LISTMONK_PASS = os.environ.get("LISTMONK_PASSWORD", "")

# Listmonk list IDs
LIST_COMPLIANCE = 6  # FCC Carriers - Compliance Alert
LIST_DIRECT = 3  # FCC Carriers - Direct Contacts
LIST_COUNSEL = 4  # FCC Carriers - Outside Counsel

DELAY = 0.05  # seconds between Listmonk API calls

# Human-readable description for each compliance signal key; stored on the
# subscriber as the "compliance_issues_human" attribute.
ISSUE_LABELS = {
    "rmd_missing": "Missing RMD filing — voice providers must register in the Robocall Mitigation Database",
    "rmd_stale": "RMD recertification overdue — annual recertification was due March 1",
    "rmd_removed": "Removed from RMD — carrier was removed for non-compliance with robocall mitigation rules",
    "cores_red_light": "FCC CORES red-light status — outstanding debt to the FCC blocks new filings",
    "recert_unknown": "RMD recertification status unknown — verify your annual certification is current",
}
def get_noncompliant_carriers(conn) -> list[dict]:
    """Query the local DB for carriers behind on filings.

    Runs one SELECT per compliance signal and concatenates the results, so
    a single carrier may appear multiple times (once per issue).

    Args:
        conn: Open psycopg2 connection. The caller stays responsible for
            closing it.

    Returns:
        A list of dicts, each with the keys ``email``, ``name``, ``frn``,
        ``company``, ``state``, ``issue`` and ``is_outside_counsel``.

    Any database error raised by a query propagates to the caller; the
    cursor is closed before it does.
    """
    signal_queries = (
        # Signal 1: 499 filers without an RMD entry. The service_type column
        # is typically NULL in our data, so we don't filter on it — any 499
        # filer that's not in RMD is potentially non-compliant. These rows
        # carry no contact email (email comes from RMD), so they are counted
        # but cannot be imported to Listmonk.
        (
            "Signal 1 (RMD missing)",
            """
            SELECT f.frn, f.legal_name AS company, f.state,
            NULL AS email, NULL AS name, FALSE AS is_outside_counsel,
            'rmd_missing' AS issue
            FROM fcc_499_filers f
            LEFT JOIN fcc_rmd r ON r.frn = f.frn
            WHERE r.id IS NULL
            """,
        ),
        # Signal 2: RMD certification stale (>13 months — annual due Mar 1)
        (
            "Signal 2 (RMD stale)",
            """
            SELECT r.frn, r.business_name AS company,
            r.contact_email AS email, r.contact_name AS name,
            COALESCE(r.is_outside_counsel, FALSE) AS is_outside_counsel,
            'rmd_stale' AS issue,
            NULL AS state
            FROM fcc_rmd r
            WHERE r.contact_email IS NOT NULL
            AND r.contact_email != ''
            AND r.removed_from_rmd = FALSE
            AND r.last_recertified < CURRENT_DATE - INTERVAL '13 months'
            """,
        ),
        # Signal 3: Removed from RMD
        (
            "Signal 3 (RMD removed)",
            """
            SELECT rm.frn, rm.business_name AS company,
            rm.contact_email AS email, rm.contact_name AS name,
            FALSE AS is_outside_counsel,
            'rmd_removed' AS issue,
            NULL AS state
            FROM fcc_rmd_removed rm
            WHERE rm.contact_email IS NOT NULL
            AND rm.contact_email != ''
            """,
        ),
        # Signal 4: CORES red light
        (
            "Signal 4 (CORES red light)",
            """
            SELECT r.frn, r.business_name AS company,
            r.contact_email AS email, r.contact_name AS name,
            COALESCE(r.is_outside_counsel, FALSE) AS is_outside_counsel,
            'cores_red_light' AS issue,
            NULL AS state
            FROM fcc_rmd r
            WHERE r.red_light_status = 'red'
            AND r.contact_email IS NOT NULL
            AND r.contact_email != ''
            """,
        ),
        # Signal 5: RMD carrier with email but unknown recertification date.
        # These carriers are in the RMD but we don't know if they recertified
        # this year. They're prime candidates for a compliance check reminder.
        (
            "Signal 5 (recert unknown)",
            """
            SELECT r.frn, r.business_name AS company,
            r.contact_email AS email, r.contact_name AS name,
            COALESCE(r.is_outside_counsel, FALSE) AS is_outside_counsel,
            'recert_unknown' AS issue,
            NULL AS state
            FROM fcc_rmd r
            WHERE r.contact_email IS NOT NULL
            AND LENGTH(r.contact_email) > 0
            AND r.removed_from_rmd = FALSE
            AND r.last_recertified IS NULL
            """,
        ),
    )

    carriers: list[dict] = []
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    try:
        for label, sql in signal_queries:
            cur.execute(sql)
            batch = cur.fetchall()
            LOG.info("%s: %d carriers", label, len(batch))
            # Convert RealDictRow objects to plain dicts for downstream code.
            carriers.extend(dict(record) for record in batch)
    finally:
        # Release the cursor even when one of the queries fails.
        cur.close()
    return carriers
def aggregate_by_email(rows: list[dict]) -> list[dict]:
    """Deduplicate and aggregate issues per email address.

    Emails are normalised (stripped, lower-cased) before grouping. Rows with
    no email, or with an email lacking an "@", are dropped because they
    cannot be subscribed.

    Args:
        rows: Per-issue carrier dicts. Each row must contain an ``issue``
            key; ``email``, ``name``, ``frn``, ``company``, ``state`` and
            ``is_outside_counsel`` are optional.

    Returns:
        One dict per distinct email, in first-seen order, with the keys
        ``email``, ``name``, ``frn``, ``company``, ``state``,
        ``is_outside_counsel`` (always a bool) and ``issues`` (distinct
        issue keys in first-seen order).

    Raises:
        KeyError: If a row with a usable email has no ``issue`` key.
    """
    # Descriptive fields are taken from the first row that supplies a
    # non-empty value. "frn" is included so that a row lacking it does not
    # pin the aggregated entry to an empty FRN.
    backfill_fields = ("name", "frn", "company", "state")

    by_email: dict[str, dict] = {}
    for row in rows:
        email = (row.get("email") or "").strip().lower()
        if not email or "@" not in email:
            continue

        entry = by_email.get(email)
        if entry is None:
            entry = by_email[email] = {
                "email": email,
                "name": "",
                "frn": "",
                "company": "",
                "state": "",
                "is_outside_counsel": False,
                "issues": [],
            }

        issue = row["issue"]
        if issue not in entry["issues"]:
            entry["issues"].append(issue)

        for field in backfill_fields:
            if not entry[field] and row.get(field):
                entry[field] = row[field]

        # Once any row flags outside counsel, the address stays flagged.
        if row.get("is_outside_counsel"):
            entry["is_outside_counsel"] = True

    return list(by_email.values())
def ensure_list_exists(session: requests.Session) -> int:
    """Ensure the compliance alert list exists in Listmonk and return its ID.

    Looks for a list whose ID equals ``LIST_COMPLIANCE``. If the lookup
    succeeds and no such list is found, a new list is created and the ID
    assigned by Listmonk is returned.

    This function never raises on HTTP or parsing problems; it logs and
    falls back to ``LIST_COMPLIANCE``. In particular, when the lookup itself
    fails, no list is created: without knowing what already exists, a blind
    create could add a duplicate list on every run.

    Args:
        session: Authenticated requests session for the Listmonk API.

    Returns:
        The ID of the existing or newly created list, or ``LIST_COMPLIANCE``
        as a fallback.
    """
    try:
        resp = session.get(
            f"{LISTMONK_URL}/api/lists",
            params={"per_page": 50},
            timeout=15,
        )
        resp.raise_for_status()
        known_lists = (resp.json().get("data") or {}).get("results") or []
    except Exception as exc:
        LOG.warning("Could not check existing lists: %s", exc)
        return LIST_COMPLIANCE

    for lst in known_lists:
        if lst.get("id") == LIST_COMPLIANCE:
            LOG.info("List %d already exists: %s", lst["id"], lst.get("name"))
            return lst["id"]

    # Create the list
    try:
        resp = session.post(
            f"{LISTMONK_URL}/api/lists",
            json={
                "name": "FCC Carriers - Compliance Alert",
                "type": "public",
                "optin": "single",
                "tags": ["fcc", "compliance", "alert"],
            },
            timeout=15,
        )
        resp.raise_for_status()
        created = resp.json().get("data") or {}
    except Exception as exc:
        LOG.error("Could not create compliance alert list: %s", exc)
        return LIST_COMPLIANCE

    list_id = created.get("id")
    if not list_id:
        # A 2xx response without an ID is not a successful create; do not
        # report one.
        LOG.error(
            "Could not create compliance alert list: %s",
            "response contained no list id",
        )
        return LIST_COMPLIANCE

    LOG.info("Created list %d: %s", list_id, created.get("name"))
    return list_id
def _update_existing_subscriber(
    email: str, lists: list[int],
    attribs: dict, session: requests.Session,
) -> Optional[int]:
    """Add an already-registered subscriber to *lists* and refresh attributes.

    Returns the subscriber ID, or None when the subscriber cannot be looked
    up or cannot be added to the lists. A failed attribute refresh is logged
    but still returns the ID, since the subscriber is on the lists.
    """
    # The "query" parameter is a SQL expression evaluated by Listmonk, so a
    # single quote inside the address (o'brien@example.com) must be doubled.
    escaped = email.replace("'", "''")
    try:
        lookup = session.get(
            f"{LISTMONK_URL}/api/subscribers",
            params={"query": f"email = '{escaped}'", "per_page": 1},
            timeout=15,
        )
        lookup.raise_for_status()
        matches = (lookup.json().get("data") or {}).get("results") or []
        if not matches:
            LOG.warning(
                "Listmonk reported %s as duplicate but lookup found nothing",
                email,
            )
            return None
        existing = matches[0]
        sub_id = existing["id"]

        session.put(
            f"{LISTMONK_URL}/api/subscribers/lists",
            json={"ids": [sub_id], "action": "add",
                  "target_list_ids": lists, "status": "confirmed"},
            timeout=15,
        ).raise_for_status()
    except Exception as exc:
        LOG.warning(
            "Could not add existing subscriber %s to lists: %s", email, exc
        )
        return None

    # Update attributes.
    # NOTE(review): per the Listmonk API docs, PUT /api/subscribers/{id}
    # expects the full record (email, name, status, lists), and omitting
    # lists removes the subscriber from them. The body is therefore rebuilt
    # from the existing record instead of sending only "attribs" — confirm
    # against the deployed Listmonk version.
    try:
        current_list_ids = {
            item["id"] for item in existing.get("lists") or [] if "id" in item
        }
        session.put(
            f"{LISTMONK_URL}/api/subscribers/{sub_id}",
            json={
                "email": existing.get("email") or email,
                "name": existing.get("name") or email.split("@")[0],
                # Keep the current status so a blocklisted subscriber is
                # not silently re-enabled.
                "status": existing.get("status") or "enabled",
                "lists": sorted(current_list_ids | set(lists)),
                # Preserve attributes written by other tools; ours win on
                # key collisions.
                "attribs": {**(existing.get("attribs") or {}), **attribs},
            },
            timeout=15,
        ).raise_for_status()
    except Exception as exc:
        LOG.warning("Could not update attributes for %s: %s", email, exc)
    return sub_id


def upsert_subscriber(
    email: str, name: str, lists: list[int],
    attribs: dict, session: requests.Session,
) -> Optional[int]:
    """Create or update a Listmonk subscriber.

    Tries to create the subscriber first. If Listmonk reports the address
    as already existing (HTTP 409 or a ``duplicate`` flag in the response),
    the existing subscriber is added to *lists* and its attributes are
    refreshed.

    Args:
        email: Subscriber address; lower-cased and stripped before sending.
        name: Display name; falls back to the local part of *email*.
        lists: Listmonk list IDs to subscribe to.
        attribs: Custom attributes to store on the subscriber.
        session: Authenticated requests session for the Listmonk API.

    Returns:
        The subscriber ID, or None on any failure. This function does not
        raise for HTTP or parsing errors; they are logged at WARNING level.
    """
    clean_email = email.lower().strip()
    payload = {
        "email": clean_email,
        "name": (name or "").strip() or email.split("@")[0],
        "status": "enabled",
        "lists": lists,
        "attribs": attribs,
        "preconfirm_subscriptions": True,
    }
    try:
        resp = session.post(
            f"{LISTMONK_URL}/api/subscribers",
            json=payload, timeout=15,
        )
    except Exception as exc:
        LOG.warning("Subscriber upsert error for %s: %s", email, exc)
        return None

    # An error response may have no JSON body; treat that as an empty one so
    # the status code alone can still drive the duplicate handling.
    try:
        body = resp.json()
    except ValueError:
        body = {}
    if not isinstance(body, dict):
        body = {}
    data = body.get("data", body)
    if not isinstance(data, dict):
        data = {}

    if resp.status_code == 409 or data.get("duplicate"):
        # Already exists — add to lists
        return _update_existing_subscriber(clean_email, lists, attribs, session)

    if resp.status_code >= 400:
        LOG.warning(
            "Listmonk rejected subscriber %s (HTTP %d): %s",
            clean_email, resp.status_code, body.get("message"),
        )
        return None
    return data.get("id")
def run(dry_run: bool = False) -> dict:
    """Collect non-compliant carriers and sync them to Listmonk.

    Args:
        dry_run: When True, query the database and log a sample of the
            result, but make no Listmonk API calls.

    Returns:
        Summary dict with the keys ``signals`` (row count per issue key),
        ``unique_emails``, ``upserted``, ``skipped`` and ``errors``.
        Subscribers for which ``upsert_subscriber`` returns no ID are
        counted as ``skipped``; ``errors`` is initialised to 0 and is not
        incremented by this function.

    If ``DATABASE_URL`` is unset, the untouched summary is returned. The
    same applies if the Listmonk password is unset on a non-dry run, after
    the database step has filled in the signal counts.
    """
    summary = {
        "signals": {},
        "unique_emails": 0,
        "upserted": 0,
        "skipped": 0,
        "errors": 0,
    }
    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        return summary

    conn = psycopg2.connect(DATABASE_URL)
    try:
        rows = get_noncompliant_carriers(conn)
    finally:
        conn.close()

    # Count per signal
    signal_counts: dict[str, int] = defaultdict(int)
    for r in rows:
        signal_counts[r["issue"]] += 1
    summary["signals"] = dict(signal_counts)

    # Aggregate by email
    subscribers = aggregate_by_email(rows)
    summary["unique_emails"] = len(subscribers)
    LOG.info(
        "Signals: %s | Unique emails: %d",
        ", ".join(f"{k}={v}" for k, v in sorted(signal_counts.items())),
        len(subscribers),
    )

    if dry_run:
        LOG.info("DRY RUN — not touching Listmonk")
        for sub in subscribers[:5]:
            LOG.info(
                " %s (%s) — %s — issues: %s",
                sub["email"], sub["company"], sub["frn"],
                ", ".join(sub["issues"]),
            )
        if len(subscribers) > 5:
            LOG.info(" ... and %d more", len(subscribers) - 5)
        return summary

    if not LISTMONK_PASS:
        # Without credentials every API call would fail; stop before
        # issuing one request per subscriber.
        LOG.error("LISTMONK_PASSWORD not set")
        return summary

    # Listmonk session; the context manager closes pooled connections even
    # if the loop below raises.
    with requests.Session() as session:
        session.auth = (LISTMONK_USER, LISTMONK_PASS)
        list_id = ensure_list_exists(session)

        for sub in subscribers:
            # Build list assignment: the compliance list plus either the
            # outside-counsel list or the direct-contacts list.
            lists = [list_id]
            if sub["is_outside_counsel"]:
                lists.append(LIST_COUNSEL)
            else:
                lists.append(LIST_DIRECT)

            # Build attributes
            issues_human = "; ".join(
                ISSUE_LABELS.get(i, i) for i in sub["issues"]
            )
            attribs = {
                "fcc_frn": sub["frn"],
                "company": sub["company"],
                "state": sub["state"],
                "compliance_issues": sub["issues"],
                "compliance_issues_human": issues_human,
                "is_outside_counsel": sub["is_outside_counsel"],
                "source": "compliance_alert_list",
            }
            sub_id = upsert_subscriber(
                email=sub["email"],
                name=sub["name"],
                lists=lists,
                attribs=attribs,
                session=session,
            )
            if sub_id:
                summary["upserted"] += 1
            else:
                summary["skipped"] += 1
            # Throttle so the Listmonk API is not flooded.
            time.sleep(DELAY)

    LOG.info(
        "Done: %d upserted, %d skipped, %d errors",
        summary["upserted"], summary["skipped"], summary["errors"],
    )
    return summary
def main():
    """Command-line entry point: parse flags, run the job, log the summary."""
    cli = argparse.ArgumentParser(
        description="Build Listmonk mailing list of FCC carriers behind on filings",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Query DB and print results without touching Listmonk",
    )
    options = cli.parse_args()
    # argparse exposes "--dry-run" as the attribute "dry_run".
    outcome = run(dry_run=options.dry_run)
    LOG.info("Summary: %s", outcome)


if __name__ == "__main__":
    main()