Kao/detectors/base.py
Spencer Grimes dd8bf6005b Bump to v1.5.0: deduplicate detectors, fix aggregator bugs, fix blocking I/O
- Extract shared send_event/clear_event into detectors/base.py, removing
  ~150 lines of duplication across all 6 detectors
- Fix default aggregator URL from port 5000 to 5100 in all detectors
- Standardize cpu.py and memory.py to use active_alerts set pattern
- Fix immediate emote rotation on startup (last_emote_change = time.time())
- Extract magic numbers to named constants in aggregator
- Protect write_status() with try/except OSError
- Fix notify event ID collision with monotonic counter
- Replace blocking stream_output() with background daemon threads in kao.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 12:17:17 -06:00

36 lines
1.0 KiB
Python

"""
Shared utilities for Kao detectors.
"""
import requests
DEFAULT_AGGREGATOR_URL = "http://localhost:5100"
def send_event(url, event_id, priority, message, check_interval):
"""Send an event to the aggregator with heartbeat TTL."""
ttl = check_interval * 2
try:
response = requests.post(
f"{url}/event",
json={"id": event_id, "priority": priority, "message": message, "ttl": ttl},
timeout=5,
)
print(f"[EVENT] {event_id}: {message} (priority {priority}, ttl {ttl}s) -> {response.status_code}")
except requests.RequestException as e:
print(f"[ERROR] Failed to send event: {e}")
def clear_event(url, event_id):
"""Clear an event from the aggregator."""
try:
response = requests.post(
f"{url}/clear",
json={"id": event_id},
timeout=5,
)
if response.status_code == 200:
print(f"[CLEAR] {event_id}")
except requests.RequestException as e:
print(f"[ERROR] Failed to clear event: {e}")