"""
Sentry-Emote Aggregator

A lightweight event broker that manages priority-based system status.

Clients POST events (each with an id and a priority from 1 to 4); the
aggregator keeps them in memory, derives a single "current state" from the
highest-priority active event, and mirrors that state to ``status.json``.
"""

import json
import random
import threading
import time
from datetime import datetime
from pathlib import Path

from flask import Flask, request, jsonify, send_from_directory

app = Flask(__name__, static_folder=".")
ROOT_DIR = Path(__file__).parent

# Configuration
STATUS_FILE = Path(__file__).parent / "status.json"
DEFAULT_NOTIFY_TTL = 10  # Default TTL for Priority 3 (Notify) events
CELEBRATION_DURATION = 5  # Seconds to show celebration after recovery
EMOTE_ROTATION_INTERVAL = 300  # Seconds between optimal-emote rotations
IDLE_EMOTE_CHANCE = 0.15  # Probability that a rotation picks an idle emote

# Emote variations with paired animations
OPTIMAL_EMOTES = [
    ("( ^_^)", "breathing"),  # calm, content
    ("( ᵔᴥᵔ)", "floating"),  # dreamy
    ("(◕‿◕)", "bouncing"),  # cheerful
    ("( ・ω・)", "swaying"),  # curious
    ("( ˘▽˘)", "breathing"),  # cozy
]
IDLE_EMOTES = [
    ("( -_^)", "blink"),  # wink
    ("( ^_~)", "blink"),  # wink
    ("( ᵕ.ᵕ)", "blink"),  # blink
]
CELEBRATION_EMOTE = ("\\(^o^)/", "celebrating")

# Priority definitions
PRIORITY_CONFIG = {
    1: {"name": "Critical", "emote": "( x_x)", "color": "#FF0000", "animation": "shaking"},
    2: {"name": "Warning", "emote": "( o_o)", "color": "#FFFF00", "animation": "breathing"},
    3: {"name": "Notify", "emote": "( 'o')", "color": "#0088FF", "animation": "popping"},
    4: {"name": "Optimal", "emote": "( ^_^)", "color": "#00FF00", "animation": "breathing"},
}

# Thread-safe event storage.
# NOTE: events_lock is a plain (non-reentrant) Lock and get_current_state()
# acquires it, so write_status()/get_current_state() must never be called
# while events_lock is held.
events_lock = threading.Lock()
active_events = {}  # id -> {priority, message, timestamp, ttl}

# Serializes "compute state + write status.json" so that concurrent request
# threads and the cleanup thread cannot interleave their writes.
# Lock order is always _status_write_lock -> events_lock.
_status_write_lock = threading.Lock()

# State tracking for personality
previous_priority = 4
celebrating_until = 0
last_emote_change = 0
current_optimal_emote = OPTIMAL_EMOTES[0][0]
current_optimal_animation = OPTIMAL_EMOTES[0][1]

# Sleep mode
is_sleeping = False
SLEEP_EMOTE = "( -_-)zzZ"
SLEEP_COLOR = "#333333"
SLEEP_ANIMATION = "sleeping"


def get_current_state():
    """Determine current state based on active events.

    Returns:
        A JSON-serializable dict with the keys ``current_state``,
        ``active_emote``, ``color``, ``animation``, ``message``,
        ``active_events`` and ``last_updated``.

    Side effects:
        Updates the module-level personality state (``previous_priority``,
        ``celebrating_until``, ``last_emote_change`` and the current optimal
        emote/animation). Acquires ``events_lock`` briefly, so callers must
        not already hold it.
    """
    global previous_priority, celebrating_until, last_emote_change
    global current_optimal_emote, current_optimal_animation

    # Sleep mode overrides everything
    if is_sleeping:
        return {
            "current_state": "sleeping",
            "active_emote": SLEEP_EMOTE,
            "color": SLEEP_COLOR,
            "animation": SLEEP_ANIMATION,
            "message": "",
            "active_events": [],
            "last_updated": datetime.now().isoformat(timespec="seconds"),
        }

    now = time.time()

    with events_lock:
        if not active_events:
            priority = 4
            events_list = []
        else:
            # Find highest priority (lowest number)
            priority = min(e["priority"] for e in active_events.values())
            events_list = [
                {"id": eid, "priority": e["priority"], "message": e.get("message", "")}
                for eid, e in active_events.items()
            ]

    config = PRIORITY_CONFIG[priority]
    emote = config["emote"]
    animation = config["animation"]
    color = config["color"]

    # Check for recovery (was bad, now optimal)
    if priority == 4 and previous_priority < 4:
        celebrating_until = now + CELEBRATION_DURATION
    previous_priority = priority

    # Handle optimal state personality
    if priority == 4:
        if now < celebrating_until:
            # Celebration mode
            emote, animation = CELEBRATION_EMOTE
        else:
            # Rotate optimal emotes periodically, with an occasional idle
            # expression (wink/blink)
            if now - last_emote_change > EMOTE_ROTATION_INTERVAL:
                last_emote_change = now
                if random.random() < IDLE_EMOTE_CHANCE:
                    pool = IDLE_EMOTES
                else:
                    pool = OPTIMAL_EMOTES
                current_optimal_emote, current_optimal_animation = random.choice(pool)
            emote = current_optimal_emote
            animation = current_optimal_animation

    return {
        "current_state": config["name"].lower(),
        "active_emote": emote,
        "color": color,
        "animation": animation,
        "message": config["name"] if priority == 4 else f"{config['name']} state active",
        "active_events": sorted(events_list, key=lambda x: x["priority"]),
        "last_updated": datetime.now().isoformat(timespec="seconds"),
    }


def write_status():
    """Write current state to status.json.

    The state is computed and written under ``_status_write_lock`` so that
    concurrent callers cannot interleave their writes and the file always
    holds the snapshot computed by the last writer.

    Returns:
        The state dict that was written.
    """
    with _status_write_lock:
        state = get_current_state()
        with open(STATUS_FILE, "w", encoding="utf-8") as f:
            json.dump(state, f, indent="\t")
    return state


def cleanup_expired_events():
    """Background thread to remove expired TTL events.

    Loops forever: once per second it deletes every event whose absolute
    ``ttl`` timestamp has passed and, if anything was removed, rewrites the
    status file.
    """
    while True:
        time.sleep(1)
        now = time.time()
        with events_lock:
            expired = [
                eid
                for eid, event in active_events.items()
                if event.get("ttl") and now > event["ttl"]
            ]
            for eid in expired:
                del active_events[eid]
        # Must run after events_lock is released: write_status() re-acquires
        # it through get_current_state().
        if expired:
            write_status()


@app.route("/event", methods=["POST"])
def post_event():
    """
    Accept a new event.
    Expected JSON: {"id": "event_id", "priority": 1-4, "message": "optional", "ttl": optional_seconds}

    Returns 400 when the body is not a JSON object, when ``id``/``priority``
    are missing, or when ``priority``/``ttl`` cannot be interpreted as
    integers. On success returns 200 with the new current state.
    """
    data = request.get_json(force=True)
    # A non-object body (list, string, number, null) cannot carry the fields.
    if not isinstance(data, dict) or "id" not in data or "priority" not in data:
        return jsonify({"error": "Missing required fields: id, priority"}), 400

    event_id = str(data["id"])

    try:
        priority = int(data["priority"])
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": f"Invalid priority: {data['priority']}. Must be 1-4."}), 400
    if priority not in PRIORITY_CONFIG:
        return jsonify({"error": f"Invalid priority: {priority}. Must be 1-4."}), 400

    now = time.time()
    event = {
        "priority": priority,
        "message": data.get("message", ""),
        "timestamp": now,
    }

    # Apply TTL if provided, or use default for Priority 3 (Notify).
    # "ttl" is stored as an absolute expiry timestamp.
    if "ttl" in data:
        try:
            ttl_seconds = int(data["ttl"])
        except (TypeError, ValueError, OverflowError):
            return jsonify({"error": "Invalid ttl: must be a number of seconds"}), 400
        event["ttl"] = now + ttl_seconds
    elif priority == 3:
        event["ttl"] = now + DEFAULT_NOTIFY_TTL

    with events_lock:
        active_events[event_id] = event

    # Outside the lock: write_status() needs to take events_lock itself.
    state = write_status()
    return jsonify({"status": "ok", "current_state": state}), 200


@app.route("/clear", methods=["POST"])
def clear_event():
    """
    Clear an event by ID.
    Expected JSON: {"id": "event_id"}

    Returns 400 when ``id`` is missing, 404 when no such event is active,
    and 200 with the new current state otherwise.
    """
    data = request.get_json(force=True)
    if not isinstance(data, dict) or "id" not in data:
        return jsonify({"error": "Missing required field: id"}), 400

    event_id = str(data["id"])

    with events_lock:
        removed = active_events.pop(event_id, None) is not None

    if not removed:
        return jsonify({"error": "Event not found"}), 404

    # write_status() must be called with events_lock released; calling it
    # while holding the (non-reentrant) lock would block forever.
    state = write_status()
    return jsonify({"status": "cleared", "current_state": state}), 200


@app.route("/sleep", methods=["POST"])
def sleep_mode():
    """Enter sleep mode. For Home Assistant webhook."""
    global is_sleeping
    is_sleeping = True
    state = write_status()
    return jsonify({"status": "sleeping", "current_state": state}), 200


@app.route("/wake", methods=["POST"])
def wake_mode():
    """Exit sleep mode. For Home Assistant webhook."""
    global is_sleeping
    is_sleeping = False
    state = write_status()
    return jsonify({"status": "awake", "current_state": state}), 200


@app.route("/")
def index():
    """Serve the frontend."""
    return send_from_directory(ROOT_DIR, "index.html")


@app.route("/status", methods=["GET"])
def get_status():
    """Return current status as JSON."""
    return jsonify(get_current_state()), 200


@app.route("/events", methods=["GET"])
def list_events():
    """List all active events."""
    with events_lock:
        return jsonify({"events": dict(active_events)}), 200


def main():
    """Write the initial status file, start TTL cleanup, and run the server."""
    # Write initial optimal state
    write_status()
    print(f"Status file: {STATUS_FILE}")

    # Start TTL cleanup thread
    cleanup_thread = threading.Thread(target=cleanup_expired_events, daemon=True)
    cleanup_thread.start()

    # Run Flask
    app.run(host="0.0.0.0", port=5000, threaded=True)


if __name__ == "__main__":
    main()