Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
891 lines
38 KiB
Python
891 lines
38 KiB
Python
#!/usr/bin/env python3
|
||
"""Crypto payment orchestrator worker.
|
||
|
||
State machine per order (one row in `crypto_payment_jobs`):
|
||
received → sizing (compute vendor_obligations + needed_usd)
|
||
sizing → offramping (Coinbase Prime sell + wire)
|
||
offramping → funds_at_relay (matched deposit found via IMAP)
|
||
funds_at_relay → ready (filing_fee_reservations reserved)
|
||
ready → settled (Playwright flow confirmed vendor charge)
|
||
any → failed (5 attempts exhausted)
|
||
any → manual (provider degraded / kyc hold / underpayment)
|
||
|
||
Triggered by:
|
||
- SHKeeper webhook (insert job row, worker picks it up on next poll)
|
||
- Cron every 60s (advance stuck jobs, retry failed ones with backoff)
|
||
- Manual admin retry (POST /api/v1/admin/crypto-payments/:id/retry-offramp)
|
||
|
||
Usage:
|
||
python -m scripts.workers.crypto_payment_worker # poll once
|
||
python -m scripts.workers.crypto_payment_worker --loop # daemon
|
||
python -m scripts.workers.crypto_payment_worker --order=CO-XYZ # one order
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import sys
|
||
from datetime import datetime, timedelta
|
||
from decimal import Decimal
|
||
from typing import Optional
|
||
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
|
||
from scripts.workers.crypto_offramp import (
|
||
KycHoldError,
|
||
OfframpError,
|
||
ProviderDegradedError,
|
||
SlippageExceededError,
|
||
)
|
||
from scripts.workers.crypto_offramp.bridge import BridgeOfframp
|
||
from scripts.workers.crypto_offramp.shkeeper_client import SHKeeperClient
|
||
from scripts.workers.crypto_offramp.sizer import (
|
||
SizingResult,
|
||
compute_obligations,
|
||
persist_obligations,
|
||
)
|
||
|
||
log = logging.getLogger("crypto_payment_worker")
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||
handlers=[logging.StreamHandler(sys.stdout)],
|
||
)
|
||
|
||
DATABASE_URL = os.environ.get("DATABASE_URL", "")
|
||
MAX_ATTEMPTS = 5
|
||
BACKOFF_SCHEDULE_SEC = [30, 120, 600, 3600, 21600] # 30s, 2m, 10m, 1h, 6h
|
||
RELAY_MEMO_PREFIX = os.environ.get("RELAY_BANK_MEMO_PREFIX", "PW-ORDER-")
|
||
DEPOSIT_MATCH_TOLERANCE_CENTS = 500 # $5
|
||
|
||
# Treasury mode — 'manual' parks every order in state='manual' after sizing,
|
||
# awaiting admin approval. 'auto' runs the full offramp pipeline. Default
|
||
# is 'manual' so we don't accidentally fire Bridge calls before Bridge is
|
||
# approved + credentials are loaded.
|
||
TREASURY_MODE = os.environ.get("CRYPTO_TREASURY_MODE", "manual").lower()
|
||
if TREASURY_MODE not in {"manual", "auto"}:
|
||
log.warning("CRYPTO_TREASURY_MODE=%r not recognized; defaulting to 'manual'", TREASURY_MODE)
|
||
TREASURY_MODE = "manual"
|
||
|
||
|
||
# ── DB helpers ──────────────────────────────────────────────────────────
|
||
|
||
def _connect():
|
||
return psycopg2.connect(DATABASE_URL)
|
||
|
||
|
||
def _dict_cur(conn):
|
||
return conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||
|
||
|
||
# ── Orchestrator core ──────────────────────────────────────────────────
|
||
|
||
class Orchestrator:
|
||
def __init__(
|
||
self,
|
||
offramp=None,
|
||
shkeeper: Optional[SHKeeperClient] = None,
|
||
):
|
||
# Offramp provider — default Bridge (Stripe). Duck-typed to accept
|
||
# any adapter that exposes prepare_transfer() + status() + quote()
|
||
# (or, for legacy adapters, deposit_address + execute).
|
||
self.offramp = offramp or BridgeOfframp()
|
||
self.shkeeper = shkeeper or SHKeeperClient()
|
||
|
||
async def run_once(self, order_id: Optional[str] = None) -> dict:
|
||
"""Advance every job ready to advance. Returns a counts dict."""
|
||
conn = _connect()
|
||
counts = {"advanced": 0, "errored": 0, "skipped": 0}
|
||
try:
|
||
with _dict_cur(conn) as cur:
|
||
if order_id:
|
||
cur.execute(
|
||
"SELECT * FROM crypto_payment_jobs WHERE order_id = %s",
|
||
(order_id,),
|
||
)
|
||
else:
|
||
# Pull anything not terminal. Respect next_retry_at if set.
|
||
cur.execute(
|
||
"""
|
||
SELECT * FROM crypto_payment_jobs
|
||
WHERE state NOT IN ('settled','failed','manual')
|
||
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
|
||
ORDER BY created_at ASC
|
||
LIMIT 50
|
||
"""
|
||
)
|
||
jobs = cur.fetchall()
|
||
|
||
for job in jobs:
|
||
try:
|
||
advanced = await self._advance(job, conn)
|
||
if advanced:
|
||
counts["advanced"] += 1
|
||
else:
|
||
counts["skipped"] += 1
|
||
except Exception as exc: # noqa: BLE001
|
||
counts["errored"] += 1
|
||
self._record_error(conn, job["order_id"], str(exc))
|
||
finally:
|
||
conn.close()
|
||
return counts
|
||
|
||
async def _advance(self, job: dict, conn) -> bool:
|
||
"""Dispatch on state. Returns True if state advanced."""
|
||
state = job["state"]
|
||
order_id = job["order_id"]
|
||
log.info("advancing order=%s state=%s", order_id, state)
|
||
|
||
if state == "received":
|
||
return await self._do_sizing(job, conn)
|
||
if state == "sizing":
|
||
return await self._do_offramp(job, conn)
|
||
if state == "offramping":
|
||
return await self._match_relay_deposit(job, conn)
|
||
if state == "funds_at_relay":
|
||
return await self._reserve_for_playwright(job, conn)
|
||
if state == "ready":
|
||
return await self._check_settlement(job, conn)
|
||
log.debug("order=%s in state=%s — nothing to do", order_id, state)
|
||
return False
|
||
|
||
# ── Phase: sizing (received → sizing → offramping) ──────────────────
|
||
|
||
async def _do_sizing(self, job: dict, conn) -> bool:
|
||
order_id = job["order_id"]
|
||
order_type = job["order_type"]
|
||
try:
|
||
result = compute_obligations(
|
||
order_id=order_id, order_type=order_type, conn=conn,
|
||
)
|
||
if result.needed_usd_cents <= 0 and not result.obligations:
|
||
# No vendor obligations — skip offramp, mark settled
|
||
# directly (e.g., DC agent with no external fees).
|
||
self._transition(
|
||
conn, order_id,
|
||
to_state="settled",
|
||
extra_sets={
|
||
"needed_usd_cents": 0,
|
||
"settled_at": "NOW()",
|
||
},
|
||
)
|
||
log.info("order=%s no vendor obligations — settled directly", order_id)
|
||
return True
|
||
|
||
# Underpayment check: SHKeeper balance_fiat < needed
|
||
if job["amount_usd_cents"] < result.needed_usd_cents:
|
||
self._transition(
|
||
conn, order_id,
|
||
to_state="manual",
|
||
extra_sets={
|
||
"needed_usd_cents": result.needed_usd_cents,
|
||
"last_error": (
|
||
f"underpayment: received ${job['amount_usd_cents']/100:.2f} "
|
||
f"but need ${result.needed_usd_cents/100:.2f}"
|
||
),
|
||
},
|
||
)
|
||
log.warning(
|
||
"order=%s underpayment — manual review (received=%s, need=%s)",
|
||
order_id, job["amount_usd_cents"], result.needed_usd_cents,
|
||
)
|
||
return True
|
||
|
||
persist_obligations(order_id, result, conn=conn)
|
||
|
||
# In manual treasury mode, park at 'manual' so admin can approve
|
||
# before any offramp/Bridge call fires. Compliance service runs
|
||
# independently (Playwright uses Relay card directly), so this
|
||
# only gates the treasury reconciliation side.
|
||
if TREASURY_MODE == "manual":
|
||
needed = result.needed_usd_cents
|
||
filing = result.filing_total_with_buffer_cents
|
||
commission = result.commission_total_cents
|
||
self._transition(
|
||
conn, order_id,
|
||
to_state="manual",
|
||
extra_sets={
|
||
"needed_usd_cents": needed,
|
||
"sized_at": "NOW()",
|
||
"last_error": (
|
||
f"awaiting admin approval (TREASURY_MODE=manual). "
|
||
f"Sized: ${needed/100:.2f} needed "
|
||
f"(filing ${filing/100:.2f} "
|
||
f"+ commission ${commission/100:.2f})."
|
||
),
|
||
},
|
||
)
|
||
log.info(
|
||
"order=%s sized + parked for admin (mode=manual, need $%.2f)",
|
||
order_id, needed / 100,
|
||
)
|
||
|
||
# ── Notify admin: manual treasury mode requires a bank transfer ──
|
||
# Email the admin with the exact amount to transfer from crypto
|
||
# proceeds into the Relay bank account so the filing card has
|
||
# enough balance to pay vendors.
|
||
self._send_manual_treasury_email(
|
||
order_id=order_id,
|
||
order_type=order_type,
|
||
coin=job["coin"],
|
||
received_usd=job["amount_usd_cents"] / 100,
|
||
needed_usd=needed / 100,
|
||
filing_usd=filing / 100,
|
||
commission_usd=commission / 100,
|
||
obligations=result.obligations,
|
||
)
|
||
return True
|
||
|
||
# Auto mode: advance to offramping
|
||
self._transition(
|
||
conn, order_id,
|
||
to_state="sizing",
|
||
extra_sets={
|
||
"needed_usd_cents": result.needed_usd_cents,
|
||
"sized_at": "NOW()",
|
||
},
|
||
)
|
||
return True
|
||
except Exception as exc: # noqa: BLE001
|
||
log.exception("sizing failed for %s: %s", order_id, exc)
|
||
raise
|
||
|
||
# ── Phase: offramp (sizing → offramping) ────────────────────────────
|
||
|
||
async def _do_offramp(self, job: dict, conn) -> bool:
|
||
"""Offramp phase via Bridge (RTP-native).
|
||
|
||
Bridge's model: create one transfer (atomic + idempotent), which
|
||
returns a deposit address. SHKeeper sends coins there; Bridge
|
||
automatically converts + settles USD to RelayFi via RTP.
|
||
|
||
Stable idempotency keys — NOT parameterized by attempt_count.
|
||
A retry produces the same keys so the ledger UNIQUE constraint
|
||
dedups and Bridge's Idempotency-Key header returns the same
|
||
transfer each call.
|
||
"""
|
||
order_id = job["order_id"]
|
||
coin = job["coin"]
|
||
amount_coin = Decimal(str(job["amount_coin"]))
|
||
needed_usd_cents = int(job["needed_usd_cents"])
|
||
bridge_key = f"{order_id}:offramp:bridge-transfer"
|
||
shkeeper_key = f"{order_id}:offramp:shkeeper-payout"
|
||
memo = f"{RELAY_MEMO_PREFIX}{order_id}"
|
||
|
||
try:
|
||
# Skip unsupported coins → manual review
|
||
if coin.upper() not in self.offramp.accepts_coins:
|
||
self._transition(
|
||
conn, order_id, to_state="manual",
|
||
extra_sets={"last_error": f"coin {coin} not supported by Bridge"},
|
||
)
|
||
log.warning("order=%s coin %s not supported — manual", order_id, coin)
|
||
return True
|
||
|
||
quote = await self.offramp.quote(amount_coin, coin)
|
||
|
||
# ── Step 1: create the Bridge transfer (idempotent) ──────
|
||
prior_bridge = self._find_ledger_by_key(conn, bridge_key)
|
||
if prior_bridge:
|
||
log.info(
|
||
"order=%s Bridge transfer already created (ref=%s) — reusing",
|
||
order_id, prior_bridge.get("provider_ref"),
|
||
)
|
||
bridge_transfer_id = prior_bridge.get("provider_ref") or ""
|
||
deposit_addr = (
|
||
(prior_bridge.get("raw_row") or {}).get("deposit_address", "")
|
||
if isinstance(prior_bridge.get("raw_row"), dict) else ""
|
||
)
|
||
# Fallback: re-call prepare_transfer (idempotent) to get the address
|
||
if not deposit_addr:
|
||
prep = await self.offramp.prepare_transfer(
|
||
target_usd_cents=needed_usd_cents,
|
||
from_coin=coin, rail="rtp", memo=memo,
|
||
idempotency_key=bridge_key,
|
||
)
|
||
deposit_addr = prep.deposit_address
|
||
bridge_transfer_id = prep.transfer_id
|
||
else:
|
||
prep = await self.offramp.prepare_transfer(
|
||
target_usd_cents=needed_usd_cents,
|
||
from_coin=coin, rail="rtp", memo=memo,
|
||
idempotency_key=bridge_key,
|
||
)
|
||
bridge_transfer_id = prep.transfer_id
|
||
deposit_addr = prep.deposit_address
|
||
# Write ledger 'offramp' row with the Bridge transfer id —
|
||
# this is the disposal event for 8949 cost basis.
|
||
self._ledger_insert(conn, {
|
||
"order_id": order_id,
|
||
"order_type": job["order_type"],
|
||
"coin": coin,
|
||
"movement_type": "offramp",
|
||
"amount_coin": None,
|
||
"amount_usd_cents": needed_usd_cents,
|
||
"fx_rate_usd": quote.fx_rate_usd,
|
||
"disposed_at": "NOW()",
|
||
"proceeds_usd_cents": needed_usd_cents,
|
||
"provider": "bridge",
|
||
"provider_ref": bridge_transfer_id,
|
||
"provider_status": "awaiting_funds",
|
||
"state": "pending",
|
||
"idempotency_key": bridge_key,
|
||
"notes": f"Bridge transfer created; awaiting {coin} deposit to {deposit_addr[:16]}…",
|
||
})
|
||
|
||
# ── Step 2: SHKeeper sends coins to the Bridge deposit address ──
|
||
prior_shk = self._find_ledger_by_key(conn, shkeeper_key)
|
||
if prior_shk:
|
||
log.info(
|
||
"order=%s SHKeeper payout already recorded (ref=%s) — skipping duplicate move",
|
||
order_id, prior_shk.get("provider_ref"),
|
||
)
|
||
else:
|
||
# Compute coin amount to send (needed_usd / rate × 1.01 overhead
|
||
# for exchange fees + network fees). Cap at job.amount_coin.
|
||
rate = quote.fx_rate_usd if quote.fx_rate_usd > 0 else Decimal("1")
|
||
target_coin = min(
|
||
(Decimal(needed_usd_cents) / Decimal("100"))
|
||
/ rate * Decimal("1.01"),
|
||
amount_coin,
|
||
)
|
||
log.info(
|
||
"order=%s sending %s %s → Bridge deposit %s (for $%.2f)",
|
||
order_id, target_coin, coin, deposit_addr, needed_usd_cents / 100,
|
||
)
|
||
payout = await self.shkeeper.payout(
|
||
coin=coin, destination=deposit_addr, amount=target_coin,
|
||
)
|
||
self._ledger_insert(conn, {
|
||
"order_id": order_id,
|
||
"order_type": job["order_type"],
|
||
"coin": coin,
|
||
"movement_type": "swap",
|
||
"amount_coin": -target_coin,
|
||
"amount_usd_cents": -needed_usd_cents,
|
||
"fx_rate_usd": quote.fx_rate_usd,
|
||
"provider": "shkeeper",
|
||
"provider_ref": payout.withdraw_id,
|
||
"provider_status": "broadcast",
|
||
"state": "pending",
|
||
"idempotency_key": shkeeper_key,
|
||
"notes": f"SHKeeper payout to Bridge for offramp ({order_id})",
|
||
})
|
||
|
||
# ── Step 3: transition to offramping (worker will poll status) ──
|
||
self._transition(
|
||
conn, order_id,
|
||
to_state="offramping",
|
||
extra_sets={
|
||
"offramp_provider": "bridge",
|
||
"offramp_ref": bridge_transfer_id,
|
||
"offramping_at": "NOW()",
|
||
},
|
||
)
|
||
return True
|
||
except ProviderDegradedError as exc:
|
||
self._transition(conn, order_id, to_state="manual",
|
||
extra_sets={"last_error": f"provider_degraded: {exc}"})
|
||
log.warning("order=%s offramp parked — provider degraded", order_id)
|
||
return True
|
||
except KycHoldError as exc:
|
||
self._transition(conn, order_id, to_state="manual",
|
||
extra_sets={"last_error": f"kyc_hold: {exc}"})
|
||
log.warning("order=%s KYC hold — manual review", order_id)
|
||
return True
|
||
except SlippageExceededError as exc:
|
||
self._transition(conn, order_id, to_state="manual",
|
||
extra_sets={"last_error": f"slippage: {exc}"})
|
||
log.warning("order=%s slippage exceeded — manual review", order_id)
|
||
return True
|
||
except Exception as exc:
|
||
log.exception("offramp failed for %s: %s", order_id, exc)
|
||
raise
|
||
|
||
# ── Phase: match Relay deposit (offramping → funds_at_relay) ────────
|
||
|
||
async def _match_relay_deposit(self, job: dict, conn) -> bool:
|
||
"""Find a relay_deposits row that matches this job.
|
||
|
||
Match priority:
|
||
1. memo contains PW-ORDER-<order_id> (strongest signal)
|
||
2. amount within ±DEPOSIT_MATCH_TOLERANCE_CENTS AND source_kind='offramp_coinbase_prime'
|
||
"""
|
||
order_id = job["order_id"]
|
||
needed = int(job["needed_usd_cents"] or 0)
|
||
with _dict_cur(conn) as cur:
|
||
# Primary: memo match
|
||
cur.execute(
|
||
"""
|
||
SELECT id FROM relay_deposits
|
||
WHERE memo LIKE %s
|
||
AND processed = FALSE
|
||
ORDER BY detected_at ASC LIMIT 1
|
||
""",
|
||
(f"%{RELAY_MEMO_PREFIX}{order_id}%",),
|
||
)
|
||
row = cur.fetchone()
|
||
if not row:
|
||
# Fallback: amount + source_kind
|
||
cur.execute(
|
||
"""
|
||
SELECT id FROM relay_deposits
|
||
WHERE source_kind = 'offramp_coinbase_prime'
|
||
AND processed = FALSE
|
||
AND amount_cents BETWEEN %s AND %s
|
||
ORDER BY detected_at ASC LIMIT 1
|
||
""",
|
||
(needed - DEPOSIT_MATCH_TOLERANCE_CENTS,
|
||
needed + DEPOSIT_MATCH_TOLERANCE_CENTS),
|
||
)
|
||
row = cur.fetchone()
|
||
|
||
if not row:
|
||
# Not found yet — check provider status; may still be wiring.
|
||
# (Don't transition; just schedule the next retry.)
|
||
self._schedule_retry(conn, order_id, int(job["attempt_count"] or 0) + 1)
|
||
return False
|
||
|
||
deposit_id = row["id"]
|
||
cur.execute(
|
||
"UPDATE relay_deposits SET processed = TRUE, processed_at = NOW() WHERE id = %s",
|
||
(deposit_id,),
|
||
)
|
||
|
||
# Link + transition
|
||
self._ledger_insert(conn, {
|
||
"order_id": order_id,
|
||
"order_type": job["order_type"],
|
||
"coin": "USD",
|
||
"movement_type": "bank_deposit",
|
||
"amount_coin": None,
|
||
"amount_usd_cents": int(job["needed_usd_cents"]),
|
||
"provider": "relay",
|
||
"provider_ref": str(deposit_id),
|
||
"provider_status": "matched",
|
||
"state": "confirmed",
|
||
"idempotency_key": f"{order_id}:bank_deposit:{deposit_id}",
|
||
"notes": f"Relay deposit #{deposit_id} matched",
|
||
})
|
||
self._transition(conn, order_id,
|
||
to_state="funds_at_relay",
|
||
extra_sets={
|
||
"relay_deposit_id": deposit_id,
|
||
"funds_at_relay_at": "NOW()",
|
||
})
|
||
log.info("order=%s matched deposit #%s", order_id, deposit_id)
|
||
return True
|
||
|
||
# ── Phase: reserve for Playwright (funds_at_relay → ready) ──────────
|
||
|
||
async def _reserve_for_playwright(self, job: dict, conn) -> bool:
|
||
"""Flip vendor_obligations to 'reserved' + create/update
|
||
filing_fee_reservations rows so existing Playwright flow picks it up."""
|
||
order_id = job["order_id"]
|
||
with _dict_cur(conn) as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT id, amount_usd_cents FROM vendor_obligations
|
||
WHERE order_id = %s AND obligation_kind = 'filing_fee' AND status = 'pending'
|
||
""",
|
||
(order_id,),
|
||
)
|
||
filings = cur.fetchall()
|
||
total_filing_cents = sum(int(f["amount_usd_cents"]) for f in filings)
|
||
|
||
if total_filing_cents > 0:
|
||
# filing_fee_reservations has no unique constraint on
|
||
# order_id, so ON CONFLICT DO NOTHING is a no-op. Instead
|
||
# we check first and skip if a prior reservation exists.
|
||
cur.execute(
|
||
"""
|
||
SELECT id FROM filing_fee_reservations
|
||
WHERE order_id = %s AND status IN ('pending','reserved')
|
||
ORDER BY id ASC LIMIT 1
|
||
""",
|
||
(order_id,),
|
||
)
|
||
existing = cur.fetchone()
|
||
if existing:
|
||
res_id = existing["id"]
|
||
# Make sure it's marked reserved + linked to this deposit
|
||
cur.execute(
|
||
"""
|
||
UPDATE filing_fee_reservations
|
||
SET status = 'reserved',
|
||
relay_deposit_id = COALESCE(relay_deposit_id, %s),
|
||
reserved_at = COALESCE(reserved_at, NOW()),
|
||
amount_cents = %s
|
||
WHERE id = %s
|
||
""",
|
||
(job["relay_deposit_id"], total_filing_cents, res_id),
|
||
)
|
||
else:
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO filing_fee_reservations
|
||
(order_id, order_type, amount_cents, status,
|
||
relay_deposit_id, reserved_at, created_at)
|
||
VALUES (%s, %s, %s, 'reserved', %s, NOW(), NOW())
|
||
RETURNING id
|
||
""",
|
||
(order_id, job["order_type"], total_filing_cents, job["relay_deposit_id"]),
|
||
)
|
||
res_id = cur.fetchone()["id"]
|
||
|
||
cur.execute(
|
||
"""
|
||
UPDATE vendor_obligations
|
||
SET status = 'reserved',
|
||
filing_fee_reservation_id = %s,
|
||
updated_at = NOW()
|
||
WHERE order_id = %s
|
||
AND obligation_kind = 'filing_fee'
|
||
AND status = 'pending'
|
||
""",
|
||
(res_id, order_id),
|
||
)
|
||
|
||
# Commission obligations just wait; `commission_ledger` flow
|
||
# handles their payout.
|
||
cur.execute(
|
||
"""
|
||
UPDATE vendor_obligations
|
||
SET status = 'reserved', updated_at = NOW()
|
||
WHERE order_id = %s
|
||
AND obligation_kind = 'commission'
|
||
AND status = 'pending'
|
||
""",
|
||
(order_id,),
|
||
)
|
||
|
||
self._transition(conn, order_id, to_state="ready")
|
||
log.info("order=%s funds reserved for playwright (%s cents)", order_id, total_filing_cents)
|
||
return True
|
||
|
||
# ── Phase: check settlement (ready → settled) ───────────────────────
|
||
|
||
async def _check_settlement(self, job: dict, conn) -> bool:
|
||
"""Transition 'ready' → 'settled' once all filing-fee obligations
|
||
have been paid via the Playwright flow.
|
||
|
||
Three cases:
|
||
(a) Filing-fee reservations exist and are all spent/released → settled
|
||
(b) No filing-fee reservations exist but vendor_obligations has only
|
||
commission rows (commission-only orders). Those get paid by the
|
||
separate commission payout process; for the crypto treasury
|
||
pipeline's purposes, 'settled' means "funds arrived and
|
||
vendor-facing work is done." Transition settled.
|
||
(c) Otherwise (reservations exist but still pending) — hold.
|
||
"""
|
||
order_id = job["order_id"]
|
||
with _dict_cur(conn) as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT COUNT(*) AS n,
|
||
COUNT(*) FILTER (WHERE status IN ('spent', 'released')) AS done
|
||
FROM filing_fee_reservations
|
||
WHERE order_id = %s
|
||
""",
|
||
(order_id,),
|
||
)
|
||
r = cur.fetchone() or {"n": 0, "done": 0}
|
||
reservations_n = int(r["n"])
|
||
reservations_done = int(r["done"])
|
||
|
||
# Case (b): no reservations but we have at least one commission
|
||
# obligation → commission-only order → treasury pipeline done.
|
||
if reservations_n == 0:
|
||
cur.execute(
|
||
"""
|
||
SELECT COUNT(*) FILTER (WHERE obligation_kind = 'filing_fee') AS filing_n,
|
||
COUNT(*) FILTER (WHERE obligation_kind = 'commission') AS commission_n
|
||
FROM vendor_obligations
|
||
WHERE order_id = %s
|
||
""",
|
||
(order_id,),
|
||
)
|
||
ob = cur.fetchone() or {"filing_n": 0, "commission_n": 0}
|
||
filing_n = int(ob["filing_n"])
|
||
commission_n = int(ob["commission_n"])
|
||
if filing_n == 0:
|
||
# No filing obligations ever existed. Safe to settle.
|
||
self._transition(conn, order_id, to_state="settled",
|
||
extra_sets={"settled_at": "NOW()"})
|
||
log.info(
|
||
"order=%s settled (commission-only, %s commission obligation(s))",
|
||
order_id, commission_n,
|
||
)
|
||
return True
|
||
# Filing obligations exist but no reservation — likely racing
|
||
# with _reserve_for_playwright. Hold and re-poll next pass.
|
||
return False
|
||
|
||
# Case (a): reservations all done
|
||
if reservations_done == reservations_n:
|
||
self._transition(conn, order_id, to_state="settled",
|
||
extra_sets={"settled_at": "NOW()"})
|
||
log.info("order=%s settled", order_id)
|
||
return True
|
||
# Case (c): still pending
|
||
return False
|
||
|
||
# ── Helpers ─────────────────────────────────────────────────────────
|
||
|
||
def _find_ledger_by_key(self, conn, idempotency_key: str) -> Optional[dict]:
|
||
"""Return the ledger row with this idempotency_key, if any.
|
||
|
||
Used by the offramp retry guard: before calling SHKeeper or
|
||
Coinbase Prime, check whether a prior attempt already succeeded
|
||
at that external step. If so, reuse the prior result instead of
|
||
double-sending funds.
|
||
"""
|
||
with _dict_cur(conn) as cur:
|
||
cur.execute(
|
||
"SELECT * FROM crypto_payment_ledger WHERE idempotency_key = %s",
|
||
(idempotency_key,),
|
||
)
|
||
return cur.fetchone()
|
||
|
||
def _transition(
|
||
self, conn, order_id: str, *, to_state: str,
|
||
extra_sets: Optional[dict] = None,
|
||
):
|
||
extra_sets = extra_sets or {}
|
||
sets = ["state = %s", "updated_at = NOW()"]
|
||
params = [to_state]
|
||
for k, v in extra_sets.items():
|
||
if v == "NOW()":
|
||
sets.append(f"{k} = NOW()")
|
||
else:
|
||
sets.append(f"{k} = %s")
|
||
params.append(v)
|
||
# Reset retry state on successful transition. Only clear columns
|
||
# the caller didn't explicitly set — otherwise we'd get SQL error
|
||
# from duplicate column assignment (e.g., caller passed last_error
|
||
# when transitioning to 'manual').
|
||
sets.append("next_retry_at = NULL")
|
||
if "last_error" not in extra_sets:
|
||
sets.append("last_error = NULL")
|
||
params.append(order_id)
|
||
sql = f"UPDATE crypto_payment_jobs SET {', '.join(sets)} WHERE order_id = %s"
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql, params)
|
||
conn.commit()
|
||
|
||
def _ledger_insert(self, conn, row: dict):
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO crypto_payment_ledger (
|
||
order_id, order_type, coin, movement_type,
|
||
amount_coin, amount_usd_cents, fx_rate_usd,
|
||
basis_usd_cents, proceeds_usd_cents,
|
||
acquired_at, disposed_at,
|
||
provider, provider_ref, provider_status,
|
||
state, idempotency_key, notes
|
||
) VALUES (
|
||
%(order_id)s, %(order_type)s, %(coin)s, %(movement_type)s,
|
||
%(amount_coin)s, %(amount_usd_cents)s, %(fx_rate_usd)s,
|
||
%(basis_usd_cents)s, %(proceeds_usd_cents)s,
|
||
CASE WHEN %(acquired_at)s = 'NOW()' THEN NOW() ELSE NULL END,
|
||
CASE WHEN %(disposed_at)s = 'NOW()' THEN NOW() ELSE NULL END,
|
||
%(provider)s, %(provider_ref)s, %(provider_status)s,
|
||
%(state)s, %(idempotency_key)s, %(notes)s
|
||
)
|
||
ON CONFLICT (idempotency_key) DO NOTHING
|
||
""",
|
||
{
|
||
"order_id": row["order_id"],
|
||
"order_type": row["order_type"],
|
||
"coin": row["coin"],
|
||
"movement_type": row["movement_type"],
|
||
"amount_coin": row.get("amount_coin"),
|
||
"amount_usd_cents": row["amount_usd_cents"],
|
||
"fx_rate_usd": row.get("fx_rate_usd"),
|
||
"basis_usd_cents": row.get("basis_usd_cents"),
|
||
"proceeds_usd_cents": row.get("proceeds_usd_cents"),
|
||
"acquired_at": row.get("acquired_at", ""),
|
||
"disposed_at": row.get("disposed_at", ""),
|
||
"provider": row.get("provider"),
|
||
"provider_ref": row.get("provider_ref"),
|
||
"provider_status": row.get("provider_status"),
|
||
"state": row.get("state", "pending"),
|
||
"idempotency_key": row["idempotency_key"],
|
||
"notes": row.get("notes"),
|
||
},
|
||
)
|
||
conn.commit()
|
||
|
||
def _record_error(self, conn, order_id: str, err: str):
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
UPDATE crypto_payment_jobs
|
||
SET last_error = %s,
|
||
attempt_count = attempt_count + 1,
|
||
updated_at = NOW()
|
||
WHERE order_id = %s
|
||
RETURNING attempt_count
|
||
""",
|
||
(err[:500], order_id),
|
||
)
|
||
row = cur.fetchone()
|
||
conn.commit()
|
||
attempt = int(row[0]) if row else 0
|
||
if attempt >= MAX_ATTEMPTS:
|
||
log.error("order=%s exhausted retries — marking failed", order_id)
|
||
self._transition(conn, order_id, to_state="failed",
|
||
extra_sets={"last_error": err[:500]})
|
||
else:
|
||
self._schedule_retry(conn, order_id, attempt)
|
||
|
||
def _send_manual_treasury_email(
|
||
self, *, order_id: str, order_type: str, coin: str,
|
||
received_usd: float, needed_usd: float,
|
||
filing_usd: float, commission_usd: float,
|
||
obligations: list,
|
||
):
|
||
"""Email admin when a crypto order is parked in manual mode.
|
||
|
||
The email tells the admin exactly how much USD to transfer from
|
||
crypto sale proceeds into the Relay bank account so the filing
|
||
card has enough balance for vendor payments.
|
||
"""
|
||
admin_email = os.environ.get(
|
||
"CRYPTO_SWEEP_ADMIN_EMAIL",
|
||
os.environ.get("ADMIN_EMAIL", "ops@performancewest.net"),
|
||
)
|
||
from_email = os.environ.get(
|
||
"FROM_EMAIL",
|
||
os.environ.get("SMTP_FROM", "notifications@performancewest.net"),
|
||
)
|
||
|
||
obligation_lines = []
|
||
for ob in obligations:
|
||
vendor = getattr(ob, "vendor", ob.get("vendor", "")) if isinstance(ob, dict) else ob.vendor
|
||
amount = getattr(ob, "amount_usd_cents", ob.get("amount_usd_cents", 0)) if isinstance(ob, dict) else ob.amount_usd_cents
|
||
kind = getattr(ob, "obligation_kind", ob.get("obligation_kind", "")) if isinstance(ob, dict) else ob.obligation_kind
|
||
obligation_lines.append(
|
||
f" - {vendor} ({kind}): ${amount/100:.2f}"
|
||
)
|
||
ob_text = "\n".join(obligation_lines) if obligation_lines else " (none computed)"
|
||
|
||
subject = f"Crypto order {order_id}: transfer ${needed_usd:.2f} to Relay for vendor payments"
|
||
body = f"""A crypto payment was received and sized. Treasury mode is MANUAL — you need to
|
||
transfer funds into the Relay bank account so the filing card can pay vendors.
|
||
|
||
Order: {order_id} ({order_type})
|
||
Coin: {coin}
|
||
Received: ${received_usd:.2f} USD equivalent
|
||
|
||
Amount needed at Relay: ${needed_usd:.2f}
|
||
Filing fees (with 10% buffer): ${filing_usd:.2f}
|
||
Commission reserve: ${commission_usd:.2f}
|
||
|
||
Vendor obligations:
|
||
{ob_text}
|
||
|
||
Action required:
|
||
1. Sell ${needed_usd:.2f} worth of {coin} on your exchange (Coinbase, Kraken, etc.)
|
||
2. Wire/ACH/RTP the USD to the RelayFi bank account
|
||
3. Once the deposit lands, the crypto_payment_worker will detect it and advance
|
||
the order to 'funds_at_relay' → 'ready' → 'settled'
|
||
|
||
Or use the admin panel to retry with auto-offramp:
|
||
POST /api/v1/admin/crypto-payments/{order_id}/retry-offramp
|
||
|
||
This email was sent because CRYPTO_TREASURY_MODE=manual.
|
||
When Bridge (Stripe) is approved, set CRYPTO_TREASURY_MODE=auto to
|
||
automate this process.
|
||
"""
|
||
|
||
try:
|
||
import smtplib
|
||
from email.mime.text import MIMEText
|
||
|
||
smtp_host = os.environ.get("SMTP_HOST", "mail.smtp2go.com")
|
||
smtp_port = int(os.environ.get("SMTP_PORT", "587"))
|
||
smtp_user = os.environ.get("SMTP_USER", "")
|
||
smtp_pass = os.environ.get("SMTP_PASS", "")
|
||
|
||
if not smtp_user or not smtp_pass:
|
||
log.warning("SMTP not configured — skipping manual treasury email for %s", order_id)
|
||
return
|
||
|
||
msg = MIMEText(body)
|
||
msg["Subject"] = subject
|
||
msg["From"] = from_email
|
||
msg["To"] = admin_email
|
||
|
||
with smtplib.SMTP(smtp_host, smtp_port) as server:
|
||
if smtp_port != 25:
|
||
server.starttls()
|
||
server.login(smtp_user, smtp_pass)
|
||
server.sendmail(from_email, [admin_email], msg.as_string())
|
||
log.info("Manual treasury email sent to %s for order %s ($%.2f)", admin_email, order_id, needed_usd)
|
||
except Exception as exc:
|
||
log.error("Failed to send manual treasury email for %s: %s", order_id, exc)
|
||
|
||
# Also create an ERPNext ToDo for visibility in the admin dashboard
|
||
try:
|
||
from scripts.workers.erpnext_client import ERPNextClient
|
||
ERPNextClient().create_resource("ToDo", {
|
||
"description": (
|
||
f"[crypto-treasury] {order_id}\n\n"
|
||
f"Transfer ${needed_usd:.2f} to Relay for vendor payments.\n"
|
||
f"Coin: {coin} | Received: ${received_usd:.2f}\n"
|
||
f"Filing: ${filing_usd:.2f} | Commission: ${commission_usd:.2f}\n\n"
|
||
f"Obligations:\n{ob_text}"
|
||
),
|
||
"priority": "High",
|
||
"role": "Accounting Advisor",
|
||
})
|
||
except Exception as exc:
|
||
log.error("Could not create treasury ToDo: %s", exc)
|
||
|
||
def _schedule_retry(self, conn, order_id: str, attempt: int):
|
||
backoff_idx = min(attempt, len(BACKOFF_SCHEDULE_SEC) - 1)
|
||
delay = BACKOFF_SCHEDULE_SEC[backoff_idx]
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"UPDATE crypto_payment_jobs "
|
||
"SET next_retry_at = NOW() + (%s || ' seconds')::interval, "
|
||
" updated_at = NOW() "
|
||
"WHERE order_id = %s",
|
||
(str(delay), order_id),
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
# ── CLI ─────────────────────────────────────────────────────────────────
|
||
|
||
async def _main():
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument("--order", help="process one specific order_id")
|
||
parser.add_argument("--loop", action="store_true", help="run as daemon (60s poll)")
|
||
parser.add_argument("--interval", type=int, default=60, help="poll interval seconds")
|
||
args = parser.parse_args()
|
||
|
||
orch = Orchestrator()
|
||
if args.loop:
|
||
log.info("starting crypto_payment_worker loop (interval=%ss)", args.interval)
|
||
while True:
|
||
try:
|
||
counts = await orch.run_once()
|
||
log.info("pass complete: %s", counts)
|
||
except Exception as exc:
|
||
log.exception("run_once raised: %s", exc)
|
||
await asyncio.sleep(args.interval)
|
||
else:
|
||
counts = await orch.run_once(order_id=args.order)
|
||
log.info("done: %s", counts)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(_main())
|