new-site/scripts/workers/wy_entity_scraper.py
"""
wy_entity_scraper.py — Bulk scrape Wyoming business entities from WyoBiz.
WyoBiz (wyobiz.wyo.gov) doesn't offer bulk downloads without a $10K+ subscription.
This scraper paginates through search results to populate entity_cache.
Strategy: Search by year prefix (2020-, 2021-, etc.) to get all filings,
then paginate through results. WyoBiz shows 10 results per page.
Usage:
# Scrape all years (2000-2026):
python -m workers.wy_entity_scraper
# Scrape specific year range:
python -m workers.wy_entity_scraper --start-year 2023 --end-year 2026
# Resume from a specific year:
python -m workers.wy_entity_scraper --start-year 2024
# Dry run (don't save to DB):
python -m workers.wy_entity_scraper --dry-run --start-year 2025
Environment:
DATABASE_URL PostgreSQL connection string
"""

from __future__ import annotations

import argparse
import logging
import os
import random
import re
import sys
import time
from datetime import datetime

import psycopg2
import psycopg2.extras

LOG = logging.getLogger("workers.wy_entity_scraper")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

DATABASE_URL = os.environ.get("DATABASE_URL", "")
WYOBIZ_SEARCH_URL = "https://wyobiz.wyo.gov/Business/FilingSearch.aspx"

# WY filing IDs are formatted as YYYY-NNNNNNN (year + 7-digit sequence).
# Approximate volume: ~8,000-15,000 new filings/year.

# Rate limiting — new searches are the risky action, pagination is safe
PAGE_DELAY_MIN = 1.5   # Seconds between pagination clicks (same search, low risk)
PAGE_DELAY_MAX = 3.0
SEARCH_DELAY_MIN = 15  # Seconds between NEW searches (new prefix = new session risk)
SEARCH_DELAY_MAX = 30
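
# Back-of-envelope runtime under the delays above (an estimate, not a
# measurement): a full run issues 26*26 + 10 = 686 top-level searches, so
# the inter-search delays alone cost between 686 * 15s ≈ 2.9h and
# 686 * 30s ≈ 5.7h, plus 1.5-3s of pagination delay per 10-result page.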


def _human_delay(min_s: float = PAGE_DELAY_MIN, max_s: float = PAGE_DELAY_MAX):
    """Random delay to mimic human browsing speed."""
    delay = random.uniform(min_s, max_s)
    time.sleep(delay)


def scrape_prefix(prefix: str, conn, dry_run: bool = False) -> int:
    """Scrape all entities matching a name prefix from WyoBiz.

    Uses "Starts With" name search with 2-letter prefixes (AA, AB, ... ZZ)
    to enumerate all entities without triggering anti-scraping defenses.

    Returns the number of entities scraped. Negative returns are signals:
    -1 means a CAPTCHA was hit and the caller should back off; any other
    negative value means the prefix overflowed the page limit and must be
    split into longer sub-prefixes (its magnitude is the count already saved).
    """
    # Imported here so the module can be loaded without Playwright installed
    from playwright.sync_api import sync_playwright

    LOG.info("Scraping WY entities starting with '%s'", prefix)
    entities: list[dict] = []
    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/124.0.0.0 Safari/537.36",
                viewport={"width": 1366, "height": 768},
            )
            page = context.new_page()

            # Navigate to the search page
            page.goto(WYOBIZ_SEARCH_URL, wait_until="networkidle", timeout=30000)
            _human_delay(3, 6)

            # Select the "Starts With" radio (default mode)
            starts_with = page.query_selector("#MainContent_chkSearchStartWith")
            if starts_with:
                starts_with.click()
                _human_delay(0.5, 1)

            # Type the prefix slowly (human-like)
            name_input = page.query_selector("#MainContent_txtFilingName")
            if not name_input:
                LOG.error("Could not find name search input")
                browser.close()
                return 0
            for char in prefix:
                name_input.type(char, delay=random.randint(80, 200))
            _human_delay(1, 2)

            # Click search
            page.click("#MainContent_cmdSearch")
            _human_delay(4, 8)

            # Wait for results
            try:
                page.wait_for_selector("#scrolltop", timeout=20000)
            except Exception:
                pass
            page.wait_for_load_state("networkidle", timeout=20000)
            _human_delay(2, 4)

            # Check for CAPTCHA
            page_text = page.inner_text("body")
            if "captcha" in page_text.lower() or "verify you are human" in page_text.lower():
                LOG.warning(" CAPTCHA detected for prefix '%s' — stopping", prefix)
                browser.close()
                return -1  # Signal to caller to back off

            # Parse results pages
            page_num = 0
            while True:
                page_num += 1
                # Extract results from the current page
                rows = page.query_selector_all("table.Grid tr:not(:first-child)")
                if not rows:
                    rows = page.query_selector_all("#MainContent_gvFilings tr:not(:first-child)")
                if not rows:
                    content = page.inner_text("#scrolltop") if page.query_selector("#scrolltop") else ""
                    if "no record" in content.lower() or not content.strip():
                        break
                    if page_num == 1:
                        LOG.info(" Prefix '%s': no results", prefix)
                    break
                for row in rows:
                    cells = row.query_selector_all("td")
                    if len(cells) < 3:
                        continue
                    cell_texts = [c.inner_text().strip() for c in cells]
                    if len(cell_texts) >= 3:
                        # Columns vary — try to identify them
                        filing_id = None
                        name = None
                        status_raw = ""
                        entity_type_raw = ""
                        formation_date_raw = ""
                        # First cell with a YYYY- pattern is the filing ID
                        for txt in cell_texts:
                            if re.match(r"\d{4}-\d+", txt):
                                filing_id = txt
                            elif not name and len(txt) > 2 and not re.match(r"\d{2}/\d{2}/\d{4}", txt):
                                if txt.upper() not in ("ACTIVE", "INACTIVE", "DISSOLVED", "LLC", "CORPORATION"):
                                    name = txt.upper()
                        # Classify the remaining cells after the name
                        if name and len(cell_texts) >= 3:
                            for txt in cell_texts:
                                t = txt.upper()
                                if t in ("ACTIVE", "INACTIVE", "DISSOLVED", "DELINQUENT",
                                         "GOOD STANDING", "REVOKED", "SUSPENDED", "CANCELLED"):
                                    status_raw = t
                                elif "LLC" in t or "CORP" in t or "LP" in t or "LIMITED" in t:
                                    if t != name:
                                        entity_type_raw = t
                                elif re.match(r"\d{2}/\d{2}/\d{4}", txt):
                                    formation_date_raw = txt
                        if not filing_id:
                            # Use the name as an entity_number fallback
                            filing_id = f"WY_{name[:30]}" if name else None
                        if name and filing_id:
                            # Normalize status
                            status = "ACTIVE"
                            s = status_raw.upper()
                            if "DISSOLV" in s or "CANCEL" in s:
                                status = "DISSOLVED"
                            elif "DELINQ" in s or "DEFAULT" in s:
                                status = "DELINQUENT"
                            elif "SUSPEND" in s or "REVOK" in s:
                                status = "SUSPENDED"
                            elif "INACTIVE" in s or "WITHDRAWN" in s:
                                status = "INACTIVE"
                            # Normalize type
                            entity_type = None
                            t = entity_type_raw.upper()
                            if "LLC" in t or "LIMITED LIABILITY" in t:
                                entity_type = "LLC"
                            elif "CORP" in t or "INC" in t:
                                entity_type = "CORPORATION"
                            elif "LP" in t:
                                entity_type = "LP"
                            # Parse date
                            formation_date = None
                            if formation_date_raw:
                                try:
                                    formation_date = datetime.strptime(
                                        formation_date_raw.split()[0], "%m/%d/%Y"
                                    ).strftime("%Y-%m-%d")
                                except (ValueError, IndexError):
                                    pass
                            entities.append({
                                "entity_name": name,
                                "entity_number": filing_id,
                                "entity_type": entity_type,
                                "status": status,
                                "formation_date": formation_date,
                                "formation_state": "WY",
                                "jurisdiction": "US_WY",
                                "state": "WY",
                            })

                # Try to go to the next page
                next_link = page.query_selector("a[href*='Page$Next']")
                if not next_link:
                    break
                next_link.click()
                page.wait_for_load_state("networkidle", timeout=20000)
                _human_delay(PAGE_DELAY_MIN, PAGE_DELAY_MAX)  # 1.5-3s between pages

                # Safety limit — if we hit max pages, the prefix needs a deeper split
                if page_num > 200:
                    LOG.warning(" Prefix '%s': hit 200 page limit — needs deeper split", prefix)
                    browser.close()
                    if entities and not dry_run:
                        _upsert(conn, entities)
                    # Return a negative count to signal the caller to split this
                    # prefix; never -1, which is reserved for the CAPTCHA signal
                    return -max(len(entities), 2)
                if page_num % 50 == 0:
                    LOG.info(" Prefix '%s': page %d (%d entities)", prefix, page_num, len(entities))
            browser.close()
    except Exception as exc:
        LOG.error("Scraper error for prefix '%s': %s", prefix, exc)

    LOG.info(" Prefix '%s': scraped %d entities", prefix, len(entities))
    # Upsert to DB
    if entities and not dry_run:
        count = _upsert(conn, entities)
        LOG.info(" Prefix '%s': upserted %d to entity_cache", prefix, count)
        return count
    return len(entities)
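

# The entity_cache schema is not defined in this file. A minimal table that
# satisfies the upsert below, including the unique constraint that
# ON CONFLICT (jurisdiction, entity_number) requires, might look like this
# (column types are assumptions, not taken from the real migration):
#
#   CREATE TABLE IF NOT EXISTS entity_cache (
#       jurisdiction      TEXT NOT NULL,
#       entity_name       TEXT NOT NULL,
#       entity_number     TEXT NOT NULL,
#       entity_type       TEXT,
#       status            TEXT,
#       formation_date    DATE,
#       state             TEXT,
#       formation_state   TEXT,
#       principal_address TEXT,
#       source            TEXT,
#       last_synced       TIMESTAMPTZ DEFAULT NOW(),
#       UNIQUE (jurisdiction, entity_number)
#   );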
def _upsert(conn, entities: list[dict]) -> int:
    """Upsert scraped WY entities into entity_cache."""
    cur = conn.cursor()
    count = 0
    # Deduplicate on the conflict key: a single INSERT ... ON CONFLICT DO
    # UPDATE statement errors if it touches the same row twice
    seen = set()
    deduped = []
    for e in entities:
        key = (e["jurisdiction"], e["entity_number"])
        if key not in seen:
            seen.add(key)
            deduped.append(e)
    try:
        for batch_start in range(0, len(deduped), 200):
            batch = deduped[batch_start:batch_start + 200]
            values = []
            for e in batch:
                values.append(cur.mogrify(
                    "(%s,%s,%s,%s,%s,%s,%s,%s,%s,'playwright')",
                    (
                        e["jurisdiction"], e["entity_name"], e["entity_number"],
                        e["entity_type"], e["status"], e["formation_date"],
                        e["state"], e.get("formation_state"),
                        None,  # principal_address
                    ),
                ).decode())
            sql = f"""
                INSERT INTO entity_cache
                    (jurisdiction, entity_name, entity_number, entity_type, status,
                     formation_date, state, formation_state, principal_address, source)
                VALUES {",".join(values)}
                ON CONFLICT (jurisdiction, entity_number) DO UPDATE SET
                    entity_name = EXCLUDED.entity_name,
                    entity_type = EXCLUDED.entity_type,
                    status = EXCLUDED.status,
                    formation_date = EXCLUDED.formation_date,
                    formation_state = COALESCE(EXCLUDED.formation_state, entity_cache.formation_state),
                    last_synced = NOW()
            """
            cur.execute(sql)
            count += len(batch)
        conn.commit()
    except Exception as exc:
        LOG.error("DB upsert error: %s", exc)
        conn.rollback()
    return count
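

# Note: psycopg2.extras (imported at the top) provides execute_values(),
# which builds the same multi-row VALUES statement without the manual
# mogrify/join batching used in _upsert(). A sketch of the equivalent call,
# under the same schema assumptions (untested here, SET list abbreviated):
#
#   psycopg2.extras.execute_values(
#       cur,
#       "INSERT INTO entity_cache (jurisdiction, entity_name, entity_number, "
#       "entity_type, status, formation_date, state, formation_state, "
#       "principal_address, source) VALUES %s "
#       "ON CONFLICT (jurisdiction, entity_number) DO UPDATE SET "
#       "entity_name = EXCLUDED.entity_name, last_synced = NOW()",
#       [(e["jurisdiction"], e["entity_name"], e["entity_number"],
#         e["entity_type"], e["status"], e["formation_date"], e["state"],
#         e.get("formation_state"), None, "playwright") for e in deduped],
#       page_size=200,
#   )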


def _generate_prefixes(start: str = "AA", end: str = "ZZ") -> list[str]:
    """Generate 2-letter prefixes from start to end (inclusive)."""
    prefixes = []
    for c1 in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
        for c2 in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
            p = c1 + c2
            if start <= p <= end:
                prefixes.append(p)
    # Also add single-digit prefixes (0-9) for entities whose names start with
    # a number, but only on a full default run, so a resumed range
    # (e.g. --start GT) doesn't re-scrape them
    if start == "AA" and end == "ZZ":
        for d in "0123456789":
            prefixes.append(d)
    return prefixes
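

# For example, a resumed range such as _generate_prefixes("GT", "HB") yields
# ['GT', 'GU', 'GV', 'GW', 'GX', 'GY', 'GZ', 'HA', 'HB']: nine prefixes, with
# the digit prefixes skipped because the range is narrower than the default.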


def main():
    parser = argparse.ArgumentParser(description="Scrape Wyoming business entities from WyoBiz")
    parser.add_argument("--start", type=str, default="AA", help="Start prefix (default: AA)")
    parser.add_argument("--end", type=str, default="ZZ", help="End prefix (default: ZZ)")
    parser.add_argument("--prefix", type=str, help="Single prefix to scrape (e.g., GT)")
    parser.add_argument("--dry-run", action="store_true", help="Don't save to database")
    args = parser.parse_args()

    if not DATABASE_URL and not args.dry_run:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)
    conn = psycopg2.connect(DATABASE_URL) if not args.dry_run else None

    total = 0
    if args.prefix:
        prefixes = [args.prefix.upper()]
    else:
        prefixes = _generate_prefixes(args.start.upper(), args.end.upper())
    LOG.info("Will scrape %d prefixes: %s ... %s", len(prefixes), prefixes[0], prefixes[-1])

    for i, prefix in enumerate(prefixes):
        count = scrape_prefix(prefix, conn, dry_run=args.dry_run)
        if count == -1:
            # CAPTCHA detected — back off significantly (this prefix is
            # skipped, not retried)
            LOG.warning("CAPTCHA hit — backing off 5 minutes before continuing")
            time.sleep(300)
            continue
        if count < 0:
            # Other negative = too many results, need a deeper split
            # (e.g., "AB" → "ABA", "ABB", ...)
            LOG.info("Splitting prefix '%s' into 3-letter sub-prefixes", prefix)
            total += abs(count)  # Count what was already saved
            for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
                sub_prefix = prefix + c
                sub_count = scrape_prefix(sub_prefix, conn, dry_run=args.dry_run)
                if sub_count == -1:
                    LOG.warning("CAPTCHA during split — backing off 5 min")
                    time.sleep(300)
                elif sub_count < 0:
                    # Even 3 letters is too broad — go to 4 (rare)
                    total += abs(sub_count)
                    for c2 in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
                        sub4 = sub_prefix + c2
                        s4_count = scrape_prefix(sub4, conn, dry_run=args.dry_run)
                        if s4_count > 0:
                            total += s4_count
                        _human_delay(SEARCH_DELAY_MIN, SEARCH_DELAY_MAX)
                else:
                    total += sub_count
                _human_delay(SEARCH_DELAY_MIN, SEARCH_DELAY_MAX)
            continue
        total += count
        # Progress
        if (i + 1) % 10 == 0:
            LOG.info("Progress: %d/%d prefixes done (%d total entities)", i + 1, len(prefixes), total)
        # Longer pause between prefixes
        _human_delay(SEARCH_DELAY_MIN, SEARCH_DELAY_MAX)

    LOG.info("Done: %d total WY entities scraped across %d prefixes", total, len(prefixes))
    if conn:
        conn.close()


if __name__ == "__main__":
    main()