Extend FMCSA flagger with state-level deficiency flags
- Interstate carriers flagged for IRP/IFTA needs - Weight-distance tax flags (OR/NY/KY/NM/CT) - State carrier permit flags (CA MCP, etc.) - Emissions flags (CA CARB, ACT states) - Intrastate authority flags (29 states, for-hire only) - New --state-lists flag creates state-targeted Listmonk lists (CA, OR, NY, KY, NM, CT, TX, FL) - Stats now include state-level counts Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
e473953fef
commit
584f887f82
1 changed files with 149 additions and 0 deletions
|
|
@ -69,6 +69,25 @@ def flag_carriers(dry_run: bool = False) -> dict:
|
|||
conn.rollback()
|
||||
conn.commit()
|
||||
|
||||
# Load state trucking requirements for state-level flags
|
||||
state_reqs = {}
|
||||
try:
|
||||
cur.execute("SELECT state_code, weight_distance_tax, state_carrier_permit, "
|
||||
"emissions_program, intrastate_authority_required, "
|
||||
"weight_distance_tax_name, state_carrier_permit_name, "
|
||||
"emissions_program_name FROM state_trucking_requirements")
|
||||
for sr in cur.fetchall():
|
||||
state_reqs[sr[0]] = {
|
||||
"weight_tax": sr[1], "carrier_permit": sr[2],
|
||||
"emissions": sr[3], "intrastate": sr[4],
|
||||
"weight_tax_name": sr[5], "permit_name": sr[6],
|
||||
"emissions_name": sr[7],
|
||||
}
|
||||
LOG.info("Loaded state requirements for %d states", len(state_reqs))
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
LOG.warning("state_trucking_requirements table not found — skipping state flags")
|
||||
|
||||
# Query all carriers with email
|
||||
LOG.info("Querying carriers...")
|
||||
cur.execute("""
|
||||
|
|
@ -143,6 +162,45 @@ def flag_carriers(dry_run: bool = False) -> dict:
|
|||
stats.setdefault("zero_fleet", 0)
|
||||
stats["zero_fleet"] += 1
|
||||
|
||||
# State-level flags based on carrier's physical state
|
||||
is_interstate = carrier_op and "interstate" in (carrier_op or "").lower()
|
||||
sr = state_reqs.get(state) if state else None
|
||||
if sr:
|
||||
# Interstate carriers need IRP and IFTA
|
||||
if is_interstate:
|
||||
flags.append("interstate_needs_irp_ifta")
|
||||
issues.append("Interstate carrier — IRP (apportioned registration) and IFTA (fuel tax) required")
|
||||
stats.setdefault("interstate_irp_ifta", 0)
|
||||
stats["interstate_irp_ifta"] += 1
|
||||
|
||||
# Weight-distance tax states (OR, NY, KY, NM, CT)
|
||||
if sr["weight_tax"]:
|
||||
flags.append(f"state_weight_tax_{state}")
|
||||
issues.append(f"{sr['weight_tax_name'] or 'Weight-distance tax'} required in {state}")
|
||||
stats.setdefault("state_weight_tax", 0)
|
||||
stats["state_weight_tax"] += 1
|
||||
|
||||
# State carrier permit (CA MCP, etc.)
|
||||
if sr["carrier_permit"]:
|
||||
flags.append(f"state_permit_{state}")
|
||||
issues.append(f"{sr['permit_name'] or 'State carrier permit'} required in {state}")
|
||||
stats.setdefault("state_carrier_permit", 0)
|
||||
stats["state_carrier_permit"] += 1
|
||||
|
||||
# Emissions (CA CARB, ACT states)
|
||||
if sr["emissions"]:
|
||||
flags.append(f"state_emissions_{state}")
|
||||
issues.append(f"{sr['emissions_name'] or 'Emissions compliance'} required in {state}")
|
||||
stats.setdefault("state_emissions", 0)
|
||||
stats["state_emissions"] += 1
|
||||
|
||||
# Intrastate authority (29 states)
|
||||
if sr["intrastate"] and for_hire:
|
||||
flags.append(f"intrastate_authority_{state}")
|
||||
issues.append(f"For-hire intrastate carriers in {state} need state operating authority")
|
||||
stats.setdefault("intrastate_authority", 0)
|
||||
stats["intrastate_authority"] += 1
|
||||
|
||||
# New entrant (registered in last 18 months) — needs safety audit prep
|
||||
# add_date is stored as text like "01-JAN-25"
|
||||
# We can't easily parse all formats here, so use mcs150_parsed as a proxy
|
||||
|
|
@ -316,10 +374,98 @@ def populate_listmonk(list_name: str = "FMCSA MCS-150 Overdue Carriers") -> int:
|
|||
return imported
|
||||
|
||||
|
||||
def populate_state_lists() -> None:
|
||||
"""Create state-targeted Listmonk lists for campaign segmentation."""
|
||||
STATE_LISTS = {
|
||||
"CA": "FMCSA State - California (MCP/CARB)",
|
||||
"OR": "FMCSA State - Oregon (Weight-Mile Tax)",
|
||||
"NY": "FMCSA State - New York (HUT)",
|
||||
"KY": "FMCSA State - Kentucky (KYU)",
|
||||
"NM": "FMCSA State - New Mexico (Weight-Distance)",
|
||||
"CT": "FMCSA State - Connecticut (Highway Use Fee)",
|
||||
"TX": "FMCSA State - Texas",
|
||||
"FL": "FMCSA State - Florida",
|
||||
}
|
||||
|
||||
conn = psycopg2.connect(DATABASE_URL)
|
||||
cur = conn.cursor()
|
||||
|
||||
auth = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
|
||||
headers = {"Content-Type": "application/json", "Authorization": f"Basic {auth}"}
|
||||
|
||||
for state_code, list_name in STATE_LISTS.items():
|
||||
cur.execute("""
|
||||
SELECT dot_number, legal_name, email_address, telephone,
|
||||
phy_city, phy_state, issues_summary, deficiency_count
|
||||
FROM fmcsa_carriers
|
||||
WHERE campaign_eligible = TRUE
|
||||
AND phy_state = %s
|
||||
AND email_address IS NOT NULL AND email_address != ''
|
||||
""", (state_code,))
|
||||
rows = cur.fetchall()
|
||||
if not rows:
|
||||
LOG.info("No eligible carriers in %s — skipping", state_code)
|
||||
continue
|
||||
|
||||
LOG.info("Found %d eligible carriers in %s", len(rows), state_code)
|
||||
|
||||
# Get or create list
|
||||
try:
|
||||
req = urllib.request.Request(f"{LISTMONK_API}/api/lists", headers=headers)
|
||||
resp = json.loads(urllib.request.urlopen(req).read())
|
||||
existing = [l for l in resp.get("data", {}).get("results", []) if l["name"] == list_name]
|
||||
if existing:
|
||||
list_id = existing[0]["id"]
|
||||
else:
|
||||
data = json.dumps({"name": list_name, "type": "public", "optin": "single"}).encode()
|
||||
req = urllib.request.Request(f"{LISTMONK_API}/api/lists", data=data, method="POST", headers=headers)
|
||||
resp = json.loads(urllib.request.urlopen(req).read())
|
||||
list_id = resp["data"]["id"]
|
||||
LOG.info("Created list: %s (ID %d)", list_name, list_id)
|
||||
except Exception as e:
|
||||
LOG.error("Failed to create list %s: %s", list_name, e)
|
||||
continue
|
||||
|
||||
# Import in batches
|
||||
imported = 0
|
||||
for i in range(0, len(rows), 500):
|
||||
batch = rows[i:i+500]
|
||||
subscribers = []
|
||||
for dot, name, email, phone, city, st, issues, def_count in batch:
|
||||
issues_list = (issues or "").split("; ")
|
||||
issues_html = "".join(f"<li>{iss}</li>" for iss in issues_list if iss)
|
||||
subscribers.append({
|
||||
"email": email.lower().strip(),
|
||||
"name": name or "",
|
||||
"status": "enabled",
|
||||
"lists": [list_id],
|
||||
"attribs": {
|
||||
"company": name or "", "dot_number": dot,
|
||||
"phone": phone or "", "city": city or "",
|
||||
"state": st or "", "deficiency_count": def_count or 0,
|
||||
"issues_html": f"<ul style='margin:0;padding:0 0 0 16px'>{issues_html}</ul>",
|
||||
},
|
||||
})
|
||||
try:
|
||||
data = json.dumps({"mode": "subscribe", "subscribers": subscribers,
|
||||
"lists": [list_id], "overwrite": True}).encode()
|
||||
req = urllib.request.Request(f"{LISTMONK_API}/api/import/subscribers",
|
||||
data=data, method="POST", headers=headers)
|
||||
urllib.request.urlopen(req)
|
||||
imported += len(batch)
|
||||
except Exception as e:
|
||||
LOG.error(" Batch import failed for %s: %s", state_code, e)
|
||||
|
||||
LOG.info(" %s: imported %d to list '%s'", state_code, imported, list_name)
|
||||
|
||||
conn.close()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Flag FMCSA carrier deficiencies")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Count only, don't update")
|
||||
parser.add_argument("--listmonk", action="store_true", help="Also populate Listmonk list")
|
||||
parser.add_argument("--state-lists", action="store_true", help="Also populate state-targeted lists")
|
||||
args = parser.parse_args()
|
||||
|
||||
stats = flag_carriers(dry_run=args.dry_run)
|
||||
|
|
@ -331,6 +477,9 @@ def main():
|
|||
if args.listmonk and not args.dry_run:
|
||||
populate_listmonk()
|
||||
|
||||
if args.state_lists and not args.dry_run:
|
||||
populate_state_lists()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue