# new-site/scripts/workers/fcc_rmd_scraper.py
"""
fcc_rmd_scraper.py — FCC Robocall Mitigation Database scraper.
Two-phase manual tool:
Phase 1 — CSV import
Downloads the full RMD registry CSV from the FCC ServiceNow portal and
upserts every record into the local `fcc_rmd` PostgreSQL table. All fields
except contact_email come directly from the CSV.
Phase 2 — Email scrape
For every row in fcc_rmd where contact_email IS NULL, launches a Playwright
browser, navigates to the record's filing_url, waits for the JS-rendered
ServiceNow form to load, extracts the "Contact Email" field, and writes it
back to the database.
Usage:
# Full run (both phases):
python -m workers.fcc_rmd_scraper
# Specific phases only:
python -m workers.fcc_rmd_scraper --phase csv
python -m workers.fcc_rmd_scraper --phase scrape
# Limit scrape to N records (useful for testing):
python -m workers.fcc_rmd_scraper --phase scrape --limit 50
Environment variables:
DATABASE_URL PostgreSQL connection string
Dependencies:
psycopg2-binary, playwright, requests
playwright install chromium
"""

from __future__ import annotations

import argparse
import csv
import io
import logging
import os
import sys
import time
from typing import Optional
from urllib.parse import urlparse, parse_qs

import psycopg2
import psycopg2.extras
import requests

LOG = logging.getLogger("workers.fcc_rmd_scraper")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
DATABASE_URL = os.environ.get("DATABASE_URL", "")
CSV_URL = "https://fccprod.servicenowservices.com/api/x_g_fmc_rmd/rmd/csv_download"
# Delay between SP API requests to avoid rate-limiting (~1 req/sec)
SCRAPE_DELAY_SECONDS = 1.0
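
# For reference, a minimal sketch of the `fcc_rmd` table this script assumes.
# Column names are taken from the queries below; the types are educated
# guesses, and the real schema (defined elsewhere in the repo) may differ:
#
#   CREATE TABLE fcc_rmd (
#       id                        bigserial PRIMARY KEY,
#       rmd_number                text UNIQUE NOT NULL,
#       frn                       text,
#       business_name             text,
#       business_address          text,
#       foreign_voice_provider    boolean,
#       country                   text,
#       other_frns                text,
#       other_dba_names           text,
#       previous_dba_names        text,
#       contact_name              text,
#       contact_title             text,
#       contact_department        text,
#       contact_business_address  text,
#       contact_country           text,
#       contact_telephone_number  text,
#       contact_phone_extension   text,
#       implementation            text,
#       voice_service_provider    boolean,
#       gateway_provider          boolean,
#       intermediate_provider     boolean,
#       last_updated              date,
#       last_recertified          date,
#       filing_url                text,
#       servicenow_sys_id         text,
#       contact_email             text,
#       contact_email_scraped_at  timestamptz,
#       removed_from_rmd          boolean DEFAULT FALSE,
#       removed_at                timestamptz,
#       last_seen_in_csv          timestamptz,
#       csv_imported_at           timestamptz,
#       updated_at                timestamptz
#   );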

# ---------------------------------------------------------------------------
# Phase 1 — CSV import
# ---------------------------------------------------------------------------

def _parse_bool(val: str) -> bool:
    return val.strip().lower() in ("yes", "true", "1")


def _parse_date(val: str) -> Optional[str]:
    val = val.strip()
    if not val:
        return None
    # CSV dates arrive as YYYY-MM-DD
    return val


def _extract_sys_id(filing_url: str) -> Optional[str]:
    """Pull sys_id query param from the filing URL."""
    try:
        qs = parse_qs(urlparse(filing_url).query)
        ids = qs.get("sys_id", [])
        return ids[0] if ids else None
    except Exception:
        return None
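
# Example (hypothetical URL — the shape is an assumption based on the RMD
# portal paths used elsewhere in this file): a filing_url like
#   https://fccprod.servicenowservices.com/rmd?id=rmd_form&sys_id=0a1b2c3d4e5f
# yields sys_id "0a1b2c3d4e5f"; a URL with no sys_id param yields None.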


def run_csv_import(conn: psycopg2.extensions.connection) -> int:
    """Download the RMD CSV and upsert all records. Returns row count."""
    LOG.info("Downloading RMD CSV from %s", CSV_URL)
    resp = requests.get(CSV_URL, timeout=120)
    resp.raise_for_status()
    LOG.info("Downloaded %d bytes", len(resp.content))

    reader = csv.DictReader(io.StringIO(resp.text))
    rows = list(reader)
    LOG.info("Parsed %d records from CSV", len(rows))

    cur = conn.cursor()
    upserted = 0
    for row in rows:
        filing_url = row.get("filing_url", "").strip()
        sys_id = _extract_sys_id(filing_url)
        cur.execute(
            """
            INSERT INTO fcc_rmd (
                rmd_number, frn, business_name, business_address,
                foreign_voice_provider, country,
                other_frns, other_dba_names, previous_dba_names,
                contact_name, contact_title, contact_department,
                contact_business_address, contact_country,
                contact_telephone_number, contact_phone_extension,
                implementation,
                voice_service_provider, gateway_provider, intermediate_provider,
                last_updated, last_recertified,
                filing_url, servicenow_sys_id,
                csv_imported_at, updated_at
            ) VALUES (
                %(number)s, %(frn)s, %(business_name)s, %(business_address)s,
                %(foreign_voice_provider)s, %(country)s,
                %(other_frns)s, %(other_dba_names)s, %(previous_dba_names)s,
                %(contact_name)s, %(contact_title)s, %(contact_department)s,
                %(contact_business_address)s, %(contact_country)s,
                %(contact_telephone_number)s, %(contact_phone_extension)s,
                %(implementation)s,
                %(voice_service_provider)s, %(gateway_provider)s, %(intermediate_provider)s,
                %(last_updated)s, %(last_recertified)s,
                %(filing_url)s, %(sys_id)s,
                now(), now()
            )
            ON CONFLICT (rmd_number) DO UPDATE SET
                frn = EXCLUDED.frn,
                business_name = EXCLUDED.business_name,
                business_address = EXCLUDED.business_address,
                foreign_voice_provider = EXCLUDED.foreign_voice_provider,
                country = EXCLUDED.country,
                other_frns = EXCLUDED.other_frns,
                other_dba_names = EXCLUDED.other_dba_names,
                previous_dba_names = EXCLUDED.previous_dba_names,
                contact_name = EXCLUDED.contact_name,
                contact_title = EXCLUDED.contact_title,
                contact_department = EXCLUDED.contact_department,
                contact_business_address = EXCLUDED.contact_business_address,
                contact_country = EXCLUDED.contact_country,
                contact_telephone_number = EXCLUDED.contact_telephone_number,
                contact_phone_extension = EXCLUDED.contact_phone_extension,
                implementation = EXCLUDED.implementation,
                voice_service_provider = EXCLUDED.voice_service_provider,
                gateway_provider = EXCLUDED.gateway_provider,
                intermediate_provider = EXCLUDED.intermediate_provider,
                last_updated = EXCLUDED.last_updated,
                last_recertified = EXCLUDED.last_recertified,
                filing_url = EXCLUDED.filing_url,
                servicenow_sys_id = EXCLUDED.servicenow_sys_id,
                removed_from_rmd = FALSE,
                removed_at = NULL,
                last_seen_in_csv = now(),
                updated_at = now()
            -- Do NOT overwrite contact_email if already scraped
            """,
            {
                "number": row.get("number", "").strip(),
                "frn": row.get("frn", "").strip() or None,
                "business_name": row.get("business_name", "").strip() or None,
                "business_address": row.get("business_address", "").strip() or None,
                "foreign_voice_provider": _parse_bool(row.get("foreign_voice_provider", "No")),
                "country": row.get("country", "").strip() or None,
                "other_frns": row.get("other_frns", "").strip() or None,
                "other_dba_names": row.get("other_dba_names", "").strip() or None,
                "previous_dba_names": row.get("previous_dba_names", "").strip() or None,
                "contact_name": row.get("robocall_mitigation_contact_name", "").strip() or None,
                "contact_title": row.get("contact_title", "").strip() or None,
                "contact_department": row.get("contact_department", "").strip() or None,
                "contact_business_address": row.get("contact_business_address", "").strip() or None,
                "contact_country": row.get("contact_country", "").strip() or None,
                "contact_telephone_number": row.get("contact_telephone_number", "").strip() or None,
                "contact_phone_extension": row.get("contact_phone_extension", "").strip() or None,
                "implementation": row.get("implementation", "").strip() or None,
                "voice_service_provider": _parse_bool(row.get("voice_service_provider_choice", "No")),
                "gateway_provider": _parse_bool(row.get("gateway_provider_choice", "No")),
                "intermediate_provider": _parse_bool(row.get("intermediate_provider_choice", "No")),
                "last_updated": _parse_date(row.get("last_updated", "")),
                "last_recertified": _parse_date(row.get("last_recertified", "")),
                "filing_url": filing_url or None,
                "sys_id": sys_id,
            },
        )
        upserted += 1
        if upserted % 500 == 0:
            conn.commit()
            LOG.info(" … committed %d rows", upserted)

    conn.commit()
    LOG.info("CSV import complete — %d records upserted", upserted)

    # Mark records NOT in the current CSV as removed from RMD.
    # These providers still exist in our DB for lookup purposes,
    # but we flag them so the compliance wizard shows "REMOVED".
    csv_rmd_numbers = [r.get("number", "").strip() for r in rows if r.get("number", "").strip()]
    if csv_rmd_numbers:
        cur.execute(
            """UPDATE fcc_rmd
               SET removed_from_rmd = TRUE, removed_at = COALESCE(removed_at, now()), updated_at = now()
               WHERE removed_from_rmd = FALSE
                 AND rmd_number != ALL(%s)""",
            (csv_rmd_numbers,),
        )
        newly_removed = cur.rowcount
        conn.commit()
        if newly_removed > 0:
            LOG.warning("Flagged %d records as REMOVED from RMD (no longer in CSV)", newly_removed)
        else:
            LOG.info("No new RMD removals detected")

    return upserted
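
# Note on CSV headers: run_csv_import() assumes the FCC export uses the exact
# header names referenced above (e.g. "number", "frn",
# "robocall_mitigation_contact_name", "voice_service_provider_choice").
# Because every lookup is row.get(name, "") with a default, a renamed header
# does not raise — the field just imports as NULL (or False for the boolean
# *_choice flags), so a sudden column of NULLs is the symptom to watch for.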


# ---------------------------------------------------------------------------
# Phase 2 — ServiceNow SP API email fetch (no browser required)
# ---------------------------------------------------------------------------
#
# The FCC ServiceNow portal exposes an unauthenticated SP page API:
#     GET /api/now/sp/page?id=rmd_form&table=...&sys_id={SYS_ID}&...
# The JSON response contains the full record field data, including
# contact_email, nested at:
#     result.containers[*].rows[*].columns[*].widgets[*].widget.data.f._fields.contact_email.value
# We do a recursive key search so the exact path doesn't need to be hardcoded.

SP_API_BASE = (
    "https://fccprod.servicenowservices.com/api/now/sp/page"
    "?id=rmd_form"
    "&table=x_g_fmc_rmd_robocall_mitigation_database"
    "&sys_id={sys_id}"
    "&view=sp&time=1"
    "&portal_id=ac2856301b92681048c6ed7bbc4bcb27"
    "&request_uri=%2Frmd"
)
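
# Illustrative response skeleton (an assumption based on the path noted
# above — the actual nesting varies between portal versions, which is why the
# extractor below searches recursively rather than indexing a fixed path):
#   {"result": {"containers": [{"rows": [{"columns": [{"widgets": [
#       {"widget": {"data": {"f": {"_fields": {
#           "contact_email": {"value": "person@company.com", "displayValue": "..."}
#       }}}}}]}]}]}]}}
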
_NOISE_DOMAINS = {"fcc.gov", "servicenow.com", "service-now.com"}
_NOISE_LOCALS = {"noreply", "no-reply", "support", "webmaster", "info", "admin", "guest"}
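# (The noise sets above are not consulted anywhere in this module yet; they
# appear reserved for filtering out placeholder portal addresses should that
# become necessary.)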


def _find_contact_email_in_json(obj) -> Optional[str]:
    """
    Recursively walk a parsed JSON object looking for a dict that has
    key "contact_email" with a sub-dict containing a "value" string.
    This matches the ServiceNow widget data structure:
        {"contact_email": {"value": "person@company.com", "displayValue": "...", ...}}
    """
    if isinstance(obj, dict):
        if "contact_email" in obj:
            candidate = obj["contact_email"]
            if isinstance(candidate, dict):
                val = candidate.get("value") or candidate.get("displayValue")
                if val and isinstance(val, str) and "@" in val:
                    return val.strip().lower()
            elif isinstance(candidate, str) and "@" in candidate:
                return candidate.strip().lower()
        for v in obj.values():
            result = _find_contact_email_in_json(v)
            if result:
                return result
    elif isinstance(obj, list):
        for item in obj:
            result = _find_contact_email_in_json(item)
            if result:
                return result
    return None
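
# Example: _find_contact_email_in_json(
#     {"a": [{"contact_email": {"value": " Person@Company.com "}}]}
# ) returns "person@company.com" — the first match wins, normalized to
# lowercase with surrounding whitespace stripped.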


def _fetch_email_via_sp_api(sys_id: str, session: requests.Session) -> Optional[str]:
    """Call the SP page API and extract contact_email from the JSON response."""
    url = SP_API_BASE.format(sys_id=sys_id)
    try:
        resp = session.get(url, timeout=20)
        resp.raise_for_status()
        data = resp.json()
        return _find_contact_email_in_json(data.get("result", {}))
    except Exception as exc:
        LOG.debug("SP API fetch failed for %s: %s", sys_id, exc)
        return None


def run_email_scrape(
    conn: psycopg2.extensions.connection,
    limit: Optional[int] = None,
) -> int:
    """
    Fetch the contact email for every fcc_rmd row that has a servicenow_sys_id
    but no contact_email yet. Uses the ServiceNow SP page JSON API directly —
    no browser required. The endpoint sustains roughly 12 requests/sec, but
    this loop throttles itself to ~1 request/sec (SCRAPE_DELAY_SECONDS).
    Returns the number of emails successfully fetched.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    query = """
        SELECT id, rmd_number, servicenow_sys_id
        FROM fcc_rmd
        WHERE contact_email IS NULL
          AND servicenow_sys_id IS NOT NULL
        ORDER BY rmd_number
    """
    if limit:
        query += f" LIMIT {int(limit)}"
    cur.execute(query)
    rows = cur.fetchall()
    LOG.info("Fetching email for %d records via SP API …", len(rows))

    scraped = 0
    no_email = 0
    errors = 0
    session = requests.Session()
    session.headers.update({
        "User-Agent": (
            "Mozilla/5.0 (X11; Linux x86_64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept": "application/json",
    })

    for i, row in enumerate(rows, 1):
        rmd = row["rmd_number"]
        sys_id = row["servicenow_sys_id"]
        try:
            email_val = _fetch_email_via_sp_api(sys_id, session)
            if email_val:
                cur.execute(
                    """
                    UPDATE fcc_rmd
                    SET contact_email = %s,
                        contact_email_scraped_at = now(),
                        updated_at = now()
                    WHERE id = %s
                    """,
                    (email_val, row["id"]),
                )
                conn.commit()
                scraped += 1
                if i <= 10 or i % 500 == 0:
                    LOG.info("[%d/%d] %s -> %s", i, len(rows), rmd, email_val)
            else:
                no_email += 1
                if i <= 10 or i % 500 == 0:
                    LOG.info("[%d/%d] %s -> no email", i, len(rows), rmd)
        except Exception as exc:
            LOG.error("[%d/%d] %s -> error: %s", i, len(rows), rmd, exc)
            errors += 1
            try:
                conn.rollback()
            except Exception:
                pass
        # Polite delay: ~1 req/sec to avoid rate-limiting
        time.sleep(SCRAPE_DELAY_SECONDS)
        # Batch progress log every 1000 records
        if i % 1000 == 0:
            LOG.info("Progress: %d/%d — %d emails, %d no-email, %d errors",
                     i, len(rows), scraped, no_email, errors)

    LOG.info(
        "Email fetch complete — %d found, %d no email in record, %d errors",
        scraped, no_email, errors,
    )
    return scraped


# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------

def main() -> None:
    parser = argparse.ArgumentParser(
        description="FCC RMD scraper — import CSV and scrape emails"
    )
    parser.add_argument(
        "--phase",
        choices=["csv", "scrape", "all"],
        default="all",
        help="Which phase(s) to run (default: all)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of records to scrape (phase=scrape only)",
    )
    args = parser.parse_args()

    if not DATABASE_URL:
        LOG.error("DATABASE_URL is not set")
        sys.exit(1)

    conn = psycopg2.connect(DATABASE_URL)
    conn.autocommit = False
    try:
        phase = args.phase
        if phase in ("csv", "all"):
            run_csv_import(conn)
        if phase in ("scrape", "all"):
            run_email_scrape(conn, limit=args.limit)
    finally:
        conn.close()
    LOG.info("Done.")


if __name__ == "__main__":
    main()
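
# Example scheduling (illustrative only — the docstring calls this a manual
# tool, and the paths below are hypothetical): a weekly cron entry could
# refresh the registry and backfill emails, e.g.
#   0 6 * * 1  cd /srv/app && DATABASE_URL=postgres://... \
#       python -m workers.fcc_rmd_scraper >> /var/log/fcc_rmd.log 2>&1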