Initial commit — Performance West telecom compliance platform

Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
justin 2026-04-27 06:54:22 -05:00
commit f8cd37ac8c
1823 changed files with 145167 additions and 0 deletions

View file

@ -0,0 +1 @@
"""Telecom compliance service helpers (undetected browser, FCC/USAC adapters)."""

View file

@ -0,0 +1,307 @@
"""Global auto-filing toggle + admin-review workflow.
Gates the Playwright submission step of every FCC/USAC/BDC filing handler
so that by default no filing is submitted to the FCC without a
Performance West admin reviewing the generated packet first.
Design
------
* Settings live in ERPNext ``Compliance Settings`` (single DocType) under
the ``auto_filing_enabled`` Check field. Default = False (safer).
* A per-order override field (``custom_auto_filing_override`` on the Sales
Order) lets the admin approve a specific filing one-shot after review
without flipping the global toggle.
* When auto-filing is OFF and there's no per-order override, the handler:
1. Produces and uploads the packet as normal.
2. Creates an ERPNext ToDo assigned to ``admin_email`` (default
``ops@performancewest.net``) with a summary + "Approve & File"
CTA link.
3. Sends the admin a short HTML email with the same summary + link.
4. Leaves the order in "Awaiting Admin Review" state the workflow
picks it back up when the admin clicks Approve.
The Approve-and-File link hits the Express API endpoint
``POST /api/v1/compliance-orders/:order_number/approve-and-file`` which
sets ``custom_auto_filing_override = 1`` on the Sales Order and
re-dispatches the handler. Handler reads the override flag and runs
the Playwright submission even if the global toggle is still off.
Env fallback
------------
If ERPNext is unreachable (e.g. during local development), the module
reads the env var ``AUTO_FILING_ENABLED`` as a truthy override. Admin
email falls back to ``ADMIN_EMAIL`` env var, then
``ops@performancewest.net``.
"""
from __future__ import annotations
import logging
import os
import smtplib
from dataclasses import dataclass
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Optional
logger = logging.getLogger(__name__)
DEFAULT_ADMIN_EMAIL = "ops@performancewest.net"
# ─── Settings lookup ──────────────────────────────────────────────────────
@dataclass
class AutoFilingDecision:
"""Result of ``check_auto_filing`` — caller branches on ``may_submit``."""
may_submit: bool
reason: str
admin_email: str
global_enabled: bool
order_override: bool
def _env_truthy(value: str | None) -> bool:
if not value:
return False
return value.strip().lower() in ("1", "true", "yes", "on")
def _settings_from_erpnext() -> tuple[bool, str]:
"""Read ``Compliance Settings`` from ERPNext — returns (enabled, admin_email)."""
try:
from scripts.workers.erpnext_client import ERPNextClient
erp = ERPNextClient()
settings = erp.get_resource("Compliance Settings", "Compliance Settings")
if isinstance(settings, list):
settings = settings[0] if settings else {}
enabled = bool(settings.get("auto_filing_enabled", 0))
admin = (
settings.get("admin_email")
or os.environ.get("ADMIN_EMAIL")
or DEFAULT_ADMIN_EMAIL
)
return enabled, admin
except Exception as exc:
logger.debug("auto_filing: ERPNext settings read failed: %s", exc)
return _env_truthy(os.environ.get("AUTO_FILING_ENABLED")), (
os.environ.get("ADMIN_EMAIL") or DEFAULT_ADMIN_EMAIL
)
def check_auto_filing(order_data: dict) -> AutoFilingDecision:
"""Resolve whether the calling handler may submit to the FCC/USAC.
The handler passes the current ``order_data`` (as received from
``job_server.py``). We inspect the per-order override first, then
fall back to the global setting.
"""
order_override = bool(order_data.get("custom_auto_filing_override", 0))
global_enabled, admin_email = _settings_from_erpnext()
if order_override:
return AutoFilingDecision(
may_submit=True,
reason="per-order admin override",
admin_email=admin_email,
global_enabled=global_enabled,
order_override=True,
)
if global_enabled:
return AutoFilingDecision(
may_submit=True,
reason="global auto_filing_enabled",
admin_email=admin_email,
global_enabled=True,
order_override=False,
)
return AutoFilingDecision(
may_submit=False,
reason="auto-filing disabled; admin review required",
admin_email=admin_email,
global_enabled=False,
order_override=False,
)
# ─── Admin review hand-off ────────────────────────────────────────────────
def request_admin_review(
*,
order_number: str,
service_slug: str,
service_name: str,
entity_name: str,
frn: str,
packet_minio_paths: list[str],
admin_email: str,
summary: str = "",
) -> None:
"""Create an ERPNext ToDo + send a short email to the admin.
The email and the ToDo both include a one-click "Approve & File" URL
that re-dispatches the handler for this order with the auto-filing
override set.
"""
api_base = os.environ.get("API_URL", "http://api:3001").rstrip("/")
approve_url = (
f"{api_base}/api/v1/compliance-orders/{order_number}/approve-and-file"
)
todo_description = (
f"[{service_slug}] Admin review required for {order_number}\n\n"
f"Carrier: {entity_name}\n"
f"FRN: {frn or 'N/A'}\n"
f"Service: {service_name}\n\n"
f"Auto-filing is disabled. The generated packet is staged in MinIO — "
f"review it, then POST to the approve-and-file URL (or click the "
f"button in the admin email) to submit the filing to the FCC.\n\n"
f"Files in MinIO:\n"
+ "\n".join(f"{p}" for p in packet_minio_paths)
+ f"\n\nApprove & File:\n {approve_url}\n"
)
if summary:
todo_description += f"\nHandler notes:\n{summary}\n"
_create_todo(todo_description, admin_email)
_send_admin_email(
to_email=admin_email,
order_number=order_number,
service_name=service_name,
entity_name=entity_name,
frn=frn,
packet_minio_paths=packet_minio_paths,
approve_url=approve_url,
summary=summary,
)
def _create_todo(description: str, admin_email: str) -> None:
try:
from scripts.workers.erpnext_client import ERPNextClient
ERPNextClient().create_resource(
"ToDo",
{
"description": description,
"priority": "High",
"allocated_to": admin_email,
"status": "Open",
},
)
except Exception as exc:
logger.warning("auto_filing: could not create admin ToDo: %s", exc)
_EMAIL_TMPL = """\
<html>
<body style="font-family:Arial,Helvetica,sans-serif;color:#1f2937;">
<h2 style="color:#1a2744;margin:0 0 12px;">Filing ready for review</h2>
<p><strong>Order:</strong> {order_number}<br>
<strong>Carrier:</strong> {entity_name}<br>
<strong>FRN:</strong> {frn}<br>
<strong>Service:</strong> {service_name}</p>
<p>Auto-filing is <em>disabled</em>. The generated packet is staged in MinIO.
Review the documents, then click below to submit the filing to the FCC.</p>
<p><a href="{approve_url}"
style="display:inline-block;padding:12px 22px;background:#059669;
color:#fff;border-radius:4px;font-weight:600;text-decoration:none;">
Approve &amp; File
</a></p>
<h3 style="margin:22px 0 6px;color:#1a2744;">Packet files</h3>
<ul style="margin:0 0 12px 18px;padding:0;">
{files}
</ul>
{summary_block}
<p style="color:#64748b;font-size:12px;margin-top:24px;">
To enable auto-filing globally, set <code>auto_filing_enabled = 1</code>
in the ERPNext Compliance Settings doctype (or set
<code>AUTO_FILING_ENABLED=1</code> in the worker environment).
</p>
</body>
</html>
"""
def _send_admin_email(
*,
to_email: str,
order_number: str,
service_name: str,
entity_name: str,
frn: str,
packet_minio_paths: list[str],
approve_url: str,
summary: str,
) -> None:
smtp_host = os.environ.get("SMTP_HOST", "co.carrierone.com")
smtp_port = int(os.environ.get("SMTP_PORT", "587"))
smtp_user = os.environ.get("SMTP_USER", "")
smtp_pass = os.environ.get("SMTP_PASSWORD", "")
smtp_from = os.environ.get("SMTP_FROM", "orders@performancewest.net")
if not smtp_user or not smtp_pass:
logger.warning("auto_filing: SMTP not configured — skipping admin email")
return
files_html = "\n ".join(
f"<li style=\"margin:3px 0;\"><code>{p}</code></li>"
for p in packet_minio_paths
) or "<li><em>(none)</em></li>"
summary_block = (
f"<h3 style=\"margin:22px 0 6px;color:#1a2744;\">Handler notes</h3>"
f"<pre style=\"background:#f8fafc;padding:10px;border-radius:4px;"
f"white-space:pre-wrap;font-size:12px;\">{summary}</pre>"
if summary else ""
)
html = _EMAIL_TMPL.format(
order_number=order_number,
entity_name=entity_name,
frn=frn or "N/A",
service_name=service_name,
approve_url=approve_url,
files=files_html,
summary_block=summary_block,
)
msg = MIMEMultipart("alternative")
msg["Subject"] = f"[Review & File] {order_number}{service_name}"
msg["From"] = smtp_from
msg["To"] = to_email
msg.attach(MIMEText(html, "html"))
try:
with smtplib.SMTP(smtp_host, smtp_port) as server:
if smtp_port != 25:
server.starttls()
server.login(smtp_user, smtp_pass)
server.sendmail(smtp_from, [to_email], msg.as_string())
logger.info("auto_filing: admin review email sent to %s", to_email)
except Exception as exc:
logger.warning("auto_filing: could not send admin email: %s", exc)
# ─── Per-order override writer (used by the API approve-and-file endpoint) ─
def set_order_override(sales_order_name: str) -> bool:
"""Flip ``custom_auto_filing_override`` to 1 on the Sales Order."""
try:
from scripts.workers.erpnext_client import ERPNextClient
ERPNextClient().set_value(
"Sales Order",
sales_order_name,
"custom_auto_filing_override",
1,
)
return True
except Exception as exc:
logger.warning("auto_filing: could not set override: %s", exc)
return False

View file

@ -0,0 +1,333 @@
"""FCC Form 499-A shared utilities.
De minimis calculator (Appendix A), safe-harbor percentage lookup, Line 612
filing-type detection, and Line 105 box-tick derivation.
Used by form_499a.py, form_499_initial.py, and the /validate endpoint's
Python counterpart (if we ever move validation server-side).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Optional
import psycopg2
import os
logger = logging.getLogger(__name__)
# ── Line 105 box-tick derivation ────────────────────────────────────────
# Mirrors site/src/lib/line_105_catalog.ts::derivedLine105Boxes. When a
# CLEC/IXC/Wireless has infra_type='reseller' or 'mvno', the handler
# automatically ticks the corresponding derived Line 105 box on the form.
LINE_105_BOX_NUMBERS = {
"voip_interconnected": 1,
"voip_non_interconnected": 2,
"clec": 3,
"ilec": 4,
"local_reseller": 5, # derived from clec + reseller
"toll_reseller": 6, # derived from ixc + reseller
"ixc": 7,
"wireless": 8,
"mvno": 9, # derived from wireless + mvno
"prepaid_calling_card": 10,
"private_line": 11,
"satellite": 12,
"payphone": 13,
"osp": 14,
"shared_tenant": 15,
"audio_bridging": 16,
"toll_free": 17,
"paging": 18,
"smr": 19,
"fixed_wireless": 20,
"mobile_satellite": 21,
"other": 22,
}
def derived_line_105_boxes(category_id: str, infra_type: Optional[str]) -> list[int]:
"""Return extra Line 105 boxes to tick because of the infra_type flag."""
boxes: list[int] = []
if infra_type == "reseller":
if category_id == "clec":
boxes.append(5)
elif category_id == "ixc":
boxes.append(6)
if infra_type == "mvno" and category_id == "wireless":
boxes.append(9)
return boxes
def all_line_105_boxes_to_tick(line_105_categories: list[dict]) -> list[int]:
"""Return every Line 105 box number to tick for this filer."""
boxes: set[int] = set()
for cat in line_105_categories or []:
cat_id = cat.get("id")
if cat_id and cat_id in LINE_105_BOX_NUMBERS:
boxes.add(LINE_105_BOX_NUMBERS[cat_id])
boxes.update(derived_line_105_boxes(cat_id, cat.get("infra_type")))
return sorted(boxes)
# ── Safe harbor lookup ──────────────────────────────────────────────────
SAFE_HARBOR_DISALLOWED_CATEGORIES = {"voip_non_interconnected"}
def _db_connect():
return psycopg2.connect(os.environ.get("DATABASE_URL", ""))
def load_safe_harbor_pct(form_year: int, category_id: str) -> Optional[float]:
"""Return the safe-harbor interstate % for (year, category), or None.
None is returned for categories that have no safe harbor (e.g.,
non-interconnected VoIP) or if the year/category combination isn't in
the fcc_safe_harbor_percentages seed table.
"""
if category_id in SAFE_HARBOR_DISALLOWED_CATEGORIES:
return None
try:
conn = _db_connect()
with conn.cursor() as cur:
cur.execute(
"SELECT interstate_pct FROM fcc_safe_harbor_percentages "
"WHERE form_year = %s AND line_105_category = %s",
(form_year, category_id),
)
row = cur.fetchone()
conn.close()
return float(row[0]) if row else None
except Exception as exc:
logger.warning("safe-harbor lookup failed: %s", exc)
return None
def safe_harbor_allowed(category_id: str) -> bool:
return category_id not in SAFE_HARBOR_DISALLOWED_CATEGORIES
# ── De minimis calculator (Appendix A, 11-line worksheet) ───────────────
@dataclass
class DeMinimisWorksheet:
"""Appendix A de minimis determination worksheet.
Every field corresponds to a line in the 2026 Form 499-A Appendix A.
Mirrors the PDF layout exactly so auditors can follow along.
"""
form_year: int
# Lines 1-4 — interstate/intl contribution bases for filer + affiliates
line_1_filer_interstate_cents: int = 0
line_2_filer_intl_cents: int = 0
line_3_affiliates_interstate_cents: int = 0
line_4_affiliates_intl_cents: int = 0
# Line 5 — consolidated interstate
line_5_consolidated_interstate_cents: int = 0
# Line 6 — consolidated interstate+intl (pre-LIRE exclusion)
line_6_consolidated_total_cents: int = 0
# Line 7 — interstate as % of consolidated total (LIRE test)
line_7_interstate_pct: float = 0.0
# Line 8 — LIRE exempt? (line_7 ≤ 12%)
line_8_lire_exempt: bool = False
# Line 9 — contribution base to test
line_9_contribution_base_cents: int = 0
# Line 10 — year-specific factor (0.256 for 2026)
line_10_factor: float = 0.0
# Line 11 — estimated annual contribution = line_9 × line_10
line_11_estimated_contrib_cents: int = 0
# Result
is_de_minimis: bool = False
threshold_usd: int = 10000
notes: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"form_year": self.form_year,
"line_1_filer_interstate_cents": self.line_1_filer_interstate_cents,
"line_2_filer_intl_cents": self.line_2_filer_intl_cents,
"line_3_affiliates_interstate_cents": self.line_3_affiliates_interstate_cents,
"line_4_affiliates_intl_cents": self.line_4_affiliates_intl_cents,
"line_5_consolidated_interstate_cents": self.line_5_consolidated_interstate_cents,
"line_6_consolidated_total_cents": self.line_6_consolidated_total_cents,
"line_7_interstate_pct": self.line_7_interstate_pct,
"line_8_lire_exempt": self.line_8_lire_exempt,
"line_9_contribution_base_cents": self.line_9_contribution_base_cents,
"line_10_factor": self.line_10_factor,
"line_11_estimated_contrib_cents": self.line_11_estimated_contrib_cents,
"is_de_minimis": self.is_de_minimis,
"threshold_usd": self.threshold_usd,
"notes": self.notes,
}
def load_deminimis_factor(form_year: int) -> float:
"""Return the Appendix A Line 10 factor for a form year.
Raises ValueError if the year isn't in the seed table — an unknown
form year is a code bug, not a graceful-fallback situation.
"""
try:
conn = _db_connect()
with conn.cursor() as cur:
cur.execute(
"SELECT factor FROM fcc_deminimis_factors WHERE form_year = %s",
(form_year,),
)
row = cur.fetchone()
conn.close()
except Exception as exc:
logger.error("deminimis factor lookup failed: %s", exc)
raise
if not row:
raise ValueError(f"No de minimis factor configured for form year {form_year}")
return float(row[0])
def calculate_de_minimis(
*,
form_year: int,
filer_total_revenue_cents: int,
filer_interstate_pct: float,
filer_international_pct: float,
affiliates: Optional[list[dict]] = None,
) -> DeMinimisWorksheet:
"""Compute the Appendix A de minimis worksheet.
affiliates is a list of {total_revenue_cents, interstate_pct,
international_pct} records for each affiliated filer. Empty list =
no affiliates.
"""
affiliates = affiliates or []
w = DeMinimisWorksheet(form_year=form_year)
# Line 1: filer interstate revenue
w.line_1_filer_interstate_cents = int(
filer_total_revenue_cents * (filer_interstate_pct / 100.0)
)
# Line 2: filer international revenue
w.line_2_filer_intl_cents = int(
filer_total_revenue_cents * (filer_international_pct / 100.0)
)
# Lines 3-4: affiliates
for a in affiliates:
tot = int(a.get("total_revenue_cents", 0))
ipct = float(a.get("interstate_pct", 0))
intl = float(a.get("international_pct", 0))
w.line_3_affiliates_interstate_cents += int(tot * ipct / 100.0)
w.line_4_affiliates_intl_cents += int(tot * intl / 100.0)
# Line 5: consolidated interstate
w.line_5_consolidated_interstate_cents = (
w.line_1_filer_interstate_cents + w.line_3_affiliates_interstate_cents
)
# Line 6: consolidated interstate + intl
w.line_6_consolidated_total_cents = w.line_5_consolidated_interstate_cents + (
w.line_2_filer_intl_cents + w.line_4_affiliates_intl_cents
)
# Line 7: interstate as % of consolidated total (guard /0)
if w.line_6_consolidated_total_cents > 0:
w.line_7_interstate_pct = round(
100.0 * w.line_5_consolidated_interstate_cents /
w.line_6_consolidated_total_cents,
4,
)
else:
w.line_7_interstate_pct = 0.0
# Line 8: LIRE exempt if interstate ≤ 12% of combined
w.line_8_lire_exempt = w.line_7_interstate_pct <= 12.0
# Line 9: contribution base to test — interstate + (0 if LIRE else intl)
intl_total = w.line_2_filer_intl_cents + w.line_4_affiliates_intl_cents
w.line_9_contribution_base_cents = (
w.line_5_consolidated_interstate_cents +
(0 if w.line_8_lire_exempt else intl_total)
)
# Line 10: year factor
w.line_10_factor = load_deminimis_factor(form_year)
# Line 11: estimated annual contribution
w.line_11_estimated_contrib_cents = int(
w.line_9_contribution_base_cents * w.line_10_factor
)
# Result: de minimis if < $10,000
w.is_de_minimis = w.line_11_estimated_contrib_cents < (w.threshold_usd * 100)
if w.is_de_minimis:
w.notes.append(
f"De minimis: estimated contribution ${w.line_11_estimated_contrib_cents/100:,.2f}"
f" < ${w.threshold_usd:,.0f} threshold."
)
else:
w.notes.append(
f"NOT de minimis: estimated contribution ${w.line_11_estimated_contrib_cents/100:,.2f}"
f" ≥ ${w.threshold_usd:,.0f} threshold."
)
if w.line_8_lire_exempt:
w.notes.append(
f"LIRE exempt: interstate ({w.line_7_interstate_pct:.2f}%) ≤ 12% "
f"of combined interstate+intl — international revenue excluded."
)
return w
# ── Line 612 filing-type detection ──────────────────────────────────────
def detect_filing_type(
*,
entity: dict,
current_year_filing_exists: bool = False,
revised_reason: Optional[str] = None,
) -> str:
"""Return one of: original_april_1, registration_new_filer,
revised_registration, revised_revenue.
"""
if not entity.get("filer_id_499"):
return "registration_new_filer"
if current_year_filing_exists:
if revised_reason == "registration":
return "revised_registration"
if revised_reason == "revenue":
return "revised_revenue"
return "original_april_1"
# ── TRS contribution base (Lines 512-514) ───────────────────────────────
# Revenue lines that roll up into the TRS contribution base.
# Line 418.4 (non-interconnected VoIP) is included ONLY in TRS base —
# it's excluded from USF/NANPA/LNP/ITSP bases. Line 511 subtracts.
TRS_BASE_LINE_KEYS = [
"line_403", "line_404", "line_404_1", "line_404_3",
"line_405", "line_406", "line_407", "line_408",
"line_409", "line_410", "line_411", "line_412",
"line_413", "line_414_1", "line_414_2",
"line_415", "line_416", "line_417",
"line_418_4", # TRS-only
]
def compute_trs_contribution_base(revenue_lines: dict) -> tuple[int, int, int]:
"""Return (line_512, line_513, line_514) in cents.
line_512 = Σ(TRS_BASE_LINE_KEYS) - line_511
line_513 = line_513 (uncollectible for TRS, provided by filer)
line_514 = line_512 - line_513
"""
line_512 = sum(int(revenue_lines.get(k, 0) or 0) for k in TRS_BASE_LINE_KEYS)
line_512 -= int(revenue_lines.get("line_511", 0) or 0)
line_513 = int(revenue_lines.get("line_513", 0) or 0)
line_514 = line_512 - line_513
return line_512, line_513, line_514

View file

@ -0,0 +1,179 @@
"""Helpers for reading/writing FCC filing state on telecom_entities.
The remediation handlers (RMD, CPNI, 499-A, BDC) all need to:
1. Check whether the filing is already on record for the current cycle
(idempotency don't double-submit if the customer ordered twice).
2. On success, persist the submission timestamp + confirmation number
so the next compliance checkup flips the deficiency to green.
Columns added by migration 047:
* rmd_last_cert_date / rmd_confirmation_number
* cpni_last_cert_date / cpni_confirmation_number
* form_499a_confirmation_number (uses existing last_filing_year)
* bdc_last_filing_date / bdc_confirmation_number
* stir_shaken_cert_issued_at
"""
from __future__ import annotations
import logging
import os
from datetime import datetime, timedelta
from typing import Optional
logger = logging.getLogger(__name__)
def _connect():
"""Open a psycopg2 connection using DATABASE_URL. Returns None on failure."""
try:
import psycopg2
return psycopg2.connect(os.environ.get("DATABASE_URL", ""))
except Exception as exc:
logger.error("filing_state: could not connect to PG: %s", exc)
return None
def _get_entity_field(entity_id: int, field: str) -> Optional[object]:
conn = _connect()
if conn is None:
return None
try:
with conn.cursor() as cur:
# Column names come from the handlers, not user input — safe to
# interpolate. psycopg2 does not parameterize identifiers.
cur.execute(
f"SELECT {field} FROM telecom_entities WHERE id = %s", # noqa: S608
(entity_id,),
)
row = cur.fetchone()
return row[0] if row else None
except Exception as exc:
logger.warning("filing_state: read %s.%s failed: %s", entity_id, field, exc)
return None
finally:
conn.close()
def _update_entity(entity_id: int, updates: dict[str, object]) -> bool:
"""UPDATE telecom_entities SET <updates> WHERE id = $id. Returns success."""
if not updates:
return True
conn = _connect()
if conn is None:
return False
try:
set_clause = ", ".join(f"{k} = %s" for k in updates.keys())
values = list(updates.values()) + [entity_id]
with conn.cursor() as cur:
cur.execute(
f"UPDATE telecom_entities SET {set_clause} WHERE id = %s", # noqa: S608
values,
)
conn.commit()
return True
except Exception as exc:
logger.warning("filing_state: update %s failed: %s", entity_id, exc)
return False
finally:
conn.close()
# ── Idempotency windows ─────────────────────────────────────────────────────
#
# RMD recertification is annual; USAC 499-A is annual (due April 1); CPNI is
# annual (due March 1); BDC is twice-yearly (Dec 1 / Jun 1). "Already filed
# this cycle" roughly means the last filing was within the window below.
RMD_CYCLE_DAYS = 365
CPNI_CYCLE_DAYS = 365
FORM_499A_CYCLE_DAYS = 365
BDC_CYCLE_DAYS = 180
def already_filed(
entity_id: int, filing_type: str, filing_year: Optional[int] = None,
) -> bool:
"""Return True if the filing is on record within its cycle window.
``filing_type`` is one of: "rmd", "cpni", "499a", "bdc".
``filing_year`` applies only to 499a callers filing past-due should
pass the target reporting year so we don't skip an older year just
because a newer one was already filed. If not provided, defaults to
the current year (preserving legacy behavior).
"""
column, cycle_days = {
"rmd": ("rmd_last_cert_date", RMD_CYCLE_DAYS),
"cpni": ("cpni_last_cert_date", CPNI_CYCLE_DAYS),
"499a": ("last_filing_year", None), # year-based, handled separately
"bdc": ("bdc_last_filing_date", BDC_CYCLE_DAYS),
}.get(filing_type, (None, None))
if column is None:
return False
value = _get_entity_field(entity_id, column)
if value is None:
return False
if filing_type == "499a":
try:
last_year = int(value)
check_year = filing_year if filing_year is not None else datetime.utcnow().year
# "Already filed" means the entity's last_filing_year equals or
# exceeds the year we're about to file. For past-due filings
# targeting an older year, use that year explicitly.
return last_year >= int(check_year)
except (TypeError, ValueError):
return False
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
return False
if not isinstance(value, datetime):
return False
# psycopg2 returns tz-aware datetimes when the column is TIMESTAMPTZ; strip
# tz for the arithmetic below.
if value.tzinfo is not None:
value = value.replace(tzinfo=None)
return value >= datetime.utcnow() - timedelta(days=cycle_days or 0)
# ── Success writers ─────────────────────────────────────────────────────────
def record_rmd_filing(entity_id: int, confirmation_number: str = "") -> bool:
return _update_entity(entity_id, {
"rmd_last_cert_date": datetime.utcnow(),
"rmd_confirmation_number": confirmation_number,
})
def record_cpni_filing(entity_id: int, confirmation_number: str = "") -> bool:
return _update_entity(entity_id, {
"cpni_last_cert_date": datetime.utcnow(),
"cpni_confirmation_number": confirmation_number,
})
def record_form_499a_filing(entity_id: int, confirmation_number: str = "") -> bool:
return _update_entity(entity_id, {
"last_filing_year": datetime.utcnow().year,
"form_499a_confirmation_number": confirmation_number,
})
def record_bdc_filing(entity_id: int, confirmation_number: str = "") -> bool:
return _update_entity(entity_id, {
"bdc_last_filing_date": datetime.utcnow(),
"bdc_confirmation_number": confirmation_number,
})
def record_stir_shaken_cert(entity_id: int, issued_at: Optional[datetime] = None) -> bool:
return _update_entity(entity_id, {
"stir_shaken_cert_issued_at": issued_at or datetime.utcnow(),
})

View file

@ -0,0 +1,221 @@
"""Undetected Playwright launcher for FCC / USAC / BDC portals.
All automated filing handlers (RMD, CPNI, Form 499-A, BDC) go through this
helper instead of importing ``playwright`` directly. We prefer ``patchright``
(a drop-in Playwright replacement that patches ``navigator.webdriver``, CDP
leakage, runtime-enable fingerprints, and the ``--disable-blink-features=
AutomationControlled`` artifact) and fall back to vanilla Playwright with
the same stealth init scripts we use in ``scripts/formation/base.py`` if
patchright is not installed.
State formation portals that sit behind Incapsula/Akamai (Nevada, Delaware,
etc.) should also use this helper see ``docs/state-automation-status.md``
for the list.
Optional residential proxy support: set ``UNDETECTED_PROXY_URL`` in the
environment (e.g. ``http://user:pass@proxy.example.com:8080``) and pass
``use_proxy=True`` when launching.
"""
from __future__ import annotations
import logging
import os
import random
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, AsyncIterator
if TYPE_CHECKING:
from playwright.async_api import Browser, BrowserContext, Page, Playwright
logger = logging.getLogger(__name__)
# Prefer patchright; fall back to playwright with manual stealth patches.
_USING_PATCHRIGHT = False
try:
from patchright.async_api import async_playwright # type: ignore
_USING_PATCHRIGHT = True
logger.info("undetected_browser: using patchright")
except ImportError:
from playwright.async_api import async_playwright # type: ignore
logger.warning(
"undetected_browser: patchright not installed — falling back to "
"vanilla playwright with home-grown stealth patches. Install with "
"`pip install patchright` and run `patchright install chromium` "
"for best results against bot-detection-heavy portals."
)
# Common modern Chrome UAs. We rotate between a handful so that a burst of
# concurrent submissions doesn't all look like the same client.
_USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
]
# Common viewports; mildly jittered per-launch to vary fingerprint.
_VIEWPORTS = [
{"width": 1280, "height": 900},
{"width": 1440, "height": 900},
{"width": 1536, "height": 864},
{"width": 1920, "height": 1080},
]
# Init script run on every page — only used on the vanilla-playwright path;
# patchright handles all of these patches (and many more) internally.
_STEALTH_INIT_SCRIPT = """
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5].map(() => ({ name: 'Chrome PDF Plugin' })),
});
Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });
window.chrome = { runtime: {} };
const originalQuery = window.navigator.permissions && window.navigator.permissions.query;
if (originalQuery) {
window.navigator.permissions.query = (parameters) =>
parameters.name === 'notifications'
? Promise.resolve({ state: Notification.permission })
: originalQuery(parameters);
}
"""
def _proxy_config() -> dict | None:
"""Read UNDETECTED_PROXY_URL and turn it into a Playwright proxy dict."""
url = os.environ.get("UNDETECTED_PROXY_URL", "").strip()
if not url:
return None
# Playwright's proxy dict supports: server, username, password, bypass
from urllib.parse import urlparse
parsed = urlparse(url)
server = f"{parsed.scheme}://{parsed.hostname}"
if parsed.port:
server += f":{parsed.port}"
cfg: dict = {"server": server}
if parsed.username:
cfg["username"] = parsed.username
if parsed.password:
cfg["password"] = parsed.password
return cfg
async def launch_context(
playwright: "Playwright",
*,
headless: bool = True,
use_proxy: bool = False,
timezone_id: str = "America/New_York",
locale: str = "en-US",
storage_state: str | None = None,
) -> "tuple[Browser, BrowserContext]":
"""Launch a Chromium browser + context with stealth settings.
Returns ``(browser, context)`` caller is responsible for closing both
(prefer the :func:`undetected_browser` context manager instead).
"""
launch_args = [
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
]
# On the vanilla-playwright path we add the extra flag that hides the
# AutomationControlled fingerprint. Patchright already does this (and
# adding the flag with patchright is harmless but redundant).
if not _USING_PATCHRIGHT:
launch_args.append("--disable-blink-features=AutomationControlled")
browser = await playwright.chromium.launch(
headless=headless,
args=launch_args,
)
context_kwargs: dict = {
"viewport": random.choice(_VIEWPORTS),
"user_agent": random.choice(_USER_AGENTS),
"locale": locale,
"timezone_id": timezone_id,
"java_script_enabled": True,
}
if use_proxy:
proxy = _proxy_config()
if proxy:
context_kwargs["proxy"] = proxy
else:
logger.warning(
"undetected_browser: use_proxy=True but UNDETECTED_PROXY_URL is unset"
)
if storage_state:
context_kwargs["storage_state"] = storage_state
context = await browser.new_context(**context_kwargs)
if not _USING_PATCHRIGHT:
await context.add_init_script(_STEALTH_INIT_SCRIPT)
return browser, context
@asynccontextmanager
async def undetected_browser(
*,
headless: bool = True,
use_proxy: bool = False,
timezone_id: str = "America/New_York",
locale: str = "en-US",
storage_state: str | None = None,
) -> AsyncIterator["tuple[BrowserContext, Page]"]:
"""Async context manager yielding a (context, page) pair.
Example::
async with undetected_browser(headless=False) as (ctx, page):
await page.goto("https://apps.fcc.gov/rmd/")
...
"""
async with async_playwright() as pw:
browser, context = await launch_context(
pw,
headless=headless,
use_proxy=use_proxy,
timezone_id=timezone_id,
locale=locale,
storage_state=storage_state,
)
try:
page = await context.new_page()
yield context, page
finally:
await context.close()
await browser.close()
# ─── Human-like interaction helpers (lifted from scripts/formation/base.py) ──
async def human_delay(min_s: float = 1.0, max_s: float = 3.0) -> None:
"""Random delay to appear human. Mirrors the formation base helper."""
import asyncio
await asyncio.sleep(random.uniform(min_s, max_s))
async def type_slowly(page: "Page", selector: str, text: str, delay_ms: int = 50) -> None:
"""Type text character-by-character with jitter."""
await page.click(selector)
for char in text:
await page.type(selector, char, delay=delay_ms + random.randint(0, 30))
def is_using_patchright() -> bool:
"""Return True if patchright is the active backend."""
return _USING_PATCHRIGHT