""" CDR puller — hourly cron. Walks every `cdr_ingestion_profiles` row with `pull_enabled=TRUE` and fetches new files since the last run. Dispatches to a switch preset (if `switch_preset` is set) or a generic transport adapter. For each file fetched: 1. Compute SHA-256; skip if we already have that exact file on record (file-level dedup). 2. Stream into MinIO at `cdr-uploads/{customer_id}/raw/{pull|preset}/{ts}_{name}`. 3. Insert a `cdr_ingestion_uploads` row with status=pending. 4. The `cdr_ingester` picks it up. Credentials come from ERPNext's Sensitive ID doctype, keyed by `profile.pull_sensitive_id`. We fetch that record via the ERPNext REST API (existing pattern in scripts.workers.erpnext_client). Usage: python -m scripts.workers.cdr_puller # process all enabled python -m scripts.workers.cdr_puller --profile-id=42 --now CRON: 0 * * * * python -m scripts.workers.cdr_puller """ from __future__ import annotations import argparse import hashlib import io import logging import os import sys from datetime import datetime from typing import Optional import psycopg2 import psycopg2.extras from scripts.workers.cdr_transports import TRANSPORTS from scripts.workers.cdr_presets import PRESETS log = logging.getLogger("cdr_puller") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[logging.StreamHandler(sys.stdout)], ) DATABASE_URL = os.environ.get("DATABASE_URL", "") MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest") def _minio(): from minio import Minio return Minio( os.environ.get("MINIO_ENDPOINT", "minio:9000"), access_key=os.environ.get("MINIO_ACCESS_KEY", ""), secret_key=os.environ.get("MINIO_SECRET_KEY", ""), secure=os.environ.get("MINIO_SECURE", "false").lower() == "true", ) def _load_secrets(sensitive_id: Optional[str]) -> dict: """Fetch credentials from ERPNext Sensitive ID.""" if not sensitive_id: return {} try: from scripts.workers.erpnext_client import ERPNextClient erp = ERPNextClient() doc = erp.get_resource("Sensitive ID", sensitive_id) return doc.get("credentials_json", {}) or { k: doc.get(k) for k in ("password", "private_key", "api_key", "client_secret", "auth_token", "bearer_token") if doc.get(k) } except Exception as exc: log.error("Could not load Sensitive ID %s: %s", sensitive_id, exc) return {} def process_profile(conn, profile: dict) -> dict: profile_id = profile["id"] preset_slug = profile.get("switch_preset") since = profile.get("last_fetched_mtime") secrets = _load_secrets(profile.get("pull_sensitive_id")) profile_config = dict(profile.get("preset_config") or {}) # Merge in the non-sensitive profile fields some presets expect for key in ("host", "port", "username", "remote_dir", "remote_glob", "api_host", "account_id", "domain", "admin_url"): val = profile.get(key) or profile_config.get(key) if val is not None: profile_config[key] = val fetched = 0 fetched_bytes = 0 uploads: list[int] = [] try: if preset_slug: preset_cls = PRESETS.get(preset_slug) if not preset_cls: return {"profile_id": profile_id, "error": f"unknown preset {preset_slug}"} preset = preset_cls() files = list(preset.fetch(profile_config, secrets, since)) else: # Generic transport transport_slug = profile.get("pull_transport") transport_cls = TRANSPORTS.get(transport_slug) if transport_slug else None if not transport_cls: return {"profile_id": profile_id, "error": "no transport configured"} transport = transport_cls( host=profile.get("pull_host") or "", port=profile.get("pull_port"), username=profile.get("username"), password=secrets.get("password"), private_key=secrets.get("private_key"), remote_glob=profile.get("pull_remote_glob", "*"), extra=profile_config, ) files = [] for remote in transport.list_since(since): class _FF: # adapt RemoteFile → FetchedFile-like pass ff = _FF() ff.remote_path = remote.path ff.mtime = remote.mtime ff.content = transport.fetch(remote.path) ff.size_bytes = remote.size_bytes files.append(ff) except Exception as exc: log.exception("Pull failed for profile %s: %s", profile_id, exc) return {"profile_id": profile_id, "error": str(exc)} newest_mtime = since minio = _minio() for f in files: sha = hashlib.sha256(f.content).hexdigest() # File-level dedup with conn.cursor() as cur: cur.execute( "SELECT id FROM cdr_ingestion_uploads " "WHERE profile_id=%s AND raw_sha256=%s", (profile_id, sha), ) dup = cur.fetchone() if dup: log.info("Profile %s: file %s duplicate of upload %s — skipping", profile_id, f.remote_path, dup[0]) continue # Stream to MinIO basename = os.path.basename(f.remote_path) or f"cdr_{sha[:12]}" source_tag = "preset" if preset_slug else "pull" minio_key = ( f"cdr-uploads/{profile['customer_id']}/raw/{source_tag}/" f"{datetime.utcnow():%Y%m%dT%H%M%SZ}_{basename}" ) minio.put_object( MINIO_BUCKET, minio_key, io.BytesIO(f.content), len(f.content), ) fetched_bytes += len(f.content) fetched += 1 with conn.cursor() as cur: cur.execute( """ INSERT INTO cdr_ingestion_uploads ( profile_id, source, raw_minio_path, raw_sha256, status, summary_json ) VALUES (%s, %s, %s, %s, 'pending', %s::jsonb) RETURNING id """, ( profile_id, source_tag, f"{MINIO_BUCKET}/{minio_key}", sha, psycopg2.extras.Json({"bytes": len(f.content), "remote_path": f.remote_path, "mtime": f.mtime.isoformat() if f.mtime else None}), ), ) uploads.append(cur.fetchone()[0]) if f.mtime and (newest_mtime is None or f.mtime > newest_mtime): newest_mtime = f.mtime conn.commit() # Update last_fetched_mtime with conn.cursor() as cur: cur.execute( "UPDATE cdr_ingestion_profiles " "SET last_fetched_at=NOW(), last_fetched_mtime=%s WHERE id=%s", (newest_mtime, profile_id), ) conn.commit() return { "profile_id": profile_id, "files_fetched": fetched, "bytes": fetched_bytes, "upload_ids": uploads, } def run_once(profile_id: Optional[int] = None) -> list[dict]: conn = psycopg2.connect(DATABASE_URL) results = [] try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: if profile_id: cur.execute( "SELECT * FROM cdr_ingestion_profiles WHERE id=%s", (profile_id,), ) else: cur.execute( "SELECT * FROM cdr_ingestion_profiles WHERE pull_enabled=TRUE", ) profiles = [dict(r) for r in cur.fetchall()] for profile in profiles: results.append(process_profile(conn, profile)) finally: conn.close() return results def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--profile-id", type=int) parser.add_argument("--now", action="store_true", help="Run immediately regardless of cron schedule (no-op; already immediate).") args = parser.parse_args() for r in run_once(args.profile_id): print(r) if __name__ == "__main__": main()