trucking: stamp e-signature exactly on form signature lines + state authorization gate

Capture-to-form signature placement so the customer's drawn or typed
signature lands right on the signature rule of the actual form, not in a
sidecar page.

- migration 085: esign_records.signature_anchors (JSONB exact PDF coords,
  lower-left origin, points) + signed_document_minio_key
- signature_stamper.py: signature_box() anchors; anchors_from_acroform()
  pulls the signature field /Rect from a real AcroForm (e.g. MCS-150
  certifySignature); stamp_signature() overlays PNG (auto-trimmed so ink
  rests on the rule) or typed name, scaled to actual page size
- state_trucking_authorization.py: renders the Limited Authorization to
  File PDF and returns (pdf_bytes, anchors)
- esign_stamp.py: stamp_esign_document() downloads unsigned PDF, stamps,
  uploads _signed.pdf, sets signed_document_minio_key (idempotent)
- dot_esign.py: extract certifySignature anchor for MCS-150/closeout forms
  so the federal perjury cert is signed on the line
- state_trucking.py: authorization gate — first run emails signing link
  and PAUSES; resumes with client_approved after signing
- job_server handle_esign_completed: stamp then re-dispatch
- tests: test_signature_placement.py (custom form), and
  test_mcs150_signature_placement.py (official AcroForm) both assert the
  signature lands inside the recorded signature box (verified visually)
This commit is contained in:
justin 2026-06-02 16:44:19 -05:00
parent 345979ed00
commit 7ed06780bb
9 changed files with 1322 additions and 5 deletions

View file

@ -0,0 +1,32 @@
-- 085: Precise signature placement for e-signed documents.
--
-- Requirement: the client's drawn/typed signature must land exactly on the
-- signature line of the generated form, not in a generic "signed" sidecar.
--
-- When a document generator draws a signature line, it records the exact
-- PDF coordinates of each signature box in `signature_anchors`. At sign time
-- the stamper composites the signature PNG onto those coordinates and writes
-- the flattened, signed PDF to `signed_document_minio_key`.
--
-- signature_anchors shape (array, one entry per place a signature goes):
-- [
-- {
-- "field": "signer", -- which signature this box expects
-- "page": 0, -- 0-based page index
-- "x": 72.0, -- lower-left x of the box, in PDF points
-- "y": 120.0, -- lower-left y of the box, in PDF points
-- "w": 216.0, -- box width in points
-- "h": 40.0, -- box height in points
-- "page_w": 612.0, -- page width the coordinates were authored against
-- "page_h": 792.0 -- page height the coordinates were authored against
-- }
-- ]
ALTER TABLE esign_records
ADD COLUMN IF NOT EXISTS signature_anchors JSONB,
ADD COLUMN IF NOT EXISTS signed_document_minio_key TEXT;
COMMENT ON COLUMN esign_records.signature_anchors IS
'Exact PDF coordinates of each signature box, recorded by the document generator so the stamper can place the signature on the signature line.';
COMMENT ON COLUMN esign_records.signed_document_minio_key IS
'MinIO key of the flattened PDF with the signature stamped onto the signature line.';

View file

@ -0,0 +1,117 @@
"""Verify the captured signature lands on the official MCS-150 signature line.
Fills the real MCS-150 AcroForm, extracts the ``certifySignature`` anchor from
the form's own field rectangle, stamps a synthetic drawn signature, renders the
signature page, and asserts a meaningful number of dark pixels fall inside the
recorded signature box (i.e. the signature is on the rule, not floating).
Requires: reportlab, pypdf, pillow, and (for rendering) pdftoppm.
Run from the repo root with a venv that has those installed.
"""
from __future__ import annotations
import base64
import importlib.util
import io
import os
import subprocess
import sys
import tempfile
REPO = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def _load(name: str, rel: str):
spec = importlib.util.spec_from_file_location(name, os.path.join(REPO, rel))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod) # type: ignore[union-attr]
return mod
def _synthetic_signature_png() -> str:
from PIL import Image, ImageDraw
im = Image.new("RGBA", (600, 140), (0, 0, 0, 0))
d = ImageDraw.Draw(im)
d.line(
[(20, 110), (120, 30), (220, 110), (340, 40), (480, 100), (580, 50)],
fill=(10, 20, 60, 255),
width=6,
)
buf = io.BytesIO()
im.save(buf, "PNG")
return base64.b64encode(buf.getvalue()).decode()
def main() -> int:
sys.path.insert(0, REPO)
mcs = _load("mcs150_pdf_filler", "scripts/document_gen/templates/mcs150_pdf_filler.py")
stamper = _load("signature_stamper", "scripts/workers/services/signature_stamper.py")
intake = {
"legal_name": "ADAMS LUMBER INC",
"dot_number": "1157913",
"entity_type": "corporation",
"carrier_operation": "authorized_for_hire",
"interstate_intrastate": "interstate",
"hazmat": "no",
"power_units": "5",
"drivers": "6",
"annual_miles": "250000",
"cargo_types": ["general"],
"signer_name": "Mark Adams",
"signer_title": "President",
"address_state": "OR",
"email": "m@a.com",
}
pdf_path = mcs.fill_mcs150(intake, order_number="TEST-MCS150-SIG")
pdf_bytes = open(pdf_path, "rb").read()
anchors = stamper.anchors_from_acroform(pdf_bytes, {"certifySignature": "signer"})
assert anchors, "no signature anchor extracted from MCS-150 AcroForm"
box = anchors[0]
print("signature anchor:", box)
signed = stamper.stamp_signature(
pdf_bytes,
anchors,
signature_type="drawn",
signature_data=_synthetic_signature_png(),
)
with tempfile.TemporaryDirectory() as td:
signed_path = os.path.join(td, "signed.pdf")
open(signed_path, "wb").write(signed)
page_no = box["page"] + 1
prefix = os.path.join(td, "render")
subprocess.run(
["pdftoppm", "-png", "-f", str(page_no), "-l", str(page_no), "-r", "150", signed_path, prefix],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# pdftoppm zero-pads the page number; find the produced PNG.
png = next(
os.path.join(td, f) for f in sorted(os.listdir(td)) if f.startswith("render") and f.endswith(".png")
)
from PIL import Image
im = Image.open(png).convert("L")
W, H = im.size
px = im.load()
sc = 150 / 72.0
bx, by, bw, bh = box["x"], box["y"], box["w"], box["h"]
dark = 0
for X in range(int(bx * sc), int((bx + bw) * sc)):
for Y in range(int(H - (by + bh) * sc), int(H - by * sc)):
if 0 <= X < W and 0 <= Y < H and px[X, Y] < 120:
dark += 1
print("dark signature pixels inside MCS-150 signature box:", dark)
assert dark > 500, f"signature not on the line (only {dark} dark px in box)"
print("PASS: signature stamped on the MCS-150 signature line")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -0,0 +1,103 @@
"""Verify the e-signature lands exactly on the signature line of the auth form.
Renders the authorization PDF, captures the signature-box anchors, creates a
synthetic drawn signature, stamps it, and asserts the stamped ink falls inside
the recorded signature box (and on/just-above the signature rule).
"""
import base64
import io
import os
import sys
_SVC = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "workers", "services"))
sys.path.insert(0, _SVC)
import signature_stamper # noqa: E402
from state_trucking_authorization import ( # noqa: E402
build_state_trucking_authorization,
)
def make_signature_png(w=600, h=200) -> bytes:
"""A black squiggle on transparent background -> base64 PNG."""
from PIL import Image, ImageDraw
im = Image.new("RGBA", (w, h), (0, 0, 0, 0))
d = ImageDraw.Draw(im)
pts = [(20, 150), (120, 40), (220, 160), (320, 50), (430, 150), (560, 60)]
d.line(pts, fill=(10, 20, 60, 255), width=8, joint="curve")
buf = io.BytesIO()
im.save(buf, "PNG")
return buf.getvalue()
def main() -> int:
pdf, anchors = build_state_trucking_authorization(
order_number="CO-TEST1234",
entity_name="Acme Freight LLC",
service_name="New York Highway Use Tax (HUT) Registration",
dot_number="3456789",
mc_number="MC-987654",
fein_last4="1234",
base_state="NY",
operating_states=["NY", "PA", "OH"],
signer_name="John Smith",
signer_title="Owner",
)
assert pdf[:4] == b"%PDF", "generator did not emit a PDF"
signer_anchor = next(a for a in anchors if a["field"] == "signer")
print(f"signer box: x={signer_anchor['x']:.1f} y={signer_anchor['y']:.1f} "
f"w={signer_anchor['w']:.1f} h={signer_anchor['h']:.1f}")
sig_png = make_signature_png()
sig_b64 = "data:image/png;base64," + base64.b64encode(sig_png).decode()
signed = signature_stamper.stamp_signature(
pdf,
anchors,
signature_type="drawn",
signature_data=sig_b64,
signer_field="signer",
)
assert signed[:4] == b"%PDF", "stamper did not emit a PDF"
assert len(signed) > len(pdf), "stamped PDF should be larger (image added)"
# Render the signed PDF to an image and confirm dark ink appears inside the
# signature box and essentially nowhere far outside it.
try:
from pdf2image import convert_from_bytes
except ImportError:
print("pdf2image not installed — skipping pixel check (PDF structure OK)")
with open("/tmp/state_trucking_authorization_signed.pdf", "wb") as f:
f.write(signed)
print("wrote /tmp/state_trucking_authorization_signed.pdf")
return 0
pages = convert_from_bytes(signed, dpi=150)
page = pages[0]
pw, ph = page.size
# PDF points -> pixels (origin top-left in image, bottom-left in PDF)
scale = pw / 612.0
bx = signer_anchor["x"] * scale
bw = signer_anchor["w"] * scale
by_top = (792.0 - (signer_anchor["y"] + signer_anchor["h"])) * scale
bh = signer_anchor["h"] * scale
px = page.convert("L").load()
ink_in_box = 0
for yy in range(int(by_top) - 4, int(by_top + bh) + 6):
for xx in range(int(bx) - 4, int(bx + bw) + 4):
if 0 <= xx < pw and 0 <= yy < ph and px[xx, yy] < 90:
ink_in_box += 1
print(f"dark signature pixels inside signature box: {ink_in_box}")
assert ink_in_box > 200, "signature ink not found on the signature line!"
with open("/tmp/state_trucking_authorization_signed.pdf", "wb") as f:
f.write(signed)
print("PASS: signature stamped on the signature line")
print("wrote /tmp/state_trucking_authorization_signed.pdf")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View file

@ -1635,6 +1635,21 @@ def handle_esign_completed(payload: dict) -> dict:
LOG.info("[esign_completed] Signature received for %s (type=%s)", order_number, document_type) LOG.info("[esign_completed] Signature received for %s (type=%s)", order_number, document_type)
# Stamp the captured signature onto the signature line of the form PDF, so
# the signed document carries the client's signature exactly where it
# belongs (see services/signature_stamper.py). Non-fatal on failure — the
# signature is still recorded in esign_records.
esign_record_id = payload.get("esign_record_id")
if esign_record_id:
try:
from scripts.workers.services.esign_stamp import stamp_esign_document
signed_key = stamp_esign_document(int(esign_record_id))
if signed_key:
LOG.info("[esign_completed] Signed form stamped for %s -> %s", order_number, signed_key)
except Exception as exc:
LOG.warning("[esign_completed] Signature stamp failed for %s: %s", order_number, exc)
try: try:
import psycopg2 import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))

View file

@ -104,6 +104,47 @@ def requires_signature(slug: str) -> bool:
return slug in DOT_SIGNING return slug in DOT_SIGNING
# AcroForm signature field name -> logical anchor field, per document_type.
# The stamper grows the box upward so the signature rests on the official rule.
DOC_SIG_FIELDS: dict[str, dict[str, str]] = {
"mcs150": {"certifySignature": "signer"},
"carrier-closeout": {"certifySignature": "signer"},
}
def _extract_acroform_anchors(document_minio_key: str, document_type: str) -> list | None:
"""Download the unsigned form from MinIO and return signature anchors.
Returns a list of anchor dicts (placed on the official signature line) for
document types backed by a real AcroForm PDF, or None if not applicable / on
any failure (the signing flow still works without anchors, falling back to a
typed-name overlay only if anchors are present).
"""
field_map = DOC_SIG_FIELDS.get(document_type)
if not field_map or not document_minio_key:
return None
try:
import tempfile
from .signature_stamper import anchors_from_acroform
try:
from scripts.document_gen.minio_client import MinioStorage
except ImportError:
from document_gen.minio_client import MinioStorage # type: ignore
storage = MinioStorage()
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf:
storage.download(document_minio_key, tf.name)
tf.seek(0)
pdf_bytes = open(tf.name, "rb").read()
anchors = anchors_from_acroform(pdf_bytes, field_map)
return anchors or None
except Exception as exc:
LOG.warning("Could not extract signature anchors from %s: %s", document_minio_key, exc)
return None
def request_dot_esign( def request_dot_esign(
order_number: str, order_number: str,
slug: str, slug: str,
@ -140,6 +181,10 @@ def request_dot_esign(
if extra_metadata: if extra_metadata:
metadata.update(extra_metadata) metadata.update(extra_metadata)
# Extract the exact signature-line coordinates from the unsigned form so the
# client's signature can be stamped right on the rule after they sign.
anchors = _extract_acroform_anchors(document_minio_key, document_type)
# 1. Upsert the pending record # 1. Upsert the pending record
esign_id = None esign_id = None
try: try:
@ -151,9 +196,9 @@ def request_dot_esign(
""" """
INSERT INTO esign_records ( INSERT INTO esign_records (
order_number, document_type, document_title, entity_name, order_number, document_type, document_title, entity_name,
document_minio_key, document_metadata, document_minio_key, document_metadata, signature_anchors,
requires_perjury, status, expires_at requires_perjury, status, expires_at
) VALUES (%s, %s, %s, %s, %s, %s, TRUE, 'pending', ) VALUES (%s, %s, %s, %s, %s, %s, %s, TRUE, 'pending',
NOW() + (%s || ' days')::interval) NOW() + (%s || ' days')::interval)
ON CONFLICT (order_number, document_type) ON CONFLICT (order_number, document_type)
WHERE status IN ('pending', 'signed') WHERE status IN ('pending', 'signed')
@ -162,13 +207,15 @@ def request_dot_esign(
entity_name = EXCLUDED.entity_name, entity_name = EXCLUDED.entity_name,
document_minio_key = EXCLUDED.document_minio_key, document_minio_key = EXCLUDED.document_minio_key,
document_metadata = EXCLUDED.document_metadata, document_metadata = EXCLUDED.document_metadata,
signature_anchors = EXCLUDED.signature_anchors,
expires_at = EXCLUDED.expires_at, expires_at = EXCLUDED.expires_at,
updated_at = NOW() updated_at = NOW()
RETURNING id RETURNING id
""", """,
( (
order_number, document_type, document_title, entity_name, order_number, document_type, document_title, entity_name,
document_minio_key, json.dumps(metadata), str(expires_days), document_minio_key, json.dumps(metadata),
json.dumps(anchors) if anchors else None, str(expires_days),
), ),
) )
row = cur.fetchone() row = cur.fetchone()

View file

@ -0,0 +1,122 @@
"""Stamp the captured signature onto the form PDF stored in MinIO.
Glue between the e-sign record (signature image + recorded anchor coordinates)
and the actual form document. After a client signs, call
:func:`stamp_esign_document` to:
1. read the signature, type, and signature_anchors from esign_records;
2. download the unsigned form PDF (document_minio_key) from MinIO;
3. stamp the signature onto the recorded signature line(s);
4. upload the flattened signed PDF and store signed_document_minio_key.
Safe to call more than once it is idempotent on signed_document_minio_key.
"""
from __future__ import annotations
import json
import logging
import os
import tempfile
LOG = logging.getLogger("workers.services.esign_stamp")
def stamp_esign_document(esign_record_id: int) -> str | None:
"""Stamp the signature onto the form for one esign_records row.
Returns the signed document's MinIO key, or None if nothing was stamped
(no document, no anchors, or already stamped).
"""
try:
import psycopg2
except ImportError: # pragma: no cover
LOG.error("psycopg2 unavailable — cannot stamp signature")
return None
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute(
"""SELECT order_number, document_type, signature_type,
signature_data, document_minio_key, signature_anchors,
signed_document_minio_key
FROM esign_records WHERE id = %s""",
(esign_record_id,),
)
row = cur.fetchone()
if not row:
LOG.warning("[esign_stamp] No esign record %s", esign_record_id)
return None
(order_number, document_type, signature_type, signature_data,
document_key, anchors_raw, already_signed_key) = row
if already_signed_key:
LOG.info("[esign_stamp] %s already stamped (%s)", order_number, already_signed_key)
return already_signed_key
if not document_key:
LOG.info("[esign_stamp] %s has no form PDF to stamp", order_number)
return None
anchors = anchors_raw if isinstance(anchors_raw, list) else (
json.loads(anchors_raw) if anchors_raw else []
)
if not anchors:
LOG.info("[esign_stamp] %s has no signature anchors — skipping stamp", order_number)
return None
if not signature_type or not signature_data:
LOG.warning("[esign_stamp] %s missing signature data", order_number)
return None
# Download the unsigned form PDF.
try:
from scripts.document_gen.minio_client import MinioStorage
except ImportError: # pragma: no cover - alt sys.path layouts
from document_gen.minio_client import MinioStorage # type: ignore
try:
from .signature_stamper import stamp_signature
except ImportError: # pragma: no cover
from signature_stamper import stamp_signature # type: ignore
storage = MinioStorage()
with tempfile.TemporaryDirectory() as tmp:
local_in = os.path.join(tmp, "form.pdf")
storage.download(document_key, local_in)
with open(local_in, "rb") as f:
pdf_bytes = f.read()
signed_bytes = stamp_signature(
pdf_bytes,
anchors,
signature_type=signature_type,
signature_data=signature_data,
signer_field="signer",
)
local_out = os.path.join(tmp, "form_signed.pdf")
with open(local_out, "wb") as f:
f.write(signed_bytes)
# Signed key sits next to the original with a _signed suffix.
if document_key.endswith(".pdf"):
signed_key = document_key[:-4] + "_signed.pdf"
else:
signed_key = document_key + "_signed.pdf"
storage.upload(local_out, signed_key, content_type="application/pdf")
with conn.cursor() as cur:
cur.execute(
"UPDATE esign_records SET signed_document_minio_key = %s, updated_at = NOW() WHERE id = %s",
(signed_key, esign_record_id),
)
conn.commit()
LOG.info("[esign_stamp] %s signature stamped onto form -> %s", order_number, signed_key)
return signed_key
except Exception as exc:
conn.rollback()
LOG.error("[esign_stamp] Failed for record %s: %s", esign_record_id, exc)
return None
finally:
conn.close()

View file

@ -0,0 +1,336 @@
"""Stamp a captured e-signature onto the exact signature line of a PDF form.
Requirement: the client's drawn or typed signature must land **right on top of
where the signature belongs** on the generated form, not in a separate
"signature page" sidecar.
How it works
------------
1. When a document generator draws a signature line, it records the exact PDF
coordinates of every signature box (see ``signature_box`` below). Those
anchors are stored in ``esign_records.signature_anchors``.
2. After the client signs, :func:`stamp_signature` builds a transparent overlay
the same size as each page, draws the signature image (or a script-font
rendering of the typed name) inside the recorded box, then merges the overlay
onto the page with pypdf. The result is a flattened, signed PDF.
The overlay is authored in PDF user-space points with the origin at the
lower-left corner, matching reportlab and the coordinates the generators record,
so a box recorded at ``(x, y, w, h)`` receives the signature at that exact spot.
Only reportlab + pypdf + Pillow-free stdlib are required; if Pillow is present we
use it to read the PNG's intrinsic aspect ratio, otherwise reportlab reads it.
"""
from __future__ import annotations
import base64
import io
import logging
from typing import Any
LOG = logging.getLogger("workers.services.signature_stamper")
# Default signature-box geometry (PDF points). 1 inch = 72 pt.
SIG_BOX_W = 216.0 # 3.0 in
SIG_BOX_H = 40.0 # signature sits in a ~0.55 in tall band above the rule
PAGE_W = 612.0 # US Letter portrait
PAGE_H = 792.0
def signature_box(
field: str,
page: int,
x: float,
y: float,
w: float = SIG_BOX_W,
h: float = SIG_BOX_H,
page_w: float = PAGE_W,
page_h: float = PAGE_H,
) -> dict[str, Any]:
"""Build a signature-anchor record for a generator to persist.
Coordinates are the **lower-left** corner of the box and the box height/width,
in PDF points, authored against a page of ``page_w`` x ``page_h``. The
signature is drawn to sit on top of the signature rule (the rule should be at
or just below ``y``).
"""
return {
"field": field,
"page": int(page),
"x": float(x),
"y": float(y),
"w": float(w),
"h": float(h),
"page_w": float(page_w),
"page_h": float(page_h),
}
def anchors_from_acroform(
pdf_bytes: bytes,
field_map: dict[str, str],
*,
password: str = "",
grow_up: float = 22.0,
drop: float = 7.0,
left_pad: float = 2.0,
) -> list[dict[str, Any]]:
"""Extract signature anchors from a real AcroForm PDF by field name.
``field_map`` maps an AcroForm field name (e.g. ``"certifySignature"``) to the
logical anchor ``field`` we want to store (e.g. ``"signer"``). The field's
``/Rect`` becomes the box; the signature *line* (visible rule) usually sits a
few points below the field's lower edge, so we lower the box by ``drop`` so the
stamped signature rests on the rule, and grow it upward by ``grow_up`` so a
tall signature is not clipped.
Coordinates are returned in PDF points (lower-left origin) against the field's
own page size, exactly what :func:`stamp_signature` expects.
"""
from pypdf import PdfReader
reader = PdfReader(io.BytesIO(pdf_bytes))
if reader.is_encrypted:
try:
reader.decrypt(password)
except Exception as exc: # pragma: no cover - defensive
LOG.warning("Could not decrypt AcroForm PDF: %s", exc)
anchors: list[dict[str, Any]] = []
for page_index, page in enumerate(reader.pages):
page_w = float(page.mediabox.width)
page_h = float(page.mediabox.height)
annots = page.get("/Annots") or []
for annot in annots:
try:
obj = annot.get_object()
except Exception:
continue
name = obj.get("/T")
if not name:
continue
key = str(name)
if key not in field_map:
continue
rect = obj.get("/Rect")
if not rect or len(rect) != 4:
continue
x0, y0, x1, y1 = (float(v) for v in rect)
llx, lly = min(x0, x1), min(y0, y1)
urx, ury = max(x0, x1), max(y0, y1)
box_x = llx + left_pad
box_y = lly - drop
box_w = max(10.0, (urx - llx) - left_pad)
box_h = (ury - lly) + grow_up + drop
anchors.append(
signature_box(
field_map[key],
page_index,
box_x,
box_y,
w=box_w,
h=box_h,
page_w=page_w,
page_h=page_h,
)
)
return anchors
def _decode_png(signature_data: str) -> bytes | None:
"""Decode a base64 PNG (with or without the data: URI prefix)."""
if not signature_data:
return None
raw = signature_data.split(",", 1)[-1].strip()
try:
return base64.b64decode(raw)
except Exception as exc: # pragma: no cover - defensive
LOG.warning("Could not decode signature PNG: %s", exc)
return None
def _png_aspect(png_bytes: bytes) -> float:
"""Width/height aspect ratio of a PNG. Falls back to a sane default."""
try:
from PIL import Image # type: ignore
with Image.open(io.BytesIO(png_bytes)) as im:
w, h = im.size
if h:
return w / h
except Exception:
# Parse the IHDR chunk directly (PNG header) without Pillow.
try:
if png_bytes[:8] == b"\x89PNG\r\n\x1a\n":
width = int.from_bytes(png_bytes[16:20], "big")
height = int.from_bytes(png_bytes[20:24], "big")
if height:
return width / height
except Exception:
pass
return 3.0 # typical handwritten signature is wider than tall
def _trim_png(png_bytes: bytes) -> bytes:
"""Trim transparent/white margins so the ink bounding box fills the image.
This lets us bottom-anchor the *ink* (not the canvas) onto the signature
rule. Falls back to the original bytes if Pillow is unavailable.
"""
try:
from PIL import Image # type: ignore
except Exception:
return png_bytes
try:
with Image.open(io.BytesIO(png_bytes)) as im:
im = im.convert("RGBA")
alpha = im.split()[-1]
bbox = alpha.getbbox()
if not bbox or bbox == (0, 0, im.width, im.height):
# No alpha to trim (e.g. white background) — trim near-white.
gray = im.convert("L")
from PIL import ImageOps
inverted = ImageOps.invert(gray)
bbox = inverted.getbbox()
if bbox:
im = im.crop(bbox)
out = io.BytesIO()
im.save(out, "PNG")
return out.getvalue()
except Exception:
return png_bytes
def _draw_signature_in_box(
c,
box: dict[str, Any],
*,
signature_type: str,
signature_data: str,
page_w: float,
page_h: float,
) -> None:
"""Draw one signature inside ``box`` on an open reportlab canvas.
Scales the box coordinates from the authored page size to the actual page
size so the signature stays on the line even if the page differs slightly.
"""
from reportlab.lib.colors import HexColor
from reportlab.lib.utils import ImageReader
src_w = box.get("page_w") or page_w
src_h = box.get("page_h") or page_h
sx = page_w / src_w if src_w else 1.0
sy = page_h / src_h if src_h else 1.0
bx = box["x"] * sx
by = box["y"] * sy
bw = box["w"] * sx
bh = box["h"] * sy
if signature_type == "drawn":
png = _decode_png(signature_data)
if not png:
return
png = _trim_png(png)
aspect = _png_aspect(png)
# Fit the trimmed signature ink inside the box, preserving aspect ratio,
# left-aligned and bottom-anchored so the ink rests right on the rule. A
# tiny lift keeps the strokes from colliding with the printed line.
lift = 1.5
draw_h = bh
draw_w = draw_h * aspect
if draw_w > bw:
draw_w = bw
draw_h = draw_w / aspect
img = ImageReader(io.BytesIO(png))
c.drawImage(
img,
bx,
by + lift,
width=draw_w,
height=draw_h,
mask="auto",
preserveAspectRatio=True,
anchor="sw",
)
else: # typed
name = (signature_data or "").strip()
if not name:
return
# Render the typed name in an italic/script style, vertically centered in
# the box, resting just above the rule.
font_name = "Helvetica-Oblique"
font_size = min(bh * 0.7, 22.0)
c.setFont(font_name, font_size)
c.setFillColor(HexColor("#0b1f3a"))
# Baseline a few points above the box bottom so the name sits on the line.
c.drawString(bx + 2, by + max(3.0, bh * 0.18), name)
def stamp_signature(
pdf_bytes: bytes,
anchors: list[dict[str, Any]],
*,
signature_type: str,
signature_data: str,
signer_field: str | None = None,
) -> bytes:
"""Return a new PDF with the signature stamped onto each matching anchor.
Args:
pdf_bytes: the unsigned form PDF.
anchors: list of signature-box dicts (see :func:`signature_box`).
signature_type: "drawn" or "typed".
signature_data: base64 PNG (drawn) or the typed full name.
signer_field: if given, only stamp anchors whose ``field`` matches;
otherwise stamp every anchor.
The signature is placed at the recorded coordinates, so it lands exactly on
the signature line the generator drew.
"""
from pypdf import PdfReader, PdfWriter
from reportlab.pdfgen import canvas
if not anchors:
LOG.warning("stamp_signature called with no anchors — returning original PDF")
return pdf_bytes
reader = PdfReader(io.BytesIO(pdf_bytes))
writer = PdfWriter()
# Group anchors by page so we build one overlay per signed page.
by_page: dict[int, list[dict[str, Any]]] = {}
for a in anchors:
if signer_field and a.get("field") != signer_field:
continue
by_page.setdefault(int(a.get("page", 0)), []).append(a)
for page_index, page in enumerate(reader.pages):
page_w = float(page.mediabox.width)
page_h = float(page.mediabox.height)
page_anchors = by_page.get(page_index)
if page_anchors:
buf = io.BytesIO()
c = canvas.Canvas(buf, pagesize=(page_w, page_h))
for box in page_anchors:
_draw_signature_in_box(
c,
box,
signature_type=signature_type,
signature_data=signature_data,
page_w=page_w,
page_h=page_h,
)
c.save()
buf.seek(0)
overlay = PdfReader(buf).pages[0]
page.merge_page(overlay)
writer.add_page(page)
out = io.BytesIO()
writer.write(out)
return out.getvalue()

View file

@ -233,6 +233,46 @@ class StateTruckingHandler:
base_state = intake.get("base_state", intake.get("phy_state", "")) base_state = intake.get("base_state", intake.get("phy_state", ""))
operating_states = intake.get("operating_states", []) operating_states = intake.get("operating_states", [])
# ── Authorization gate ───────────────────────────────────────────
# Every state portal filing legally requires the customer's signed
# "Limited Authorization to File State Motor Carrier Compliance
# Documents" before we touch a state portal/tax system. On the first
# run we generate that authorization, email the signing link, and
# PAUSE. The pipeline resumes here with client_approved=True once the
# customer signs (handle_esign_completed re-dispatches us).
client_approved = bool(order_data.get("client_approved"))
signed_auth_key = None
if not client_approved:
requested = self._request_authorization(
order_number=order_number,
service_slug=service_slug,
service_name=service_name,
entity_name=entity_name,
customer_email=customer_email,
dot_number=dot_number,
mc_number=str(intake.get("mc_number", "")),
fein_last4=str(intake.get("fein_last4", "")),
base_state=base_state,
operating_states=operating_states,
signer_name=str(intake.get("signer_name", order_data.get("customer_name", ""))),
signer_title=str(intake.get("signer_title", "")),
)
if requested:
LOG.info(
"[%s] Authorization requested — pipeline PAUSED pending signature",
order_number,
)
return []
# If we could not request a signature (e.g. no email), fall through
# and create the admin todo so ops can chase the authorization.
LOG.warning(
"[%s] Could not request authorization e-sign — proceeding to admin todo",
order_number,
)
else:
signed_auth_key = self._signed_authorization_key(order_number)
LOG.info("[%s] Authorization signed (%s) — proceeding to filing", order_number, signed_auth_key)
# Slug-specific intake fields collected by StateTruckingIntakeStep. # Slug-specific intake fields collected by StateTruckingIntakeStep.
intake_summary = self._summarize_intake(service_slug, intake) intake_summary = self._summarize_intake(service_slug, intake)
@ -263,6 +303,7 @@ class StateTruckingHandler:
"intake_summary": intake_summary, "intake_summary": intake_summary,
"state_requirements": state_reqs, "state_requirements": state_reqs,
"steps": steps, "steps": steps,
"signed_authorization_minio_key": signed_auth_key,
} }
# Render the slug-specific intake fields into the description. # Render the slug-specific intake fields into the description.
@ -431,10 +472,236 @@ class StateTruckingHandler:
return {"agency": reqs[keys[0]], "url": reqs.get(keys[1], "")} return {"agency": reqs[keys[0]], "url": reqs.get(keys[1], "")}
return None return None
# ── Authorization (signed "Limited Authorization to File") ──────────
# document_type used in esign_records for the state-trucking authorization.
AUTH_DOCUMENT_TYPE = "state-trucking-authorization"
def _request_authorization(
self,
*,
order_number: str,
service_slug: str,
service_name: str,
entity_name: str,
customer_email: str,
dot_number: str = "",
mc_number: str = "",
fein_last4: str = "",
base_state: str = "",
operating_states: list | None = None,
signer_name: str = "",
signer_title: str = "",
) -> bool:
"""Generate + upload the authorization PDF and email the signing link.
Returns True if a signing request was created (pipeline should pause),
False if it could not be requested (e.g. no customer email) so the
caller can fall back to an admin todo.
"""
if not customer_email:
return False
# If a record already exists (pending or signed), don't duplicate.
existing = self._authorization_status(order_number)
if existing == "signed":
return False # already signed — proceed to filing
# (pending records are upserted/refreshed below)
try:
from .state_trucking_authorization import build_state_trucking_authorization
from .signature_stamper import signature_box # noqa: F401 (ensures module import)
except Exception as exc:
LOG.error("[%s] Authorization generator unavailable: %s", order_number, exc)
return False
try:
pdf_bytes, anchors = build_state_trucking_authorization(
order_number=order_number,
entity_name=entity_name,
service_name=service_name,
dot_number=dot_number,
mc_number=mc_number,
fein_last4=fein_last4,
base_state=base_state,
operating_states=operating_states or [],
signer_name=signer_name,
signer_title=signer_title,
)
except Exception as exc:
LOG.error("[%s] Failed to render authorization PDF: %s", order_number, exc)
return False
# Upload the unsigned authorization to MinIO.
document_key = f"compliance/{order_number}/state_trucking_authorization.pdf"
try:
import tempfile
try:
from scripts.document_gen.minio_client import MinioStorage
except ImportError:
from document_gen.minio_client import MinioStorage # type: ignore
storage = MinioStorage()
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf:
tf.write(pdf_bytes)
tf.flush()
storage.upload(tf.name, document_key, content_type="application/pdf")
except Exception as exc:
LOG.error("[%s] Failed to upload authorization PDF: %s", order_number, exc)
return False
# Create the esign record (with the signature anchors so the stamper can
# place the signature on the line) and email the signing link.
try:
import json
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO esign_records (
order_number, document_type, document_title, entity_name,
document_minio_key, document_metadata, signature_anchors,
requires_perjury, status, expires_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, FALSE, 'pending',
NOW() + INTERVAL '14 days')
ON CONFLICT (order_number, document_type)
WHERE status IN ('pending', 'signed')
DO UPDATE SET
document_title = EXCLUDED.document_title,
entity_name = EXCLUDED.entity_name,
document_minio_key = EXCLUDED.document_minio_key,
document_metadata = EXCLUDED.document_metadata,
signature_anchors = EXCLUDED.signature_anchors,
expires_at = EXCLUDED.expires_at,
updated_at = NOW()
""",
(
order_number,
self.AUTH_DOCUMENT_TYPE,
f"Authorization to File: {service_name}",
entity_name,
document_key,
json.dumps({"service_slug": service_slug, "dot_number": dot_number}),
json.dumps(anchors),
),
)
conn.commit()
finally:
conn.close()
except Exception as exc:
LOG.error("[%s] Failed to create authorization esign record: %s", order_number, exc)
return False
# Email the signing link (JWT signed with CUSTOMER_JWT_SECRET).
try:
self._send_authorization_email(
order_number=order_number,
service_name=service_name,
entity_name=entity_name,
customer_email=customer_email,
)
except Exception as exc:
LOG.warning("[%s] Authorization email failed (record exists): %s", order_number, exc)
return True
def _authorization_status(self, order_number: str) -> str | None:
"""Return the current authorization esign status: 'signed', 'pending', or None."""
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute(
"""SELECT status FROM esign_records
WHERE order_number = %s AND document_type = %s
AND status IN ('pending', 'signed')
ORDER BY created_at DESC LIMIT 1""",
(order_number, self.AUTH_DOCUMENT_TYPE),
)
row = cur.fetchone()
return row[0] if row else None
finally:
conn.close()
except Exception:
return None
def _signed_authorization_key(self, order_number: str) -> str | None:
"""Return the MinIO key of the stamped, signed authorization PDF if available."""
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute(
"""SELECT signed_document_minio_key, document_minio_key
FROM esign_records
WHERE order_number = %s AND document_type = %s
AND status = 'signed'
ORDER BY signed_at DESC NULLS LAST LIMIT 1""",
(order_number, self.AUTH_DOCUMENT_TYPE),
)
row = cur.fetchone()
if not row:
return None
return row[0] or row[1]
finally:
conn.close()
except Exception:
return None
def _send_authorization_email(
self, *, order_number, service_name, entity_name, customer_email
):
"""Email the customer a link to review and sign the authorization."""
import os as _os
import smtplib
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
try:
import jwt as pyjwt
except ImportError: # pragma: no cover
import PyJWT as pyjwt # type: ignore
secret = _os.environ.get("CUSTOMER_JWT_SECRET", "changeme_long_random_string")
domain = _os.environ.get("DOMAIN", "performancewest.net")
token = pyjwt.encode(
{
"order_id": order_number,
"order_type": self.AUTH_DOCUMENT_TYPE,
"email": customer_email,
"exp": datetime.now(timezone.utc) + timedelta(days=14),
},
secret,
algorithm="HS256",
)
sign_url = f"https://{domain}/portal/esign?token={token}"
body = (
f"Hi,\n\n"
f"To complete your {service_name} order for {entity_name}, we need your "
f"signed authorization before we can file with the state on your behalf.\n\n"
f"Please review and sign here:\n{sign_url}\n\n"
f"This authorization lets Performance West prepare and submit your state "
f"motor carrier filing, communicate with the agency, and remit government "
f"fees you provide. Government fees, taxes, card processing fees, decals, "
f"permits, bonds, and insurance filing costs are pass-through charges and "
f"may be billed separately if not known at checkout.\n\n"
f"This link expires in 14 days.\n\n"
f"Order: {order_number}\n"
f"Questions? Call (888) 411-0383.\n\n"
f"Performance West Inc.\nDOT / State Motor Carrier Compliance\n"
)
msg = MIMEText(body)
msg["Subject"] = f"Action Required: Sign Your Authorization — {service_name}"
msg["From"] = "noreply@performancewest.net"
msg["To"] = customer_email
with smtplib.SMTP("localhost", 25, timeout=30) as s:
s.sendmail(msg["From"], [customer_email], msg.as_string())
def _send_status_email(self, order_number, service_name, entity_name, dot_number, customer_email): def _send_status_email(self, order_number, service_name, entity_name, dot_number, customer_email):
"""Send client a status email.""" """Send client a status email."""
if not customer_email:
return
try: try:
import smtplib import smtplib
from email.mime.text import MIMEText from email.mime.text import MIMEText

View file

@ -0,0 +1,278 @@
"""Generate the state-trucking "Limited Authorization to File" PDF.
This is the signed authorization the customer must provide before Performance
West can file state motor-carrier compliance documents on their behalf (NY HUT,
CT HUF, OR Weight-Mile, CA MCP/CARB, KY KYU, NM WDT, IRP/IFTA, etc.). See
docs/trucking-state-authorization-plan.md.
The generator draws an explicit signature line and records its exact PDF
coordinates via :func:`signature_box`, so the e-signature stamper can place the
client's signature precisely on that line (see signature_stamper.py).
Returns ``(pdf_bytes, anchors)`` where ``anchors`` is the list to persist in
``esign_records.signature_anchors``.
"""
from __future__ import annotations
import io
import logging
from datetime import datetime, timezone
from typing import Any
try: # normal import when loaded as part of the services package
from .signature_stamper import signature_box
except ImportError: # pragma: no cover - standalone/CLI use
from signature_stamper import signature_box # type: ignore
LOG = logging.getLogger("workers.services.state_trucking_authorization")
NAVY = "#0b1f3a"
SLATE = "#475569"
RULE = "#94a3b8"
_AUTH_BODY = (
"By signing below, the Customer named above authorizes Performance West Inc. "
"(\"Performance West\") to act as its limited agent to:"
)
_AUTH_GRANTS = [
"prepare, review, submit, and amend the state motor carrier compliance "
"filing(s) identified in this authorization;",
"access state agency portals or tax systems where representative, "
"practitioner, bulk-filer, or delegate access is available;",
"communicate with state agencies about the filing and receive filing status, "
"notices, confirmations, decals, credentials, and deficiency messages;",
"coordinate with the Customer's insurer or insurance agent for any required "
"insurance filings;",
"remit government fees, taxes, card processing fees, decals, permits, bonds, "
"and pass-through charges when collected from the Customer;",
"retain a copy of this signed authorization for agency audit or request "
"purposes.",
]
_AUTH_ACK = [
"The Customer remains responsible for the accuracy and completeness of the "
"information it provides.",
"Government fees, taxes, card processing fees, bonds/security, decals, "
"permits, and insurance filing costs are pass-through charges and may be "
"billed separately if not known at the time of order.",
"Some state filings require the Customer to grant portal access/delegation or "
"approve account changes; Performance West will request this where needed.",
"This authorization is limited to the filing(s) listed and may be revoked in "
"writing, except as to filings already submitted.",
]
def build_state_trucking_authorization(
*,
order_number: str,
entity_name: str,
service_name: str,
dot_number: str = "",
mc_number: str = "",
fein_last4: str = "",
base_state: str = "",
operating_states: list[str] | None = None,
signer_name: str = "",
signer_title: str = "",
) -> tuple[bytes, list[dict[str, Any]]]:
"""Render the authorization PDF and return (pdf_bytes, signature_anchors)."""
from reportlab.lib.colors import HexColor
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import inch
from reportlab.pdfgen import canvas
buf = io.BytesIO()
page_w, page_h = letter # 612 x 792
c = canvas.Canvas(buf, pagesize=letter)
left = 0.85 * inch
right = page_w - 0.85 * inch
width = right - left
def wrap(text: str, font: str, size: float, max_w: float) -> list[str]:
c.setFont(font, size)
words = text.split()
lines: list[str] = []
cur = ""
for w in words:
trial = (cur + " " + w).strip()
if c.stringWidth(trial, font, size) <= max_w:
cur = trial
else:
if cur:
lines.append(cur)
cur = w
if cur:
lines.append(cur)
return lines
y = page_h - 0.9 * inch
# Header
c.setFillColor(HexColor(NAVY))
c.setFont("Helvetica-Bold", 16)
c.drawString(left, y, "Performance West Inc.")
y -= 18
c.setFont("Helvetica", 9.5)
c.setFillColor(HexColor(SLATE))
c.drawString(left, y, "DOT / State Motor Carrier Compliance Division | (888) 411-0383 | performancewest.net")
y -= 8
c.setStrokeColor(HexColor(RULE))
c.line(left, y, right, y)
y -= 26
c.setFillColor(HexColor(NAVY))
c.setFont("Helvetica-Bold", 13.5)
c.drawString(left, y, "Limited Authorization to File State Motor Carrier Compliance Documents")
y -= 22
# Customer / order block
def field_row(label: str, value: str) -> None:
nonlocal y
c.setFont("Helvetica-Bold", 10)
c.setFillColor(HexColor(NAVY))
c.drawString(left, y, label)
c.setFont("Helvetica", 10)
c.setFillColor(HexColor("#1f2937"))
c.drawString(left + 1.6 * inch, y, value or "")
y -= 15
states = ", ".join(operating_states or []) if operating_states else (base_state or "")
field_row("Customer (Carrier):", entity_name)
field_row("USDOT #:", dot_number)
if mc_number:
field_row("MC/MX/FF #:", mc_number)
if fein_last4:
field_row("FEIN (last 4):", fein_last4)
field_row("Covered filing:", service_name)
field_row("Covered state(s):", states)
field_row("Order #:", order_number)
y -= 8
# Body intro
c.setFillColor(HexColor("#1f2937"))
for ln in wrap(_AUTH_BODY, "Helvetica", 10, width):
c.setFont("Helvetica", 10)
c.drawString(left, y, ln)
y -= 14
y -= 4
# Numbered grants
for i, grant in enumerate(_AUTH_GRANTS, 1):
bullet = f"{i}."
c.setFont("Helvetica-Bold", 10)
c.drawString(left, y, bullet)
lines = wrap(grant, "Helvetica", 10, width - 0.3 * inch)
for j, ln in enumerate(lines):
c.setFont("Helvetica", 10)
c.drawString(left + 0.3 * inch, y, ln)
y -= 13
y -= 3
y -= 6
c.setFont("Helvetica-Bold", 10.5)
c.setFillColor(HexColor(NAVY))
c.drawString(left, y, "Acknowledgements")
y -= 16
for ack in _AUTH_ACK:
c.setFillColor(HexColor("#475569"))
c.setFont("Helvetica", 9.5)
c.drawString(left, y, "\u2022")
for j, ln in enumerate(wrap(ack, "Helvetica", 9.5, width - 0.25 * inch)):
c.drawString(left + 0.25 * inch, y, ln)
y -= 12
y -= 2
# ── Signature block ──────────────────────────────────────────────
# Reserve a fixed band near the bottom so the signature line position is
# deterministic. The signature box sits ON TOP of the rule.
sig_rule_y = 1.55 * inch
sig_box_h = 42.0
sig_box_w = 3.1 * inch
sig_x = left
c.setFillColor(HexColor(NAVY))
c.setFont("Helvetica-Bold", 10.5)
c.drawString(left, sig_rule_y + sig_box_h + 14, "Authorized Signature")
# The signature rule
c.setStrokeColor(HexColor("#334155"))
c.setLineWidth(1)
c.line(sig_x, sig_rule_y, sig_x + sig_box_w, sig_rule_y)
# Date rule to the right
date_x = sig_x + sig_box_w + 0.5 * inch
date_w = right - date_x
c.line(date_x, sig_rule_y, right, sig_rule_y)
c.setFont("Helvetica", 8.5)
c.setFillColor(HexColor(SLATE))
c.drawString(sig_x, sig_rule_y - 11, "Signature")
c.drawString(date_x, sig_rule_y - 11, "Date")
# Printed name / title beneath
ny = sig_rule_y - 30
c.setFont("Helvetica", 9.5)
c.setFillColor(HexColor("#1f2937"))
c.drawString(sig_x, ny, f"Printed name: {signer_name or '________________________'}")
c.drawString(date_x, ny, f"Title: {signer_title or '____________'}")
# Footer note
c.setFont("Helvetica-Oblique", 7.5)
c.setFillColor(HexColor("#94a3b8"))
c.drawString(
left,
0.75 * inch,
f"Generated {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')} | "
f"Order {order_number} | Performance West Inc.",
)
c.save()
buf.seek(0)
pdf_bytes = buf.getvalue()
# Record the exact signature box so the stamper lands the signature on the
# rule. The box bottom == the rule's y, height extends upward.
anchors = [
signature_box(
field="signer",
page=0,
x=sig_x + 4, # small inset from the line's left end
y=sig_rule_y + 1, # rest the signature just on top of the rule
w=sig_box_w - 8,
h=sig_box_h,
page_w=page_w,
page_h=page_h,
),
signature_box(
field="date",
page=0,
x=date_x + 4,
y=sig_rule_y + 1,
w=date_w - 8,
h=20.0,
page_w=page_w,
page_h=page_h,
),
]
return pdf_bytes, anchors
if __name__ == "__main__": # quick local render for visual verification
pdf, anchors = build_state_trucking_authorization(
order_number="CO-TEST1234",
entity_name="Acme Freight LLC",
service_name="New York Highway Use Tax (HUT) Registration",
dot_number="3456789",
mc_number="MC-987654",
fein_last4="1234",
base_state="NY",
operating_states=["NY", "PA", "OH"],
signer_name="John Smith",
signer_title="Owner",
)
with open("/tmp/state_trucking_authorization.pdf", "wb") as f:
f.write(pdf)
print("wrote /tmp/state_trucking_authorization.pdf")
print("anchors:", anchors)