Files
Kao/aggregator.py
Spencer Grimes 71c7bb756a Rename project to Kao
- Renamed sentry.py to kao.py
- Updated all references from Sentry-Emote to Kao
- Kao (顔) means "face" in Japanese

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 21:50:12 -06:00

269 lines
7.9 KiB
Python

"""
Kao Aggregator
A lightweight event broker that manages priority-based system status.
"""
import json
import random
import threading
import time
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_from_directory
# Flask application object; every @app.route handler in this module is
# registered on it.
# NOTE(review): static_folder="." makes Flask's static-file handling point at
# the application directory itself rather than a dedicated "static" folder —
# confirm that exposing that directory is intended.
app = Flask(__name__, static_folder=".")
# Directory containing this module; index() serves index.html from here.
ROOT_DIR = Path(__file__).parent
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# JSON snapshot of the current state, written next to this module.
STATUS_FILE = Path(__file__).parent.joinpath("status.json")
DEFAULT_NOTIFY_TTL = 10  # Seconds a Priority 3 (Notify) event lives when no ttl is given
CELEBRATION_DURATION = 5  # Seconds the celebration face is shown after recovery

# Emote variations, each paired with the animation it should play.
OPTIMAL_EMOTES = [
    ("( ^_^)", "breathing"),  # calm, content
    ("( ᵔᴥᵔ)", "floating"),  # dreamy
    ("(◕‿◕)", "bouncing"),  # cheerful
    ("( ・ω・)", "swaying"),  # curious
    ("( ˘▽˘)", "breathing"),  # cozy
]

# Occasional expressions mixed into the optimal rotation.
IDLE_EMOTES = [
    ("( -_^)", "blink"),  # wink
    ("( ^_~)", "blink"),  # wink
    ("( ᵕ.ᵕ)", "blink"),  # blink
]

# Shown briefly when the system returns to optimal after a worse state.
CELEBRATION_EMOTE = ("\\(^o^)/", "celebrating")

# Priority definitions: a lower number is more severe and wins over higher ones.
PRIORITY_CONFIG = {
    1: {
        "name": "Critical",
        "emote": "( x_x)",
        "color": "#FF0000",
        "animation": "shaking",
    },
    2: {
        "name": "Warning",
        "emote": "( o_o)",
        "color": "#FFFF00",
        "animation": "breathing",
    },
    3: {
        "name": "Notify",
        "emote": "( 'o')",
        "color": "#0088FF",
        "animation": "popping",
    },
    4: {
        "name": "Optimal",
        "emote": "( ^_^)",
        "color": "#00FF00",
        "animation": "breathing",
    },
}

# ---------------------------------------------------------------------------
# Event storage, guarded by events_lock
# ---------------------------------------------------------------------------
events_lock = threading.Lock()
active_events = dict()  # id -> {priority, message, timestamp, ttl}

# ---------------------------------------------------------------------------
# State tracking for personality
# ---------------------------------------------------------------------------
previous_priority = 4  # priority seen by the previous state computation
celebrating_until = 0  # epoch seconds until which the celebration face is shown
last_emote_change = 0  # epoch seconds of the last optimal-emote rotation
# Start with the first optimal emote and its paired animation.
current_optimal_emote, current_optimal_animation = OPTIMAL_EMOTES[0]

# ---------------------------------------------------------------------------
# Sleep mode
# ---------------------------------------------------------------------------
is_sleeping = False
SLEEP_EMOTE = "( -_-)zzZ"
SLEEP_COLOR = "#333333"
SLEEP_ANIMATION = "sleeping"
def get_current_state():
    """Build the status payload describing what the face should show now.

    Sleep mode wins over everything and yields a fixed "sleeping" payload.
    Otherwise the most severe (lowest-numbered) active event decides the
    state; with no events the state is priority 4 (Optimal). While optimal,
    a short celebration is shown right after recovering from a worse state,
    and afterwards the emote is re-rolled at most once every 300 seconds.

    Besides returning the payload, this updates the module-level personality
    trackers (previous priority, celebration deadline, current emote).

    Returns:
        dict with keys current_state, active_emote, color, animation,
        message, active_events (sorted by priority) and last_updated.
    """
    global previous_priority, celebrating_until, last_emote_change
    global current_optimal_emote, current_optimal_animation

    # Sleep mode overrides everything, including any active events.
    if is_sleeping:
        return {
            "current_state": "sleeping",
            "active_emote": SLEEP_EMOTE,
            "color": SLEEP_COLOR,
            "animation": SLEEP_ANIMATION,
            "message": "",
            "active_events": [],
            "last_updated": datetime.now().isoformat(timespec="seconds"),
        }

    now = time.time()

    # Take a snapshot of the events while holding the lock.
    with events_lock:
        events_list = [
            {
                "id": event_id,
                "priority": record["priority"],
                "message": record.get("message", ""),
            }
            for event_id, record in active_events.items()
        ]

    # Highest priority is the lowest number; no events means Optimal (4).
    priority = min((item["priority"] for item in events_list), default=4)

    level = PRIORITY_CONFIG[priority]
    emote, animation, color = level["emote"], level["animation"], level["color"]

    # Recovery: the previous computation was worse than optimal, this one is not.
    recovered = priority == 4 and previous_priority < 4
    if recovered:
        celebrating_until = now + CELEBRATION_DURATION
    previous_priority = priority

    # Optimal-state personality.
    if priority == 4:
        if now < celebrating_until:
            # Still inside the celebration window.
            emote, animation = CELEBRATION_EMOTE
        else:
            # Re-roll the face every 5 minutes; 15% of the time pick an idle
            # expression (wink/blink) instead of a regular optimal one.
            if now - last_emote_change > 300:
                last_emote_change = now
                pool = IDLE_EMOTES if random.random() < 0.15 else OPTIMAL_EMOTES
                current_optimal_emote, current_optimal_animation = random.choice(pool)
            emote, animation = current_optimal_emote, current_optimal_animation

    if priority == 4:
        message = level["name"]
    else:
        message = f"{level['name']} state active"

    # Most severe events first; list.sort is stable, like sorted().
    events_list.sort(key=lambda item: item["priority"])

    return {
        "current_state": level["name"].lower(),
        "active_emote": emote,
        "color": color,
        "animation": animation,
        "message": message,
        "active_events": events_list,
        "last_updated": datetime.now().isoformat(timespec="seconds"),
    }
def write_status():
    """Compute the current state, dump it to STATUS_FILE, and return it.

    The file is rewritten from scratch as tab-indented JSON on every call.
    """
    snapshot = get_current_state()
    with STATUS_FILE.open("w") as sink:
        json.dump(snapshot, sink, indent="\t")
    return snapshot
def cleanup_expired_events():
    """Loop forever, dropping events whose TTL deadline has passed.

    Meant to run as the target of a background thread. Once per second it
    removes every event carrying a truthy "ttl" deadline that lies in the
    past, and rewrites the status file only when something was removed.
    """
    while True:
        time.sleep(1)
        cutoff = time.time()
        with events_lock:
            stale_ids = [
                event_id
                for event_id, record in active_events.items()
                if record.get("ttl") and cutoff > record["ttl"]
            ]
            for event_id in stale_ids:
                del active_events[event_id]
        # Write after leaving the lock: computing the state takes events_lock
        # again, and threading.Lock cannot be re-acquired by its holder.
        if stale_ids:
            write_status()
@app.route("/event", methods=["POST"])
def post_event():
    """
    Accept a new event, replacing any existing event with the same id.

    Expected JSON: {"id": "event_id", "priority": 1-4, "message": "optional", "ttl": optional_seconds}

    Responses:
        200 - event stored; body carries the recomputed state.
        400 - body is not a JSON object, "id"/"priority" is missing,
              "priority" is not an integer in PRIORITY_CONFIG, or "ttl"
              is present but not convertible to an integer.
    """
    data = request.get_json(force=True)
    # Reject non-object bodies too: a JSON string or list would pass the
    # membership checks below and then fail on data["id"].
    if not isinstance(data, dict) or "id" not in data or "priority" not in data:
        return jsonify({"error": "Missing required fields: id, priority"}), 400

    event_id = str(data["id"])

    # Malformed numbers are a client error, not a server crash.
    try:
        priority = int(data["priority"])
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": f"Invalid priority: {data['priority']!r}. Must be 1-4."}), 400
    if priority not in PRIORITY_CONFIG:
        return jsonify({"error": f"Invalid priority: {priority}. Must be 1-4."}), 400

    # One clock reading so the timestamp and TTL deadline share a base.
    now = time.time()
    event = {
        "priority": priority,
        "message": data.get("message", ""),
        "timestamp": now,
    }

    # Apply TTL if provided, or use default for Priority 3 (Notify)
    if "ttl" in data:
        try:
            ttl_seconds = int(data["ttl"])
        except (TypeError, ValueError, OverflowError):
            return jsonify({"error": f"Invalid ttl: {data['ttl']!r}. Must be a number of seconds."}), 400
        event["ttl"] = now + ttl_seconds
    elif priority == 3:
        event["ttl"] = now + DEFAULT_NOTIFY_TTL

    with events_lock:
        active_events[event_id] = event

    # Must run after the lock is released: write_status() computes the state,
    # which acquires events_lock itself.
    state = write_status()
    return jsonify({"status": "ok", "current_state": state}), 200
@app.route("/clear", methods=["POST"])
def clear_event():
    """
    Clear an event by ID.

    Expected JSON: {"id": "event_id"}

    Responses:
        200 - event removed; body carries the recomputed state.
        400 - body is not a JSON object or "id" is missing.
        404 - no active event has that id.
    """
    data = request.get_json(force=True)
    # Reject non-object bodies too: a JSON string containing "id" would pass
    # the membership check and then fail on data["id"].
    if not isinstance(data, dict) or "id" not in data:
        return jsonify({"error": "Missing required field: id"}), 400

    event_id = str(data["id"])

    # Only the dictionary mutation happens under the lock. Stored events are
    # always dicts, so None reliably means "not found".
    with events_lock:
        removed = active_events.pop(event_id, None)

    if removed is None:
        return jsonify({"error": "Event not found"}), 404

    # write_status() -> get_current_state() acquires events_lock. Because
    # threading.Lock is not reentrant, calling it while still holding the
    # lock would block this request forever, so it runs after release.
    state = write_status()
    return jsonify({"status": "cleared", "current_state": state}), 200
@app.route("/sleep", methods=["POST"])
def sleep_mode():
    """Put the face to sleep. Intended for a Home Assistant webhook."""
    global is_sleeping
    is_sleeping = True
    # Recompute and persist the state now that the sleep flag is set.
    body = {"status": "sleeping", "current_state": write_status()}
    return jsonify(body), 200
@app.route("/wake", methods=["POST"])
def wake_mode():
    """Wake the face up again. Intended for a Home Assistant webhook."""
    global is_sleeping
    is_sleeping = False
    # Recompute and persist the state now that the sleep flag is cleared.
    body = {"status": "awake", "current_state": write_status()}
    return jsonify(body), 200
@app.route("/")
def index():
    """Return index.html from ROOT_DIR as the frontend page."""
    page = send_from_directory(ROOT_DIR, "index.html")
    return page
@app.route("/status", methods=["GET"])
def get_status():
    """Respond with the freshly computed state as JSON (nothing is written to disk)."""
    snapshot = get_current_state()
    return jsonify(snapshot), 200
@app.route("/events", methods=["GET"])
def list_events():
    """Respond with every active event, keyed by event id."""
    # Copy the mapping under the lock; the response is built from the copy.
    with events_lock:
        snapshot = dict(active_events)
    return jsonify({"events": snapshot}), 200
def main():
    """Start the aggregator: seed the status file, launch TTL cleanup, serve HTTP."""
    # Seed status.json with the initial state before anything can read it.
    write_status()
    print(f"Status file: {STATUS_FILE}")

    # The daemon flag lets the process exit without waiting for the endless cleanup loop.
    threading.Thread(target=cleanup_expired_events, daemon=True).start()

    # Blocks here serving requests on all interfaces, port 5000.
    app.run(host="0.0.0.0", port=5000, threaded=True)


if __name__ == "__main__":
    main()