feat(campaigns): deficiency-flag segments + LP routing (Phase 5)

Add 6 flag-based campaign segments to build_trucking_campaigns.py keyed off
fmcsa_carriers.deficiency_flags: for_hire_boc3, irp_ifta, intrastate_authority,
state_weight_tax (per-state LP), state_emissions (CA->ca-mcp-carb), hazmat.
Each injects an order-LP link into subscriber attribs (lp_link) and only
schedules when its CAMPAIGN_*_ID source template env is set (nightly run never
breaks on unconfigured templates). Adds --list-segments audience report and a
synthetic-data segment test (fixed a real psycopg2 % escaping bug in LIKE).
This commit is contained in:
justin 2026-06-02 03:38:02 -05:00
parent fc1a0588f7
commit 4b6c828b1c
2 changed files with 227 additions and 3 deletions

View file

@ -43,6 +43,87 @@ _LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode(
CAMPAIGN_MCS150_ID = 186 # "MCS-150 Overdue — $1,000/Day Fine Risk"
CAMPAIGN_INACTIVE_ID = 188 # "Inactive USDOT — Reactivate Before You Get Pulled Over"
# Public site for landing-page links injected into Listmonk subscriber attribs.
SITE_DOMAIN = os.getenv("SITE_DOMAIN", "https://performancewest.com")
# ── Deficiency-flag segments (Phase 5) ──────────────────────────────────────
# Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column
# written by fmcsa_deficiency_flagger.py) and links to its order landing page.
#
# `source_id` is the Listmonk template campaign to clone; set via env once the
# template exists (see CAMPAIGN_SOURCE_ENV). Segments without a configured
# source_id are reported by --list-segments but skipped by the scheduled run so
# the nightly job never breaks on an unconfigured template.
#
# `flag_sql` is a predicate over the deficiency_flags array.
DEFICIENCY_SEGMENTS = {
"for_hire_boc3": {
"label": "For-Hire BOC-3 + UCR",
"flag_sql": "'for_hire_carrier' = ANY(deficiency_flags)",
"lp_slug": "boc3-filing",
"source_env": "CAMPAIGN_FOR_HIRE_ID",
"limit": 1500,
},
"irp_ifta": {
"label": "IRP / IFTA Registration",
"flag_sql": "'interstate_needs_irp_ifta' = ANY(deficiency_flags)",
"lp_slug": "state-trucking-bundle",
"source_env": "CAMPAIGN_IRP_IFTA_ID",
"limit": 1500,
},
"intrastate_authority": {
"label": "Intrastate Operating Authority",
# any intrastate_authority_<state> flag
"flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
"WHERE f LIKE 'intrastate_authority_%%')",
"lp_slug": "intrastate-authority",
"source_env": "CAMPAIGN_INTRASTATE_ID",
"limit": 1000,
},
"state_weight_tax": {
"label": "State Weight-Distance Tax",
"flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
"WHERE f LIKE 'state_weight_tax_%%')",
# per-state LP resolved per-row in build_lp_link()
"lp_slug": "state-trucking-bundle",
"source_env": "CAMPAIGN_WEIGHT_TAX_ID",
"limit": 1000,
},
"state_emissions": {
"label": "State Clean-Truck / Emissions",
"flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
"WHERE f LIKE 'state_emissions_%%')",
"lp_slug": "state-emissions",
"source_env": "CAMPAIGN_EMISSIONS_ID",
"limit": 1000,
},
"hazmat": {
"label": "Hazmat PHMSA Registration",
"flag_sql": "'hazmat_carrier' = ANY(deficiency_flags)",
"lp_slug": "hazmat-phmsa",
"source_env": "CAMPAIGN_HAZMAT_ID",
"limit": 500,
},
}
# Per-state weight-tax LP overrides (CA emissions goes to ca-mcp-carb).
_WEIGHT_TAX_LP = {
"OR": "or-weight-mile-tax", "NY": "ny-hut-registration",
"KY": "ky-kyu-registration", "NM": "nm-weight-distance",
"CT": "ct-highway-use-fee",
}
def build_lp_link(campaign_type: str, phy_state: str | None) -> str:
"""Return the order landing-page URL for a (segment, state)."""
seg = DEFICIENCY_SEGMENTS.get(campaign_type)
slug = seg["lp_slug"] if seg else "dot-full-compliance"
if campaign_type == "state_weight_tax" and phy_state in _WEIGHT_TAX_LP:
slug = _WEIGHT_TAX_LP[phy_state]
if campaign_type == "state_emissions" and phy_state == "CA":
slug = "ca-mcp-carb"
return f"{SITE_DOMAIN}/order/{slug}"
# ── TZ config: tz_key -> (states, send_hour_utc) ─────────────────────────────
# 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local
TIMEZONE_CONFIG = {
@ -227,8 +308,13 @@ def fetch_carriers(
states_placeholder = ",".join(["%s"] * len(tz_states))
if campaign_type == "mcs150":
type_filter = "mcs150_parsed < NOW() - INTERVAL '2 years'"
else:
elif campaign_type == "inactive":
type_filter = "oos_active IS TRUE"
elif campaign_type in DEFICIENCY_SEGMENTS:
# Flag-based segment: dedupe against already-sent and require the flag.
type_filter = DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
else:
raise ValueError(f"unknown campaign_type {campaign_type!r}")
cur.execute(f"""
SELECT dot_number, email_address, legal_name, phy_state
@ -255,6 +341,35 @@ def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
conn.commit()
def list_segments(send_date: date) -> None:
"""Report deduped audience size per deficiency segment (no writes).
Verifiable Phase-5 metric: each segment selects a nonzero, deduped audience
and points at a valid order LP. Source-template configuration is shown so we
know which segments are ready to schedule.
"""
conn = psycopg2.connect(DB_URL)
print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n")
print(f"{'segment':<22}{'audience':>10} {'source':<10} landing page")
print("-" * 78)
grand_total = 0
for ctype, seg in DEFICIENCY_SEGMENTS.items():
total = 0
for tz_cfg in TIMEZONE_CONFIG.values():
rows = fetch_carriers(conn, tz_cfg["states"], ctype, seg["limit"])
total += len(rows)
grand_total += total
src = os.getenv(seg["source_env"])
src_str = src if src else "UNSET"
lp = build_lp_link(ctype, None)
print(f"{ctype:<22}{total:>10} {src_str:<10} {lp}")
print("-" * 78)
print(f"{'TOTAL':<22}{grand_total:>10}")
print("\n(UNSET source = template not yet created; segment is skipped by the "
"scheduled run until its CAMPAIGN_*_ID env is set.)\n")
conn.close()
def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None:
conn = psycopg2.connect(DB_URL)
@ -266,6 +381,21 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None:
("inactive", base_inactive, 1000, "Inactive USDOT"),
]
# Add deficiency-flag segments whose Listmonk source template is configured.
for ctype, seg in DEFICIENCY_SEGMENTS.items():
src = os.getenv(seg["source_env"])
if not src:
LOG.info("[segment] %s skipped — %s not set (template not created)",
ctype, seg["source_env"])
continue
try:
base = get_base_campaign(int(src))
except Exception as exc: # noqa: BLE001
LOG.warning("[segment] %s skipped — source %s unusable: %s",
ctype, src, exc)
continue
campaign_specs.append((ctype, base, seg["limit"], seg["label"]))
if preview:
LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
"test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)
@ -305,14 +435,16 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None:
subscribers = [{
"email": TEST_EMAIL,
"name": r0[2] or "Sample Carrier",
"attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or ""},
"attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or "",
"lp_link": build_lp_link(campaign_type, r0[3])},
}]
else:
subscribers = [
{
"email": row[1],
"name": row[2] or row[1],
"attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or ""},
"attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or "",
"lp_link": build_lp_link(campaign_type, row[3])},
}
for row in rows
]
@ -349,6 +481,7 @@ def main():
parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)")
parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: tomorrow)")
args = parser.parse_args()
@ -357,6 +490,10 @@ def main():
else:
send_date = date.today() + timedelta(days=1)
if args.list_segments:
list_segments(send_date)
return
LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s)", send_date, args.dry_run, args.preview)
run(send_date, dry_run=args.dry_run, preview=args.preview)