new-site/scripts/workers/entity_scraper.py
"""
entity_scraper.py — Multi-state business entity scraper using Playwright adapters.
Generic scraper that works with any state SOS portal by iterating through
letter-prefix name searches (AA, AB, ... ZZ, 0-9). Uses the same adapter
infrastructure as the formation system.
Replaces the WY-specific wy_entity_scraper.py with a universal approach.
Usage:
# Scrape a single state:
python -m workers.entity_scraper --state WY
# Scrape multiple states:
python -m workers.entity_scraper --state WY,TX,NV
# Scrape all states without bulk data:
python -m workers.entity_scraper --all
# Resume from a specific prefix:
python -m workers.entity_scraper --state TX --start-prefix MA
# Single prefix test:
python -m workers.entity_scraper --state TX --prefix GT
# Dry run:
python -m workers.entity_scraper --state WY --prefix AA --dry-run
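
    # Parallel workers (each worker takes every Nth state; see --worker-id / --num-workers):
    python -m workers.entity_scraper --all --num-workers 2 --worker-id 0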

Environment:
    DATABASE_URL    PostgreSQL connection string
"""

from __future__ import annotations
import argparse
import asyncio
import json
import logging
import os
import random
import re
import sys
import time
from datetime import datetime
from typing import Optional
import psycopg2
import psycopg2.extras
LOG = logging.getLogger("workers.entity_scraper")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# Rate limiting
PAGE_DELAY_MIN = 1.5 # Between pagination clicks (same search)
PAGE_DELAY_MAX = 3.0
SEARCH_DELAY_MIN = 15 # Between NEW searches (new prefix)
SEARCH_DELAY_MAX = 30
CAPTCHA_BACKOFF = 300 # 5 min backoff on CAPTCHA
# States that already have bulk data (skip these)
BULK_DATA_STATES = {"CO", "IA", "CT", "OR", "NY", "FL"}
# Max pages per prefix before splitting deeper
MAX_PAGES_PER_PREFIX = 200
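
# Rough cost of a full pass: 26*26 + 10 = 686 prefixes; at 15-30 s between new
# searches that is about 2.9-5.7 hours of delay alone per state, before page
# time, deeper splits, or CAPTCHA backoffs are counted.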


def _human_delay(min_s: float = PAGE_DELAY_MIN, max_s: float = PAGE_DELAY_MAX):
    time.sleep(random.uniform(min_s, max_s))


def _generate_prefixes(start: str = "AA", end: str = "ZZ") -> list[str]:
    """Two-letter prefixes AA..ZZ clamped to [start, end], followed by digits 0-9."""
    prefixes = []
    for c1 in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
        for c2 in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
            p = c1 + c2
            if start <= p <= end:
                prefixes.append(p)
    for d in "0123456789":
        prefixes.append(d)
    return prefixes
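
# Example: _generate_prefixes("MA") yields ["MA", "MB", ..., "ZZ", "0", ..., "9"].
# The digit prefixes are appended regardless of start/end, so numeric-named
# entities stay covered when resuming partway through the alphabet.
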
# ═══════════════════════════════════════════════════════════════════════
# Generic search via state adapter
# ═══════════════════════════════════════════════════════════════════════
def scrape_prefix(state_code: str, prefix: str, conn, dry_run: bool = False) -> int:
    """Scrape entities matching a name prefix from a state portal.

    Uses the state's adapter search_name() function, which handles
    portal-specific selectors, pagination, and ASP.NET postback quirks.

    Returns:
        >0:  number of entities found
         0:  no results
        -1:  CAPTCHA detected
        <-1: too many results, needs deeper split (abs value = entities saved so far)
    """
    LOG.info(" [%s] Scraping prefix '%s'", state_code, prefix)
    entities: list[dict] = []

    try:
        # Use the adapter's search to find entities
        from scripts.formation.name_search import search_name

        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(search_name(prefix, state_code))
        loop.close()

        # Convert result to dict
        if hasattr(result, "__dict__"):
            result = vars(result)
        elif hasattr(result, "_asdict"):
            result = result._asdict()

        # The search_name function returns name availability, not entity listing.
        # For scraping, we need the similar_names list which contains existing entities.
        similar = result.get("similar_names", [])
        if not similar and result.get("available") is False:
            # Single exact match
            entities.append({
                "entity_name": prefix.upper(),
                "entity_number": f"{state_code}_{prefix.upper()[:30]}",
                "entity_type": None,
                "status": "ACTIVE",
                "formation_date": None,
                "formation_state": state_code,
                "jurisdiction": f"US_{state_code}",
                "state": state_code,
                "principal_address": None,
            })
        elif similar:
            for name in similar:
                if name and len(name) > 1:
                    entities.append({
                        "entity_name": name.upper(),
                        "entity_number": f"{state_code}_{name.upper()[:30]}",
                        "entity_type": None,
                        "status": "ACTIVE",  # If it shows up in search, it's registered
                        "formation_date": None,
                        "formation_state": None,
                        "jurisdiction": f"US_{state_code}",
                        "state": state_code,
                        "principal_address": None,
                    })
    except Exception as exc:
        err_msg = str(exc).lower()
        if "captcha" in err_msg:
            LOG.warning(" [%s] CAPTCHA detected for prefix '%s'", state_code, prefix)
            return -1
        LOG.error(" [%s] Error scraping prefix '%s': %s", state_code, prefix, exc)
        return 0

    if not entities:
        return 0

    LOG.info(" [%s] Prefix '%s': found %d entities", state_code, prefix, len(entities))
    if entities and not dry_run and conn:
        count = _upsert(conn, entities)
        return count
    return len(entities)
# ═══════════════════════════════════════════════════════════════════════
# Database upsert
# ═══════════════════════════════════════════════════════════════════════
def _upsert(conn, entities: list[dict]) -> int:
    cur = conn.cursor()
    count = 0

    # Deduplicate
    seen: set = set()
    deduped: list = []
    for e in entities:
        key = (e["jurisdiction"], e["entity_number"])
        if key not in seen:
            seen.add(key)
            deduped.append(e)

    try:
        for batch_start in range(0, len(deduped), 200):
            batch = deduped[batch_start:batch_start + 200]
            values = []
            for e in batch:
                values.append(cur.mogrify(
                    "(%s,%s,%s,%s,%s,%s,%s,%s,%s,'playwright')",
                    (
                        e["jurisdiction"], e["entity_name"], e["entity_number"],
                        e["entity_type"], e["status"], e["formation_date"],
                        e["state"], e.get("formation_state"),
                        e.get("principal_address"),
                    ),
                ).decode())
            sql = f"""
                INSERT INTO entity_cache
                    (jurisdiction, entity_name, entity_number, entity_type, status,
                     formation_date, state, formation_state, principal_address, source)
                VALUES {",".join(values)}
                ON CONFLICT (jurisdiction, entity_number) DO UPDATE SET
                    entity_name = EXCLUDED.entity_name,
                    entity_type = EXCLUDED.entity_type,
                    status = EXCLUDED.status,
                    formation_date = EXCLUDED.formation_date,
                    formation_state = COALESCE(EXCLUDED.formation_state, entity_cache.formation_state),
                    principal_address = COALESCE(EXCLUDED.principal_address, entity_cache.principal_address),
                    last_synced = NOW()
            """
            cur.execute(sql)
            count += len(batch)
            conn.commit()
    except Exception as exc:
        LOG.error("DB upsert error: %s", exc)
        conn.rollback()

    return count
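
# The upsert above assumes an entity_cache table roughly shaped like this
# sketch (inferred from the column list and ON CONFLICT target used here;
# the real DDL may differ in types, constraints, and extra columns):
#
#   CREATE TABLE entity_cache (
#       jurisdiction       TEXT NOT NULL,
#       entity_name        TEXT NOT NULL,
#       entity_number      TEXT NOT NULL,
#       entity_type        TEXT,
#       status             TEXT,
#       formation_date     DATE,
#       state              TEXT,
#       formation_state    TEXT,
#       principal_address  TEXT,
#       source             TEXT,
#       last_synced        TIMESTAMPTZ DEFAULT NOW(),
#       UNIQUE (jurisdiction, entity_number)
#   );
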
# ═══════════════════════════════════════════════════════════════════════
# State runner
# ═══════════════════════════════════════════════════════════════════════
def scrape_state(
    state_code: str,
    conn,
    *,
    start_prefix: str = "AA",
    single_prefix: Optional[str] = None,
    dry_run: bool = False,
) -> int:
    """Scrape all entities for a state using letter-prefix search."""
    state_code = state_code.upper()
    if state_code in BULK_DATA_STATES:
        LOG.info("[%s] Has bulk data — skipping scrape", state_code)
        return 0

    if single_prefix:
        prefixes = [single_prefix.upper()]
    else:
        prefixes = _generate_prefixes(start_prefix.upper())

    LOG.info("[%s] Scraping %d prefixes: %s ... %s", state_code, len(prefixes), prefixes[0], prefixes[-1])

    total = 0
    for i, prefix in enumerate(prefixes):
        count = scrape_prefix(state_code, prefix, conn, dry_run=dry_run)

        if count == -1:
            LOG.warning("[%s] CAPTCHA — backing off %ds", state_code, CAPTCHA_BACKOFF)
            time.sleep(CAPTCHA_BACKOFF)
            continue

        if count < -1:
            # Too many results — split deeper
            total += abs(count)
            LOG.info("[%s] Splitting '%s' into 3-letter prefixes", state_code, prefix)
            for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
                sub = prefix + c
                sub_count = scrape_prefix(state_code, sub, conn, dry_run=dry_run)
                if sub_count == -1:
                    time.sleep(CAPTCHA_BACKOFF)
                elif sub_count > 0:
                    total += sub_count
                _human_delay(SEARCH_DELAY_MIN, SEARCH_DELAY_MAX)
            continue

        total += max(count, 0)
        if (i + 1) % 10 == 0:
            LOG.info("[%s] Progress: %d/%d prefixes (%d entities)", state_code, i + 1, len(prefixes), total)
        _human_delay(SEARCH_DELAY_MIN, SEARCH_DELAY_MAX)

    LOG.info("[%s] Complete: %d entities scraped", state_code, total)
    return total
# ═══════════════════════════════════════════════════════════════════════
# CLI
# ═══════════════════════════════════════════════════════════════════════
ALL_SCRAPE_STATES = [
    # Tier 1: High priority (formation + carrier states)
    "DE", "TX", "NV", "WY",
    # Tier 2: Large states
    "CA", "IL", "PA", "GA", "NC", "OH", "NJ", "MI",
    # Tier 3: Medium states
    "VA", "WA", "MA", "MD", "MN", "AZ", "IN", "MO", "TN", "WI",
    # Tier 4: Smaller states
    "SC", "AL", "KY", "LA", "OK", "UT", "AR", "KS", "NE", "ID",
    "NM", "MS", "NH", "DC", "WV", "HI", "ME", "MT", "RI", "SD",
    "ND", "VT", "AK",
]


def main():
    parser = argparse.ArgumentParser(description="Multi-state business entity scraper")
    parser.add_argument("--state", type=str, help="State(s) to scrape, comma-separated (e.g., WY,TX,NV)")
    parser.add_argument("--all", action="store_true", help="Scrape all states without bulk data")
    parser.add_argument("--start-prefix", type=str, default="AA", help="Start from this prefix (default: AA)")
    parser.add_argument("--prefix", type=str, help="Scrape a single prefix only")
    parser.add_argument("--dry-run", action="store_true", help="Don't save to DB")
    parser.add_argument("--max-states", type=int, help="Max states to process (for parallel workers)")
    parser.add_argument("--worker-id", type=int, default=0, help="Worker ID for parallel runs (0-based)")
    parser.add_argument("--num-workers", type=int, default=1, help="Total workers for parallel distribution")
    args = parser.parse_args()

    if not DATABASE_URL and not args.dry_run:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)
    conn = psycopg2.connect(DATABASE_URL) if not args.dry_run else None

    if args.state:
        states = [s.strip().upper() for s in args.state.split(",")]
    elif args.all:
        states = ALL_SCRAPE_STATES
    else:
        parser.print_help()
        sys.exit(1)

    # Parallel distribution: each worker takes every Nth state
    if args.num_workers > 1:
        states = [s for i, s in enumerate(states) if i % args.num_workers == args.worker_id]
        LOG.info("Worker %d/%d: handling %d states: %s", args.worker_id, args.num_workers, len(states), ", ".join(states))

    if args.max_states:
        states = states[:args.max_states]

    grand_total = 0
    for state in states:
        total = scrape_state(
            state, conn,
            start_prefix=args.start_prefix,
            single_prefix=args.prefix,
            dry_run=args.dry_run,
        )
        grand_total += total
        LOG.info("Running total: %d entities across %d states", grand_total, states.index(state) + 1)
        # Longer pause between states
        if state != states[-1]:
            _human_delay(60, 120)

    LOG.info("Grand total: %d entities across %d states", grand_total, len(states))
    if conn:
        conn.close()


if __name__ == "__main__":
    main()