- CAMPAIGN_COUPON_AB_PCTS="20,30,40" mints one daily code per arm; each carrier is bucketed by a stable sha256(email) hash, so the split is even (~33/33/33, verified over 30k) and stable across re-sends (no arm-hopping).
- Each arm's code stores its own percent in discount_codes, so the advertised discount always matches what checkout applies; redemptions are countable per code (marker campaign-daily:<date>:<pct>).
- Empty/unset keeps the legacy single-arm behavior (COUPON_PCT, legacy marker).
- coupon_attribs() now takes a per-recipient pct.
- Tests: scripts/tests/test_coupon_ab.py (5 pass). SpamAssassin: both main campaigns (186/188) score 0.0 HAM across all 3 arms, and the coupon block renders clean; harness saved for re-runs.
123 lines
4.5 KiB
Python
123 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Render a trucking campaign body with coupon merge-tags filled in and score it
|
|
through SpamAssassin. Run on the prod host (has spamassassin + listmonk DB).
|
|
|
|
Usage: python3 /tmp/sa_coupon_test.py <campaign_id> <pct> <code>
|
|
"""
|
|
import html
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from email.message import EmailMessage
|
|
from email.utils import formatdate, make_msgid
|
|
|
|
# Positional CLI arguments, each with a default so the script also runs bare:
#   argv[1] = campaign id, argv[2] = discount percent, argv[3] = coupon code.
# All three stay strings; they are only ever interpolated into text.
CID = sys.argv[1] if len(sys.argv) > 1 else "186"
PCT = sys.argv[2] if len(sys.argv) > 2 else "40"
# The default used to be spelled `"KQ7MTN".replace("6", "K")[:5] or "KQMTN"`.
# There is no "6" to replace and the `or` fallback can never be reached, so the
# expression always evaluated to "KQ7MT"; the literal states that directly.
CODE = sys.argv[3] if len(sys.argv) > 3 else "KQ7MT"
|
|
|
|
|
|
def psql(q):
    """Run the SQL string ``q`` against the ``listmonk`` database.

    The statement is executed by ``psql -tA -c`` inside the
    ``performancewest-api-postgres-1`` container through ``docker exec``.

    Returns the command's stdout as text with trailing newlines removed.
    Raises ``subprocess.CalledProcessError`` when the command exits non-zero.
    """
    command = [
        "docker", "exec", "performancewest-api-postgres-1",
        "psql", "-U", "pw", "-d", "listmonk", "-tA", "-c", q,
    ]
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        text=True,
        check=True,
    )
    return completed.stdout.rstrip("\n")
|
|
|
|
|
|
# Load the campaign's subject, sender and body from the listmonk database.
# CID arrives from the command line and is spliced into SQL text, so coerce it
# to an integer first: anything that is not a plain number raises ValueError
# here instead of reaching psql as part of the statement.
campaign_id = int(CID)
subject = psql(f"SELECT subject FROM campaigns WHERE id={campaign_id};")
from_email = psql(f"SELECT from_email FROM campaigns WHERE id={campaign_id};")
body = psql(f"SELECT body FROM campaigns WHERE id={campaign_id};")
|
|
|
|
# --- Sample subscriber attribs (mirrors build_trucking_campaigns.coupon_attribs) ---
# Stand-in subscriber record whose values fill the merge tags; the coupon
# code and percent come from the command-line arguments.
attribs = dict(
    dot_number="1228791",
    company="BLUE RIDGE FREIGHT LLC",
    state="CT",
    lp_link=f"https://performancewest.net/order/mcs150-update?code={CODE}",
    coupon_code=CODE,
    coupon_pct=PCT,
    coupon_expires="11:59 PM ET tonight",
)
|
|
|
|
# --- Minimal listmonk template renderer: {{ if .Subscriber.Attribs.X }}..{{ else }}..{{ end }} + {{ .Subscriber.Attribs.X }} ---
def render(tpl: str, at: dict) -> str:
    """Fill a listmonk-style template with subscriber attributes.

    This is a deliberately small approximation of the real template engine,
    not a full implementation. It handles, in order:

    1. ``{{ if .Subscriber.Attribs.key }}A{{ else }}B{{ end }}`` blocks (the
       ``else`` part is optional; nesting is not supported). The true branch
       is kept when the attribute, converted to ``str`` and stripped, is
       non-empty.
    2. ``{{ .Subscriber.Attribs.key }}`` value tags, replaced by the
       HTML-escaped attribute value (empty string when the key is missing).
    3. ``{{ UnsubscribeURL }}``, replaced by a fixed stand-in URL.
    4. ``{{ TrackLink "url" }}``, replaced by the quoted URL itself.
    5. Any other ``{{ ... }}`` tag, which is removed.

    Args:
        tpl: Template text (campaign subject or body).
        at: Mapping of attribute name to value.

    Returns:
        The rendered text.
    """
    # Resolve {{ if .Subscriber.Attribs.key }}TRUE{{ else }}FALSE{{ end }} (no nesting).
    if_block = re.compile(
        r"\{\{\s*if\s+\.Subscriber\.Attribs\.(\w+)\s*\}\}(.*?)"
        r"(?:\{\{\s*else\s*\}\}(.*?))?\{\{\s*end\s*\}\}",
        re.DOTALL,
    )

    def pick_branch(m):
        key, t_branch, f_branch = m.group(1), m.group(2), m.group(3) or ""
        return t_branch if str(at.get(key, "")).strip() else f_branch

    # Repeat until a pass changes nothing, so every conditional gets resolved.
    prev = None
    while prev != tpl:
        prev = tpl
        tpl = if_block.sub(pick_branch, tpl)

    # Resolve simple value tags.
    def value_of(m):
        return html.escape(str(at.get(m.group(1), "")))

    tpl = re.sub(r"\{\{\s*\.Subscriber\.Attribs\.(\w+)\s*\}\}", value_of, tpl)

    # Listmonk unsubscribe/tracking placeholders -> realistic stand-ins.
    # Whitespace inside the braces is optional, as for every other tag above;
    # an exact-string match would let "{{UnsubscribeURL}}" fall through to the
    # final strip and vanish from the message.
    tpl = re.sub(
        r"\{\{\s*UnsubscribeURL\s*\}\}",
        "https://performancewest.net/unsubscribe/abc123",
        tpl,
    )
    # Keep the destination of tracked links; stripping the whole tag would
    # leave empty hrefs. The real engine emits a tracking redirect URL, so the
    # plain destination is only a stand-in.
    tpl = re.sub(r'\{\{\s*TrackLink\s+"([^"}]*)"\s*\}\}', r"\1", tpl)

    tpl = re.sub(r"\{\{[^}]*\}\}", "", tpl)  # strip any remaining tags
    return tpl
|
|
|
|
# Fill the merge tags in the subject line and then the body, both from the
# same sample attributes.
rendered_subject, rendered_body = (
    render(template_text, attribs) for template_text in (subject, body)
)
|
|
|
|
# --- Assemble a realistic MIME email (what the recipient's filter sees) ---
# from_email is like 'Performance West <noreply@performancewest.net>'
m_from = from_email
msg = EmailMessage()
for header_name, header_value in (
    ("From", m_from),
    ("To", "dispatch@blueridgefreight.com"),
    ("Subject", rendered_subject),
    ("Date", formatdate(localtime=True)),
    ("Message-ID", make_msgid(domain="performancewest.net")),
    ("List-Unsubscribe", "<https://performancewest.net/unsubscribe/abc123>, <mailto:unsub@performancewest.net>"),
    ("List-Unsubscribe-Post", "List-Unsubscribe=One-Click"),
    ("Precedence", "bulk"),
):
    msg[header_name] = header_value

# plain-text altbody (strip tags crudely) + html
without_tags = re.sub(r"<[^>]+>", "", rendered_body)
# Collapse runs of three or more newlines left behind by the removed markup.
squeezed = re.sub(r"\n{3,}", "\n\n", without_tags)
text_alt = html.unescape(squeezed).strip()
msg.set_content(text_alt)
msg.add_alternative(rendered_body, subtype="html")

# Serialized message bytes, fed to SpamAssassin on stdin.
raw = msg.as_bytes()
|
|
|
|
# --- Score through SpamAssassin (-t test mode; network tests skipped for determinism) ---
proc = subprocess.run(
    ["spamassassin", "-t", "-L"],  # -L = local tests only (no DNS/network), -t = test mode
    input=raw,
    capture_output=True,
)
if proc.returncode != 0:
    # The exit status used to be ignored, which made a failed run look the
    # same as "no X-Spam-Status header found". Report it on stderr but keep
    # going, so whatever output was produced is still shown.
    print(
        f"spamassassin exited with status {proc.returncode}: "
        f"{proc.stderr.decode('utf-8', 'replace').strip()}",
        file=sys.stderr,
    )
scored = proc.stdout.decode("utf-8", "replace")
|
|
|
|
# Pull the score + the rule hits from the rewritten headers.
scored_lines = scored.splitlines()
# When several X-Spam-Status lines exist the last one is kept, which is what a
# top-to-bottom scan that overwrites on every hit produces.
score_line = next(
    (ln for ln in reversed(scored_lines) if ln.startswith("X-Spam-Status:")),
    "",
)
# Report header lines, the "pts" column header, and "<points> <RULE_NAME> ..."
# rows, each with trailing whitespace removed.
rules = [
    ln.rstrip()
    for ln in scored_lines
    if ln.startswith(("X-Spam-Report:", "\tpts"))
    or re.match(r"^\s+[-0-9.]+\s+[A-Z0-9_]+\s", ln)
]
|
|
|
|
# Summary banner: which campaign / arm / code was scored and the rendered subject.
print("=" * 70)
print(f"Campaign {CID} arm={PCT}% code={CODE}")
print(f"Subject: {rendered_subject}")
print("=" * 70)
print(score_line or "(no X-Spam-Status header found)")
print("-" * 70)
m = re.search(r"score=([-0-9.]+)", score_line)
if m:
    print(f"SCORE: {m.group(1)} (SpamAssassin default spam threshold = 5.0)")
# Show the detailed report block.
# Match the X-Spam-Report header together with its folded continuation lines
# (lines that begin with a space or tab). The earlier pattern consumed its
# terminator, so the printed text ended with a dangling "X-Spam-" fragment
# from the next header, and it could run on through unrelated headers up to
# the first blank line.
rep = re.search(r"^X-Spam-Report:.*(?:\n[ \t].*)*", scored, re.MULTILINE)
if rep:
    print(rep.group(0))
else:
    # Fallback: print any rule lines we collected.
    for r in rules:
        print(r)
|