Fix trucking campaign cron send date

This commit is contained in:
justin 2026-06-04 03:19:35 -05:00
parent b48fc3a406
commit c027d49f43
3 changed files with 206 additions and 17 deletions

View file

@ -27,10 +27,19 @@ import os
import sys
import urllib.request
import urllib.parse
import math
import time
from datetime import date, datetime, timedelta, timezone
import psycopg2
# Allow both supported invocation styles:
# python -m scripts.build_trucking_campaigns
# python3 scripts/build_trucking_campaigns.py
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
sys.path.insert(0, ROOT)
from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS
LOG = logging.getLogger("build_trucking_campaigns")
@ -160,6 +169,38 @@ USABLE_FILTER = (
)
DB_URL = os.getenv("DATABASE_URL", "")
WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")
def warmup_day() -> int | None:
"""Return days since MTA warmup start, or None if not configured/readable."""
try:
with open(WARMUP_START_FILE, "r", encoding="utf-8") as fh:
start = int(fh.read().strip())
return max(0, int((time.time() - start) // 86400))
except Exception:
return None
def warmup_daily_queue_cap(day: int | None = None) -> int | None:
"""Daily campaign-builder queue cap aligned with Listmonk rampcap targets.
This prevents the builder from scheduling 10k-20k recipients/day while
Listmonk is throttled to only 500-3,000/day, which would create a massive
backlog that later drains unpredictably. The numbers match the runbook's
intended daily warmup totals, not the theoretical 24-hour sliding-window max.
"""
if day is None:
day = warmup_day()
if day is None:
return None
if day <= 1:
return 500
if day <= 3:
return 1500
if day <= 6:
return 2500
return 3000
def lm_api(path: str, data: dict | None = None, method: str | None = None):
@ -314,6 +355,98 @@ def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: s
LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc)
def campaign_type_filter(campaign_type: str) -> str:
if campaign_type == "mcs150":
return "mcs150_parsed < NOW() - INTERVAL '2 years'"
if campaign_type == "inactive":
return "oos_active IS TRUE"
if campaign_type in DEFICIENCY_SEGMENTS:
return DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
raise ValueError(f"unknown campaign_type {campaign_type!r}")
def campaign_weight(campaign_type: str) -> float:
"""Warmup allocation weight by campaign specificity.
Highly specific deficiency copy should generally get more early warmup share
than generic overdue/inactive notices because it is more relevant and less
repetitive at the mailbox-provider/content-fingerprint level.
"""
if campaign_type == "mcs150":
return 0.75
if campaign_type == "inactive":
return 0.65
return 1.25
def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None = None) -> int:
cur = conn.cursor()
states_placeholder = ",".join(["%s"] * len(tz_states))
type_filter = campaign_type_filter(campaign_type)
cur.execute(f"""
SELECT count(*)
FROM (
SELECT 1
FROM fmcsa_carriers
WHERE {type_filter}
AND {USABLE_FILTER}
AND listmonk_sent_at IS NULL
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND phy_state IN ({states_placeholder})
LIMIT %s
) s
""", [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit or 10_000_000])
return int(cur.fetchone()[0])
def allocate_daily_budget(candidates: list[dict], daily_cap: int | None) -> dict[tuple[str, str], int]:
"""Allocate daily cap across (timezone, campaign_type) by weighted audience.
The allocation is proportional to each slot's eligible lead count and campaign
specificity weight, capped by the slot's audience/limit. This avoids sending
the same number from a tiny hazmat segment and a massive MCS-150 segment while
still preserving body-text variety during warmup.
"""
positive = [c for c in candidates if c["audience"] > 0]
if daily_cap is None:
return {(c["tz"], c["campaign_type"]): c["audience"] for c in candidates}
remaining_cap = min(daily_cap, sum(c["audience"] for c in positive))
quotas = {(c["tz"], c["campaign_type"]): 0 for c in candidates}
remaining = { (c["tz"], c["campaign_type"]): c["audience"] for c in positive }
weighted = { (c["tz"], c["campaign_type"]): c["audience"] * c["weight"] for c in positive }
# Repeated largest-remainder passes handle slots that hit their audience cap.
while remaining_cap > 0 and remaining:
total_weight = sum(weighted[k] for k in remaining)
if total_weight <= 0:
break
raw = {k: remaining_cap * (weighted[k] / total_weight) for k in remaining}
assigned_this_pass = 0
remainders: list[tuple[float, tuple[str, str]]] = []
for k, val in raw.items():
add = min(remaining[k], int(math.floor(val)))
quotas[k] += add
remaining[k] -= add
assigned_this_pass += add
if remaining[k] > 0:
remainders.append((val - math.floor(val), k))
leftover = remaining_cap - assigned_this_pass
for _, k in sorted(remainders, reverse=True):
if leftover <= 0:
break
if remaining.get(k, 0) <= 0:
continue
quotas[k] += 1
remaining[k] -= 1
assigned_this_pass += 1
leftover -= 1
remaining_cap -= assigned_this_pass
remaining = {k: v for k, v in remaining.items() if v > 0}
if assigned_this_pass == 0:
break
return quotas
def fetch_carriers(
conn,
tz_states: tuple,
@ -330,15 +463,7 @@ def fetch_carriers(
"""
cur = conn.cursor()
states_placeholder = ",".join(["%s"] * len(tz_states))
if campaign_type == "mcs150":
type_filter = "mcs150_parsed < NOW() - INTERVAL '2 years'"
elif campaign_type == "inactive":
type_filter = "oos_active IS TRUE"
elif campaign_type in DEFICIENCY_SEGMENTS:
# Flag-based segment: dedupe against already-sent and require the flag.
type_filter = DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
else:
raise ValueError(f"unknown campaign_type {campaign_type!r}")
type_filter = campaign_type_filter(campaign_type)
# target_state expression: pull the state suffix out of the matching flag
# for per-state programs, otherwise fall back to the base (phy_state).
@ -412,7 +537,8 @@ def list_segments(send_date: date) -> None:
def run(send_date: date, dry_run: bool = False, preview: bool = False,
max_per_segment: int | None = None, only_segments: set[str] | None = None,
send_hour_override: int | None = None, send_minute: int = 0,
stagger_minutes: int = 0) -> None:
stagger_minutes: int = 0, daily_cap: int | None = None,
warmup_cap: bool = True) -> None:
conn = psycopg2.connect(DB_URL)
base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
@ -452,11 +578,46 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
for (ct, base, lim, label) in campaign_specs
]
if daily_cap is None and warmup_cap and not preview:
daily_cap = warmup_daily_queue_cap()
if daily_cap is not None:
LOG.info("Daily queue cap active: %d recipients total", daily_cap)
if preview:
LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
"test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)
candidates: list[dict] = []
totals_by_campaign: dict[str, int] = {}
for tz, tz_cfg in TIMEZONE_CONFIG.items():
for campaign_type, _base, limit, label in campaign_specs:
audience = 1 if preview else count_carriers(conn, tz_cfg["states"], campaign_type, limit)
weight = campaign_weight(campaign_type)
candidates.append({
"tz": tz,
"states": tz_cfg["states"],
"campaign_type": campaign_type,
"limit": limit,
"label": label,
"audience": audience,
"weight": weight,
})
totals_by_campaign[campaign_type] = totals_by_campaign.get(campaign_type, 0) + audience
quotas = allocate_daily_budget(candidates, None if preview else daily_cap)
LOG.info("Eligible audience by SQL criterion: %s", ", ".join(
f"{ct}={n}" for ct, n in sorted(totals_by_campaign.items())
))
if daily_cap is not None and not preview:
planned_by_campaign: dict[str, int] = {}
for (tz, ctype), quota in quotas.items():
planned_by_campaign[ctype] = planned_by_campaign.get(ctype, 0) + quota
LOG.info("Planned allocation by criterion: %s (total=%d/%d)", ", ".join(
f"{ct}={n}" for ct, n in sorted(planned_by_campaign.items())
), sum(planned_by_campaign.values()), daily_cap)
scheduled_count = 0
queued_recipients = 0
for tz, tz_cfg in TIMEZONE_CONFIG.items():
states = tz_cfg["states"]
@ -467,8 +628,17 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
)
for campaign_type, base, limit, label in campaign_specs:
if daily_cap is not None and queued_recipients >= daily_cap:
LOG.info("[%s/%s] Daily cap reached (%d/%d) — skipping remaining slots",
tz, campaign_type, queued_recipients, daily_cap)
continue
effective_send_at = send_at + timedelta(minutes=stagger_minutes * scheduled_count)
fetch_limit = 1 if preview else limit
if daily_cap is not None and not preview:
fetch_limit = min(fetch_limit, quotas.get((tz, campaign_type), 0))
if fetch_limit <= 0:
LOG.info("[%s/%s] Allocation is 0 — skipping", tz, campaign_type)
continue
rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
if not rows:
LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
@ -486,6 +656,7 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
if dry_run:
LOG.info(" DRY RUN — skipping Listmonk + DB writes")
scheduled_count += 1
queued_recipients += actual
continue
# Build subscriber list. In preview, only the owner's address (with the
@ -522,6 +693,7 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid,
"draft/preview" if preview else f"scheduled {effective_send_at.isoformat()}")
scheduled_count += 1
queued_recipients += added
# Send a test to the owner so they can spot-check the rendered email
send_test(base, cid, rows[0], label, tz, campaign_type)
@ -531,7 +703,8 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
mark_sent(conn, [row[0] for row in rows], campaign_type)
conn.close()
LOG.info("Done.")
LOG.info("Done. queued_recipients=%d%s", queued_recipients,
f" daily_cap={daily_cap}" if daily_cap is not None else "")
def main():
@ -543,7 +716,9 @@ def main():
parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)")
parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: tomorrow)")
parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: today)")
parser.add_argument("--tomorrow", action="store_true",
help="Schedule for tomorrow instead of today. Use only for manual pre-builds.")
parser.add_argument("--max-per-segment", type=int, default=None,
help="Cap audience per (segment x timezone). Use for fresh-IP warmup small sends.")
parser.add_argument("--only-segment", action="append", default=None, metavar="SEG",
@ -554,12 +729,21 @@ def main():
help="Send minute (UTC), used with --send-hour. Default 0.")
parser.add_argument("--stagger-minutes", type=int, default=0,
help="Add this many minutes between each created campaign. Useful for catchup sends.")
parser.add_argument("--daily-cap", type=int, default=None,
help="Hard cap on recipients queued by this run. Default: warmup-derived cap when available.")
parser.add_argument("--no-warmup-cap", action="store_true",
help="Disable automatic daily queue cap from /etc/postfix/pw-warmup-start.")
args = parser.parse_args()
if args.date and args.tomorrow:
parser.error("--date and --tomorrow are mutually exclusive")
if args.date:
send_date = date.fromisoformat(args.date)
else:
elif args.tomorrow:
send_date = date.today() + timedelta(days=1)
else:
send_date = date.today()
if args.list_segments:
list_segments(send_date)
@ -567,13 +751,15 @@ def main():
only = set(args.only_segment) if args.only_segment else None
LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, "
"max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s)",
"max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s, daily_cap=%s, warmup_cap=%s)",
send_date, args.dry_run, args.preview, args.max_per_segment,
sorted(only) if only else None, args.send_hour, args.stagger_minutes)
sorted(only) if only else None, args.send_hour, args.stagger_minutes,
args.daily_cap, not args.no_warmup_cap)
run(send_date, dry_run=args.dry_run, preview=args.preview,
max_per_segment=args.max_per_segment, only_segments=only,
send_hour_override=args.send_hour, send_minute=args.send_minute,
stagger_minutes=args.stagger_minutes)
stagger_minutes=args.stagger_minutes, daily_cap=args.daily_cap,
warmup_cap=not args.no_warmup_cap)
if __name__ == "__main__":