Merge feature/notify-queue: notification queue for /notify endpoint
This commit is contained in:
@@ -84,7 +84,7 @@ All detectors support: `AGGREGATOR_URL`, `CHECK_INTERVAL`, `THRESHOLD_WARNING`,
|
|||||||
| `/event` | POST | Register event: `{"id": "name", "priority": 1-4, "message": "optional", "ttl": seconds}` |
|
| `/event` | POST | Register event: `{"id": "name", "priority": 1-4, "message": "optional", "ttl": seconds}` |
|
||||||
| `/clear` | POST | Clear event: `{"id": "name"}` |
|
| `/clear` | POST | Clear event: `{"id": "name"}` |
|
||||||
| `/clear-all` | POST | Clear all active events |
|
| `/clear-all` | POST | Clear all active events |
|
||||||
| `/notify` | POST | Notification with optional customization (see below) |
|
| `/notify` | POST | Queued notification — buffered if one is playing, auto-advances when it expires |
|
||||||
| `/sleep` | POST | Enter sleep mode |
|
| `/sleep` | POST | Enter sleep mode |
|
||||||
| `/wake` | POST | Exit sleep mode |
|
| `/wake` | POST | Exit sleep mode |
|
||||||
| `/stream` | GET | SSE stream — pushes state JSON on every change (used by the frontend) |
|
| `/stream` | GET | SSE stream — pushes state JSON on every change (used by the frontend) |
|
||||||
@@ -105,6 +105,8 @@ All detectors support: `AGGREGATOR_URL`, `CHECK_INTERVAL`, `THRESHOLD_WARNING`,
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Rapid calls are buffered — each notification plays for its full `duration` before the next is shown. `/clear-all` also drains the queue.
|
||||||
|
|
||||||
| Field | Required | Description |
|
| Field | Required | Description |
|
||||||
|-------|----------|-------------|
|
|-------|----------|-------------|
|
||||||
| `message` | No | Text to display below emote |
|
| `message` | No | Text to display below emote |
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ Turn an old phone (with its OLED screen) into a glanceable ambient display for y
|
|||||||
- **Extensible** — Add custom detectors for any metric
|
- **Extensible** — Add custom detectors for any metric
|
||||||
- **Personality** — Rotating expressions, celebration animations, sleep mode
|
- **Personality** — Rotating expressions, celebration animations, sleep mode
|
||||||
- **Sound effects** — Optional audio cues for state changes (tap to enable)
|
- **Sound effects** — Optional audio cues for state changes (tap to enable)
|
||||||
|
- **Notification queue** — Rapid `/notify` bursts are buffered and played sequentially, never clobbering
|
||||||
- **Home Assistant ready** — Webhook endpoints for notifications and automation
|
- **Home Assistant ready** — Webhook endpoints for notifications and automation
|
||||||
|
|
||||||
## Quick Start
|
## Quick Start
|
||||||
@@ -191,6 +192,8 @@ automation:
|
|||||||
service: rest_command.kao_wake
|
service: rest_command.kao_wake
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Rapid `/notify` calls are queued automatically — each plays for its full `duration` before the next is shown. `/clear-all` drains the queue.
|
||||||
|
|
||||||
**Notify options:**
|
**Notify options:**
|
||||||
- `emote`: Any ASCII emote (e.g., `( °o°)`, `\\(^o^)/`)
|
- `emote`: Any ASCII emote (e.g., `( °o°)`, `\\(^o^)/`)
|
||||||
- `color`: Hex color (e.g., `#FF9900`)
|
- `color`: Hex color (e.g., `#FF9900`)
|
||||||
@@ -225,7 +228,7 @@ Navigate with `↑↓` or `Tab`, press `Enter` to fire, `Q` to quit. A toast con
|
|||||||
| `/event` | POST | Register an event |
|
| `/event` | POST | Register an event |
|
||||||
| `/clear` | POST | Clear an event by ID |
|
| `/clear` | POST | Clear an event by ID |
|
||||||
| `/clear-all` | POST | Clear all active events |
|
| `/clear-all` | POST | Clear all active events |
|
||||||
| `/notify` | POST | Simple notification `{"message": "", "duration": 5}` |
|
| `/notify` | POST | Queued notification `{"message": "", "duration": 5}` — buffered if one is playing |
|
||||||
| `/sleep` | POST | Enter sleep mode |
|
| `/sleep` | POST | Enter sleep mode |
|
||||||
| `/wake` | POST | Exit sleep mode |
|
| `/wake` | POST | Exit sleep mode |
|
||||||
| `/docs` | GET | Interactive API documentation (Swagger UI) |
|
| `/docs` | GET | Interactive API documentation (Swagger UI) |
|
||||||
|
|||||||
3
TODO.md
3
TODO.md
@@ -4,8 +4,7 @@ Feature ideas for future work, roughly in priority order.
|
|||||||
|
|
||||||
## REST API improvements
|
## REST API improvements
|
||||||
|
|
||||||
- **Notification queue** — buffer rapid `/notify` calls and auto-advance through
|
- ~~**Notification queue**~~ — done in v2.3.3
|
||||||
them instead of clobbering; important when HA fires several events at once
|
|
||||||
- **Sticky notifications** — a `sticky: true` flag on `/notify` to keep a
|
- **Sticky notifications** — a `sticky: true` flag on `/notify` to keep a
|
||||||
notification visible until explicitly cleared via `/clear`, rather than TTL-expiring
|
notification visible until explicitly cleared via `/clear`, rather than TTL-expiring
|
||||||
- **Named presets** — define reusable notification profiles in `config.json`
|
- **Named presets** — define reusable notification profiles in `config.json`
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ Kao Aggregator
|
|||||||
A lightweight event broker that manages priority-based system status.
|
A lightweight event broker that manages priority-based system status.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import collections
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import queue
|
import queue
|
||||||
@@ -62,6 +63,9 @@ current_optimal_animation = OPTIMAL_EMOTES[0][1]
|
|||||||
# Notify counter for unique IDs
|
# Notify counter for unique IDs
|
||||||
_notify_counter = 0
|
_notify_counter = 0
|
||||||
|
|
||||||
|
# Notification queue: buffered /notify calls waiting to play (protected by events_lock)
|
||||||
|
notify_queue: collections.deque = collections.deque()
|
||||||
|
|
||||||
# SSE subscribers: one queue per connected client
|
# SSE subscribers: one queue per connected client
|
||||||
_subscribers: set = set()
|
_subscribers: set = set()
|
||||||
_subscribers_lock = threading.Lock()
|
_subscribers_lock = threading.Lock()
|
||||||
@@ -165,6 +169,7 @@ def get_current_state():
|
|||||||
"animation": animation,
|
"animation": animation,
|
||||||
"message": config["name"] if priority == 4 else f"{config['name']} state active",
|
"message": config["name"] if priority == 4 else f"{config['name']} state active",
|
||||||
"active_events": sorted(events_list, key=lambda x: x["priority"]),
|
"active_events": sorted(events_list, key=lambda x: x["priority"]),
|
||||||
|
"notify_queue_size": len(notify_queue),
|
||||||
"last_updated": datetime.now().isoformat(timespec="seconds"),
|
"last_updated": datetime.now().isoformat(timespec="seconds"),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -186,6 +191,19 @@ def write_status():
|
|||||||
return state
|
return state
|
||||||
|
|
||||||
|
|
||||||
|
def _promote_next_notification(now: float):
|
||||||
|
"""Promote the next queued notification to active. Must be called under events_lock."""
|
||||||
|
if notify_queue:
|
||||||
|
next_id, next_event = notify_queue.popleft()
|
||||||
|
duration = next_event.get("duration", DEFAULT_NOTIFY_DURATION)
|
||||||
|
active_events[next_id] = {
|
||||||
|
**next_event,
|
||||||
|
"timestamp": now,
|
||||||
|
"ttl": now + duration,
|
||||||
|
}
|
||||||
|
print(f"[QUEUE] Promoting next notification: {next_id} ({len(notify_queue)} remaining)")
|
||||||
|
|
||||||
|
|
||||||
def cleanup_expired_events():
|
def cleanup_expired_events():
|
||||||
"""Background thread to remove expired TTL events."""
|
"""Background thread to remove expired TTL events."""
|
||||||
while True:
|
while True:
|
||||||
@@ -193,6 +211,7 @@ def cleanup_expired_events():
|
|||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
now = time.time()
|
now = time.time()
|
||||||
expired = []
|
expired = []
|
||||||
|
had_queue_event = False
|
||||||
|
|
||||||
with events_lock:
|
with events_lock:
|
||||||
for eid, event in active_events.items():
|
for eid, event in active_events.items():
|
||||||
@@ -200,8 +219,14 @@ def cleanup_expired_events():
|
|||||||
expired.append(eid)
|
expired.append(eid)
|
||||||
|
|
||||||
for eid in expired:
|
for eid in expired:
|
||||||
|
if active_events[eid].get("from_queue"):
|
||||||
|
had_queue_event = True
|
||||||
del active_events[eid]
|
del active_events[eid]
|
||||||
|
|
||||||
|
# Auto-advance: promote next queued notification when the playing one expires
|
||||||
|
if had_queue_event:
|
||||||
|
_promote_next_notification(now)
|
||||||
|
|
||||||
if expired:
|
if expired:
|
||||||
write_status()
|
write_status()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -248,10 +273,11 @@ def post_event():
|
|||||||
|
|
||||||
@app.route("/clear-all", methods=["POST"])
|
@app.route("/clear-all", methods=["POST"])
|
||||||
def clear_all_events():
|
def clear_all_events():
|
||||||
"""Clear all active events."""
|
"""Clear all active events and the notification queue."""
|
||||||
with events_lock:
|
with events_lock:
|
||||||
count = len(active_events)
|
count = len(active_events)
|
||||||
active_events.clear()
|
active_events.clear()
|
||||||
|
notify_queue.clear()
|
||||||
|
|
||||||
state = write_status()
|
state = write_status()
|
||||||
return jsonify({"status": "cleared", "count": count, "current_state": state}), 200
|
return jsonify({"status": "cleared", "count": count, "current_state": state}), 200
|
||||||
@@ -270,10 +296,27 @@ def clear_event():
|
|||||||
|
|
||||||
event_id = str(data["id"])
|
event_id = str(data["id"])
|
||||||
|
|
||||||
|
found = False
|
||||||
with events_lock:
|
with events_lock:
|
||||||
if event_id not in active_events:
|
if event_id in active_events:
|
||||||
return jsonify({"error": "Event not found"}), 404
|
was_queue_playing = active_events[event_id].get("from_queue", False)
|
||||||
del active_events[event_id]
|
del active_events[event_id]
|
||||||
|
found = True
|
||||||
|
# If the playing queue notification was cleared, promote the next one
|
||||||
|
if was_queue_playing:
|
||||||
|
_promote_next_notification(time.time())
|
||||||
|
else:
|
||||||
|
# Check if it's waiting in the queue and remove it
|
||||||
|
new_queue = collections.deque(
|
||||||
|
(qid, qe) for qid, qe in notify_queue if qid != event_id
|
||||||
|
)
|
||||||
|
if len(new_queue) < len(notify_queue):
|
||||||
|
notify_queue.clear()
|
||||||
|
notify_queue.extend(new_queue)
|
||||||
|
found = True
|
||||||
|
|
||||||
|
if not found:
|
||||||
|
return jsonify({"error": "Event not found"}), 404
|
||||||
|
|
||||||
state = write_status()
|
state = write_status()
|
||||||
return jsonify({"status": "cleared", "current_state": state}), 200
|
return jsonify({"status": "cleared", "current_state": state}), 200
|
||||||
@@ -291,6 +334,7 @@ def notify():
|
|||||||
"animation": "popping", # optional: breathing, shaking, popping, celebrating, floating, bouncing, swaying
|
"animation": "popping", # optional: breathing, shaking, popping, celebrating, floating, bouncing, swaying
|
||||||
"sound": "chime" # optional: chime, alert, warning, critical, success, none
|
"sound": "chime" # optional: chime, alert, warning, critical, success, none
|
||||||
}
|
}
|
||||||
|
Rapid calls are buffered and played sequentially.
|
||||||
"""
|
"""
|
||||||
global _notify_counter
|
global _notify_counter
|
||||||
data = request.get_json(force=True) if request.data else {}
|
data = request.get_json(force=True) if request.data else {}
|
||||||
@@ -302,27 +346,41 @@ def notify():
|
|||||||
event_id = f"notify_{int(time.time())}_{_notify_counter}"
|
event_id = f"notify_{int(time.time())}_{_notify_counter}"
|
||||||
|
|
||||||
event = {
|
event = {
|
||||||
"priority": 3, # Notify priority
|
"priority": 3,
|
||||||
"message": message,
|
"message": message,
|
||||||
"timestamp": time.time(),
|
"from_queue": True, # Tag so cleanup knows to auto-advance
|
||||||
"ttl": time.time() + duration,
|
"duration": duration, # Preserved for when promoted from queue
|
||||||
}
|
}
|
||||||
|
|
||||||
# Optional custom display properties
|
# Optional custom display properties
|
||||||
if "emote" in data:
|
for key in ("emote", "color", "animation", "sound"):
|
||||||
event["emote"] = data["emote"]
|
if key in data:
|
||||||
if "color" in data:
|
event[key] = data[key]
|
||||||
event["color"] = data["color"]
|
|
||||||
if "animation" in data:
|
|
||||||
event["animation"] = data["animation"]
|
|
||||||
if "sound" in data:
|
|
||||||
event["sound"] = data["sound"]
|
|
||||||
|
|
||||||
|
play_now = False
|
||||||
with events_lock:
|
with events_lock:
|
||||||
active_events[event_id] = event
|
# Play immediately only if no queue notification is currently active or pending
|
||||||
|
queue_busy = any(e.get("from_queue") for e in active_events.values()) or len(notify_queue) > 0
|
||||||
|
if not queue_busy:
|
||||||
|
now = time.time()
|
||||||
|
active_events[event_id] = {**event, "timestamp": now, "ttl": now + duration}
|
||||||
|
play_now = True
|
||||||
|
else:
|
||||||
|
notify_queue.append((event_id, event))
|
||||||
|
print(f"[QUEUE] Buffered notification: {event_id} ({len(notify_queue)} in queue)")
|
||||||
|
|
||||||
|
if play_now:
|
||||||
state = write_status()
|
state = write_status()
|
||||||
return jsonify({"status": "ok", "id": event_id, "current_state": state}), 200
|
else:
|
||||||
|
state = get_current_state()
|
||||||
|
|
||||||
|
return jsonify({
|
||||||
|
"status": "ok",
|
||||||
|
"id": event_id,
|
||||||
|
"queued": not play_now,
|
||||||
|
"notify_queue_size": len(notify_queue),
|
||||||
|
"current_state": state,
|
||||||
|
}), 200
|
||||||
|
|
||||||
|
|
||||||
@app.route("/sleep", methods=["POST"])
|
@app.route("/sleep", methods=["POST"])
|
||||||
|
|||||||
@@ -216,7 +216,7 @@
|
|||||||
<script>
|
<script>
|
||||||
const emoteEl = document.getElementById("emote");
|
const emoteEl = document.getElementById("emote");
|
||||||
const messageEl = document.getElementById("message");
|
const messageEl = document.getElementById("message");
|
||||||
const VERSION = "v2.3.2";
|
const VERSION = "v2.3.3";
|
||||||
|
|
||||||
// Sound system
|
// Sound system
|
||||||
let audioCtx = null;
|
let audioCtx = null;
|
||||||
|
|||||||
17
openapi.yaml
17
openapi.yaml
@@ -124,7 +124,8 @@ paths:
|
|||||||
description: |
|
description: |
|
||||||
Send a temporary notification with optional custom display properties.
|
Send a temporary notification with optional custom display properties.
|
||||||
Designed for Home Assistant integration. Notifications auto-expire after
|
Designed for Home Assistant integration. Notifications auto-expire after
|
||||||
the specified duration.
|
the specified duration. Rapid calls are buffered and played sequentially —
|
||||||
|
each notification waits for the previous to expire before displaying.
|
||||||
operationId: notify
|
operationId: notify
|
||||||
requestBody:
|
requestBody:
|
||||||
required: false
|
required: false
|
||||||
@@ -365,7 +366,15 @@ components:
|
|||||||
id:
|
id:
|
||||||
type: string
|
type: string
|
||||||
description: Generated event ID
|
description: Generated event ID
|
||||||
example: "ha_notify_1704067200000"
|
example: "notify_1704067200_1"
|
||||||
|
queued:
|
||||||
|
type: boolean
|
||||||
|
description: True if the notification was buffered (another is currently playing)
|
||||||
|
example: false
|
||||||
|
notify_queue_size:
|
||||||
|
type: integer
|
||||||
|
description: Number of notifications now waiting in the queue
|
||||||
|
example: 0
|
||||||
current_state:
|
current_state:
|
||||||
$ref: "#/components/schemas/Status"
|
$ref: "#/components/schemas/Status"
|
||||||
|
|
||||||
@@ -424,6 +433,10 @@ components:
|
|||||||
type: array
|
type: array
|
||||||
items:
|
items:
|
||||||
$ref: "#/components/schemas/ActiveEvent"
|
$ref: "#/components/schemas/ActiveEvent"
|
||||||
|
notify_queue_size:
|
||||||
|
type: integer
|
||||||
|
description: Number of notifications waiting in the queue
|
||||||
|
example: 0
|
||||||
last_updated:
|
last_updated:
|
||||||
type: string
|
type: string
|
||||||
format: date-time
|
format: date-time
|
||||||
|
|||||||
Reference in New Issue
Block a user