#!/usr/bin/env python3
"""Populate Listmonk with FCC RMD deficiency data for campaign sends.
Creates a dedicated list and upserts every carrier that has major/critical
audit deficiencies. Sets subscriber attributes with issues_html so the
campaign template can render personalized issue lists.
Skips carriers tagged as is_customer=true.
"""
import json
import subprocess
import sys
import time
import urllib.request
import urllib.error
import base64
AUTH = base64.b64encode(b"api:6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y").decode()
API = "http://localhost:9100/api"
ISSUE_LABELS = {
"ss_partial_note": "STIR/SHAKEN partial implementation \u2014 upstream provider not named",
"ss_vsp_no_shaken": "Voice Service Provider without STIR/SHAKEN implementation",
"conflicting_classification": "Unusual provider classification \u2014 may need correction",
"missing_kyc": "Missing Know Your Customer (KYC) procedures",
"missing_material_change": "Missing 10-business-day material change update commitment",
"missing_dno": "Missing Do-Not-Originate (DNO) list enforcement",
"missing_traceback": "Missing 24-hour traceback response commitment",
"missing_recertification": "Missing annual recertification acknowledgment",
"missing_perjury": "Missing perjury declaration",
"missing_mitigation": "Missing robocall mitigation program details",
"no_classification": "No provider classification selected",
"ss_intermediate_complete": "Intermediate provider claims Complete STIR/SHAKEN",
"xref_name_mismatch": "Business name not found in certification document",
"xref_old_document": "Outdated certification document",
"xref_ss_mismatch": "STIR/SHAKEN status mismatch between RMD and document",
"missing_stir_shaken": "Missing STIR/SHAKEN implementation details",
"missing_enforcement": "Missing enforcement history disclosure",
"missing_provider_id": "Missing provider identification",
"no_recert_date": "No recertification date on file",
}
def api_call(method, path, data=None):
payload = json.dumps(data).encode() if data else None
req = urllib.request.Request(
API + path, data=payload, method=method,
headers={"Content-Type": "application/json", "Authorization": "Basic " + AUTH},
)
return json.loads(urllib.request.urlopen(req).read())
def build_issues_html(structured_json, pdf_json):
"""Merge structured + PDF checks, return HTML list of major/critical issues."""
all_checks = []
for cj in [structured_json, pdf_json]:
try:
checks = json.loads(cj)
if isinstance(checks, list):
all_checks.extend(checks)
except Exception:
pass
severity_order = {"critical": 0, "major": 1, "minor": 2}
all_checks.sort(key=lambda c: severity_order.get(c.get("severity", "minor"), 2))
items = []
ids = []
seen = set()
for c in all_checks:
cid = c.get("id", "")
if cid in seen:
continue
seen.add(cid)
sev = c.get("severity", "minor")
if sev == "minor":
continue
label = ISSUE_LABELS.get(cid, c.get("label", cid))
items.append("
" + label + "")
ids.append(cid)
if not items:
for c in all_checks:
cid = c.get("id", "")
label = ISSUE_LABELS.get(cid, c.get("label", cid))
items.append("" + label + "")
ids.append(cid)
html = '' + "".join(items[:6]) + "
"
return html, ids
def main():
# Query carriers with deficiencies
result = subprocess.run([
"docker", "exec", "performancewest-api-postgres-1", "psql", "-U", "pw",
"performancewest", "-t", "-A", "-F", "||", "-c",
"""SELECT a.frn, a.business_name, a.total_deficiencies, a.severity,
r.contact_email, r.contact_name, r.implementation,
r.last_recertified::text, r.rmd_number,
a.structured_checks::text, a.pdf_checks::text
FROM fcc_rmd_audit_results a
JOIN fcc_rmd r ON r.frn = a.frn
WHERE a.total_deficiencies > 0
AND a.severity IN ('major', 'critical')
AND r.contact_email IS NOT NULL
AND r.removed_from_rmd = FALSE
ORDER BY a.total_deficiencies DESC""",
], capture_output=True, text=True)
lines = [l for l in result.stdout.strip().split("\n") if l.strip()]
print(f"Found {len(lines)} carriers with major/critical deficiencies")
# Find or create list
lists_resp = api_call("GET", "/lists")
deficiency_list_id = None
for l in lists_resp.get("data", {}).get("results", []):
if "Deficiency" in l["name"] and "2026" in l["name"]:
deficiency_list_id = l["id"]
break
if not deficiency_list_id:
resp = api_call("POST", "/lists", {
"name": "FCC RMD Deficiency Alert 2026",
"type": "public",
"optin": "single",
"tags": ["fcc", "rmd", "deficiency", "2026"],
})
deficiency_list_id = resp.get("data", {}).get("id")
print(f"Created list: {deficiency_list_id}")
else:
print(f"Using existing list: {deficiency_list_id}")
added = 0
updated = 0
skipped_customer = 0
errors = 0
for line in lines:
parts = line.split("||")
if len(parts) < 11:
continue
frn = parts[0].strip()
company = parts[1].strip()
deficiency_count = int(parts[2].strip() or 0)
severity = parts[3].strip()
email = parts[4].strip().lower()
contact_name = parts[5].strip()
implementation = parts[6].strip()
last_recert = parts[7].strip()
rmd_number = parts[8].strip()
structured_json = parts[9].strip()
pdf_json = parts[10].strip()
if not email or "@" not in email:
continue
issues_html, issue_ids = build_issues_html(structured_json, pdf_json)
attribs = {
"company": company,
"fcc_frn": frn,
"rmd_number": rmd_number,
"severity": severity,
"deficiency_count": deficiency_count,
"issues_html": issues_html,
"issue_ids": ",".join(issue_ids),
"implementation": implementation,
"last_recertified": last_recert,
}
try:
resp = api_call("POST", "/subscribers", {
"email": email,
"name": contact_name or company,
"status": "enabled",
"lists": [deficiency_list_id],
"attribs": attribs,
"preconfirm_subscriptions": True,
})
added += 1
except urllib.error.HTTPError as e:
err = e.read().decode()
if "already exists" in err:
try:
search = api_call("GET", "/subscribers?query=subscribers.email%3D%27" + email.replace("'", "") + "%27&per_page=1")
results = search.get("data", {}).get("results", [])
if results:
sub_id = results[0]["id"]
existing = results[0].get("attribs", {})
if existing.get("is_customer"):
skipped_customer += 1
continue
existing.update(attribs)
api_call("PUT", f"/subscribers/{sub_id}", {
"email": email,
"name": contact_name or results[0].get("name", company),
"attribs": existing,
"status": "enabled",
})
api_call("PUT", "/subscribers/lists", {
"ids": [sub_id],
"action": "add",
"target_list_ids": [deficiency_list_id],
"status": "confirmed",
})
updated += 1
except Exception as ex:
errors += 1
else:
errors += 1
if (added + updated) % 200 == 0 and (added + updated) > 0:
print(f" Progress: {added} added, {updated} updated, {skipped_customer} skipped")
time.sleep(0.05) # rate limit
print(f"\nDone:")
print(f" Added: {added}")
print(f" Updated: {updated}")
print(f" Customers skipped: {skipped_customer}")
print(f" Errors: {errors}")
print(f" List ID: {deficiency_list_id}")
search = api_call("GET", f"/subscribers?list_id={deficiency_list_id}&per_page=1")
total = search.get("data", {}).get("total", 0)
print(f" Total subscribers on list: {total}")
if __name__ == "__main__":
main()