diff --git a/scripts/workers/fmcsa_deficiency_flagger.py b/scripts/workers/fmcsa_deficiency_flagger.py
index 5d3f27a..63dc069 100644
--- a/scripts/workers/fmcsa_deficiency_flagger.py
+++ b/scripts/workers/fmcsa_deficiency_flagger.py
@@ -427,37 +427,62 @@ def populate_state_lists() -> None:
LOG.error("Failed to create list %s: %s", list_name, e)
continue
- # Import in batches
+ # Import subscribers individually (bulk import API returns 400)
imported = 0
- for i in range(0, len(rows), 500):
- batch = rows[i:i+500]
- subscribers = []
- for dot, name, email, phone, city, st, issues, def_count in batch:
- issues_list = (issues or "").split("; ")
- issues_html = "".join(f"
{iss}" for iss in issues_list if iss)
- subscribers.append({
- "email": email.lower().strip(),
- "name": name or "",
- "status": "enabled",
- "lists": [list_id],
- "attribs": {
- "company": name or "", "dot_number": dot,
- "phone": phone or "", "city": city or "",
- "state": st or "", "deficiency_count": def_count or 0,
- "issues_html": f"",
- },
- })
+ failed = 0
+ for dot, name, email, phone, city, st, issues, def_count in rows:
+ issues_list = (issues or "").split("; ")
+ issues_html = "".join(f"{iss}" for iss in issues_list if iss)
+ sub_data = {
+ "email": email.lower().strip(),
+ "name": name or email.split("@")[0],
+ "status": "enabled",
+ "lists": [list_id],
+ "preconfirm_subscriptions": True,
+ "attribs": {
+ "company": name or "", "dot_number": dot,
+ "phone": phone or "", "city": city or "",
+ "state": st or "", "deficiency_count": def_count or 0,
+ "issues_html": f"",
+ },
+ }
try:
- data = json.dumps({"mode": "subscribe", "subscribers": subscribers,
- "lists": [list_id], "overwrite": True}).encode()
- req = urllib.request.Request(f"{LISTMONK_API}/api/import/subscribers",
+ data = json.dumps(sub_data).encode()
+ req = urllib.request.Request(f"{LISTMONK_API}/api/subscribers",
data=data, method="POST", headers=headers)
urllib.request.urlopen(req)
- imported += len(batch)
- except Exception as e:
- LOG.error(" Batch import failed for %s: %s", state_code, e)
+ imported += 1
+ except urllib.error.HTTPError as e:
+ if e.code == 409: # Already exists — add to list
+ try:
+ # Look up existing subscriber and add to list
+ search_req = urllib.request.Request(
+ f"{LISTMONK_API}/api/subscribers?query=subscribers.email='{email.lower().strip()}'",
+ headers=headers)
+ search_resp = json.loads(urllib.request.urlopen(search_req).read())
+ results = search_resp.get("data", {}).get("results", [])
+ if results:
+ sub_id = results[0]["id"]
+ existing_lists = [l["id"] for l in results[0].get("lists", [])]
+ if list_id not in existing_lists:
+ existing_lists.append(list_id)
+ update_data = json.dumps({"lists": existing_lists, "attribs": sub_data["attribs"]}).encode()
+ update_req = urllib.request.Request(
+ f"{LISTMONK_API}/api/subscribers/{sub_id}",
+ data=update_data, method="PUT", headers=headers)
+ urllib.request.urlopen(update_req)
+ imported += 1
+ except Exception:
+ failed += 1
+ else:
+ failed += 1
+ except Exception:
+ failed += 1
- LOG.info(" %s: imported %d to list '%s'", state_code, imported, list_name)
+ if (imported + failed) % 200 == 0 and (imported + failed) > 0:
+ LOG.info(" %s: %d imported, %d failed of %d", state_code, imported, failed, len(rows))
+
+ LOG.info(" %s: imported %d, failed %d to list '%s'", state_code, imported, failed, list_name)
conn.close()