Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1430 lines
65 KiB
Python
1430 lines
65 KiB
Python
"""FCC Form 499-A (+ optional 499-Q) filing handler.
|
|
|
|
The Form 499-A is the annual telecommunications revenue report filed with
|
|
USAC via E-File (https://forms.universalservice.org/). Due April 1 each
|
|
year. Form 499-Q is the companion quarterly filing.
|
|
|
|
This handler is structured around a phased Playwright session:
|
|
_phase_block_1 — Lines 103-112 identification (multi-select Line 105)
|
|
_phase_block_2 — Lines 201-228 contacts / officers / jurisdictions
|
|
_phase_blocks_3_4 — Lines 303-422 revenue schedules (LINE_FILL_MAP)
|
|
_phase_block_5 — Lines 503-514 LNPA regions + Line 511 + TRS base
|
|
_phase_block_6 — Lines 603-612 exemption certs + nondisclosure + filing type
|
|
_phase_submit_traffic_study — stamps + uploads the traffic study alongside the filing
|
|
|
|
Pre-flight:
|
|
* De minimis Appendix A calculation (fcc_499_utils.calculate_de_minimis)
|
|
* Safe-harbor election validation (no safe harbor for non-interconnected VoIP)
|
|
* LNPA region sums (100% per column)
|
|
|
|
Idempotency: if ``last_filing_year >= current_year``, skip the portal
|
|
submission and return just the prep packet.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import date, datetime
|
|
from typing import Any, Optional
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
from .base_handler import BaseServiceHandler
|
|
from .telecom import filing_state
|
|
from .telecom.auto_filing import check_auto_filing, request_admin_review
|
|
from .telecom.fcc_499_utils import (
|
|
all_line_105_boxes_to_tick,
|
|
calculate_de_minimis,
|
|
compute_trs_contribution_base,
|
|
detect_filing_type,
|
|
load_safe_harbor_pct,
|
|
safe_harbor_allowed,
|
|
)
|
|
from .telecom.undetected_browser import undetected_browser, human_delay, type_slowly
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# USAC E-File portal entry point; overridable through the environment.
USAC_EFILE_URL = os.getenv("USAC_EFILE_URL", "https://forms.universalservice.org/")

# Path of the saved Playwright storage state (session) for the portal;
# overridable through the environment.
USAC_STORAGE_STATE = os.getenv(
    "USAC_EFILE_STORAGE_STATE",
    "/app/data/usac_efile_session.json",
)
|
|
|
|
|
|
# ── LINE_FILL_MAP ──────────────────────────────────────────────────────
# Table of every revenue line the handler knows how to populate. Keys per entry:
#   line_no       — line label as shown on the form, e.g. "303.2"
#   selector      — CSS selector template; the optional {col} placeholder is
#                   expanded to "intra", "inter", or "intl" for the three
#                   column cells
#   source_key    — flat key looked up in the revenue-lines dict
#                   (e.g. "line_303_2"); the value may be int cents OR a
#                   dict {cents, intra, inter, intl}
#   categories    — optional list of primary Line 105 categories the line
#                   applies to; None means it applies to every filer
#   federal_only  — True for Line 403 federal USF surcharges (100% interstate)
#   single_column — True for total-only lines whose selector has no {col}
#
# The handler walks this table and writes every applicable line according
# to the safe-harbor election (traffic study / safe harbor / actual data).
LINE_FILL_MAP: list[dict] = [
    # ── Block 3: Carrier's-carrier revenue (Lines 303-315) ───────────────
    {
        "line_no": "303",
        "selector": "input[name='line_303_{col}']",
        "source_key": "line_303",
        "categories": None,
    },
    {
        "line_no": "303.1",
        "selector": "input[name='line_303_1_{col}']",
        "source_key": "line_303_1",
        "categories": ["clec", "ilec"],
    },
    {
        "line_no": "303.2",
        "selector": "input[name='line_303_2_{col}']",
        "source_key": "line_303_2",
        "categories": ["voip_interconnected"],
    },
    {
        "line_no": "304.1",
        "selector": "input[name='line_304_1_{col}']",
        "source_key": "line_304_1",
        "categories": ["clec", "ilec", "ixc"],
    },
    {
        "line_no": "304.2",
        "selector": "input[name='line_304_2_{col}']",
        "source_key": "line_304_2",
        "categories": ["clec", "ilec", "ixc"],
    },
    {
        "line_no": "305.1",
        "selector": "input[name='line_305_1_{col}']",
        "source_key": "line_305_1",
        "categories": ["clec", "ilec", "private_line"],
    },
    {
        "line_no": "305.2",
        "selector": "input[name='line_305_2_{col}']",
        "source_key": "line_305_2",
        "categories": ["clec", "ilec", "private_line"],
    },
    {
        "line_no": "309",
        "selector": "input[name='line_309_{col}']",
        "source_key": "line_309",
        "categories": ["wireless"],
    },
    {
        "line_no": "311",
        "selector": "input[name='line_311_{col}']",
        "source_key": "line_311",
        "categories": None,
    },
    {
        "line_no": "312",
        "selector": "input[name='line_312_{col}']",
        "source_key": "line_312",
        "categories": None,
    },
    {
        "line_no": "313",
        "selector": "input[name='line_313_{col}']",
        "source_key": "line_313",
        "categories": ["satellite", "mobile_satellite"],
    },
    {
        "line_no": "314",
        "selector": "input[name='line_314_{col}']",
        "source_key": "line_314",
        "categories": None,
    },
    {
        "line_no": "315",
        "selector": "input[name='line_315_{col}']",
        "source_key": "line_315",
        "categories": None,
    },
    # ── Block 4: End-user revenue (Lines 403-418) ────────────────────────
    {
        "line_no": "403",
        "selector": "input[name='line_403_{col}']",
        "source_key": "line_403_federal",
        "categories": None,
        "federal_only": True,
    },
    {
        "line_no": "403s",
        "selector": "input[name='line_403_state_{col}']",
        "source_key": "line_403_state",
        "categories": None,
    },
    {
        "line_no": "404",
        "selector": "input[name='line_404_{col}']",
        "source_key": "line_404",
        "categories": None,
    },
    {
        "line_no": "404.1",
        "selector": "input[name='line_404_1_{col}']",
        "source_key": "line_404_1",
        "categories": None,
    },
    {
        "line_no": "404.3",
        "selector": "input[name='line_404_3_{col}']",
        "source_key": "line_404_3",
        "categories": None,
    },
    {
        "line_no": "405",
        "selector": "input[name='line_405_{col}']",
        "source_key": "line_405",
        "categories": ["clec", "ilec"],
    },
    {
        "line_no": "406",
        "selector": "input[name='line_406_{col}']",
        "source_key": "line_406",
        "categories": None,
    },
    {
        "line_no": "407",
        "selector": "input[name='line_407_{col}']",
        "source_key": "line_407",
        "categories": ["payphone"],
    },
    {
        "line_no": "408",
        "selector": "input[name='line_408_{col}']",
        "source_key": "line_408",
        "categories": None,
    },
    {
        "line_no": "409",
        "selector": "input[name='line_409_{col}']",
        "source_key": "line_409",
        "categories": ["wireless"],
    },
    {
        "line_no": "410",
        "selector": "input[name='line_410_{col}']",
        "source_key": "line_410",
        "categories": ["wireless"],
    },
    {
        "line_no": "411",
        "selector": "input[name='line_411_{col}']",
        "source_key": "line_411",
        "categories": ["prepaid_calling_card"],
    },
    {
        "line_no": "412",
        "selector": "input[name='line_412_{col}']",
        "source_key": "line_412",
        "categories": None,
    },
    {
        "line_no": "413",
        "selector": "input[name='line_413_{col}']",
        "source_key": "line_413",
        "categories": None,
    },
    {
        "line_no": "414.1",
        "selector": "input[name='line_414_1_{col}']",
        "source_key": "line_414_1",
        "categories": None,
    },
    {
        "line_no": "414.2",
        "selector": "input[name='line_414_2_{col}']",
        "source_key": "line_414_2",
        "categories": ["voip_interconnected"],
    },
    {
        "line_no": "415",
        "selector": "input[name='line_415_{col}']",
        "source_key": "line_415",
        "categories": None,
    },
    {
        "line_no": "416",
        "selector": "input[name='line_416_{col}']",
        "source_key": "line_416",
        "categories": ["satellite"],
    },
    {
        "line_no": "417",
        "selector": "input[name='line_417_{col}']",
        "source_key": "line_417",
        "categories": None,
    },
    {
        "line_no": "418.1",
        "selector": "input[name='line_418_1_{col}']",
        "source_key": "line_418_1",
        "categories": None,
    },
    {
        "line_no": "418.2",
        "selector": "input[name='line_418_2_{col}']",
        "source_key": "line_418_2",
        "categories": None,
    },
    {
        "line_no": "418.3",
        "selector": "input[name='line_418_3_{col}']",
        "source_key": "line_418_3",
        "categories": None,
    },
    {
        "line_no": "418.4",
        "selector": "input[name='line_418_4_{col}']",
        "source_key": "line_418_4",
        "categories": ["voip_non_interconnected"],
    },
    # ── Block 4-B: Uncollectibles (Lines 421-422 — single column) ────────
    {
        "line_no": "421",
        "selector": "input[name='line_421']",
        "source_key": "line_421",
        "single_column": True,
        "categories": None,
    },
    {
        "line_no": "422",
        "selector": "input[name='line_422']",
        "source_key": "line_422",
        "single_column": True,
        "categories": None,
    },
]
|
|
|
|
|
|
class Form499AHandler(BaseServiceHandler):
|
|
SERVICE_SLUG = "fcc-499a"
|
|
SERVICE_NAME = "FCC Form 499-A Filing"
|
|
REQUIRES_LLM = False
|
|
|
|
# When True, the handler also schedules the four 499-Q compliance
|
|
# calendar entries after submission. Flipped on by the bundle subclass.
|
|
SCHEDULE_499Q = False
|
|
|
|
async def process(self, order_data: dict) -> list[str]:
    """File the 499-A for an order and return the generated artifact paths.

    When ``order_data["multi_year_filings"]`` holds fewer than two entries
    this is a single call to ``_process_single_year``. With two or more
    entries (multi-year catch-up, migration 060) the handler runs once per
    year, in ascending order, with each year pinned as
    ``form_year_override``; the per-year confirmations are then handed to
    ``_persist_multi_year_confirmations``.

    Args:
        order_data: Order payload. ``name`` is required; ``entity``,
            ``intake_data`` and ``multi_year_filings`` are optional.

    Returns:
        Paths of every artifact produced (all years concatenated in
        multi-year mode).
    """
    import copy  # local import: only needed for the multi-year isolation below

    work_dir = self._make_work_dir()
    order_number = order_data["name"]

    multi_year = order_data.get("multi_year_filings") or []
    if len(multi_year) < 2:
        result = await self._process_single_year(order_data, work_dir)
        return result["artifacts"]

    # Years may arrive as strings (e.g. from JSON). Normalise to int once so
    # sorting is numeric and the past-due comparison cannot raise TypeError;
    # the set also prevents filing the same year twice.
    years = sorted({int(y) for y in multi_year})
    current_year = datetime.utcnow().year

    all_generated: list[str] = []
    year_conf_records: list[dict] = []
    for year in years:
        logger.info(
            "Form499AHandler: multi-year catch-up — filing year %s for order %s",
            year, order_number,
        )
        per_year_order = dict(order_data)
        # _process_single_year mutates entity / intake_data in place
        # (form_year, __filing_mode, is_deminimis, ...). Give every year
        # its own copy so nothing leaks into the next year or the caller.
        per_year_order["entity"] = copy.deepcopy(order_data.get("entity"))
        per_year_order["intake_data"] = copy.deepcopy(order_data.get("intake_data"))
        per_year_order["form_year_override"] = year
        # Each year is a one-off "past-due" to USAC (unless it's the
        # current year).
        per_year_order["filing_mode"] = "past_due" if year < current_year else "current"
        # Cleared so the per-year payload is not itself treated as multi-year.
        per_year_order["multi_year_filings"] = None

        year_result = await self._process_single_year(per_year_order, work_dir)
        all_generated.extend(year_result["artifacts"])
        year_conf_records.append({
            "year": year,
            "confirmation": year_result["confirmation"],
        })

    self._persist_multi_year_confirmations(order_number, year_conf_records)
    return all_generated
|
|
|
|
async def _process_single_year(self, order_data: dict, work_dir: str) -> dict:
    """Run one single-year 499-A filing.

    Steps, in order: resolve the filing mode and reporting year, generate
    the prep packet, apply the idempotency guard, honour the auto-filing
    toggle, run the de minimis pre-flight, then submit to USAC and record
    the result.

    Args:
        order_data: Order payload; ``intake_data`` and ``entity`` are
            mutated in place (``form_year``, ``__``-prefixed filing-mode
            keys, ``is_deminimis``).
        work_dir: Directory that receives the generated files.

    Returns:
        ``{"artifacts": [...paths...], "confirmation": str}``.
        ``confirmation`` is ``""`` when nothing was submitted or the
        submission failed, and ``"admin_review"`` when the order was staged
        for manual filing.
    """
    order_number = order_data["name"]
    entity = order_data.get("entity", {}) or {}
    entity_id = entity.get("id")
    intake = order_data.get("intake_data") or {}
    date_str = datetime.now().strftime("%Y%m%d")
    generated: list[str] = []

    # ── Filing mode (migration 058) ─────────────────────────────────
    # order_data passes filing_mode + form_year_override +
    # revises_order_number + revised_reason through from the
    # compliance_orders row.
    filing_mode = order_data.get("filing_mode") or "current"
    form_year_override = order_data.get("form_year_override")
    revises_order_number = order_data.get("revises_order_number")

    # For revisions: merge the prior order's intake_data into ours so the
    # filer doesn't have to re-enter unchanged fields. Explicit fields on
    # THIS order win over the prior.
    if filing_mode == "revised" and revises_order_number:
        prior_intake = self._load_prior_order_intake(revises_order_number)
        if prior_intake:
            merged = {**prior_intake, **intake}
            # Merge the revenue sub-object key by key so prior lines
            # survive. `or {}` guards against an explicit null on either
            # side, which would otherwise raise on the ** unpacking.
            if "revenue" in prior_intake and "revenue" in intake:
                merged["revenue"] = {
                    **(prior_intake["revenue"] or {}),
                    **(intake["revenue"] or {}),
                }
            intake = merged
            order_data["intake_data"] = intake

    # form_year_override wins over whatever intake.form_year says.
    if form_year_override:
        intake["form_year"] = int(form_year_override)

    # Stash filing-mode metadata on intake_data under __-prefixed keys so
    # _submit_to_usac + Block 6 can read it without changing every call
    # signature.
    intake["__filing_mode"] = filing_mode
    intake["__revises_order_number"] = revises_order_number
    intake["__revised_reason"] = order_data.get("revised_reason")
    if revises_order_number:
        intake["__prior_confirmation_number"] = self._load_prior_confirmation(
            revises_order_number,
        )

    # ── Prep packet ─────────────────────────────────────────────────
    generated.extend(self._generate_prep_packet(
        order_number, entity, intake, work_dir, date_str,
    ))

    # Resolved once; used by the idempotency guard and the de minimis
    # pre-flight. Falls back to the current UTC year.
    form_year = int(intake.get("form_year") or datetime.utcnow().year)

    # ── Idempotency ─────────────────────────────────────────────────
    # Revised filings are never skipped — revisions are legitimate
    # re-submissions. The guard is keyed on the reporting year so a filer
    # catching up on 2023 isn't blocked just because 2024 already landed.
    if (
        filing_mode != "revised"
        and entity_id
        and filing_state.already_filed(entity_id, "499a", form_year)
    ):
        # Log the year that was actually checked, not the current year.
        logger.info(
            "Form499AHandler: already filed for entity %s in year %s",
            entity_id, form_year,
        )
        if self.SCHEDULE_499Q:
            self._schedule_499q_calendar(order_number, entity)
        return {"artifacts": generated, "confirmation": ""}

    # ── Auto-filing toggle ──────────────────────────────────────────
    decision = check_auto_filing(order_data)
    if not decision.may_submit:
        logger.info(
            "Form499AHandler: %s — staging for admin review (order=%s)",
            decision.reason, order_number,
        )
        request_admin_review(
            order_number=order_number,
            service_slug=self.SERVICE_SLUG,
            service_name=self.SERVICE_NAME,
            entity_name=entity.get("legal_name", ""),
            frn=entity.get("frn", ""),
            packet_minio_paths=[
                f"compliance/{order_number}/{os.path.basename(p)}" for p in generated
            ],
            admin_email=decision.admin_email,
            summary=(
                f"499-A prep packet ready. Filer ID: {entity.get('filer_id_499', 'N/A')}. "
                f"Submit via USAC E-File at {USAC_EFILE_URL}."
            ),
        )
        return {"artifacts": generated, "confirmation": "admin_review"}

    # ── De minimis pre-flight ───────────────────────────────────────
    # Best-effort: a failure here is logged and does not block submission.
    waive_deminimis = bool(order_data.get("waive_deminimis_exemption"))
    try:
        worksheet = calculate_de_minimis(
            form_year=form_year,
            filer_total_revenue_cents=int(entity.get("total_revenue_cents") or 0),
            filer_interstate_pct=float(entity.get("interstate_pct") or 0),
            filer_international_pct=float(entity.get("international_pct") or 0),
            affiliates=self._load_affiliate_revenue(entity),
        )
        self._persist_deminimis_worksheet(order_number, worksheet)
        if worksheet.is_de_minimis and waive_deminimis:
            worksheet.notes.append(
                "Filer qualifies as de minimis but ELECTED to waive "
                "exemption and file as a regular contributor. "
                "Reason on file: "
                + (order_data.get("waive_deminimis_reason") or "(none recorded)")
            )
            # Thread through to Block 6: unset the "de minimis" checkbox
            entity["is_deminimis"] = False
            intake["is_deminimis"] = False
        elif worksheet.is_de_minimis and not entity.get("is_deminimis"):
            logger.warning(
                "Form499AHandler: Appendix A shows DE MINIMIS but entity "
                "flag says not — check with customer before filing",
            )
    except Exception as exc:
        logger.warning("De minimis calc failed: %s", exc)

    # ── Portal submission ───────────────────────────────────────────
    confirmation_path, confirmation_number = await self._submit_to_usac(
        order_number=order_number,
        entity=entity,
        intake_data=intake,
        work_dir=work_dir,
    )
    if confirmation_path:
        generated.append(confirmation_path)

    if entity_id and confirmation_number:
        filing_state.record_form_499a_filing(entity_id, confirmation_number)

    # For revised filings, persist the prior-confirmation link on THIS
    # order so the amendment chain is queryable.
    if filing_mode == "revised" and confirmation_number:
        self._persist_revision_link(
            order_number,
            intake.get("__prior_confirmation_number"),
        )

    if self.SCHEDULE_499Q and confirmation_number:
        self._schedule_499q_calendar(order_number, entity)

    return {"artifacts": generated, "confirmation": confirmation_number}
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Prep packet (unchanged from prior handler — kept intact)
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def _generate_prep_packet(
    self, order_number: str, entity: dict, intake: dict,
    work_dir: str, date_str: str,
) -> list[str]:
    """Build the 499-A prep packet and return the paths that were produced.

    The packet is the filing checklist (.docx plus a best-effort PDF
    conversion) and the revenue workbook (.xlsx). A failure in the PDF
    conversion or anywhere in the workbook step is logged as a warning and
    skipped; a failure importing or generating the checklist propagates.

    NOTE(review): ``intake`` is accepted but not read here, and the
    workbook's reporting year comes from ``entity["last_filing_year"]``
    (falling back to the previous calendar year) — confirm that is intended
    for past-due / multi-year runs.
    """
    from scripts.document_gen.templates.fcc_499a_checklist_generator import (
        generate_499a_checklist,
    )

    field = entity.get
    paths: list[str] = []

    # ── Checklist ───────────────────────────────────────────────────
    checklist_target = os.path.join(
        work_dir, f"fcc_499a_checklist_{order_number}_{date_str}.docx",
    )
    checklist_path = generate_499a_checklist(
        entity_name=field("legal_name", ""),
        frn=field("frn", ""),
        filer_id_499=field("filer_id_499", ""),
        address_street=field("address_street", ""),
        address_city=field("address_city", ""),
        address_state=field("address_state", ""),
        address_zip=field("address_zip", ""),
        filer_type=field("carrier_category", "interconnected_voip"),
        infra_type=field("infra_type", "facilities"),
        service_categories=field("service_categories", []) or [],
        is_deminimis=field("is_deminimis", False),
        is_lire=field("is_lire", False),
        total_revenue_cents=field("total_revenue_cents", 0),
        interstate_pct=field("interstate_pct", 0),
        international_pct=field("international_pct", 0),
        last_filing_year=field("last_filing_year", 0),
        output_path=checklist_target,
    )
    if checklist_path:
        paths.append(checklist_path)
        try:
            pdf_path = self._convert_to_pdf(checklist_path)
            paths.append(pdf_path)
        except Exception as exc:
            logger.warning("499-A checklist PDF conversion failed: %s", exc)

    # ── Revenue workbook (best-effort) ──────────────────────────────
    try:
        from scripts.document_gen.templates.form_499a_revenue_workbook_generator import (
            generate_499a_revenue_workbook,
        )

        workbook_target = os.path.join(
            work_dir, f"fcc_499a_revenue_workbook_{order_number}_{date_str}.xlsx",
        )
        study = self._load_traffic_study(field("id"))
        last_filed = int(field("last_filing_year") or 0)
        workbook_path = generate_499a_revenue_workbook(
            entity_name=field("legal_name", ""),
            filer_id_499=field("filer_id_499", ""),
            frn=field("frn", ""),
            reporting_year=last_filed or (datetime.utcnow().year - 1),
            traffic_study=study,
            output_path=workbook_target,
        )
        if workbook_path:
            paths.append(workbook_path)
    except Exception as exc:
        logger.warning("499-A revenue workbook generation failed: %s", exc)

    return paths
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# USAC submission — orchestrates every phase
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def _submit_to_usac(
    self, *,
    order_number: str,
    entity: dict,
    intake_data: dict,
    work_dir: str,
) -> tuple[Optional[str], str]:
    """Drive the USAC E-File portal through one 499-A submission.

    Opens the portal with the saved storage state (if that file exists),
    enters the form via the branch for the filing mode (current / past_due /
    revised), runs every fill phase, submits, and saves the confirmation
    page as a PDF in ``work_dir``.

    Returns:
        ``(confirmation_pdf_path, confirmation_number)``. Returns
        ``(None, "")`` when nothing was submitted: no Filer ID, portal asked
        for login, the past-due / revised intro failed, or any exception was
        raised. An admin todo is created in each of those cases. If the form
        was submitted but no confirmation number could be parsed, the PDF
        path is returned with ``""`` and an admin todo is raised so the
        filing is not silently left unrecorded.
    """
    filer_id = (entity.get("filer_id_499") or "").strip()
    if not filer_id:
        self._create_admin_todo(
            order_number,
            "Form 499-A submission requires a USAC Filer ID but none was "
            "on file. Capture the 499 Filer ID on the telecom entity and "
            "re-dispatch.",
        )
        return None, ""

    storage_state = (
        USAC_STORAGE_STATE if os.path.exists(USAC_STORAGE_STATE) else None
    )
    confirmation_path = os.path.join(
        work_dir, f"fcc_499a_confirmation_{order_number}.pdf",
    )
    confirmation_number = ""

    try:
        async with undetected_browser(
            headless=True,
            storage_state=storage_state,
        ) as (ctx, page):
            await page.goto(USAC_EFILE_URL, wait_until="domcontentloaded")
            await human_delay(1.5, 3.0)

            # Landing on a login URL means the saved session is missing or
            # no longer accepted; a human has to re-authorize.
            if "login" in page.url.lower():
                self._create_admin_todo(
                    order_number,
                    f"USAC E-File required login for Filer ID {filer_id}. "
                    "Run the FCC Access Helper Chrome extension to authorize "
                    "filings@performancewest.net on this carrier, export the "
                    f"session to {USAC_STORAGE_STATE}, then re-dispatch "
                    f"order {order_number}.",
                )
                return None, ""

            # Filing mode branches:
            filing_mode = (
                intake_data.get("__filing_mode")
                or getattr(self, "_filing_mode", None)
                or "current"
            )
            if filing_mode == "past_due":
                ok = await self._phase_past_due_intro(
                    page, {"form_year_override": intake_data.get("form_year")},
                )
                if not ok:
                    self._create_admin_todo(
                        order_number,
                        f"Past-due filing for year {intake_data.get('form_year')} "
                        "could not navigate to USAC historical worksheet. "
                        "File manually and record confirmation.",
                    )
                    return None, ""
            elif filing_mode == "revised":
                ok = await self._phase_revised_intro(
                    page, entity, {
                        "revises_order_number": intake_data.get("__revises_order_number"),
                        "prior_confirmation_number": intake_data.get("__prior_confirmation_number"),
                    },
                )
                if not ok:
                    self._create_admin_todo(
                        order_number,
                        "Revised filing (amendment of "
                        f"{intake_data.get('__revises_order_number')}) could not "
                        "locate the prior filing on USAC E-File. Check the "
                        "prior confirmation number and re-dispatch.",
                    )
                    return None, ""
            else:
                await page.click('text="Form 499-A"')
                await human_delay()

            revenue_lines = self._build_revenue_lines(entity, intake_data)

            await self._phase_block_1(page, entity, intake_data)
            await self._phase_block_2(page, entity, intake_data)
            await self._phase_blocks_3_4(page, entity, intake_data, revenue_lines)
            await self._phase_block_5(page, entity, intake_data, revenue_lines)
            await self._phase_block_6(page, entity, intake_data)
            await self._phase_submit_traffic_study(page, entity, work_dir)

            await human_delay(1.5, 3.0)
            await page.click('button:has-text("Review")')
            await page.wait_for_selector("text=Review & Submit", timeout=30000)
            await page.click('button:has-text("Submit")')
            await page.wait_for_selector("text=Confirmation", timeout=90000)

            # Take the text after the first ":" on the first line that
            # mentions "Confirmation" or "Filing ID" and has a value.
            body = await page.locator("body").inner_text()
            for line in body.splitlines():
                if "Confirmation" in line or "Filing ID" in line:
                    parts = line.split(":", 1)
                    if len(parts) == 2 and parts[1].strip():
                        confirmation_number = parts[1].strip()
                        break

            await page.pdf(path=confirmation_path, format="Letter")

            if not confirmation_number:
                # The form went through but we could not read the number.
                # The caller only records the filing when a number is
                # returned, so surface this for manual follow-up.
                logger.warning(
                    "Form499AHandler: submitted for Filer ID %s but no "
                    "confirmation number could be parsed",
                    filer_id,
                )
                self._create_admin_todo(
                    order_number,
                    f"USAC 499-A for Filer ID {filer_id} appears to have been "
                    "submitted but no confirmation number could be read from "
                    "the confirmation page. Review the saved confirmation PDF "
                    "and record the confirmation manually.",
                )

            logger.info(
                "Form499AHandler: filed for %s (Filer ID %s), confirmation %s",
                entity.get("legal_name", ""), filer_id, confirmation_number,
            )
            return confirmation_path, confirmation_number

    except Exception as exc:
        logger.exception("Form499AHandler: USAC submission failed: %s", exc)
        # Point at the configured portal URL rather than a hard-coded one.
        self._create_admin_todo(
            order_number,
            f"USAC 499-A submission failed for Filer ID {filer_id}: {exc}. "
            f"Prep packet is in MinIO; file manually at {USAC_EFILE_URL}.",
        )
        return None, ""
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Phase: Block 1 — identification (multi-select Line 105)
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def _phase_block_1(self, page, entity: dict, intake: dict) -> None:
    """Fill Block 1 (identification) on the open 499-A form.

    Writes the legal name, the optional identification fields that are
    present on ``entity``, up to ten trade names, and the ranked Line 105
    category boxes. Trade-name and Line 105 failures are logged at debug
    level and skipped; failures on the other fields propagate.

    Args:
        page: Page object exposing awaitable ``fill`` and ``check``.
        entity: Telecom entity record; null values are written as "".
        intake: Unused here; kept so every phase shares one signature.
    """
    # Line 103: legal name (`or ""` — a null value must not reach fill()).
    await page.fill('input[name="company_name"]', entity.get("legal_name") or "")
    # Line 104: DBA / principal trade name
    if entity.get("dba_name"):
        await page.fill('input[name="dba_name"]', entity["dba_name"])
    # Line 106: affiliated filer info
    if entity.get("affiliated_filer_name"):
        await page.fill('input[name="affiliated_filer_name"]',
                        entity["affiliated_filer_name"])
        await page.fill('input[name="affiliated_filer_ein"]',
                        entity.get("affiliated_filer_ein") or "")
    # Line 108: management company
    if entity.get("management_company_name"):
        await page.fill('input[name="management_company_name"]',
                        entity["management_company_name"])
    # Line 109: FRN
    if entity.get("frn"):
        await page.fill('input[name="frn"]', entity["frn"])
    # Line 110: EIN (hyphens stripped)
    if entity.get("ein"):
        await page.fill('input[name="ein"]', str(entity["ein"]).replace("-", ""))

    # Line 112: trade names (one row per name, first ten only)
    trade_names = entity.get("trade_names") or []
    for i, trade_name in enumerate(trade_names[:10]):
        try:
            await page.fill(f'input[name="trade_name_{i}"]', trade_name)
        except Exception as exc:
            # Best-effort, but leave a trace instead of swallowing silently.
            logger.debug("Could not set Line 112 trade name %s: %s", i, exc)

    # Line 105: ranked multi-select — tick every box and set rank
    categories = entity.get("line_105_categories") or []
    for box_num in all_line_105_boxes_to_tick(categories):
        try:
            await page.check(f'input[name="line_105_box_{box_num}"]')
            # Rank = 1-based position of the first category that maps to
            # this box (primary=1, secondary=2, ...). Resolve the box's
            # category ids once rather than once per category.
            box_category_ids = _box_to_category_ids(box_num)
            rank = next(
                (i + 1 for i, c in enumerate(categories)
                 if c.get("id") in box_category_ids),
                None,
            )
            if rank:
                await page.fill(f'input[name="line_105_rank_{box_num}"]', str(rank))
        except Exception as exc:
            logger.debug("Could not set Line 105 box %s: %s", box_num, exc)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Phase: Block 2 — contacts / officers / jurisdictions
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def _phase_block_2(self, page, entity: dict, intake: dict) -> None:
|
|
# Block 2-A: regulatory contact (Lines 203-208)
|
|
await page.fill('input[name="regulatory_contact_name"]',
|
|
entity.get("regulatory_contact_name", ""))
|
|
await page.fill('input[name="regulatory_contact_email"]',
|
|
entity.get("regulatory_contact_email", ""))
|
|
await page.fill('input[name="regulatory_contact_phone"]',
|
|
entity.get("regulatory_contact_phone", ""))
|
|
if entity.get("itsp_regulatory_fee_email"):
|
|
await page.fill('input[name="itsp_regulatory_fee_email"]',
|
|
entity["itsp_regulatory_fee_email"])
|
|
|
|
# Block 2-B: D.C. agent for service of process (Lines 209-218)
|
|
for key, selector in [
|
|
("dc_agent_company", 'input[name="dc_agent_company"]'),
|
|
("dc_agent_street", 'input[name="dc_agent_street"]'),
|
|
("dc_agent_city", 'input[name="dc_agent_city"]'),
|
|
("dc_agent_state", 'input[name="dc_agent_state"]'),
|
|
("dc_agent_zip", 'input[name="dc_agent_zip"]'),
|
|
("dc_agent_phone", 'input[name="dc_agent_phone"]'),
|
|
("dc_agent_email", 'input[name="dc_agent_email"]'),
|
|
]:
|
|
if entity.get(key):
|
|
await page.fill(selector, entity[key])
|
|
|
|
# Block 2-C: Officers (Lines 219-226) — three officers w/ addresses
|
|
for i in (1, 2, 3):
|
|
name = entity.get(f"officer_{i}_name") if i > 1 else (
|
|
entity.get("officer_1_name") or entity.get("ceo_name")
|
|
)
|
|
title = entity.get(f"officer_{i}_title") if i > 1 else (
|
|
entity.get("officer_1_title") or entity.get("ceo_title")
|
|
)
|
|
if not name:
|
|
continue
|
|
await page.fill(f'input[name="officer_{i}_name"]', name or "")
|
|
await page.fill(f'input[name="officer_{i}_title"]', title or "")
|
|
await page.fill(f'input[name="officer_{i}_street"]',
|
|
entity.get(f"officer_{i}_street", ""))
|
|
await page.fill(f'input[name="officer_{i}_city"]',
|
|
entity.get(f"officer_{i}_city", ""))
|
|
await page.fill(f'input[name="officer_{i}_state"]',
|
|
entity.get(f"officer_{i}_state", ""))
|
|
await page.fill(f'input[name="officer_{i}_zip"]',
|
|
entity.get(f"officer_{i}_zip", ""))
|
|
|
|
# Line 227: jurisdictions_served multi-select
|
|
for state in entity.get("jurisdictions_served") or []:
|
|
try:
|
|
await page.check(f'input[name="line_227_state_{state}"]')
|
|
except Exception:
|
|
pass
|
|
|
|
# Line 228: year + month first service (or pre-1999 checkbox)
|
|
if entity.get("first_telecom_service_pre_1999"):
|
|
await page.check('input[name="line_228_pre_1999"]')
|
|
else:
|
|
y = entity.get("first_telecom_service_year")
|
|
m = entity.get("first_telecom_service_month")
|
|
if y:
|
|
await page.fill('input[name="line_228_year"]', str(y))
|
|
if m:
|
|
await page.fill('input[name="line_228_month"]', str(m))
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Phase: Blocks 3 + 4-A — revenue schedules
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def _phase_blocks_3_4(
    self, page, entity: dict, intake: dict, revenue_lines: dict,
) -> None:
    """Fill the Block 3 / Block 4 revenue lines listed in LINE_FILL_MAP.

    A line is written only when it applies to the filer's primary Line 105
    category and ``revenue_lines`` holds a truthy value for its
    ``source_key``. Single-column lines get the total; every other line is
    split into intrastate / interstate / international cells by
    ``_split_amounts`` using the election method chosen by
    ``_pick_election_method``. Amounts are converted from cents and written
    with two decimals.
    """
    primary_cat = entity.get("line_105_primary") or "voip_interconnected"
    method = self._pick_election_method(
        primary_cat, entity.get("safe_harbor_election") or {},
    )

    # Traffic-study percentages (consumed when the method is traffic_study).
    study = self._load_traffic_study(entity.get("id")) or {}
    study_inter = float(study.get("interstate_pct") or 0)
    study_intra = float(study.get("intrastate_pct") or 0)
    study_intl = float(study.get("international_pct") or 0)

    # Safe-harbor percentage (consumed when the method is safe_harbor).
    reporting_year = int(intake.get("form_year") or datetime.utcnow().year)
    safe_harbor_inter = load_safe_harbor_pct(reporting_year, primary_cat) or 0.0

    for spec in LINE_FILL_MAP:
        allowed = spec.get("categories")
        applies = allowed is None or primary_cat in allowed
        if not applies:
            # Line is restricted to other primary categories.
            continue

        value = revenue_lines.get(spec["source_key"])
        if not value:
            continue

        selector = spec["selector"]

        if spec.get("single_column"):
            # Lines 421/422 (uncollectibles) carry a total only.
            cents = self._source_to_total_cents(value)
            await self._fill_input(page, selector, f"{cents / 100:.2f}")
            continue

        # Three-column fill (intrastate / interstate / international).
        split = self._split_amounts(
            source=value,
            method=method,
            federal_only=spec.get("federal_only", False),
            sh_interstate_pct=safe_harbor_inter,
            ts_interstate_pct=study_inter,
            ts_intrastate_pct=study_intra,
            ts_international_pct=study_intl,
            filer_interstate_pct=float(entity.get("interstate_pct") or 0),
            filer_international_pct=float(entity.get("international_pct") or 0),
        )
        for column in ("intra", "inter", "intl"):
            dollars = split.get(column, 0) / 100.0
            await self._fill_input(
                page, selector.format(col=column), f"{dollars:.2f}",
            )
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Phase: Block 5 — LNPA regions + Line 511 + TRS base
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def _phase_block_5(
    self, page, entity: dict, intake: dict, revenue_lines: dict,
) -> None:
    """Fill Block 5: LNPA regions (503-510), Line 511, TRS base (512-514).

    Every portal write is best-effort: a failed ``page.fill`` is logged and
    the phase moves on, so one missing selector does not abort the filing.

    Args:
        page: Playwright page for the open worksheet.
        entity: Telecom entity row; only ``id`` is read here.
        intake: Order intake; ``form_year`` selects the reporting year and
            falls back to the current UTC year.
        revenue_lines: Mapping of ``line_*`` keys to cents, either as a
            plain number or as a dict carrying a ``cents`` key. ``None``
            is treated as an empty mapping.
    """
    revenue = revenue_lines or {}
    form_year = int(intake.get("form_year") or datetime.utcnow().year)
    entity_id = entity.get("id")

    # Lines 503-510: LNPA region percentages
    for row in self._load_lnpa_allocations(entity_id, form_year):
        region = row["region_code"]
        try:
            await page.fill(f'input[name="line_503_510_{region}_block3"]',
                            f"{row['block_3_pct']:.2f}")
            await page.fill(f'input[name="line_503_510_{region}_block4"]',
                            f"{row['block_4_pct']:.2f}")
        except Exception as exc:
            logger.debug("LNPA fill for %s failed: %s", region, exc)

    # Line 511: non-contributing reseller customers
    nc_rows = self._load_non_contributing_resellers(entity_id, form_year)
    total_511 = sum(int(r.get("revenue_cents", 0) or 0) for r in nc_rows)
    try:
        await page.fill('input[name="line_511"]', f"{total_511 / 100:.2f}")
    except Exception as exc:
        logger.debug("Line 511 fill failed: %s", exc)

    # Record the filer IDs — USAC form has a sub-panel for these.
    # Only the first 20 rows are written; make the truncation visible.
    if len(nc_rows) > 20:
        logger.warning(
            "Form499AHandler: %d non-contributing resellers on file, "
            "only the first 20 are entered on the Line 511 sub-panel",
            len(nc_rows),
        )
    for i, r in enumerate(nc_rows[:20]):
        try:
            # Coerce NULL columns to "" so one missing value does not
            # skip the remaining fields of this reseller row.
            await page.fill(f'input[name="line_511_reseller_{i}_filer_id"]',
                            str(r.get("reseller_filer_id_499") or ""))
            await page.fill(f'input[name="line_511_reseller_{i}_name"]',
                            str(r.get("reseller_legal_name") or ""))
            await page.fill(f'input[name="line_511_reseller_{i}_revenue"]',
                            f"{(r.get('revenue_cents', 0) or 0) / 100:.2f}")
        except Exception as exc:
            logger.debug("Line 511 reseller row %d fill failed: %s", i, exc)

    # Lines 512-514: TRS contribution base (auto-computed)
    flat: dict[str, int] = {}
    for k, v in revenue.items():
        if isinstance(v, (int, float)):
            flat[k] = int(v)
        elif isinstance(v, dict) and "cents" in v:
            flat[k] = int(v.get("cents") or 0)
    flat["line_511"] = total_511
    # Line 513 comes from intake if customer separates TRS bad debt.
    # Accept the same two shapes as every other line (number or
    # {"cents": n}) instead of calling int() on a dict.
    raw_513 = revenue.get("line_513")
    if isinstance(raw_513, dict):
        raw_513 = raw_513.get("cents")
    flat["line_513"] = int(raw_513 or 0)

    l512, l513, l514 = compute_trs_contribution_base(flat)
    if l512 < 0:
        logger.warning(
            "Form499AHandler: Line 512 TRS base is negative (%d cents) — "
            "check Line 511 vs Lines 403-418.4 sum", l512,
        )
    try:
        await page.fill('input[name="line_512"]', f"{l512 / 100:.2f}")
        await page.fill('input[name="line_513"]', f"{l513 / 100:.2f}")
        await page.fill('input[name="line_514"]', f"{l514 / 100:.2f}")
    except Exception as exc:
        logger.debug("Lines 512-514 fill failed: %s", exc)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Phase: filing-mode intros (past-due / revised) + Block 6 — exemption certs + nondisclosure + filing type
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def _phase_past_due_intro(self, page, order_data: dict) -> bool:
|
|
"""Navigate to prior-year worksheet + acknowledge late-filing penalties.
|
|
|
|
Returns True if navigation succeeded, False if the year isn't
|
|
available on USAC E-File (in which case the handler should escalate).
|
|
"""
|
|
form_year = order_data.get("form_year_override")
|
|
if not form_year:
|
|
return True # not past-due
|
|
try:
|
|
# USAC E-File "File Historical Worksheet" path
|
|
await page.click('text="File Historical Worksheet"')
|
|
await human_delay()
|
|
await page.select_option(
|
|
'select[name="historical_worksheet_year"]',
|
|
value=str(form_year),
|
|
)
|
|
await human_delay()
|
|
# Acknowledge the late-filing penalty notice
|
|
try:
|
|
await page.check('input[name="late_filing_acknowledgment"]')
|
|
except Exception:
|
|
pass
|
|
return True
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"Past-due filing: could not navigate to historical worksheet "
|
|
"for year %s: %s", form_year, exc,
|
|
)
|
|
return False
|
|
|
|
async def _phase_revised_intro(
|
|
self, page, entity: dict, order_data: dict,
|
|
) -> bool:
|
|
"""Navigate to the 'Revise Filing' flow and select the prior filing.
|
|
|
|
We look up the prior confirmation number from either order_data
|
|
or by querying the prior order's prior_confirmation_number.
|
|
Returns True if navigation succeeded.
|
|
"""
|
|
revises = order_data.get("revises_order_number")
|
|
if not revises:
|
|
return True # not a revision
|
|
prior_conf = order_data.get("prior_confirmation_number") or \
|
|
self._load_prior_confirmation(revises)
|
|
if not prior_conf:
|
|
logger.warning(
|
|
"Revised filing: no prior confirmation number found for order %s",
|
|
revises,
|
|
)
|
|
return False
|
|
try:
|
|
await page.click('text="Revise Filing"')
|
|
await human_delay()
|
|
await page.fill('input[name="prior_confirmation_number"]', prior_conf)
|
|
await human_delay()
|
|
await page.click('button:has-text("Find")')
|
|
await human_delay(2, 4)
|
|
return True
|
|
except Exception as exc:
|
|
logger.warning("Revised filing navigation failed: %s", exc)
|
|
return False
|
|
|
|
async def _phase_block_6(self, page, entity: dict, intake: dict) -> None:
|
|
# Line 603: exemption checkboxes
|
|
for key in ("exempt_usf", "exempt_trs", "exempt_nanpa",
|
|
"exempt_lnp", "exempt_itsp"):
|
|
if entity.get(key):
|
|
try:
|
|
await page.check(f'input[name="line_603_{key}"]')
|
|
except Exception:
|
|
pass
|
|
if entity.get("exemption_explanation"):
|
|
try:
|
|
await page.fill('textarea[name="line_603_explanation"]',
|
|
entity["exemption_explanation"])
|
|
except Exception:
|
|
pass
|
|
|
|
# Line 604: state/local gov, 501(c) tax exempt
|
|
if entity.get("is_state_local_gov"):
|
|
try: await page.check('input[name="line_604_state_local_gov"]')
|
|
except Exception: pass
|
|
if entity.get("is_tax_exempt_501c"):
|
|
try: await page.check('input[name="line_604_501c"]')
|
|
except Exception: pass
|
|
|
|
# Line 605: nondisclosure request
|
|
if entity.get("nondisclosure_requested"):
|
|
try:
|
|
await page.check('input[name="line_605_nondisclosure"]')
|
|
except Exception:
|
|
pass
|
|
|
|
# Line 612: filing type — honor filing_mode + revised_reason first,
|
|
# else auto-detect from the entity state.
|
|
filing_mode = intake.get("__filing_mode", "current")
|
|
if filing_mode == "revised":
|
|
rr = intake.get("__revised_reason") or intake.get("revised_reason")
|
|
if rr == "registration":
|
|
filing_type = "revised_registration"
|
|
elif rr == "both":
|
|
# USAC forces "revised_revenue" if any revenue line changed;
|
|
# tick both-effective by using revenue (USAC will pick up
|
|
# registration changes from the diff).
|
|
filing_type = "revised_revenue"
|
|
else:
|
|
filing_type = "revised_revenue"
|
|
elif filing_mode == "past_due":
|
|
# Past-due: still the "original" filing for that year if
|
|
# nothing was previously submitted.
|
|
filing_type = "original_april_1"
|
|
else:
|
|
filing_type = intake.get("filing_type") or detect_filing_type(
|
|
entity=entity,
|
|
current_year_filing_exists=False,
|
|
revised_reason=intake.get("revised_reason"),
|
|
)
|
|
try:
|
|
await page.check(f'input[name="line_612_{filing_type}"]')
|
|
except Exception:
|
|
pass
|
|
|
|
# Officer signature — use officer 1 (or CEO)
|
|
sig_name = entity.get("officer_1_name") or entity.get("ceo_name") or ""
|
|
sig_title = entity.get("officer_1_title") or entity.get("ceo_title") or ""
|
|
if sig_name:
|
|
await type_slowly(page, 'input[name="officer_signature_name"]', sig_name)
|
|
await type_slowly(page, 'input[name="officer_signature_title"]', sig_title)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Phase: Submit traffic study (stamp + upload)
|
|
# ------------------------------------------------------------------ #
|
|
|
|
async def _phase_submit_traffic_study(
|
|
self, page, entity: dict, work_dir: str,
|
|
) -> None:
|
|
election = entity.get("safe_harbor_election") or {}
|
|
if not any(
|
|
e.get("method") == "traffic_study"
|
|
for e in election.values() if isinstance(e, dict)
|
|
):
|
|
return
|
|
|
|
ts = self._load_traffic_study(entity.get("id"))
|
|
if not ts:
|
|
logger.warning("Traffic study election but no study found; "
|
|
"admin review required.")
|
|
return
|
|
if not ts.get("pdf_minio_path"):
|
|
logger.warning("Traffic study found but no PDF path; skipping upload.")
|
|
return
|
|
|
|
# Stamp the study PDF with Filer ID + Company Name + Affiliated Filers
|
|
try:
|
|
from scripts.document_gen.traffic_study_stamper import stamp_pages
|
|
except Exception as exc:
|
|
logger.warning("Traffic study stamper import failed: %s", exc)
|
|
return
|
|
|
|
stamped_path = os.path.join(work_dir,
|
|
f"stamped_traffic_study_{ts['id']}.pdf")
|
|
try:
|
|
stamp_pages(
|
|
pdf_path=self._localize_minio_path(ts["pdf_minio_path"]),
|
|
output_path=stamped_path,
|
|
filer_id=entity.get("filer_id_499", ""),
|
|
company_name=entity.get("legal_name", ""),
|
|
affiliated_filers_name=entity.get("affiliated_filer_name", "") or "—",
|
|
)
|
|
except Exception as exc:
|
|
logger.warning("Traffic study stamping failed: %s", exc)
|
|
return
|
|
|
|
# Upload to USAC's traffic-study widget in the same session
|
|
try:
|
|
await page.set_input_files(
|
|
'input[name="traffic_study_file"]', stamped_path,
|
|
)
|
|
await human_delay(2, 4)
|
|
except Exception as exc:
|
|
logger.warning("Traffic study upload failed: %s", exc)
|
|
return
|
|
|
|
# Persist the stamped path + submission timestamp
|
|
try:
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE cdr_traffic_studies
|
|
SET stamped_pdf_minio_path = %s,
|
|
usac_submitted_at = NOW(),
|
|
fcc_compliance_ok = TRUE
|
|
WHERE id = %s
|
|
""",
|
|
(stamped_path, ts["id"]),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
except Exception as exc:
|
|
logger.debug("Traffic study metadata persist failed: %s", exc)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# Helpers: revenue lines, safe-harbor election, DB lookups
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def _build_revenue_lines(self, entity: dict, intake: dict) -> dict:
|
|
"""Merge intake.revenue with ICC-imported lines from the DB."""
|
|
revenue = dict(intake.get("revenue") or {})
|
|
# Pull ICC aggregates per 499-A line (pre-filled on the wizard, but
|
|
# re-pull here in case the customer edited them after confirm).
|
|
profile_id = intake.get("cdr_profile_id")
|
|
reporting_year = int(intake.get("form_year") or datetime.utcnow().year - 1)
|
|
if profile_id:
|
|
try:
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT m.form_499a_line,
|
|
SUM(icc.revenue_cents)::bigint AS revenue_cents
|
|
FROM icc_revenue_lines icc
|
|
JOIN icc_499a_line_mapping m ON m.icc_category = icc.icc_category
|
|
WHERE icc.profile_id = %s
|
|
AND icc.reporting_year = %s
|
|
GROUP BY m.form_499a_line
|
|
""",
|
|
(profile_id, reporting_year),
|
|
)
|
|
for row in cur.fetchall():
|
|
key = f"line_{row['form_499a_line'].replace('.', '_')}"
|
|
# Only add if the user didn't already set it manually
|
|
if revenue.get(key) in (None, 0):
|
|
revenue[key] = int(row["revenue_cents"])
|
|
conn.close()
|
|
except Exception as exc:
|
|
logger.debug("ICC revenue merge failed: %s", exc)
|
|
return revenue
|
|
|
|
def _source_to_total_cents(self, source: Any) -> int:
|
|
if isinstance(source, (int, float)):
|
|
return int(source)
|
|
if isinstance(source, dict):
|
|
return int(source.get("cents")
|
|
or (int(source.get("intra", 0))
|
|
+ int(source.get("inter", 0))
|
|
+ int(source.get("intl", 0))))
|
|
return 0
|
|
|
|
def _split_amounts(
|
|
self, *,
|
|
source: Any,
|
|
method: str,
|
|
federal_only: bool,
|
|
sh_interstate_pct: float,
|
|
ts_interstate_pct: float,
|
|
ts_intrastate_pct: float,
|
|
ts_international_pct: float,
|
|
filer_interstate_pct: float,
|
|
filer_international_pct: float,
|
|
) -> dict[str, int]:
|
|
"""Return {intra, inter, intl} cents for a line per election method."""
|
|
# If the source already has per-column breakout, use it as-is
|
|
if isinstance(source, dict) and any(k in source for k in ("intra", "inter", "intl")):
|
|
return {
|
|
"intra": int(source.get("intra", 0)),
|
|
"inter": int(source.get("inter", 0)),
|
|
"intl": int(source.get("intl", 0)),
|
|
}
|
|
|
|
total = self._source_to_total_cents(source)
|
|
|
|
# Federal USF surcharges (Line 403 federal) — 100% interstate
|
|
if federal_only:
|
|
return {"intra": 0, "inter": total, "intl": 0}
|
|
|
|
if method == "traffic_study" and (ts_interstate_pct or ts_intrastate_pct or ts_international_pct):
|
|
return {
|
|
"intra": int(total * ts_intrastate_pct / 100),
|
|
"inter": int(total * ts_interstate_pct / 100),
|
|
"intl": int(total * ts_international_pct / 100),
|
|
}
|
|
if method == "safe_harbor" and sh_interstate_pct:
|
|
inter = int(total * sh_interstate_pct / 100)
|
|
return {"intra": total - inter, "inter": inter, "intl": 0}
|
|
# actual_data / fallback — use filer's aggregate percentages
|
|
inter = int(total * filer_interstate_pct / 100)
|
|
intl = int(total * filer_international_pct / 100)
|
|
return {"intra": max(0, total - inter - intl), "inter": inter, "intl": intl}
|
|
|
|
def _pick_election_method(self, primary_cat: str, election: dict) -> str:
|
|
entry = election.get(primary_cat)
|
|
if isinstance(entry, dict):
|
|
method = entry.get("method", "actual_data")
|
|
# Guard: non-interconnected VoIP cannot use safe harbor
|
|
if method == "safe_harbor" and not safe_harbor_allowed(primary_cat):
|
|
logger.warning(
|
|
"Safe harbor not allowed for %s — falling back to actual_data",
|
|
primary_cat,
|
|
)
|
|
return "actual_data"
|
|
return method
|
|
return "actual_data"
|
|
|
|
async def _fill_input(self, page, selector: str, value: str) -> None:
|
|
try:
|
|
if await page.locator(selector).count() > 0:
|
|
await page.fill(selector, value)
|
|
except Exception as exc:
|
|
logger.debug("Fill %s failed: %s", selector, exc)
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# DB lookups
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def _db_connect(self):
    """Open a new psycopg2 connection using the ``DATABASE_URL`` env var.

    An unset variable is passed to psycopg2 as an empty DSN. The caller
    owns the returned connection and must close it.
    """
    dsn = os.environ.get("DATABASE_URL", "")
    return psycopg2.connect(dsn)
|
|
|
|
def _load_traffic_study(self, entity_id) -> Optional[dict]:
|
|
if not entity_id:
|
|
return None
|
|
try:
|
|
conn = self._db_connect()
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT s.*
|
|
FROM cdr_traffic_studies s
|
|
JOIN cdr_ingestion_profiles p ON p.id = s.profile_id
|
|
WHERE p.telecom_entity_id = %s
|
|
ORDER BY s.generated_at DESC
|
|
LIMIT 1
|
|
""",
|
|
(entity_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return dict(row) if row else None
|
|
except Exception as exc:
|
|
logger.debug("traffic study lookup failed: %s", exc)
|
|
return None
|
|
finally:
|
|
try: conn.close()
|
|
except Exception: pass
|
|
|
|
def _load_lnpa_allocations(self, entity_id, year: int) -> list[dict]:
|
|
if not entity_id:
|
|
return []
|
|
try:
|
|
conn = self._db_connect()
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT region_code,
|
|
block_3_pct::float AS block_3_pct,
|
|
block_4_pct::float AS block_4_pct
|
|
FROM lnpa_region_allocations
|
|
WHERE telecom_entity_id = %s AND reporting_year = %s
|
|
ORDER BY region_code
|
|
""",
|
|
(entity_id, year),
|
|
)
|
|
return [dict(r) for r in cur.fetchall()]
|
|
except Exception as exc:
|
|
logger.debug("LNPA lookup failed: %s", exc)
|
|
return []
|
|
finally:
|
|
try: conn.close()
|
|
except Exception: pass
|
|
|
|
def _load_non_contributing_resellers(self, entity_id, year: int) -> list[dict]:
|
|
if not entity_id:
|
|
return []
|
|
try:
|
|
conn = self._db_connect()
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT reseller_filer_id_499, reseller_legal_name,
|
|
non_contributing_reason,
|
|
COALESCE(revenue_cents, 0) AS revenue_cents
|
|
FROM non_contributing_reseller_customers
|
|
WHERE filer_telecom_entity_id = %s
|
|
AND reporting_year = %s
|
|
ORDER BY revenue_cents DESC
|
|
""",
|
|
(entity_id, year),
|
|
)
|
|
return [dict(r) for r in cur.fetchall()]
|
|
except Exception as exc:
|
|
logger.debug("non-contributing reseller lookup failed: %s", exc)
|
|
return []
|
|
finally:
|
|
try: conn.close()
|
|
except Exception: pass
|
|
|
|
def _load_prior_order_intake(self, order_number: str) -> Optional[dict]:
    """Fetch ``intake_data`` from the compliance order ``order_number``.

    Used by the revised-filing flow to pre-fill unchanged fields. Returns
    ``None`` when the order is missing, its ``intake_data`` is empty, or
    the lookup fails (logged at debug level).
    """
    conn = None
    try:
        conn = self._db_connect()
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                "SELECT intake_data FROM compliance_orders WHERE order_number = %s",
                (order_number,),
            )
            record = cur.fetchone()
            if not record:
                return None
            return record.get("intake_data") or None
    except Exception as exc:
        logger.debug("prior intake load failed: %s", exc)
        return None
    finally:
        # conn stays None when the connect itself failed.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
|
|
|
|
def _load_prior_confirmation(self, order_number: str) -> Optional[str]:
    """Find the confirmation number to cite when revising ``order_number``.

    Lookup order:
      1. ``compliance_orders.prior_confirmation_number`` on that order;
      2. otherwise the newest ``carrier_filing_state`` row of type
         ``'499a'`` for the order's entity.

    Returns ``None`` when neither source has a value or the lookup fails
    (logged at debug level).
    """
    order_sql = """
        SELECT prior_confirmation_number, telecom_entity_id
        FROM compliance_orders WHERE order_number = %s
        """
    state_sql = """
        SELECT confirmation_number
        FROM carrier_filing_state
        WHERE telecom_entity_id = %s AND filing_type = '499a'
        ORDER BY filed_at DESC LIMIT 1
        """
    conn = None
    try:
        conn = self._db_connect()
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # First, if this order itself was a revision, its own
            # prior_confirmation_number is what we want.
            # NOTE(review): for a revision-of-a-revision this yields the
            # confirmation the earlier revision pointed at, not that
            # revision's own confirmation — confirm this is intended.
            cur.execute(order_sql, (order_number,))
            order_row = cur.fetchone() or {}
            recorded = order_row.get("prior_confirmation_number")
            if recorded:
                return recorded

            # Fall back: look up the entity's most recent 499a
            # confirmation via filing_state.
            entity_id = order_row.get("telecom_entity_id")
            if not entity_id:
                return None
            cur.execute(state_sql, (entity_id,))
            latest = cur.fetchone()
            if latest:
                return latest["confirmation_number"]
            return None
    except Exception as exc:
        logger.debug("prior confirmation lookup failed: %s", exc)
        return None
    finally:
        # conn stays None when the connect itself failed.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
|
|
|
|
def _persist_multi_year_confirmations(
|
|
self, order_number: str, records: list[dict],
|
|
) -> None:
|
|
"""Write the per-year confirmation records to
|
|
compliance_orders.multi_year_confirmations."""
|
|
try:
|
|
conn = self._db_connect()
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"UPDATE compliance_orders SET multi_year_confirmations = %s::jsonb "
|
|
"WHERE order_number = %s",
|
|
(json.dumps(records), order_number),
|
|
)
|
|
conn.commit()
|
|
except Exception as exc:
|
|
logger.debug("multi-year confirmations persist failed: %s", exc)
|
|
finally:
|
|
try: conn.close()
|
|
except Exception: pass
|
|
|
|
def _persist_revision_link(
|
|
self, order_number: str, prior_confirmation: Optional[str],
|
|
) -> None:
|
|
"""After a revision submits successfully, record the prior confirmation
|
|
on THIS order so the amendment chain is queryable."""
|
|
if not prior_confirmation:
|
|
return
|
|
try:
|
|
conn = self._db_connect()
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"UPDATE compliance_orders SET prior_confirmation_number = %s "
|
|
"WHERE order_number = %s",
|
|
(prior_confirmation, order_number),
|
|
)
|
|
conn.commit()
|
|
except Exception as exc:
|
|
logger.debug("revision link persist failed: %s", exc)
|
|
finally:
|
|
try: conn.close()
|
|
except Exception: pass
|
|
|
|
def _load_affiliate_revenue(self, entity: dict) -> list[dict]:
|
|
"""Load affiliate filers by shared EIN for de minimis consolidation."""
|
|
ein = entity.get("affiliated_filer_ein")
|
|
if not ein or not entity.get("id"):
|
|
return []
|
|
try:
|
|
conn = self._db_connect()
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT total_revenue_cents,
|
|
interstate_pct::float AS interstate_pct,
|
|
international_pct::float AS international_pct
|
|
FROM telecom_entities
|
|
WHERE affiliated_filer_ein = %s
|
|
AND id <> %s
|
|
""",
|
|
(ein, entity["id"]),
|
|
)
|
|
return [dict(r) for r in cur.fetchall()]
|
|
except Exception as exc:
|
|
logger.debug("affiliate revenue lookup failed: %s", exc)
|
|
return []
|
|
finally:
|
|
try: conn.close()
|
|
except Exception: pass
|
|
|
|
def _persist_deminimis_worksheet(self, order_number: str, worksheet) -> None:
|
|
try:
|
|
conn = self._db_connect()
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
UPDATE compliance_orders
|
|
SET deminimis_worksheet_json = %s::jsonb,
|
|
deminimis_estimated_contrib_cents = %s,
|
|
deminimis_result_is_exempt = %s
|
|
WHERE order_number = %s
|
|
""",
|
|
(
|
|
json.dumps(worksheet.to_dict()),
|
|
worksheet.line_11_estimated_contrib_cents,
|
|
worksheet.is_de_minimis,
|
|
order_number,
|
|
),
|
|
)
|
|
conn.commit()
|
|
except Exception as exc:
|
|
logger.debug("de minimis persist failed: %s", exc)
|
|
finally:
|
|
try: conn.close()
|
|
except Exception: pass
|
|
|
|
def _localize_minio_path(self, minio_path: str) -> str:
|
|
"""Given a MinIO key, return a locally-accessible path.
|
|
In production the file is already mounted or pulled by a helper —
|
|
placeholder returns the minio_path as-is. Real MinIO download
|
|
belongs in a shared helper; this stub keeps the code shape right.
|
|
"""
|
|
return minio_path
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# 499-Q calendar scheduling + admin ToDo
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def _schedule_499q_calendar(self, order_number: str, entity: dict) -> None:
    """Create Compliance Calendar entries for the remaining 499-Q deadlines.

    De minimis carriers (``entity["is_deminimis"]``) are skipped.
    Otherwise one ERPNext "Compliance Calendar" resource is created for
    each quarterly due date (Feb 1, May 1, Aug 1, Nov 1) of the current
    year that has not passed yet; if all four have passed, next year's
    Feb 1 is scheduled instead. Any failure is logged as a warning and
    not raised.
    """
    if entity.get("is_deminimis"):
        logger.info(
            "Form499AHandler: de minimis carrier — skipping 499-Q calendar",
        )
        return
    try:
        from scripts.workers.erpnext_client import ERPNextClient
        erp = ERPNextClient()
        # Take the year from the same clock as "today". Mixing the UTC
        # year with the local date dropped three of the four deadlines
        # when the two disagreed around New Year.
        today = date.today()
        year = today.year
        due_dates = [
            deadline
            for deadline in (date(year, month, 1) for month in (2, 5, 8, 11))
            if deadline >= today
        ]
        if not due_dates:
            due_dates = [date(year + 1, 2, 1)]
        for d in due_dates:
            erp.create_resource(
                "Compliance Calendar",
                {
                    "entity_name": entity.get("legal_name", ""),
                    "order_reference": order_number,
                    "compliance_type": "FCC Form 499-Q",
                    "description": (
                        f"Quarterly FCC Form 499-Q filing for "
                        f"{entity.get('legal_name', '')} "
                        f"(FRN {entity.get('frn', '')})"
                    ),
                    "due_date": d.strftime("%Y-%m-%d"),
                    "recurring": 1,
                    "recurrence_period": "Quarterly",
                    "status": "Upcoming",
                },
            )
    except Exception as exc:
        logger.warning("Form499AHandler: could not schedule 499-Q: %s", exc)
|
|
|
|
def _create_admin_todo(self, order_number: str, description: str) -> None:
    """File a high-priority ERPNext ToDo for the "Accounting Advisor" role.

    The ToDo text is prefixed with this handler's ``SERVICE_SLUG`` and
    the order number. Failures are logged as errors and not raised.
    """
    try:
        from scripts.workers.erpnext_client import ERPNextClient
        client = ERPNextClient()
        todo = {
            "description": (
                f"[{self.SERVICE_SLUG}] {order_number}\n\n{description}"
            ),
            "priority": "High",
            "role": "Accounting Advisor",
        }
        client.create_resource("ToDo", todo)
    except Exception as exc:
        logger.error("Could not create admin ToDo: %s", exc)
|
|
|
|
|
|
def _box_to_category_ids(box_num: int) -> set[str]:
    """Return every category id that maps to Line 105 box ``box_num``.

    Used to determine the rank of a box — if a box was ticked because a
    parent category (e.g., CLEC) was selected with infra_type=reseller,
    the rank is whatever rank the parent has in line_105_categories.
    """
    from .telecom.fcc_499_utils import LINE_105_BOX_NUMBERS

    matching: set[str] = set()
    for category_id, box in LINE_105_BOX_NUMBERS.items():
        if box == box_num:
            matching.add(category_id)
    return matching
|
|
|
|
|
|
class Form499ABundleHandler(Form499AHandler):
    """Handler for the combined 499-A + 499-Q service.

    The 499-A submission is inherited unchanged from ``Form499AHandler``;
    this subclass only overrides the service identity and turns on
    scheduling of the quarterly 499-Q filings.
    """

    # Enables the quarterly calendar step (presumably read by the base
    # handler after a successful submission — verify against the caller).
    SCHEDULE_499Q = True

    SERVICE_NAME = "FCC Form 499-A + 499-Q Bundle"
    SERVICE_SLUG = "fcc-499a-499q"
|