healthcare: fix 4 bugs in segment-assignment + free-check email
Found during a bug-review pass of the one-email-per-provider work:
1. assign_all overwrite bug: an email on MULTIPLE rows (shared practice inbox /
multiple NPIs -- 2,592 such emails, 299 with mixed status) was assigned by
the LAST row, so a less-urgent row could clobber an urgent one (overdue ->
free check). Now keeps the most-urgent (lowest-priority) assignment.
2. warm_segment double-import + wrong-row render: all of an email's rows passed
the candidate filter, so it could be imported twice (over-counting the slice)
and attribs_for could render a sibling row's blank due-date in the overdue
email. Now requires row_matches(seg) for the specific row AND dedupes by
email (one row per email).
3. free-check email rendered broken text ('last updated on -- about years
ago', 'Last updated . ~ yrs ago') for any provider whose NPPES date isn't
cached yet (the free check goes to everyone, and the fill is gradual). Wrapped
the example sentence + official-record card in listmonk {{ if
.nppes_last_updated }}...{{ else }}...{{ end }}; added a date-free else
branch. altbody keeps the conditionals (listmonk evaluates body+altbody), and
the test/preview renderer gained a minimal {{ if/else/end }} evaluator so
previews match real sends. Verified both branches render with zero unfilled
tokens.
4. cross-cron double-send: pw-hc-campaign (warmup file) and pw-hc-nppes (63k
file) share state but tracked imports per-segment; 312 emails overlap both
files, so a provider could get an urgent email from one cron AND the free
check from the other. Added load_all_imported() global guard (union of all
segment state) so each provider gets exactly one healthcare email overall.
All verified: assignment regression test (10 cases) + new dup-email/guard checks
pass; all 6 templates render clean.
This commit is contained in:
parent
0320dc17ba
commit
1acae2f20c
3 changed files with 92 additions and 8 deletions
|
|
@ -140,6 +140,32 @@ def template_path(seg_key: str) -> str:
|
|||
return os.path.join(OUT_DIR, SEGMENTS[seg_key]["template"])
|
||||
|
||||
|
||||
def _eval_conditionals(html: str, attribs: dict) -> str:
|
||||
"""Minimal evaluator for the listmonk/Go `{{ if .Subscriber.Attribs.X }}A
|
||||
{{ else }}B{{ end }}` blocks used in the templates, so TEST/PREVIEW renders
|
||||
match what listmonk produces at send time (listmonk itself evaluates these
|
||||
server-side; this is only for the standalone preview/test-send path). Treats
|
||||
an attribute as truthy when it is present and non-empty. Supports an optional
|
||||
{{ else }} and is non-nested (which is all the templates use)."""
|
||||
import re
|
||||
pat = re.compile(
|
||||
r"\{\{\s*if\s+\.Subscriber\.Attribs\.(\w+)\s*\}\}(.*?)"
|
||||
r"(?:\{\{\s*else\s*\}\}(.*?))?\{\{\s*end\s*\}\}",
|
||||
re.DOTALL,
|
||||
)
|
||||
|
||||
def repl(m: "re.Match") -> str:
|
||||
key, if_body, else_body = m.group(1), m.group(2), m.group(3) or ""
|
||||
return if_body if str(attribs.get(key, "")).strip() else else_body
|
||||
|
||||
# Loop until stable so adjacent/multiple blocks all resolve.
|
||||
prev = None
|
||||
while prev != html:
|
||||
prev = html
|
||||
html = pat.sub(repl, html)
|
||||
return html
|
||||
|
||||
|
||||
def render(seg_key: str, *, test: bool = False) -> tuple[str, str]:
|
||||
"""Return (subject, html) for a segment. The html is the canonical
|
||||
data/hc_campaigns/<template> file -- the single source of truth. For test
|
||||
|
|
@ -148,6 +174,9 @@ def render(seg_key: str, *, test: bool = False) -> tuple[str, str]:
|
|||
s = SEGMENTS[seg_key]
|
||||
html = open(template_path(seg_key)).read()
|
||||
if test:
|
||||
# Resolve {{ if .Subscriber.Attribs.X }} blocks first (listmonk does this
|
||||
# server-side on real sends), using SAMPLE as the attrib source.
|
||||
html = _eval_conditionals(html, SAMPLE)
|
||||
html = (html
|
||||
.replace("{{ .Subscriber.Name }}", SAMPLE["name"])
|
||||
.replace("{{ .Subscriber.Attribs.npi }}", SAMPLE["npi"])
|
||||
|
|
|
|||
|
|
@ -247,6 +247,24 @@ def save_imported(seg_key: str, emails: set[str]):
|
|||
f.write("\n".join(sorted(emails)) + "\n")
|
||||
|
||||
|
||||
def load_all_imported() -> set[str]:
|
||||
"""Union of EVERY segment's imported-emails state, i.e. everyone who has
|
||||
already been emailed by ANY segment. Used as a cross-segment AND cross-cron
|
||||
guard so a provider gets exactly one healthcare email overall: the two crons
|
||||
(pw-hc-campaign on the small warmup file, pw-hc-nppes on the 63k institutional
|
||||
file) share these state files, and ~312 emails overlap both files, so without
|
||||
this a provider warmed as 'revalidation_overdue' by one cron could also be
|
||||
warmed as the free 'nppes_outdated' check by the other. Reads all
|
||||
hc_imported_*.txt plus the legacy single-segment file."""
|
||||
seen: set[str] = set()
|
||||
for key in SEGMENTS:
|
||||
seen |= load_imported(key)
|
||||
legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt")
|
||||
if os.path.exists(legacy):
|
||||
seen |= {ln.strip().lower() for ln in open(legacy) if ln.strip()}
|
||||
return seen
|
||||
|
||||
|
||||
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
|
||||
try:
|
||||
lm("/subscribers", {
|
||||
|
|
@ -410,14 +428,25 @@ def assign_segment(r: dict, active_segments: list[str]) -> str | None:
|
|||
|
||||
def assign_all(rows: list[dict], active_segments: list[str]) -> dict[str, str]:
|
||||
"""Map email -> assigned segment across the whole list, so each segment's
|
||||
importer can claim only its assigned providers. Computed once per run."""
|
||||
importer can claim only its assigned providers. Computed once per run.
|
||||
|
||||
An email can appear on MULTIPLE rows (a shared practice inbox covering
|
||||
several NPIs, e.g. a credentialing address) and those rows can carry
|
||||
DIFFERENT statuses (one NPI overdue, another not on the list). We must keep
|
||||
the MOST-URGENT assignment across all of that email's rows -- otherwise a
|
||||
later, less-urgent row would clobber an earlier urgent one and the provider
|
||||
would get the free check instead of the overdue email. So we compare
|
||||
priorities and keep the winner (lower number = more urgent)."""
|
||||
out: dict[str, str] = {}
|
||||
for r in rows:
|
||||
email = (r.get("email") or "").strip().lower()
|
||||
if not email:
|
||||
continue
|
||||
seg = assign_segment(r, active_segments)
|
||||
if seg is not None:
|
||||
if seg is None:
|
||||
continue
|
||||
prev = out.get(email)
|
||||
if prev is None or _seg_priority(seg) < _seg_priority(prev):
|
||||
out[email] = seg
|
||||
return out
|
||||
|
||||
|
|
@ -472,24 +501,47 @@ def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
|
|||
keeps working unchanged."""
|
||||
seg = SEGMENTS[seg_key]
|
||||
imported = load_imported(seg_key)
|
||||
# Cross-segment + cross-cron guard: skip anyone already emailed by ANY
|
||||
# segment so each provider gets exactly one healthcare email overall.
|
||||
already_anywhere = load_all_imported()
|
||||
suppressed = load_suppressed()
|
||||
|
||||
def _is_candidate(r: dict) -> bool:
|
||||
email = r.get("email", "").strip().lower()
|
||||
if not email or email in imported or email in suppressed:
|
||||
if not email or email in already_anywhere or email in suppressed:
|
||||
return False
|
||||
if _is_google_hosted(r):
|
||||
return False
|
||||
if assignment is not None:
|
||||
return assignment.get(email) == seg_key
|
||||
# The email must be assigned to THIS segment AND this specific row
|
||||
# must be the one that earns it. An email can span several rows (a
|
||||
# shared practice inbox over multiple NPIs); only the row whose own
|
||||
# status matches this segment's selector should represent it, so the
|
||||
# template renders that row's real data (e.g. the overdue NPI's due
|
||||
# date, never a sibling 'not_on_list' row's blank one). This also
|
||||
# dedupes: at most one row per email passes.
|
||||
return assignment.get(email) == seg_key and row_matches(seg_key, r)
|
||||
return row_matches(seg_key, r)
|
||||
|
||||
candidates = [r for r in rows if _is_candidate(r)]
|
||||
# Dedupe by email: an email can legitimately appear on multiple matching
|
||||
# rows (e.g. two overdue NPIs share one inbox). Keep the first so the email
|
||||
# is imported once and counted once against the slice budget.
|
||||
candidates = []
|
||||
seen_emails: set[str] = set()
|
||||
for r in rows:
|
||||
if not _is_candidate(r):
|
||||
continue
|
||||
email = r["email"].strip().lower()
|
||||
if email in seen_emails:
|
||||
continue
|
||||
seen_emails.add(email)
|
||||
candidates.append(r)
|
||||
# Spread the slice across MX operators so no single receiving system (e.g.
|
||||
# Microsoft 365) gets the whole batch. Caps ramp with the warmup day.
|
||||
todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day()))
|
||||
print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
|
||||
f"already={len(imported)} to_import={len(todo)}")
|
||||
f"in_segment={len(imported)} emailed_anywhere={len(already_anywhere)} "
|
||||
f"to_import={len(todo)}")
|
||||
|
||||
if dry_run:
|
||||
for r in todo[:3]:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue