scrape_otc_emails.py: fetch each issuer domain's IR/contact pages (gzip, HTML-only, early-abort, prefer ir@/investor@/info@), extract a contact email. Skip filing-agent domains (DFN/Donnelley/Broadridge/etc.) that leak into the extracted domain -- those are not the issuer's site. Same filter added to the harvester's DOMAIN_NOISE for future runs. Phone (100%) is the fallback channel for email misses.
146 lines
5.5 KiB
Python
146 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Scrape a contact/IR email for each OTC issuer domain (harvest step 2).
|
|
|
|
Takes harvest_otc_issuers.py output (has a `domain`), fetches the issuer's
|
|
home + investor/contact pages, and extracts the best contact email. Public
|
|
companies almost always have an IR or general inbox. Phone is the fallback
|
|
channel for misses (the harvest already has phone at 100%).
|
|
|
|
Bandwidth-friendly: gzip, HTML document only (no assets), early-abort on first
|
|
good email, capped page size. Optional --proxy for residential egress (usually
|
|
unnecessary -- fetching a known corporate site is not search-engine scraping).
|
|
|
|
Usage:
|
|
python3 scripts/scrape_otc_emails.py OTC_ISSUERS.csv OUT.csv [--proxy URL]
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse
|
|
import csv
|
|
import gzip
|
|
import io
|
|
import re
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
|
|
# User-Agent header sent with every request made by fetch().
UA = "Mozilla/5.0 (compatible; PW-research/1.0; +https://performancewest.net)"

# Loose matcher for anything shaped like local@host.tld inside raw HTML.
EMAIL_RE = re.compile(r'[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}')

# Local-parts that indicate a corporate inbox, most desirable first
# (best_email() walks this tuple in order).
PREFER = (
    "ir",
    "investor",
    "investorrelations",
    "info",
    "contact",
    "admin",
    "corporate",
    "legal",
    "compliance",
    "general",
)

# A regex hit containing any of these substrings is discarded: placeholder
# addresses, vendor/telemetry hosts, and asset names such as "logo@2x.png".
BAD_EMAIL_BITS = (
    "example.com",
    "sentry",
    "wixpress",
    "@2x",
    "godaddy",
    "cloudflare",
    "domain.com",
    "email.com",
    "yourdomain",
    "sentry.io",
    ".png",
    ".jpg",
    ".gif",
    ".svg",
    ".webp",
)

# URL paths probed on each site, in order; the bare homepage is the last resort.
PATHS = (
    "/investor-relations",
    "/investors",
    "/ir",
    "/contact",
    "/contact-us",
    "/about",
    "/",
)

# Upper bound on the number of body bytes read from a single response.
MAX_BYTES = 600_000

# Filing-agent / EDGAR-vendor fragments that sometimes leak into the extracted
# `domain` (e.g. netlistDFN.com = Donnelley). Such a domain is NOT the issuer's
# own site, so it is not scraped -- the row is treated as "no usable domain"
# and falls back to phone.
FILING_AGENT_HINTS = (
    "dfn",
    "rdgfilings",
    "edgar",
    "donnelley",
    "broadridge",
    "toppanmerrill",
    "dfinsolutions",
    "workiva",
    "vintage",
    "issuerdirect",
    "globenewswire",
    "businesswire",
    "prnewswire",
    "secdatabase",
    "sec1934act",
)
|
|
|
|
|
|
def is_filing_agent_domain(domain: str) -> bool:
    """Return True when *domain* looks like a filing agent's site, not an issuer's.

    A domain is flagged when any FILING_AGENT_HINTS fragment occurs anywhere
    inside it (plain substring test), or when its first dot-separated label
    ends in "dfn".
    """
    for hint in FILING_AGENT_HINTS:
        if hint in domain:
            return True
    # Independent check on the leftmost label, e.g. "netlistdfn" in
    # "netlistdfn.com".
    first_label = domain.partition(".")[0]
    return first_label.endswith("dfn")
|
|
|
|
|
|
def _gunzip_prefix(data: bytes, limit: int) -> bytes | None:
    """Inflate as much of a (possibly truncated) gzip stream as can be decoded.

    Args:
        data: Raw bytes that are expected to be gzip-compressed.
        limit: Maximum number of decompressed bytes to produce.

    Returns:
        The decompressed prefix (possibly empty), or None when *data* does not
        begin with a decodable gzip stream at all.
    """
    import zlib  # local import: only this helper needs it

    pieces = []
    budget = limit
    while data and budget > 0:
        # wbits = 16 + MAX_WBITS selects the gzip container format.
        inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
        try:
            piece = inflater.decompress(data, budget)
        except zlib.error:
            if pieces:
                break  # trailing garbage after a good member: keep what we have
            return None
        pieces.append(piece)
        budget -= len(piece)
        if not inflater.eof:
            break  # member was cut short, or the output budget is used up
        data = inflater.unused_data  # a further concatenated member, if any
    return b"".join(pieces)


def fetch(url: str, proxy: str | None, timeout: int = 10) -> str | None:
    """Download one HTML document and return its text, or None on failure.

    The request advertises gzip support and asks for HTML only. At most
    MAX_BYTES of the (possibly compressed) body are read from the wire.

    Args:
        url: Absolute URL to request.
        proxy: Optional proxy URL used for both http and https traffic.
        timeout: Timeout in seconds passed to the opener.

    Returns:
        The body decoded as UTF-8 (undecodable bytes dropped), or None when
        the request fails for any reason or the response declares a
        non-HTML Content-Type.
    """
    req = urllib.request.Request(url, headers={
        "User-Agent": UA,
        "Accept-Encoding": "gzip",
        "Accept": "text/html,application/xhtml+xml",
    })
    handlers = []
    if proxy:
        handlers.append(urllib.request.ProxyHandler({"http": proxy, "https": proxy}))
    opener = urllib.request.build_opener(*handlers)

    try:
        with opener.open(req, timeout=timeout) as resp:
            # Header values are compared case-insensitively.
            ctype = (resp.headers.get("Content-Type") or "").lower()
            if ctype and "html" not in ctype:
                return None
            encoding = (resp.headers.get("Content-Encoding") or "").strip().lower()
            raw = resp.read(MAX_BYTES)
    except Exception:
        # Deliberate best-effort boundary: DNS/TLS/HTTP/URL errors all simply
        # mean "no page here"; the caller moves on to the next candidate URL.
        return None

    if encoding in ("gzip", "x-gzip"):
        # The body may have been cut at MAX_BYTES mid-stream. Decompress
        # incrementally so the decodable prefix survives instead of the whole
        # page being lost to an end-of-stream error. The output cap guards
        # against pathological compression ratios.
        inflated = _gunzip_prefix(raw, 8 * MAX_BYTES)
        if inflated is not None:
            raw = inflated
        # else: header claimed gzip but the body is not; use the bytes as-is.
    return raw.decode("utf-8", "ignore")
|
|
|
|
|
|
def best_email(html: str, domain: str) -> str | None:
    """Pick the most useful contact address found in *html*.

    Selection rules:
      1. Every EMAIL_RE match is lowercased; matches containing any
         BAD_EMAIL_BITS substring are dropped.
      2. Addresses hosted on *domain* itself or on one of its subdomains are
         preferred; if there are none, all surviving addresses are considered.
      3. Within that pool, PREFER is walked in order. For each token an exact
         local-part match wins over a prefix match. A token of one or two
         characters (e.g. "ir") only prefix-matches when the next character is
         not a letter, so personal names such as "irene" do not qualify.
      4. If no token matches, the first address of the pool is returned.

    Args:
        html: Page text to scan.
        domain: The issuer's domain; a leading "www." is ignored.

    Returns:
        The chosen lowercase address, or None when the page has no usable one.
    """
    candidates = [
        addr
        for addr in (hit.lower() for hit in EMAIL_RE.findall(html))
        if not any(bit in addr for bit in BAD_EMAIL_BITS)
    ]
    if not candidates:
        return None

    site = domain.lower().strip(".")
    if site.startswith("www."):
        site = site[4:]

    def on_site(addr: str) -> bool:
        # Compare on a label boundary so "notacme.com" is not mistaken for
        # "acme.com".
        host = addr.rpartition("@")[2]
        return host == site or host.endswith("." + site)

    pool = [addr for addr in candidates if on_site(addr)] or candidates

    for pref in PREFER:
        loose = None
        for addr in pool:
            local = addr.partition("@")[0]
            if local == pref:
                return addr
            if loose is None and local.startswith(pref):
                tail = local[len(pref):]
                if len(pref) > 2 or not tail[:1].isalpha():
                    loose = addr
        if loose is not None:
            return loose
    return pool[0]
|
|
|
|
|
|
def scrape_domain(domain: str, proxy: str | None) -> str | None:
    """Probe the site's candidate pages and return the first email found.

    Each entry of PATHS is requested on the bare host first and on the "www."
    host only if the bare host produced no document. The first document
    obtained for a path is the only one inspected for that path. Returns None
    when no probed page yields an address.
    """
    origins = (f"https://{domain}", f"https://www.{domain}")
    for path in PATHS:
        # Lazy generator: the www. variant is requested only when the bare
        # host returned nothing.
        attempts = (fetch(origin + path, proxy) for origin in origins)
        page = next((doc for doc in attempts if doc), None)
        if page is None:
            continue
        address = best_email(page, domain)
        if address:
            return address
    return None
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: add an `email` column to the issuer CSV.

    Reads every row of the input CSV, scrapes an address for each row whose
    `domain` is non-empty and not a filing-agent domain, and writes all rows
    (with `email` filled in, or empty) to the output CSV. Progress and a final
    summary go to stderr.

    Returns:
        0, used as the process exit status.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("infile")
    ap.add_argument("outfile")
    ap.add_argument("--proxy", default=None)
    ap.add_argument("--sleep", type=float, default=0.4)
    args = ap.parse_args()

    # Close the input promptly, and take the column list from the header so a
    # header-only (or empty) file does not crash on rows[0].
    with open(args.infile, newline="", encoding="utf-8") as src:
        reader = csv.DictReader(src)
        rows = list(reader)
        header = list(dict.fromkeys(reader.fieldnames or []))

    got = 0
    for i, r in enumerate(rows, 1):
        dom = (r.get("domain") or "").strip().lower()
        if dom and is_filing_agent_domain(dom):
            dom = ""  # filing-agent artifact, not the issuer's site
        email = scrape_domain(dom, args.proxy) if dom else None
        r["email"] = email or ""
        if email:
            got += 1
        if i % 25 == 0:
            print(f" {i}/{len(rows)} | emails {got}", file=sys.stderr)
        if dom:
            # Pause only after rows that actually triggered network requests.
            time.sleep(args.sleep)

    fields = header if "email" in header else header + ["email"]
    with open(args.outfile, "w", newline="", encoding="utf-8") as f:
        # extrasaction="ignore": an input row with more cells than the header
        # must not abort the write after the whole scrape has finished.
        w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        w.writerows(rows)
    print(f"issuers: {len(rows):,} | with domain: {sum(1 for r in rows if r.get('domain'))} "
          f"| emails scraped: {got} ({100*got/max(len(rows),1):.0f}%)", file=sys.stderr)
    print(f" -> {args.outfile}", file=sys.stderr)
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the CLI and hand main()'s return value to the interpreter as the
    # process exit status.
    sys.exit(main())
|