#!/usr/bin/env python3
"""Scrape a contact/IR email for each OTC issuer domain (harvest step 2).

Takes harvest_otc_issuers.py output (has a `domain`), fetches the issuer's
home + investor/contact pages, and extracts the best contact email. Public
companies almost always have an IR or general inbox. Phone is the fallback
channel for misses (the harvest already has phone at 100%).

Bandwidth-friendly: gzip, HTML document only (no assets), early-abort on
first good email, capped page size. Optional --proxy for residential egress
(usually unnecessary -- fetching a known corporate site is not search-engine
scraping).

Usage:
  python3 scripts/scrape_otc_emails.py OTC_ISSUERS.csv OUT.csv [--proxy URL]
"""
from __future__ import annotations

import argparse
import csv
import gzip
import io
import re
import sys
import time
import urllib.request
import zlib

UA = "Mozilla/5.0 (compatible; PW-research/1.0; +https://performancewest.net)"
EMAIL_RE = re.compile(r'[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}')

# Prefer these local-parts for a corporate contact.
PREFER = ("ir", "investor", "investorrelations", "info", "contact", "admin",
          "corporate", "legal", "compliance", "general")

# Substrings that mark a regex hit as a placeholder, a vendor address or an
# asset file name (e.g. "logo@2x.png") rather than a real contact address.
BAD_EMAIL_BITS = ("example.com", "sentry", "wixpress", "@2x", "godaddy",
                  "cloudflare", "domain.com", "email.com", "yourdomain",
                  "sentry.io", ".png", ".jpg", ".gif", ".svg", ".webp")

# Pages tried for each domain, in this order.
PATHS = ("/investor-relations", "/investors", "/ir", "/contact",
         "/contact-us", "/about", "/")

# Cap on bytes read off the wire per page (compressed size when gzip is used).
MAX_BYTES = 600_000
# Cap on bytes produced when inflating a gzip body, so a small compressed
# payload cannot expand without bound.
MAX_DECODED_BYTES = 5_000_000

# Filing-agent / EDGAR-vendor domains that sometimes leak into the extracted
# `domain` (e.g. netlistDFN.com = Donnelley). These are NOT the issuer's site,
# so skip scraping them -- treat the row as "no usable domain" (phone fallback).
FILING_AGENT_HINTS = ("dfn", "rdgfilings", "edgar", "donnelley", "broadridge",
                      "toppanmerrill", "dfinsolutions", "workiva", "vintage",
                      "issuerdirect", "globenewswire", "businesswire",
                      "prnewswire", "secdatabase", "sec1934act")


def is_filing_agent_domain(domain: str) -> bool:
    """Return True if *domain* contains any of FILING_AGENT_HINTS.

    The comparison is case-insensitive, so "netlistDFN.com" matches even when
    the caller has not lowercased it. This is a plain substring test; the
    original extra "first label ends with dfn" check was dropped because
    "dfn" is itself one of the substring hints.
    """
    lowered = domain.lower()
    return any(hint in lowered for hint in FILING_AGENT_HINTS)


def _gunzip_lenient(raw: bytes) -> bytes:
    """Inflate a gzip body that may have been cut short by the read cap.

    Returns whatever could be decoded (at most MAX_DECODED_BYTES). If *raw* is
    not valid gzip data, it is returned unchanged.
    """
    # wbits = MAX_WBITS | 16 selects the gzip container (header + trailer).
    inflater = zlib.decompressobj(wbits=zlib.MAX_WBITS | 16)
    try:
        # Unlike GzipFile.read(), this does not raise when the stream ends
        # before the end-of-stream marker: the decoded prefix is returned.
        return inflater.decompress(raw, MAX_DECODED_BYTES)
    except zlib.error:
        return raw


def fetch(url: str, proxy: str | None, timeout: int = 10) -> str | None:
    """Fetch *url* and return its HTML as text, or None on any failure.

    Args:
        url: Absolute URL to request.
        proxy: Proxy URL used for both http and https, or None for direct.
        timeout: Timeout in seconds passed to the opener.

    Returns:
        The body decoded as UTF-8 with undecodable bytes dropped. None if the
        request fails or the response declares a non-HTML Content-Type.
    """
    req = urllib.request.Request(url, headers={
        "User-Agent": UA,
        "Accept-Encoding": "gzip",
        "Accept": "text/html,application/xhtml+xml",
    })
    handlers = []
    if proxy:
        handlers.append(
            urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
    opener = urllib.request.build_opener(*handlers)
    try:
        with opener.open(req, timeout=timeout) as resp:
            ctype = (resp.headers.get("Content-Type") or "").lower()
            # A missing Content-Type is given the benefit of the doubt.
            if ctype and "html" not in ctype:
                return None
            raw = resp.read(MAX_BYTES)
            encoding = (resp.headers.get("Content-Encoding") or "")
            encoding = encoding.strip().lower()
    except Exception:
        # Deliberate best-effort: any network/HTTP/TLS/URL problem simply
        # counts as a miss for this URL; the caller tries the next one.
        return None
    if encoding in ("gzip", "x-gzip"):
        raw = _gunzip_lenient(raw)
    return raw.decode("utf-8", "ignore")


def best_email(html: str, domain: str) -> str | None:
    """Pick the most useful contact address found in *html*.

    Candidates containing any BAD_EMAIL_BITS substring are discarded.
    Addresses on *domain* or one of its subdomains are preferred over all
    others; within the chosen pool, the first address whose local part starts
    with an entry of PREFER (in PREFER order) wins, else the first address.

    Returns:
        A lowercased email address, or None when no candidate survives.
    """
    candidates = []
    for match in EMAIL_RE.findall(html):
        address = match.lower()
        if any(bit in address for bit in BAD_EMAIL_BITS):
            continue
        candidates.append(address)
    if not candidates:
        return None

    site = domain.lower()

    def on_site(address: str) -> bool:
        # Exact host or a true subdomain only: a bare endswith() would also
        # accept look-alikes such as "notacme.com" for "acme.com".
        host = address.rpartition("@")[2]
        return host == site or host.endswith("." + site)

    pool = [a for a in candidates if on_site(a)] or candidates
    for pref in PREFER:
        for address in pool:
            if address.partition("@")[0].startswith(pref):
                return address
    return pool[0]


def scrape_domain(domain: str, proxy: str | None) -> str | None:
    """Walk PATHS on *domain* and return the first email found, else None.

    For each path the bare host is tried first and the "www." host only if
    the bare host returned nothing.
    """
    for path in PATHS:
        for host in (f"https://{domain}", f"https://www.{domain}"):
            html = fetch(host + path, proxy)
            if not html:
                continue
            found = best_email(html, domain)
            if found:
                return found
            break  # this host answered for this path; don't retry www
    return None


def main(argv: list[str] | None = None) -> int:
    """Read the issuer CSV, scrape one email per row, write the output CSV.

    Args:
        argv: Command-line arguments; None means use sys.argv[1:].

    Returns:
        Process exit status (always 0).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("infile")
    ap.add_argument("outfile")
    ap.add_argument("--proxy", default=None)
    ap.add_argument("--sleep", type=float, default=0.4)
    args = ap.parse_args(argv)

    with open(args.infile, newline="", encoding="utf-8") as src:
        reader = csv.DictReader(src)
        rows = list(reader)
        # Take the header from the reader, not rows[0], so a header-only or
        # empty input file does not raise IndexError.
        header = list(reader.fieldnames or [])

    out = []
    got = 0
    for i, r in enumerate(rows, 1):
        dom = (r.get("domain") or "").strip().lower()
        if dom and is_filing_agent_domain(dom):
            dom = ""  # filing-agent artifact, not the issuer's site
        email = scrape_domain(dom, args.proxy) if dom else None
        r["email"] = email or ""
        out.append(r)
        if email:
            got += 1
        if i % 25 == 0:
            print(f" {i}/{len(rows)} | emails {got}", file=sys.stderr)
        if dom:
            time.sleep(args.sleep)

    fields = header if "email" in header else header + ["email"]
    with open(args.outfile, "w", newline="", encoding="utf-8") as f:
        # A row with more cells than the header carries an extra None key;
        # ignore it rather than abort the write after all scraping is done.
        w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        w.writerows(out)

    print(f"issuers: {len(rows):,} | with domain: {sum(1 for r in rows if r.get('domain'))} "
          f"| emails scraped: {got} ({100*got/max(len(rows),1):.0f}%)", file=sys.stderr)
    print(f" -> {args.outfile}", file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())