diff --git a/api/migrations/085_esign_signature_anchors.sql b/api/migrations/085_esign_signature_anchors.sql new file mode 100644 index 0000000..121d457 --- /dev/null +++ b/api/migrations/085_esign_signature_anchors.sql @@ -0,0 +1,32 @@ +-- 085: Precise signature placement for e-signed documents. +-- +-- Requirement: the client's drawn/typed signature must land exactly on the +-- signature line of the generated form, not in a generic "signed" sidecar. +-- +-- When a document generator draws a signature line, it records the exact +-- PDF coordinates of each signature box in `signature_anchors`. At sign time +-- the stamper composites the signature PNG onto those coordinates and writes +-- the flattened, signed PDF to `signed_document_minio_key`. +-- +-- signature_anchors shape (array, one entry per place a signature goes): +-- [ +-- { +-- "field": "signer", -- which signature this box expects +-- "page": 0, -- 0-based page index +-- "x": 72.0, -- lower-left x of the box, in PDF points +-- "y": 120.0, -- lower-left y of the box, in PDF points +-- "w": 216.0, -- box width in points +-- "h": 40.0, -- box height in points +-- "page_w": 612.0, -- page width the coordinates were authored against +-- "page_h": 792.0 -- page height the coordinates were authored against +-- } +-- ] + +ALTER TABLE esign_records + ADD COLUMN IF NOT EXISTS signature_anchors JSONB, + ADD COLUMN IF NOT EXISTS signed_document_minio_key TEXT; + +COMMENT ON COLUMN esign_records.signature_anchors IS + 'Exact PDF coordinates of each signature box, recorded by the document generator so the stamper can place the signature on the signature line.'; +COMMENT ON COLUMN esign_records.signed_document_minio_key IS + 'MinIO key of the flattened PDF with the signature stamped onto the signature line.'; diff --git a/scripts/tests/test_mcs150_signature_placement.py b/scripts/tests/test_mcs150_signature_placement.py new file mode 100644 index 0000000..32dd99d --- /dev/null +++ b/scripts/tests/test_mcs150_signature_placement.py @@ -0,0 +1,117 @@ +"""Verify the captured signature lands on the official MCS-150 signature line. + +Fills the real MCS-150 AcroForm, extracts the ``certifySignature`` anchor from +the form's own field rectangle, stamps a synthetic drawn signature, renders the +signature page, and asserts a meaningful number of dark pixels fall inside the +recorded signature box (i.e. the signature is on the rule, not floating). + +Requires: reportlab, pypdf, pillow, and (for rendering) pdftoppm. +Run from the repo root with a venv that has those installed. +""" +from __future__ import annotations + +import base64 +import importlib.util +import io +import os +import subprocess +import sys +import tempfile + +REPO = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + +def _load(name: str, rel: str): + spec = importlib.util.spec_from_file_location(name, os.path.join(REPO, rel)) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) # type: ignore[union-attr] + return mod + + +def _synthetic_signature_png() -> str: + from PIL import Image, ImageDraw + + im = Image.new("RGBA", (600, 140), (0, 0, 0, 0)) + d = ImageDraw.Draw(im) + d.line( + [(20, 110), (120, 30), (220, 110), (340, 40), (480, 100), (580, 50)], + fill=(10, 20, 60, 255), + width=6, + ) + buf = io.BytesIO() + im.save(buf, "PNG") + return base64.b64encode(buf.getvalue()).decode() + + +def main() -> int: + sys.path.insert(0, REPO) + mcs = _load("mcs150_pdf_filler", "scripts/document_gen/templates/mcs150_pdf_filler.py") + stamper = _load("signature_stamper", "scripts/workers/services/signature_stamper.py") + + intake = { + "legal_name": "ADAMS LUMBER INC", + "dot_number": "1157913", + "entity_type": "corporation", + "carrier_operation": "authorized_for_hire", + "interstate_intrastate": "interstate", + "hazmat": "no", + "power_units": "5", + "drivers": "6", + "annual_miles": "250000", + "cargo_types": ["general"], + "signer_name": "Mark Adams", + "signer_title": "President", + "address_state": "OR", + "email": "m@a.com", + } + pdf_path = mcs.fill_mcs150(intake, order_number="TEST-MCS150-SIG") + pdf_bytes = open(pdf_path, "rb").read() + + anchors = stamper.anchors_from_acroform(pdf_bytes, {"certifySignature": "signer"}) + assert anchors, "no signature anchor extracted from MCS-150 AcroForm" + box = anchors[0] + print("signature anchor:", box) + + signed = stamper.stamp_signature( + pdf_bytes, + anchors, + signature_type="drawn", + signature_data=_synthetic_signature_png(), + ) + + with tempfile.TemporaryDirectory() as td: + signed_path = os.path.join(td, "signed.pdf") + open(signed_path, "wb").write(signed) + page_no = box["page"] + 1 + prefix = os.path.join(td, "render") + subprocess.run( + ["pdftoppm", "-png", "-f", str(page_no), "-l", str(page_no), "-r", "150", signed_path, prefix], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + # pdftoppm zero-pads the page number; find the produced PNG. + png = next( + os.path.join(td, f) for f in sorted(os.listdir(td)) if f.startswith("render") and f.endswith(".png") + ) + from PIL import Image + + im = Image.open(png).convert("L") + W, H = im.size + px = im.load() + sc = 150 / 72.0 + bx, by, bw, bh = box["x"], box["y"], box["w"], box["h"] + dark = 0 + for X in range(int(bx * sc), int((bx + bw) * sc)): + for Y in range(int(H - (by + bh) * sc), int(H - by * sc)): + if 0 <= X < W and 0 <= Y < H and px[X, Y] < 120: + dark += 1 + print("dark signature pixels inside MCS-150 signature box:", dark) + assert dark > 500, f"signature not on the line (only {dark} dark px in box)" + + print("PASS: signature stamped on the MCS-150 signature line") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/tests/test_signature_placement.py b/scripts/tests/test_signature_placement.py new file mode 100644 index 0000000..5520acf --- /dev/null +++ b/scripts/tests/test_signature_placement.py @@ -0,0 +1,103 @@ +"""Verify the e-signature lands exactly on the signature line of the auth form. + +Renders the authorization PDF, captures the signature-box anchors, creates a +synthetic drawn signature, stamps it, and asserts the stamped ink falls inside +the recorded signature box (and on/just-above the signature rule). +""" +import base64 +import io +import os +import sys + +_SVC = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "workers", "services")) +sys.path.insert(0, _SVC) + +import signature_stamper # noqa: E402 +from state_trucking_authorization import ( # noqa: E402 + build_state_trucking_authorization, +) + + +def make_signature_png(w=600, h=200) -> bytes: + """A black squiggle on transparent background -> base64 PNG.""" + from PIL import Image, ImageDraw + + im = Image.new("RGBA", (w, h), (0, 0, 0, 0)) + d = ImageDraw.Draw(im) + pts = [(20, 150), (120, 40), (220, 160), (320, 50), (430, 150), (560, 60)] + d.line(pts, fill=(10, 20, 60, 255), width=8, joint="curve") + buf = io.BytesIO() + im.save(buf, "PNG") + return buf.getvalue() + + +def main() -> int: + pdf, anchors = build_state_trucking_authorization( + order_number="CO-TEST1234", + entity_name="Acme Freight LLC", + service_name="New York Highway Use Tax (HUT) Registration", + dot_number="3456789", + mc_number="MC-987654", + fein_last4="1234", + base_state="NY", + operating_states=["NY", "PA", "OH"], + signer_name="John Smith", + signer_title="Owner", + ) + assert pdf[:4] == b"%PDF", "generator did not emit a PDF" + signer_anchor = next(a for a in anchors if a["field"] == "signer") + print(f"signer box: x={signer_anchor['x']:.1f} y={signer_anchor['y']:.1f} " + f"w={signer_anchor['w']:.1f} h={signer_anchor['h']:.1f}") + + sig_png = make_signature_png() + sig_b64 = "data:image/png;base64," + base64.b64encode(sig_png).decode() + + signed = signature_stamper.stamp_signature( + pdf, + anchors, + signature_type="drawn", + signature_data=sig_b64, + signer_field="signer", + ) + assert signed[:4] == b"%PDF", "stamper did not emit a PDF" + assert len(signed) > len(pdf), "stamped PDF should be larger (image added)" + + # Render the signed PDF to an image and confirm dark ink appears inside the + # signature box and essentially nowhere far outside it. + try: + from pdf2image import convert_from_bytes + except ImportError: + print("pdf2image not installed — skipping pixel check (PDF structure OK)") + with open("/tmp/state_trucking_authorization_signed.pdf", "wb") as f: + f.write(signed) + print("wrote /tmp/state_trucking_authorization_signed.pdf") + return 0 + + pages = convert_from_bytes(signed, dpi=150) + page = pages[0] + pw, ph = page.size + # PDF points -> pixels (origin top-left in image, bottom-left in PDF) + scale = pw / 612.0 + bx = signer_anchor["x"] * scale + bw = signer_anchor["w"] * scale + by_top = (792.0 - (signer_anchor["y"] + signer_anchor["h"])) * scale + bh = signer_anchor["h"] * scale + + px = page.convert("L").load() + ink_in_box = 0 + for yy in range(int(by_top) - 4, int(by_top + bh) + 6): + for xx in range(int(bx) - 4, int(bx + bw) + 4): + if 0 <= xx < pw and 0 <= yy < ph and px[xx, yy] < 90: + ink_in_box += 1 + print(f"dark signature pixels inside signature box: {ink_in_box}") + assert ink_in_box > 200, "signature ink not found on the signature line!" + + with open("/tmp/state_trucking_authorization_signed.pdf", "wb") as f: + f.write(signed) + print("PASS: signature stamped on the signature line") + print("wrote /tmp/state_trucking_authorization_signed.pdf") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/workers/job_server.py b/scripts/workers/job_server.py index a781cb5..6b16b0b 100644 --- a/scripts/workers/job_server.py +++ b/scripts/workers/job_server.py @@ -1635,6 +1635,21 @@ def handle_esign_completed(payload: dict) -> dict: LOG.info("[esign_completed] Signature received for %s (type=%s)", order_number, document_type) + # Stamp the captured signature onto the signature line of the form PDF, so + # the signed document carries the client's signature exactly where it + # belongs (see services/signature_stamper.py). Non-fatal on failure — the + # signature is still recorded in esign_records. + esign_record_id = payload.get("esign_record_id") + if esign_record_id: + try: + from scripts.workers.services.esign_stamp import stamp_esign_document + signed_key = stamp_esign_document(int(esign_record_id)) + if signed_key: + LOG.info("[esign_completed] Signed form stamped for %s -> %s", order_number, signed_key) + except Exception as exc: + LOG.warning("[esign_completed] Signature stamp failed for %s: %s", order_number, exc) + + try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) diff --git a/scripts/workers/services/dot_esign.py b/scripts/workers/services/dot_esign.py index 62d8886..e6a52d9 100644 --- a/scripts/workers/services/dot_esign.py +++ b/scripts/workers/services/dot_esign.py @@ -104,6 +104,47 @@ def requires_signature(slug: str) -> bool: return slug in DOT_SIGNING +# AcroForm signature field name -> logical anchor field, per document_type. +# The stamper grows the box upward so the signature rests on the official rule. +DOC_SIG_FIELDS: dict[str, dict[str, str]] = { + "mcs150": {"certifySignature": "signer"}, + "carrier-closeout": {"certifySignature": "signer"}, +} + + +def _extract_acroform_anchors(document_minio_key: str, document_type: str) -> list | None: + """Download the unsigned form from MinIO and return signature anchors. + + Returns a list of anchor dicts (placed on the official signature line) for + document types backed by a real AcroForm PDF, or None if not applicable / on + any failure (the signing flow still works without anchors, falling back to a + typed-name overlay only if anchors are present). + """ + field_map = DOC_SIG_FIELDS.get(document_type) + if not field_map or not document_minio_key: + return None + try: + import tempfile + + from .signature_stamper import anchors_from_acroform + + try: + from scripts.document_gen.minio_client import MinioStorage + except ImportError: + from document_gen.minio_client import MinioStorage # type: ignore + + storage = MinioStorage() + with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf: + storage.download(document_minio_key, tf.name) + tf.seek(0) + pdf_bytes = open(tf.name, "rb").read() + anchors = anchors_from_acroform(pdf_bytes, field_map) + return anchors or None + except Exception as exc: + LOG.warning("Could not extract signature anchors from %s: %s", document_minio_key, exc) + return None + + def request_dot_esign( order_number: str, slug: str, @@ -140,6 +181,10 @@ def request_dot_esign( if extra_metadata: metadata.update(extra_metadata) + # Extract the exact signature-line coordinates from the unsigned form so the + # client's signature can be stamped right on the rule after they sign. + anchors = _extract_acroform_anchors(document_minio_key, document_type) + # 1. Upsert the pending record esign_id = None try: @@ -151,9 +196,9 @@ def request_dot_esign( """ INSERT INTO esign_records ( order_number, document_type, document_title, entity_name, - document_minio_key, document_metadata, + document_minio_key, document_metadata, signature_anchors, requires_perjury, status, expires_at - ) VALUES (%s, %s, %s, %s, %s, %s, TRUE, 'pending', + ) VALUES (%s, %s, %s, %s, %s, %s, %s, TRUE, 'pending', NOW() + (%s || ' days')::interval) ON CONFLICT (order_number, document_type) WHERE status IN ('pending', 'signed') @@ -162,13 +207,15 @@ def request_dot_esign( entity_name = EXCLUDED.entity_name, document_minio_key = EXCLUDED.document_minio_key, document_metadata = EXCLUDED.document_metadata, + signature_anchors = EXCLUDED.signature_anchors, expires_at = EXCLUDED.expires_at, updated_at = NOW() RETURNING id """, ( order_number, document_type, document_title, entity_name, - document_minio_key, json.dumps(metadata), str(expires_days), + document_minio_key, json.dumps(metadata), + json.dumps(anchors) if anchors else None, str(expires_days), ), ) row = cur.fetchone() diff --git a/scripts/workers/services/esign_stamp.py b/scripts/workers/services/esign_stamp.py new file mode 100644 index 0000000..92ca633 --- /dev/null +++ b/scripts/workers/services/esign_stamp.py @@ -0,0 +1,122 @@ +"""Stamp the captured signature onto the form PDF stored in MinIO. + +Glue between the e-sign record (signature image + recorded anchor coordinates) +and the actual form document. After a client signs, call +:func:`stamp_esign_document` to: + + 1. read the signature, type, and signature_anchors from esign_records; + 2. download the unsigned form PDF (document_minio_key) from MinIO; + 3. stamp the signature onto the recorded signature line(s); + 4. upload the flattened signed PDF and store signed_document_minio_key. + +Safe to call more than once — it is idempotent on signed_document_minio_key. +""" +from __future__ import annotations + +import json +import logging +import os +import tempfile + +LOG = logging.getLogger("workers.services.esign_stamp") + + +def stamp_esign_document(esign_record_id: int) -> str | None: + """Stamp the signature onto the form for one esign_records row. + + Returns the signed document's MinIO key, or None if nothing was stamped + (no document, no anchors, or already stamped). + """ + try: + import psycopg2 + except ImportError: # pragma: no cover + LOG.error("psycopg2 unavailable — cannot stamp signature") + return None + + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + try: + with conn.cursor() as cur: + cur.execute( + """SELECT order_number, document_type, signature_type, + signature_data, document_minio_key, signature_anchors, + signed_document_minio_key + FROM esign_records WHERE id = %s""", + (esign_record_id,), + ) + row = cur.fetchone() + if not row: + LOG.warning("[esign_stamp] No esign record %s", esign_record_id) + return None + + (order_number, document_type, signature_type, signature_data, + document_key, anchors_raw, already_signed_key) = row + + if already_signed_key: + LOG.info("[esign_stamp] %s already stamped (%s)", order_number, already_signed_key) + return already_signed_key + + if not document_key: + LOG.info("[esign_stamp] %s has no form PDF to stamp", order_number) + return None + + anchors = anchors_raw if isinstance(anchors_raw, list) else ( + json.loads(anchors_raw) if anchors_raw else [] + ) + if not anchors: + LOG.info("[esign_stamp] %s has no signature anchors — skipping stamp", order_number) + return None + + if not signature_type or not signature_data: + LOG.warning("[esign_stamp] %s missing signature data", order_number) + return None + + # Download the unsigned form PDF. + try: + from scripts.document_gen.minio_client import MinioStorage + except ImportError: # pragma: no cover - alt sys.path layouts + from document_gen.minio_client import MinioStorage # type: ignore + try: + from .signature_stamper import stamp_signature + except ImportError: # pragma: no cover + from signature_stamper import stamp_signature # type: ignore + + storage = MinioStorage() + with tempfile.TemporaryDirectory() as tmp: + local_in = os.path.join(tmp, "form.pdf") + storage.download(document_key, local_in) + with open(local_in, "rb") as f: + pdf_bytes = f.read() + + signed_bytes = stamp_signature( + pdf_bytes, + anchors, + signature_type=signature_type, + signature_data=signature_data, + signer_field="signer", + ) + + local_out = os.path.join(tmp, "form_signed.pdf") + with open(local_out, "wb") as f: + f.write(signed_bytes) + + # Signed key sits next to the original with a _signed suffix. + if document_key.endswith(".pdf"): + signed_key = document_key[:-4] + "_signed.pdf" + else: + signed_key = document_key + "_signed.pdf" + storage.upload(local_out, signed_key, content_type="application/pdf") + + with conn.cursor() as cur: + cur.execute( + "UPDATE esign_records SET signed_document_minio_key = %s, updated_at = NOW() WHERE id = %s", + (signed_key, esign_record_id), + ) + conn.commit() + LOG.info("[esign_stamp] %s signature stamped onto form -> %s", order_number, signed_key) + return signed_key + except Exception as exc: + conn.rollback() + LOG.error("[esign_stamp] Failed for record %s: %s", esign_record_id, exc) + return None + finally: + conn.close() diff --git a/scripts/workers/services/signature_stamper.py b/scripts/workers/services/signature_stamper.py new file mode 100644 index 0000000..43ae8a8 --- /dev/null +++ b/scripts/workers/services/signature_stamper.py @@ -0,0 +1,336 @@ +"""Stamp a captured e-signature onto the exact signature line of a PDF form. + +Requirement: the client's drawn or typed signature must land **right on top of +where the signature belongs** on the generated form, not in a separate +"signature page" sidecar. + +How it works +------------ +1. When a document generator draws a signature line, it records the exact PDF + coordinates of every signature box (see ``signature_box`` below). Those + anchors are stored in ``esign_records.signature_anchors``. +2. After the client signs, :func:`stamp_signature` builds a transparent overlay + the same size as each page, draws the signature image (or a script-font + rendering of the typed name) inside the recorded box, then merges the overlay + onto the page with pypdf. The result is a flattened, signed PDF. + +The overlay is authored in PDF user-space points with the origin at the +lower-left corner, matching reportlab and the coordinates the generators record, +so a box recorded at ``(x, y, w, h)`` receives the signature at that exact spot. + +Only reportlab + pypdf + Pillow-free stdlib are required; if Pillow is present we +use it to read the PNG's intrinsic aspect ratio, otherwise reportlab reads it. +""" +from __future__ import annotations + +import base64 +import io +import logging +from typing import Any + +LOG = logging.getLogger("workers.services.signature_stamper") + +# Default signature-box geometry (PDF points). 1 inch = 72 pt. +SIG_BOX_W = 216.0 # 3.0 in +SIG_BOX_H = 40.0 # signature sits in a ~0.55 in tall band above the rule +PAGE_W = 612.0 # US Letter portrait +PAGE_H = 792.0 + + +def signature_box( + field: str, + page: int, + x: float, + y: float, + w: float = SIG_BOX_W, + h: float = SIG_BOX_H, + page_w: float = PAGE_W, + page_h: float = PAGE_H, +) -> dict[str, Any]: + """Build a signature-anchor record for a generator to persist. + + Coordinates are the **lower-left** corner of the box and the box height/width, + in PDF points, authored against a page of ``page_w`` x ``page_h``. The + signature is drawn to sit on top of the signature rule (the rule should be at + or just below ``y``). + """ + return { + "field": field, + "page": int(page), + "x": float(x), + "y": float(y), + "w": float(w), + "h": float(h), + "page_w": float(page_w), + "page_h": float(page_h), + } + + +def anchors_from_acroform( + pdf_bytes: bytes, + field_map: dict[str, str], + *, + password: str = "", + grow_up: float = 22.0, + drop: float = 7.0, + left_pad: float = 2.0, +) -> list[dict[str, Any]]: + """Extract signature anchors from a real AcroForm PDF by field name. + + ``field_map`` maps an AcroForm field name (e.g. ``"certifySignature"``) to the + logical anchor ``field`` we want to store (e.g. ``"signer"``). The field's + ``/Rect`` becomes the box; the signature *line* (visible rule) usually sits a + few points below the field's lower edge, so we lower the box by ``drop`` so the + stamped signature rests on the rule, and grow it upward by ``grow_up`` so a + tall signature is not clipped. + + Coordinates are returned in PDF points (lower-left origin) against the field's + own page size, exactly what :func:`stamp_signature` expects. + """ + from pypdf import PdfReader + + reader = PdfReader(io.BytesIO(pdf_bytes)) + if reader.is_encrypted: + try: + reader.decrypt(password) + except Exception as exc: # pragma: no cover - defensive + LOG.warning("Could not decrypt AcroForm PDF: %s", exc) + + anchors: list[dict[str, Any]] = [] + for page_index, page in enumerate(reader.pages): + page_w = float(page.mediabox.width) + page_h = float(page.mediabox.height) + annots = page.get("/Annots") or [] + for annot in annots: + try: + obj = annot.get_object() + except Exception: + continue + name = obj.get("/T") + if not name: + continue + key = str(name) + if key not in field_map: + continue + rect = obj.get("/Rect") + if not rect or len(rect) != 4: + continue + x0, y0, x1, y1 = (float(v) for v in rect) + llx, lly = min(x0, x1), min(y0, y1) + urx, ury = max(x0, x1), max(y0, y1) + box_x = llx + left_pad + box_y = lly - drop + box_w = max(10.0, (urx - llx) - left_pad) + box_h = (ury - lly) + grow_up + drop + anchors.append( + signature_box( + field_map[key], + page_index, + box_x, + box_y, + w=box_w, + h=box_h, + page_w=page_w, + page_h=page_h, + ) + ) + return anchors + + +def _decode_png(signature_data: str) -> bytes | None: + """Decode a base64 PNG (with or without the data: URI prefix).""" + if not signature_data: + return None + raw = signature_data.split(",", 1)[-1].strip() + try: + return base64.b64decode(raw) + except Exception as exc: # pragma: no cover - defensive + LOG.warning("Could not decode signature PNG: %s", exc) + return None + + +def _png_aspect(png_bytes: bytes) -> float: + """Width/height aspect ratio of a PNG. Falls back to a sane default.""" + try: + from PIL import Image # type: ignore + + with Image.open(io.BytesIO(png_bytes)) as im: + w, h = im.size + if h: + return w / h + except Exception: + # Parse the IHDR chunk directly (PNG header) without Pillow. + try: + if png_bytes[:8] == b"\x89PNG\r\n\x1a\n": + width = int.from_bytes(png_bytes[16:20], "big") + height = int.from_bytes(png_bytes[20:24], "big") + if height: + return width / height + except Exception: + pass + return 3.0 # typical handwritten signature is wider than tall + + +def _trim_png(png_bytes: bytes) -> bytes: + """Trim transparent/white margins so the ink bounding box fills the image. + + This lets us bottom-anchor the *ink* (not the canvas) onto the signature + rule. Falls back to the original bytes if Pillow is unavailable. + """ + try: + from PIL import Image # type: ignore + except Exception: + return png_bytes + try: + with Image.open(io.BytesIO(png_bytes)) as im: + im = im.convert("RGBA") + alpha = im.split()[-1] + bbox = alpha.getbbox() + if not bbox or bbox == (0, 0, im.width, im.height): + # No alpha to trim (e.g. white background) — trim near-white. + gray = im.convert("L") + from PIL import ImageOps + + inverted = ImageOps.invert(gray) + bbox = inverted.getbbox() + if bbox: + im = im.crop(bbox) + out = io.BytesIO() + im.save(out, "PNG") + return out.getvalue() + except Exception: + return png_bytes + + +def _draw_signature_in_box( + c, + box: dict[str, Any], + *, + signature_type: str, + signature_data: str, + page_w: float, + page_h: float, +) -> None: + """Draw one signature inside ``box`` on an open reportlab canvas. + + Scales the box coordinates from the authored page size to the actual page + size so the signature stays on the line even if the page differs slightly. + """ + from reportlab.lib.colors import HexColor + from reportlab.lib.utils import ImageReader + + src_w = box.get("page_w") or page_w + src_h = box.get("page_h") or page_h + sx = page_w / src_w if src_w else 1.0 + sy = page_h / src_h if src_h else 1.0 + + bx = box["x"] * sx + by = box["y"] * sy + bw = box["w"] * sx + bh = box["h"] * sy + + if signature_type == "drawn": + png = _decode_png(signature_data) + if not png: + return + png = _trim_png(png) + aspect = _png_aspect(png) + # Fit the trimmed signature ink inside the box, preserving aspect ratio, + # left-aligned and bottom-anchored so the ink rests right on the rule. A + # tiny lift keeps the strokes from colliding with the printed line. + lift = 1.5 + draw_h = bh + draw_w = draw_h * aspect + if draw_w > bw: + draw_w = bw + draw_h = draw_w / aspect + img = ImageReader(io.BytesIO(png)) + c.drawImage( + img, + bx, + by + lift, + width=draw_w, + height=draw_h, + mask="auto", + preserveAspectRatio=True, + anchor="sw", + ) + else: # typed + name = (signature_data or "").strip() + if not name: + return + # Render the typed name in an italic/script style, vertically centered in + # the box, resting just above the rule. + font_name = "Helvetica-Oblique" + font_size = min(bh * 0.7, 22.0) + c.setFont(font_name, font_size) + c.setFillColor(HexColor("#0b1f3a")) + # Baseline a few points above the box bottom so the name sits on the line. + c.drawString(bx + 2, by + max(3.0, bh * 0.18), name) + + +def stamp_signature( + pdf_bytes: bytes, + anchors: list[dict[str, Any]], + *, + signature_type: str, + signature_data: str, + signer_field: str | None = None, +) -> bytes: + """Return a new PDF with the signature stamped onto each matching anchor. + + Args: + pdf_bytes: the unsigned form PDF. + anchors: list of signature-box dicts (see :func:`signature_box`). + signature_type: "drawn" or "typed". + signature_data: base64 PNG (drawn) or the typed full name. + signer_field: if given, only stamp anchors whose ``field`` matches; + otherwise stamp every anchor. + + The signature is placed at the recorded coordinates, so it lands exactly on + the signature line the generator drew. + """ + from pypdf import PdfReader, PdfWriter + from reportlab.pdfgen import canvas + + if not anchors: + LOG.warning("stamp_signature called with no anchors — returning original PDF") + return pdf_bytes + + reader = PdfReader(io.BytesIO(pdf_bytes)) + writer = PdfWriter() + + # Group anchors by page so we build one overlay per signed page. + by_page: dict[int, list[dict[str, Any]]] = {} + for a in anchors: + if signer_field and a.get("field") != signer_field: + continue + by_page.setdefault(int(a.get("page", 0)), []).append(a) + + for page_index, page in enumerate(reader.pages): + page_w = float(page.mediabox.width) + page_h = float(page.mediabox.height) + page_anchors = by_page.get(page_index) + + if page_anchors: + buf = io.BytesIO() + c = canvas.Canvas(buf, pagesize=(page_w, page_h)) + for box in page_anchors: + _draw_signature_in_box( + c, + box, + signature_type=signature_type, + signature_data=signature_data, + page_w=page_w, + page_h=page_h, + ) + c.save() + buf.seek(0) + overlay = PdfReader(buf).pages[0] + page.merge_page(overlay) + + writer.add_page(page) + + out = io.BytesIO() + writer.write(out) + return out.getvalue() diff --git a/scripts/workers/services/state_trucking.py b/scripts/workers/services/state_trucking.py index 49bae8d..091c4ab 100644 --- a/scripts/workers/services/state_trucking.py +++ b/scripts/workers/services/state_trucking.py @@ -233,6 +233,46 @@ class StateTruckingHandler: base_state = intake.get("base_state", intake.get("phy_state", "")) operating_states = intake.get("operating_states", []) + # ── Authorization gate ─────────────────────────────────────────── + # Every state portal filing legally requires the customer's signed + # "Limited Authorization to File State Motor Carrier Compliance + # Documents" before we touch a state portal/tax system. On the first + # run we generate that authorization, email the signing link, and + # PAUSE. The pipeline resumes here with client_approved=True once the + # customer signs (handle_esign_completed re-dispatches us). + client_approved = bool(order_data.get("client_approved")) + signed_auth_key = None + if not client_approved: + requested = self._request_authorization( + order_number=order_number, + service_slug=service_slug, + service_name=service_name, + entity_name=entity_name, + customer_email=customer_email, + dot_number=dot_number, + mc_number=str(intake.get("mc_number", "")), + fein_last4=str(intake.get("fein_last4", "")), + base_state=base_state, + operating_states=operating_states, + signer_name=str(intake.get("signer_name", order_data.get("customer_name", ""))), + signer_title=str(intake.get("signer_title", "")), + ) + if requested: + LOG.info( + "[%s] Authorization requested — pipeline PAUSED pending signature", + order_number, + ) + return [] + # If we could not request a signature (e.g. no email), fall through + # and create the admin todo so ops can chase the authorization. + LOG.warning( + "[%s] Could not request authorization e-sign — proceeding to admin todo", + order_number, + ) + else: + signed_auth_key = self._signed_authorization_key(order_number) + LOG.info("[%s] Authorization signed (%s) — proceeding to filing", order_number, signed_auth_key) + # Slug-specific intake fields collected by StateTruckingIntakeStep. intake_summary = self._summarize_intake(service_slug, intake) @@ -263,6 +303,7 @@ class StateTruckingHandler: "intake_summary": intake_summary, "state_requirements": state_reqs, "steps": steps, + "signed_authorization_minio_key": signed_auth_key, } # Render the slug-specific intake fields into the description. @@ -431,10 +472,236 @@ class StateTruckingHandler: return {"agency": reqs[keys[0]], "url": reqs.get(keys[1], "")} return None + # ── Authorization (signed "Limited Authorization to File") ────────── + + # document_type used in esign_records for the state-trucking authorization. + AUTH_DOCUMENT_TYPE = "state-trucking-authorization" + + def _request_authorization( + self, + *, + order_number: str, + service_slug: str, + service_name: str, + entity_name: str, + customer_email: str, + dot_number: str = "", + mc_number: str = "", + fein_last4: str = "", + base_state: str = "", + operating_states: list | None = None, + signer_name: str = "", + signer_title: str = "", + ) -> bool: + """Generate + upload the authorization PDF and email the signing link. + + Returns True if a signing request was created (pipeline should pause), + False if it could not be requested (e.g. no customer email) so the + caller can fall back to an admin todo. + """ + if not customer_email: + return False + + # If a record already exists (pending or signed), don't duplicate. + existing = self._authorization_status(order_number) + if existing == "signed": + return False # already signed — proceed to filing + # (pending records are upserted/refreshed below) + + try: + from .state_trucking_authorization import build_state_trucking_authorization + from .signature_stamper import signature_box # noqa: F401 (ensures module import) + except Exception as exc: + LOG.error("[%s] Authorization generator unavailable: %s", order_number, exc) + return False + + try: + pdf_bytes, anchors = build_state_trucking_authorization( + order_number=order_number, + entity_name=entity_name, + service_name=service_name, + dot_number=dot_number, + mc_number=mc_number, + fein_last4=fein_last4, + base_state=base_state, + operating_states=operating_states or [], + signer_name=signer_name, + signer_title=signer_title, + ) + except Exception as exc: + LOG.error("[%s] Failed to render authorization PDF: %s", order_number, exc) + return False + + # Upload the unsigned authorization to MinIO. + document_key = f"compliance/{order_number}/state_trucking_authorization.pdf" + try: + import tempfile + try: + from scripts.document_gen.minio_client import MinioStorage + except ImportError: + from document_gen.minio_client import MinioStorage # type: ignore + storage = MinioStorage() + with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf: + tf.write(pdf_bytes) + tf.flush() + storage.upload(tf.name, document_key, content_type="application/pdf") + except Exception as exc: + LOG.error("[%s] Failed to upload authorization PDF: %s", order_number, exc) + return False + + # Create the esign record (with the signature anchors so the stamper can + # place the signature on the line) and email the signing link. + try: + import json + import psycopg2 + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + try: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO esign_records ( + order_number, document_type, document_title, entity_name, + document_minio_key, document_metadata, signature_anchors, + requires_perjury, status, expires_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s, FALSE, 'pending', + NOW() + INTERVAL '14 days') + ON CONFLICT (order_number, document_type) + WHERE status IN ('pending', 'signed') + DO UPDATE SET + document_title = EXCLUDED.document_title, + entity_name = EXCLUDED.entity_name, + document_minio_key = EXCLUDED.document_minio_key, + document_metadata = EXCLUDED.document_metadata, + signature_anchors = EXCLUDED.signature_anchors, + expires_at = EXCLUDED.expires_at, + updated_at = NOW() + """, + ( + order_number, + self.AUTH_DOCUMENT_TYPE, + f"Authorization to File: {service_name}", + entity_name, + document_key, + json.dumps({"service_slug": service_slug, "dot_number": dot_number}), + json.dumps(anchors), + ), + ) + conn.commit() + finally: + conn.close() + except Exception as exc: + LOG.error("[%s] Failed to create authorization esign record: %s", order_number, exc) + return False + + # Email the signing link (JWT signed with CUSTOMER_JWT_SECRET). + try: + self._send_authorization_email( + order_number=order_number, + service_name=service_name, + entity_name=entity_name, + customer_email=customer_email, + ) + except Exception as exc: + LOG.warning("[%s] Authorization email failed (record exists): %s", order_number, exc) + + return True + + def _authorization_status(self, order_number: str) -> str | None: + """Return the current authorization esign status: 'signed', 'pending', or None.""" + try: + import psycopg2 + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + try: + with conn.cursor() as cur: + cur.execute( + """SELECT status FROM esign_records + WHERE order_number = %s AND document_type = %s + AND status IN ('pending', 'signed') + ORDER BY created_at DESC LIMIT 1""", + (order_number, self.AUTH_DOCUMENT_TYPE), + ) + row = cur.fetchone() + return row[0] if row else None + finally: + conn.close() + except Exception: + return None + + def _signed_authorization_key(self, order_number: str) -> str | None: + """Return the MinIO key of the stamped, signed authorization PDF if available.""" + try: + import psycopg2 + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + try: + with conn.cursor() as cur: + cur.execute( + """SELECT signed_document_minio_key, document_minio_key + FROM esign_records + WHERE order_number = %s AND document_type = %s + AND status = 'signed' + ORDER BY signed_at DESC NULLS LAST LIMIT 1""", + (order_number, self.AUTH_DOCUMENT_TYPE), + ) + row = cur.fetchone() + if not row: + return None + return row[0] or row[1] + finally: + conn.close() + except Exception: + return None + + def _send_authorization_email( + self, *, order_number, service_name, entity_name, customer_email + ): + """Email the customer a link to review and sign the authorization.""" + import os as _os + import smtplib + from datetime import datetime, timedelta, timezone + from email.mime.text import MIMEText + + try: + import jwt as pyjwt + except ImportError: # pragma: no cover + import PyJWT as pyjwt # type: ignore + + secret = _os.environ.get("CUSTOMER_JWT_SECRET", "changeme_long_random_string") + domain = _os.environ.get("DOMAIN", "performancewest.net") + token = pyjwt.encode( + { + "order_id": order_number, + "order_type": self.AUTH_DOCUMENT_TYPE, + "email": customer_email, + "exp": datetime.now(timezone.utc) + timedelta(days=14), + }, + secret, + algorithm="HS256", + ) + sign_url = f"https://{domain}/portal/esign?token={token}" + body = ( + f"Hi,\n\n" + f"To complete your {service_name} order for {entity_name}, we need your " + f"signed authorization before we can file with the state on your behalf.\n\n" + f"Please review and sign here:\n{sign_url}\n\n" + f"This authorization lets Performance West prepare and submit your state " + f"motor carrier filing, communicate with the agency, and remit government " + f"fees you provide. Government fees, taxes, card processing fees, decals, " + f"permits, bonds, and insurance filing costs are pass-through charges and " + f"may be billed separately if not known at checkout.\n\n" + f"This link expires in 14 days.\n\n" + f"Order: {order_number}\n" + f"Questions? Call (888) 411-0383.\n\n" + f"Performance West Inc.\nDOT / State Motor Carrier Compliance\n" + ) + msg = MIMEText(body) + msg["Subject"] = f"Action Required: Sign Your Authorization — {service_name}" + msg["From"] = "noreply@performancewest.net" + msg["To"] = customer_email + with smtplib.SMTP("localhost", 25, timeout=30) as s: + s.sendmail(msg["From"], [customer_email], msg.as_string()) + def _send_status_email(self, order_number, service_name, entity_name, dot_number, customer_email): """Send client a status email.""" - if not customer_email: - return try: import smtplib from email.mime.text import MIMEText diff --git a/scripts/workers/services/state_trucking_authorization.py b/scripts/workers/services/state_trucking_authorization.py new file mode 100644 index 0000000..c0acc37 --- /dev/null +++ b/scripts/workers/services/state_trucking_authorization.py @@ -0,0 +1,278 @@ +"""Generate the state-trucking "Limited Authorization to File" PDF. + +This is the signed authorization the customer must provide before Performance +West can file state motor-carrier compliance documents on their behalf (NY HUT, +CT HUF, OR Weight-Mile, CA MCP/CARB, KY KYU, NM WDT, IRP/IFTA, etc.). See +docs/trucking-state-authorization-plan.md. + +The generator draws an explicit signature line and records its exact PDF +coordinates via :func:`signature_box`, so the e-signature stamper can place the +client's signature precisely on that line (see signature_stamper.py). + +Returns ``(pdf_bytes, anchors)`` where ``anchors`` is the list to persist in +``esign_records.signature_anchors``. +""" +from __future__ import annotations + +import io +import logging +from datetime import datetime, timezone +from typing import Any + +try: # normal import when loaded as part of the services package + from .signature_stamper import signature_box +except ImportError: # pragma: no cover - standalone/CLI use + from signature_stamper import signature_box # type: ignore + +LOG = logging.getLogger("workers.services.state_trucking_authorization") + +NAVY = "#0b1f3a" +SLATE = "#475569" +RULE = "#94a3b8" + +_AUTH_BODY = ( + "By signing below, the Customer named above authorizes Performance West Inc. " + "(\"Performance West\") to act as its limited agent to:" +) + +_AUTH_GRANTS = [ + "prepare, review, submit, and amend the state motor carrier compliance " + "filing(s) identified in this authorization;", + "access state agency portals or tax systems where representative, " + "practitioner, bulk-filer, or delegate access is available;", + "communicate with state agencies about the filing and receive filing status, " + "notices, confirmations, decals, credentials, and deficiency messages;", + "coordinate with the Customer's insurer or insurance agent for any required " + "insurance filings;", + "remit government fees, taxes, card processing fees, decals, permits, bonds, " + "and pass-through charges when collected from the Customer;", + "retain a copy of this signed authorization for agency audit or request " + "purposes.", +] + +_AUTH_ACK = [ + "The Customer remains responsible for the accuracy and completeness of the " + "information it provides.", + "Government fees, taxes, card processing fees, bonds/security, decals, " + "permits, and insurance filing costs are pass-through charges and may be " + "billed separately if not known at the time of order.", + "Some state filings require the Customer to grant portal access/delegation or " + "approve account changes; Performance West will request this where needed.", + "This authorization is limited to the filing(s) listed and may be revoked in " + "writing, except as to filings already submitted.", +] + + +def build_state_trucking_authorization( + *, + order_number: str, + entity_name: str, + service_name: str, + dot_number: str = "", + mc_number: str = "", + fein_last4: str = "", + base_state: str = "", + operating_states: list[str] | None = None, + signer_name: str = "", + signer_title: str = "", +) -> tuple[bytes, list[dict[str, Any]]]: + """Render the authorization PDF and return (pdf_bytes, signature_anchors).""" + from reportlab.lib.colors import HexColor + from reportlab.lib.pagesizes import letter + from reportlab.lib.units import inch + from reportlab.pdfgen import canvas + + buf = io.BytesIO() + page_w, page_h = letter # 612 x 792 + c = canvas.Canvas(buf, pagesize=letter) + + left = 0.85 * inch + right = page_w - 0.85 * inch + width = right - left + + def wrap(text: str, font: str, size: float, max_w: float) -> list[str]: + c.setFont(font, size) + words = text.split() + lines: list[str] = [] + cur = "" + for w in words: + trial = (cur + " " + w).strip() + if c.stringWidth(trial, font, size) <= max_w: + cur = trial + else: + if cur: + lines.append(cur) + cur = w + if cur: + lines.append(cur) + return lines + + y = page_h - 0.9 * inch + + # Header + c.setFillColor(HexColor(NAVY)) + c.setFont("Helvetica-Bold", 16) + c.drawString(left, y, "Performance West Inc.") + y -= 18 + c.setFont("Helvetica", 9.5) + c.setFillColor(HexColor(SLATE)) + c.drawString(left, y, "DOT / State Motor Carrier Compliance Division | (888) 411-0383 | performancewest.net") + y -= 8 + c.setStrokeColor(HexColor(RULE)) + c.line(left, y, right, y) + y -= 26 + + c.setFillColor(HexColor(NAVY)) + c.setFont("Helvetica-Bold", 13.5) + c.drawString(left, y, "Limited Authorization to File State Motor Carrier Compliance Documents") + y -= 22 + + # Customer / order block + def field_row(label: str, value: str) -> None: + nonlocal y + c.setFont("Helvetica-Bold", 10) + c.setFillColor(HexColor(NAVY)) + c.drawString(left, y, label) + c.setFont("Helvetica", 10) + c.setFillColor(HexColor("#1f2937")) + c.drawString(left + 1.6 * inch, y, value or "—") + y -= 15 + + states = ", ".join(operating_states or []) if operating_states else (base_state or "—") + field_row("Customer (Carrier):", entity_name) + field_row("USDOT #:", dot_number) + if mc_number: + field_row("MC/MX/FF #:", mc_number) + if fein_last4: + field_row("FEIN (last 4):", fein_last4) + field_row("Covered filing:", service_name) + field_row("Covered state(s):", states) + field_row("Order #:", order_number) + y -= 8 + + # Body intro + c.setFillColor(HexColor("#1f2937")) + for ln in wrap(_AUTH_BODY, "Helvetica", 10, width): + c.setFont("Helvetica", 10) + c.drawString(left, y, ln) + y -= 14 + y -= 4 + + # Numbered grants + for i, grant in enumerate(_AUTH_GRANTS, 1): + bullet = f"{i}." + c.setFont("Helvetica-Bold", 10) + c.drawString(left, y, bullet) + lines = wrap(grant, "Helvetica", 10, width - 0.3 * inch) + for j, ln in enumerate(lines): + c.setFont("Helvetica", 10) + c.drawString(left + 0.3 * inch, y, ln) + y -= 13 + y -= 3 + + y -= 6 + c.setFont("Helvetica-Bold", 10.5) + c.setFillColor(HexColor(NAVY)) + c.drawString(left, y, "Acknowledgements") + y -= 16 + for ack in _AUTH_ACK: + c.setFillColor(HexColor("#475569")) + c.setFont("Helvetica", 9.5) + c.drawString(left, y, "\u2022") + for j, ln in enumerate(wrap(ack, "Helvetica", 9.5, width - 0.25 * inch)): + c.drawString(left + 0.25 * inch, y, ln) + y -= 12 + y -= 2 + + # ── Signature block ────────────────────────────────────────────── + # Reserve a fixed band near the bottom so the signature line position is + # deterministic. The signature box sits ON TOP of the rule. + sig_rule_y = 1.55 * inch + sig_box_h = 42.0 + sig_box_w = 3.1 * inch + sig_x = left + + c.setFillColor(HexColor(NAVY)) + c.setFont("Helvetica-Bold", 10.5) + c.drawString(left, sig_rule_y + sig_box_h + 14, "Authorized Signature") + + # The signature rule + c.setStrokeColor(HexColor("#334155")) + c.setLineWidth(1) + c.line(sig_x, sig_rule_y, sig_x + sig_box_w, sig_rule_y) + + # Date rule to the right + date_x = sig_x + sig_box_w + 0.5 * inch + date_w = right - date_x + c.line(date_x, sig_rule_y, right, sig_rule_y) + + c.setFont("Helvetica", 8.5) + c.setFillColor(HexColor(SLATE)) + c.drawString(sig_x, sig_rule_y - 11, "Signature") + c.drawString(date_x, sig_rule_y - 11, "Date") + + # Printed name / title beneath + ny = sig_rule_y - 30 + c.setFont("Helvetica", 9.5) + c.setFillColor(HexColor("#1f2937")) + c.drawString(sig_x, ny, f"Printed name: {signer_name or '________________________'}") + c.drawString(date_x, ny, f"Title: {signer_title or '____________'}") + + # Footer note + c.setFont("Helvetica-Oblique", 7.5) + c.setFillColor(HexColor("#94a3b8")) + c.drawString( + left, + 0.75 * inch, + f"Generated {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')} | " + f"Order {order_number} | Performance West Inc.", + ) + + c.save() + buf.seek(0) + pdf_bytes = buf.getvalue() + + # Record the exact signature box so the stamper lands the signature on the + # rule. The box bottom == the rule's y, height extends upward. + anchors = [ + signature_box( + field="signer", + page=0, + x=sig_x + 4, # small inset from the line's left end + y=sig_rule_y + 1, # rest the signature just on top of the rule + w=sig_box_w - 8, + h=sig_box_h, + page_w=page_w, + page_h=page_h, + ), + signature_box( + field="date", + page=0, + x=date_x + 4, + y=sig_rule_y + 1, + w=date_w - 8, + h=20.0, + page_w=page_w, + page_h=page_h, + ), + ] + return pdf_bytes, anchors + + +if __name__ == "__main__": # quick local render for visual verification + pdf, anchors = build_state_trucking_authorization( + order_number="CO-TEST1234", + entity_name="Acme Freight LLC", + service_name="New York Highway Use Tax (HUT) Registration", + dot_number="3456789", + mc_number="MC-987654", + fein_last4="1234", + base_state="NY", + operating_states=["NY", "PA", "OH"], + signer_name="John Smith", + signer_title="Owner", + ) + with open("/tmp/state_trucking_authorization.pdf", "wb") as f: + f.write(pdf) + print("wrote /tmp/state_trucking_authorization.pdf") + print("anchors:", anchors)