#!/usr/bin/env python3
"""Populate Listmonk with FCC RMD deficiency data for campaign sends.

Creates a dedicated list and upserts every carrier that has major/critical
audit deficiencies. Sets subscriber attributes with issues_html so the
campaign template can render personalized issue lists.

Skips carriers tagged as is_customer=true.
"""

import base64
import html
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

# NOTE(review): credentials should not live in source control. The
# LISTMONK_API_AUTH environment variable ("user:token") overrides the built-in
# default, which is kept only so existing invocations keep working.
_DEFAULT_CREDENTIALS = "api:6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y"
AUTH = base64.b64encode(
    os.environ.get("LISTMONK_API_AUTH", _DEFAULT_CREDENTIALS).encode()
).decode()
API = os.environ.get("LISTMONK_API_URL", "http://localhost:9100/api")

# Human-readable labels for audit check ids; checks whose id is not listed
# here fall back to the "label" field carried in the check itself.
ISSUE_LABELS = {
    "ss_partial_note": "STIR/SHAKEN partial implementation \u2014 upstream provider not named",
    "ss_vsp_no_shaken": "Voice Service Provider without STIR/SHAKEN implementation",
    "conflicting_classification": "Unusual provider classification \u2014 may need correction",
    "missing_kyc": "Missing Know Your Customer (KYC) procedures",
    "missing_material_change": "Missing 10-business-day material change update commitment",
    "missing_dno": "Missing Do-Not-Originate (DNO) list enforcement",
    "missing_traceback": "Missing 24-hour traceback response commitment",
    "missing_recertification": "Missing annual recertification acknowledgment",
    "missing_perjury": "Missing perjury declaration",
    "missing_mitigation": "Missing robocall mitigation program details",
    "no_classification": "No provider classification selected",
    "ss_intermediate_complete": "Intermediate provider claims Complete STIR/SHAKEN",
    "xref_name_mismatch": "Business name not found in certification document",
    "xref_old_document": "Outdated certification document",
    "xref_ss_mismatch": "STIR/SHAKEN status mismatch between RMD and document",
    "missing_stir_shaken": "Missing STIR/SHAKEN implementation details",
    "missing_enforcement": "Missing enforcement history disclosure",
    "missing_provider_id": "Missing provider identification",
    "no_recert_date": "No recertification date on file",
}

# psql is invoked with "-F ||", so each output row is split on this separator.
_FIELD_SEP = "||"
_EXPECTED_FIELDS = 11
_PROGRESS_EVERY = 200

_CARRIER_QUERY = """SELECT a.frn, a.business_name, a.total_deficiencies, a.severity,
       r.contact_email, r.contact_name, r.implementation,
       r.last_recertified::text, r.rmd_number,
       a.structured_checks::text, a.pdf_checks::text
FROM fcc_rmd_audit_results a
JOIN fcc_rmd r ON r.frn = a.frn
WHERE a.total_deficiencies > 0
  AND a.severity IN ('major', 'critical')
  AND r.contact_email IS NOT NULL
  AND r.removed_from_rmd = FALSE
ORDER BY a.total_deficiencies DESC"""


def api_call(method, path, data=None):
    """Send a JSON request to the Listmonk API and return the decoded body.

    Args:
        method: HTTP method, e.g. "GET", "POST" or "PUT".
        path: Path (and query string) appended to ``API``.
        data: Optional JSON-serialisable payload; ``None`` sends no body.

    Returns:
        The response body parsed as JSON.

    Raises:
        urllib.error.HTTPError: On a non-2xx response.
        urllib.error.URLError: When the server cannot be reached.
        ValueError: When the response body is not valid JSON.
    """
    # "is not None" so that an explicitly empty payload ({}) is still sent.
    payload = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(
        API + path,
        data=payload,
        method=method,
        headers={"Content-Type": "application/json", "Authorization": "Basic " + AUTH},
    )
    # Context manager closes the connection even if decoding fails.
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())


def build_issues_html(structured_json, pdf_json):
    """Merge structured + PDF checks, return HTML list of major/critical issues.

    Args:
        structured_json: JSON text holding a list of check objects (may be
            empty, ``None`` or invalid, in which case it is ignored).
        pdf_json: Same shape as ``structured_json``, for the PDF checks.

    Returns:
        A ``(html, ids)`` tuple: an HTML ``<ul>`` with one ``<li>`` per issue,
        and the list of issue ids in the same order. Issues are ordered
        critical first, then major. When no major/critical check exists, every
        check (including minor ones) is listed instead.
    """
    all_checks = []
    for raw in (structured_json, pdf_json):
        try:
            checks = json.loads(raw)
        except (TypeError, ValueError):
            # Empty/NULL column or malformed JSON: treat as "no checks".
            continue
        if isinstance(checks, list):
            # Ignore stray non-object entries rather than crash on .get().
            all_checks.extend(c for c in checks if isinstance(c, dict))

    severity_order = {"critical": 0, "major": 1, "minor": 2}
    all_checks.sort(key=lambda c: severity_order.get(c.get("severity", "minor"), 2))

    def render(include_minor):
        """Return (li_items, ids) for de-duplicated checks."""
        li_items = []
        li_ids = []
        seen = set()
        for check in all_checks:
            cid = check.get("id", "")
            if cid in seen:
                continue
            seen.add(cid)
            if not include_minor and check.get("severity", "minor") == "minor":
                continue
            label = ISSUE_LABELS.get(cid, check.get("label", cid))
            # Labels may come straight from audit data, so escape them before
            # they are embedded in campaign HTML.
            li_items.append("<li>" + html.escape(str(label)) + "</li>")
            li_ids.append(cid)
        return li_items, li_ids

    items, ids = render(include_minor=False)
    if not items:
        # Nothing major/critical: fall back to listing every check.
        items, ids = render(include_minor=True)

    # NOTE(review): the wrapper markup was unreadable in the reviewed source;
    # a plain <ul> is used here. Restore any styling attributes the campaign
    # template relied on.
    markup = "<ul>" + "".join(items) + "</ul>"
    return markup, ids


def _fetch_carrier_rows():
    """Run the deficiency query through psql and return non-empty output rows."""
    result = subprocess.run(
        [
            "docker", "exec", "performancewest-api-postgres-1",
            "psql", "-U", "pw", "performancewest",
            "-t", "-A", "-F", _FIELD_SEP,
            "-c", _CARRIER_QUERY,
        ],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        # Without this check a failed query is reported as "Found 0 carriers".
        raise SystemExit(
            f"psql query failed (exit {result.returncode}): {result.stderr.strip()}"
        )
    return [row for row in result.stdout.strip().split("\n") if row.strip()]


def _find_or_create_list():
    """Return the id of the 2026 deficiency list, creating the list if needed."""
    lists_resp = api_call("GET", "/lists")
    for entry in lists_resp.get("data", {}).get("results", []):
        name = entry.get("name", "")
        if "Deficiency" in name and "2026" in name:
            print(f"Using existing list: {entry['id']}")
            return entry["id"]

    resp = api_call("POST", "/lists", {
        "name": "FCC RMD Deficiency Alert 2026",
        "type": "public",
        "optin": "single",
        "tags": ["fcc", "rmd", "deficiency", "2026"],
    })
    list_id = resp.get("data", {}).get("id")
    if not list_id:
        raise SystemExit(f"Could not create deficiency list: {resp!r}")
    print(f"Created list: {list_id}")
    return list_id


def _update_existing(email, contact_name, company, attribs, list_id):
    """Update an already-existing subscriber.

    Returns "updated", "customer" (skipped because is_customer is set) or
    "missing" (Listmonk reported a duplicate but the lookup found nothing).
    """
    # The query parameter is a SQL expression: double single quotes for SQL,
    # then URL-encode so characters such as "+" in addresses survive transport.
    sql_email = email.replace("'", "''")
    query = urllib.parse.urlencode(
        {"query": f"subscribers.email='{sql_email}'", "per_page": 1}
    )
    search = api_call("GET", "/subscribers?" + query)
    results = search.get("data", {}).get("results", [])
    if not results:
        return "missing"

    found = results[0]
    sub_id = found["id"]
    merged = found.get("attribs") or {}
    if merged.get("is_customer"):
        return "customer"
    merged.update(attribs)

    # PUT replaces the whole subscriber: omitting "lists" would drop every
    # existing membership, so resend the current ones plus the deficiency list.
    # NOTE(review): confirm how Listmonk treats the subscription status of
    # lists resent here (e.g. previously unsubscribed lists).
    list_ids = [
        entry["id"]
        for entry in found.get("lists") or []
        if isinstance(entry, dict) and "id" in entry
    ]
    if list_id not in list_ids:
        list_ids.append(list_id)

    api_call("PUT", f"/subscribers/{sub_id}", {
        "email": email,
        "name": contact_name or found.get("name", company),
        "attribs": merged,
        "status": "enabled",
        "lists": list_ids,
    })
    api_call("PUT", "/subscribers/lists", {
        "ids": [sub_id],
        "action": "add",
        "target_list_ids": [list_id],
        "status": "confirmed",
    })
    return "updated"


def main():
    """Upsert every carrier with major/critical deficiencies into Listmonk."""
    # Query carriers with deficiencies
    rows = _fetch_carrier_rows()
    print(f"Found {len(rows)} carriers with major/critical deficiencies")

    # Find or create list
    deficiency_list_id = _find_or_create_list()

    added = 0
    updated = 0
    skipped_customer = 0
    errors = 0
    last_reported = 0

    for row in rows:
        parts = row.split(_FIELD_SEP)
        if len(parts) < _EXPECTED_FIELDS:
            continue

        frn = parts[0].strip()
        company = parts[1].strip()
        deficiency_count = int(parts[2].strip() or 0)
        severity = parts[3].strip()
        email = parts[4].strip().lower()
        contact_name = parts[5].strip()
        implementation = parts[6].strip()
        last_recert = parts[7].strip()
        rmd_number = parts[8].strip()
        structured_json = parts[9].strip()
        pdf_json = parts[10].strip()

        if not email or "@" not in email:
            continue

        issues_html, issue_ids = build_issues_html(structured_json, pdf_json)

        attribs = {
            "company": company,
            "fcc_frn": frn,
            "rmd_number": rmd_number,
            "severity": severity,
            "deficiency_count": deficiency_count,
            "issues_html": issues_html,
            "issue_ids": ",".join(issue_ids),
            "implementation": implementation,
            "last_recertified": last_recert,
        }

        try:
            api_call("POST", "/subscribers", {
                "email": email,
                "name": contact_name or company,
                "status": "enabled",
                "lists": [deficiency_list_id],
                "attribs": attribs,
                "preconfirm_subscriptions": True,
            })
            added += 1
        except urllib.error.HTTPError as e:
            err = e.read().decode(errors="replace")
            if "already exists" in err:
                try:
                    outcome = _update_existing(
                        email, contact_name, company, attribs, deficiency_list_id
                    )
                except (OSError, ValueError, KeyError, TypeError) as exc:
                    errors += 1
                    print(f"  Update failed for {email}: {exc}", file=sys.stderr)
                else:
                    if outcome == "updated":
                        updated += 1
                    elif outcome == "customer":
                        skipped_customer += 1
                        continue
                    else:
                        errors += 1
                        print(
                            f"  {email} reported as existing but not found",
                            file=sys.stderr,
                        )
            else:
                errors += 1
                print(f"  Create failed for {email}: {err}", file=sys.stderr)

        processed = added + updated
        # Report each multiple of 200 once, not on every iteration that
        # happens to leave the counter unchanged.
        if processed and processed % _PROGRESS_EVERY == 0 and processed != last_reported:
            last_reported = processed
            print(f"  Progress: {added} added, {updated} updated, {skipped_customer} skipped")

        time.sleep(0.05)  # rate limit

    print("\nDone:")
    print(f"  Added: {added}")
    print(f"  Updated: {updated}")
    print(f"  Customers skipped: {skipped_customer}")
    print(f"  Errors: {errors}")
    print(f"  List ID: {deficiency_list_id}")

    search = api_call("GET", f"/subscribers?list_id={deficiency_list_id}&per_page=1")
    total = search.get("data", {}).get("total", 0)
    print(f"  Total subscribers on list: {total}")


if __name__ == "__main__":
    main()