#!/usr/bin/env python3 """Cold wallet sweeper — hot-wallet excess → hardware / multisig cold storage. Reads `cold_wallet_config` rows (seeded from env at startup) and for each enabled coin: 1. Read the SHKeeper hot-wallet balance. 2. Compute excess = balance_usd - hot_float_usd. 3. If excess <= 0: skip. 4. If excess > auto_sweep_ceiling_usd: insert a pending sweep row with requires_approval=TRUE; notify ops; do NOT move funds. 5. Otherwise: insert a sweep row (auto-approved), call shkeeper.payout() to send the excess to the cold address, poll for confirmation, write the matching crypto_payment_ledger row. Startup validation: on boot, read COLD_WALLET__ADDR env vars and compare against each row's ``address_checksum``. A mismatch (attacker tampering with env vars, or rotation without DB update) aborts the worker with exit(1). Usage: python -m scripts.workers.cold_wallet_sweeper # one pass python -m scripts.workers.cold_wallet_sweeper --loop # daemon (30m) python -m scripts.workers.cold_wallet_sweeper --seed # one-time env import """ from __future__ import annotations import argparse import asyncio import hashlib import logging import os import re import sys from decimal import Decimal from typing import Optional import psycopg2 import psycopg2.extras from scripts.workers.crypto_offramp.shkeeper_client import SHKeeperClient, SHKeeperError log = logging.getLogger("cold_wallet_sweeper") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) DATABASE_URL = os.environ.get("DATABASE_URL", "") COINS = ["BTC", "ETH", "MATIC", "LTC", "TRX", "BNB", "DOGE", "USDC"] # Admin notification email (low-friction alerting; plug into real channel # — SES, Carbonio, Slack — later) ADMIN_EMAIL = os.environ.get("CRYPTO_SWEEP_ADMIN_EMAIL", "ops@performancewest.net") DUST_THRESHOLD_USD_CENTS = 1000 # below $10 USD equiv, skip (dust) # ── Address normalization + checksum ──────────────────────────────────── # # We use SHA-256 of the lowercased address as the "checksum" stored in # cold_wallet_config.address_checksum. This is NOT a cryptographic # address checksum (e.g. EIP-55) — it's our own integrity seal so that # if the env var changes to an attacker-controlled address, the DB # checksum won't match and the worker aborts. Rotation workflow: # admin changes both env var AND `address_checksum` column together. def compute_checksum(address: str) -> str: return hashlib.sha256(address.strip().lower().encode("utf-8")).hexdigest() # ── Basic address shape validation per coin ───────────────────────────── # (Prevents obvious typos like pasting an ETH address into BTC env var.) ADDRESS_PATTERNS: dict[str, re.Pattern] = { "BTC": re.compile(r"^(bc1[a-z0-9]{25,90}|[13][a-km-zA-HJ-NP-Z1-9]{25,34})$"), "LTC": re.compile(r"^(ltc1[a-z0-9]{25,90}|[LM3][a-km-zA-HJ-NP-Z1-9]{25,34})$"), "DOGE": re.compile(r"^D[a-km-zA-HJ-NP-Z1-9]{25,34}$"), "ETH": re.compile(r"^0x[a-fA-F0-9]{40}$"), "MATIC": re.compile(r"^0x[a-fA-F0-9]{40}$"), "BNB": re.compile(r"^0x[a-fA-F0-9]{40}$"), # BSC Smart Chain (EVM) "USDC": re.compile(r"^0x[a-fA-F0-9]{40}$"), # on Ethereum; Solana USDC has different shape "TRX": re.compile(r"^T[a-km-zA-HJ-NP-Z1-9]{33}$"), } def validate_address(coin: str, address: str) -> bool: pattern = ADDRESS_PATTERNS.get(coin.upper()) if not pattern: return True # unknown coin — trust input (extend patterns as needed) return bool(pattern.match(address.strip())) def _connect(): return psycopg2.connect(DATABASE_URL) # ── Seed / validate startup ───────────────────────────────────────────── def seed_from_env(): """Read COLD_WALLET__ADDR env vars and UPSERT into cold_wallet_config. Intended as a one-time setup step; subsequent changes should go through ``UPDATE cold_wallet_config`` via migration or admin script. """ hot_float_default = int(os.environ.get("COLD_WALLET_HOT_FLOAT_USD_CENTS", "50000")) auto_ceiling_default = int(os.environ.get("COLD_WALLET_AUTO_SWEEP_CEILING_USD_CENTS", "500000")) seeded = [] conn = _connect() try: for coin in COINS: addr = os.environ.get(f"COLD_WALLET_{coin}_ADDR", "").strip() if not addr: log.info("no COLD_WALLET_%s_ADDR set — skipping seed", coin) continue if not validate_address(coin, addr): log.error( "COLD_WALLET_%s_ADDR=%s fails address shape check — refusing to seed", coin, addr[:10] + "…" if len(addr) > 10 else addr, ) continue checksum = compute_checksum(addr) with conn.cursor() as cur: cur.execute( """ INSERT INTO cold_wallet_config (coin, cold_address, address_checksum, hot_float_usd_cents, auto_sweep_ceiling_usd_cents, enabled) VALUES (%s, %s, %s, %s, %s, TRUE) ON CONFLICT (coin) DO UPDATE SET cold_address = EXCLUDED.cold_address, address_checksum = EXCLUDED.address_checksum, updated_at = NOW() """, (coin, addr, checksum, hot_float_default, auto_ceiling_default), ) seeded.append(coin) conn.commit() finally: conn.close() log.info("seeded cold_wallet_config for: %s", seeded) return seeded def validate_startup() -> bool: """Read DB rows + env vars; abort if env drifted from DB checksum. Returns True if all enabled coins validated; False otherwise (caller should exit(1)). """ conn = _connect() ok = True try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( "SELECT coin, cold_address, address_checksum FROM cold_wallet_config " "WHERE enabled = TRUE", ) rows = cur.fetchall() or [] for row in rows: coin = row["coin"] env_addr = os.environ.get(f"COLD_WALLET_{coin}_ADDR", "").strip() if not env_addr: log.error("STARTUP: COLD_WALLET_%s_ADDR not set but DB has enabled row — aborting", coin) ok = False continue env_checksum = compute_checksum(env_addr) if env_checksum != row["address_checksum"]: log.error( "STARTUP: COLD_WALLET_%s_ADDR env checksum %s != DB checksum %s — aborting", coin, env_checksum[:12], row["address_checksum"][:12], ) ok = False continue if env_addr != row["cold_address"]: log.error( "STARTUP: COLD_WALLET_%s_ADDR env string differs from DB address — aborting", coin, ) ok = False finally: conn.close() if ok: log.info("STARTUP: cold wallet validation passed") return ok # ── Sweep logic ──────────────────────────────────────────────────────── async def sweep_once(shkeeper: Optional[SHKeeperClient] = None) -> dict: """One pass: first broadcast any admin-approved sweeps, then scan balances to create new sweeps. Returns summary counts.""" shk = shkeeper or SHKeeperClient() counts = { "checked": 0, "swept_auto": 0, "pending_approval": 0, "skipped_dust": 0, "errored": 0, "broadcast_approved": 0, } conn = _connect() try: # ── Phase 1: broadcast admin-approved sweeps ───────────────── # When admin approves a pending sweep via POST, status flips to # 'approved' but nothing sends it on-chain. This phase catches # those and broadcasts them. with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( "SELECT * FROM cold_wallet_sweeps WHERE status = 'approved' " "ORDER BY approved_at ASC LIMIT 20", ) approved = cur.fetchall() or [] for sweep in approved: try: payout = await shk.payout( coin=sweep["coin"], destination=sweep["cold_address"], amount=Decimal(str(sweep["amount_coin"])), ) except SHKeeperError as exc: log.error("broadcast sweep %s failed: %s", sweep["id"], exc) with conn.cursor() as cur2: cur2.execute( "UPDATE cold_wallet_sweeps SET status='failed', " "failure_reason=%s, updated_at=NOW() WHERE id=%s", (str(exc)[:500], sweep["id"]), ) conn.commit() counts["errored"] += 1 continue # Record ledger + flip status with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur2: cur2.execute( """ INSERT INTO crypto_payment_ledger (order_id, order_type, coin, movement_type, amount_coin, amount_usd_cents, provider, provider_ref, provider_status, state, idempotency_key, notes) VALUES ('COLD-SWEEP', 'internal', %s, 'sweep', %s, %s, 'shkeeper', %s, 'broadcast', 'pending', %s, 'Cold wallet sweep — admin-approved') ON CONFLICT (idempotency_key) DO NOTHING RETURNING id """, ( sweep["coin"], f"-{sweep['amount_coin']}", -int(sweep["amount_usd_cents"] or 0), payout.withdraw_id, f"sweep:{sweep['id']}:{payout.withdraw_id}", ), ) ledger_row = cur2.fetchone() ledger_id = ledger_row["id"] if ledger_row else None cur2.execute( "UPDATE cold_wallet_sweeps SET status='broadcast', " "shkeeper_withdraw_id=%s, tx_hash=%s, ledger_entry_id=%s, " "updated_at=NOW() WHERE id=%s", (payout.withdraw_id, payout.tx_hash, ledger_id, sweep["id"]), ) conn.commit() counts["broadcast_approved"] += 1 log.info( "sweep %s broadcast (approved earlier): %s %s → %s", sweep["id"], sweep["amount_coin"], sweep["coin"], sweep["cold_address"][:12] + "…", ) # ── Phase 2: scan balances + create new sweeps ─────────────── with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( "SELECT coin, cold_address, hot_float_usd_cents, auto_sweep_ceiling_usd_cents " "FROM cold_wallet_config WHERE enabled = TRUE", ) configs = cur.fetchall() or [] for cfg in configs: counts["checked"] += 1 coin = cfg["coin"] # Skip if an outstanding sweep exists for this coin. Otherwise # we'd create a second sweep on every pass while the first is # still awaiting approval or broadcasting on-chain (balance # doesn't decrease until the tx confirms). with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur_ck: cur_ck.execute( "SELECT id, status FROM cold_wallet_sweeps " "WHERE coin = %s AND status IN ('pending','approved','broadcast') " "LIMIT 1", (coin,), ) outstanding = cur_ck.fetchone() if outstanding: log.debug( "skip %s — outstanding sweep #%s in state %s", coin, outstanding["id"], outstanding["status"], ) continue try: bal = await shk.balance(coin) except SHKeeperError as exc: log.warning("balance check failed for %s: %s", coin, exc) counts["errored"] += 1 continue excess_usd_cents = int(bal.balance_usd * Decimal("100")) - int(cfg["hot_float_usd_cents"]) if excess_usd_cents <= DUST_THRESHOLD_USD_CENTS: counts["skipped_dust"] += 1 continue # amount_coin to sweep: convert excess_usd → coin using balance USD rate if bal.balance_coin <= 0: continue coin_per_usd = bal.balance_coin / bal.balance_usd if bal.balance_usd > 0 else Decimal("0") amount_coin = (Decimal(excess_usd_cents) / Decimal("100")) * coin_per_usd requires_approval = excess_usd_cents > int(cfg["auto_sweep_ceiling_usd_cents"]) with conn.cursor() as cur2: cur2.execute( """ INSERT INTO cold_wallet_sweeps (coin, amount_coin, amount_usd_cents, cold_address, requires_approval, status) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id """, ( coin, str(amount_coin), excess_usd_cents, cfg["cold_address"], requires_approval, "pending" if requires_approval else "approved", ), ) sweep_id = cur2.fetchone()[0] conn.commit() if requires_approval: log.warning( "sweep %s: $%.2f of %s exceeds auto-ceiling $%.2f — awaiting admin approval", sweep_id, excess_usd_cents / 100, coin, cfg["auto_sweep_ceiling_usd_cents"] / 100, ) counts["pending_approval"] += 1 _notify_admin(sweep_id, coin, excess_usd_cents / 100) continue # Auto-execute try: payout = await shk.payout( coin=coin, destination=cfg["cold_address"], amount=amount_coin, ) except SHKeeperError as exc: log.error("sweep %s payout failed: %s", sweep_id, exc) with conn.cursor() as cur2: cur2.execute( "UPDATE cold_wallet_sweeps SET status='failed', failure_reason=%s, updated_at=NOW() " "WHERE id=%s", (str(exc)[:500], sweep_id), ) conn.commit() counts["errored"] += 1 continue # Record ledger row + update sweep status with conn.cursor() as cur2: cur2.execute( """ INSERT INTO crypto_payment_ledger (order_id, order_type, coin, movement_type, amount_coin, amount_usd_cents, provider, provider_ref, provider_status, state, idempotency_key, notes) VALUES ('COLD-SWEEP', 'internal', %s, 'sweep', %s, %s, 'shkeeper', %s, 'broadcast', 'pending', %s, 'Cold wallet sweep of excess above hot float') RETURNING id """, ( coin, f"-{amount_coin}", -excess_usd_cents, payout.withdraw_id, f"sweep:{sweep_id}:{payout.withdraw_id}", ), ) ledger_id = cur2.fetchone()[0] cur2.execute( """ UPDATE cold_wallet_sweeps SET status = 'broadcast', shkeeper_withdraw_id = %s, tx_hash = %s, ledger_entry_id = %s, updated_at = NOW() WHERE id = %s """, (payout.withdraw_id, payout.tx_hash, ledger_id, sweep_id), ) conn.commit() counts["swept_auto"] += 1 log.info( "sweep %s: %.8f %s → %s (withdraw %s)", sweep_id, amount_coin, coin, cfg["cold_address"][:12] + "…", payout.withdraw_id, ) finally: conn.close() return counts def _notify_admin(sweep_id: int, coin: str, amount_usd: float) -> None: """Notify ops that a sweep needs manual approval. Stub — real channel (SMTP via relay_integration.send_email, or Slack webhook) plugged in during first real deploy. """ log.info( "TODO notify %s: sweep #%s needs approval — $%.2f of %s", ADMIN_EMAIL, sweep_id, amount_usd, coin, ) # ── CLI ──────────────────────────────────────────────────────────────── async def _main(): parser = argparse.ArgumentParser() parser.add_argument("--seed", action="store_true", help="seed cold_wallet_config from env and exit") parser.add_argument("--loop", action="store_true", help="daemon mode (1800s interval)") parser.add_argument("--interval", type=int, default=1800, help="poll interval seconds") parser.add_argument("--skip-validation", action="store_true", help="skip startup checksum validation") args = parser.parse_args() if args.seed: seed_from_env() return if not args.skip_validation: if not validate_startup(): log.error("startup validation failed; exiting") sys.exit(1) if args.loop: log.info("starting cold wallet sweeper loop (interval=%ss)", args.interval) while True: try: counts = await sweep_once() log.info("pass complete: %s", counts) except Exception as exc: log.exception("sweep_once raised: %s", exc) await asyncio.sleep(args.interval) else: counts = await sweep_once() log.info("done: %s", counts) if __name__ == "__main__": asyncio.run(_main())