new-site/scripts/workers/services/calea_ssi.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

370 lines
16 KiB
Python

"""CALEA SSI Plan handler.
No portal submission — SSI plans are held internally. We produce a
carrier-specific signed DOCX + PDF, persist the generation timestamp
and next-review date on the entity, and schedule an annual review
reminder in the Compliance Calendar.
Auto-filing toggle doesn't apply (no filing is made).
"""
from __future__ import annotations
import logging
import os
from datetime import date, datetime, timedelta
from typing import Optional
from .base_handler import BaseServiceHandler
logger = logging.getLogger(__name__)
class CALEASSIHandler(BaseServiceHandler):
    """Produce the CALEA SSI Plan documents for an order.

    Nothing is submitted to a portal.  The handler writes a DOCX (plus a
    PDF when conversion succeeds), stamps generation / next-review
    metadata on the ``telecom_entities`` row, and creates a yearly
    "Compliance Calendar" entry through ERPNext.  Private-line-only
    carriers receive a one-page exemption memo instead of a full plan.
    """

    SERVICE_SLUG = "calea-ssi"
    SERVICE_NAME = "CALEA SSI Plan"
    REQUIRES_LLM = False

    # Reviewer recorded on the entity when intake does not name one.
    DEFAULT_REVIEWER_NAME = "Justin Hannah"

    # Line 105 category ids that rule out the private-line exemption.
    _CALEA_BEARING_CATEGORIES = frozenset({
        "clec", "ixc", "wireless", "satellite",
        "audio_bridging", "voip_interconnected",
    })

    # (Line 105 primary, infra_type) -> variant generator module under
    # ``scripts.document_gen.templates``.
    _VARIANT_MODULES = {
        ("clec", "facilities"): "calea_clec_ss7_generator",
        ("clec", "reseller"): "calea_clec_ss7_generator",
        ("ixc", "facilities"): "calea_ixc_ss7_generator",
        ("ixc", "reseller"): "calea_ixc_ss7_generator",
        ("wireless", "facilities"): "calea_wireless_generator",
        ("wireless", "mvno"): "calea_wireless_mvno_generator",
        ("satellite", "facilities"): "calea_satellite_generator",
        ("audio_bridging", "facilities"): "calea_audio_bridge_generator",
    }

    async def process(self, order_data: dict) -> list[str]:
        """Generate the SSI plan (or exemption memo) for one order.

        Args:
            order_data: Order payload.  ``"name"`` is required; ``"entity"``
                and ``"intake_data"`` are optional and may be ``None``.

        Returns:
            Paths of the generated files (DOCX first, then the PDF when
            conversion succeeds).  An empty list means the order was
            paused for missing intake data or no document was produced.

        Raises:
            KeyError: If ``order_data`` has no ``"name"`` key.
        """
        work_dir = self._make_work_dir()
        order_number = order_data["name"]
        # ``or {}`` (rather than a ``.get`` default) also covers an
        # explicit ``None`` value, matching how intake_data is read.
        entity = order_data.get("entity") or {}
        intake = order_data.get("intake_data") or {}
        entity_id = entity.get("id")
        calea_intake = intake.get("calea_ssi") or {}
        private_line_only = self._is_private_line_only(entity)

        # Guard: pause if critical CALEA intake fields are missing.
        # The SSI plan requires a 24-hour law enforcement contact and
        # network infrastructure details. Without these the document
        # would have [TO BE POPULATED] placeholders.  The exemption memo
        # uses none of these fields, so private-line-only carriers pass.
        missing_fields = self._missing_intake_fields(calea_intake)
        if missing_fields and not private_line_only:
            logger.info(
                "CALEASSIHandler: missing intake fields for %s: %s — creating admin todo",
                order_number, ", ".join(missing_fields),
            )
            self._create_missing_intake_todo(
                order_number, entity, missing_fields
            )
            return []

        date_str = datetime.now().strftime("%Y%m%d")
        docx_path = os.path.join(
            work_dir, f"calea_ssi_{order_number}_{date_str}.docx",
        )

        # Private-line-only filers are typically CALEA-exempt under
        # 47 CFR 1.20003 scope. Produce a 1-page exemption memo inline.
        if private_line_only:
            logger.info(
                "CALEASSIHandler: entity %s is private-line only; "
                "producing CALEA exemption memo",
                entity.get("id"),
            )
            source_doc = self._write_private_line_exemption_memo(
                docx_path, entity
            )
            pdf_failure_msg = "CALEA exemption memo PDF conversion failed: %s"
        else:
            source_doc = self._generate_ssi_plan(
                entity, calea_intake, docx_path
            )
            pdf_failure_msg = "CALEA SSI PDF conversion failed: %s"

        generated: list[str] = []
        if source_doc:
            generated.append(source_doc)
            try:
                generated.append(self._convert_to_pdf(source_doc))
            except Exception as exc:
                # PDF is best-effort; the DOCX alone is still delivered.
                logger.warning(pdf_failure_msg, exc)

        if not generated:
            # Do not stamp "generated at" or schedule a review for a
            # document that does not exist.
            logger.warning(
                "CALEASSIHandler: no document produced for %s; skipping "
                "state persistence and annual review scheduling",
                order_number,
            )
            return generated

        # Persist + schedule annual review
        if entity_id:
            self._persist_calea_state(
                entity_id,
                reviewer_name=(
                    calea_intake.get("reviewer_name")
                    or self.DEFAULT_REVIEWER_NAME
                ),
            )
        self._schedule_annual_review(order_number, entity)
        return generated

    # ------------------------------------------------------------------ #
    # Intake validation + document generation helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _missing_intake_fields(calea_intake: dict) -> list[str]:
        """Return human-readable labels for required intake fields that are empty."""
        le_contact = calea_intake.get("law_enforcement_contact") or {}
        checks = (
            (le_contact.get("name"), "law enforcement contact name"),
            (le_contact.get("phone"), "law enforcement contact 24-hour phone"),
            (le_contact.get("email"), "law enforcement contact email"),
            (
                calea_intake.get("network_infrastructure_summary"),
                "network infrastructure summary",
            ),
            (
                calea_intake.get("interception_support_method"),
                "interception support method",
            ),
        )
        return [label for value, label in checks if not value]

    def _create_missing_intake_todo(
        self, order_number: str, entity: dict, missing_fields: list[str]
    ) -> None:
        """Create a high-priority ERPNext ToDo listing the missing intake data.

        Best-effort: any failure is logged and swallowed so the order
        still pauses cleanly.
        """
        try:
            from scripts.workers.erpnext_client import ERPNextClient
            ERPNextClient().create_resource("ToDo", {
                "description": (
                    f"[calea-ssi] {order_number}\n\n"
                    f"CALEA SSI Plan cannot be generated — missing intake data.\n"
                    f"Missing: {', '.join(missing_fields)}\n\n"
                    f"Entity: {entity.get('legal_name', 'unknown')}\n"
                    f"Send the client the CALEA questionnaire to collect:\n"
                    f" - 24-hour law enforcement contact (name, phone, email)\n"
                    f" - Network infrastructure summary\n"
                    f" - Interception support method (TTP vendor or self-provision)\n\n"
                    f"Once collected, update intake_data.calea_ssi and re-dispatch."
                ),
                "priority": "High",
                "role": "Accounting Advisor",
            })
        except Exception as exc:
            logger.error("Could not create CALEA admin ToDo: %s", exc)

    def _generate_ssi_plan(
        self, entity: dict, calea_intake: dict, docx_path: str
    ):
        """Run the variant generator, or the generic one when no variant matches.

        Returns whatever the selected generator returns; the caller
        treats a truthy result as the path of the written DOCX.
        """
        # Arguments accepted by both the variant and generic generators.
        shared_kwargs = dict(
            output_path=docx_path,
            entity_name=entity.get("legal_name", ""),
            frn=entity.get("frn", ""),
            law_enforcement_contact=calea_intake.get("law_enforcement_contact"),
            cpni_protection_officer=calea_intake.get("cpni_protection_officer"),
            network_infrastructure_summary=calea_intake.get(
                "network_infrastructure_summary", ""
            ),
            interception_support_method=calea_intake.get(
                "interception_support_method", ""
            ),
            signatory_name=entity.get("ceo_name")
            or entity.get("contact_name", ""),
            signatory_title=entity.get(
                "ceo_title", "Chief Executive Officer"
            ),
        )

        variant_fn = self._pick_template_generator(entity)
        if variant_fn is not None:
            logger.info(
                "CALEASSIHandler: using variant generator %s for entity %s",
                getattr(variant_fn, "__name__", "<unknown>"),
                entity.get("id"),
            )
            return variant_fn(
                **shared_kwargs,
                reporting_year=entity.get("reporting_year", 0),
            )

        from scripts.document_gen.templates.calea_ssi_generator import (
            generate_calea_ssi_plan,
        )
        is_wholesale = bool(entity.get("is_wholesale"))
        return generate_calea_ssi_plan(
            **shared_kwargs,
            # NOTE(review): non_interconnected_voip is also flagged as
            # interconnected here — confirm this is intended.
            is_interconnected_voip=entity.get("carrier_category", "") in (
                "interconnected_voip", "non_interconnected_voip",
            ),
            is_wholesale=is_wholesale,
            has_retail_customers=not is_wholesale,
        )

    # ------------------------------------------------------------------ #
    # Variant selection + private-line exemption
    # ------------------------------------------------------------------ #
    def _pick_template_generator(self, entity: dict):
        """
        Resolve the CALEA variant generator for the entity's Line 105
        primary + infra_type. Returns a callable on match, else ``None``
        (caller falls back to the generic generate_calea_ssi_plan).

        ``None`` is also returned, after a warning, when the mapped
        module cannot be imported or lacks the expected function.
        """
        primary = entity.get("line_105_primary") or entity.get(
            "filer_type", "voip_interconnected"
        )
        categories = entity.get("line_105_categories") or []
        # infra_type lives on the category entry whose id is the primary.
        primary_entry = next(
            (c for c in categories if c.get("id") == primary), {}
        )
        infra = primary_entry.get("infra_type", "facilities")

        module_name = self._VARIANT_MODULES.get((primary, infra))
        if not module_name:
            return None
        try:
            import importlib
            mod = importlib.import_module(
                f"scripts.document_gen.templates.{module_name}"
            )
            # e.g. calea_wireless_generator -> generate_calea_wireless
            fn_name = "generate_" + module_name.replace("_generator", "")
            return getattr(mod, fn_name)
        except Exception as exc:
            logger.warning(
                "CALEASSIHandler: failed to load CALEA variant %s: %s",
                module_name, exc,
            )
            return None

    def _is_private_line_only(self, entity: dict) -> bool:
        """
        Return True if the entity's Line 105 primary is private_line AND
        it carries no other CALEA-bearing categories. Private-line-only
        carriers are generally outside the CALEA covered-entity
        definition — produce a 1-page exemption memo instead of a full
        SSI plan.
        """
        if entity.get("line_105_primary") != "private_line":
            return False
        categories = entity.get("line_105_categories") or []
        return not any(
            c.get("id") in self._CALEA_BEARING_CATEGORIES for c in categories
        )

    def _write_private_line_exemption_memo(
        self, output_path: str, entity: dict
    ) -> Optional[str]:
        """
        Write a 1-page CALEA exemption memo for a private-line-only
        carrier, citing 47 CFR § 1.20003 scope.

        Returns the output path, or None when python-docx is not
        installed. Errors raised while saving the document propagate
        to the caller.
        """
        try:
            from docx import Document
            from docx.shared import Pt, Inches, RGBColor
            from docx.enum.text import WD_ALIGN_PARAGRAPH
        except ImportError:
            logger.error("python-docx not installed — exemption memo skipped")
            return None

        NAVY = RGBColor(0x1A, 0x27, 0x44)
        today = date.today().strftime("%B %d, %Y")
        name = entity.get("legal_name", "")
        frn = entity.get("frn", "")

        doc = Document()
        for section in doc.sections:
            section.top_margin = Inches(1)
            section.bottom_margin = Inches(1)
            section.left_margin = Inches(1.25)
            section.right_margin = Inches(1.25)

        # Centered title.
        title_para = doc.add_paragraph()
        title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        title_run = title_para.add_run("CALEA Exemption Memo")
        title_run.font.size = Pt(15)
        title_run.bold = True
        title_run.font.color.rgb = NAVY

        # Centered italic subtitle.
        subtitle_para = doc.add_paragraph()
        subtitle_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        subtitle_run = subtitle_para.add_run(
            "Private Line / BDS — Not a Covered Entity under CALEA"
        )
        subtitle_run.font.size = Pt(11)
        subtitle_run.italic = True
        subtitle_para.paragraph_format.space_after = Pt(16)

        def add_para(text: str, bold: bool = False) -> None:
            """Append one 11pt body paragraph with 6pt spacing after it."""
            para = doc.add_paragraph()
            para.paragraph_format.space_after = Pt(6)
            run = para.add_run(text)
            run.font.size = Pt(11)
            run.bold = bold

        add_para(f"Carrier: {name}", bold=True)
        if frn:
            add_para(f"FRN: {frn}")
        add_para(f"Date: {today}")
        add_para("")
        add_para(
            "The Communications Assistance for Law Enforcement Act (CALEA), "
            "47 U.S.C. \u00a7\u00a7 1001\u20131010, and the Commission's "
            "implementing rules at 47 CFR Part 1 Subpart Z (including 47 CFR "
            "\u00a7 1.20003) apply to 'telecommunications carriers' within "
            "the meaning of 47 U.S.C. \u00a7 1001(8). Dedicated point-to-"
            "point private line and/or Business Data Service (BDS) offerings "
            "that do not provide switched voice, interconnected VoIP, or "
            "other CALEA-covered transmission are generally outside the "
            "scope of the statute and the rule."
        )
        add_para(
            f"Based on {name}'s Line 105 classification and service "
            f"description, {name} provides only private-line / BDS circuits "
            f"and does not operate a switched telecommunications service "
            f"subject to CALEA. Accordingly, {name} maintains no separate "
            f"SSI Plan at this time."
        )
        add_para(
            f"{name} will re-evaluate its CALEA status upon any addition of "
            f"switched voice, interconnected VoIP, or other CALEA-covered "
            f"service, and will prepare and maintain an SSI Plan prior to "
            f"the initiation of such service."
        )
        add_para(
            "This memo is retained in the corporate compliance file and is "
            "available on request to the Commission or the Department of "
            "Justice."
        )

        out = os.fspath(output_path)
        doc.save(out)
        return out

    def _persist_calea_state(self, entity_id: int, reviewer_name: str) -> None:
        """Stamp generation time, reviewer and next review date on the entity row.

        Connects to the database named by ``DATABASE_URL``.  Best-effort:
        every failure (missing driver, connection error, SQL error) is
        logged as a warning and swallowed.

        Args:
            entity_id: Primary key of the ``telecom_entities`` row.
            reviewer_name: Stored in ``calea_ssi_reviewer_name``.
        """
        try:
            import psycopg2
            conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
            try:
                next_review = date.today() + timedelta(days=365)
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE telecom_entities SET
                        calea_ssi_generated_at = NOW(),
                        calea_ssi_reviewer_name = %s,
                        calea_ssi_next_review_date = %s
                        WHERE id = %s
                        """,
                        (reviewer_name, next_review, entity_id),
                    )
                conn.commit()
            finally:
                # Always release the connection, including when the
                # UPDATE or commit raised.
                conn.close()
        except Exception as exc:
            logger.warning("Could not persist CALEA state on %s: %s", entity_id, exc)

    def _schedule_annual_review(self, order_number: str, entity: dict) -> None:
        """Create a yearly-recurring Compliance Calendar entry due in 365 days.

        Best-effort: any ERPNext failure is logged as a warning and
        swallowed.

        Args:
            order_number: Stored as the calendar entry's order reference.
            entity: Entity dict; only ``legal_name`` is read.
        """
        try:
            from scripts.workers.erpnext_client import ERPNextClient
            next_review = date.today() + timedelta(days=365)
            ERPNextClient().create_resource(
                "Compliance Calendar",
                {
                    "entity_name": entity.get("legal_name", ""),
                    "order_reference": order_number,
                    "compliance_type": "CALEA SSI Annual Review",
                    "description": (
                        "Annual review of the CALEA System Security and "
                        "Integrity Plan (47 USC § 229 / 47 CFR § 1.20003). "
                        "Verify designated LE contact, infrastructure changes, "
                        "and CPNI officer are still accurate."
                    ),
                    "due_date": next_review.strftime("%Y-%m-%d"),
                    "recurring": 1,
                    "recurrence_period": "Yearly",
                    "status": "Upcoming",
                },
            )
        except Exception as exc:
            logger.warning("Could not schedule CALEA annual review: %s", exc)