# Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts.
# Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
# 395 lines, 14 KiB, Python
"""
Conversation Monitor — Anti-Bypass Detection for Accounting Support.

Scans messages in accounting support Issue threads for attempts to
exchange direct contact info or move the conversation off-platform.

Called via ERPNext webhook when a Communication is created on an
Accounting Support Issue, or can be called directly by the job server.

Actions on detection:
  1. Warning banner injected into the Issue thread (message still delivered)
  2. Admin alert created (ERPNext Issue, type: Compliance Alert)
  3. Flag logged in conversation_flags table
  4. Escalation on repeat flags (3rd flag = admin review + possible suspension)
"""

from __future__ import annotations

import json
import logging
import os
import re
from datetime import datetime
from typing import Optional

import psycopg2

# Module-level logger with a fixed, explicit name.
LOG = logging.getLogger("workers.conversation_monitor")

# Connection string handed to psycopg2.connect(); read once at import time.
# Falls back to an empty string when the environment variable is unset.
DATABASE_URL = os.getenv("DATABASE_URL", "")

# =========================================================================
# Flagged patterns — comprehensive coverage of bypass attempts
# =========================================================================

# Email addresses. Whitelisting (addresses on file, in-house domains) is
# applied in scan_message(), not by this pattern.
EMAIL_PATTERN = re.compile(
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    re.IGNORECASE,
)

# Phone numbers (North American 10-digit, international with +, various formats)
PHONE_PATTERNS = [
    re.compile(r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),               # 555-123-4567
    # No leading \b here: "(" is a non-word character, so a \b in front of it
    # only matched when a letter/digit was glued to the parenthesis, which
    # meant "(555) 123-4567" after a space or at the start was never found.
    re.compile(r'\(\d{3}\)\s?\d{3}[-.\s]?\d{4}\b'),                 # (555) 123-4567
    re.compile(r'\+\d{1,3}[-.\s]?\(?\d{1,4}\)?[-.\s]?\d{3,12}\b'),  # +1 555-123-4567
    re.compile(r'\b1[-.\s]?\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),       # 1-555-123-4567
]

# Messaging platforms and communication apps
MESSAGING_PLATFORMS = re.compile(
    r'\b('
    r'whatsapp|whats\s?app|'
    r'telegram|tg\s?://|t\.me/|'
    r'signal|signal\.me|'
    r'skype|skype\.com|'
    r'facetime|face\s?time|'
    r'imessage|i\s?message|'
    r'facebook\s?messenger|fb\s?messenger|messenger\.com|'
    r'wechat|we\s?chat|'
    r'viber|'
    r'line\s?app|'
    r'discord|discord\.gg|'
    r'slack|'
    r'zoom\.us|zoom\s?meeting|'
    r'teams\.ms|microsoft\s?teams|ms\s?teams|'
    r'google\s?meet|meet\.google|'
    r'webex|'
    r'sms|text\s?me|text\s?message|send\s?me\s?a\s?text|'
    r'imsg|'
    r'snap\s?chat|snapchat'
    r')\b',
    re.IGNORECASE,
)

# Direct contact solicitation language
BYPASS_LANGUAGE = re.compile(
    r'\b('
    r'work\s?directly|go\s?direct|cut\s?(them|us|pw|performance)\s?out|'
    r'outside\s?(the|this)\s?(portal|platform|system)|'
    r'off[-.\s]?platform|off[-.\s]?system|'
    r'my\s?personal\s?(email|number|phone|cell)|'
    r'my\s?private\s?(email|number|phone|cell)|'
    r'reach\s?me\s?(at|on|via)|'
    r'contact\s?me\s?(at|on|via|directly)|'
    r'here\'?s?\s?my\s?(number|email|contact|cell|mobile)|'
    r'don\'?t\s?tell\s?(them|pw|performance|anyone)|'
    r'between\s?(you\s?and\s?me|us\s?only|just\s?us)|'
    r'private\s?arrangement|'
    r'side\s?deal|'
    r'take\s?this\s?(offline|outside|private)|'
    r'let\'?s?\s?(talk|chat|meet|connect)\s?(outside|directly|privately)|'
    r'I\s?can\s?(do|offer)\s?(it|this)\s?(cheaper|for\s?less|directly)|'
    r'save\s?you\s?(money|the\s?fee|the\s?markup)'
    r')\b',
    re.IGNORECASE,
)

# Payment bypass (direct payment solicitation)
PAYMENT_BYPASS = re.compile(
    r'\b('
    r'venmo|cash\s?app|cashapp|'
    r'zelle|'
    r'e[-.\s]?transfer|interac\s?transfer|'
    r'pay\s?me\s?directly|pay\s?outside|send\s?payment\s?to|'
    r'wire\s?(me|transfer\s?to)|'
    r'my\s?(bank|account)\s?(details|number|info)|'
    r'invoice\s?you\s?directly|'
    r'crypto|bitcoin|btc|eth|usdt'
    r')\b',
    re.IGNORECASE,
)

# Social media handles
SOCIAL_MEDIA = re.compile(
    r'('
    # @username. "@" is a non-word character, so the previous leading \b only
    # matched an "@" glued to a word character — i.e. the middle of an email —
    # and never a free-standing handle. The lookbehind inverts that: the "@"
    # must NOT follow an email local-part character.
    r'(?<![A-Za-z0-9._%+-])@[A-Za-z0-9_]{2,30}\b|'
    r'\b(?:'
    r'(instagram|insta)\.com/|'
    r'twitter\.com/|x\.com/|'
    r'linkedin\.com/in/|'
    r'facebook\.com/|fb\.com/|'
    r'tiktok\.com/@|'
    r'(my|find\s?me\s?on)\s?(linkedin|instagram|twitter|facebook|x\b)'
    r')'
    r')',
    re.IGNORECASE,
)

# Addresses on these domains are always allowed, whatever the caller's whitelist.
_IN_HOUSE_EMAIL_DOMAINS = ("@performancewest.net", "@carrierone.com")

# Separator characters removed before phone numbers are compared.
_PHONE_SEPARATORS = re.compile(r'[-.\s()\+]')


def _phone_key(number: str) -> str:
    """Return *number* without separators and without a leading "1" on 11 digits.

    Lets "604-555-1234", "1-604-555-1234" and "+1 604 555 1234" compare equal.
    """
    digits = _PHONE_SEPARATORS.sub('', number)
    if len(digits) == 11 and digits.startswith('1'):
        return digits[1:]
    return digits


def scan_message(
    message_text: str,
    sender_email: str,
    whitelisted_emails: list[str],
    whitelisted_phones: list[str],
) -> list[dict]:
    """Scan a message for bypass attempt patterns.

    Args:
        message_text: The message content to scan. ``None`` is treated as empty.
        sender_email: Who sent this message. Accepted for interface
            compatibility; the scan itself does not use it.
        whitelisted_emails: Emails that are safe to mention (client's .ca,
            PW emails, etc.). Compared case-insensitively. ``None`` is
            treated as empty.
        whitelisted_phones: Phone numbers already on file. Compared after
            stripping separators and a leading "1" country code. ``None`` is
            treated as empty.

    Returns:
        List of flag dicts: [{"pattern": "...", "matched_text": "...", "category": "..."}],
        ordered email, phone, messaging platform, bypass language, payment
        bypass, social media. Empty when nothing matched.
    """
    flags: list[dict] = []
    text = message_text or ""

    # Check for email addresses (excluding whitelisted). The in-house domain
    # test is deliberately outside any loop over the whitelist: nested inside
    # it, an empty whitelist meant in-house addresses were flagged.
    allowed_emails = {wl.strip().lower() for wl in (whitelisted_emails or ())}
    for match in EMAIL_PATTERN.finditer(text):
        found_email = match.group().lower()
        if found_email in allowed_emails or found_email.endswith(_IN_HOUSE_EMAIL_DOMAINS):
            continue
        flags.append({
            "pattern": "email_address",
            "matched_text": found_email,
            "category": "direct_contact",
        })

    # Check for phone numbers (excluding whitelisted). PHONE_PATTERNS overlap
    # ("1-555-123-4567" also contains "555-123-4567"), so remember the span of
    # every full-length number already handled and skip later matches that
    # overlap it. This avoids duplicate flags and stops a whitelisted number
    # from being flagged through its longer country-code form.
    allowed_phones = {_phone_key(wl) for wl in (whitelisted_phones or ())}
    handled_spans = []
    for pattern in PHONE_PATTERNS:
        for match in pattern.finditer(text):
            digits = _PHONE_SEPARATORS.sub('', match.group())
            if len(digits) < 10:
                continue  # too short to be a dialable number
            start, end = match.span()
            if any(start < seen_end and seen_start < end
                   for seen_start, seen_end in handled_spans):
                continue
            handled_spans.append((start, end))
            if _phone_key(digits) in allowed_phones:
                continue
            flags.append({
                "pattern": "phone_number",
                "matched_text": match.group(),
                "category": "direct_contact",
            })

    # Remaining checks flag every match unconditionally. SOCIAL_MEDIA no
    # longer needs an "is this part of an email?" skip: the pattern itself
    # refuses an "@" that follows an email local part, and the old skip
    # ('"@" in m and "." in m') only ever dropped "tiktok.com/@" matches.
    keyword_checks = (
        (MESSAGING_PLATFORMS, "messaging_platform", "platform_bypass"),
        (BYPASS_LANGUAGE, "bypass_language", "bypass_intent"),
        (PAYMENT_BYPASS, "payment_bypass", "payment_bypass"),
        (SOCIAL_MEDIA, "social_media", "direct_contact"),
    )
    for regex, pattern_name, category in keyword_checks:
        for match in regex.finditer(text):
            flags.append({
                "pattern": pattern_name,
                "matched_text": match.group(),
                "category": category,
            })

    return flags

# Returned by process_message() as "warning_message" when the action is "warning".
WARNING_MESSAGE = (
    "⚠️ **Reminder:** All accounting consultations must be conducted through "
    "this portal. Sharing personal contact information, phone numbers, messaging "
    "app details, or arranging services outside Performance West is a violation "
    "of our terms of service. If you need additional accounting hours, they are "
    "available at $75 USD/hr through this portal."
)

# Returned by process_message() as "warning_message" when the action is "escalation".
ESCALATION_MESSAGE = (
    "🚨 **Notice:** Multiple attempts to share contact information or move this "
    "conversation off-platform have been detected. This thread has been flagged "
    "for review by our compliance team. Continued violations may result in "
    "suspension of accounting support access."
)

def process_message(
    issue_reference: str,
    message_text: str,
    sender_email: str,
    sender_type: str,  # 'client' or 'advisor'
    client_email: str,
    advisor_email: str,
    whitelisted_emails: list[str],
    whitelisted_phones: list[str],
) -> dict:
    """Process a message through the conversation monitor.

    Scans the message, and when anything is flagged, records one
    ``conversation_flags`` row per matched pattern and decides between a
    warning and an escalation. Database failures are logged and swallowed so
    the caller still receives a result; in that case ``flag_count`` keeps the
    last value computed (1 if the count query itself failed).

    Args:
        issue_reference: Identifier of the Issue thread the message belongs to.
        message_text: The message content to scan.
        sender_email: Who sent this message.
        sender_type: 'client' or 'advisor'.
        client_email: Client side of the client-advisor pair.
        advisor_email: Advisor side of the client-advisor pair.
        whitelisted_emails: Emails that are safe to mention.
        whitelisted_phones: Phone numbers already on file.

    Returns:
        {
            "flagged": bool,
            "flags": [...],
            "action": "none" | "warning" | "escalation",
            "flag_count": int,
            "warning_message": str,   # only present when flagged
        }
    """
    flags = scan_message(message_text, sender_email, whitelisted_emails, whitelisted_phones)

    if not flags:
        return {"flagged": False, "flags": [], "action": "none", "flag_count": 0}

    LOG.warning(
        "Conversation flag: issue=%s sender=%s type=%s flags=%d patterns=%s",
        issue_reference, sender_email, sender_type, len(flags),
        [f["pattern"] for f in flags],
    )

    # Record flags in database
    conn = None
    flag_count = 1
    try:
        conn = psycopg2.connect(DATABASE_URL)
        # The cursor context manager closes the cursor even if a statement fails.
        with conn.cursor() as cur:
            # Get running flag count for this client-advisor pair.
            # NOTE(review): COUNT(*) counts rows, and one row is inserted per
            # matched pattern below, so a single message with three matches
            # already pushes the next message past the escalation threshold.
            # Confirm whether the threshold is meant per message or per match.
            cur.execute(
                "SELECT COUNT(*) FROM conversation_flags WHERE client_email = %s AND advisor_email = %s",
                [client_email, advisor_email],
            )
            previous_count = cur.fetchone()[0]
            flag_count = previous_count + 1

            # Insert flag record for each pattern matched.
            # NOTE(review): warning_sent / admin_alerted are stored as TRUE
            # before either action has happened in this function — confirm
            # that is the intended meaning of those columns.
            for flag in flags:
                cur.execute(
                    """INSERT INTO conversation_flags
                       (issue_reference, flagged_user, user_type, client_email, advisor_email,
                        flagged_pattern, flagged_text, flag_count_for_pair, warning_sent, admin_alerted)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, TRUE, TRUE)""",
                    [issue_reference, sender_email, sender_type, client_email, advisor_email,
                     flag["pattern"], flag["matched_text"][:200], flag_count],
                )

        conn.commit()
    except Exception as e:
        # Best-effort persistence: log with traceback and carry on.
        LOG.exception("Failed to record conversation flag: %s", e)
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_error:
                # A dead connection cannot roll back; do not let that escape
                # and turn a logged failure into a crash.
                LOG.warning("Rollback after failed flag insert also failed: %s", rollback_error)
    finally:
        if conn is not None:
            conn.close()

    # Determine action based on escalation level
    action = "escalation" if flag_count >= 3 else "warning"

    return {
        "flagged": True,
        "flags": flags,
        "action": action,
        "flag_count": flag_count,
        "warning_message": WARNING_MESSAGE if action == "warning" else ESCALATION_MESSAGE,
    }

def handle_webhook(payload: dict) -> dict:
    """Handle a webhook from ERPNext when a Communication is created.

    Runs the message through process_message() and, when it is flagged,
    creates an ERPNext Issue as an admin alert. Alert creation is best-effort:
    any failure (including the ERPNext client being unavailable) is logged and
    the monitoring result is still returned.

    Expected payload:
        {
            "issue_reference": "ACC-2026-001",
            "message_text": "...",
            "sender_email": "client@example.com",
            "sender_type": "client",
            "client_email": "client@example.com",
            "advisor_email": "advisor@example.com",
            "whitelisted_emails": ["client@company.ca", "info@performancewest.net"],
            "whitelisted_phones": ["6045551234"],
        }

    Missing or null fields fall back to an empty string / empty list
    ("client" for sender_type).

    Returns:
        The dict produced by process_message().
    """
    # `or` rather than a .get() default: a key that is present with a JSON
    # null would otherwise be passed on as None and crash the scanner.
    result = process_message(
        issue_reference=payload.get("issue_reference") or "",
        message_text=payload.get("message_text") or "",
        sender_email=payload.get("sender_email") or "",
        sender_type=payload.get("sender_type") or "client",
        client_email=payload.get("client_email") or "",
        advisor_email=payload.get("advisor_email") or "",
        whitelisted_emails=payload.get("whitelisted_emails") or [],
        whitelisted_phones=payload.get("whitelisted_phones") or [],
    )

    if result["flagged"]:
        # Create admin alert via ERPNext
        try:
            # Imported lazily, inside the try, so a missing client module only
            # costs the alert instead of the whole webhook.
            from scripts.workers.erpnext_client import ERPNextClient
            client = ERPNextClient()

            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the summary text is deterministic (a set is not).
            flag_summary = ", ".join(dict.fromkeys(f["pattern"] for f in result["flags"]))
            matched_texts = "; ".join(f["matched_text"] for f in result["flags"][:5])

            client.create_resource("Issue", {
                "subject": f"[Compliance Alert] Conversation flag in {payload.get('issue_reference', '?')}",
                "description": (
                    f"**Automated conversation monitor flag**\n\n"
                    f"**Issue:** {payload.get('issue_reference', '?')}\n"
                    f"**Sender:** {payload.get('sender_email', '?')} ({payload.get('sender_type', '?')})\n"
                    f"**Client:** {payload.get('client_email', '?')}\n"
                    f"**Advisor:** {payload.get('advisor_email', '?')}\n"
                    f"**Patterns detected:** {flag_summary}\n"
                    f"**Matched text:** {matched_texts}\n"
                    f"**Flag count for this pair:** {result['flag_count']}\n"
                    f"**Action taken:** {result['action']}\n\n"
                    f"Please review the conversation thread."
                ),
                # NOTE(review): the module docstring describes the alert as
                # type "Compliance Alert" but the Issue is filed as "Bug" —
                # confirm which issue type exists on the ERPNext side.
                "issue_type": "Bug",
                "priority": "High" if result["flag_count"] >= 3 else "Medium",
            })
        except Exception as e:
            LOG.exception("Failed to create admin alert: %s", e)

    return result

if __name__ == "__main__":
    # Manual smoke test for the scanner: prints which patterns each sample
    # message triggers. Touches neither the database nor ERPNext.
    logging.basicConfig(level=logging.INFO)

    test_messages = [
        "Can you help me with GST/HST registration?",       # Clean
        "Here's my number: 604-555-1234, give me a call",   # Phone
        "Let's move to WhatsApp, it's easier",              # Messaging platform
        "My personal email is john@gmail.com",              # External email
        "I can do this cheaper if we work directly",        # Bypass language
        "Send me $200 via e-transfer",                      # Payment bypass
        "Find me on LinkedIn at /in/johnsmith",             # Social media
        "Let's hop on a Zoom call outside the portal",      # Platform + bypass
        "Text me at 416-555-9999 on Telegram",              # Phone + messaging
    ]

    for msg in test_messages:
        flags = scan_message(msg, "test@test.com", ["client@company.ca"], [])
        status = "🚩 FLAGGED" if flags else "✅ CLEAN"
        patterns = [f["pattern"] for f in flags]
        print(f"{status}: \"{msg[:60]}\" → {patterns}")