new-site/scripts/workers/remove_adversary_subscribers.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

223 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""
remove_adversary_subscribers.py — Remove foreign adversary country subscribers.
Finds contacts in fcc_rmd whose country or contact_country matches a
Commerce Department foreign adversary designation (15 CFR Part 791) and
blocklists the matching subscribers in Listmonk to prevent future sends.
Foreign adversary countries (as of 2026):
- China (including Hong Kong SAR and Macao SAR)
- Russia (Russian Federation)
- Iran
- North Korea (Democratic People's Republic of Korea)
- Cuba
- Venezuela
Note: Republic of Korea (South Korea) is NOT a foreign adversary.
Usage:
python -m workers.remove_adversary_subscribers --dry-run # preview
python -m workers.remove_adversary_subscribers # execute
Environment variables:
DATABASE_URL PostgreSQL DSN
LISTMONK_URL e.g. https://lists.performancewest.net
LISTMONK_USER API username
LISTMONK_PASSWORD API token
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
import time
import psycopg2
import psycopg2.extras
import requests
# Module logger; basicConfig sends INFO-and-above records to stdout.
LOG = logging.getLogger("workers.remove_adversary")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

# Connection settings, read from the environment once at import time.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
LISTMONK_URL = os.environ.get("LISTMONK_URL", "https://lists.performancewest.net").rstrip("/")
LISTMONK_USER = os.environ.get("LISTMONK_USER", "api")
# SECURITY: the API token must be supplied via LISTMONK_PASSWORD. No fallback
# secret is kept in source control; with the variable unset the password is
# empty and Listmonk rejects the request. Any token that was ever committed
# as a default here should be treated as compromised and rotated.
LISTMONK_PASS = os.environ.get("LISTMONK_PASSWORD", "")

# Pause, in seconds, between per-subscriber Listmonk API round trips.
DELAY = 0.1
# Foreign adversary country patterns. Each entry is matched as a substring of
# the normalized (upper-cased, whitespace-collapsed) country string.
# Based on Department of Commerce 15 CFR Part 791
ADVERSARY_PATTERNS = [
    "CHINA",
    "HONG KONG",
    "MACAO",
    "MACAU",
    "RUSSIA",
    "RUSSIAN FEDERATION",
    "IRAN",
    "NORTH KOREA",
    "DEMOCRATIC PEOPLE'S REPUBLIC OF KOREA",
    # Inverted (ISO 3166 style) and abbreviated spellings of North Korea,
    # which the two patterns above do not cover.
    "KOREA, DEMOCRATIC PEOPLE'S REPUBLIC OF",
    "KOREA, NORTH",
    "DPRK",
    "CUBA",
    "VENEZUELA",
]

# Explicitly NOT adversaries (exact match on the normalized string).
NOT_ADVERSARY = {
    "REPUBLIC OF KOREA",  # South Korea
    "SOUTH KOREA",
}

# Substrings that mark a value as NOT an adversary even though an adversary
# pattern also occurs in it. The ISO 3166 short name for Taiwan is
# "Taiwan, Province of China", which would otherwise hit the "CHINA" pattern.
_NOT_ADVERSARY_SUBSTRINGS = ("TAIWAN",)

# Typographic quotes and backticks are folded to an ASCII apostrophe so that
# "People\u2019s" compares equal to "People's".
_APOSTROPHES = str.maketrans({"\u2018": "'", "\u2019": "'", "`": "'"})


def _normalize_country(country: str) -> str:
    """Upper-case *country*, fold apostrophes and collapse whitespace runs."""
    return " ".join(country.translate(_APOSTROPHES).upper().split())


def is_adversary_country(country: str | None) -> bool:
    """Return True if *country* matches a foreign adversary designation.

    Matching is case-insensitive and tolerant of surrounding or repeated
    whitespace and of typographic apostrophes. ``None`` and blank strings
    return False. Values listed in ``NOT_ADVERSARY`` (exact match) or
    containing one of ``_NOT_ADVERSARY_SUBSTRINGS`` are never treated as
    adversaries; otherwise the value matches when any entry of
    ``ADVERSARY_PATTERNS`` occurs in it as a substring.
    """
    if not country:
        return False
    normalized = _normalize_country(country)
    if not normalized or normalized in NOT_ADVERSARY:
        return False
    # Exclusions win over patterns so "TAIWAN, PROVINCE OF CHINA" is not
    # classified through its "CHINA" substring.
    if any(marker in normalized for marker in _NOT_ADVERSARY_SUBSTRINGS):
        return False
    return any(pattern in normalized for pattern in ADVERSARY_PATTERNS)
def find_adversary_subscribers(conn) -> list[dict]:
    """Find RMD contacts from foreign adversary countries.

    Reads every ``fcc_rmd`` row that has a contact email and at least one of
    ``country`` / ``contact_country`` set, and keeps the rows where either
    country satisfies ``is_adversary_country``.

    Args:
        conn: an open psycopg2 connection. It is not closed here.

    Returns:
        One dict per distinct email (lower-cased, stripped) with the keys
        ``email``, ``name``, ``company``, ``country``, ``contact_country``
        and ``frn``. When the same email appears on several matching rows,
        the first one in ``rmd_number`` order is kept, so each subscriber is
        processed and counted once.
    """
    # The cursor context manager closes the cursor even if the query raises.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute("""
            SELECT rmd_number, frn, business_name, country, contact_country,
                   contact_email, contact_name
            FROM fcc_rmd
            WHERE contact_email IS NOT NULL AND contact_email != ''
              AND (country IS NOT NULL OR contact_country IS NOT NULL)
            ORDER BY rmd_number
        """)
        rows = cur.fetchall()

    matches: list[dict] = []
    seen_emails: set[str] = set()
    for row in rows:
        # The SQL filter lets whitespace-only emails through; drop them here
        # rather than looking up an empty address later.
        email = (row.get("contact_email") or "").strip().lower()
        if not email or email in seen_emails:
            continue
        biz_country = row.get("country") or ""
        contact_country = row.get("contact_country") or ""
        if not (is_adversary_country(biz_country)
                or is_adversary_country(contact_country)):
            continue
        # Only matched emails are remembered, so an earlier non-matching row
        # for the same address cannot hide a later matching one.
        seen_emails.add(email)
        matches.append({
            "email": email,
            "name": row.get("contact_name") or "",
            "company": row.get("business_name") or "",
            "country": biz_country,
            "contact_country": contact_country,
            "frn": row.get("frn") or "",
        })
    return matches
def remove_from_listmonk(matches: list[dict], session: requests.Session,
                         dry_run: bool) -> dict:
    """Blocklist adversary subscribers in Listmonk.

    Args:
        matches: dicts as produced by ``find_adversary_subscribers``.
        session: authenticated ``requests`` session for the Listmonk API.
        dry_run: when True, only log what would be done; no HTTP calls.

    Returns:
        Counters ``found``, ``removed``, ``not_found`` and ``errors``. In a
        dry run every match is counted as ``found``. Otherwise ``found`` is
        the number of emails that exist in Listmonk, ``removed`` the number
        whose blocklist request succeeded, and ``errors`` the number of
        matches whose lookup or blocklist request failed.
    """
    stats = {"found": 0, "removed": 0, "not_found": 0, "errors": 0}
    for m in matches:
        if dry_run:
            LOG.info(
                " [DRY RUN] Would remove: %s | %s | %s / %s",
                m["email"], m["company"],
                m["country"] or "", m["contact_country"] or "",
            )
            stats["found"] += 1
            continue
        try:
            # Listmonk evaluates `query` as a SQL expression, so single
            # quotes in the address must be doubled to stay inside the
            # string literal.
            safe_email = m["email"].replace("'", "''")
            r = session.get(
                f"{LISTMONK_URL}/api/subscribers",
                params={"query": f"email = '{safe_email}'", "per_page": 1},
                timeout=15,
            )
            # Without this, an auth or server error body parses as
            # "no results" and is miscounted as not_found.
            r.raise_for_status()
            results = (r.json().get("data") or {}).get("results") or []
            if not results:
                stats["not_found"] += 1
            else:
                sub_id = results[0]["id"]
                stats["found"] += 1
                # Dedicated blocklist endpoint: needs no body, unlike the
                # generic subscriber update, which expects the full record.
                r = session.put(
                    f"{LISTMONK_URL}/api/subscribers/{sub_id}/blocklist",
                    timeout=15,
                )
                # Count as removed only once Listmonk confirms the change.
                r.raise_for_status()
                stats["removed"] += 1
                # Report the country that triggered the match, not merely
                # the first non-empty one.
                matched_country = (
                    m["country"] if is_adversary_country(m["country"])
                    else m["contact_country"]
                )
                LOG.info(
                    " Blocklisted: %s (%s) — %s",
                    m["email"], m["company"], matched_country,
                )
        except Exception as exc:
            # Best effort: record the failure and carry on with the rest.
            LOG.warning("Error removing %s: %s", m["email"], exc)
            stats["errors"] += 1
        # Rate-limit after every API round trip, including not-found ones.
        time.sleep(DELAY)
    return stats
def main() -> None:
    """Command-line entry point.

    Parses ``--dry-run``, verifies configuration and (unless dry-running)
    Listmonk reachability, loads the matching contacts from PostgreSQL, logs
    a per-country breakdown and then blocklists the subscribers.

    Exits with status 1 when ``DATABASE_URL`` is unset or the Listmonk API
    check fails.
    """
    from collections import Counter  # only needed for the breakdown below

    parser = argparse.ArgumentParser(
        description="Remove foreign adversary country subscribers from Listmonk"
    )
    parser.add_argument("--dry-run", action="store_true",
                        help="Preview removals without modifying Listmonk")
    args = parser.parse_args()

    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)

    session = requests.Session()
    session.auth = (LISTMONK_USER, LISTMONK_PASS)
    session.headers["Content-Type"] = "application/json"

    # Check Listmonk before opening the database connection so that a failed
    # check cannot leave a connection open.
    if not args.dry_run:
        try:
            r = session.get(f"{LISTMONK_URL}/api/lists", timeout=10)
        except requests.RequestException as exc:
            LOG.error("Listmonk API unreachable: %s", exc)
            sys.exit(1)
        if r.status_code != 200:
            LOG.error("Listmonk API unreachable: %d", r.status_code)
            sys.exit(1)
        LOG.info("Listmonk API OK")

    conn = psycopg2.connect(DATABASE_URL)
    try:
        matches = find_adversary_subscribers(conn)
    finally:
        # Release the connection even if the query fails.
        conn.close()
    LOG.info("Found %d subscribers from foreign adversary countries", len(matches))

    # Breakdown by country. Each match is attributed to the field that
    # actually triggered it, and case/whitespace variants are grouped.
    by_country = Counter(
        (
            m["country"] if is_adversary_country(m["country"])
            else m["contact_country"]
        ).strip().upper()
        for m in matches
    )
    for country, count in by_country.most_common():
        LOG.info(" %s: %d", country, count)

    stats = remove_from_listmonk(matches, session, args.dry_run)
    LOG.info(
        "%s complete — found: %d, removed: %d, not in Listmonk: %d, errors: %d",
        "Dry run" if args.dry_run else "Removal",
        stats["found"], stats["removed"], stats["not_found"], stats["errors"],
    )


if __name__ == "__main__":
    main()