""" Disk Space Detector Monitors all drives and reports to the aggregator when thresholds are exceeded. Environment variables: AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100) CHECK_INTERVAL - Seconds between checks (default: 300) THRESHOLD_CRITICAL - Percent usage for critical alert (default: 95) THRESHOLD_WARNING - Percent usage for warning alert (default: 85) """ import os import time import shutil from detectors.base import DEFAULT_AGGREGATOR_URL, send_event, clear_event # Configuration from environment AGGREGATOR_URL = os.environ.get("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL) CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", 300)) THRESHOLD_CRITICAL = int(os.environ.get("THRESHOLD_CRITICAL", 95)) THRESHOLD_WARNING = int(os.environ.get("THRESHOLD_WARNING", 85)) def get_all_drives(): """Get list of mounted drives/partitions.""" import platform drives = [] if platform.system() == "Windows": import string for letter in string.ascii_uppercase: drive = f"{letter}:\\" try: shutil.disk_usage(drive) drives.append(drive) except (FileNotFoundError, PermissionError, OSError): pass else: # Linux/macOS - parse /proc/mounts or /etc/mtab for real filesystems seen_devices = set() try: with open("/proc/mounts", "r") as f: for line in f: parts = line.split() if len(parts) < 2: continue device, mount = parts[0], parts[1] # Skip virtual filesystems if not device.startswith("/dev/"): continue # Skip snap mounts and other system paths if mount.startswith(("/snap/", "/boot/efi")): continue # Skip duplicate devices (e.g., bind mounts) if device in seen_devices: continue seen_devices.add(device) try: shutil.disk_usage(mount) drives.append(mount) except (FileNotFoundError, PermissionError, OSError): pass except FileNotFoundError: # Fallback for macOS or systems without /proc/mounts for mount in ["/", "/home", "/var"]: if os.path.exists(mount): try: shutil.disk_usage(mount) drives.append(mount) except (FileNotFoundError, PermissionError, OSError): pass return drives def check_disk(drive): """Check disk usage for a drive. Returns (percent_used, total_gb, used_gb).""" try: usage = shutil.disk_usage(drive) total_gb = usage.total / (1024 ** 3) used_gb = usage.used / (1024 ** 3) percent = (usage.used / usage.total) * 100 return percent, total_gb, used_gb except Exception: return None, None, None def main(): print(f"Disk Space Detector started") print(f" Aggregator: {AGGREGATOR_URL}") print(f" Interval: {CHECK_INTERVAL}s") print(f" Thresholds: Warning={THRESHOLD_WARNING}%, Critical={THRESHOLD_CRITICAL}%") print() # Track active alerts to know when to clear active_alerts = set() while True: drives = get_all_drives() print(f"[CHECK] Scanning {len(drives)} drive(s)...") current_alerts = set() for drive in drives: percent, total_gb, used_gb = check_disk(drive) if percent is None: continue # Create a clean event ID from drive path event_id = f"disk_{drive.replace(':', '').replace('/', '_').replace('\\', '').strip('_') or 'root'}" if percent >= THRESHOLD_CRITICAL: message = f"{drive} at {percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)" send_event(AGGREGATOR_URL, event_id, 1, message, CHECK_INTERVAL) current_alerts.add(event_id) elif percent >= THRESHOLD_WARNING: message = f"{drive} at {percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)" send_event(AGGREGATOR_URL, event_id, 2, message, CHECK_INTERVAL) current_alerts.add(event_id) else: print(f"[OK] {drive}: {percent:.0f}%") # Clear alerts that are no longer active for event_id in active_alerts - current_alerts: clear_event(AGGREGATOR_URL, event_id) active_alerts = current_alerts print(f"[SLEEP] Next check in {CHECK_INTERVAL}s\n") time.sleep(CHECK_INTERVAL) if __name__ == "__main__": main()