diff --git a/scripts/build_trucking_campaigns.py b/scripts/build_trucking_campaigns.py index 58ab58c..659a41f 100644 --- a/scripts/build_trucking_campaigns.py +++ b/scripts/build_trucking_campaigns.py @@ -217,6 +217,69 @@ def lm_api(path: str, data: dict | None = None, method: str | None = None): raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}") +_LM_SUPPRESSION_CACHE: dict[str, tuple[bool, str]] = {} + + +def _lm_email_query(email: str) -> str: + """Return a Listmonk subscriber query for an exact email address.""" + # Listmonk's query language uses single-quoted strings. Email addresses should + # not contain quotes, but escape defensively so a bad source row cannot break + # the query or broaden it. + safe = email.lower().strip().replace("'", "''") + return f"email = '{safe}'" + + +def listmonk_sendable(email: str) -> tuple[bool, str]: + """Return whether Listmonk will send to this address, with reason. + + SQL eligibility is not enough: an address may already exist in Listmonk as + disabled, blocklisted, bounced, or unsubscribed. If we add those rows to a + fresh campaign list, Listmonk filters them at send time, producing a smaller + `to_send` count than the builder's daily warmup allocation. Check status at + build time and fetch replacements so each campaign list is filled with + actually-sendable subscribers. + """ + key = email.lower().strip() + if not key: + return False, "blank_email" + if key in _LM_SUPPRESSION_CACHE: + return _LM_SUPPRESSION_CACHE[key] + try: + res = lm_api( + "/subscribers?" + urllib.parse.urlencode({ + "query": _lm_email_query(key), + "per_page": 1, + }) + ) + results = res.get("data", {}).get("results", []) + if not results: + out = (True, "new") + else: + sub = results[0] + status = (sub.get("status") or "").lower() + if status != "enabled": + out = (False, f"listmonk_status:{status or 'unknown'}") + else: + bad_list_status = "" + for lst in sub.get("lists") or []: + lstatus = (lst.get("subscription_status") or "").lower() + if lstatus in {"unsubscribed", "unconfirmed"}: + bad_list_status = lstatus + break + if bad_list_status: + out = (False, f"list_subscription_status:{bad_list_status}") + else: + out = (True, "enabled") + except Exception as exc: # noqa: BLE001 + # Do not silently underfill campaigns because Listmonk's lookup endpoint + # hiccupped. Let the caller/logs surface the problem; treating it as not + # sendable is safer than scheduling a suppressed address and marking it + # sent in our source DB. + out = (False, f"lookup_error:{exc}") + _LM_SUPPRESSION_CACHE[key] = out + return out + + def get_base_campaign(campaign_id: int) -> dict: return lm_api(f"/campaigns/{campaign_id}")["data"] @@ -261,6 +324,10 @@ def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool: ) results = subs.get("data", {}).get("results", []) if results: + status = (results[0].get("status") or "").lower() + if status != "enabled": + LOG.info("Listmonk suppressed existing subscriber %s status=%s", email, status or "unknown") + return False lm_api("/subscribers/lists", { "ids": [results[0]["id"]], "action": "add", @@ -452,6 +519,7 @@ def fetch_carriers( tz_states: tuple, campaign_type: str, limit: int, + offset: int = 0, ) -> list[tuple]: """Return (dot_number, email_address, legal_name, phy_state, target_state) rows. @@ -489,11 +557,55 @@ def fetch_carriers( AND lower(split_part(email_address, '@', 2)) <> ALL(%s) AND phy_state IN ({states_placeholder}) ORDER BY mcs150_parsed ASC NULLS LAST - LIMIT %s - """, target_state_params + [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit]) + LIMIT %s OFFSET %s + """, target_state_params + [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit, offset]) return cur.fetchall() +def select_sendable_carriers( + conn, + tz_states: tuple, + campaign_type: str, + quota: int, + max_scan: int | None = None, +) -> tuple[list[tuple], dict[str, int]]: + """Fetch rows until `quota` Listmonk-sendable carriers are found. + + Returns (rows, suppression_counts). Rows that are disabled/blocklisted/etc. + in Listmonk are skipped and not marked sent in Postgres. + """ + selected: list[tuple] = [] + skipped: dict[str, int] = {} + seen_emails: set[str] = set() + offset = 0 + # Warmup caps are small, but old audiences can contain many prior bounces or + # unsubscribes. Scan beyond the quota while still bounding worst-case API calls. + scan_limit = max_scan or max(quota * 8, quota + 500, 1000) + batch_size = min(max(quota * 2, 100), 1000) + + while len(selected) < quota and offset < scan_limit: + batch = fetch_carriers(conn, tz_states, campaign_type, batch_size, offset=offset) + if not batch: + break + offset += len(batch) + for row in batch: + email = (row[1] or "").lower().strip() + if email in seen_emails: + skipped["duplicate_email"] = skipped.get("duplicate_email", 0) + 1 + continue + seen_emails.add(email) + ok, reason = listmonk_sendable(email) + if not ok: + skipped[reason] = skipped.get(reason, 0) + 1 + continue + selected.append(row) + if len(selected) >= quota: + break + if len(batch) < batch_size: + break + return selected, skipped + + def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None: cur = conn.cursor() cur.execute(""" @@ -639,7 +751,16 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False, if fetch_limit <= 0: LOG.info("[%s/%s] Allocation is 0 — skipping", tz, campaign_type) continue - rows = fetch_carriers(conn, states, campaign_type, fetch_limit) + if preview: + rows = fetch_carriers(conn, states, campaign_type, fetch_limit) + suppressed: dict[str, int] = {} + else: + rows, suppressed = select_sendable_carriers(conn, states, campaign_type, fetch_limit) + if suppressed: + LOG.info("[%s/%s] Runtime Listmonk suppression skipped %d rows: %s", + tz, campaign_type, sum(suppressed.values()), ", ".join( + f"{reason}={n}" for reason, n in sorted(suppressed.items()) + )) if not rows: LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type) continue @@ -649,7 +770,7 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False, list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}" campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}" - LOG.info("[%s/%s] %d carriers -> %s (send %s UTC)", + LOG.info("[%s/%s] %d sendable carriers -> %s (send %s UTC)", tz, campaign_type, actual, campaign_name, effective_send_at.strftime("%Y-%m-%d %H:%M"))