diff --git a/infra/cron/pw-hc-refresh b/infra/cron/pw-hc-refresh
index 6061d2d..1bd64ba 100644
--- a/infra/cron/pw-hc-refresh
+++ b/infra/cron/pw-hc-refresh
@@ -1,8 +1,10 @@
 # Healthcare data refresh: weekly re-check of every emailable NPI against the
-# live government sources (CMS Revalidation list, OIG LEIE) so warmup sends
-# never go stale. Runs Mon 06:00 Central, ~1h before the 07:00 weekday send,
-# and propagates fresh status into the channel CSVs the campaign cron reads.
-# Takes ~8 min (per-NPI CMS lookups + 15MB OIG download). SAM is opt-in only
-# (--sam-pages N); SAM exclusions rarely carry an NPI, so OIG LEIE is the
-# NPI-bearing exclusion source for the cross-flag.
-0 6 * * 1 deploy cd /opt/performancewest && python3 -u scripts/hc_data_refresh.py >> /var/log/pw-hc-refresh.log 2>&1
+# live government sources (CMS Revalidation list, OIG LEIE) + MX re-classification
+# (Google-host detection) so warmup sends never go stale. Runs Mon 06:00 Central,
+# ~1h before the 07:00 weekday send, propagating fresh status into the channel
+# CSVs the campaign cron reads. Takes ~8 min. SAM is opt-in (--sam-pages); SAM
+# exclusions rarely carry an NPI, so OIG LEIE is the NPI-bearing exclusion source.
+# Then prune-only: remove any subscriber whose domain newly became Google-hosted
+# from the warmup lists (deliverability safety net; removes only likely-bouncers,
+# never evicts for audience reasons).
+0 6 * * 1 deploy cd /opt/performancewest && python3 -u scripts/hc_data_refresh.py >> /var/log/pw-hc-refresh.log 2>&1 && python3 -u scripts/build_healthcare_campaigns_cron.py --prune-only >> /var/log/pw-hc-refresh.log 2>&1
diff --git a/scripts/build_healthcare_campaigns_cron.py b/scripts/build_healthcare_campaigns_cron.py
index 3f7b392..d2df7e0 100644
--- a/scripts/build_healthcare_campaigns_cron.py
+++ b/scripts/build_healthcare_campaigns_cron.py
@@ -312,6 +312,103 @@ def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
     return n_ok
 
 
+def _all_list_subscribers(list_id: int):
+    """Yield (id, email) for every subscriber on a list, paging the API.
+
+    Emails are stripped and lower-cased; a missing email is yielded as "".
+    Paging ends on an empty page or once the server-reported ``total`` has been
+    yielded. Only when the response carries no integer ``total`` do we fall
+    back to "short page means last page": a short page alone is not trusted
+    because the server may cap ``per_page`` below the 1000 requested here,
+    which would otherwise end the walk after page 1.
+    """
+    page, per = 1, 1000
+    seen = 0
+    while True:
+        q = urllib.parse.urlencode({"list_id": list_id, "page": page, "per_page": per})
+        data = lm("/subscribers?" + q).get("data", {}) or {}
+        results = data.get("results", []) or []
+        if not results:
+            break
+        for s in results:
+            yield s["id"], (s.get("email") or "").strip().lower()
+        seen += len(results)
+        total = data.get("total")
+        if isinstance(total, int):
+            if seen >= total:
+                break
+        elif len(results) < per:
+            break
+        page += 1
+
+
+def prune_holdouts(dry_run: bool) -> int:
+    """Remove now-Google-hosted subscribers from the active warmup lists.
+
+    Belt-and-suspenders: catches subscribers who were imported before a guard
+    existed or whose domain's MX has since flipped to Google. We match against
+    the FRESH MASTER CSV (re-classified weekly by hc_data_refresh), not the
+    listmonk attribs snapshot, so a domain that newly became Google-hosted is
+    caught here. Subscribers absent from the master CSV are left alone.
+
+    With dry_run the planned removals are logged and counted but nothing is
+    sent to the API. Returns the number of (subscriber, list) removals.
+    """
+    master_path = os.getenv("HC_MASTER_CSV", os.path.join(STATE_DIR, "hc_warmup_week1.csv"))
+    if not os.path.exists(master_path):
+        print(f"[hc-cron] prune: master {master_path} not found, skipping")
+        return 0
+    # Read inside a context manager so the handle is closed deterministically.
+    with open(master_path, newline="") as fh:
+        rows = list(csv.DictReader(fh))
+    by_email = {}
+    for r in rows:
+        key = (r.get("email") or "").strip().lower()
+        if key:  # blank / whitespace-only emails must not become a "" key
+            by_email[key] = r
+    # The list catalogue is identical for every segment, so fetch it once
+    # instead of once per loop iteration. First match wins on duplicate names.
+    list_ids = {}
+    try:
+        res = lm("/lists?per_page=100")
+        for lst in res.get("data", {}).get("results", []) or []:
+            list_ids.setdefault(lst["name"], lst["id"])
+    except SystemExit:
+        # Lookup failed: with no list ids nothing can be pruned; log it rather than skip silently.
+        print("[hc-cron] prune: list lookup failed, skipping")
+    removed = 0
+    for seg_key, seg in SEGMENTS.items():
+        list_id = list_ids.get(seg["list_name"])
+        if not list_id:
+            continue
+        drop_ids = []
+        for sid, email in _all_list_subscribers(list_id):
+            r = by_email.get(email)
+            if r is None:
+                continue  # not in our source data; leave it alone
+            # DELIVERABILITY-only prune: remove subscribers whose domain is now
+            # Google-hosted (would hard-bounce from the cold IP). We deliberately
+            # do NOT evict for audience reasons (e.g. an overdue provider drifting
+            # out of the 1-90 day window) -- they were a valid target when warmed
+            # and re-evaluating audience on already-engaged people just wastes
+            # warmup progress. The import-time guard handles audience for NEW adds.
+            if _is_google_hosted(r):
+                drop_ids.append(sid)
+        if drop_ids:
+            print(f"[hc-cron] prune {seg_key} (list {list_id}): "
+                  f"{len(drop_ids)} holdouts to remove")
+            if not dry_run:
+                # Bulk unsubscribe + detach from this list (chunked).
+                for i in range(0, len(drop_ids), 500):
+                    chunk = drop_ids[i:i + 500]
+                    lm("/subscribers/lists", {"ids": chunk, "action": "remove",
+                                              "target_list_ids": [list_id]}, "PUT")
+            # Counted in dry-run too, so the total reports planned removals.
+            removed += len(drop_ids)
+    print(f"[hc-cron] prune: removed {removed} subscriber-list holdouts")
+    return removed
+
+
 def main():
     ap = argparse.ArgumentParser()
     ap.add_argument("--dry-run", action="store_true")
@@ -321,6 +418,11 @@ def main():
                     help="comma list of segment keys to warm")
     ap.add_argument("--start-campaign", action="store_true",
                     help="flip campaigns to 'running' (otherwise left as draft for approval)")
+    ap.add_argument("--prune", action="store_true",
+                    help="also remove now-Google-hosted subscribers from the warmup "
+                         "lists (deliverability only; run after the weekly refresh)")
+    ap.add_argument("--prune-only", action="store_true",
+                    help="run ONLY the deliverability prune, then exit (no import/warm)")
     args = ap.parse_args()
 
     day = warmup_day()
@@ -332,6 +434,11 @@ def main():
     rows = list(csv.DictReader(open(VERIFIED_CSV)))
     print(f"[hc-cron] verified_total={len(rows)}")
 
+    if args.prune or args.prune_only:
+        prune_holdouts(args.dry_run)
+        if args.prune_only:
+            return
+
     # Split the daily slice across segments. Revalidation (the lead, richest
     # data) gets ~half; the rest share the remainder evenly. The lead reclaims
     # any rounding remainder so the total never exceeds the warming-rate budget.