"""
|
|
entity_scraper.py — Multi-state business entity scraper using Playwright adapters.
|
|
|
|
Generic scraper that works with any state SOS portal by iterating through
|
|
letter-prefix name searches (AA, AB, ... ZZ, 0-9). Uses the same adapter
|
|
infrastructure as the formation system.
|
|
|
|
Replaces the WY-specific wy_entity_scraper.py with a universal approach.
|
|
|
|
Usage:
|
|
# Scrape a single state:
|
|
python -m workers.entity_scraper --state WY
|
|
|
|
# Scrape multiple states:
|
|
python -m workers.entity_scraper --state WY,TX,NV
|
|
|
|
# Scrape all states without bulk data:
|
|
python -m workers.entity_scraper --all
|
|
|
|
# Resume from a specific prefix:
|
|
python -m workers.entity_scraper --state TX --start-prefix MA
|
|
|
|
# Single prefix test:
|
|
python -m workers.entity_scraper --state TX --prefix GT
|
|
|
|
# Dry run:
|
|
python -m workers.entity_scraper --state WY --prefix AA --dry-run
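
    # Split a run across two parallel workers (each worker takes every Nth state;
    # see the --num-workers / --worker-id flags defined in main()):
    python -m workers.entity_scraper --all --num-workers 2 --worker-id 0
    python -m workers.entity_scraper --all --num-workers 2 --worker-id 1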

Environment:
    DATABASE_URL    PostgreSQL connection string
"""

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import os
import random
import re
import sys
import time
from datetime import datetime
from typing import Optional

import psycopg2
import psycopg2.extras

LOG = logging.getLogger("workers.entity_scraper")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

DATABASE_URL = os.environ.get("DATABASE_URL", "")

# Rate limiting
PAGE_DELAY_MIN = 1.5    # Between pagination clicks (same search)
PAGE_DELAY_MAX = 3.0
SEARCH_DELAY_MIN = 15   # Between NEW searches (new prefix)
SEARCH_DELAY_MAX = 30
CAPTCHA_BACKOFF = 300   # 5 min backoff on CAPTCHA

# States that already have bulk data (skip these)
BULK_DATA_STATES = {"CO", "IA", "CT", "OR", "NY", "FL"}

# Max pages per prefix before splitting deeper
MAX_PAGES_PER_PREFIX = 200
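
# Rough pacing note (estimate only): a full pass over one state covers
# 26*26 + 10 = 686 prefixes (see _generate_prefixes below); at 15-30 s between
# searches that is roughly 3-6 hours of search delays alone per state, before
# page delays or CAPTCHA backoffs.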


def _human_delay(min_s: float = PAGE_DELAY_MIN, max_s: float = PAGE_DELAY_MAX):
    """Sleep for a random, human-looking interval between min_s and max_s seconds."""
    time.sleep(random.uniform(min_s, max_s))


def _generate_prefixes(start: str = "AA", end: str = "ZZ") -> list[str]:
    """Return the two-letter prefixes in [start, end], followed by digit prefixes 0-9."""
    prefixes = []
    for c1 in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
        for c2 in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
            p = c1 + c2
            if start <= p <= end:
                prefixes.append(p)
    for d in "0123456789":
        prefixes.append(d)
    return prefixes
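
# Illustrative example: _generate_prefixes("MA") -> ["MA", "MB", ..., "ZZ", "0", ..., "9"].
# Digit prefixes are always appended, so a run resumed via --start-prefix still
# covers entity names that begin with a number.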


# ═══════════════════════════════════════════════════════════════════════
# Generic search via state adapter
# ═══════════════════════════════════════════════════════════════════════

def scrape_prefix(state_code: str, prefix: str, conn, dry_run: bool = False) -> int:
    """Scrape entities matching a name prefix from a state portal.

    Uses the state's adapter search_name() function, which handles
    portal-specific selectors, pagination, and ASP.NET postback quirks.

    Returns:
        >0: number of entities found
         0: no results
        -1: CAPTCHA detected
        <-1: too many results, needs deeper split (abs value = entities saved so far)
    """
    LOG.info(" [%s] Scraping prefix '%s'", state_code, prefix)

    entities: list[dict] = []

    try:
        # Use the adapter's search to find entities
        from scripts.formation.name_search import search_name

        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(search_name(prefix, state_code))
        loop.close()

        # Convert result to dict
        if hasattr(result, "__dict__"):
            result = vars(result)
        elif hasattr(result, "_asdict"):
            result = result._asdict()

        # The search_name function returns name availability, not entity listing.
        # For scraping, we need the similar_names list which contains existing entities.
        similar = result.get("similar_names", [])

        if not similar and result.get("available") is False:
            # Single exact match
            entities.append({
                "entity_name": prefix.upper(),
                "entity_number": f"{state_code}_{prefix.upper()[:30]}",
                "entity_type": None,
                "status": "ACTIVE",
                "formation_date": None,
                "formation_state": state_code,
                "jurisdiction": f"US_{state_code}",
                "state": state_code,
                "principal_address": None,
            })
        elif similar:
            for name in similar:
                if name and len(name) > 1:
                    entities.append({
                        "entity_name": name.upper(),
                        "entity_number": f"{state_code}_{name.upper()[:30]}",
                        "entity_type": None,
                        "status": "ACTIVE",  # If it shows up in search, it's registered
                        "formation_date": None,
                        "formation_state": None,
                        "jurisdiction": f"US_{state_code}",
                        "state": state_code,
                        "principal_address": None,
                    })

    except Exception as exc:
        err_msg = str(exc).lower()
        if "captcha" in err_msg:
            LOG.warning(" [%s] CAPTCHA detected for prefix '%s'", state_code, prefix)
            return -1
        LOG.error(" [%s] Error scraping prefix '%s': %s", state_code, prefix, exc)
        return 0

    if not entities:
        return 0

    LOG.info(" [%s] Prefix '%s': found %d entities", state_code, prefix, len(entities))

    if entities and not dry_run and conn:
        count = _upsert(conn, entities)
        return count

    return len(entities)
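
# Note: scrape_prefix synthesizes entity_number from the entity name
# ("{STATE}_{NAME[:30]}") because the adapter's similar_names results are plain
# name strings; together with the ON CONFLICT (jurisdiction, entity_number)
# clause in _upsert below, repeated scrapes update existing rows instead of
# inserting duplicates.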


# ═══════════════════════════════════════════════════════════════════════
# Database upsert
# ═══════════════════════════════════════════════════════════════════════

def _upsert(conn, entities: list[dict]) -> int:
    cur = conn.cursor()
    count = 0

    # Deduplicate
    seen: set = set()
    deduped: list = []
    for e in entities:
        key = (e["jurisdiction"], e["entity_number"])
        if key not in seen:
            seen.add(key)
            deduped.append(e)

    try:
        for batch_start in range(0, len(deduped), 200):
            batch = deduped[batch_start:batch_start + 200]
            values = []
            for e in batch:
                values.append(cur.mogrify(
                    "(%s,%s,%s,%s,%s,%s,%s,%s,%s,'playwright')",
                    (
                        e["jurisdiction"], e["entity_name"], e["entity_number"],
                        e["entity_type"], e["status"], e["formation_date"],
                        e["state"], e.get("formation_state"),
                        e.get("principal_address"),
                    ),
                ).decode())

            sql = f"""
                INSERT INTO entity_cache
                    (jurisdiction, entity_name, entity_number, entity_type, status,
                     formation_date, state, formation_state, principal_address, source)
                VALUES {",".join(values)}
                ON CONFLICT (jurisdiction, entity_number) DO UPDATE SET
                    entity_name = EXCLUDED.entity_name,
                    entity_type = EXCLUDED.entity_type,
                    status = EXCLUDED.status,
                    formation_date = EXCLUDED.formation_date,
                    formation_state = COALESCE(EXCLUDED.formation_state, entity_cache.formation_state),
                    principal_address = COALESCE(EXCLUDED.principal_address, entity_cache.principal_address),
                    last_synced = NOW()
            """
            cur.execute(sql)
            count += len(batch)

        conn.commit()
    except Exception as exc:
        LOG.error("DB upsert error: %s", exc)
        conn.rollback()

    return count
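
# For orientation, a minimal sketch of the entity_cache table the upsert above
# assumes (column types are guesses; the authoritative schema is defined elsewhere):
#
#   CREATE TABLE entity_cache (
#       jurisdiction      text NOT NULL,
#       entity_name       text NOT NULL,
#       entity_number     text NOT NULL,
#       entity_type       text,
#       status            text,
#       formation_date    date,
#       state             text,
#       formation_state   text,
#       principal_address text,
#       source            text,
#       last_synced       timestamptz DEFAULT NOW(),
#       UNIQUE (jurisdiction, entity_number)
#   );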


# ═══════════════════════════════════════════════════════════════════════
# State runner
# ═══════════════════════════════════════════════════════════════════════

def scrape_state(
    state_code: str,
    conn,
    *,
    start_prefix: str = "AA",
    single_prefix: Optional[str] = None,
    dry_run: bool = False,
) -> int:
    """Scrape all entities for a state using letter-prefix search."""
    state_code = state_code.upper()

    if state_code in BULK_DATA_STATES:
        LOG.info("[%s] Has bulk data — skipping scrape", state_code)
        return 0

    if single_prefix:
        prefixes = [single_prefix.upper()]
    else:
        prefixes = _generate_prefixes(start_prefix.upper())

    LOG.info("[%s] Scraping %d prefixes: %s ... %s", state_code, len(prefixes), prefixes[0], prefixes[-1])
    total = 0

    for i, prefix in enumerate(prefixes):
        count = scrape_prefix(state_code, prefix, conn, dry_run=dry_run)

        if count == -1:
            LOG.warning("[%s] CAPTCHA — backing off %ds", state_code, CAPTCHA_BACKOFF)
            time.sleep(CAPTCHA_BACKOFF)
            continue

        if count < -1:
            # Too many results — split deeper
            total += abs(count)
            LOG.info("[%s] Splitting '%s' into 3-letter prefixes", state_code, prefix)
            for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ":
                sub = prefix + c
                sub_count = scrape_prefix(state_code, sub, conn, dry_run=dry_run)
                if sub_count == -1:
                    time.sleep(CAPTCHA_BACKOFF)
                elif sub_count > 0:
                    total += sub_count
                _human_delay(SEARCH_DELAY_MIN, SEARCH_DELAY_MAX)
            continue

        total += max(count, 0)

        if (i + 1) % 10 == 0:
            LOG.info("[%s] Progress: %d/%d prefixes (%d entities)", state_code, i + 1, len(prefixes), total)

        _human_delay(SEARCH_DELAY_MIN, SEARCH_DELAY_MAX)

    LOG.info("[%s] Complete: %d entities scraped", state_code, total)
    return total


# ═══════════════════════════════════════════════════════════════════════
# CLI
# ═══════════════════════════════════════════════════════════════════════

ALL_SCRAPE_STATES = [
    # Tier 1: High priority (formation + carrier states)
    "DE", "TX", "NV", "WY",
    # Tier 2: Large states
    "CA", "IL", "PA", "GA", "NC", "OH", "NJ", "MI",
    # Tier 3: Medium states
    "VA", "WA", "MA", "MD", "MN", "AZ", "IN", "MO", "TN", "WI",
    # Tier 4: Smaller states
    "SC", "AL", "KY", "LA", "OK", "UT", "AR", "KS", "NE", "ID",
    "NM", "MS", "NH", "DC", "WV", "HI", "ME", "MT", "RI", "SD",
    "ND", "VT", "AK",
]
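
# 45 jurisdictions in total: 50 states plus DC, minus the 6 BULK_DATA_STATES
# (CO, IA, CT, OR, NY, FL), which already have bulk data and are skipped.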


def main():
    parser = argparse.ArgumentParser(description="Multi-state business entity scraper")
    parser.add_argument("--state", type=str, help="State(s) to scrape, comma-separated (e.g., WY,TX,NV)")
    parser.add_argument("--all", action="store_true", help="Scrape all states without bulk data")
    parser.add_argument("--start-prefix", type=str, default="AA", help="Start from this prefix (default: AA)")
    parser.add_argument("--prefix", type=str, help="Scrape a single prefix only")
    parser.add_argument("--dry-run", action="store_true", help="Don't save to DB")
    parser.add_argument("--max-states", type=int, help="Max states to process (for parallel workers)")
    parser.add_argument("--worker-id", type=int, default=0, help="Worker ID for parallel runs (0-based)")
    parser.add_argument("--num-workers", type=int, default=1, help="Total workers for parallel distribution")
    args = parser.parse_args()

    if not DATABASE_URL and not args.dry_run:
        LOG.error("DATABASE_URL not set")
        sys.exit(1)

    conn = psycopg2.connect(DATABASE_URL) if not args.dry_run else None

    if args.state:
        states = [s.strip().upper() for s in args.state.split(",")]
    elif args.all:
        states = ALL_SCRAPE_STATES
    else:
        parser.print_help()
        sys.exit(1)

    # Parallel distribution: each worker takes every Nth state
    if args.num_workers > 1:
        states = [s for i, s in enumerate(states) if i % args.num_workers == args.worker_id]
        LOG.info("Worker %d/%d: handling %d states: %s", args.worker_id, args.num_workers, len(states), ", ".join(states))

    if args.max_states:
        states = states[:args.max_states]

    grand_total = 0
    for state in states:
        total = scrape_state(
            state, conn,
            start_prefix=args.start_prefix,
            single_prefix=args.prefix,
            dry_run=args.dry_run,
        )
        grand_total += total
        LOG.info("Running total: %d entities across %d states", grand_total, states.index(state) + 1)
        # Longer pause between states
        if state != states[-1]:
            _human_delay(60, 120)

    LOG.info("Grand total: %d entities across %d states", grand_total, len(states))

    if conn:
        conn.close()


if __name__ == "__main__":
    main()
|