new-site/api/src/routes/icc.ts
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

270 lines
10 KiB
TypeScript

/**
* Inter-Carrier Compensation (ICC) revenue import API.
*
* Customer-facing endpoints for uploading carrier invoice files (CABS BOS,
* EDI 810, iconectiv 8YY, international settlement, wholesale SIP CSV) +
* reading back the parsed revenue summary. Parsing is done asynchronously
* by scripts/workers/icc_ingester.py polling `icc_ingestion_uploads
* WHERE status='pending'`.
*
* Mirrors the CDR ingestion pattern (migration 050) — pre-signed MinIO PUT,
* ingester worker parses, rows land in icc_revenue_lines, deduped by
* natural_key_hash.
*/
import { Router } from "express";
import type { Request, Response } from "express";
import { randomBytes, createHash } from "crypto";
import { pool } from "../db.js";
// Router that every ICC endpoint in this file is registered on; it is the
// module's default export.
const router = Router();
// Base URL of the worker service that issues presigned URLs. Taken from the
// WORKER_URL environment variable; an unset or empty value falls back to the
// literal default below.
const WORKER_URL = process.env.WORKER_URL || "http://workers:8090";
/**
 * Asks the worker's `/jobs/presign` endpoint for a presigned MinIO PUT URL.
 *
 * @param key - Object key the URL should be issued for.
 * @param expires - Lifetime forwarded to the worker (defaults to 24 * 3600;
 *   presumably seconds — confirm against the worker's contract).
 * @returns The `url` field of the worker's JSON reply, or `null` when the
 *   request throws, the response status is not OK, or `url` is missing/empty.
 */
async function presignPut(key: string, expires = 24 * 3600): Promise<string | null> {
  const endpoint = `${WORKER_URL}/jobs/presign`;
  try {
    const response = await fetch(endpoint, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ key, expires, method: "PUT" }),
    });
    if (response.ok) {
      const payload = (await response.json()) as { url?: string };
      return payload.url ? payload.url : null;
    }
    return null;
  } catch {
    // Best-effort helper: any network or JSON failure is reported as "no URL".
    return null;
  }
}
// ── Helpers ─────────────────────────────────────────────────────────────
/**
 * Looks up a single row of `cdr_ingestion_profiles` by its `id`.
 *
 * @param profileId - Value bound to `$1` in the lookup query.
 * @returns The first row returned by the query, or `null` when there is none.
 */
async function loadProfile(profileId: number) {
  const { rows } = await pool.query(
    `SELECT * FROM cdr_ingestion_profiles WHERE id = $1`,
    [profileId],
  );
  const firstRow = rows[0];
  return firstRow || null;
}
/**
 * Guesses the ICC source format from a file name, case-insensitively.
 *
 * Rules are tried top to bottom and the first match wins. An `.xml` file is
 * only treated as an 8YY query when its name also contains "8yy".
 *
 * @param fileName - File name (or path) to inspect.
 * @returns The format identifier, or `null` when no rule matches.
 */
function detectFormatByExtension(fileName: string): string | null {
  const lowered = fileName.toLowerCase();
  const endsWithAny = (...suffixes: string[]): boolean =>
    suffixes.some((suffix) => lowered.endsWith(suffix));

  // Each rule is evaluated lazily so the order of checks matches a plain
  // if-chain.
  const rules: ReadonlyArray<readonly [() => boolean, string]> = [
    [() => endsWithAny(".bos", ".bos.gz"), "cabs_bos"],
    [() => endsWithAny(".810", ".edi", ".x12"), "edi_810"],
    [() => endsWithAny(".qry") || (endsWithAny(".xml") && lowered.includes("8yy")), "8yy_qry"],
    [() => endsWithAny(".tas"), "itu_tas"],
    [() => endsWithAny(".icss"), "icss"],
    [() => endsWithAny(".csv"), "wholesale_sip_csv"],
    [() => endsWithAny(".pdf"), "carrier_invoice_pdf"],
  ];

  for (const [matches, format] of rules) {
    if (matches()) return format;
  }
  return null;
}
// ── POST /api/v1/icc/upload-token ──────────────────────────────────────
//
// Registers a pending upload and returns a presigned MinIO PUT URL + token.
// The portal PUTs the file directly to MinIO, then the ingester picks it up
// from `icc_ingestion_uploads WHERE status='pending'`.
//
// Body: { profile_id, file_name, source_format? }
//   source_format, when supplied, wins over extension-based detection.
//
// Responses:
//   201  { upload_id, token, minio_key, minio_put_url, source_format,
//          expires_in_seconds }
//   400  missing/invalid profile_id or file_name, non-string source_format,
//        or a format that can be neither supplied nor detected
//   404  profile not found
//   502  the worker could not issue a presigned URL (nothing is inserted)
router.post("/api/v1/icc/upload-token", async (req: Request, res: Response) => {
  const { profile_id, file_name, source_format } = req.body ?? {};
  if (!profile_id || !file_name) {
    res.status(400).json({ error: "profile_id and file_name required" });
    return;
  }

  // Reject NaN / fractional ids up front, as the GET routes in this file do,
  // instead of handing them to the database as a query parameter.
  const profileId = Number(profile_id);
  if (!Number.isSafeInteger(profileId)) {
    res.status(400).json({ error: "bad profile_id" });
    return;
  }
  if (typeof file_name !== "string") {
    res.status(400).json({ error: "file_name must be a string" });
    return;
  }
  // A falsy source_format means "not supplied"; anything else must be a string
  // because it is stored verbatim in icc_ingestion_uploads.source_format.
  if (source_format && typeof source_format !== "string") {
    res.status(400).json({ error: "source_format must be a string" });
    return;
  }

  const profile = await loadProfile(profileId);
  if (!profile) {
    res.status(404).json({ error: "profile not found" });
    return;
  }

  const detected = source_format || detectFormatByExtension(file_name);
  if (!detected) {
    res.status(400).json({
      error: "unable to detect source format; supply source_format explicitly",
    });
    return;
  }

  const ttlSeconds = 24 * 3600;
  const token = randomBytes(16).toString("hex");
  // Anything outside [A-Za-z0-9._-] becomes "_" so the name is safe to embed
  // in an object key (no path separators survive).
  const safeName = file_name.replace(/[^A-Za-z0-9._-]/g, "_");
  // One timestamp feeds both the object key and summary_json.submitted_at.
  const submittedAt = new Date().toISOString();
  const minioKey =
    `icc-uploads/${profile.customer_id}/raw/` +
    `${submittedAt.replace(/[:.]/g, "")}_${token}_${safeName}`;

  // Presign BEFORE inserting: if the worker cannot issue a URL the client has
  // no way to upload, so recording a 'pending' row would only leave the
  // ingester polling an upload whose file can never arrive.
  const minioPutUrl = await presignPut(minioKey, ttlSeconds);
  if (!minioPutUrl) {
    res.status(502).json({ error: "unable to issue upload URL; try again later" });
    return;
  }

  // Placeholder sha256 until the ingester reads the uploaded file; uniqueness
  // constraint enforces dedup then.
  const placeholder = createHash("sha256")
    .update(`${profile.id}_${token}_${safeName}`)
    .digest("hex");

  const insert = await pool.query(
    `INSERT INTO icc_ingestion_uploads
(profile_id, customer_id, source_format, raw_minio_path, raw_sha256,
status, summary_json)
VALUES ($1, $2, $3, $4, $5, 'pending', $6::jsonb)
RETURNING id`,
    [
      profile.id,
      profile.customer_id,
      detected,
      minioKey,
      placeholder,
      JSON.stringify({ token, file_name: safeName, submitted_at: submittedAt }),
    ],
  );

  res.status(201).json({
    upload_id: insert.rows[0].id,
    token,
    minio_key: minioKey,
    minio_put_url: minioPutUrl,
    source_format: detected,
    expires_in_seconds: ttlSeconds,
  });
});
// ── GET /api/v1/icc/profile/:profile_id/uploads ────────────────────────
//
// Returns up to 100 uploads for one profile, newest first, including parse
// status, accepted/rejected row counts, and any error message.
// Responds 400 when :profile_id is not a finite number.
router.get(
  "/api/v1/icc/profile/:profile_id/uploads",
  async (req: Request, res: Response) => {
    const id = Number(req.params.profile_id);
    if (Number.isFinite(id) === false) {
      res.status(400).json({ error: "bad profile_id" });
      return;
    }

    const listSql = `SELECT id, source_format, status, rows_accepted, rows_rejected,
error_message, created_at, parsed_at, summary_json
FROM icc_ingestion_uploads
WHERE profile_id = $1
ORDER BY created_at DESC
LIMIT 100`;
    const { rows } = await pool.query(listSql, [id]);
    res.json({ uploads: rows });
  },
);
// ── GET /api/v1/icc/profile/:profile_id/summary?year=YYYY ──────────────
//
// Totals parsed revenue lines per icc_category for one reporting year and
// joins each category to its Form 499-A line via icc_499a_line_mapping.
// Used by RevenueStep.astro to pre-fill Lines 404, 404.1, 404.3, and 418.
//
// `year` falls back to the previous UTC calendar year when it is absent,
// zero, or not a number. Responds 400 when :profile_id is not finite.
//
// Response: { reporting_year, categories[], by_form_line, grand_total_cents }
router.get(
  "/api/v1/icc/profile/:profile_id/summary",
  async (req: Request, res: Response) => {
    const profileId = Number(req.params.profile_id);
    if (!Number.isFinite(profileId)) {
      res.status(400).json({ error: "bad profile_id" });
      return;
    }
    const requestedYear = Number(req.query.year);
    const year = requestedYear ? requestedYear : new Date().getUTCFullYear() - 1;

    const summarySql = `SELECT icc.icc_category,
m.form_499a_line,
m.jurisdiction_split,
SUM(icc.revenue_cents)::bigint AS revenue_cents,
COALESCE(SUM(icc.minutes_of_use), 0)::bigint AS minutes_of_use,
COUNT(*)::int AS line_count
FROM icc_revenue_lines icc
JOIN icc_499a_line_mapping m ON m.icc_category = icc.icc_category
WHERE icc.profile_id = $1
AND icc.reporting_year = $2
GROUP BY icc.icc_category, m.form_499a_line, m.jurisdiction_split
ORDER BY revenue_cents DESC`;
    const { rows } = await pool.query(summarySql, [profileId, year]);

    // Single pass over the result: per-category output rows, per-499-A-line
    // totals (the "pre-fill Line 404 with $X" view), and the grand total.
    const byLine: Record<string, { revenue_cents: number; minutes: number }> = {};
    const categories: Array<{
      icc_category: unknown;
      form_499a_line: unknown;
      jurisdiction_split: unknown;
      revenue_cents: number;
      minutes_of_use: number;
      line_count: unknown;
    }> = [];
    let grandTotalCents = 0;

    for (const row of rows) {
      // The ::bigint columns are run through Number() before any arithmetic.
      const cents = Number(row.revenue_cents);
      const minutes = Number(row.minutes_of_use);

      const lineKey = row.form_499a_line;
      const bucket = byLine[lineKey] || (byLine[lineKey] = { revenue_cents: 0, minutes: 0 });
      bucket.revenue_cents += cents;
      bucket.minutes += minutes;

      categories.push({
        icc_category: row.icc_category,
        form_499a_line: row.form_499a_line,
        jurisdiction_split: row.jurisdiction_split,
        revenue_cents: cents,
        minutes_of_use: minutes,
        line_count: row.line_count,
      });
      grandTotalCents += cents;
    }

    res.json({
      reporting_year: year,
      categories,
      by_form_line: byLine,
      grand_total_cents: grandTotalCents,
    });
  },
);
// ── POST /api/v1/icc/profile/:profile_id/reparse/:upload_id ────────────
//
// Admin-only: re-run the adapter on an already-uploaded file (e.g.,
// adapter bug was fixed). Flips the upload back to pending and clears its
// previously parsed revenue lines; the ingester picks it up on next poll.
// Authorization is the admin header used by the rest of the admin endpoints.
//
// Responses:
//   200  { ok: true, upload_id, status: "pending" }
//   400  :profile_id or :upload_id is not an integer
//   403  missing/incorrect x-admin-token, or ADMIN_API_TOKEN not configured
//   404  no upload with that id under that profile (nothing is modified)
router.post(
  "/api/v1/icc/profile/:profile_id/reparse/:upload_id",
  async (req: Request, res: Response) => {
    const adminToken = (req.headers["x-admin-token"] || "").toString();
    if (!process.env.ADMIN_API_TOKEN || adminToken !== process.env.ADMIN_API_TOKEN) {
      res.status(403).json({ error: "admin token required" });
      return;
    }

    const profileId = Number(req.params.profile_id);
    const uploadId = Number(req.params.upload_id);
    if (!Number.isSafeInteger(profileId)) {
      res.status(400).json({ error: "bad profile_id" });
      return;
    }
    if (!Number.isSafeInteger(uploadId)) {
      res.status(400).json({ error: "bad upload_id" });
      return;
    }

    // Reset and purge in ONE statement. The DELETE is driven by the ids the
    // UPDATE actually returned, so:
    //   * a wrong profile_id / unknown upload deletes nothing (previously the
    //     DELETE ran unconditionally before the ownership check), and
    //   * the status flip and the purge take effect together, so the upload
    //     cannot be seen as 'pending' while its old rows still exist.
    const r = await pool.query(
      `WITH reset AS (
UPDATE icc_ingestion_uploads
SET status = 'pending',
error_message = NULL,
rows_accepted = 0,
rows_rejected = 0,
parsed_at = NULL
WHERE id = $1 AND profile_id = $2
RETURNING id
), purged AS (
DELETE FROM icc_revenue_lines
WHERE source_upload_id IN (SELECT id FROM reset)
)
SELECT id FROM reset`,
      [uploadId, profileId],
    );
    if (r.rows.length === 0) {
      res.status(404).json({ error: "upload not found" });
      return;
    }
    res.json({ ok: true, upload_id: uploadId, status: "pending" });
  },
);
// ── GET /api/v1/icc/profile/:profile_id/revenue-lines ──────────────────
//
// Paginated list of parsed ICC revenue lines, newest first. Used by the
// admin panel to audit individual invoice rows.
//
// Query parameters:
//   year          reporting year; defaults to the previous UTC calendar year
//                 when absent, zero, or not a number
//   icc_category  optional exact-match filter (single string value)
//   limit         page size, clamped to 1..500 (default 100)
//   offset        rows to skip, clamped to >= 0 (default 0)
//
// Responses:
//   200  { lines, limit, offset }  — limit/offset echo the values applied
//   400  :profile_id is not an integer, or icc_category is not a single string
router.get(
  "/api/v1/icc/profile/:profile_id/revenue-lines",
  async (req: Request, res: Response) => {
    // Validate the id like the other profile routes in this file do, rather
    // than sending NaN to the database.
    const profileId = Number(req.params.profile_id);
    if (!Number.isSafeInteger(profileId)) {
      res.status(400).json({ error: "bad profile_id" });
      return;
    }

    const year = Number(req.query.year) || new Date().getUTCFullYear() - 1;

    // A repeated (?icc_category=a&icc_category=b) or structured value is not
    // a string at runtime; refuse it instead of binding it as a parameter.
    const rawCategory = req.query.icc_category;
    if (rawCategory !== undefined && typeof rawCategory !== "string") {
      res.status(400).json({ error: "bad icc_category" });
      return;
    }
    const category = rawCategory || null;

    // Truncate to whole numbers and clamp on both sides: a negative or
    // fractional LIMIT/OFFSET would be rejected by the database.
    const requestedLimit = Math.trunc(Number(req.query.limit)) || 100;
    const limit = Math.min(Math.max(requestedLimit, 1), 500);
    const requestedOffset = Math.trunc(Number(req.query.offset)) || 0;
    const offset = Math.min(Math.max(requestedOffset, 0), Number.MAX_SAFE_INTEGER);

    // Placeholders are numbered by position in `params`; LIMIT and OFFSET are
    // always the final two entries.
    const conditions = ["profile_id = $1", "reporting_year = $2"];
    const params: (number | string)[] = [profileId, year];
    if (category) {
      conditions.push(`icc_category = $${params.length + 1}`);
      params.push(category);
    }
    params.push(limit, offset);

    const r = await pool.query(
      `SELECT id, reporting_quarter, icc_category, counterparty_legal_name,
counterparty_ocn, counterparty_country, revenue_cents,
minutes_of_use, source_upload_id, source_line_no, created_at
FROM icc_revenue_lines
WHERE ${conditions.join(" AND ")}
ORDER BY created_at DESC, id DESC
LIMIT $${params.length - 1} OFFSET $${params.length}`,
      params,
    );
    res.json({ lines: r.rows, limit, offset });
  },
);
// Default export: the router with the ICC routes registered in this file.
export default router;