new-site/scripts/harvest_otc_issuers.py
justin 4d3af2aeae otc: domain->email scraper + filing-agent domain filtering
scrape_otc_emails.py: fetch each issuer domain's IR/contact pages (gzip,
HTML-only, early-abort, prefer ir@/investor@/info@), extract a contact email.
Skip filing-agent domains (DFN/Donnelley/Broadridge/etc.) that leak into the
extracted domain -- those are not the issuer's site. Same filter added to the
harvester's DOMAIN_NOISE for future runs. Phone (100%) is the fallback channel
for email misses.
2026-06-14 06:56:45 -05:00

152 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""Harvest US-domestic OTC SEC issuers with their website domain (from EDGAR).
The reincorporation / registered-agent / foreign-qualification / franchise-tax /
annual-report audience. EDGAR is free and bulk-OK (10 req/s, declare a UA).
Pipeline:
1. company_tickers_exchange.json -> issuers on exchange OTC or None
2. per-CIK submissions JSON -> keep US-domestic (stateOfIncorporation in a US
state); record name, ticker, CIK, state, phone, business address
3. pull the latest 10-K/10-Q/8-K filing HTML -> regex the company's own website
domain (drop sec.gov / filing-agent / boilerplate noise)
Output CSV: cik,name,ticker,state_inc,phone,city,state,zip,domain
(Email is a SEPARATE step: scrape domain -> contact/IR email, then verify.)
Usage:
python3 scripts/harvest_otc_issuers.py OUT.csv [--limit N] [--de-nv-only]
"""
from __future__ import annotations
import argparse
import csv
import json
import re
import sys
import time
import urllib.request
UA = "Performance West Inc compliance@performancewest.net"
TICKERS_URL = "https://www.sec.gov/files/company_tickers_exchange.json"
US_STATES = {
"AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN","IA",
"KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ",
"NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT",
"VA","WA","WV","WI","WY","DC",
}
URL_RE = re.compile(r'https?://(?:www\.)?([a-z0-9\-]+\.(?:com|net|io|co|us))', re.I)
DOMAIN_NOISE = ("sec.gov", "xbrl", "w3.org", "schema", "adobe", "fasb",
"rdgfilings", "edgar", "sECDatabase".lower(), "donnelley",
"broadridge", "toppanmerrill", "dfinsolutions", "workiva",
"cloudfront", "googleapis", "gstatic", "jquery", "bootstrap",
"issuerdirect", "globenewswire", "businesswire", "prnewswire",
"vintage", "secdatabase", "sec1934act", "dfn.com")
def get(url: str, timeout: int = 15) -> str:
req = urllib.request.Request(url, headers={"User-Agent": UA})
return urllib.request.urlopen(req, timeout=timeout).read().decode("utf-8", "ignore")
def get_json(url: str, timeout: int = 15):
return json.loads(get(url, timeout))
def domain_from_filings(cik: str, recent: dict) -> str:
forms = recent.get("form", [])
accns = recent.get("accessionNumber", [])
docs = recent.get("primaryDocument", [])
# try a few recent substantive filings
tried = 0
for i, f in enumerate(forms):
if f not in ("10-K", "10-Q", "8-K", "DEF 14A", "20-F", "S-1"):
continue
try:
accn = accns[i].replace("-", "")
html = get(f"https://www.sec.gov/Archives/edgar/data/{int(cik)}/{accn}/{docs[i]}")
except Exception:
continue
cand = {}
for m in URL_RE.finditer(html):
dom = m.group(1).lower()
if any(b in dom for b in DOMAIN_NOISE):
continue
cand[dom] = cand.get(dom, 0) + 1
if cand:
# the most-frequent non-noise domain is almost always the issuer's
return max(cand, key=cand.get)
tried += 1
if tried >= 2:
break
time.sleep(0.2)
return ""
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("outfile")
ap.add_argument("--limit", type=int, default=0, help="cap issuers processed (0=all)")
ap.add_argument("--de-nv-only", action="store_true",
help="only Delaware/Nevada issuers (prime reincorp targets)")
ap.add_argument("--sleep", type=float, default=0.12)
args = ap.parse_args()
d = get_json(TICKERS_URL)
fields = d["fields"]; rows = d["data"]
ci = fields.index("cik"); ni = fields.index("name")
ti = fields.index("ticker"); ei = fields.index("exchange")
otc = [r for r in rows if r[ei] in ("OTC", None)]
print(f"OTC/None issuers: {len(otc):,}", file=sys.stderr)
out = []
us = foreign = 0
for n, r in enumerate(otc, 1):
if args.limit and len(out) >= args.limit:
break
cik = str(r[ci]).zfill(10)
try:
sub = get_json(f"https://data.sec.gov/submissions/CIK{cik}.json")
except Exception:
continue
soi = (sub.get("stateOfIncorporation") or "").strip()
ba = sub.get("addresses", {}).get("business", {}) or {}
addr_loc = (ba.get("stateOrCountry") or "").strip()
# The 2-letter stateOfIncorporation is ambiguous: 'DE' = Delaware OR
# Germany, etc. Disambiguate with the business address country: a real
# US-domestic issuer has BOTH a US state-of-incorporation AND a US-state
# business address. Foreign ADRs (Infineon: soi=DE but addr country=2M)
# are correctly excluded.
if soi not in US_STATES or addr_loc not in US_STATES:
foreign += 1
time.sleep(args.sleep)
continue
if args.de_nv_only and soi not in ("DE", "NV"):
time.sleep(args.sleep)
continue
us += 1
domain = domain_from_filings(cik, sub.get("filings", {}).get("recent", {}))
out.append({
"cik": cik, "name": sub.get("name", r[ni]), "ticker": r[ti] or "",
"state_inc": soi, "phone": sub.get("phone", "") or "",
"city": ba.get("city", ""), "state": ba.get("stateOrCountry", ""),
"zip": ba.get("zipCode", ""), "domain": domain,
})
if len(out) % 25 == 0:
print(f" processed {n} | us={us} found_domain={sum(1 for o in out if o['domain'])}",
file=sys.stderr)
time.sleep(args.sleep)
with open(args.outfile, "w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=["cik", "name", "ticker", "state_inc",
"phone", "city", "state", "zip", "domain"])
w.writeheader()
w.writerows(out)
with_domain = sum(1 for o in out if o["domain"])
print(f"US-domestic issuers: {len(out):,} (skipped {foreign:,} foreign)", file=sys.stderr)
print(f" with a domain: {with_domain:,} ({100*with_domain/max(len(out),1):.0f}%)", file=sys.stderr)
print(f" -> {args.outfile}", file=sys.stderr)
return 0
if __name__ == "__main__":
raise SystemExit(main())