#!/usr/bin/env python3
"""bulk_download.py — Download business entity data from state open data portals.
|
|
|
|
Supports:
|
|
- Socrata SODA API (CO, AK, CT, IL, IA, MI, NY, OR, PA, VT, WA)
|
|
- SFTP bulk download (FL)
|
|
- HTTP CSV bulk download (CA, TX)
|
|
|
|
Run: python3 scripts/formation/bulk_download.py [--state CO] [--all]
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import urllib.request
|
|
import urllib.parse
|
|
import csv
|
|
import io
|
|
from typing import Optional
|
|
from datetime import datetime, timezone
|
|
|
|
# Add project root to path
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import psycopg2
|
|
|
|
# Module-wide logging: timestamped INFO-level lines to stderr.
logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)

# Postgres connection string; the default targets a local dev database.
DB_URL = os.getenv("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
|
|
|
|
# ── State data source registry ────────────────────────────────────────────────
|
|
|
|
# Registry of Socrata SODA endpoints, keyed by 2-letter state code.
# Each entry maps the dataset's column names onto our normalized entity
# fields (name/number/type/status/date/formation-state).
SOCRATA_STATES = {
    # Verified working 2026-04-20
    # NOTE: "jurisdictonofformation" is the dataset's actual (misspelled) column name — do not "fix" it.
    "CO": {"url": "https://data.colorado.gov/resource/4ykn-tg5h.json", "name_field": "entityname", "number_field": "entityid", "type_field": "entitytypecode", "status_field": "entitystatus", "date_field": "entityformdate", "formation_state_field": "jurisdictonofformation"},
    "IA": {"url": "https://data.iowa.gov/resource/ykb6-ywnd.json", "name_field": "entity_name", "number_field": "entity_number", "type_field": "entity_type", "status_field": "entity_status", "date_field": "date_formed", "formation_state_field": "home_state"},
    "CT": {"url": "https://data.ct.gov/resource/n7gp-d28j.json", "name_field": "name", "number_field": "accountnumber", "type_field": "type", "status_field": "status", "date_field": "date_registration", "formation_state_field": "state_of_formation"},
    "OR": {"url": "https://data.oregon.gov/resource/tckn-sxa6.json", "name_field": "business_name", "number_field": "registry_number", "type_field": "entity_type", "status_field": "status", "date_field": "registry_date", "formation_state_field": "state_of_origin"},
    # NY dataset is active entities only (no status field — all are implicitly ACTIVE)
    # jurisdiction field contains formation state ("New York" for domestic, other state for foreign)
    "NY": {"url": "https://data.ny.gov/resource/n9v6-gdp6.json", "name_field": "current_entity_name", "number_field": "dos_id", "type_field": "entity_type", "status_field": "", "date_field": "initial_dos_filing_date", "formation_state_field": "jurisdiction", "default_status": "ACTIVE"},
    # Broken as of 2026-04-20 — dataset IDs need updating (portals reorganized)
    #
    # "WA": {"url": "https://data.wa.gov/resource/????.json", ...},
    # "IL": {"url": "https://data.illinois.gov/resource/????.json", ...},
    # "PA": {"url": "https://data.pa.gov/resource/????.json", ...},
    # "MI": {"url": "https://data.michigan.gov/resource/????.json", ...},
    # "AK": {"url": "https://data.alaska.gov/resource/????.json", ...},
    # "VT": {"url": "https://data.vermont.gov/resource/????.json", ...},
}
|
|
|
|
# States with alternative bulk download sources (not Socrata)
|
|
# These have downloadable CSV/XLSX files from their SOS websites
|
|
# States with alternative bulk download sources (not Socrata)
# These have downloadable CSV/XLSX files from their SOS websites.
# All entries are currently commented out — placeholder for future adapters.
DIRECT_DOWNLOAD_STATES = {
    # "FL": Florida Sunbiz provides monthly SFTP dump
    # "CA": California SOS provides daily CSV extract
    # "TX": Texas Comptroller provides downloadable SOSDirect data
    # "WY": Wyoming SOS provides CSV export via WyoBiz
    # "NV": Nevada SilverFlume provides searchable API (not bulk)
    # "DE": Delaware Division of Corporations — no bulk data (paid API only)
}
|
|
|
|
# For states without bulk data: use Playwright live search on demand
|
|
# (slower, ~3-5s per lookup, cached 24h in name_search_cache)
|
|
# All 52 state adapters support search_name() for on-demand lookups
|
|
|
|
# ── Socrata downloader ────────────────────────────────────────────────────────
|
|
|
|
def download_socrata(state_code: str, config: dict) -> list[dict]:
    """Download all entities from a Socrata SODA API endpoint.

    Pages through the dataset in 50k-row batches, ordered by ``:id`` so the
    pagination window is stable, and maps every raw record onto the
    normalized entity dict consumed by upsert_entities(). On a fetch error
    the loop stops and whatever was collected so far is returned.
    """
    endpoint = config["url"]
    page_size = 50000
    results: list[dict] = []
    offset = 0

    while True:
        page_url = f"{endpoint}?$limit={page_size}&$offset={offset}&$order=:id"
        log.info(f" [{state_code}] Fetching offset={offset}...")

        try:
            request = urllib.request.Request(page_url, headers={"Accept": "application/json"})
            with urllib.request.urlopen(request, timeout=120) as resp:
                page = json.loads(resp.read())
        except Exception as e:
            # Partial results are still useful — log and bail out of the loop.
            log.error(f" [{state_code}] Socrata error at offset {offset}: {e}")
            break

        if not page:
            break

        for raw in page:
            # Formation state = where the entity was originally incorporated;
            # some datasets return a full state name, so normalize to 2 letters.
            fs_field = config.get("formation_state_field", "")
            fs_raw = str(raw.get(fs_field, "")).strip().upper() if fs_field else ""
            formation_state = _normalize_state_code(fs_raw) if fs_raw else None

            # Datasets without a status column (e.g. NY) fall back to default_status.
            status_raw = str(raw.get(config["status_field"], "")).strip() if config.get("status_field") else ""
            entity = {
                "entity_name": str(raw.get(config["name_field"], "")).strip().upper(),
                "entity_number": str(raw.get(config["number_field"], "")).strip(),
                "entity_type": _normalize_type(str(raw.get(config["type_field"], "")).strip()),
                "status": _normalize_status(status_raw) if status_raw else config.get("default_status", "ACTIVE"),
                "formation_date": _parse_date(raw.get(config["date_field"])),
                "formation_state": formation_state,
                "jurisdiction": f"US_{state_code}",
                "state": state_code,
                "registered_agent": str(raw.get("registered_agent", raw.get("agent_name", ""))).strip() or None,
                "principal_address": _build_address(raw),
            }
            # Skip records missing either identity field.
            if entity["entity_name"] and entity["entity_number"]:
                results.append(entity)

        offset += page_size
        # A short page means we just fetched the final batch.
        if len(page) < page_size:
            break

        time.sleep(0.5)  # Be respectful to the API

    return results
|
|
|
|
|
|
# Full US state / DC names (uppercase) → USPS 2-letter codes, used by
# _normalize_state_code() for datasets that store the spelled-out name.
_STATE_NAME_TO_CODE = {
    "ALABAMA": "AL", "ALASKA": "AK", "ARIZONA": "AZ", "ARKANSAS": "AR",
    "CALIFORNIA": "CA", "COLORADO": "CO", "CONNECTICUT": "CT", "DELAWARE": "DE",
    "DISTRICT OF COLUMBIA": "DC", "FLORIDA": "FL", "GEORGIA": "GA", "HAWAII": "HI",
    "IDAHO": "ID", "ILLINOIS": "IL", "INDIANA": "IN", "IOWA": "IA",
    "KANSAS": "KS", "KENTUCKY": "KY", "LOUISIANA": "LA", "MAINE": "ME",
    "MARYLAND": "MD", "MASSACHUSETTS": "MA", "MICHIGAN": "MI", "MINNESOTA": "MN",
    "MISSISSIPPI": "MS", "MISSOURI": "MO", "MONTANA": "MT", "NEBRASKA": "NE",
    "NEVADA": "NV", "NEW HAMPSHIRE": "NH", "NEW JERSEY": "NJ", "NEW MEXICO": "NM",
    "NEW YORK": "NY", "NORTH CAROLINA": "NC", "NORTH DAKOTA": "ND", "OHIO": "OH",
    "OKLAHOMA": "OK", "OREGON": "OR", "PENNSYLVANIA": "PA", "RHODE ISLAND": "RI",
    "SOUTH CAROLINA": "SC", "SOUTH DAKOTA": "SD", "TENNESSEE": "TN", "TEXAS": "TX",
    "UTAH": "UT", "VERMONT": "VT", "VIRGINIA": "VA", "WASHINGTON": "WA",
    "WEST VIRGINIA": "WV", "WISCONSIN": "WI", "WYOMING": "WY",
}
|
|
|
|
|
|
def _normalize_state_code(raw: str) -> Optional[str]:
|
|
"""Convert full state name or abbreviation to 2-letter code."""
|
|
raw = raw.strip().upper()
|
|
if len(raw) == 2 and raw.isalpha():
|
|
return raw
|
|
return _STATE_NAME_TO_CODE.get(raw)
|
|
|
|
|
|
def _normalize_type(raw: str) -> str:
|
|
upper = raw.upper()
|
|
if "LLC" in upper or "LIMITED LIABILITY" in upper:
|
|
return "LLC"
|
|
if "CORP" in upper or "INC" in upper:
|
|
return "CORPORATION"
|
|
if "LP" in upper or "LIMITED PARTNERSHIP" in upper:
|
|
return "LP"
|
|
if "LLP" in upper:
|
|
return "LLP"
|
|
if "NONPROFIT" in upper or "NOT FOR PROFIT" in upper:
|
|
return "NONPROFIT"
|
|
return raw.upper()[:50] if raw else None
|
|
|
|
|
|
def _normalize_status(raw: str) -> str:
|
|
upper = raw.upper()
|
|
if "ACTIVE" in upper or "GOOD STANDING" in upper or "CURRENT" in upper:
|
|
return "ACTIVE"
|
|
if "DISSOLV" in upper or "CANCEL" in upper:
|
|
return "DISSOLVED"
|
|
if "SUSPEND" in upper or "REVOK" in upper:
|
|
return "SUSPENDED"
|
|
if "DELINQ" in upper or "DEFAULT" in upper:
|
|
return "DELINQUENT"
|
|
if "INACTIVE" in upper or "WITHDRAWN" in upper:
|
|
return "INACTIVE"
|
|
return raw.upper()[:30] if raw else None
|
|
|
|
|
|
def _parse_date(val) -> str | None:
|
|
if not val:
|
|
return None
|
|
s = str(val).strip()
|
|
# ISO format
|
|
if len(s) >= 10 and s[4] == "-":
|
|
return s[:10]
|
|
# Socrata floating timestamp: "2020-03-15T00:00:00.000"
|
|
if "T" in s:
|
|
return s[:10]
|
|
return None
|
|
|
|
|
|
def _build_address(record: dict) -> str | None:
|
|
parts = []
|
|
for key in ["principal_address", "address", "street_address", "mailing_address",
|
|
"principal_office_addr", "addr_line1"]:
|
|
if key in record and record[key]:
|
|
parts.append(str(record[key]).strip())
|
|
break
|
|
for key in ["principal_city", "city"]:
|
|
if key in record and record[key]:
|
|
parts.append(str(record[key]).strip())
|
|
break
|
|
for key in ["principal_state", "state_province"]:
|
|
if key in record and record[key]:
|
|
parts.append(str(record[key]).strip())
|
|
break
|
|
for key in ["principal_zip", "zip", "postal_code"]:
|
|
if key in record and record[key]:
|
|
parts.append(str(record[key]).strip())
|
|
break
|
|
return ", ".join(parts) if parts else None
|
|
|
|
|
|
# ── Database upsert ───────────────────────────────────────────────────────────
|
|
|
|
def upsert_entities(entities: list[dict], state_code: str) -> int:
    """UPSERT entities into entity_cache table. Returns count of upserted rows.

    Entities are first deduplicated on (jurisdiction, entity_number) —
    the ON CONFLICT target — then inserted in batches of 500 via a single
    multi-VALUES statement per batch. Existing rows are updated in place,
    except formation_state, which is only overwritten by a non-NULL value.

    NOTE: state_code is currently unused here (each entity dict already
    carries its own "state"/"jurisdiction"); kept for interface symmetry.
    """
    if not entities:
        return 0

    conn = psycopg2.connect(DB_URL)
    cur = conn.cursor()
    count = 0

    try:
        # Deduplicate by (jurisdiction, entity_number) to avoid ON CONFLICT errors
        # (Postgres rejects a statement that touches the same conflict key twice).
        seen_keys: set = set()
        deduped: list = []
        for e in entities:
            key = (e["jurisdiction"], e["entity_number"])
            if key not in seen_keys:
                seen_keys.add(key)
                deduped.append(e)
        if len(deduped) < len(entities):
            log.info(f" Deduped: {len(entities)} → {len(deduped)} ({len(entities) - len(deduped)} duplicates removed)")
        entities = deduped

        # Insert in chunks of 500 rows per statement.
        for batch_start in range(0, len(entities), 500):
            batch = entities[batch_start:batch_start + 500]
            values = []
            for e in batch:
                # mogrify escapes each tuple server-side-safely, so joining the
                # resulting fragments into the VALUES clause is injection-safe.
                values.append(cur.mogrify(
                    "(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'socrata')",
                    (
                        e["jurisdiction"], e["entity_name"], e["entity_number"],
                        e["entity_type"], e["status"], e["formation_date"],
                        None,  # dissolution_date
                        e.get("registered_agent"),
                        e.get("principal_address"),
                        e["state"],
                        e.get("formation_state"),
                    )
                ).decode())

            # Column order must match the placeholder order above.
            sql = f"""
                INSERT INTO entity_cache
                    (jurisdiction, entity_name, entity_number, entity_type, status,
                     formation_date, dissolution_date, registered_agent, principal_address,
                     state, formation_state, source)
                VALUES {",".join(values)}
                ON CONFLICT (jurisdiction, entity_number) DO UPDATE SET
                    entity_name = EXCLUDED.entity_name,
                    entity_type = EXCLUDED.entity_type,
                    status = EXCLUDED.status,
                    formation_date = EXCLUDED.formation_date,
                    formation_state = COALESCE(EXCLUDED.formation_state, entity_cache.formation_state),
                    registered_agent = EXCLUDED.registered_agent,
                    principal_address = EXCLUDED.principal_address,
                    last_synced = NOW()
            """
            cur.execute(sql)
            count += len(batch)

        # Single commit after all batches — an error rolls back the whole run.
        conn.commit()
    finally:
        cur.close()
        conn.close()

    return count
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def download_state(state_code: str) -> int:
    """Download all entities for a single state. Returns count.

    Dispatches to the Socrata downloader when the state is configured;
    states without a bulk source are skipped (Playwright-only lookups).
    """
    state_code = state_code.upper()

    # Guard clause: nothing to do for states without a bulk source.
    if state_code not in SOCRATA_STATES:
        log.warning(f"{state_code}: no bulk download source configured (Playwright-only)")
        return 0

    log.info(f"Downloading {state_code} via Socrata SODA API...")
    entities = download_socrata(state_code, SOCRATA_STATES[state_code])

    if not entities:
        log.warning(f" [{state_code}] No entities downloaded")
        return 0

    count = upsert_entities(entities, state_code)
    log.info(f" [{state_code}] Upserted {count} entities")
    return count
|
|
|
|
|
|
def main():
    """CLI entry point: run with --state CODE, --all, or --list."""
    parser = argparse.ArgumentParser(description="Bulk download business entities from state open data portals")
    parser.add_argument("--state", type=str, help="Download a single state (2-letter code)")
    parser.add_argument("--all", action="store_true", help="Download all configured states")
    parser.add_argument("--list", action="store_true", help="List available states")
    args = parser.parse_args()

    # --list: show configured endpoints and exit.
    if args.list:
        print("Socrata SODA API states:")
        for code in sorted(SOCRATA_STATES):
            print(f" {code}: {SOCRATA_STATES[code]['url']}")
        return

    if args.state:
        total = download_state(args.state)
        log.info(f"Done: {total} entities for {args.state.upper()}")
    elif args.all:
        grand_total = 0
        for code in sorted(SOCRATA_STATES):
            grand_total += download_state(code)
            time.sleep(2)  # Pause between states
        log.info(f"Done: {grand_total} total entities across {len(SOCRATA_STATES)} states")
    else:
        # No mode selected — show usage.
        parser.print_help()
|
|
|
|
|
|
# Script entry point — only runs when executed directly, not on import.
if __name__ == "__main__":
    main()
|