new-site/scripts/workers/usac_audit_scraper.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

433 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""USAC E-File audit scraper.
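Intended to log into USAC E-File (forms.universalservice.org) using our
delegate credentials, navigate to a carrier's filing history, and scrape every
499-A and 499-Q filing on record.
NOTE: the Playwright login and scrape steps are still stubs
(verify_delegate_access always reports success and scrape_filing_history
returns an empty list), so every expected year is currently reported as missing.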
This script is triggered by the USAC audit handler when a client grants
us delegate access. It stores the scraped data in `usac_filing_history`
and flags gaps/errors for the audit report.
USAC E-File limits:
- Session-based login (Okta SSO), one session at a time
- No documented rate limit, but we add human-paced delays
- Filing history goes back ~5-6 years in the UI
- Older filings require USAC Contribution Division correspondence
Usage:
python -m scripts.workers.usac_audit_scraper --frn=0033716911
python -m scripts.workers.usac_audit_scraper --frn=0033716911 --dry-run
Environment:
DATABASE_URL PostgreSQL
USAC_USERNAME Okta username (justin@performancewest.net)
USAC_PASSWORD Okta password
USAC_TOTP_SECRET TOTP secret for MFA (optional — manual if not set)
PLAYWRIGHT_STORAGE_BUCKET MinIO bucket for browser state
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
from datetime import date, datetime
from decimal import Decimal
from typing import Optional
import psycopg2
import psycopg2.extras
# Module-wide logger used by every function in this file.
LOG = logging.getLogger("usac_audit_scraper")
# NOTE: this runs at import time, so importing the module also sets up logging
# (INFO and above, written to stdout) unless the root logger is already configured.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
stream=sys.stdout,
)
# PostgreSQL connection string; empty when the variable is unset, in which case
# run_audit returns an error dict instead of connecting.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# Base URL of the USAC E-File portal.
USAC_EFILE_URL = "https://forms.universalservice.org"
# Pricing constants (cents)
PRICE_NEW_FILING_CENTS = 49900 # $499 per year
PRICE_AMENDMENT_CENTS = 29900 # $299 per year
# Applied separately to the new-filing total and to the amendment total.
MULTI_YEAR_DISCOUNT_PCT = 15 # 15% off for 2+ years
# Drives both the default look-back window and the "too_old" cut-off.
MAX_EFILE_YEARS = 6 # USAC E-File goes back ~6 years
def verify_delegate_access(frn: str) -> tuple[bool, str]:
    """Report whether we hold delegate access to ``frn`` on USAC E-File.

    Placeholder implementation: no check is performed yet. The function
    logs that access is assumed and always reports success.

    Args:
        frn: FRN of the carrier being audited.

    Returns:
        ``(success, error_message)``; currently always ``(True, "")``.
    """
    # TODO: Playwright verification against USAC E-File
    # Until then, access is taken on trust and must be confirmed manually.
    LOG.info("Delegate access verification for FRN %s — assumed granted (manual check required)", frn)
    granted, error_message = True, ""
    return granted, error_message
def scrape_filing_history(frn: str, filer_id: str) -> list[dict]:
    """Fetch a carrier's filing history from USAC E-File.

    Placeholder implementation: nothing is scraped yet. The function logs
    a warning and returns an empty list, so downstream gap analysis sees no
    filings on record.

    Args:
        frn: FRN of the carrier.
        filer_id: The carrier's 499 filer ID (only logged for now).

    Returns:
        One dict per filing found; currently always ``[]``.
    """
    # TODO: Playwright implementation:
    #   1. Load storage state from MinIO (playwright-storage/usac/<entity_id>/)
    #   2. Navigate to forms.universalservice.org
    #   3. Select the FRN/Filer ID
    #   4. Go to Filing History
    #   5. For each year: extract status, date filed, revenue, contribution
    #   6. Return structured data
    no_filings: list[dict] = []
    LOG.warning(
        "USAC scraper not yet implemented — filing history will be estimated. "
        "FRN=%s filer_id=%s",
        frn,
        filer_id,
    )
    return no_filings
def determine_expected_years(frn: str, conn) -> list[int]:
    """Determine which reporting years this carrier should have filed.

    The window ends at the current reporting year (last calendar year) and
    by default spans the last ``MAX_EFILE_YEARS`` years. If the carrier has
    compliance orders on record, the window is shortened so that it starts
    no earlier than the year before their earliest order was created.

    Args:
        frn: FRN of the carrier.
        conn: Open psycopg2 connection.

    Returns:
        Ascending list of reporting years, ending at the current reporting
        year.
    """
    current_reporting_year = date.today().year - 1
    # Default: expect filings for the last MAX_EFILE_YEARS years
    earliest_year = current_reporting_year - MAX_EFILE_YEARS + 1

    # The context manager closes the cursor even when the query raises.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            "SELECT MIN(created_at) AS earliest FROM compliance_orders WHERE telecom_entity_id = "
            "(SELECT id FROM telecom_entities WHERE frn = %s LIMIT 1)",
            (frn,),
        )
        row = cur.fetchone()

    # NOTE(review): this uses the date of our first compliance order, not the
    # carrier's registration date, as the start of the obligation; confirm
    # that is the intended proxy.
    if row and row.get("earliest"):
        earliest_year = max(earliest_year, row["earliest"].year - 1)

    return list(range(earliest_year, current_reporting_year + 1))
def analyze_gaps(
    expected_years: list[int],
    scraped_filings: list[dict],
    frn: str,
    conn,
) -> list[dict]:
    """Compare expected vs actual annual filings and produce audit findings.

    Only annual (499-A) filings are matched against ``expected_years``.
    Quarterly 499-Q rows are ignored here so that they can neither mask a
    missing 499-A nor replace the annual figures for the same year.

    Args:
        expected_years: Reporting years the carrier should have filed for.
        scraped_filings: Filing dicts from the scraper; each must carry
            ``reporting_year``. A missing ``form_type`` is treated as 499-A.
        frn: FRN of the carrier, stamped onto "missing" findings.
        conn: Unused; kept for interface compatibility.

    Returns:
        One dict per expected year, suitable for insertion into
        ``usac_filing_history``, each with ``audit_flag`` and ``audit_notes``.
    """
    annual_by_year: dict[int, dict] = {}
    for filing in scraped_filings:
        form_type = str(filing.get("form_type") or "499-A").replace("-", "").upper()
        if form_type == "499Q":
            continue
        # A later entry for the same year (e.g. a revision) replaces an earlier one.
        annual_by_year[filing["reporting_year"]] = filing

    current_year = date.today().year
    findings: list[dict] = []
    for year in expected_years:
        filing = annual_by_year.get(year)
        if filing is not None:
            # Check for issues in filed years
            flag = "ok"
            notes = ""
            # Revenue sanity check
            revenue = filing.get("total_revenue_cents", 0) or 0
            if revenue == 0 and filing.get("filing_status") == "filed":
                flag = "revenue_suspect"
                notes = "Filed with $0 total revenue — verify if correct"
            findings.append({
                **filing,
                "audit_flag": flag,
                "audit_notes": notes,
            })
            continue

        # Missing filing
        # NOTE(review): the cut-off is MAX_EFILE_YEARS but the note below
        # says ">5 years"; confirm which one is intended.
        too_old = (current_year - year) > MAX_EFILE_YEARS
        findings.append({
            "frn": frn,
            "form_type": "499-A",
            "reporting_year": year,
            "filing_status": "missing",
            "audit_flag": "too_old" if too_old else "missing",
            "audit_notes": (
                f"No 499-A filing found for CY{year}. "
                + ("This year is >5 years old — requires USAC correspondence, not E-File."
                   if too_old else
                   "Can be filed via USAC E-File.")
            ),
        })
    return findings
def estimate_retroactive_usf(findings: list[dict], conn) -> list[dict]:
    """Estimate retroactive USF owed for missing years.

    Multiplies the carrier's most recent filed revenue by the newest
    ``factor`` row in ``fcc_deminimis_factors`` and adds simple interest of
    10% per year overdue. Findings flagged ``missing`` or ``too_old`` are
    annotated in place with ``estimated_usf_owed_cents`` and
    ``estimated_interest_cents``. Nothing is annotated when no filed year
    carries a positive revenue.

    Args:
        findings: Audit findings as produced by ``analyze_gaps``.
        conn: Open psycopg2 connection.

    Returns:
        The same ``findings`` list, for call chaining.
    """
    # The context manager closes the cursor even when the query raises.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            "SELECT factor FROM fcc_deminimis_factors ORDER BY form_year DESC LIMIT 1"
        )
        factor_row = cur.fetchone()

    # Fall back to the built-in default when the table is empty or the value is NULL.
    if factor_row and factor_row["factor"] is not None:
        factor = Decimal(str(factor_row["factor"]))
    else:
        factor = Decimal("0.256")

    filed = [
        f for f in findings
        if f.get("filing_status") == "filed" and f.get("total_revenue_cents")
    ]
    if not filed:
        return findings  # can't estimate without revenue data

    # Pick the latest year explicitly rather than relying on list order;
    # reversed() keeps the last entry on ties, matching the old filed[-1].
    reference = max(reversed(filed), key=lambda f: f["reporting_year"])
    # Going through str() lets int, float and Decimal revenues all multiply cleanly.
    ref_revenue = Decimal(str(reference["total_revenue_cents"]))
    if ref_revenue <= 0:
        return findings

    this_year = date.today().year
    # Rough estimate: revenue × factor = annual USF contribution
    estimated_usf = int(ref_revenue * factor)
    for f in findings:
        if f.get("audit_flag") not in ("missing", "too_old"):
            continue
        # Interest: ~10% per year simple interest (USAC rate varies)
        years_overdue = this_year - f["reporting_year"]
        f["estimated_usf_owed_cents"] = estimated_usf
        f["estimated_interest_cents"] = int(estimated_usf * Decimal("0.10") * years_overdue)
    return findings
def _discounted_total_cents(unit_price_cents: int, count: int) -> int:
    """Price ``count`` filings, applying the multi-year discount at 2+ years."""
    total = unit_price_cents * count
    if count >= 2:
        # Integer maths keeps cent amounts exact (no float rounding before truncation).
        total = total * (100 - MULTI_YEAR_DISCOUNT_PCT) // 100
    return total


def generate_quote(findings: list[dict]) -> dict:
    """Generate a remediation quote based on audit findings.

    New filings (flag ``missing``) and amendments (flags ``revenue_suspect``,
    ``wrong_category``, ``should_amend``) are priced separately, and each
    group gets the multi-year discount when it covers two or more years.
    ``too_old`` years are listed but not priced.

    Args:
        findings: Audit findings carrying ``audit_flag`` and
            ``reporting_year``, optionally with estimated USF and interest.

    Returns:
        Dict with the year lists per category, their counts, the quote
        amounts in cents, and the summed liability estimates in cents.
    """
    missing = [f for f in findings if f.get("audit_flag") == "missing"]
    amendments = [
        f for f in findings
        if f.get("audit_flag") in ("revenue_suspect", "wrong_category", "should_amend")
    ]
    too_old = [f for f in findings if f.get("audit_flag") == "too_old"]

    new_count = len(missing)
    amend_count = len(amendments)
    new_total = _discounted_total_cents(PRICE_NEW_FILING_CENTS, new_count)
    amend_total = _discounted_total_cents(PRICE_AMENDMENT_CENTS, amend_count)

    # "or 0" tolerates findings that carry the key with a None value.
    total_usf = sum(f.get("estimated_usf_owed_cents") or 0 for f in findings)
    total_interest = sum(f.get("estimated_interest_cents") or 0 for f in findings)

    return {
        "new_filing_years": [f["reporting_year"] for f in missing],
        "amendment_years": [f["reporting_year"] for f in amendments],
        "too_old_years": [f["reporting_year"] for f in too_old],
        "ok_years": [f["reporting_year"] for f in findings if f.get("audit_flag") == "ok"],
        "new_filing_count": new_count,
        "amendment_count": amend_count,
        "too_old_count": len(too_old),
        "quote_new_filings_cents": new_total,
        "quote_amendments_cents": amend_total,
        "quote_total_cents": new_total + amend_total,
        "multi_year_discount_applied": new_count >= 2 or amend_count >= 2,
        "estimated_usf_owed_cents": total_usf,
        "estimated_interest_cents": total_interest,
        "estimated_total_liability_cents": total_usf + total_interest,
    }
def save_findings(frn: str, entity_id: int | None, findings: list[dict], conn) -> None:
    """Persist audit findings to usac_filing_history.

    Each finding is upserted. All rows are written in one transaction that
    is committed on success and rolled back if any statement fails.

    Args:
        frn: FRN stored on every row.
        entity_id: ``telecom_entities.id`` of the carrier, or ``None``.
        findings: Audit findings; each must carry ``reporting_year``.
        conn: Open psycopg2 connection.
    """
    # NOTE(review): reporting_quarter is in the conflict target but not in
    # the INSERT column list; confirm the column has a non-NULL default,
    # otherwise NULL quarters would never conflict and re-runs would add
    # duplicate rows.
    # NOTE(review): on conflict only the status, audit and estimate columns
    # are refreshed; date_filed, confirmation_number and the revenue columns
    # keep their old values. Confirm that is intended.
    upsert_sql = """
        INSERT INTO usac_filing_history (
            telecom_entity_id, frn, form_type, reporting_year,
            filing_status, date_filed, confirmation_number,
            total_revenue_cents, interstate_revenue_cents,
            international_revenue_cents, contribution_amount_cents,
            line_105_primary, is_deminimis, is_contributor,
            audit_flag, audit_notes,
            estimated_usf_owed_cents, estimated_interest_cents,
            scraped_at
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, NOW()
        )
        ON CONFLICT (frn, form_type, reporting_year, reporting_quarter)
        DO UPDATE SET
            filing_status = EXCLUDED.filing_status,
            audit_flag = EXCLUDED.audit_flag,
            audit_notes = EXCLUDED.audit_notes,
            estimated_usf_owed_cents = EXCLUDED.estimated_usf_owed_cents,
            estimated_interest_cents = EXCLUDED.estimated_interest_cents,
            scraped_at = NOW(),
            updated_at = NOW()
    """
    # "with conn" commits on success and rolls back on error (it does not
    # close the connection); the cursor context closes the cursor either way.
    with conn, conn.cursor() as cur:
        for f in findings:
            cur.execute(upsert_sql, (
                entity_id, frn,
                f.get("form_type", "499-A"),
                f["reporting_year"],
                f.get("filing_status", "unknown"),
                f.get("date_filed"),
                f.get("confirmation_number"),
                f.get("total_revenue_cents"),
                f.get("interstate_revenue_cents"),
                f.get("international_revenue_cents"),
                f.get("contribution_amount_cents"),
                f.get("line_105_primary"),
                f.get("is_deminimis"),
                f.get("is_contributor"),
                f.get("audit_flag"),
                f.get("audit_notes"),
                f.get("estimated_usf_owed_cents"),
                f.get("estimated_interest_cents"),
            ))
    LOG.info("Saved %d audit findings for FRN %s", len(findings), frn)
def save_audit_run(
    frn: str, entity_id: int | None, order_number: str | None,
    findings: list[dict], quote: dict, conn,
) -> int:
    """Insert a new audit run record and return its ID.

    A fresh row is inserted on every call; existing runs are not updated.
    The row is stored with ``delegate_access_verified = TRUE`` and status
    ``'report_ready'``. The insert is committed on success and rolled back
    if it fails.

    Args:
        frn: FRN of the audited carrier.
        entity_id: ``telecom_entities.id`` of the carrier, or ``None``.
        order_number: Compliance order number, or ``None``.
        findings: Audit findings; only their count is stored.
        quote: Quote dict as returned by ``generate_quote``.
        conn: Open psycopg2 connection.

    Returns:
        The ``id`` of the inserted ``usac_audit_runs`` row.
    """
    insert_sql = """
        INSERT INTO usac_audit_runs (
            telecom_entity_id, frn, order_number,
            delegate_access_verified, delegate_access_checked_at,
            years_audited, years_filed_ok, years_missing,
            years_needs_amendment, years_too_old,
            total_estimated_usf_owed_cents, total_estimated_interest_cents,
            quote_new_filings_cents, quote_amendments_cents, quote_total_cents,
            status
        ) VALUES (
            %s, %s, %s, TRUE, NOW(),
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            'report_ready'
        )
        RETURNING id
    """
    params = (
        entity_id, frn, order_number,
        len(findings),
        len(quote["ok_years"]),
        quote["new_filing_count"],
        quote["amendment_count"],
        quote["too_old_count"],
        quote["estimated_usf_owed_cents"],
        quote["estimated_interest_cents"],
        quote["quote_new_filings_cents"],
        quote["quote_amendments_cents"],
        quote["quote_total_cents"],
    )
    # "with conn" commits on success and rolls back on error (it does not
    # close the connection); the cursor context closes the cursor either way.
    with conn, conn.cursor() as cur:
        cur.execute(insert_sql, params)
        run_id = cur.fetchone()[0]
    LOG.info("Audit run %d created for FRN %s", run_id, frn)
    return run_id
def run_audit(frn: str, order_number: str | None = None, dry_run: bool = False) -> dict:
    """Run a full USAC filing audit for a carrier.

    Steps: resolve the entity, verify delegate access, determine expected
    years, scrape the filing history, analyze gaps, estimate retroactive
    USF, build the quote and, unless ``dry_run`` is set, persist the
    findings and the audit run. The database connection is closed on every
    exit path, including errors.

    Args:
        frn: FRN of the carrier to audit.
        order_number: Compliance order number to attach to the run, if any.
        dry_run: When true, log the findings instead of saving them.

    Returns:
        ``{"error": ...}`` on a configuration or access failure;
        ``{"findings", "quote"}`` for a dry run; otherwise
        ``{"run_id", "findings", "quote"}``.
    """
    if not DATABASE_URL:
        LOG.error("DATABASE_URL not set")
        return {"error": "DATABASE_URL not set"}

    conn = psycopg2.connect(DATABASE_URL)
    try:
        # Resolve entity
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT id, filer_id_499 FROM telecom_entities WHERE frn = %s LIMIT 1", (frn,))
            entity = cur.fetchone()
        entity_id = entity["id"] if entity else None
        # "or ''" also covers a row whose filer_id_499 column is NULL.
        filer_id = (entity or {}).get("filer_id_499") or ""

        # Step 1: Verify delegate access
        access_ok, access_err = verify_delegate_access(frn)
        if not access_ok:
            LOG.error("Delegate access verification failed for %s: %s", frn, access_err)
            return {"error": f"Access denied: {access_err}"}

        # Step 2: Determine expected filing years
        expected = determine_expected_years(frn, conn)
        LOG.info("Expected filing years for %s: %s", frn, expected)

        # Step 3: Scrape filing history
        scraped = scrape_filing_history(frn, filer_id)
        LOG.info("Scraped %d filings from USAC E-File", len(scraped))

        # Step 4: Analyze gaps
        findings = analyze_gaps(expected, scraped, frn, conn)

        # Step 5: Estimate retroactive USF
        findings = estimate_retroactive_usf(findings, conn)

        # Step 6: Generate quote
        quote = generate_quote(findings)
        LOG.info(
            "Audit summary for %s: %d years checked, %d OK, %d missing, "
            "%d need amendment, %d too old",
            frn, len(findings), len(quote["ok_years"]),
            quote["new_filing_count"], quote["amendment_count"],
            quote["too_old_count"],
        )
        LOG.info(
            "Quote: new=$%.2f, amend=$%.2f, total=$%.2f | "
            "Est USF owed=$%.2f + interest=$%.2f",
            quote["quote_new_filings_cents"] / 100,
            quote["quote_amendments_cents"] / 100,
            quote["quote_total_cents"] / 100,
            quote["estimated_usf_owed_cents"] / 100,
            quote["estimated_interest_cents"] / 100,
        )

        if dry_run:
            LOG.info("DRY RUN — not saving to DB")
            for f in findings:
                # Separate the flag from the notes so the line stays readable.
                LOG.info(" CY%d: %s %s", f["reporting_year"], f.get("audit_flag", "?"), f.get("audit_notes", ""))
            return {"findings": findings, "quote": quote}

        # Step 7: Save to DB
        save_findings(frn, entity_id, findings, conn)
        run_id = save_audit_run(frn, entity_id, order_number, findings, quote, conn)
        return {"run_id": run_id, "findings": findings, "quote": quote}
    finally:
        # Runs on every path above, including early returns and exceptions.
        conn.close()
def main():
    """Command-line entry point: parse arguments and run one audit.

    Exits with status 1 when the audit result contains an ``error`` key.
    """
    cli = argparse.ArgumentParser(description="USAC E-File filing history audit")
    cli.add_argument("--frn", required=True, help="FRN to audit")
    cli.add_argument("--order", help="Compliance order number (optional)")
    cli.add_argument("--dry-run", action="store_true")
    opts = cli.parse_args()

    outcome = run_audit(frn=opts.frn, order_number=opts.order, dry_run=opts.dry_run)
    if "error" not in outcome:
        return
    LOG.error("Audit failed: %s", outcome["error"])
    sys.exit(1)


if __name__ == "__main__":
    main()