diff --git a/scripts/build_healthcare_campaigns_cron.py b/scripts/build_healthcare_campaigns_cron.py
index ee7104e..3f7b392 100644
--- a/scripts/build_healthcare_campaigns_cron.py
+++ b/scripts/build_healthcare_campaigns_cron.py
@@ -84,6 +84,22 @@ def _overdue_days(r: dict):
     return None
 
 
+# During warmup, hold out Google-Workspace-hosted domains: Google rejects bulk
+# mail from cold/warming IPs hard (550-5.7.1), and those bounces wreck the
+# warming reputation. The mx_provider flag is set by the weekly hc_data_refresh
+# (an MX lookup, since a custom domain can silently use Google Workspace). Set
+# HC_SKIP_GOOGLE=0 to lift this once the IPs are warm.
+SKIP_GOOGLE = os.getenv("HC_SKIP_GOOGLE", "1").strip().lower() not in ("0", "false", "no")
+
+
+def _is_google_hosted(r: dict) -> bool:
+    """Return True when the hold-out is enabled and the row's mx_provider is
+    "google"; rows with a missing or blank mx_provider are never held out."""
+    if not SKIP_GOOGLE:
+        return False
+    return (r.get("mx_provider") or "").strip().lower() == "google"
+
+
 def warmup_day() -> int:
     try:
         start = int(open(WARMUP_STAMP).read().strip())
@@ -265,6 +281,7 @@ def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
     candidates = [r for r in rows
                   if r.get("email", "").strip()
                   and r["email"].strip().lower() not in imported
+                  and not _is_google_hosted(r)
                   and row_matches(seg_key, r)]
     todo = candidates[:slice_n]
     print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
diff --git a/scripts/hc_data_refresh.py b/scripts/hc_data_refresh.py
index 1407e39..ffd38fb 100644
--- a/scripts/hc_data_refresh.py
+++ b/scripts/hc_data_refresh.py
@@ -44,7 +44,7 @@ UA = "PerformanceWest-HCRefresh/1.0 (compliance@performancewest.net)"
 # campaign cron's column expectations never change).
 HEADER = ["npi", "email", "stream", "name", "specialty", "state",
           "reval_due_date", "days_overdue", "reval_status",
-          "leie_excluded", "optout_ending"]
+          "leie_excluded", "optout_ending", "mx_provider"]
 
 
 def log(*a):
@@ -59,6 +59,45 @@ def http_json(url: str, timeout: int = 30):
 
 # ── Source pulls ────────────────────────────────────────────────────────────
 
+# Mail providers whose MX indicates the domain is hosted by Google Workspace.
+# Google rejects bulk mail from cold/warming IPs hard (550-5.7.1), so these must
+# be held out of the warmup -- and the only reliable signal is the MX record,
+# since a custom domain (e.g. practice.com) can silently use Google Workspace.
+_GOOGLE_MX_SUFFIXES = ("google.com", "googlemail.com", "aspmx.l.google.com")
+
+
+def classify_mx(domain: str) -> str:
+    """Return 'google' if the domain's MX is Google-hosted, else 'other'.
+    Best-effort: any failure (DNS error, timeout, dnspython not installed)
+    classifies as 'other' (we don't want a transient resolver error to
+    permanently exclude a deliverable domain)."""
+    try:
+        import dns.resolver  # type: ignore
+        answers = dns.resolver.resolve(domain, "MX", lifetime=5)
+        hosts = [str(r.exchange).rstrip(".").lower() for r in answers]
+    except Exception:
+        return "other"
+    for h in hosts:
+        # Match on a DNS label boundary only: a bare endswith() would also
+        # flag look-alike hosts such as "mx.notgoogle.com".
+        if any(h == s or h.endswith("." + s) for s in _GOOGLE_MX_SUFFIXES):
+            return "google"
+    return "other"
+
+
+def classify_domains_mx(emails: list[str]) -> dict[str, str]:
+    """Map each unique email domain -> 'google'/'other' via one MX lookup per
+    domain (cached), so the daily campaign cron can skip Google-hosted addresses
+    during warmup without re-resolving."""
+    domains = sorted({e.split("@", 1)[1].strip().lower() for e in emails if "@" in e})
+    out: dict[str, str] = {}
+    for i, d in enumerate(domains, 1):
+        out[d] = classify_mx(d)
+        if i % 100 == 0:
+            log(f"  mx: classified {i}/{len(domains)} domains")
+    return out
+
+
 def sam_key() -> str | None:
     t = os.getenv("SAM_GOV_API_KEY")
     if t:
@@ -206,6 +245,7 @@ def main() -> int:
                     help="crawl first N SAM exclusion pages for an NPI cross-flag (slow; default off)")
     ap.add_argument("--skip-cms", action="store_true")
     ap.add_argument("--skip-oig", action="store_true")
+    ap.add_argument("--skip-mx", action="store_true", help="skip MX (Google-host) classification")
     args = ap.parse_args()
 
     if not os.path.exists(args.master):
@@ -239,9 +279,21 @@ def main() -> int:
     excluded = leie | sam
     today = datetime.date.today()
 
+    # MX classification (Google Workspace vs other) for the warmup deliverability
+    # guard. Done once per unique domain. Skippable for a fast status-only run.
+    mx_map = {}
+    if not args.skip_mx:
+        all_emails = [r.get("email", "") for r in rows]
+        mx_map = classify_domains_mx(all_emails)
+        n_google = sum(1 for v in mx_map.values() if v == "google")
+        log(f"mx: {len(mx_map)} domains classified; {n_google} Google-hosted")
+
     refreshed = []
     for r in rows:
         npi = r["npi"].strip()
+        if mx_map:
+            dom = r.get("email", "").split("@", 1)[-1].strip().lower()
+            r["mx_provider"] = mx_map.get(dom, "other")
         if not npi:
             # No NPI to re-check; leave the row's existing status untouched.
             refreshed.append(r)
@@ -287,7 +339,7 @@ def main() -> int:
     # optout_ending, which only the original list builder computes -- including
     # it here would blank it and starve the compliance_bundle segment).
     REFRESHED_FIELDS = ["reval_due_date", "days_overdue", "reval_status",
-                        "leie_excluded", "name", "specialty", "state"]
+                        "leie_excluded", "mx_provider", "name", "specialty", "state"]
     by_email = {r["email"].strip().lower(): r for r in refreshed if r.get("email")}
     channel_csvs = [os.path.join(args.out_dir, f)
                     for f in ("hc_warmup_nongoogle.csv", "hc_warmup_google.csv",
@@ -297,15 +349,20 @@ def main() -> int:
             continue
         with open(path, newline="") as f:
             rdr = csv.DictReader(f)
-            cols = rdr.fieldnames or []
+            cols = list(rdr.fieldnames or [])
             rows_ch = list(rdr)
+        # Add any refreshed field the channel CSV doesn't have yet (e.g. a newly
+        # introduced mx_provider column) so the cron can read it.
+        for fld in REFRESHED_FIELDS:
+            if fld not in cols:
+                cols.append(fld)
         updated = 0
         for r in rows_ch:
             m = by_email.get(r.get("email", "").strip().lower())
             if not m:
                 continue
             for fld in REFRESHED_FIELDS:
-                if fld in cols and fld in m:
+                if fld in m:
                     r[fld] = m[fld]
             updated += 1
         write_atomic(path, rows_ch, cols)