6 Commits

Author SHA1 Message Date
mrmarcus007 17565331c7 Fix indent 2025-11-14 16:25:29 +01:00
mrmarcus007 7bf61ca44f Clean up blank lines in GPU resource manager script
Removed unnecessary blank lines in the GPU resource manager script.
2025-11-14 16:24:43 +01:00
mrmarcus007 40067ea9c2 Fix indentation 2025-11-14 16:23:59 +01:00
mrmarcus007 8c5b2cd2fe Removed unused imports and cleaned up code for size. 2025-11-14 16:20:36 +01:00
mrmarcus007 146d8456fa Commit 2025-11-14 16:07:24 +01:00
mrmarcus007 2fbc1ffa6f added support to unload ollama model when client requests. 2025-11-14 16:05:15 +01:00
@@ -7,27 +7,21 @@ import time
import threading import threading
import subprocess import subprocess
from datetime import datetime, time as dt_time from datetime import datetime, time as dt_time
from urllib.parse import urlparse, parse_qs
import logging import logging
import socket import socket
# ----------------------------------------------------------Host-config--------------------------------------------------------------#
# Configuration
OLLAMA_HOST = "" # Your Ollama LXC IP OLLAMA_HOST = "" # Your Ollama LXC IP
OLLAMA_PORT = 11434 # Your Ollama LXC Port OLLAMA_PORT = 11434 # Your Ollama LXC Port
PROXY_PORT = 11435 # Port Of This Proxy PROXY_PORT = 11435 # Port Of This Proxy
OLLAMA_BASE_URL = f"http://{OLLAMA_HOST}:{OLLAMA_PORT}" # Fixed formatting OLLAMA_BASE_URL = f"http://{OLLAMA_HOST}:{OLLAMA_PORT}"
# GPU monitoring # GPU monitoring
GPU_CHECK_INTERVAL = 10 # seconds between checks for GPU processes other than the known/compute processes GPU_CHECK_INTERVAL = 10 # seconds between checks for GPU processes other than the known/compute processes
# process name patterns (from your nvidia-smi output) # process name patterns (from your nvidia-smi output)
IDLE_NvGPU_PROCESSES = ['t-rex', 'trex', 'miner', 'xmrig', 'lolminer', 'nbminer'] IDLE_NvGPU_PROCESSES = ['t-rex', 'trex', 'miner', 'xmrig', 'lolminer', 'nbminer']
KNOWN_NvGPU_PROCESSES = ['Xorg'] # Processes that are allowed while the "idle" compute process is running KNOWN_NvGPU_PROCESSES = ['Xorg'] # Processes that are allowed while the "idle" compute process is running
IDLE_CONTAINER_ID = "" # ID of the running idle GPU container, example: IDLE_CONTAINER_ID = "120" IDLE_CONTAINER_ID = "" # ID of the running idle GPU container, example: IDLE_CONTAINER_ID = "120"
# Maintenance window (dt_time objects)
Blackout_schedule_Start = dt_time(2, 15) # 2:15 AM Blackout_schedule_Start = dt_time(2, 15) # 2:15 AM
Blackout_schedule_End = dt_time(3, 30) # 3:30 AM Blackout_schedule_End = dt_time(3, 30) # 3:30 AM
# ----------------------------------------------------------active-code--------------------------------------------------------------# # ----------------------------------------------------------active-code--------------------------------------------------------------#
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -37,7 +31,6 @@ logging.basicConfig(
logging.StreamHandler() logging.StreamHandler()
] ]
) )
class GPUResourceManager: class GPUResourceManager:
def __init__(self): def __init__(self):
self.idle_compute_running = False self.idle_compute_running = False
@@ -48,7 +41,6 @@ class GPUResourceManager:
self.gpu_processes = [] self.gpu_processes = []
self.lock = threading.Lock() self.lock = threading.Lock()
self.operation_in_progress = False self.operation_in_progress = False
def run_command(self, cmd): def run_command(self, cmd):
try: try:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True) result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
@@ -56,13 +48,11 @@ class GPUResourceManager:
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
logging.error(f"Command failed: {cmd}, error: {e.stderr}") logging.error(f"Command failed: {cmd}, error: {e.stderr}")
return None return None
def is_container_running(self, container_id): def is_container_running(self, container_id):
output = self.run_command(f"pct list | grep \"^{container_id}\"") output = self.run_command(f"pct list | grep \"^{container_id}\"")
if output and "running" in output: if output and "running" in output:
return True return True
return False return False
def stop_container(self, container_id): def stop_container(self, container_id):
if self.is_container_running(container_id): if self.is_container_running(container_id):
logging.info(f"Stopping container {container_id}") logging.info(f"Stopping container {container_id}")
@@ -73,7 +63,6 @@ class GPUResourceManager:
while self.is_container_running(container_id) and waited < max_wait: while self.is_container_running(container_id) and waited < max_wait:
time.sleep(1) time.sleep(1)
waited += 1 waited += 1
if not self.is_container_running(container_id): if not self.is_container_running(container_id):
logging.info(f"Container {container_id} stopped successfully") logging.info(f"Container {container_id} stopped successfully")
return True return True
@@ -86,7 +75,6 @@ class GPUResourceManager:
logging.debug(f"Container {container_id} already stopped, no action needed") logging.debug(f"Container {container_id} already stopped, no action needed")
return True return True
return False return False
def start_container(self, container_id): def start_container(self, container_id):
if not self.is_container_running(container_id): if not self.is_container_running(container_id):
logging.info(f"Starting container {container_id}") logging.info(f"Starting container {container_id}")
@@ -97,7 +85,6 @@ class GPUResourceManager:
while not self.is_container_running(container_id) and waited < max_wait: while not self.is_container_running(container_id) and waited < max_wait:
time.sleep(1) time.sleep(1)
waited += 1 waited += 1
if self.is_container_running(container_id): if self.is_container_running(container_id):
logging.info(f"Container {container_id} started successfully") logging.info(f"Container {container_id} started successfully")
return True return True
@@ -109,7 +96,6 @@ class GPUResourceManager:
logging.debug(f"Container {container_id} already running, no action needed") logging.debug(f"Container {container_id} already running, no action needed")
return True return True
return False return False
def get_gpu_processes(self): def get_gpu_processes(self):
try: try:
output = self.run_command( output = self.run_command(
@@ -130,29 +116,23 @@ class GPUResourceManager:
except Exception as e: except Exception as e:
logging.error(f"Error getting GPU processes: {e}") logging.error(f"Error getting GPU processes: {e}")
return [] return []
def is_compute_process(self, process_name): def is_compute_process(self, process_name):
process_lower = process_name.lower() process_lower = process_name.lower()
for pattern in IDLE_NvGPU_PROCESSES: for pattern in IDLE_NvGPU_PROCESSES:
if pattern in process_lower: if pattern in process_lower:
return True return True
return False return False
def is_known_system_process(self, process_name): def is_known_system_process(self, process_name):
return any(sys_proc in process_name for sys_proc in KNOWN_NvGPU_PROCESSES) return any(sys_proc in process_name for sys_proc in KNOWN_NvGPU_PROCESSES)
def is_gpu_idle(self): def is_gpu_idle(self):
processes = self.get_gpu_processes() processes = self.get_gpu_processes()
non_mining_processes = [] non_mining_processes = []
for process in processes: for process in processes:
if not self.is_compute_process(process['name']) and not self.is_known_system_process(process['name']): if not self.is_compute_process(process['name']) and not self.is_known_system_process(process['name']):
memory_usage = int(process['memory'].split()[0]) memory_usage = int(process['memory'].split()[0])
if memory_usage > 100: #MB if memory_usage > 100: #MB
non_mining_processes.append(process) non_mining_processes.append(process)
is_idle = len(non_mining_processes) == 0 is_idle = len(non_mining_processes) == 0
if is_idle: if is_idle:
mining_count = len([p for p in processes if self.is_compute_process(p['name'])]) mining_count = len([p for p in processes if self.is_compute_process(p['name'])])
if mining_count > 0: if mining_count > 0:
@@ -161,20 +141,16 @@ class GPUResourceManager:
logging.debug("GPU is truly idle (no significant processes)") logging.debug("GPU is truly idle (no significant processes)")
else: else:
logging.debug(f"GPU is active with non-mining processes: {[p['name'] for p in non_mining_processes]}") logging.debug(f"GPU is active with non-mining processes: {[p['name'] for p in non_mining_processes]}")
return is_idle return is_idle
def is_Idle_NvGPU_process_active(self): def is_Idle_NvGPU_process_active(self):
processes = self.get_gpu_processes() processes = self.get_gpu_processes()
mining_processes = [p for p in processes if self.is_compute_process(p['name'])] mining_processes = [p for p in processes if self.is_compute_process(p['name'])]
return len(mining_processes) > 0 return len(mining_processes) > 0
def is_ollama_still_active(self): def is_ollama_still_active(self):
if not self.ollama_active: if not self.ollama_active:
return False return False
time_since_last_activity = time.time() - self.last_ollama_activity time_since_last_activity = time.time() - self.last_ollama_activity
return time_since_last_activity < self.ollama_activity_timeout return time_since_last_activity < self.ollama_activity_timeout
def should_stop_for_schedule(self): def should_stop_for_schedule(self):
now = datetime.now().time() now = datetime.now().time()
stop_start = Blackout_schedule_Start stop_start = Blackout_schedule_Start
@@ -183,12 +159,10 @@ class GPUResourceManager:
if in_window: if in_window:
logging.debug("Within scheduled maintenance window (2:15am-3:30am)") logging.debug("Within scheduled maintenance window (2:15am-3:30am)")
return in_window return in_window
def manage_idle_NvGPU_process(self): def manage_idle_NvGPU_process(self):
with self.lock: with self.lock:
if self.operation_in_progress: if self.operation_in_progress:
return return
self.operation_in_progress = True self.operation_in_progress = True
try: try:
current_idle_NvGPU_process_state = self.is_container_running(IDLE_CONTAINER_ID) current_idle_NvGPU_process_state = self.is_container_running(IDLE_CONTAINER_ID)
@@ -213,7 +187,6 @@ class GPUResourceManager:
if self.ollama_active: if self.ollama_active:
self.ollama_active = False self.ollama_active = False
logging.info("Ollama activity timeout reached") logging.info("Ollama activity timeout reached")
if self.is_gpu_idle(): if self.is_gpu_idle():
if not current_idle_NvGPU_process_state and not mining_active_on_gpu: if not current_idle_NvGPU_process_state and not mining_active_on_gpu:
logging.info("GPU idle, starting idle NvGPU container") logging.info("GPU idle, starting idle NvGPU container")
@@ -229,7 +202,6 @@ class GPUResourceManager:
self.idle_compute_running = False self.idle_compute_running = False
finally: finally:
self.operation_in_progress = False self.operation_in_progress = False
def force_stop_idle_NvGPU_process_for_ollama(self): def force_stop_idle_NvGPU_process_for_ollama(self):
with self.lock: with self.lock:
self.ollama_active = True self.ollama_active = True
@@ -252,7 +224,6 @@ class GPUResourceManager:
logging.debug("idle NvGPU container already stopped, no action needed") logging.debug("idle NvGPU container already stopped, no action needed")
self.idle_compute_running = False self.idle_compute_running = False
def start_monitoring(self): def start_monitoring(self):
def monitor_loop(): def monitor_loop():
while True: while True:
@@ -261,59 +232,52 @@ class GPUResourceManager:
except Exception as e: except Exception as e:
logging.error(f"Error in monitor loop: {e}") logging.error(f"Error in monitor loop: {e}")
time.sleep(GPU_CHECK_INTERVAL) time.sleep(GPU_CHECK_INTERVAL)
monitor_thread = threading.Thread(target=monitor_loop, daemon=True) monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
monitor_thread.start() monitor_thread.start()
logging.info("GPU monitoring thread started") logging.info("GPU monitoring thread started")
class OllamaProxyHandler(http.server.SimpleHTTPRequestHandler): class OllamaProxyHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.gpu_manager = kwargs.pop('gpu_manager') self.gpu_manager = kwargs.pop('gpu_manager')
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def do_GET(self): def do_GET(self):
if self._is_gpu_intensive_operation(self.path, {}): if self._is_gpu_intensive_operation(self.path, {}):
self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama() self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama()
time.sleep(2) time.sleep(2)
self._forward_request('GET') self._forward_request('GET')
def do_HEAD(self): def do_HEAD(self):
if self._is_gpu_intensive_operation(self.path, {}): if self._is_gpu_intensive_operation(self.path, {}):
self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama() self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama()
time.sleep(2) time.sleep(2)
self._forward_request('HEAD') self._forward_request('HEAD')
def do_POST(self): def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0)) content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length) if content_length > 0 else b'' post_data = self.rfile.read(content_length) if content_length > 0 else b''
try: try:
request_data = {} request_data = {}
try: try:
request_data = json.loads(post_data.decode('utf-8')) if post_data else {} request_data = json.loads(post_data.decode('utf-8')) if post_data else {}
except Exception: except Exception:
request_data = {} request_data = {}
is_gpu_intensive = self._is_gpu_intensive_operation(self.path, request_data) is_gpu_intensive = self._is_gpu_intensive_operation(self.path, request_data)
if is_gpu_intensive: if is_gpu_intensive:
logging.info(f"GPU-intensive operation detected: {self.path}") logging.info(f"GPU-intensive operation detected: {self.path}")
self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama() self.gpu_manager.force_stop_idle_NvGPU_process_for_ollama()
time.sleep(3.5) time.sleep(3.5)
self._forward_request('POST', post_data) self._forward_request('POST', post_data)
if is_gpu_intensive: if is_gpu_intensive:
with self.gpu_manager.lock: with self.gpu_manager.lock:
self.gpu_manager.last_ollama_activity = time.time() self.gpu_manager.last_ollama_activity = time.time()
logging.info("Ollama request completed, activity timestamp updated") logging.info("Ollama request completed, activity timestamp updated")
except Exception as e: except Exception as e:
logging.error(f"Error processing request: {e}") logging.error(f"Error processing request: {e}")
self.send_error(500, f"Internal server error: {e}") self.send_error(500, f"Internal server error: {e}")
def _is_gpu_intensive_operation(self, path, request_data): def _is_gpu_intensive_operation(self, path, request_data):
if path == '/api/generate' and request_data.get('keep_alive') == 0:
logging.debug("Detected model unload request via /api/generate")
return False
if path == '/api/chat' and request_data.get('keep_alive') == 0 and request_data.get('messages') == []:
logging.debug("Detected model unload request via /api/chat")
return False
if path in ['/api/generate', '/api/chat', '/api/embeddings']: if path in ['/api/generate', '/api/chat', '/api/embeddings']:
return True return True
if path == '/api/load': if path == '/api/load':
@@ -321,11 +285,9 @@ class OllamaProxyHandler(http.server.SimpleHTTPRequestHandler):
if path == '/api/pull': if path == '/api/pull':
return request_data.get('stream', True) return request_data.get('stream', True)
return False return False
def _forward_request(self, method, data=None): def _forward_request(self, method, data=None):
url = f"{OLLAMA_BASE_URL}{self.path}" url = f"{OLLAMA_BASE_URL}{self.path}"
headers = {key: value for key, value in self.headers.items()} headers = {key: value for key, value in self.headers.items()}
hop_headers = { hop_headers = {
'connection', 'keep-alive', 'proxy-authenticate', 'connection', 'keep-alive', 'proxy-authenticate',
'proxy-authorization', 'te', 'trailers', 'upgrade', 'proxy-authorization', 'te', 'trailers', 'upgrade',
@@ -334,19 +296,14 @@ class OllamaProxyHandler(http.server.SimpleHTTPRequestHandler):
for header in list(headers.keys()): for header in list(headers.keys()):
if header.lower() in hop_headers: if header.lower() in hop_headers:
headers.pop(header, None) headers.pop(header, None)
headers.pop('Host', None) headers.pop('Host', None)
timeout = (10, None) timeout = (10, None)
try: try:
if method.upper() in ('GET', 'HEAD'): if method.upper() in ('GET', 'HEAD'):
resp = requests.request(method, url, headers=headers, stream=True, timeout=timeout) resp = requests.request(method, url, headers=headers, stream=True, timeout=timeout)
else: else:
resp = requests.request(method, url, headers=headers, data=data, stream=True, timeout=timeout) resp = requests.request(method, url, headers=headers, data=data, stream=True, timeout=timeout)
self.send_response(resp.status_code) self.send_response(resp.status_code)
for key, value in resp.headers.items(): for key, value in resp.headers.items():
k_lower = key.lower() k_lower = key.lower()
if k_lower in hop_headers: if k_lower in hop_headers:
@@ -356,7 +313,6 @@ class OllamaProxyHandler(http.server.SimpleHTTPRequestHandler):
except Exception: except Exception:
logging.debug(f"Skipping header {key} due to send_header error") logging.debug(f"Skipping header {key} due to send_header error")
self.end_headers() self.end_headers()
try: try:
for chunk in resp.iter_content(chunk_size=4096): for chunk in resp.iter_content(chunk_size=4096):
if not chunk: if not chunk:
@@ -384,50 +340,37 @@ class OllamaProxyHandler(http.server.SimpleHTTPRequestHandler):
self.send_error(502, f"Bad gateway: {e}") self.send_error(502, f"Bad gateway: {e}")
except Exception: except Exception:
pass pass
def log_message(self, format, *args): def log_message(self, format, *args):
logging.info(f"{self.address_string()} - {format % args}") logging.info(f"{self.address_string()} - {format % args}")
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
daemon_threads = True daemon_threads = True
allow_reuse_address = True allow_reuse_address = True
def main(): def main():
logging.info("Starting GPU Resource Manager Proxy on Proxmox Host") logging.info("Starting GPU Resource Manager Proxy on Proxmox Host")
manager = GPUResourceManager() manager = GPUResourceManager()
test_output = manager.run_command("pct list > /dev/null && echo 'pct available'") test_output = manager.run_command("pct list > /dev/null && echo 'pct available'")
if test_output: if test_output:
logging.info("pct command available") logging.info("pct command available")
else: else:
logging.error("pct command not available! Running on wrong system?") logging.error("pct command not available! Running on wrong system?")
gpu_processes = manager.get_gpu_processes() gpu_processes = manager.get_gpu_processes()
logging.info(f"Current GPU processes: {gpu_processes}") logging.info(f"Current GPU processes: {gpu_processes}")
idle_NvGPU_process_status = manager.is_container_running(IDLE_CONTAINER_ID) idle_NvGPU_process_status = manager.is_container_running(IDLE_CONTAINER_ID)
logging.info(f"idle NvGPU {IDLE_CONTAINER_ID} running: {idle_NvGPU_process_status}") logging.info(f"idle NvGPU {IDLE_CONTAINER_ID} running: {idle_NvGPU_process_status}")
gpu_manager = GPUResourceManager() gpu_manager = GPUResourceManager()
gpu_manager.start_monitoring() gpu_manager.start_monitoring()
handler = lambda *args, **kwargs: OllamaProxyHandler(*args, gpu_manager=gpu_manager, **kwargs) handler = lambda *args, **kwargs: OllamaProxyHandler(*args, gpu_manager=gpu_manager, **kwargs)
with ThreadedTCPServer(("", PROXY_PORT), handler) as httpd: with ThreadedTCPServer(("", PROXY_PORT), handler) as httpd:
logging.info(f"Proxy server running on port {PROXY_PORT}") logging.info(f"Proxy server running on port {PROXY_PORT}")
logging.info(f"Forwarding to Ollama at {OLLAMA_HOST}:{OLLAMA_PORT}") logging.info(f"Forwarding to Ollama at {OLLAMA_HOST}:{OLLAMA_PORT}")
logging.info(f"Managing idle NvGPU process: {IDLE_CONTAINER_ID}") logging.info(f"Managing idle NvGPU process: {IDLE_CONTAINER_ID}")
logging.info("Monitoring GPU usage and scheduled maintenance windows") logging.info("Monitoring GPU usage and scheduled maintenance windows")
logging.info(f"Mining process patterns: {IDLE_NvGPU_PROCESSES}") logging.info(f"Mining process patterns: {IDLE_NvGPU_PROCESSES}")
try: try:
httpd.serve_forever() httpd.serve_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("Shutting down proxy server") logging.info("Shutting down proxy server")
except Exception as e: except Exception as e:
logging.error(f"Server error: {e}") logging.error(f"Server error: {e}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()