diff --git a/data/hc_campaigns/hc_nppes_outdated.html b/data/hc_campaigns/hc_nppes_outdated.html index d7643ef..533be90 100644 --- a/data/hc_campaigns/hc_nppes_outdated.html +++ b/data/hc_campaigns/hc_nppes_outdated.html @@ -14,7 +14,7 @@
Hi {{ .Subscriber.Name }},
As a quick example, the public NPPES NPI Registry shows the record for {{ .Subscriber.Attribs.practice }} was last updated on {{ .Subscriber.Attribs.nppes_last_updated }} — about {{ .Subscriber.Attribs.nppes_years_stale }} years ago. That’s usually fine, but it’s only one of several things payers and CMS check. Our free tool runs your NPI against the public government sources in one place — no signup, no cost — and tells you exactly where you stand.
+{{ if .Subscriber.Attribs.nppes_last_updated }}As a quick example, the public NPPES NPI Registry shows the record for {{ .Subscriber.Attribs.practice }} was last updated on {{ .Subscriber.Attribs.nppes_last_updated }} — about {{ .Subscriber.Attribs.nppes_years_stale }} years ago. That’s usually fine, but it’s only one of several things payers and CMS check. {{ else }}Your NPI touches several public government records — NPPES, the Medicare revalidation list, and the federal exclusion lists — and any one of them being off can hold up your payments. {{ end }}Our free tool runs your NPI against those public sources in one place — no signup, no cost — and tells you exactly where you stand.
|
Your free check covers: @@ -31,7 +31,9 @@Payers, clearinghouses, and CMS pull from NPPES. A stale address, taxonomy, or contact can cause claim denials, mail you never receive, and failed credentialing. CMS requires you to correct your NPPES record within 30 days of any change.
|
| diff --git a/scripts/build_healthcare_campaigns.py b/scripts/build_healthcare_campaigns.py index 530954b..39bfae0 100644 --- a/scripts/build_healthcare_campaigns.py +++ b/scripts/build_healthcare_campaigns.py @@ -140,6 +140,32 @@ def template_path(seg_key: str) -> str: return os.path.join(OUT_DIR, SEGMENTS[seg_key]["template"]) +def _eval_conditionals(html: str, attribs: dict) -> str: + """Minimal evaluator for the listmonk/Go `{{ if .Subscriber.Attribs.X }}A + {{ else }}B{{ end }}` blocks used in the templates, so TEST/PREVIEW renders + match what listmonk produces at send time (listmonk itself evaluates these + server-side; this is only for the standalone preview/test-send path). Treats + an attribute as truthy when it is present and non-empty. Supports an optional + {{ else }} and is non-nested (which is all the templates use).""" + import re + pat = re.compile( + r"\{\{\s*if\s+\.Subscriber\.Attribs\.(\w+)\s*\}\}(.*?)" + r"(?:\{\{\s*else\s*\}\}(.*?))?\{\{\s*end\s*\}\}", + re.DOTALL, + ) + + def repl(m: "re.Match") -> str: + key, if_body, else_body = m.group(1), m.group(2), m.group(3) or "" + return if_body if str(attribs.get(key, "")).strip() else else_body + + # Loop until stable so adjacent/multiple blocks all resolve. + prev = None + while prev != html: + prev = html + html = pat.sub(repl, html) + return html + + def render(seg_key: str, *, test: bool = False) -> tuple[str, str]: """Return (subject, html) for a segment. The html is the canonical data/hc_campaigns/ file -- the single source of truth. For test @@ -148,6 +174,9 @@ def render(seg_key: str, *, test: bool = False) -> tuple[str, str]: s = SEGMENTS[seg_key] html = open(template_path(seg_key)).read() if test: + # Resolve {{ if .Subscriber.Attribs.X }} blocks first (listmonk does this + # server-side on real sends), using SAMPLE as the attrib source. + html = _eval_conditionals(html, SAMPLE) html = (html .replace("{{ .Subscriber.Name }}", SAMPLE["name"]) .replace("{{ .Subscriber.Attribs.npi }}", SAMPLE["npi"]) diff --git a/scripts/build_healthcare_campaigns_cron.py b/scripts/build_healthcare_campaigns_cron.py index 2f2287a..2a4bfa6 100644 --- a/scripts/build_healthcare_campaigns_cron.py +++ b/scripts/build_healthcare_campaigns_cron.py @@ -247,6 +247,24 @@ def save_imported(seg_key: str, emails: set[str]): f.write("\n".join(sorted(emails)) + "\n") +def load_all_imported() -> set[str]: + """Union of EVERY segment's imported-emails state, i.e. everyone who has + already been emailed by ANY segment. Used as a cross-segment AND cross-cron + guard so a provider gets exactly one healthcare email overall: the two crons + (pw-hc-campaign on the small warmup file, pw-hc-nppes on the 63k institutional + file) share these state files, and ~312 emails overlap both files, so without + this a provider warmed as 'revalidation_overdue' by one cron could also be + warmed as the free 'nppes_outdated' check by the other. Reads all + hc_imported_*.txt plus the legacy single-segment file.""" + seen: set[str] = set() + for key in SEGMENTS: + seen |= load_imported(key) + legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt") + if os.path.exists(legacy): + seen |= {ln.strip().lower() for ln in open(legacy) if ln.strip()} + return seen + + def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool: try: lm("/subscribers", { @@ -410,14 +428,25 @@ def assign_segment(r: dict, active_segments: list[str]) -> str | None: def assign_all(rows: list[dict], active_segments: list[str]) -> dict[str, str]: """Map email -> assigned segment across the whole list, so each segment's - importer can claim only its assigned providers. Computed once per run.""" + importer can claim only its assigned providers. Computed once per run. + + An email can appear on MULTIPLE rows (a shared practice inbox covering + several NPIs, e.g. a credentialing address) and those rows can carry + DIFFERENT statuses (one NPI overdue, another not on the list). We must keep + the MOST-URGENT assignment across all of that email's rows -- otherwise a + later, less-urgent row would clobber an earlier urgent one and the provider + would get the free check instead of the overdue email. So we compare + priorities and keep the winner (lower number = more urgent).""" out: dict[str, str] = {} for r in rows: email = (r.get("email") or "").strip().lower() if not email: continue seg = assign_segment(r, active_segments) - if seg is not None: + if seg is None: + continue + prev = out.get(email) + if prev is None or _seg_priority(seg) < _seg_priority(prev): out[email] = seg return out @@ -472,24 +501,47 @@ def warm_segment(seg_key: str, rows: list[dict], slice_n: int, keeps working unchanged.""" seg = SEGMENTS[seg_key] imported = load_imported(seg_key) + # Cross-segment + cross-cron guard: skip anyone already emailed by ANY + # segment so each provider gets exactly one healthcare email overall. + already_anywhere = load_all_imported() suppressed = load_suppressed() def _is_candidate(r: dict) -> bool: email = r.get("email", "").strip().lower() - if not email or email in imported or email in suppressed: + if not email or email in already_anywhere or email in suppressed: return False if _is_google_hosted(r): return False if assignment is not None: - return assignment.get(email) == seg_key + # The email must be assigned to THIS segment AND this specific row + # must be the one that earns it. An email can span several rows (a + # shared practice inbox over multiple NPIs); only the row whose own + # status matches this segment's selector should represent it, so the + # template renders that row's real data (e.g. the overdue NPI's due + # date, never a sibling 'not_on_list' row's blank one). This also + # dedupes: at most one row per email passes. + return assignment.get(email) == seg_key and row_matches(seg_key, r) return row_matches(seg_key, r) - candidates = [r for r in rows if _is_candidate(r)] + # Dedupe by email: an email can legitimately appear on multiple matching + # rows (e.g. two overdue NPIs share one inbox). Keep the first so the email + # is imported once and counted once against the slice budget. + candidates = [] + seen_emails: set[str] = set() + for r in rows: + if not _is_candidate(r): + continue + email = r["email"].strip().lower() + if email in seen_emails: + continue + seen_emails.add(email) + candidates.append(r) # Spread the slice across MX operators so no single receiving system (e.g. # Microsoft 365) gets the whole batch. Caps ramp with the warmup day. todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day())) print(f"[hc-cron] {seg_key}: candidates={len(candidates)} " - f"already={len(imported)} to_import={len(todo)}") + f"in_segment={len(imported)} emailed_anywhere={len(already_anywhere)} " + f"to_import={len(todo)}") if dry_run: for r in todo[:3]: |