new-site/scripts/scrape_otc_emails.py
justin 4d3af2aeae otc: domain->email scraper + filing-agent domain filtering
scrape_otc_emails.py: fetch each issuer domain's IR/contact pages (gzip,
HTML-only, early-abort, prefer ir@/investor@/info@), extract a contact email.
Skip filing-agent domains (DFN/Donnelley/Broadridge/etc.) that leak into the
extracted domain -- those are not the issuer's site. Same filter added to the
harvester's DOMAIN_NOISE for future runs. Phone (100%) is the fallback channel
for email misses.
2026-06-14 06:56:45 -05:00

146 lines
5.5 KiB
Python

#!/usr/bin/env python3
"""Scrape a contact/IR email for each OTC issuer domain (harvest step 2).
Takes harvest_otc_issuers.py output (has a `domain`), fetches the issuer's
home + investor/contact pages, and extracts the best contact email. Public
companies almost always have an IR or general inbox. Phone is the fallback
channel for misses (the harvest already has phone at 100%).
Bandwidth-friendly: gzip, HTML document only (no assets), early-abort on first
good email, capped page size. Optional --proxy for residential egress (usually
unnecessary -- fetching a known corporate site is not search-engine scraping).
Usage:
python3 scripts/scrape_otc_emails.py OTC_ISSUERS.csv OUT.csv [--proxy URL]
"""
from __future__ import annotations

import argparse
import csv
import gzip
import io
import re
import sys
import time
import urllib.request
import zlib
# Identify the scraper honestly to the sites it visits.
UA = "Mozilla/5.0 (compatible; PW-research/1.0; +https://performancewest.net)"

# Loose "something@host.tld" matcher; false positives are filtered afterwards
# with BAD_EMAIL_BITS.
EMAIL_RE = re.compile(r'[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}')

# Local-parts that indicate a corporate contact inbox, most preferred first.
PREFER = (
    "ir",
    "investor",
    "investorrelations",
    "info",
    "contact",
    "admin",
    "corporate",
    "legal",
    "compliance",
    "general",
)

# Substrings that mark a regex hit as a placeholder, a vendor/tracker address,
# or an asset filename (e.g. "logo@2x.png") rather than a real mailbox.
BAD_EMAIL_BITS = (
    "example.com",
    "sentry",
    "wixpress",
    "@2x",
    "godaddy",
    "cloudflare",
    "domain.com",
    "email.com",
    "yourdomain",
    "sentry.io",
    ".png",
    ".jpg",
    ".gif",
    ".svg",
    ".webp",
)

# Pages to try on each site, in order; the home page is the last resort.
PATHS = (
    "/investor-relations",
    "/investors",
    "/ir",
    "/contact",
    "/contact-us",
    "/about",
    "/",
)

# Upper bound on the number of bytes read from any single response.
MAX_BYTES = 600_000
# Filing-agent / EDGAR-vendor domains that sometimes leak into the extracted
# `domain` (e.g. netlistDFN.com = Donnelley). These are NOT the issuer's site,
# so skip scraping them -- treat the row as "no usable domain" (phone fallback).
# Entries are lower-case substrings matched anywhere in the domain.
FILING_AGENT_HINTS = ("dfn", "rdgfilings", "edgar", "donnelley", "broadridge",
                      "toppanmerrill", "dfinsolutions", "workiva", "vintage",
                      "issuerdirect", "globenewswire", "businesswire", "prnewswire",
                      "secdatabase", "sec1934act")


def is_filing_agent_domain(domain: str) -> bool:
    """Return True when *domain* looks like a filing agent's site.

    The check is a case-insensitive substring match against
    FILING_AGENT_HINTS, so a mixed-case value such as ``netlistDFN.com`` is
    recognised even when the caller has not lower-cased it. Surrounding
    whitespace is ignored. An empty string is never a filing-agent domain.
    """
    name = domain.strip().lower()
    first_label = name.split(".")[0]
    # The "...dfn" suffix test is currently subsumed by the "dfn" hint above;
    # it is kept so the Donnelley artifact stays covered if that hint changes.
    return (any(hint in name for hint in FILING_AGENT_HINTS)
            or first_label.endswith("dfn"))
def fetch(url: str, proxy: str | None, timeout: int = 10) -> str | None:
    """Download *url* and return its body as text, or None on failure.

    Args:
        url: Absolute URL to request.
        proxy: Proxy URL used for both http and https, or None for a direct
            connection.
        timeout: Timeout in seconds passed to the opener.

    Returns:
        The response body decoded as UTF-8 (undecodable bytes dropped), or
        None if the request failed or the response declared a non-HTML
        Content-Type. A response without a Content-Type header is accepted.

    At most MAX_BYTES are read from the wire. A gzip body cut short by that
    cap is still inflated as far as the received bytes allow, instead of the
    page being discarded.
    """
    req = urllib.request.Request(url, headers={
        "User-Agent": UA,
        "Accept-Encoding": "gzip",
        "Accept": "text/html,application/xhtml+xml",
    })
    if proxy:
        opener = urllib.request.build_opener(
            urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
    else:
        opener = urllib.request.build_opener()

    try:
        with opener.open(req, timeout=timeout) as resp:
            ctype = resp.headers.get("Content-Type", "").lower()
            if ctype and "html" not in ctype:
                return None
            raw = resp.read(MAX_BYTES)
            encoding = (resp.headers.get("Content-Encoding") or "").strip().lower()
    except Exception:
        # Deliberate best-effort: any network, TLS, HTTP or URL problem just
        # means "no page" so one bad site cannot stop a long batch run.
        return None

    if encoding in ("gzip", "x-gzip"):
        # Streaming inflate (16 + MAX_WBITS selects the gzip container). Unlike
        # GzipFile.read(), it does not raise when the stream ends early, which
        # is what happens whenever the compressed body exceeded MAX_BYTES.
        inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
        # Bound the inflated size so a small hostile payload cannot expand
        # without limit.
        inflate_cap = 10 * MAX_BYTES
        try:
            raw = inflater.decompress(raw, inflate_cap)
        except zlib.error:
            # Header claimed gzip but the body is not valid gzip data; fall
            # back to treating the bytes as plain text.
            pass
    return raw.decode("utf-8", "ignore")
def best_email(html: str, domain: str) -> str | None:
    """Choose the most useful contact address that appears in *html*.

    Args:
        html: Page text to search.
        domain: The issuer's domain; addresses on it are preferred.

    Returns:
        A lower-cased email address, or None when the page contains no
        address that survives the BAD_EMAIL_BITS filter.

    Selection order:
      1. Restrict to addresses whose host is *domain* or a sub-domain of it,
         if there are any; otherwise consider every address found.
      2. Within that pool, return the first address whose local-part starts
         with an entry of PREFER, trying PREFER entries in order.
      3. Otherwise return the first address in the pool.
    """
    candidates = []
    seen = set()
    for match in EMAIL_RE.findall(html):
        addr = match.lower()
        if addr in seen:
            continue
        seen.add(addr)
        if any(bit in addr for bit in BAD_EMAIL_BITS):
            continue
        candidates.append(addr)
    if not candidates:
        return None

    # Same-domain means an exact host match or a true sub-domain. A bare
    # endswith(domain) would also accept look-alikes such as "notacme.com"
    # when the issuer is "acme.com".
    site = domain.strip().lower().strip(".")
    same = []
    if site:
        sub_suffix = "." + site
        for addr in candidates:
            host = addr.rpartition("@")[2]
            if host == site or host.endswith(sub_suffix):
                same.append(addr)
    pool = same or candidates

    for pref in PREFER:
        for addr in pool:
            # startswith() also covers the exact-match case.
            if addr.partition("@")[0].startswith(pref):
                return addr
    return pool[0]
def scrape_domain(domain: str, proxy: str | None) -> str | None:
    """Walk the candidate pages of *domain* and return the first email found.

    For every path in PATHS the bare host is requested first and the ``www.``
    host only if the bare host produced no page. Only the first page that
    loads for a path is searched. The first address `best_email` returns ends
    the scrape. Returns None when no page yields an address.
    """
    origins = (f"https://{domain}", f"https://www.{domain}")
    for path in PATHS:
        page = None
        for origin in origins:
            page = fetch(origin + path, proxy)
            if page:
                break  # this host answered; skip the remaining host
        if not page:
            continue
        address = best_email(page, domain)
        if address:
            return address
    return None
def main() -> int:
    """Read the issuer CSV, scrape an email per domain, write the result CSV.

    Command line: ``infile outfile [--proxy URL] [--sleep SECONDS]``.

    Every input row is copied to the output with an ``email`` column set to
    the scraped address or an empty string. Rows with no domain, or whose
    domain is a filing-agent artifact, are not scraped. Progress and a final
    summary go to stderr. An input with a header but no rows (or no content
    at all) produces an output containing only the header.

    Returns:
        0, used as the process exit status.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("infile")
    ap.add_argument("outfile")
    ap.add_argument("--proxy", default=None)
    ap.add_argument("--sleep", type=float, default=0.4)
    args = ap.parse_args()

    # Read everything up front and close the input before the slow scrape.
    with open(args.infile, newline="", encoding="utf-8") as src:
        reader = csv.DictReader(src)
        rows = list(reader)
        # Take the columns from the header rather than rows[0], so an input
        # without data rows still works.
        fields = list(reader.fieldnames or [])
    if "email" not in fields:
        fields.append("email")

    got = 0
    total = len(rows)
    for i, r in enumerate(rows, 1):
        dom = (r.get("domain") or "").strip().lower()
        if dom and is_filing_agent_domain(dom):
            dom = ""  # filing-agent artifact, not the issuer's site
        email = scrape_domain(dom, args.proxy) if dom else None
        r["email"] = email or ""
        if email:
            got += 1
        if i % 25 == 0:
            print(f" {i}/{total} | emails {got}", file=sys.stderr)
        if dom:
            # Only pause after rows that actually hit the network.
            time.sleep(args.sleep)

    with open(args.outfile, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields)
        w.writeheader()
        w.writerows(rows)

    print(f"issuers: {total:,} | with domain: {sum(1 for r in rows if r.get('domain'))} "
          f"| emails scraped: {got} ({100*got/max(total,1):.0f}%)", file=sys.stderr)
    print(f" -> {args.outfile}", file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())