""" Memory Usage Detector Monitors RAM usage and reports to the aggregator when thresholds are exceeded. Environment variables: AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5000) CHECK_INTERVAL - Seconds between checks (default: 30) THRESHOLD_CRITICAL - Percent usage for critical alert (default: 95) THRESHOLD_WARNING - Percent usage for warning alert (default: 85) """ import os import time import psutil import requests # Configuration from environment AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", "http://localhost:5000") CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 30)) THRESHOLD_CRITICAL = int(os.environ.get("THRESHOLD_CRITICAL", 95)) THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85)) EVENT_ID = "memory_usage" def send_event(priority, message): """Send an event to the aggregator with heartbeat TTL.""" ttl = CHECK_INTERVAL * 2 try: response = requests.post( f"{AGGREGATOR_URL}/event", json={"id": EVENT_ID, "priority": priority, "message": message, "ttl": ttl}, timeout=5 ) print(f"[EVENT] {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}") except requests.RequestException as e: print(f"[ERROR] Failed to send event: {e}") def clear_event(): """Clear the event from the aggregator.""" try: response = requests.post( f"{AGGREGATOR_URL}/clear", json={"id": EVENT_ID}, timeout=5 ) if response.status_code == 200: print(f"[CLEAR] {EVENT_ID}") except requests.RequestException as e: print(f"[ERROR] Failed to clear event: {e}") def main(): print(f"Memory Usage Detector started") print(f" Aggregator: {AGGREGATOR_URL}") print(f" Interval: {CHECK_INTERVAL}s") print(f" Thresholds: Warning={THRESHOLD_WARNING}%, Critical={THRESHOLD_CRITICAL}%") print() alert_active = False while True: mem = psutil.virtual_memory() mem_percent = mem.percent used_gb = mem.used / (1024 ** 3) total_gb = mem.total / (1024 ** 3) if mem_percent >= THRESHOLD_CRITICAL: send_event(1, f"Memory at {mem_percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)") alert_active = True elif mem_percent >= THRESHOLD_WARNING: send_event(2, f"Memory at {mem_percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)") alert_active = True else: print(f"[OK] Memory: {mem_percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)") if alert_active: clear_event() alert_active = False time.sleep(CHECK_INTERVAL) if __name__ == "__main__": main()