""" Kao Single entry point for the entire system - aggregator + all detectors. Usage: python kao.py [--config config.json] """ import json import os import signal import subprocess import sys import time from pathlib import Path import requests # Configuration DEFAULT_CONFIG = "config.json" RESTART_DELAY = 5 AGGREGATOR_STARTUP_TIMEOUT = 10 class KaoManager: def __init__(self, config_path): self.config_path = Path(config_path) self.base_dir = self.config_path.parent self.processes = {} # name -> {process, config} self.running = True self.config = None def load_config(self): """Load configuration from JSON file.""" with open(self.config_path) as f: self.config = json.load(f) return self.config def start_process(self, name, script, env=None): """Start a Python script as a subprocess.""" script_path = self.base_dir / script if not script_path.exists(): print(f"[{name}] Script not found: {script_path}") return None # Build environment proc_env = os.environ.copy() if env: proc_env.update(env) try: process = subprocess.Popen( [sys.executable, "-u", str(script_path)], env=proc_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, universal_newlines=True, ) print(f"[{name}] Started (PID {process.pid})") return process except Exception as e: print(f"[{name}] Failed to start: {e}") return None def wait_for_aggregator(self, url, timeout=AGGREGATOR_STARTUP_TIMEOUT): """Wait for the aggregator to become available.""" print(f"[aggregator] Waiting for service at {url}...") start = time.time() while time.time() - start < timeout: try: response = requests.get(f"{url}/status", timeout=2) if response.status_code == 200: print(f"[aggregator] Service ready") return True except requests.RequestException: pass time.sleep(0.5) print(f"[aggregator] Timeout waiting for service") return False def stream_output(self, name, process): """Read and print output from a process (non-blocking).""" if process.stdout: while True: line = process.stdout.readline() if not line: break print(f"[{name}] {line.rstrip()}") def start_aggregator(self): """Start the aggregator service.""" agg_config = self.config.get("aggregator", {}) script = agg_config.get("script", "aggregator.py") env = agg_config.get("env", {}) process = self.start_process("aggregator", script, env) if process: self.processes["aggregator"] = { "process": process, "config": {"name": "aggregator", "script": script, "env": env}, } # Wait for aggregator to be ready url = self.config.get("aggregator_url", "http://localhost:5000") return self.wait_for_aggregator(url) return False def start_detectors(self): """Start all enabled detectors.""" url = self.config.get("aggregator_url", "http://localhost:5000") for detector in self.config.get("detectors", []): if not detector.get("enabled", True): continue name = detector["name"] env = {"AGGREGATOR_URL": url} env.update(detector.get("env", {})) process = self.start_process(name, detector["script"], env) if process: self.processes[name] = { "process": process, "config": detector, } def check_processes(self): """Check for crashed processes and restart them.""" for name, info in list(self.processes.items()): process = info["process"] # Stream any available output self.stream_output(name, process) # Check if process has exited if process.poll() is not None: print(f"[{name}] Exited with code {process.returncode}, restarting in {RESTART_DELAY}s...") time.sleep(RESTART_DELAY) if self.running: config = info["config"] env = {"AGGREGATOR_URL": self.config.get("aggregator_url", "http://localhost:5000")} env.update(config.get("env", {})) new_process = self.start_process(name, config["script"], env) if new_process: self.processes[name]["process"] = new_process def stop_all(self): """Stop all processes (detectors first, then aggregator).""" self.running = False print("\nShutting down Kao...") # Stop detectors first for name, info in list(self.processes.items()): if name == "aggregator": continue process = info["process"] if process.poll() is None: print(f"[{name}] Stopping...") process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() # Stop aggregator last if "aggregator" in self.processes: process = self.processes["aggregator"]["process"] if process.poll() is None: print(f"[aggregator] Stopping...") process.terminate() try: process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() print("Kao stopped.") def run(self): """Main run loop.""" self.load_config() print("=" * 50) print(" Kao") print("=" * 50) print(f"Config: {self.config_path}") print(f"Aggregator URL: {self.config.get('aggregator_url')}") print() enabled = [d["name"] for d in self.config.get("detectors", []) if d.get("enabled", True)] disabled = [d["name"] for d in self.config.get("detectors", []) if not d.get("enabled", True)] print(f"Detectors enabled: {', '.join(enabled) or 'none'}") if disabled: print(f"Detectors disabled: {', '.join(disabled)}") print() # Start aggregator first if not self.start_aggregator(): print("Failed to start aggregator, exiting.") self.stop_all() return # Give it a moment to initialize time.sleep(1) # Start detectors self.start_detectors() print() print("=" * 50) print(f" UI available at: {self.config.get('aggregator_url')}") print("=" * 50) print() try: while self.running: self.check_processes() time.sleep(1) except KeyboardInterrupt: pass finally: self.stop_all() def main(): # Parse arguments config_path = DEFAULT_CONFIG if len(sys.argv) > 1: if sys.argv[1] in ("-h", "--help"): print(__doc__) sys.exit(0) elif sys.argv[1] == "--config" and len(sys.argv) > 2: config_path = sys.argv[2] else: config_path = sys.argv[1] # Resolve config path config_path = Path(config_path) if not config_path.is_absolute(): config_path = Path(__file__).parent / config_path if not config_path.exists(): print(f"Config file not found: {config_path}") sys.exit(1) # Setup signal handlers kao = KaoManager(config_path) def signal_handler(sig, frame): kao.stop_all() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Run kao.run() if __name__ == "__main__": main()