"""
Disk Space Detector

Monitors all drives and reports to the aggregator when thresholds are exceeded.

Environment variables:
    AGGREGATOR_URL      - URL of the aggregator (default: http://localhost:5000)
    CHECK_INTERVAL      - Seconds between checks (default: 300)
    THRESHOLD_CRITICAL  - Percent usage for critical alert (default: 95)
    THRESHOLD_WARNING   - Percent usage for warning alert (default: 85)
"""

import os
import platform
import re
import shutil
import string
import time

import requests

# Configuration from environment
AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", "http://localhost:5000")
CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 300))
THRESHOLD_CRITICAL = int(os.environ.get("THRESHOLD_CRITICAL", 95))
THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85))

_BYTES_PER_GB = 1024 ** 3

# Mount tables octal-escape whitespace and backslashes inside fields
# (a space is written as backslash-0-4-0); this matches one such escape.
_MOUNT_ESCAPE = re.compile(r"\\([0-7]{3})")


def _unescape_mount(field):
    """Decode the three-digit octal escapes used in mount table fields."""
    return _MOUNT_ESCAPE.sub(lambda m: chr(int(m.group(1), 8)), field)


def _is_usable(path):
    """Return True if disk usage statistics can be read for ``path``."""
    try:
        shutil.disk_usage(path)
    except (OSError, ValueError):
        # OSError: missing, not ready, or not permitted.
        # ValueError: path contains an embedded NUL character.
        return False
    return True


def _event_id_for(drive):
    """Build a clean event ID from a drive path.

    Colons and backslashes are dropped, forward slashes become underscores,
    and surrounding underscores are stripped; an empty result maps to
    ``root``. For example ``"/"`` gives ``disk_root`` and ``"/mnt/data"``
    gives ``disk_mnt_data``.
    """
    # Kept outside of any f-string expression: a backslash inside an
    # f-string replacement field is a SyntaxError before Python 3.12.
    cleaned = drive.replace(":", "").replace("/", "_").replace("\\", "")
    return "disk_" + (cleaned.strip("_") or "root")


def get_all_drives():
    """Get list of mounted drives/partitions.

    On Windows every drive letter A-Z is probed. Elsewhere ``/proc/mounts``
    is parsed for ``/dev/`` backed filesystems, keeping the first mount point
    seen for each device; if that file does not exist, a fixed list of
    common mount points is probed instead.

    Returns:
        A list of drive roots / mount point paths whose usage can be read.
    """
    if platform.system() == "Windows":
        candidates = (f"{letter}:\\" for letter in string.ascii_uppercase)
        return [drive for drive in candidates if _is_usable(drive)]

    # Linux/macOS - parse /proc/mounts for real filesystems
    drives = []
    seen_devices = set()
    try:
        with open("/proc/mounts", "r") as f:
            for line in f:
                parts = line.split()
                if len(parts) < 2:
                    continue
                device, mount = parts[0], _unescape_mount(parts[1])
                # Skip virtual filesystems
                if not device.startswith("/dev/"):
                    continue
                # Skip duplicate devices (e.g., bind mounts)
                if device in seen_devices:
                    continue
                seen_devices.add(device)
                if _is_usable(mount):
                    drives.append(mount)
    except FileNotFoundError:
        # Fallback for macOS or systems without /proc/mounts
        for mount in ("/", "/home", "/var"):
            if os.path.exists(mount) and _is_usable(mount):
                drives.append(mount)
    return drives


def check_disk(drive):
    """Check disk usage for a drive.

    Args:
        drive: Path of the drive root or mount point to inspect.

    Returns:
        ``(percent_used, total_gb, used_gb)``, or ``(None, None, None)`` when
        the usage cannot be read or the filesystem reports zero capacity.
    """
    try:
        usage = shutil.disk_usage(drive)
    except (OSError, ValueError, TypeError):
        # Best effort: an unreadable or invalid drive is reported as unknown.
        return None, None, None
    if usage.total <= 0:
        # A zero-sized filesystem has no meaningful usage percentage.
        return None, None, None
    total_gb = usage.total / _BYTES_PER_GB
    used_gb = usage.used / _BYTES_PER_GB
    percent = (usage.used / usage.total) * 100
    return percent, total_gb, used_gb


def send_event(event_id, priority, message):
    """Send an event to the aggregator with heartbeat TTL.

    Args:
        event_id: Identifier of the event.
        priority: Priority value sent with the event.
        message: Text sent with the event.

    Request failures are printed and not raised.
    """
    ttl = CHECK_INTERVAL * 2  # Event expires if not refreshed
    try:
        response = requests.post(
            f"{AGGREGATOR_URL}/event",
            json={"id": event_id, "priority": priority, "message": message, "ttl": ttl},
            timeout=5,
        )
        print(f"[EVENT] {event_id}: {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}")
    except requests.RequestException as e:
        print(f"[ERROR] Failed to send event: {e}")


def clear_event(event_id):
    """Clear an event from the aggregator.

    Args:
        event_id: Identifier of the event to clear.

    Request failures and non-200 responses are printed and not raised.
    """
    try:
        response = requests.post(
            f"{AGGREGATOR_URL}/clear",
            json={"id": event_id},
            timeout=5,
        )
    except requests.RequestException as e:
        print(f"[ERROR] Failed to clear event: {e}")
        return
    if response.status_code == 200:
        print(f"[CLEAR] {event_id}")
    else:
        print(f"[ERROR] Failed to clear event {event_id}: HTTP {response.status_code}")


def main():
    """Run the check loop forever.

    Each pass scans all drives, sends an event for every drive at or above a
    threshold (priority 1 for critical, 2 for warning), clears events that
    were active on the previous pass but are not any more, then sleeps for
    ``CHECK_INTERVAL`` seconds.
    """
    print("Disk Space Detector started")
    print(f"  Aggregator: {AGGREGATOR_URL}")
    print(f"  Interval: {CHECK_INTERVAL}s")
    print(f"  Thresholds: Warning={THRESHOLD_WARNING}%, Critical={THRESHOLD_CRITICAL}%")
    print()

    # Track active alerts to know when to clear
    active_alerts = set()

    while True:
        drives = get_all_drives()
        print(f"[CHECK] Scanning {len(drives)} drive(s)...")

        current_alerts = set()
        for drive in drives:
            percent, total_gb, used_gb = check_disk(drive)
            if percent is None:
                continue

            if percent >= THRESHOLD_CRITICAL:
                priority = 1
            elif percent >= THRESHOLD_WARNING:
                priority = 2
            else:
                print(f"[OK] {drive}: {percent:.0f}%")
                continue

            event_id = _event_id_for(drive)
            message = f"{drive} at {percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)"
            send_event(event_id, priority, message)
            current_alerts.add(event_id)

        # Clear alerts that are no longer active
        for event_id in active_alerts - current_alerts:
            clear_event(event_id)
        active_alerts = current_alerts

        print(f"[SLEEP] Next check in {CHECK_INTERVAL}s\n")
        time.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    main()