Feat: notification queue — buffer rapid /notify calls, play sequentially
When multiple /notify calls arrive in quick succession, they now queue up and display one at a time rather than clobbering each other. Each notification plays for its full duration before the next is promoted. - /notify returns `queued: true` and `notify_queue_size` when buffered - Cleanup thread auto-advances the queue when the playing notification expires - /clear on the playing notification promotes the next immediately - /clear on a queued (not-yet-playing) notification removes it from the queue - /clear-all also drains the queue - Status response includes `notify_queue_size` for frontend awareness Bump to v2.3.3. Update OpenAPI spec, README, TODO. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@ Kao Aggregator
|
||||
A lightweight event broker that manages priority-based system status.
|
||||
"""
|
||||
|
||||
import collections
|
||||
import json
|
||||
import os
|
||||
import queue
|
||||
@@ -62,6 +63,9 @@ current_optimal_animation = OPTIMAL_EMOTES[0][1]
|
||||
# Notify counter for unique IDs
|
||||
_notify_counter = 0
|
||||
|
||||
# Notification queue: buffered /notify calls waiting to play (protected by events_lock)
|
||||
notify_queue: collections.deque = collections.deque()
|
||||
|
||||
# SSE subscribers: one queue per connected client
|
||||
_subscribers: set = set()
|
||||
_subscribers_lock = threading.Lock()
|
||||
@@ -165,6 +169,7 @@ def get_current_state():
|
||||
"animation": animation,
|
||||
"message": config["name"] if priority == 4 else f"{config['name']} state active",
|
||||
"active_events": sorted(events_list, key=lambda x: x["priority"]),
|
||||
"notify_queue_size": len(notify_queue),
|
||||
"last_updated": datetime.now().isoformat(timespec="seconds"),
|
||||
}
|
||||
|
||||
@@ -186,6 +191,19 @@ def write_status():
|
||||
return state
|
||||
|
||||
|
||||
def _promote_next_notification(now: float):
|
||||
"""Promote the next queued notification to active. Must be called under events_lock."""
|
||||
if notify_queue:
|
||||
next_id, next_event = notify_queue.popleft()
|
||||
duration = next_event.get("duration", DEFAULT_NOTIFY_DURATION)
|
||||
active_events[next_id] = {
|
||||
**next_event,
|
||||
"timestamp": now,
|
||||
"ttl": now + duration,
|
||||
}
|
||||
print(f"[QUEUE] Promoting next notification: {next_id} ({len(notify_queue)} remaining)")
|
||||
|
||||
|
||||
def cleanup_expired_events():
|
||||
"""Background thread to remove expired TTL events."""
|
||||
while True:
|
||||
@@ -193,6 +211,7 @@ def cleanup_expired_events():
|
||||
time.sleep(1)
|
||||
now = time.time()
|
||||
expired = []
|
||||
had_queue_event = False
|
||||
|
||||
with events_lock:
|
||||
for eid, event in active_events.items():
|
||||
@@ -200,8 +219,14 @@ def cleanup_expired_events():
|
||||
expired.append(eid)
|
||||
|
||||
for eid in expired:
|
||||
if active_events[eid].get("from_queue"):
|
||||
had_queue_event = True
|
||||
del active_events[eid]
|
||||
|
||||
# Auto-advance: promote next queued notification when the playing one expires
|
||||
if had_queue_event:
|
||||
_promote_next_notification(now)
|
||||
|
||||
if expired:
|
||||
write_status()
|
||||
except Exception as e:
|
||||
@@ -248,10 +273,11 @@ def post_event():
|
||||
|
||||
@app.route("/clear-all", methods=["POST"])
|
||||
def clear_all_events():
|
||||
"""Clear all active events."""
|
||||
"""Clear all active events and the notification queue."""
|
||||
with events_lock:
|
||||
count = len(active_events)
|
||||
active_events.clear()
|
||||
notify_queue.clear()
|
||||
|
||||
state = write_status()
|
||||
return jsonify({"status": "cleared", "count": count, "current_state": state}), 200
|
||||
@@ -270,10 +296,27 @@ def clear_event():
|
||||
|
||||
event_id = str(data["id"])
|
||||
|
||||
found = False
|
||||
with events_lock:
|
||||
if event_id not in active_events:
|
||||
return jsonify({"error": "Event not found"}), 404
|
||||
del active_events[event_id]
|
||||
if event_id in active_events:
|
||||
was_queue_playing = active_events[event_id].get("from_queue", False)
|
||||
del active_events[event_id]
|
||||
found = True
|
||||
# If the playing queue notification was cleared, promote the next one
|
||||
if was_queue_playing:
|
||||
_promote_next_notification(time.time())
|
||||
else:
|
||||
# Check if it's waiting in the queue and remove it
|
||||
new_queue = collections.deque(
|
||||
(qid, qe) for qid, qe in notify_queue if qid != event_id
|
||||
)
|
||||
if len(new_queue) < len(notify_queue):
|
||||
notify_queue.clear()
|
||||
notify_queue.extend(new_queue)
|
||||
found = True
|
||||
|
||||
if not found:
|
||||
return jsonify({"error": "Event not found"}), 404
|
||||
|
||||
state = write_status()
|
||||
return jsonify({"status": "cleared", "current_state": state}), 200
|
||||
@@ -291,6 +334,7 @@ def notify():
|
||||
"animation": "popping", # optional: breathing, shaking, popping, celebrating, floating, bouncing, swaying
|
||||
"sound": "chime" # optional: chime, alert, warning, critical, success, none
|
||||
}
|
||||
Rapid calls are buffered and played sequentially.
|
||||
"""
|
||||
global _notify_counter
|
||||
data = request.get_json(force=True) if request.data else {}
|
||||
@@ -302,27 +346,41 @@ def notify():
|
||||
event_id = f"notify_{int(time.time())}_{_notify_counter}"
|
||||
|
||||
event = {
|
||||
"priority": 3, # Notify priority
|
||||
"priority": 3,
|
||||
"message": message,
|
||||
"timestamp": time.time(),
|
||||
"ttl": time.time() + duration,
|
||||
"from_queue": True, # Tag so cleanup knows to auto-advance
|
||||
"duration": duration, # Preserved for when promoted from queue
|
||||
}
|
||||
|
||||
# Optional custom display properties
|
||||
if "emote" in data:
|
||||
event["emote"] = data["emote"]
|
||||
if "color" in data:
|
||||
event["color"] = data["color"]
|
||||
if "animation" in data:
|
||||
event["animation"] = data["animation"]
|
||||
if "sound" in data:
|
||||
event["sound"] = data["sound"]
|
||||
for key in ("emote", "color", "animation", "sound"):
|
||||
if key in data:
|
||||
event[key] = data[key]
|
||||
|
||||
play_now = False
|
||||
with events_lock:
|
||||
active_events[event_id] = event
|
||||
# Play immediately only if no queue notification is currently active or pending
|
||||
queue_busy = any(e.get("from_queue") for e in active_events.values()) or len(notify_queue) > 0
|
||||
if not queue_busy:
|
||||
now = time.time()
|
||||
active_events[event_id] = {**event, "timestamp": now, "ttl": now + duration}
|
||||
play_now = True
|
||||
else:
|
||||
notify_queue.append((event_id, event))
|
||||
print(f"[QUEUE] Buffered notification: {event_id} ({len(notify_queue)} in queue)")
|
||||
|
||||
state = write_status()
|
||||
return jsonify({"status": "ok", "id": event_id, "current_state": state}), 200
|
||||
if play_now:
|
||||
state = write_status()
|
||||
else:
|
||||
state = get_current_state()
|
||||
|
||||
return jsonify({
|
||||
"status": "ok",
|
||||
"id": event_id,
|
||||
"queued": not play_now,
|
||||
"notify_queue_size": len(notify_queue),
|
||||
"current_state": state,
|
||||
}), 200
|
||||
|
||||
|
||||
@app.route("/sleep", methods=["POST"])
|
||||
|
||||
Reference in New Issue
Block a user