trucking campaigns: daily builder + MX verifier concurrency + tracking column
- build_trucking_campaigns.py: nightly script that creates 8 Listmonk campaigns per day (4 TZ x 2 types: MCS-150 overdue 2k/TZ, inactive USDOT 1k/TZ) at 4AM ET / 5AM ET (CT) / 6AM ET (MT) / 7AM ET (PT). Deduplicates via listmonk_sent_at column. - migration 083: add listmonk_sent_at + listmonk_campaign_type to fmcsa_carriers - email_verifier.py: bump max_workers from 5 to 20 for 4x faster throughput - cron: daily pw-trucking-campaigns at 08:00 UTC (3 AM EST) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
54a46062a5
commit
0b7a35a58e
4 changed files with 294 additions and 1 deletions
274
scripts/build_trucking_campaigns.py
Normal file
274
scripts/build_trucking_campaigns.py
Normal file
|
|
@ -0,0 +1,274 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Daily trucking campaign builder.
|
||||
|
||||
Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns:
|
||||
- 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST)
|
||||
- 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule)
|
||||
|
||||
Selection criteria:
|
||||
- email verified or catch-all (usable)
|
||||
- mcs150_parsed > 2 years ago (for MCS-150 campaign)
|
||||
- oos_active IS TRUE (for inactive USDOT campaign)
|
||||
- listmonk_sent_at IS NULL (not yet sent)
|
||||
- ordered by mcs150_parsed ASC (most overdue first)
|
||||
|
||||
Usage:
|
||||
python3 scripts/build_trucking_campaigns.py
|
||||
python3 scripts/build_trucking_campaigns.py --dry-run
|
||||
python3 scripts/build_trucking_campaigns.py --date 2026-06-02
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
|
||||
import psycopg2
|
||||
|
||||
LOG = logging.getLogger("build_trucking_campaigns")
|
||||
|
||||
# ── Listmonk ──────────────────────────────────────────────────────────────────
|
||||
LISTMONK_URL = os.getenv("LISTMONK_URL", "http://listmonk:9000")
|
||||
LISTMONK_USER = os.getenv("LISTMONK_USER", "api")
|
||||
LISTMONK_PASS = os.getenv("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y")
|
||||
_LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
|
||||
|
||||
# ── Source campaign IDs ────────────────────────────────────────────────────────
|
||||
CAMPAIGN_MCS150_ID = 186 # "MCS-150 Overdue — $1,000/Day Fine Risk"
|
||||
CAMPAIGN_INACTIVE_ID = 188 # "Inactive USDOT — Reactivate Before You Get Pulled Over"
|
||||
|
||||
# ── TZ config: tz_key -> (states, send_hour_utc) ─────────────────────────────
|
||||
# 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local
|
||||
TIMEZONE_CONFIG = {
|
||||
"ET": {
|
||||
"states": ("CT","DC","DE","FL","GA","IN","KY","MA","MD","ME","MI",
|
||||
"NH","NJ","NY","NC","OH","PA","RI","SC","TN","VA","VT","WV"),
|
||||
"send_hour_utc": 9, # 4 AM EST
|
||||
},
|
||||
"CT": {
|
||||
"states": ("AL","AR","IA","IL","KS","LA","MN","MO","MS","ND",
|
||||
"NE","OK","SD","TX","WI"),
|
||||
"send_hour_utc": 10, # 5 AM EST = 4 AM CST
|
||||
},
|
||||
"MT": {
|
||||
"states": ("AZ","CO","ID","MT","NM","UT","WY"),
|
||||
"send_hour_utc": 11, # 6 AM EST = 4 AM MST
|
||||
},
|
||||
"PT": {
|
||||
"states": ("AK","CA","HI","NV","OR","WA"),
|
||||
"send_hour_utc": 12, # 7 AM EST = 4 AM PST
|
||||
},
|
||||
}
|
||||
|
||||
USABLE_FILTER = (
|
||||
"(email_verified IS TRUE OR email_verify_result IN "
|
||||
"('smtp_valid','catch_all_domain','catch_all_detected'))"
|
||||
)
|
||||
|
||||
DB_URL = os.getenv("DATABASE_URL", "")
|
||||
|
||||
|
||||
def lm_api(path: str, data: dict | None = None, method: str | None = None):
|
||||
headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"}
|
||||
req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
|
||||
if data is not None:
|
||||
req.data = json.dumps(data).encode()
|
||||
if method:
|
||||
req.method = method
|
||||
try:
|
||||
return json.loads(urllib.request.urlopen(req, timeout=30).read())
|
||||
except urllib.error.HTTPError as e:
|
||||
body = e.read().decode()[:300]
|
||||
raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}")
|
||||
|
||||
|
||||
def get_base_campaign(campaign_id: int) -> dict:
|
||||
return lm_api(f"/campaigns/{campaign_id}")["data"]
|
||||
|
||||
|
||||
def create_list(name: str) -> int:
|
||||
result = lm_api("/lists", {
|
||||
"name": name,
|
||||
"type": "private",
|
||||
"optin": "single",
|
||||
"tags": ["trucking", "auto"],
|
||||
}, "POST")
|
||||
return result["data"]["id"]
|
||||
|
||||
|
||||
def import_subscribers(list_id: int, subscribers: list[dict]) -> None:
|
||||
"""Bulk-import subscribers to a Listmonk list."""
|
||||
# Listmonk bulk import via CSV-style JSON
|
||||
lm_api(f"/subscribers", {
|
||||
"subscribers": subscribers,
|
||||
"list_ids": [list_id],
|
||||
"overwrite": False,
|
||||
}, "POST")
|
||||
|
||||
|
||||
def create_and_schedule_campaign(
|
||||
base: dict,
|
||||
list_id: int,
|
||||
name: str,
|
||||
send_at_utc: datetime,
|
||||
) -> int:
|
||||
payload = {
|
||||
"name": name,
|
||||
"subject": base["subject"],
|
||||
"lists": [list_id],
|
||||
"from_email": base["from_email"],
|
||||
"type": "regular",
|
||||
"content_type": base["content_type"],
|
||||
"body": base["body"],
|
||||
"altbody": base.get("altbody"),
|
||||
"template_id": base["template_id"],
|
||||
"tags": base.get("tags") or [],
|
||||
"messenger": base.get("messenger") or "email",
|
||||
"send_at": send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
|
||||
}
|
||||
result = lm_api("/campaigns", payload, "POST")
|
||||
cid = result["data"]["id"]
|
||||
# Schedule it
|
||||
lm_api(f"/campaigns/{cid}/status", {"status": "scheduled"}, "PUT")
|
||||
return cid
|
||||
|
||||
|
||||
def fetch_carriers(
|
||||
conn,
|
||||
tz_states: tuple,
|
||||
campaign_type: str,
|
||||
limit: int,
|
||||
) -> list[tuple]:
|
||||
"""Return (dot_number, email_address, legal_name, phy_state) rows."""
|
||||
cur = conn.cursor()
|
||||
states_placeholder = ",".join(["%s"] * len(tz_states))
|
||||
if campaign_type == "mcs150":
|
||||
type_filter = "mcs150_parsed < NOW() - INTERVAL '2 years'"
|
||||
else:
|
||||
type_filter = "oos_active IS TRUE"
|
||||
|
||||
cur.execute(f"""
|
||||
SELECT dot_number, email_address, legal_name, phy_state
|
||||
FROM fmcsa_carriers
|
||||
WHERE {type_filter}
|
||||
AND {USABLE_FILTER}
|
||||
AND listmonk_sent_at IS NULL
|
||||
AND phy_state IN ({states_placeholder})
|
||||
ORDER BY mcs150_parsed ASC NULLS LAST
|
||||
LIMIT %s
|
||||
""", list(tz_states) + [limit])
|
||||
return cur.fetchall()
|
||||
|
||||
|
||||
def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
UPDATE fmcsa_carriers
|
||||
SET listmonk_sent_at = NOW(),
|
||||
listmonk_campaign_type = %s
|
||||
WHERE dot_number = ANY(%s::text[])
|
||||
""", (campaign_type, dot_numbers))
|
||||
conn.commit()
|
||||
|
||||
|
||||
def run(send_date: date, dry_run: bool = False) -> None:
|
||||
conn = psycopg2.connect(DB_URL)
|
||||
|
||||
base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
|
||||
base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID)
|
||||
|
||||
campaign_specs = [
|
||||
("mcs150", base_mcs150, 2000, "MCS-150 Overdue"),
|
||||
("inactive", base_inactive, 1000, "Inactive USDOT"),
|
||||
]
|
||||
|
||||
for tz, tz_cfg in TIMEZONE_CONFIG.items():
|
||||
states = tz_cfg["states"]
|
||||
send_hour = tz_cfg["send_hour_utc"]
|
||||
send_at = datetime(
|
||||
send_date.year, send_date.month, send_date.day,
|
||||
send_hour, 0, 0, tzinfo=timezone.utc
|
||||
)
|
||||
|
||||
for campaign_type, base, limit, label in campaign_specs:
|
||||
rows = fetch_carriers(conn, states, campaign_type, limit)
|
||||
if not rows:
|
||||
LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
|
||||
continue
|
||||
|
||||
actual = len(rows)
|
||||
list_name = f"Trucking {label} {tz} {send_date.isoformat()}"
|
||||
campaign_name = f"{label} - {tz} - {send_date.strftime('%b %d %Y')}"
|
||||
|
||||
LOG.info("[%s/%s] %d carriers -> %s (send %s UTC)",
|
||||
tz, campaign_type, actual, campaign_name,
|
||||
send_at.strftime("%Y-%m-%d %H:%M"))
|
||||
|
||||
if dry_run:
|
||||
LOG.info(" DRY RUN — skipping Listmonk + DB writes")
|
||||
continue
|
||||
|
||||
# Create Listmonk list
|
||||
list_id = create_list(list_name)
|
||||
|
||||
# Import subscribers
|
||||
subscribers = [
|
||||
{
|
||||
"email": row[1],
|
||||
"name": row[2] or row[1],
|
||||
"attribs": {
|
||||
"dot_number": row[0],
|
||||
"company": row[2] or "",
|
||||
"state": row[3] or "",
|
||||
},
|
||||
}
|
||||
for row in rows
|
||||
]
|
||||
try:
|
||||
import_subscribers(list_id, subscribers)
|
||||
except Exception as exc:
|
||||
LOG.warning("[%s/%s] Subscriber import error: %s — falling back to individual", tz, campaign_type, exc)
|
||||
for sub in subscribers:
|
||||
try:
|
||||
lm_api("/subscribers", {**sub, "list_ids": [list_id], "status": "enabled", "preconfirm_subscriptions": True}, "POST")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Create and schedule the campaign
|
||||
cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at)
|
||||
LOG.info("[%s/%s] Campaign %d scheduled for %s UTC", tz, campaign_type, cid, send_at.isoformat())
|
||||
|
||||
# Mark carriers as sent in DB
|
||||
dot_numbers = [row[0] for row in rows]
|
||||
mark_sent(conn, dot_numbers, campaign_type)
|
||||
|
||||
conn.close()
|
||||
LOG.info("Done.")
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
|
||||
)
|
||||
parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
|
||||
parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: tomorrow)")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.date:
|
||||
send_date = date.fromisoformat(args.date)
|
||||
else:
|
||||
send_date = date.today() + timedelta(days=1)
|
||||
|
||||
LOG.info("Building campaigns for send date %s (dry_run=%s)", send_date, args.dry_run)
|
||||
run(send_date, dry_run=args.dry_run)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue