Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
294 lines
11 KiB
Python
294 lines
11 KiB
Python
"""
|
|
rmd_deficiency_campaign.py — Populate a Listmonk mailing list with carriers
|
|
whose 2026 RMD filings have deficiencies, including per-carrier issue details
|
|
for mail merge.
|
|
|
|
Creates/updates Listmonk list 7 "FCC RMD Deficiency Alert" with subscriber
|
|
attributes containing the specific issues found in their filing.
|
|
|
|
Usage:
|
|
python -m workers.rmd_deficiency_campaign
|
|
python -m workers.rmd_deficiency_campaign --dry-run
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import requests
|
|
|
|
LOG = logging.getLogger("workers.rmd_deficiency_campaign")
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
|
|
stream=sys.stdout,
|
|
)
|
|
|
|
DATABASE_URL = os.environ.get("DATABASE_URL", "")
|
|
LISTMONK_URL = os.environ.get("LISTMONK_URL", "https://lists.performancewest.net")
|
|
LISTMONK_USER = os.environ.get("LISTMONK_USER", "api")
|
|
LISTMONK_PASS = os.environ.get("LISTMONK_PASS", "")
|
|
|
|
LIST_ID = 7 # "FCC RMD Deficiency Alert"
|
|
LIST_NAME = "FCC RMD Deficiency Alert"
|
|
|
|
# Human-readable issue labels for email template
|
|
ISSUE_LABELS = {
|
|
"missing_kyc": "Missing Know Your Customer (KYC) procedures — required under 2025 RMD Report & Order",
|
|
"missing_material_change": "Missing 10-business-day material change update commitment — required effective Feb 5, 2026",
|
|
"missing_dno": "Missing Do-Not-Originate (DNO) list enforcement — emphasized in 2026 requirements",
|
|
"ss_vsp_no_shaken": "Voice Service Provider without STIR/SHAKEN implementation — VSPs must implement unless exempt",
|
|
"ss_intermediate_complete": "Intermediate provider claims Complete STIR/SHAKEN — intermediates cannot sign calls",
|
|
"missing_traceback": "Missing 24-hour traceback response commitment",
|
|
"missing_recertification": "Missing annual recertification acknowledgment (March 1 deadline)",
|
|
"missing_perjury": "Missing perjury declaration in uploaded document",
|
|
"missing_stir_shaken": "Missing STIR/SHAKEN implementation details",
|
|
"missing_mitigation": "Missing robocall mitigation program description",
|
|
"missing_provider_id": "Missing provider identification (FRN/entity details)",
|
|
"missing_classification": "Missing provider classification",
|
|
"missing_enforcement": "Missing enforcement history disclosure",
|
|
"xref_ss_mismatch": "STIR/SHAKEN status in document doesn't match structured data",
|
|
"xref_old_document": "Uploaded document references outdated year — may not reflect 2026 requirements",
|
|
"xref_name_mismatch": "Business name in uploaded document doesn't match RMD record",
|
|
"no_classification": "No provider classification selected (critical)",
|
|
"no_recert_date": "No recertification date on file",
|
|
"ss_partial_note": "Partial STIR/SHAKEN — upstream provider should be named",
|
|
}
|
|
|
|
|
|
def get_deficient_carriers(conn) -> list[dict]:
|
|
"""Get all carriers with RMD deficiencies and their contact info."""
|
|
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
|
|
|
# Pull emails from multiple sources: RMD contact_email, or fallback to
|
|
# the email we scraped via the ServiceNow SP API (Phase 2 of fcc_rmd_scraper)
|
|
cur.execute("""
|
|
SELECT
|
|
a.rmd_number,
|
|
a.frn,
|
|
a.business_name,
|
|
a.severity,
|
|
a.total_deficiencies,
|
|
a.structured_checks,
|
|
a.pdf_checks,
|
|
COALESCE(r.contact_email, '') AS contact_email,
|
|
COALESCE(r.contact_name, '') AS contact_name,
|
|
r.implementation,
|
|
r.voice_service_provider,
|
|
r.gateway_provider,
|
|
r.intermediate_provider,
|
|
r.last_recertified,
|
|
r.servicenow_sys_id
|
|
FROM fcc_rmd_audit_results a
|
|
JOIN fcc_rmd r ON r.rmd_number = a.rmd_number
|
|
WHERE a.total_deficiencies > 0
|
|
AND a.severity IN ('major', 'critical')
|
|
AND (r.removed_from_rmd = FALSE OR r.removed_from_rmd IS NULL)
|
|
ORDER BY a.total_deficiencies DESC
|
|
""")
|
|
|
|
carriers = []
|
|
for row in cur.fetchall():
|
|
row = dict(row)
|
|
|
|
# Collect all issue IDs from structured + pdf checks
|
|
issues = []
|
|
for check_list in [row.get("structured_checks") or [], row.get("pdf_checks") or []]:
|
|
if isinstance(check_list, str):
|
|
check_list = json.loads(check_list)
|
|
for check in check_list:
|
|
issue_id = check.get("id", "")
|
|
# Skip contact email/name issues
|
|
if issue_id in ("missing_contact_email", "missing_contact_name"):
|
|
continue
|
|
issues.append({
|
|
"id": issue_id,
|
|
"label": ISSUE_LABELS.get(issue_id, check.get("label", issue_id)),
|
|
"severity": check.get("severity", "major"),
|
|
})
|
|
|
|
if not issues:
|
|
continue
|
|
|
|
# Build human-readable issue list for the email
|
|
issue_bullets = []
|
|
for iss in issues:
|
|
icon = "🔴" if iss["severity"] == "critical" else "🟡" if iss["severity"] == "major" else "🟢"
|
|
issue_bullets.append(f"{icon} {iss['label']}")
|
|
|
|
raw_email = (row.get("contact_email") or "").strip().lower()
|
|
if not raw_email or "@" not in raw_email:
|
|
continue # Skip carriers without email
|
|
|
|
carriers.append({
|
|
"email": raw_email,
|
|
"name": row.get("contact_name", ""),
|
|
"company": row["business_name"],
|
|
"frn": row["frn"],
|
|
"rmd_number": row["rmd_number"],
|
|
"severity": row["severity"],
|
|
"deficiency_count": len(issues),
|
|
"issues_human": "\n".join(issue_bullets),
|
|
"issues_html": "<ul>" + "".join(
|
|
f'<li style="margin-bottom:6px;">{iss["label"]}</li>'
|
|
for iss in issues
|
|
) + "</ul>",
|
|
"issue_ids": [iss["id"] for iss in issues],
|
|
"last_recertified": str(row.get("last_recertified", "")),
|
|
"implementation": row.get("implementation", ""),
|
|
})
|
|
|
|
LOG.info("Found %d carriers with deficiencies and contact emails", len(carriers))
|
|
return carriers
|
|
|
|
|
|
def ensure_list(session: requests.Session) -> int:
|
|
"""Create Listmonk list if it doesn't exist. Returns list ID."""
|
|
# Check if list exists
|
|
r = session.get(f"{LISTMONK_URL}/api/lists", timeout=10)
|
|
if r.ok:
|
|
for lst in r.json().get("data", {}).get("results", []):
|
|
if lst.get("id") == LIST_ID or lst.get("name") == LIST_NAME:
|
|
LOG.info("List exists: %s (id=%d)", lst["name"], lst["id"])
|
|
return lst["id"]
|
|
|
|
# Create it
|
|
r = session.post(f"{LISTMONK_URL}/api/lists", json={
|
|
"name": LIST_NAME,
|
|
"type": "public",
|
|
"optin": "single",
|
|
"tags": ["rmd", "deficiency", "2026"],
|
|
}, timeout=10)
|
|
|
|
if r.ok:
|
|
lid = r.json().get("data", {}).get("id", LIST_ID)
|
|
LOG.info("Created list: %s (id=%d)", LIST_NAME, lid)
|
|
return lid
|
|
|
|
LOG.warning("Could not create list: %s", r.text[:200])
|
|
return LIST_ID
|
|
|
|
|
|
def upsert_subscribers(session: requests.Session, carriers: list[dict], list_id: int, dry_run: bool = False) -> int:
|
|
"""Upsert carriers as Listmonk subscribers with attributes."""
|
|
count = 0
|
|
|
|
for carrier in carriers:
|
|
attribs = {
|
|
"company": carrier["company"],
|
|
"fcc_frn": carrier["frn"],
|
|
"rmd_number": carrier["rmd_number"],
|
|
"severity": carrier["severity"],
|
|
"deficiency_count": carrier["deficiency_count"],
|
|
"issues_html": carrier["issues_html"],
|
|
"issues_human": carrier["issues_human"],
|
|
"issue_ids": ",".join(carrier["issue_ids"]),
|
|
"last_recertified": carrier["last_recertified"],
|
|
"implementation": carrier["implementation"],
|
|
}
|
|
|
|
if dry_run:
|
|
LOG.info("DRY RUN: %s (%s) — %d issues", carrier["company"], carrier["email"], carrier["deficiency_count"])
|
|
count += 1
|
|
continue
|
|
|
|
payload = {
|
|
"email": carrier["email"],
|
|
"name": carrier.get("name") or carrier["company"],
|
|
"status": "enabled",
|
|
"lists": [list_id],
|
|
"attribs": attribs,
|
|
"preconfirm_subscriptions": True,
|
|
}
|
|
|
|
r = session.post(f"{LISTMONK_URL}/api/subscribers", json=payload, timeout=10)
|
|
|
|
if r.ok:
|
|
count += 1
|
|
elif r.status_code == 409:
|
|
# Already exists — update attributes
|
|
# Get subscriber ID
|
|
sr = session.get(
|
|
f"{LISTMONK_URL}/api/subscribers",
|
|
params={"query": f"subscribers.email = '{carrier['email']}'"},
|
|
timeout=10,
|
|
)
|
|
if sr.ok:
|
|
results = sr.json().get("data", {}).get("results", [])
|
|
if results:
|
|
sub_id = results[0]["id"]
|
|
# Update attributes
|
|
session.put(
|
|
f"{LISTMONK_URL}/api/subscribers/{sub_id}",
|
|
json={"attribs": attribs},
|
|
timeout=10,
|
|
)
|
|
# Add to list
|
|
session.put(
|
|
f"{LISTMONK_URL}/api/subscribers/lists",
|
|
json={"ids": [sub_id], "action": "add", "target_list_ids": [list_id]},
|
|
timeout=10,
|
|
)
|
|
count += 1
|
|
else:
|
|
LOG.warning("Subscriber upsert failed for %s: %s", carrier["email"], r.text[:100])
|
|
|
|
if count % 100 == 0 and count > 0:
|
|
LOG.info("Progress: %d/%d subscribers", count, len(carriers))
|
|
|
|
return count
|
|
|
|
|
|
def main():
|
|
import argparse
|
|
parser = argparse.ArgumentParser(description="Populate Listmonk with RMD deficiency data")
|
|
parser.add_argument("--dry-run", action="store_true", help="Don't write to Listmonk")
|
|
args = parser.parse_args()
|
|
|
|
if not DATABASE_URL:
|
|
LOG.error("DATABASE_URL not set")
|
|
sys.exit(1)
|
|
|
|
conn = psycopg2.connect(DATABASE_URL)
|
|
carriers = get_deficient_carriers(conn)
|
|
conn.close()
|
|
|
|
if not carriers:
|
|
LOG.info("No carriers with deficiencies found")
|
|
return
|
|
|
|
LOG.info("Carriers with deficiencies + email: %d", len(carriers))
|
|
LOG.info("Top issues:")
|
|
from collections import Counter
|
|
issue_counts = Counter()
|
|
for c in carriers:
|
|
for iid in c["issue_ids"]:
|
|
issue_counts[iid] += 1
|
|
for issue_id, cnt in issue_counts.most_common(10):
|
|
LOG.info(" %s: %d carriers", ISSUE_LABELS.get(issue_id, issue_id)[:60], cnt)
|
|
|
|
if args.dry_run:
|
|
LOG.info("DRY RUN — not writing to Listmonk")
|
|
upsert_subscribers(None, carriers, LIST_ID, dry_run=True)
|
|
return
|
|
|
|
if not LISTMONK_PASS:
|
|
LOG.error("LISTMONK_PASS not set")
|
|
sys.exit(1)
|
|
|
|
session = requests.Session()
|
|
session.auth = (LISTMONK_USER, LISTMONK_PASS)
|
|
|
|
list_id = ensure_list(session)
|
|
count = upsert_subscribers(session, carriers, list_id)
|
|
|
|
LOG.info("Done: %d subscribers upserted to list %d (%s)", count, list_id, LIST_NAME)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|