Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
430 lines
16 KiB
Python
430 lines
16 KiB
Python
"""
|
||
fcc_rmd_scraper.py — FCC Robocall Mitigation Database scraper.
|
||
|
||
Two-phase manual tool:
|
||
|
||
Phase 1 — CSV import
|
||
Downloads the full RMD registry CSV from the FCC ServiceNow portal and
|
||
upserts every record into the local `fcc_rmd` PostgreSQL table. All fields
|
||
except contact_email come directly from the CSV.
|
||
|
||
Phase 2 — Email scrape
|
||
For every row in fcc_rmd where contact_email IS NULL, launches a Playwright
|
||
browser, navigates to the record's filing_url, waits for the JS-rendered
|
||
ServiceNow form to load, extracts the "Contact Email" field, and writes it
|
||
back to the database.
|
||
|
||
Usage:
|
||
# Full run (both phases):
|
||
python -m workers.fcc_rmd_scraper
|
||
|
||
# Specific phases only:
|
||
python -m workers.fcc_rmd_scraper --phase csv
|
||
python -m workers.fcc_rmd_scraper --phase scrape
|
||
|
||
# Limit scrape to N records (useful for testing):
|
||
python -m workers.fcc_rmd_scraper --phase scrape --limit 50
|
||
|
||
Environment variables:
|
||
DATABASE_URL PostgreSQL connection string
|
||
|
||
Dependencies:
|
||
psycopg2-binary, playwright, requests
|
||
playwright install chromium
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import csv
|
||
import io
|
||
import logging
|
||
import os
|
||
import sys
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from typing import Optional
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
import requests
|
||
|
||
LOG = logging.getLogger("workers.fcc_rmd_scraper")
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
|
||
stream=sys.stdout,
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
|
||
DATABASE_URL = os.environ.get("DATABASE_URL", "")
|
||
|
||
CSV_URL = "https://fccprod.servicenowservices.com/api/x_g_fmc_rmd/rmd/csv_download"
|
||
|
||
# Delay between SP API requests to avoid rate-limiting (~1 req/sec)
|
||
SCRAPE_DELAY_SECONDS = 1.0
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Phase 1 — CSV import
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _parse_bool(val: str) -> bool:
|
||
return val.strip().lower() in ("yes", "true", "1")
|
||
|
||
|
||
def _parse_date(val: str) -> Optional[str]:
|
||
val = val.strip()
|
||
if not val:
|
||
return None
|
||
# CSV dates arrive as YYYY-MM-DD
|
||
return val
|
||
|
||
|
||
def _extract_sys_id(filing_url: str) -> Optional[str]:
|
||
"""Pull sys_id query param from the filing URL."""
|
||
try:
|
||
qs = parse_qs(urlparse(filing_url).query)
|
||
ids = qs.get("sys_id", [])
|
||
return ids[0] if ids else None
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def run_csv_import(conn: psycopg2.extensions.connection) -> int:
    """Download the RMD CSV and upsert all records. Returns row count.

    Upserts on rmd_number: a conflicting row is refreshed from the CSV and
    has its removal flags cleared, while contact_email is deliberately left
    untouched so previously scraped values survive re-imports.  After the
    upsert pass, any existing row whose rmd_number is absent from this CSV
    is flagged removed_from_rmd.

    Raises requests.HTTPError if the CSV download fails.
    """
    LOG.info("Downloading RMD CSV from %s …", CSV_URL)
    resp = requests.get(CSV_URL, timeout=120)
    resp.raise_for_status()
    LOG.info("Downloaded %d bytes", len(resp.content))

    reader = csv.DictReader(io.StringIO(resp.text))
    rows = list(reader)
    LOG.info("Parsed %d records from CSV", len(rows))

    cur = conn.cursor()
    upserted = 0

    for row in rows:
        filing_url = row.get("filing_url", "").strip()
        # sys_id is what Phase 2 later uses to hit the SP page API.
        sys_id = _extract_sys_id(filing_url)

        cur.execute(
            """
            INSERT INTO fcc_rmd (
                rmd_number, frn, business_name, business_address,
                foreign_voice_provider, country,
                other_frns, other_dba_names, previous_dba_names,
                contact_name, contact_title, contact_department,
                contact_business_address, contact_country,
                contact_telephone_number, contact_phone_extension,
                implementation,
                voice_service_provider, gateway_provider, intermediate_provider,
                last_updated, last_recertified,
                filing_url, servicenow_sys_id,
                csv_imported_at, updated_at
            ) VALUES (
                %(number)s, %(frn)s, %(business_name)s, %(business_address)s,
                %(foreign_voice_provider)s, %(country)s,
                %(other_frns)s, %(other_dba_names)s, %(previous_dba_names)s,
                %(contact_name)s, %(contact_title)s, %(contact_department)s,
                %(contact_business_address)s, %(contact_country)s,
                %(contact_telephone_number)s, %(contact_phone_extension)s,
                %(implementation)s,
                %(voice_service_provider)s, %(gateway_provider)s, %(intermediate_provider)s,
                %(last_updated)s, %(last_recertified)s,
                %(filing_url)s, %(sys_id)s,
                now(), now()
            )
            ON CONFLICT (rmd_number) DO UPDATE SET
                frn = EXCLUDED.frn,
                business_name = EXCLUDED.business_name,
                business_address = EXCLUDED.business_address,
                foreign_voice_provider = EXCLUDED.foreign_voice_provider,
                country = EXCLUDED.country,
                other_frns = EXCLUDED.other_frns,
                other_dba_names = EXCLUDED.other_dba_names,
                previous_dba_names = EXCLUDED.previous_dba_names,
                contact_name = EXCLUDED.contact_name,
                contact_title = EXCLUDED.contact_title,
                contact_department = EXCLUDED.contact_department,
                contact_business_address = EXCLUDED.contact_business_address,
                contact_country = EXCLUDED.contact_country,
                contact_telephone_number = EXCLUDED.contact_telephone_number,
                contact_phone_extension = EXCLUDED.contact_phone_extension,
                implementation = EXCLUDED.implementation,
                voice_service_provider = EXCLUDED.voice_service_provider,
                gateway_provider = EXCLUDED.gateway_provider,
                intermediate_provider = EXCLUDED.intermediate_provider,
                last_updated = EXCLUDED.last_updated,
                last_recertified = EXCLUDED.last_recertified,
                filing_url = EXCLUDED.filing_url,
                servicenow_sys_id = EXCLUDED.servicenow_sys_id,
                removed_from_rmd = FALSE,
                removed_at = NULL,
                last_seen_in_csv = now(),
                updated_at = now()
            -- Do NOT overwrite contact_email if already scraped
            """,
            {
                "number": row.get("number", "").strip(),
                "frn": row.get("frn", "").strip() or None,
                "business_name": row.get("business_name", "").strip() or None,
                "business_address": row.get("business_address", "").strip() or None,
                "foreign_voice_provider": _parse_bool(row.get("foreign_voice_provider", "No")),
                "country": row.get("country", "").strip() or None,
                "other_frns": row.get("other_frns", "").strip() or None,
                "other_dba_names": row.get("other_dba_names", "").strip() or None,
                "previous_dba_names": row.get("previous_dba_names", "").strip() or None,
                # CSV column name differs from the DB column here.
                "contact_name": row.get("robocall_mitigation_contact_name", "").strip() or None,
                "contact_title": row.get("contact_title", "").strip() or None,
                "contact_department": row.get("contact_department", "").strip() or None,
                "contact_business_address": row.get("contact_business_address", "").strip() or None,
                "contact_country": row.get("contact_country", "").strip() or None,
                "contact_telephone_number": row.get("contact_telephone_number", "").strip() or None,
                "contact_phone_extension": row.get("contact_phone_extension", "").strip() or None,
                "implementation": row.get("implementation", "").strip() or None,
                "voice_service_provider": _parse_bool(row.get("voice_service_provider_choice", "No")),
                "gateway_provider": _parse_bool(row.get("gateway_provider_choice", "No")),
                "intermediate_provider": _parse_bool(row.get("intermediate_provider_choice", "No")),
                "last_updated": _parse_date(row.get("last_updated", "")),
                "last_recertified": _parse_date(row.get("last_recertified", "")),
                "filing_url": filing_url or None,
                "sys_id": sys_id,
            },
        )
        upserted += 1

        # Commit in batches of 500 so an interrupted import keeps most work.
        if upserted % 500 == 0:
            conn.commit()
            LOG.info(" … committed %d rows", upserted)

    conn.commit()
    LOG.info("CSV import complete — %d records upserted", upserted)

    # Mark records NOT in the current CSV as removed from RMD.
    # These providers still exist in our DB for lookup purposes,
    # but we flag them so the compliance wizard shows "REMOVED".
    csv_rmd_numbers = [r.get("number", "").strip() for r in rows if r.get("number", "").strip()]
    if csv_rmd_numbers:
        # psycopg2 adapts the Python list to a PostgreSQL array for ALL(%s);
        # removed_at is preserved if the record was already flagged earlier.
        cur.execute(
            """UPDATE fcc_rmd
               SET removed_from_rmd = TRUE, removed_at = COALESCE(removed_at, now()), updated_at = now()
               WHERE removed_from_rmd = FALSE
                 AND rmd_number != ALL(%s)""",
            (csv_rmd_numbers,),
        )
        newly_removed = cur.rowcount
        conn.commit()
        if newly_removed > 0:
            LOG.warning("Flagged %d records as REMOVED from RMD (no longer in CSV)", newly_removed)
        else:
            LOG.info("No new RMD removals detected")

    return upserted
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Phase 2 — ServiceNow SP API email fetch (no browser required)
# ---------------------------------------------------------------------------
#
# The FCC ServiceNow portal exposes an unauthenticated SP page API:
#     GET /api/now/sp/page?id=rmd_form&table=...&sys_id={SYS_ID}&...
# The JSON response contains the full record field data, including
# contact_email, nested at:
#     result.containers[*].rows[*].columns[*].widgets[*].widget.data.f._fields.contact_email.value
# We do a recursive key search so the exact path doesn't need to be hardcoded.

# URL template for the SP page API; only {sys_id} varies per record.
SP_API_BASE = (
    "https://fccprod.servicenowservices.com/api/now/sp/page"
    "?id=rmd_form"
    "&table=x_g_fmc_rmd_robocall_mitigation_database"
    "&sys_id={sys_id}"
    "&view=sp&time=1"
    "&portal_id=ac2856301b92681048c6ed7bbc4bcb27"
    "&request_uri=%2Frmd"
)

# NOTE(review): these two sets are not referenced anywhere in this module —
# presumably intended as a filter for generic/noise addresses (or consumed by
# code outside this file); confirm before relying on or deleting them.
_NOISE_DOMAINS = {"fcc.gov", "servicenow.com", "service-now.com"}
_NOISE_LOCALS = {"noreply", "no-reply", "support", "webmaster", "info", "admin", "guest"}
|
||
|
||
|
||
def _find_contact_email_in_json(obj) -> Optional[str]:
|
||
"""
|
||
Recursively walk a parsed JSON object looking for a dict that has
|
||
key "contact_email" with a sub-dict containing a "value" string.
|
||
This matches the ServiceNow widget data structure:
|
||
{"contact_email": {"value": "person@company.com", "displayValue": "...", ...}}
|
||
"""
|
||
if isinstance(obj, dict):
|
||
if "contact_email" in obj:
|
||
candidate = obj["contact_email"]
|
||
if isinstance(candidate, dict):
|
||
val = candidate.get("value") or candidate.get("displayValue")
|
||
if val and isinstance(val, str) and "@" in val:
|
||
return val.strip().lower()
|
||
elif isinstance(candidate, str) and "@" in candidate:
|
||
return candidate.strip().lower()
|
||
for v in obj.values():
|
||
result = _find_contact_email_in_json(v)
|
||
if result:
|
||
return result
|
||
elif isinstance(obj, list):
|
||
for item in obj:
|
||
result = _find_contact_email_in_json(item)
|
||
if result:
|
||
return result
|
||
return None
|
||
|
||
|
||
def _fetch_email_via_sp_api(sys_id: str, session: requests.Session) -> Optional[str]:
    """Hit the SP page API for one record and dig contact_email out of the JSON.

    Any failure (HTTP error, timeout, malformed JSON) is logged at DEBUG and
    swallowed; the caller treats None the same as "record has no email".
    """
    request_url = SP_API_BASE.format(sys_id=sys_id)
    try:
        response = session.get(request_url, timeout=20)
        response.raise_for_status()
        payload = response.json()
        return _find_contact_email_in_json(payload.get("result", {}))
    except Exception as exc:
        LOG.debug("SP API fetch failed for %s: %s", sys_id, exc)
        return None
|
||
|
||
|
||
def run_email_scrape(
    conn: psycopg2.extensions.connection,
    limit: Optional[int] = None,
) -> int:
    """
    Fetch contact email for every fcc_rmd row that has a servicenow_sys_id
    but no contact_email yet. Uses the ServiceNow SP page JSON API directly —
    no browser required, ~1–2 requests/sec sustainable.

    Returns the number of emails successfully fetched.
    """
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    query = """
        SELECT id, rmd_number, servicenow_sys_id
        FROM fcc_rmd
        WHERE contact_email IS NULL
          AND servicenow_sys_id IS NOT NULL
        ORDER BY rmd_number
    """
    # int() guards the f-string interpolation (no injection possible).
    # NOTE(review): limit=0 is treated as "no limit" by this truthiness check.
    if limit:
        query += f" LIMIT {int(limit)}"
    cur.execute(query)
    rows = cur.fetchall()

    LOG.info("Fetching email for %d records via SP API …", len(rows))
    scraped = 0   # emails found and written back
    no_email = 0  # record fetched but no usable email present
    errors = 0    # per-row failures (fetch or DB); row is skipped

    session = requests.Session()
    # NOTE(review): browser-like User-Agent — presumably to avoid bot
    # filtering on the ServiceNow side; confirm it is still required.
    session.headers.update({
        "User-Agent": (
            "Mozilla/5.0 (X11; Linux x86_64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept": "application/json",
    })

    for i, row in enumerate(rows, 1):
        rmd = row["rmd_number"]
        sys_id = row["servicenow_sys_id"]

        try:
            email_val = _fetch_email_via_sp_api(sys_id, session)

            if email_val:
                cur.execute(
                    """
                    UPDATE fcc_rmd
                    SET contact_email = %s,
                        contact_email_scraped_at = now(),
                        updated_at = now()
                    WHERE id = %s
                    """,
                    (email_val, row["id"]),
                )
                # Commit per row so progress survives an interruption.
                conn.commit()
                scraped += 1
                # Log the first 10 rows verbosely, then every 500th.
                if i <= 10 or i % 500 == 0:
                    LOG.info("[%d/%d] %s -> %s", i, len(rows), rmd, email_val)
            else:
                no_email += 1
                if i <= 10 or i % 500 == 0:
                    LOG.info("[%d/%d] %s -> no email", i, len(rows), rmd)

        except Exception as exc:
            LOG.error("[%d/%d] %s -> error: %s", i, len(rows), rmd, exc)
            errors += 1
            # Roll back so a failed UPDATE can't poison the transaction
            # for subsequent rows; rollback itself is best-effort.
            try:
                conn.rollback()
            except Exception:
                pass

        # Polite delay: ~1 req/sec to avoid rate-limiting
        time.sleep(SCRAPE_DELAY_SECONDS)

        # Batch progress log every 1000
        if i % 1000 == 0:
            LOG.info("Progress: %d/%d — %d emails, %d no-email, %d errors",
                     i, len(rows), scraped, no_email, errors)

    LOG.info(
        "Email fetch complete — %d found, %d no email in record, %d errors",
        scraped, no_email, errors,
    )
    return scraped
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Entrypoint
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """CLI entrypoint: parse arguments, connect to Postgres, run the phases."""
    parser = argparse.ArgumentParser(
        description="FCC RMD scraper — import CSV and scrape emails"
    )
    parser.add_argument(
        "--phase",
        choices=["csv", "scrape", "all"],
        default="all",
        help="Which phase(s) to run (default: all)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of records to scrape (phase=scrape only)",
    )
    args = parser.parse_args()

    # Fail fast when the connection string is missing.
    if not DATABASE_URL:
        LOG.error("DATABASE_URL is not set")
        sys.exit(1)

    conn = psycopg2.connect(DATABASE_URL)
    conn.autocommit = False  # each phase manages its own commits/rollbacks

    try:
        if args.phase in ("csv", "all"):
            run_csv_import(conn)
        if args.phase in ("scrape", "all"):
            run_email_scrape(conn, limit=args.limit)
    finally:
        conn.close()

    LOG.info("Done.")


if __name__ == "__main__":
    main()
|