diff --git a/docker-compose.yml b/docker-compose.yml index bbc14fc..98b59e6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -131,6 +131,9 @@ services: - CAMPAIGN_HAZMAT_ID=${CAMPAIGN_HAZMAT_ID} volumes: - worker-data:/app/data + # Read-only host MTA warmup stamp so the trucking campaign builder caps + # daily queued recipients in lockstep with Postfix/Listmonk warmup. + - /etc/postfix/pw-warmup-start:/etc/postfix/pw-warmup-start:ro depends_on: - api-postgres restart: unless-stopped diff --git a/infra/ansible/roles/worker-crons/defaults/main.yml b/infra/ansible/roles/worker-crons/defaults/main.yml index 84149ee..23bd1c8 100644 --- a/infra/ansible/roles/worker-crons/defaults/main.yml +++ b/infra/ansible/roles/worker-crons/defaults/main.yml @@ -201,7 +201,7 @@ worker_crons: persistent: true # Trucking campaign builder — 3 AM EST (08:00 UTC) daily. - # Creates 8 Listmonk campaigns for the next day's sends (4 TZ × MCS-150 + inactive USDOT). + # Creates Listmonk campaigns for today's sends (4 TZ × active trucking segments). # 2,000 carriers/TZ for MCS-150; 1,000 carriers/TZ for inactive USDOT. - name: pw-trucking-campaigns description: Build daily trucking Listmonk campaigns (MCS-150 overdue + inactive USDOT) diff --git a/scripts/build_trucking_campaigns.py b/scripts/build_trucking_campaigns.py index dbbae88..58ab58c 100644 --- a/scripts/build_trucking_campaigns.py +++ b/scripts/build_trucking_campaigns.py @@ -27,10 +27,19 @@ import os import sys import urllib.request import urllib.parse +import math +import time from datetime import date, datetime, timedelta, timezone import psycopg2 +# Allow both supported invocation styles: +# python -m scripts.build_trucking_campaigns +# python3 scripts/build_trucking_campaigns.py +ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if ROOT not in sys.path: + sys.path.insert(0, ROOT) + from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS LOG = logging.getLogger("build_trucking_campaigns") @@ -160,6 +169,38 @@ USABLE_FILTER = ( ) DB_URL = os.getenv("DATABASE_URL", "") +WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start") + + +def warmup_day() -> int | None: + """Return days since MTA warmup start, or None if not configured/readable.""" + try: + with open(WARMUP_START_FILE, "r", encoding="utf-8") as fh: + start = int(fh.read().strip()) + return max(0, int((time.time() - start) // 86400)) + except Exception: + return None + + +def warmup_daily_queue_cap(day: int | None = None) -> int | None: + """Daily campaign-builder queue cap aligned with Listmonk rampcap targets. + + This prevents the builder from scheduling 10k-20k recipients/day while + Listmonk is throttled to only 500-3,000/day, which would create a massive + backlog that later drains unpredictably. The numbers match the runbook's + intended daily warmup totals, not the theoretical 24-hour sliding-window max. + """ + if day is None: + day = warmup_day() + if day is None: + return None + if day <= 1: + return 500 + if day <= 3: + return 1500 + if day <= 6: + return 2500 + return 3000 def lm_api(path: str, data: dict | None = None, method: str | None = None): @@ -314,6 +355,98 @@ def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: s LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc) +def campaign_type_filter(campaign_type: str) -> str: + if campaign_type == "mcs150": + return "mcs150_parsed < NOW() - INTERVAL '2 years'" + if campaign_type == "inactive": + return "oos_active IS TRUE" + if campaign_type in DEFICIENCY_SEGMENTS: + return DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"] + raise ValueError(f"unknown campaign_type {campaign_type!r}") + + +def campaign_weight(campaign_type: str) -> float: + """Warmup allocation weight by campaign specificity. + + Highly specific deficiency copy should generally get more early warmup share + than generic overdue/inactive notices because it is more relevant and less + repetitive at the mailbox-provider/content-fingerprint level. + """ + if campaign_type == "mcs150": + return 0.75 + if campaign_type == "inactive": + return 0.65 + return 1.25 + + +def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None = None) -> int: + cur = conn.cursor() + states_placeholder = ",".join(["%s"] * len(tz_states)) + type_filter = campaign_type_filter(campaign_type) + cur.execute(f""" + SELECT count(*) + FROM ( + SELECT 1 + FROM fmcsa_carriers + WHERE {type_filter} + AND {USABLE_FILTER} + AND listmonk_sent_at IS NULL + AND lower(split_part(email_address, '@', 2)) <> ALL(%s) + AND phy_state IN ({states_placeholder}) + LIMIT %s + ) s + """, [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit or 10_000_000]) + return int(cur.fetchone()[0]) + + +def allocate_daily_budget(candidates: list[dict], daily_cap: int | None) -> dict[tuple[str, str], int]: + """Allocate daily cap across (timezone, campaign_type) by weighted audience. + + The allocation is proportional to each slot's eligible lead count and campaign + specificity weight, capped by the slot's audience/limit. This avoids sending + the same number from a tiny hazmat segment and a massive MCS-150 segment while + still preserving body-text variety during warmup. + """ + positive = [c for c in candidates if c["audience"] > 0] + if daily_cap is None: + return {(c["tz"], c["campaign_type"]): c["audience"] for c in candidates} + remaining_cap = min(daily_cap, sum(c["audience"] for c in positive)) + quotas = {(c["tz"], c["campaign_type"]): 0 for c in candidates} + remaining = { (c["tz"], c["campaign_type"]): c["audience"] for c in positive } + weighted = { (c["tz"], c["campaign_type"]): c["audience"] * c["weight"] for c in positive } + + # Repeated largest-remainder passes handle slots that hit their audience cap. + while remaining_cap > 0 and remaining: + total_weight = sum(weighted[k] for k in remaining) + if total_weight <= 0: + break + raw = {k: remaining_cap * (weighted[k] / total_weight) for k in remaining} + assigned_this_pass = 0 + remainders: list[tuple[float, tuple[str, str]]] = [] + for k, val in raw.items(): + add = min(remaining[k], int(math.floor(val))) + quotas[k] += add + remaining[k] -= add + assigned_this_pass += add + if remaining[k] > 0: + remainders.append((val - math.floor(val), k)) + leftover = remaining_cap - assigned_this_pass + for _, k in sorted(remainders, reverse=True): + if leftover <= 0: + break + if remaining.get(k, 0) <= 0: + continue + quotas[k] += 1 + remaining[k] -= 1 + assigned_this_pass += 1 + leftover -= 1 + remaining_cap -= assigned_this_pass + remaining = {k: v for k, v in remaining.items() if v > 0} + if assigned_this_pass == 0: + break + return quotas + + def fetch_carriers( conn, tz_states: tuple, @@ -330,15 +463,7 @@ def fetch_carriers( """ cur = conn.cursor() states_placeholder = ",".join(["%s"] * len(tz_states)) - if campaign_type == "mcs150": - type_filter = "mcs150_parsed < NOW() - INTERVAL '2 years'" - elif campaign_type == "inactive": - type_filter = "oos_active IS TRUE" - elif campaign_type in DEFICIENCY_SEGMENTS: - # Flag-based segment: dedupe against already-sent and require the flag. - type_filter = DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"] - else: - raise ValueError(f"unknown campaign_type {campaign_type!r}") + type_filter = campaign_type_filter(campaign_type) # target_state expression: pull the state suffix out of the matching flag # for per-state programs, otherwise fall back to the base (phy_state). @@ -412,7 +537,8 @@ def list_segments(send_date: date) -> None: def run(send_date: date, dry_run: bool = False, preview: bool = False, max_per_segment: int | None = None, only_segments: set[str] | None = None, send_hour_override: int | None = None, send_minute: int = 0, - stagger_minutes: int = 0) -> None: + stagger_minutes: int = 0, daily_cap: int | None = None, + warmup_cap: bool = True) -> None: conn = psycopg2.connect(DB_URL) base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID) @@ -452,11 +578,46 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False, for (ct, base, lim, label) in campaign_specs ] + if daily_cap is None and warmup_cap and not preview: + daily_cap = warmup_daily_queue_cap() + if daily_cap is not None: + LOG.info("Daily queue cap active: %d recipients total", daily_cap) + if preview: LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, " "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL) + candidates: list[dict] = [] + totals_by_campaign: dict[str, int] = {} + for tz, tz_cfg in TIMEZONE_CONFIG.items(): + for campaign_type, _base, limit, label in campaign_specs: + audience = 1 if preview else count_carriers(conn, tz_cfg["states"], campaign_type, limit) + weight = campaign_weight(campaign_type) + candidates.append({ + "tz": tz, + "states": tz_cfg["states"], + "campaign_type": campaign_type, + "limit": limit, + "label": label, + "audience": audience, + "weight": weight, + }) + totals_by_campaign[campaign_type] = totals_by_campaign.get(campaign_type, 0) + audience + + quotas = allocate_daily_budget(candidates, None if preview else daily_cap) + LOG.info("Eligible audience by SQL criterion: %s", ", ".join( + f"{ct}={n}" for ct, n in sorted(totals_by_campaign.items()) + )) + if daily_cap is not None and not preview: + planned_by_campaign: dict[str, int] = {} + for (tz, ctype), quota in quotas.items(): + planned_by_campaign[ctype] = planned_by_campaign.get(ctype, 0) + quota + LOG.info("Planned allocation by criterion: %s (total=%d/%d)", ", ".join( + f"{ct}={n}" for ct, n in sorted(planned_by_campaign.items()) + ), sum(planned_by_campaign.values()), daily_cap) + scheduled_count = 0 + queued_recipients = 0 for tz, tz_cfg in TIMEZONE_CONFIG.items(): states = tz_cfg["states"] @@ -467,8 +628,17 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False, ) for campaign_type, base, limit, label in campaign_specs: + if daily_cap is not None and queued_recipients >= daily_cap: + LOG.info("[%s/%s] Daily cap reached (%d/%d) — skipping remaining slots", + tz, campaign_type, queued_recipients, daily_cap) + continue effective_send_at = send_at + timedelta(minutes=stagger_minutes * scheduled_count) fetch_limit = 1 if preview else limit + if daily_cap is not None and not preview: + fetch_limit = min(fetch_limit, quotas.get((tz, campaign_type), 0)) + if fetch_limit <= 0: + LOG.info("[%s/%s] Allocation is 0 — skipping", tz, campaign_type) + continue rows = fetch_carriers(conn, states, campaign_type, fetch_limit) if not rows: LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type) @@ -486,6 +656,7 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False, if dry_run: LOG.info(" DRY RUN — skipping Listmonk + DB writes") scheduled_count += 1 + queued_recipients += actual continue # Build subscriber list. In preview, only the owner's address (with the @@ -522,6 +693,7 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False, LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid, "draft/preview" if preview else f"scheduled {effective_send_at.isoformat()}") scheduled_count += 1 + queued_recipients += added # Send a test to the owner so they can spot-check the rendered email send_test(base, cid, rows[0], label, tz, campaign_type) @@ -531,7 +703,8 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False, mark_sent(conn, [row[0] for row in rows], campaign_type) conn.close() - LOG.info("Done.") + LOG.info("Done. queued_recipients=%d%s", queued_recipients, + f" daily_cap={daily_cap}" if daily_cap is not None else "") def main(): @@ -543,7 +716,9 @@ def main(): parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes") parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent") parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)") - parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: tomorrow)") + parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: today)") + parser.add_argument("--tomorrow", action="store_true", + help="Schedule for tomorrow instead of today. Use only for manual pre-builds.") parser.add_argument("--max-per-segment", type=int, default=None, help="Cap audience per (segment x timezone). Use for fresh-IP warmup small sends.") parser.add_argument("--only-segment", action="append", default=None, metavar="SEG", @@ -554,12 +729,21 @@ def main(): help="Send minute (UTC), used with --send-hour. Default 0.") parser.add_argument("--stagger-minutes", type=int, default=0, help="Add this many minutes between each created campaign. Useful for catchup sends.") + parser.add_argument("--daily-cap", type=int, default=None, + help="Hard cap on recipients queued by this run. Default: warmup-derived cap when available.") + parser.add_argument("--no-warmup-cap", action="store_true", + help="Disable automatic daily queue cap from /etc/postfix/pw-warmup-start.") args = parser.parse_args() + if args.date and args.tomorrow: + parser.error("--date and --tomorrow are mutually exclusive") + if args.date: send_date = date.fromisoformat(args.date) - else: + elif args.tomorrow: send_date = date.today() + timedelta(days=1) + else: + send_date = date.today() if args.list_segments: list_segments(send_date) @@ -567,13 +751,15 @@ def main(): only = set(args.only_segment) if args.only_segment else None LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, " - "max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s)", + "max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s, daily_cap=%s, warmup_cap=%s)", send_date, args.dry_run, args.preview, args.max_per_segment, - sorted(only) if only else None, args.send_hour, args.stagger_minutes) + sorted(only) if only else None, args.send_hour, args.stagger_minutes, + args.daily_cap, not args.no_warmup_cap) run(send_date, dry_run=args.dry_run, preview=args.preview, max_per_segment=args.max_per_segment, only_segments=only, send_hour_override=args.send_hour, send_minute=args.send_minute, - stagger_minutes=args.stagger_minutes) + stagger_minutes=args.stagger_minutes, daily_cap=args.daily_cap, + warmup_cap=not args.no_warmup_cap) if __name__ == "__main__":