new-site/scripts/workers/cdr_retention_sweeper.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

121 lines
4 KiB
Python

"""
CDR retention sweeper — nightly.
Enforces the 5-year retention window on CDR uploads. The window is named
for included-quota profiles (RETENTION_DAYS_INCLUDED) but is applied to
every profile. Per-profile windows (e.g. storage_plan-specific cutoffs for
enterprise profiles) and lapsed-plan deletion grace periods on tier
customers are not implemented in this worker yet.
Scope:
* A single cutoff, RETENTION_DAYS_INCLUDED (5 * 365 days) before the
  current UTC time, applies to every profile.
* Each `cdr_ingestion_uploads` row created before the cutoff is deleted,
  along with its `cdr_calls` and `cdr_quarantine` rows. The MinIO object
  referenced by its `raw_minio_path` is removed too. The upload's
  `created_at` decides; individual call timestamps are not checked.
* `cdr_traffic_studies` rows are NOT deleted — the summary metadata
  is retained forever as an audit artifact.
Usage:
python -m scripts.workers.cdr_retention_sweeper
python -m scripts.workers.cdr_retention_sweeper --dry-run
CRON: 0 3 * * * python -m scripts.workers.cdr_retention_sweeper
"""
from __future__ import annotations

import argparse
import logging
import os
import sys
from datetime import datetime, timedelta, timezone

import psycopg2
import psycopg2.extras
# Settings read once from the environment when the module is imported.
DATABASE_URL = os.getenv("DATABASE_URL", "")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "performancewest")
# Retention window in days: five 365-day years (leap days are not counted).
RETENTION_DAYS_INCLUDED = 365 * 5

# Root logging: INFO and above, written to stdout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.StreamHandler(stream=sys.stdout)],
)
log = logging.getLogger("cdr_retention_sweeper")
def _minio():
    """Build a MinIO client from the MINIO_* environment variables.

    Falls back to endpoint ``minio:9000`` and empty credentials. TLS is used
    only when MINIO_SECURE equals "true" (case-insensitive). The ``minio``
    package is imported lazily, so it is only required when a client is built.
    """
    from minio import Minio

    env = os.environ
    use_tls = env.get("MINIO_SECURE", "false").lower() == "true"
    endpoint = env.get("MINIO_ENDPOINT", "minio:9000")
    return Minio(
        endpoint,
        access_key=env.get("MINIO_ACCESS_KEY", ""),
        secret_key=env.get("MINIO_SECRET_KEY", ""),
        secure=use_tls,
    )
def _object_key(raw_path: str) -> str:
    """Return the object key inside MINIO_BUCKET for a stored raw_minio_path.

    Everything up to and including the first ``"<MINIO_BUCKET>/"`` is
    dropped. A path without that prefix is used as the key unchanged.
    """
    return raw_path.split(f"{MINIO_BUCKET}/", 1)[-1]


def _purge_upload(conn, minio, upload_id, raw_path) -> tuple[int, int] | None:
    """Delete one upload's DB rows and its raw MinIO object as a single unit.

    The three DELETEs run in the connection's open transaction. They are
    committed only after the MinIO object has been removed. If the removal
    fails, the transaction is rolled back, so the upload row (and its
    raw_minio_path) survives and the next run retries it. Without this, the
    object would be left in storage with no record pointing at it.

    Returns:
        ``(calls_deleted, objects_deleted)`` once committed, or ``None`` when
        the MinIO removal failed and the DB changes were rolled back.

    Database errors propagate to the caller.
    """
    with conn.cursor() as cur:
        cur.execute("DELETE FROM cdr_calls WHERE upload_id=%s", (upload_id,))
        calls_deleted = cur.rowcount
        cur.execute("DELETE FROM cdr_quarantine WHERE upload_id=%s", (upload_id,))
        cur.execute("DELETE FROM cdr_ingestion_uploads WHERE id=%s", (upload_id,))

    objects_deleted = 0
    # An empty/NULL raw_minio_path means there is no object to remove.
    if raw_path:
        try:
            minio.remove_object(MINIO_BUCKET, _object_key(raw_path))
        except Exception as exc:  # any storage failure: keep the DB reference
            conn.rollback()
            log.warning(
                "retention: could not delete MinIO object %s: %s "
                "(upload %s kept for retry)",
                raw_path, exc, upload_id,
            )
            return None
        objects_deleted = 1

    # If this commit fails, the object is already gone but the rows remain.
    # The next run re-issues remove_object for the same key. NOTE(review):
    # S3-style deletes of a missing key normally succeed — confirm for this
    # MinIO deployment.
    conn.commit()
    return calls_deleted, objects_deleted


def run(dry_run: bool = False) -> dict:
    """Purge CDR uploads older than the retention cutoff.

    The cutoff is RETENTION_DAYS_INCLUDED days before the current UTC time.
    It is passed to Postgres as a naive datetime, the same value that
    ``datetime.utcnow()`` used to produce. Every cdr_ingestion_uploads row
    with ``created_at`` before the cutoff is deleted together with its
    cdr_calls and cdr_quarantine rows and its raw MinIO object. Each upload
    is committed on its own; see ``_purge_upload``.

    Args:
        dry_run: Only log what would be deleted; no DB or MinIO changes are
            made, and no MinIO client is created.

    Returns:
        ``{"deleted_calls": int, "deleted_objects": int}``, or
        ``{"error": "no DATABASE_URL"}`` when DATABASE_URL is unset. Uploads
        whose MinIO removal failed are left in place and are not counted.
    """
    if not DATABASE_URL:
        log.error("DATABASE_URL not set")
        return {"error": "no DATABASE_URL"}

    # Naive UTC (what utcnow() returned), without the deprecated call.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = now_utc - timedelta(days=RETENTION_DAYS_INCLUDED)
    deleted_calls = 0
    deleted_objects = 0

    conn = psycopg2.connect(DATABASE_URL)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # Uploads outside the retention window
            cur.execute(
                "SELECT id, profile_id, raw_minio_path "
                "FROM cdr_ingestion_uploads "
                "WHERE created_at < %s",
                (cutoff,),
            )
            old_uploads = cur.fetchall()

        if not old_uploads:
            log.info("retention: no uploads older than %s; nothing to sweep", cutoff)
            return {"deleted_calls": 0, "deleted_objects": 0}

        # The storage client is only needed when something is really deleted.
        minio = None if dry_run else _minio()
        for up in old_uploads:
            upload_id = up["id"]
            raw_path = up["raw_minio_path"]
            if dry_run:
                log.info("[DRY RUN] would delete upload %s + calls + object %s",
                         upload_id, raw_path)
                continue
            purged = _purge_upload(conn, minio, upload_id, raw_path)
            if purged is not None:
                calls, objects = purged
                deleted_calls += calls
                deleted_objects += objects
    finally:
        conn.close()

    log.info("retention sweep: deleted %s calls, %s MinIO objects",
             deleted_calls, deleted_objects)
    return {"deleted_calls": deleted_calls, "deleted_objects": deleted_objects}
def main() -> None:
    """Command-line entry point: run the sweep and print its summary dict.

    ``--dry-run`` is forwarded to ``run`` so nothing is deleted.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--dry-run", action="store_true")
    opts = cli.parse_args()
    summary = run(dry_run=opts.dry_run)
    print(summary)


if __name__ == "__main__":
    main()