From 60312e5201cd8ec820b7d5ab7ae8b0f3ff3ab3a7 Mon Sep 17 00:00:00 2001 From: justin Date: Sun, 31 May 2026 17:28:09 -0500 Subject: [PATCH] campaign builder: add --preview mode + fix subscriber-attach + test-send list bugs - import_subscribers: was POSTing wrong bulk shape AND fallback used 'list_ids' (ignored by Listmonk) instead of 'lists' -> subscribers never attached to the list -> real sends would go to an empty list. Now single-adds with 'lists', handles already-exists, returns a count, logs if 0 added. - send_test: passed base['lists'] (objects) instead of IDs -> test send rejected. Now extracts list IDs. - create_and_schedule_campaign: add schedule= flag (preview makes drafts). - --preview: 1 sample carrier/campaign, only owner email, drafts not scheduled, test sends immediately, never marks real carriers sent. Co-Authored-By: Claude Opus 4.8 (1M context) --- scripts/build_trucking_campaigns.py | 152 ++++++++++++++++++---------- 1 file changed, 101 insertions(+), 51 deletions(-) diff --git a/scripts/build_trucking_campaigns.py b/scripts/build_trucking_campaigns.py index be500d7..b486ccd 100644 --- a/scripts/build_trucking_campaigns.py +++ b/scripts/build_trucking_campaigns.py @@ -26,6 +26,7 @@ import logging import os import sys import urllib.request +import urllib.parse from datetime import date, datetime, timedelta, timezone import psycopg2 @@ -104,14 +105,49 @@ def create_list(name: str) -> int: return result["data"]["id"] -def import_subscribers(list_id: int, subscribers: list[dict]) -> None: - """Bulk-import subscribers to a Listmonk list.""" - # Listmonk bulk import via CSV-style JSON - lm_api(f"/subscribers", { - "subscribers": subscribers, - "list_ids": [list_id], - "overwrite": False, - }, "POST") +def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool: + """Create/attach a single subscriber to a list. Returns True on success. + + Listmonk's POST /api/subscribers takes one subscriber with `lists` (array of + list IDs) and preconfirm_subscriptions=true so they're immediately sendable. + """ + try: + lm_api("/subscribers", { + "email": email, + "name": name or email, + "status": "enabled", + "lists": [list_id], + "attribs": attribs, + "preconfirm_subscriptions": True, + }, "POST") + return True + except Exception as exc: + msg = str(exc) + # Already exists — attach to this list instead of failing + if "already exists" in msg.lower() or "conflict" in msg.lower() or "HTTP 409" in msg: + try: + subs = lm_api(f"/subscribers?query=" + urllib.parse.quote(f"subscribers.email='{email}'")) + results = subs.get("data", {}).get("results", []) + if results: + lm_api("/subscribers/lists", { + "ids": [results[0]["id"]], + "action": "add", + "target_list_ids": [list_id], + "status": "confirmed", + }, "PUT") + return True + except Exception: + return False + return False + + +def import_subscribers(list_id: int, subscribers: list[dict]) -> int: + """Add subscribers to a list one-by-one. Returns count successfully added.""" + added = 0 + for sub in subscribers: + if add_subscriber(list_id, sub["email"], sub.get("name", ""), sub.get("attribs", {})): + added += 1 + return added def create_and_schedule_campaign( @@ -119,6 +155,7 @@ def create_and_schedule_campaign( list_id: int, name: str, send_at_utc: datetime, + schedule: bool = True, ) -> int: payload = { "name": name, @@ -132,12 +169,14 @@ def create_and_schedule_campaign( "template_id": base["template_id"], "tags": base.get("tags") or [], "messenger": base.get("messenger") or "email", - "send_at": send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00"), } + if schedule: + payload["send_at"] = send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00") result = lm_api("/campaigns", payload, "POST") cid = result["data"]["id"] - # Schedule it - lm_api(f"/campaigns/{cid}/status", {"status": "scheduled"}, "PUT") + if schedule: + lm_api(f"/campaigns/{cid}/status", {"status": "scheduled"}, "PUT") + # else: leave as draft (preview mode) return cid @@ -150,10 +189,12 @@ def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: s body = body.replace("{{ .Subscriber.Attribs.state }}", state or "TX") body = body.replace("{{ UnsubscribeURL }}", "https://performancewest.net/unsubscribe") subj = base["subject"].replace("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000") + # Listmonk /test needs `lists` as an array of IDs (base["lists"] is objects) + list_ids = [l["id"] for l in base.get("lists", []) if isinstance(l, dict)] or [1] payload = { - "name": base.get("name", ""), "subject": subj, - "lists": base.get("lists", []), "from_email": base["from_email"], - "type": base.get("type", "regular"), "content_type": base["content_type"], + "name": base.get("name", "Test"), "subject": subj, + "lists": list_ids, "from_email": base["from_email"], + "type": "regular", "content_type": base["content_type"], "body": body, "altbody": base.get("altbody"), "template_id": base["template_id"], "tags": base.get("tags") or [], "messenger": base.get("messenger") or "email", @@ -204,7 +245,7 @@ def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None: conn.commit() -def run(send_date: date, dry_run: bool = False) -> None: +def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None: conn = psycopg2.connect(DB_URL) base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID) @@ -215,6 +256,10 @@ def run(send_date: date, dry_run: bool = False) -> None: ("inactive", base_inactive, 1000, "Inactive USDOT"), ] + if preview: + LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, " + "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL) + for tz, tz_cfg in TIMEZONE_CONFIG.items(): states = tz_cfg["states"] send_hour = tz_cfg["send_hour_utc"] @@ -224,14 +269,16 @@ def run(send_date: date, dry_run: bool = False) -> None: ) for campaign_type, base, limit, label in campaign_specs: - rows = fetch_carriers(conn, states, campaign_type, limit) + fetch_limit = 1 if preview else limit + rows = fetch_carriers(conn, states, campaign_type, fetch_limit) if not rows: LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type) continue actual = len(rows) - list_name = f"Trucking {label} {tz} {send_date.isoformat()}" - campaign_name = f"{label} - {tz} - {send_date.strftime('%b %d %Y')}" + tag = "PREVIEW " if preview else "" + list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}" + campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}" LOG.info("[%s/%s] %d carriers -> %s (send %s UTC)", tz, campaign_type, actual, campaign_name, @@ -241,42 +288,44 @@ def run(send_date: date, dry_run: bool = False) -> None: LOG.info(" DRY RUN — skipping Listmonk + DB writes") continue - # Create Listmonk list + # Build subscriber list. In preview, only the owner's address (with the + # sample carrier's attribs) so the real audience is never touched. + if preview: + r0 = rows[0] + subscribers = [{ + "email": TEST_EMAIL, + "name": r0[2] or "Sample Carrier", + "attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or ""}, + }] + else: + subscribers = [ + { + "email": row[1], + "name": row[2] or row[1], + "attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or ""}, + } + for row in rows + ] + + # Create list + add subscribers list_id = create_list(list_name) + added = import_subscribers(list_id, subscribers) + LOG.info("[%s/%s] List %d: %d/%d subscribers added", tz, campaign_type, list_id, added, len(subscribers)) + if added == 0: + LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping", tz, campaign_type, list_id) + continue - # Import subscribers - subscribers = [ - { - "email": row[1], - "name": row[2] or row[1], - "attribs": { - "dot_number": row[0], - "company": row[2] or "", - "state": row[3] or "", - }, - } - for row in rows - ] - try: - import_subscribers(list_id, subscribers) - except Exception as exc: - LOG.warning("[%s/%s] Subscriber import error: %s — falling back to individual", tz, campaign_type, exc) - for sub in subscribers: - try: - lm_api("/subscribers", {**sub, "list_ids": [list_id], "status": "enabled", "preconfirm_subscriptions": True}, "POST") - except Exception: - pass + # Create campaign (draft in preview, scheduled in real run) + cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at, schedule=not preview) + LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid, + "draft/preview" if preview else f"scheduled {send_at.isoformat()}") - # Create and schedule the campaign - cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at) - LOG.info("[%s/%s] Campaign %d scheduled for %s UTC", tz, campaign_type, cid, send_at.isoformat()) - - # Send a test to the owner so they can spot-check before the blast fires + # Send a test to the owner so they can spot-check the rendered email send_test(base, cid, rows[0], label, tz) - # Mark carriers as sent in DB - dot_numbers = [row[0] for row in rows] - mark_sent(conn, dot_numbers, campaign_type) + # Mark carriers as sent only on a real run + if not preview: + mark_sent(conn, [row[0] for row in rows], campaign_type) conn.close() LOG.info("Done.") @@ -289,6 +338,7 @@ def main(): ) parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk") parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes") + parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent") parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: tomorrow)") args = parser.parse_args() @@ -297,8 +347,8 @@ def main(): else: send_date = date.today() + timedelta(days=1) - LOG.info("Building campaigns for send date %s (dry_run=%s)", send_date, args.dry_run) - run(send_date, dry_run=args.dry_run) + LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s)", send_date, args.dry_run, args.preview) + run(send_date, dry_run=args.dry_run, preview=args.preview) if __name__ == "__main__":