""" CPU Usage Detector Polls CPU usage every 30 seconds. Sends a heartbeat event while usage is above threshold; if usage drops below 85%, no event is sent and the previous event expires naturally via its 30-second TTL. Environment variables: AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100) CHECK_INTERVAL - Seconds between checks (default: 30) TTL - Event lifetime in seconds (default: 30) THRESHOLD_CRITICAL - Percent usage for critical alert (default: 95) THRESHOLD_WARNING - Percent usage for warning alert (default: 85) """ import os import time import psutil from detectors.base import DEFAULT_AGGREGATOR_URL, send_event AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL) CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 30)) TTL = int(os.environ.get("TTL", 30)) THRESHOLD_CRITICAL = int(os.environ.get("THRESHOLD_CRITICAL", 95)) THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85)) EVENT_ID = "cpu_usage" def main(): print("CPU Usage Detector started") print(f" Aggregator: {AGGREGATOR_URL}") print(f" Interval: {CHECK_INTERVAL}s, TTL: {TTL}s") print(f" Thresholds: Warning={THRESHOLD_WARNING}%, Critical={THRESHOLD_CRITICAL}%") print() while True: cpu = psutil.cpu_percent(interval=1) # 1-second blocking sample if cpu >= THRESHOLD_CRITICAL: send_event(AGGREGATOR_URL, EVENT_ID, 1, f"CPU {cpu:.0f}%", ttl=TTL) elif cpu >= THRESHOLD_WARNING: send_event(AGGREGATOR_URL, EVENT_ID, 2, f"CPU {cpu:.0f}%", ttl=TTL) else: print(f"[OK] CPU: {cpu:.0f}%") time.sleep(CHECK_INTERVAL - 1) # 29s sleep + 1s sample = 30s total if __name__ == "__main__": main()