""" fcc_rmd_removed_scraper.py — Research contact emails for FCC RMD-removed carriers. Attempts to find a contact email for each carrier in fcc_rmd_removed by: 1. Checking if the company still has an entry in our fcc_rmd table (some were provisionally reinstated and scraped). 2. Querying the company name via the FCC's CORES system for FRN-linked contacts. 3. Web-searching for the company's domain and scraping contact pages. Usage: python -m workers.fcc_rmd_removed_scraper Environment variables: DATABASE_URL PostgreSQL connection string """ from __future__ import annotations import logging import os import re import sys import time from typing import Optional from urllib.parse import urljoin, urlparse import psycopg2 import psycopg2.extras import requests LOG = logging.getLogger("workers.fcc_rmd_removed_scraper") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout, ) DATABASE_URL = os.environ.get("DATABASE_URL", "") DELAY = 1.5 # seconds between web requests # --------------------------------------------------------------------------- # Phase 1 — Email research # --------------------------------------------------------------------------- def _fcc_cores_lookup(frn: str, session: requests.Session) -> Optional[str]: """Query FCC CORES for a contact email by FRN.""" url = f"https://apps.fcc.gov/coresWeb/publicHome.do?frn={frn}" try: r = session.get(url, timeout=15) emails = re.findall( r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}", r.text ) for e in emails: dom = e.split("@")[1].lower() if "fcc.gov" not in dom and "gov" not in dom: return e.lower() except Exception: pass return None def _guess_domain(business_name: str) -> list[str]: """Generate candidate domain guesses from a business name.""" slug = re.sub(r"[^a-z0-9]+", "", business_name.lower().split(" llc")[0].split(" inc")[0].split(" corp")[0].split(" ltd")[0]) slug2 = re.sub(r"[^a-z0-9]+", "-", business_name.lower().split(" llc")[0].split(" inc")[0].strip().strip("-")) candidates = [ f"{slug}.com", f"{slug2}.com", f"{slug}.net", f"{slug2}.net", f"{slug}.io", ] return list(dict.fromkeys(candidates)) # deduplicated def _scrape_contact_from_website(domain: str, session: requests.Session) -> Optional[str]: """Try to find a contact email from a company website.""" for path in ["", "/contact", "/contact-us", "/about", "/about-us"]: url = f"https://{domain}{path}" try: r = session.get(url, timeout=10, allow_redirects=True) if r.status_code != 200: continue emails = re.findall( r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}", r.text ) noise_locals = {"noreply", "no-reply", "info", "support", "hello", "sales", "webmaster", "admin", "privacy", "legal", "abuse", "postmaster", "hostmaster"} for e in emails: local = e.split("@")[0].lower() dom = e.split("@")[1].lower() if local not in noise_locals and domain.split(".")[0] in dom: return e.lower() # Fallback: any non-noise email on the page for e in emails: local = e.split("@")[0].lower() dom = e.split("@")[1].lower() if local not in noise_locals and "gov" not in dom: return e.lower() except Exception: pass time.sleep(0.3) return None def run_email_research(conn: psycopg2.extensions.connection) -> int: """ For each carrier in fcc_rmd_removed without a contact_email: 1. Check if their RMD number matches a record already scraped in fcc_rmd 2. Try FCC CORES lookup by FRN 3. Try guessing the company website and scraping contact emails """ cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur.execute(""" SELECT r.id, r.rmd_number, r.frn, r.business_name, r.business_address FROM fcc_rmd_removed r WHERE r.contact_email IS NULL AND r.business_name NOT LIKE '[%' ORDER BY r.id """) rows = cur.fetchall() LOG.info("Researching emails for %d removed carriers …", len(rows)) found = 0 session = requests.Session() session.headers.update({ "User-Agent": ( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" ) }) for i, row in enumerate(rows, 1): rid = row["id"] name = row["business_name"] rmd_num = row["rmd_number"] frn = row["frn"] LOG.info("[%d/%d] Researching: %s", i, len(rows), name) email = None notes = [] # Strategy 1: check our existing fcc_rmd scraped data if rmd_num: cur.execute( "SELECT contact_email FROM fcc_rmd WHERE rmd_number = %s AND contact_email IS NOT NULL", (rmd_num,) ) match = cur.fetchone() if match: email = match["contact_email"] notes.append(f"Found in fcc_rmd table (previously scraped from FCC portal)") LOG.info(" -> [fcc_rmd] %s", email) # Strategy 2: FCC CORES FRN lookup if not email and frn: email = _fcc_cores_lookup(frn, session) if email: notes.append(f"Found via FCC CORES FRN lookup ({frn})") LOG.info(" -> [CORES] %s", email) time.sleep(DELAY) # Strategy 3: Website scrape from guessed domain if not email: for domain in _guess_domain(name): email = _scrape_contact_from_website(domain, session) if email: notes.append(f"Found via website scrape: {domain}") LOG.info(" -> [website %s] %s", domain, email) break time.sleep(DELAY) if email: cur.execute(""" UPDATE fcc_rmd_removed SET contact_email = %s, email_researched = TRUE, email_research_notes = %s, updated_at = now() WHERE id = %s """, (email, "; ".join(notes), rid)) conn.commit() found += 1 else: cur.execute(""" UPDATE fcc_rmd_removed SET email_researched = TRUE, email_research_notes = 'No email found via any method', updated_at = now() WHERE id = %s """, (rid,)) conn.commit() LOG.info(" -> no email found") LOG.info("Email research complete — %d/%d emails found", found, len(rows)) return found # --------------------------------------------------------------------------- # Entrypoint # --------------------------------------------------------------------------- def main() -> None: if not DATABASE_URL: LOG.error("DATABASE_URL not set") sys.exit(1) conn = psycopg2.connect(DATABASE_URL) conn.autocommit = False try: run_email_research(conn) finally: conn.close() LOG.info("Done.") if __name__ == "__main__": main()