#!/usr/bin/env python3
"""Render a trucking campaign body with coupon merge-tags filled in and score
it through SpamAssassin.

Run on the prod host (has spamassassin + listmonk DB).

Usage:
    python3 /tmp/sa_coupon_test.py [CAMPAIGN_ID] [COUPON_PCT] [COUPON_CODE]

All three arguments are optional and positional; the defaults are the CID,
PCT and CODE constants defined below.
"""
import html
import re
import subprocess
import sys
from email.message import EmailMessage
from email.utils import formatdate, make_msgid

# Positional command-line overrides with their defaults.
CID = sys.argv[1] if len(sys.argv) > 1 else "186"
PCT = sys.argv[2] if len(sys.argv) > 2 else "40"
CODE = sys.argv[3] if len(sys.argv) > 3 else "KQ7MT"

# Stand-in for the per-subscriber unsubscribe link; used both in the rendered
# body and in the List-Unsubscribe header so the two agree.
UNSUBSCRIBE_URL = "https://performancewest.net/unsubscribe/abc123"

# {{ if .Subscriber.Attribs.key }}TRUE{{ else }}FALSE{{ end }} (no nesting).
IF_BLOCK = re.compile(
    r"\{\{\s*if\s+\.Subscriber\.Attribs\.(\w+)\s*\}\}(.*?)"
    r"(?:\{\{\s*else\s*\}\}(.*?))?\{\{\s*end\s*\}\}",
    re.DOTALL,
)
# {{ .Subscriber.Attribs.key }}
VALUE_TAG = re.compile(r"\{\{\s*\.Subscriber\.Attribs\.(\w+)\s*\}\}")
# {{ UnsubscribeURL }} with any amount of inner whitespace.
UNSUBSCRIBE_TAG = re.compile(r"\{\{\s*UnsubscribeURL\s*\}\}")
# Anything else that still looks like a template action.
ANY_TAG = re.compile(r"\{\{[^}]*\}\}")

# One row of the SpamAssassin rule table: "<score> <RULE_NAME> <description>".
# Leading whitespace is optional because negative scores start at column 0;
# a decimal point is required so ordinary body text is less likely to match.
RULE_ROW = re.compile(r"^\s*-?\d+\.\d+\s+[A-Z0-9_]+\s")
# The X-Spam-Report header plus its folded (whitespace-led) continuation lines.
REPORT_HEADER = re.compile(r"^X-Spam-Report:.*(?:\n[ \t].*)*", re.MULTILINE)


def psql(q):
    """Run one SQL statement against the listmonk DB and return its output.

    The query is executed with ``psql -tA`` inside the
    ``performancewest-api-postgres-1`` container, so the result is the bare
    value text with trailing newlines removed.

    Raises:
        subprocess.CalledProcessError: if the docker/psql command fails.
    """
    out = subprocess.check_output(
        ["docker", "exec", "performancewest-api-postgres-1",
         "psql", "-U", "pw", "-d", "listmonk", "-tA", "-c", q],
        text=True,
    )
    return out.rstrip("\n")


def fetch_campaign(campaign_id: int):
    """Return ``(subject, from_email, body)`` for the given campaign id.

    ``campaign_id`` must already be an ``int``: it is interpolated into the
    SQL text, so only a validated integer may reach this point.
    """
    subject = psql(f"SELECT subject FROM campaigns WHERE id={campaign_id};")
    from_email = psql(f"SELECT from_email FROM campaigns WHERE id={campaign_id};")
    body = psql(f"SELECT body FROM campaigns WHERE id={campaign_id};")
    return subject, from_email, body


def sample_attribs(code: str, pct: str) -> dict:
    """Build the sample subscriber attribs used to fill the merge-tags.

    Mirrors build_trucking_campaigns.coupon_attribs.
    """
    return {
        "dot_number": "1228791",
        "company": "BLUE RIDGE FREIGHT LLC",
        "state": "CT",
        "lp_link": f"https://performancewest.net/order/mcs150-update?code={code}",
        "coupon_code": code,
        "coupon_pct": pct,
        "coupon_expires": "11:59 PM ET tonight",
    }


def render(tpl: str, at: dict, *, escape: bool = True) -> str:
    """Minimal listmonk template renderer.

    Supports ``{{ if .Subscriber.Attribs.X }}..{{ else }}..{{ end }}``
    (no nesting), ``{{ .Subscriber.Attribs.X }}`` and
    ``{{ UnsubscribeURL }}``; any other ``{{ ... }}`` tag is removed.

    Args:
        tpl: Template text.
        at: Subscriber attribs; a missing key renders as an empty string.
        escape: HTML-escape substituted values. Leave on for HTML bodies;
            turn off for plain-text targets such as the Subject header,
            where entities would otherwise show up literally.

    Returns:
        The rendered text.
    """
    def pick_branch(m):
        key, t_branch, f_branch = m.group(1), m.group(2), m.group(3) or ""
        # An attrib counts as "set" when it is non-blank after stripping.
        return t_branch if str(at.get(key, "")).strip() else f_branch

    # Re-apply until nothing changes so a block exposed by an earlier
    # substitution is resolved too.
    prev = None
    while prev != tpl:
        prev = tpl
        tpl = IF_BLOCK.sub(pick_branch, tpl)

    def value(m):
        text = str(at.get(m.group(1), ""))
        return html.escape(text) if escape else text

    tpl = VALUE_TAG.sub(value, tpl)
    # Listmonk unsubscribe placeholder -> realistic stand-in. A callable is
    # used so the URL is never interpreted as a regex replacement template.
    tpl = UNSUBSCRIBE_TAG.sub(lambda _m: UNSUBSCRIBE_URL, tpl)
    return ANY_TAG.sub("", tpl)  # strip any remaining tags


def build_message(from_email: str, subject: str, body_html: str) -> EmailMessage:
    """Assemble a realistic MIME email (what the recipient's filter sees).

    The message is multipart/alternative: a plain-text part derived from the
    HTML by crude tag stripping, plus the HTML itself.

    Args:
        from_email: Value for the From header, used as stored on the campaign.
        subject: Already-rendered subject line.
        body_html: Already-rendered HTML body.
    """
    msg = EmailMessage()
    msg["From"] = from_email
    msg["To"] = "dispatch@blueridgefreight.com"
    msg["Subject"] = subject
    msg["Date"] = formatdate(localtime=True)
    msg["Message-ID"] = make_msgid(domain="performancewest.net")
    # One-click unsubscribe needs an HTTPS URI in angle brackets here.
    msg["List-Unsubscribe"] = f"<{UNSUBSCRIBE_URL}>"
    msg["List-Unsubscribe-Post"] = "List-Unsubscribe=One-Click"
    msg["Precedence"] = "bulk"

    # plain-text altbody (strip tags crudely) + html
    text_alt = re.sub(r"<[^>]+>", "", body_html)
    text_alt = html.unescape(re.sub(r"\n{3,}", "\n\n", text_alt)).strip()
    msg.set_content(text_alt)
    msg.add_alternative(body_html, subtype="html")
    return msg


def score_with_spamassassin(raw: bytes) -> str:
    """Pipe a raw message through ``spamassassin -t -L`` and return its stdout.

    ``-t`` is test mode and ``-L`` restricts the run to local tests (no
    DNS/network) for determinism. A non-zero exit status is reported on
    stderr instead of being silently ignored; whatever output was produced
    is still returned.
    """
    proc = subprocess.run(
        ["spamassassin", "-t", "-L"],
        input=raw,
        capture_output=True,
    )
    if proc.returncode != 0:
        print(
            f"spamassassin exited with status {proc.returncode}: "
            f"{proc.stderr.decode('utf-8', 'replace').strip()}",
            file=sys.stderr,
        )
    return proc.stdout.decode("utf-8", "replace")


def parse_scored(scored: str):
    """Pull the score line and the rule hits out of SpamAssassin's output.

    Returns:
        ``(score_line, rules)`` where ``score_line`` is the last line starting
        with ``X-Spam-Status:`` (``""`` if there is none) and ``rules`` is the
        list of report-looking lines with trailing whitespace removed.
    """
    score_line = ""
    rules = []
    for line in scored.splitlines():
        if line.startswith("X-Spam-Status:"):
            score_line = line
        if line.startswith(("X-Spam-Report:", "\tpts")) or RULE_ROW.match(line):
            rules.append(line.rstrip())
    return score_line, rules


def extract_report(scored: str) -> str:
    """Return the full X-Spam-Report header, or ``""`` when it is absent.

    Only the header line and its folded continuation lines are returned, so
    no fragment of the following header leaks into the output.
    """
    found = REPORT_HEADER.search(scored)
    return found.group(0) if found else ""


def main() -> int:
    """Fetch, render, score and print the report; return a process exit code."""
    try:
        # Validate before the id is interpolated into SQL.
        campaign_id = int(CID)
    except ValueError:
        print(f"campaign id must be an integer, got {CID!r}", file=sys.stderr)
        return 2

    subject, from_email, body = fetch_campaign(campaign_id)
    if not body:
        print(f"campaign {campaign_id} not found or has an empty body", file=sys.stderr)
        return 1

    attribs = sample_attribs(CODE, PCT)
    # The subject is a plain-text header, so its values must not be escaped.
    rendered_subject = render(subject, attribs, escape=False)
    rendered_body = render(body, attribs)

    raw = build_message(from_email, rendered_subject, rendered_body).as_bytes()
    scored = score_with_spamassassin(raw)
    score_line, rules = parse_scored(scored)

    print("=" * 70)
    print(f"Campaign {CID} arm={PCT}% code={CODE}")
    print(f"Subject: {rendered_subject}")
    print("=" * 70)
    print(score_line or "(no X-Spam-Status header found)")
    print("-" * 70)
    m = re.search(r"score=([-0-9.]+)", score_line)
    if m:
        print(f"SCORE: {m.group(1)} (SpamAssassin default spam threshold = 5.0)")

    # Show the detailed report block, falling back to the collected rule lines.
    report = extract_report(scored)
    if report:
        print(report)
    else:
        for row in rules:
            print(row)
    return 0


if __name__ == "__main__":
    sys.exit(main())