# CDR puller worker (Python).
# Repo context: API (Express/TypeScript), Astro site, Python workers,
# document generators, FCC compliance tools, Canada CRTC formation,
# Ansible infrastructure, and deployment scripts.
"""
|
|
CDR puller — hourly cron.
|
|
|
|
Walks every `cdr_ingestion_profiles` row with `pull_enabled=TRUE` and
|
|
fetches new files since the last run. Dispatches to a switch preset
|
|
(if `switch_preset` is set) or a generic transport adapter.
|
|
|
|
For each file fetched:
|
|
1. Compute SHA-256; skip if we already have that exact file on record
|
|
(file-level dedup).
|
|
2. Stream into MinIO at `cdr-uploads/{customer_id}/raw/{pull|preset}/{ts}_{name}`.
|
|
3. Insert a `cdr_ingestion_uploads` row with status=pending.
|
|
4. The `cdr_ingester` picks it up.
|
|
|
|
Credentials come from ERPNext's Sensitive ID doctype, keyed by
|
|
`profile.pull_sensitive_id`. We fetch that record via the ERPNext REST
|
|
API (existing pattern in scripts.workers.erpnext_client).
|
|
|
|
Usage:
|
|
python -m scripts.workers.cdr_puller # process all enabled
|
|
python -m scripts.workers.cdr_puller --profile-id=42 --now
|
|
CRON: 0 * * * * python -m scripts.workers.cdr_puller
|
|
"""
|
|
|
|
from __future__ import annotations

import argparse
import hashlib
import io
import logging
import os
import sys
from datetime import datetime
from typing import Optional

import psycopg2
import psycopg2.extras

from scripts.workers.cdr_transports import TRANSPORTS
from scripts.workers.cdr_presets import PRESETS

# Log to stdout so cron / container log capture sees everything.
log = logging.getLogger("cdr_puller")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

# Postgres connection string. Empty default means a missing env var fails
# loudly at connect time (in run_once) rather than silently here.
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# MinIO bucket that receives raw CDR objects.
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")
|
|
|
|
|
|
def _minio():
    """Build a MinIO client from environment configuration.

    The `minio` package is imported lazily so importing this module does
    not require object storage to be installed or reachable.
    """
    from minio import Minio

    endpoint = os.environ.get("MINIO_ENDPOINT", "minio:9000")
    use_tls = os.environ.get("MINIO_SECURE", "false").lower() == "true"
    return Minio(
        endpoint,
        access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
        secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
        secure=use_tls,
    )
|
|
|
|
|
|
def _load_secrets(sensitive_id: Optional[str]) -> dict:
|
|
"""Fetch credentials from ERPNext Sensitive ID."""
|
|
if not sensitive_id:
|
|
return {}
|
|
try:
|
|
from scripts.workers.erpnext_client import ERPNextClient
|
|
erp = ERPNextClient()
|
|
doc = erp.get_resource("Sensitive ID", sensitive_id)
|
|
return doc.get("credentials_json", {}) or {
|
|
k: doc.get(k) for k in
|
|
("password", "private_key", "api_key", "client_secret", "auth_token", "bearer_token")
|
|
if doc.get(k)
|
|
}
|
|
except Exception as exc:
|
|
log.error("Could not load Sensitive ID %s: %s", sensitive_id, exc)
|
|
return {}
|
|
|
|
|
|
def process_profile(conn, profile: dict) -> dict:
    """Pull new CDR files for one ingestion profile and stage them.

    Fetches via a switch preset (when `switch_preset` is set) or a generic
    transport (`pull_transport`), dedups each file by SHA-256 within the
    profile, streams new files to MinIO, and inserts a pending
    `cdr_ingestion_uploads` row per file. Commits on `conn` after each
    stored file and again after updating the profile's watermark.

    Returns either {"profile_id", "error"} on failure or
    {"profile_id", "files_fetched", "bytes", "upload_ids"} on success.
    """
    profile_id = profile["id"]
    preset_slug = profile.get("switch_preset")
    # Watermark: only files newer than this mtime should be fetched.
    since = profile.get("last_fetched_mtime")

    secrets = _load_secrets(profile.get("pull_sensitive_id"))
    profile_config = dict(profile.get("preset_config") or {})
    # Merge in the non-sensitive profile fields some presets expect.
    # NOTE(review): `profile.get(key) or ...` skips falsy values (0, ""),
    # so a literal port 0 or empty string would fall through — confirm intended.
    for key in ("host", "port", "username", "remote_dir", "remote_glob",
                "api_host", "account_id", "domain", "admin_url"):
        val = profile.get(key) or profile_config.get(key)
        if val is not None:
            profile_config[key] = val

    fetched = 0          # count of files stored this run
    fetched_bytes = 0    # total bytes stored this run
    uploads: list[int] = []  # ids of inserted cdr_ingestion_uploads rows

    try:
        if preset_slug:
            preset_cls = PRESETS.get(preset_slug)
            if not preset_cls:
                return {"profile_id": profile_id, "error": f"unknown preset {preset_slug}"}
            preset = preset_cls()
            files = list(preset.fetch(profile_config, secrets, since))
        else:
            # Generic transport
            transport_slug = profile.get("pull_transport")
            transport_cls = TRANSPORTS.get(transport_slug) if transport_slug else None
            if not transport_cls:
                return {"profile_id": profile_id, "error": "no transport configured"}
            transport = transport_cls(
                host=profile.get("pull_host") or "",
                port=profile.get("pull_port"),
                # NOTE(review): other transport fields use the pull_ prefix
                # (pull_host, pull_port, pull_remote_glob) but this reads
                # bare `username` — confirm column name in the profiles table.
                username=profile.get("username"),
                password=secrets.get("password"),
                private_key=secrets.get("private_key"),
                remote_glob=profile.get("pull_remote_glob", "*"),
                extra=profile_config,
            )
            files = []
            for remote in transport.list_since(since):
                class _FF:  # adapt RemoteFile → FetchedFile-like
                    pass
                ff = _FF()
                ff.remote_path = remote.path
                ff.mtime = remote.mtime
                ff.content = transport.fetch(remote.path)
                ff.size_bytes = remote.size_bytes
                files.append(ff)

    except Exception as exc:
        # Best-effort per profile: record the error and let the caller
        # continue with the remaining profiles.
        log.exception("Pull failed for profile %s: %s", profile_id, exc)
        return {"profile_id": profile_id, "error": str(exc)}

    newest_mtime = since
    minio = _minio()
    for f in files:
        sha = hashlib.sha256(f.content).hexdigest()

        # File-level dedup: same content already recorded for this profile.
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id FROM cdr_ingestion_uploads "
                "WHERE profile_id=%s AND raw_sha256=%s",
                (profile_id, sha),
            )
            dup = cur.fetchone()
            if dup:
                log.info("Profile %s: file %s duplicate of upload %s — skipping",
                         profile_id, f.remote_path, dup[0])
                continue

        # Stream to MinIO under a timestamped, source-tagged key.
        basename = os.path.basename(f.remote_path) or f"cdr_{sha[:12]}"
        source_tag = "preset" if preset_slug else "pull"
        # NOTE(review): datetime.utcnow() is naive and deprecated since
        # Python 3.12 — consider datetime.now(timezone.utc).
        minio_key = (
            f"cdr-uploads/{profile['customer_id']}/raw/{source_tag}/"
            f"{datetime.utcnow():%Y%m%dT%H%M%SZ}_{basename}"
        )
        minio.put_object(
            MINIO_BUCKET, minio_key, io.BytesIO(f.content), len(f.content),
        )
        fetched_bytes += len(f.content)
        fetched += 1

        # Record the upload as pending; the cdr_ingester worker consumes it.
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO cdr_ingestion_uploads (
                  profile_id, source, raw_minio_path, raw_sha256, status, summary_json
                ) VALUES (%s, %s, %s, %s, 'pending', %s::jsonb)
                RETURNING id
                """,
                (
                    profile_id, source_tag,
                    f"{MINIO_BUCKET}/{minio_key}", sha,
                    psycopg2.extras.Json({"bytes": len(f.content),
                                          "remote_path": f.remote_path,
                                          "mtime": f.mtime.isoformat() if f.mtime else None}),
                ),
            )
            uploads.append(cur.fetchone()[0])

        # Track the newest remote mtime seen, for the watermark update below.
        if f.mtime and (newest_mtime is None or f.mtime > newest_mtime):
            newest_mtime = f.mtime
        # Commit per file so a crash mid-run keeps already-staged uploads.
        conn.commit()

    # Update last_fetched_mtime (and the run timestamp) on the profile.
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE cdr_ingestion_profiles "
            "SET last_fetched_at=NOW(), last_fetched_mtime=%s WHERE id=%s",
            (newest_mtime, profile_id),
        )
    conn.commit()

    return {
        "profile_id": profile_id, "files_fetched": fetched,
        "bytes": fetched_bytes, "upload_ids": uploads,
    }
|
|
|
|
|
|
def run_once(profile_id: Optional[int] = None) -> list[dict]:
    """Run one pull pass over ingestion profiles.

    With `profile_id`, processes exactly that profile (regardless of its
    `pull_enabled` flag); otherwise processes every enabled profile.
    Returns one result dict per profile, as produced by process_profile.
    """
    conn = psycopg2.connect(DATABASE_URL)
    try:
        if profile_id:
            query = "SELECT * FROM cdr_ingestion_profiles WHERE id=%s"
            params = (profile_id,)
        else:
            query = "SELECT * FROM cdr_ingestion_profiles WHERE pull_enabled=TRUE"
            params = None
        # RealDictCursor gives dict-like rows keyed by column name.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params)
            rows = [dict(row) for row in cur.fetchall()]
        return [process_profile(conn, row) for row in rows]
    finally:
        conn.close()
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse arguments and run one pull pass."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--profile-id", type=int)
    parser.add_argument(
        "--now",
        action="store_true",
        help="Run immediately regardless of cron schedule (no-op; already immediate).",
    )
    args = parser.parse_args()
    results = run_once(args.profile_id)
    for result in results:
        print(result)
|
|
|
|
|
|
# Script entry point (invoked by cron or manually; see module docstring).
if __name__ == "__main__":
    main()
|