""" rmd_deficiency_campaign.py — Populate a Listmonk mailing list with carriers whose 2026 RMD filings have deficiencies, including per-carrier issue details for mail merge. Creates/updates Listmonk list 7 "FCC RMD Deficiency Alert" with subscriber attributes containing the specific issues found in their filing. Usage: python -m workers.rmd_deficiency_campaign python -m workers.rmd_deficiency_campaign --dry-run """ from __future__ import annotations import json import logging import os import sys import psycopg2 import psycopg2.extras import requests LOG = logging.getLogger("workers.rmd_deficiency_campaign") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout, ) DATABASE_URL = os.environ.get("DATABASE_URL", "") LISTMONK_URL = os.environ.get("LISTMONK_URL", "https://lists.performancewest.net") LISTMONK_USER = os.environ.get("LISTMONK_USER", "api") LISTMONK_PASS = os.environ.get("LISTMONK_PASS", "") LIST_ID = 7 # "FCC RMD Deficiency Alert" LIST_NAME = "FCC RMD Deficiency Alert" # Human-readable issue labels for email template ISSUE_LABELS = { "missing_kyc": "Missing Know Your Customer (KYC) procedures — required under 2025 RMD Report & Order", "missing_material_change": "Missing 10-business-day material change update commitment — required effective Feb 5, 2026", "missing_dno": "Missing Do-Not-Originate (DNO) list enforcement — emphasized in 2026 requirements", "ss_vsp_no_shaken": "Voice Service Provider without STIR/SHAKEN implementation — VSPs must implement unless exempt", "ss_intermediate_complete": "Intermediate provider claims Complete STIR/SHAKEN — intermediates cannot sign calls", "missing_traceback": "Missing 24-hour traceback response commitment", "missing_recertification": "Missing annual recertification acknowledgment (March 1 deadline)", "missing_perjury": "Missing perjury declaration in uploaded document", "missing_stir_shaken": "Missing STIR/SHAKEN implementation details", "missing_mitigation": "Missing robocall mitigation program description", "missing_provider_id": "Missing provider identification (FRN/entity details)", "missing_classification": "Missing provider classification", "missing_enforcement": "Missing enforcement history disclosure", "xref_ss_mismatch": "STIR/SHAKEN status in document doesn't match structured data", "xref_old_document": "Uploaded document references outdated year — may not reflect 2026 requirements", "xref_name_mismatch": "Business name in uploaded document doesn't match RMD record", "no_classification": "No provider classification selected (critical)", "no_recert_date": "No recertification date on file", "ss_partial_note": "Partial STIR/SHAKEN — upstream provider should be named", } def get_deficient_carriers(conn) -> list[dict]: """Get all carriers with RMD deficiencies and their contact info.""" cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) # Pull emails from multiple sources: RMD contact_email, or fallback to # the email we scraped via the ServiceNow SP API (Phase 2 of fcc_rmd_scraper) cur.execute(""" SELECT a.rmd_number, a.frn, a.business_name, a.severity, a.total_deficiencies, a.structured_checks, a.pdf_checks, COALESCE(r.contact_email, '') AS contact_email, COALESCE(r.contact_name, '') AS contact_name, r.implementation, r.voice_service_provider, r.gateway_provider, r.intermediate_provider, r.last_recertified, r.servicenow_sys_id FROM fcc_rmd_audit_results a JOIN fcc_rmd r ON r.rmd_number = a.rmd_number WHERE a.total_deficiencies > 0 AND a.severity IN ('major', 'critical') AND (r.removed_from_rmd = FALSE OR r.removed_from_rmd IS NULL) ORDER BY a.total_deficiencies DESC """) carriers = [] for row in cur.fetchall(): row = dict(row) # Collect all issue IDs from structured + pdf checks issues = [] for check_list in [row.get("structured_checks") or [], row.get("pdf_checks") or []]: if isinstance(check_list, str): check_list = json.loads(check_list) for check in check_list: issue_id = check.get("id", "") # Skip contact email/name issues if issue_id in ("missing_contact_email", "missing_contact_name"): continue issues.append({ "id": issue_id, "label": ISSUE_LABELS.get(issue_id, check.get("label", issue_id)), "severity": check.get("severity", "major"), }) if not issues: continue # Build human-readable issue list for the email issue_bullets = [] for iss in issues: icon = "🔴" if iss["severity"] == "critical" else "🟡" if iss["severity"] == "major" else "🟢" issue_bullets.append(f"{icon} {iss['label']}") raw_email = (row.get("contact_email") or "").strip().lower() if not raw_email or "@" not in raw_email: continue # Skip carriers without email carriers.append({ "email": raw_email, "name": row.get("contact_name", ""), "company": row["business_name"], "frn": row["frn"], "rmd_number": row["rmd_number"], "severity": row["severity"], "deficiency_count": len(issues), "issues_human": "\n".join(issue_bullets), "issues_html": "", "issue_ids": [iss["id"] for iss in issues], "last_recertified": str(row.get("last_recertified", "")), "implementation": row.get("implementation", ""), }) LOG.info("Found %d carriers with deficiencies and contact emails", len(carriers)) return carriers def ensure_list(session: requests.Session) -> int: """Create Listmonk list if it doesn't exist. Returns list ID.""" # Check if list exists r = session.get(f"{LISTMONK_URL}/api/lists", timeout=10) if r.ok: for lst in r.json().get("data", {}).get("results", []): if lst.get("id") == LIST_ID or lst.get("name") == LIST_NAME: LOG.info("List exists: %s (id=%d)", lst["name"], lst["id"]) return lst["id"] # Create it r = session.post(f"{LISTMONK_URL}/api/lists", json={ "name": LIST_NAME, "type": "public", "optin": "single", "tags": ["rmd", "deficiency", "2026"], }, timeout=10) if r.ok: lid = r.json().get("data", {}).get("id", LIST_ID) LOG.info("Created list: %s (id=%d)", LIST_NAME, lid) return lid LOG.warning("Could not create list: %s", r.text[:200]) return LIST_ID def upsert_subscribers(session: requests.Session, carriers: list[dict], list_id: int, dry_run: bool = False) -> int: """Upsert carriers as Listmonk subscribers with attributes.""" count = 0 for carrier in carriers: attribs = { "company": carrier["company"], "fcc_frn": carrier["frn"], "rmd_number": carrier["rmd_number"], "severity": carrier["severity"], "deficiency_count": carrier["deficiency_count"], "issues_html": carrier["issues_html"], "issues_human": carrier["issues_human"], "issue_ids": ",".join(carrier["issue_ids"]), "last_recertified": carrier["last_recertified"], "implementation": carrier["implementation"], } if dry_run: LOG.info("DRY RUN: %s (%s) — %d issues", carrier["company"], carrier["email"], carrier["deficiency_count"]) count += 1 continue payload = { "email": carrier["email"], "name": carrier.get("name") or carrier["company"], "status": "enabled", "lists": [list_id], "attribs": attribs, "preconfirm_subscriptions": True, } r = session.post(f"{LISTMONK_URL}/api/subscribers", json=payload, timeout=10) if r.ok: count += 1 elif r.status_code == 409: # Already exists — update attributes # Get subscriber ID sr = session.get( f"{LISTMONK_URL}/api/subscribers", params={"query": f"subscribers.email = '{carrier['email']}'"}, timeout=10, ) if sr.ok: results = sr.json().get("data", {}).get("results", []) if results: sub_id = results[0]["id"] # Update attributes session.put( f"{LISTMONK_URL}/api/subscribers/{sub_id}", json={"attribs": attribs}, timeout=10, ) # Add to list session.put( f"{LISTMONK_URL}/api/subscribers/lists", json={"ids": [sub_id], "action": "add", "target_list_ids": [list_id]}, timeout=10, ) count += 1 else: LOG.warning("Subscriber upsert failed for %s: %s", carrier["email"], r.text[:100]) if count % 100 == 0 and count > 0: LOG.info("Progress: %d/%d subscribers", count, len(carriers)) return count def main(): import argparse parser = argparse.ArgumentParser(description="Populate Listmonk with RMD deficiency data") parser.add_argument("--dry-run", action="store_true", help="Don't write to Listmonk") args = parser.parse_args() if not DATABASE_URL: LOG.error("DATABASE_URL not set") sys.exit(1) conn = psycopg2.connect(DATABASE_URL) carriers = get_deficient_carriers(conn) conn.close() if not carriers: LOG.info("No carriers with deficiencies found") return LOG.info("Carriers with deficiencies + email: %d", len(carriers)) LOG.info("Top issues:") from collections import Counter issue_counts = Counter() for c in carriers: for iid in c["issue_ids"]: issue_counts[iid] += 1 for issue_id, cnt in issue_counts.most_common(10): LOG.info(" %s: %d carriers", ISSUE_LABELS.get(issue_id, issue_id)[:60], cnt) if args.dry_run: LOG.info("DRY RUN — not writing to Listmonk") upsert_subscribers(None, carriers, LIST_ID, dry_run=True) return if not LISTMONK_PASS: LOG.error("LISTMONK_PASS not set") sys.exit(1) session = requests.Session() session.auth = (LISTMONK_USER, LISTMONK_PASS) list_id = ensure_list(session) count = upsert_subscribers(session, carriers, list_id) LOG.info("Done: %d subscribers upserted to list %d (%s)", count, list_id, LIST_NAME) if __name__ == "__main__": main()