new-site/scripts/workers/relay_deposit_monitor.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

442 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Relay Deposit Monitor — IMAP watcher for Relay ACH deposit notifications.
Watches relay-deposits@performancewest.net for forwarded emails from support@relayfi.com.
When a deposit notification is detected, records it in the relay_deposits table and
processes pending "Awaiting Funds" orders using a FIFO reservation system.
Email format (from Relay):
From: support@relayfi.com
Subject: (varies — may contain "received", "deposit", "transfer")
Body: "You have received an ACH transfer!
Hi, You received $XX.XX from FID BKG SVC LLC.
The ACH transfer has been deposited into Performance West Inc's Business Checking account."
"FID BKG SVC LLC" = Fidelity National Information Services / Stripe ACH originator.
Other ACH senders are still recorded but treated as generic deposits.
Architecture:
IMAP IDLE / periodic poll → parse email → INSERT relay_deposits
→ process_pending_orders() → FIFO check filing_fee_reservations view
→ UPDATE sales order workflow via ERPNext API → dispatch filing job to worker
Run as a long-running process: run_forever() polls IMAP every RELAY_POLL_INTERVAL seconds (default 300 = 5 min). IMAP IDLE is not implemented in this file.
"""
from __future__ import annotations
import email
import imaplib
import logging
import os
import re
import time
from datetime import datetime
from email.header import decode_header
from typing import Optional
import psycopg2
import psycopg2.extras
LOG = logging.getLogger("workers.relay_deposit_monitor")
# Root logging is configured at import time so the script logs when run directly.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
# ── IMAP configuration ──────────────────────────────────────────────────────
IMAP_HOST = os.getenv("RELAY_IMAP_HOST", "mail.performancewest.net")
IMAP_PORT = int(os.getenv("RELAY_IMAP_PORT", "993"))
IMAP_USER = os.getenv("RELAY_IMAP_USER", "relay-deposits@performancewest.net")
# An empty password disables polling: poll_once() logs a warning and returns 0.
IMAP_PASS = os.getenv("RELAY_IMAP_PASS", "")
IMAP_FOLDER = os.getenv("RELAY_IMAP_FOLDER", "INBOX")
# ── Relay sender ─────────────────────────────────────────────────────────────
# Matched (case-insensitively) against the From header and used in the IMAP search.
RELAY_SENDER = "support@relayfi.com"
STRIPE_SENDER_NAME = "FID BKG SVC LLC" # Stripe's ACH originator name in Relay emails
# ── Database ─────────────────────────────────────────────────────────────────
# Connection string passed verbatim to psycopg2.connect() by get_db().
DATABASE_URL = os.getenv("DATABASE_URL", "")
# ── ERPNext / Worker ─────────────────────────────────────────────────────────
ERPNEXT_URL = os.getenv("ERPNEXT_URL", "http://erpnext:8080")
# An empty API key makes advance_to_incorporation() refuse to call ERPNext.
ERPNEXT_KEY = os.getenv("ERPNEXT_API_KEY", "")
ERPNEXT_SECRET = os.getenv("ERPNEXT_API_SECRET", "")
WORKER_URL = os.getenv("WORKER_URL", "http://workers:8090")
# NOTE(review): WEBHOOK_SECRET is not referenced by any function visible in this
# file — confirm whether the job dispatch request is supposed to send it.
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "")
# ── BC incorporation fee estimate in USD cents ────────────────────────────────
# C$350 (incorporation) + C$99 (mailbox) = C$449; at ~0.73 USD/CAD ≈ $328 USD.
# A ~10% buffer for exchange rate fluctuation brings the default to $360.
# Used by get_pending_orders() as the fallback fee when an order has no pending
# reservation row.
BC_FILING_FEE_USD_CENTS = int(os.getenv("BC_FILING_FEE_USD_CENTS", "36000")) # $360 incl. buffer
# ── Poll interval ────────────────────────────────────────────────────────────
POLL_INTERVAL_SECONDS = int(os.getenv("RELAY_POLL_INTERVAL", "300")) # 5 min
# ── Amount regex ─────────────────────────────────────────────────────────────
# AMOUNT_RE captures the dollar figure (with optional thousands commas) after
# "You received $"; SENDER_RE lazily captures the text between "from " and the
# next period, so a sender name that itself contains a period is cut short.
AMOUNT_RE = re.compile(r"You received \$([0-9,]+\.\d{2})", re.IGNORECASE)
SENDER_RE = re.compile(r"received \$[0-9,]+\.\d{2} from (.+?)\.", re.IGNORECASE)
# ─────────────────────────────────────────────────────────────────────────────
# Database helpers
# ─────────────────────────────────────────────────────────────────────────────
def get_db():
    """Open a new psycopg2 connection to ``DATABASE_URL``.

    Cursors created from the returned connection use ``RealDictCursor``, so
    callers read columns by name (``row["id"]``). The caller owns the
    connection.
    """
    dict_cursor = psycopg2.extras.RealDictCursor
    return psycopg2.connect(DATABASE_URL, cursor_factory=dict_cursor)
def record_deposit(
    amount_cents: int,
    sender_name: str,
    source: str,
    email_uid: str,
    email_subject: str,
) -> Optional[int]:
    """Insert one deposit row into ``relay_deposits``.

    The insert uses ``ON CONFLICT (email_uid) DO NOTHING`` so a message that
    was already recorded is skipped rather than duplicated.

    Args:
        amount_cents: Deposit amount in cents.
        sender_name: ACH originator name parsed from the email body.
        source: Source tag stored with the row (the caller passes
            ``"stripe"`` or ``"other"``).
        email_uid: Identifier of the notification email; the dedup key.
        email_subject: Subject header of the notification email.

    Returns:
        The new row id, or ``None`` when the ``email_uid`` already exists or
        the insert failed. Failures are logged, not raised, so the caller
        cannot tell the two ``None`` cases apart.
    """
    try:
        conn = get_db()
        try:
            # ``with conn`` commits on success and rolls back on error, but it
            # does NOT close the connection; close it explicitly so this
            # long-running process does not depend on garbage collection to
            # release database connections.
            with conn, conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO relay_deposits
                    (amount_cents, sender_name, source, email_uid, email_subject)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (email_uid) DO NOTHING
                    RETURNING id
                    """,
                    (amount_cents, sender_name, source, email_uid, email_subject),
                )
                # No row comes back when ON CONFLICT suppressed the insert.
                row = cur.fetchone()
        finally:
            conn.close()
        if row:
            LOG.info(
                "Recorded deposit: $%.2f from %s (id=%s)",
                amount_cents / 100, sender_name, row["id"],
            )
            return row["id"]
        LOG.debug("Duplicate email_uid %s — skipping", email_uid)
        return None
    except Exception as exc:
        LOG.error("Failed to record deposit: %s", exc)
        return None
def get_available_balance_cents() -> int:
    """Return the available balance in cents from ``relay_available_balance``.

    The balance is intended to be total deposits minus reserved/spent
    reservations; that computation lives in the database view, which is
    defined outside this file.

    Returns:
        ``available_cents`` as an int, or 0 when the view yields no row, the
        value is NULL, or the query fails (failures are logged, not raised).
    """
    try:
        conn = get_db()
        try:
            # ``with conn`` only ends the transaction; the connection is
            # closed explicitly in the ``finally`` below.
            with conn, conn.cursor() as cur:
                cur.execute("SELECT available_cents FROM relay_available_balance")
                row = cur.fetchone()
        finally:
            conn.close()
        # An aggregate over an empty table can be NULL; treat that as zero
        # instead of letting int(None) raise and be reported as a DB failure.
        if not row or row["available_cents"] is None:
            return 0
        return int(row["available_cents"])
    except Exception as exc:
        LOG.error("Failed to read available balance: %s", exc)
        return 0
def get_pending_orders() -> list[dict]:
    """Return paid CRTC orders in the ``awaiting_funds`` state, oldest first.

    Each dict has ``order_number``, ``erpnext_order_name``, ``payment_method``
    and ``filing_fee_cents``. The fee comes from the order's ``pending`` row
    in ``filing_fee_reservations`` when one exists, otherwise it falls back
    to ``BC_FILING_FEE_USD_CENTS``.

    Returns:
        The list of orders in FIFO (``created_at`` ascending) order, or an
        empty list when the query fails (failures are logged, not raised).
    """
    try:
        conn = get_db()
        try:
            # ``with conn`` only ends the transaction; the connection is
            # closed explicitly in the ``finally`` below.
            with conn, conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT
                    o.order_number,
                    o.erpnext_order_name,
                    o.payment_method,
                    COALESCE(r.amount_cents, %s) AS filing_fee_cents
                    FROM canada_crtc_orders o
                    LEFT JOIN filing_fee_reservations r
                    ON r.order_id = o.order_number AND r.status = 'pending'
                    WHERE o.payment_status = 'paid'
                    AND o.status = 'awaiting_funds'
                    ORDER BY o.created_at ASC
                    """,
                    (BC_FILING_FEE_USD_CENTS,),
                )
                # Convert RealDictRow objects to plain dicts for callers.
                return [dict(r) for r in cur.fetchall()]
        finally:
            conn.close()
    except Exception as exc:
        LOG.error("Failed to fetch pending orders: %s", exc)
        return []
def reserve_filing_fee(order_id: str, order_type: str, amount_cents: int, deposit_id: Optional[int]) -> bool:
    """Reserve a filing fee for an order and mark the order as funded.

    Both statements run in one transaction: a ``reserved`` row is inserted
    into ``filing_fee_reservations`` and the matching ``canada_crtc_orders``
    row has its status set to ``funded``.

    Args:
        order_id: Order number; the key in both tables.
        order_type: Reservation type tag (the caller passes ``"canada_crtc"``).
        amount_cents: Amount to reserve, in cents.
        deposit_id: ``relay_deposits`` id that triggered the reservation, or
            ``None`` when not tied to a specific deposit.

    Returns:
        ``True`` when the transaction committed, ``False`` on any error
        (errors are logged, not raised).
    """
    try:
        conn = get_db()
        try:
            # ``with conn`` commits both statements together (or rolls both
            # back); it does not close the connection, hence the ``finally``.
            with conn, conn.cursor() as cur:
                # NOTE(review): with ON CONFLICT DO NOTHING, an existing row
                # for this order (e.g. the 'pending' row get_pending_orders
                # joins on) would be left unchanged while the order is still
                # flipped to 'funded' — confirm against the table's unique
                # constraints.
                cur.execute(
                    """
                    INSERT INTO filing_fee_reservations
                    (order_id, order_type, amount_cents, status, relay_deposit_id, reserved_at)
                    VALUES (%s, %s, %s, 'reserved', %s, NOW())
                    ON CONFLICT DO NOTHING
                    """,
                    (order_id, order_type, amount_cents, deposit_id),
                )
                # Mark the canada_crtc_orders row as no longer awaiting funds
                cur.execute(
                    "UPDATE canada_crtc_orders SET status = 'funded' WHERE order_number = %s",
                    (order_id,),
                )
        finally:
            conn.close()
        LOG.info("Reserved $%.2f for order %s", amount_cents / 100, order_id)
        return True
    except Exception as exc:
        LOG.error("Failed to reserve filing fee for %s: %s", order_id, exc)
        return False
# ─────────────────────────────────────────────────────────────────────────────
# ERPNext / Worker helpers
# ─────────────────────────────────────────────────────────────────────────────
def advance_to_incorporation(erpnext_order_name: str, order_number: str) -> bool:
    """Advance the ERPNext Sales Order from 'Awaiting Funds' -> 'Incorporation'.

    Sends the "Funds Available" workflow action for the Sales Order to
    ERPNext using token authentication.

    Args:
        erpnext_order_name: ERPNext document name of the Sales Order.
        order_number: Internal order number, used only for logging.

    Returns:
        ``True`` only when ERPNext answered HTTP 200. ``False`` when the API
        key or order name is missing, on any other status, or on any error
        (errors are logged, not raised).
    """
    if not ERPNEXT_KEY or not erpnext_order_name:
        LOG.warning("ERPNext not configured — cannot advance workflow for %s", order_number)
        return False
    try:
        import urllib.request
        import json as _json

        # NOTE(review): the workflow method is sent as a "cmd" field to
        # /api/method/frappe.client.call — confirm this endpoint against the
        # deployed Frappe/ERPNext version.
        data = _json.dumps({
            "cmd": "frappe.model.workflow.apply_workflow",
            "doc": _json.dumps({"doctype": "Sales Order", "name": erpnext_order_name}),
            "action": "Funds Available",
        }).encode()
        req = urllib.request.Request(
            f"{ERPNEXT_URL}/api/method/frappe.client.call",
            data=data,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"token {ERPNEXT_KEY}:{ERPNEXT_SECRET}",
            },
            method="POST",
        )
        # urlopen raises HTTPError for 4xx/5xx, so only success-range
        # statuses reach the check below.
        with urllib.request.urlopen(req, timeout=10) as resp:
            status = resp.status
        if status == 200:
            LOG.info("Workflow advanced to Incorporation for %s", erpnext_order_name)
            return True
        # Do not report success in the log when the function returns False.
        LOG.warning(
            "Unexpected HTTP %s advancing ERPNext workflow for %s",
            status, erpnext_order_name,
        )
        return False
    except Exception as exc:
        LOG.error("Failed to advance ERPNext workflow for %s: %s", erpnext_order_name, exc)
        return False
def dispatch_incorporation_job(order_name: str, order_number: str) -> bool:
    """Tell the job server to start BC incorporation for this order.

    POSTs a ``file_bc_incorporation`` job as JSON to ``{WORKER_URL}/jobs``.

    Args:
        order_name: ERPNext Sales Order name, forwarded in the payload.
        order_number: Internal order number, forwarded in the payload.

    Returns:
        ``True`` when the job server answered HTTP 200 or 202, otherwise
        ``False`` (errors are logged, not raised).
    """
    try:
        import urllib.request
        import json as _json

        payload = _json.dumps({
            "action": "file_bc_incorporation",
            "order_name": order_name,
            "order_number": order_number,
        }).encode()
        req = urllib.request.Request(
            f"{WORKER_URL}/jobs",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # urlopen raises HTTPError for 4xx/5xx, so only success-range
        # statuses reach the check below.
        with urllib.request.urlopen(req, timeout=10) as resp:
            status = resp.status
        if status in (200, 202):
            LOG.info("Dispatched incorporation job for %s", order_number)
            return True
        # Do not report success in the log when the function returns False.
        LOG.warning(
            "Unexpected HTTP %s dispatching incorporation job for %s",
            status, order_number,
        )
        return False
    except Exception as exc:
        LOG.error("Failed to dispatch incorporation job for %s: %s", order_number, exc)
        return False
# ─────────────────────────────────────────────────────────────────────────────
# Core processing logic
# ─────────────────────────────────────────────────────────────────────────────
def process_pending_orders(deposit_id: Optional[int] = None) -> int:
    """Spend the available balance on waiting orders, oldest first.

    Walks the 'Awaiting Funds' orders in FIFO order. For each order whose
    filing fee fits in the remaining balance, the fee is reserved, the
    ERPNext workflow is advanced and an incorporation job is dispatched.
    The walk stops at the first order the balance cannot cover, so later
    orders never jump the queue.

    Args:
        deposit_id: ``relay_deposits`` id recorded on each reservation made
            during this pass, or ``None``.

    Returns:
        The number of orders whose workflow was advanced.
    """
    remaining = get_available_balance_cents()
    LOG.info("Available balance: $%.2f. Checking pending orders…", remaining / 100)
    if remaining <= 0:
        LOG.info("No available balance — no orders can be advanced")
        return 0

    queue = get_pending_orders()
    if not queue:
        LOG.info("No orders awaiting funds")
        return 0

    advanced_count = 0
    for entry in queue:
        order_id = entry["order_number"]
        fee_cents = entry["filing_fee_cents"]
        # Fall back to the order number when no ERPNext name is stored.
        erp_name = entry.get("erpnext_order_name") or order_id

        if fee_cents > remaining:
            LOG.info(
                "Insufficient balance ($%.2f) for order %s (needs $%.2f) — stopping",
                remaining / 100, order_id, fee_cents / 100,
            )
            break

        # A failed reservation skips this order without touching the balance.
        reserved = reserve_filing_fee(order_id, "canada_crtc", fee_cents, deposit_id)
        if not reserved:
            continue
        remaining -= fee_cents

        if not advance_to_incorporation(erp_name, order_id):
            LOG.warning("Workflow advance failed for %s — reservation kept", order_id)
            continue

        # The CRTC webhook will also dispatch this job; dispatching directly
        # here is a belt-and-suspenders measure.
        dispatch_incorporation_job(erp_name, order_id)
        advanced_count += 1

    LOG.info("Advanced %d order(s) from Awaiting Funds to Incorporation", advanced_count)
    return advanced_count
def parse_deposit_email(msg: email.message.Message) -> Optional[tuple[int, str]]:
    """Parse a Relay deposit notification email.

    The message must come from ``RELAY_SENDER``, its plain-text body must
    mention an ACH transfer / "received", and ``AMOUNT_RE`` must match.

    Args:
        msg: The parsed email message.

    Returns:
        ``(amount_cents, sender_name)``, or ``None`` if this is not a deposit
        email. ``sender_name`` is ``"Unknown"`` when ``SENDER_RE`` does not
        match.
    """
    # Check sender (str() also covers Header objects returned for non-ASCII
    # header values).
    from_addr = str(msg.get("From", ""))
    if RELAY_SENDER not in from_addr.lower():
        return None

    # Locate the text to parse: the first text/plain part of a multipart
    # message, or the message itself otherwise.
    if msg.is_multipart():
        text_part = next(
            (part for part in msg.walk() if part.get_content_type() == "text/plain"),
            None,
        )
    else:
        text_part = msg

    # get_payload(decode=True) returns None for container parts and for a
    # missing payload; treat that as an empty body instead of crashing on
    # None.decode().
    payload = text_part.get_payload(decode=True) if text_part is not None else None
    if payload is None:
        body = ""
    else:
        # Honor the declared charset; fall back to UTF-8 when it is absent
        # or unknown to Python.
        charset = text_part.get_content_charset() or "utf-8"
        try:
            body = payload.decode(charset, errors="replace")
        except LookupError:
            body = payload.decode("utf-8", errors="replace")

    # Must contain ACH transfer signal
    if "ACH transfer" not in body and "received" not in body.lower():
        return None

    # Extract amount
    amount_match = AMOUNT_RE.search(body)
    if not amount_match:
        LOG.debug("No amount found in Relay email body")
        return None

    # Convert with integer arithmetic. Going through float truncates some
    # amounts: int(float("19.99") * 100) == 1998. The regex guarantees
    # exactly two digits after the decimal point.
    dollars, cents = amount_match.group(1).replace(",", "").split(".")
    amount_cents = int(dollars or "0") * 100 + int(cents)

    # Extract sender name
    sender_match = SENDER_RE.search(body)
    sender_name = sender_match.group(1).strip() if sender_match else "Unknown"
    return amount_cents, sender_name
# ─────────────────────────────────────────────────────────────────────────────
# IMAP polling loop
# ─────────────────────────────────────────────────────────────────────────────
def poll_once() -> int:
    """Connect to IMAP, fetch unseen Relay messages and record deposits.

    Every unseen message from ``RELAY_SENDER`` is parsed. Deposit
    notifications are written via ``record_deposit`` and, when a new row was
    created, ``process_pending_orders`` runs immediately. Each handled
    message is flagged ``\\Seen`` whether or not it was a deposit.

    Returns:
        Number of new deposits recorded in this poll. IMAP and other errors
        are logged, not raised; deposits recorded before the error are still
        counted.
    """
    if not IMAP_PASS:
        LOG.warning("RELAY_IMAP_PASS not set — skipping IMAP poll")
        return 0

    deposits_found = 0
    mail = None
    try:
        mail = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
        mail.login(IMAP_USER, IMAP_PASS)
        mail.select(IMAP_FOLDER)

        # Search for unread messages from Relay
        _, msg_nums = mail.search(None, f'(UNSEEN FROM "{RELAY_SENDER}")')
        num_list = msg_nums[0].split() if msg_nums and msg_nums[0] else []
        if not num_list:
            LOG.debug("No new Relay emails")
            return 0
        LOG.info("Found %d unseen Relay email(s)", len(num_list))

        for num in num_list:
            _, msg_data = mail.fetch(num, "(UID RFC822)")
            # A well-formed response carries a (header, literal) tuple first;
            # skip anything else rather than indexing into raw bytes.
            if not msg_data or not isinstance(msg_data[0], tuple):
                continue
            raw = msg_data[0][1]
            msg = email.message_from_bytes(raw)

            # Get IMAP UID for deduplication; fall back to the sequence
            # number when the server response carries no UID.
            uid_resp = mail.fetch(num, "(UID)")[1]
            uid_text = (
                uid_resp[0].decode()
                if uid_resp and isinstance(uid_resp[0], bytes)
                else ""
            )
            uid_match = re.search(r"UID (\d+)", uid_text)
            email_uid = uid_match.group(1) if uid_match else num.decode()

            result = parse_deposit_email(msg)
            if result is None:
                LOG.debug("Email %s is not a deposit notification — skipping", email_uid)
                # Still mark as read so we don't re-process
                mail.store(num, "+FLAGS", "\\Seen")
                continue

            amount_cents, sender_name = result
            source = "stripe" if STRIPE_SENDER_NAME in sender_name else "other"
            subject = msg.get("Subject", "")
            deposit_id = record_deposit(amount_cents, sender_name, source, email_uid, subject)

            # Mark as read.
            # NOTE(review): record_deposit returns None for both "duplicate"
            # and "database error", so a failed insert is also marked read
            # here and will not be retried on the next poll.
            mail.store(num, "+FLAGS", "\\Seen")

            if deposit_id:
                deposits_found += 1
                # Process pending orders immediately after recording deposit
                process_pending_orders(deposit_id)
    except imaplib.IMAP4.error as exc:
        LOG.error("IMAP error: %s", exc)
    except Exception as exc:
        LOG.error("Unexpected error in poll_once: %s", exc)
    finally:
        # Always release the IMAP connection, including on the error paths
        # that previously skipped logout().
        if mail is not None:
            try:
                mail.logout()
            except Exception as exc:
                LOG.debug("IMAP logout failed: %s", exc)
    return deposits_found
def run_forever() -> None:
    """Poll the mailbox in an endless loop.

    Logs a startup line, then alternates between one ``poll_once`` cycle and
    a sleep of ``POLL_INTERVAL_SECONDS``. An exception escaping a cycle is
    logged and the loop carries on; the function never returns normally.
    """
    LOG.info(
        "Relay deposit monitor starting. Polling %s@%s every %ds",
        IMAP_USER, IMAP_HOST, POLL_INTERVAL_SECONDS,
    )
    pause = POLL_INTERVAL_SECONDS
    while True:
        try:
            poll_once()
        except Exception as err:
            # One bad cycle must not stop the monitor.
            LOG.error("Poll cycle failed: %s", err)
        time.sleep(pause)
# Script entry point: start the polling loop only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    run_forever()