From 1594e9ea1fedf02c8263de1f75d7bd499bf8a156 Mon Sep 17 00:00:00 2001 From: mrmarcus007 Date: Fri, 14 Nov 2025 13:25:45 +0100 Subject: [PATCH] Fixed errors, added hop by hop support and status of loaded ollama model. --- ...Proxy for Ollama (Designed for Proxmox).py | 168 ++++++++++++------ 1 file changed, 112 insertions(+), 56 deletions(-) diff --git a/GPU Resource Manager Proxy for Ollama (Designed for Proxmox).py b/GPU Resource Manager Proxy for Ollama (Designed for Proxmox).py index b20a9d7..e4994fb 100644 --- a/GPU Resource Manager Proxy for Ollama (Designed for Proxmox).py +++ b/GPU Resource Manager Proxy for Ollama (Designed for Proxmox).py @@ -9,23 +9,26 @@ import subprocess from datetime import datetime, time as dt_time from urllib.parse import urlparse, parse_qs import logging +import socket # Configuration -OLLAMA_HOST = "localhost" # Your Ollama LXC IP +OLLAMA_HOST = "" # Your Ollama LXC IP OLLAMA_PORT = 11434 # Your Ollama LXC Port -PROXY_PORT = 11435 # Port Of This Proxy -OLLAMA_BASE_URL = f"http://{OLLAMA_HOST}:{OLLAMA_PORT}" # Don't touch unless you know what you are doing. +PROXY_PORT = 11435 # Port Of This Proxy +OLLAMA_BASE_URL = f"http://{OLLAMA_HOST}:{OLLAMA_PORT}" # Fixed formatting # GPU monitoring -GPU_CHECK_INTERVAL = 10 # seconds it wait's to check for other process apart from known/compute processes +GPU_CHECK_INTERVAL = 10 # seconds it waits to check for other process apart from known/compute processes # process process patterns (from your nvidia-smi output) IDLE_NvGPU_PROCESSES = ['t-rex', 'trex', 'miner', 'xmrig', 'lolminer', 'nbminer'] KNOWN_NvGPU_PROCESSES = ['Xorg'] # Processes that are allowed when "idle" compute process is running -IDLE_CONTAINER_ID = "" # running Idle GPU Container ID, example: COMPUTE_CONTAINER_ID ="120" -Blackout_schedule_Start = 2, 15 #when to start stopping the idle NvGPU container. -Blackout_schedule_End = 3, 30 #when to allow starting the idle NvGPU container again. -#----------------------------------------------------------active-code--------------------------------------------------------------# +IDLE_CONTAINER_ID = "" # running Idle GPU Container ID, example: COMPUTE_CONTAINER_ID ="120" +# Maintenance window (dt_time objects) +Blackout_schedule_Start = dt_time(2, 15) # 2:15 AM +Blackout_schedule_End = dt_time(3, 30) # 3:30 AM + +# ----------------------------------------------------------active-code--------------------------------------------------------------# logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', @@ -45,7 +48,7 @@ class GPUResourceManager: self.gpu_processes = [] self.lock = threading.Lock() self.operation_in_progress = False - + def run_command(self, cmd): try: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True) @@ -70,7 +73,7 @@ class GPUResourceManager: while self.is_container_running(container_id) and waited < max_wait: time.sleep(1) waited += 1 - + if not self.is_container_running(container_id): logging.info(f"Container {container_id} stopped successfully") return True @@ -94,7 +97,7 @@ class GPUResourceManager: while not self.is_container_running(container_id) and waited < max_wait: time.sleep(1) waited += 1 - + if self.is_container_running(container_id): logging.info(f"Container {container_id} started successfully") return True @@ -109,7 +112,9 @@ class GPUResourceManager: def get_gpu_processes(self): try: - output = self.run_command("nvidia-smi --query-compute-apps=pid,process_name,used_memory --format=csv,noheader,nounits") + output = self.run_command( + "nvidia-smi --query-compute-apps=pid,process_name,used_memory --format=csv,noheader,nounits" + ) processes = [] if output: for line in output.split('\n'): @@ -143,11 +148,11 @@ class GPUResourceManager: for process in processes: if not self.is_compute_process(process['name']) and not self.is_known_system_process(process['name']): memory_usage = int(process['memory'].split()[0]) - if memory_usage > 100: # More than 100MB is significant + if memory_usage > 100: #MB non_mining_processes.append(process) - + is_idle = len(non_mining_processes) == 0 - + if is_idle: mining_count = len([p for p in processes if self.is_compute_process(p['name'])]) if mining_count > 0: @@ -156,7 +161,7 @@ class GPUResourceManager: logging.debug("GPU is truly idle (no significant processes)") else: logging.debug(f"GPU is active with non-mining processes: {[p['name'] for p in non_mining_processes]}") - + return is_idle def is_Idle_NvGPU_process_active(self): @@ -172,8 +177,8 @@ class GPUResourceManager: def should_stop_for_schedule(self): now = datetime.now().time() - stop_start = dt_time(Blackout_schedule_Start) # 2:15 AM - stop_end = dt_time(Blackout_schedule_End) # 3:30 AM + stop_start = Blackout_schedule_Start + stop_end = Blackout_schedule_End in_window = stop_start <= now <= stop_end if in_window: logging.debug("Within scheduled maintenance window (2:15am-3:30am)") @@ -182,8 +187,8 @@ class GPUResourceManager: def manage_idle_NvGPU_process(self): with self.lock: if self.operation_in_progress: - return - + return + self.operation_in_progress = True try: current_idle_NvGPU_process_state = self.is_container_running(IDLE_CONTAINER_ID) @@ -195,8 +200,7 @@ class GPUResourceManager: self.stop_container(IDLE_CONTAINER_ID) self.idle_compute_running = False return - - + ollama_still_active = self.is_ollama_still_active() if ollama_still_active: if current_idle_NvGPU_process_state or mining_active_on_gpu: @@ -206,11 +210,10 @@ class GPUResourceManager: self.idle_compute_running = False return else: - if self.ollama_active: self.ollama_active = False logging.info("Ollama activity timeout reached") - + if self.is_gpu_idle(): if not current_idle_NvGPU_process_state and not mining_active_on_gpu: logging.info("GPU idle, starting idle NvGPU container") @@ -247,7 +250,7 @@ class GPUResourceManager: logging.warning("Mining processes still active after container stop") else: logging.debug("idle NvGPU container already stopped, no action needed") - + self.idle_compute_running = False def start_monitoring(self): @@ -258,47 +261,58 @@ class GPUResourceManager: except Exception as e: logging.error(f"Error in monitor loop: {e}") time.sleep(GPU_CHECK_INTERVAL) - + monitor_thread = threading.Thread(target=monitor_loop, daemon=True) monitor_thread.start() logging.info("GPU monitoring thread started") + class OllamaProxyHandler(http.server.SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): self.gpu_manager = kwargs.pop('gpu_manager') super().__init__(*args, **kwargs) - + def do_GET(self): if self._is_gpu_intensive_operation(self.path, {}): self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama() - time.sleep(2) - + time.sleep(2) self._forward_request('GET') - + + def do_HEAD(self): + if self._is_gpu_intensive_operation(self.path, {}): + self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama() + time.sleep(2) + self._forward_request('HEAD') + def do_POST(self): content_length = int(self.headers.get('Content-Length', 0)) post_data = self.rfile.read(content_length) if content_length > 0 else b'' - + try: - request_data = json.loads(post_data.decode('utf-8')) if post_data else {} + request_data = {} + try: + request_data = json.loads(post_data.decode('utf-8')) if post_data else {} + except Exception: + request_data = {} + is_gpu_intensive = self._is_gpu_intensive_operation(self.path, request_data) - + if is_gpu_intensive: logging.info(f"GPU-intensive operation detected: {self.path}") self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama() time.sleep(3.5) self._forward_request('POST', post_data) - + if is_gpu_intensive: with self.gpu_manager.lock: self.gpu_manager.last_ollama_activity = time.time() logging.info("Ollama request completed, activity timestamp updated") - + except Exception as e: logging.error(f"Error processing request: {e}") self.send_error(500, f"Internal server error: {e}") - + def _is_gpu_intensive_operation(self, path, request_data): if path in ['/api/generate', '/api/chat', '/api/embeddings']: return True @@ -307,35 +321,79 @@ class OllamaProxyHandler(http.server.SimpleHTTPRequestHandler): if path == '/api/pull': return request_data.get('stream', True) return False - + def _forward_request(self, method, data=None): url = f"{OLLAMA_BASE_URL}{self.path}" headers = {key: value for key, value in self.headers.items()} - hop_headers = ['connection', 'keep-alive', 'proxy-authenticate', - 'proxy-authorization', 'te', 'trailers', 'upgrade'] - for header in hop_headers: - headers.pop(header, None) + + hop_headers = { + 'connection', 'keep-alive', 'proxy-authenticate', + 'proxy-authorization', 'te', 'trailers', 'upgrade', + 'transfer-encoding' + } + for header in list(headers.keys()): + if header.lower() in hop_headers: + headers.pop(header, None) + + headers.pop('Host', None) + + timeout = (10, None) try: - if method == 'GET': - response = requests.get(url, headers=headers, timeout=300) + if method.upper() in ('GET', 'HEAD'): + resp = requests.request(method, url, headers=headers, stream=True, timeout=timeout) else: - response = requests.post(url, data=data, headers=headers, timeout=300) + resp = requests.request(method, url, headers=headers, data=data, stream=True, timeout=timeout) - self.send_response(response.status_code) - for key, value in response.headers.items(): - if key.lower() not in ['content-encoding', 'transfer-encoding', 'connection']: + self.send_response(resp.status_code) + + for key, value in resp.headers.items(): + k_lower = key.lower() + if k_lower in hop_headers: + continue + try: self.send_header(key, value) + except Exception: + logging.debug(f"Skipping header {key} due to send_header error") self.end_headers() - self.wfile.write(response.content) - + + try: + for chunk in resp.iter_content(chunk_size=4096): + if not chunk: + continue + try: + self.wfile.write(chunk) + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError, socket.error) as e: + logging.info(f"Client connection closed during streaming: {e}") + break + finally: + resp.close() + except requests.exceptions.RequestException as e: logging.error(f"Error forwarding to Ollama: {e}") - self.send_error(502, f"Bad gateway: {e}") - + try: + if hasattr(self, '_headers_buffer') and getattr(self, 'wfile', None): + try: + err_msg = f"\n\n[proxy error] upstream request failed: {e}\n" + self.wfile.write(err_msg.encode('utf-8')) + self.wfile.flush() + except Exception: + pass + else: + self.send_error(502, f"Bad gateway: {e}") + except Exception: + pass + def log_message(self, format, *args): logging.info(f"{self.address_string()} - {format % args}") + +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + daemon_threads = True + allow_reuse_address = True + + def main(): logging.info("Starting GPU Resource Manager Proxy on Proxmox Host") @@ -345,7 +403,6 @@ def main(): logging.info("pct command available") else: logging.error("pct command not available! Running on wrong system?") - return gpu_processes = manager.get_gpu_processes() logging.info(f"Current GPU processes: {gpu_processes}") @@ -354,18 +411,17 @@ def main(): logging.info(f"idle NvGPU {IDLE_CONTAINER_ID} running: {idle_NvGPU_process_status}") gpu_manager = GPUResourceManager() - gpu_manager.start_monitoring() handler = lambda *args, **kwargs: OllamaProxyHandler(*args, gpu_manager=gpu_manager, **kwargs) - - with socketserver.TCPServer(("", PROXY_PORT), handler) as httpd: + + with ThreadedTCPServer(("", PROXY_PORT), handler) as httpd: logging.info(f"Proxy server running on port {PROXY_PORT}") logging.info(f"Forwarding to Ollama at {OLLAMA_HOST}:{OLLAMA_PORT}") logging.info(f"Managing idle NvGPU process: {IDLE_CONTAINER_ID}") logging.info("Monitoring GPU usage and scheduled maintenance windows") logging.info(f"Mining process patterns: {IDLE_NvGPU_PROCESSES}") - + try: httpd.serve_forever() except KeyboardInterrupt: