887 lines
37 KiB
Python
887 lines
37 KiB
Python
#!/usr/bin/env python3
|
|
"""Daily trucking campaign builder.
|
|
|
|
Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns:
|
|
- 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST)
|
|
- 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule)
|
|
|
|
Selection criteria:
|
|
- email verified or catch-all (usable)
|
|
- mcs150_parsed > 2 years ago (for MCS-150 campaign)
|
|
- oos_active IS TRUE (for inactive USDOT campaign)
|
|
- listmonk_sent_at IS NULL (not yet sent)
|
|
- ordered by mcs150_parsed ASC (most overdue first)
|
|
|
|
Usage:
|
|
python3 scripts/build_trucking_campaigns.py
|
|
python3 scripts/build_trucking_campaigns.py --dry-run
|
|
python3 scripts/build_trucking_campaigns.py --date 2026-06-02
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import urllib.request
|
|
import urllib.parse
|
|
import math
|
|
import time
|
|
from datetime import date, datetime, timedelta, timezone
|
|
|
|
import psycopg2
|
|
|
|
# Allow both supported invocation styles:
|
|
# python -m scripts.build_trucking_campaigns
|
|
# python3 scripts/build_trucking_campaigns.py
|
|
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT not in sys.path:
|
|
sys.path.insert(0, ROOT)
|
|
|
|
from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS
|
|
|
|
# Module-wide logger; main() installs the handler/format.
LOG = logging.getLogger("build_trucking_campaigns")

# ── Listmonk ──────────────────────────────────────────────────────────────────
# Connection settings are read from the environment, with fallbacks.
# SECURITY(review): the LISTMONK_PASS fallback below is a credential committed
# to source control. It is kept unchanged here so behaviour is identical, but it
# should be rotated and supplied only through the environment.
LISTMONK_URL = os.environ.get("LISTMONK_URL", "http://listmonk:9000")
LISTMONK_USER = os.environ.get("LISTMONK_USER", "api")
LISTMONK_PASS = os.environ.get("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y")
# Base64 of "user:password", used verbatim in the HTTP Basic Authorization header.
_LM_AUTH = base64.b64encode(
    ":".join((LISTMONK_USER, LISTMONK_PASS)).encode()
).decode()

# ── Source campaign IDs ────────────────────────────────────────────────────────
# Listmonk campaigns whose subject/body/template are cloned for each daily send.
CAMPAIGN_MCS150_ID = 186    # "MCS-150 Overdue — $1,000/Day Fine Risk"
CAMPAIGN_INACTIVE_ID = 188  # "Inactive USDOT — Reactivate Before You Get Pulled Over"

# Public site for landing-page links injected into Listmonk subscriber attribs.
SITE_DOMAIN = os.environ.get("SITE_DOMAIN", "https://performancewest.com")
|
|
|
|
# ── Deficiency-flag segments (Phase 5) ──────────────────────────────────────
# Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column
# written by fmcsa_deficiency_flagger.py) and links to its order landing page.
#
# Per-segment keys:
#   label      – human-readable name used in list/campaign names
#   flag_sql   – SQL predicate over the deficiency_flags array
#   lp_slug    – default order landing-page slug (see build_lp_link)
#   source_env – env var holding the Listmonk template campaign ID to clone.
#                Segments whose env var is unset are reported by
#                --list-segments but skipped by the scheduled run, so the
#                nightly job never breaks on an unconfigured template.
#   limit      – maximum carriers per (segment x timezone) slot


def _has_flag(flag: str) -> str:
    """Return a SQL predicate matching rows whose deficiency_flags contain `flag`."""
    return f"'{flag}' = ANY(deficiency_flags)"


def _has_flag_prefix(prefix: str) -> str:
    """Return a SQL predicate matching rows with any flag starting with `prefix`.

    The percent sign is doubled because these predicates are embedded in
    statements that are executed with bound parameters, where a literal `%`
    has to be written as `%%`.
    """
    return (
        "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
        f"WHERE f LIKE '{prefix}%%')"
    )


DEFICIENCY_SEGMENTS = {
    "for_hire_boc3": dict(
        label="For-Hire BOC-3 + UCR",
        flag_sql=_has_flag("for_hire_carrier"),
        lp_slug="boc3-filing",
        source_env="CAMPAIGN_FOR_HIRE_ID",
        limit=1500,
    ),
    "irp_ifta": dict(
        label="IRP / IFTA Registration",
        flag_sql=_has_flag("interstate_needs_irp_ifta"),
        lp_slug="state-trucking-bundle",
        source_env="CAMPAIGN_IRP_IFTA_ID",
        limit=1500,
    ),
    "intrastate_authority": dict(
        label="Intrastate Operating Authority",
        # any intrastate_authority_<state> flag
        flag_sql=_has_flag_prefix("intrastate_authority_"),
        lp_slug="intrastate-authority",
        source_env="CAMPAIGN_INTRASTATE_ID",
        limit=1000,
    ),
    "state_weight_tax": dict(
        label="State Weight-Distance Tax",
        flag_sql=_has_flag_prefix("state_weight_tax_"),
        # per-state LP resolved per-row in build_lp_link()
        lp_slug="state-trucking-bundle",
        source_env="CAMPAIGN_WEIGHT_TAX_ID",
        limit=1000,
    ),
    "state_emissions": dict(
        label="State Clean-Truck / Emissions",
        flag_sql=_has_flag_prefix("state_emissions_"),
        lp_slug="state-emissions",
        source_env="CAMPAIGN_EMISSIONS_ID",
        limit=1000,
    ),
    "hazmat": dict(
        label="Hazmat PHMSA Registration",
        flag_sql=_has_flag("hazmat_carrier"),
        lp_slug="hazmat-phmsa",
        source_env="CAMPAIGN_HAZMAT_ID",
        limit=500,
    ),
}
|
|
|
|
# Landing-page slug overrides for the state_weight_tax segment, keyed by the
# two-letter state code. States not listed use the segment's default slug.
# (The CA emissions override to "ca-mcp-carb" lives in build_lp_link itself.)
_WEIGHT_TAX_LP = {
    "OR": "or-weight-mile-tax",
    "NY": "ny-hut-registration",
    "KY": "ky-kyu-registration",
    "NM": "nm-weight-distance",
    "CT": "ct-highway-use-fee",
}
|
|
|
|
|
|
def build_lp_link(campaign_type: str, phy_state: str | None) -> str:
    """Build the order landing-page URL for a campaign type and state.

    Slug resolution:
      * state_emissions + "CA"                      -> "ca-mcp-carb"
      * state_weight_tax + state in _WEIGHT_TAX_LP  -> that state's slug
      * any other known deficiency segment          -> the segment's lp_slug
      * anything else (e.g. mcs150 / inactive)      -> "dot-full-compliance"
    """
    if campaign_type == "state_emissions" and phy_state == "CA":
        slug = "ca-mcp-carb"
    elif campaign_type == "state_weight_tax" and phy_state in _WEIGHT_TAX_LP:
        slug = _WEIGHT_TAX_LP[phy_state]
    else:
        segment = DEFICIENCY_SEGMENTS.get(campaign_type)
        slug = segment["lp_slug"] if segment else "dot-full-compliance"
    return "/".join((SITE_DOMAIN, "order", slug))
|
|
|
|
# ── TZ config: tz_key -> {"states": (...), "send_hour_utc": int} ─────────────
# 4 AM EST = 09:00 UTC; each region further west is sent one hour later so
# carriers receive the mail at roughly 4 AM local (standard) time.
TIMEZONE_CONFIG = {
    "ET": {  # 4 AM EST
        "states": tuple(
            "CT DC DE FL GA IN KY MA MD ME MI "
            "NH NJ NY NC OH PA RI SC TN VA VT WV".split()
        ),
        "send_hour_utc": 9,
    },
    "CT": {  # 5 AM EST = 4 AM CST
        "states": tuple("AL AR IA IL KS LA MN MO MS ND NE OK SD TX WI".split()),
        "send_hour_utc": 10,
    },
    "MT": {  # 6 AM EST = 4 AM MST
        "states": tuple("AZ CO ID MT NM UT WY".split()),
        "send_hour_utc": 11,
    },
    "PT": {  # 7 AM EST = 4 AM PST
        "states": tuple("AK CA HI NV OR WA".split()),
        "send_hour_utc": 12,
    },
}
|
|
|
|
# Owner email — test sends go here before each campaign is scheduled
TEST_EMAIL = os.environ.get("CAMPAIGN_TEST_EMAIL", "carrierone@gmx.com")
# Reply-To header used when the cloned source campaign defines no headers.
REPLY_TO_EMAIL = os.environ.get("CAMPAIGN_REPLY_TO", "info@performancewest.net")
REPLY_TO_HEADERS = [dict(name="Reply-To", value=REPLY_TO_EMAIL)]

# Verification results (besides email_verified = TRUE) that count as usable.
_USABLE_VERIFY_RESULTS = ("smtp_valid", "catch_all_domain", "catch_all_detected")
# SQL predicate selecting carriers whose email address is considered usable.
USABLE_FILTER = (
    "(email_verified IS TRUE OR email_verify_result IN ("
    + ",".join(f"'{result}'" for result in _USABLE_VERIFY_RESULTS)
    + "))"
)

# Postgres connection string; empty when DATABASE_URL is not set.
DB_URL = os.environ.get("DATABASE_URL", "")
# File read by warmup_day(); expected to hold the warmup start as epoch seconds.
WARMUP_START_FILE = os.environ.get("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")
|
|
|
|
|
|
def warmup_day() -> int | None:
    """Return whole days elapsed since the MTA warmup start.

    The start is read from WARMUP_START_FILE as an integer epoch timestamp.
    The result is clamped to 0 for start times in the future. Any failure
    (missing/unreadable file, non-integer content, ...) yields None, which
    callers treat as "warmup not configured".
    """
    try:
        with open(WARMUP_START_FILE, "r", encoding="utf-8") as handle:
            started_at = int(handle.read().strip())
        elapsed_days = int((time.time() - started_at) // 86400)
        return max(0, elapsed_days)
    except Exception:
        # Deliberately best-effort: an unreadable marker just disables the cap.
        return None
|
|
|
|
|
|
def warmup_daily_queue_cap(day: int | None = None) -> int | None:
    """Return the builder's daily recipient cap for a given warmup day.

    Keeps the builder from queueing far more recipients per day than the
    throttled Listmonk instance will actually send during warmup, which would
    otherwise build a backlog that drains unpredictably later. The values are
    the intended daily warmup totals, not a theoretical sliding-window maximum.

    Args:
        day: Days since warmup start. When None, warmup_day() is consulted.

    Returns:
        500 for day <= 1, 1500 for day <= 3, 2500 for day <= 6, otherwise
        3000; None when no day was given and warmup_day() returns None.
    """
    if day is None:
        day = warmup_day()
        if day is None:
            return None
    # (last warmup day the cap applies to, cap) in ascending order.
    ramp = ((1, 500), (3, 1500), (6, 2500))
    for last_day, cap in ramp:
        if day <= last_day:
            return cap
    return 3000
|
|
|
|
|
|
def lm_api(path: str, data: dict | None = None, method: str | None = None):
    """Call the Listmonk REST API and return the decoded JSON response.

    Args:
        path: API path below `/api`, including any query string
            (e.g. "/campaigns/12").
        data: Optional JSON-serialisable request body.
        method: Optional HTTP method override; when omitted urllib picks GET
            or POST depending on whether a body is present.

    Returns:
        The parsed JSON document.

    Raises:
        RuntimeError: Listmonk answered with an HTTP error status. The message
            contains the path, status code and the first 300 characters of the
            error body; the original HTTPError is chained as the cause.
        Other exceptions (connection errors, timeouts, invalid JSON) propagate
        unchanged.
    """
    # Imported explicitly: the file only imports urllib.request / urllib.parse.
    from urllib.error import HTTPError

    headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"}
    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
    if data is not None:
        req.data = json.dumps(data).encode()
    if method:
        req.method = method
    try:
        # Context manager so the connection is released even if read() fails.
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except HTTPError as e:
        # errors="replace": a non-UTF-8 error page must not mask the HTTP error.
        body = e.read().decode(errors="replace")[:300]
        raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}") from e
    return json.loads(raw)
|
|
|
|
|
|
# Per-process memo of listmonk_sendable() verdicts, keyed by normalised email.
_LM_SUPPRESSION_CACHE: dict[str, tuple[bool, str]] = {}


def _lm_email_query(email: str) -> str:
    """Build a Listmonk subscriber query matching exactly one email address.

    The address is lower-cased and stripped. Listmonk queries use
    single-quoted strings, so any single quote is doubled: a malformed source
    row then cannot terminate the literal early and break or widen the query.
    """
    normalized = email.lower().strip()
    escaped = "''".join(normalized.split("'"))
    return "email = '" + escaped + "'"
|
|
|
|
|
|
def listmonk_sendable(email: str) -> tuple[bool, str]:
    """Report whether Listmonk would actually send to `email`, and why.

    Passing the SQL eligibility filter is not sufficient: the address may
    already exist in Listmonk as disabled, blocklisted or unsubscribed, in
    which case Listmonk drops it at send time and the campaign ends up smaller
    than its allocation. Checking at build time lets the caller pull
    replacement rows instead.

    Returns:
        (sendable, reason) where reason is one of "new", "enabled",
        "blank_email", "listmonk_status:<status>",
        "list_subscription_status:<status>" or "lookup_error:<message>".
        Every verdict except "blank_email" is cached for the process lifetime.
    """
    key = email.lower().strip()
    if not key:
        return False, "blank_email"

    cached = _LM_SUPPRESSION_CACHE.get(key)
    if cached is not None:
        return cached

    try:
        query_string = urllib.parse.urlencode({
            "query": _lm_email_query(key),
            "per_page": 1,
        })
        response = lm_api("/subscribers?" + query_string)
        matches = response.get("data", {}).get("results", [])
        if not matches:
            # Unknown to Listmonk, so nothing can be suppressing it.
            verdict = (True, "new")
        else:
            subscriber = matches[0]
            status = (subscriber.get("status") or "").lower()
            if status != "enabled":
                verdict = (False, f"listmonk_status:{status or 'unknown'}")
            else:
                # First list membership that is unsubscribed/unconfirmed, if any.
                list_statuses = (
                    (entry.get("subscription_status") or "").lower()
                    for entry in subscriber.get("lists") or []
                )
                blocking = next(
                    (s for s in list_statuses if s in {"unsubscribed", "unconfirmed"}),
                    "",
                )
                if blocking:
                    verdict = (False, f"list_subscription_status:{blocking}")
                else:
                    verdict = (True, "enabled")
    except Exception as exc:  # noqa: BLE001
        # A failed lookup is reported as not sendable (and shows up in the
        # caller's skip counts) rather than risking scheduling a suppressed
        # address and marking it sent in the source DB.
        verdict = (False, f"lookup_error:{exc}")

    _LM_SUPPRESSION_CACHE[key] = verdict
    return verdict
|
|
|
|
|
|
def get_base_campaign(campaign_id: int) -> dict:
    """Fetch a Listmonk campaign and return the `data` object of the response."""
    response = lm_api(f"/campaigns/{campaign_id}")
    return response["data"]
|
|
|
|
|
|
def create_list(name: str) -> int:
    """Create a private, single-opt-in Listmonk list and return its ID.

    The list is tagged "trucking" and "auto".
    """
    spec = {
        "name": name,
        "type": "private",
        "optin": "single",
        "tags": ["trucking", "auto"],
    }
    created = lm_api("/lists", spec, "POST")
    return created["data"]["id"]
|
|
|
|
|
|
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create a subscriber on a list, or attach an existing one to it.

    A new subscriber is POSTed to /api/subscribers with `lists` set to the
    target list and preconfirm_subscriptions=true so it is immediately
    sendable. If Listmonk reports that the address already exists, the
    existing subscriber is looked up and, when its status is "enabled",
    attached to the list as confirmed.

    Args:
        list_id: Target Listmonk list ID.
        email: Subscriber email address.
        name: Display name; the email is used when empty.
        attribs: Subscriber attributes available to the campaign template.

    Returns:
        True when the subscriber is on the list afterwards; False when the
        address is suppressed in Listmonk or any API call failed. Failures
        are logged and never raised.
    """
    try:
        lm_api("/subscribers", {
            "email": email,
            "name": name or email,
            "status": "enabled",
            "lists": [list_id],
            "attribs": attribs,
            "preconfirm_subscriptions": True,
        }, "POST")
        return True
    except Exception as exc:  # noqa: BLE001
        msg = str(exc)

    lowered = msg.lower()
    already_exists = (
        "already exists" in lowered or "conflict" in lowered or "HTTP 409" in msg
    )
    if not already_exists:
        LOG.warning("Listmonk rejected new subscriber %s: %s", email, msg)
        return False

    # Already exists — attach to this list instead of failing.
    try:
        # Shared query builder: normalises the address and escapes quotes.
        subs = lm_api(
            "/subscribers?"
            + urllib.parse.urlencode({
                "query": _lm_email_query(email),
                "per_page": 1,
            })
        )
        results = subs.get("data", {}).get("results", [])
        if not results:
            LOG.warning("Listmonk reported %s as existing but lookup found nothing", email)
            return False
        status = (results[0].get("status") or "").lower()
        if status != "enabled":
            LOG.info("Listmonk suppressed existing subscriber %s status=%s", email, status or "unknown")
            return False
        lm_api("/subscribers/lists", {
            "ids": [results[0]["id"]],
            "action": "add",
            "target_list_ids": [list_id],
            "status": "confirmed",
        }, "PUT")
        return True
    except Exception as exc:  # noqa: BLE001
        LOG.warning("Could not attach existing subscriber %s to list %s: %s",
                    email, list_id, exc)
        return False
|
|
|
|
|
|
def import_subscribers(list_id: int, subscribers: list[dict]) -> int:
    """Add each subscriber to the list individually; return how many succeeded.

    Each entry needs an "email" key; "name" defaults to "" and "attribs" to {}.
    """
    return sum(
        1
        for entry in subscribers
        if add_subscriber(
            list_id, entry["email"], entry.get("name", ""), entry.get("attribs", {})
        )
    )
|
|
|
|
|
|
def create_and_schedule_campaign(
    base: dict,
    list_id: int,
    name: str,
    send_at_utc: datetime,
    schedule: bool = True,
) -> int:
    """Clone `base` into a new regular campaign targeting `list_id`.

    Subject, sender, content type, body, altbody, template, tags, messenger
    and headers are copied from `base`; tags default to [], messenger to
    "email" and headers to REPLY_TO_HEADERS when the base has none.

    Args:
        base: Source campaign object as returned by Listmonk.
        list_id: List the new campaign sends to.
        name: Name of the new campaign.
        send_at_utc: Send time; formatted with a fixed "+00:00" offset, so it
            is expected to already be in UTC.
        schedule: When True, set `send_at` and move the campaign to
            "scheduled". When False the campaign stays a draft.

    Returns:
        The ID of the created campaign.
    """
    payload = dict(
        name=name,
        subject=base["subject"],
        lists=[list_id],
        from_email=base["from_email"],
        type="regular",
        content_type=base["content_type"],
        body=base["body"],
        altbody=base.get("altbody"),
        template_id=base["template_id"],
        tags=base.get("tags") or [],
        messenger=base.get("messenger") or "email",
        headers=base.get("headers") or REPLY_TO_HEADERS,
    )
    if schedule:
        payload["send_at"] = send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00")

    created = lm_api("/campaigns", payload, "POST")
    campaign_id = created["data"]["id"]

    if not schedule:
        # Preview mode: leave the campaign as a draft.
        return campaign_id

    lm_api(f"/campaigns/{campaign_id}/status", {"status": "scheduled"}, "PUT")
    return campaign_id
|
|
|
|
|
|
def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: str,
              campaign_type: str) -> None:
    """Send one test email to TEST_EMAIL so the owner can spot-check a campaign.

    The subscriber-attribute placeholders in the body (company, dot_number,
    state, lp_link) and in the subject (dot_number) are filled in from
    `sample_row` before the test is posted. A failed test send is logged as a
    warning and never raised.

    Args:
        base: Source campaign object whose content is being tested.
        campaign_id: Campaign to post the test against.
        sample_row: (dot, email, name, phy_state, target_state); a 4-tuple
            without target_state is accepted and falls back to phy_state.
        label: Campaign label, used for logging only.
        tz: Timezone key, used for logging only.
        campaign_type: Segment key, used to build the landing-page link.
    """
    dot = sample_row[0]
    name = sample_row[2]
    phy_state = sample_row[3]
    target_state = sample_row[4] if len(sample_row) > 4 else phy_state

    # Real subscribers get a populated lp_link attrib; the test send must
    # mirror that or the CTA button renders as a bare "?dot=..." link. The
    # link is built from target_state (the state the offer applies to), which
    # for per-state programs comes from the deficiency flag, not the base state.
    substitutions = (
        ("{{ .Subscriber.Attribs.company }}", name or "Sample Carrier LLC"),
        ("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000"),
        ("{{ .Subscriber.Attribs.state }}", phy_state or "TX"),
        ("{{ .Subscriber.Attribs.lp_link }}", build_lp_link(campaign_type, target_state)),
    )
    rendered_body = base["body"]
    for placeholder, value in substitutions:
        rendered_body = rendered_body.replace(placeholder, value)
    # NOTE: {{ UnsubscribeURL }} is intentionally left alone — Listmonk renders
    # it into a working per-subscriber link even on test sends; overwriting it
    # produced a dead /unsubscribe link with no subscriber identity.

    rendered_subject = base["subject"].replace(
        "{{ .Subscriber.Attribs.dot_number }}", dot or "0000000"
    )

    # Listmonk /test needs `lists` as an array of IDs (base["lists"] is objects).
    list_ids = [entry["id"] for entry in base.get("lists", []) if isinstance(entry, dict)]
    if not list_ids:
        list_ids = [1]

    payload = dict(
        name=base.get("name", "Test"),
        subject=rendered_subject,
        lists=list_ids,
        from_email=base["from_email"],
        type="regular",
        content_type=base["content_type"],
        body=rendered_body,
        altbody=base.get("altbody"),
        template_id=base["template_id"],
        tags=base.get("tags") or [],
        messenger=base.get("messenger") or "email",
        headers=base.get("headers") or REPLY_TO_HEADERS,
        subscribers=[TEST_EMAIL],
    )
    try:
        lm_api(f"/campaigns/{campaign_id}/test", payload, "POST")
    except Exception as exc:
        LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc)
    else:
        LOG.info("[%s/%s] Test sent to %s (sample: %s DOT#%s)", tz, label, TEST_EMAIL, name, dot)
|
|
|
|
|
|
def campaign_type_filter(campaign_type: str) -> str:
    """Return the SQL predicate that selects carriers for `campaign_type`.

    "mcs150" and "inactive" have fixed predicates; any key of
    DEFICIENCY_SEGMENTS uses that segment's `flag_sql`.

    Raises:
        ValueError: `campaign_type` is not a known campaign type.
    """
    builtin_filters = (
        ("mcs150", "mcs150_parsed < NOW() - INTERVAL '2 years'"),
        ("inactive", "oos_active IS TRUE"),
    )
    for known_type, predicate in builtin_filters:
        if campaign_type == known_type:
            return predicate
    if campaign_type not in DEFICIENCY_SEGMENTS:
        raise ValueError(f"unknown campaign_type {campaign_type!r}")
    return DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
|
|
|
|
|
|
def campaign_weight(campaign_type: str) -> float:
    """Return the warmup allocation weight for a campaign type.

    The generic notices get less than an even share ("mcs150" 0.75,
    "inactive" 0.65) and every other campaign type gets 1.25: highly specific
    deficiency copy is more relevant to the recipient and less repetitive at
    the mailbox-provider / content-fingerprint level, so it should receive
    more of the early warmup volume.
    """
    generic_weights = (("mcs150", 0.75), ("inactive", 0.65))
    for known_type, weight in generic_weights:
        if campaign_type == known_type:
            return weight
    return 1.25
|
|
|
|
|
|
def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None = None) -> int:
    """Count unsent, usable carriers for a campaign type in the given states.

    Applies the same WHERE clause as fetch_carriers (campaign filter, usable
    email, not yet sent, email domain not blocked, phy_state in `tz_states`).
    The count is capped at `limit`; a missing or zero limit means 10,000,000.
    """
    predicate = campaign_type_filter(campaign_type)
    state_marks = ",".join("%s" for _ in range(len(tz_states)))
    # The inner LIMIT stops counting as soon as the slot's cap is reached.
    query = f"""
        SELECT count(*)
        FROM (
            SELECT 1
            FROM fmcsa_carriers
            WHERE {predicate}
              AND {USABLE_FILTER}
              AND listmonk_sent_at IS NULL
              AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
              AND phy_state IN ({state_marks})
            LIMIT %s
        ) s
    """
    params = [list(BLOCKED_EMAIL_DOMAINS), *tz_states, limit or 10_000_000]
    cursor = conn.cursor()
    cursor.execute(query, params)
    return int(cursor.fetchone()[0])
|
|
|
|
|
|
def allocate_daily_budget(candidates: list[dict], daily_cap: int | None) -> dict[tuple[str, str], int]:
    """Split the daily cap across (timezone, campaign_type) slots.

    Each slot's share is proportional to audience x weight and never exceeds
    the slot's audience. This avoids sending the same volume from a tiny
    segment and a huge one while still keeping body-text variety in warmup.

    Args:
        candidates: Dicts with "tz", "campaign_type", "audience" and (when a
            cap applies) "weight".
        daily_cap: Total recipients allowed, or None for no cap.

    Returns:
        Mapping of (tz, campaign_type) to quota. Without a cap every slot gets
        its full audience; with a cap, slots with no audience get 0.
    """
    def slot_key(candidate: dict) -> tuple[str, str]:
        return (candidate["tz"], candidate["campaign_type"])

    if daily_cap is None:
        return {slot_key(c): c["audience"] for c in candidates}

    eligible = [c for c in candidates if c["audience"] > 0]
    budget = min(daily_cap, sum(c["audience"] for c in eligible))
    quotas = {slot_key(c): 0 for c in candidates}
    headroom = {slot_key(c): c["audience"] for c in eligible}
    weighted = {slot_key(c): c["audience"] * c["weight"] for c in eligible}

    # Repeated largest-remainder passes: a slot that hits its audience cap
    # drops out and its unused share is redistributed on the next pass.
    while budget > 0 and headroom:
        total_weight = sum(weighted[key] for key in headroom)
        if total_weight <= 0:
            break

        granted = 0
        fractions: list[tuple[float, tuple[str, str]]] = []
        for key in list(headroom):
            share = budget * (weighted[key] / total_weight)
            whole = math.floor(share)
            grant = min(headroom[key], int(whole))
            quotas[key] += grant
            headroom[key] -= grant
            granted += grant
            if headroom[key] > 0:
                fractions.append((share - whole, key))

        # Hand out what flooring left over, one each, largest fraction first.
        spare = budget - granted
        for _, key in sorted(fractions, reverse=True):
            if spare <= 0:
                break
            if headroom.get(key, 0) <= 0:
                continue
            quotas[key] += 1
            headroom[key] -= 1
            granted += 1
            spare -= 1

        budget -= granted
        headroom = {key: left for key, left in headroom.items() if left > 0}
        if granted == 0:
            break

    return quotas
|
|
|
|
|
|
def fetch_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    limit: int,
    offset: int = 0,
) -> list[tuple]:
    """Return (dot_number, email_address, legal_name, phy_state, target_state) rows.

    Rows are unsent carriers with a usable, non-blocked email in `tz_states`
    that match the campaign filter, most overdue MCS-150 first.

    target_state is the state the offer applies to. For most segments that is
    the carrier's base (phy_state). For per-state programs (state_weight_tax,
    state_emissions) the relevant state is encoded in the deficiency flag suffix
    (e.g. 'state_weight_tax_OR'), which can differ from the base state, so it
    is pulled out of the flag so the CTA links to the correct state's order page.

    Args:
        conn: Open database connection (DB-API style).
        tz_states: State codes to include.
        campaign_type: "mcs150", "inactive" or a DEFICIENCY_SEGMENTS key.
        limit: Maximum rows to return.
        offset: Rows to skip, for paging through the same ordered result.

    Raises:
        ValueError: `campaign_type` is unknown (from campaign_type_filter).
    """
    cur = conn.cursor()
    states_placeholder = ",".join(["%s"] * len(tz_states))
    type_filter = campaign_type_filter(campaign_type)

    # target_state expression: pull the state suffix out of the matching flag
    # for per-state programs, otherwise fall back to the base (phy_state).
    if campaign_type in ("state_weight_tax", "state_emissions"):
        prefix = f"{campaign_type}_"
        # campaign_type is one of the two literals above, so interpolating it
        # into the SQL text is safe; "%%" is a literal "%" for the driver.
        target_state_sql = (
            "COALESCE((SELECT upper(substr(f, %s)) "
            "FROM unnest(deficiency_flags) f "
            f"WHERE f LIKE '{campaign_type}_%%' LIMIT 1), phy_state)"
        )
        # substr() is 1-based: start right after the prefix.
        target_state_params = [len(prefix) + 1]
    else:
        target_state_sql = "phy_state"
        target_state_params = []

    # dot_number breaks ties in mcs150_parsed (and orders the NULLs). Without
    # it, rows with equal dates have no defined order, so successive
    # LIMIT/OFFSET pages could repeat some carriers and skip others.
    cur.execute(f"""
        SELECT dot_number, email_address, legal_name, phy_state,
               {target_state_sql} AS target_state
        FROM fmcsa_carriers
        WHERE {type_filter}
          AND {USABLE_FILTER}
          AND listmonk_sent_at IS NULL
          AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
          AND phy_state IN ({states_placeholder})
        ORDER BY mcs150_parsed ASC NULLS LAST, dot_number ASC
        LIMIT %s OFFSET %s
    """, target_state_params + [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit, offset])
    return cur.fetchall()
|
|
|
|
|
|
def select_sendable_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    quota: int,
    max_scan: int | None = None,
) -> tuple[list[tuple], dict[str, int]]:
    """Page through eligible carriers until `quota` sendable ones are found.

    Each candidate's email is checked with listmonk_sendable(); rows Listmonk
    would suppress, and rows repeating an email already seen in this call, are
    skipped. Skipped rows are not returned, so the caller never marks them sent.

    Args:
        conn: Open database connection.
        tz_states: State codes to include.
        campaign_type: Campaign type key.
        quota: Number of sendable rows wanted.
        max_scan: Upper bound on rows scanned; defaults to
            max(quota * 8, quota + 500, 1000).

    Returns:
        (rows, skip_counts) where skip_counts maps a skip reason to how many
        rows were skipped for it.
    """
    chosen: list[tuple] = []
    skip_counts: dict[str, int] = {}
    emails_seen: set[str] = set()

    def note_skip(reason: str) -> None:
        skip_counts[reason] = skip_counts.get(reason, 0) + 1

    # Warmup caps are small, but old audiences can contain many prior bounces
    # or unsubscribes: scan past the quota while bounding worst-case API calls.
    scan_limit = max_scan or max(quota * 8, quota + 500, 1000)
    page_size = min(max(quota * 2, 100), 1000)
    scanned = 0

    while len(chosen) < quota and scanned < scan_limit:
        page = fetch_carriers(conn, tz_states, campaign_type, page_size, offset=scanned)
        if not page:
            break
        scanned += len(page)

        for row in page:
            if len(chosen) >= quota:
                break
            address = (row[1] or "").lower().strip()
            if address in emails_seen:
                note_skip("duplicate_email")
                continue
            emails_seen.add(address)
            sendable, reason = listmonk_sendable(address)
            if sendable:
                chosen.append(row)
            else:
                note_skip(reason)

        # A short page means the result set is exhausted.
        if len(page) < page_size:
            break

    return chosen, skip_counts
|
|
|
|
|
|
def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
    """Stamp the given carriers as sent and commit.

    Sets listmonk_sent_at to NOW() and listmonk_campaign_type to
    `campaign_type` for every fmcsa_carriers row whose dot_number is in
    `dot_numbers`, then commits the transaction on `conn`.
    """
    statement = """
        UPDATE fmcsa_carriers
        SET listmonk_sent_at = NOW(),
            listmonk_campaign_type = %s
        WHERE dot_number = ANY(%s::text[])
    """
    cursor = conn.cursor()
    cursor.execute(statement, (campaign_type, dot_numbers))
    conn.commit()
|
|
|
|
|
|
def list_segments(send_date: date) -> None:
    """Print the eligible audience size per deficiency segment (no writes).

    For each segment, prints the audience summed over all timezones (each
    timezone slot capped at the segment's `limit`), the configured source
    template ID (or UNSET), and the segment's default landing page. The
    audience is a row count per slot; emails are not deduplicated here.

    Args:
        send_date: Shown in the report header only.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n")
        print(f"{'segment':<22}{'audience':>10} {'source':<10} landing page")
        print("-" * 78)
        grand_total = 0
        for ctype, seg in DEFICIENCY_SEGMENTS.items():
            # Count in SQL rather than fetching full rows just to len() them;
            # count_carriers applies the same filter and the same cap.
            total = sum(
                count_carriers(conn, tz_cfg["states"], ctype, seg["limit"])
                for tz_cfg in TIMEZONE_CONFIG.values()
            )
            grand_total += total
            src = os.getenv(seg["source_env"])
            src_str = src if src else "UNSET"
            lp = build_lp_link(ctype, None)
            print(f"{ctype:<22}{total:>10} {src_str:<10} {lp}")
        print("-" * 78)
        print(f"{'TOTAL':<22}{grand_total:>10}")
        print("\n(UNSET source = template not yet created; segment is skipped by the "
              "scheduled run until its CAMPAIGN_*_ID env is set.)\n")
    finally:
        # Release the connection even when a query fails.
        conn.close()
|
|
|
|
|
|
def run(send_date: date, dry_run: bool = False, preview: bool = False,
        max_per_segment: int | None = None, only_segments: set[str] | None = None,
        send_hour_override: int | None = None, send_minute: int = 0,
        stagger_minutes: int = 0, daily_cap: int | None = None,
        warmup_cap: bool = True) -> None:
    """Build, schedule and record the campaigns for one send date.

    For every (timezone x campaign type) slot: select sendable carriers,
    create a Listmonk list, add the carriers, create the campaign (scheduled,
    or draft in preview), send a test email to the owner, and mark the
    carriers that were actually added as sent.

    Args:
        send_date: Date the campaigns are scheduled for (hours are UTC).
        dry_run: Select and log only; no Listmonk or DB writes.
        preview: One sample carrier per campaign, sent only to TEST_EMAIL,
            campaigns left as drafts, nothing marked sent.
        max_per_segment: Optional cap on each slot's audience.
        only_segments: Optional set of campaign type keys to restrict to.
        send_hour_override: UTC hour used for all timezones instead of the
            per-timezone default.
        send_minute: UTC minute of the send time.
        stagger_minutes: Minutes added per campaign already created.
        daily_cap: Total recipients allowed for this run; None means use the
            warmup-derived cap when `warmup_cap` is set and one is available.
        warmup_cap: Whether to fall back to warmup_daily_queue_cap().
    """
    conn = psycopg2.connect(DB_URL)
    # try/finally: the connection is released on every exit path, including
    # Listmonk/DB errors raised part-way through the run.
    try:
        base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
        base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID)

        # (campaign_type, base campaign, per-slot limit, label)
        campaign_specs = [
            ("mcs150", base_mcs150, 2000, "MCS-150 Overdue"),
            ("inactive", base_inactive, 1000, "Inactive USDOT"),
        ]

        # Add deficiency-flag segments whose Listmonk source template is configured.
        for ctype, seg in DEFICIENCY_SEGMENTS.items():
            src = os.getenv(seg["source_env"])
            if not src:
                LOG.info("[segment] %s skipped — %s not set (template not created)",
                         ctype, seg["source_env"])
                continue
            try:
                base = get_base_campaign(int(src))
            except Exception as exc:  # noqa: BLE001
                LOG.warning("[segment] %s skipped — source %s unusable: %s",
                            ctype, src, exc)
                continue
            campaign_specs.append((ctype, base, seg["limit"], seg["label"]))

        # Optional warmup controls: restrict to specific segments and/or cap the
        # per-segment audience so a fresh-IP day-0 send stays small.
        if only_segments:
            campaign_specs = [s for s in campaign_specs if s[0] in only_segments]
            if not campaign_specs:
                LOG.error("No campaign specs match --only-segment %s", sorted(only_segments))
                return
        if max_per_segment is not None:
            campaign_specs = [
                (ct, base, min(lim, max_per_segment), label)
                for (ct, base, lim, label) in campaign_specs
            ]

        if daily_cap is None and warmup_cap and not preview:
            daily_cap = warmup_daily_queue_cap()
        if daily_cap is not None:
            LOG.info("Daily queue cap active: %d recipients total", daily_cap)

        if preview:
            LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
                     "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)

        # Pass 1: size every slot so the cap can be split proportionally.
        candidates: list[dict] = []
        totals_by_campaign: dict[str, int] = {}
        for tz, tz_cfg in TIMEZONE_CONFIG.items():
            for campaign_type, _base, limit, label in campaign_specs:
                audience = 1 if preview else count_carriers(conn, tz_cfg["states"], campaign_type, limit)
                weight = campaign_weight(campaign_type)
                candidates.append({
                    "tz": tz,
                    "states": tz_cfg["states"],
                    "campaign_type": campaign_type,
                    "limit": limit,
                    "label": label,
                    "audience": audience,
                    "weight": weight,
                })
                totals_by_campaign[campaign_type] = totals_by_campaign.get(campaign_type, 0) + audience

        quotas = allocate_daily_budget(candidates, None if preview else daily_cap)
        LOG.info("Eligible audience by SQL criterion: %s", ", ".join(
            f"{ct}={n}" for ct, n in sorted(totals_by_campaign.items())
        ))
        if daily_cap is not None and not preview:
            planned_by_campaign: dict[str, int] = {}
            for (tz, ctype), quota in quotas.items():
                planned_by_campaign[ctype] = planned_by_campaign.get(ctype, 0) + quota
            LOG.info("Planned allocation by criterion: %s (total=%d/%d)", ", ".join(
                f"{ct}={n}" for ct, n in sorted(planned_by_campaign.items())
            ), sum(planned_by_campaign.values()), daily_cap)

        scheduled_count = 0
        queued_recipients = 0

        # Pass 2: build each slot.
        for tz, tz_cfg in TIMEZONE_CONFIG.items():
            states = tz_cfg["states"]
            send_hour = send_hour_override if send_hour_override is not None else tz_cfg["send_hour_utc"]
            send_at = datetime(
                send_date.year, send_date.month, send_date.day,
                send_hour, send_minute, 0, tzinfo=timezone.utc
            )

            for campaign_type, base, limit, label in campaign_specs:
                if daily_cap is not None and queued_recipients >= daily_cap:
                    LOG.info("[%s/%s] Daily cap reached (%d/%d) — skipping remaining slots",
                             tz, campaign_type, queued_recipients, daily_cap)
                    continue
                effective_send_at = send_at + timedelta(minutes=stagger_minutes * scheduled_count)
                fetch_limit = 1 if preview else limit
                if daily_cap is not None and not preview:
                    fetch_limit = min(fetch_limit, quotas.get((tz, campaign_type), 0))
                if fetch_limit <= 0:
                    LOG.info("[%s/%s] Allocation is 0 — skipping", tz, campaign_type)
                    continue
                if preview:
                    rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
                    suppressed: dict[str, int] = {}
                else:
                    rows, suppressed = select_sendable_carriers(conn, states, campaign_type, fetch_limit)
                if suppressed:
                    LOG.info("[%s/%s] Runtime Listmonk suppression skipped %d rows: %s",
                             tz, campaign_type, sum(suppressed.values()), ", ".join(
                                 f"{reason}={n}" for reason, n in sorted(suppressed.items())
                             ))
                if not rows:
                    LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
                    continue

                actual = len(rows)
                tag = "PREVIEW " if preview else ""
                list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}"
                campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}"

                LOG.info("[%s/%s] %d sendable carriers -> %s (send %s UTC)",
                         tz, campaign_type, actual, campaign_name,
                         effective_send_at.strftime("%Y-%m-%d %H:%M"))

                if dry_run:
                    LOG.info(" DRY RUN — skipping Listmonk + DB writes")
                    scheduled_count += 1
                    queued_recipients += actual
                    continue

                # Build subscriber list. In preview, only the owner's address (with the
                # sample carrier's attribs) so the real audience is never touched.
                if preview:
                    r0 = rows[0]
                    subscribers = [{
                        "email": TEST_EMAIL,
                        "name": r0[2] or "Sample Carrier",
                        "attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or "",
                                    "lp_link": build_lp_link(campaign_type, r0[4])},
                    }]
                else:
                    subscribers = [
                        {
                            "email": row[1],
                            "name": row[2] or row[1],
                            "attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or "",
                                        "lp_link": build_lp_link(campaign_type, row[4])},
                        }
                        for row in rows
                    ]

                # Create list + add subscribers, remembering which source rows
                # actually made it onto the list. Only those are marked sent
                # below, so a carrier whose add failed stays eligible for a
                # later run instead of being silently dropped.
                list_id = create_list(list_name)
                added_rows = [
                    row
                    for row, sub in zip(rows, subscribers)
                    if add_subscriber(list_id, sub["email"], sub["name"], sub["attribs"])
                ]
                added = len(added_rows)
                LOG.info("[%s/%s] List %d: %d/%d subscribers added", tz, campaign_type, list_id, added, len(subscribers))
                if added == 0:
                    LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping", tz, campaign_type, list_id)
                    continue

                # Create campaign (draft in preview, scheduled in real run)
                cid = create_and_schedule_campaign(base, list_id, campaign_name, effective_send_at, schedule=not preview)
                LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid,
                         "draft/preview" if preview else f"scheduled {effective_send_at.isoformat()}")
                scheduled_count += 1
                queued_recipients += added

                # Send a test to the owner so they can spot-check the rendered email
                send_test(base, cid, rows[0], label, tz, campaign_type)

                # Mark carriers as sent only on a real run
                if not preview:
                    mark_sent(conn, [row[0] for row in added_rows], campaign_type)
    finally:
        conn.close()

    LOG.info("Done. queued_recipients=%d%s", queued_recipients,
             f" daily_cap={daily_cap}" if daily_cap is not None else "")
|
|
|
|
|
|
def main():
    """Command-line entry point: parse arguments, then report or build.

    Invalid input (--date together with --tomorrow, a malformed --date, or a
    --send-hour / --send-minute outside the valid clock range) is rejected
    through argparse, which prints a usage error and exits with status 2
    before any database or Listmonk call is made.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )
    parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
    parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
    parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)")
    parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: today)")
    parser.add_argument("--tomorrow", action="store_true",
                        help="Schedule for tomorrow instead of today. Use only for manual pre-builds.")
    parser.add_argument("--max-per-segment", type=int, default=None,
                        help="Cap audience per (segment x timezone). Use for fresh-IP warmup small sends.")
    parser.add_argument("--only-segment", action="append", default=None, metavar="SEG",
                        help="Restrict to specific segment(s), e.g. mcs150, inactive, for_hire_boc3. Repeatable.")
    parser.add_argument("--send-hour", type=int, default=None,
                        help="Override send hour (UTC) for ALL timezones instead of per-tz defaults.")
    parser.add_argument("--send-minute", type=int, default=0,
                        help="Send minute (UTC), used with --send-hour. Default 0.")
    parser.add_argument("--stagger-minutes", type=int, default=0,
                        help="Add this many minutes between each created campaign. Useful for catchup sends.")
    parser.add_argument("--daily-cap", type=int, default=None,
                        help="Hard cap on recipients queued by this run. Default: warmup-derived cap when available.")
    parser.add_argument("--no-warmup-cap", action="store_true",
                        help="Disable automatic daily queue cap from /etc/postfix/pw-warmup-start.")
    args = parser.parse_args()

    if args.date and args.tomorrow:
        parser.error("--date and --tomorrow are mutually exclusive")
    # Validate up front: otherwise these only fail inside run(), after the DB
    # connection and the first Listmonk calls have already been made.
    if args.send_hour is not None and not 0 <= args.send_hour <= 23:
        parser.error("--send-hour must be between 0 and 23")
    if not 0 <= args.send_minute <= 59:
        parser.error("--send-minute must be between 0 and 59")

    if args.date:
        try:
            send_date = date.fromisoformat(args.date)
        except ValueError:
            # Usage error instead of a raw traceback.
            parser.error(f"--date must be a valid YYYY-MM-DD date, got {args.date!r}")
    elif args.tomorrow:
        send_date = date.today() + timedelta(days=1)
    else:
        send_date = date.today()

    if args.list_segments:
        list_segments(send_date)
        return

    only = set(args.only_segment) if args.only_segment else None
    LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, "
             "max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s, daily_cap=%s, warmup_cap=%s)",
             send_date, args.dry_run, args.preview, args.max_per_segment,
             sorted(only) if only else None, args.send_hour, args.stagger_minutes,
             args.daily_cap, not args.no_warmup_cap)
    run(send_date, dry_run=args.dry_run, preview=args.preview,
        max_per_segment=args.max_per_segment, only_segments=only,
        send_hour_override=args.send_hour, send_minute=args.send_minute,
        stagger_minutes=args.stagger_minutes, daily_cap=args.daily_cap,
        warmup_cap=not args.no_warmup_cap)


if __name__ == "__main__":
    main()
|