#!/usr/bin/env python3 """Blocklist already-imported consumer-domain subscribers in Listmonk. WHY THIS EXISTS --------------- The per-vertical campaign builders (build_trucking_campaigns.py etc.) filter the authoritative consumer/do-not-contact exclusion list (scripts/_email_exclusions.py) out of *new* audience selections, and listmonk_import.py refuses to import a blocked address. But Listmonk also drives **list-based** campaigns (e.g. the FCC "Direct Contacts" list, the CRTC/USF blasts) whose subscribers were imported BEFORE a given domain was added to the exclusion list. Those stale subscribers keep receiving cold blasts and keep generating reputation-damaging bounces (2026-06: gmail.com was still the #1 bounce domain via the CRTC campaign on list 3, plus 1,321 enabled iCloud subs that were not yet excluded at all). This script reconciles the live Listmonk subscriber table with the authoritative exclusion list: any ENABLED subscriber whose address `is_blocked()` is moved to `blocklisted` (the same terminal state Listmonk's own bounce handler uses, so it is excluded from every current and future campaign without deleting history). It is idempotent and safe to run repeatedly -- re-run it whenever the exclusion list grows so the change is applied retroactively to already-imported contacts. USAGE ----- # dry-run against the trucking DB (default): show what WOULD be blocklisted python3 -m scripts.scrub_listmonk_consumer --dry-run # apply against the trucking DB python3 -m scripts.scrub_listmonk_consumer # healthcare DB LISTMONK_DATABASE_URL=postgresql://pw:...@host/listmonk_hc \ python3 -m scripts.scrub_listmonk_consumer Connection: uses LISTMONK_DATABASE_URL if set, else derives `/listmonk`. """ from __future__ import annotations import argparse import os import sys import psycopg2 try: from scripts._email_exclusions import ( BLOCKED_EMAIL_DOMAINS, DO_NOT_CONTACT_EMAILS, ) except ImportError: # running as a bare script, not a module sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from scripts._email_exclusions import ( # type: ignore BLOCKED_EMAIL_DOMAINS, DO_NOT_CONTACT_EMAILS, ) def listmonk_db_url() -> str: """LISTMONK_DATABASE_URL override, else derive `/listmonk`.""" override = os.getenv("LISTMONK_DATABASE_URL") if override: return override base = os.getenv("DATABASE_URL", "") if "/" in base: return base.rsplit("/", 1)[0] + "/listmonk" return base def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument( "--dry-run", action="store_true", help="report what would be blocklisted without changing anything", ) args = ap.parse_args() url = listmonk_db_url() if not url: print("ERROR: no DATABASE_URL / LISTMONK_DATABASE_URL set", file=sys.stderr) return 2 domains = sorted(BLOCKED_EMAIL_DOMAINS) emails = sorted(DO_NOT_CONTACT_EMAILS) conn = psycopg2.connect(url) conn.autocommit = False try: with conn.cursor() as cur: # Enabled subscribers whose domain is blocked OR whose exact address is # on the do-not-contact list. Mirrors _email_exclusions.is_blocked(). where = ( "status = 'enabled' AND (" "lower(split_part(email, '@', 2)) = ANY(%s) " "OR lower(email) = ANY(%s))" ) params = [domains, emails] # Report breakdown by domain first (for the operator + the log). cur.execute( f"SELECT lower(split_part(email, '@', 2)) AS domain, count(*) " f"FROM subscribers WHERE {where} GROUP BY 1 ORDER BY 2 DESC", params, ) rows = cur.fetchall() total = sum(int(c) for _, c in rows) db_label = url.rsplit("/", 1)[-1] mode = "DRY-RUN" if args.dry_run else "APPLY" print(f"=== Listmonk consumer scrub [{mode}] db={db_label} ===") print(f"Exclusion list: {len(domains)} domains, {len(emails)} addresses") if not rows: print("No enabled consumer-domain subscribers found. Nothing to do.") return 0 print(f"Enabled subscribers to blocklist: {total}") for dom, cnt in rows[:40]: print(f" {cnt:>7} {dom}") if len(rows) > 40: print(f" ... and {len(rows) - 40} more domains") if args.dry_run: print("\nDRY-RUN: no changes made. Re-run without --dry-run to apply.") return 0 cur.execute( f"UPDATE subscribers SET status = 'blocklisted', " f"updated_at = now() WHERE {where}", params, ) changed = cur.rowcount conn.commit() print(f"\nBlocklisted {changed} subscribers. Committed.") return 0 finally: conn.close() if __name__ == "__main__": raise SystemExit(main())