new-site/scripts/formation/bulk_download.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

338 lines
14 KiB
Python

"""bulk_download.py — Download business entity data from state open data portals.
Supports:
- Socrata SODA API (CO, AK, CT, IL, IA, MI, NY, OR, PA, VT, WA)
- SFTP bulk download (FL)
- HTTP CSV bulk download (CA, TX)
Run: python3 scripts/formation/bulk_download.py [--state CO] [--all]
"""
import os
import sys
import json
import time
import logging
import argparse
import urllib.request
import urllib.parse
import csv
import io
from typing import Optional
from datetime import datetime, timezone
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import psycopg2
# Module-level logging: timestamped lines to stderr at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)
# Postgres connection string for the entity_cache table; override with the
# DATABASE_URL environment variable (the default targets a local dev DB).
DB_URL = os.getenv("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
# ── State data source registry ────────────────────────────────────────────────
# Each entry maps a 2-letter state code to its Socrata SODA JSON endpoint plus
# the dataset-specific field names that download_socrata() reads:
#   name_field / number_field / type_field / status_field / date_field /
#   formation_state_field.  An empty status_field means the dataset has no
#   status column; "default_status" is used for every row in that case.
SOCRATA_STATES = {
    # Verified working 2026-04-20
    # NOTE: "jurisdictonofformation" (CO) is the dataset's own misspelled
    # column name — do not "fix" it here.
    "CO": {"url": "https://data.colorado.gov/resource/4ykn-tg5h.json", "name_field": "entityname", "number_field": "entityid", "type_field": "entitytypecode", "status_field": "entitystatus", "date_field": "entityformdate", "formation_state_field": "jurisdictonofformation"},
    "IA": {"url": "https://data.iowa.gov/resource/ykb6-ywnd.json", "name_field": "entity_name", "number_field": "entity_number", "type_field": "entity_type", "status_field": "entity_status", "date_field": "date_formed", "formation_state_field": "home_state"},
    "CT": {"url": "https://data.ct.gov/resource/n7gp-d28j.json", "name_field": "name", "number_field": "accountnumber", "type_field": "type", "status_field": "status", "date_field": "date_registration", "formation_state_field": "state_of_formation"},
    "OR": {"url": "https://data.oregon.gov/resource/tckn-sxa6.json", "name_field": "business_name", "number_field": "registry_number", "type_field": "entity_type", "status_field": "status", "date_field": "registry_date", "formation_state_field": "state_of_origin"},
    # NY dataset is active entities only (no status field — all are implicitly ACTIVE)
    # jurisdiction field contains formation state ("New York" for domestic, other state for foreign)
    "NY": {"url": "https://data.ny.gov/resource/n9v6-gdp6.json", "name_field": "current_entity_name", "number_field": "dos_id", "type_field": "entity_type", "status_field": "", "date_field": "initial_dos_filing_date", "formation_state_field": "jurisdiction", "default_status": "ACTIVE"},
    # Broken as of 2026-04-20 — dataset IDs need updating (portals reorganized)
    #
    # "WA": {"url": "https://data.wa.gov/resource/????.json", ...},
    # "IL": {"url": "https://data.illinois.gov/resource/????.json", ...},
    # "PA": {"url": "https://data.pa.gov/resource/????.json", ...},
    # "MI": {"url": "https://data.michigan.gov/resource/????.json", ...},
    # "AK": {"url": "https://data.alaska.gov/resource/????.json", ...},
    # "VT": {"url": "https://data.vermont.gov/resource/????.json", ...},
}
# States with alternative bulk download sources (not Socrata).
# These have downloadable CSV/XLSX files from their SOS websites.
# Currently empty — entries below are planned sources, not yet implemented.
DIRECT_DOWNLOAD_STATES = {
    # "FL": Florida Sunbiz provides monthly SFTP dump
    # "CA": California SOS provides daily CSV extract
    # "TX": Texas Comptroller provides downloadable SOSDirect data
    # "WY": Wyoming SOS provides CSV export via WyoBiz
    # "NV": Nevada SilverFlume provides searchable API (not bulk)
    # "DE": Delaware Division of Corporations — no bulk data (paid API only)
}
# For states without bulk data: use Playwright live search on demand
# (slower, ~3-5s per lookup, cached 24h in name_search_cache)
# All 52 state adapters support search_name() for on-demand lookups
# ── Socrata downloader ────────────────────────────────────────────────────────
def download_socrata(state_code: str, config: dict) -> list[dict]:
    """Download all entities from a Socrata SODA API endpoint.

    Pages through the endpoint in fixed-size batches, normalizes each raw
    record into the entity_cache row shape, and returns the full list.
    Records missing a name or entity number are dropped.  On any HTTP/parse
    error, pagination stops and whatever was collected so far is returned.
    """
    endpoint = config["url"]
    page_size = 50000
    offset = 0
    collected: list[dict] = []
    fs_field = config.get("formation_state_field", "")
    status_field = config.get("status_field")
    while True:
        page_url = f"{endpoint}?$limit={page_size}&$offset={offset}&$order=:id"
        log.info(f" [{state_code}] Fetching offset={offset}...")
        try:
            request = urllib.request.Request(page_url, headers={"Accept": "application/json"})
            with urllib.request.urlopen(request, timeout=120) as resp:
                page = json.loads(resp.read())
        except Exception as exc:
            log.error(f" [{state_code}] Socrata error at offset {offset}: {exc}")
            break
        if not page:
            break
        for row in page:
            # Formation state = where the entity was originally incorporated;
            # some datasets return a full state name, so normalize to a code.
            raw_fs = str(row.get(fs_field, "")).strip().upper() if fs_field else ""
            formation_state = _normalize_state_code(raw_fs) if raw_fs else None
            raw_status = str(row.get(status_field, "")).strip() if status_field else ""
            entity = {
                "entity_name": str(row.get(config["name_field"], "")).strip().upper(),
                "entity_number": str(row.get(config["number_field"], "")).strip(),
                "entity_type": _normalize_type(str(row.get(config["type_field"], "")).strip()),
                "status": _normalize_status(raw_status) if raw_status else config.get("default_status", "ACTIVE"),
                "formation_date": _parse_date(row.get(config["date_field"])),
                "formation_state": formation_state,
                "jurisdiction": f"US_{state_code}",
                "state": state_code,
                "registered_agent": str(row.get("registered_agent", row.get("agent_name", ""))).strip() or None,
                "principal_address": _build_address(row),
            }
            if entity["entity_name"] and entity["entity_number"]:
                collected.append(entity)
        offset += page_size
        # A short page means we've reached the end of the dataset.
        if len(page) < page_size:
            break
        time.sleep(0.5)  # Be respectful to the API
    return collected
# Full state name (uppercase) → USPS 2-letter code; covers all 50 states + DC.
# Used by _normalize_state_code() for datasets that return full names.
_STATE_NAME_TO_CODE = {
    "ALABAMA": "AL", "ALASKA": "AK", "ARIZONA": "AZ", "ARKANSAS": "AR",
    "CALIFORNIA": "CA", "COLORADO": "CO", "CONNECTICUT": "CT", "DELAWARE": "DE",
    "DISTRICT OF COLUMBIA": "DC", "FLORIDA": "FL", "GEORGIA": "GA", "HAWAII": "HI",
    "IDAHO": "ID", "ILLINOIS": "IL", "INDIANA": "IN", "IOWA": "IA",
    "KANSAS": "KS", "KENTUCKY": "KY", "LOUISIANA": "LA", "MAINE": "ME",
    "MARYLAND": "MD", "MASSACHUSETTS": "MA", "MICHIGAN": "MI", "MINNESOTA": "MN",
    "MISSISSIPPI": "MS", "MISSOURI": "MO", "MONTANA": "MT", "NEBRASKA": "NE",
    "NEVADA": "NV", "NEW HAMPSHIRE": "NH", "NEW JERSEY": "NJ", "NEW MEXICO": "NM",
    "NEW YORK": "NY", "NORTH CAROLINA": "NC", "NORTH DAKOTA": "ND", "OHIO": "OH",
    "OKLAHOMA": "OK", "OREGON": "OR", "PENNSYLVANIA": "PA", "RHODE ISLAND": "RI",
    "SOUTH CAROLINA": "SC", "SOUTH DAKOTA": "SD", "TENNESSEE": "TN", "TEXAS": "TX",
    "UTAH": "UT", "VERMONT": "VT", "VIRGINIA": "VA", "WASHINGTON": "WA",
    "WEST VIRGINIA": "WV", "WISCONSIN": "WI", "WYOMING": "WY",
}
def _normalize_state_code(raw: str) -> Optional[str]:
"""Convert full state name or abbreviation to 2-letter code."""
raw = raw.strip().upper()
if len(raw) == 2 and raw.isalpha():
return raw
return _STATE_NAME_TO_CODE.get(raw)
def _normalize_type(raw: str) -> str:
upper = raw.upper()
if "LLC" in upper or "LIMITED LIABILITY" in upper:
return "LLC"
if "CORP" in upper or "INC" in upper:
return "CORPORATION"
if "LP" in upper or "LIMITED PARTNERSHIP" in upper:
return "LP"
if "LLP" in upper:
return "LLP"
if "NONPROFIT" in upper or "NOT FOR PROFIT" in upper:
return "NONPROFIT"
return raw.upper()[:50] if raw else None
def _normalize_status(raw: str) -> str:
upper = raw.upper()
if "ACTIVE" in upper or "GOOD STANDING" in upper or "CURRENT" in upper:
return "ACTIVE"
if "DISSOLV" in upper or "CANCEL" in upper:
return "DISSOLVED"
if "SUSPEND" in upper or "REVOK" in upper:
return "SUSPENDED"
if "DELINQ" in upper or "DEFAULT" in upper:
return "DELINQUENT"
if "INACTIVE" in upper or "WITHDRAWN" in upper:
return "INACTIVE"
return raw.upper()[:30] if raw else None
def _parse_date(val) -> str | None:
if not val:
return None
s = str(val).strip()
# ISO format
if len(s) >= 10 and s[4] == "-":
return s[:10]
# Socrata floating timestamp: "2020-03-15T00:00:00.000"
if "T" in s:
return s[:10]
return None
def _build_address(record: dict) -> str | None:
parts = []
for key in ["principal_address", "address", "street_address", "mailing_address",
"principal_office_addr", "addr_line1"]:
if key in record and record[key]:
parts.append(str(record[key]).strip())
break
for key in ["principal_city", "city"]:
if key in record and record[key]:
parts.append(str(record[key]).strip())
break
for key in ["principal_state", "state_province"]:
if key in record and record[key]:
parts.append(str(record[key]).strip())
break
for key in ["principal_zip", "zip", "postal_code"]:
if key in record and record[key]:
parts.append(str(record[key]).strip())
break
return ", ".join(parts) if parts else None
# ── Database upsert ───────────────────────────────────────────────────────────
def upsert_entities(entities: list[dict], state_code: str) -> int:
    """UPSERT entities into entity_cache table. Returns count of upserted rows.

    Deduplicates on (jurisdiction, entity_number) first — a multi-row INSERT
    containing the same conflict key twice makes ON CONFLICT DO UPDATE fail —
    then inserts in batches of 500 and commits once at the end.  On any
    error the transaction is rolled back before the connection is closed and
    the exception propagates to the caller.

    Note: state_code is currently unused; kept for interface stability.
    """
    if not entities:
        return 0
    conn = psycopg2.connect(DB_URL)
    cur = conn.cursor()
    count = 0
    try:
        # Deduplicate by (jurisdiction, entity_number) to avoid ON CONFLICT errors
        seen_keys: set[tuple[str, str]] = set()
        deduped: list[dict] = []
        for e in entities:
            key = (e["jurisdiction"], e["entity_number"])
            if key not in seen_keys:
                seen_keys.add(key)
                deduped.append(e)
        if len(deduped) < len(entities):
            # Fix garbled log message: the counts ran together with no separator.
            log.info(f" Deduped: {len(entities)} -> {len(deduped)} ({len(entities) - len(deduped)} duplicates removed)")
        entities = deduped
        for batch_start in range(0, len(entities), 500):
            batch = entities[batch_start:batch_start + 500]
            values = []
            for e in batch:
                # mogrify quotes/escapes each tuple server-side-safely, so the
                # assembled multi-row VALUES string is injection-safe.
                values.append(cur.mogrify(
                    "(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'socrata')",
                    (
                        e["jurisdiction"], e["entity_name"], e["entity_number"],
                        e["entity_type"], e["status"], e["formation_date"],
                        None,  # dissolution_date — not provided by these feeds
                        e.get("registered_agent"),
                        e.get("principal_address"),
                        e["state"],
                        e.get("formation_state"),
                    )
                ).decode())
            sql = f"""
            INSERT INTO entity_cache
            (jurisdiction, entity_name, entity_number, entity_type, status,
            formation_date, dissolution_date, registered_agent, principal_address,
            state, formation_state, source)
            VALUES {",".join(values)}
            ON CONFLICT (jurisdiction, entity_number) DO UPDATE SET
            entity_name = EXCLUDED.entity_name,
            entity_type = EXCLUDED.entity_type,
            status = EXCLUDED.status,
            formation_date = EXCLUDED.formation_date,
            formation_state = COALESCE(EXCLUDED.formation_state, entity_cache.formation_state),
            registered_agent = EXCLUDED.registered_agent,
            principal_address = EXCLUDED.principal_address,
            last_synced = NOW()
            """
            cur.execute(sql)
            count += len(batch)
        conn.commit()
    except Exception:
        # Discard the partial transaction explicitly rather than leaving it
        # to be torn down implicitly when the connection closes.
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()
    return count
# ── Main ──────────────────────────────────────────────────────────────────────
def download_state(state_code: str) -> int:
    """Download all entities for a single state. Returns count.

    Dispatches to the Socrata downloader when the state is configured;
    states without a bulk source log a warning and return 0.
    """
    state_code = state_code.upper()
    # Guard clause: only Socrata-backed states have a bulk source today.
    if state_code not in SOCRATA_STATES:
        log.warning(f"{state_code}: no bulk download source configured (Playwright-only)")
        return 0
    log.info(f"Downloading {state_code} via Socrata SODA API...")
    entities = download_socrata(state_code, SOCRATA_STATES[state_code])
    if not entities:
        log.warning(f" [{state_code}] No entities downloaded")
        return 0
    count = upsert_entities(entities, state_code)
    log.info(f" [{state_code}] Upserted {count} entities")
    return count
def main():
    """CLI entry point: --list, --state XX, or --all (else print help)."""
    parser = argparse.ArgumentParser(description="Bulk download business entities from state open data portals")
    parser.add_argument("--state", type=str, help="Download a single state (2-letter code)")
    parser.add_argument("--all", action="store_true", help="Download all configured states")
    parser.add_argument("--list", action="store_true", help="List available states")
    args = parser.parse_args()

    if args.list:
        print("Socrata SODA API states:")
        for code in sorted(SOCRATA_STATES):
            print(f" {code}: {SOCRATA_STATES[code]['url']}")
        return

    if args.state:
        total = download_state(args.state)
        log.info(f"Done: {total} entities for {args.state.upper()}")
    elif args.all:
        grand_total = 0
        for code in sorted(SOCRATA_STATES):
            grand_total += download_state(code)
            time.sleep(2)  # Pause between states
        log.info(f"Done: {grand_total} total entities across {len(SOCRATA_STATES)} states")
    else:
        parser.print_help()
# Script entry point — run only when executed directly, not on import.
if __name__ == "__main__":
    main()