- Aggregator: Flask-based event broker with priority queue - Frontend: OLED-optimized UI with animations - Detectors: disk, cpu, memory, service, network - Unified entry point (sentry.py) with process management - Heartbeat TTL system for auto-clearing stale events Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
265 lines
8.1 KiB
Python
265 lines
8.1 KiB
Python
"""
|
|
Sentry-Emote
|
|
Single entry point for the entire system - aggregator + all detectors.
|
|
|
|
Usage:
|
|
python sentry.py [--config config.json]
|
|
"""
|
|
|
|
import json
import os
import signal
import subprocess
import sys
import threading
import time
from pathlib import Path

import requests

# Configuration
DEFAULT_CONFIG = "config.json"  # config file used when none is given on the command line
RESTART_DELAY = 5  # seconds between a process exiting and its restart attempt
AGGREGATOR_STARTUP_TIMEOUT = 10  # seconds to wait for the aggregator's /status endpoint


class SentryEmote:
    """Supervisor that launches the aggregator and the configured detectors.

    Every child is a Python script started with the current interpreter. Its
    combined stdout/stderr is echoed to this process's stdout, prefixed with
    the child's name. Children that exit are restarted after RESTART_DELAY
    seconds for as long as ``self.running`` is true.
    """

    # Used whenever the config has no "aggregator_url" entry.
    DEFAULT_AGGREGATOR_URL = "http://localhost:5000"
    # Seconds a child gets to exit after terminate() before it is killed.
    STOP_TIMEOUT = 5

    def __init__(self, config_path):
        """Remember where the config lives; nothing is loaded or started yet.

        Args:
            config_path: Path to the JSON config file. Script paths from the
                config are resolved relative to this file's directory.
        """
        self.config_path = Path(config_path)
        self.base_dir = self.config_path.parent
        self.processes = {}  # name -> {process, config[, restart_at]}
        self.running = True
        self.config = None
        self._stopped = False  # set once stop_all() has printed its banner

    def _aggregator_url(self):
        """Return the configured aggregator URL, or the built-in default."""
        return (self.config or {}).get("aggregator_url", self.DEFAULT_AGGREGATOR_URL)

    def load_config(self):
        """Load configuration from JSON file.

        Returns:
            The parsed configuration, which is also stored on ``self.config``.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        with open(self.config_path, encoding="utf-8") as f:
            self.config = json.load(f)
        return self.config

    def start_process(self, name, script, env=None):
        """Start a Python script as a subprocess.

        The child's stderr is merged into its stdout, and that pipe is drained
        by a daemon thread running ``stream_output``. Draining on a separate
        thread keeps the supervisor loop from blocking on a quiet child and
        keeps a chatty child from stalling on a full pipe.

        Args:
            name: Label used in log prefixes.
            script: Script path relative to the config file's directory.
            env: Optional mapping of extra environment variables. Keys and
                values are converted to ``str`` because JSON configs may hold
                numbers or booleans, which ``Popen`` rejects.

        Returns:
            The ``subprocess.Popen`` object, or ``None`` if the script is
            missing or could not be launched.
        """
        script_path = self.base_dir / script

        if not script_path.exists():
            print(f"[{name}] Script not found: {script_path}")
            return None

        # Build environment: inherit ours, then layer the overrides on top.
        proc_env = os.environ.copy()
        if env:
            proc_env.update({str(key): str(value) for key, value in env.items()})

        try:
            process = subprocess.Popen(
                # -u: unbuffered child output, so lines show up as they are written.
                [sys.executable, "-u", str(script_path)],
                env=proc_env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                bufsize=1,
                universal_newlines=True,
                errors="replace",  # undecodable bytes must not kill the reader
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            print(f"[{name}] Failed to start: {e}")
            return None

        print(f"[{name}] Started (PID {process.pid})")
        threading.Thread(
            target=self.stream_output,
            args=(name, process),
            name=f"{name}-output",
            daemon=True,  # never keep the interpreter alive just to read logs
        ).start()
        return process

    def wait_for_aggregator(self, url, timeout=AGGREGATOR_STARTUP_TIMEOUT):
        """Wait for the aggregator to become available.

        Polls ``{url}/status`` every half second until it answers with
        HTTP 200. Gives up early if the aggregator process tracked in
        ``self.processes`` has already exited.

        Args:
            url: Base URL of the aggregator.
            timeout: Maximum number of seconds to wait.

        Returns:
            True once the service answers with 200, False on timeout or if
            the aggregator process died while waiting.
        """
        print(f"[aggregator] Waiting for service at {url}...")
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            entry = self.processes.get("aggregator")
            if entry is not None and entry["process"].poll() is not None:
                code = entry["process"].returncode
                print(f"[aggregator] Exited with code {code} during startup")
                return False

            try:
                response = requests.get(f"{url}/status", timeout=2)
            except requests.RequestException:
                pass  # not listening yet; retry after a short pause
            else:
                if response.status_code == 200:
                    print("[aggregator] Service ready")
                    return True
            time.sleep(0.5)

        print("[aggregator] Timeout waiting for service")
        return False

    def stream_output(self, name, process):
        """Echo a process's output, line by line, until its pipe reaches EOF.

        This call blocks until the child closes its stdout (normally when it
        exits), so ``start_process`` runs it on a dedicated daemon thread
        rather than from the supervisor loop.

        Args:
            name: Label used as the line prefix.
            process: Object whose ``stdout`` is a text stream, or ``None``.
        """
        if process.stdout is None:
            return
        for line in process.stdout:
            print(f"[{name}] {line.rstrip()}")

    def start_aggregator(self):
        """Start the aggregator service.

        Returns:
            True if the process started and its /status endpoint became
            reachable in time, False otherwise.
        """
        agg_config = self.config.get("aggregator", {})
        script = agg_config.get("script", "aggregator.py")
        env = agg_config.get("env", {})

        process = self.start_process("aggregator", script, env)
        if not process:
            return False

        self.processes["aggregator"] = {
            "process": process,
            "config": {"name": "aggregator", "script": script, "env": env},
        }
        # Wait for aggregator to be ready
        return self.wait_for_aggregator(self._aggregator_url())

    def start_detectors(self):
        """Start all enabled detectors.

        Each detector receives ``AGGREGATOR_URL`` in its environment; entries
        in the detector's own ``env`` take precedence. Entries without a
        ``name`` or ``script`` are reported and skipped.
        """
        url = self._aggregator_url()

        for detector in self.config.get("detectors", []):
            if not detector.get("enabled", True):
                continue

            name = detector.get("name")
            script = detector.get("script")
            if not name or not script:
                print(f"[sentry] Skipping detector without name/script: {detector!r}")
                continue

            env = {"AGGREGATOR_URL": url}
            env.update(detector.get("env", {}))

            process = self.start_process(name, script, env)
            if process:
                self.processes[name] = {
                    "process": process,
                    "config": detector,
                }

    def check_processes(self):
        """Check for crashed processes and restart them.

        Never blocks. The first call that sees an exited process records a
        ``restart_at`` deadline; a later call performs the restart once the
        deadline has passed. A failed restart is rescheduled.
        """
        now = time.monotonic()

        for name, info in list(self.processes.items()):
            process = info["process"]
            if process.poll() is None:
                continue  # still running

            if info.get("restart_at") is None:
                print(f"[{name}] Exited with code {process.returncode}, restarting in {RESTART_DELAY}s...")
                info["restart_at"] = now + RESTART_DELAY
                continue

            if now < info["restart_at"] or not self.running:
                continue

            config = info["config"]
            env = {"AGGREGATOR_URL": self._aggregator_url()}
            env.update(config.get("env", {}))

            new_process = self.start_process(name, config["script"], env)
            if new_process:
                info["process"] = new_process
                info["restart_at"] = None
            else:
                info["restart_at"] = now + RESTART_DELAY  # try again later

    def _stop_process(self, name, process):
        """Terminate one child, escalating to kill() after STOP_TIMEOUT."""
        if process.poll() is not None:
            return  # already exited
        print(f"[{name}] Stopping...")
        process.terminate()
        try:
            process.wait(timeout=self.STOP_TIMEOUT)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()  # reap the killed child so it does not linger as a zombie

    def stop_all(self):
        """Stop all processes (detectors first, then aggregator).

        Safe to call more than once: the signal handler and ``run``'s cleanup
        both call it. Later calls still stop anything left running but do not
        repeat the banner messages.
        """
        self.running = False
        announce = not self._stopped
        self._stopped = True
        if announce:
            print("\nShutting down Sentry-Emote...")

        # Stop detectors first
        for name, info in list(self.processes.items()):
            if name != "aggregator":
                self._stop_process(name, info["process"])

        # Stop aggregator last
        aggregator = self.processes.get("aggregator")
        if aggregator is not None:
            self._stop_process("aggregator", aggregator["process"])

        if announce:
            print("Sentry-Emote stopped.")

    def run(self):
        """Main run loop.

        Loads the config, starts the aggregator and then the detectors, and
        supervises them once per second until ``self.running`` becomes false
        or the loop is interrupted. All children are stopped on the way out,
        including when startup fails part-way.
        """
        self.load_config()
        url = self._aggregator_url()

        print("=" * 50)
        print(" Sentry-Emote")
        print("=" * 50)
        print(f"Config: {self.config_path}")
        print(f"Aggregator URL: {url}")
        print()

        detectors = self.config.get("detectors", [])
        enabled = [d.get("name", "<unnamed>") for d in detectors if d.get("enabled", True)]
        disabled = [d.get("name", "<unnamed>") for d in detectors if not d.get("enabled", True)]

        print(f"Detectors enabled: {', '.join(enabled) or 'none'}")
        if disabled:
            print(f"Detectors disabled: {', '.join(disabled)}")
        print()

        try:
            # Start aggregator first
            if not self.start_aggregator():
                print("Failed to start aggregator, exiting.")
                return

            # Give it a moment to initialize
            time.sleep(1)

            # Start detectors
            self.start_detectors()

            print()
            print("=" * 50)
            print(f" UI available at: {url}")
            print("=" * 50)
            print()

            while self.running:
                self.check_processes()
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            self.stop_all()


def main():
    """Command-line entry point: resolve the config path and run the supervisor.

    Accepted forms::

        sentry.py                      # use DEFAULT_CONFIG
        sentry.py PATH                 # use PATH
        sentry.py --config PATH        # use PATH
        sentry.py -h | --help          # print the module docstring

    A relative path is resolved against the directory containing this script,
    not the current working directory. Exits with status 1 if the config file
    does not exist and status 2 if ``--config`` is given without a value.
    """
    # Parse arguments
    args = sys.argv[1:]

    if args and args[0] in ("-h", "--help"):
        print(__doc__)
        sys.exit(0)

    if args and args[0] == "--config":
        if len(args) < 2:
            # Previously the literal string "--config" was used as the path.
            print("--config requires a file path", file=sys.stderr)
            print("Usage: python sentry.py [--config config.json]", file=sys.stderr)
            sys.exit(2)
        config_path = args[1]
    elif args:
        config_path = args[0]
    else:
        config_path = DEFAULT_CONFIG

    # Resolve config path
    config_path = Path(config_path)
    if not config_path.is_absolute():
        config_path = Path(__file__).parent / config_path

    if not config_path.exists():
        print(f"Config file not found: {config_path}")
        sys.exit(1)

    # Setup signal handlers
    sentry = SentryEmote(config_path)

    def signal_handler(sig, frame):
        # Stop the children explicitly: the signal may arrive before run()
        # has reached the point where its own cleanup would cover them.
        sentry.stop_all()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Run
    sentry.run()


# Start the supervisor only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()