new-site/scripts/workers/rmd_deficiency_campaign.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

294 lines
11 KiB
Python

"""
rmd_deficiency_campaign.py — Populate a Listmonk mailing list with carriers
whose 2026 RMD filings have deficiencies, including per-carrier issue details
for mail merge.
Creates/updates Listmonk list 7 "FCC RMD Deficiency Alert" with subscriber
attributes containing the specific issues found in their filing.
Usage:
python -m workers.rmd_deficiency_campaign
python -m workers.rmd_deficiency_campaign --dry-run
"""
from __future__ import annotations

import html
import json
import logging
import os
import sys

import psycopg2
import psycopg2.extras
import requests
LOG = logging.getLogger("workers.rmd_deficiency_campaign")

# Timestamped, logger-tagged records at INFO and above, written to stdout.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    level=logging.INFO,
)

# PostgreSQL DSN for the RMD tables; main() refuses to run when it is empty.
DATABASE_URL = os.getenv("DATABASE_URL", "")

# Listmonk API base URL and the basic-auth credentials main() puts on the session.
LISTMONK_URL = os.getenv("LISTMONK_URL", "https://lists.performancewest.net")
LISTMONK_USER = os.getenv("LISTMONK_USER", "api")
LISTMONK_PASS = os.getenv("LISTMONK_PASS", "")

# Target mailing list: looked up by id/name, created under LIST_NAME if absent.
LIST_ID = 7
LIST_NAME = "FCC RMD Deficiency Alert"
# Human-readable labels for audit issue ids, shown to carriers in the
# deficiency email (issues_human / issues_html subscriber attributes).
# Keys are the "id" values of entries in the structured_checks / pdf_checks
# columns of fcc_rmd_audit_results. get_deficient_carriers() falls back to the
# check's own "label" (and then the raw id) for ids not listed here.
ISSUE_LABELS = {
    "missing_kyc": "Missing Know Your Customer (KYC) procedures — required under 2025 RMD Report & Order",
    "missing_material_change": "Missing 10-business-day material change update commitment — required effective Feb 5, 2026",
    "missing_dno": "Missing Do-Not-Originate (DNO) list enforcement — emphasized in 2026 requirements",
    "ss_vsp_no_shaken": "Voice Service Provider without STIR/SHAKEN implementation — VSPs must implement unless exempt",
    "ss_intermediate_complete": "Intermediate provider claims Complete STIR/SHAKEN — intermediates cannot sign calls",
    "missing_traceback": "Missing 24-hour traceback response commitment",
    "missing_recertification": "Missing annual recertification acknowledgment (March 1 deadline)",
    "missing_perjury": "Missing perjury declaration in uploaded document",
    "missing_stir_shaken": "Missing STIR/SHAKEN implementation details",
    "missing_mitigation": "Missing robocall mitigation program description",
    "missing_provider_id": "Missing provider identification (FRN/entity details)",
    "missing_classification": "Missing provider classification",
    "missing_enforcement": "Missing enforcement history disclosure",
    # Cross-reference checks: uploaded document vs. structured RMD record.
    "xref_ss_mismatch": "STIR/SHAKEN status in document doesn't match structured data",
    "xref_old_document": "Uploaded document references outdated year — may not reflect 2026 requirements",
    "xref_name_mismatch": "Business name in uploaded document doesn't match RMD record",
    "no_classification": "No provider classification selected (critical)",
    "no_recert_date": "No recertification date on file",
    "ss_partial_note": "Partial STIR/SHAKEN — upstream provider should be named",
}
def _parse_checks(raw) -> list:
    """Return the check entries from a JSON column value (list, JSON text, or NULL)."""
    if not raw:
        return []
    if isinstance(raw, str):
        raw = json.loads(raw)
    # Anything other than a JSON array (null, object, scalar) carries no checks.
    return raw if isinstance(raw, list) else []


def _collect_issues(row: dict) -> list[dict]:
    """Flatten a row's structured_checks + pdf_checks into {id, label, severity} dicts.

    Contact email/name checks are excluded from the list shown to the carrier.
    Labels come from ISSUE_LABELS, falling back to the check's own "label" and
    then to the raw id.
    """
    issues = []
    for column in ("structured_checks", "pdf_checks"):
        for check in _parse_checks(row.get(column)):
            if not isinstance(check, dict):
                continue  # malformed entry; nothing usable to report
            issue_id = check.get("id", "")
            if issue_id in ("missing_contact_email", "missing_contact_name"):
                continue
            issues.append({
                "id": issue_id,
                "label": ISSUE_LABELS.get(issue_id, check.get("label", issue_id)),
                "severity": check.get("severity", "major"),
            })
    return issues


def get_deficient_carriers(conn) -> list[dict]:
    """Return emailable carriers whose RMD audit found major/critical deficiencies.

    Joins fcc_rmd_audit_results with fcc_rmd (skipping carriers removed from
    the RMD), flattens each row's structured and PDF checks into an issue list,
    and keeps only carriers with at least one reportable issue and a contact
    email containing "@".

    Args:
        conn: An open psycopg2 connection.

    Returns:
        One dict per carrier, ordered by total_deficiencies descending, with the
        keys upsert_subscribers() reads: email, name, company, frn, rmd_number,
        severity, deficiency_count, issues_human, issues_html, issue_ids,
        last_recertified, implementation.
    """
    # Pull emails from multiple sources: RMD contact_email, or fallback to
    # the email we scraped via the ServiceNow SP API (Phase 2 of fcc_rmd_scraper)
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("""
            SELECT
                a.rmd_number,
                a.frn,
                a.business_name,
                a.severity,
                a.total_deficiencies,
                a.structured_checks,
                a.pdf_checks,
                COALESCE(r.contact_email, '') AS contact_email,
                COALESCE(r.contact_name, '') AS contact_name,
                r.implementation,
                r.voice_service_provider,
                r.gateway_provider,
                r.intermediate_provider,
                r.last_recertified,
                r.servicenow_sys_id
            FROM fcc_rmd_audit_results a
            JOIN fcc_rmd r ON r.rmd_number = a.rmd_number
            WHERE a.total_deficiencies > 0
              AND a.severity IN ('major', 'critical')
              AND (r.removed_from_rmd = FALSE OR r.removed_from_rmd IS NULL)
            ORDER BY a.total_deficiencies DESC
        """)
        rows = cur.fetchall()

    carriers = []
    for row in rows:
        row = dict(row)
        # Carriers we cannot email are useless to the campaign; filter them first.
        raw_email = (row.get("contact_email") or "").strip().lower()
        if not raw_email or "@" not in raw_email:
            continue
        issues = _collect_issues(row)
        if not issues:
            continue

        # Plain-text bullets with a severity marker for text templates.
        issue_bullets = []
        for iss in issues:
            icon = "🔴" if iss["severity"] == "critical" else "🟡" if iss["severity"] == "major" else "🟢"
            issue_bullets.append(f"{icon} {iss['label']}")

        last_recertified = row.get("last_recertified")
        carriers.append({
            "email": raw_email,
            "name": row.get("contact_name") or "",
            "company": row["business_name"],
            "frn": row["frn"],
            "rmd_number": row["rmd_number"],
            "severity": row["severity"],
            "deficiency_count": len(issues),
            "issues_human": "\n".join(issue_bullets),
            # Labels may come from DB data (check["label"]), so escape them
            # before embedding in HTML; this also fixes bare "&" in labels.
            "issues_html": "<ul>" + "".join(
                f'<li style="margin-bottom:6px;">{html.escape(str(iss["label"]))}</li>'
                for iss in issues
            ) + "</ul>",
            "issue_ids": [iss["id"] for iss in issues],
            # A NULL date must not reach the mail merge as the string "None".
            "last_recertified": "" if last_recertified is None else str(last_recertified),
            "implementation": row.get("implementation") or "",
        })
    LOG.info("Found %d carriers with deficiencies and contact emails", len(carriers))
    return carriers
def ensure_list(session: requests.Session) -> int:
    """Return the id of the Listmonk list named LIST_NAME, creating it if absent.

    An existing list is matched by exact name first. A list whose id equals
    LIST_ID is used only when no list has that name; a warning is logged if
    its name differs. If the lookup finds nothing and creation fails, LIST_ID
    is returned as a last resort (with a warning).

    Args:
        session: Authenticated requests session (main() sets basic auth on it).

    Returns:
        The Listmonk list id to subscribe carriers to.
    """
    # The lists endpoint is paginated; request every list so one beyond the
    # first page is not missed (which would lead to creating a duplicate).
    r = session.get(f"{LISTMONK_URL}/api/lists", params={"per_page": "all"}, timeout=10)
    if r.ok:
        lists = r.json().get("data", {}).get("results") or []
        # Prefer the name: a list that merely has id LIST_ID may have been
        # repurposed, and the old id-or-name test picked whichever came first.
        match = next((lst for lst in lists if lst.get("name") == LIST_NAME), None)
        if match is None:
            match = next((lst for lst in lists if lst.get("id") == LIST_ID), None)
            if match is not None:
                LOG.warning(
                    "List id=%d is named %r, not %r; using it anyway",
                    LIST_ID, match.get("name"), LIST_NAME,
                )
        if match is not None:
            LOG.info("List exists: %s (id=%d)", match.get("name"), match["id"])
            return match["id"]
    else:
        LOG.warning("Could not fetch lists (HTTP %d): %s", r.status_code, r.text[:200])

    # Create it
    r = session.post(f"{LISTMONK_URL}/api/lists", json={
        "name": LIST_NAME,
        "type": "public",
        "optin": "single",
        "tags": ["rmd", "deficiency", "2026"],
    }, timeout=10)
    if r.ok:
        lid = r.json().get("data", {}).get("id", LIST_ID)
        LOG.info("Created list: %s (id=%d)", LIST_NAME, lid)
        return lid
    LOG.warning("Could not create list: %s", r.text[:200])
    return LIST_ID
def _find_subscriber(session: requests.Session, email: str) -> dict | None:
    """Fetch the Listmonk subscriber record for ``email``; None if absent or the lookup failed."""
    # Listmonk evaluates `query` as a raw SQL expression, so a quote in the
    # address would break (or inject into) it. Doubling single quotes keeps
    # the value a single SQL string literal.
    quoted = email.replace("'", "''")
    r = session.get(
        f"{LISTMONK_URL}/api/subscribers",
        params={"query": f"subscribers.email = '{quoted}'"},
        timeout=10,
    )
    if not r.ok:
        LOG.warning("Subscriber lookup failed for %s: %s", email, r.text[:100])
        return None
    results = r.json().get("data", {}).get("results") or []
    return results[0] if results else None


def _update_existing_subscriber(
    session: requests.Session,
    carrier: dict,
    attribs: dict,
    list_id: int,
) -> bool:
    """Merge ``attribs`` into an existing subscriber and ensure it is on ``list_id``.

    Listmonk's PUT /api/subscribers/{id} replaces the whole record (email,
    name, status, list memberships). The subscriber's current values are
    therefore sent back unchanged: a blocklisted or renamed subscriber stays
    that way, and memberships in other lists are kept. Attributes set by other
    campaigns are preserved; ours override on key collisions.

    Returns:
        True when every required API call succeeded.
    """
    sub = _find_subscriber(session, carrier["email"])
    if sub is None:
        LOG.warning("Subscriber %s already exists but could not be looked up", carrier["email"])
        return False
    sub_id = sub["id"]
    current_list_ids = [
        lst["id"] for lst in (sub.get("lists") or [])
        if isinstance(lst, dict) and "id" in lst
    ]
    update = {
        "email": sub.get("email") or carrier["email"],
        "name": sub.get("name") or carrier.get("name") or carrier["company"],
        "status": sub.get("status") or "enabled",
        "lists": current_list_ids,
        "attribs": {**(sub.get("attribs") or {}), **attribs},
    }
    ur = session.put(f"{LISTMONK_URL}/api/subscribers/{sub_id}", json=update, timeout=10)
    if not ur.ok:
        LOG.warning("Attribute update failed for %s: %s", carrier["email"], ur.text[:100])
        return False

    # Already a member (in any subscription state, including unsubscribed):
    # leave the membership alone rather than re-adding it.
    if list_id in current_list_ids:
        return True
    lr = session.put(
        f"{LISTMONK_URL}/api/subscribers/lists",
        json={
            "ids": [sub_id],
            "action": "add",
            "target_list_ids": [list_id],
            # Matches preconfirm_subscriptions=True used for new subscribers.
            "status": "confirmed",
        },
        timeout=10,
    )
    if not lr.ok:
        LOG.warning("Adding %s to list %d failed: %s", carrier["email"], list_id, lr.text[:100])
        return False
    return True


def upsert_subscribers(session: requests.Session, carriers: list[dict], list_id: int, dry_run: bool = False) -> int:
    """Create or update each carrier as a Listmonk subscriber on ``list_id``.

    New emails are created enabled, pre-confirmed, with the carrier's issue
    details as attributes. Emails that already exist (HTTP 409) get their
    attributes merged and are added to the list.

    Args:
        session: Authenticated requests session; unused (may be None) when dry_run.
        carriers: Dicts as produced by get_deficient_carriers().
        list_id: Target Listmonk list id.
        dry_run: Only log what would be written.

    Returns:
        Number of carriers successfully created or updated (in dry-run mode,
        the number of carriers that would be processed).
    """
    count = 0
    total = len(carriers)
    for processed, carrier in enumerate(carriers, start=1):
        attribs = {
            "company": carrier["company"],
            "fcc_frn": carrier["frn"],
            "rmd_number": carrier["rmd_number"],
            "severity": carrier["severity"],
            "deficiency_count": carrier["deficiency_count"],
            "issues_html": carrier["issues_html"],
            "issues_human": carrier["issues_human"],
            "issue_ids": ",".join(carrier["issue_ids"]),
            "last_recertified": carrier["last_recertified"],
            "implementation": carrier["implementation"],
        }
        if dry_run:
            LOG.info("DRY RUN: %s (%s) — %d issues", carrier["company"], carrier["email"], carrier["deficiency_count"])
            count += 1
            continue
        payload = {
            "email": carrier["email"],
            "name": carrier.get("name") or carrier["company"],
            "status": "enabled",
            "lists": [list_id],
            "attribs": attribs,
            "preconfirm_subscriptions": True,
        }
        r = session.post(f"{LISTMONK_URL}/api/subscribers", json=payload, timeout=10)
        if r.ok:
            count += 1
        elif r.status_code == 409:
            # Email already exists — update it instead; count only real successes.
            if _update_existing_subscriber(session, carrier, attribs, list_id):
                count += 1
        else:
            LOG.warning("Subscriber upsert failed for %s: %s", carrier["email"], r.text[:100])
        # Key progress on items processed, not successes, so failures cannot
        # make the same progress line repeat.
        if processed % 100 == 0:
            LOG.info("Progress: %d/%d subscribers processed (%d upserted)", processed, total, count)
    return count
def main():
    """Command-line entry point.

    Reads deficient carriers from PostgreSQL, logs the ten most common issue
    types, and (unless --dry-run) upserts the carriers into the Listmonk
    deficiency list. Exits with status 1 when DATABASE_URL is not set, or when
    LISTMONK_PASS is not set for a real run.
    """
    import argparse
    from collections import Counter

    parser = argparse.ArgumentParser(description="Populate Listmonk with RMD deficiency data")
    parser.add_argument("--dry-run", action="store_true", help="Don't write to Listmonk")
    args = parser.parse_args()

    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)

    conn = psycopg2.connect(DATABASE_URL)
    try:
        carriers = get_deficient_carriers(conn)
    finally:
        # Release the connection even if the query or row processing raises.
        conn.close()

    if not carriers:
        LOG.info("No carriers with deficiencies found")
        return

    LOG.info("Carriers with deficiencies + email: %d", len(carriers))
    LOG.info("Top issues:")
    issue_counts = Counter(iid for c in carriers for iid in c["issue_ids"])
    for issue_id, cnt in issue_counts.most_common(10):
        LOG.info("  %s: %d carriers", ISSUE_LABELS.get(issue_id, issue_id)[:60], cnt)

    if args.dry_run:
        LOG.info("DRY RUN — not writing to Listmonk")
        upsert_subscribers(None, carriers, LIST_ID, dry_run=True)
        return

    if not LISTMONK_PASS:
        LOG.error("LISTMONK_PASS not set")
        sys.exit(1)

    # Context manager closes the session's pooled connections when done.
    with requests.Session() as session:
        session.auth = (LISTMONK_USER, LISTMONK_PASS)
        list_id = ensure_list(session)
        count = upsert_subscribers(session, carriers, list_id)
    LOG.info("Done: %d subscribers upserted to list %d (%s)", count, list_id, LIST_NAME)
# Run the CLI only when executed as a script/module (e.g.
# `python -m workers.rmd_deficiency_campaign`), not when imported.
if __name__ == "__main__":
    main()