Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
429 lines
18 KiB
Python
429 lines
18 KiB
Python
"""
|
|
FCC CORES Scraper — Red Light Status + 499 Filer Database
|
|
|
|
Two functions:
|
|
1. check_red_light(frn) — logs into CORES with PW credentials,
|
|
navigates to FRN Financial / Red Light Display, returns red/green status
|
|
2. download_499_filer_excel() — logs into the 499 filer database page,
|
|
downloads the Excel dump of all filers, parses and upserts into PG
|
|
|
|
Credentials from environment:
|
|
FCC_CORES_USERNAME — CORES login email
|
|
FCC_CORES_PASSWORD — CORES login password
|
|
|
|
Schedule:
|
|
Red light: on-demand per FRN (called by API or job server)
|
|
499 filer: daily via cron alongside RMD scraper
|
|
|
|
Usage:
|
|
# Check red light for a single FRN
|
|
python3 -m scripts.workers.fcc_cores_scraper --red-light 0027160886
|
|
|
|
# Download and import 499 filer database
|
|
python3 -m scripts.workers.fcc_cores_scraper --filer-db
|
|
|
|
# Both
|
|
python3 -m scripts.workers.fcc_cores_scraper --filer-db --red-light 0027160886
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import csv
|
|
import io
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import psycopg2
|
|
|
|
# Module-level logger; basicConfig is called here so the worker emits logs
# when run directly as a script (python3 -m scripts.workers.fcc_cores_scraper).
LOG = logging.getLogger("workers.cores_scraper")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s")

# Postgres DSN; the default points at the local dev database.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
# CORES credentials — required by check_red_light(); empty string means
# "not configured" and login is refused up front.
FCC_CORES_USERNAME = os.getenv("FCC_CORES_USERNAME", "")
FCC_CORES_PASSWORD = os.getenv("FCC_CORES_PASSWORD", "")

# FCC endpoints: CORES login page, the FRN Financial (red light) payment
# search, and the public Form 499 filer database page.
CORES_LOGIN_URL = "https://apps.fcc.gov/cores/userLogin.do"
CORES_RLD_URL = "https://apps.fcc.gov/cores/paymentFrnSearch.do"
FILER_DB_URL = "https://apps.fcc.gov/cgb/form499/499a.cfm"
|
|
|
|
|
|
async def _login_cores(page) -> bool:
    """Log into FCC CORES. Returns True on success.

    Args:
        page: A Playwright async ``Page``; left on the post-login landing
            page on success so the caller can continue navigating.

    Returns:
        True when login appears successful, False when credentials are
        missing, the form cannot be found, or the site reports a bad login.
    """
    # Fail fast if credentials were never configured in the environment.
    if not FCC_CORES_USERNAME or not FCC_CORES_PASSWORD:
        LOG.error("FCC_CORES_USERNAME / FCC_CORES_PASSWORD not set")
        return False

    LOG.info("Logging into FCC CORES as %s", FCC_CORES_USERNAME)
    await page.goto(CORES_LOGIN_URL, wait_until="domcontentloaded", timeout=30000)
    # Fixed sleep to let the legacy page settle before probing for the form.
    await page.wait_for_timeout(2000)

    # CORES uses: input[name="userName"] + input[name="mima"] + input[type="image"][name="button.submit"]
    username = page.locator('input[name="userName"]')
    password = page.locator('input[name="mima"]')

    if await username.count() == 0 or await password.count() == 0:
        # The selectors above are tied to the current CORES markup; a miss
        # most likely means the FCC changed the page.
        LOG.error("CORES login form not found — page may have changed")
        return False

    await username.fill(FCC_CORES_USERNAME)
    await password.fill(FCC_CORES_PASSWORD)

    # Submit is an image input, not a regular button; fall back to the first
    # image input on the page if the named one is absent.
    submit = page.locator('input[name="button.submit"]')
    if await submit.count() == 0:
        submit = page.locator('input[type="image"]').first
    await submit.click()
    await page.wait_for_timeout(5000)

    # Check for "continue" button (CORES sometimes shows a session warning)
    continue_btn = page.locator('#continue, input[id="continue"]')
    if await continue_btn.count() > 0 and await continue_btn.is_visible():
        await continue_btn.click()
        await page.wait_for_timeout(3000)

    # Heuristic failure detection: the CORES error page mentions
    # "invalid"/"incorrect"; anything else is treated as success.
    page_text = await page.inner_text("body")
    if "invalid" in page_text.lower() or "incorrect" in page_text.lower():
        LOG.error("CORES login failed — invalid credentials")
        return False

    LOG.info("CORES login successful")
    return True
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
# Red Light Status Checker
# ──────────────────────────────────────────────────────────────────────


async def check_red_light(frn: str) -> dict:
    """Check the red/green light status of an FRN via CORES.

    Logs in with _login_cores(), navigates to the FRN Financial / Red Light
    search, submits the FRN, and classifies the result page text. The result
    is best-effort persisted to Postgres (fcc_rmd, falling back to a note on
    telecom_entities); DB failures are logged, never raised.

    Args:
        frn: FCC Registration Number to look up.

    Returns:
        {"frn": "...", "status": "green"|"red"|"unknown", "detail": "..."}
    """
    # Imported lazily so the module can be loaded where Playwright isn't installed.
    from playwright.async_api import async_playwright

    result = {"frn": frn, "status": "unknown", "detail": ""}

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        page = await browser.new_page()

        try:
            if not await _login_cores(page):
                result["detail"] = "CORES login failed"
                return result

            # Navigate to FRN Financial / Red Light search
            LOG.info("Checking red light status for FRN %s", frn)
            await page.goto(CORES_RLD_URL, wait_until="domcontentloaded", timeout=15000)
            await page.wait_for_timeout(2000)

            # After login, take a dashboard screenshot then navigate to FRN Financial
            await page.wait_for_timeout(2000)

            # Debug: screenshot the post-login dashboard
            try:
                # Fix: this path had a pointless f-string prefix (no placeholders).
                await page.screenshot(path="/app/data/debug_cores_dashboard.png", full_page=True)
                LOG.info("Dashboard screenshot saved. URL: %s", page.url)
                # Also log all links on the page
                links = await page.evaluate("() => [...document.querySelectorAll('a')].map(a => ({text: a.textContent.trim().substring(0,50), href: a.href})).filter(l => l.text)")
                for link in (links or [])[:20]:
                    LOG.info(" Link: %s → %s", link.get("text",""), link.get("href","")[:80])
            except Exception as ss_err:
                LOG.warning("Dashboard screenshot failed: %s", ss_err)

            # Click "FRN Financial" or "Manage Existing FRNs" link in the dashboard
            fin_link = page.locator('a:has-text("FRN Financial"), a:has-text("Financial"), a[href*="payment"], a[href*="financial"]')
            if await fin_link.count() > 0:
                await fin_link.first.click()
                await page.wait_for_timeout(3000)
            else:
                # Try navigating to manage FRNs first
                manage_link = page.locator('a:has-text("Manage Existing FRNs"), a:has-text("Manage FRN")')
                if await manage_link.count() > 0:
                    await manage_link.first.click()
                    await page.wait_for_timeout(2000)
                    fin_link = page.locator('a:has-text("FRN Financial"), a:has-text("Financial")')
                    if await fin_link.count() > 0:
                        await fin_link.first.click()
                        await page.wait_for_timeout(3000)

            # Now we should be on the FRN Financial search page — enter the FRN
            frn_input = page.locator('input[name*="frn" i], input[name*="FRN"], input[id*="frn" i]')
            if await frn_input.count() > 0:
                await frn_input.first.fill(frn)
                search_btn = page.locator('input[type="submit"], input[type="image"], button:has-text("Search"), button:has-text("Submit"), input[value*="Search"], input[value*="Go"]')
                if await search_btn.count() > 0:
                    await search_btn.first.click()
                    await page.wait_for_timeout(3000)

            # Take a debug screenshot
            try:
                await page.screenshot(path=f"/app/data/debug_redlight_{frn}.png", full_page=True)
                LOG.info("Debug screenshot saved: /app/data/debug_redlight_%s.png", frn)
            except Exception:
                pass

            page_text = await page.inner_text("body")
            page_lower = page_text.lower()

            # Heuristic classification — order matters: explicit green/red
            # "light" wording first, then generic debt keywords.
            if "green" in page_lower and "light" in page_lower:
                result["status"] = "green"
                result["detail"] = "No delinquent debts — GREEN light status"
            elif "red" in page_lower and "light" in page_lower:
                result["status"] = "red"
                result["detail"] = "Outstanding delinquent debt — RED light status"
            elif "no outstanding" in page_lower or "no delinquent" in page_lower:
                result["status"] = "green"
                result["detail"] = "No outstanding debts found"
            elif "delinquent" in page_lower or "outstanding" in page_lower:
                result["status"] = "red"
                result["detail"] = "Delinquent debt found"
            else:
                result["detail"] = "Could not determine red/green light status from page"

            LOG.info("FRN %s: %s — %s", frn, result["status"], result["detail"])

            # Store in DB (best-effort). Fix: close the connection in `finally`
            # so a failed UPDATE/commit no longer leaks it; the cursor is now
            # managed by a `with` block.
            conn = None
            try:
                conn = psycopg2.connect(DATABASE_URL)
                with conn.cursor() as cur:
                    cur.execute(
                        """UPDATE fcc_rmd SET
                        red_light_status = %s,
                        red_light_checked_at = NOW()
                        WHERE frn = %s""",
                        (result["status"], frn),
                    )
                    if cur.rowcount == 0:
                        # Also try telecom_entities
                        cur.execute(
                            """UPDATE telecom_entities SET
                            notes = COALESCE(notes, '') || %s
                            WHERE frn = %s""",
                            (f"\n[Red Light: {result['status']} checked {datetime.now(timezone.utc).isoformat()}]", frn),
                        )
                conn.commit()
            except Exception as db_err:
                LOG.warning("DB update failed: %s", db_err)
            finally:
                if conn is not None:
                    conn.close()

        except Exception as e:
            LOG.error("Red light check failed: %s", e)
            result["detail"] = str(e)
        finally:
            await browser.close()

    return result
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
# 499 Filer Database Excel Downloader
# ──────────────────────────────────────────────────────────────────────


async def download_499_filer_db() -> int:
    """Download the 499 filer Excel dump from FCC and import into PG.

    Prefers the Excel download link on the filer-database page; if no link
    is found, falls back to the XML results endpoint. Failures are logged
    and reported as a zero count rather than raised.

    Returns:
        Count of upserted records (0 on failure).
    """
    # Imported lazily so the module can be loaded where Playwright isn't installed.
    from playwright.async_api import async_playwright

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        # accept_downloads is required for expect_download() to capture the file.
        context = await browser.new_context(accept_downloads=True)
        page = await context.new_page()

        try:
            LOG.info("Navigating to FCC 499 Filer Database")
            await page.goto(FILER_DB_URL, wait_until="domcontentloaded", timeout=30000)
            await page.wait_for_timeout(2000)

            # Look for the "Excel Dump" link
            excel_link = page.locator('a:has-text("Excel"), a[href*="excel"], a[href*="xls"], a:has-text("Download")')

            if await excel_link.count() == 0:
                LOG.warning("No Excel download link found — trying XML API instead")
                # Try the XML API endpoint
                await page.goto(f"{FILER_DB_URL.replace('499a.cfm', '499results.cfm')}?XML=TRUE",
                                wait_until="domcontentloaded", timeout=30000)
                content = await page.content()
                LOG.info("XML response length: %d", len(content))
                # Parse XML and import. Fix: previously this path closed the
                # browser explicitly AND again in `finally` — the `finally`
                # below runs before this return, so one close suffices.
                return _parse_and_import_xml(content)

            # Click Excel download
            async with page.expect_download(timeout=60000) as download_info:
                await excel_link.first.click()
            download = await download_info.value

            # Save to temp file (date-stamped so reruns on the same day reuse the name)
            temp_path = f"/tmp/fcc_499_filers_{datetime.now().strftime('%Y%m%d')}.xlsx"
            await download.save_as(temp_path)
            LOG.info("Downloaded 499 filer Excel to %s (%d bytes)",
                     temp_path, Path(temp_path).stat().st_size)

            # Parse Excel and import
            count = _parse_and_import_excel(temp_path)

        except Exception as e:
            LOG.error("499 filer download failed: %s", e)
            count = 0
        finally:
            await browser.close()

    return count
|
|
|
|
|
|
def _parse_and_import_excel(path: str) -> int:
    """Parse the 499 filer Excel file and upsert into PG.

    Args:
        path: Filesystem path to the downloaded .xlsx file.

    Returns:
        Number of rows upserted (0 if openpyxl is missing or the sheet is empty).
    """
    try:
        import openpyxl
    except ImportError:
        LOG.error("openpyxl not installed — pip install openpyxl")
        return 0

    def first_value(row_data: dict, *keys: str) -> str:
        """Return the first non-empty cell among *keys* as a stripped string.

        Fixes a bug in the old dict.get() chaining: dict.get's default only
        applies when the key is MISSING, so a header that was present with an
        empty cell yielded None — and str(None) produced the literal string
        "None", which could be upserted as a filer_id / FRN / legal_name.
        """
        for key in keys:
            value = row_data.get(key)
            if value is not None:
                text = str(value).strip()
                if text:
                    return text
        return ""

    wb = openpyxl.load_workbook(path, read_only=True)
    try:
        ws = wb.active
        rows = list(ws.iter_rows(values_only=True))
    finally:
        # Fix: read-only workbooks hold the file handle open until closed.
        wb.close()

    if len(rows) < 2:
        LOG.warning("Excel file has fewer than 2 rows — no data")
        return 0

    # Normalize headers into snake_case keys for the per-row dicts below.
    headers = [str(h or "").strip().lower().replace(" ", "_") for h in rows[0]]
    LOG.info("Excel headers: %s", headers[:10])

    conn = psycopg2.connect(DATABASE_URL)
    now = datetime.now(timezone.utc)
    count = 0

    try:
        with conn.cursor() as cur:
            for row in rows[1:]:
                data = dict(zip(headers, row))
                filer_id = first_value(data, "filer_id", "filerid", "499_id")
                if not filer_id:
                    continue

                frn = first_value(data, "frn", "coresid", "fcc_registration_number") or None
                legal_name = first_value(data, "legal_name", "legalname", "company_name")
                if not legal_name:
                    continue

                cur.execute("""
                    INSERT INTO fcc_499_filers (filer_id, frn, legal_name, trade_name, state,
                        service_type, holding_company, status, last_scraped_at, updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (filer_id) DO UPDATE SET
                        frn = COALESCE(EXCLUDED.frn, fcc_499_filers.frn),
                        legal_name = EXCLUDED.legal_name,
                        trade_name = COALESCE(EXCLUDED.trade_name, fcc_499_filers.trade_name),
                        state = COALESCE(EXCLUDED.state, fcc_499_filers.state),
                        service_type = COALESCE(EXCLUDED.service_type, fcc_499_filers.service_type),
                        holding_company = COALESCE(EXCLUDED.holding_company, fcc_499_filers.holding_company),
                        last_scraped_at = %s,
                        updated_at = %s
                """, (
                    filer_id, frn, legal_name,
                    first_value(data, "trade_name", "tradename") or None,
                    first_value(data, "state")[:2] or None,
                    first_value(data, "service_type", "servicetype", "principal_communications_type") or None,
                    first_value(data, "holding_company", "holdingcompany") or None,
                    first_value(data, "status", "operational_status") or "Active",
                    now, now, now, now,
                ))
                count += 1

                # Commit in batches so a mid-import crash keeps earlier rows.
                if count % 1000 == 0:
                    conn.commit()
                    LOG.info(" … committed %d rows", count)

        conn.commit()
    finally:
        # Fix: the connection previously leaked if an execute/commit raised.
        conn.close()

    LOG.info("Imported %d filers from Excel", count)
    return count
|
|
|
|
|
|
def _parse_and_import_xml(xml_content: str) -> int:
    """Parse XML filer data and upsert. Fallback if Excel isn't available.

    Args:
        xml_content: Raw XML text fetched from the FCC 499 results endpoint.

    Returns:
        Number of filer records upserted (0 on parse failure).
    """
    # NOTE(review): stdlib ElementTree parses data fetched from the FCC
    # endpoint; if this ever handles less-trusted input, consider defusedxml.
    import xml.etree.ElementTree as ET

    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError:
        LOG.error("Failed to parse XML")
        return 0

    conn = psycopg2.connect(DATABASE_URL)
    now = datetime.now(timezone.utc)
    count = 0

    try:
        with conn.cursor() as cur:
            for child in root:
                # Flatten each record's <Field>value</Field> children into a
                # dict keyed by a snake_case-normalized tag name.
                data = {}
                for field in child:
                    tag = field.tag.lower().replace(" ", "_").replace("-", "_")
                    data[tag] = (field.text or "").strip()

                filer_id = data.get("filerid", data.get("filer_id", ""))
                legal_name = data.get("legal_name", data.get("legalname", data.get("name", "")))
                if not filer_id or not legal_name:
                    continue

                cur.execute("""
                    INSERT INTO fcc_499_filers (filer_id, frn, legal_name, trade_name, state,
                        service_type, status, last_scraped_at, updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (filer_id) DO UPDATE SET
                        frn = COALESCE(EXCLUDED.frn, fcc_499_filers.frn),
                        legal_name = EXCLUDED.legal_name,
                        last_scraped_at = %s,
                        updated_at = %s
                """, (
                    filer_id,
                    data.get("frn", data.get("coresid")) or None,
                    legal_name,
                    data.get("trade_name", data.get("tradename")) or None,
                    (data.get("state") or "")[:2] or None,
                    data.get("service_type", data.get("servicetype")) or None,
                    data.get("status", "Active"),
                    now, now, now, now,
                ))
                count += 1

        conn.commit()
    finally:
        # Fix: guarantee the connection is closed even when an execute fails;
        # previously an exception mid-loop leaked the connection.
        conn.close()

    LOG.info("Imported %d filers from XML", count)
    return count
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────────────────


async def main():
    """Parse CLI flags and dispatch to the scraper coroutines."""
    arg_parser = argparse.ArgumentParser(description="FCC CORES Scraper")
    arg_parser.add_argument("--red-light", metavar="FRN", help="Check red light status for an FRN")
    arg_parser.add_argument("--filer-db", action="store_true", help="Download and import 499 filer database")
    opts = arg_parser.parse_args()

    # Track whether any action ran; fall back to help text otherwise.
    ran_anything = False

    if opts.red_light:
        ran_anything = True
        status = await check_red_light(opts.red_light)
        print(f"FRN {status['frn']}: {status['status']} — {status['detail']}")

    if opts.filer_db:
        ran_anything = True
        imported = await download_499_filer_db()
        print(f"Imported {imported} filers")

    if not ran_anything:
        arg_parser.print_help()


if __name__ == "__main__":
    asyncio.run(main())
|