campaign builder: add --preview mode + fix subscriber-attach + test-send list bugs

- import_subscribers: was POSTing wrong bulk shape AND fallback used 'list_ids'
  (ignored by Listmonk) instead of 'lists' -> subscribers never attached to the
  list -> real sends would go to an empty list. Now single-adds with 'lists',
  handles already-exists, returns a count, logs if 0 added.
- send_test: passed base['lists'] (objects) instead of IDs -> test send rejected.
  Now extracts list IDs.
- create_and_schedule_campaign: add schedule= flag (preview makes drafts).
- --preview: 1 sample carrier/campaign, only owner email, drafts not scheduled,
  test sends immediately, never marks real carriers sent.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
justin 2026-05-31 17:28:09 -05:00
parent 03702dfbb7
commit 60312e5201

View file

@ -26,6 +26,7 @@ import logging
import os
import sys
import urllib.request
import urllib.parse
from datetime import date, datetime, timedelta, timezone
import psycopg2
@ -104,14 +105,49 @@ def create_list(name: str) -> int:
return result["data"]["id"]
def import_subscribers(list_id: int, subscribers: list[dict]) -> None:
"""Bulk-import subscribers to a Listmonk list."""
# Listmonk bulk import via CSV-style JSON
lm_api(f"/subscribers", {
"subscribers": subscribers,
"list_ids": [list_id],
"overwrite": False,
}, "POST")
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
"""Create/attach a single subscriber to a list. Returns True on success.
Listmonk's POST /api/subscribers takes one subscriber with `lists` (array of
list IDs) and preconfirm_subscriptions=true so they're immediately sendable.
"""
try:
lm_api("/subscribers", {
"email": email,
"name": name or email,
"status": "enabled",
"lists": [list_id],
"attribs": attribs,
"preconfirm_subscriptions": True,
}, "POST")
return True
except Exception as exc:
msg = str(exc)
# Already exists — attach to this list instead of failing
if "already exists" in msg.lower() or "conflict" in msg.lower() or "HTTP 409" in msg:
try:
subs = lm_api(f"/subscribers?query=" + urllib.parse.quote(f"subscribers.email='{email}'"))
results = subs.get("data", {}).get("results", [])
if results:
lm_api("/subscribers/lists", {
"ids": [results[0]["id"]],
"action": "add",
"target_list_ids": [list_id],
"status": "confirmed",
}, "PUT")
return True
except Exception:
return False
return False
def import_subscribers(list_id: int, subscribers: list[dict]) -> int:
"""Add subscribers to a list one-by-one. Returns count successfully added."""
added = 0
for sub in subscribers:
if add_subscriber(list_id, sub["email"], sub.get("name", ""), sub.get("attribs", {})):
added += 1
return added
def create_and_schedule_campaign(
@ -119,6 +155,7 @@ def create_and_schedule_campaign(
list_id: int,
name: str,
send_at_utc: datetime,
schedule: bool = True,
) -> int:
payload = {
"name": name,
@ -132,12 +169,14 @@ def create_and_schedule_campaign(
"template_id": base["template_id"],
"tags": base.get("tags") or [],
"messenger": base.get("messenger") or "email",
"send_at": send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
}
if schedule:
payload["send_at"] = send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00")
result = lm_api("/campaigns", payload, "POST")
cid = result["data"]["id"]
# Schedule it
lm_api(f"/campaigns/{cid}/status", {"status": "scheduled"}, "PUT")
if schedule:
lm_api(f"/campaigns/{cid}/status", {"status": "scheduled"}, "PUT")
# else: leave as draft (preview mode)
return cid
@ -150,10 +189,12 @@ def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: s
body = body.replace("{{ .Subscriber.Attribs.state }}", state or "TX")
body = body.replace("{{ UnsubscribeURL }}", "https://performancewest.net/unsubscribe")
subj = base["subject"].replace("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000")
# Listmonk /test needs `lists` as an array of IDs (base["lists"] is objects)
list_ids = [l["id"] for l in base.get("lists", []) if isinstance(l, dict)] or [1]
payload = {
"name": base.get("name", ""), "subject": subj,
"lists": base.get("lists", []), "from_email": base["from_email"],
"type": base.get("type", "regular"), "content_type": base["content_type"],
"name": base.get("name", "Test"), "subject": subj,
"lists": list_ids, "from_email": base["from_email"],
"type": "regular", "content_type": base["content_type"],
"body": body, "altbody": base.get("altbody"),
"template_id": base["template_id"],
"tags": base.get("tags") or [], "messenger": base.get("messenger") or "email",
@ -204,7 +245,7 @@ def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
conn.commit()
def run(send_date: date, dry_run: bool = False) -> None:
def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None:
conn = psycopg2.connect(DB_URL)
base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
@ -215,6 +256,10 @@ def run(send_date: date, dry_run: bool = False) -> None:
("inactive", base_inactive, 1000, "Inactive USDOT"),
]
if preview:
LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
"test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)
for tz, tz_cfg in TIMEZONE_CONFIG.items():
states = tz_cfg["states"]
send_hour = tz_cfg["send_hour_utc"]
@ -224,14 +269,16 @@ def run(send_date: date, dry_run: bool = False) -> None:
)
for campaign_type, base, limit, label in campaign_specs:
rows = fetch_carriers(conn, states, campaign_type, limit)
fetch_limit = 1 if preview else limit
rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
if not rows:
LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
continue
actual = len(rows)
list_name = f"Trucking {label} {tz} {send_date.isoformat()}"
campaign_name = f"{label} - {tz} - {send_date.strftime('%b %d %Y')}"
tag = "PREVIEW " if preview else ""
list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}"
campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}"
LOG.info("[%s/%s] %d carriers -> %s (send %s UTC)",
tz, campaign_type, actual, campaign_name,
@ -241,42 +288,44 @@ def run(send_date: date, dry_run: bool = False) -> None:
LOG.info(" DRY RUN — skipping Listmonk + DB writes")
continue
# Create Listmonk list
# Build subscriber list. In preview, only the owner's address (with the
# sample carrier's attribs) so the real audience is never touched.
if preview:
r0 = rows[0]
subscribers = [{
"email": TEST_EMAIL,
"name": r0[2] or "Sample Carrier",
"attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or ""},
}]
else:
subscribers = [
{
"email": row[1],
"name": row[2] or row[1],
"attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or ""},
}
for row in rows
]
# Create list + add subscribers
list_id = create_list(list_name)
added = import_subscribers(list_id, subscribers)
LOG.info("[%s/%s] List %d: %d/%d subscribers added", tz, campaign_type, list_id, added, len(subscribers))
if added == 0:
LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping", tz, campaign_type, list_id)
continue
# Import subscribers
subscribers = [
{
"email": row[1],
"name": row[2] or row[1],
"attribs": {
"dot_number": row[0],
"company": row[2] or "",
"state": row[3] or "",
},
}
for row in rows
]
try:
import_subscribers(list_id, subscribers)
except Exception as exc:
LOG.warning("[%s/%s] Subscriber import error: %s — falling back to individual", tz, campaign_type, exc)
for sub in subscribers:
try:
lm_api("/subscribers", {**sub, "list_ids": [list_id], "status": "enabled", "preconfirm_subscriptions": True}, "POST")
except Exception:
pass
# Create campaign (draft in preview, scheduled in real run)
cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at, schedule=not preview)
LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid,
"draft/preview" if preview else f"scheduled {send_at.isoformat()}")
# Create and schedule the campaign
cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at)
LOG.info("[%s/%s] Campaign %d scheduled for %s UTC", tz, campaign_type, cid, send_at.isoformat())
# Send a test to the owner so they can spot-check before the blast fires
# Send a test to the owner so they can spot-check the rendered email
send_test(base, cid, rows[0], label, tz)
# Mark carriers as sent in DB
dot_numbers = [row[0] for row in rows]
mark_sent(conn, dot_numbers, campaign_type)
# Mark carriers as sent only on a real run
if not preview:
mark_sent(conn, [row[0] for row in rows], campaign_type)
conn.close()
LOG.info("Done.")
@ -289,6 +338,7 @@ def main():
)
parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: tomorrow)")
args = parser.parse_args()
@ -297,8 +347,8 @@ def main():
else:
send_date = date.today() + timedelta(days=1)
LOG.info("Building campaigns for send date %s (dry_run=%s)", send_date, args.dry_run)
run(send_date, dry_run=args.dry_run)
LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s)", send_date, args.dry_run, args.preview)
run(send_date, dry_run=args.dry_run, preview=args.preview)
if __name__ == "__main__":