new-site/scripts/listmonk-bounce-sync.py
justin bfdbf8f031 bounce-sync: stop blocklisting good carriers on first auth/policy bounce
This script ran every 5 min and blocklisted on the FIRST hard bounce of ANY
5xx DSN via direct SQL, bypassing Listmonk's count-based bounce.actions rule.
That is the actual mechanism that wrongly killed ~17,000 good carriers during
the broken-DKIM window (their mail got 5.7.1 DMARC-reject, not bad-mailbox).

Fix: only genuine bad-mailbox DSNs (5.1.1/5.1.0/5.0.0/5.4.1/5.5.0) count toward
a blocklist, and a subscriber must accumulate >=3 such hard bounces (matching
Listmonk's threshold) before being blocklisted. Reputation/policy 5.7.x and
quota/greylist 5.2.x never trigger a blocklist.
2026-06-26 23:53:20 -05:00

207 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
Scan Postfix mail.log for bounced campaign emails and insert into Listmonk DB.
Listmonk's /webhooks/bounce endpoint silently ignores bounces it can't match
to a subscriber. This script queries the subscriber table directly and inserts
bounces with proper subscriber_id foreign keys.
Idempotent — skips emails that already have a bounce record.
Usage:
python3 listmonk-bounce-sync.py # scan /var/log/mail.log
python3 listmonk-bounce-sync.py /var/log/mail.log.1 # scan rotated log
python3 listmonk-bounce-sync.py --dry-run # show what would be reported
"""
import re
import sys
import subprocess
from pathlib import Path
# Envelope senders used for campaign mail. A queue ID is treated as a
# campaign message once a log line for it carries one of these in from=<...>.
CAMPAIGN_SENDERS = {
    "noreply@performancewest.net",
    "info@performancewest.net",
}

# Command line that feeds SQL (on stdin) to psql inside the Postgres
# container. "-t -A" asks psql for tuples-only, unaligned output, which is
# what the callers parse line by line.
DOCKER_PSQL = [
    "docker",
    "exec",
    "-i",
    "performancewest-api-postgres-1",
    "psql",
    "-U",
    "pw",
    "-d",
    "listmonk",
    "-t",
    "-A",
]

# --- Patterns for pulling fields out of a Postfix log line ---

# Queue ID that follows the "postfix/<daemon>[<pid>]: " prefix.
# NOTE(review): only uppercase letters and digits are accepted; if the server
# runs with enable_long_queue_ids the IDs also contain lowercase letters and
# would not match -- confirm the Postfix configuration.
QID_RE = re.compile(r"postfix/\w+\[\d+\]: ([A-Z0-9]+):")

# Envelope sender and recipient, as logged inside angle brackets.
FROM_RE = re.compile(r"from=<([^>]*)>")
TO_RE = re.compile(r"to=<([^>]*)>")

# Enhanced status code from the "dsn=" field, e.g. 5.1.1.
DSN_RE = re.compile(r"dsn=(\d\.\d+\.\d+)")
def run_sql(sql: str) -> str:
    """Pipe *sql* to psql via the ``DOCKER_PSQL`` command and return its output.

    Args:
        sql: SQL text sent to the command on stdin.

    Returns:
        The command's stdout with surrounding whitespace stripped. An empty
        string is returned both for an empty result set and when the command
        produced no output because it failed.

    Raises:
        subprocess.TimeoutExpired: if the command runs longer than 30 seconds.

    A failure does not raise (callers keep their best-effort behaviour), but
    it is no longer silent: a non-zero exit status or anything written to
    stderr is echoed to this process's stderr so that a broken query cannot
    be mistaken for "no rows".
    """
    proc = subprocess.run(
        DOCKER_PSQL, input=sql, capture_output=True, text=True, timeout=30
    )
    diagnostics = proc.stderr.strip()
    # Check stderr as well as the exit status: failed statements are reported
    # there even when the process itself exits successfully.
    if proc.returncode != 0 or diagnostics:
        print(
            f"run_sql: psql exited with status {proc.returncode}: {diagnostics}",
            file=sys.stderr,
        )
    return proc.stdout.strip()
# A 5xx SMTP reply quoted on a delivery line ("... said: 550 ...").
_PERMANENT_REPLY_RE = re.compile(r"said: 5\d\d ")


def scan_log(log_path: str) -> list:
    """Scan a Postfix mail log for bounced campaign emails.

    A queue ID counts as a campaign message once a line for it shows a
    ``from=<...>`` address listed in ``CAMPAIGN_SENDERS``. For such queue IDs:

    * a ``status=bounced`` line yields a ``"hard"`` bounce;
    * a ``status=deferred`` line that quotes a 5xx reply yields a ``"soft"``
      bounce.

    Args:
        log_path: Path of the log file to read.

    Returns:
        A list of dicts with keys ``"email"`` (lower-cased recipient),
        ``"type"`` (``"hard"`` or ``"soft"``) and ``"dsn"`` (the ``dsn=``
        value, or ``""`` when the line has none). There is one entry per
        recipient, in order of first appearance. If a recipient has both a
        soft and a hard record, the hard one is kept so that an earlier
        deferral cannot hide a later bounce.

    Raises:
        OSError: if the log file cannot be opened.
    """
    campaign_qids = set()
    # email -> bounce record; a dict keeps first-seen order and lets a later
    # hard bounce replace an earlier soft one without changing its position.
    by_email = {}

    # Undecodable bytes are replaced instead of aborting the scan; only
    # ASCII fields of each line are inspected.
    with open(log_path, encoding="utf-8", errors="replace") as f:
        for line in f:
            qid_match = QID_RE.search(line)
            if not qid_match:
                continue
            qid = qid_match.group(1)

            from_match = FROM_RE.search(line)
            if from_match and from_match.group(1) in CAMPAIGN_SENDERS:
                campaign_qids.add(qid)

            if qid not in campaign_qids:
                continue

            if "status=bounced" in line:
                bounce_type = "hard"
            elif "status=deferred" in line and _PERMANENT_REPLY_RE.search(line):
                bounce_type = "soft"
            else:
                continue

            to_match = TO_RE.search(line)
            if not to_match:
                continue
            email = to_match.group(1).lower()
            dsn_match = DSN_RE.search(line)

            previous = by_email.get(email)
            if previous is not None and not (
                previous["type"] == "soft" and bounce_type == "hard"
            ):
                # Already have an equal-or-stronger record for this address.
                continue

            by_email[email] = {
                "email": email,
                "type": bounce_type,
                "dsn": dsn_match.group(1) if dsn_match else "",
            }

    return list(by_email.values())
# DSN status codes that indicate a genuinely undeliverable mailbox (vs.
# sender-reputation/policy 5.7.x or quota/greylist 5.2.x rejections, which
# must never lead to a blocklist). Codes are matched exactly rather than by
# prefix so that, for example, 5.4.14 is not mistaken for 5.4.1.
_BAD_MAILBOX_DSNS = frozenset(
    {"5.1.1", "5.1.10", "5.1.0", "5.0.0", "5.4.1", "5.5.0"}
)

# Hard bounces a subscriber must have on record before being blocklisted;
# the same 3-strike threshold Listmonk's bounce.actions uses.
_HARD_BOUNCE_BLOCKLIST_THRESHOLD = 3


def _sql_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled, so an address such as
    ``o'brien@example.com`` can neither break the statement nor inject SQL.
    """
    return "'" + value.replace("'", "''") + "'"


def _blocklist_over_threshold(candidate_sids: list) -> None:
    """Blocklist those of *candidate_sids* at or over the hard-bounce threshold.

    *candidate_sids* are subscriber IDs that just received a new bad-mailbox
    hard bounce. The count covers all of a subscriber's 'hard' rows in the
    bounces table (existing + new).
    """
    threshold = _HARD_BOUNCE_BLOCKLIST_THRESHOLD
    sids = ",".join(str(sid) for sid in candidate_sids)
    over_threshold = run_sql(
        f"SELECT subscriber_id FROM bounces "
        f"WHERE subscriber_id IN ({sids}) AND type = 'hard' "
        f"GROUP BY subscriber_id "
        f"HAVING count(*) >= {threshold};"
    )
    # Only pass plain integers back into SQL, whatever psql printed.
    bl_sids = [s.strip() for s in over_threshold.split("\n") if s.strip().isdigit()]
    if bl_sids:
        run_sql(
            f"UPDATE subscribers SET status = 'blocklisted' "
            f"WHERE id IN ({','.join(bl_sids)}) AND status != 'blocklisted';"
        )
        print(f" Blocklisted {len(bl_sids)} subscribers "
              f"(>= {threshold} bad-mailbox hard bounces)")
    else:
        print(f" Recorded {len(candidate_sids)} bad-mailbox hard bounces; "
              f"none yet at {threshold}-strike blocklist threshold")


def _sync_log(log_path: str, dry_run: bool) -> None:
    """Record the bounces found in one log file and apply the blocklist rule.

    With *dry_run* set, nothing is written: the rows that would be inserted
    are printed instead and the blocklist step is skipped.
    """
    import json  # local import: only needed to serialise the bounce meta

    print(f"Scanning {log_path}...")
    bounces = scan_log(log_path)
    print(f" Found {len(bounces)} unique bounced emails")
    if not bounces:
        return

    # Get subscriber IDs for bounced emails. Addresses come from the mail
    # log, so they are escaped before being placed in the statement.
    emails_csv = ",".join(_sql_literal(b["email"]) for b in bounces)
    rows = run_sql(f"SELECT id, email FROM subscribers WHERE email IN ({emails_csv});")
    sub_map = {}
    for row in rows.strip().split("\n"):
        if "|" in row:
            sid, email = row.split("|", 1)
            sub_map[email.strip().lower()] = int(sid.strip())
    print(f" Matched {len(sub_map)} to Listmonk subscribers")

    # Get emails that already have bounces.
    if sub_map:
        sids_csv = ",".join(str(sid) for sid in sub_map.values())
        existing = run_sql(
            f"SELECT DISTINCT s.email FROM bounces b "
            f"JOIN subscribers s ON s.id = b.subscriber_id "
            f"WHERE b.subscriber_id IN ({sids_csv});"
        )
        already_bounced = {e.strip().lower() for e in existing.split("\n") if e.strip()}
    else:
        already_bounced = set()
    print(f" Already recorded: {len(already_bounced)}")

    # Insert new bounces.
    inserted = 0
    skipped = 0
    no_subscriber = 0
    new_bad_mailbox_sids = []  # subscribers given a new bad-mailbox hard bounce
    for b in bounces:
        email = b["email"]
        if email not in sub_map:
            no_subscriber += 1
            continue
        if email in already_bounced:
            skipped += 1
            continue
        sid = sub_map[email]
        bounce_type = b["type"]
        if dry_run:
            print(f" [DRY] {email} (sub={sid}, {bounce_type}, dsn={b['dsn']})")
            inserted += 1
            continue
        meta = json.dumps({"dsn": b["dsn"]})
        run_sql(
            f"INSERT INTO bounces (subscriber_id, type, source, meta) "
            f"VALUES ({sid}, {_sql_literal(bounce_type)}, 'postfix-logscan', "
            f"{_sql_literal(meta)});"
        )
        inserted += 1
        if bounce_type == "hard" and b["dsn"] in _BAD_MAILBOX_DSNS:
            new_bad_mailbox_sids.append(sid)
    print(f" Inserted: {inserted}, Skipped (existing): {skipped}, No subscriber: {no_subscriber}")

    # Blocklist subscribers ONLY for genuine bad-mailbox bounces, and only
    # once they cross the same 3-strike threshold Listmonk's bounce.actions
    # uses. This script previously blocklisted on the FIRST hard bounce of
    # ANY 5xx DSN -- including 5.7.1 (DMARC/auth/"low reputation" policy
    # rejections). During the Jun broken-DKIM window that wrongly killed
    # ~17,000 good carriers in one pass (a deliverability bug, not bad
    # addresses). Now policy/reputation DSNs (5.7.x) and greylist/quota
    # (5.2.x) never trigger a blocklist.
    #
    # NOTE(review): subscribers that already have any bounce row are skipped
    # above, so this script adds at most one bounce per subscriber; reaching
    # the threshold therefore relies on hard bounces recorded by some other
    # source. The count also includes every 'hard' row regardless of its
    # DSN. Confirm both are intended.
    if not dry_run and new_bad_mailbox_sids:
        _blocklist_over_threshold(new_bad_mailbox_sids)


def main():
    """Sync bounces from each log file named on the command line.

    Arguments not starting with ``--`` are log paths (default
    ``/var/log/mail.log``); ``--dry-run`` reports what would be inserted
    without writing. Missing files are reported on stderr and skipped.
    """
    dry_run = "--dry-run" in sys.argv
    log_files = [a for a in sys.argv[1:] if not a.startswith("--")]
    if not log_files:
        log_files = ["/var/log/mail.log"]
    for log_path in log_files:
        if not Path(log_path).exists():
            print(f"Not found: {log_path}", file=sys.stderr)
            continue
        _sync_log(log_path, dry_run)


if __name__ == "__main__":
    main()