# Part of a repository that includes: API (Express/TypeScript), Astro site,
# Python workers, document generators, FCC compliance tools, Canada CRTC
# formation, Ansible infrastructure, and deployment scripts.
"""
|
|
fcc_rmd_auditor.py — Audit existing FCC RMD filings for deficiencies.
|
|
|
|
Three analysis layers:
|
|
Layer 1 — Structured data checks against local fcc_rmd table (fast, no network)
|
|
Layer 2 — Download certification PDF from ServiceNow Attachment API (or Playwright)
|
|
Layer 3 — Analyze PDF content for missing required sections (regex first, Ollama fallback)
|
|
|
|
Usage:
|
|
# Audit a single FRN (structured + PDF):
|
|
python -m workers.fcc_rmd_auditor --frn 0012345678
|
|
|
|
# Structured checks only (no PDF download):
|
|
python -m workers.fcc_rmd_auditor --frn 0012345678 --no-pdf
|
|
|
|
# Batch audit (structured only, fast):
|
|
python -m workers.fcc_rmd_auditor --batch --no-pdf
|
|
|
|
# Batch with PDF (slow, ~2s/record):
|
|
python -m workers.fcc_rmd_auditor --batch --limit 100
|
|
|
|
# JSON output for piping:
|
|
python -m workers.fcc_rmd_auditor --frn 0012345678 --json
|
|
|
|
Environment variables:
|
|
DATABASE_URL PostgreSQL connection string
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from datetime import date, datetime, timezone
|
|
from typing import Optional
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import requests
|
|
|
|
# Module-level logger; INFO-level output goes to stdout so batch runs can be piped.
LOG = logging.getLogger("workers.fcc_rmd_auditor")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

# PostgreSQL connection string; presence is validated in main() before connecting.
DATABASE_URL = os.environ.get("DATABASE_URL", "")

# FCC's ServiceNow instance that hosts the Robocall Mitigation Database portal.
SERVICENOW_BASE = "https://fccprod.servicenowservices.com"
ATTACHMENT_API = f"{SERVICENOW_BASE}/api/now/attachment"
# ServiceNow table name backing RMD filings (used in portal URLs/API queries).
RMD_TABLE_NAME = "x_g_fmc_rmd_robocall_mitigation_database"

# Desktop-browser User-Agent sent with portal requests (requests and Playwright).
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# Finding data structure
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
def _finding(
|
|
check_id: str,
|
|
severity: str,
|
|
label: str,
|
|
detail: str,
|
|
) -> dict:
|
|
return {
|
|
"id": check_id,
|
|
"severity": severity,
|
|
"label": label,
|
|
"detail": detail,
|
|
}
|
|
|
|
|
|
def _worst_severity(findings: list[dict]) -> str:
|
|
order = {"critical": 0, "major": 1, "minor": 2}
|
|
worst = "clean"
|
|
for f in findings:
|
|
s = f["severity"]
|
|
if order.get(s, 99) < order.get(worst, 99):
|
|
worst = s
|
|
return worst
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# Layer 1 — Structured data checks
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
def _check_structured(row: dict, removed_frns: set[str]) -> list[dict]:
    """Run rule-based checks against a single fcc_rmd row.

    Args:
        row: One fcc_rmd record as a plain dict.
        removed_frns: FRNs present in the local fcc_rmd_removed table,
            used to cross-reference enforcement removals.

    Returns:
        List of finding dicts (see _finding); empty when no issues found.
    """
    findings: list[dict] = []
    today = date.today()

    # ── Certification staleness ──────────────────────────────────────
    last_recert = row.get("last_recertified")
    if last_recert:
        # Value may arrive as a string; normalize to a date and drop
        # anything that doesn't parse as YYYY-MM-DD.
        if isinstance(last_recert, str):
            try:
                last_recert = datetime.strptime(last_recert, "%Y-%m-%d").date()
            except ValueError:
                last_recert = None

        if last_recert:
            # Whole-month delta; day-of-month is intentionally ignored.
            months_ago = (today.year - last_recert.year) * 12 + (today.month - last_recert.month)
            if months_ago > 13:
                findings.append(_finding(
                    "stale_cert", "critical", "Stale Certification",
                    f"Last recertified {last_recert.isoformat()} — {months_ago} months ago. "
                    "Annual recertification required by March 1.",
                ))
            elif months_ago >= 10:
                findings.append(_finding(
                    "expiring_cert", "major", "Certification Expiring Soon",
                    f"Last recertified {last_recert.isoformat()} — {months_ago} months ago. "
                    "Recertification due within 3 months.",
                ))
    else:
        findings.append(_finding(
            "no_recert_date", "major", "No Recertification Date",
            "No last_recertified date on file. Unable to determine certification currency.",
        ))

    # ── Provider classification ──────────────────────────────────────
    vsp = bool(row.get("voice_service_provider"))
    gw = bool(row.get("gateway_provider"))
    inter = bool(row.get("intermediate_provider"))

    if not vsp and not gw and not inter:
        findings.append(_finding(
            "no_classification", "critical", "No Provider Classification",
            "None of Voice Service Provider, Gateway, or Intermediate Provider "
            "is selected. At least one classification is required.",
        ))

    # Note: having all three (VSP + gateway + intermediate) is valid for
    # large carriers like Peerless Network. Only flag gateway + intermediate
    # without VSP, which is unusual.
    if gw and inter and not vsp:
        findings.append(_finding(
            "conflicting_classification", "minor",
            "Unusual Provider Classification",
            "Both Gateway Provider and Non-Gateway Intermediate Provider are "
            "selected without Voice Service Provider. Verify this is correct — "
            "most providers are one or the other.",
        ))

    # ── STIR/SHAKEN consistency ──────────────────────────────────────
    impl = (row.get("implementation") or "").lower()

    if vsp and not gw and not inter:
        # VSPs should implement STIR/SHAKEN unless exempt small carrier
        if "robocall mitigation" in impl and "partial" not in impl and "complete" not in impl:
            findings.append(_finding(
                "ss_vsp_no_shaken", "major",
                "VSP Without STIR/SHAKEN",
                "Voice Service Provider selected 'Robocall Mitigation Only' but "
                "VSPs are generally required to implement STIR/SHAKEN unless they "
                "qualify for the small carrier exemption.",
            ))

    if inter and not vsp and not gw:
        # Intermediate-only providers don't originate calls → can't do "complete" STIR/SHAKEN
        if "complete" in impl and "partial" not in impl:
            findings.append(_finding(
                "ss_intermediate_complete", "major",
                "Intermediate Provider Claims Complete STIR/SHAKEN",
                "Non-gateway intermediate provider claims Complete STIR/SHAKEN "
                "Implementation, but intermediates don't originate calls and "
                "therefore can't sign them with STIR/SHAKEN attestation.",
            ))

    if "partial" in impl:
        # Partial implementation should reference an upstream provider.
        # We can't check this from structured data alone — flag as informational.
        findings.append(_finding(
            "ss_partial_note", "minor",
            "Partial STIR/SHAKEN — Verify Upstream Provider",
            "Partial STIR/SHAKEN implementation declared. Ensure the filing "
            "names the upstream provider responsible for signing calls on "
            "non-SIP portions of the network.",
        ))

    # Note: contact_email/name are often NULL in our local DB because the RMD
    # CSV doesn't include them (requires separate scrape). Not a filing deficiency.

    # ── Removed from RMD ─────────────────────────────────────────────
    frn = (row.get("frn") or "").strip()
    if row.get("removed_from_rmd"):
        findings.append(_finding(
            "removed_from_rmd", "critical", "Removed from RMD",
            "This provider has been removed from the Robocall Mitigation Database. "
            "Downstream carriers are required to block their traffic.",
        ))
    elif frn and frn in removed_frns:
        # Not flagged in the row itself, but the FRN appears in the removal list.
        findings.append(_finding(
            "removed_from_rmd", "critical", "Removed from RMD (enforcement action)",
            "This provider appears in the FCC's RMD removal list. "
            "A deficiency or enforcement action has been taken.",
        ))

    # ── Red light (CORES financial delinquency) ──────────────────────
    if row.get("red_light_status") == "red":
        findings.append(_finding(
            "red_light", "critical", "CORES Red Light Status",
            "FRN has outstanding delinquent debts to the FCC. "
            "Red-light status blocks certain filings and authorizations.",
        ))

    return findings
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# Layer 2 — PDF download
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
def _extract_sys_id(filing_url: str) -> Optional[str]:
|
|
try:
|
|
qs = parse_qs(urlparse(filing_url).query)
|
|
ids = qs.get("sys_id", [])
|
|
return ids[0] if ids else None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _get_pdf_attachment_sys_id(record_sys_id: str) -> Optional[str]:
    """Query the SP page API (unauthenticated) to get the PDF attachment sys_id.

    Walks the nested widget JSON returned by the portal looking for an
    ``attachments.pdf`` entry; returns its sys_id, or None when the record
    has no PDF or the request fails.
    """
    url = (
        f"{SERVICENOW_BASE}/api/now/sp/page"
        f"?id=rmd_form&table={RMD_TABLE_NAME}"
        f"&sys_id={record_sys_id}&view=sp&time=1"
        f"&portal_id=ac2856301b92681048c6ed7bbc4bcb27"
        f"&request_uri=%2Frmd"
    )
    try:
        resp = requests.get(
            url,
            headers={"User-Agent": USER_AGENT, "Accept": "application/json"},
            timeout=30,
        )
        resp.raise_for_status()
        payload = resp.json()
    except Exception as exc:
        LOG.warning("SP API query failed: %s", exc)
        return None

    # Explicit-stack preorder walk of the widget tree (same visit order
    # as the previous recursive version).
    stack = [payload]
    att_id = None
    while stack:
        node = stack.pop()
        if isinstance(node, dict):
            attachments = node.get("attachments")
            if isinstance(attachments, dict) and attachments.get("pdf", ""):
                att_id = attachments["pdf"]
                break
            stack.extend(reversed(list(node.values())))
        elif isinstance(node, list):
            stack.extend(reversed(node))

    if att_id:
        LOG.info("Found PDF attachment sys_id=%s for record=%s", att_id, record_sys_id)
    else:
        LOG.info("No PDF attachment found for record=%s", record_sys_id)
    return att_id
|
|
|
|
|
|
def _download_pdf_via_playwright(
    record_sys_id: str, attachment_sys_id: str, dest_dir: str
) -> Optional[str]:
    """Use Playwright to download the PDF attachment from the RMD portal.

    ServiceNow attachment APIs require authentication, but the portal page
    renders publicly with the PDF embedded. We intercept the browser's
    network requests to capture the attachment download.

    Args:
        record_sys_id: sys_id of the RMD filing record.
        attachment_sys_id: sys_id of the PDF attachment on that record.
        dest_dir: Directory the PDF is written into.

    Returns:
        Path to the saved PDF, or None when Playwright is unavailable or
        every download strategy fails. The last-resort fallback is a
        page-render PDF, not the original attachment.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        LOG.warning("Playwright not available — skipping PDF download")
        return None

    dest_path = os.path.join(dest_dir, f"rmd_cert_{attachment_sys_id}.pdf")
    portal_url = (
        f"{SERVICENOW_BASE}/rmd"
        f"?id=rmd_form&table={RMD_TABLE_NAME}"
        f"&sys_id={record_sys_id}"
    )

    # Bodies captured by the response interceptor (closure-shared list).
    captured_content: list[bytes] = []

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            ctx = browser.new_context(user_agent=USER_AGENT)

            # Intercept network requests for attachment downloads
            def on_response(response):
                url = response.url
                if attachment_sys_id in url and response.status == 200:
                    try:
                        body = response.body()
                        # Skip tiny responses and HTML error pages (start with "<").
                        if len(body) > 500 and not body[:20].lstrip().startswith(b"<"):
                            captured_content.append(body)
                    except Exception:
                        pass

            page = ctx.new_page()
            page.on("response", on_response)

            # Navigate to the filing page — the portal loads the PDF viewer
            page.goto(portal_url, wait_until="networkidle", timeout=45000)
            # Extra settle time for late XHRs after "networkidle".
            page.wait_for_timeout(3000)

            # Strategy 1: the interceptor already caught the attachment body.
            if captured_content:
                with open(dest_path, "wb") as f:
                    f.write(captured_content[0])
                LOG.info("Captured PDF from network: %s (%d bytes)", dest_path, len(captured_content[0]))
                browser.close()
                return dest_path

            # Fallback: try clicking on the PDF link if visible
            pdf_links = page.query_selector_all(f"a[href*='{attachment_sys_id}']")
            if not pdf_links:
                pdf_links = page.query_selector_all("a.attachment-link, a[data-type='pdf']")
            if not pdf_links:
                pdf_links = page.query_selector_all("a[href*='sys_attachment']")

            for link in pdf_links:
                try:
                    with page.expect_download(timeout=15000) as dl_info:
                        link.click()
                    download = dl_info.value
                    download.save_as(dest_path)

                    # Verify not HTML
                    with open(dest_path, "rb") as f:
                        header = f.read(20)
                    if not header.lstrip().startswith(b"<"):
                        LOG.info("Downloaded PDF via link click: %s", dest_path)
                        browser.close()
                        return dest_path
                except Exception:
                    # Try the next candidate link.
                    continue

            # Last resort: capture the entire page as a PDF
            # (won't have the original document content but captures the form data)
            page.pdf(path=dest_path, format="Letter")
            LOG.info("Captured filing page as PDF fallback: %s", dest_path)
            browser.close()
            return dest_path

    except Exception as exc:
        LOG.warning("Playwright PDF download failed: %s", exc)
        return None
|
|
|
|
|
|
def download_certification_pdf(
    filing_url: Optional[str], sys_id: Optional[str], dest_dir: str
) -> Optional[str]:
    """Download the RMD certification document.

    Resolution order:
      1. Derive the record sys_id from *filing_url* when not supplied.
      2. Query the SP page API (unauthenticated) for the PDF attachment sys_id.
      3. Download via Playwright (sys_attachment.do requires a session cookie).

    Returns the local path of the downloaded PDF, or None on any failure.
    """
    record_id = sys_id or (_extract_sys_id(filing_url) if filing_url else None)
    if not record_id:
        LOG.info("No sys_id available — cannot download PDF")
        return None

    # Step 1: Get the PDF attachment sys_id from the SP API
    attachment_id = _get_pdf_attachment_sys_id(record_id)
    if not attachment_id:
        LOG.info("No PDF attachment for record sys_id=%s", record_id)
        return None

    # Step 2: Download via Playwright (needs both record and attachment sys_ids)
    return _download_pdf_via_playwright(record_id, attachment_id, dest_dir)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# Layer 3 — PDF content analysis
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
# Required sections and their keyword indicators.
# Each entry is (check_id, severity, label, keywords). Keywords are regex
# patterns passed to re.search against the lowercased document text, so
# plain lowercase phrases and \b-anchored patterns both work. A section
# counts as present if ANY of its keywords matches.
REQUIRED_SECTIONS: list[tuple[str, str, str, list[str]]] = [
    # (check_id, severity, label, keywords)
    (
        "missing_provider_id", "major", "Missing Provider Identification",
        [r"\bfrn\b", r"\b\d{10}\b", "fcc registration number", "registration number"],
    ),
    (
        "missing_classification", "major", "Missing Provider Classification",
        ["voice service provider", "gateway provider", "intermediate provider",
         "provider classification", "provider type"],
    ),
    (
        "missing_stir_shaken", "major", "Missing STIR/SHAKEN Details",
        ["stir/shaken", "stir-shaken", "stirshaken", "sti certificate",
         "sti-ca", "spc token", "attestation"],
    ),
    (
        "missing_mitigation", "major", "Missing Robocall Mitigation Program",
        ["robocall mitigation", "call blocking", "call analytics",
         "monitoring", "mitigation program", "call pattern"],
    ),
    (
        "missing_kyc", "major", "Missing KYC Procedures",
        ["know your customer", "know-your-customer", r"\bkyc\b", "customer vetting",
         "identity verif", "customer verification", "due diligence"],
    ),
    (
        "missing_traceback", "major", "Missing Traceback Commitment",
        ["traceback", r"\bitg\b", "industry traceback", "24 hour",
         "24-hour", "ustelecom"],
    ),
    (
        "missing_enforcement", "minor", "Missing Enforcement History Disclosure",
        ["enforcement", "citation", "forfeiture", "consent decree",
         "adverse finding", "no pending"],
    ),
    (
        "missing_recertification", "minor", "Missing Recertification Acknowledgment",
        ["recertif", "annual certification", "march 1", "march 2",
         "annually"],
    ),
    (
        "missing_material_change", "major", "Missing Material Change Update Commitment",
        ["material change", "10 business day", "10-business-day",
         "update.*within", "update.*filing", "promptly update"],
    ),
    (
        "missing_perjury", "minor", "Missing Perjury Declaration in Document",
        ["perjury", "penalty of perjury", "true and correct",
         "true, complete", "under penalty"],
    ),
    (
        "missing_dno", "minor", "Missing DNO List Reference",
        ["do-not-originate", r"\bdno\b", "do not originate",
         "dno list"],
    ),
]
|
|
|
|
|
|
def _extract_pdf_text(pdf_path: str) -> str:
    """Return all extractable text from *pdf_path*, or "" on any failure.

    Requires pdfplumber; a missing dependency or an unreadable file is
    logged and reported as empty text rather than raised.
    """
    try:
        import pdfplumber
    except ImportError:
        LOG.warning("pdfplumber not installed — pip install pdfplumber")
        return ""

    try:
        with pdfplumber.open(pdf_path) as pdf:
            chunks = []
            for page in pdf.pages:
                chunks.append(page.extract_text() or "")
        return "\n".join(chunks)
    except Exception as exc:
        LOG.warning("PDF text extraction failed: %s", exc)
        return ""
|
|
|
|
|
|
def _check_pdf_regex(text: str) -> list[dict]:
    """Tier 1: regex/keyword matching for required sections.

    Args:
        text: Extracted certification-document text.

    Returns:
        One finding per REQUIRED_SECTIONS entry whose keywords all fail to
        match, or a single critical "pdf_empty" finding for blank text.
        Matching is case-insensitive (text is lowercased; keyword patterns
        are written in lowercase).
    """
    if not text.strip():
        return [_finding(
            "pdf_empty", "critical", "Empty or Unreadable PDF",
            "The certification document could not be read or contained no extractable text.",
        )]

    findings: list[dict] = []
    text_lower = text.lower()

    for check_id, severity, label, keywords in REQUIRED_SECTIONS:
        if any(re.search(kw, text_lower) for kw in keywords):
            continue
        # Hoisted out of the f-string: a backslash escape inside an
        # f-string expression is a SyntaxError before Python 3.12.
        searched = ", ".join(k for k in keywords if not k.startswith("\\"))
        findings.append(_finding(
            check_id, severity, label,
            f"No reference to this required section found in the certification document. "
            f"Searched for: {searched}.",
        ))

    return findings
|
|
|
|
|
|
def _check_pdf_crossref(
    text: str, row: dict
) -> list[dict]:
    """Cross-reference structured data selections against PDF content.

    Flags mismatches between what the filing's structured fields declare
    (STIR/SHAKEN status, business name) and what the uploaded certification
    document actually says, plus documents that appear to predate the
    current compliance year.
    """
    findings: list[dict] = []
    text_lower = text.lower()
    impl = (row.get("implementation") or "").lower()

    # Check: PDF mentions a different STIR/SHAKEN status than structured data
    if "complete" in impl and "partial" not in impl:
        if "partial" in text_lower and "complete" not in text_lower:
            findings.append(_finding(
                "xref_ss_mismatch", "major",
                "STIR/SHAKEN Status Mismatch",
                "Structured data says 'Complete STIR/SHAKEN' but the uploaded "
                "certification document appears to reference 'partial' implementation.",
            ))
    elif "partial" in impl:
        if "complete implementation" in text_lower and "partial" not in text_lower:
            findings.append(_finding(
                "xref_ss_mismatch", "major",
                "STIR/SHAKEN Status Mismatch",
                "Structured data says 'Partial Implementation' but the uploaded "
                "certification document appears to reference 'complete' implementation.",
            ))

    # Check: document date is from a prior year
    year_pattern = re.compile(r"\b(20\d{2})\b")
    years_in_doc = set(int(m) for m in year_pattern.findall(text))
    current_year = date.today().year
    if years_in_doc and max(years_in_doc) < current_year - 1:
        findings.append(_finding(
            "xref_old_document", "major",
            "Outdated Certification Document",
            f"The most recent year referenced in the document is {max(years_in_doc)}, "
            f"which is more than one year behind the current year ({current_year}). "
            # Fixed: message previously hard-coded "2026"; use the computed year.
            f"The plan may not reflect current {current_year} requirements.",
        ))

    # Check: business name mismatch
    biz_name = (row.get("business_name") or "").lower().strip()
    if biz_name and len(biz_name) > 3:
        # Normalize: remove Inc., LLC, etc.
        biz_core = re.sub(r"\b(inc|llc|llp|corp|co|ltd)\.?\b", "", biz_name).strip()
        if biz_core and biz_core not in text_lower:
            findings.append(_finding(
                "xref_name_mismatch", "minor",
                "Business Name Not Found in Document",
                f"The RMD business name '{row.get('business_name')}' was not found "
                "in the uploaded certification document. The document may belong "
                "to a different entity.",
            ))

    return findings
|
|
|
|
|
|
def _check_pdf_ollama(text: str) -> list[dict]:
    """Tier 2: Use Ollama LLM for ambiguous documents.

    Only called when regex finds < 6/11 sections or the text is very short.
    Asks the model to classify each required section as present/absent and
    converts ABSENT answers into "llm_*" findings. Any transport or parse
    failure degrades to an empty list (caller then falls back to regex).
    """
    try:
        from scripts.ollama_client import generate, start_tunnel, warmup
    except ImportError:
        LOG.warning("Ollama client not available — skipping LLM analysis")
        return []

    if not start_tunnel():
        LOG.warning("Ollama not reachable — skipping LLM analysis")
        return []

    warmup()

    # Truncate text to fit model context (qwen2.5:7b ~ 8K tokens)
    truncated = text[:6000]

    section_names = [label for _, _, label, _ in REQUIRED_SECTIONS]

    prompt = (
        "Analyze this FCC Robocall Mitigation Database (RMD) certification document.\n"
        "Determine which of these required sections are PRESENT or ABSENT:\n\n"
        + "\n".join(f"- {s}" for s in section_names) + "\n\n"
        "Document text:\n"
        "---\n"
        f"{truncated}\n"
        "---\n\n"
        "Respond in JSON format as a list of objects:\n"
        '[{"section": "...", "present": true/false, "reason": "brief explanation"}]\n'
        "Only output the JSON array, nothing else."
    )

    try:
        raw = generate(
            prompt,
            system="You are an FCC regulatory compliance auditor. Respond only in valid JSON.",
            max_tokens=1500,
            temperature=0.1,
        )

        # Try to parse JSON from the response.
        # Strip markdown code fences if present.
        raw = re.sub(r"^```json\s*", "", raw.strip())
        raw = re.sub(r"\s*```$", "", raw.strip())

        sections = json.loads(raw)
        if not isinstance(sections, list):
            LOG.warning("Ollama returned non-list JSON — ignoring")
            return []

        findings: list[dict] = []
        for item in sections:
            if not isinstance(item, dict):
                continue
            if item.get("present") is False:
                section_name = item.get("section", "Unknown Section")
                # Derive a stable check id from the section label.
                check_id = "llm_" + re.sub(r"[^a-z0-9]+", "_", section_name.lower()).strip("_")
                findings.append(_finding(
                    check_id, "major",
                    f"Missing: {section_name} (LLM analysis)",
                    item.get("reason", "Section not found in document."),
                ))

        return findings

    # Was `except (json.JSONDecodeError, Exception)` — redundant, since
    # JSONDecodeError is already an Exception subclass.
    except Exception as exc:
        LOG.warning("Ollama analysis failed: %s", exc)
        return []
|
|
|
|
|
|
def analyze_pdf(
    pdf_path: str, row: dict, use_ollama: bool = True
) -> list[dict]:
    """Full PDF analysis pipeline.

    Extracts text, runs regex keyword checks, optionally escalates to the
    Ollama LLM for ambiguous documents, then cross-references the PDF
    against the structured row data.
    """
    text = _extract_pdf_text(pdf_path)
    if not text.strip():
        return [_finding(
            "pdf_empty", "critical", "Empty or Unreadable PDF",
            "The certification document could not be read or contained no extractable text. "
            "The file may be scanned/image-based or corrupted.",
        )]

    # Tier 1: regex keyword matching
    regex_findings = _check_pdf_regex(text)
    total_sections = len(REQUIRED_SECTIONS)
    sections_found = total_sections - len(regex_findings)

    findings: list[dict] = []
    needs_llm = use_ollama and (sections_found < 6 or len(text) < 500)
    if needs_llm:
        # Tier 2: Ollama for ambiguous cases
        LOG.info(
            "Regex found %d/%d sections (text=%d chars) — running Ollama analysis",
            sections_found, total_sections, len(text),
        )
        llm_findings = _check_pdf_ollama(text)
        # LLM findings replace regex findings when the LLM produced any;
        # otherwise fall back to the regex results.
        findings.extend(llm_findings if llm_findings else regex_findings)
    else:
        findings.extend(regex_findings)

    # Cross-reference: structured data vs PDF content
    findings.extend(_check_pdf_crossref(text, row))

    return findings
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# Main audit function
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
def audit_single_filing(
    conn,
    *,
    frn: Optional[str] = None,
    rmd_number: Optional[str] = None,
    include_pdf: bool = True,
    use_ollama: bool = True,
) -> Optional[dict]:
    """Audit a single RMD filing. Returns result dict or None if not found.

    Args:
        conn: Open psycopg2 connection.
        frn: FCC Registration Number to look up (takes precedence).
        rmd_number: RMD filing number to look up when no FRN is given.
        include_pdf: Also download and analyze the certification PDF (Layers 2+3).
        use_ollama: Allow the Ollama LLM fallback during PDF analysis.

    Side effects:
        Persists the result via _save_result (upsert into fcc_rmd_audit_results).
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    if frn:
        cur.execute("SELECT * FROM fcc_rmd WHERE frn = %s LIMIT 1", (frn,))
    elif rmd_number:
        cur.execute("SELECT * FROM fcc_rmd WHERE rmd_number = %s LIMIT 1", (rmd_number,))
    else:
        # Neither identifier supplied — nothing to look up.
        return None

    row = cur.fetchone()
    if not row:
        LOG.info("No RMD record found for frn=%s rmd=%s", frn, rmd_number)
        return None

    row = dict(row)
    frn = row.get("frn", "")
    rmd_num = row.get("rmd_number", "")

    # Get set of removed FRNs for cross-reference
    cur.execute("SELECT frn FROM fcc_rmd_removed WHERE frn IS NOT NULL")
    removed_frns = {r["frn"] for r in cur.fetchall()}

    # Layer 1: structured checks
    structured = _check_structured(row, removed_frns)

    # Layer 2+3: PDF analysis
    pdf_findings: list[dict] = []
    pdf_downloaded = False
    pdf_text_length = 0

    if include_pdf:
        sys_id = row.get("servicenow_sys_id") or _extract_sys_id(row.get("filing_url", ""))
        filing_url = row.get("filing_url")

        # Temp dir is removed automatically — the PDF is only needed during analysis.
        with tempfile.TemporaryDirectory(prefix="rmd_audit_") as tmpdir:
            pdf_path = download_certification_pdf(filing_url, sys_id, tmpdir)
            if pdf_path and os.path.exists(pdf_path):
                pdf_downloaded = True
                text = _extract_pdf_text(pdf_path)
                pdf_text_length = len(text)
                pdf_findings = analyze_pdf(pdf_path, row, use_ollama=use_ollama)
            else:
                LOG.info("Could not download PDF for %s — structured checks only", rmd_num)

    # Assemble result
    all_findings = structured + pdf_findings
    total = len(all_findings)
    severity = _worst_severity(all_findings) if all_findings else "clean"

    result = {
        "rmd_number": rmd_num,
        "frn": frn,
        "business_name": row.get("business_name", ""),
        "total_deficiencies": total,
        "severity": severity,
        "structured_checks": structured,
        "structured_score": len(structured),
        # pdf_* fields stay None when PDF analysis was skipped, so a later
        # structured-only audit doesn't overwrite cached PDF results.
        "pdf_checks": pdf_findings if include_pdf else None,
        "pdf_score": len(pdf_findings) if include_pdf else None,
        "pdf_downloaded": pdf_downloaded,
        "pdf_text_length": pdf_text_length,
        "audited_at": datetime.now(timezone.utc).isoformat(),
    }

    # Cache in DB
    _save_result(conn, row.get("id"), result)

    return result
|
|
|
|
|
|
def _save_result(conn, fcc_rmd_id: Optional[int], result: dict) -> None:
    """Upsert the audit result into fcc_rmd_audit_results.

    Conflict key is rmd_number. PDF-related columns use COALESCE/OR so a
    later structured-only audit (pdf_checks is None) does not wipe out a
    previously cached PDF analysis. Failures are logged and rolled back,
    never raised — persistence is best-effort.
    """
    try:
        cur = conn.cursor()
        cur.execute(
            """
            INSERT INTO fcc_rmd_audit_results (
                fcc_rmd_id, rmd_number, frn, business_name,
                structured_checks, structured_score,
                pdf_checks, pdf_score, pdf_downloaded, pdf_text_length,
                total_deficiencies, severity,
                audited_at, pdf_audited_at, updated_at
            ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,now())
            ON CONFLICT (rmd_number) DO UPDATE SET
                fcc_rmd_id = EXCLUDED.fcc_rmd_id,
                frn = EXCLUDED.frn,
                business_name = EXCLUDED.business_name,
                structured_checks = EXCLUDED.structured_checks,
                structured_score = EXCLUDED.structured_score,
                pdf_checks = COALESCE(EXCLUDED.pdf_checks, fcc_rmd_audit_results.pdf_checks),
                pdf_score = COALESCE(EXCLUDED.pdf_score, fcc_rmd_audit_results.pdf_score),
                pdf_downloaded = EXCLUDED.pdf_downloaded OR fcc_rmd_audit_results.pdf_downloaded,
                pdf_text_length = COALESCE(EXCLUDED.pdf_text_length, fcc_rmd_audit_results.pdf_text_length),
                total_deficiencies = EXCLUDED.total_deficiencies,
                severity = EXCLUDED.severity,
                audited_at = EXCLUDED.audited_at,
                pdf_audited_at = CASE WHEN EXCLUDED.pdf_checks IS NOT NULL THEN now() ELSE fcc_rmd_audit_results.pdf_audited_at END,
                updated_at = now()
            """,
            (
                fcc_rmd_id,
                result["rmd_number"],
                result["frn"],
                result["business_name"],
                # Findings lists are serialized to JSON for storage.
                json.dumps(result["structured_checks"]),
                result["structured_score"],
                json.dumps(result["pdf_checks"]) if result.get("pdf_checks") is not None else None,
                result.get("pdf_score"),
                result.get("pdf_downloaded", False),
                result.get("pdf_text_length", 0),
                result["total_deficiencies"],
                result["severity"],
                result["audited_at"],
                # pdf_audited_at only set when this run actually produced PDF checks.
                result["audited_at"] if result.get("pdf_checks") is not None else None,
            ),
        )
        conn.commit()
    except Exception as exc:
        LOG.warning("Failed to save audit result: %s", exc)
        conn.rollback()
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# Batch mode
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
def run_batch(
    conn,
    *,
    include_pdf: bool = False,
    use_ollama: bool = True,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    year: Optional[int] = None,
    skip_recent: bool = True,
) -> dict:
    """Run structured checks on fcc_rmd records. Returns summary stats.

    Args:
        conn: Open psycopg2 connection.
        include_pdf: Also download and analyze each certification PDF (~2s/record).
        use_ollama: Allow the Ollama LLM fallback during PDF analysis.
        limit: Cap the number of records processed.
        offset: Skip the first N records (for parallel workers).
        year: Only audit filings recertified on/after Jan 1 of this year.
        skip_recent: Skip records already audited in the last 7 days.

    Returns:
        Summary counts: {"total", "clean", "minor", "major", "critical"}.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Build the query with bound parameters instead of f-string interpolation.
    query = """
        SELECT * FROM fcc_rmd
        WHERE (removed_from_rmd = FALSE OR removed_from_rmd IS NULL)
    """
    params: list = []
    if year:
        query += " AND last_recertified >= %s"
        params.append(f"{year}-01-01")
    if skip_recent:
        # Skip records already audited in the last 7 days
        query += " AND rmd_number NOT IN (SELECT rmd_number FROM fcc_rmd_audit_results WHERE audited_at > NOW() - INTERVAL '7 days')"
    query += " ORDER BY rmd_number"
    if offset:
        query += " OFFSET %s"
        params.append(int(offset))
    if limit:
        query += " LIMIT %s"
        params.append(int(limit))

    cur.execute(query, params)
    rows = cur.fetchall()
    LOG.info("Batch auditing %d RMD records (pdf=%s)", len(rows), include_pdf)

    # Get removed FRNs once for the whole batch
    cur.execute("SELECT frn FROM fcc_rmd_removed WHERE frn IS NOT NULL")
    removed_frns = {r["frn"] for r in cur.fetchall()}

    stats = {"total": 0, "clean": 0, "minor": 0, "major": 0, "critical": 0}

    for i, row in enumerate(rows):
        row = dict(row)
        rmd_num = row.get("rmd_number", "")
        frn = row.get("frn", "")

        # Layer 1: structured checks
        structured = _check_structured(row, removed_frns)

        pdf_findings: list[dict] = []
        pdf_downloaded = False
        pdf_text_length = 0

        if include_pdf:
            sys_id = row.get("servicenow_sys_id") or _extract_sys_id(row.get("filing_url", ""))
            with tempfile.TemporaryDirectory(prefix="rmd_audit_") as tmpdir:
                pdf_path = download_certification_pdf(row.get("filing_url"), sys_id, tmpdir)
                if pdf_path and os.path.exists(pdf_path):
                    pdf_downloaded = True
                    text = _extract_pdf_text(pdf_path)
                    pdf_text_length = len(text)
                    pdf_findings = analyze_pdf(pdf_path, row, use_ollama=use_ollama)
            time.sleep(1.0)  # Rate limit against the ServiceNow portal

        all_findings = structured + pdf_findings
        severity = _worst_severity(all_findings) if all_findings else "clean"

        result = {
            "rmd_number": rmd_num,
            "frn": frn,
            "business_name": row.get("business_name", ""),
            "total_deficiencies": len(all_findings),
            "severity": severity,
            "structured_checks": structured,
            "structured_score": len(structured),
            "pdf_checks": pdf_findings if include_pdf else None,
            "pdf_score": len(pdf_findings) if include_pdf else None,
            "pdf_downloaded": pdf_downloaded,
            "pdf_text_length": pdf_text_length,
            "audited_at": datetime.now(timezone.utc).isoformat(),
        }

        _save_result(conn, row.get("id"), result)
        stats["total"] += 1
        stats[severity] += 1

        if (i + 1) % 500 == 0:
            LOG.info("Progress: %d/%d processed", i + 1, len(rows))

    return stats
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
# CLI
|
|
# ═══════════════════════════════════════════════════════════════════════
|
|
|
|
def _format_report(result: dict) -> str:
|
|
"""Format audit result as human-readable text."""
|
|
lines = [
|
|
f"RMD Audit Report — {result['rmd_number']}",
|
|
f" FRN: {result['frn']}",
|
|
f" Business: {result['business_name']}",
|
|
f" Severity: {result['severity'].upper()}",
|
|
f" Total deficiencies: {result['total_deficiencies']}",
|
|
"",
|
|
]
|
|
|
|
if result["structured_checks"]:
|
|
lines.append("Structured Data Checks:")
|
|
for f in result["structured_checks"]:
|
|
icon = {"critical": "!!!", "major": "!!", "minor": "!"}.get(f["severity"], "?")
|
|
lines.append(f" [{icon}] {f['label']}")
|
|
lines.append(f" {f['detail']}")
|
|
lines.append("")
|
|
|
|
if result.get("pdf_checks"):
|
|
lines.append(f"PDF Content Analysis (text: {result.get('pdf_text_length', 0)} chars):")
|
|
for f in result["pdf_checks"]:
|
|
icon = {"critical": "!!!", "major": "!!", "minor": "!"}.get(f["severity"], "?")
|
|
lines.append(f" [{icon}] {f['label']}")
|
|
lines.append(f" {f['detail']}")
|
|
lines.append("")
|
|
elif result.get("pdf_checks") is None and result.get("pdf_downloaded") is False:
|
|
lines.append("PDF: Not analyzed (use --no-pdf to skip, or PDF not downloadable)")
|
|
lines.append("")
|
|
|
|
if result["total_deficiencies"] == 0:
|
|
lines.append("No deficiencies found.")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, connect to PostgreSQL, run the audit.

    Exits with status 1 when DATABASE_URL is unset or (single mode) when no
    matching RMD record is found. Output is a text report by default, or
    JSON with --json.
    """
    parser = argparse.ArgumentParser(description="Audit FCC RMD filings for deficiencies")
    # Exactly one mode selector is required: --frn, --rmd, or --batch.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--frn", help="Audit a single FRN")
    group.add_argument("--rmd", help="Audit a single RMD number")
    group.add_argument("--batch", action="store_true", help="Batch audit all records")

    parser.add_argument("--no-pdf", action="store_true", help="Skip PDF download and analysis")
    parser.add_argument("--no-ollama", action="store_true", help="Skip Ollama LLM analysis")
    parser.add_argument("--limit", type=int, help="Limit batch to N records")
    parser.add_argument("--offset", type=int, help="Skip first N records (for parallel workers)")
    parser.add_argument("--year", type=int, help="Only audit filings from this year (e.g., 2026)")
    parser.add_argument("--no-skip-recent", action="store_true", help="Re-audit even recently audited records")
    parser.add_argument("--json", action="store_true", help="Output as JSON")

    args = parser.parse_args()

    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)

    conn = psycopg2.connect(DATABASE_URL)

    if args.batch:
        stats = run_batch(
            conn,
            include_pdf=not args.no_pdf,
            use_ollama=not args.no_ollama,
            limit=args.limit,
            offset=args.offset,
            year=args.year,
            skip_recent=not args.no_skip_recent,
        )
        if args.json:
            print(json.dumps(stats, indent=2))
        else:
            print(f"\nBatch audit complete:")
            print(f"  Total: {stats['total']}")
            print(f"  Clean: {stats['clean']}")
            print(f"  Minor: {stats['minor']}")
            print(f"  Major: {stats['major']}")
            print(f"  Critical: {stats['critical']}")
    else:
        result = audit_single_filing(
            conn,
            frn=args.frn,
            rmd_number=args.rmd,
            include_pdf=not args.no_pdf,
            use_ollama=not args.no_ollama,
        )
        if not result:
            LOG.error("No RMD record found")
            sys.exit(1)

        if args.json:
            # default=str covers non-JSON-native values (e.g. dates).
            print(json.dumps(result, indent=2, default=str))
        else:
            print(_format_report(result))

    conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|