Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
121 lines
4 KiB
Python
121 lines
4 KiB
Python
"""
CDR retention sweeper — nightly.

Deletes CDR ingestion uploads that are older than the retention window,
together with their ``cdr_calls`` and ``cdr_quarantine`` rows, the
``cdr_ingestion_uploads`` row itself, and the raw MinIO object referenced by
``cdr_ingestion_uploads.raw_minio_path``.

Current behavior:

* The cutoff is ``RETENTION_DAYS_INCLUDED`` (5 * 365) days before the current
  UTC time, and is the same for every profile.
* An upload is swept when ``cdr_ingestion_uploads.created_at`` is older than
  the cutoff. All of its ``cdr_calls`` and ``cdr_quarantine`` rows are deleted
  with it, whatever their individual timestamps.
* The raw object is removed from ``MINIO_BUCKET``. The object key is the part
  of ``raw_minio_path`` after ``"<MINIO_BUCKET>/"``, or the whole path when
  that prefix is absent.
* ``cdr_traffic_studies`` rows are NOT deleted — the summary metadata is
  retained forever as an audit artifact.

NOTE(review): the original design notes for this worker also describe
behavior that this module does not implement — confirm whether it is still
required:

* per-profile cutoffs (``storage_plan``-specific for enterprise profiles);
* handling of lapsed-plan deletion grace periods for tier customers;
* deleting individual ``cdr_calls`` rows older than the cutoff even when
  their upload is newer;
* deleting the MinIO object only once every call from that upload is outside
  the window.

Usage:

    python -m scripts.workers.cdr_retention_sweeper
    python -m scripts.workers.cdr_retention_sweeper --dry-run

CRON: 0 3 * * * python -m scripts.workers.cdr_retention_sweeper
"""
|
|
|
|
from __future__ import annotations

import argparse
import logging
import os
import sys
from datetime import datetime, timedelta, timezone

import psycopg2
import psycopg2.extras

# Root logging is configured at import time: INFO and above go to stdout
# with a timestamped "[LEVEL] logger: message" layout.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("cdr_retention_sweeper")

# Postgres connection string; an empty value means "not configured".
DATABASE_URL = os.getenv("DATABASE_URL", "")
# Bucket that raw upload objects are removed from.
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "performancewest")
# Retention window for included-quota profiles, in days (5 x 365).
RETENTION_DAYS_INCLUDED = 365 * 5


def _minio():
    """Return a MinIO client configured from ``MINIO_*`` environment variables.

    The ``minio`` package is imported inside the function, so importing this
    module does not require it to be installed.

    Environment:
        MINIO_ENDPOINT: server endpoint, default ``minio:9000``.
        MINIO_ACCESS_KEY / MINIO_SECRET_KEY: credentials, default empty.
        MINIO_SECURE: TLS is used only when this equals "true" (any case).
    """
    from minio import Minio

    env = os.environ
    use_tls = env.get("MINIO_SECURE", "false").lower() == "true"
    endpoint = env.get("MINIO_ENDPOINT", "minio:9000")
    return Minio(
        endpoint,
        access_key=env.get("MINIO_ACCESS_KEY", ""),
        secret_key=env.get("MINIO_SECRET_KEY", ""),
        secure=use_tls,
    )


def run(dry_run: bool = False) -> dict:
    """Sweep CDR uploads older than the retention cutoff.

    The cutoff is ``RETENTION_DAYS_INCLUDED`` days before the current UTC
    time. Uploads are selected by ``cdr_ingestion_uploads.created_at`` only.
    For each selected upload, every ``cdr_calls`` and ``cdr_quarantine`` row
    attached to it is deleted, then the upload row, then its raw MinIO object.

    Each upload is handled in its own transaction. The MinIO object is removed
    *before* the transaction commits. If that removal fails, the transaction
    is rolled back so the upload row — the only record of the object path —
    survives and the object is retried on the next run, instead of becoming an
    orphan that nothing references.

    Args:
        dry_run: When true, only log what would be deleted. No rows or objects
            are touched and no MinIO client is created.

    Returns:
        ``{"deleted_calls": int, "deleted_objects": int}`` on success, or
        ``{"error": "no DATABASE_URL"}`` when ``DATABASE_URL`` is unset.

    Raises:
        psycopg2.Error: propagated from connecting, any query, or a commit.
            The in-flight upload's uncommitted deletes are discarded when the
            connection is closed.
    """
    if not DATABASE_URL:
        log.error("DATABASE_URL not set")
        return {"error": "no DATABASE_URL"}

    # Timezone-aware UTC "now" (datetime.utcnow() is deprecated since 3.12).
    # NOTE(review): assumes created_at is timestamptz; for a naive timestamp
    # column the comparison shifts by the session TimeZone offset.
    # NOTE(review): 5 * 365 days is 1-2 days short of five calendar years
    # (leap days), so uploads are swept slightly before their fifth
    # anniversary — confirm this matches the retention policy.
    cutoff = datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS_INCLUDED)
    deleted_calls = 0
    deleted_objects = 0

    conn = psycopg2.connect(DATABASE_URL)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # Uploads outside the retention window.
            cur.execute(
                "SELECT id, profile_id, raw_minio_path "
                "FROM cdr_ingestion_uploads "
                "WHERE created_at < %s",
                (cutoff,),
            )
            old_uploads = cur.fetchall()

        if not old_uploads:
            log.info("retention: no uploads older than %s; nothing to sweep", cutoff)
            return {"deleted_calls": 0, "deleted_objects": 0}

        # Only build a client when objects will actually be removed, so a dry
        # run needs neither MinIO credentials nor the minio package.
        minio = None if dry_run else _minio()

        for up in old_uploads:
            upload_id = up["id"]
            raw_path = up["raw_minio_path"]

            if dry_run:
                log.info("[DRY RUN] would delete upload %s + calls + object %s",
                         upload_id, raw_path)
                continue

            with conn.cursor() as cur:
                cur.execute("DELETE FROM cdr_calls WHERE upload_id=%s", (upload_id,))
                upload_calls = cur.rowcount
                cur.execute("DELETE FROM cdr_quarantine WHERE upload_id=%s", (upload_id,))
                cur.execute("DELETE FROM cdr_ingestion_uploads WHERE id=%s", (upload_id,))

            # Remove the object while the row deletes are still uncommitted.
            # On failure, roll back so the path stays recorded for a retry.
            # If the later commit fails instead, the row survives and the
            # next run re-issues the (idempotent) object delete.
            if raw_path:
                key = raw_path.split(f"{MINIO_BUCKET}/", 1)[-1]
                try:
                    minio.remove_object(MINIO_BUCKET, key)
                except Exception as exc:  # minio raises S3Error and transport errors
                    conn.rollback()
                    log.warning("retention: could not delete MinIO object %s: %s; "
                                "keeping upload %s for the next run",
                                raw_path, exc, upload_id)
                    continue
                deleted_objects += 1

            conn.commit()
            deleted_calls += upload_calls
    finally:
        conn.close()

    log.info("retention sweep: deleted %s calls, %s MinIO objects",
             deleted_calls, deleted_objects)
    return {"deleted_calls": deleted_calls, "deleted_objects": deleted_objects}


def main() -> None:
    """CLI entry point: run one sweep, print its result dict, and exit with
    status 1 when the sweep reports an error.

    ``run()`` signals a configuration failure (missing DATABASE_URL) through
    an ``"error"`` key rather than an exception. That is turned into a
    non-zero exit status here so cron and monitoring can see the failure.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="log what would be deleted without deleting anything",
    )
    args = parser.parse_args()
    result = run(dry_run=args.dry_run)
    print(result)
    if "error" in result:
        sys.exit(1)


if __name__ == "__main__":
    main()