Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
442 lines
18 KiB
Python
442 lines
18 KiB
Python
"""
|
||
Relay Deposit Monitor — IMAP watcher for Relay ACH deposit notifications.
|
||
|
||
Watches relay-deposits@performancewest.net for forwarded emails from support@relayfi.com.
|
||
When a deposit notification is detected, records it in the relay_deposits table and
|
||
processes pending "Awaiting Funds" orders using a FIFO reservation system.
|
||
|
||
Email format (from Relay):
|
||
From: support@relayfi.com
|
||
Subject: (varies — may contain "received", "deposit", "transfer")
|
||
Body: "You have received an ACH transfer!
|
||
Hi, You received $XX.XX from FID BKG SVC LLC.
|
||
The ACH transfer has been deposited into Performance West Inc's Business Checking account."
|
||
|
||
"FID BKG SVC LLC" = Fidelity National Information Services / Stripe ACH originator.
|
||
Other ACH senders are still recorded but treated as generic deposits.
|
||
|
||
Architecture:
|
||
IMAP IDLE / periodic poll → parse email → INSERT relay_deposits
|
||
→ process_pending_orders() → FIFO check filing_fee_reservations view
|
||
→ UPDATE sales order workflow via ERPNext API → dispatch filing job to worker
|
||
|
||
Run as a long-running process (cron every 5 min, or IMAP IDLE if server supports it).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import email
|
||
import imaplib
|
||
import logging
|
||
import os
|
||
import re
|
||
import time
|
||
from datetime import datetime
|
||
from email.header import decode_header
|
||
from typing import Optional
|
||
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
|
||
LOG = logging.getLogger("workers.relay_deposit_monitor")
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
|
||
)
|
||
|
||
# ── IMAP configuration ──────────────────────────────────────────────────────
|
||
IMAP_HOST = os.getenv("RELAY_IMAP_HOST", "mail.performancewest.net")
|
||
IMAP_PORT = int(os.getenv("RELAY_IMAP_PORT", "993"))
|
||
IMAP_USER = os.getenv("RELAY_IMAP_USER", "relay-deposits@performancewest.net")
|
||
IMAP_PASS = os.getenv("RELAY_IMAP_PASS", "")
|
||
IMAP_FOLDER = os.getenv("RELAY_IMAP_FOLDER", "INBOX")
|
||
|
||
# ── Relay sender ─────────────────────────────────────────────────────────────
|
||
RELAY_SENDER = "support@relayfi.com"
|
||
STRIPE_SENDER_NAME = "FID BKG SVC LLC" # Stripe's ACH originator name in Relay emails
|
||
|
||
# ── Database ─────────────────────────────────────────────────────────────────
|
||
DATABASE_URL = os.getenv("DATABASE_URL", "")
|
||
|
||
# ── ERPNext / Worker ─────────────────────────────────────────────────────────
|
||
ERPNEXT_URL = os.getenv("ERPNEXT_URL", "http://erpnext:8080")
|
||
ERPNEXT_KEY = os.getenv("ERPNEXT_API_KEY", "")
|
||
ERPNEXT_SECRET = os.getenv("ERPNEXT_API_SECRET", "")
|
||
WORKER_URL = os.getenv("WORKER_URL", "http://workers:8090")
|
||
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "")
|
||
|
||
# ── BC incorporation fee estimate in USD cents ────────────────────────────────
|
||
# C$350 (incorporation) + C$99 (mailbox) at ~0.73 USD/CAD ≈ $326 USD
|
||
# We add a 10% buffer for exchange rate fluctuation
|
||
BC_FILING_FEE_USD_CENTS = int(os.getenv("BC_FILING_FEE_USD_CENTS", "36000")) # $360 buffer
|
||
|
||
# ── Poll interval ────────────────────────────────────────────────────────────
|
||
POLL_INTERVAL_SECONDS = int(os.getenv("RELAY_POLL_INTERVAL", "300")) # 5 min
|
||
|
||
# ── Amount regex ─────────────────────────────────────────────────────────────
|
||
AMOUNT_RE = re.compile(r"You received \$([0-9,]+\.\d{2})", re.IGNORECASE)
|
||
SENDER_RE = re.compile(r"received \$[0-9,]+\.\d{2} from (.+?)\.", re.IGNORECASE)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Database helpers
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def get_db():
|
||
"""Return a psycopg2 connection."""
|
||
return psycopg2.connect(DATABASE_URL, cursor_factory=psycopg2.extras.RealDictCursor)
|
||
|
||
|
||
def record_deposit(
|
||
amount_cents: int,
|
||
sender_name: str,
|
||
source: str,
|
||
email_uid: str,
|
||
email_subject: str,
|
||
) -> Optional[int]:
|
||
"""Insert a deposit record. Returns the new row id, or None if duplicate."""
|
||
try:
|
||
with get_db() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO relay_deposits
|
||
(amount_cents, sender_name, source, email_uid, email_subject)
|
||
VALUES (%s, %s, %s, %s, %s)
|
||
ON CONFLICT (email_uid) DO NOTHING
|
||
RETURNING id
|
||
""",
|
||
(amount_cents, sender_name, source, email_uid, email_subject),
|
||
)
|
||
row = cur.fetchone()
|
||
conn.commit()
|
||
if row:
|
||
LOG.info(
|
||
"Recorded deposit: $%.2f from %s (id=%s)",
|
||
amount_cents / 100, sender_name, row["id"],
|
||
)
|
||
return row["id"]
|
||
LOG.debug("Duplicate email_uid %s — skipping", email_uid)
|
||
return None
|
||
except Exception as exc:
|
||
LOG.error("Failed to record deposit: %s", exc)
|
||
return None
|
||
|
||
|
||
def get_available_balance_cents() -> int:
|
||
"""Return available balance = total deposits − reserved/spent reservations."""
|
||
try:
|
||
with get_db() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT available_cents FROM relay_available_balance")
|
||
row = cur.fetchone()
|
||
return int(row["available_cents"]) if row else 0
|
||
except Exception as exc:
|
||
LOG.error("Failed to read available balance: %s", exc)
|
||
return 0
|
||
|
||
|
||
def get_pending_orders() -> list[dict]:
|
||
"""
|
||
Return CRTC orders in 'Awaiting Funds' state, ordered FIFO.
|
||
These are orders that have been paid by customer but not yet filed with BC.
|
||
"""
|
||
try:
|
||
with get_db() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT
|
||
o.order_number,
|
||
o.erpnext_order_name,
|
||
o.payment_method,
|
||
COALESCE(r.amount_cents, %s) AS filing_fee_cents
|
||
FROM canada_crtc_orders o
|
||
LEFT JOIN filing_fee_reservations r
|
||
ON r.order_id = o.order_number AND r.status = 'pending'
|
||
WHERE o.payment_status = 'paid'
|
||
AND o.status = 'awaiting_funds'
|
||
ORDER BY o.created_at ASC
|
||
""",
|
||
(BC_FILING_FEE_USD_CENTS,),
|
||
)
|
||
return [dict(r) for r in cur.fetchall()]
|
||
except Exception as exc:
|
||
LOG.error("Failed to fetch pending orders: %s", exc)
|
||
return []
|
||
|
||
|
||
def reserve_filing_fee(order_id: str, order_type: str, amount_cents: int, deposit_id: Optional[int]) -> bool:
|
||
"""Mark a filing fee as reserved for an order."""
|
||
try:
|
||
with get_db() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO filing_fee_reservations
|
||
(order_id, order_type, amount_cents, status, relay_deposit_id, reserved_at)
|
||
VALUES (%s, %s, %s, 'reserved', %s, NOW())
|
||
ON CONFLICT DO NOTHING
|
||
""",
|
||
(order_id, order_type, amount_cents, deposit_id),
|
||
)
|
||
# Mark the canada_crtc_orders row as no longer awaiting funds
|
||
cur.execute(
|
||
"UPDATE canada_crtc_orders SET status = 'funded' WHERE order_number = %s",
|
||
(order_id,),
|
||
)
|
||
conn.commit()
|
||
LOG.info("Reserved $%.2f for order %s", amount_cents / 100, order_id)
|
||
return True
|
||
except Exception as exc:
|
||
LOG.error("Failed to reserve filing fee for %s: %s", order_id, exc)
|
||
return False
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# ERPNext / Worker helpers
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def advance_to_incorporation(erpnext_order_name: str, order_number: str) -> bool:
|
||
"""Advance the ERPNext Sales Order from 'Awaiting Funds' → 'Incorporation'."""
|
||
if not ERPNEXT_KEY or not erpnext_order_name:
|
||
LOG.warning("ERPNext not configured — cannot advance workflow for %s", order_number)
|
||
return False
|
||
try:
|
||
import urllib.request
|
||
import urllib.error
|
||
import json as _json
|
||
|
||
data = _json.dumps({
|
||
"cmd": "frappe.model.workflow.apply_workflow",
|
||
"doc": _json.dumps({"doctype": "Sales Order", "name": erpnext_order_name}),
|
||
"action": "Funds Available",
|
||
}).encode()
|
||
|
||
req = urllib.request.Request(
|
||
f"{ERPNEXT_URL}/api/method/frappe.client.call",
|
||
data=data,
|
||
headers={
|
||
"Content-Type": "application/json",
|
||
"Authorization": f"token {ERPNEXT_KEY}:{ERPNEXT_SECRET}",
|
||
},
|
||
method="POST",
|
||
)
|
||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||
LOG.info("Workflow advanced to Incorporation for %s", erpnext_order_name)
|
||
return resp.status == 200
|
||
except Exception as exc:
|
||
LOG.error("Failed to advance ERPNext workflow for %s: %s", erpnext_order_name, exc)
|
||
return False
|
||
|
||
|
||
def dispatch_incorporation_job(order_name: str, order_number: str) -> bool:
|
||
"""Tell the job server to start BC incorporation for this order."""
|
||
try:
|
||
import urllib.request
|
||
import json as _json
|
||
|
||
payload = _json.dumps({
|
||
"action": "file_bc_incorporation",
|
||
"order_name": order_name,
|
||
"order_number": order_number,
|
||
}).encode()
|
||
|
||
req = urllib.request.Request(
|
||
f"{WORKER_URL}/jobs",
|
||
data=payload,
|
||
headers={"Content-Type": "application/json"},
|
||
method="POST",
|
||
)
|
||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||
LOG.info("Dispatched incorporation job for %s", order_number)
|
||
return resp.status in (200, 202)
|
||
except Exception as exc:
|
||
LOG.error("Failed to dispatch incorporation job for %s: %s", order_number, exc)
|
||
return False
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Core processing logic
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def process_pending_orders(deposit_id: Optional[int] = None) -> int:
|
||
"""
|
||
FIFO: check available balance and advance orders out of 'Awaiting Funds'.
|
||
|
||
Returns the number of orders advanced.
|
||
"""
|
||
available = get_available_balance_cents()
|
||
LOG.info("Available balance: $%.2f. Checking pending orders…", available / 100)
|
||
|
||
if available <= 0:
|
||
LOG.info("No available balance — no orders can be advanced")
|
||
return 0
|
||
|
||
orders = get_pending_orders()
|
||
if not orders:
|
||
LOG.info("No orders awaiting funds")
|
||
return 0
|
||
|
||
advanced = 0
|
||
for order in orders:
|
||
fee = order["filing_fee_cents"]
|
||
order_id = order["order_number"]
|
||
erp_name = order.get("erpnext_order_name") or order_id
|
||
|
||
if available < fee:
|
||
LOG.info(
|
||
"Insufficient balance ($%.2f) for order %s (needs $%.2f) — stopping",
|
||
available / 100, order_id, fee / 100,
|
||
)
|
||
break
|
||
|
||
# Reserve the fee
|
||
if not reserve_filing_fee(order_id, "canada_crtc", fee, deposit_id):
|
||
continue
|
||
|
||
available -= fee
|
||
|
||
# Advance ERPNext workflow
|
||
if advance_to_incorporation(erp_name, order_id):
|
||
# Dispatch job to worker — the CRTC webhook will also do this,
|
||
# but we dispatch directly as a belt-and-suspenders measure
|
||
dispatch_incorporation_job(erp_name, order_id)
|
||
advanced += 1
|
||
else:
|
||
LOG.warning("Workflow advance failed for %s — reservation kept", order_id)
|
||
|
||
LOG.info("Advanced %d order(s) from Awaiting Funds to Incorporation", advanced)
|
||
return advanced
|
||
|
||
|
||
def parse_deposit_email(msg: email.message.Message) -> Optional[tuple[int, str]]:
|
||
"""
|
||
Parse a Relay deposit notification email.
|
||
|
||
Returns (amount_cents, sender_name) or None if not a deposit email.
|
||
"""
|
||
# Check sender
|
||
from_addr = msg.get("From", "")
|
||
if RELAY_SENDER not in from_addr.lower():
|
||
return None
|
||
|
||
# Extract body text
|
||
body = ""
|
||
if msg.is_multipart():
|
||
for part in msg.walk():
|
||
if part.get_content_type() == "text/plain":
|
||
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
|
||
break
|
||
else:
|
||
body = msg.get_payload(decode=True).decode("utf-8", errors="replace")
|
||
|
||
# Must contain ACH transfer signal
|
||
if "ACH transfer" not in body and "received" not in body.lower():
|
||
return None
|
||
|
||
# Extract amount
|
||
amount_match = AMOUNT_RE.search(body)
|
||
if not amount_match:
|
||
LOG.debug("No amount found in Relay email body")
|
||
return None
|
||
|
||
amount_str = amount_match.group(1).replace(",", "")
|
||
amount_cents = int(float(amount_str) * 100)
|
||
|
||
# Extract sender name
|
||
sender_match = SENDER_RE.search(body)
|
||
sender_name = sender_match.group(1).strip() if sender_match else "Unknown"
|
||
|
||
return amount_cents, sender_name
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# IMAP polling loop
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
def poll_once() -> int:
|
||
"""
|
||
Connect to IMAP, fetch unseen messages, process deposits.
|
||
Returns number of new deposits found.
|
||
"""
|
||
if not IMAP_PASS:
|
||
LOG.warning("RELAY_IMAP_PASS not set — skipping IMAP poll")
|
||
return 0
|
||
|
||
deposits_found = 0
|
||
try:
|
||
mail = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
|
||
mail.login(IMAP_USER, IMAP_PASS)
|
||
mail.select(IMAP_FOLDER)
|
||
|
||
# Search for unread messages from Relay
|
||
_, msg_nums = mail.search(None, f'(UNSEEN FROM "{RELAY_SENDER}")')
|
||
num_list = msg_nums[0].split()
|
||
|
||
if not num_list:
|
||
LOG.debug("No new Relay emails")
|
||
mail.logout()
|
||
return 0
|
||
|
||
LOG.info("Found %d unseen Relay email(s)", len(num_list))
|
||
|
||
for num in num_list:
|
||
_, msg_data = mail.fetch(num, "(UID RFC822)")
|
||
if not msg_data or not msg_data[0]:
|
||
continue
|
||
|
||
raw = msg_data[0][1]
|
||
msg = email.message_from_bytes(raw)
|
||
|
||
# Get IMAP UID for deduplication
|
||
uid_data = mail.fetch(num, "(UID)")[1][0].decode()
|
||
uid_match = re.search(r"UID (\d+)", uid_data)
|
||
email_uid = uid_match.group(1) if uid_match else num.decode()
|
||
|
||
result = parse_deposit_email(msg)
|
||
if result is None:
|
||
LOG.debug("Email %s is not a deposit notification — skipping", email_uid)
|
||
# Still mark as read so we don't re-process
|
||
mail.store(num, "+FLAGS", "\\Seen")
|
||
continue
|
||
|
||
amount_cents, sender_name = result
|
||
source = "stripe" if STRIPE_SENDER_NAME in sender_name else "other"
|
||
subject = msg.get("Subject", "")
|
||
|
||
deposit_id = record_deposit(amount_cents, sender_name, source, email_uid, subject)
|
||
|
||
# Mark as read
|
||
mail.store(num, "+FLAGS", "\\Seen")
|
||
|
||
if deposit_id:
|
||
deposits_found += 1
|
||
# Process pending orders immediately after recording deposit
|
||
process_pending_orders(deposit_id)
|
||
|
||
mail.logout()
|
||
except imaplib.IMAP4.error as exc:
|
||
LOG.error("IMAP error: %s", exc)
|
||
except Exception as exc:
|
||
LOG.error("Unexpected error in poll_once: %s", exc)
|
||
|
||
return deposits_found
|
||
|
||
|
||
def run_forever() -> None:
|
||
"""Main loop: poll IMAP every POLL_INTERVAL_SECONDS seconds."""
|
||
LOG.info(
|
||
"Relay deposit monitor starting. Polling %s@%s every %ds",
|
||
IMAP_USER, IMAP_HOST, POLL_INTERVAL_SECONDS,
|
||
)
|
||
while True:
|
||
try:
|
||
poll_once()
|
||
except Exception as exc:
|
||
LOG.error("Poll cycle failed: %s", exc)
|
||
time.sleep(POLL_INTERVAL_SECONDS)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
run_forever()
|