""" Kao Aggregator A lightweight event broker that manages priority-based system status. """ import json import os import random import threading import time from datetime import datetime from pathlib import Path from flask import Flask, request, jsonify, send_from_directory app = Flask(__name__, static_folder=".") ROOT_DIR = Path(__file__).parent # Configuration STATUS_FILE = Path(__file__).parent / "status.json" DEFAULT_NOTIFY_TTL = 10 # Default TTL for Priority 3 (Notify) events CELEBRATION_DURATION = 5 # Seconds to show celebration after recovery EMOTE_ROTATION_INTERVAL = 300 # Seconds between emote rotations IDLE_EXPRESSION_CHANCE = 0.15 # Chance of a brief blink/wink on rotation DEFAULT_NOTIFY_DURATION = 5 # Default duration for /notify events # Emote variations with paired animations OPTIMAL_EMOTES = [ ("( ^_^)", "breathing"), # calm, content ("( ˙▿˙)", "floating"), # content ("(◕‿◕)", "bouncing"), # cheerful ("( ・ω・)", "swaying"), # curious ("( ˘▽˘)", "breathing"), # cozy ] IDLE_EMOTES = [ ("( -_^)", "blink"), # wink ("( ^_~)", "blink"), # wink ("( ᵕ.ᵕ)", "blink"), # blink ] CELEBRATION_EMOTE = ("\\(^o^)/", "celebrating") # Priority definitions PRIORITY_CONFIG = { 1: {"name": "Critical", "emote": "( x_x)", "color": "#FF0000", "animation": "shaking"}, 2: {"name": "Warning", "emote": "( o_o)", "color": "#FFFF00", "animation": "breathing"}, 3: {"name": "Notify", "emote": "( 'o')", "color": "#0088FF", "animation": "popping"}, 4: {"name": "Optimal", "emote": "( ^_^)", "color": "#00FF00", "animation": "breathing"}, } # Thread-safe event storage events_lock = threading.Lock() active_events = {} # id -> {priority, message, timestamp, ttl} # State tracking for personality previous_priority = 4 celebrating_until = 0 blinking_until = 0 blink_emote = None blink_animation = None last_emote_change = time.time() current_optimal_emote = OPTIMAL_EMOTES[0][0] current_optimal_animation = OPTIMAL_EMOTES[0][1] # Notify counter for unique IDs _notify_counter = 0 # Sleep mode is_sleeping = False SLEEP_EMOTE = "( -_-)zzZ" SLEEP_COLOR = "#333333" SLEEP_ANIMATION = "sleeping" def get_current_state(): """Determine current state based on active events.""" global previous_priority, celebrating_until, blinking_until, blink_emote, blink_animation global last_emote_change, current_optimal_emote, current_optimal_animation # Sleep mode overrides everything if is_sleeping: return { "current_state": "sleeping", "active_emote": SLEEP_EMOTE, "color": SLEEP_COLOR, "animation": SLEEP_ANIMATION, "message": "", "active_events": [], "last_updated": datetime.now().isoformat(timespec="seconds"), } now = time.time() top_event = None with events_lock: if not active_events: priority = 4 events_list = [] else: # Find highest priority (lowest number) and its event priority = min(e["priority"] for e in active_events.values()) for eid, e in active_events.items(): if e["priority"] == priority: top_event = e break events_list = [ {"id": eid, "priority": e["priority"], "message": e.get("message", "")} for eid, e in active_events.items() ] config = PRIORITY_CONFIG[priority] emote = config["emote"] animation = config["animation"] color = config["color"] sound = None # Check for custom display properties from top event if top_event: if "emote" in top_event: emote = top_event["emote"] if "color" in top_event: color = top_event["color"] if "animation" in top_event: animation = top_event["animation"] if "sound" in top_event: sound = top_event["sound"] # Check for recovery (was bad, now optimal) if priority == 4 and previous_priority < 3: celebrating_until = now + CELEBRATION_DURATION previous_priority = priority # Handle optimal state personality (only if no custom overrides) if priority == 4 and not top_event: if now < celebrating_until: # Celebration mode emote, animation = CELEBRATION_EMOTE elif now < blinking_until: # Brief blink/wink (1-2 seconds) emote = blink_emote animation = blink_animation else: # Rotate optimal emotes every 5 minutes if now - last_emote_change > EMOTE_ROTATION_INTERVAL: last_emote_change = now current_optimal_emote, current_optimal_animation = random.choice(OPTIMAL_EMOTES) # Brief blink/wink chance on rotation if random.random() < IDLE_EXPRESSION_CHANCE: blink_emote, blink_animation = random.choice(IDLE_EMOTES) blinking_until = now + random.uniform(1, 2) emote = current_optimal_emote animation = current_optimal_animation result = { "current_state": config["name"].lower(), "active_emote": emote, "color": color, "animation": animation, "message": config["name"] if priority == 4 else f"{config['name']} state active", "active_events": sorted(events_list, key=lambda x: x["priority"]), "last_updated": datetime.now().isoformat(timespec="seconds"), } if sound: result["sound"] = sound return result def write_status(): """Write current state to status.json.""" state = get_current_state() try: with open(STATUS_FILE, "w") as f: json.dump(state, f, indent="\t") except OSError as e: print(f"[ERROR] Failed to write status file: {e}") return state def cleanup_expired_events(): """Background thread to remove expired TTL events.""" while True: try: time.sleep(1) now = time.time() expired = [] with events_lock: for eid, event in active_events.items(): if event.get("ttl") and now > event["ttl"]: expired.append(eid) for eid in expired: del active_events[eid] if expired: write_status() except Exception as e: print(f"[cleanup] Error: {e}") @app.route("/event", methods=["POST"]) def post_event(): """ Accept a new event. Expected JSON: {"id": "event_id", "priority": 1-4, "message": "optional", "ttl": optional_seconds} """ data = request.get_json(force=True) if not data or "id" not in data or "priority" not in data: return jsonify({"error": "Missing required fields: id, priority"}), 400 event_id = str(data["id"]) priority = int(data["priority"]) if priority not in PRIORITY_CONFIG: return jsonify({"error": f"Invalid priority: {priority}. Must be 1-4."}), 400 event = { "priority": priority, "message": data.get("message", ""), "timestamp": time.time(), } # Apply TTL if provided, or use default for Priority 3 (Notify) if "ttl" in data: event["ttl"] = time.time() + int(data["ttl"]) elif priority == 3: event["ttl"] = time.time() + DEFAULT_NOTIFY_TTL with events_lock: active_events[event_id] = event state = write_status() return jsonify({"status": "ok", "current_state": state}), 200 @app.route("/clear-all", methods=["POST"]) def clear_all_events(): """Clear all active events.""" with events_lock: count = len(active_events) active_events.clear() state = write_status() return jsonify({"status": "cleared", "count": count, "current_state": state}), 200 @app.route("/clear", methods=["POST"]) def clear_event(): """ Clear an event by ID. Expected JSON: {"id": "event_id"} """ data = request.get_json(force=True) if not data or "id" not in data: return jsonify({"error": "Missing required field: id"}), 400 event_id = str(data["id"]) with events_lock: if event_id in active_events: del active_events[event_id] state = write_status() return jsonify({"status": "cleared", "current_state": state}), 200 else: return jsonify({"error": "Event not found"}), 404 @app.route("/notify", methods=["POST"]) def notify(): """ Notification endpoint for Home Assistant. JSON: { "message": "text", "duration": 5, "emote": "( °o°)", # optional custom emote "color": "#FF9900", # optional custom color "animation": "popping", # optional: breathing, shaking, popping, celebrating, floating, bouncing, swaying "sound": "chime" # optional: chime, alert, warning, critical, success, none } """ global _notify_counter data = request.get_json(force=True) if request.data else {} message = data.get("message", "") duration = int(data.get("duration", DEFAULT_NOTIFY_DURATION)) # Generate unique ID to avoid conflicts _notify_counter += 1 event_id = f"notify_{int(time.time())}_{_notify_counter}" event = { "priority": 3, # Notify priority "message": message, "timestamp": time.time(), "ttl": time.time() + duration, } # Optional custom display properties if "emote" in data: event["emote"] = data["emote"] if "color" in data: event["color"] = data["color"] if "animation" in data: event["animation"] = data["animation"] if "sound" in data: event["sound"] = data["sound"] with events_lock: active_events[event_id] = event state = write_status() return jsonify({"status": "ok", "id": event_id, "current_state": state}), 200 @app.route("/sleep", methods=["POST"]) def sleep_mode(): """Enter sleep mode. For Home Assistant webhook.""" global is_sleeping is_sleeping = True state = write_status() return jsonify({"status": "sleeping", "current_state": state}), 200 @app.route("/wake", methods=["POST"]) def wake_mode(): """Exit sleep mode. For Home Assistant webhook.""" global is_sleeping is_sleeping = False state = write_status() return jsonify({"status": "awake", "current_state": state}), 200 @app.route("/") def index(): """Serve the frontend.""" return send_from_directory(ROOT_DIR, "index.html") @app.route("/openapi.yaml") def openapi_spec(): """Serve OpenAPI specification.""" return send_from_directory(ROOT_DIR, "openapi.yaml", mimetype="text/yaml") @app.route("/status", methods=["GET"]) def get_status(): """Return current status as JSON.""" return jsonify(get_current_state()), 200 @app.route("/events", methods=["GET"]) def list_events(): """List all active events.""" with events_lock: return jsonify({"events": dict(active_events)}), 200 @app.route("/docs") def docs(): """Serve interactive API documentation via Swagger UI.""" return """