new-site/scripts/workers/icc_adapters/edi_810_adapter.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

242 lines
9.9 KiB
Python

"""X12 EDI 810 (Invoice) adapter for inter-carrier settlement invoices.
EDI 810 is the ANSI ASC X12 standard transaction for invoices. This
adapter walks the envelope (ISA/GS/ST) and extracts invoice- and line-
level totals, mapping them into ``IccRevenueLine`` records.
Segment coverage
----------------
``ISA`` Interchange control header — captures ``isa_control_num`` for hashing
``GS`` Functional group header — context only
``ST`` Transaction set header — one 810 per ST/SE pair
``BIG`` Beginning of invoice (invoice number, invoice date)
``N1`` Name segment — qualifier ``IC`` (Intermediate Consignee) is treated
as the interconnecting carrier; ``RE`` / ``BT`` used as fallback
``IT1`` Baseline item detail
IT102 = quantity invoiced
IT103 / IT104 = unit price and unit-of-measurement code
a ``"MG"`` unit-of-measurement element → quantity represents minutes
``TDS`` Total monetary value (TDS01 in implied-2-decimal cents)
Category heuristics
-------------------
``icc_category`` defaults to ``transit``. Free-text in PID / MSG / DTM
segments is scanned case-insensitively for keywords:
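* "8yy" / "toll free" / "toll-free" / "tollfree" → ``8yy_orig_access``
* "international" / "intl " / "settlement" → ``intl_settlement``
* "special access" / "spec access" → ``special_access``
* "terminating" / "term access" / "term switched" → ``term_switched_access``
* "originating" / "orig access" / "orig switched" → ``orig_switched_access``
Categories are checked in the order listed; the first one with a matching
keyword wins.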
Parsing strategy
----------------
``pyx12`` availability is probed at import time (``_HAS_PYX12``), but the
adapter code in this module always parses with a lightweight
segment-splitter driven by the ISA segment's declared separators (element
separator = 4th character of the ISA segment, segment terminator = its
106th character). This tolerates both LF-padded and tight single-line
variants.
Deferred
--------
* Full X12 005010 syntactic validation
* 997 / 999 acknowledgment emission
* Sub-element composites beyond position-0 use
* Multi-ST interchanges (first transaction set is parsed; subsequent are
yielded but share the interchange header)
"""
from __future__ import annotations
import logging
from typing import Iterator, List, Optional
from .common import BaseICCAdapter, IccRevenueLine, ValidationError
logger = logging.getLogger(__name__)
try: # pragma: no cover — optional dependency
import pyx12 # noqa: F401
_HAS_PYX12 = True
except ImportError:
_HAS_PYX12 = False
class EDI810Adapter(BaseICCAdapter):
    """Adapter that turns an X12 EDI 810 invoice file into ``IccRevenueLine`` rows.

    One row is yielded per ``IT1`` line item. A transaction set that
    contains no ``IT1`` segment contributes a single row built from its
    ``TDS`` invoice total instead.
    """

    SOURCE_FORMAT = "edi_810"

    # Ordered (category, keywords) pairs. Keywords are matched as substrings
    # of the lower-cased segment text; the first category with a hit wins.
    _CATEGORY_KEYWORDS = [
        ("8yy_orig_access", ("8yy", "toll free", "toll-free", "tollfree")),
        ("intl_settlement", ("international", "intl ", "settlement")),
        ("special_access", ("special access", "spec access")),
        ("term_switched_access", ("terminating", "term access", "term switched")),
        ("orig_switched_access", ("originating", "orig access", "orig switched")),
    ]

    def iter_rows(self, local_path: str) -> Iterator[IccRevenueLine]:
        """Read the file at *local_path* and yield one row per invoice line.

        A file that is empty or whitespace-only yields nothing.

        Raises:
            ValidationError: if the ISA envelope is missing or malformed
                (raised by ``_split_segments``).
        """
        with open(local_path, "r", encoding="latin-1", errors="replace") as fh:
            raw = fh.read()
        if not raw.strip():
            return
        segments, elem_sep = self._split_segments(raw)
        yield from self._walk(segments, elem_sep)

    # ------------------------------------------------------------------
    # Envelope splitter — derives separators from ISA header
    # ------------------------------------------------------------------
    @staticmethod
    def _split_segments(raw: str) -> tuple[List[List[str]], str]:
        """Split raw interchange text into segments of elements.

        Returns:
            ``(segments, elem_sep)`` where each segment is a list of element
            strings (tag first) and ``elem_sep`` is the element separator
            read from the ISA header.

        Raises:
            ValidationError: ``bad_envelope`` when no ISA header is found,
                the header is shorter than 106 characters, or the declared
                element separator and segment terminator are the same
                character.
        """
        if not raw.startswith("ISA"):
            # Some exporters prepend a BOM or whitespace; search for the header.
            idx = raw.find("ISA")
            if idx < 0:
                raise ValidationError("bad_envelope", "no ISA header found")
            raw = raw[idx:]
        if len(raw) < 106:
            raise ValidationError("bad_envelope", "truncated ISA header")

        # ISA is fixed-width (106 characters): the element separator is at
        # index 3 and the segment terminator at index 105.
        elem_sep = raw[3]
        seg_term = raw[105]
        if elem_sep == seg_term:
            # Splitting on the same character twice would turn every element
            # into its own "segment" and silently yield nothing useful.
            raise ValidationError(
                "bad_envelope",
                "element separator and segment terminator are identical",
            )

        # The terminator may or may not be followed by a newline; drop blank
        # chunks and trim CR/LF around each segment.
        chunks = [s for s in raw.split(seg_term) if s.strip()]
        segments = [seg.strip("\r\n").split(elem_sep) for seg in chunks]
        return segments, elem_sep

    # ------------------------------------------------------------------
    # Transaction-set walker
    # ------------------------------------------------------------------
    def _walk(self, segments: List[List[str]], elem_sep: str) -> Iterator[IccRevenueLine]:
        """Walk segments in file order and yield revenue lines.

        Args:
            segments: Segments as produced by ``_split_segments``.
            elem_sep: Element separator from the ISA header. Kept for
                signature compatibility; it is not needed once the segments
                are already split.

        Yields:
            One ``IccRevenueLine`` per ``IT1`` segment, plus one per
            transaction set that has a parsable ``TDS`` total and no ``IT1``.
            ``source_line_no`` is the 1-based position of the segment in
            *segments*.
        """
        isa_control_num = ""
        counterparty_name = ""
        counterparty_id: Optional[str] = None
        invoice_number = ""
        line_no = 0
        category_hint = "transit"
        tds_emitted_for_st = False

        for seg_index, seg in enumerate(segments, start=1):
            if not seg:
                continue
            # Tolerate stray blanks around the tag (e.g. indented exports).
            tag = seg[0].strip()

            if tag == "ISA" and len(seg) >= 14:
                # ISA13 = interchange control number.
                isa_control_num = seg[13].strip()

            elif tag == "ST":
                # New transaction set: reset everything scoped to one invoice.
                # The interchange control number deliberately carries over.
                counterparty_name = ""
                counterparty_id = None
                invoice_number = ""
                line_no = 0
                category_hint = "transit"
                tds_emitted_for_st = False

            elif tag == "BIG" and len(seg) >= 3:
                # BIG02 = invoice number.
                invoice_number = seg[2].strip()

            elif tag == "N1" and len(seg) >= 2:
                if seg[1].strip().upper() in ("IC", "RE", "BT"):
                    name = seg[2].strip() if len(seg) > 2 else ""
                    # An empty name/ID never overwrites one captured earlier.
                    counterparty_name = name or counterparty_name
                    if len(seg) > 4:
                        counterparty_id = seg[4].strip() or counterparty_id

            elif tag in ("PID", "MSG", "DTM"):
                # NOTE(review): the hint is sticky and only affects IT1 lines
                # that come *after* this segment. In X12 a line-level PID
                # follows its IT1, so such a PID would categorise the next
                # line — confirm against real partner files before changing.
                payload = " ".join(seg[1:]).lower()
                for cat, keywords in self._CATEGORY_KEYWORDS:
                    if any(k in payload for k in keywords):
                        category_hint = cat
                        break

            elif tag == "IT1":
                line_no += 1
                quantity = seg[2] if len(seg) > 2 else ""  # IT102
                it103 = seg[3] if len(seg) > 3 else ""
                it104 = seg[4] if len(seg) > 4 else ""
                unit_price, uom, unit_cents = self._resolve_unit_price(it103, it104)
                if unit_cents is None and (it103 or it104):
                    logger.warning(
                        "IT1 at segment %d: no parsable unit price "
                        "(IT103=%r, IT104=%r); revenue set to 0",
                        seg_index, it103, it104,
                    )

                # "MG" anywhere from IT103 onward flags the quantity as
                # minutes. IT103 is included because that is where a
                # standard X12 line carries its unit-of-measurement code.
                # NOTE(review): confirm "MG" is the code partners send for
                # minutes, and that it cannot collide with a product-ID
                # qualifier in a later element.
                is_minutes = any(e.strip().upper() == "MG" for e in seg[3:])

                try:
                    qty_int = self.parse_int(quantity) if quantity else 0
                except ValidationError:
                    logger.warning(
                        "IT1 at segment %d: quantity %r not parsable; using 0",
                        seg_index, quantity,
                    )
                    qty_int = 0

                # NOTE(review): the unit price is converted by parse_cents
                # before being multiplied by the quantity; if that helper
                # rounds to whole cents, sub-cent per-minute rates lose
                # precision — verify against its implementation.
                extended_cents = unit_cents * qty_int if unit_cents is not None else 0

                line = {
                    "line_no": line_no,
                    "quantity": qty_int,
                    "unit_price_raw": unit_price,
                    "uom": uom,
                    "is_minutes": is_minutes,
                    "extended_cents": extended_cents,
                }
                yield self._build_line(
                    line,
                    isa_control_num=isa_control_num,
                    invoice_number=invoice_number,
                    counterparty_name=counterparty_name,
                    counterparty_id=counterparty_id,
                    category=category_hint,
                    file_lineno=seg_index,
                )

            elif tag == "TDS" and len(seg) >= 2 and not tds_emitted_for_st and line_no == 0:
                # Invoice-total fallback when no IT1 lines were present.
                try:
                    total_cents = self._parse_tds_total(seg[1])
                except ValidationError:
                    logger.warning(
                        "TDS at segment %d: total %r not parsable; skipped",
                        seg_index, seg[1],
                    )
                    continue
                tds_emitted_for_st = True
                yield IccRevenueLine(
                    profile_id=self.profile_id,
                    reporting_year=self.reporting_year,
                    icc_category=category_hint,
                    counterparty_legal_name=counterparty_name or "UNKNOWN",
                    counterparty_ocn=counterparty_id,
                    revenue_cents=total_cents,
                    minutes_of_use=None,
                    source_line_no=seg_index,
                    raw_row={
                        "segment": "TDS",
                        "isa_control_num": isa_control_num,
                        "invoice_number": invoice_number,
                        "total_raw": seg[1],
                    },
                )

    def _resolve_unit_price(self, it103: str, it104: str) -> tuple[str, str, Optional[int]]:
        """Work out which of IT103 / IT104 holds the unit price.

        The historical layout handled by this adapter (price in IT103, unit
        of measure in IT104) is tried first so files that parsed before keep
        their values. If IT103 is not a parsable amount, the standard X12
        order (unit of measure in IT103, price in IT104) is tried.

        Returns:
            ``(unit_price_raw, uom, unit_price_cents)``. When neither element
            parses, the historical assignment is returned with
            ``unit_price_cents`` set to ``None``.
        """
        for price_raw, uom in ((it103, it104), (it104, it103)):
            if not price_raw:
                continue
            try:
                return price_raw, uom, self.parse_cents(price_raw)
            except ValidationError:
                continue
        return it103, it104, None

    def _parse_tds_total(self, value: str) -> int:
        """Convert TDS01 to integer cents.

        TDS01 is an implied-2-decimal amount, so a bare digit string is
        already a number of cents and is parsed as an integer. A value that
        carries an explicit decimal point (or anything else) is passed to
        ``parse_cents`` as before.

        Raises:
            ValidationError: propagated from ``parse_int`` / ``parse_cents``.
        """
        text = value.strip()
        unsigned = text[1:] if text[:1] in ("+", "-") else text
        if unsigned.isdecimal():
            return self.parse_int(text)
        return self.parse_cents(value)

    def _build_line(
        self,
        pending: dict,
        *,
        isa_control_num: str,
        invoice_number: str,
        counterparty_name: str,
        counterparty_id: Optional[str],
        category: str,
        file_lineno: int,
    ) -> IccRevenueLine:
        """Build the ``IccRevenueLine`` for one ``IT1`` line item.

        Args:
            pending: Parsed IT1 values with keys ``line_no``, ``quantity``,
                ``unit_price_raw``, ``uom``, ``is_minutes`` and
                ``extended_cents``.
            isa_control_num: Interchange control number of the enclosing ISA.
            invoice_number: Invoice number from the BIG segment.
            counterparty_name: Counterparty name; ``"UNKNOWN"`` is used when
                empty.
            counterparty_id: Counterparty identifier, stored as
                ``counterparty_ocn``.
            category: ICC category to assign.
            file_lineno: 1-based segment position of the IT1 in the file.
        """
        return IccRevenueLine(
            profile_id=self.profile_id,
            reporting_year=self.reporting_year,
            icc_category=category,
            counterparty_legal_name=counterparty_name or "UNKNOWN",
            counterparty_ocn=counterparty_id,
            revenue_cents=pending["extended_cents"],
            # Quantity only counts as minutes of use when flagged as such.
            minutes_of_use=pending["quantity"] if pending["is_minutes"] else None,
            source_line_no=file_lineno,
            raw_row={
                "segment": "IT1",
                "isa_control_num": isa_control_num,
                "invoice_number": invoice_number,
                "line_no": pending["line_no"],
                "uom": pending["uom"],
                "unit_price_raw": pending["unit_price_raw"],
                "quantity": pending["quantity"],
            },
        )