diff --git a/scripts/workers/fmcsa_deficiency_flagger.py b/scripts/workers/fmcsa_deficiency_flagger.py index 00cf740..7cd7040 100644 --- a/scripts/workers/fmcsa_deficiency_flagger.py +++ b/scripts/workers/fmcsa_deficiency_flagger.py @@ -69,6 +69,25 @@ def flag_carriers(dry_run: bool = False) -> dict: conn.rollback() conn.commit() + # Load state trucking requirements for state-level flags + state_reqs = {} + try: + cur.execute("SELECT state_code, weight_distance_tax, state_carrier_permit, " + "emissions_program, intrastate_authority_required, " + "weight_distance_tax_name, state_carrier_permit_name, " + "emissions_program_name FROM state_trucking_requirements") + for sr in cur.fetchall(): + state_reqs[sr[0]] = { + "weight_tax": sr[1], "carrier_permit": sr[2], + "emissions": sr[3], "intrastate": sr[4], + "weight_tax_name": sr[5], "permit_name": sr[6], + "emissions_name": sr[7], + } + LOG.info("Loaded state requirements for %d states", len(state_reqs)) + except Exception: + conn.rollback() + LOG.warning("state_trucking_requirements table not found — skipping state flags") + # Query all carriers with email LOG.info("Querying carriers...") cur.execute(""" @@ -143,6 +162,45 @@ def flag_carriers(dry_run: bool = False) -> dict: stats.setdefault("zero_fleet", 0) stats["zero_fleet"] += 1 + # State-level flags based on carrier's physical state + is_interstate = carrier_op and "interstate" in (carrier_op or "").lower() + sr = state_reqs.get(state) if state else None + if sr: + # Interstate carriers need IRP and IFTA + if is_interstate: + flags.append("interstate_needs_irp_ifta") + issues.append("Interstate carrier — IRP (apportioned registration) and IFTA (fuel tax) required") + stats.setdefault("interstate_irp_ifta", 0) + stats["interstate_irp_ifta"] += 1 + + # Weight-distance tax states (OR, NY, KY, NM, CT) + if sr["weight_tax"]: + flags.append(f"state_weight_tax_{state}") + issues.append(f"{sr['weight_tax_name'] or 'Weight-distance tax'} required in {state}") + stats.setdefault("state_weight_tax", 0) + stats["state_weight_tax"] += 1 + + # State carrier permit (CA MCP, etc.) + if sr["carrier_permit"]: + flags.append(f"state_permit_{state}") + issues.append(f"{sr['permit_name'] or 'State carrier permit'} required in {state}") + stats.setdefault("state_carrier_permit", 0) + stats["state_carrier_permit"] += 1 + + # Emissions (CA CARB, ACT states) + if sr["emissions"]: + flags.append(f"state_emissions_{state}") + issues.append(f"{sr['emissions_name'] or 'Emissions compliance'} required in {state}") + stats.setdefault("state_emissions", 0) + stats["state_emissions"] += 1 + + # Intrastate authority (29 states) + if sr["intrastate"] and for_hire: + flags.append(f"intrastate_authority_{state}") + issues.append(f"For-hire intrastate carriers in {state} need state operating authority") + stats.setdefault("intrastate_authority", 0) + stats["intrastate_authority"] += 1 + # New entrant (registered in last 18 months) — needs safety audit prep # add_date is stored as text like "01-JAN-25" # We can't easily parse all formats here, so use mcs150_parsed as a proxy @@ -316,10 +374,98 @@ def populate_listmonk(list_name: str = "FMCSA MCS-150 Overdue Carriers") -> int: return imported +def populate_state_lists() -> None: + """Create state-targeted Listmonk lists for campaign segmentation.""" + STATE_LISTS = { + "CA": "FMCSA State - California (MCP/CARB)", + "OR": "FMCSA State - Oregon (Weight-Mile Tax)", + "NY": "FMCSA State - New York (HUT)", + "KY": "FMCSA State - Kentucky (KYU)", + "NM": "FMCSA State - New Mexico (Weight-Distance)", + "CT": "FMCSA State - Connecticut (Highway Use Fee)", + "TX": "FMCSA State - Texas", + "FL": "FMCSA State - Florida", + } + + conn = psycopg2.connect(DATABASE_URL) + cur = conn.cursor() + + auth = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode() + headers = {"Content-Type": "application/json", "Authorization": f"Basic {auth}"} + + for state_code, list_name in STATE_LISTS.items(): + cur.execute(""" + SELECT dot_number, legal_name, email_address, telephone, + phy_city, phy_state, issues_summary, deficiency_count + FROM fmcsa_carriers + WHERE campaign_eligible = TRUE + AND phy_state = %s + AND email_address IS NOT NULL AND email_address != '' + """, (state_code,)) + rows = cur.fetchall() + if not rows: + LOG.info("No eligible carriers in %s — skipping", state_code) + continue + + LOG.info("Found %d eligible carriers in %s", len(rows), state_code) + + # Get or create list + try: + req = urllib.request.Request(f"{LISTMONK_API}/api/lists", headers=headers) + resp = json.loads(urllib.request.urlopen(req).read()) + existing = [l for l in resp.get("data", {}).get("results", []) if l["name"] == list_name] + if existing: + list_id = existing[0]["id"] + else: + data = json.dumps({"name": list_name, "type": "public", "optin": "single"}).encode() + req = urllib.request.Request(f"{LISTMONK_API}/api/lists", data=data, method="POST", headers=headers) + resp = json.loads(urllib.request.urlopen(req).read()) + list_id = resp["data"]["id"] + LOG.info("Created list: %s (ID %d)", list_name, list_id) + except Exception as e: + LOG.error("Failed to create list %s: %s", list_name, e) + continue + + # Import in batches + imported = 0 + for i in range(0, len(rows), 500): + batch = rows[i:i+500] + subscribers = [] + for dot, name, email, phone, city, st, issues, def_count in batch: + issues_list = (issues or "").split("; ") + issues_html = "".join(f"
  • {iss}
  • " for iss in issues_list if iss) + subscribers.append({ + "email": email.lower().strip(), + "name": name or "", + "status": "enabled", + "lists": [list_id], + "attribs": { + "company": name or "", "dot_number": dot, + "phone": phone or "", "city": city or "", + "state": st or "", "deficiency_count": def_count or 0, + "issues_html": f"", + }, + }) + try: + data = json.dumps({"mode": "subscribe", "subscribers": subscribers, + "lists": [list_id], "overwrite": True}).encode() + req = urllib.request.Request(f"{LISTMONK_API}/api/import/subscribers", + data=data, method="POST", headers=headers) + urllib.request.urlopen(req) + imported += len(batch) + except Exception as e: + LOG.error(" Batch import failed for %s: %s", state_code, e) + + LOG.info(" %s: imported %d to list '%s'", state_code, imported, list_name) + + conn.close() + + def main(): parser = argparse.ArgumentParser(description="Flag FMCSA carrier deficiencies") parser.add_argument("--dry-run", action="store_true", help="Count only, don't update") parser.add_argument("--listmonk", action="store_true", help="Also populate Listmonk list") + parser.add_argument("--state-lists", action="store_true", help="Also populate state-targeted lists") args = parser.parse_args() stats = flag_carriers(dry_run=args.dry_run) @@ -331,6 +477,9 @@ def main(): if args.listmonk and not args.dry_run: populate_listmonk() + if args.state_lists and not args.dry_run: + populate_state_lists() + if __name__ == "__main__": main()