From 258e384f95d0ef7533f017842e82e023c894824f Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 29 May 2026 14:26:44 -0500 Subject: [PATCH] Fix state list import: use individual subscriber creates instead of bulk Listmonk bulk import API returns 400. Switch to individual POST /api/subscribers with 409 conflict handling (update existing subs to add them to the new list). Progress logging every 200 records. Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/workers/fmcsa_deficiency_flagger.py | 77 ++++++++++++++------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/scripts/workers/fmcsa_deficiency_flagger.py b/scripts/workers/fmcsa_deficiency_flagger.py index 5d3f27a..63dc069 100644 --- a/scripts/workers/fmcsa_deficiency_flagger.py +++ b/scripts/workers/fmcsa_deficiency_flagger.py @@ -427,37 +427,62 @@ def populate_state_lists() -> None: LOG.error("Failed to create list %s: %s", list_name, e) continue - # Import in batches + # Import subscribers individually (bulk import API returns 400) imported = 0 - for i in range(0, len(rows), 500): - batch = rows[i:i+500] - subscribers = [] - for dot, name, email, phone, city, st, issues, def_count in batch: - issues_list = (issues or "").split("; ") - issues_html = "".join(f"
  • {iss}
  • " for iss in issues_list if iss) - subscribers.append({ - "email": email.lower().strip(), - "name": name or "", - "status": "enabled", - "lists": [list_id], - "attribs": { - "company": name or "", "dot_number": dot, - "phone": phone or "", "city": city or "", - "state": st or "", "deficiency_count": def_count or 0, - "issues_html": f"", - }, - }) + failed = 0 + for dot, name, email, phone, city, st, issues, def_count in rows: + issues_list = (issues or "").split("; ") + issues_html = "".join(f"
  • {iss}
  • " for iss in issues_list if iss) + sub_data = { + "email": email.lower().strip(), + "name": name or email.split("@")[0], + "status": "enabled", + "lists": [list_id], + "preconfirm_subscriptions": True, + "attribs": { + "company": name or "", "dot_number": dot, + "phone": phone or "", "city": city or "", + "state": st or "", "deficiency_count": def_count or 0, + "issues_html": f"", + }, + } try: - data = json.dumps({"mode": "subscribe", "subscribers": subscribers, - "lists": [list_id], "overwrite": True}).encode() - req = urllib.request.Request(f"{LISTMONK_API}/api/import/subscribers", + data = json.dumps(sub_data).encode() + req = urllib.request.Request(f"{LISTMONK_API}/api/subscribers", data=data, method="POST", headers=headers) urllib.request.urlopen(req) - imported += len(batch) - except Exception as e: - LOG.error(" Batch import failed for %s: %s", state_code, e) + imported += 1 + except urllib.error.HTTPError as e: + if e.code == 409: # Already exists — add to list + try: + # Look up existing subscriber and add to list + search_req = urllib.request.Request( + f"{LISTMONK_API}/api/subscribers?query=subscribers.email='{email.lower().strip()}'", + headers=headers) + search_resp = json.loads(urllib.request.urlopen(search_req).read()) + results = search_resp.get("data", {}).get("results", []) + if results: + sub_id = results[0]["id"] + existing_lists = [l["id"] for l in results[0].get("lists", [])] + if list_id not in existing_lists: + existing_lists.append(list_id) + update_data = json.dumps({"lists": existing_lists, "attribs": sub_data["attribs"]}).encode() + update_req = urllib.request.Request( + f"{LISTMONK_API}/api/subscribers/{sub_id}", + data=update_data, method="PUT", headers=headers) + urllib.request.urlopen(update_req) + imported += 1 + except Exception: + failed += 1 + else: + failed += 1 + except Exception: + failed += 1 - LOG.info(" %s: imported %d to list '%s'", state_code, imported, list_name) + if (imported + failed) % 200 == 0 and (imported + failed) > 0: + LOG.info(" %s: %d imported, %d failed of %d", state_code, imported, failed, len(rows)) + + LOG.info(" %s: imported %d, failed %d to list '%s'", state_code, imported, failed, list_name) conn.close()