new-site/scripts/listmonk-bounce-sync.py
justin ba2f6eb667 UPL-proof document templates + reliable bounce sync
Templates (22 files):
- Replace "Reviewed By" with "Document prepared by" + consulting disclaimer
- Add "not a law firm / not legal advice" footer to all CPNI, CALEA, RMD docs
- Change "on behalf of" to "at the direction of" in discontinuance letter
- Reframe RMD penalty language as client acknowledgment

Bounce sync:
- New listmonk-bounce-sync.py replaces unreliable bash tail watcher
- Scans full mail.log, matches QIDs to campaign senders, inserts directly
  into Listmonk DB with proper subscriber_id foreign keys
- Idempotent, runs via cron every 5 minutes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-21 15:06:29 -05:00

178 lines
6 KiB
Python

#!/usr/bin/env python3
"""
Scan Postfix mail.log for bounced campaign emails and insert into Listmonk DB.
Listmonk's /webhooks/bounce endpoint silently ignores bounces it can't match
to a subscriber. This script queries the subscriber table directly and inserts
bounces with proper subscriber_id foreign keys.
Idempotent — skips emails that already have a bounce record.
Usage:
python3 listmonk-bounce-sync.py # scan /var/log/mail.log
python3 listmonk-bounce-sync.py /var/log/mail.log.1 # scan rotated log
python3 listmonk-bounce-sync.py --dry-run # show what would be reported
"""
import re
import sys
import subprocess
from pathlib import Path
# Envelope senders that identify a campaign message. A queue ID is only
# tracked once a log line for it shows one of these in from=<...>.
CAMPAIGN_SENDERS = {
    "info@performancewest.net",
    "noreply@performancewest.net",
}

# Base command for running psql inside the Postgres container.
# -t prints rows only (no headers/footers); -A disables column alignment,
# so each row arrives as "col|col".
_DOCKER_EXEC = ["docker", "exec", "-i", "performancewest-api-postgres-1"]
_PSQL_ARGS = ["psql", "-U", "pw", "-d", "listmonk", "-t", "-A"]
DOCKER_PSQL = _DOCKER_EXEC + _PSQL_ARGS

# Patterns applied to individual mail.log lines.
# Queue ID following the "postfix/<daemon>[<pid>]: " prefix.
QID_RE = re.compile(r"postfix/\w+\[\d+\]: ([A-Z0-9]+):")
# Envelope sender and recipient as logged by Postfix.
FROM_RE = re.compile(r"from=<([^>]*)>")
TO_RE = re.compile(r"to=<([^>]*)>")
# Delivery status notification code, e.g. "5.1.1".
DSN_RE = re.compile(r"dsn=(\d\.\d+\.\d+)")
def run_sql(sql: str) -> str:
    """Run *sql* through psql inside the Postgres container.

    Args:
        sql: One or more SQL statements, fed to psql on stdin.

    Returns:
        psql's stdout with surrounding whitespace stripped.

    Raises:
        subprocess.CalledProcessError: docker or psql exited non-zero
            (container not running, connection failure, SQL error).
        subprocess.TimeoutExpired: the command ran longer than 30 seconds.
    """
    # Without ON_ERROR_STOP psql exits 0 even when a statement fails, which
    # would make a failed INSERT/SELECT indistinguishable from "no rows".
    cmd = [*DOCKER_PSQL, "-v", "ON_ERROR_STOP=1"]
    proc = subprocess.run(
        cmd, input=sql, capture_output=True, text=True, timeout=30
    )
    if proc.returncode != 0:
        # Surface psql's own message; CalledProcessError's str() omits stderr.
        if proc.stderr:
            print(proc.stderr.strip(), file=sys.stderr)
        raise subprocess.CalledProcessError(
            proc.returncode, cmd, output=proc.stdout, stderr=proc.stderr
        )
    return proc.stdout.strip()
def scan_log(log_path: str) -> list:
    """Scan a Postfix mail log for bounced campaign emails.

    A queue ID counts as a campaign message once a line carrying that ID has
    ``from=<...>`` set to one of ``CAMPAIGN_SENDERS``. Subsequent lines for
    that queue ID are recorded as:

    * ``hard`` when they contain ``status=bounced``;
    * ``soft`` when they contain ``status=deferred`` and the quoted remote
      reply (``said: 5xx ``) carries a 5xx code.

    Args:
        log_path: Path of the log file to read.

    Returns:
        One dict per recipient, ordered by first appearance, with keys
        ``email`` (lower-cased), ``type`` (``"hard"`` or ``"soft"``) and
        ``dsn`` (the ``dsn=`` value, or ``""`` when the line has none).
        When a recipient appears as both soft and hard, the hard entry wins
        so that an earlier deferral cannot mask a later permanent failure.
    """
    remote_5xx = re.compile(r"said: 5\d\d ")
    campaign_qids = set()
    found = {}  # email -> bounce dict; dict order preserves first appearance

    # Mail logs can contain arbitrary bytes from remote servers; replace
    # undecodable ones instead of aborting the whole scan.
    with open(log_path, encoding="utf-8", errors="replace") as f:
        for line in f:
            qid_match = QID_RE.search(line)
            if not qid_match:
                continue
            qid = qid_match.group(1)

            from_match = FROM_RE.search(line)
            if from_match and from_match.group(1) in CAMPAIGN_SENDERS:
                campaign_qids.add(qid)

            if qid not in campaign_qids:
                continue

            if "status=bounced" in line:
                bounce_type = "hard"
            elif "status=deferred" in line and remote_5xx.search(line):
                bounce_type = "soft"
            else:
                continue

            to_match = TO_RE.search(line)
            if not to_match:
                continue
            email = to_match.group(1).lower()
            dsn_match = DSN_RE.search(line)

            previous = found.get(email)
            upgrade = (
                previous is not None
                and previous["type"] == "soft"
                and bounce_type == "hard"
            )
            if previous is None or upgrade:
                # Re-assigning an existing key keeps its original position.
                found[email] = {
                    "email": email,
                    "type": bounce_type,
                    "dsn": dsn_match.group(1) if dsn_match else "",
                }

    return list(found.values())
def _sql_quote(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled. Backslashes are left alone, which is
    correct when the server runs with standard_conforming_strings = on
    (NOTE(review): that is the PostgreSQL default; confirm it has not been
    turned off for this database).
    """
    return "'" + value.replace("'", "''") + "'"


def main():
    """Sync bounces found in Postfix logs into the Listmonk database.

    Command line: any argument not starting with ``--`` is a log file to
    scan (default ``/var/log/mail.log``); ``--dry-run`` prints what would be
    inserted without writing anything.

    For each log file this looks up the subscriber IDs for the bounced
    addresses, skips subscribers that already have any bounce row, inserts a
    bounce row for the rest, and blocklists subscribers whose new bounce is
    hard.
    """
    dry_run = "--dry-run" in sys.argv
    log_files = [a for a in sys.argv[1:] if not a.startswith("--")]
    if not log_files:
        log_files = ["/var/log/mail.log"]

    for log_path in log_files:
        if not Path(log_path).exists():
            print(f"Not found: {log_path}", file=sys.stderr)
            continue

        print(f"Scanning {log_path}...")
        bounces = scan_log(log_path)
        print(f" Found {len(bounces)} unique bounced emails")
        if not bounces:
            continue

        # Get subscriber IDs for bounced emails. Addresses come from the mail
        # log, so they are quoted rather than interpolated raw (an address
        # may legally contain a single quote). The log addresses are already
        # lower-cased, hence the LOWER() on the column side.
        emails_csv = ",".join(_sql_quote(b["email"]) for b in bounces)
        rows = run_sql(
            f"SELECT id, email FROM subscribers "
            f"WHERE LOWER(email) IN ({emails_csv});"
        )
        sub_map = {}
        for row in rows.split("\n"):
            if "|" in row:
                sid, email = row.split("|", 1)
                sub_map[email.strip().lower()] = int(sid.strip())
        print(f" Matched {len(sub_map)} to Listmonk subscribers")

        # Get emails that already have bounces
        if sub_map:
            sids_csv = ",".join(str(sid) for sid in sub_map.values())
            existing = run_sql(
                f"SELECT DISTINCT s.email FROM bounces b "
                f"JOIN subscribers s ON s.id = b.subscriber_id "
                f"WHERE b.subscriber_id IN ({sids_csv});"
            )
            already_bounced = {
                e.strip().lower() for e in existing.split("\n") if e.strip()
            }
        else:
            already_bounced = set()
        print(f" Already recorded: {len(already_bounced)}")

        # Classify each bounce: unknown subscriber, already recorded, or new.
        pending = []
        skipped = 0
        no_subscriber = 0
        for b in bounces:
            email = b["email"]
            if email not in sub_map:
                no_subscriber += 1
            elif email in already_bounced:
                skipped += 1
            else:
                pending.append(b)

        if dry_run:
            for b in pending:
                sid = sub_map[b["email"]]
                print(f" [DRY] {b['email']} (sub={sid}, {b['type']}, dsn={b['dsn']})")
        elif pending:
            # One multi-row INSERT instead of one docker exec per bounce.
            value_rows = []
            for b in pending:
                # dsn is either "" or digits-and-dots as captured by DSN_RE,
                # so it needs no JSON escaping.
                meta = '{"dsn": "' + b["dsn"] + '"}'
                value_rows.append(
                    "({}, {}, 'postfix-logscan', {})".format(
                        sub_map[b["email"]],
                        _sql_quote(b["type"]),
                        _sql_quote(meta),
                    )
                )
            run_sql(
                "INSERT INTO bounces (subscriber_id, type, source, meta) "
                "VALUES " + ", ".join(value_rows) + ";"
            )

        inserted = len(pending)
        print(f" Inserted: {inserted}, Skipped (existing): {skipped}, No subscriber: {no_subscriber}")

        # Blocklist subscribers with hard bounces (Listmonk's own behavior)
        if not dry_run and inserted > 0:
            new_hard = [b for b in pending if b["type"] == "hard"]
            if new_hard:
                sids = ",".join(str(sub_map[b["email"]]) for b in new_hard)
                run_sql(
                    f"UPDATE subscribers SET status = 'blocklisted' "
                    f"WHERE id IN ({sids}) AND status != 'blocklisted';"
                )
                print(f" Blocklisted {len(new_hard)} hard-bounce subscribers")


if __name__ == "__main__":
    main()