new-site/scripts/workers/crypto_payment_worker.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

891 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Crypto payment orchestrator worker.
State machine per order (one row in `crypto_payment_jobs`):
received → sizing (compute vendor_obligations + needed_usd)
sizing → offramping (Coinbase Prime sell + wire)
offramping → funds_at_relay (matched deposit found via IMAP)
funds_at_relay → ready (filing_fee_reservations reserved)
ready → settled (Playwright flow confirmed vendor charge)
any → failed (5 attempts exhausted)
any → manual (provider degraded / kyc hold / underpayment)
Triggered by:
- SHKeeper webhook (insert job row, worker picks it up on next poll)
- Cron every 60s (advance stuck jobs, retry failed ones with backoff)
- Manual admin retry (POST /api/v1/admin/crypto-payments/:id/retry-offramp)
Usage:
python -m scripts.workers.crypto_payment_worker # poll once
python -m scripts.workers.crypto_payment_worker --loop # daemon
python -m scripts.workers.crypto_payment_worker --order=CO-XYZ # one order
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import os
import sys
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Optional
import psycopg2
import psycopg2.extras
from scripts.workers.crypto_offramp import (
KycHoldError,
OfframpError,
ProviderDegradedError,
SlippageExceededError,
)
from scripts.workers.crypto_offramp.bridge import BridgeOfframp
from scripts.workers.crypto_offramp.shkeeper_client import SHKeeperClient
from scripts.workers.crypto_offramp.sizer import (
SizingResult,
compute_obligations,
persist_obligations,
)
log = logging.getLogger("crypto_payment_worker")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
DATABASE_URL = os.environ.get("DATABASE_URL", "")
MAX_ATTEMPTS = 5
BACKOFF_SCHEDULE_SEC = [30, 120, 600, 3600, 21600] # 30s, 2m, 10m, 1h, 6h
RELAY_MEMO_PREFIX = os.environ.get("RELAY_BANK_MEMO_PREFIX", "PW-ORDER-")
DEPOSIT_MATCH_TOLERANCE_CENTS = 500 # $5
# Treasury mode — 'manual' parks every order in state='manual' after sizing,
# awaiting admin approval. 'auto' runs the full offramp pipeline. Default
# is 'manual' so we don't accidentally fire Bridge calls before Bridge is
# approved + credentials are loaded.
TREASURY_MODE = os.environ.get("CRYPTO_TREASURY_MODE", "manual").lower()
if TREASURY_MODE not in {"manual", "auto"}:
log.warning("CRYPTO_TREASURY_MODE=%r not recognized; defaulting to 'manual'", TREASURY_MODE)
TREASURY_MODE = "manual"
# ── DB helpers ──────────────────────────────────────────────────────────
def _connect():
return psycopg2.connect(DATABASE_URL)
def _dict_cur(conn):
return conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
# ── Orchestrator core ──────────────────────────────────────────────────
class Orchestrator:
def __init__(
self,
offramp=None,
shkeeper: Optional[SHKeeperClient] = None,
):
# Offramp provider — default Bridge (Stripe). Duck-typed to accept
# any adapter that exposes prepare_transfer() + status() + quote()
# (or, for legacy adapters, deposit_address + execute).
self.offramp = offramp or BridgeOfframp()
self.shkeeper = shkeeper or SHKeeperClient()
async def run_once(self, order_id: Optional[str] = None) -> dict:
"""Advance every job ready to advance. Returns a counts dict."""
conn = _connect()
counts = {"advanced": 0, "errored": 0, "skipped": 0}
try:
with _dict_cur(conn) as cur:
if order_id:
cur.execute(
"SELECT * FROM crypto_payment_jobs WHERE order_id = %s",
(order_id,),
)
else:
# Pull anything not terminal. Respect next_retry_at if set.
cur.execute(
"""
SELECT * FROM crypto_payment_jobs
WHERE state NOT IN ('settled','failed','manual')
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
ORDER BY created_at ASC
LIMIT 50
"""
)
jobs = cur.fetchall()
for job in jobs:
try:
advanced = await self._advance(job, conn)
if advanced:
counts["advanced"] += 1
else:
counts["skipped"] += 1
except Exception as exc: # noqa: BLE001
counts["errored"] += 1
self._record_error(conn, job["order_id"], str(exc))
finally:
conn.close()
return counts
async def _advance(self, job: dict, conn) -> bool:
"""Dispatch on state. Returns True if state advanced."""
state = job["state"]
order_id = job["order_id"]
log.info("advancing order=%s state=%s", order_id, state)
if state == "received":
return await self._do_sizing(job, conn)
if state == "sizing":
return await self._do_offramp(job, conn)
if state == "offramping":
return await self._match_relay_deposit(job, conn)
if state == "funds_at_relay":
return await self._reserve_for_playwright(job, conn)
if state == "ready":
return await self._check_settlement(job, conn)
log.debug("order=%s in state=%s — nothing to do", order_id, state)
return False
# ── Phase: sizing (received → sizing → offramping) ──────────────────
async def _do_sizing(self, job: dict, conn) -> bool:
order_id = job["order_id"]
order_type = job["order_type"]
try:
result = compute_obligations(
order_id=order_id, order_type=order_type, conn=conn,
)
if result.needed_usd_cents <= 0 and not result.obligations:
# No vendor obligations — skip offramp, mark settled
# directly (e.g., DC agent with no external fees).
self._transition(
conn, order_id,
to_state="settled",
extra_sets={
"needed_usd_cents": 0,
"settled_at": "NOW()",
},
)
log.info("order=%s no vendor obligations — settled directly", order_id)
return True
# Underpayment check: SHKeeper balance_fiat < needed
if job["amount_usd_cents"] < result.needed_usd_cents:
self._transition(
conn, order_id,
to_state="manual",
extra_sets={
"needed_usd_cents": result.needed_usd_cents,
"last_error": (
f"underpayment: received ${job['amount_usd_cents']/100:.2f} "
f"but need ${result.needed_usd_cents/100:.2f}"
),
},
)
log.warning(
"order=%s underpayment — manual review (received=%s, need=%s)",
order_id, job["amount_usd_cents"], result.needed_usd_cents,
)
return True
persist_obligations(order_id, result, conn=conn)
# In manual treasury mode, park at 'manual' so admin can approve
# before any offramp/Bridge call fires. Compliance service runs
# independently (Playwright uses Relay card directly), so this
# only gates the treasury reconciliation side.
if TREASURY_MODE == "manual":
needed = result.needed_usd_cents
filing = result.filing_total_with_buffer_cents
commission = result.commission_total_cents
self._transition(
conn, order_id,
to_state="manual",
extra_sets={
"needed_usd_cents": needed,
"sized_at": "NOW()",
"last_error": (
f"awaiting admin approval (TREASURY_MODE=manual). "
f"Sized: ${needed/100:.2f} needed "
f"(filing ${filing/100:.2f} "
f"+ commission ${commission/100:.2f})."
),
},
)
log.info(
"order=%s sized + parked for admin (mode=manual, need $%.2f)",
order_id, needed / 100,
)
# ── Notify admin: manual treasury mode requires a bank transfer ──
# Email the admin with the exact amount to transfer from crypto
# proceeds into the Relay bank account so the filing card has
# enough balance to pay vendors.
self._send_manual_treasury_email(
order_id=order_id,
order_type=order_type,
coin=job["coin"],
received_usd=job["amount_usd_cents"] / 100,
needed_usd=needed / 100,
filing_usd=filing / 100,
commission_usd=commission / 100,
obligations=result.obligations,
)
return True
# Auto mode: advance to offramping
self._transition(
conn, order_id,
to_state="sizing",
extra_sets={
"needed_usd_cents": result.needed_usd_cents,
"sized_at": "NOW()",
},
)
return True
except Exception as exc: # noqa: BLE001
log.exception("sizing failed for %s: %s", order_id, exc)
raise
# ── Phase: offramp (sizing → offramping) ────────────────────────────
async def _do_offramp(self, job: dict, conn) -> bool:
"""Offramp phase via Bridge (RTP-native).
Bridge's model: create one transfer (atomic + idempotent), which
returns a deposit address. SHKeeper sends coins there; Bridge
automatically converts + settles USD to RelayFi via RTP.
Stable idempotency keys — NOT parameterized by attempt_count.
A retry produces the same keys so the ledger UNIQUE constraint
dedups and Bridge's Idempotency-Key header returns the same
transfer each call.
"""
order_id = job["order_id"]
coin = job["coin"]
amount_coin = Decimal(str(job["amount_coin"]))
needed_usd_cents = int(job["needed_usd_cents"])
bridge_key = f"{order_id}:offramp:bridge-transfer"
shkeeper_key = f"{order_id}:offramp:shkeeper-payout"
memo = f"{RELAY_MEMO_PREFIX}{order_id}"
try:
# Skip unsupported coins → manual review
if coin.upper() not in self.offramp.accepts_coins:
self._transition(
conn, order_id, to_state="manual",
extra_sets={"last_error": f"coin {coin} not supported by Bridge"},
)
log.warning("order=%s coin %s not supported — manual", order_id, coin)
return True
quote = await self.offramp.quote(amount_coin, coin)
# ── Step 1: create the Bridge transfer (idempotent) ──────
prior_bridge = self._find_ledger_by_key(conn, bridge_key)
if prior_bridge:
log.info(
"order=%s Bridge transfer already created (ref=%s) — reusing",
order_id, prior_bridge.get("provider_ref"),
)
bridge_transfer_id = prior_bridge.get("provider_ref") or ""
deposit_addr = (
(prior_bridge.get("raw_row") or {}).get("deposit_address", "")
if isinstance(prior_bridge.get("raw_row"), dict) else ""
)
# Fallback: re-call prepare_transfer (idempotent) to get the address
if not deposit_addr:
prep = await self.offramp.prepare_transfer(
target_usd_cents=needed_usd_cents,
from_coin=coin, rail="rtp", memo=memo,
idempotency_key=bridge_key,
)
deposit_addr = prep.deposit_address
bridge_transfer_id = prep.transfer_id
else:
prep = await self.offramp.prepare_transfer(
target_usd_cents=needed_usd_cents,
from_coin=coin, rail="rtp", memo=memo,
idempotency_key=bridge_key,
)
bridge_transfer_id = prep.transfer_id
deposit_addr = prep.deposit_address
# Write ledger 'offramp' row with the Bridge transfer id —
# this is the disposal event for 8949 cost basis.
self._ledger_insert(conn, {
"order_id": order_id,
"order_type": job["order_type"],
"coin": coin,
"movement_type": "offramp",
"amount_coin": None,
"amount_usd_cents": needed_usd_cents,
"fx_rate_usd": quote.fx_rate_usd,
"disposed_at": "NOW()",
"proceeds_usd_cents": needed_usd_cents,
"provider": "bridge",
"provider_ref": bridge_transfer_id,
"provider_status": "awaiting_funds",
"state": "pending",
"idempotency_key": bridge_key,
"notes": f"Bridge transfer created; awaiting {coin} deposit to {deposit_addr[:16]}",
})
# ── Step 2: SHKeeper sends coins to the Bridge deposit address ──
prior_shk = self._find_ledger_by_key(conn, shkeeper_key)
if prior_shk:
log.info(
"order=%s SHKeeper payout already recorded (ref=%s) — skipping duplicate move",
order_id, prior_shk.get("provider_ref"),
)
else:
# Compute coin amount to send (needed_usd / rate × 1.01 overhead
# for exchange fees + network fees). Cap at job.amount_coin.
rate = quote.fx_rate_usd if quote.fx_rate_usd > 0 else Decimal("1")
target_coin = min(
(Decimal(needed_usd_cents) / Decimal("100"))
/ rate * Decimal("1.01"),
amount_coin,
)
log.info(
"order=%s sending %s %s → Bridge deposit %s (for $%.2f)",
order_id, target_coin, coin, deposit_addr, needed_usd_cents / 100,
)
payout = await self.shkeeper.payout(
coin=coin, destination=deposit_addr, amount=target_coin,
)
self._ledger_insert(conn, {
"order_id": order_id,
"order_type": job["order_type"],
"coin": coin,
"movement_type": "swap",
"amount_coin": -target_coin,
"amount_usd_cents": -needed_usd_cents,
"fx_rate_usd": quote.fx_rate_usd,
"provider": "shkeeper",
"provider_ref": payout.withdraw_id,
"provider_status": "broadcast",
"state": "pending",
"idempotency_key": shkeeper_key,
"notes": f"SHKeeper payout to Bridge for offramp ({order_id})",
})
# ── Step 3: transition to offramping (worker will poll status) ──
self._transition(
conn, order_id,
to_state="offramping",
extra_sets={
"offramp_provider": "bridge",
"offramp_ref": bridge_transfer_id,
"offramping_at": "NOW()",
},
)
return True
except ProviderDegradedError as exc:
self._transition(conn, order_id, to_state="manual",
extra_sets={"last_error": f"provider_degraded: {exc}"})
log.warning("order=%s offramp parked — provider degraded", order_id)
return True
except KycHoldError as exc:
self._transition(conn, order_id, to_state="manual",
extra_sets={"last_error": f"kyc_hold: {exc}"})
log.warning("order=%s KYC hold — manual review", order_id)
return True
except SlippageExceededError as exc:
self._transition(conn, order_id, to_state="manual",
extra_sets={"last_error": f"slippage: {exc}"})
log.warning("order=%s slippage exceeded — manual review", order_id)
return True
except Exception as exc:
log.exception("offramp failed for %s: %s", order_id, exc)
raise
# ── Phase: match Relay deposit (offramping → funds_at_relay) ────────
async def _match_relay_deposit(self, job: dict, conn) -> bool:
"""Find a relay_deposits row that matches this job.
Match priority:
1. memo contains PW-ORDER-<order_id> (strongest signal)
2. amount within ±DEPOSIT_MATCH_TOLERANCE_CENTS AND source_kind='offramp_coinbase_prime'
"""
order_id = job["order_id"]
needed = int(job["needed_usd_cents"] or 0)
with _dict_cur(conn) as cur:
# Primary: memo match
cur.execute(
"""
SELECT id FROM relay_deposits
WHERE memo LIKE %s
AND processed = FALSE
ORDER BY detected_at ASC LIMIT 1
""",
(f"%{RELAY_MEMO_PREFIX}{order_id}%",),
)
row = cur.fetchone()
if not row:
# Fallback: amount + source_kind
cur.execute(
"""
SELECT id FROM relay_deposits
WHERE source_kind = 'offramp_coinbase_prime'
AND processed = FALSE
AND amount_cents BETWEEN %s AND %s
ORDER BY detected_at ASC LIMIT 1
""",
(needed - DEPOSIT_MATCH_TOLERANCE_CENTS,
needed + DEPOSIT_MATCH_TOLERANCE_CENTS),
)
row = cur.fetchone()
if not row:
# Not found yet — check provider status; may still be wiring.
# (Don't transition; just schedule the next retry.)
self._schedule_retry(conn, order_id, int(job["attempt_count"] or 0) + 1)
return False
deposit_id = row["id"]
cur.execute(
"UPDATE relay_deposits SET processed = TRUE, processed_at = NOW() WHERE id = %s",
(deposit_id,),
)
# Link + transition
self._ledger_insert(conn, {
"order_id": order_id,
"order_type": job["order_type"],
"coin": "USD",
"movement_type": "bank_deposit",
"amount_coin": None,
"amount_usd_cents": int(job["needed_usd_cents"]),
"provider": "relay",
"provider_ref": str(deposit_id),
"provider_status": "matched",
"state": "confirmed",
"idempotency_key": f"{order_id}:bank_deposit:{deposit_id}",
"notes": f"Relay deposit #{deposit_id} matched",
})
self._transition(conn, order_id,
to_state="funds_at_relay",
extra_sets={
"relay_deposit_id": deposit_id,
"funds_at_relay_at": "NOW()",
})
log.info("order=%s matched deposit #%s", order_id, deposit_id)
return True
# ── Phase: reserve for Playwright (funds_at_relay → ready) ──────────
async def _reserve_for_playwright(self, job: dict, conn) -> bool:
"""Flip vendor_obligations to 'reserved' + create/update
filing_fee_reservations rows so existing Playwright flow picks it up."""
order_id = job["order_id"]
with _dict_cur(conn) as cur:
cur.execute(
"""
SELECT id, amount_usd_cents FROM vendor_obligations
WHERE order_id = %s AND obligation_kind = 'filing_fee' AND status = 'pending'
""",
(order_id,),
)
filings = cur.fetchall()
total_filing_cents = sum(int(f["amount_usd_cents"]) for f in filings)
if total_filing_cents > 0:
# filing_fee_reservations has no unique constraint on
# order_id, so ON CONFLICT DO NOTHING is a no-op. Instead
# we check first and skip if a prior reservation exists.
cur.execute(
"""
SELECT id FROM filing_fee_reservations
WHERE order_id = %s AND status IN ('pending','reserved')
ORDER BY id ASC LIMIT 1
""",
(order_id,),
)
existing = cur.fetchone()
if existing:
res_id = existing["id"]
# Make sure it's marked reserved + linked to this deposit
cur.execute(
"""
UPDATE filing_fee_reservations
SET status = 'reserved',
relay_deposit_id = COALESCE(relay_deposit_id, %s),
reserved_at = COALESCE(reserved_at, NOW()),
amount_cents = %s
WHERE id = %s
""",
(job["relay_deposit_id"], total_filing_cents, res_id),
)
else:
cur.execute(
"""
INSERT INTO filing_fee_reservations
(order_id, order_type, amount_cents, status,
relay_deposit_id, reserved_at, created_at)
VALUES (%s, %s, %s, 'reserved', %s, NOW(), NOW())
RETURNING id
""",
(order_id, job["order_type"], total_filing_cents, job["relay_deposit_id"]),
)
res_id = cur.fetchone()["id"]
cur.execute(
"""
UPDATE vendor_obligations
SET status = 'reserved',
filing_fee_reservation_id = %s,
updated_at = NOW()
WHERE order_id = %s
AND obligation_kind = 'filing_fee'
AND status = 'pending'
""",
(res_id, order_id),
)
# Commission obligations just wait; `commission_ledger` flow
# handles their payout.
cur.execute(
"""
UPDATE vendor_obligations
SET status = 'reserved', updated_at = NOW()
WHERE order_id = %s
AND obligation_kind = 'commission'
AND status = 'pending'
""",
(order_id,),
)
self._transition(conn, order_id, to_state="ready")
log.info("order=%s funds reserved for playwright (%s cents)", order_id, total_filing_cents)
return True
# ── Phase: check settlement (ready → settled) ───────────────────────
async def _check_settlement(self, job: dict, conn) -> bool:
"""Transition 'ready''settled' once all filing-fee obligations
have been paid via the Playwright flow.
Three cases:
(a) Filing-fee reservations exist and are all spent/released → settled
(b) No filing-fee reservations exist but vendor_obligations has only
commission rows (commission-only orders). Those get paid by the
separate commission payout process; for the crypto treasury
pipeline's purposes, 'settled' means "funds arrived and
vendor-facing work is done." Transition settled.
(c) Otherwise (reservations exist but still pending) — hold.
"""
order_id = job["order_id"]
with _dict_cur(conn) as cur:
cur.execute(
"""
SELECT COUNT(*) AS n,
COUNT(*) FILTER (WHERE status IN ('spent', 'released')) AS done
FROM filing_fee_reservations
WHERE order_id = %s
""",
(order_id,),
)
r = cur.fetchone() or {"n": 0, "done": 0}
reservations_n = int(r["n"])
reservations_done = int(r["done"])
# Case (b): no reservations but we have at least one commission
# obligation → commission-only order → treasury pipeline done.
if reservations_n == 0:
cur.execute(
"""
SELECT COUNT(*) FILTER (WHERE obligation_kind = 'filing_fee') AS filing_n,
COUNT(*) FILTER (WHERE obligation_kind = 'commission') AS commission_n
FROM vendor_obligations
WHERE order_id = %s
""",
(order_id,),
)
ob = cur.fetchone() or {"filing_n": 0, "commission_n": 0}
filing_n = int(ob["filing_n"])
commission_n = int(ob["commission_n"])
if filing_n == 0:
# No filing obligations ever existed. Safe to settle.
self._transition(conn, order_id, to_state="settled",
extra_sets={"settled_at": "NOW()"})
log.info(
"order=%s settled (commission-only, %s commission obligation(s))",
order_id, commission_n,
)
return True
# Filing obligations exist but no reservation — likely racing
# with _reserve_for_playwright. Hold and re-poll next pass.
return False
# Case (a): reservations all done
if reservations_done == reservations_n:
self._transition(conn, order_id, to_state="settled",
extra_sets={"settled_at": "NOW()"})
log.info("order=%s settled", order_id)
return True
# Case (c): still pending
return False
# ── Helpers ─────────────────────────────────────────────────────────
def _find_ledger_by_key(self, conn, idempotency_key: str) -> Optional[dict]:
"""Return the ledger row with this idempotency_key, if any.
Used by the offramp retry guard: before calling SHKeeper or
Coinbase Prime, check whether a prior attempt already succeeded
at that external step. If so, reuse the prior result instead of
double-sending funds.
"""
with _dict_cur(conn) as cur:
cur.execute(
"SELECT * FROM crypto_payment_ledger WHERE idempotency_key = %s",
(idempotency_key,),
)
return cur.fetchone()
def _transition(
self, conn, order_id: str, *, to_state: str,
extra_sets: Optional[dict] = None,
):
extra_sets = extra_sets or {}
sets = ["state = %s", "updated_at = NOW()"]
params = [to_state]
for k, v in extra_sets.items():
if v == "NOW()":
sets.append(f"{k} = NOW()")
else:
sets.append(f"{k} = %s")
params.append(v)
# Reset retry state on successful transition. Only clear columns
# the caller didn't explicitly set — otherwise we'd get SQL error
# from duplicate column assignment (e.g., caller passed last_error
# when transitioning to 'manual').
sets.append("next_retry_at = NULL")
if "last_error" not in extra_sets:
sets.append("last_error = NULL")
params.append(order_id)
sql = f"UPDATE crypto_payment_jobs SET {', '.join(sets)} WHERE order_id = %s"
with conn.cursor() as cur:
cur.execute(sql, params)
conn.commit()
def _ledger_insert(self, conn, row: dict):
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO crypto_payment_ledger (
order_id, order_type, coin, movement_type,
amount_coin, amount_usd_cents, fx_rate_usd,
basis_usd_cents, proceeds_usd_cents,
acquired_at, disposed_at,
provider, provider_ref, provider_status,
state, idempotency_key, notes
) VALUES (
%(order_id)s, %(order_type)s, %(coin)s, %(movement_type)s,
%(amount_coin)s, %(amount_usd_cents)s, %(fx_rate_usd)s,
%(basis_usd_cents)s, %(proceeds_usd_cents)s,
CASE WHEN %(acquired_at)s = 'NOW()' THEN NOW() ELSE NULL END,
CASE WHEN %(disposed_at)s = 'NOW()' THEN NOW() ELSE NULL END,
%(provider)s, %(provider_ref)s, %(provider_status)s,
%(state)s, %(idempotency_key)s, %(notes)s
)
ON CONFLICT (idempotency_key) DO NOTHING
""",
{
"order_id": row["order_id"],
"order_type": row["order_type"],
"coin": row["coin"],
"movement_type": row["movement_type"],
"amount_coin": row.get("amount_coin"),
"amount_usd_cents": row["amount_usd_cents"],
"fx_rate_usd": row.get("fx_rate_usd"),
"basis_usd_cents": row.get("basis_usd_cents"),
"proceeds_usd_cents": row.get("proceeds_usd_cents"),
"acquired_at": row.get("acquired_at", ""),
"disposed_at": row.get("disposed_at", ""),
"provider": row.get("provider"),
"provider_ref": row.get("provider_ref"),
"provider_status": row.get("provider_status"),
"state": row.get("state", "pending"),
"idempotency_key": row["idempotency_key"],
"notes": row.get("notes"),
},
)
conn.commit()
def _record_error(self, conn, order_id: str, err: str):
with conn.cursor() as cur:
cur.execute(
"""
UPDATE crypto_payment_jobs
SET last_error = %s,
attempt_count = attempt_count + 1,
updated_at = NOW()
WHERE order_id = %s
RETURNING attempt_count
""",
(err[:500], order_id),
)
row = cur.fetchone()
conn.commit()
attempt = int(row[0]) if row else 0
if attempt >= MAX_ATTEMPTS:
log.error("order=%s exhausted retries — marking failed", order_id)
self._transition(conn, order_id, to_state="failed",
extra_sets={"last_error": err[:500]})
else:
self._schedule_retry(conn, order_id, attempt)
def _send_manual_treasury_email(
self, *, order_id: str, order_type: str, coin: str,
received_usd: float, needed_usd: float,
filing_usd: float, commission_usd: float,
obligations: list,
):
"""Email admin when a crypto order is parked in manual mode.
The email tells the admin exactly how much USD to transfer from
crypto sale proceeds into the Relay bank account so the filing
card has enough balance for vendor payments.
"""
admin_email = os.environ.get(
"CRYPTO_SWEEP_ADMIN_EMAIL",
os.environ.get("ADMIN_EMAIL", "ops@performancewest.net"),
)
from_email = os.environ.get(
"FROM_EMAIL",
os.environ.get("SMTP_FROM", "notifications@performancewest.net"),
)
obligation_lines = []
for ob in obligations:
vendor = getattr(ob, "vendor", ob.get("vendor", "")) if isinstance(ob, dict) else ob.vendor
amount = getattr(ob, "amount_usd_cents", ob.get("amount_usd_cents", 0)) if isinstance(ob, dict) else ob.amount_usd_cents
kind = getattr(ob, "obligation_kind", ob.get("obligation_kind", "")) if isinstance(ob, dict) else ob.obligation_kind
obligation_lines.append(
f" - {vendor} ({kind}): ${amount/100:.2f}"
)
ob_text = "\n".join(obligation_lines) if obligation_lines else " (none computed)"
subject = f"Crypto order {order_id}: transfer ${needed_usd:.2f} to Relay for vendor payments"
body = f"""A crypto payment was received and sized. Treasury mode is MANUAL — you need to
transfer funds into the Relay bank account so the filing card can pay vendors.
Order: {order_id} ({order_type})
Coin: {coin}
Received: ${received_usd:.2f} USD equivalent
Amount needed at Relay: ${needed_usd:.2f}
Filing fees (with 10% buffer): ${filing_usd:.2f}
Commission reserve: ${commission_usd:.2f}
Vendor obligations:
{ob_text}
Action required:
1. Sell ${needed_usd:.2f} worth of {coin} on your exchange (Coinbase, Kraken, etc.)
2. Wire/ACH/RTP the USD to the RelayFi bank account
3. Once the deposit lands, the crypto_payment_worker will detect it and advance
the order to 'funds_at_relay''ready''settled'
Or use the admin panel to retry with auto-offramp:
POST /api/v1/admin/crypto-payments/{order_id}/retry-offramp
This email was sent because CRYPTO_TREASURY_MODE=manual.
When Bridge (Stripe) is approved, set CRYPTO_TREASURY_MODE=auto to
automate this process.
"""
try:
import smtplib
from email.mime.text import MIMEText
smtp_host = os.environ.get("SMTP_HOST", "mail.smtp2go.com")
smtp_port = int(os.environ.get("SMTP_PORT", "587"))
smtp_user = os.environ.get("SMTP_USER", "")
smtp_pass = os.environ.get("SMTP_PASS", "")
if not smtp_user or not smtp_pass:
log.warning("SMTP not configured — skipping manual treasury email for %s", order_id)
return
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = from_email
msg["To"] = admin_email
with smtplib.SMTP(smtp_host, smtp_port) as server:
if smtp_port != 25:
server.starttls()
server.login(smtp_user, smtp_pass)
server.sendmail(from_email, [admin_email], msg.as_string())
log.info("Manual treasury email sent to %s for order %s ($%.2f)", admin_email, order_id, needed_usd)
except Exception as exc:
log.error("Failed to send manual treasury email for %s: %s", order_id, exc)
# Also create an ERPNext ToDo for visibility in the admin dashboard
try:
from scripts.workers.erpnext_client import ERPNextClient
ERPNextClient().create_resource("ToDo", {
"description": (
f"[crypto-treasury] {order_id}\n\n"
f"Transfer ${needed_usd:.2f} to Relay for vendor payments.\n"
f"Coin: {coin} | Received: ${received_usd:.2f}\n"
f"Filing: ${filing_usd:.2f} | Commission: ${commission_usd:.2f}\n\n"
f"Obligations:\n{ob_text}"
),
"priority": "High",
"role": "Accounting Advisor",
})
except Exception as exc:
log.error("Could not create treasury ToDo: %s", exc)
def _schedule_retry(self, conn, order_id: str, attempt: int):
backoff_idx = min(attempt, len(BACKOFF_SCHEDULE_SEC) - 1)
delay = BACKOFF_SCHEDULE_SEC[backoff_idx]
with conn.cursor() as cur:
cur.execute(
"UPDATE crypto_payment_jobs "
"SET next_retry_at = NOW() + (%s || ' seconds')::interval, "
" updated_at = NOW() "
"WHERE order_id = %s",
(str(delay), order_id),
)
conn.commit()
# ── CLI ─────────────────────────────────────────────────────────────────
async def _main():
parser = argparse.ArgumentParser()
parser.add_argument("--order", help="process one specific order_id")
parser.add_argument("--loop", action="store_true", help="run as daemon (60s poll)")
parser.add_argument("--interval", type=int, default=60, help="poll interval seconds")
args = parser.parse_args()
orch = Orchestrator()
if args.loop:
log.info("starting crypto_payment_worker loop (interval=%ss)", args.interval)
while True:
try:
counts = await orch.run_once()
log.info("pass complete: %s", counts)
except Exception as exc:
log.exception("run_once raised: %s", exc)
await asyncio.sleep(args.interval)
else:
counts = await orch.run_once(order_id=args.order)
log.info("done: %s", counts)
if __name__ == "__main__":
asyncio.run(_main())