- Emote face is now stable all day; /wake picks a fresh random face each morning instead of rotating every 5 minutes. Removes EMOTE_ROTATION_INTERVAL and last_emote_change entirely. - Added 10 new synthesized sounds for /notify: doorbell, knock, ding, blip, siren, tada, ping, bubble, fanfare, and alarm (loops until tapped or TTL expires). stopAlarm() wired into tap handler and handleStateChange(). - openapi.yaml: new sound names added to enum. - CLAUDE.md / README.md updated to reflect both changes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
400 lines
12 KiB
Python
400 lines
12 KiB
Python
"""
|
|
Kao Aggregator
|
|
A lightweight event broker that manages priority-based system status.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import random
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from flask import Flask, request, jsonify, send_from_directory
|
|
|
|
app = Flask(__name__, static_folder=".")
|
|
ROOT_DIR = Path(__file__).parent
|
|
|
|
# Configuration
|
|
STATUS_FILE = Path(__file__).parent / "status.json"
|
|
DEFAULT_NOTIFY_TTL = 10 # Default TTL for Priority 3 (Notify) events
|
|
CELEBRATION_DURATION = 5 # Seconds to show celebration after recovery
|
|
IDLE_EXPRESSION_CHANCE = 0.15 # Chance of a brief blink/wink on wake
|
|
DEFAULT_NOTIFY_DURATION = 5 # Default duration for /notify events
|
|
|
|
# Emote variations with paired animations
|
|
OPTIMAL_EMOTES = [
|
|
("( ^_^)", "breathing"), # calm, content
|
|
("(◕‿◕)", "bouncing"), # cheerful
|
|
("( ・ω・)", "swaying"), # curious
|
|
("( ˘▽˘)", "breathing"), # cozy
|
|
]
|
|
IDLE_EMOTES = [
|
|
("( -_^)", "blink"), # wink
|
|
("( ^_~)", "blink"), # wink
|
|
("( ᵕ.ᵕ)", "blink"), # blink
|
|
]
|
|
CELEBRATION_EMOTE = ("\\(^o^)/", "celebrating")
|
|
|
|
# Priority definitions
|
|
PRIORITY_CONFIG = {
|
|
1: {"name": "Critical", "emote": "( x_x)", "color": "#FF0000", "animation": "shaking"},
|
|
2: {"name": "Warning", "emote": "( o_o)", "color": "#FFFF00", "animation": "breathing"},
|
|
3: {"name": "Notify", "emote": "( 'o')", "color": "#0088FF", "animation": "popping"},
|
|
4: {"name": "Optimal", "emote": "( ^_^)", "color": "#00FF00", "animation": "breathing"},
|
|
}
|
|
|
|
# Thread-safe event storage
|
|
events_lock = threading.Lock()
|
|
active_events = {} # id -> {priority, message, timestamp, ttl}
|
|
|
|
# State tracking for personality
|
|
previous_priority = 4
|
|
celebrating_until = 0
|
|
blinking_until = 0
|
|
blink_emote = None
|
|
blink_animation = None
|
|
current_optimal_emote = OPTIMAL_EMOTES[0][0]
|
|
current_optimal_animation = OPTIMAL_EMOTES[0][1]
|
|
|
|
# Notify counter for unique IDs
|
|
_notify_counter = 0
|
|
|
|
# Sleep mode
|
|
is_sleeping = False
|
|
SLEEP_EMOTE = "( -_-)zzZ"
|
|
SLEEP_COLOR = "#333333"
|
|
SLEEP_ANIMATION = "sleeping"
|
|
|
|
|
|
def get_current_state():
|
|
"""Determine current state based on active events."""
|
|
global previous_priority, celebrating_until, blinking_until, blink_emote, blink_animation
|
|
global current_optimal_emote, current_optimal_animation
|
|
|
|
# Sleep mode overrides everything
|
|
if is_sleeping:
|
|
return {
|
|
"current_state": "sleeping",
|
|
"active_emote": SLEEP_EMOTE,
|
|
"color": SLEEP_COLOR,
|
|
"animation": SLEEP_ANIMATION,
|
|
"message": "",
|
|
"active_events": [],
|
|
"last_updated": datetime.now().isoformat(timespec="seconds"),
|
|
}
|
|
|
|
now = time.time()
|
|
|
|
top_event = None
|
|
with events_lock:
|
|
if not active_events:
|
|
priority = 4
|
|
events_list = []
|
|
else:
|
|
# Find highest priority (lowest number) and its event
|
|
priority = min(e["priority"] for e in active_events.values())
|
|
for eid, e in active_events.items():
|
|
if e["priority"] == priority:
|
|
top_event = e
|
|
break
|
|
events_list = [
|
|
{"id": eid, "priority": e["priority"], "message": e.get("message", "")}
|
|
for eid, e in active_events.items()
|
|
]
|
|
|
|
config = PRIORITY_CONFIG[priority]
|
|
emote = config["emote"]
|
|
animation = config["animation"]
|
|
color = config["color"]
|
|
sound = None
|
|
|
|
# Check for custom display properties from top event
|
|
if top_event:
|
|
if "emote" in top_event:
|
|
emote = top_event["emote"]
|
|
if "color" in top_event:
|
|
color = top_event["color"]
|
|
if "animation" in top_event:
|
|
animation = top_event["animation"]
|
|
if "sound" in top_event:
|
|
sound = top_event["sound"]
|
|
|
|
# Check for recovery (was bad, now optimal)
|
|
if priority == 4 and previous_priority < 3:
|
|
celebrating_until = now + CELEBRATION_DURATION
|
|
|
|
previous_priority = priority
|
|
|
|
# Handle optimal state personality (only if no custom overrides)
|
|
if priority == 4 and not top_event:
|
|
if now < celebrating_until:
|
|
# Celebration mode
|
|
emote, animation = CELEBRATION_EMOTE
|
|
elif now < blinking_until:
|
|
# Brief blink/wink (1-2 seconds)
|
|
emote = blink_emote
|
|
animation = blink_animation
|
|
else:
|
|
emote = current_optimal_emote
|
|
animation = current_optimal_animation
|
|
|
|
result = {
|
|
"current_state": config["name"].lower(),
|
|
"active_emote": emote,
|
|
"color": color,
|
|
"animation": animation,
|
|
"message": config["name"] if priority == 4 else f"{config['name']} state active",
|
|
"active_events": sorted(events_list, key=lambda x: x["priority"]),
|
|
"last_updated": datetime.now().isoformat(timespec="seconds"),
|
|
}
|
|
|
|
if sound:
|
|
result["sound"] = sound
|
|
|
|
return result
|
|
|
|
|
|
def write_status():
|
|
"""Write current state to status.json."""
|
|
state = get_current_state()
|
|
try:
|
|
with open(STATUS_FILE, "w") as f:
|
|
json.dump(state, f, indent="\t")
|
|
except OSError as e:
|
|
print(f"[ERROR] Failed to write status file: {e}")
|
|
return state
|
|
|
|
|
|
def cleanup_expired_events():
|
|
"""Background thread to remove expired TTL events."""
|
|
while True:
|
|
try:
|
|
time.sleep(1)
|
|
now = time.time()
|
|
expired = []
|
|
|
|
with events_lock:
|
|
for eid, event in active_events.items():
|
|
if event.get("ttl") and now > event["ttl"]:
|
|
expired.append(eid)
|
|
|
|
for eid in expired:
|
|
del active_events[eid]
|
|
|
|
if expired:
|
|
write_status()
|
|
except Exception as e:
|
|
print(f"[cleanup] Error: {e}")
|
|
|
|
|
|
@app.route("/event", methods=["POST"])
|
|
def post_event():
|
|
"""
|
|
Accept a new event.
|
|
Expected JSON: {"id": "event_id", "priority": 1-4, "message": "optional", "ttl": optional_seconds}
|
|
"""
|
|
data = request.get_json(force=True)
|
|
|
|
if not data or "id" not in data or "priority" not in data:
|
|
return jsonify({"error": "Missing required fields: id, priority"}), 400
|
|
|
|
event_id = str(data["id"])
|
|
priority = int(data["priority"])
|
|
|
|
if priority not in PRIORITY_CONFIG:
|
|
return jsonify({"error": f"Invalid priority: {priority}. Must be 1-4."}), 400
|
|
|
|
event = {
|
|
"priority": priority,
|
|
"message": data.get("message", ""),
|
|
"timestamp": time.time(),
|
|
}
|
|
|
|
# Apply TTL if provided, or use default for Priority 3 (Notify)
|
|
if "ttl" in data:
|
|
event["ttl"] = time.time() + int(data["ttl"])
|
|
elif priority == 3:
|
|
event["ttl"] = time.time() + DEFAULT_NOTIFY_TTL
|
|
|
|
with events_lock:
|
|
active_events[event_id] = event
|
|
|
|
state = write_status()
|
|
return jsonify({"status": "ok", "current_state": state}), 200
|
|
|
|
|
|
@app.route("/clear-all", methods=["POST"])
|
|
def clear_all_events():
|
|
"""Clear all active events."""
|
|
with events_lock:
|
|
count = len(active_events)
|
|
active_events.clear()
|
|
|
|
state = write_status()
|
|
return jsonify({"status": "cleared", "count": count, "current_state": state}), 200
|
|
|
|
|
|
@app.route("/clear", methods=["POST"])
|
|
def clear_event():
|
|
"""
|
|
Clear an event by ID.
|
|
Expected JSON: {"id": "event_id"}
|
|
"""
|
|
data = request.get_json(force=True)
|
|
|
|
if not data or "id" not in data:
|
|
return jsonify({"error": "Missing required field: id"}), 400
|
|
|
|
event_id = str(data["id"])
|
|
|
|
with events_lock:
|
|
if event_id in active_events:
|
|
del active_events[event_id]
|
|
state = write_status()
|
|
return jsonify({"status": "cleared", "current_state": state}), 200
|
|
else:
|
|
return jsonify({"error": "Event not found"}), 404
|
|
|
|
|
|
@app.route("/notify", methods=["POST"])
|
|
def notify():
|
|
"""
|
|
Notification endpoint for Home Assistant.
|
|
JSON: {
|
|
"message": "text",
|
|
"duration": 5,
|
|
"emote": "( °o°)", # optional custom emote
|
|
"color": "#FF9900", # optional custom color
|
|
"animation": "popping", # optional: breathing, shaking, popping, celebrating, floating, bouncing, swaying
|
|
"sound": "chime" # optional: chime, alert, warning, critical, success, none
|
|
}
|
|
"""
|
|
global _notify_counter
|
|
data = request.get_json(force=True) if request.data else {}
|
|
message = data.get("message", "")
|
|
duration = int(data.get("duration", DEFAULT_NOTIFY_DURATION))
|
|
|
|
# Generate unique ID to avoid conflicts
|
|
_notify_counter += 1
|
|
event_id = f"notify_{int(time.time())}_{_notify_counter}"
|
|
|
|
event = {
|
|
"priority": 3, # Notify priority
|
|
"message": message,
|
|
"timestamp": time.time(),
|
|
"ttl": time.time() + duration,
|
|
}
|
|
|
|
# Optional custom display properties
|
|
if "emote" in data:
|
|
event["emote"] = data["emote"]
|
|
if "color" in data:
|
|
event["color"] = data["color"]
|
|
if "animation" in data:
|
|
event["animation"] = data["animation"]
|
|
if "sound" in data:
|
|
event["sound"] = data["sound"]
|
|
|
|
with events_lock:
|
|
active_events[event_id] = event
|
|
|
|
state = write_status()
|
|
return jsonify({"status": "ok", "id": event_id, "current_state": state}), 200
|
|
|
|
|
|
@app.route("/sleep", methods=["POST"])
|
|
def sleep_mode():
|
|
"""Enter sleep mode. For Home Assistant webhook."""
|
|
global is_sleeping
|
|
is_sleeping = True
|
|
state = write_status()
|
|
return jsonify({"status": "sleeping", "current_state": state}), 200
|
|
|
|
|
|
@app.route("/wake", methods=["POST"])
|
|
def wake_mode():
|
|
"""Exit sleep mode. For Home Assistant webhook."""
|
|
global is_sleeping, current_optimal_emote, current_optimal_animation
|
|
global blink_emote, blink_animation, blinking_until
|
|
is_sleeping = False
|
|
current_optimal_emote, current_optimal_animation = random.choice(OPTIMAL_EMOTES)
|
|
if random.random() < IDLE_EXPRESSION_CHANCE:
|
|
blink_emote, blink_animation = random.choice(IDLE_EMOTES)
|
|
blinking_until = time.time() + random.uniform(1, 2)
|
|
state = write_status()
|
|
return jsonify({"status": "awake", "current_state": state}), 200
|
|
|
|
|
|
@app.route("/")
|
|
def index():
|
|
"""Serve the frontend."""
|
|
return send_from_directory(ROOT_DIR, "index.html")
|
|
|
|
|
|
@app.route("/openapi.yaml")
|
|
def openapi_spec():
|
|
"""Serve OpenAPI specification."""
|
|
return send_from_directory(ROOT_DIR, "openapi.yaml", mimetype="text/yaml")
|
|
|
|
|
|
@app.route("/status", methods=["GET"])
|
|
def get_status():
|
|
"""Return current status as JSON."""
|
|
return jsonify(get_current_state()), 200
|
|
|
|
|
|
@app.route("/events", methods=["GET"])
|
|
def list_events():
|
|
"""List all active events."""
|
|
with events_lock:
|
|
return jsonify({"events": dict(active_events)}), 200
|
|
|
|
|
|
@app.route("/docs")
|
|
def docs():
|
|
"""Serve interactive API documentation via Swagger UI."""
|
|
return """<!DOCTYPE html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="UTF-8">
|
|
<title>Kao API Documentation</title>
|
|
<link rel="stylesheet" href="https://unpkg.com/swagger-ui-dist@5/swagger-ui.css">
|
|
<style>
|
|
body { margin: 0; background: #fafafa; }
|
|
.swagger-ui .topbar { display: none; }
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div id="swagger-ui"></div>
|
|
<script src="https://unpkg.com/swagger-ui-dist@5/swagger-ui-bundle.js"></script>
|
|
<script>
|
|
SwaggerUIBundle({
|
|
url: '/openapi.yaml',
|
|
dom_id: '#swagger-ui',
|
|
presets: [SwaggerUIBundle.presets.apis, SwaggerUIBundle.SwaggerUIStandalonePreset],
|
|
layout: 'BaseLayout'
|
|
});
|
|
</script>
|
|
</body>
|
|
</html>"""
|
|
|
|
|
|
def main():
|
|
port = int(os.environ.get("PORT", 5100))
|
|
|
|
# Write initial optimal state
|
|
write_status()
|
|
print(f"Status file: {STATUS_FILE}")
|
|
|
|
# Start TTL cleanup thread
|
|
cleanup_thread = threading.Thread(target=cleanup_expired_events, daemon=True)
|
|
cleanup_thread.start()
|
|
|
|
# Run Flask
|
|
app.run(host="0.0.0.0", port=port, threaded=True)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|