diff --git a/scripts/build_trucking_campaigns.py b/scripts/build_trucking_campaigns.py
index 7c0cc45..782e7a9 100644
--- a/scripts/build_trucking_campaigns.py
+++ b/scripts/build_trucking_campaigns.py
@@ -43,6 +43,97 @@ _LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode(
 
 CAMPAIGN_MCS150_ID = 186  # "MCS-150 Overdue — $1,000/Day Fine Risk"
 CAMPAIGN_INACTIVE_ID = 188  # "Inactive USDOT — Reactivate Before You Get Pulled Over"
+# Public site for landing-page links injected into Listmonk subscriber attribs.
+SITE_DOMAIN = os.getenv("SITE_DOMAIN", "https://performancewest.com")
+
+# ── Deficiency-flag segments (Phase 5) ──────────────────────────────────────
+# Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column
+# written by fmcsa_deficiency_flagger.py) and links to its order landing page.
+#
+# `source_env` names the env var holding the ID of the Listmonk template
+# campaign to clone; set it once the template exists. Segments whose env var
+# is unset are reported by --list-segments but skipped by the scheduled run so
+# the nightly job never breaks on an unconfigured template.
+#
+# `flag_sql` is a predicate over the deficiency_flags array. LIKE patterns
+# write `%%` for a literal `%` because fetch_carriers() embeds the predicate
+# in a %s-parameterized query.
+DEFICIENCY_SEGMENTS = {
+    "for_hire_boc3": {
+        "label": "For-Hire BOC-3 + UCR",
+        "flag_sql": "'for_hire_carrier' = ANY(deficiency_flags)",
+        "lp_slug": "boc3-filing",
+        "source_env": "CAMPAIGN_FOR_HIRE_ID",
+        "limit": 1500,
+    },
+    "irp_ifta": {
+        "label": "IRP / IFTA Registration",
+        "flag_sql": "'interstate_needs_irp_ifta' = ANY(deficiency_flags)",
+        "lp_slug": "state-trucking-bundle",
+        "source_env": "CAMPAIGN_IRP_IFTA_ID",
+        "limit": 1500,
+    },
+    "intrastate_authority": {
+        "label": "Intrastate Operating Authority",
+        # any intrastate_authority_ flag
+        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
+                    "WHERE f LIKE 'intrastate_authority_%%')",
+        "lp_slug": "intrastate-authority",
+        "source_env": "CAMPAIGN_INTRASTATE_ID",
+        "limit": 1000,
+    },
+    "state_weight_tax": {
+        "label": "State Weight-Distance Tax",
+        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
+                    "WHERE f LIKE 'state_weight_tax_%%')",
+        # per-state LP resolved per-row in build_lp_link()
+        "lp_slug": "state-trucking-bundle",
+        "source_env": "CAMPAIGN_WEIGHT_TAX_ID",
+        "limit": 1000,
+    },
+    "state_emissions": {
+        "label": "State Clean-Truck / Emissions",
+        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
+                    "WHERE f LIKE 'state_emissions_%%')",
+        "lp_slug": "state-emissions",
+        "source_env": "CAMPAIGN_EMISSIONS_ID",
+        "limit": 1000,
+    },
+    "hazmat": {
+        "label": "Hazmat PHMSA Registration",
+        "flag_sql": "'hazmat_carrier' = ANY(deficiency_flags)",
+        "lp_slug": "hazmat-phmsa",
+        "source_env": "CAMPAIGN_HAZMAT_ID",
+        "limit": 500,
+    },
+}
+
+# Per-state weight-tax LP overrides (CA emissions goes to ca-mcp-carb).
+_WEIGHT_TAX_LP = {
+    "OR": "or-weight-mile-tax", "NY": "ny-hut-registration",
+    "KY": "ky-kyu-registration", "NM": "nm-weight-distance",
+    "CT": "ct-highway-use-fee",
+}
+
+
+def build_lp_link(campaign_type: str, phy_state: str | None) -> str:
+    """Return the order landing-page URL for a (segment, state).
+
+    Campaign types that are not deficiency segments (e.g. "mcs150",
+    "inactive") fall back to the "dot-full-compliance" page. ``phy_state`` is
+    trimmed and upper-cased before the per-state overrides are applied, so
+    "or" / " OR " route the same as "OR"; ``None`` means "no override".
+    """
+    seg = DEFICIENCY_SEGMENTS.get(campaign_type)
+    slug = seg["lp_slug"] if seg else "dot-full-compliance"
+    state = (phy_state or "").strip().upper()
+    if campaign_type == "state_weight_tax":
+        slug = _WEIGHT_TAX_LP.get(state, slug)
+    elif campaign_type == "state_emissions" and state == "CA":
+        slug = "ca-mcp-carb"
+    # Tolerate a trailing slash in SITE_DOMAIN so the URL never contains "//order".
+    return f"{SITE_DOMAIN.rstrip('/')}/order/{slug}"
+
 # ── TZ config: tz_key -> (states, send_hour_utc) ─────────────────────────────
 # 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local
 TIMEZONE_CONFIG = {
@@ -227,8 +318,13 @@ def fetch_carriers(
     states_placeholder = ",".join(["%s"] * len(tz_states))
     if campaign_type == "mcs150":
         type_filter = "mcs150_parsed < NOW() - INTERVAL '2 years'"
-    else:
+    elif campaign_type == "inactive":
         type_filter = "oos_active IS TRUE"
+    elif campaign_type in DEFICIENCY_SEGMENTS:
+        # Flag-based segment: dedupe against already-sent and require the flag.
+        type_filter = DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
+    else:
+        raise ValueError(f"unknown campaign_type {campaign_type!r}")
 
     cur.execute(f"""
         SELECT dot_number, email_address, legal_name, phy_state
@@ -255,6 +351,41 @@ def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
     conn.commit()
 
 
+def list_segments(send_date: date) -> None:
+    """Report deduped audience size per deficiency segment (no writes).
+
+    Verifiable Phase-5 metric: each segment selects a nonzero, deduped audience
+    and points at a valid order LP. Source-template configuration is shown so we
+    know which segments are ready to schedule.
+
+    The audience is summed over TIMEZONE_CONFIG, one fetch_carriers() call per
+    timezone with the segment's `limit`; `send_date` only labels the report.
+    The connection is always closed, even when a query raises.
+    """
+    conn = psycopg2.connect(DB_URL)
+    try:
+        print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n")
+        print(f"{'segment':<22}{'audience':>10} {'source':<10} landing page")
+        print("-" * 78)
+        grand_total = 0
+        for ctype, seg in DEFICIENCY_SEGMENTS.items():
+            total = sum(
+                len(fetch_carriers(conn, tz_cfg["states"], ctype, seg["limit"]))
+                for tz_cfg in TIMEZONE_CONFIG.values()
+            )
+            grand_total += total
+            src_str = os.getenv(seg["source_env"]) or "UNSET"
+            lp = build_lp_link(ctype, None)
+            print(f"{ctype:<22}{total:>10} {src_str:<10} {lp}")
+        print("-" * 78)
+        print(f"{'TOTAL':<22}{grand_total:>10}")
+        print("\n(UNSET source = template not yet created; segment is skipped by the "
+              "scheduled run until its CAMPAIGN_*_ID env is set.)\n")
+    finally:
+        # Release the connection even if a segment query raises.
+        conn.close()
+
+
 def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None:
     conn = psycopg2.connect(DB_URL)
 
@@ -266,6 +397,21 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None:
         ("inactive", base_inactive, 1000, "Inactive USDOT"),
     ]
 
+    # Add deficiency-flag segments whose Listmonk source template is configured.
+    for ctype, seg in DEFICIENCY_SEGMENTS.items():
+        src = os.getenv(seg["source_env"])
+        if not src:
+            LOG.info("[segment] %s skipped — %s not set (template not created)",
+                     ctype, seg["source_env"])
+            continue
+        try:
+            base = get_base_campaign(int(src))
+        except Exception as exc:  # noqa: BLE001
+            LOG.warning("[segment] %s skipped — source %s unusable: %s",
+                        ctype, src, exc)
+            continue
+        campaign_specs.append((ctype, base, seg["limit"], seg["label"]))
+
     if preview:
         LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
                  "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)
@@ -305,14 +451,16 @@ def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None:
             subscribers = [{
                 "email": TEST_EMAIL,
                 "name": r0[2] or "Sample Carrier",
-                "attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or ""},
+                "attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or "",
+                            "lp_link": build_lp_link(campaign_type, r0[3])},
             }]
         else:
             subscribers = [
                 {
                     "email": row[1],
                     "name": row[2] or row[1],
-                    "attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or ""},
+                    "attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or "",
+                                "lp_link": build_lp_link(campaign_type, row[3])},
                 }
                 for row in rows
             ]
@@ -349,6 +497,7 @@ def main():
     parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
     parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
     parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
+    parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)")
     parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: tomorrow)")
     args = parser.parse_args()
 
@@ -357,6 +506,10 @@ def main():
     else:
         send_date = date.today() + timedelta(days=1)
 
+    if args.list_segments:
+        list_segments(send_date)
+        return
+
     LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s)",
              send_date, args.dry_run, args.preview)
     run(send_date, dry_run=args.dry_run, preview=args.preview)
diff --git a/scripts/tests/check_campaign_segments.py b/scripts/tests/check_campaign_segments.py
new file mode 100644
index 0000000..c702871
--- /dev/null
+++ b/scripts/tests/check_campaign_segments.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python3
+"""
+Phase-5 campaign segment validation (no production DB required).
+
+Builds a synthetic fmcsa_carriers TEMP table and asserts that every
+deficiency-flag segment in build_trucking_campaigns.DEFICIENCY_SEGMENTS:
+  - selects exactly the carriers carrying its flag,
+  - excludes blocked-domain + already-sent + unflagged carriers,
+  - routes to a valid order landing page (incl. per-state overrides).
+
+Requires a reachable Postgres (DATABASE_URL) only to create a TEMP table; it
+never reads real data. Run:
+  DATABASE_URL=postgresql://pw:pw_dev_2026@localhost:5434/performancewest \
+      python3 scripts/tests/check_campaign_segments.py
+"""
+from __future__ import annotations
+import importlib.util
+import os
+import sys
+
+ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+BTC = os.path.join(ROOT, "scripts/build_trucking_campaigns.py")
+
+# Segment -> dot_numbers of the synthetic carriers (see `rows` in main) that it
+# must select. Comparing IDs, not just counts, catches a wrong-carrier match.
+EXPECTED = {
+    "for_hire_boc3": {"1"}, "irp_ifta": {"2"}, "intrastate_authority": {"3"},
+    "state_weight_tax": {"4"}, "state_emissions": {"5"}, "hazmat": {"6"},
+}
+
+
+def main() -> int:
+    """Run the segment and LP-routing checks; return 0 on pass/skip, 1 on failure."""
+    if not os.environ.get("DATABASE_URL"):
+        print("SKIP: DATABASE_URL not set (needs any reachable Postgres for TEMP table)")
+        return 0
+    import psycopg2
+
+    spec = importlib.util.spec_from_file_location("btc", BTC)
+    btc = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(btc)
+
+    conn = psycopg2.connect(os.environ["DATABASE_URL"])
+    try:
+        cur = conn.cursor()
+        cur.execute("""
+            CREATE TEMP TABLE fmcsa_carriers (
+                dot_number text, email_address text, legal_name text, phy_state text,
+                mcs150_parsed timestamptz, oos_active boolean,
+                email_verified boolean, email_verify_result text,
+                listmonk_sent_at timestamptz, deficiency_flags text[]);
+        """)
+        rows = [
+            ("1", "a@good.com", "For Hire Co", "NY", None, False, True, None, None, ["for_hire_carrier"]),
+            ("2", "b@good.com", "IRP Co", "TX", None, False, True, None, None, ["interstate_needs_irp_ifta"]),
+            ("3", "c@good.com", "Intrastate Co", "CA", None, False, True, None, None, ["intrastate_authority_CA"]),
+            ("4", "d@good.com", "Weight Tax Co", "OR", None, False, True, None, None, ["state_weight_tax_OR"]),
+            ("5", "e@good.com", "Emissions Co", "CO", None, False, True, None, None, ["state_emissions_CO"]),
+            ("6", "f@good.com", "Hazmat Co", "OH", None, False, True, None, None, ["hazmat_carrier"]),
+            ("7", "g@good.com", "Clean Co", "NY", None, False, True, None, None, []),  # no flags
+            ("8", "h@aol.com", "Blocked Co", "NY", None, False, True, None, None, ["for_hire_carrier"]),  # blocked domain
+            ("9", "i@good.com", "Sent Co", "NY", None, False, True, None, "2026-01-01", ["for_hire_carrier"]),  # already sent
+        ]
+        for r in rows:
+            cur.execute("INSERT INTO fmcsa_carriers VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", r)
+        conn.commit()
+
+        all_states = tuple(s for cfg in btc.TIMEZONE_CONFIG.values() for s in cfg["states"])
+        ok = True
+        for ctype in btc.DEFICIENCY_SEGMENTS:
+            # A segment with no fixture (want is None) is reported as FAIL, not a KeyError.
+            want = EXPECTED.get(ctype)
+            got = {row[0] for row in btc.fetch_carriers(conn, all_states, ctype, 1000)}
+            status = "OK" if got == want else "FAIL"
+            if got != want:
+                ok = False
+            shown = sorted(want) if want is not None else "no fixture"
+            print(f" {status}: {ctype:<22} matched {sorted(got)} (expected {shown})")
+    finally:
+        # Close the connection (ending the TEMP table's session) even if a query raises.
+        conn.close()
+
+    lp_ok = (
+        btc.build_lp_link("state_weight_tax", "OR").endswith("/order/or-weight-mile-tax")
+        and btc.build_lp_link("state_weight_tax", " or ").endswith("/order/or-weight-mile-tax")
+        and btc.build_lp_link("state_emissions", "CA").endswith("/order/ca-mcp-carb")
+        and btc.build_lp_link("hazmat", None).endswith("/order/hazmat-phmsa")
+        and btc.build_lp_link("irp_ifta", None).endswith("/order/state-trucking-bundle")
+        and btc.build_lp_link("mcs150", "NY").endswith("/order/dot-full-compliance")
+    )
+    print(" LP routing:", "OK" if lp_ok else "FAIL")
+
+    passed = ok and lp_ok
+    print("CAMPAIGN SEGMENTS:", "PASS" if passed else "FAIL")
+    return 0 if passed else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())