new-site/scripts/ollama_client.py

"""
ollama_client.py — Shared Ollama client for all Performance West monitor scripts.
Handles model warmup, session reuse via keep_alive, and SSH tunneling.
"""
import os
import json
import time
import subprocess
import urllib.request
import logging
from pathlib import Path
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
ON_PROD = os.path.exists("/opt/performancewest")
REMOTE_HOST = "172.18.0.2"
REMOTE_PORT = "11434"
TUNNEL_PORT = "11435"
SSH_TARGET = "justin@207.174.124.70"
SSH_PORT = "22022"
SSH_KEY = str(Path.home() / ".ssh/id_ed25519")
OLLAMA_BASE = f"http://{REMOTE_HOST}:{REMOTE_PORT}" if ON_PROD else f"http://localhost:{TUNNEL_PORT}"
MODEL = "qwen2.5:3b"
TIMEOUT = 300
KEEP_ALIVE = "30m" # Keep model loaded for 30 minutes between calls
_tunnel_proc = None
_warmed_up = False
# ---------------------------------------------------------------------------
# Tunnel management (local dev only)
# ---------------------------------------------------------------------------
def start_tunnel() -> bool:
    """Ensure Ollama is reachable; on local dev, open an SSH tunnel.

    Production talks to the Ollama container directly. Local dev forwards
    TUNNEL_PORT to the remote container over SSH. Returns True once the
    API answers at OLLAMA_BASE.
    """
    global _tunnel_proc
    if ON_PROD:
        log.info(f"Production: Ollama at {OLLAMA_BASE}")
        return True
    # Check if tunnel already open
    try:
        urllib.request.urlopen(f"{OLLAMA_BASE}/api/tags", timeout=3)
        log.info("Ollama tunnel already open")
        return True
    except Exception:
        pass
    log.info("Opening SSH tunnel to Ollama...")
    _tunnel_proc = subprocess.Popen([
        "ssh", "-N", "-L", f"{TUNNEL_PORT}:{REMOTE_HOST}:{REMOTE_PORT}",
        "-p", SSH_PORT, "-i", SSH_KEY,
        "-o", "StrictHostKeyChecking=no",
        "-o", "ExitOnForwardFailure=yes",
        SSH_TARGET,
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    # Poll for up to 15 seconds until the forwarded port answers
    for _ in range(15):
        time.sleep(1)
        try:
            urllib.request.urlopen(f"{OLLAMA_BASE}/api/tags", timeout=2)
            log.info("Tunnel ready")
            return True
        except Exception:
            pass
    log.error("Failed to open tunnel")
    return False


def stop_tunnel():
    """Terminate the SSH tunnel process started by start_tunnel, if any."""
    global _tunnel_proc
    if _tunnel_proc:
        _tunnel_proc.terminate()
        _tunnel_proc = None
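
# Long-running callers that open the tunnel may want it closed on exit.
# A minimal sketch (atexit is stdlib; whether the monitor scripts actually
# register this handler is an assumption):
#
#     import atexit
#     atexit.register(stop_tunnel)
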
# ---------------------------------------------------------------------------
# Model warmup
# ---------------------------------------------------------------------------
def warmup():
"""Pre-load the model into GPU/RAM so first real call is fast."""
global _warmed_up
if _warmed_up:
return
log.info(f"Warming up {MODEL}...")
try:
generate("Say OK", system="You are a test.", max_tokens=5)
_warmed_up = True
log.info("Model loaded and warm")
except Exception as e:
log.warning(f"Warmup failed: {e}")
# ---------------------------------------------------------------------------
# Generate
# ---------------------------------------------------------------------------
def generate(prompt: str, system: str = "", max_tokens: int = 200,
             temperature: float = 0.6) -> str:
    """
    Generate a response from Ollama. The keep_alive field in the payload
    keeps the model loaded between calls.
    """
    payload = json.dumps({
        "model": MODEL,
        "system": system,
        "prompt": prompt,
        "stream": False,
        "keep_alive": KEEP_ALIVE,
        "options": {
            "temperature": temperature,
            "num_predict": max_tokens,
        },
    }).encode()
    req = urllib.request.Request(
        f"{OLLAMA_BASE}/api/generate",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=TIMEOUT) as r:
        data = json.loads(r.read())
    return data.get("response", "").strip()
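

# Minimal self-test (an illustrative addition, not part of the monitor
# flow; assumes the configured host and model above are reachable):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    if start_tunnel():
        warmup()
        print(generate("Reply with the single word OK.", max_tokens=5))
        stop_tunnel()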