new-site/scripts/workers/listmonk_import.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

258 lines
9.6 KiB
Python

"""
listmonk_import.py — Import FCC RMD contacts into Listmonk.
Reads from PostgreSQL fcc_rmd and fcc_rmd_removed tables and upserts
subscribers into Listmonk via the REST API.
List assignment:
fcc_rmd (is_outside_counsel=False) → list 3 "FCC Carriers - Direct Contacts"
fcc_rmd (is_outside_counsel=True) → list 4 "FCC Carriers - Outside Counsel"
fcc_rmd_removed → list 5 "FCC RMD Removed - Noncompliant"
Contacts can appear in multiple lists (e.g. a removed carrier whose email
was also in fcc_rmd gets added to both Removed and Direct/Counsel lists).
Usage:
python -m workers.listmonk_import
python -m workers.listmonk_import --only removed # just fcc_rmd_removed
python -m workers.listmonk_import --only active # just fcc_rmd
Environment variables:
DATABASE_URL PostgreSQL DSN
LISTMONK_URL e.g. https://lists.performancewest.net
LISTMONK_USER API username
LISTMONK_PASSWORD API token
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
import time
from typing import Optional
import psycopg2
import psycopg2.extras
import requests
LOG = logging.getLogger("workers.listmonk_import")
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
                    stream=sys.stdout)

# Connection settings, read from the environment at import time.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
LISTMONK_URL = os.environ.get("LISTMONK_URL", "https://lists.performancewest.net").rstrip("/")
LISTMONK_USER = os.environ.get("LISTMONK_USER", "api")
# No default: a secret must never be committed to source control. When the
# variable is unset this is "" and API requests carry an empty password.
# TODO(ops): rotate the Listmonk API token that was previously hard-coded here.
LISTMONK_PASS = os.environ.get("LISTMONK_PASSWORD", "")

# Listmonk list IDs (created in Phase 3)
LIST_DIRECT = 3  # FCC Carriers - Direct Contacts
LIST_COUNSEL = 4  # FCC Carriers - Outside Counsel
LIST_REMOVED = 5  # FCC RMD Removed - Noncompliant

# Pause after each subscriber upsert. 0.05 s caps throughput at ~20 upserts/s;
# the real rate is lower because request latency adds to the pause.
DELAY = 0.05
def api_post(path: str, payload: dict, session: requests.Session) -> Optional[dict]:
    """POST *payload* as JSON to the Listmonk API.

    Args:
        path: API path beginning with "/", appended to LISTMONK_URL.
        payload: Request body, sent with ``json=``.
        session: Authenticated requests session.

    Returns:
        The response's ``data`` member on HTTP 200/201, ``{"duplicate": True}``
        on HTTP 409 (Conflict, i.e. the e-mail already exists), or ``None`` on
        any other status, a transport error, or an unparseable response body.
    """
    url = f"{LISTMONK_URL}{path}"
    try:
        r = session.post(url, json=payload, timeout=15)
    except (requests.RequestException, TypeError, ValueError) as exc:
        # RequestException: connection/timeout failures.
        # TypeError/ValueError: payload could not be serialised to JSON.
        LOG.error("POST %s error: %s", path, exc)
        return None

    if r.status_code in (200, 201):
        try:
            body = r.json()
        except ValueError as exc:
            LOG.error("POST %s error: %s", path, exc)
            return None
        return body.get("data") if isinstance(body, dict) else None
    if r.status_code == 409:
        # Conflict = duplicate email — the caller treats it as an update.
        return {"duplicate": True}
    LOG.warning("POST %s -> %d: %s", path, r.status_code, r.text[:200])
    return None
def upsert_subscriber(email: str, name: str, lists: list[int],
                      attribs: dict, session: requests.Session) -> Optional[int]:
    """Create a Listmonk subscriber, or add an existing one to *lists*.

    A new subscriber is created with status "enabled", the given *attribs*,
    and pre-confirmed subscriptions to *lists*. If Listmonk reports the
    e-mail as a duplicate, the existing subscriber is looked up and only its
    list memberships are extended. Its name and attribs are left unchanged.

    Args:
        email: Subscriber e-mail; lower-cased and stripped before sending.
        name: Display name; falls back to the e-mail's local part when blank.
        lists: Listmonk list IDs to subscribe to.
        attribs: Custom attributes for a newly created subscriber.
        session: Authenticated requests session.

    Returns:
        The subscriber ID, or None if creation, lookup, or list update failed.
    """
    normalized = email.lower().strip()
    payload = {
        "email": normalized,
        "name": (name or "").strip() or email.strip().split("@")[0],
        "status": "enabled",
        "lists": lists,
        "attribs": attribs,
        "preconfirm_subscriptions": True,
    }
    result = api_post("/api/subscribers", payload, session)
    if result is None:
        return None
    if not result.get("duplicate"):
        return result.get("id")
    return _add_existing_to_lists(normalized, lists, session)


def _add_existing_to_lists(email: str, lists: list[int],
                           session: requests.Session) -> Optional[int]:
    """Find the subscriber with *email* and add it to *lists*; return its ID or None."""
    # Listmonk's ``query`` parameter is a raw SQL expression, so single quotes
    # (legal in an e-mail local part) must be doubled to stay inside the literal.
    quoted = email.replace("'", "''")
    try:
        r = session.get(f"{LISTMONK_URL}/api/subscribers",
                        params={"query": f"email = '{quoted}'",
                                "per_page": 1},
                        timeout=15)
        r.raise_for_status()
        existing = (r.json().get("data") or {}).get("results") or []
        if not existing:
            LOG.warning("Duplicate %s reported but not found by lookup", email)
            return None
        sub_id = existing[0]["id"]
        r = session.put(f"{LISTMONK_URL}/api/subscribers/lists",
                        json={"ids": [sub_id], "action": "add",
                              "target_list_ids": lists,
                              "status": "confirmed"},
                        timeout=15)
        # A failed list update must not be counted as a successful import.
        r.raise_for_status()
        return sub_id
    except Exception as exc:  # best-effort per contact: log and keep importing
        LOG.warning("Duplicate lookup error for %s: %s", email, exc)
        return None
def import_active_carriers(conn: psycopg2.extensions.connection,
                           session: requests.Session) -> int:
    """Upsert every fcc_rmd contact into the Direct or Outside Counsel list.

    Rows whose contact_email contains no "@" are skipped. Rows with a truthy
    is_outside_counsel go to LIST_COUNSEL; all others go to LIST_DIRECT.

    Args:
        conn: Open PostgreSQL connection that can read fcc_rmd.
        session: Authenticated requests session for the Listmonk API.

    Returns:
        Number of rows for which upsert_subscriber() returned an ID.
    """
    # Close the cursor as soon as the rows are fetched.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("""
            SELECT contact_email, contact_name, contact_title,
                   contact_telephone_number, contact_phone_extension,
                   business_name, frn, rmd_number, implementation,
                   voice_service_provider, gateway_provider, intermediate_provider,
                   last_updated, filing_url, is_outside_counsel
            FROM fcc_rmd
            WHERE contact_email IS NOT NULL AND contact_email != ''
            ORDER BY rmd_number
        """)
        rows = cur.fetchall()

    total = len(rows)
    LOG.info("Importing %d active carrier contacts...", total)
    success = 0
    for i, row in enumerate(rows, 1):
        email = row["contact_email"].strip().lower()
        if "@" in email:
            is_counsel = bool(row.get("is_outside_counsel"))
            sub_id = upsert_subscriber(
                email=email,
                name=row.get("contact_name") or "",
                lists=[LIST_COUNSEL if is_counsel else LIST_DIRECT],
                attribs=_active_attribs(row, is_counsel),
                session=session,
            )
            if sub_id:
                success += 1
            time.sleep(DELAY)  # throttle Listmonk API calls
        # Progress is reported on every 500th row, even if that row was skipped.
        if i % 500 == 0:
            LOG.info(" %d/%d imported (%d success)", i, total, success)
    LOG.info("Active carriers: %d/%d imported successfully", success, total)
    return success


def _format_phone(phone: object, ext: object) -> str:
    """Return "<phone> x<ext>", or just the phone when there is no extension.

    An extension without a base number is dropped because it is not dialable alone.
    """
    phone_s = str(phone or "").strip()
    ext_s = str(ext or "").strip()
    return f"{phone_s} x{ext_s}" if phone_s and ext_s else phone_s


def _active_attribs(row: dict, is_counsel: bool) -> dict:
    """Build the Listmonk attribs for one fcc_rmd row (NULL text -> "", NULL flag -> False)."""
    return {
        "company": row.get("business_name") or "",
        "title": row.get("contact_title") or "",
        "phone": _format_phone(row.get("contact_telephone_number"),
                               row.get("contact_phone_extension")),
        "fcc_rmd_number": row.get("rmd_number") or "",
        "fcc_frn": row.get("frn") or "",
        "fcc_implementation": row.get("implementation") or "",
        "fcc_voice_provider": bool(row.get("voice_service_provider")),
        "fcc_gateway_provider": bool(row.get("gateway_provider")),
        "fcc_intermediate_provider": bool(row.get("intermediate_provider")),
        "fcc_last_updated": str(row.get("last_updated") or ""),
        "fcc_filing_url": row.get("filing_url") or "",
        "is_outside_counsel": is_counsel,
    }
def import_removed_carriers(conn: psycopg2.extensions.connection,
                            session: requests.Session) -> int:
    """Upsert every fcc_rmd_removed contact into the Noncompliant list.

    Rows whose contact_email contains no "@" are skipped, as are rows whose
    business_name starts with "[".

    Args:
        conn: Open PostgreSQL connection that can read fcc_rmd_removed.
        session: Authenticated requests session for the Listmonk API.

    Returns:
        Number of rows for which upsert_subscriber() returned an ID.
    """
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        # The IS NULL branch keeps rows with no business_name. A bare NOT LIKE
        # evaluates to NULL for them and would silently drop them.
        # No query parameters are passed, so psycopg2 leaves the literal % alone.
        # NOTE: fcc_citation is fetched but not currently exported as an attrib.
        cur.execute("""
            SELECT contact_email, contact_name, contact_phone, business_name,
                   rmd_number, frn, action_type, fcc_document,
                   fcc_citation, action_date, removal_reason
            FROM fcc_rmd_removed
            WHERE contact_email IS NOT NULL AND contact_email != ''
              AND (business_name IS NULL OR business_name NOT LIKE '[%')
            ORDER BY id
        """)
        rows = cur.fetchall()

    total = len(rows)
    LOG.info("Importing %d removed carrier contacts...", total)
    success = 0
    for i, row in enumerate(rows, 1):
        email = row["contact_email"].strip().lower()
        if "@" in email:
            attribs = {
                "company": row.get("business_name") or "",
                "phone": row.get("contact_phone") or "",
                "fcc_rmd_number": row.get("rmd_number") or "",
                "fcc_frn": row.get("frn") or "",
                "fcc_removed_action": row.get("action_type") or "",
                "fcc_removed_document": row.get("fcc_document") or "",
                "fcc_removed_date": str(row.get("action_date") or ""),
                # Cap free-text length to keep subscriber attribs compact.
                "fcc_removal_reason": (row.get("removal_reason") or "")[:300],
            }
            sub_id = upsert_subscriber(
                email=email,
                name=row.get("contact_name") or "",
                lists=[LIST_REMOVED],
                attribs=attribs,
                session=session,
            )
            if sub_id:
                success += 1
            time.sleep(DELAY)  # throttle Listmonk API calls
        if i % 500 == 0:
            LOG.info(" %d/%d imported (%d success)", i, total, success)
    LOG.info("Removed carriers: %d/%d imported successfully", success, total)
    return success
def main() -> None:
    """CLI entry point: check the Listmonk API, then run the selected imports.

    ``--only`` selects "active" (fcc_rmd), "removed" (fcc_rmd_removed), or
    "all" (the default, both). Exits with status 1 if DATABASE_URL or
    LISTMONK_PASSWORD is unset, or if the Listmonk API check fails.
    """
    parser = argparse.ArgumentParser(description="Import FCC RMD contacts into Listmonk")
    parser.add_argument("--only", choices=["active", "removed", "all"], default="all")
    args = parser.parse_args()
    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)
    if not LISTMONK_PASS:
        LOG.error("LISTMONK_PASSWORD not set")
        sys.exit(1)

    session = requests.Session()
    session.auth = (LISTMONK_USER, LISTMONK_PASS)
    session.headers["Content-Type"] = "application/json"

    # Verify the API before opening the database, so a failed check leaves
    # no DB connection open, and report transport errors cleanly.
    try:
        r = session.get(f"{LISTMONK_URL}/api/lists", timeout=10)
    except requests.RequestException as exc:
        LOG.error("Listmonk API unreachable: %s", exc)
        sys.exit(1)
    if r.status_code != 200:
        LOG.error("Listmonk API unreachable: %d %s", r.status_code, r.text[:100])
        sys.exit(1)
    LOG.info("Listmonk API OK")

    conn = psycopg2.connect(DATABASE_URL)
    total = 0
    try:
        if args.only in ("active", "all"):
            total += import_active_carriers(conn, session)
        if args.only in ("removed", "all"):
            total += import_removed_carriers(conn, session)
    finally:
        # Release the DB connection and HTTP pool even if an import raises.
        conn.close()
        session.close()
    LOG.info("Import complete — %d total subscribers added/updated", total)


if __name__ == "__main__":
    main()