Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
258 lines
9.6 KiB
Python
258 lines
9.6 KiB
Python
"""
|
|
listmonk_import.py — Import FCC RMD contacts into Listmonk.

Reads the PostgreSQL tables fcc_rmd and fcc_rmd_removed and creates the
corresponding subscribers in Listmonk via its REST API. Subscribers that
already exist are added to the relevant lists; their existing attributes are
not changed.

List assignment:
    fcc_rmd (is_outside_counsel=False) → list 3 "FCC Carriers - Direct Contacts"
    fcc_rmd (is_outside_counsel=True)  → list 4 "FCC Carriers - Outside Counsel"
    fcc_rmd_removed                    → list 5 "FCC RMD Removed - Noncompliant"

A contact can belong to several lists. For example, a removed carrier whose
email also appears in fcc_rmd is added to both the Removed list and the
Direct or Counsel list.

Usage:
    python -m workers.listmonk_import                  # both tables (default)
    python -m workers.listmonk_import --only removed   # just fcc_rmd_removed
    python -m workers.listmonk_import --only active    # just fcc_rmd

Environment variables:
    DATABASE_URL       PostgreSQL DSN
    LISTMONK_URL       e.g. https://lists.performancewest.net
    LISTMONK_USER      API username
    LISTMONK_PASSWORD  API token
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from typing import Optional
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import requests
|
|
|
|
# Send INFO-and-above log records to stdout (timestamp, logger name, level,
# message), then obtain this worker's named logger.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
LOG = logging.getLogger("workers.listmonk_import")
|
|
|
|
# Runtime configuration, read from the environment once at import time.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# Trailing slash stripped so API paths can be appended with a leading "/".
LISTMONK_URL = os.environ.get("LISTMONK_URL", "https://lists.performancewest.net").rstrip("/")
LISTMONK_USER = os.environ.get("LISTMONK_USER", "api")
# No fallback credential: the API token must come from the environment.
# A token that was ever committed to source control should be treated as
# leaked and rotated in Listmonk.
LISTMONK_PASS = os.environ.get("LISTMONK_PASSWORD", "")
|
|
|
|
# Hard-coded IDs of the Listmonk lists created in Phase 3.
LIST_DIRECT: int = 3   # "FCC Carriers - Direct Contacts"
LIST_COUNSEL: int = 4  # "FCC Carriers - Outside Counsel"
LIST_REMOVED: int = 5  # "FCC RMD Removed - Noncompliant"

# Pause between Listmonk API calls: 1/20 s keeps the importer at or below
# roughly 20 requests per second, well within the API's limits.
DELAY = 1 / 20
|
|
|
|
|
|
def api_post(path: str, payload: dict, session: requests.Session) -> Optional[dict]:
    """POST *payload* as JSON to a Listmonk API endpoint.

    Args:
        path: API path appended to LISTMONK_URL, e.g. ``"/api/subscribers"``.
        payload: JSON-serialisable request body.
        session: Authenticated requests session.

    Returns:
        The ``data`` member of the JSON response on HTTP 200/201;
        ``{"duplicate": True}`` on HTTP 409 (duplicate e-mail);
        ``None`` on any other status, a transport error, or a response body
        that is not a JSON object.
    """
    url = f"{LISTMONK_URL}{path}"
    try:
        r = session.post(url, json=payload, timeout=15)
    except requests.RequestException as exc:
        # Network/timeout problems are per-request failures: log and let the
        # caller move on to the next record.
        LOG.error("POST %s error: %s", path, exc)
        return None

    if r.status_code in (200, 201):
        try:
            body = r.json()
        except ValueError as exc:  # covers requests' JSONDecodeError
            LOG.error("POST %s error: %s", path, exc)
            return None
        return body.get("data") if isinstance(body, dict) else None

    if r.status_code == 409:
        # Conflict = duplicate email; the caller falls back to updating the
        # existing subscriber instead of creating a new one.
        return {"duplicate": True}

    LOG.warning("POST %s → %d: %s", path, r.status_code, r.text[:200])
    return None
|
|
|
|
|
|
def upsert_subscriber(email: str, name: str, lists: list[int],
                      attribs: dict, session: requests.Session) -> Optional[int]:
    """Create a Listmonk subscriber, or add an existing one to *lists*.

    A new subscriber is created enabled, with *attribs* and pre-confirmed
    memberships in *lists*. If Listmonk reports the e-mail as a duplicate
    (HTTP 409), the existing subscriber is looked up by e-mail and added to
    *lists* with status ``confirmed``. Its name and attributes are left
    unchanged.

    Args:
        email: Subscriber e-mail; lower-cased and stripped before use.
        name: Display name; the e-mail's local part is used when blank.
        lists: Listmonk list IDs to subscribe to.
        attribs: Custom attributes stored on a newly created subscriber.
        session: Authenticated requests session.

    Returns:
        The subscriber ID, or ``None`` if creation, lookup or the list
        update failed.
    """
    addr = email.lower().strip()
    payload = {
        "email": addr,
        "name": (name or "").strip() or addr.split("@")[0],
        "status": "enabled",
        "lists": lists,
        "attribs": attribs,
        "preconfirm_subscriptions": True,
    }
    result = api_post("/api/subscribers", payload, session)
    if result is None:
        return None
    if not result.get("duplicate"):
        return result.get("id")
    return _add_existing_to_lists(addr, lists, session)


def _add_existing_to_lists(addr: str, lists: list[int],
                           session: requests.Session) -> Optional[int]:
    """Find the subscriber with e-mail *addr* and add it to *lists*.

    Returns the subscriber ID, or ``None`` if the lookup finds nothing or
    either API call fails.
    """
    # Listmonk's ``query`` parameter is an SQL expression, so single quotes
    # must be doubled. Addresses such as o'brien@example.com are valid and
    # would otherwise break (or alter) the expression.
    quoted = addr.replace("'", "''")
    try:
        r = session.get(f"{LISTMONK_URL}/api/subscribers",
                        params={"query": f"email = '{quoted}'",
                                "per_page": 1},
                        timeout=15)
        r.raise_for_status()
        existing = r.json().get("data", {}).get("results", [])
        if not existing:
            LOG.warning("Duplicate reported for %s but lookup found no subscriber", addr)
            return None
        sub_id = existing[0]["id"]
        r = session.put(f"{LISTMONK_URL}/api/subscribers/lists",
                        json={"ids": [sub_id], "action": "add",
                              "target_list_ids": lists,
                              "status": "confirmed"},
                        timeout=15)
        # Check the status so a rejected list update is not counted as success.
        r.raise_for_status()
    except Exception as exc:  # best-effort per subscriber; keep the import going
        LOG.warning("Duplicate lookup error for %s: %s", addr, exc)
        return None
    return sub_id
|
|
|
|
|
|
def import_active_carriers(conn: psycopg2.extensions.connection,
                           session: requests.Session) -> int:
    """Import contacts from ``fcc_rmd`` into the Direct or Outside Counsel list.

    Rows with ``is_outside_counsel`` set go to LIST_COUNSEL; all others go to
    LIST_DIRECT. Rows whose e-mail is blank after stripping, or has no ``@``,
    are skipped.

    Returns:
        The number of rows for which ``upsert_subscriber`` returned an ID.
    """
    # Close the cursor as soon as the rows are in memory; the loop below only
    # talks to Listmonk.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("""
            SELECT contact_email, contact_name, contact_title,
                   contact_telephone_number, contact_phone_extension,
                   business_name, frn, rmd_number, implementation,
                   voice_service_provider, gateway_provider, intermediate_provider,
                   last_updated, filing_url, is_outside_counsel
            FROM fcc_rmd
            WHERE contact_email IS NOT NULL AND contact_email != ''
            ORDER BY rmd_number
        """)
        rows = cur.fetchall()

    total = len(rows)
    LOG.info("Importing %d active carrier contacts...", total)

    success = 0
    for i, row in enumerate(rows, 1):
        email = row["contact_email"].strip().lower()
        if not email or "@" not in email:
            continue

        is_counsel = bool(row.get("is_outside_counsel"))
        lists = [LIST_COUNSEL if is_counsel else LIST_DIRECT]

        # Normalise whitespace on both parts so "555-1234 " + "12" becomes
        # "555-1234 x12" rather than "555-1234  x12".
        phone = str(row.get("contact_telephone_number") or "").strip()
        ext = str(row.get("contact_phone_extension") or "").strip()
        phone_full = f"{phone} x{ext}".strip() if ext else phone

        attribs = {
            "company": row.get("business_name") or "",
            "title": row.get("contact_title") or "",
            "phone": phone_full,
            "fcc_rmd_number": row.get("rmd_number") or "",
            "fcc_frn": row.get("frn") or "",
            "fcc_implementation": row.get("implementation") or "",
            "fcc_voice_provider": bool(row.get("voice_service_provider")),
            "fcc_gateway_provider": bool(row.get("gateway_provider")),
            "fcc_intermediate_provider": bool(row.get("intermediate_provider")),
            "fcc_last_updated": str(row.get("last_updated") or ""),
            "fcc_filing_url": row.get("filing_url") or "",
            "is_outside_counsel": is_counsel,
        }

        sub_id = upsert_subscriber(
            email=email,
            name=row.get("contact_name") or "",
            lists=lists,
            attribs=attribs,
            session=session,
        )
        if sub_id:
            success += 1

        if i % 500 == 0:
            LOG.info(" %d/%d imported (%d success)", i, total, success)

        time.sleep(DELAY)

    LOG.info("Active carriers: %d/%d imported successfully", success, total)
    return success
|
|
|
|
|
|
def import_removed_carriers(conn: psycopg2.extensions.connection,
                            session: requests.Session) -> int:
    """Import contacts from ``fcc_rmd_removed`` into the Noncompliant list.

    Rows whose ``business_name`` starts with ``[`` are excluded (presumably
    placeholder entries; confirm against the loader). Rows whose e-mail is
    blank after stripping, or has no ``@``, are skipped. Every imported
    contact is added to LIST_REMOVED.

    Returns:
        The number of rows for which ``upsert_subscriber`` returned an ID.
    """
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        # COALESCE keeps rows whose business_name is NULL: a bare
        # "NULL NOT LIKE ..." evaluates to NULL and would silently drop them.
        # NOTE(review): fcc_citation is selected but not exported as an attrib.
        cur.execute("""
            SELECT contact_email, contact_name, contact_phone, business_name,
                   rmd_number, frn, action_type, fcc_document,
                   fcc_citation, action_date, removal_reason
            FROM fcc_rmd_removed
            WHERE contact_email IS NOT NULL AND contact_email != ''
              AND COALESCE(business_name, '') NOT LIKE '[%'
            ORDER BY id
        """)
        rows = cur.fetchall()

    total = len(rows)
    LOG.info("Importing %d removed carrier contacts...", total)

    success = 0
    for i, row in enumerate(rows, 1):
        email = row["contact_email"].strip().lower()
        if not email or "@" not in email:
            continue

        attribs = {
            "company": row.get("business_name") or "",
            "phone": row.get("contact_phone") or "",
            "fcc_rmd_number": row.get("rmd_number") or "",
            "fcc_frn": row.get("frn") or "",
            "fcc_removed_action": row.get("action_type") or "",
            "fcc_removed_document": row.get("fcc_document") or "",
            "fcc_removed_date": str(row.get("action_date") or ""),
            # Truncate long free-text reasons to keep the attribute small.
            "fcc_removal_reason": (row.get("removal_reason") or "")[:300],
        }

        sub_id = upsert_subscriber(
            email=email,
            name=row.get("contact_name") or "",
            lists=[LIST_REMOVED],
            attribs=attribs,
            session=session,
        )
        if sub_id:
            success += 1

        # Periodic progress report for long imports.
        if i % 500 == 0:
            LOG.info(" %d/%d imported (%d success)", i, total, success)

        time.sleep(DELAY)

    LOG.info("Removed carriers: %d/%d imported successfully", success, total)
    return success
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: import FCC RMD contacts into Listmonk.

    ``--only active`` imports ``fcc_rmd``, ``--only removed`` imports
    ``fcc_rmd_removed``, and ``--only all`` (the default) imports both.
    Exits with status 1 in three cases: DATABASE_URL is unset, the Listmonk
    password is unset, or ``GET /api/lists`` fails or does not return HTTP 200.
    """
    parser = argparse.ArgumentParser(description="Import FCC RMD contacts into Listmonk")
    parser.add_argument("--only", choices=["active", "removed", "all"], default="all")
    args = parser.parse_args()

    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)
    if not LISTMONK_PASS:
        LOG.error("LISTMONK_PASSWORD not set")
        sys.exit(1)

    with requests.Session() as session:
        session.auth = (LISTMONK_USER, LISTMONK_PASS)
        session.headers["Content-Type"] = "application/json"

        # Verify Listmonk before opening the database connection, so an
        # unreachable API does not leave a connection open on exit.
        try:
            r = session.get(f"{LISTMONK_URL}/api/lists", timeout=10)
        except requests.RequestException as exc:
            LOG.error("Listmonk API unreachable: %s", exc)
            sys.exit(1)
        if r.status_code != 200:
            LOG.error("Listmonk API unreachable: %d %s", r.status_code, r.text[:100])
            sys.exit(1)
        LOG.info("Listmonk API OK")

        conn = psycopg2.connect(DATABASE_URL)
        try:
            total = 0
            if args.only in ("active", "all"):
                total += import_active_carriers(conn, session)
            if args.only in ("removed", "all"):
                total += import_removed_carriers(conn, session)
        finally:
            # Close the connection even if an import raises.
            conn.close()

    LOG.info("Import complete — %d total subscribers added/updated", total)


if __name__ == "__main__":
    main()
|