new-site/scripts/ip_rehab.py
justin 1c2e263bb7 warmup(ip-rehab): bias recipients to multi-subscriber business domains (cut bounce)
Day-0 batch saw ~45% bounce because 'no listmonk bounce record' is a weak
liveness signal. Now require the recipient's domain to have >=2 enabled
subscribers (a real org, not a one-off typo'd address), which materially lowers
the dead-mailbox bounce rate on the recovering IPs.
2026-06-09 20:31:45 -05:00

209 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""IP rehab feeder for .91-.93 (mta02-04), retired after the May 30-31 over-volume
blast. Sends a small, daily-ramping trickle through the heavily-throttled rehab02/
03/04 postfix transports so provider-internal reputation rebuilds and the IPs can
be reallocated.
Strategy (conservative -- the goal is reputation recovery, not lead-gen):
* Recipients: enabled main-listmonk subscribers on REAL business/ISP domains
(never consumer, never previously-bounced), so every send is to a live,
low-complaint inbox -- the opposite of what torched the IPs.
* Volume: tiny and ramping (day0=10/IP, then +10/day, cap 60/IP) split across
the 3 rehab IPs, well under any provider rate limit at the 30s/transport delay.
* Content: a genuine, low-pressure compliance-update email (CAN-SPAM compliant:
real from/subject, physical address, unsubscribe). No aggressive offers.
* Sends DIRECTLY via the rehab transports (local sendmail, with an X-PW-Rehab-IP
header that selects the transport) so the IP binding + throttle apply; does NOT
route through the listmonk warmup pool.
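Run ONCE daily from cron AFTER the main warmup window. The ramp day (and therefore
the quota) is stable within a calendar day because a state file records the ramp
start date, but nothing stops a second run on the same day from sending another
batch to fresh recipients. Records who we mailed so we don't re-hit the same inbox.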
Usage:
python3 scripts/ip_rehab.py # send today's rehab trickle
python3 scripts/ip_rehab.py --dry-run # show who/what without sending
python3 scripts/ip_rehab.py --status # show ramp day, quota, transports, already-mailed count
"""
from __future__ import annotations
import argparse, datetime, os, subprocess, sys, time
# Directory holding the rehab state files; override with the IP_REHAB_STATE env var.
STATE_DIR = os.getenv("IP_REHAB_STATE", "/opt/performancewest/data/ip_rehab")
# Holds the ISO date of the first run; ramp_day() counts elapsed days from it.
START_FILE = os.path.join(STATE_DIR, "start_date")
SENT_FILE = os.path.join(STATE_DIR, "mailed.txt")  # emails we've already rehab-mailed
# Recipients are assigned to these transports round-robin by main().
TRANSPORTS = ["rehab02", "rehab03", "rehab04"]  # .91 .92 .93
# Each rehab transport is selected by an X-PW-Rehab-IP header (matched by a
# postfix header_checks FILTER), which OVERRIDES the transport_maps randmap
# warmup rotation so the message is actually bound to the rehab IP + throttle.
REHAB_HEADER = {"rehab02": "02", "rehab03": "03", "rehab04": "04"}
# Value of the From: header on every rehab message.
FROM_ADDR = "Performance West <noreply@performancewest.net>"
# Postal address appended to the message body (CAN-SPAM physical address).
PHYS_ADDR = "Performance West Inc., 525 Randall Ave Ste 100-1195, Cheyenne, WY 82001"
# We query listmonk via `docker exec <pg_container> psql` (no pg driver on host).
PG_CONTAINER = os.getenv("PG_CONTAINER", "performancewest-api-postgres-1")
# Database name passed to `psql -d`; override with the LISTMONK_DB env var.
PG_DB = os.getenv("LISTMONK_DB", "listmonk")
# Consumer domains we never use for rehab (same policy as the warmup builder).
# Compared against the lower-cased domain part of each subscriber email.
CONSUMER = {
    "gmail.com", "googlemail.com", "yahoo.com", "yahoo.co.uk", "yahoo.ca",
    "yahoo.es", "yahoo.it", "ymail.com", "rocketmail.com", "myyahoo.com",
    "aol.com", "aim.com", "icloud.com", "me.com", "mac.com",
    "hotmail.com", "hotmail.co.uk", "outlook.com", "live.com", "msn.com",
    "comcast.net", "att.net", "sbcglobal.net", "bellsouth.net", "pacbell.net",
    "verizon.net", "cox.net", "charter.net", "frontier.com", "frontiernet.net",
    "windstream.net", "earthlink.net", "centurytel.net", "centurylink.net",
}
def ramp_day() -> int:
    """Return the number of whole days since the rehab ramp started.

    The start date is persisted as an ISO date in START_FILE. On the first
    call (no file yet) today's date is written and 0 is returned. Note that
    this makes every caller -- including read-only modes -- start the ramp
    clock as a side effect.

    Returns:
        Days elapsed since the recorded start date, never negative.
    """
    os.makedirs(STATE_DIR, exist_ok=True)
    today = datetime.date.today()
    try:
        # Context manager so the handle is closed deterministically.
        with open(START_FILE) as f:
            start = datetime.date.fromisoformat(f.read().strip())
    except FileNotFoundError:
        start = None
    except ValueError:
        # Empty/corrupt file (e.g. an interrupted first write). Restarting at
        # day 0 is the conservative choice: it yields the smallest quota.
        print(f"[ip-rehab] unreadable start date in {START_FILE}; restarting ramp at day 0")
        start = None
    if start is None:
        with open(START_FILE, "w") as f:
            f.write(today.isoformat())
        return 0
    # A start date in the future (clock skew, manual edit) must not produce a
    # negative day, which would turn into a negative quota downstream.
    return max((today - start).days, 0)
def per_ip_quota(day: int) -> int:
    """Return how many messages each rehab IP may send on ramp day *day*.

    The schedule is 10/IP on day 0, +10 per day, capped at 60/IP (i.e. 180/day
    in total across 3 IPs at full ramp).

    Args:
        day: Zero-based ramp day. Negative values are treated as day 0 so the
            quota can never drop to zero or below.

    Returns:
        The per-IP quota, always between 10 and 60 inclusive.
    """
    return min(10 + 10 * max(day, 0), 60)
def already_mailed() -> set[str]:
    """Return the lower-cased addresses recorded in SENT_FILE.

    Blank lines are ignored. A missing file means nothing has been mailed yet
    and yields an empty set.
    """
    try:
        # EAFP instead of exists()+open(): no race, and the handle is closed.
        with open(SENT_FILE) as f:
            return {line.strip().lower() for line in f if line.strip()}
    except FileNotFoundError:
        return set()
def pick_recipients(n: int, exclude: set[str]) -> list[tuple[str, str]]:
    """Return up to n (email, name) for clean business-domain, never-bounced subs.

    Queried via `docker exec <pg> psql` since the host has no pg driver.

    Args:
        n: Maximum number of recipients wanted. Values <= 0 return [] without
            touching the database.
        exclude: Lower-cased addresses to skip (already rehab-mailed).

    Returns:
        A list of (lower-cased email, stripped name) pairs, at most n long.
        Any query failure is reported on stdout and yields [] (best effort).
    """
    if n <= 0:
        return []
    # sorted() keeps the generated SQL text stable from run to run.
    consumer_list = "(" + ",".join("'%s'" % d for d in sorted(CONSUMER)) + ")"
    # Exclusions are applied in Python, so oversample by len(exclude): at most
    # that many returned rows can be discarded, which keeps the quota fillable
    # as the already-mailed list grows.
    limit = n * 3 + len(exclude)
    # Bias toward higher-deliverability recipients on a recovering IP:
    # - real business/ISP domains (never consumer)
    # - never recorded a bounce in listmonk
    # - domain has >= 2 enabled subscribers (a real org, not a one-off typo'd
    #   address) -- this materially cuts the dead-mailbox bounce rate.
    sql = (
        "WITH dom AS ("
        " SELECT lower(split_part(email,'@',2)) AS d, count(*) AS c "
        " FROM subscribers WHERE status='enabled' GROUP BY 1 HAVING count(*) >= 2"
        ") "
        "SELECT s.email, COALESCE(s.name,'') "
        "FROM subscribers s "
        "JOIN dom ON dom.d = lower(split_part(s.email,'@',2)) "
        "WHERE s.status='enabled' "
        f"AND lower(split_part(s.email,'@',2)) NOT IN {consumer_list} "
        "AND NOT EXISTS (SELECT 1 FROM bounces b WHERE b.subscriber_id=s.id) "
        f"ORDER BY random() LIMIT {limit}"
    )
    try:
        # -At: unaligned, tuples only; -F TAB: one "email<TAB>name" row per line.
        proc = subprocess.run(
            ["docker", "exec", PG_CONTAINER, "psql", "-U", "pw", "-d", PG_DB,
             "-At", "-F", "\t", "-c", sql],
            capture_output=True, text=True, timeout=60,
        )
    except Exception as exc:  # noqa: BLE001 -- deliberate best effort
        print(f"[ip-rehab] recipient query failed: {exc}")
        return []
    if proc.returncode != 0:
        print(f"[ip-rehab] psql error: {proc.stderr.strip()[:200]}")
        return []

    out: list[tuple[str, str]] = []
    for line in proc.stdout.splitlines():
        email, sep, name = line.partition("\t")
        if not sep:
            # Not a data row (no field separator) -- skip it.
            continue
        e = email.strip().lower()
        if e and e not in exclude:
            out.append((e, name.strip()))
            if len(out) >= n:
                break
    return out
def build_email(to: str, name: str, transport: str) -> str:
    """Build the raw RFC 5322 message (headers + body, CRLF line endings).

    Args:
        to: Recipient address. CR/LF are stripped so it cannot inject headers.
        name: Subscriber display name; its first whitespace-separated token is
            used in the greeting, falling back to "there" when empty.
        transport: Key into REHAB_HEADER; selects the X-PW-Rehab-IP value.

    Returns:
        The complete message as a str, ready to be piped to sendmail.

    Raises:
        KeyError: If *transport* is not a key of REHAB_HEADER.
    """
    # Local import keeps this block self-contained (file imports untouched).
    from urllib.parse import quote

    # An address containing CR/LF would otherwise add arbitrary header lines.
    to = to.replace("\r", "").replace("\n", "")
    tokens = name.split()
    first = tokens[0] if tokens else "there"
    # Percent-encode the address: a raw "+" in a query string decodes as a
    # space, and "&"/"#" would truncate the parameter.
    unsub = f"https://performancewest.net/unsubscribe?e={quote(to, safe='@')}"
    return (
        f"X-PW-Rehab-IP: {REHAB_HEADER[transport]}\r\n"
        f"From: {FROM_ADDR}\r\n"
        f"To: {to}\r\n"
        f"Subject: Quick compliance check-in from Performance West\r\n"
        # Content-Type without MIME-Version is not valid MIME (RFC 2045) and
        # is a common spam-filter signal; declare both plus the encoding.
        f"MIME-Version: 1.0\r\n"
        f"Content-Type: text/plain; charset=utf-8\r\n"
        f"Content-Transfer-Encoding: 8bit\r\n"
        f"List-Unsubscribe: <{unsub}>\r\n"
        f"\r\n"
        f"Hi {first},\r\n\r\n"
        f"This is a brief check-in from the compliance team at Performance West. "
        f"If you handle FCC, DOT/FMCSA, healthcare, or corporate filings and want a "
        f"quick read on upcoming deadlines for your business, just reply and we'll "
        f"take a look -- no obligation.\r\n\r\n"
        f"If this isn't useful, you can unsubscribe here: {unsub}\r\n\r\n"
        f"{PHYS_ADDR}\r\n"
        f"performancewest.net | (888) 411-0383\r\n"
    )
def send_via(transport: str, to: str, raw: str, dry: bool) -> bool:
    """Hand one raw message to the local sendmail binary.

    Args:
        transport: Rehab transport name; used for log output only -- the
            actual binding comes from the X-PW-Rehab-IP header inside *raw*.
        to: Envelope recipient.
        raw: Complete message text (headers + body).
        dry: When true, only print what would be sent and report success.

    Returns:
        True if the message was accepted (or *dry* is set), False otherwise.
    """
    if dry:
        print(f" [dry] {transport} -> {to}")
        return True
    try:
        # The X-PW-Rehab-IP header (in raw) triggers a header_checks FILTER that
        # binds the message to this rehab transport + IP, overriding the randmap.
        p = subprocess.run(
            [
                "/usr/sbin/sendmail",
                "-i",  # a lone "." line must not end the message early
                "-f", "noreply@performancewest.net",
                "--",  # end of options: an address starting with "-" stays an address
                to,
            ],
            input=raw.encode(), timeout=60,
        )
    except Exception as exc:  # noqa: BLE001 -- deliberate best effort
        print(f" send error {transport} -> {to}: {exc}")
        return False
    if p.returncode != 0:
        # Surface rejected submissions instead of failing silently.
        print(f" send failed {transport} -> {to}: sendmail exit {p.returncode}")
        return False
    return True
def main() -> int:
    """Command-line entry point: send (or preview) today's rehab trickle.

    Flags:
        --dry-run  Print which transport/recipient pairs would be used; sends
                   nothing and records nothing in SENT_FILE.
        --status   Print ramp day, quotas, transports and the already-mailed
                   count, then exit.

    Returns:
        Process exit code; always 0, including when no recipients are found
        or individual sends fail.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--status", action="store_true")
    args = ap.parse_args()

    day = ramp_day()
    quota = per_ip_quota(day)
    total = quota * len(TRANSPORTS)

    if args.status:
        print(f"IP rehab: day={day} per_ip={quota} total_today={total}")
        print(f"transports: {TRANSPORTS} (.91 .92 .93)")
        print(f"already mailed: {len(already_mailed())}")
        return 0

    exclude = already_mailed()
    recips = pick_recipients(total, exclude)
    if not recips:
        print("[ip-rehab] no clean recipients available")
        return 0

    print(f"[ip-rehab] day={day} per_ip={quota} total={len(recips)} "
          f"({'DRY' if args.dry_run else 'SEND'})")
    sent = 0
    for i, (email, name) in enumerate(recips):
        # Round-robin so the batch is split evenly across the rehab IPs.
        transport = TRANSPORTS[i % len(TRANSPORTS)]
        raw = build_email(email, name, transport)
        if send_via(transport, email, raw, args.dry_run):
            sent += 1
            if not args.dry_run:
                # Record each success immediately: if the run dies part-way,
                # the inboxes already hit are still excluded next time.
                with open(SENT_FILE, "a") as f:
                    f.write(email + "\n")
        if not args.dry_run:
            # Brief pause between submissions; pointless when nothing is sent.
            time.sleep(0.2)
    print(f"[ip-rehab] done: sent={sent}/{len(recips)} via {len(TRANSPORTS)} IPs")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())