new-site/scripts/workers/services/ink_signature_plotter.py
justin d5e66786a2 mcs150: enrich intake from FMCSA carrier census before PDF fill
The MCS-150 biennial update re-confirms the carrier's existing FMCSA
record. Previously the PDF filler only had whatever the intake form
collected; rescued/sparse orders (or orders where the carrier's data
lives in FMCSA, not the intake) produced near-empty forms. Now we pull
the carrier census (legal name, address, EIN, fleet counts) from the
FMCSA carrier API and merge it under any customer-provided intake values
(customer edits win), so the form is pre-filled with the carrier's
current registered data. Refactored the FMCSA fetch into a shared
_fetch_fmcsa_carrier helper used by both enrichment and status check.
2026-06-10 12:35:52 -05:00

948 lines
37 KiB
Python

"""Pen-plotter ink-signature pipeline: captured strokes -> G-code (and preview).
The Standard (no-login) CMS filing path requires an ORIGINAL ink signature on
paper ("Stamped, faxed or copied signatures will not be accepted"). To produce a
genuine wet-ink signature from a signature captured online, we redraw the
provider's own stroke paths onto the printed form with a pen mounted on a 3-axis
motion system (a Creality CR-10 V2 here, but any Marlin/GRBL machine works).
This module is HARDWARE-INDEPENDENT and fully testable without a plotter:
strokes (normalized 0..1) <- esign_records.signature_vector
| fit into the signature anchor box (PDF points)
v
paper-space coordinates (mm) <- via PlotterConfig (bed origin + jig)
|
+--> emit_gcode() -> Marlin/GRBL G-code for the CR-10 (Z = pen lift)
+--> render_preview_svg()/_overlay_pdf() -> verify the strokes land on
the cert line WITHOUT any hardware (this is our correctness test)
Coordinate frames
-----------------
* Capture box: signature pad canvas, origin top-left, x/y in [0,1].
* PDF: points, origin bottom-left (matches reportlab + our signature anchors).
* Plotter bed: millimetres, origin at the machine's homed corner (front-left for
a CR-10). A paper jig fixes the sheet at a known offset from that corner, so
PDF (0,0) of the sheet maps to (jig_x_mm, jig_y_mm) on the bed.
The signature anchor box (x, y, w, h, page_w, page_h in PDF points) tells us
exactly where on the page the signature must land — the same anchors the digital
stamper consumes. We fit the captured strokes into that box (preserving aspect),
then translate everything into bed mm using the jig offset.
"""
from __future__ import annotations
import io
import math
from dataclasses import dataclass, field
from typing import Any
PT_PER_INCH = 72.0
MM_PER_INCH = 25.4
PT_TO_MM = MM_PER_INCH / PT_PER_INCH # 0.352777...
@dataclass(frozen=True)
class PlotterConfig:
"""Physical configuration of the pen plotter + paper jig.
Defaults target a Creality CR-10 V2 (300x300x400 bed) with a US-Letter sheet
held by a corner jig at the front-left of the bed. Tune jig_x_mm / jig_y_mm
and pen heights during calibration with the test-box routine.
Two control dialects are supported:
- "marlin" (default): Marlin/GRBL G-code in millimetres, pen up/down via Z
or M280 servo. For the CR-10 and AxiDraw-class machines.
- "lineus": the Line-us palm-size drawing arm. It speaks its own GCode in
its OWN units over a small, fan-shaped reach envelope, with the
pen height encoded as Z in each G01. See LineUsConfig.
The internal geometry pipeline always works in millimetres; the emitter for
each dialect converts at the end.
"""
# Control dialect: "marlin" or "lineus".
dialect: str = "marlin"
# Human label for the profile (e.g. "cr10", "axidraw", "lineus").
name: str = "cr10"
# Where PDF (0,0) — the sheet's bottom-left corner — sits on the bed, in mm
# from the machine's homed origin.
jig_x_mm: float = 20.0
jig_y_mm: float = 20.0
# Pen Z heights (mm). pen_down should be the height at which the pen touches
# paper; with a spring-loaded holder a small over-press is safe.
pen_up_mm: float = 3.0
pen_down_mm: float = 0.0
# Feed rates (mm/min).
travel_feed: float = 3000.0 # pen-up moves
draw_feed: float = 1200.0 # pen-down (drawing) moves
z_feed: float = 600.0 # pen raise/lower
# Bed safety envelope (mm). Emitter raises if a point would exceed these.
bed_max_x_mm: float = 300.0
bed_max_y_mm: float = 300.0
# Resampling: drop points closer than this (mm) to keep G-code compact and
# the pen smooth.
min_segment_mm: float = 0.3
# Pen-up/down via BLTouch/servo instead of Z (set servo_pen=True to emit
# M280 deploy/stow instead of G1 Z moves — useful if you mount the pen to a
# servo or repurpose the BLTouch as the actuator).
servo_pen: bool = False
servo_index: int = 0
servo_down_angle: int = 10
servo_up_angle: int = 90
# Line-us mapping (only used when dialect == "lineus").
lineus: "LineUsConfig | None" = None
# Pen-specific tuning (ink flow, dwell, line look). See PenProfile / PENS.
pen: "PenProfile | None" = None
@dataclass(frozen=True)
class PenProfile:
"""Per-pen tuning for a writing instrument mounted in the plotter.
Different pens lay ink differently when the plotter drops the tip STRAIGHT
DOWN (no human wrist angle/pressure modulation). Wet gel/rollerball pens read
as a genuine signature but need (a) a short dwell at pen-down so the ink wets
the paper before the first move (avoids a starting gap), and (b) a slower draw
feed so the line lays down evenly without skipping on fast reversals. These
values are applied by the emitters on top of the machine PlotterConfig.
Recommended default for legal filings: uni-ball Signo (UM-151 0.38mm) —
archival, waterproof, fraud-resistant gel ink that reads as original wet ink.
pen_down_dwell_ms : pause after the pen contacts paper, before the first
drawing move, to let wet ink start cleanly (G4 dwell).
pen_up_dwell_ms : brief pause after lifting (lets gel/rollerball tips
"snap" off cleanly without a tail). Usually small/0.
draw_feed : per-pen drawing feed (mm/min). Overrides the machine
default when set; wet pens like ~600-900.
pen_down_bias_mm : small extra downward offset (mm) added to pen_down so a
spring holder keeps positive contact for this tip.
tip_mm : nominal tip width (mm), informational / preview only.
"""
name: str = "uniball-signo"
label: str = "uni-ball Signo UM-151 0.38 (gel)"
pen_down_dwell_ms: int = 120
pen_up_dwell_ms: int = 0
draw_feed: float = 750.0
pen_down_bias_mm: float = 0.0
tip_mm: float = 0.38
# Tuned presets for the pens we keep on hand. Default = uni-ball Signo.
PENS: dict[str, PenProfile] = {
"uniball-signo": PenProfile(
name="uniball-signo",
label="uni-ball Signo UM-151 0.38 (gel, archival/waterproof)",
pen_down_dwell_ms=120, pen_up_dwell_ms=0, draw_feed=750.0,
pen_down_bias_mm=0.0, tip_mm=0.38,
),
"pilot-g2": PenProfile(
name="pilot-g2",
label="Pilot G-2 0.7 (gel, hand-written look)",
# G-2 can blob on a long pen-down dwell; keep dwell short.
pen_down_dwell_ms=80, pen_up_dwell_ms=0, draw_feed=800.0,
pen_down_bias_mm=0.0, tip_mm=0.7,
),
"energel": PenProfile(
name="energel",
label="Pentel EnerGel 0.5 (liquid-gel, fast-dry batch)",
# Liquid-gel starts instantly and dries fast — minimal dwell, faster feed.
pen_down_dwell_ms=60, pen_up_dwell_ms=0, draw_feed=900.0,
pen_down_bias_mm=0.0, tip_mm=0.5,
),
"fountain": PenProfile(
name="fountain",
label="Fountain pen + document ink (max ink character)",
# Fountain nibs need a touch more settle time and a gentle, slower line;
# add a small downward bias so the nib keeps consistent flow.
pen_down_dwell_ms=180, pen_up_dwell_ms=40, draw_feed=600.0,
pen_down_bias_mm=-0.1, tip_mm=0.5,
),
}
DEFAULT_PEN = "uniball-signo"
def load_pen(name: str | None) -> PenProfile:
"""Return a PenProfile for a named pen (default uni-ball Signo)."""
key = (name or DEFAULT_PEN).lower()
if key not in PENS:
raise ValueError(f"unknown pen '{name}'; choices: {sorted(PENS)}")
return PENS[key]
@dataclass(frozen=True)
class LineUsConfig:
"""Maps bed millimetres to the Line-us arm's native coordinate space.
The Line-us is a small 2-link arm that draws with a real pen. It accepts
GCode (G01 X Y Z, G28 home) but in its OWN units over a fan-shaped reach,
NOT millimetres, and the pen height is the Z value in each move (low Z =
pen down, high Z = pen up). Its usable area is roughly a half-disc in front
of the shoulder pivot.
We model the reachable area as: distance from the shoulder pivot (in Line-us
units) must lie within [reach_min, reach_max], and we affine-map bed mm to
Line-us units with a uniform scale + origin. Because the work area is small,
the paper jig must place the signature cell inside reach — check_reach()
verifies this BEFORE plotting and the jig calculator helps position the form.
Defaults follow the documented Line-us envelope (X ~700..1775, Y ~-1000..1000,
Z pen 0=down .. 1000=up). units_per_mm is the conversion derived from the
arm's ~A6 usable width.
"""
units_per_mm: float = 13.0 # Line-us units per millimetre on the page
origin_x_units: float = 1000.0 # Line-us X where the jig's bed-origin sits
origin_y_units: float = 0.0 # Line-us Y where the jig's bed-origin sits
pen_down_z: int = 0 # Z value for pen on paper
pen_up_z: int = 1000 # Z value for pen lifted
# Reach envelope, measured from the shoulder pivot at Line-us (0,0), in units.
shoulder_x_units: float = 0.0
shoulder_y_units: float = 0.0
reach_min_units: float = 700.0
reach_max_units: float = 2000.0
feed: int = 0 # Line-us ignores F; kept for parity
@dataclass
class PlannedStroke:
"""A single pen-down polyline in bed mm (one continuous pen contact)."""
points_mm: list[tuple[float, float]] = field(default_factory=list)
# ── Built-in machine profiles ────────────────────────────────────────────────
#
# Named presets so the home CR-10 and a portable Line-us are both first-class.
# Geometry/jig values are starting points; calibrate per device with --test-box.
def _profile_cr10() -> PlotterConfig:
"""Home station: Creality CR-10 V2 with a corner jig (Marlin, Z pen-lift)."""
return PlotterConfig(dialect="marlin", name="cr10")
def _profile_axidraw() -> PlotterConfig:
"""Portable A4 pen plotter (AxiDraw/iDraw class): Marlin/GRBL with a servo lift."""
return PlotterConfig(
dialect="marlin", name="axidraw",
bed_max_x_mm=210.0, bed_max_y_mm=297.0,
servo_pen=True, servo_up_angle=70, servo_down_angle=30,
jig_x_mm=10.0, jig_y_mm=10.0,
)
def _profile_lineus() -> PlotterConfig:
"""Pocket station: Line-us folding pen-arm (own GCode/units, small reach).
The arm's reach is small, so the jig must place the signature cell in the
sweet spot — use compute_jig_offset_for_box() (the CLI --solve-jig does this)
to get jig_x_mm/jig_y_mm for a given form before plotting.
"""
return PlotterConfig(
dialect="lineus", name="lineus",
# mm bookkeeping is still used internally; reach is enforced in Line-us units.
bed_max_x_mm=150.0, bed_max_y_mm=150.0,
jig_x_mm=0.0, jig_y_mm=0.0,
lineus=LineUsConfig(),
)
PROFILES: dict[str, "callable[[], PlotterConfig]"] = {
"cr10": _profile_cr10,
"axidraw": _profile_axidraw,
"lineus": _profile_lineus,
}
def load_profile(name: str) -> PlotterConfig:
"""Return a PlotterConfig for a named profile (cr10 | axidraw | lineus)."""
key = (name or "cr10").lower()
if key not in PROFILES:
raise ValueError(f"unknown plotter profile '{name}'; choices: {sorted(PROFILES)}")
return PROFILES[key]()
# ── Geometry: strokes (0..1) -> fitted PDF points -> bed mm ──────────────────
def _vector_bbox(strokes: list[list[dict]]) -> tuple[float, float, float, float]:
"""Bounding box (minx, miny, maxx, maxy) of normalized strokes."""
xs = [p["x"] for s in strokes for p in s]
ys = [p["y"] for s in strokes for p in s]
if not xs:
return 0.0, 0.0, 1.0, 1.0
return min(xs), min(ys), max(xs), max(ys)
def fit_strokes_to_box(
vector: dict,
box: dict,
*,
h_pad_frac: float = 0.04,
v_pad_frac: float = 0.12,
baseline_lift_pt: float = 1.5,
) -> list[list[tuple[float, float]]]:
"""Fit normalized capture strokes into the anchor box, in PDF points.
Returns a list of strokes; each stroke is a list of (x_pt, y_pt) with the
PDF convention (origin bottom-left). Aspect ratio is preserved, the ink is
left-aligned and rests just above the box bottom (the signature rule), and
the capture's top-left origin is flipped to PDF's bottom-left.
"""
strokes = (vector or {}).get("strokes") or []
if not strokes:
return []
minx, miny, maxx, maxy = _vector_bbox(strokes)
src_w = max(maxx - minx, 1e-6)
src_h = max(maxy - miny, 1e-6)
bx, by = float(box["x"]), float(box["y"])
bw, bh = float(box["w"]), float(box["h"])
avail_w = bw * (1.0 - 2 * h_pad_frac)
avail_h = bh * (1.0 - 2 * v_pad_frac)
# Scale to fit, preserving aspect ratio.
scale = min(avail_w / src_w, avail_h / src_h)
drawn_w = src_w * scale
drawn_h = src_h * scale
# Left-aligned within the box; bottom-anchored just above the rule.
off_x = bx + bw * h_pad_frac
off_y = by + baseline_lift_pt + (avail_h - drawn_h) * 0.0 # rest on baseline
out: list[list[tuple[float, float]]] = []
for s in strokes:
pts: list[tuple[float, float]] = []
for p in s:
nx = (p["x"] - minx) * scale
ny = (p["y"] - miny) * scale
x_pt = off_x + nx
# Flip Y: capture origin is top-left, PDF origin is bottom-left.
y_pt = off_y + (drawn_h - ny)
pts.append((x_pt, y_pt))
if pts:
out.append(pts)
return out
def pdf_points_to_bed_mm(
strokes_pt: list[list[tuple[float, float]]],
cfg: PlotterConfig,
) -> list[PlannedStroke]:
"""Translate fitted PDF-point strokes into bed millimetres via the jig offset."""
planned: list[PlannedStroke] = []
for s in strokes_pt:
mm = [
(cfg.jig_x_mm + x_pt * PT_TO_MM, cfg.jig_y_mm + y_pt * PT_TO_MM)
for (x_pt, y_pt) in s
]
mm = _resample(mm, cfg.min_segment_mm)
if mm:
planned.append(PlannedStroke(points_mm=mm))
return planned
def _resample(points: list[tuple[float, float]], min_seg_mm: float) -> list[tuple[float, float]]:
"""Drop points closer than min_seg_mm to the previous kept point."""
if not points:
return []
kept = [points[0]]
for p in points[1:]:
lx, ly = kept[-1]
if math.hypot(p[0] - lx, p[1] - ly) >= min_seg_mm:
kept.append(p)
if len(kept) == 1 and len(points) > 1:
kept.append(points[-1])
return kept
def plan_signature(vector: dict, box: dict, cfg: PlotterConfig) -> list[PlannedStroke]:
"""Full geometry pipeline: normalized strokes + anchor box -> bed-mm strokes."""
fitted = fit_strokes_to_box(vector, box)
return pdf_points_to_bed_mm(fitted, cfg)
# ── Line-us coordinate mapping + reach checking ──────────────────────────────
#
# The Line-us arm has a small, fan-shaped reach. Bed-mm coordinates (the same
# internal representation every dialect uses) are affine-mapped to Line-us units
# with a uniform scale + origin, then reach-checked against the shoulder pivot.
def bed_mm_to_lineus_units(x_mm: float, y_mm: float, lu: LineUsConfig) -> tuple[float, float]:
"""Map a bed-mm point to Line-us native units."""
return (
lu.origin_x_units + x_mm * lu.units_per_mm,
lu.origin_y_units + y_mm * lu.units_per_mm,
)
def _lineus_reach_ok(ux: float, uy: float, lu: LineUsConfig) -> bool:
"""True if a Line-us-unit point lies within the arm's reach annulus."""
d = math.hypot(ux - lu.shoulder_x_units, uy - lu.shoulder_y_units)
return lu.reach_min_units <= d <= lu.reach_max_units
def check_reach(planned: list[PlannedStroke], cfg: PlotterConfig) -> dict:
"""Verify EVERY planned point is reachable by the configured machine.
For "lineus" this checks the fan-shaped reach annulus; for "marlin" it checks
the rectangular bed envelope. Returns a report dict:
{ ok, dialect, total_points, out_of_reach, first_bad, min_d, max_d }
Call this BEFORE plotting so a tiny machine never tries to draw where it
cannot reach (which would distort the signature).
"""
pts = [(x, y) for s in planned for (x, y) in s.points_mm]
total = len(pts)
if cfg.dialect == "lineus":
lu = cfg.lineus or LineUsConfig()
bad = 0
first_bad: tuple[float, float] | None = None
dists: list[float] = []
for (x_mm, y_mm) in pts:
ux, uy = bed_mm_to_lineus_units(x_mm, y_mm, lu)
d = math.hypot(ux - lu.shoulder_x_units, uy - lu.shoulder_y_units)
dists.append(d)
if not (lu.reach_min_units <= d <= lu.reach_max_units):
bad += 1
if first_bad is None:
first_bad = (round(x_mm, 2), round(y_mm, 2))
return {
"ok": bad == 0,
"dialect": "lineus",
"total_points": total,
"out_of_reach": bad,
"first_bad": first_bad,
"min_d": round(min(dists), 1) if dists else None,
"max_d": round(max(dists), 1) if dists else None,
"reach": [lu.reach_min_units, lu.reach_max_units],
}
# marlin: rectangular bed
bad = 0
first_bad = None
for (x, y) in pts:
if x < 0 or y < 0 or x > cfg.bed_max_x_mm or y > cfg.bed_max_y_mm:
bad += 1
if first_bad is None:
first_bad = (round(x, 2), round(y, 2))
return {
"ok": bad == 0,
"dialect": "marlin",
"total_points": total,
"out_of_reach": bad,
"first_bad": first_bad,
"bed": [cfg.bed_max_x_mm, cfg.bed_max_y_mm],
}
def compute_jig_offset_for_box(
box: dict,
cfg: PlotterConfig,
*,
vector: dict | None = None,
margin_units: float = 50.0,
) -> dict:
"""For a small machine, find a jig offset that brings the signature into reach.
The signature box (PDF points) defines where ink must land on the sheet. On a
small arm we cannot reach an arbitrary page location, so we slide the paper
(i.e. choose jig_x_mm/jig_y_mm) until the signature sits in the sweet spot of
the reach annulus. This returns the recommended jig offset (mm) plus whether
the whole signature then fits.
When ``vector`` is given, we solve + reach-check against the ACTUAL fitted ink
extent (left-aligned, sub-cell) rather than the full cell rectangle. A full
cell line (e.g. CMS-855 spans ~422pt) is far wider than the real signature, so
using the ink extent is both more correct and far more likely to fit a small
arm. When ``vector`` is None we fall back to the whole-cell corners.
Strategy: aim the CENTRE of the (ink or cell) bbox at the mid-radius of the
reach annulus, straight ahead of the shoulder (uy = shoulder_y). Solve for the
jig offset that places it there, then reach-check the bbox corners.
"""
if cfg.dialect != "lineus":
return {"ok": True, "note": "Reach is rectangular; no jig solve needed.",
"jig_x_mm": cfg.jig_x_mm, "jig_y_mm": cfg.jig_y_mm}
lu = cfg.lineus or LineUsConfig()
# Determine the target bbox (PDF points): the actual ink extent if we have the
# vector, otherwise the full cell.
if vector and (vector.get("strokes")):
fitted = fit_strokes_to_box(vector, box)
ink_pts = [p for s in fitted for p in s]
if ink_pts:
bx = min(p[0] for p in ink_pts)
by = min(p[1] for p in ink_pts)
bw = max(p[0] for p in ink_pts) - bx
bh = max(p[1] for p in ink_pts) - by
else:
bx, by = float(box["x"]), float(box["y"])
bw, bh = float(box["w"]), float(box["h"])
else:
bx, by = float(box["x"]), float(box["y"])
bw, bh = float(box["w"]), float(box["h"])
# Centre of the target bbox in PDF points -> the bed-mm point we want at the
# annulus mid-radius, directly ahead of the shoulder.
cx_pt = bx + bw / 2.0
cy_pt = by + bh / 2.0
cx_mm_in_sheet = cx_pt * PT_TO_MM
cy_mm_in_sheet = cy_pt * PT_TO_MM
mid_r = (lu.reach_min_units + lu.reach_max_units) / 2.0
target_ux = lu.shoulder_x_units + mid_r
target_uy = lu.shoulder_y_units
# We need: origin + (jig + sheet_offset)*units_per_mm = target_units.
# => jig_mm = (target_units - origin_units)/units_per_mm - sheet_offset_mm
new_jig_x = (target_ux - lu.origin_x_units) / lu.units_per_mm - cx_mm_in_sheet
new_jig_y = (target_uy - lu.origin_y_units) / lu.units_per_mm - cy_mm_in_sheet
test_cfg = _replace_cfg(cfg, jig_x_mm=new_jig_x, jig_y_mm=new_jig_y)
# Reach-check: against the real fitted ink if we have it, else the cell corners.
if vector and (vector.get("strokes")):
check_planned = plan_signature(vector, box, test_cfg)
else:
corners_pt = [(bx, by), (bx + bw, by), (bx + bw, by + bh), (bx, by + bh)]
check_planned = pdf_points_to_bed_mm([corners_pt], test_cfg)
rep = check_reach(check_planned, test_cfg)
return {
"ok": rep["ok"],
"jig_x_mm": round(new_jig_x, 2),
"jig_y_mm": round(new_jig_y, 2),
"reach_report": rep,
"note": (
"Cell fits in reach with this jig offset."
if rep["ok"]
else "Cell does NOT fully fit; the machine's reach is too small for this "
"signature-cell size — reduce the cell or use a larger machine."
),
}
def _replace_cfg(cfg: PlotterConfig, **changes: Any) -> PlotterConfig:
"""Return a copy of cfg with fields overridden (frozen dataclass helper)."""
from dataclasses import replace
return replace(cfg, **changes)
# ── G-code emission (Marlin/GRBL) ────────────────────────────────────────────
def emit_gcode(planned: list[PlannedStroke], cfg: PlotterConfig) -> str:
"""Emit machine code for the configured dialect.
Dispatches to the Marlin/GRBL emitter (CR-10, AxiDraw) or the Line-us
emitter. Both consume the same bed-mm planned strokes; the per-dialect
converter runs at the end.
"""
if cfg.dialect == "lineus":
return emit_lineus_gcode(planned, cfg)
return emit_marlin_gcode(planned, cfg)
def emit_marlin_gcode(planned: list[PlannedStroke], cfg: PlotterConfig) -> str:
"""Emit Marlin/GRBL G-code that draws the planned strokes with the pen.
Assumes the machine is homed (G28) so (0,0) is the bed corner; the jig offset
is already baked into the planned coordinates. Pen up/down is done via Z
moves (default) or a servo/BLTouch M280 (cfg.servo_pen).
"""
out: list[str] = []
w = out.append
def pen_up():
if cfg.servo_pen:
w(f"M280 P{cfg.servo_index} S{cfg.servo_up_angle} ; pen up")
w("G4 P150")
else:
w(f"G1 Z{cfg.pen_up_mm:.2f} F{cfg.z_feed:.0f} ; pen up")
def pen_down():
if cfg.servo_pen:
w(f"M280 P{cfg.servo_index} S{cfg.servo_down_angle} ; pen down")
w("G4 P150")
else:
w(f"G1 Z{cfg.pen_down_mm:.2f} F{cfg.z_feed:.0f} ; pen down")
over_bed = any(
x > cfg.bed_max_x_mm or y > cfg.bed_max_y_mm or x < 0 or y < 0
for s in planned for (x, y) in s.points_mm
)
w("; --- Performance West ink-signature plot ---")
w("; CR-10 V2 / Marlin (or GRBL); machine assumed homed (G28) before this.")
if over_bed:
w("; !! WARNING: some points fall outside the configured bed envelope.")
w("G21 ; mm")
w("G90 ; absolute positioning")
pen_up()
for s in planned:
if not s.points_mm:
continue
x0, y0 = s.points_mm[0]
w(f"G0 X{x0:.3f} Y{y0:.3f} F{cfg.travel_feed:.0f} ; travel to stroke start")
pen_down()
for (x, y) in s.points_mm[1:]:
w(f"G1 X{x:.3f} Y{y:.3f} F{cfg.draw_feed:.0f}")
pen_up()
w("G0 X0 Y0 F{:.0f} ; park".format(cfg.travel_feed))
w("; --- end plot ---")
return "\n".join(out) + "\n"
def emit_lineus_gcode(planned: list[PlannedStroke], cfg: PlotterConfig) -> str:
"""Emit Line-us GCode for the planned strokes.
The Line-us speaks a GCode subset in its OWN units, where the pen height is
the Z value of each move (low Z = down, high Z = up). It acks each command
with "ok". We convert bed-mm to Line-us units, raise the pen between strokes,
and emit one G01 per point. A leading comment carries a reach warning if any
point falls outside the arm's envelope (call check_reach() to gate plotting).
"""
lu = cfg.lineus or LineUsConfig()
out: list[str] = []
w = out.append
def to_units(x_mm: float, y_mm: float) -> tuple[float, float]:
return bed_mm_to_lineus_units(x_mm, y_mm, lu)
report = check_reach(planned, cfg)
w("; --- Performance West ink-signature plot (Line-us) ---")
w("; Line-us native GCode; pen height is the Z value of each move.")
if not report["ok"]:
w(f"; !! WARNING: {report['out_of_reach']} of {report['total_points']} points "
f"fall outside the arm reach {report.get('reach')}. Re-jig the form "
f"(compute_jig_offset_for_box) before plotting.")
w("G28 ; home the arm")
# Start pen up at the first stroke's start to avoid a dragging line.
for s in planned:
if not s.points_mm:
continue
x0, y0 = s.points_mm[0]
ux0, uy0 = to_units(x0, y0)
# travel with pen up, then drop pen
w(f"G01 X{ux0:.0f} Y{uy0:.0f} Z{lu.pen_up_z:d} ; travel to stroke start (pen up)")
w(f"G01 X{ux0:.0f} Y{uy0:.0f} Z{lu.pen_down_z:d} ; pen down")
for (x, y) in s.points_mm[1:]:
ux, uy = to_units(x, y)
w(f"G01 X{ux:.0f} Y{uy:.0f} Z{lu.pen_down_z:d}")
# lift pen at end of stroke
lx, ly = s.points_mm[-1]
ulx, uly = to_units(lx, ly)
w(f"G01 X{ulx:.0f} Y{uly:.0f} Z{lu.pen_up_z:d} ; pen up")
# Park: pen up, return to a safe rest position ahead of the shoulder.
park_x = lu.shoulder_x_units + (lu.reach_min_units + lu.reach_max_units) / 2.0
w(f"G01 X{park_x:.0f} Y{lu.shoulder_y_units:.0f} Z{lu.pen_up_z:d} ; park")
w("; --- end plot ---")
return "\n".join(out) + "\n"
def emit_test_box_gcode(cfg: PlotterConfig, box: dict) -> str:
"""Emit G-code that draws the OUTLINE of the signature box (no signature).
Use this during calibration: tape a blank sheet in the jig, run this, and
confirm the rectangle lands exactly on the form's signature line. Adjust
jig_x_mm/jig_y_mm/pen heights until it does.
"""
bx, by = float(box["x"]), float(box["y"])
bw, bh = float(box["w"]), float(box["h"])
corners_pt = [(bx, by), (bx + bw, by), (bx + bw, by + bh), (bx, by + bh), (bx, by)]
planned = pdf_points_to_bed_mm([corners_pt], cfg)
return emit_gcode(planned, cfg)
# ── Serial sender (Marlin/GRBL over USB) — gated, dry-run safe ───────────────
def send_gcode_serial(
gcode: str,
port: str = "/dev/ttyUSB0",
baud: int = 115200,
*,
dry_run: bool = True,
home_first: bool = False,
line_timeout: float = 30.0,
) -> dict:
"""Stream G-code to a Marlin/GRBL controller over USB serial.
Defaults to ``dry_run=True`` so it never moves hardware unless explicitly
enabled — call with dry_run=False only when a sheet is loaded in the jig.
Marlin acks each line with "ok"; we wait for it before sending the next line
(simple, reliable flow control). Returns a summary dict.
"""
lines = [ln for ln in (l.strip() for l in gcode.splitlines())
if ln and not ln.startswith(";")]
if home_first:
lines = ["G28"] + lines
if dry_run:
return {
"sent": False, "dry_run": True, "port": port,
"lines": len(lines),
"note": "DRY RUN — no serial I/O. Set dry_run=False to plot.",
}
try:
import serial # pyserial
except ImportError as exc: # pragma: no cover
raise RuntimeError("pyserial is required to send G-code (pip install pyserial)") from exc
import time
ser = serial.Serial(port, baud, timeout=line_timeout)
try:
time.sleep(2.0) # board reset on connect
ser.reset_input_buffer()
sent = 0
for ln in lines:
ser.write((ln + "\n").encode("ascii"))
ser.flush()
# Wait for the controller's "ok" ack.
deadline = time.time() + line_timeout
while time.time() < deadline:
resp = ser.readline().decode("ascii", "ignore").strip().lower()
if resp.startswith("ok") or resp.startswith("done"):
break
if resp.startswith("error") or resp.startswith("!!"):
raise RuntimeError(f"controller error on '{ln}': {resp}")
sent += 1
return {"sent": True, "dry_run": False, "port": port, "lines": sent}
finally:
ser.close()
def send_lineus(
gcode: str,
*,
host: str = "line-us.local",
tcp_port: int = 1337,
serial_port: str | None = None,
baud: int = 115200,
dry_run: bool = True,
line_timeout: float = 30.0,
) -> dict:
"""Send Line-us GCode over WiFi (TCP 1337) or USB serial.
The Line-us connects over its own TCP socket (sends a "hello"/greeting on
connect) OR over USB serial, and acks each command with "ok". Defaults to
``dry_run=True`` so it never moves the arm unless explicitly enabled.
Pass serial_port to use USB instead of WiFi.
"""
lines = [ln for ln in (l.strip() for l in gcode.splitlines())
if ln and not ln.startswith(";")]
if dry_run:
return {
"sent": False, "dry_run": True,
"transport": "serial" if serial_port else "tcp",
"target": serial_port or f"{host}:{tcp_port}",
"lines": len(lines),
"note": "DRY RUN — no I/O. Set dry_run=False to plot.",
}
import time
def _wait_ok(read_line) -> None:
deadline = time.time() + line_timeout
while time.time() < deadline:
resp = read_line().decode("ascii", "ignore").strip().lower()
if resp.startswith("ok"):
return
if resp.startswith("error") or resp.startswith("!!"):
raise RuntimeError(f"Line-us error: {resp}")
raise RuntimeError("Line-us ack timeout")
if serial_port:
try:
import serial # pyserial
except ImportError as exc: # pragma: no cover
raise RuntimeError("pyserial required for USB (pip install pyserial)") from exc
ser = serial.Serial(serial_port, baud, timeout=line_timeout)
try:
time.sleep(2.0)
ser.reset_input_buffer()
sent = 0
for ln in lines:
ser.write((ln + "\n").encode("ascii"))
ser.flush()
_wait_ok(ser.readline)
sent += 1
return {"sent": True, "dry_run": False, "transport": "serial",
"target": serial_port, "lines": sent}
finally:
ser.close()
# WiFi: raw TCP socket.
import socket
sock = socket.create_connection((host, tcp_port), timeout=line_timeout)
sock.settimeout(line_timeout)
buf = b""
def _readline() -> bytes:
nonlocal buf
while b"\x00" not in buf and b"\n" not in buf:
chunk = sock.recv(256)
if not chunk:
break
buf += chunk
# Line-us terminates responses with a NUL byte.
sep = b"\x00" if b"\x00" in buf else b"\n"
if sep in buf:
line, _, buf = buf.partition(sep)
return line
line, buf = buf, b""
return line
try:
_readline() # consume the connect greeting
sent = 0
for ln in lines:
sock.sendall(ln.encode("ascii") + b"\x00")
_wait_ok(_readline)
sent += 1
return {"sent": True, "dry_run": False, "transport": "tcp",
"target": f"{host}:{tcp_port}", "lines": sent}
finally:
sock.close()
# ── Preview rendering (verify WITHOUT hardware) ──────────────────────────────
def render_preview_svg(
planned: list[PlannedStroke],
cfg: PlotterConfig,
*,
show_bed: bool = True,
) -> str:
"""Render the planned pen path as an SVG in bed mm (origin bottom-left).
Pen-down strokes are solid; pen-up travels are dashed grey. Lets us eyeball
the toolpath without a plotter.
"""
W = cfg.bed_max_x_mm
H = cfg.bed_max_y_mm
parts = [
f'<svg xmlns="http://www.w3.org/2000/svg" width="{W}mm" height="{H}mm" '
f'viewBox="0 0 {W} {H}">',
# Flip Y so bottom-left origin reads naturally.
f'<g transform="translate(0,{H}) scale(1,-1)">',
]
if show_bed:
parts.append(f'<rect x="0" y="0" width="{W}" height="{H}" fill="#fff" stroke="#ccc" stroke-width="0.5"/>')
prev_end: tuple[float, float] | None = None
for s in planned:
if not s.points_mm:
continue
if prev_end is not None:
x0, y0 = s.points_mm[0]
parts.append(
f'<line x1="{prev_end[0]:.2f}" y1="{prev_end[1]:.2f}" '
f'x2="{x0:.2f}" y2="{y0:.2f}" stroke="#bbb" stroke-width="0.2" '
f'stroke-dasharray="1,1"/>'
)
d = "M " + " L ".join(f"{x:.2f},{y:.2f}" for (x, y) in s.points_mm)
parts.append(f'<path d="{d}" fill="none" stroke="#10243f" stroke-width="0.6" stroke-linecap="round" stroke-linejoin="round"/>')
prev_end = s.points_mm[-1]
parts.append("</g></svg>")
return "\n".join(parts)
def render_signature_on_pdf(
pdf_bytes: bytes,
vector: dict,
anchors: list[dict],
*,
signer_field: str | None = None,
) -> bytes:
"""Stamp the VECTOR strokes onto the PDF at the anchor box(es).
This is the digital twin of what the pen will draw — same geometry path as
the plotter — so a visual diff against the real plotted sheet (or just an
eyeball of this PDF) proves the toolpath lands on the cert line. Returns a
flattened signed PDF.
"""
from pypdf import PdfReader, PdfWriter
from reportlab.pdfgen import canvas as rl_canvas
reader = PdfReader(io.BytesIO(pdf_bytes))
writer = PdfWriter()
# Group anchors by page.
by_page: dict[int, list[dict]] = {}
for a in anchors:
if signer_field and a.get("field") != signer_field:
continue
by_page.setdefault(int(a.get("page", 0)), []).append(a)
for i, page in enumerate(reader.pages):
page_anchors = by_page.get(i)
if page_anchors:
mb = page.mediabox
pw = float(mb.width)
ph = float(mb.height)
buf = io.BytesIO()
c = rl_canvas.Canvas(buf, pagesize=(pw, ph))
c.setStrokeColorRGB(0.06, 0.14, 0.25)
c.setLineWidth(1.3)
c.setLineCap(1)
c.setLineJoin(1)
for box in page_anchors:
# Scale anchor from authored page size to this page size.
sx = pw / (box.get("page_w") or pw)
sy = ph / (box.get("page_h") or ph)
scaled = {
"x": box["x"] * sx, "y": box["y"] * sy,
"w": box["w"] * sx, "h": box["h"] * sy,
}
for stroke in fit_strokes_to_box(vector, scaled):
if len(stroke) < 2:
continue
path = c.beginPath()
path.moveTo(*stroke[0])
for pt in stroke[1:]:
path.lineTo(*pt)
c.drawPath(path, stroke=1, fill=0)
c.save()
buf.seek(0)
overlay = PdfReader(buf)
page.merge_page(overlay.pages[0])
writer.add_page(page)
out = io.BytesIO()
writer.write(out)
return out.getvalue()
if __name__ == "__main__": # local demo: synth a signature, plan, preview, gcode
import json
# Synthetic cursive-ish signature (a few strokes), normalized 0..1.
def wave(n, x0, x1, y0, amp, ph):
return [{"x": x0 + (x1 - x0) * k / (n - 1),
"y": y0 + amp * math.sin(2 * math.pi * (k / (n - 1) * 2) + ph),
"t": k * 8} for k in range(n)]
vector = {"v": 1, "w": 600, "h": 160, "strokes": [
wave(40, 0.08, 0.45, 0.55, 0.18, 0.0),
wave(40, 0.50, 0.92, 0.55, 0.15, 1.2),
]}
box = {"field": "signer", "page": 4, "x": 64.0, "y": 471.0, "w": 402.0, "h": 19.0,
"page_w": 612.0, "page_h": 792.0}
cfg = PlotterConfig()
planned = plan_signature(vector, box, cfg)
print(f"planned strokes: {len(planned)}; points: {sum(len(s.points_mm) for s in planned)}")
with open("/tmp/ink_preview.svg", "w") as f:
f.write(render_preview_svg(planned, cfg))
gcode = emit_gcode(planned, cfg)
with open("/tmp/ink_signature.gcode", "w") as f:
f.write(gcode)
print("wrote /tmp/ink_preview.svg and /tmp/ink_signature.gcode")
print("first 12 gcode lines:")
print("\n".join(gcode.splitlines()[:12]))