new-site/docserver/docserver_worker.py
justin 7929413eeb docserver: survive MinIO outages instead of exiting
The worker called sys.exit(1) on any MinIO connection error, so a single
transient 502 from MinIO/its reverse proxy left it dead until a manual restart
or reboot (its scheduled task only runs at system startup). It had been dead
~5 weeks after a 502 on May 9.

- _connect_minio_forever(): retry the initial MinIO connect indefinitely with
  capped exponential backoff (5s..120s) instead of exiting.
- main loop: wrap each poll cycle; on any error, log + rebuild the client and
  keep polling rather than crashing.

Verified on the box: normal DOCX->PDF conversion still works (~11s end to end), and a
bogus endpoint (the exact May 9 failure mode) now retries forever without ever calling sys.exit.
2026-06-15 22:40:27 -05:00

398 lines
14 KiB
Python

r"""
Performance West -- Document Conversion Worker (Windows)
Polls a MinIO bucket for DOCX files, converts them to PDF using
Microsoft Word COM automation, and drops the PDF back into MinIO.
No HTTP server, no open ports, no SSH tunnel required.
The Windows VM only needs outbound access to MinIO (HTTPS when MINIO_SECURE=true).
Protocol
---------
Input: minio://{bucket}/to-convert/{job_id}.docx
Output: minio://{bucket}/converted/{job_id}.pdf
Cleanup: deletes the input DOCX after successful conversion
The Linux pdf_converter.py polls converted/ until the PDF appears
(up to DOCSERVER_TIMEOUT seconds), then downloads and removes it.
Heartbeat
---------
Every HEARTBEAT_INTERVAL seconds (default 60) this worker writes a tiny heartbeat object:
minio://{bucket}/docserver-heartbeat.json
Content: {"status":"ok","word_version":"...","ts":"...","host":"..."}
The health_check() in pdf_converter.py reads this to detect if the
worker is alive without needing a network round-trip to the VM.
Setup
-----
1. Copy this file + requirements_windows.txt to C:\docserver\ on the Windows VM
2. pip install -r C:\docserver\requirements_windows.txt
3. Set the MinIO env vars (see docserver.env or pass via Task Scheduler)
4. Run: python docserver_worker.py
Or let install.ps1 register it as a Task Scheduler task
Environment variables
---------------------
MINIO_ENDPOINT -- MinIO host name or IP, without a port (e.g. minio.performancewest.net);
the port comes from MINIO_PORT
MINIO_PORT -- MinIO port (default 9000)
MINIO_ACCESS_KEY -- access key
MINIO_SECRET_KEY -- secret key
MINIO_BUCKET -- bucket (default: performancewest)
MINIO_SECURE -- "true" (any case) uses HTTPS; anything else uses plain HTTP (default: false)
POLL_INTERVAL -- seconds between polls (default: 12)
HEARTBEAT_INTERVAL -- seconds between heartbeats (default: 60)
LOG_DIR -- directory that holds worker.log (default: C:\docserver\logs)
"""
from __future__ import annotations
import json
import logging
import os
import platform
import shutil
import socket
import sys
import tempfile
import threading
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
LOG = logging.getLogger("docserver_worker")
# Directory for worker.log. logging.FileHandler opens its file as soon as it is
# constructed, i.e. at import time, before the ``if __name__ == "__main__"``
# guard at the bottom of the file runs. The directory therefore has to be
# created here, or a fresh install dies with FileNotFoundError before any
# logging happens.
_LOG_DIR = os.getenv("LOG_DIR", r"C:\docserver\logs")
os.makedirs(_LOG_DIR, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(
            os.path.join(_LOG_DIR, "worker.log"),
            encoding="utf-8",
        ),
    ],
)
# ── Configuration ─────────────────────────────────────────────────────────────
# All settings are read once, at import time, from the process environment.

# MinIO connection.
_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "minio.performancewest.net")
_PORT = int(os.environ.get("MINIO_PORT", "9000"))
_ACCESS = os.environ.get("MINIO_ACCESS_KEY", "")
_SECRET = os.environ.get("MINIO_SECRET_KEY", "")
_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")
# Only the string "true" (case-insensitive) turns TLS on; anything else is off.
_SECURE = os.environ.get("MINIO_SECURE", "false").lower() == "true"

# Object-key prefixes shared with the Linux side.
_PREFIX_IN = "to-convert"  # input: DOCX files from Linux
_PREFIX_OUT = "converted"  # output: PDF files for Linux to pick up

# Loop timings, in seconds.
_POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "12"))
_HEARTBEAT_INTERVAL = int(os.environ.get("HEARTBEAT_INTERVAL", "60"))

# Word COM constants
_WD_FORMAT_PDF = 17  # wdFormatPDF, passed as SaveAs2(FileFormat=...)
_WD_DO_NOT_SAVE_CHANGES = 0  # wdDoNotSaveChanges, passed to Document.Close

# ── Word COM singleton ────────────────────────────────────────────────────────
# Cached Word.Application (None until first started, or after it is found dead)
# and the lock that serialises every use of it.
_word_app = None
_word_lock = threading.Lock()
def _get_word():
    """Return the shared Word COM application, starting Word if necessary.

    A cached instance is probed first. Reading ``Visible`` raises if the Word
    process has died, in which case the cache is dropped and Word is started
    again. Startup makes up to 3 attempts in total, sleeping 10 s and then
    20 s between them. This handles DCOM startup latency when running under
    SYSTEM via Task Scheduler (Session 0 + DCOM RunAs).

    Both visible callers hold ``_word_lock`` while calling this.

    Raises:
        The exception from the last attempt when all 3 attempts fail.
    """
    global _word_app
    if _word_app is not None:
        try:
            _ = _word_app.Visible  # probe — raises if Word died
            return _word_app
        except Exception:
            LOG.warning("Word COM instance died — restarting...")
            _word_app = None
    import win32com.client  # type: ignore
    import pythoncom  # type: ignore
    max_retries = 3
    for attempt in range(1, max_retries + 1):
        app = None
        try:
            pythoncom.CoInitialize()
            app = win32com.client.DispatchEx("Word.Application")
            if app is None:
                raise RuntimeError("DispatchEx returned None")
            app.Visible = False
            app.DisplayAlerts = False
            app.AutomationSecurity = 3  # msoAutomationSecurityForceDisable
            LOG.info("Word COM started — version %s", app.Version)
            # Publish the instance only once it is fully configured.
            _word_app = app
            return _word_app
        except Exception as e:
            LOG.warning("Word COM init attempt %d/%d failed: %s", attempt, max_retries, e)
            _word_app = None
            if app is not None:
                # DispatchEx launched a Word process, but configuring it failed.
                # Quit it so each failed attempt doesn't leave an orphaned
                # WINWORD.EXE running in the background.
                try:
                    app.Quit()
                except Exception:
                    pass  # already gone or unresponsive; nothing more to do
            if attempt < max_retries:
                delay = attempt * 10  # 10s, 20s
                LOG.info(" Retrying in %ds...", delay)
                time.sleep(delay)
            else:
                LOG.error("Word COM failed after %d attempts. Is DCOM configured? "
                          "Run fix_dcom.bat as Administrator.", max_retries)
                raise
def _quit_word():
    """Quit the cached Word instance, if there is one, and clear the cache.

    This is best-effort: if ``Quit()`` raises (e.g. Word is already gone), the
    error is logged and otherwise ignored. The cache is cleared in every case.
    """
    global _word_app
    if _word_app is not None:
        try:
            _word_app.Quit()
        except Exception as exc:
            LOG.warning("Word Quit failed (ignored): %s", exc)
    _word_app = None
def _convert_docx_to_pdf(docx_path: Path, pdf_path: Path) -> bool:
    """Convert one DOCX to PDF via Word COM.

    Conversions are serialised by ``_word_lock`` because they share the one
    cached Word instance. The document is opened read-only and always closed
    without saving.

    Args:
        docx_path: local DOCX file to convert.
        pdf_path: local path the PDF is written to.

    Returns:
        True if Word produced a non-empty file at *pdf_path*. False if the
        conversion raised, or if the output is missing or empty.

    Raises:
        Whatever ``_get_word()`` raises when Word cannot be started. That call
        sits outside the conversion ``try``, so the error reaches the caller.
    """
    with _word_lock:
        word = _get_word()
        doc = None
        try:
            doc = word.Documents.Open(
                str(docx_path.resolve()),
                ReadOnly=True,
                AddToRecentFiles=False,
                Visible=False,
            )
            doc.SaveAs2(str(pdf_path.resolve()), FileFormat=_WD_FORMAT_PDF)
            size = pdf_path.stat().st_size if pdf_path.exists() else 0
            if size <= 0:
                LOG.error("Conversion produced no PDF for %s (%s missing or empty)",
                          docx_path.name, pdf_path.name)
                return False
            LOG.info("Converted: %s → %s (%d bytes)", docx_path.name, pdf_path.name, size)
            return True
        except Exception as exc:
            LOG.error("Conversion failed for %s: %s", docx_path.name, exc)
            return False
        finally:
            if doc is not None:
                try:
                    doc.Close(SaveChanges=_WD_DO_NOT_SAVE_CHANGES)
                except Exception:
                    pass  # best effort; the document was opened read-only
# ── MinIO helpers ─────────────────────────────────────────────────────────────
def _minio_endpoint(endpoint: str, port: int) -> str:
    """Return the ``host:port`` string to hand to the MinIO client.

    MINIO_ENDPOINT is documented with examples both without and with a port
    (``minio.performancewest.net`` and ``IP:9000``). Appending *port*
    unconditionally turned the second form into ``IP:9000:9000``, so an
    endpoint that already ends in a numeric port is returned unchanged.
    Bracketed IPv6 (``[::1]:9000``) counts as having a port. A bare IPv6
    address without brackets does not.
    """
    host, sep, tail = endpoint.rpartition(":")
    has_port = (
        bool(sep)
        and tail.isdigit()
        and (":" not in host or host.endswith("]"))
    )
    return endpoint if has_port else f"{endpoint}:{port}"


def _mc():
    """Build a new MinIO client from the MINIO_* configuration."""
    from minio import Minio  # type: ignore
    return Minio(
        _minio_endpoint(_ENDPOINT, _PORT),
        access_key=_ACCESS,
        secret_key=_SECRET,
        secure=_SECURE,
    )
def _ensure_bucket(mc) -> None:
    """Create the configured bucket through *mc* unless it already exists."""
    if mc.bucket_exists(_BUCKET):
        return
    mc.make_bucket(_BUCKET)
    LOG.info("Created bucket: %s", _BUCKET)
def _connect_minio_forever():
    """Return a MinIO client whose bucket has been verified, waiting as long as needed.

    Each attempt builds a client and ensures the bucket exists. On failure it
    logs the error, sleeps, and tries again. The sleep starts at 5 s and
    doubles up to a 120 s ceiling. There is deliberately no give-up path: the
    worker used to ``sys.exit(1)`` on a connection error, so a single transient
    502 from MinIO (or its reverse proxy) left it dead until a reboot.
    """
    wait_s = 5
    attempt = 0
    while True:
        attempt += 1
        try:
            client = _mc()
            _ensure_bucket(client)
        except Exception as exc:
            LOG.error("MinIO connection failed (attempt %d): %s; retrying in %ds",
                      attempt, exc, wait_s)
            time.sleep(wait_s)
            wait_s = min(wait_s * 2, 120)  # 5, 10, 20, 40, 80, 120, 120, ...
            continue
        LOG.info("MinIO connected — bucket '%s' ready", _BUCKET)
        return client
def _list_pending(mc) -> list[str]:
    """Return the names of DOCX objects waiting under ``to-convert/``.

    Names containing ``/.tmp_`` are left out. Those are still being uploaded
    atomically by the Linux side and are not ready for processing. If the
    listing fails, the error is logged and an empty list is returned instead
    of raising.
    """
    def is_ready(name: str) -> bool:
        # Only finished .docx uploads; .tmp_ objects are still in flight.
        return name.endswith(".docx") and "/.tmp_" not in name

    try:
        ready = []
        for entry in mc.list_objects(_BUCKET, prefix=f"{_PREFIX_IN}/", recursive=False):
            name = entry.object_name
            if is_ready(name):
                ready.append(name)
        return ready
    except Exception as exc:
        LOG.error("Failed to list pending jobs: %s", exc)
        return []
# ── Main processing loop ──────────────────────────────────────────────────────
def _process_one(mc, in_key: str) -> None:
    """Run one conversion job: download the DOCX, convert it, publish the PDF, delete the input.

    Once the temp dir exists, every failure is logged rather than raised. A
    job that fails before step 4 leaves its DOCX in ``to-convert/``, so the
    next poll retries it.

    NOTE(review): nothing bounds those retries. A DOCX that Word can never
    convert is attempted again on every poll.

    Args:
        mc: MinIO client.
        in_key: object name of the input DOCX, e.g. ``to-convert/abc123.docx``.
    """
    job_id = Path(in_key).stem  # e.g. "abc123"
    out_key = f"{_PREFIX_OUT}/{job_id}.pdf"
    # A PDF that is already published means an earlier cycle converted and
    # uploaded this job but did not get to delete the input DOCX (step 4).
    # Finish that job rather than only skipping it. A skipped DOCX stays in
    # to-convert/, and once Linux has downloaded and removed the PDF, the next
    # poll would convert it again into an orphan PDF that nobody collects.
    # (Assumes job IDs are never reused; TODO confirm with pdf_converter.py.)
    try:
        mc.stat_object(_BUCKET, out_key)
    except Exception:
        pass  # expected — PDF doesn't exist yet
    else:
        LOG.info("Job %s already converted — skipping", job_id[:8])
        try:
            mc.remove_object(_BUCKET, in_key)
            LOG.info("[%s] Removed leftover input DOCX from to-convert/", job_id[:8])
        except Exception as exc:
            LOG.warning("[%s] Could not remove leftover input DOCX %s: %s",
                        job_id[:8], in_key, exc)
        return
    work_dir = Path(tempfile.mkdtemp(prefix=f"docserver_{job_id[:8]}_"))
    docx_path = work_dir / f"{job_id}.docx"
    pdf_path = work_dir / f"{job_id}.pdf"
    try:
        # 1. Download DOCX
        LOG.info("[%s] Downloading %s", job_id[:8], in_key)
        mc.fget_object(_BUCKET, in_key, str(docx_path))
        # 2. Convert
        LOG.info("[%s] Converting via Word...", job_id[:8])
        t0 = time.monotonic()
        success = _convert_docx_to_pdf(docx_path, pdf_path)
        elapsed = time.monotonic() - t0
        if not success:
            LOG.error("[%s] Conversion failed — leaving DOCX in to-convert/ for retry", job_id[:8])
            return
        LOG.info("[%s] Converted in %.1fs", job_id[:8], elapsed)
        # 3. Upload the PDF to converted/. It goes to a .tmp_ name first and is
        #    then server-side copied to the final key, so the Linux side, which
        #    polls stat_object(out_key), never sees a partially written PDF.
        from minio.commonconfig import CopySource  # type: ignore
        tmp_out = f"{_PREFIX_OUT}/.tmp_{job_id}.pdf"
        mc.fput_object(
            _BUCKET, tmp_out, str(pdf_path),
            content_type="application/pdf",
            metadata={
                "x-amz-meta-job-id": job_id,
                "x-amz-meta-elapsed": f"{elapsed:.1f}s",
            },
        )
        mc.copy_object(_BUCKET, out_key, CopySource(_BUCKET, tmp_out))
        mc.remove_object(_BUCKET, tmp_out)
        LOG.info("[%s] Uploaded PDF → minio://%s/%s (atomic)", job_id[:8], _BUCKET, out_key)
        # 4. Delete the input DOCX so it doesn't get processed again
        mc.remove_object(_BUCKET, in_key)
        LOG.info("[%s] Removed input DOCX from to-convert/", job_id[:8])
    except Exception as exc:
        # LOG.exception keeps the traceback, which is needed to diagnose
        # unexpected failures.
        LOG.exception("[%s] Unexpected error processing %s: %s", job_id[:8], in_key, exc)
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
def _heartbeat_loop(word_version: str) -> None:
    """Write ``docserver-heartbeat.json`` to MinIO every HEARTBEAT_INTERVAL seconds; never returns.

    Each heartbeat is a JSON object with ``status``, ``word_version``, ``host``
    and a UTC ISO-8601 ``ts``. Any failure, including building the MinIO
    client, is logged as a warning and retried on the next tick, so the thread
    keeps running through MinIO outages.

    Args:
        word_version: Word version string reported in every heartbeat.
    """
    import io

    mc = None
    hostname = socket.gethostname()
    while True:
        try:
            if mc is None:
                # Built inside the try so a construction error is retried
                # instead of silently ending the heartbeat thread.
                mc = _mc()
            payload = json.dumps({
                "status": "ok",
                "word_version": word_version,
                "host": hostname,
                "ts": datetime.now(timezone.utc).isoformat(),
            }).encode()
            mc.put_object(
                _BUCKET,
                "docserver-heartbeat.json",
                io.BytesIO(payload),
                length=len(payload),
                content_type="application/json",
            )
        except Exception as exc:
            LOG.warning("Heartbeat write failed: %s", exc)
        time.sleep(_HEARTBEAT_INTERVAL)
def main() -> None:
    """Worker entry point: start Word, connect to MinIO, then poll for jobs forever.

    Exits with status 1 when the MinIO credentials are missing or Word COM
    cannot be initialised. MinIO problems never end the process: the initial
    connect and any failed poll cycle retry via ``_connect_minio_forever()``.
    Word is quit on the way out, including after Ctrl-C.
    """
    LOG.info("Performance West Document Conversion Worker starting...")
    LOG.info(" Python: %s", sys.version.split()[0])
    LOG.info(" Platform: %s", platform.platform())
    LOG.info(" MinIO: %s:%d / bucket=%s", _ENDPOINT, _PORT, _BUCKET)
    # Log session/user info for debugging COM issues. This is best effort;
    # ctypes.windll only exists on Windows.
    try:
        import getpass
        LOG.info(" User: %s", getpass.getuser())
        import ctypes
        session_id = ctypes.windll.kernel32.WTSGetActiveConsoleSessionId()
        LOG.info(" Session: %d (console)", session_id)
    except Exception:
        pass
    if not _ACCESS or not _SECRET:
        LOG.error("MINIO_ACCESS_KEY / MINIO_SECRET_KEY not set -- cannot start")
        sys.exit(1)
    # Verify Word is available before accepting work
    LOG.info("Initialising Word COM...")
    try:
        with _word_lock:
            word = _get_word()
            word_version = word.Version
        LOG.info("Word %s ready", word_version)
    except Exception as exc:
        LOG.error("Word COM failed to initialise: %s", exc)
        LOG.error("Fix: run fix_dcom.bat as Administrator, then reboot.")
        LOG.error("Or RDP in to create an interactive session, then the AtLogOn task will fire.")
        # Word may already be running (e.g. reading .Version failed after
        # _get_word() succeeded). Don't leave it behind an exiting worker.
        _quit_word()
        sys.exit(1)
    try:
        # Connect to MinIO, retrying indefinitely. MinIO (or the nginx vhost in
        # front of it) can return transient 502s or be briefly unreachable. The
        # worker must wait that out rather than exit, otherwise it stays dead
        # until a reboot or a manual restart. This call is inside the try so
        # that Ctrl-C while waiting out an outage still reaches the finally
        # clause and quits Word.
        mc = _connect_minio_forever()
        # Start heartbeat background thread
        hb = threading.Thread(target=_heartbeat_loop, args=(word_version,), daemon=True)
        hb.start()
        LOG.info("Heartbeat thread started (interval=%ds)", _HEARTBEAT_INTERVAL)
        LOG.info("Polling to-convert/ every %ds — waiting for jobs...", _POLL_INTERVAL)
        while True:
            try:
                pending = _list_pending(mc)
                if pending:
                    LOG.info("Found %d pending job(s)", len(pending))
                for key in pending:
                    _process_one(mc, key)
            except Exception as exc:
                # Never let a transient MinIO/network error kill the loop.
                # _list_pending and _process_one already catch and log most of
                # their own errors; this handles anything that still escapes by
                # rebuilding the client and carrying on.
                LOG.error("Poll cycle failed (%s); reconnecting to MinIO...", exc)
                mc = _connect_minio_forever()
            time.sleep(_POLL_INTERVAL)
    except KeyboardInterrupt:
        LOG.info("Shutting down...")
    finally:
        _quit_word()
        LOG.info("Worker stopped.")
if __name__ == "__main__":
    # Create the log directory (no-op if it already exists). The module-level
    # logging.FileHandler has already been constructed at import time by the
    # time this line runs.
    Path(os.getenv("LOG_DIR", r"C:\docserver\logs")).mkdir(parents=True, exist_ok=True)
    main()