new-site/scripts/workers/cdr_puller.py
"""
CDR puller — hourly cron.
Walks every `cdr_ingestion_profiles` row with `pull_enabled=TRUE` and
fetches new files since the last run. Dispatches to a switch preset
(if `switch_preset` is set) or a generic transport adapter.
For each file fetched:
1. Compute SHA-256; skip if we already have that exact file on record
(file-level dedup).
2. Stream into MinIO at `cdr-uploads/{customer_id}/raw/{pull|preset}/{ts}_{name}`.
3. Insert a `cdr_ingestion_uploads` row with status=pending.
4. The `cdr_ingester` picks it up.
Credentials come from ERPNext's Sensitive ID doctype, keyed by
`profile.pull_sensitive_id`. We fetch that record via the ERPNext REST
API (existing pattern in scripts.workers.erpnext_client).
Usage:
python -m scripts.workers.cdr_puller # process all enabled
python -m scripts.workers.cdr_puller --profile-id=42 --now
CRON: 0 * * * * python -m scripts.workers.cdr_puller
"""
from __future__ import annotations

import argparse
import hashlib
import io
import logging
import os
import sys
from datetime import datetime
from typing import Optional

import psycopg2
import psycopg2.extras

from scripts.workers.cdr_transports import TRANSPORTS
from scripts.workers.cdr_presets import PRESETS

log = logging.getLogger("cdr_puller")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

DATABASE_URL = os.environ.get("DATABASE_URL", "")
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")
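
# Configuration is environment-driven: DATABASE_URL must point at the Postgres
# instance holding cdr_ingestion_profiles / cdr_ingestion_uploads, and
# MINIO_ENDPOINT / MINIO_ACCESS_KEY / MINIO_SECRET_KEY / MINIO_SECURE control
# the object-store client built in _minio() below.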

def _minio():
    """Build a MinIO client from environment configuration."""
    from minio import Minio

    return Minio(
        os.environ.get("MINIO_ENDPOINT", "minio:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
        secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
        secure=os.environ.get("MINIO_SECURE", "false").lower() == "true",
    )

def _load_secrets(sensitive_id: Optional[str]) -> dict:
    """Fetch credentials from ERPNext Sensitive ID.

    Prefers the record's `credentials_json` blob; otherwise falls back to the
    individual secret fields. Returns {} when no ID is set or the lookup fails.
    """
    if not sensitive_id:
        return {}
    try:
        from scripts.workers.erpnext_client import ERPNextClient

        erp = ERPNextClient()
        doc = erp.get_resource("Sensitive ID", sensitive_id)
        return doc.get("credentials_json", {}) or {
            k: doc.get(k)
            for k in ("password", "private_key", "api_key", "client_secret",
                      "auth_token", "bearer_token")
            if doc.get(k)
        }
    except Exception as exc:
        log.error("Could not load Sensitive ID %s: %s", sensitive_id, exc)
        return {}
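
# NOTE: both branches of process_profile() produce duck-typed "fetched file"
# objects. The shape below is inferred from usage in this module (the real
# definitions live in scripts.workers.cdr_presets / cdr_transports):
#   .remote_path  - path on the remote system
#   .mtime        - remote modification time (datetime or None)
#   .content      - raw file bytes
#   .size_bytes   - size as reported by the remote listing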

def process_profile(conn, profile: dict) -> dict:
    """Pull new CDR files for one ingestion profile and register them.

    Returns a result dict: {"profile_id", "error"} on failure, or
    {"profile_id", "files_fetched", "bytes", "upload_ids"} on success.
    """
    profile_id = profile["id"]
    preset_slug = profile.get("switch_preset")
    since = profile.get("last_fetched_mtime")
    secrets = _load_secrets(profile.get("pull_sensitive_id"))
    profile_config = dict(profile.get("preset_config") or {})

    # Merge in the non-sensitive profile fields some presets expect
    for key in ("host", "port", "username", "remote_dir", "remote_glob",
                "api_host", "account_id", "domain", "admin_url"):
        val = profile.get(key) or profile_config.get(key)
        if val is not None:
            profile_config[key] = val

    fetched = 0
    fetched_bytes = 0
    uploads: list[int] = []
    try:
        if preset_slug:
            preset_cls = PRESETS.get(preset_slug)
            if not preset_cls:
                return {"profile_id": profile_id, "error": f"unknown preset {preset_slug}"}
            preset = preset_cls()
            files = list(preset.fetch(profile_config, secrets, since))
        else:
            # Generic transport
            transport_slug = profile.get("pull_transport")
            transport_cls = TRANSPORTS.get(transport_slug) if transport_slug else None
            if not transport_cls:
                return {"profile_id": profile_id, "error": "no transport configured"}
            transport = transport_cls(
                host=profile.get("pull_host") or "",
                port=profile.get("pull_port"),
                username=profile.get("username"),
                password=secrets.get("password"),
                private_key=secrets.get("private_key"),
                remote_glob=profile.get("pull_remote_glob", "*"),
                extra=profile_config,
            )

            class _FF:  # adapt RemoteFile → FetchedFile-like
                pass

            files = []
            for remote in transport.list_since(since):
                ff = _FF()
                ff.remote_path = remote.path
                ff.mtime = remote.mtime
                ff.content = transport.fetch(remote.path)
                ff.size_bytes = remote.size_bytes
                files.append(ff)
    except Exception as exc:
        log.exception("Pull failed for profile %s: %s", profile_id, exc)
        return {"profile_id": profile_id, "error": str(exc)}
    newest_mtime = since
    minio = _minio()
    for f in files:
        sha = hashlib.sha256(f.content).hexdigest()

        # File-level dedup
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id FROM cdr_ingestion_uploads "
                "WHERE profile_id=%s AND raw_sha256=%s",
                (profile_id, sha),
            )
            dup = cur.fetchone()
        if dup:
            log.info("Profile %s: file %s duplicate of upload %s — skipping",
                     profile_id, f.remote_path, dup[0])
            continue

        # Stream to MinIO
        basename = os.path.basename(f.remote_path) or f"cdr_{sha[:12]}"
        source_tag = "preset" if preset_slug else "pull"
        minio_key = (
            f"cdr-uploads/{profile['customer_id']}/raw/{source_tag}/"
            f"{datetime.utcnow():%Y%m%dT%H%M%SZ}_{basename}"
        )
        minio.put_object(
            MINIO_BUCKET, minio_key, io.BytesIO(f.content), len(f.content),
        )
        fetched_bytes += len(f.content)
        fetched += 1

        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO cdr_ingestion_uploads (
                    profile_id, source, raw_minio_path, raw_sha256, status, summary_json
                ) VALUES (%s, %s, %s, %s, 'pending', %s::jsonb)
                RETURNING id
                """,
                (
                    profile_id, source_tag,
                    f"{MINIO_BUCKET}/{minio_key}", sha,
                    psycopg2.extras.Json({"bytes": len(f.content),
                                          "remote_path": f.remote_path,
                                          "mtime": f.mtime.isoformat() if f.mtime else None}),
                ),
            )
            uploads.append(cur.fetchone()[0])

        if f.mtime and (newest_mtime is None or f.mtime > newest_mtime):
            newest_mtime = f.mtime
        conn.commit()
    # Update last_fetched_mtime
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE cdr_ingestion_profiles "
            "SET last_fetched_at=NOW(), last_fetched_mtime=%s WHERE id=%s",
            (newest_mtime, profile_id),
        )
    conn.commit()

    return {
        "profile_id": profile_id, "files_fetched": fetched,
        "bytes": fetched_bytes, "upload_ids": uploads,
    }

def run_once(profile_id: Optional[int] = None) -> list[dict]:
    """Process one profile (if given), otherwise every pull-enabled profile."""
    conn = psycopg2.connect(DATABASE_URL)
    results = []
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            if profile_id:
                cur.execute(
                    "SELECT * FROM cdr_ingestion_profiles WHERE id=%s",
                    (profile_id,),
                )
            else:
                cur.execute(
                    "SELECT * FROM cdr_ingestion_profiles WHERE pull_enabled=TRUE",
                )
            profiles = [dict(r) for r in cur.fetchall()]
        for profile in profiles:
            results.append(process_profile(conn, profile))
    finally:
        conn.close()
    return results

def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--profile-id", type=int)
    parser.add_argument("--now", action="store_true",
                        help="Run immediately regardless of cron schedule (no-op; already immediate).")
    args = parser.parse_args()
    for r in run_once(args.profile_id):
        print(r)


if __name__ == "__main__":
    main()