new-site/scripts/workers/services/telecom/filing_state.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

179 lines
6.3 KiB
Python

"""Helpers for reading/writing FCC filing state on telecom_entities.
The remediation handlers (RMD, CPNI, 499-A, BDC) all need to:
1. Check whether the filing is already on record for the current cycle
(idempotency — don't double-submit if the customer ordered twice).
2. On success, persist the submission timestamp + confirmation number
so the next compliance checkup flips the deficiency to green.
Columns added by migration 047:
* rmd_last_cert_date / rmd_confirmation_number
* cpni_last_cert_date / cpni_confirmation_number
* form_499a_confirmation_number (uses existing last_filing_year)
* bdc_last_filing_date / bdc_confirmation_number
* stir_shaken_cert_issued_at
"""
from __future__ import annotations
import logging
import os
from datetime import datetime, timedelta
from typing import Optional
logger = logging.getLogger(__name__)
def _connect():
"""Open a psycopg2 connection using DATABASE_URL. Returns None on failure."""
try:
import psycopg2
return psycopg2.connect(os.environ.get("DATABASE_URL", ""))
except Exception as exc:
logger.error("filing_state: could not connect to PG: %s", exc)
return None
def _get_entity_field(entity_id: int, field: str) -> Optional[object]:
conn = _connect()
if conn is None:
return None
try:
with conn.cursor() as cur:
# Column names come from the handlers, not user input — safe to
# interpolate. psycopg2 does not parameterize identifiers.
cur.execute(
f"SELECT {field} FROM telecom_entities WHERE id = %s", # noqa: S608
(entity_id,),
)
row = cur.fetchone()
return row[0] if row else None
except Exception as exc:
logger.warning("filing_state: read %s.%s failed: %s", entity_id, field, exc)
return None
finally:
conn.close()
def _update_entity(entity_id: int, updates: dict[str, object]) -> bool:
"""UPDATE telecom_entities SET <updates> WHERE id = $id. Returns success."""
if not updates:
return True
conn = _connect()
if conn is None:
return False
try:
set_clause = ", ".join(f"{k} = %s" for k in updates.keys())
values = list(updates.values()) + [entity_id]
with conn.cursor() as cur:
cur.execute(
f"UPDATE telecom_entities SET {set_clause} WHERE id = %s", # noqa: S608
values,
)
conn.commit()
return True
except Exception as exc:
logger.warning("filing_state: update %s failed: %s", entity_id, exc)
return False
finally:
conn.close()
# ── Idempotency windows ─────────────────────────────────────────────────────
#
# RMD recertification is annual; USAC 499-A is annual (due April 1); CPNI is
# annual (due March 1); BDC is twice-yearly (Dec 1 / Jun 1). "Already filed
# this cycle" roughly means the last filing was within the window below.
RMD_CYCLE_DAYS = 365
CPNI_CYCLE_DAYS = 365
FORM_499A_CYCLE_DAYS = 365
BDC_CYCLE_DAYS = 180
def already_filed(
entity_id: int, filing_type: str, filing_year: Optional[int] = None,
) -> bool:
"""Return True if the filing is on record within its cycle window.
``filing_type`` is one of: "rmd", "cpni", "499a", "bdc".
``filing_year`` applies only to 499a — callers filing past-due should
pass the target reporting year so we don't skip an older year just
because a newer one was already filed. If not provided, defaults to
the current year (preserving legacy behavior).
"""
column, cycle_days = {
"rmd": ("rmd_last_cert_date", RMD_CYCLE_DAYS),
"cpni": ("cpni_last_cert_date", CPNI_CYCLE_DAYS),
"499a": ("last_filing_year", None), # year-based, handled separately
"bdc": ("bdc_last_filing_date", BDC_CYCLE_DAYS),
}.get(filing_type, (None, None))
if column is None:
return False
value = _get_entity_field(entity_id, column)
if value is None:
return False
if filing_type == "499a":
try:
last_year = int(value)
check_year = filing_year if filing_year is not None else datetime.utcnow().year
# "Already filed" means the entity's last_filing_year equals or
# exceeds the year we're about to file. For past-due filings
# targeting an older year, use that year explicitly.
return last_year >= int(check_year)
except (TypeError, ValueError):
return False
if isinstance(value, str):
try:
value = datetime.fromisoformat(value)
except ValueError:
return False
if not isinstance(value, datetime):
return False
# psycopg2 returns tz-aware datetimes when the column is TIMESTAMPTZ; strip
# tz for the arithmetic below.
if value.tzinfo is not None:
value = value.replace(tzinfo=None)
return value >= datetime.utcnow() - timedelta(days=cycle_days or 0)
# ── Success writers ─────────────────────────────────────────────────────────
def record_rmd_filing(entity_id: int, confirmation_number: str = "") -> bool:
return _update_entity(entity_id, {
"rmd_last_cert_date": datetime.utcnow(),
"rmd_confirmation_number": confirmation_number,
})
def record_cpni_filing(entity_id: int, confirmation_number: str = "") -> bool:
return _update_entity(entity_id, {
"cpni_last_cert_date": datetime.utcnow(),
"cpni_confirmation_number": confirmation_number,
})
def record_form_499a_filing(entity_id: int, confirmation_number: str = "") -> bool:
return _update_entity(entity_id, {
"last_filing_year": datetime.utcnow().year,
"form_499a_confirmation_number": confirmation_number,
})
def record_bdc_filing(entity_id: int, confirmation_number: str = "") -> bool:
return _update_entity(entity_id, {
"bdc_last_filing_date": datetime.utcnow(),
"bdc_confirmation_number": confirmation_number,
})
def record_stir_shaken_cert(entity_id: int, issued_at: Optional[datetime] = None) -> bool:
return _update_entity(entity_id, {
"stir_shaken_cert_issued_at": issued_at or datetime.utcnow(),
})