Files
Kao/detectors/disk_space.py
Spencer Grimes dd8bf6005b Bump to v1.5.0: deduplicate detectors, fix aggregator bugs, fix blocking I/O
- Extract shared send_event/clear_event into detectors/base.py, removing
  ~150 lines of duplication across all 6 detectors
- Fix default aggregator URL from port 5000 to 5100 in all detectors
- Standardize cpu.py and memory.py to use active_alerts set pattern
- Fix immediate emote rotation on startup (last_emote_change = time.time())
- Extract magic numbers to named constants in aggregator
- Protect write_status() with try/except OSError
- Fix notify event ID collision with monotonic counter
- Replace blocking stream_output() with background daemon threads in kao.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 12:17:17 -06:00

136 lines
4.8 KiB
Python

"""
Disk Space Detector
Monitors all drives and reports to the aggregator when thresholds are exceeded.
Environment variables:
AGGREGATOR_URL - URL of the aggregator (default: http://localhost:5100)
CHECK_INTERVAL - Seconds between checks (default: 300)
THRESHOLD_CRITICAL - Percent usage for critical alert (default: 95)
THRESHOLD_WARNING - Percent usage for warning alert (default: 85)
"""
import os
import time
import shutil
from detectors.base import DEFAULT_AGGREGATOR_URL, send_event, clear_event
# Runtime settings; each one can be overridden through the environment
# variable of the same name, otherwise the default shown here applies.
AGGREGATOR_URL = os.getenv("AGGREGATOR_URL", DEFAULT_AGGREGATOR_URL)
# Seconds to sleep between two scans.
CHECK_INTERVAL = int(os.getenv("CHECK_INTERVAL", "300"))
# Usage percentage at or above which a critical alert is raised.
THRESHOLD_CRITICAL = int(os.getenv("THRESHOLD_CRITICAL", "95"))
# Usage percentage at or above which a warning alert is raised.
THRESHOLD_WARNING = int(os.getenv("THRESHOLD_WARNING", "85"))
def _decode_mount_path(raw):
    r"""Return *raw* with the octal escapes used in mount tables decoded.

    Mount tables such as /proc/mounts are whitespace-separated, so characters
    like space, tab, newline and backslash inside a path are written as
    ``\040``, ``\011``, ``\012`` and ``\134``. The escaped text is not a usable
    path until those sequences are turned back into the real characters.
    """
    import re

    return re.sub(r"\\([0-7]{3})", lambda m: chr(int(m.group(1), 8)), raw)


def _can_stat_volume(path):
    """Return True when ``shutil.disk_usage`` can read *path*."""
    try:
        shutil.disk_usage(path)
    except OSError:
        # FileNotFoundError and PermissionError are OSError subclasses.
        return False
    return True


def get_all_drives(mounts_file="/proc/mounts"):
    """Get list of mounted drives/partitions whose usage can be queried.

    Args:
        mounts_file: Path of the mount table parsed on non-Windows systems.
            Ignored on Windows, where drive letters A-Z are probed instead.

    Returns:
        A list of path strings. On Windows these are drive roots such as
        ``"C:\\"``. Elsewhere they are the mount points of ``/dev/*`` devices
        listed in *mounts_file* (one per device, skipping ``/snap/`` and
        ``/boot/efi`` mounts). If *mounts_file* cannot be read, the existing
        paths among ``/``, ``/home`` and ``/var`` are returned, one per
        distinct filesystem.
    """
    import platform

    if platform.system() == "Windows":
        import string

        candidates = (f"{letter}:\\" for letter in string.ascii_uppercase)
        return [drive for drive in candidates if _can_stat_volume(drive)]

    # Read the whole table up front so the try block only guards the file
    # access; any OSError (missing file, no permission, ...) selects the
    # fallback instead of crashing the detector.
    try:
        with open(mounts_file, "r") as f:
            mount_lines = f.readlines()
    except OSError:
        mount_lines = None

    drives = []

    if mount_lines is None:
        # Fallback for macOS or systems without a readable mount table.
        seen_filesystems = set()
        for mount in ("/", "/home", "/var"):
            try:
                shutil.disk_usage(mount)
                filesystem_id = os.stat(mount).st_dev
            except OSError:
                continue
            # Several of these paths often live on the same filesystem;
            # report each filesystem once, as the mount-table path does.
            if filesystem_id in seen_filesystems:
                continue
            seen_filesystems.add(filesystem_id)
            drives.append(mount)
        return drives

    seen_devices = set()
    for line in mount_lines:
        parts = line.split()
        if len(parts) < 2:
            continue
        device, mount = parts[0], _decode_mount_path(parts[1])
        # Skip virtual filesystems
        if not device.startswith("/dev/"):
            continue
        # Skip snap mounts and other system paths
        if mount.startswith(("/snap/", "/boot/efi")):
            continue
        # Skip duplicate devices (e.g., bind mounts)
        if device in seen_devices:
            continue
        # Only mark the device as handled once one of its mount points is
        # actually readable, so an inaccessible first mount does not hide
        # the device entirely.
        if _can_stat_volume(mount):
            seen_devices.add(device)
            drives.append(mount)
    return drives
def check_disk(drive):
    """Check disk usage for a drive.

    Args:
        drive: Path of a drive root or mount point.

    Returns:
        A tuple ``(percent_used, total_gb, used_gb)`` where the sizes are in
        GiB (1024 ** 3 bytes). ``(None, None, None)`` is returned when the
        usage cannot be read or the volume reports a total size of zero.
    """
    try:
        usage = shutil.disk_usage(drive)
    except (OSError, ValueError):
        # OSError: path missing, unreadable or not ready.
        # ValueError: malformed path (e.g. embedded NUL byte).
        return None, None, None

    # Zero-sized volumes have no meaningful percentage; report them as
    # unreadable rather than dividing by zero.
    if usage.total <= 0:
        return None, None, None

    bytes_per_gb = 1024 ** 3
    percent = (usage.used / usage.total) * 100
    return percent, usage.total / bytes_per_gb, usage.used / bytes_per_gb
def _drive_event_id(drive):
    """Build the aggregator event ID for *drive*.

    Colons and backslashes are dropped, forward slashes become underscores and
    surrounding underscores are stripped, so ``"C:\\\\"`` maps to ``disk_C`` and
    ``"/var/log"`` to ``disk_var_log``. A path that reduces to nothing (``"/"``)
    maps to ``disk_root``.
    """
    # Built outside the f-string: a backslash inside an f-string replacement
    # field is a SyntaxError on Python versions before 3.12.
    slug = drive.replace(":", "").replace("/", "_").replace("\\", "").strip("_")
    return f"disk_{slug or 'root'}"


def main():
    """Run the detector loop forever.

    Each cycle scans every drive returned by ``get_all_drives()``, sends an
    event with severity 1 for drives at or above ``THRESHOLD_CRITICAL`` and
    severity 2 for drives at or above ``THRESHOLD_WARNING``, clears events
    that were raised in the previous cycle but not in this one, and then
    sleeps for ``CHECK_INTERVAL`` seconds. The function never returns.
    """
    print("Disk Space Detector started")
    print(f" Aggregator: {AGGREGATOR_URL}")
    print(f" Interval: {CHECK_INTERVAL}s")
    print(f" Thresholds: Warning={THRESHOLD_WARNING}%, Critical={THRESHOLD_CRITICAL}%")
    print()

    # Track active alerts to know when to clear
    active_alerts = set()
    while True:
        drives = get_all_drives()
        print(f"[CHECK] Scanning {len(drives)} drive(s)...")
        current_alerts = set()

        for drive in drives:
            percent, total_gb, used_gb = check_disk(drive)
            if percent is None:
                # Usage could not be read for this drive; skip it this cycle.
                continue

            if percent >= THRESHOLD_CRITICAL:
                severity = 1
            elif percent >= THRESHOLD_WARNING:
                severity = 2
            else:
                print(f"[OK] {drive}: {percent:.0f}%")
                continue

            event_id = _drive_event_id(drive)
            message = f"{drive} at {percent:.0f}% ({used_gb:.1f}/{total_gb:.1f} GB)"
            # NOTE(review): the last argument is presumably the event's
            # lifetime in seconds - confirm against detectors.base.send_event.
            send_event(AGGREGATOR_URL, event_id, severity, message, CHECK_INTERVAL)
            current_alerts.add(event_id)

        # Clear alerts that are no longer active
        for event_id in active_alerts - current_alerts:
            clear_event(AGGREGATOR_URL, event_id)
        active_alerts = current_alerts

        print(f"[SLEEP] Next check in {CHECK_INTERVAL}s\n")
        time.sleep(CHECK_INTERVAL)
# Start the detector loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()