r"""
Performance West -- Document Conversion Worker (Windows)

Polls a MinIO bucket for DOCX files, converts them to PDF using Microsoft Word
COM automation, and drops the PDF back into MinIO.

No HTTP server, no open ports, no SSH tunnel required.
The Windows VM only needs outbound HTTPS access to MinIO.

Protocol
---------
  Input:   minio://{bucket}/to-convert/{job_id}.docx
  Output:  minio://{bucket}/converted/{job_id}.pdf
  Cleanup: deletes the input DOCX after successful conversion

The Linux pdf_converter.py polls converted/ until the PDF appears (up to
DOCSERVER_TIMEOUT seconds), then downloads and removes it.

Heartbeat
---------
Every 60 seconds this worker writes a tiny heartbeat object:
  minio://{bucket}/docserver-heartbeat.json
  Content: {"status":"ok","word_version":"...","ts":"...","host":"..."}

The health_check() in pdf_converter.py reads this to detect if the worker is
alive without needing a network round-trip to the VM.

Setup
-----
1. Copy this file + requirements_windows.txt to C:\docserver\ on the Windows VM
2. pip install -r C:\docserver\requirements_windows.txt
3. Set the MinIO env vars (see docserver.env or pass via Task Scheduler)
4. Run: python docserver_worker.py
   Or let install.ps1 register it as a Task Scheduler task

Environment variables
---------------------
  MINIO_ENDPOINT     -- MinIO host, optionally host:port
                        (e.g. minio.performancewest.net or IP:9000).
                        A port given here takes precedence over MINIO_PORT.
  MINIO_PORT         -- MinIO port, used when MINIO_ENDPOINT has no port
                        (default 9000)
  MINIO_ACCESS_KEY   -- access key
  MINIO_SECRET_KEY   -- secret key
  MINIO_BUCKET       -- bucket (default: performancewest)
  MINIO_SECURE       -- true/false (default: false for internal; true for external)
  POLL_INTERVAL      -- seconds between polls (default: 12)
  HEARTBEAT_INTERVAL -- seconds between heartbeats (default: 60)
  LOG_DIR            -- directory for worker.log (default: C:\docserver\logs)
"""
from __future__ import annotations

import io
import json
import logging
import os
import platform
import shutil
import socket
import sys
import tempfile
import threading
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path

LOG = logging.getLogger("docserver_worker")


def _build_log_handlers(log_dir: str) -> list[logging.Handler]:
    """Return the logging handlers: stdout always, plus worker.log when possible.

    The log directory is created here because ``logging.FileHandler`` opens its
    file as soon as it is constructed; building the handler before the
    directory exists raises ``FileNotFoundError``.

    Args:
        log_dir: Directory that should contain ``worker.log``.

    Returns:
        A list with the stdout handler and, if the directory could be created
        and the file opened, a UTF-8 file handler for ``worker.log``.
    """
    handlers: list[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    try:
        Path(log_dir).mkdir(parents=True, exist_ok=True)
        handlers.append(
            logging.FileHandler(os.path.join(log_dir, "worker.log"), encoding="utf-8")
        )
    except OSError as exc:
        # Logging is not configured yet, so report directly on stderr and
        # keep running with stdout-only logging instead of crashing at import.
        print(
            f"WARNING: cannot open log file in {log_dir!r}: {exc} "
            "-- logging to stdout only",
            file=sys.stderr,
        )
    return handlers


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=_build_log_handlers(os.getenv("LOG_DIR", r"C:\docserver\logs")),
)

# ── Configuration ─────────────────────────────────────────────────────────────

_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio.performancewest.net")
_PORT = int(os.getenv("MINIO_PORT", "9000"))
_ACCESS = os.getenv("MINIO_ACCESS_KEY", "")
_SECRET = os.getenv("MINIO_SECRET_KEY", "")
_BUCKET = os.getenv("MINIO_BUCKET", "performancewest")
_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"

_PREFIX_IN = "to-convert"   # input: DOCX files from Linux
_PREFIX_OUT = "converted"   # output: PDF files for Linux to pick up

_POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "12"))
_HEARTBEAT_INTERVAL = int(os.getenv("HEARTBEAT_INTERVAL", "60"))

# Word COM constants
_WD_FORMAT_PDF = 17
_WD_DO_NOT_SAVE_CHANGES = 0


def _endpoint_address(endpoint: str, port: int) -> str:
    """Return ``host:port`` for the MinIO client without doubling the port.

    ``MINIO_ENDPOINT`` is documented as accepting either a bare host or
    ``host:port``. A port already present in *endpoint* is kept; otherwise
    *port* is appended.

    Args:
        endpoint: Host name / IP, optionally followed by ``:port``.
        port: Port to use when *endpoint* does not carry one.

    Returns:
        The endpoint string in ``host:port`` form.
    """
    endpoint = endpoint.strip()
    host, sep, tail = endpoint.rpartition(":")
    # A trailing ":<digits>" is a port only if the host part is a plain name /
    # IPv4 address (no further colons) or a bracketed IPv6 literal.
    has_port = bool(sep and host and tail.isdigit()
                    and (":" not in host or host.endswith("]")))
    return endpoint if has_port else f"{endpoint}:{port}"


# ── Word COM singleton ────────────────────────────────────────────────────────

_word_app = None
_word_lock = threading.Lock()


def _get_word():
    """Return the Word COM application, creating it if necessary.

    Retries up to 3 times with increasing delays to handle DCOM startup latency
    when running under SYSTEM via Task Scheduler (Session 0 + DCOM RunAs).

    Returns:
        The cached or newly created ``Word.Application`` COM object.

    Raises:
        Exception: Re-raises the error from the last failed start attempt.
    """
    global _word_app
    if _word_app is not None:
        try:
            _ = _word_app.Visible  # probe — raises if Word died
            return _word_app
        except Exception:
            LOG.warning("Word COM instance died — restarting...")
            _word_app = None

    # Imported lazily: pywin32 only exists on the Windows worker VM.
    import win32com.client  # type: ignore
    import pythoncom  # type: ignore

    max_retries = 3
    for attempt in range(1, max_retries + 1):
        try:
            pythoncom.CoInitialize()
            _word_app = win32com.client.DispatchEx("Word.Application")
            if _word_app is None:
                raise RuntimeError("DispatchEx returned None")
            _word_app.Visible = False
            _word_app.DisplayAlerts = False
            _word_app.AutomationSecurity = 3  # msoAutomationSecurityForceDisable
            LOG.info("Word COM started — version %s", _word_app.Version)
            return _word_app
        except Exception as e:
            LOG.warning("Word COM init attempt %d/%d failed: %s",
                        attempt, max_retries, e)
            _word_app = None
            if attempt < max_retries:
                delay = attempt * 10  # 10s, 20s
                LOG.info(" Retrying in %ds...", delay)
                time.sleep(delay)
            else:
                LOG.error("Word COM failed after %d attempts. Is DCOM configured? "
                          "Run fix_dcom.bat as Administrator.", max_retries)
                raise


def _quit_word():
    """Quit the cached Word instance (best effort) and clear the singleton."""
    global _word_app
    if _word_app:
        try:
            _word_app.Quit()
        except Exception:
            # Best effort during shutdown — Word may already be gone.
            pass
        _word_app = None


def _convert_docx_to_pdf(docx_path: Path, pdf_path: Path) -> bool:
    """Convert one DOCX to PDF via Word COM. Serialised by _word_lock.

    Args:
        docx_path: Local path of the DOCX to open (opened read-only).
        pdf_path: Local path the PDF is written to.

    Returns:
        True if the PDF exists and is non-empty afterwards; False if the
        conversion raised or produced no output.
    """
    with _word_lock:
        word = _get_word()
        doc = None
        try:
            doc = word.Documents.Open(
                str(docx_path.resolve()),
                ReadOnly=True,
                AddToRecentFiles=False,
                Visible=False,
            )
            doc.SaveAs2(str(pdf_path.resolve()), FileFormat=_WD_FORMAT_PDF)
            size = pdf_path.stat().st_size if pdf_path.exists() else 0
            LOG.info("Converted: %s → %s (%d bytes)",
                     docx_path.name, pdf_path.name, size)
            return pdf_path.exists() and size > 0
        except Exception as exc:
            LOG.error("Conversion failed for %s: %s", docx_path.name, exc)
            return False
        finally:
            # Always close the document so Word does not accumulate open files.
            if doc:
                try:
                    doc.Close(SaveChanges=_WD_DO_NOT_SAVE_CHANGES)
                except Exception:
                    pass


# ── MinIO helpers ─────────────────────────────────────────────────────────────

def _mc():
    """Build a MinIO client from the module-level configuration."""
    from minio import Minio  # type: ignore
    return Minio(
        _endpoint_address(_ENDPOINT, _PORT),
        access_key=_ACCESS,
        secret_key=_SECRET,
        secure=_SECURE,
    )


def _ensure_bucket(mc) -> None:
    """Create the configured bucket if it does not exist yet."""
    if not mc.bucket_exists(_BUCKET):
        mc.make_bucket(_BUCKET)
        LOG.info("Created bucket: %s", _BUCKET)


def _list_pending(mc) -> list[str]:
    """Return object names under to-convert/ that end in .docx.

    Ignores .tmp_ prefixed files — those are still being uploaded atomically
    by the Linux side and are not ready for processing yet.

    Returns:
        The matching object names, or an empty list if listing fails (the
        error is logged so the poll loop keeps running).
    """
    try:
        objects = mc.list_objects(_BUCKET, prefix=f"{_PREFIX_IN}/", recursive=False)
        return [
            obj.object_name
            for obj in objects
            if obj.object_name.endswith(".docx")
            and "/.tmp_" not in obj.object_name
        ]
    except Exception as exc:
        LOG.error("Failed to list pending jobs: %s", exc)
        return []


# ── Main processing loop ──────────────────────────────────────────────────────

def _process_one(mc, in_key: str) -> None:
    """Download one DOCX from MinIO, convert, upload the PDF, delete the DOCX.

    Errors are logged rather than raised so one bad job does not stop the
    poll loop. On conversion failure the input DOCX is left in place and will
    be picked up again on a later poll.

    Args:
        mc: MinIO client.
        in_key: Object name of the input DOCX (under to-convert/).
    """
    job_id = Path(in_key).stem  # e.g. "abc123"
    out_key = f"{_PREFIX_OUT}/{job_id}.pdf"

    # Skip if the PDF is already there (duplicate poll before delete completed)
    try:
        mc.stat_object(_BUCKET, out_key)
        LOG.info("Job %s already converted — skipping", job_id[:8])
        return
    except Exception:
        pass  # expected — PDF doesn't exist yet

    work_dir = Path(tempfile.mkdtemp(prefix=f"docserver_{job_id[:8]}_"))
    docx_path = work_dir / f"{job_id}.docx"
    pdf_path = work_dir / f"{job_id}.pdf"

    try:
        # 1. Download DOCX
        LOG.info("[%s] Downloading %s", job_id[:8], in_key)
        mc.fget_object(_BUCKET, in_key, str(docx_path))

        # 2. Convert
        LOG.info("[%s] Converting via Word...", job_id[:8])
        t0 = time.monotonic()
        success = _convert_docx_to_pdf(docx_path, pdf_path)
        elapsed = time.monotonic() - t0

        if not success:
            LOG.error("[%s] Conversion failed — leaving DOCX in to-convert/ for retry",
                      job_id[:8])
            return

        LOG.info("[%s] Converted in %.1fs", job_id[:8], elapsed)

        # 3. Upload PDF to converted/ — atomic via tmp + rename
        #    Upload to .tmp_ first, then server-side copy to final key.
        #    Linux side polls stat_object(out_key) — it won't see the .tmp_.
        from minio.commonconfig import CopySource  # type: ignore
        tmp_out = f"{_PREFIX_OUT}/.tmp_{job_id}.pdf"
        mc.fput_object(
            _BUCKET,
            tmp_out,
            str(pdf_path),
            content_type="application/pdf",
            metadata={
                "x-amz-meta-job-id": job_id,
                "x-amz-meta-elapsed": f"{elapsed:.1f}s",
            },
        )
        mc.copy_object(_BUCKET, out_key, CopySource(_BUCKET, tmp_out))
        mc.remove_object(_BUCKET, tmp_out)
        LOG.info("[%s] Uploaded PDF → minio://%s/%s (atomic)",
                 job_id[:8], _BUCKET, out_key)

        # 4. Delete the input DOCX so it doesn't get processed again
        mc.remove_object(_BUCKET, in_key)
        LOG.info("[%s] Removed input DOCX from to-convert/", job_id[:8])

    except Exception as exc:
        LOG.error("[%s] Unexpected error processing %s: %s", job_id[:8], in_key, exc)
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)


def _heartbeat_loop(word_version: str) -> None:
    """Write a heartbeat object to MinIO every HEARTBEAT_INTERVAL seconds.

    Runs forever; intended to be the target of a daemon thread. Write failures
    are logged and retried on the next interval.

    Args:
        word_version: Word version string included in the heartbeat payload.
    """
    mc = _mc()
    hostname = socket.gethostname()
    while True:
        try:
            payload = json.dumps({
                "status": "ok",
                "word_version": word_version,
                "host": hostname,
                "ts": datetime.now(timezone.utc).isoformat(),
            }).encode()
            mc.put_object(
                _BUCKET,
                "docserver-heartbeat.json",
                io.BytesIO(payload),
                length=len(payload),
                content_type="application/json",
            )
        except Exception as exc:
            LOG.warning("Heartbeat write failed: %s", exc)
        time.sleep(_HEARTBEAT_INTERVAL)


def main() -> None:
    """Start the worker: verify Word and MinIO, start the heartbeat, poll forever.

    Exits with status 1 if credentials are missing, Word COM cannot be
    started, or MinIO cannot be reached. Ctrl+C stops the poll loop; Word is
    quit on the way out.
    """
    LOG.info("Performance West Document Conversion Worker starting...")
    LOG.info(" Python: %s", sys.version.split()[0])
    LOG.info(" Platform: %s", platform.platform())
    LOG.info(" MinIO: %s / bucket=%s", _endpoint_address(_ENDPOINT, _PORT), _BUCKET)

    # Log session/user info for debugging COM issues
    try:
        import getpass
        LOG.info(" User: %s", getpass.getuser())
        import ctypes
        session_id = ctypes.windll.kernel32.WTSGetActiveConsoleSessionId()
        LOG.info(" Session: %d (console)", session_id)
    except Exception:
        pass  # purely diagnostic — never block startup on it

    if not _ACCESS or not _SECRET:
        LOG.error("MINIO_ACCESS_KEY / MINIO_SECRET_KEY not set -- cannot start")
        sys.exit(1)

    # Verify Word is available before accepting work
    LOG.info("Initialising Word COM...")
    try:
        with _word_lock:
            word = _get_word()
            word_version = word.Version
        LOG.info("Word %s ready", word_version)
    except Exception as exc:
        LOG.error("Word COM failed to initialise: %s", exc)
        LOG.error("Fix: run fix_dcom.bat as Administrator, then reboot.")
        LOG.error("Or RDP in to create an interactive session, then the AtLogOn task will fire.")
        sys.exit(1)

    # Verify MinIO connectivity
    LOG.info("Connecting to MinIO...")
    try:
        mc = _mc()
        _ensure_bucket(mc)
        LOG.info("MinIO connected — bucket '%s' ready", _BUCKET)
    except Exception as exc:
        LOG.error("MinIO connection failed: %s", exc)
        sys.exit(1)

    # Start heartbeat background thread
    hb = threading.Thread(target=_heartbeat_loop, args=(word_version,), daemon=True)
    hb.start()
    LOG.info("Heartbeat thread started (interval=%ds)", _HEARTBEAT_INTERVAL)

    LOG.info("Polling to-convert/ every %ds — waiting for jobs...", _POLL_INTERVAL)

    try:
        while True:
            pending = _list_pending(mc)
            if pending:
                LOG.info("Found %d pending job(s)", len(pending))
                for key in pending:
                    _process_one(mc, key)
            time.sleep(_POLL_INTERVAL)
    except KeyboardInterrupt:
        LOG.info("Shutting down...")
    finally:
        _quit_word()
        LOG.info("Worker stopped.")


if __name__ == "__main__":
    # The log directory is created during logging setup at import time, before
    # the file handler is opened, so nothing else is needed here.
    main()