new-site/scripts/workers/fcc_rmd_auditor.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

1004 lines
40 KiB
Python

"""
fcc_rmd_auditor.py — Audit existing FCC RMD filings for deficiencies.
Three analysis layers:
Layer 1 — Structured data checks against local fcc_rmd table (fast, no network)
Layer 2 — Download certification PDF from ServiceNow Attachment API (or Playwright)
Layer 3 — Analyze PDF content for missing required sections (regex first, Ollama fallback)
Usage:
# Audit a single FRN (structured + PDF):
python -m workers.fcc_rmd_auditor --frn 0012345678
# Structured checks only (no PDF download):
python -m workers.fcc_rmd_auditor --frn 0012345678 --no-pdf
# Batch audit (structured only, fast):
python -m workers.fcc_rmd_auditor --batch --no-pdf
# Batch with PDF (slow, ~2s/record):
python -m workers.fcc_rmd_auditor --batch --limit 100
# JSON output for piping:
python -m workers.fcc_rmd_auditor --frn 0012345678 --json
Environment variables:
DATABASE_URL PostgreSQL connection string
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import re
import sys
import tempfile
import time
from datetime import date, datetime, timezone
from typing import Optional
from urllib.parse import urlparse, parse_qs
import psycopg2
import psycopg2.extras
import requests
# Module-level logger; basicConfig routes worker output to stdout so that
# systemd / cron / pipe captures see timestamps and levels.
LOG = logging.getLogger("workers.fcc_rmd_auditor")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

# PostgreSQL DSN; required for all DB access (validated in main()).
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# FCC's public ServiceNow instance hosting the Robocall Mitigation Database portal.
SERVICENOW_BASE = "https://fccprod.servicenowservices.com"
# ServiceNow Attachment API endpoint (requires an authenticated session cookie).
ATTACHMENT_API = f"{SERVICENOW_BASE}/api/now/attachment"
# ServiceNow table backing RMD filings, used in portal/API query strings.
RMD_TABLE_NAME = "x_g_fmc_rmd_robocall_mitigation_database"
# Desktop Chrome UA string sent with portal requests and the Playwright context.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
)
# ═══════════════════════════════════════════════════════════════════════
# Finding data structure
# ═══════════════════════════════════════════════════════════════════════
def _finding(
check_id: str,
severity: str,
label: str,
detail: str,
) -> dict:
return {
"id": check_id,
"severity": severity,
"label": label,
"detail": detail,
}
def _worst_severity(findings: list[dict]) -> str:
order = {"critical": 0, "major": 1, "minor": 2}
worst = "clean"
for f in findings:
s = f["severity"]
if order.get(s, 99) < order.get(worst, 99):
worst = s
return worst
# ═══════════════════════════════════════════════════════════════════════
# Layer 1 — Structured data checks
# ═══════════════════════════════════════════════════════════════════════
def _check_structured(row: dict, removed_frns: set[str]) -> list[dict]:
"""Run rule-based checks against a single fcc_rmd row."""
findings: list[dict] = []
today = date.today()
# ── Certification staleness ──────────────────────────────────────
last_recert = row.get("last_recertified")
if last_recert:
if isinstance(last_recert, str):
try:
last_recert = datetime.strptime(last_recert, "%Y-%m-%d").date()
except ValueError:
last_recert = None
if last_recert:
months_ago = (today.year - last_recert.year) * 12 + (today.month - last_recert.month)
if months_ago > 13:
findings.append(_finding(
"stale_cert", "critical", "Stale Certification",
f"Last recertified {last_recert.isoformat()}{months_ago} months ago. "
"Annual recertification required by March 1.",
))
elif months_ago >= 10:
findings.append(_finding(
"expiring_cert", "major", "Certification Expiring Soon",
f"Last recertified {last_recert.isoformat()}{months_ago} months ago. "
"Recertification due within 3 months.",
))
else:
findings.append(_finding(
"no_recert_date", "major", "No Recertification Date",
"No last_recertified date on file. Unable to determine certification currency.",
))
# ── Provider classification ──────────────────────────────────────
vsp = bool(row.get("voice_service_provider"))
gw = bool(row.get("gateway_provider"))
inter = bool(row.get("intermediate_provider"))
if not vsp and not gw and not inter:
findings.append(_finding(
"no_classification", "critical", "No Provider Classification",
"None of Voice Service Provider, Gateway, or Intermediate Provider "
"is selected. At least one classification is required.",
))
# Note: having all three (VSP + gateway + intermediate) is valid for
# large carriers like Peerless Network. Only flag gateway + intermediate
# without VSP, which is unusual.
if gw and inter and not vsp:
findings.append(_finding(
"conflicting_classification", "minor",
"Unusual Provider Classification",
"Both Gateway Provider and Non-Gateway Intermediate Provider are "
"selected without Voice Service Provider. Verify this is correct — "
"most providers are one or the other.",
))
# ── STIR/SHAKEN consistency ──────────────────────────────────────
impl = (row.get("implementation") or "").lower()
if vsp and not gw and not inter:
# VSPs should implement STIR/SHAKEN unless exempt small carrier
if "robocall mitigation" in impl and "partial" not in impl and "complete" not in impl:
findings.append(_finding(
"ss_vsp_no_shaken", "major",
"VSP Without STIR/SHAKEN",
"Voice Service Provider selected 'Robocall Mitigation Only' but "
"VSPs are generally required to implement STIR/SHAKEN unless they "
"qualify for the small carrier exemption.",
))
if inter and not vsp and not gw:
# Intermediate-only providers don't originate calls → can't do "complete" STIR/SHAKEN
if "complete" in impl and "partial" not in impl:
findings.append(_finding(
"ss_intermediate_complete", "major",
"Intermediate Provider Claims Complete STIR/SHAKEN",
"Non-gateway intermediate provider claims Complete STIR/SHAKEN "
"Implementation, but intermediates don't originate calls and "
"therefore can't sign them with STIR/SHAKEN attestation.",
))
if "partial" in impl:
# Partial implementation should reference an upstream provider
# We can't check this from structured data alone — flag as informational
findings.append(_finding(
"ss_partial_note", "minor",
"Partial STIR/SHAKEN — Verify Upstream Provider",
"Partial STIR/SHAKEN implementation declared. Ensure the filing "
"names the upstream provider responsible for signing calls on "
"non-SIP portions of the network.",
))
# Note: contact_email/name are often NULL in our local DB because the RMD
# CSV doesn't include them (requires separate scrape). Not a filing deficiency.
# ── Removed from RMD ─────────────────────────────────────────────
frn = (row.get("frn") or "").strip()
if row.get("removed_from_rmd"):
findings.append(_finding(
"removed_from_rmd", "critical", "Removed from RMD",
"This provider has been removed from the Robocall Mitigation Database. "
"Downstream carriers are required to block their traffic.",
))
elif frn and frn in removed_frns:
findings.append(_finding(
"removed_from_rmd", "critical", "Removed from RMD (enforcement action)",
"This provider appears in the FCC's RMD removal list. "
"A deficiency or enforcement action has been taken.",
))
# ── Red light (CORES financial delinquency) ──────────────────────
if row.get("red_light_status") == "red":
findings.append(_finding(
"red_light", "critical", "CORES Red Light Status",
"FRN has outstanding delinquent debts to the FCC. "
"Red-light status blocks certain filings and authorizations.",
))
return findings
# ═══════════════════════════════════════════════════════════════════════
# Layer 2 — PDF download
# ═══════════════════════════════════════════════════════════════════════
def _extract_sys_id(filing_url: str) -> Optional[str]:
try:
qs = parse_qs(urlparse(filing_url).query)
ids = qs.get("sys_id", [])
return ids[0] if ids else None
except Exception:
return None
def _get_pdf_attachment_sys_id(record_sys_id: str) -> Optional[str]:
    """Query the SP page API (unauthenticated) to get the PDF attachment sys_id."""
    url = (
        f"{SERVICENOW_BASE}/api/now/sp/page"
        f"?id=rmd_form&table={RMD_TABLE_NAME}"
        f"&sys_id={record_sys_id}&view=sp&time=1"
        f"&portal_id=ac2856301b92681048c6ed7bbc4bcb27"
        f"&request_uri=%2Frmd"
    )
    att_id = None
    try:
        resp = requests.get(
            url,
            headers={"User-Agent": USER_AGENT, "Accept": "application/json"},
            timeout=30,
        )
        resp.raise_for_status()
        payload = resp.json()
        # Depth-first preorder walk (explicit stack, same visit order as the
        # original recursive search) over the nested widget structure,
        # looking for a non-empty attachments.pdf value.
        stack = [payload]
        while stack:
            node = stack.pop()
            if isinstance(node, dict):
                attachments = node.get("attachments")
                if isinstance(attachments, dict) and attachments.get("pdf"):
                    att_id = attachments["pdf"]
                    break
                stack.extend(reversed(list(node.values())))
            elif isinstance(node, list):
                stack.extend(reversed(node))
        if att_id:
            LOG.info("Found PDF attachment sys_id=%s for record=%s", att_id, record_sys_id)
        else:
            LOG.info("No PDF attachment found for record=%s", record_sys_id)
        return att_id
    except Exception as exc:
        LOG.warning("SP API query failed: %s", exc)
        return None
def _download_pdf_via_playwright(
    record_sys_id: str, attachment_sys_id: str, dest_dir: str
) -> Optional[str]:
    """Use Playwright to download the PDF attachment from the RMD portal.

    ServiceNow attachment APIs require authentication, but the portal page
    renders publicly with the PDF embedded. We intercept the browser's
    network requests to capture the attachment download.

    Args:
        record_sys_id: sys_id of the RMD filing record.
        attachment_sys_id: sys_id of the PDF attachment on that record.
        dest_dir: directory to write the downloaded file into.

    Returns:
        Path to the saved file, or None when Playwright is unavailable or
        every download strategy fails. NOTE: the last-resort page.pdf()
        fallback returns a render of the filing page, not the original
        uploaded document.
    """
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        LOG.warning("Playwright not available — skipping PDF download")
        return None
    dest_path = os.path.join(dest_dir, f"rmd_cert_{attachment_sys_id}.pdf")
    portal_url = (
        f"{SERVICENOW_BASE}/rmd"
        f"?id=rmd_form&table={RMD_TABLE_NAME}"
        f"&sys_id={record_sys_id}"
    )
    captured_content: list[bytes] = []
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            ctx = browser.new_context(user_agent=USER_AGENT)
            # Intercept network requests for attachment downloads.
            def on_response(response):
                url = response.url
                if attachment_sys_id in url and response.status == 200:
                    try:
                        body = response.body()
                        # Reject tiny bodies and HTML error pages (leading "<").
                        if len(body) > 500 and not body[:20].lstrip().startswith(b"<"):
                            captured_content.append(body)
                    except Exception:
                        pass
            page = ctx.new_page()
            page.on("response", on_response)
            # Navigate to the filing page — the portal loads the PDF viewer.
            page.goto(portal_url, wait_until="networkidle", timeout=45000)
            page.wait_for_timeout(3000)
            # Strategy 1: the viewer already fetched the attachment — save it.
            if captured_content:
                with open(dest_path, "wb") as f:
                    f.write(captured_content[0])
                LOG.info("Captured PDF from network: %s (%d bytes)", dest_path, len(captured_content[0]))
                browser.close()
                return dest_path
            # Strategy 2 (fallback): try clicking on the PDF link if visible,
            # trying progressively broader selectors.
            pdf_links = page.query_selector_all(f"a[href*='{attachment_sys_id}']")
            if not pdf_links:
                pdf_links = page.query_selector_all("a.attachment-link, a[data-type='pdf']")
            if not pdf_links:
                pdf_links = page.query_selector_all("a[href*='sys_attachment']")
            for link in pdf_links:
                try:
                    with page.expect_download(timeout=15000) as dl_info:
                        link.click()
                    download = dl_info.value
                    download.save_as(dest_path)
                    # Verify not HTML (login/error page masquerading as the file).
                    with open(dest_path, "rb") as f:
                        header = f.read(20)
                    if not header.lstrip().startswith(b"<"):
                        LOG.info("Downloaded PDF via link click: %s", dest_path)
                        browser.close()
                        return dest_path
                except Exception:
                    # Any one link failing is fine — try the next candidate.
                    continue
            # Strategy 3 (last resort): capture the entire page as a PDF
            # (won't have the original document content but captures the form data).
            page.pdf(path=dest_path, format="Letter")
            LOG.info("Captured filing page as PDF fallback: %s", dest_path)
            browser.close()
            return dest_path
    except Exception as exc:
        LOG.warning("Playwright PDF download failed: %s", exc)
        return None
def download_certification_pdf(
    filing_url: Optional[str], sys_id: Optional[str], dest_dir: str
) -> Optional[str]:
    """Download the RMD certification document.

    Resolution order:
      1. Query the SP page API (unauthenticated) for the PDF attachment sys_id.
      2. Fetch via Playwright (sys_attachment.do requires a session cookie).

    Returns the local file path, or None when any step fails.
    """
    # Prefer an explicit sys_id; fall back to parsing it out of the filing URL.
    record_id = sys_id or (_extract_sys_id(filing_url) if filing_url else None)
    if not record_id:
        LOG.info("No sys_id available — cannot download PDF")
        return None
    attachment_id = _get_pdf_attachment_sys_id(record_id)
    if not attachment_id:
        LOG.info("No PDF attachment for record sys_id=%s", record_id)
        return None
    return _download_pdf_via_playwright(record_id, attachment_id, dest_dir)
# ═══════════════════════════════════════════════════════════════════════
# Layer 3 — PDF content analysis
# ═══════════════════════════════════════════════════════════════════════
# Required sections and their keyword indicators
# Required sections and their keyword indicators for Layer 3 analysis.
# Each tuple is (check_id, severity, label, keywords). Keywords are matched
# case-insensitively against the lowercased document text with re.search(),
# so entries may be plain substrings or raw-string regex fragments
# (e.g. r"\bfrn\b"). A section counts as present if ANY keyword matches.
REQUIRED_SECTIONS: list[tuple[str, str, str, list[str]]] = [
    # (check_id, severity, label, keywords)
    (
        "missing_provider_id", "major", "Missing Provider Identification",
        [r"\bfrn\b", r"\b\d{10}\b", "fcc registration number", "registration number"],
    ),
    (
        "missing_classification", "major", "Missing Provider Classification",
        ["voice service provider", "gateway provider", "intermediate provider",
         "provider classification", "provider type"],
    ),
    (
        "missing_stir_shaken", "major", "Missing STIR/SHAKEN Details",
        ["stir/shaken", "stir-shaken", "stirshaken", "sti certificate",
         "sti-ca", "spc token", "attestation"],
    ),
    (
        "missing_mitigation", "major", "Missing Robocall Mitigation Program",
        ["robocall mitigation", "call blocking", "call analytics",
         "monitoring", "mitigation program", "call pattern"],
    ),
    (
        "missing_kyc", "major", "Missing KYC Procedures",
        ["know your customer", "know-your-customer", r"\bkyc\b", "customer vetting",
         "identity verif", "customer verification", "due diligence"],
    ),
    (
        "missing_traceback", "major", "Missing Traceback Commitment",
        ["traceback", r"\bitg\b", "industry traceback", "24 hour",
         "24-hour", "ustelecom"],
    ),
    (
        "missing_enforcement", "minor", "Missing Enforcement History Disclosure",
        ["enforcement", "citation", "forfeiture", "consent decree",
         "adverse finding", "no pending"],
    ),
    (
        "missing_recertification", "minor", "Missing Recertification Acknowledgment",
        ["recertif", "annual certification", "march 1", "march 2",
         "annually"],
    ),
    (
        "missing_material_change", "major", "Missing Material Change Update Commitment",
        ["material change", "10 business day", "10-business-day",
         "update.*within", "update.*filing", "promptly update"],
    ),
    (
        "missing_perjury", "minor", "Missing Perjury Declaration in Document",
        ["perjury", "penalty of perjury", "true and correct",
         "true, complete", "under penalty"],
    ),
    (
        "missing_dno", "minor", "Missing DNO List Reference",
        ["do-not-originate", r"\bdno\b", "do not originate",
         "dno list"],
    ),
]
def _extract_pdf_text(pdf_path: str) -> str:
    """Extract text from a PDF using pdfplumber.

    Returns the concatenated page text (newline-separated), or an empty
    string when pdfplumber is missing or extraction fails.
    """
    try:
        import pdfplumber
    except ImportError:
        LOG.warning("pdfplumber not installed — pip install pdfplumber")
        return ""
    try:
        chunks: list[str] = []
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                # extract_text() may return None for image-only pages.
                chunks.append(page.extract_text() or "")
        return "\n".join(chunks)
    except Exception as exc:
        LOG.warning("PDF text extraction failed: %s", exc)
        return ""
def _check_pdf_regex(text: str) -> list[dict]:
    """Tier 1: regex/keyword matching for required sections.

    Args:
        text: extracted certification-document text.

    Returns:
        One finding per REQUIRED_SECTIONS entry whose keywords all fail to
        match, or a single critical finding when the text is empty.
    """
    if not text.strip():
        return [_finding(
            "pdf_empty", "critical", "Empty or Unreadable PDF",
            "The certification document could not be read or contained no extractable text.",
        )]
    findings: list[dict] = []
    text_lower = text.lower()
    for check_id, severity, label, keywords in REQUIRED_SECTIONS:
        if any(re.search(kw, text_lower) for kw in keywords):
            continue
        # Built outside the f-string: backslashes inside f-string expressions
        # are a SyntaxError before Python 3.12. Regex-style keywords (those
        # starting with '\') are omitted from the human-readable message.
        searched = ", ".join(k for k in keywords if not k.startswith("\\"))
        findings.append(_finding(
            check_id, severity, label,
            "No reference to this required section found in the certification document. "
            f"Searched for: {searched}.",
        ))
    return findings
def _check_pdf_crossref(
text: str, row: dict
) -> list[dict]:
"""Cross-reference structured data selections against PDF content."""
findings: list[dict] = []
text_lower = text.lower()
impl = (row.get("implementation") or "").lower()
# Check: PDF mentions a different STIR/SHAKEN status than structured data
if "complete" in impl and "partial" not in impl:
if "partial" in text_lower and "complete" not in text_lower:
findings.append(_finding(
"xref_ss_mismatch", "major",
"STIR/SHAKEN Status Mismatch",
"Structured data says 'Complete STIR/SHAKEN' but the uploaded "
"certification document appears to reference 'partial' implementation.",
))
elif "partial" in impl:
if "complete implementation" in text_lower and "partial" not in text_lower:
findings.append(_finding(
"xref_ss_mismatch", "major",
"STIR/SHAKEN Status Mismatch",
"Structured data says 'Partial Implementation' but the uploaded "
"certification document appears to reference 'complete' implementation.",
))
# Check: document date is from a prior year
year_pattern = re.compile(r"\b(20\d{2})\b")
years_in_doc = set(int(m) for m in year_pattern.findall(text))
current_year = date.today().year
if years_in_doc and max(years_in_doc) < current_year - 1:
findings.append(_finding(
"xref_old_document", "major",
"Outdated Certification Document",
f"The most recent year referenced in the document is {max(years_in_doc)}, "
f"which is more than one year behind the current year ({current_year}). "
"The plan may not reflect current 2026 requirements.",
))
# Check: business name mismatch
biz_name = (row.get("business_name") or "").lower().strip()
if biz_name and len(biz_name) > 3:
# Normalize: remove Inc., LLC, etc.
biz_core = re.sub(r"\b(inc|llc|llp|corp|co|ltd)\.?\b", "", biz_name).strip()
if biz_core and biz_core not in text_lower:
findings.append(_finding(
"xref_name_mismatch", "minor",
"Business Name Not Found in Document",
f"The RMD business name '{row.get('business_name')}' was not found "
"in the uploaded certification document. The document may belong "
"to a different entity.",
))
return findings
def _check_pdf_ollama(text: str) -> list[dict]:
    """Tier 2: Use Ollama LLM for ambiguous documents.

    Only called when regex finds < 6/11 sections or text is very short.
    Returns [] whenever the client, tunnel, or model output is unusable —
    the caller then falls back to the regex findings.
    """
    try:
        from scripts.ollama_client import generate, start_tunnel, warmup
    except ImportError:
        LOG.warning("Ollama client not available — skipping LLM analysis")
        return []
    if not start_tunnel():
        LOG.warning("Ollama not reachable — skipping LLM analysis")
        return []
    warmup()
    # Truncate text to fit model context (qwen2.5:7b ~ 8K tokens).
    truncated = text[:6000]
    section_names = [label for _, _, label, _ in REQUIRED_SECTIONS]
    prompt = (
        "Analyze this FCC Robocall Mitigation Database (RMD) certification document.\n"
        "Determine which of these required sections are PRESENT or ABSENT:\n\n"
        + "\n".join(f"- {s}" for s in section_names) + "\n\n"
        "Document text:\n"
        "---\n"
        f"{truncated}\n"
        "---\n\n"
        "Respond in JSON format as a list of objects:\n"
        '[{"section": "...", "present": true/false, "reason": "brief explanation"}]\n'
        "Only output the JSON array, nothing else."
    )
    try:
        raw = generate(
            prompt,
            system="You are an FCC regulatory compliance auditor. Respond only in valid JSON.",
            max_tokens=1500,
            temperature=0.1,
        )
        # Strip markdown code fences the model sometimes wraps around JSON.
        raw = re.sub(r"^```json\s*", "", raw.strip())
        raw = re.sub(r"\s*```$", "", raw.strip())
        sections = json.loads(raw)
        if not isinstance(sections, list):
            LOG.warning("Ollama returned non-list JSON — ignoring")
            return []
        findings: list[dict] = []
        for item in sections:
            if not isinstance(item, dict):
                continue
            if item.get("present") is False:
                section_name = item.get("section", "Unknown Section")
                # Map the free-text section name back to a stable check id.
                check_id = "llm_" + re.sub(r"[^a-z0-9]+", "_", section_name.lower()).strip("_")
                findings.append(_finding(
                    check_id, "major",
                    f"Missing: {section_name} (LLM analysis)",
                    item.get("reason", "Section not found in document."),
                ))
        return findings
    except Exception as exc:
        # json.JSONDecodeError is an Exception subclass, so a single clause
        # covers both parse failures and client errors (the original tuple
        # `(json.JSONDecodeError, Exception)` was redundant).
        LOG.warning("Ollama analysis failed: %s", exc)
        return []
def analyze_pdf(
    pdf_path: str, row: dict, use_ollama: bool = True
) -> list[dict]:
    """Full PDF analysis: extract text → regex → cross-reference → optional Ollama."""
    text = _extract_pdf_text(pdf_path)
    if not text.strip():
        return [_finding(
            "pdf_empty", "critical", "Empty or Unreadable PDF",
            "The certification document could not be read or contained no extractable text. "
            "The file may be scanned/image-based or corrupted.",
        )]
    # Tier 1: keyword/regex section scan.
    regex_findings = _check_pdf_regex(text)
    total_sections = len(REQUIRED_SECTIONS)
    sections_found = total_sections - len(regex_findings)
    # Tier 2: escalate to the LLM only for ambiguous or very short documents.
    run_llm = use_ollama and (sections_found < 6 or len(text) < 500)
    findings: list[dict] = []
    if run_llm:
        LOG.info(
            "Regex found %d/%d sections (text=%d chars) — running Ollama analysis",
            sections_found, total_sections, len(text),
        )
        llm_findings = _check_pdf_ollama(text)
        # When the LLM produced anything, its findings replace the regex ones;
        # otherwise fall back to the regex results.
        findings.extend(llm_findings if llm_findings else regex_findings)
    else:
        findings.extend(regex_findings)
    # Cross-reference structured data against the document text.
    findings.extend(_check_pdf_crossref(text, row))
    return findings
# ═══════════════════════════════════════════════════════════════════════
# Main audit function
# ═══════════════════════════════════════════════════════════════════════
def audit_single_filing(
    conn,
    *,
    frn: Optional[str] = None,
    rmd_number: Optional[str] = None,
    include_pdf: bool = True,
    use_ollama: bool = True,
) -> Optional[dict]:
    """Audit a single RMD filing. Returns result dict or None if not found.

    Args:
        conn: open psycopg2 connection.
        frn: FCC Registration Number to look up (takes precedence).
        rmd_number: RMD filing number, used when no FRN is given.
        include_pdf: also download and analyze the certification PDF (Layers 2+3).
        use_ollama: allow LLM fallback during PDF analysis.

    Side effects: upserts the result into fcc_rmd_audit_results via _save_result().
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    if frn:
        cur.execute("SELECT * FROM fcc_rmd WHERE frn = %s LIMIT 1", (frn,))
    elif rmd_number:
        cur.execute("SELECT * FROM fcc_rmd WHERE rmd_number = %s LIMIT 1", (rmd_number,))
    else:
        # Neither identifier supplied — nothing to audit.
        return None
    row = cur.fetchone()
    if not row:
        LOG.info("No RMD record found for frn=%s rmd=%s", frn, rmd_number)
        return None
    row = dict(row)
    frn = row.get("frn", "")
    rmd_num = row.get("rmd_number", "")
    # Get set of removed FRNs for cross-reference.
    cur.execute("SELECT frn FROM fcc_rmd_removed WHERE frn IS NOT NULL")
    removed_frns = {r["frn"] for r in cur.fetchall()}
    # Layer 1: structured checks.
    structured = _check_structured(row, removed_frns)
    # Layer 2+3: PDF analysis.
    pdf_findings: list[dict] = []
    pdf_downloaded = False
    pdf_text_length = 0
    if include_pdf:
        sys_id = row.get("servicenow_sys_id") or _extract_sys_id(row.get("filing_url", ""))
        filing_url = row.get("filing_url")
        # Temp dir keeps downloaded PDFs from accumulating on disk.
        with tempfile.TemporaryDirectory(prefix="rmd_audit_") as tmpdir:
            pdf_path = download_certification_pdf(filing_url, sys_id, tmpdir)
            if pdf_path and os.path.exists(pdf_path):
                pdf_downloaded = True
                text = _extract_pdf_text(pdf_path)
                pdf_text_length = len(text)
                pdf_findings = analyze_pdf(pdf_path, row, use_ollama=use_ollama)
            else:
                LOG.info("Could not download PDF for %s — structured checks only", rmd_num)
    # Assemble result.
    all_findings = structured + pdf_findings
    total = len(all_findings)
    severity = _worst_severity(all_findings) if all_findings else "clean"
    result = {
        "rmd_number": rmd_num,
        "frn": frn,
        "business_name": row.get("business_name", ""),
        "total_deficiencies": total,
        "severity": severity,
        "structured_checks": structured,
        "structured_score": len(structured),
        # None (not []) when PDF analysis was skipped, so _save_result's
        # COALESCE keeps any earlier PDF results.
        "pdf_checks": pdf_findings if include_pdf else None,
        "pdf_score": len(pdf_findings) if include_pdf else None,
        "pdf_downloaded": pdf_downloaded,
        "pdf_text_length": pdf_text_length,
        "audited_at": datetime.now(timezone.utc).isoformat(),
    }
    # Cache in DB.
    _save_result(conn, row.get("id"), result)
    return result
def _save_result(conn, fcc_rmd_id: Optional[int], result: dict) -> None:
    """Upsert the audit result into fcc_rmd_audit_results.

    The ON CONFLICT clause COALESCEs the pdf_* columns so that a later
    structured-only run (pdf_checks = NULL) does not wipe PDF results from
    an earlier full audit. Failures are logged and rolled back, never raised —
    a cache write must not abort the audit.
    """
    try:
        cur = conn.cursor()
        cur.execute(
            """
            INSERT INTO fcc_rmd_audit_results (
                fcc_rmd_id, rmd_number, frn, business_name,
                structured_checks, structured_score,
                pdf_checks, pdf_score, pdf_downloaded, pdf_text_length,
                total_deficiencies, severity,
                audited_at, pdf_audited_at, updated_at
            ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,now())
            ON CONFLICT (rmd_number) DO UPDATE SET
                fcc_rmd_id = EXCLUDED.fcc_rmd_id,
                frn = EXCLUDED.frn,
                business_name = EXCLUDED.business_name,
                structured_checks = EXCLUDED.structured_checks,
                structured_score = EXCLUDED.structured_score,
                pdf_checks = COALESCE(EXCLUDED.pdf_checks, fcc_rmd_audit_results.pdf_checks),
                pdf_score = COALESCE(EXCLUDED.pdf_score, fcc_rmd_audit_results.pdf_score),
                pdf_downloaded = EXCLUDED.pdf_downloaded OR fcc_rmd_audit_results.pdf_downloaded,
                pdf_text_length = COALESCE(EXCLUDED.pdf_text_length, fcc_rmd_audit_results.pdf_text_length),
                total_deficiencies = EXCLUDED.total_deficiencies,
                severity = EXCLUDED.severity,
                audited_at = EXCLUDED.audited_at,
                pdf_audited_at = CASE WHEN EXCLUDED.pdf_checks IS NOT NULL THEN now() ELSE fcc_rmd_audit_results.pdf_audited_at END,
                updated_at = now()
            """,
            (
                fcc_rmd_id,
                result["rmd_number"],
                result["frn"],
                result["business_name"],
                json.dumps(result["structured_checks"]),
                result["structured_score"],
                # NULL when this run skipped PDF analysis (see COALESCE above).
                json.dumps(result["pdf_checks"]) if result.get("pdf_checks") is not None else None,
                result.get("pdf_score"),
                result.get("pdf_downloaded", False),
                result.get("pdf_text_length", 0),
                result["total_deficiencies"],
                result["severity"],
                result["audited_at"],
                # pdf_audited_at only set when PDF analysis actually ran.
                result["audited_at"] if result.get("pdf_checks") is not None else None,
            ),
        )
        conn.commit()
    except Exception as exc:
        LOG.warning("Failed to save audit result: %s", exc)
        conn.rollback()
# ═══════════════════════════════════════════════════════════════════════
# Batch mode
# ═══════════════════════════════════════════════════════════════════════
def run_batch(
    conn,
    *,
    include_pdf: bool = False,
    use_ollama: bool = True,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    year: Optional[int] = None,
    skip_recent: bool = True,
) -> dict:
    """Run structured checks on fcc_rmd records. Returns summary stats.

    Args:
        conn: open psycopg2 connection.
        include_pdf: also run PDF download/analysis per record (~2s each).
        use_ollama: allow LLM fallback during PDF analysis.
        limit/offset: window the record set (enables parallel workers).
        year: only audit filings recertified on/after Jan 1 of this year.
        skip_recent: skip records audited within the last 7 days.

    Returns:
        Dict of counts keyed by total/clean/minor/major/critical.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # Removed providers are excluded here; single-record audits still flag them.
    query = """
        SELECT * FROM fcc_rmd
        WHERE (removed_from_rmd = FALSE OR removed_from_rmd IS NULL)
    """
    if year:
        # NOTE(review): interpolated directly into SQL; safe only because
        # argparse coerces --year to int — confirm no caller passes a string.
        query += f" AND last_recertified >= '{year}-01-01'"
    if skip_recent:
        # Skip records already audited in the last 7 days.
        query += " AND rmd_number NOT IN (SELECT rmd_number FROM fcc_rmd_audit_results WHERE audited_at > NOW() - INTERVAL '7 days')"
    query += " ORDER BY rmd_number"
    if offset:
        query += f" OFFSET {int(offset)}"
    if limit:
        query += f" LIMIT {int(limit)}"
    cur.execute(query)
    rows = cur.fetchall()
    LOG.info("Batch auditing %d RMD records (pdf=%s)", len(rows), include_pdf)
    # Get removed FRNs once for the whole batch.
    cur.execute("SELECT frn FROM fcc_rmd_removed WHERE frn IS NOT NULL")
    removed_frns = {r["frn"] for r in cur.fetchall()}
    stats = {"total": 0, "clean": 0, "minor": 0, "major": 0, "critical": 0}
    for i, row in enumerate(rows):
        row = dict(row)
        rmd_num = row.get("rmd_number", "")
        frn = row.get("frn", "")
        # Layer 1: structured checks.
        structured = _check_structured(row, removed_frns)
        pdf_findings: list[dict] = []
        pdf_downloaded = False
        pdf_text_length = 0
        if include_pdf:
            sys_id = row.get("servicenow_sys_id") or _extract_sys_id(row.get("filing_url", ""))
            with tempfile.TemporaryDirectory(prefix="rmd_audit_") as tmpdir:
                pdf_path = download_certification_pdf(row.get("filing_url"), sys_id, tmpdir)
                if pdf_path and os.path.exists(pdf_path):
                    pdf_downloaded = True
                    text = _extract_pdf_text(pdf_path)
                    pdf_text_length = len(text)
                    pdf_findings = analyze_pdf(pdf_path, row, use_ollama=use_ollama)
            time.sleep(1.0)  # Rate limit against the ServiceNow portal.
        all_findings = structured + pdf_findings
        severity = _worst_severity(all_findings) if all_findings else "clean"
        result = {
            "rmd_number": rmd_num,
            "frn": frn,
            "business_name": row.get("business_name", ""),
            "total_deficiencies": len(all_findings),
            "severity": severity,
            "structured_checks": structured,
            "structured_score": len(structured),
            # None (not []) when PDF analysis was skipped for this run.
            "pdf_checks": pdf_findings if include_pdf else None,
            "pdf_score": len(pdf_findings) if include_pdf else None,
            "pdf_downloaded": pdf_downloaded,
            "pdf_text_length": pdf_text_length,
            "audited_at": datetime.now(timezone.utc).isoformat(),
        }
        _save_result(conn, row.get("id"), result)
        stats["total"] += 1
        stats[severity] += 1
        if (i + 1) % 500 == 0:
            LOG.info("Progress: %d/%d processed", i + 1, len(rows))
    return stats
# ═══════════════════════════════════════════════════════════════════════
# CLI
# ═══════════════════════════════════════════════════════════════════════
def _format_report(result: dict) -> str:
"""Format audit result as human-readable text."""
lines = [
f"RMD Audit Report — {result['rmd_number']}",
f" FRN: {result['frn']}",
f" Business: {result['business_name']}",
f" Severity: {result['severity'].upper()}",
f" Total deficiencies: {result['total_deficiencies']}",
"",
]
if result["structured_checks"]:
lines.append("Structured Data Checks:")
for f in result["structured_checks"]:
icon = {"critical": "!!!", "major": "!!", "minor": "!"}.get(f["severity"], "?")
lines.append(f" [{icon}] {f['label']}")
lines.append(f" {f['detail']}")
lines.append("")
if result.get("pdf_checks"):
lines.append(f"PDF Content Analysis (text: {result.get('pdf_text_length', 0)} chars):")
for f in result["pdf_checks"]:
icon = {"critical": "!!!", "major": "!!", "minor": "!"}.get(f["severity"], "?")
lines.append(f" [{icon}] {f['label']}")
lines.append(f" {f['detail']}")
lines.append("")
elif result.get("pdf_checks") is None and result.get("pdf_downloaded") is False:
lines.append("PDF: Not analyzed (use --no-pdf to skip, or PDF not downloadable)")
lines.append("")
if result["total_deficiencies"] == 0:
lines.append("No deficiencies found.")
return "\n".join(lines)
def main():
    """CLI entry point: parse args, connect to Postgres, run a single or batch audit."""
    parser = argparse.ArgumentParser(description="Audit FCC RMD filings for deficiencies")
    # Exactly one selection mode is required: single FRN, single RMD, or batch.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--frn", help="Audit a single FRN")
    group.add_argument("--rmd", help="Audit a single RMD number")
    group.add_argument("--batch", action="store_true", help="Batch audit all records")
    parser.add_argument("--no-pdf", action="store_true", help="Skip PDF download and analysis")
    parser.add_argument("--no-ollama", action="store_true", help="Skip Ollama LLM analysis")
    parser.add_argument("--limit", type=int, help="Limit batch to N records")
    parser.add_argument("--offset", type=int, help="Skip first N records (for parallel workers)")
    parser.add_argument("--year", type=int, help="Only audit filings from this year (e.g., 2026)")
    parser.add_argument("--no-skip-recent", action="store_true", help="Re-audit even recently audited records")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()
    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)
    conn = psycopg2.connect(DATABASE_URL)
    if args.batch:
        stats = run_batch(
            conn,
            include_pdf=not args.no_pdf,
            use_ollama=not args.no_ollama,
            limit=args.limit,
            offset=args.offset,
            year=args.year,
            skip_recent=not args.no_skip_recent,
        )
        if args.json:
            print(json.dumps(stats, indent=2))
        else:
            print(f"\nBatch audit complete:")
            print(f" Total: {stats['total']}")
            print(f" Clean: {stats['clean']}")
            print(f" Minor: {stats['minor']}")
            print(f" Major: {stats['major']}")
            print(f" Critical: {stats['critical']}")
    else:
        result = audit_single_filing(
            conn,
            frn=args.frn,
            rmd_number=args.rmd,
            include_pdf=not args.no_pdf,
            use_ollama=not args.no_ollama,
        )
        if not result:
            LOG.error("No RMD record found")
            sys.exit(1)
        if args.json:
            # default=str serializes any non-JSON-native values (e.g. dates).
            print(json.dumps(result, indent=2, default=str))
        else:
            print(_format_report(result))
    conn.close()


if __name__ == "__main__":
    main()