Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
405 lines
14 KiB
Python
405 lines
14 KiB
Python
"""
|
|
compliance_alert_list.py — Build a Listmonk mailing list of FCC carriers
|
|
who are behind on at least one filing.
|
|
|
|
Identifies non-compliant carriers from LOCAL database tables (no external
|
|
API calls) and upserts them into Listmonk list 6 "FCC Carriers - Compliance
|
|
Alert". Designed to run weekly as a systemd timer.
|
|
|
|
Compliance signals detected:
|
|
- rmd_missing: Active 499 filer (voice) with no RMD filing
|
|
- rmd_stale: RMD certification > 13 months old (annual due Mar 1)
|
|
- rmd_removed: Carrier removed from RMD for non-compliance
|
|
- cores_red_light: CORES red-light status (debt to FCC)
|
|
|
|
Usage:
|
|
python -m scripts.workers.compliance_alert_list
|
|
python -m scripts.workers.compliance_alert_list --dry-run
|
|
|
|
Environment variables:
|
|
DATABASE_URL PostgreSQL DSN
|
|
LISTMONK_URL e.g. https://lists.performancewest.net
|
|
LISTMONK_USER API username
|
|
LISTMONK_PASSWORD API token
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from collections import defaultdict
|
|
from typing import Optional
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import requests
|
|
|
|
LOG = logging.getLogger("workers.compliance_alert_list")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

# ---------------------------------------------------------------------------
# Runtime configuration, read once from the environment at import time.
# ---------------------------------------------------------------------------

# PostgreSQL DSN. Empty when unset.
DATABASE_URL = os.environ.get("DATABASE_URL", "")

# Base URL of the Listmonk instance. The trailing slash is stripped so that
# endpoint paths can be appended as "/api/...".
LISTMONK_URL = os.environ.get(
    "LISTMONK_URL", "https://lists.performancewest.net"
).rstrip("/")

LISTMONK_USER = os.environ.get("LISTMONK_USER", "api")

# SECURITY: the API token must come from the environment only. A credential
# must never be committed as a fallback default; the token that used to be
# hard-coded here should be treated as leaked and rotated in Listmonk.
LISTMONK_PASS = os.environ.get("LISTMONK_PASSWORD", "")

# Listmonk list IDs
LIST_COMPLIANCE = 6  # FCC Carriers - Compliance Alert
LIST_DIRECT = 3  # FCC Carriers - Direct Contacts
LIST_COUNSEL = 4  # FCC Carriers - Outside Counsel

DELAY = 0.05  # seconds between Listmonk API calls

# Human-readable description for each machine-readable issue code.
ISSUE_LABELS = {
    "rmd_missing": "Missing RMD filing — voice providers must register in the Robocall Mitigation Database",
    "rmd_stale": "RMD recertification overdue — annual recertification was due March 1",
    "rmd_removed": "Removed from RMD — carrier was removed for non-compliance with robocall mitigation rules",
    "cores_red_light": "FCC CORES red-light status — outstanding debt to the FCC blocks new filings",
    "recert_unknown": "RMD recertification status unknown — verify your annual certification is current",
}
|
|
|
|
|
|
def get_noncompliant_carriers(conn) -> list[dict]:
    """Query the local DB for carriers that are behind on filings.

    Runs one SELECT per compliance signal, in order, and logs how many rows
    each signal matched.

    Args:
        conn: An open psycopg2 connection. This function neither commits nor
            closes it; only the cursor it opens is closed.

    Returns:
        A list of plain dicts, each with the keys ``frn``, ``company``,
        ``state``, ``email``, ``name``, ``is_outside_counsel`` and ``issue``.
        Rows are concatenated in signal order and are NOT de-duplicated: a
        carrier appears once per signal it matches.
    """
    # (log label, SQL) per signal. The position in this tuple is the signal
    # number that appears in the log line.
    signal_queries = (
        # Signal 1: 499 filer with no RMD entry. The service_type column is
        # typically NULL in our data, so it is not filtered on — any 499
        # filer that is not in the RMD is potentially non-compliant. Contact
        # email only comes from the RMD, so these rows carry NULL email/name
        # and will not be importable to Listmonk; they are still counted.
        (
            "RMD missing",
            """
            SELECT f.frn, f.legal_name AS company, f.state,
                   NULL AS email, NULL AS name, FALSE AS is_outside_counsel,
                   'rmd_missing' AS issue
            FROM fcc_499_filers f
            LEFT JOIN fcc_rmd r ON r.frn = f.frn
            WHERE r.id IS NULL
            """,
        ),
        # Signal 2: RMD certification stale (>13 months — annual due Mar 1)
        (
            "RMD stale",
            """
            SELECT r.frn, r.business_name AS company,
                   r.contact_email AS email, r.contact_name AS name,
                   COALESCE(r.is_outside_counsel, FALSE) AS is_outside_counsel,
                   'rmd_stale' AS issue,
                   NULL AS state
            FROM fcc_rmd r
            WHERE r.contact_email IS NOT NULL
              AND r.contact_email != ''
              AND r.removed_from_rmd = FALSE
              AND r.last_recertified < CURRENT_DATE - INTERVAL '13 months'
            """,
        ),
        # Signal 3: Removed from RMD
        (
            "RMD removed",
            """
            SELECT rm.frn, rm.business_name AS company,
                   rm.contact_email AS email, rm.contact_name AS name,
                   FALSE AS is_outside_counsel,
                   'rmd_removed' AS issue,
                   NULL AS state
            FROM fcc_rmd_removed rm
            WHERE rm.contact_email IS NOT NULL
              AND rm.contact_email != ''
            """,
        ),
        # Signal 4: CORES red light
        (
            "CORES red light",
            """
            SELECT r.frn, r.business_name AS company,
                   r.contact_email AS email, r.contact_name AS name,
                   COALESCE(r.is_outside_counsel, FALSE) AS is_outside_counsel,
                   'cores_red_light' AS issue,
                   NULL AS state
            FROM fcc_rmd r
            WHERE r.red_light_status = 'red'
              AND r.contact_email IS NOT NULL
              AND r.contact_email != ''
            """,
        ),
        # Signal 5: RMD carrier with email but unknown recertification date.
        # These carriers are in the RMD but we don't know if they recertified
        # this year. They're prime candidates for a compliance check reminder.
        (
            "recert unknown",
            """
            SELECT r.frn, r.business_name AS company,
                   r.contact_email AS email, r.contact_name AS name,
                   COALESCE(r.is_outside_counsel, FALSE) AS is_outside_counsel,
                   'recert_unknown' AS issue,
                   NULL AS state
            FROM fcc_rmd r
            WHERE r.contact_email IS NOT NULL
              AND LENGTH(r.contact_email) > 0
              AND r.removed_from_rmd = FALSE
              AND r.last_recertified IS NULL
            """,
        ),
    )

    rows: list[dict] = []
    # The context manager closes the cursor even when a query raises; the
    # previous explicit cur.close() was skipped on any exception.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        for number, (label, sql) in enumerate(signal_queries, start=1):
            cur.execute(sql)
            matched = cur.fetchall()
            LOG.info("Signal %d (%s): %d carriers", number, label, len(matched))
            # RealDictCursor rows are converted to plain dicts for callers.
            rows.extend(dict(record) for record in matched)
    return rows
|
|
|
|
|
|
def aggregate_by_email(rows: list[dict]) -> list[dict]:
    """Deduplicate rows and collect every compliance issue per email address.

    Emails are compared after stripping whitespace and lower-casing. Rows
    whose email is missing, empty, or contains no ``@`` are dropped, since
    they cannot be subscribed.

    Args:
        rows: Dicts carrying at least an ``issue`` key, plus optional
            ``email``, ``name``, ``frn``, ``company``, ``state`` and
            ``is_outside_counsel`` keys.

    Returns:
        One dict per distinct email, in first-seen order, with the keys
        ``email``, ``name``, ``frn``, ``company``, ``state``,
        ``is_outside_counsel`` (always a bool) and ``issues`` (distinct issue
        codes in first-seen order). For each text field the first non-empty
        value seen wins; ``is_outside_counsel`` is True if any row for that
        email set it.
    """
    by_email: dict[str, dict] = {}
    for row in rows:
        email = (row.get("email") or "").strip().lower()
        # An empty string contains no "@", so this also rejects missing emails.
        if "@" not in email:
            continue

        entry = by_email.get(email)
        if entry is None:
            entry = by_email[email] = {
                "email": email,
                "name": "",
                "frn": "",
                "company": "",
                "state": "",
                "is_outside_counsel": False,
                "issues": [],
            }

        issue = row["issue"]
        if issue not in entry["issues"]:
            entry["issues"].append(issue)

        # Prefer the first non-empty value. "frn" is included so that it is
        # backfilled from later rows just like the other descriptive fields.
        for field in ("name", "frn", "company", "state"):
            if not entry[field] and row.get(field):
                entry[field] = row[field]

        if row.get("is_outside_counsel"):
            entry["is_outside_counsel"] = True

    return list(by_email.values())
|
|
|
|
|
|
def ensure_list_exists(session: requests.Session) -> int:
    """Return the ID of the compliance-alert list, creating the list if absent.

    Looks for a list whose ID equals ``LIST_COMPLIANCE`` among the lists
    returned by Listmonk. Only when that lookup succeeds AND the list is not
    among the results is a new list created.

    Args:
        session: Session used for the Listmonk API calls.

    Returns:
        The existing list's ID, or the ID Listmonk assigned to the newly
        created list, or ``LIST_COMPLIANCE`` as a fallback when the lookup or
        the creation fails. This function does not raise on API failure.
    """
    try:
        resp = session.get(
            f"{LISTMONK_URL}/api/lists",
            # NOTE(review): only the first 50 lists are inspected — confirm
            # the instance will never hold more than that.
            params={"per_page": 50},
            timeout=15,
        )
        # Treat 4xx/5xx as a failed lookup rather than as "no lists found".
        resp.raise_for_status()
        known_lists = resp.json().get("data", {}).get("results", [])
    except Exception as exc:
        # Existence is unknown here. Creating a list anyway could add a
        # duplicate on every transient failure, so fall back to the
        # configured ID instead.
        LOG.warning(
            "Could not check existing lists (%s); assuming list %d exists",
            exc, LIST_COMPLIANCE,
        )
        return LIST_COMPLIANCE

    for lst in known_lists:
        if lst.get("id") == LIST_COMPLIANCE:
            LOG.info("List %d already exists: %s", lst["id"], lst.get("name"))
            return lst["id"]

    # Create the list
    try:
        resp = session.post(
            f"{LISTMONK_URL}/api/lists",
            json={
                "name": "FCC Carriers - Compliance Alert",
                "type": "public",
                "optin": "single",
                "tags": ["fcc", "compliance", "alert"],
            },
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json().get("data", {})
        # A response without an ID is a failed creation, not a success.
        list_id = data["id"]
    except Exception as exc:
        LOG.error("Could not create compliance alert list: %s", exc)
        return LIST_COMPLIANCE

    LOG.info("Created list %d: %s", list_id, data.get("name"))
    return list_id
|
|
|
|
|
|
def upsert_subscriber(
    email: str, name: str, lists: list[int],
    attribs: dict, session: requests.Session,
) -> Optional[int]:
    """Create a Listmonk subscriber, or update the existing one on duplicate.

    A create request is attempted first. When Listmonk reports the address as
    a duplicate (HTTP 409 or a ``duplicate`` flag in the response), the
    subscriber is looked up by email, added to ``lists`` as confirmed, and its
    attributes are overwritten with ``attribs``.

    Args:
        email: Subscriber address; lower-cased and stripped before use.
        name: Display name. Falls back to the local part of ``email`` when
            empty.
        lists: Listmonk list IDs to subscribe to.
        attribs: Custom attributes to store on the subscriber.
        session: Session used for the Listmonk API calls.

    Returns:
        The subscriber ID, or None when the subscriber could not be created
        or found, or when any request raised. This function does not raise
        on API failure.
    """
    address = email.lower().strip()
    payload = {
        "email": address,
        "name": (name or "").strip() or email.split("@")[0],
        "status": "enabled",
        "lists": lists,
        "attribs": attribs,
        "preconfirm_subscriptions": True,
    }
    try:
        created = session.post(
            f"{LISTMONK_URL}/api/subscribers",
            json=payload, timeout=15,
        )
        body = created.json()
        data = body.get("data", body)
        if created.status_code != 409 and not data.get("duplicate"):
            return data.get("id")

        # Already exists — find it, then add it to the lists.
        # The query parameter is a SQL expression, and the address originates
        # from external filing data, so single quotes are doubled to keep it
        # inside the string literal.
        quoted = address.replace("'", "''")
        lookup = session.get(
            f"{LISTMONK_URL}/api/subscribers",
            params={"query": f"email = '{quoted}'", "per_page": 1},
            timeout=15,
        )
        matches = lookup.json().get("data", {}).get("results", [])
        if not matches:
            return None

        sub_id = matches[0]["id"]
        membership = session.put(
            f"{LISTMONK_URL}/api/subscribers/lists",
            json={"ids": [sub_id], "action": "add",
                  "target_list_ids": lists, "status": "confirmed"},
            timeout=15,
        )
        # Update attributes
        # NOTE(review): only "attribs" is sent — confirm that Listmonk's
        # subscriber update endpoint accepts a partial record.
        attributes = session.put(
            f"{LISTMONK_URL}/api/subscribers/{sub_id}",
            json={"attribs": attribs},
            timeout=15,
        )
        # The ID is still returned on a rejected update (as before), but the
        # rejection is now visible in the log instead of being ignored.
        for what, outcome in (
            ("list membership", membership),
            ("attributes", attributes),
        ):
            if not outcome.ok:
                LOG.warning(
                    "Listmonk %s update failed for %s: HTTP %s",
                    what, address, outcome.status_code,
                )
        return sub_id
    except Exception as exc:
        # Failures in the duplicate-handling path land here too, so they are
        # logged rather than silently discarded.
        LOG.debug("Subscriber upsert error for %s: %s", email, exc)
        return None
|
|
|
|
|
|
def run(dry_run: bool = False) -> dict:
    """Collect non-compliant carriers and sync them to Listmonk.

    Args:
        dry_run: When True, query the database and log a preview of the first
            five subscribers, but make no Listmonk API calls.

    Returns:
        A summary dict with the keys ``signals`` (row count per issue code),
        ``unique_emails``, ``upserted``, ``skipped`` and ``errors``. When
        ``DATABASE_URL`` is unset the zeroed summary is returned unchanged.
    """
    summary = {
        "signals": {},
        "unique_emails": 0,
        "upserted": 0,
        "skipped": 0,
        # Kept for the summary's shape. Nothing below increments it:
        # upsert_subscriber reports every failure as None, which is counted
        # under "skipped".
        "errors": 0,
    }

    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        return summary

    conn = psycopg2.connect(DATABASE_URL)
    try:
        rows = get_noncompliant_carriers(conn)
    finally:
        conn.close()

    # Count per signal
    signal_counts: dict[str, int] = defaultdict(int)
    for row in rows:
        signal_counts[row["issue"]] += 1
    summary["signals"] = dict(signal_counts)

    # Aggregate by email
    subscribers = aggregate_by_email(rows)
    summary["unique_emails"] = len(subscribers)

    LOG.info(
        "Signals: %s | Unique emails: %d",
        ", ".join(f"{k}={v}" for k, v in sorted(signal_counts.items())),
        len(subscribers),
    )

    if dry_run:
        LOG.info("DRY RUN — not touching Listmonk")
        for sub in subscribers[:5]:
            LOG.info(
                " %s (%s) — %s — issues: %s",
                sub["email"], sub["company"], sub["frn"],
                ", ".join(sub["issues"]),
            )
        if len(subscribers) > 5:
            LOG.info(" ... and %d more", len(subscribers) - 5)
        return summary

    # Without a credential every call below would be rejected and counted as
    # "skipped"; fail loudly instead, mirroring the DATABASE_URL guard.
    if not LISTMONK_PASS:
        LOG.error("LISTMONK_PASSWORD not set")
        return summary

    # Listmonk session. The context manager releases pooled connections when
    # the run finishes or raises.
    with requests.Session() as session:
        session.auth = (LISTMONK_USER, LISTMONK_PASS)

        list_id = ensure_list_exists(session)

        for sub in subscribers:
            # Build list assignment: the alert list plus the audience list
            # matching the contact type.
            lists = [
                list_id,
                LIST_COUNSEL if sub["is_outside_counsel"] else LIST_DIRECT,
            ]

            # Build attributes
            issues_human = "; ".join(
                ISSUE_LABELS.get(code, code) for code in sub["issues"]
            )
            attribs = {
                "fcc_frn": sub["frn"],
                "company": sub["company"],
                "state": sub["state"],
                "compliance_issues": sub["issues"],
                "compliance_issues_human": issues_human,
                "is_outside_counsel": sub["is_outside_counsel"],
                "source": "compliance_alert_list",
            }

            sub_id = upsert_subscriber(
                email=sub["email"],
                name=sub["name"],
                lists=lists,
                attribs=attribs,
                session=session,
            )
            if sub_id:
                summary["upserted"] += 1
            else:
                summary["skipped"] += 1

            # Throttle between subscribers to avoid hammering the API.
            time.sleep(DELAY)

    LOG.info(
        "Done: %d upserted, %d skipped, %d errors",
        summary["upserted"], summary["skipped"], summary["errors"],
    )
    return summary
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses the optional ``--dry-run`` flag, hands it to ``run`` and logs the
    summary that ``run`` returns.
    """
    cli = argparse.ArgumentParser(
        description="Build Listmonk mailing list of FCC carriers behind on filings",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Query DB and print results without touching Listmonk",
    )
    options = cli.parse_args()
    LOG.info("Summary: %s", run(dry_run=options.dry_run))


# Execute only when run as a script, not when imported.
if __name__ == "__main__":
    main()
|