fix: stagger trucking campaign catchups and subscriber reattach

This commit is contained in:
justin 2026-06-03 13:21:16 -05:00
parent 6d4c323ab6
commit d7de818f39

View file

@ -209,7 +209,13 @@ def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
# Already exists — attach to this list instead of failing
if "already exists" in msg.lower() or "conflict" in msg.lower() or "HTTP 409" in msg:
try:
subs = lm_api(f"/subscribers?query=" + urllib.parse.quote(f"subscribers.email='{email}'"))
subs = lm_api(
"/subscribers?"
+ urllib.parse.urlencode({
"query": f"email = '{email.lower().strip()}'",
"per_page": 1,
})
)
results = subs.get("data", {}).get("results", [])
if results:
lm_api("/subscribers/lists", {
@ -401,7 +407,8 @@ def list_segments(send_date: date) -> None:
def run(send_date: date, dry_run: bool = False, preview: bool = False,
max_per_segment: int | None = None, only_segments: set[str] | None = None,
send_hour_override: int | None = None, send_minute: int = 0) -> None:
send_hour_override: int | None = None, send_minute: int = 0,
stagger_minutes: int = 0) -> None:
conn = psycopg2.connect(DB_URL)
base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
@ -445,6 +452,8 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
"test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)
scheduled_count = 0
for tz, tz_cfg in TIMEZONE_CONFIG.items():
states = tz_cfg["states"]
send_hour = send_hour_override if send_hour_override is not None else tz_cfg["send_hour_utc"]
@ -454,6 +463,7 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
)
for campaign_type, base, limit, label in campaign_specs:
effective_send_at = send_at + timedelta(minutes=stagger_minutes * scheduled_count)
fetch_limit = 1 if preview else limit
rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
if not rows:
@ -467,10 +477,11 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
LOG.info("[%s/%s] %d carriers -> %s (send %s UTC)",
tz, campaign_type, actual, campaign_name,
send_at.strftime("%Y-%m-%d %H:%M"))
effective_send_at.strftime("%Y-%m-%d %H:%M"))
if dry_run:
LOG.info(" DRY RUN — skipping Listmonk + DB writes")
scheduled_count += 1
continue
# Build subscriber list. In preview, only the owner's address (with the
@ -503,9 +514,10 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False,
continue
# Create campaign (draft in preview, scheduled in real run)
cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at, schedule=not preview)
cid = create_and_schedule_campaign(base, list_id, campaign_name, effective_send_at, schedule=not preview)
LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid,
"draft/preview" if preview else f"scheduled {send_at.isoformat()}")
"draft/preview" if preview else f"scheduled {effective_send_at.isoformat()}")
scheduled_count += 1
# Send a test to the owner so they can spot-check the rendered email
send_test(base, cid, rows[0], label, tz, campaign_type)
@ -536,6 +548,8 @@ def main():
help="Override send hour (UTC) for ALL timezones instead of per-tz defaults.")
parser.add_argument("--send-minute", type=int, default=0,
help="Send minute (UTC), used with --send-hour. Default 0.")
parser.add_argument("--stagger-minutes", type=int, default=0,
help="Add this many minutes between each created campaign. Useful for catchup sends.")
args = parser.parse_args()
if args.date:
@ -549,12 +563,13 @@ def main():
only = set(args.only_segment) if args.only_segment else None
LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, "
"max_per_segment=%s, only=%s, send_hour=%s)",
"max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s)",
send_date, args.dry_run, args.preview, args.max_per_segment,
sorted(only) if only else None, args.send_hour)
sorted(only) if only else None, args.send_hour, args.stagger_minutes)
run(send_date, dry_run=args.dry_run, preview=args.preview,
max_per_segment=args.max_per_segment, only_segments=only,
send_hour_override=args.send_hour, send_minute=args.send_minute)
send_hour_override=args.send_hour, send_minute=args.send_minute,
stagger_minutes=args.stagger_minutes)
if __name__ == "__main__":