new-site/scripts/workers/icc_adapters/wholesale_sip_csv.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

221 lines
8.2 KiB
Python

"""Wholesale SIP CSV adapter (Sangoma, Bandwidth, Flowroute).
Vendor wholesale-SIP invoices arrive as CSVs with per-vendor column
layouts. This adapter sniffs the header row and dispatches to a
vendor-specific column map, normalizing into ``IccRevenueLine`` records
tagged ``icc_category='wholesale_sip'``.
Supported vendors
-----------------
* **Sangoma** (``SIPStation`` / ``Sangoma Wholesale``):
``invoice_number, account_id, billing_period, mou, revenue,
vendor_legal_name``
* **Bandwidth.com**:
``invoice, accountId, billingPeriod, totalMinutes, totalAmount``
* **Flowroute**:
``Invoice Number, Account, Period, Minutes, Total``
Generic fallback
----------------
If the header doesn't match any known vendor but contains the canonical
columns ``invoice_number, account_id, billing_period, mou, revenue`` the
adapter still parses the file. Vendor name then falls back to a
``Vendor`` / ``vendor`` / ``CarrierName`` column if present, else
``"UNKNOWN"``.
Deferred
--------
* Taxed-line reconciliation (separate tax columns are summed into
``revenue_cents`` only if the header implies "amount" rather than
"subtotal"; reviewer should confirm per invoice)
* Multi-currency (assumes USD; non-USD rows raise ValidationError)
"""
from __future__ import annotations
import csv
import logging
from typing import Iterator, Optional
from .common import BaseICCAdapter, IccRevenueLine, ValidationError
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Vendor column maps.
#
# Each entry has:
#   name            short tag recorded with every parsed row
#   signature       lower-cased header names that must ALL be present
#   map             canonical field name -> header name as the vendor spells it
#   default_vendor  counterparty name used when the row itself names no vendor
#
# Entries are tried in order and the first whose signature is a subset of the
# file's (stripped, lower-cased) header wins, so more specific signatures must
# come before less specific ones. The Sangoma signature includes
# ``vendor_legal_name`` so that a file carrying only the canonical columns is
# matched by the trailing ``generic`` entry (default vendor ``"UNKNOWN"``)
# instead of being attributed to Sangoma.
# ---------------------------------------------------------------------------
_VENDOR_MAPS = [
    {
        "name": "sangoma",
        "signature": {
            "invoice_number",
            "account_id",
            "billing_period",
            "mou",
            "revenue",
            "vendor_legal_name",
        },
        "map": {
            "invoice_number": "invoice_number",
            "account_id": "account_id",
            "billing_period": "billing_period",
            "mou": "mou",
            "revenue": "revenue",
            "vendor": "vendor_legal_name",
        },
        "default_vendor": "Sangoma",
    },
    {
        "name": "bandwidth",
        "signature": {"invoice", "accountid", "billingperiod", "totalminutes", "totalamount"},
        "map": {
            "invoice_number": "invoice",
            "account_id": "accountId",
            "billing_period": "billingPeriod",
            "mou": "totalMinutes",
            "revenue": "totalAmount",
        },
        "default_vendor": "Bandwidth.com",
    },
    {
        "name": "flowroute",
        "signature": {"invoice number", "account", "period", "minutes", "total"},
        "map": {
            "invoice_number": "Invoice Number",
            "account_id": "Account",
            "billing_period": "Period",
            "mou": "Minutes",
            "revenue": "Total",
        },
        "default_vendor": "Flowroute",
    },
    {
        # Generic fallback: canonical column names, no vendor-specific column.
        # Must stay last because its signature is a subset of Sangoma's.
        "name": "generic",
        "signature": {"invoice_number", "account_id", "billing_period", "mou", "revenue"},
        "map": {
            "invoice_number": "invoice_number",
            "account_id": "account_id",
            "billing_period": "billing_period",
            "mou": "mou",
            "revenue": "revenue",
        },
        "default_vendor": "UNKNOWN",
    },
]
class WholesaleSIPCSVAdapter(BaseICCAdapter):
    """Parse wholesale-SIP vendor invoice CSVs into ``IccRevenueLine`` records.

    The vendor layout is chosen by matching the CSV header against the
    ``signature`` sets in ``_VENDOR_MAPS``. Every emitted line is tagged
    ``icc_category='wholesale_sip'``.
    """

    SOURCE_FORMAT = "wholesale_sip_csv"

    # Three-letter month prefixes accepted by ``_period_to_quarter``.
    _MONTHS = {
        "jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6,
        "jul": 7, "aug": 8, "sep": 9, "oct": 10, "nov": 11, "dec": 12,
    }

    # Normalized (stripped, lower-cased) header names probed for a currency
    # code and for a vendor name when the vendor map has no usable column.
    # Listed in priority order.
    _CURRENCY_HEADERS = ("currency", "ccy")
    _VENDOR_FALLBACK_HEADERS = ("vendor", "carriername", "carrier_name")

    @staticmethod
    def _period_to_quarter(period: str) -> Optional[int]:
        """Extract a quarter index (1-4) from a billing-period free-text field.

        Supports ``YYYY-MM`` (also ``/`` or ``.`` as separator, and a
        single-digit month such as ``2024-3``), ``YYYYMM``, ``MM/YYYY`` and
        ``Mon YYYY``. Only the month is used; the year is ignored.

        Returns None if *period* is empty, unparseable, or names a month
        outside 1-12.
        """
        if not period:
            return None
        p = period.strip()
        month: Optional[int] = None
        if len(p) >= 6 and p[:4].isdigit() and p[4] in "-/.":
            # Year first: the month is the one or two characters after the
            # separator.
            chunk = p[5:7]
            try:
                month = int(chunk)
            except ValueError:
                # e.g. "2024-3-15": fall back to the single leading digit.
                month = int(chunk[0]) if chunk[0].isdecimal() else None
        elif len(p) >= 6 and p[:6].isdigit():
            try:
                month = int(p[4:6])
            except ValueError:
                # str.isdigit() accepts characters int() rejects (e.g. "²").
                month = None
        elif "/" in p:
            try:
                month = int(p.split("/")[0])
            except ValueError:
                month = None
        else:
            month = WholesaleSIPCSVAdapter._MONTHS.get(p[:3].lower())
        if month is None or not 1 <= month <= 12:
            return None
        return (month - 1) // 3 + 1

    @staticmethod
    def _columns_named(headers: list[str], wanted: tuple[str, ...]) -> list[str]:
        """Return the raw header names whose stripped, lower-cased form is in *wanted*.

        Results follow the priority order of *wanted*.
        """
        return [
            h
            for key in wanted
            for h in headers
            if h and h.strip().lower() == key
        ]

    def _detect_vendor(self, headers: list[str]) -> tuple[dict, str]:
        """Return (vendor_map_entry, default_vendor_name).

        The first ``_VENDOR_MAPS`` entry whose ``signature`` is a subset of
        the stripped, lower-cased *headers* wins.

        Raises ``ValidationError('bad_header')`` when no vendor matches.
        """
        normalized = {h.strip().lower() for h in headers if h}
        for entry in _VENDOR_MAPS:
            if entry["signature"].issubset(normalized):
                return entry, entry["default_vendor"]
        raise ValidationError(
            "bad_header",
            f"wholesale_sip_csv header doesn't match any vendor: {sorted(normalized)}",
        )

    def iter_rows(self, local_path: str) -> Iterator[IccRevenueLine]:
        """Yield one ``IccRevenueLine`` per data row of the CSV at *local_path*.

        The file is read as UTF-8 (a BOM is tolerated, undecodable bytes are
        replaced). A file without a header row yields nothing.

        Raises:
            ValidationError('bad_header'): the header matches no vendor layout.
            ValidationError('non_usd'): a row carries a currency code other
                than USD in a ``currency`` / ``CCY`` column (matched
                case-insensitively).

        An unparseable minutes-of-use value is logged and stored as None
        rather than failing the file. Revenue is passed to ``self.parse_cents``
        unguarded, so whatever it raises propagates.
        """
        with open(local_path, "r", encoding="utf-8-sig", errors="replace", newline="") as fh:
            reader = csv.DictReader(fh)
            if not reader.fieldnames:
                return
            fieldnames = list(reader.fieldnames)
            entry, default_vendor = self._detect_vendor(fieldnames)

            # Case-insensitive header -> raw-name lookup, so the vendor map
            # resolves to the exact keys csv.DictReader uses for each row.
            header_lookup = {h.strip().lower(): h for h in fieldnames if h}
            colmap = {
                canon: header_lookup.get(raw_name.strip().lower(), raw_name)
                for canon, raw_name in entry["map"].items()
            }
            # None when the vendor map has no per-row vendor column.
            vendor_col = colmap.get("vendor")

            # Resolved once per file, with the same case-insensitive matching
            # as the mapped columns.
            currency_cols = self._columns_named(fieldnames, self._CURRENCY_HEADERS)
            fallback_vendor_cols = self._columns_named(
                fieldnames, self._VENDOR_FALLBACK_HEADERS
            )

            # Numbering starts at 2 because line 1 is the header; it counts
            # the records DictReader returns.
            for i, raw in enumerate(reader, start=2):
                # Multi-currency is unsupported: reject any explicit non-USD code.
                for col in currency_cols:
                    code = (raw.get(col) or "").strip().upper()
                    if code and code != "USD":
                        raise ValidationError(
                            "non_usd",
                            f"row {i} currency={raw[col]!r}, only USD supported",
                        )

                billing_period = (raw.get(colmap["billing_period"]) or "").strip()
                quarter = self._period_to_quarter(billing_period)

                vendor = default_vendor
                if vendor_col and raw.get(vendor_col):
                    vendor = raw[vendor_col].strip() or default_vendor
                else:
                    # Generic fallback columns; the first non-empty one wins.
                    for col in fallback_vendor_cols:
                        if raw.get(col):
                            vendor = raw[col].strip() or default_vendor
                            break

                account_id = (raw.get(colmap["account_id"]) or "").strip()
                invoice_number = (raw.get(colmap["invoice_number"]) or "").strip()
                mou_raw = raw.get(colmap["mou"])
                rev_raw = raw.get(colmap["revenue"])

                # MOU is best-effort: keep the row, but leave a trace so the
                # dropped value can be reviewed.
                mou = None
                if mou_raw:
                    try:
                        mou = self.parse_int(mou_raw)
                    except ValidationError:
                        logger.warning(
                            "wholesale_sip_csv row %d: unparseable MOU %r; storing None",
                            i,
                            mou_raw,
                        )
                revenue_cents = self.parse_cents(rev_raw)

                yield IccRevenueLine(
                    profile_id=self.profile_id,
                    reporting_year=self.reporting_year,
                    reporting_quarter=quarter,
                    icc_category="wholesale_sip",
                    counterparty_legal_name=vendor,
                    counterparty_ocn=None,
                    counterparty_country="US",
                    revenue_cents=revenue_cents,
                    minutes_of_use=mou,
                    source_line_no=i,
                    raw_row={
                        "vendor": vendor,
                        "vendor_detected": entry["name"],
                        "invoice_number": invoice_number,
                        "account_id": account_id,
                        "billing_period": billing_period,
                        "mou_raw": mou_raw,
                        "revenue_raw": rev_raw,
                    },
                )