new-site/scripts/formation/ein_worker.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

666 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ein_worker.py — IRS EIN (Employer Identification Number) obtainment via the
IRS online application at https://sa.www4.irs.gov/modiein/individual/index.jsp
Uses Playwright to fill out the SS-4 equivalent online form and extracts the
assigned EIN from the confirmation page.
IMPORTANT: IRS online EIN is only available Mon-Fri, 7:00 AM - 10:00 PM ET.
Environment variables:
DATABASE_URL PostgreSQL connection string (optional, for order updates)
Usage:
# Standalone — obtain EIN for an order in the database
python -m formation.ein_worker <order_id>
# Called programmatically from formation_worker
from formation.ein_worker import obtain_ein
result = await obtain_ein(order)
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from zoneinfo import ZoneInfo
from playwright.async_api import async_playwright, Page
from .base import EntityType, FormationOrder, Member
# Module-wide logger for the EIN worker.
LOG = logging.getLogger("formation.ein")

# PostgreSQL connection string; an empty string means "no database configured".
DATABASE_URL = os.getenv("DATABASE_URL", "")

# Entry page of the IRS online EIN application.
IRS_EIN_URL = "https://sa.www4.irs.gov/modiein/individual/index.jsp"

# Directory that receives step-by-step screenshots. It is created at import
# time so later screenshot calls never have to check for it.
SCREENSHOTS_DIR = Path(
    os.environ.get("SCREENSHOTS_DIR", "/tmp/formation-screenshots")
)
SCREENSHOTS_DIR.mkdir(parents=True, exist_ok=True)
# ---------------------------------------------------------------------------
# Result type
# ---------------------------------------------------------------------------
@dataclass
class EINResult:
    """Outcome of one EIN application attempt.

    ``timestamp`` is filled with the current UTC time (ISO 8601) whenever the
    caller leaves it empty.
    """

    # True when an EIN was extracted from the confirmation page.
    success: bool
    # Assigned EIN; empty on failure.
    ein: str = ""
    # Path to the saved confirmation file (PDF screenshot).
    confirmation_pdf: str = ""
    # Human-readable failure reason; empty on success.
    error_message: str = ""
    # ISO 8601 creation time of this result.
    timestamp: str = ""

    def __post_init__(self):
        # An empty timestamp means "not supplied": stamp it with UTC now.
        self.timestamp = self.timestamp or datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Availability check
# ---------------------------------------------------------------------------
ET = ZoneInfo("America/New_York")

# IRS online EIN service window in Eastern Time: weekdays only, from the
# opening hour (inclusive) up to the closing hour (exclusive).
_IRS_OPEN_HOUR = 7
_IRS_CLOSE_HOUR = 22


def _as_eastern(now: datetime | None) -> datetime:
    """Return *now* as an Eastern Time datetime.

    ``None`` means "the current moment". A naive datetime is interpreted as
    Eastern wall-clock time; an aware datetime is converted to Eastern Time.
    """
    if now is None:
        return datetime.now(ET)
    if now.tzinfo is None:
        return now.replace(tzinfo=ET)
    return now.astimezone(ET)


def is_irs_available(now: datetime | None = None) -> bool:
    """
    Check whether the IRS online EIN application is available.

    Available Mon-Fri, 7:00 AM - 10:00 PM Eastern Time.

    Args:
        now: Instant to check. Defaults to the current time. Naive values are
            treated as Eastern Time.

    Returns:
        True when *now* falls inside the service window.
    """
    now_et = _as_eastern(now)
    if now_et.weekday() >= 5:  # 5=Saturday, 6=Sunday
        return False
    return _IRS_OPEN_HOUR <= now_et.hour < _IRS_CLOSE_HOUR


def next_available_time(now: datetime | None = None) -> datetime:
    """Return the next datetime (ET) when the IRS EIN service will be available.

    Args:
        now: Reference instant. Defaults to the current time. Naive values are
            treated as Eastern Time.

    Returns:
        *now* itself (in ET) when the service is already open, otherwise the
        next weekday opening time.
    """
    from datetime import timedelta

    # Resolve the clock exactly once so the availability check and the
    # candidate computation agree, even right at a window boundary.
    now_et = _as_eastern(now)
    if is_irs_available(now_et):
        return now_et

    candidate = now_et.replace(
        hour=_IRS_OPEN_HOUR, minute=0, second=0, microsecond=0
    )
    if candidate <= now_et:
        # Today's opening has already passed; start from tomorrow's.
        candidate += timedelta(days=1)
    while candidate.weekday() >= 5:
        # Skip Saturday and Sunday.
        candidate += timedelta(days=1)
    return candidate
# ---------------------------------------------------------------------------
# Helper: responsible party (first member / organizer)
# ---------------------------------------------------------------------------
def _get_responsible_party(order: FormationOrder) -> Member | None:
"""Get the responsible party for the EIN application."""
# Prefer the organizer
for m in order.members:
if m.is_organizer:
return m
# Fall back to first member
return order.members[0] if order.members else None
# ---------------------------------------------------------------------------
# Core EIN automation
# ---------------------------------------------------------------------------
async def _click_first_match(page: Page, selectors: list[str]) -> bool:
    """Click the first element matched by any of *selectors*.

    Selectors are tried in order. A selector that matches nothing, or whose
    lookup/click raises, is skipped. Returns True when a click succeeded and
    False when none of the selectors could be clicked.
    """
    for sel in selectors:
        try:
            el = await page.query_selector(sel)
            if el:
                await el.click()
                return True
        except Exception:
            # Best effort: the IRS markup varies, so fall through to the next
            # candidate selector instead of aborting the whole application.
            continue
    return False


async def _ein_screenshot(page: Page, order_id: str, label: str) -> str:
    """Save a full-page PNG of *page* under SCREENSHOTS_DIR and return its path."""
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    path = SCREENSHOTS_DIR / f"ein_{order_id}_{label}_{ts}.png"
    await page.screenshot(path=str(path), full_page=True)
    LOG.info("Screenshot: %s", path)
    return str(path)


async def _human_delay(min_s: float = 1.0, max_s: float = 3.0) -> None:
    """Sleep for a random duration between *min_s* and *max_s* seconds."""
    import random

    await asyncio.sleep(random.uniform(min_s, max_s))


async def _close_quietly(closable, label: str) -> None:
    """Close a Playwright object, logging (not raising) any failure.

    Cleanup must never replace the result of the application: once the form is
    submitted the EIN has been issued, so losing it to a close() error would
    be worse than a leaked handle.
    """
    try:
        await closable.close()
    except Exception:
        LOG.warning("Failed to close %s cleanly", label, exc_info=True)


async def _run_ein_application(
    page: Page, order: FormationOrder, responsible_party: Member
) -> EINResult:
    """Drive the IRS EIN Assistant on *page* from landing page to confirmation.

    Returns an EINResult; any Playwright error propagates to the caller.
    """
    oid = order.order_id

    async def _screenshot(label: str) -> str:
        return await _ein_screenshot(page, oid, label)

    _delay = _human_delay

    # Step 1: Navigate to IRS EIN Assistant
    LOG.info("[%s] Navigating to IRS EIN Assistant...", oid)
    await page.goto(IRS_EIN_URL, wait_until="networkidle", timeout=30000)
    await _delay(2, 4)
    await _screenshot("01_landing")

    # Step 2: Begin application — click "Begin Application" or "Apply Online Now"
    began = await _click_first_match(
        page,
        [
            "input[value*='Begin Application']",
            "a:has-text('Begin Application')",
            "input[value*='Apply']",
            "button:has-text('Begin')",
        ],
    )
    if not began:
        LOG.warning("[%s] No 'Begin Application' control found", oid)
    await _delay(2, 3)

    # Step 3: Select entity type
    LOG.info("[%s] Selecting entity type...", oid)
    if order.entity_type == EntityType.LLC:
        # "Limited Liability Company (LLC)"
        entity_selectors = [
            "input[value*='LLC']",
            "input[value*='limited liability']",
            "label:has-text('Limited Liability Company')",
            "input[type='radio'][id*='llc']",
        ]
    elif order.entity_type in (EntityType.CORPORATION, EntityType.S_CORP):
        entity_selectors = [
            "input[value*='Corporation']",
            "label:has-text('Corporation')",
            "input[type='radio'][id*='corp']",
        ]
    else:
        # Other entity types have no selectors defined; nothing is clicked.
        entity_selectors = []
    if entity_selectors and not await _click_first_match(page, entity_selectors):
        LOG.warning("[%s] No entity-type control found", oid)
    await _delay(1, 2)
    await _click_continue(page)
    await _delay(2, 3)
    await _screenshot("02_entity_type")

    # Step 4: Number of members (for LLC)
    if order.entity_type == EntityType.LLC:
        if len(order.members) <= 1:
            # Single-member LLC
            member_selector = "input[value*='1'], input[value*='single']"
        else:
            # Multi-member LLC
            member_selector = "input[value*='multi'], input[value*='More']"
        try:
            await page.click(member_selector)
        except Exception:
            LOG.warning("[%s] Could not select LLC member count", oid)
        await _delay(1, 2)
        await _click_continue(page)
        await _delay(2, 3)

    # Step 5: State of formation
    LOG.info("[%s] Selecting state: %s", oid, order.state_code)
    state_select = await page.query_selector(
        "select[name*='state'], select[id*='state']"
    )
    if state_select:
        from .states import STATES

        state_name = STATES.get(order.state_code.upper(), {}).get(
            "name", order.state_code
        )
        await state_select.select_option(label=state_name)
    await _delay(1, 2)
    await _click_continue(page)
    await _delay(2, 3)
    await _screenshot("03_state")

    # Step 6: Reason for applying — "Started new business"
    LOG.info("[%s] Selecting reason for applying...", oid)
    reason_clicked = await _click_first_match(
        page,
        [
            "input[value*='Started']",
            "input[value*='new business']",
            "label:has-text('Started new business')",
            "input[type='radio']:first-of-type",
        ],
    )
    if not reason_clicked:
        LOG.warning("[%s] No reason-for-applying control found", oid)
    await _delay(1, 2)
    await _click_continue(page)
    await _delay(2, 3)
    await _screenshot("04_reason")

    # Step 7: Entity information — name, address
    LOG.info("[%s] Filling entity information...", oid)
    await _fill_field(page, "name", order.entity_name)
    await _fill_field(page, "trade", order.entity_name)  # DBA if asked
    await _fill_field(
        page, "address", order.principal_address or responsible_party.address
    )
    await _fill_field(page, "city", order.principal_city or responsible_party.city)
    await _fill_field(page, "zip", order.principal_zip or responsible_party.zip_code)

    # State dropdown for address: use the first <select> whose name or id
    # mentions "state"; try the raw value first, then the full state name.
    addr_state = order.principal_state or responsible_party.state
    for sel_el in await page.query_selector_all("select"):
        name_attr = await sel_el.get_attribute("name") or ""
        id_attr = await sel_el.get_attribute("id") or ""
        if "state" in name_attr.lower() or "state" in id_attr.lower():
            try:
                await sel_el.select_option(value=addr_state)
            except Exception:
                try:
                    from .states import STATES as _S

                    sn = _S.get(addr_state.upper(), {}).get("name", addr_state)
                    await sel_el.select_option(label=sn)
                except Exception:
                    LOG.warning("[%s] Could not select address state", oid)
            break
    await _delay(1, 2)
    await _click_continue(page)
    await _delay(2, 3)
    await _screenshot("05_entity_info")

    # Step 8: Responsible party information
    LOG.info("[%s] Filling responsible party: %s", oid, responsible_party.name)
    name_parts = responsible_party.name.split(None, 1)
    first_name = name_parts[0] if name_parts else ""
    last_name = name_parts[1] if len(name_parts) > 1 else ""
    await _fill_field(page, "first", first_name)
    await _fill_field(page, "last", last_name)

    # SSN/ITIN — read from an optional attribute on the order; the value is
    # never logged.
    ssn = getattr(order, "_responsible_party_ssn", "")
    if ssn:
        ssn_fields = await page.query_selector_all(
            "input[type='text'][maxlength='3'], input[type='text'][maxlength='2'], input[type='text'][maxlength='4']"
        )
        ssn_digits = re.sub(r"\D", "", ssn)
        if len(ssn_digits) == 9 and len(ssn_fields) >= 3:
            await ssn_fields[0].fill(ssn_digits[:3])
            await _delay(0.3, 0.6)
            await ssn_fields[1].fill(ssn_digits[3:5])
            await _delay(0.3, 0.6)
            await ssn_fields[2].fill(ssn_digits[5:])
        else:
            LOG.warning("[%s] SSN/ITIN provided but could not be entered", oid)
    await _delay(1, 2)
    await _click_continue(page)
    await _delay(2, 3)
    await _screenshot("06_responsible_party")

    # Step 9: Additional questions — date started, fiscal year, etc.
    LOG.info("[%s] Filling additional details...", oid)
    # Use Eastern Time for "today": the browser context is pinned to
    # America/New_York, and a server in a later timezone would otherwise
    # submit tomorrow's date late in the evening.
    today_str = datetime.now(ET).strftime("%m/%d/%Y")
    await _fill_field(page, "date", order.effective_date or today_str)
    await _fill_field(page, "closing", order.fiscal_year_end or "December")
    # Number of employees expected (select "0" or "No employees planned")
    await _fill_field(page, "employee", "0")
    await _delay(1, 2)
    await _click_continue(page)
    await _delay(2, 3)
    await _screenshot("07_additional")

    # Step 10: Review and submit
    LOG.info("[%s] Reviewing and submitting application...", oid)
    await _screenshot("08_review")
    submitted = await _click_first_match(
        page,
        [
            "input[value*='Submit']",
            "button:has-text('Submit')",
            "input[type='submit']",
        ],
    )
    if not submitted:
        LOG.warning("[%s] No submit control found", oid)
    await _delay(3, 5)
    await _screenshot("09_submitted")

    # Step 11: Extract EIN from confirmation page
    LOG.info("[%s] Extracting EIN from confirmation...", oid)
    page_text = await page.inner_text("body")
    # EIN format: XX-XXXXXXX
    ein_match = re.search(r"\b(\d{2}-\d{7})\b", page_text)
    if not ein_match:
        # Try without hyphen
        ein_match = re.search(r"EIN[:\s]*(\d{9})", page_text, re.IGNORECASE)

    if not ein_match:
        LOG.error("[%s] Could not extract EIN from confirmation page", oid)
        await _screenshot("09_no_ein_found")
        return EINResult(
            success=False,
            error_message="Could not extract EIN from IRS confirmation page.",
            confirmation_pdf=await _save_confirmation_pdf(page, oid),
        )

    ein = ein_match.group(1)
    # Normalize to XX-XXXXXXX format
    if "-" not in ein and len(ein) == 9:
        ein = f"{ein[:2]}-{ein[2:]}"
    LOG.info("[%s] EIN obtained: %s", oid, ein)

    # Save confirmation as PDF
    confirmation_pdf = await _save_confirmation_pdf(page, oid)
    await _screenshot("10_confirmation")
    return EINResult(
        success=True,
        ein=ein,
        confirmation_pdf=confirmation_pdf,
    )


async def obtain_ein(order: FormationOrder) -> EINResult:
    """
    Obtain an EIN from the IRS online application for the given order.

    Navigates the IRS EIN Assistant, fills out entity information, responsible
    party details, and extracts the assigned EIN from the confirmation page.

    Args:
        order: FormationOrder with entity and member details.

    Returns:
        EINResult with the assigned EIN or error information. Failures while
        driving the form are reported through ``error_message`` rather than
        raised; a failure to launch the browser itself propagates.
    """
    # Check availability
    if not is_irs_available():
        next_time = next_available_time()
        return EINResult(
            success=False,
            error_message=(
                f"IRS online EIN application is not currently available. "
                f"Hours: Mon-Fri 7 AM - 10 PM ET. "
                f"Next available: {next_time.strftime('%A %B %d, %Y at %I:%M %p ET')}"
            ),
        )

    responsible_party = _get_responsible_party(order)
    if not responsible_party:
        return EINResult(
            success=False,
            error_message="No members/responsible party found on order.",
        )

    LOG.info(
        "[%s] Starting EIN application for %s (%s)",
        order.order_id,
        order.entity_name,
        order.state_code,
    )

    # The context manager stops the Playwright driver on exit; starting it
    # with .start() and never calling .stop() leaks the driver.
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled", "--no-sandbox"],
        )
        try:
            context = await browser.new_context(
                viewport={"width": 1280, "height": 900},
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/123.0.0.0 Safari/537.36"
                ),
                locale="en-US",
                timezone_id="America/New_York",
            )
            try:
                await context.add_init_script(
                    "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
                )
                page = await context.new_page()
                try:
                    return await _run_ein_application(page, order, responsible_party)
                except Exception as exc:
                    LOG.error(
                        "[%s] EIN application failed: %s",
                        order.order_id,
                        exc,
                        exc_info=True,
                    )
                    try:
                        await _ein_screenshot(page, order.order_id, "error")
                    except Exception:
                        LOG.debug(
                            "[%s] Could not capture error screenshot",
                            order.order_id,
                            exc_info=True,
                        )
                    return EINResult(
                        success=False,
                        error_message=str(exc),
                    )
            finally:
                await _close_quietly(context, "browser context")
        finally:
            await _close_quietly(browser, "browser")
# ---------------------------------------------------------------------------
# Page interaction helpers
# ---------------------------------------------------------------------------
async def _fill_field(page: Page, name_hint: str, value: str):
"""
Attempt to fill a form field matching a name/id hint.
Tries multiple selector strategies.
"""
if not value:
return
selectors = [
f"input[name*='{name_hint}' i]",
f"input[id*='{name_hint}' i]",
f"textarea[name*='{name_hint}' i]",
f"select[name*='{name_hint}' i]",
]
for sel in selectors:
try:
el = await page.query_selector(sel)
if el:
tag = await el.evaluate("e => e.tagName.toLowerCase()")
if tag == "select":
try:
await el.select_option(label=value)
except Exception:
await el.select_option(value=value)
else:
await el.fill(value)
return
except Exception:
continue
async def _click_continue(page: Page):
    """Click the first visible Continue/Next/submit control on the page.

    Candidates are tried from most to least specific. A missing, hidden, or
    failing candidate is skipped; if none can be clicked the call is a no-op.
    """
    candidates = (
        "input[value='Continue']",
        "input[value='Next']",
        "input[value*='Continue']",
        "button:has-text('Continue')",
        "button:has-text('Next')",
        "input[type='submit']",
    )
    for css in candidates:
        try:
            button = await page.query_selector(css)
            if not button:
                continue
            if not await button.is_visible():
                continue
            await button.click()
            return
        except Exception:
            continue
async def _save_confirmation_pdf(page: Page, order_id: str) -> str:
    """Persist the current page for records and return the saved file's path.

    The page is written as a PDF under /tmp/formations/<order_id>/. If PDF
    generation raises, a full-page PNG with the same base name is written
    instead and its path is returned.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    target_dir = Path(f"/tmp/formations/{order_id}")
    target_dir.mkdir(parents=True, exist_ok=True)

    pdf_file = target_dir / f"ein_confirmation_{stamp}.pdf"
    try:
        await page.pdf(path=str(pdf_file))
    except Exception:
        # PDF generation only works in headless Chromium; fall back to screenshot
        png_file = target_dir / f"ein_confirmation_{stamp}.png"
        await page.screenshot(path=str(png_file), full_page=True)
        LOG.info("EIN confirmation screenshot saved (PDF fallback): %s", png_file)
        return str(png_file)
    else:
        LOG.info("EIN confirmation PDF saved: %s", pdf_file)
    return str(pdf_file)
# ---------------------------------------------------------------------------
# Database update
# ---------------------------------------------------------------------------
def _update_order_ein(order_id: str, ein: str, confirmation_pdf: str):
    """Write the EIN and confirmation path onto the order's database row.

    Does nothing (beyond a warning) when DATABASE_URL is empty. The connection
    is always closed, whether or not the update succeeds.
    """
    if DATABASE_URL:
        import psycopg2

        params = (ein, confirmation_pdf, order_id)
        connection = psycopg2.connect(DATABASE_URL)
        try:
            with connection.cursor() as cursor:
                cursor.execute(
                    """
                    UPDATE formation_orders
                    SET ein = %s,
                    ein_confirmation = %s,
                    updated_at = NOW()
                    WHERE order_id = %s
                    """,
                    params,
                )
            connection.commit()
            LOG.info("Updated order %s with EIN %s", order_id, ein)
        finally:
            connection.close()
    else:
        LOG.warning("DATABASE_URL not set — skipping order update for EIN")
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
async def _main_standalone(order_id: str):
    """Fetch an order from the database, obtain its EIN, and store the result.

    Exits the process with status 1 when DATABASE_URL is unset, the order does
    not exist, the IRS service is closed, or the application fails.

    Args:
        order_id: Value of ``formation_orders.order_id`` to process.
    """
    if not DATABASE_URL:
        print("Error: DATABASE_URL not set.", file=sys.stderr)
        sys.exit(1)

    from datetime import date

    import psycopg2
    import psycopg2.extras

    conn = psycopg2.connect(DATABASE_URL)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM formation_orders WHERE order_id = %s", (order_id,))
            row = cur.fetchone()
    finally:
        conn.close()

    if not row:
        print(f"Error: Order {order_id} not found.", file=sys.stderr)
        sys.exit(1)

    def _value(mapping, key, default=""):
        # A dict row from SELECT * contains every column, so a NULL column
        # comes back as None and the .get() default never applies. Treat None
        # like a missing key.
        found = mapping.get(key)
        return default if found is None else found

    # Build FormationOrder from row
    members_raw = row.get("members")
    if isinstance(members_raw, str):
        members_raw = json.loads(members_raw)
    elif members_raw is None:
        members_raw = []

    members = [
        Member(
            name=_value(m, "name"),
            address=_value(m, "address"),
            city=_value(m, "city"),
            state=_value(m, "state"),
            zip_code=_value(m, "zip_code"),
            title=_value(m, "title", "Member"),
            # JSON null would make float() raise; fall back to 0.
            ownership_pct=float(_value(m, "ownership_pct", 0)),
            is_organizer=bool(m.get("is_organizer", False)),
        )
        for m in members_raw
    ]

    try:
        entity_type = EntityType(_value(row, "entity_type", "llc"))
    except ValueError:
        entity_type = EntityType.LLC

    effective_date = _value(row, "effective_date")
    if isinstance(effective_date, date):
        # A DATE/TIMESTAMP column arrives as a date object, which the form
        # filler cannot type into a text field. MM/DD/YYYY matches the format
        # used for the default start date.
        # NOTE(review): confirm FormationOrder.effective_date expects a string.
        effective_date = effective_date.strftime("%m/%d/%Y")

    order = FormationOrder(
        order_id=str(row["order_id"]),
        state_code=_value(row, "state_code"),
        entity_type=entity_type,
        entity_name=_value(row, "entity_name"),
        members=members,
        principal_address=_value(row, "principal_address"),
        principal_city=_value(row, "principal_city"),
        principal_state=_value(row, "principal_state"),
        principal_zip=_value(row, "principal_zip"),
        fiscal_year_end=_value(row, "fiscal_year_end", "12/31"),
        effective_date=effective_date or "",
    )

    # Check availability first
    if not is_irs_available():
        next_time = next_available_time()
        print(
            f"IRS EIN online service is currently unavailable.\n"
            f"Hours: Mon-Fri, 7:00 AM - 10:00 PM ET\n"
            f"Next available: {next_time.strftime('%A %B %d, %Y at %I:%M %p ET')}"
        )
        sys.exit(1)

    result = await obtain_ein(order)
    if result.success:
        print(f"EIN obtained: {result.ein}")
        print(f"Confirmation: {result.confirmation_pdf}")
        _update_order_ein(order.order_id, result.ein, result.confirmation_pdf)
    else:
        print(f"EIN application failed: {result.error_message}", file=sys.stderr)
        sys.exit(1)
# Script entry point: configure logging, validate the command line, then run
# the standalone workflow for the given order id.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )
    if len(sys.argv) < 2:
        print("Usage: python -m formation.ein_worker <order_id>")
        print()
        print("Obtains an EIN from the IRS online application for the given order.")
        print()
        # The day and hour ranges need explicit separators to be readable.
        print("Note: IRS online EIN is only available Mon-Fri, 7 AM - 10 PM ET.")
        sys.exit(1)
    asyncio.run(_main_standalone(sys.argv[1]))