From dd8bf6005b88212a0c38c3d0ed337c5581e75589 Mon Sep 17 00:00:00 2001 From: Spencer Grimes Date: Fri, 6 Feb 2026 12:17:17 -0600 Subject: [PATCH] Bump to v1.5.0: deduplicate detectors, fix aggregator bugs, fix blocking I/O - Extract shared send_event/clear_event into detectors/base.py, removing ~150 lines of duplication across all 6 detectors - Fix default aggregator URL from port 5000 to 5100 in all detectors - Standardize cpu.py and memory.py to use active_alerts set pattern - Fix immediate emote rotation on startup (last_emote_change = time.time()) - Extract magic numbers to named constants in aggregator - Protect write_status() with try/except OSError - Fix notify event ID collision with monotonic counter - Replace blocking stream_output() with background daemon threads in kao.py Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 1 + aggregator.py | 27 ++++++++++++++------ detectors/base.py | 35 ++++++++++++++++++++++++++ detectors/cpu.py | 55 ++++++++++++----------------------------- detectors/disk_space.py | 41 ++++++------------------------ detectors/docker.py | 45 +++++++-------------------------- detectors/memory.py | 55 ++++++++++++----------------------------- detectors/network.py | 39 +++++------------------------ detectors/service.py | 39 +++++------------------------ index.html | 2 +- kao.py | 21 +++++++--------- openapi.yaml | 2 +- 12 files changed, 126 insertions(+), 236 deletions(-) create mode 100644 detectors/base.py diff --git a/CLAUDE.md b/CLAUDE.md index f4430b6..e15dcb6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -180,6 +180,7 @@ Use in automations: ├── config.json # Runtime configuration ├── openapi.yaml # API documentation (OpenAPI 3.0) ├── detectors/ +│ ├── base.py │ ├── disk_space.py │ ├── cpu.py │ ├── memory.py diff --git a/aggregator.py b/aggregator.py index 6263c9a..2c992fd 100644 --- a/aggregator.py +++ b/aggregator.py @@ -19,6 +19,9 @@ ROOT_DIR = Path(__file__).parent STATUS_FILE = Path(__file__).parent / "status.json" DEFAULT_NOTIFY_TTL = 10 # Default TTL for Priority 3 (Notify) events CELEBRATION_DURATION = 5 # Seconds to show celebration after recovery +EMOTE_ROTATION_INTERVAL = 300 # Seconds between emote rotations +IDLE_EXPRESSION_CHANCE = 0.15 # Chance of a brief blink/wink on rotation +DEFAULT_NOTIFY_DURATION = 5 # Default duration for /notify events # Emote variations with paired animations OPTIMAL_EMOTES = [ @@ -53,10 +56,13 @@ celebrating_until = 0 blinking_until = 0 blink_emote = None blink_animation = None -last_emote_change = 0 +last_emote_change = time.time() current_optimal_emote = OPTIMAL_EMOTES[0][0] current_optimal_animation = OPTIMAL_EMOTES[0][1] +# Notify counter for unique IDs +_notify_counter = 0 + # Sleep mode is_sleeping = False SLEEP_EMOTE = "( -_-)zzZ" @@ -134,11 +140,11 @@ def get_current_state(): animation = blink_animation else: # Rotate optimal emotes every 5 minutes - if now - last_emote_change > 300: + if now - last_emote_change > EMOTE_ROTATION_INTERVAL: last_emote_change = now current_optimal_emote, current_optimal_animation = random.choice(OPTIMAL_EMOTES) - # 15% chance of a brief blink/wink - if random.random() < 0.15: + # Brief blink/wink chance on rotation + if random.random() < IDLE_EXPRESSION_CHANCE: blink_emote, blink_animation = random.choice(IDLE_EMOTES) blinking_until = now + random.uniform(1, 2) emote = current_optimal_emote @@ -163,8 +169,11 @@ def get_current_state(): def write_status(): """Write current state to status.json.""" state = get_current_state() - with open(STATUS_FILE, "w") as f: - json.dump(state, f, indent="\t") + try: + with open(STATUS_FILE, "w") as f: + json.dump(state, f, indent="\t") + except OSError as e: + print(f"[ERROR] Failed to write status file: {e}") return state @@ -272,12 +281,14 @@ def notify(): "sound": "chime" # optional: chime, alert, warning, critical, success, none } """ + global _notify_counter data = request.get_json(force=True) if request.data else {} message = data.get("message", "") - duration = int(data.get("duration", 5)) + duration = int(data.get("duration", DEFAULT_NOTIFY_DURATION)) # Generate unique ID to avoid conflicts - event_id = f"ha_notify_{int(time.time() * 1000)}" + _notify_counter += 1 + event_id = f"notify_{int(time.time())}_{_notify_counter}" event = { "priority": 3, # Notify priority diff --git a/detectors/base.py b/detectors/base.py new file mode 100644 index 0000000..6a7fbfb --- /dev/null +++ b/detectors/base.py @@ -0,0 +1,35 @@ +""" +Shared utilities for Kao detectors. +""" + +import requests + +DEFAULT_AGGREGATOR_URL = "http://localhost:5100" + + +def send_event(url, event_id, priority, message, check_interval): + """Send an event to the aggregator with heartbeat TTL.""" + ttl = check_interval * 2 + try: + response = requests.post( + f"{url}/event", + json={"id": event_id, "priority": priority, "message": message, "ttl": ttl}, + timeout=5, + ) + print(f"[EVENT] {event_id}: {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}") + except requests.RequestException as e: + print(f"[ERROR] Failed to send event: {e}") + + +def clear_event(url, event_id): + """Clear an event from the aggregator.""" + try: + response = requests.post( + f"{url}/clear", + json={"id": event_id}, + timeout=5, + ) + if response.status_code == 200: + print(f"[CLEAR] {event_id}") + except requests.RequestException as e: + print(f"[ERROR] Failed to clear event: {e}") diff --git a/detectors/cpu.py b/detectors/cpu.py index 7c94959..310eae1 100644 --- a/detectors/cpu.py +++ b/detectors/cpu.py @@ -3,7 +3,7 @@ CPU Usage Detector Monitors CPU usage and reports to the aggregator when thresholds are exceeded. Environment variables: - AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5000) + AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100) CHECK_INTERVAL - Seconds between checks (default: 30) THRESHOLD_CRITICAL - Percent usage for critical alert (default: 95) THRESHOLD_WARNING - Percent usage for warning alert (default: 85) @@ -12,10 +12,11 @@ Environment variables: import os import time import psutil -import requests + +from detectors.base import DEFAULT_AGGREGATOR_URL, send_event, clear_event # Configuration from environment -AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", "http://localhost:5000") +AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL) CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 30)) THRESHOLD_CRITICAL = int(os.environ.get("THRESHOLD_CRITICAL", 95)) THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85)) @@ -23,34 +24,6 @@ THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85)) EVENT_ID = "cpu_usage" -def send_event(priority, message): - """Send an event to the aggregator with heartbeat TTL.""" - ttl = CHECK_INTERVAL * 2 - try: - response = requests.post( - f"{AGGREGATOR_URL}/event", - json={"id": EVENT_ID, "priority": priority, "message": message, "ttl": ttl}, - timeout=5 - ) - print(f"[EVENT] {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}") - except requests.RequestException as e: - print(f"[ERROR] Failed to send event: {e}") - - -def clear_event(): - """Clear the event from the aggregator.""" - try: - response = requests.post( - f"{AGGREGATOR_URL}/clear", - json={"id": EVENT_ID}, - timeout=5 - ) - if response.status_code == 200: - print(f"[CLEAR] {EVENT_ID}") - except requests.RequestException as e: - print(f"[ERROR] Failed to clear event: {e}") - - def main(): print(f"CPU Usage Detector started") print(f" Aggregator: {AGGREGATOR_URL}") @@ -58,23 +31,27 @@ def main(): print(f" Thresholds: Warning={THRESHOLD_WARNING}%, Critical={THRESHOLD_CRITICAL}%") print() - alert_active = False + active_alerts = set() while True: # Get CPU usage over a 1-second sample cpu_percent = psutil.cpu_percent(interval=1) + current_alerts = set() if cpu_percent >= THRESHOLD_CRITICAL: - send_event(1, f"CPU at {cpu_percent:.0f}%") - alert_active = True + send_event(AGGREGATOR_URL, EVENT_ID, 1, f"CPU at {cpu_percent:.0f}%", CHECK_INTERVAL) + current_alerts.add(EVENT_ID) elif cpu_percent >= THRESHOLD_WARNING: - send_event(2, f"CPU at {cpu_percent:.0f}%") - alert_active = True + send_event(AGGREGATOR_URL, EVENT_ID, 2, f"CPU at {cpu_percent:.0f}%", CHECK_INTERVAL) + current_alerts.add(EVENT_ID) else: print(f"[OK] CPU: {cpu_percent:.0f}%") - if alert_active: - clear_event() - alert_active = False + + # Clear alerts that are no longer active + for eid in active_alerts - current_alerts: + clear_event(AGGREGATOR_URL, eid) + + active_alerts = current_alerts time.sleep(CHECK_INTERVAL - 1) # Account for 1s sample time diff --git a/detectors/disk_space.py b/detectors/disk_space.py index 70cc381..d920a93 100644 --- a/detectors/disk_space.py +++ b/detectors/disk_space.py @@ -3,7 +3,7 @@ Disk Space Detector Monitors all drives and reports to the aggregator when thresholds are exceeded. Environment variables: - AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5000) + AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100) CHECK_INTERVAL - Seconds between checks (default: 300) THRESHOLD_CRITICAL - Percent usage for critical alert (default: 95) THRESHOLD_WARNING - Percent usage for warning alert (default: 85) @@ -12,10 +12,11 @@ Environment variables: import os import time import shutil -import requests + +from detectors.base import DEFAULT_AGGREGATOR_URL, send_event, clear_event # Configuration from environment -AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", "http://localhost:5000") +AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL) CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 300)) THRESHOLD_CRITICAL = int(os.environ.get("THRESHOLD_CRITICAL", 95)) THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85)) @@ -85,34 +86,6 @@ def check_disk(drive): return None, None, None -def send_event(event_id, priority, message): - """Send an event to the aggregator with heartbeat TTL.""" - ttl = CHECK_INTERVAL * 2 # Event expires if not refreshed - try: - response = requests.post( - f"{AGGREGATOR_URL}/event", - json={"id": event_id, "priority": priority, "message": message, "ttl": ttl}, - timeout=5 - ) - print(f"[EVENT] {event_id}: {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}") - except requests.RequestException as e: - print(f"[ERROR] Failed to send event: {e}") - - -def clear_event(event_id): - """Clear an event from the aggregator.""" - try: - response = requests.post( - f"{AGGREGATOR_URL}/clear", - json={"id": event_id}, - timeout=5 - ) - if response.status_code == 200: - print(f"[CLEAR] {event_id}") - except requests.RequestException as e: - print(f"[ERROR] Failed to clear event: {e}") - - def main(): print(f"Disk Space Detector started") print(f" Aggregator: {AGGREGATOR_URL}") @@ -139,18 +112,18 @@ def main(): if percent >= THRESHOLD_CRITICAL: message = f"{drive} at {percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)" - send_event(event_id, 1, message) + send_event(AGGREGATOR_URL, event_id, 1, message, CHECK_INTERVAL) current_alerts.add(event_id) elif percent >= THRESHOLD_WARNING: message = f"{drive} at {percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)" - send_event(event_id, 2, message) + send_event(AGGREGATOR_URL, event_id, 2, message, CHECK_INTERVAL) current_alerts.add(event_id) else: print(f"[OK] {drive}: {percent:.0f}%") # Clear alerts that are no longer active for event_id in active_alerts - current_alerts: - clear_event(event_id) + clear_event(AGGREGATOR_URL, event_id) active_alerts = current_alerts diff --git a/detectors/docker.py b/detectors/docker.py index 645856f..b90315d 100644 --- a/detectors/docker.py +++ b/detectors/docker.py @@ -3,7 +3,7 @@ Docker Container Health Detector Monitors for containers stuck in restart loops or unhealthy states. Environment variables: - AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5000) + AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100) CHECK_INTERVAL - Seconds between checks (default: 60) RESTART_THRESHOLD - Number of restarts to consider a loop (default: 3) CONTAINERS - Comma-separated container names to monitor (optional, monitors all if empty) @@ -13,10 +13,11 @@ import json import os import subprocess import time -import requests + +from detectors.base import DEFAULT_AGGREGATOR_URL, send_event, clear_event # Configuration from environment -AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", "http://localhost:5000") +AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL) CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 60)) RESTART_THRESHOLD = int(os.environ.get("RESTART_THRESHOLD", 3)) CONTAINERS = os.environ.get("CONTAINERS", "") @@ -70,34 +71,6 @@ def get_restart_count(container_name): return 0 -def send_event(event_id, priority, message): - """Send an event to the aggregator with heartbeat TTL.""" - ttl = CHECK_INTERVAL * 2 - try: - response = requests.post( - f"{AGGREGATOR_URL}/event", - json={"id": event_id, "priority": priority, "message": message, "ttl": ttl}, - timeout=5 - ) - print(f"[EVENT] {event_id}: {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}") - except requests.RequestException as e: - print(f"[ERROR] Failed to send event: {e}") - - -def clear_event(event_id): - """Clear an event from the aggregator.""" - try: - response = requests.post( - f"{AGGREGATOR_URL}/clear", - json={"id": event_id}, - timeout=5 - ) - if response.status_code == 200: - print(f"[CLEAR] {event_id}") - except requests.RequestException as e: - print(f"[ERROR] Failed to clear event: {e}") - - def main(): # Parse container filter filter_containers = None @@ -148,10 +121,10 @@ def main(): last_restart_counts[name] = restart_count if state == "restarting" or new_restarts >= RESTART_THRESHOLD: - send_event(event_id, 1, f"Container '{name}' restart loop ({restart_count}x)") + send_event(AGGREGATOR_URL, event_id, 1, f"Container '{name}' restart loop ({restart_count}x)", CHECK_INTERVAL) current_alerts.add(event_id) elif new_restarts > 0: - send_event(event_id, 2, f"Container '{name}' restarting ({restart_count}x)") + send_event(AGGREGATOR_URL, event_id, 2, f"Container '{name}' restarting ({restart_count}x)", CHECK_INTERVAL) current_alerts.add(event_id) else: print(f"[OK] Container '{name}' is {state}") @@ -160,14 +133,14 @@ def main(): elif state in ("exited", "dead"): # Only alert if it exited abnormally (non-zero exit code in status) if "Exited (0)" not in status: - send_event(event_id, 2, f"Container '{name}' {state}") + send_event(AGGREGATOR_URL, event_id, 2, f"Container '{name}' {state}", CHECK_INTERVAL) current_alerts.add(event_id) else: print(f"[OK] Container '{name}' exited cleanly") # Check for unhealthy containers elif "unhealthy" in status.lower(): - send_event(event_id, 2, f"Container '{name}' unhealthy") + send_event(AGGREGATOR_URL, event_id, 2, f"Container '{name}' unhealthy", CHECK_INTERVAL) current_alerts.add(event_id) else: @@ -175,7 +148,7 @@ def main(): # Clear alerts for containers that are now healthy for event_id in active_alerts - current_alerts: - clear_event(event_id) + clear_event(AGGREGATOR_URL, event_id) active_alerts = current_alerts diff --git a/detectors/memory.py b/detectors/memory.py index 8f3b556..e79a89f 100644 --- a/detectors/memory.py +++ b/detectors/memory.py @@ -3,7 +3,7 @@ Memory Usage Detector Monitors RAM usage and reports to the aggregator when thresholds are exceeded. Environment variables: - AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5000) + AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100) CHECK_INTERVAL - Seconds between checks (default: 30) THRESHOLD_CRITICAL - Percent usage for critical alert (default: 95) THRESHOLD_WARNING - Percent usage for warning alert (default: 85) @@ -12,10 +12,11 @@ Environment variables: import os import time import psutil -import requests + +from detectors.base import DEFAULT_AGGREGATOR_URL, send_event, clear_event # Configuration from environment -AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", "http://localhost:5000") +AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL) CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 30)) THRESHOLD_CRITICAL = int(os.environ.get("THRESHOLD_CRITICAL", 95)) THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85)) @@ -23,34 +24,6 @@ THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85)) EVENT_ID = "memory_usage" -def send_event(priority, message): - """Send an event to the aggregator with heartbeat TTL.""" - ttl = CHECK_INTERVAL * 2 - try: - response = requests.post( - f"{AGGREGATOR_URL}/event", - json={"id": EVENT_ID, "priority": priority, "message": message, "ttl": ttl}, - timeout=5 - ) - print(f"[EVENT] {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}") - except requests.RequestException as e: - print(f"[ERROR] Failed to send event: {e}") - - -def clear_event(): - """Clear the event from the aggregator.""" - try: - response = requests.post( - f"{AGGREGATOR_URL}/clear", - json={"id": EVENT_ID}, - timeout=5 - ) - if response.status_code == 200: - print(f"[CLEAR] {EVENT_ID}") - except requests.RequestException as e: - print(f"[ERROR] Failed to clear event: {e}") - - def main(): print(f"Memory Usage Detector started") print(f" Aggregator: {AGGREGATOR_URL}") @@ -58,25 +31,29 @@ def main(): print(f" Thresholds: Warning={THRESHOLD_WARNING}%, Critical={THRESHOLD_CRITICAL}%") print() - alert_active = False + active_alerts = set() while True: mem = psutil.virtual_memory() mem_percent = mem.percent used_gb = mem.used / (1024 ** 3) total_gb = mem.total / (1024 ** 3) + current_alerts = set() if mem_percent >= THRESHOLD_CRITICAL: - send_event(1, f"Memory at {mem_percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)") - alert_active = True + send_event(AGGREGATOR_URL, EVENT_ID, 1, f"Memory at {mem_percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)", CHECK_INTERVAL) + current_alerts.add(EVENT_ID) elif mem_percent >= THRESHOLD_WARNING: - send_event(2, f"Memory at {mem_percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)") - alert_active = True + send_event(AGGREGATOR_URL, EVENT_ID, 2, f"Memory at {mem_percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)", CHECK_INTERVAL) + current_alerts.add(EVENT_ID) else: print(f"[OK] Memory: {mem_percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)") - if alert_active: - clear_event() - alert_active = False + + # Clear alerts that are no longer active + for eid in active_alerts - current_alerts: + clear_event(AGGREGATOR_URL, eid) + + active_alerts = current_alerts time.sleep(CHECK_INTERVAL) diff --git a/detectors/network.py b/detectors/network.py index e5d3e93..9e6c41d 100644 --- a/detectors/network.py +++ b/detectors/network.py @@ -3,7 +3,7 @@ Network/Ping Detector Monitors if hosts are reachable via ping. Environment variables: - AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5000) + AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100) CHECK_INTERVAL - Seconds between checks (default: 60) HOSTS - Comma-separated list of hosts to ping (required) Example: "8.8.8.8,google.com,192.168.1.1" @@ -15,10 +15,11 @@ import sys import time import platform import subprocess -import requests + +from detectors.base import DEFAULT_AGGREGATOR_URL, send_event, clear_event # Configuration from environment -AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", "http://localhost:5000") +AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL) CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 60)) HOSTS = os.environ.get("HOSTS", "") TIMEOUT = int(os.environ.get("TIMEOUT", 5)) @@ -44,34 +45,6 @@ def ping(host): return False -def send_event(event_id, priority, message): - """Send an event to the aggregator with heartbeat TTL.""" - ttl = CHECK_INTERVAL * 2 - try: - response = requests.post( - f"{AGGREGATOR_URL}/event", - json={"id": event_id, "priority": priority, "message": message, "ttl": ttl}, - timeout=5 - ) - print(f"[EVENT] {event_id}: {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}") - except requests.RequestException as e: - print(f"[ERROR] Failed to send event: {e}") - - -def clear_event(event_id): - """Clear the event from the aggregator.""" - try: - response = requests.post( - f"{AGGREGATOR_URL}/clear", - json={"id": event_id}, - timeout=5 - ) - if response.status_code == 200: - print(f"[CLEAR] {event_id}") - except requests.RequestException as e: - print(f"[ERROR] Failed to clear event: {e}") - - def main(): if not HOSTS: print("ERROR: HOSTS environment variable is required") @@ -99,12 +72,12 @@ def main(): if ping(host): print(f"[OK] Host '{host}' is reachable") else: - send_event(event_id, 1, f"Host '{host}' is unreachable") + send_event(AGGREGATOR_URL, event_id, 1, f"Host '{host}' is unreachable", CHECK_INTERVAL) current_alerts.add(event_id) # Clear alerts for hosts that are now reachable for event_id in active_alerts - current_alerts: - clear_event(event_id) + clear_event(AGGREGATOR_URL, event_id) active_alerts = current_alerts diff --git a/detectors/service.py b/detectors/service.py index 83c2edf..f7df46c 100644 --- a/detectors/service.py +++ b/detectors/service.py @@ -3,7 +3,7 @@ Service Health Detector Monitors if specific processes/services are running. Environment variables: - AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5000) + AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100) CHECK_INTERVAL - Seconds between checks (default: 30) SERVICES - Comma-separated list of process names to monitor (required) Example: "nginx,postgres,redis" @@ -13,10 +13,11 @@ import os import sys import time import psutil -import requests + +from detectors.base import DEFAULT_AGGREGATOR_URL, send_event, clear_event # Configuration from environment -AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", "http://localhost:5000") +AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL) CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 30)) SERVICES = os.environ.get("SERVICES", "") @@ -37,34 +38,6 @@ def get_running_processes(): return running -def send_event(event_id, priority, message): - """Send an event to the aggregator with heartbeat TTL.""" - ttl = CHECK_INTERVAL * 2 - try: - response = requests.post( - f"{AGGREGATOR_URL}/event", - json={"id": event_id, "priority": priority, "message": message, "ttl": ttl}, - timeout=5 - ) - print(f"[EVENT] {event_id}: {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}") - except requests.RequestException as e: - print(f"[ERROR] Failed to send event: {e}") - - -def clear_event(event_id): - """Clear the event from the aggregator.""" - try: - response = requests.post( - f"{AGGREGATOR_URL}/clear", - json={"id": event_id}, - timeout=5 - ) - if response.status_code == 200: - print(f"[CLEAR] {event_id}") - except requests.RequestException as e: - print(f"[ERROR] Failed to clear event: {e}") - - def main(): if not SERVICES: print("ERROR: SERVICES environment variable is required") @@ -90,14 +63,14 @@ def main(): event_id = f"service_{service}" if service not in running: - send_event(event_id, 1, f"Service '{service}' is not running") + send_event(AGGREGATOR_URL, event_id, 1, f"Service '{service}' is not running", CHECK_INTERVAL) current_alerts.add(event_id) else: print(f"[OK] Service '{service}' is running") # Clear alerts for services that are now running for event_id in active_alerts - current_alerts: - clear_event(event_id) + clear_event(AGGREGATOR_URL, event_id) active_alerts = current_alerts diff --git a/index.html b/index.html index 57c336d..7084b0e 100644 --- a/index.html +++ b/index.html @@ -217,7 +217,7 @@ const emoteEl = document.getElementById("emote"); const messageEl = document.getElementById("message"); const POLL_INTERVAL = 2000; - const VERSION = "v1.4.0"; + const VERSION = "v1.5.0"; // Sound system let audioCtx = null; diff --git a/kao.py b/kao.py index 6d2b447..e99a8db 100644 --- a/kao.py +++ b/kao.py @@ -11,6 +11,7 @@ import os import signal import subprocess import sys +import threading import time from pathlib import Path @@ -59,11 +60,19 @@ class KaoManager: universal_newlines=True, ) print(f"[{name}] Started (PID {process.pid})") + # Read output in a background thread to avoid blocking the main loop + thread = threading.Thread(target=self._read_output, args=(name, process), daemon=True) + thread.start() return process except Exception as e: print(f"[{name}] Failed to start: {e}") return None + def _read_output(self, name, process): + """Read and print output from a process in a background thread.""" + for line in process.stdout: + print(f"[{name}] {line.rstrip()}") + def wait_for_aggregator(self, url, timeout=AGGREGATOR_STARTUP_TIMEOUT): """Wait for the aggregator to become available.""" print(f"[aggregator] Waiting for service at {url}...") @@ -80,15 +89,6 @@ class KaoManager: print(f"[aggregator] Timeout waiting for service") return False - def stream_output(self, name, process): - """Read and print output from a process (non-blocking).""" - if process.stdout: - while True: - line = process.stdout.readline() - if not line: - break - print(f"[{name}] {line.rstrip()}") - def get_aggregator_url(self): """Get aggregator URL from config port.""" port = self.config.get("port", 5100) @@ -135,9 +135,6 @@ class KaoManager: for name, info in list(self.processes.items()): process = info["process"] - # Stream any available output - self.stream_output(name, process) - # Check if process has exited if process.poll() is not None: print(f"[{name}] Exited with code {process.returncode}, restarting in {RESTART_DELAY}s...") diff --git a/openapi.yaml b/openapi.yaml index 31ffd22..2849b22 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -14,7 +14,7 @@ info: ## TTL/Heartbeat Pattern Events can have a TTL (time-to-live) that auto-expires them. Detectors typically send heartbeat events that expire if not refreshed, indicating loss of communication. - version: 1.4.0 + version: 1.5.0 license: name: MIT