Isolates bulk sending reputation onto a dedicated subdomain so the root domain stays clean for transactional/verification mail (and recovers faster). Replies still go to the root domain via Reply-To, so the customer-facing reply experience is unchanged. - build_trucking_campaigns.py: add env-overridable FROM_EMAIL (noreply@send.performancewest.net); use it for both scheduled + test sends instead of inheriting base["from_email"] from the DB base campaign. - build_healthcare_campaigns_cron.py: FROM_EMAIL -> compliance@send.performancewest.net (env-overridable). - bounce-watcher.sh / hc-bounce-watcher.sh: track the new subdomain envelope sender (keep legacy root-domain sender so the pre-cutover queue still drains; HC also tracks by hcout transport regardless of sender). Infra already live (separate, non-code): subdomain DNS (A/MX/SPF/DKIM selector=send/DMARC p=reject) on the Hestia master, OpenDKIM signs d=send.performancewest.net (verified end-to-end), egress .94/.107. Root SPF trimmed to the real IPs; pointless IP-rehab cron disabled.
573 lines
26 KiB
Python
573 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
"""Healthcare warmup campaign builder for listmonk-hc.
|
|
|
|
Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
|
|
1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
|
|
listmonk-hc list (deduped; already-imported rows are skipped).
|
|
2. Ensures the teal "Medicare revalidation overdue" campaign exists and is
|
|
running, pointed at that list.
|
|
3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
|
|
actual per-hour throttling, so this builder just feeds the queue.
|
|
|
|
WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2.
|
|
Google/Workspace-hosted practice domains (~31%) reject hard from cold IPs
|
|
(550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501
|
|
non-Google practice domains first; switch to hc_warmup_google.csv (222) once the
|
|
IPs are warm (week 2-3).
|
|
|
|
The daily slice size follows the hc warmup ramp so we never queue more than the
|
|
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
|
|
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.
|
|
|
|
Idempotent: safe to run every weekday. Tracks imported emails in a state file.
|
|
|
|
Usage:
|
|
python3 scripts/build_healthcare_campaigns_cron.py # daily slice
|
|
python3 scripts/build_healthcare_campaigns_cron.py --dry-run
|
|
python3 scripts/build_healthcare_campaigns_cron.py --slice 0 # use ramp default
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
|
|
from datetime import datetime, timezone
|
|
|
|
LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101")
|
|
LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api")
|
|
# Token read from the host file written when the api user was created.
|
|
def _token() -> str:
|
|
t = os.getenv("HC_LISTMONK_TOKEN")
|
|
if t:
|
|
return t
|
|
for p in ("/opt/performancewest/.secrets/hc-listmonk-token",
|
|
"/etc/postfix/hc-listmonk-token"):
|
|
if os.path.exists(p):
|
|
try:
|
|
return open(p).read().strip()
|
|
except PermissionError:
|
|
continue
|
|
raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")
|
|
|
|
VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv")
|
|
STATE_DIR = os.getenv("HC_STATE_DIR", "/opt/performancewest/data")
|
|
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"
|
|
# Permanent do-not-contact list (one email per line). Providers who tell us they
|
|
# already revalidated, asked to stop, etc. go here so the warmup NEVER imports or
|
|
# re-mails them, regardless of segment or stale CMS data. Append + run --prune.
|
|
SUPPRESS_FILE = os.getenv("HC_SUPPRESS_FILE", os.path.join(STATE_DIR, "hc_suppress.txt"))
|
|
|
|
# Bulk From — sends from the dedicated bulk subdomain so its sending reputation
|
|
# is isolated from the root domain (which stays clean for transactional /
|
|
# verification mail). Replies still go to the root domain via Reply-To, so the
|
|
# customer-facing reply experience is unchanged. See docs/deliverability.md.
|
|
FROM_EMAIL = os.getenv(
|
|
"HC_CAMPAIGN_FROM",
|
|
"Performance West Compliance <compliance@send.performancewest.net>",
|
|
)
|
|
REPLY_TO = os.getenv("HC_CAMPAIGN_REPLY_TO", "info@performancewest.net")
|
|
|
|
# Segment registry (subject, template file, list/campaign names, row selector)
|
|
# is the single source of truth shared with build_healthcare_campaigns.py.
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from build_healthcare_campaigns import SEGMENTS, template_path # noqa: E402
|
|
from _email_plaintext import html_to_text # noqa: E402
|
|
|
|
|
|
def load_suppressed() -> set[str]:
    """Load the permanent do-not-contact list from SUPPRESS_FILE.

    The file holds one email per line. Blank lines and lines whose first
    non-whitespace character is ``#`` are ignored. Emails are returned
    stripped and lower-cased.

    Returns:
        The set of suppressed emails, or an empty set if the file is absent.
    """
    if not os.path.exists(SUPPRESS_FILE):
        return set()
    suppressed: set[str] = set()
    with open(SUPPRESS_FILE) as fh:
        for line in fh:
            entry = line.strip().lower()
            # Test the *stripped* text so an indented "# comment" line is not
            # mistaken for an address.
            if entry and not entry.startswith("#"):
                suppressed.add(entry)
    return suppressed
|
|
|
|
# Which segments to warm, in priority order. Revalidation stays the lead (it has
|
|
# the most verified data + the official-record card). The others warm in
|
|
# parallel on smaller slices so we collect engagement data across all programs
|
|
# without overwhelming the warming IPs.
|
|
ACTIVE_SEGMENTS = os.getenv(
|
|
"HC_SEGMENTS",
|
|
"revalidation_overdue,revalidation_due_soon,oig_screening,nppes_outdated,npi_reactivation,compliance_bundle",
|
|
).split(",")
|
|
|
|
# Warmup deliverability guard: only mail SLIGHTLY-overdue providers. A practice
|
|
# that lapsed recently is almost certainly still operating with a live inbox; one
|
|
# that is many months/years overdue has likely closed, merged, or abandoned the
|
|
# address, so its mail bounces -- and bounces are the fastest way to wreck a
|
|
# warming IP's reputation. Window is inclusive [MIN, MAX] days overdue.
|
|
WARMUP_OVERDUE_MIN = int(os.getenv("HC_OVERDUE_MIN", "1"))
|
|
WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90"))
|
|
# Proactive "revalidation due soon" window (days UNTIL the due date). Mirrors the
|
|
# overdue window so we reach providers shortly before AND after their deadline,
|
|
# roughly doubling the deliverable warmup pool from the same CMS data source.
|
|
WARMUP_DUE_SOON_MIN = int(os.getenv("HC_DUE_SOON_MIN", "1"))
|
|
WARMUP_DUE_SOON_MAX = int(os.getenv("HC_DUE_SOON_MAX", "90"))
|
|
|
|
|
|
def _overdue_days(r: dict):
|
|
v = (r.get("days_overdue") or "").strip()
|
|
try:
|
|
return int(v)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
# During warmup, hold out Google-Workspace-hosted domains: Google rejects bulk
# mail from cold/warming IPs hard (550-5.7.1), and those bounces wreck the
# warming reputation. The mx_provider flag is set by the weekly hc_data_refresh
# (an MX lookup, since a custom domain can silently use Google Workspace). Set
# HC_SKIP_GOOGLE=0 (or "false"/"no", any case, surrounding whitespace ignored)
# to lift this once the IPs are warm.
SKIP_GOOGLE = os.getenv("HC_SKIP_GOOGLE", "1").strip().lower() not in ("0", "false", "no")
|
|
|
|
|
|
def _is_google_hosted(r: dict) -> bool:
    """Report whether a row should be held out as Google-hosted.

    Always False when the SKIP_GOOGLE guard is off; otherwise True exactly
    when the row's ``mx_provider`` (trimmed, case-insensitive) is "google".
    """
    if SKIP_GOOGLE:
        provider = (r.get("mx_provider") or "").strip().lower()
        return provider == "google"
    return False
|
|
|
|
|
|
def warmup_day() -> int:
    """Return how many whole days have elapsed since the warmup started.

    WARMUP_STAMP is read as an integer and compared against ``time.time()``,
    i.e. it is treated as a Unix timestamp in seconds.

    Returns:
        Elapsed whole days (never negative). Falls back to 0 when the stamp
        file is missing, unreadable, or does not contain an integer.
    """
    try:
        # Context manager so the stamp file is closed right away.
        with open(WARMUP_STAMP) as fh:
            started = int(fh.read().strip())
        return max(0, int((time.time() - started) // 86400))
    except (OSError, ValueError, OverflowError):
        # Best-effort: a missing or garbled stamp means "day 0" (the most
        # conservative ramp step), not a crash.
        return 0
|
|
|
|
|
|
def daily_slice(day: int) -> int:
    """Return the TOTAL number of new subscribers to import today.

    The figure covers ALL segments combined and steps up with the warmup day
    so the daily *queueing* stays aligned with the hc ramp (the rampcap
    separately caps hourly *delivery*).

    NOTE(review): the previous docstring said this runs every day including
    weekends, while the module docstring says Mon-Fri -- confirm the schedule.
    """
    # (last warmup day of the step, total subscribers for that step)
    ramp = ((1, 100), (4, 300), (9, 600))
    for last_day, quota in ramp:
        if day <= last_day:
            return quota
    return 1000
|
|
|
|
|
|
# ── Per-MX-operator throttle ─────────────────────────────────────────────────
|
|
# Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365,
|
|
# Google Workspace, Proofpoint, ...), not by recipient domain. So we cap how many
|
|
# new providers we queue per operator per day, and let volume spread across the
|
|
# long tail of operators freely. This lets total daily volume be much higher than
|
|
# a flat cap without hammering any single receiving system. Caps ramp with the
|
|
# warmup day. "default" applies to any operator not explicitly listed (mostly the
|
|
# long tail of small/independent mail hosts -- a generous cap is safe there
|
|
# because each sees only a handful).
|
|
def mx_daily_caps(day: int) -> dict:
    """Return the per-MX-operator daily import caps for a warmup day.

    The named large operators all share one cap; the ``"__default__"`` entry
    is the cap for any operator not listed. Both values step up with ``day``.
    """
    # (last warmup day of the step, cap for named operators, default cap)
    for last_day, big, default in ((1, 25, 15), (4, 60, 40), (9, 120, 80)):
        if day <= last_day:
            break
    else:
        big, default = 250, 150
    named_operators = ("microsoft", "google", "proofpoint",
                       "cisco", "mimecast", "barracuda")
    caps = dict.fromkeys(named_operators, big)
    caps["__default__"] = default
    return caps
|
|
|
|
|
|
def mx_throttled(candidates: list[dict], total_n: int, caps: dict) -> list[dict]:
    """Pick up to ``total_n`` candidates while capping each MX operator.

    Rows are taken in input order. A row is skipped once its operator (the
    row's ``mx_provider``, trimmed and lower-cased) has reached its cap, so no
    single receiving operator gets more than its daily share.

    Args:
        candidates: CSV rows, already filtered for eligibility.
        total_n: overall maximum to return; zero or negative yields nothing.
        caps: operator -> cap, keyed by lower-case operator name, with
            ``"__default__"`` as the cap for unlisted operators (50 if absent).

    Returns:
        The chosen rows. If the first row has no ``mx_provider`` key the rows
        are simply truncated to ``total_n`` (ungrouped fallback).
    """
    # Clamp so a negative budget can never turn the fallback slice below into
    # "everything except the last N rows".
    limit = max(0, total_n)
    if not candidates or "mx_provider" not in candidates[0]:
        return candidates[:limit]
    default_cap = caps.get("__default__", 50)
    per_op: dict = {}
    chosen: list[dict] = []
    for row in candidates:
        if len(chosen) >= limit:
            break
        # Lower-case so "Microsoft" and "microsoft" count against the same
        # cap (matches how _is_google_hosted compares mx_provider).
        op = (row.get("mx_provider") or "").strip().lower() or "__default__"
        used = per_op.get(op, 0)
        if used >= caps.get(op, default_cap):
            continue
        per_op[op] = used + 1
        chosen.append(row)
    return chosen
|
|
|
|
|
|
def lm(path: str, data=None, method=None):
    """Call the listmonk-hc JSON API and return the decoded response.

    Args:
        path: API path appended to ``{LISTMONK_URL}/api``.
        data: optional JSON-serialisable request body.
        method: HTTP verb; defaults to POST when ``data`` is given, otherwise
            urllib's default for a body-less request.

    Raises:
        SystemExit: on an HTTP error response, with the verb, path, status
            code and the first 300 characters of the response body.
    """
    tok = _token()
    payload = None
    if data is not None:
        payload = json.dumps(data).encode()
        method = method or "POST"
    request = urllib.request.Request(
        f"{LISTMONK_URL}/api{path}",
        data=payload,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"token {LISTMONK_USER}:{tok}",
        },
        method=method or None,
    )
    try:
        with urllib.request.urlopen(request, timeout=20) as resp:
            raw = resp.read()
        return json.loads(raw.decode())
    except urllib.error.HTTPError as e:
        body = e.read().decode()[:300]
        raise SystemExit(f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {body}")
|
|
|
|
|
|
def get_or_create_list(list_name: str, tags: list[str]) -> int:
    """Return the id of the list called ``list_name``, creating it if needed.

    Looks through the first 100 lists returned by the API for an exact name
    match; when none is found a private, single-opt-in list is created with
    the given tags.
    """
    listing = lm("/lists?per_page=100")
    for existing in listing.get("data", {}).get("results", []):
        if existing["name"] != list_name:
            continue
        return existing["id"]
    new_list = {
        "name": list_name,
        "type": "private",
        "optin": "single",
        "tags": tags,
    }
    created = lm("/lists", new_list)
    return created["data"]["id"]
|
|
|
|
|
|
def state_file(seg_key: str) -> str:
    """Return the path of the imported-emails state file for a segment."""
    filename = "hc_imported_{}.txt".format(seg_key)
    return os.path.join(STATE_DIR, filename)
|
|
|
|
|
|
def load_imported(seg_key: str) -> set[str]:
    """Load the set of emails already imported for a segment.

    Reads the segment's state file (one email per line, blank lines ignored,
    values stripped and lower-cased). If that file does not exist and the
    segment is ``revalidation_overdue``, the legacy single-segment file
    ``hc_imported_emails.txt`` is used as the seed instead.

    Returns:
        The set of imported emails; empty when no state exists yet.
    """

    def _read_emails(path: str) -> set[str]:
        # Context manager so the handle is closed as soon as the set is built.
        with open(path) as fh:
            return {ln.strip().lower() for ln in fh if ln.strip()}

    current = state_file(seg_key)
    if os.path.exists(current):
        return _read_emails(current)
    # Back-compat: the original single-segment cron tracked revalidation
    # imports in hc_imported_emails.txt. Seed the revalidation state from it.
    legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt")
    if seg_key == "revalidation_overdue" and os.path.exists(legacy):
        return _read_emails(legacy)
    return set()
|
|
|
|
|
|
def save_imported(seg_key: str, emails: set[str]):
    """Persist the imported-emails set for a segment, one email per line.

    The emails are written sorted. The data goes to a sibling temporary file
    first and is then moved over the real state file with ``os.replace``, so
    an interrupted run can never leave a truncated state file behind (a lost
    state file would make the next run treat everyone as not-yet-imported).
    """
    os.makedirs(STATE_DIR, exist_ok=True)
    target = state_file(seg_key)
    tmp_path = target + ".tmp"
    with open(tmp_path, "w") as f:
        f.write("\n".join(sorted(emails)) + "\n")
    # Same directory => same filesystem, so the replace is a single rename.
    os.replace(tmp_path, target)
|
|
|
|
|
|
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create a subscriber on a list, or attach an existing one to it.

    First tries to create the subscriber. If the API reports that the email
    already exists, the existing record is looked up, its attribs are merged
    with ``attribs`` (new values win), and it is added to ``list_id``.

    Args:
        list_id: target listmonk list id.
        email: subscriber email.
        name: display name; falls back to the local part of the email.
        attribs: per-segment merge fields for the email template.

    Returns:
        True if the subscriber is now on the list, False on any failure.
    """
    display_name = name or email.split("@")[0]
    try:
        lm("/subscribers", {
            "email": email, "name": display_name,
            "status": "enabled", "lists": [list_id],
            "attribs": attribs, "preconfirm_subscriptions": True,
        })
        return True
    except SystemExit as e:
        message = str(e)
        if "409" not in message and "already exists" not in message.lower():
            return False

    # Already exists -> attach to the list AND refresh attribs so per-segment
    # merge fields (e.g. days_until for the due-soon template) are correct
    # even when the same provider already exists from another segment import.
    try:
        q = "subscribers.email = '" + email.replace("'", "''") + "'"
        found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
        results = found.get("data", {}).get("results", [])
        if not results:
            return False
        sid = results[0]["id"]
        existing_attribs = results[0].get("attribs") or {}
        # Merge: keep prior fields, overwrite with this segment's
        # values (npi/practice/due-date/days_until/days_overdue).
        merged = {**existing_attribs, **attribs}
        lm(f"/subscribers/{sid}", {
            "email": email, "name": display_name,
            "attribs": merged,
        }, "PUT")
        lm("/subscribers/lists", {"ids": [sid], "action": "add",
                                  "target_list_ids": [list_id],
                                  "status": "confirmed"}, "PUT")
        return True
    except (SystemExit, Exception):
        # lm() reports HTTP errors by raising SystemExit, which derives from
        # BaseException and is NOT caught by a plain `except Exception`.
        # Catch it explicitly so one bad subscriber cannot abort the run.
        return False
|
|
|
|
|
|
def ensure_campaign(seg_key: str, list_id: int) -> int:
    """Return the id of an active warmup campaign for a segment.

    Reuses an existing campaign whose name starts with the segment's
    ``campaign_name`` only if it is still ACTIVE (draft / running / paused /
    scheduled). A finished/cancelled one can't accept new subscribers or
    restart, so otherwise a fresh dated campaign is created from the
    segment's template file and pointed at ``list_id``.

    Returns:
        The id of the reused or newly created campaign.
    """
    from datetime import date
    seg = SEGMENTS[seg_key]
    ACTIVE = {"draft", "running", "paused", "scheduled"}
    res = lm("/campaigns?per_page=100")
    for c in res.get("data", {}).get("results", []):
        if c["name"].startswith(seg["campaign_name"]) and c.get("status") in ACTIVE:
            return c["id"]
    # Context manager so the template handle is closed once read.
    with open(template_path(seg_key)) as fh:
        body = fh.read()
    dated = f"{seg['campaign_name']} - {date.today():%b %d %Y}"
    payload = {
        "name": dated, "subject": seg["subject"], "lists": [list_id],
        "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
        "body": body, "altbody": html_to_text(body), "messenger": "email",
        "tags": ["healthcare", "warmup", seg_key],
        "headers": [{"Reply-To": REPLY_TO},
                    {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
                    {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
    }
    res = lm("/campaigns", payload)
    return res["data"]["id"]
|
|
|
|
|
|
def row_matches(seg_key: str, r: dict) -> bool:
    """Decide whether a master-CSV row belongs to a segment.

    Driven by the segment's ``selector`` in SEGMENTS so the row->segment
    mapping lives next to the segment metadata.

    Args:
        seg_key: key into SEGMENTS.
        r: one CSV row; reads ``reval_status``, ``leie_excluded``,
            ``optout_ending``, ``days_overdue`` and ``verify_ok``.

    Returns:
        True when the row qualifies for the segment, False otherwise
        (including for a selector this function does not recognise).
    """
    sel = SEGMENTS[seg_key]["selector"]
    status = (r.get("reval_status") or "").strip().lower()
    # Compared case-insensitively so "False"/"FALSE" mean "not excluded" just
    # like "false" does, instead of being read as an exclusion flag.
    excluded = (r.get("leie_excluded") or "").strip().lower() not in ("", "0", "false")
    optout = (r.get("optout_ending") or "").strip() != ""

    if sel == "reval_overdue":
        # Only SLIGHTLY-overdue: recently-lapsed practices are still active with
        # deliverable inboxes. Heavily-overdue ones likely bounce and burn the
        # warming IP's reputation, so we hold them out of the warmup window.
        if status != "overdue":
            return False
        od = _overdue_days(r)
        return od is not None and WARMUP_OVERDUE_MIN <= od <= WARMUP_OVERDUE_MAX

    if sel == "reval_due_soon":
        # Proactive: revalidation is UPCOMING within the lookahead window.
        # days_overdue is negative for upcoming (days until due), so a provider
        # due in N days has days_overdue == -N.
        if status != "upcoming":
            return False
        od = _overdue_days(r)
        if od is None:
            return False
        days_until = -od
        return WARMUP_DUE_SOON_MIN <= days_until <= WARMUP_DUE_SOON_MAX

    if sel == "reval_upcoming":
        return status == "upcoming"

    if sel == "leie_or_deactivated":
        # Reactivation targets: flagged excluded, OR no longer on the reval list
        # (a strong deactivation proxy once revalidation lapses).
        return excluded or status in ("not_on_list", "no_reval_flag")

    if sel == "optout_ending":
        return optout

    if sel == "any":
        # OIG screening applies to any billing practice, but for warmup we still
        # exclude the likely-undeliverable: providers heavily overdue (stale) or
        # already dropped off the reval list. Recently-lapsed and upcoming stay.
        if status in ("not_on_list", "no_reval_flag"):
            return False
        od = _overdue_days(r)
        if status == "overdue" and (od is None or od > WARMUP_OVERDUE_MAX):
            return False
        return True

    if sel == "institutional_verified":
        # For the freshly SMTP-VERIFIED institutional list, "not on the CMS
        # revalidation list" does NOT mean undeliverable -- it just means the org
        # NPI is not an individual Medicare enrollee. We already proved the inbox
        # is live (verify_ok), so trust that instead of using reval-list presence
        # as a deliverability proxy. Still hold out the heavily-overdue (stale)
        # individual enrollees that DO appear, to protect the warming IP.
        # A blank verify_ok is accepted alongside the explicit yes-values.
        verify_flag = str(r.get("verify_ok", "")).strip().upper()
        if verify_flag not in ("Y", "YES", "TRUE", "1", ""):
            return False
        od = _overdue_days(r)
        if status == "overdue" and od is not None and od > WARMUP_OVERDUE_MAX:
            return False
        return True

    return False
|
|
|
|
|
|
def attribs_for(r: dict) -> dict:
    """Build the listmonk subscriber attribs (template merge fields) for a row.

    ``days_overdue`` is positive when past due and negative when upcoming
    (days until the due date); a clean positive ``days_until`` is exposed for
    the due-soon segment's template and is empty for anything else.

    Returns:
        A dict with npi, practice, specialty, state, reval_due_date,
        days_overdue, days_until, mx_provider and a preformatted ``detail``.
    """
    raw_overdue = r.get("days_overdue", "")
    # A missing CSV cell arrives as None; render it as empty text rather than
    # the literal string "None" in the email.
    overdue_text = "" if raw_overdue is None else str(raw_overdue)
    try:
        od = int(overdue_text.strip())
    except ValueError:
        od = None
    days_until = str(-od) if od is not None and od < 0 else ""

    due_date = r.get("reval_due_date", "")
    # Normalised the same way row_matches() reads reval_status, so " Overdue "
    # and "overdue" produce the same detail line.
    is_overdue = (r.get("reval_status") or "").strip().lower() == "overdue"

    return {
        "npi": r.get("npi", ""),
        "practice": r.get("name", ""),
        "specialty": r.get("specialty", ""),
        "state": r.get("state", ""),
        # Separate fields so the email's "official CMS record" card can render
        # the due date + overdue/until count cleanly.
        "reval_due_date": due_date,
        "days_overdue": overdue_text,
        "days_until": days_until,
        # MX operator (for per-operator analysis + throttling audit).
        "mx_provider": r.get("mx_provider", ""),
        "detail": (f"{due_date} ({overdue_text} days overdue)"
                   if is_overdue else due_date),
    }
|
|
|
|
|
|
def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
                 dry_run: bool, start_campaign: bool) -> int:
    """Import up to ``slice_n`` new subscribers for one segment and keep its
    campaign active.

    Candidates are rows with an email that is not already imported, not
    suppressed, not held out as Google-hosted, and that match the segment.
    The slice is then spread across MX operators using the caps for the
    current warmup day.

    Args:
        seg_key: key into SEGMENTS.
        rows: all rows of the verified CSV.
        slice_n: this segment's share of today's import budget.
        dry_run: print a sample of what would be imported and change nothing.
        start_campaign: also flip the campaign to 'running'.

    Returns:
        How many subscribers were imported (always 0 on a dry run).
    """
    seg = SEGMENTS[seg_key]
    imported = load_imported(seg_key)
    suppressed = load_suppressed()

    candidates = []
    seen: set[str] = set()
    for r in rows:
        # `or ""` because a short CSV row yields None for missing cells.
        email = (r.get("email") or "").strip().lower()
        if not email or email in imported or email in suppressed:
            continue
        if email in seen:
            # Same address twice in the CSV: don't spend slice budget twice.
            continue
        if _is_google_hosted(r) or not row_matches(seg_key, r):
            continue
        seen.add(email)
        candidates.append(r)

    # Spread the slice across MX operators so no single receiving system (e.g.
    # Microsoft 365) gets the whole batch. Caps ramp with the warmup day.
    todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day()))
    print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
          f"already={len(imported)} to_import={len(todo)}")

    if dry_run:
        for r in todo[:3]:
            print(f" would import: {r['email']} {(r.get('name') or '')[:28]} "
                  f"status={r.get('reval_status','')}")
        return 0
    if not todo:
        return 0

    list_id = get_or_create_list(seg["list_name"], ["healthcare", "warmup", seg_key])
    n_ok = 0
    try:
        for r in todo:
            email = (r.get("email") or "").strip().lower()
            if add_subscriber(list_id, email, r.get("name") or "", attribs_for(r)):
                imported.add(email)
                n_ok += 1
    finally:
        # Persist progress even if the loop aborts part-way, so the next run
        # does not re-count subscribers that were already imported.
        save_imported(seg_key, imported)

    cid = ensure_campaign(seg_key, list_id)
    if start_campaign:
        try:
            lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
        except SystemExit as e:
            # Already running raises; that's fine.
            if "already" not in str(e).lower():
                print(f"[hc-cron] {seg_key}: start warning: {e}")
    print(f"[hc-cron] {seg_key}: imported {n_ok} into list {list_id}; campaign={cid}")
    return n_ok
|
|
|
|
|
|
def _all_list_subscribers(list_id: int):
    """Yield ``(id, email)`` for every subscriber on a list.

    Pages through the subscribers API 1000 at a time and stops after the
    first page that comes back short. Emails are stripped and lower-cased.
    """
    page_size = 1000
    page_no = 1
    while True:
        query = urllib.parse.urlencode(
            {"list_id": list_id, "page": page_no, "per_page": page_size})
        response = lm("/subscribers?" + query)
        batch = response.get("data", {}).get("results", []) or []
        for sub in batch:
            yield sub["id"], (sub.get("email") or "").strip().lower()
        if len(batch) < page_size:
            return
        page_no += 1
|
|
|
|
|
|
def prune_holdouts(dry_run: bool) -> int:
    """Remove subscribers who should NOT be in the warmup from the warmup lists.

    Belt-and-suspenders for subscribers imported before a guard existed or
    whose domain's MX has since flipped to Google. Matching is done against
    the master CSV (``HC_MASTER_CSV``), not the listmonk attribs snapshot, so
    a domain that newly became Google-hosted is caught here. Two groups are
    removed: anyone on the permanent suppression list, and anyone whose
    master-CSV row is now Google-hosted.

    Args:
        dry_run: report what would be removed without calling the API.

    Returns:
        The number of (subscriber, list) removals actually performed
        (0 on a dry run or when the master CSV is missing).
    """
    master_path = os.getenv("HC_MASTER_CSV", os.path.join(STATE_DIR, "hc_warmup_week1.csv"))
    if not os.path.exists(master_path):
        print(f"[hc-cron] prune: master {master_path} not found, skipping")
        return 0
    with open(master_path) as fh:
        rows = list(csv.DictReader(fh))
    by_email = {r.get("email", "").strip().lower(): r for r in rows if r.get("email")}
    suppressed = load_suppressed()

    # The list catalogue does not change between segments: fetch it once
    # instead of once per segment. First match per name wins.
    list_ids: dict = {}
    try:
        listing = lm("/lists?per_page=100")
        for entry in listing.get("data", {}).get("results", []):
            list_ids.setdefault(entry["name"], entry["id"])
    except SystemExit:
        list_ids = {}

    removed = 0
    for seg_key, seg in SEGMENTS.items():
        list_id = list_ids.get(seg["list_name"])
        if not list_id:
            continue
        drop_ids = []
        for sid, email in _all_list_subscribers(list_id):
            # Always remove anyone on the permanent do-not-contact list (they
            # told us to stop / already revalidated), regardless of source data.
            if email in suppressed:
                drop_ids.append(sid)
                continue
            r = by_email.get(email)
            if r is None:
                continue  # not in our source data; leave it alone
            # DELIVERABILITY-only prune: remove subscribers whose domain is now
            # Google-hosted (would hard-bounce from the cold IP). We deliberately
            # do NOT evict for audience reasons (e.g. an overdue provider drifting
            # out of the 1-90 day window) -- they were a valid target when warmed
            # and re-evaluating audience on already-engaged people just wastes
            # warmup progress. The import-time guard handles audience for NEW adds.
            if _is_google_hosted(r):
                drop_ids.append(sid)
        if not drop_ids:
            continue
        print(f"[hc-cron] prune {seg_key} (list {list_id}): "
              f"{len(drop_ids)} holdouts to remove")
        if dry_run:
            continue
        # Bulk detach from this list (chunked).
        for i in range(0, len(drop_ids), 500):
            chunk = drop_ids[i:i + 500]
            lm("/subscribers/lists", {"ids": chunk, "action": "remove",
                                      "target_list_ids": [list_id]}, "PUT")
        # Counted only for real removals so the summary below stays truthful
        # on a dry run.
        removed += len(drop_ids)
    print(f"[hc-cron] prune: removed {removed} subscriber-list holdouts")
    return removed
|
|
|
|
|
|
def main():
    """CLI entry point: optionally prune, then import today's slice.

    Parses the command line, works out today's total import budget from the
    warmup day (or ``--slice``), splits it across the requested segments and
    warms each one. With ``--prune`` / ``--prune-only`` the deliverability
    prune runs first.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--slice", type=int, default=0,
                    help="override TOTAL daily import slice across segments (0=ramp default)")
    ap.add_argument("--segments", default=",".join(ACTIVE_SEGMENTS),
                    help="comma list of segment keys to warm")
    ap.add_argument("--start-campaign", action="store_true",
                    help="flip campaigns to 'running' (otherwise left as draft for approval)")
    ap.add_argument("--prune", action="store_true",
                    help="also remove suppressed / now-Google-hosted subscribers "
                         "from the warmup lists (run after the weekly refresh)")
    ap.add_argument("--prune-only", action="store_true",
                    help="run ONLY the deliverability prune, then exit (no import/warm)")
    args = ap.parse_args()
    if args.slice < 0:
        # A negative budget would hand negative shares to the segments.
        ap.error("--slice must be >= 0")

    day = warmup_day()
    total_slice = args.slice or daily_slice(day)
    requested = [s.strip() for s in args.segments.split(",") if s.strip()]
    unknown = [s for s in requested if s not in SEGMENTS]
    if unknown:
        # Surface typos instead of silently warming fewer segments.
        print(f"[hc-cron] WARNING: ignoring unknown segment(s): {unknown}",
              file=sys.stderr)
    segments = [s for s in requested if s in SEGMENTS]
    print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} "
          f"total_slice={total_slice} segments={segments}")

    with open(VERIFIED_CSV) as fh:
        rows = list(csv.DictReader(fh))
    print(f"[hc-cron] verified_total={len(rows)}")

    if args.prune or args.prune_only:
        prune_holdouts(args.dry_run)
    if args.prune_only:
        return

    # Split the daily slice across segments. Revalidation (the lead, richest
    # data) gets ~half; the rest share the remainder evenly. The lead reclaims
    # any rounding remainder so the total never exceeds the warming-rate budget.
    lead = "revalidation_overdue"
    others = [s for s in segments if s != lead]
    per_seg = {}
    if lead in segments:
        per_seg[lead] = max(1, int(total_slice * 0.5))
        rem = total_slice - per_seg[lead]
    else:
        rem = total_slice
    if others and rem > 0:
        base, extra = divmod(rem, len(others))
        for i, s in enumerate(others):
            per_seg[s] = base + (1 if i < extra else 0)
    elif others:
        for s in others:
            per_seg[s] = 0
    # Reclaim any rounding remainder onto the lead so sum(per_seg) == total_slice
    # exactly (never overshoot the rate cap, never silently drop budget).
    if lead in per_seg:
        per_seg[lead] += total_slice - sum(per_seg.values())

    grand = 0
    for seg_key in segments:
        grand += warm_segment(seg_key, rows, per_seg.get(seg_key, 0),
                              args.dry_run, args.start_campaign)
    print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|