Initial implementation of Sahsa Clock

Pygame framebuffer clock for Le Potato (ARM Debian) with aiohttp webhook server.
Renders 12-hour clock directly to /dev/fb0 (no X11/Wayland). Supports full-screen
message overlays pushed via a browser dashboard or Bearer-token API. Includes
first-run setup wizard, session-based dashboard auth, bcrypt password storage,
per-IP rate limiting, and systemd service unit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-24 16:22:10 -06:00
commit 0c2392b2b3
13 changed files with 1587 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@@ -0,0 +1,8 @@
__pycache__/
*.pyc
*.pyo
.venv/
venv/
*.egg-info/
dist/
build/

262
PLAN.md Normal file
View File

@@ -0,0 +1,262 @@
# Plan: Sahsa Clock — Lightweight Boot-Persistent Display with Webhook Support
## Context
The existing solution is a manually-opened HTML file in a browser, which has two problems: it requires human intervention after every reboot, and a full browser engine is heavy overhead for a device whose sole job is to show a clock on a TV. The goal is a purpose-built, auto-starting application with minimal overhead that can also display messages pushed to it remotely via webhook.
**Target hardware**: Libre Computer AML-S905X-CC ("Le Potato") — ARM Cortex-A53, Debian-based, HDMI output to auditorium TV. Framebuffer available at `/dev/fb0`.
---
## Recommended Approach: Python + Pygame on Linux Framebuffer
Pygame can render directly to `/dev/fb0` (the Linux framebuffer) without any display server (no X11, no Wayland). This means the process starts faster, uses far less RAM, and has no dependency on a GUI session. An `asyncio`-based HTTP server (aiohttp) runs alongside the display loop to receive webhook messages.
### Why this over alternatives
| Option | Overhead | Display Server Needed | Notes |
|---|---|---|---|
| **Pygame on framebuffer** | ~3060 MB RAM | None | Recommended |
| Qt linuxfb/eglfs | ~80120 MB | None | More complex setup |
| Tauri (WebView) | ~100150 MB | Yes (or Wayland) | Good if HTML/CSS preferred |
| Chromium kiosk | ~300500 MB | Yes | Current-ish approach, heaviest |
| Tkinter + X11 | ~5080 MB | Yes (X11) | Simpler but needs Xorg |
---
## Architecture
```
sahsa_clock/
├── main.py # Entry point: asyncio event loop + pygame render loop
├── display.py # Pygame rendering: clock face, message overlay
├── server.py # aiohttp server: dashboard routes + API routes
├── dashboard/
│ ├── index.html # Dashboard UI (no framework — plain HTML/CSS/JS)
│ ├── style.css
│ └── app.js
├── config.toml # Screen resolution, colors, fonts, port, token, timeout
├── requirements.txt
└── sahsa-clock.service # systemd unit file
```
### Runtime flow
1. `main.py` initializes pygame on the framebuffer (`SDL_VIDEODRIVER=fbcon`)
2. Shared `asyncio.Lock`-protected `MessageState` object holds current message text, expiry time, and persist flag
3. `aiohttp` server starts as a background `asyncio` task with two route groups (see below)
4. Main pygame loop renders the clock and, if `MessageState` has an active message, renders the overlay banner
5. A background asyncio task ticks the message expiry and clears it when time runs out
### Route groups
**Dashboard routes** (no auth — LAN-trusted)
- `GET /` → serve `dashboard/index.html`
- `POST /dashboard/message` → update message (freeform text, duration/persist)
- `DELETE /dashboard/message` → clear current message
- `GET /dashboard/status` → return current message JSON (for live UI refresh)
**API routes** (bearer token required)
- `POST /api/message` → same as dashboard POST, for programmatic/webhook callers
- `DELETE /api/message` → same as dashboard DELETE
- `GET /api/status` → same as dashboard status
Both route groups write to the same `MessageState` object.
### Dashboard UI
Simple, large-element page — usable by non-technical staff:
- Multiline textarea for message text
- Duration picker: radio buttons (30 sec / 1 min / 5 min / Until cleared)
- "Send to Screen" button (POST /dashboard/message)
- "Clear Screen" button (DELETE /dashboard/message)
- Status bar showing what is currently displayed (polls GET /dashboard/status every 5s)
### Webhook / programmatic API
```
POST http://<device-ip>:8080/api/message
Authorization: Bearer <token>
Content-Type: application/json
{"text": "Service starts in 5 minutes", "duration": 60}
```
- `duration` (seconds): auto-dismiss after this many seconds
- `duration: 0` or `"persist": true`: message stays until replaced or cleared
- Config default (`config.toml`) applies when neither field is provided
```
DELETE http://<device-ip>:8080/api/message
Authorization: Bearer <token>
```
### Security
**Bearer token auth on `/api/*`**`aiohttp` middleware validates `Authorization: Bearer <token>` for all `/api/` routes only. Token is a randomly generated 32-byte hex string stored in `config.toml`. On first run, if no token is present, one is auto-generated and printed to stdout once for the operator to save.
**Rate limiting on `/api/*`** — max configurable requests/minute per source IP; returns `429` on excess. Prevents flooding from programmatic callers.
**Dashboard password auth** — the dashboard requires a password before any controls are accessible. On first visit, the server returns a login page. On correct password submission, the server sets a signed session cookie (using `aiohttp`'s built-in cookie response with a server-side session store). Subsequent requests check for a valid session cookie; invalid or missing cookies redirect back to the login page. Sessions expire after a configurable idle timeout (default: 8 hours).
The password is stored in `config.toml` as a **bcrypt hash**, never in plaintext. A helper command generates the hash:
```bash
python3 -c "import bcrypt; print(bcrypt.hashpw(b'yourpassword', bcrypt.gensalt()).decode())"
```
Paste the output into `config.toml`:
```toml
[dashboard]
password_hash = "$2b$12$..." # bcrypt hash only — never store plaintext here
session_timeout_hours = 8
```
**API routes still use bearer token** — separate from the dashboard password. The two auth systems are independent.
**No TLS required** — LAN-only. If ever internet-facing, add nginx in front with TLS.
### systemd service (`/etc/systemd/system/sahsa-clock.service`)
```ini
[Unit]
Description=Sahsa Clock Display
After=network.target
DefaultDependencies=no
[Service]
Type=simple
User=pi # or whatever the device user is
Environment=SDL_VIDEODRIVER=fbcon
Environment=SDL_FBDEV=/dev/fb0
ExecStart=/usr/bin/python3 /opt/sahsa_clock/main.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
```
`WantedBy=multi-user.target` means it starts without needing a graphical session.
### Screen blanking
The service file should also disable console blanking so the TV stays on:
```
ExecStartPre=/bin/sh -c 'echo -ne "\033[9;0]" > /dev/tty1'
```
---
## System Prep
These steps are done once on the Le Potato before deploying the application. SSH in from another machine to do this — you won't have a desktop after step 1.
### 1. Confirm what's running
```bash
systemctl get-default
```
- If it returns `graphical.target` → a desktop environment is configured to start at boot.
- If it returns `multi-user.target` → already headless, skip to step 3.
To see which display manager is running (GDM for GNOME, LightDM for Ubuntu/XFCE, etc.):
```bash
systemctl status display-manager
```
### 2. Disable the desktop environment
Switch the default boot target to headless:
```bash
sudo systemctl set-default multi-user.target
```
Then disable the display manager so it won't start even if manually invoked:
```bash
# Ubuntu Desktop uses GDM3:
sudo systemctl disable gdm3
# If it's LightDM (common on lighter Ubuntu variants):
sudo systemctl disable lightdm
```
If you're unsure which one, run both — the one that isn't installed will just print "not found" harmlessly.
You do **not** need to uninstall the desktop packages. Disabling the service is enough. The desktop software stays on disk in case you ever need it back.
### 3. Add the service user to the video group
The clock process needs permission to write to `/dev/fb0`:
```bash
sudo usermod -aG video <your-username>
```
Log out and back in (or reboot) for the group change to take effect.
### 4. Disable console blanking permanently
By default, Linux blanks the console after ~10 minutes of inactivity, turning the TV off. Prevent this by adding a kernel parameter:
```bash
sudo nano /etc/default/grub
```
Find the line `GRUB_CMDLINE_LINUX_DEFAULT` and append `consoleblank=0`:
```
GRUB_CMDLINE_LINUX_DEFAULT="quiet splash consoleblank=0"
```
Then apply it:
```bash
sudo update-grub
```
> **Note**: The Le Potato may use a non-GRUB bootloader (U-Boot with extlinux). If `update-grub` isn't available, the equivalent is editing `/boot/extlinux/extlinux.conf` and appending `consoleblank=0` to the `APPEND` line.
### 5. Reboot and verify
```bash
sudo reboot
```
After reboot, SSH back in and confirm:
```bash
systemctl get-default # should return multi-user.target
systemctl status gdm3 # should show disabled/inactive
ls -la /dev/fb0 # should exist; note the group (usually 'video')
```
The TV should show a plain text console at this point — that's expected. The clock service will replace it once deployed.
---
## Key implementation notes
- **Font rendering**: pygame.font can load system TTF fonts (e.g., DejaVu, or a custom downloaded digital-style font). Font path is configurable.
- **Resolution**: Read from `config.toml` or auto-detected via pygame display info.
- **Framebuffer access**: The service user needs to be in the `video` group (`usermod -aG video <user>`).
- **No internet required**: All rendering is local; webhook server only needs LAN access.
- **Dependencies**: `pygame`, `aiohttp`, `tomllib` (stdlib in Python 3.11+). Install via pip into a venv or system packages.
---
## Verification
1. Run `python3 main.py` manually — clock appears full-screen on the TV
2. Open `http://<device-ip>:8080` on a laptop on the same network — dashboard loads, staff UI is visible
3. Type a message in the dashboard, pick a duration, hit "Send" — banner appears on the TV
4. "Clear Screen" removes the banner
5. Status bar in dashboard reflects the current message within 5 seconds
6. `curl -X POST http://<device-ip>:8080/api/message -H 'Authorization: Bearer <token>' -H 'Content-Type: application/json' -d '{"text":"API test", "duration": 30}'` — message appears
7. Same curl without the header → `401 Unauthorized`
8. `sudo systemctl enable --now sahsa-clock` → service starts, reboot device, clock appears without intervention

90
config.py Normal file
View File

@@ -0,0 +1,90 @@
import re
import secrets
import sys
from dataclasses import dataclass
from pathlib import Path
try:
import tomllib
except ImportError:
try:
import tomli as tomllib # type: ignore[no-redef]
except ImportError:
print("Error: Python 3.11+ is required (for tomllib), or install tomli: pip install tomli")
sys.exit(1)
CONFIG_PATH = Path("config.toml")
@dataclass
class AppConfig:
width: int | None
height: int | None
fps: int
clock_font_path: str
message_font_path: str
port: int
default_duration: float
api_token: str
rate_limit: int
password_hash: str
session_timeout_hours: int
config_path: Path
@classmethod
def load(cls, path: str | Path = CONFIG_PATH) -> "AppConfig":
p = Path(path)
if not p.exists():
print(f"Error: Config file not found: {p}")
print("Copy config.toml to your working directory and edit it.")
sys.exit(1)
with open(p, "rb") as f:
raw = tomllib.load(f)
display = raw.get("display", {})
server = raw.get("server", {})
api = raw.get("api", {})
rate = raw.get("rate_limit", {})
dashboard = raw.get("dashboard", {})
token = api.get("token", "").strip()
if not token:
token = secrets.token_hex(32)
_update_config_field(p, "token", token)
print(f"\n[sahsa-clock] Generated API bearer token (saved to config.toml):")
print(f" {token}\n")
return cls(
width=display.get("width"),
height=display.get("height"),
fps=display.get("fps", 10),
clock_font_path=display.get("clock_font_path", "").strip(),
message_font_path=display.get("message_font_path", "").strip(),
port=server.get("port", 8080),
default_duration=float(server.get("default_duration_seconds", 20)),
api_token=token,
rate_limit=rate.get("requests_per_minute", 20),
password_hash=dashboard.get("password_hash", "").strip(),
session_timeout_hours=dashboard.get("session_timeout_hours", 8),
config_path=p,
)
def save_password_hash(self, hashed: str) -> None:
_update_config_field(self.config_path, "password_hash", hashed)
self.password_hash = hashed
def _update_config_field(config_path: Path, key: str, value: str) -> None:
"""Update a quoted string field in config.toml using regex replacement."""
content = config_path.read_text()
pattern = rf'^({re.escape(key)}\s*=\s*)"[^"]*"'
def replacer(m: re.Match) -> str:
return f'{m.group(1)}"{value}"'
new_content = re.sub(pattern, replacer, content, flags=re.MULTILINE)
if new_content == content:
print(f"Warning: Could not update '{key}' in {config_path}. Field not found.")
return
config_path.write_text(new_content)

35
config.toml Normal file
View File

@@ -0,0 +1,35 @@
[display]
# Screen resolution. Leave commented to auto-detect from the framebuffer.
# width = 1920
# height = 1080
# Display refresh rate in frames per second. 10 is plenty for a clock.
fps = 10
# Paths to TTF font files. Leave blank to auto-detect from system fonts.
# The clock uses a monospace font; messages use a sans-serif bold font.
clock_font_path = ""
message_font_path = ""
[server]
port = 8080
# Default message duration in seconds when the caller doesn't specify one.
default_duration_seconds = 20
[api]
# Bearer token for /api/* routes.
# Leave blank — a token is auto-generated on first run and saved here.
token = ""
[rate_limit]
# Maximum API requests per minute per source IP.
requests_per_minute = 20
[dashboard]
# Bcrypt hash of the dashboard password.
# Leave blank — a setup wizard runs on the first browser visit to set the password.
password_hash = ""
# How many hours of inactivity before a dashboard session expires.
session_timeout_hours = 8

130
dashboard/app.js Normal file
View File

@@ -0,0 +1,130 @@
'use strict';
const msgTextarea = document.getElementById('msg');
const charNum = document.getElementById('char-num');
const sendBtn = document.getElementById('send-btn');
const clearBtn = document.getElementById('clear-btn');
const statusDot = document.getElementById('status-dot');
const statusText = document.getElementById('status-text');
// ── Character counter ────────────────────────────────────────────────────────
msgTextarea.addEventListener('input', () => {
charNum.textContent = msgTextarea.value.length;
});
// ── Duration helper ──────────────────────────────────────────────────────────
function getSelectedDuration() {
const checked = document.querySelector('input[name="duration"]:checked');
return checked ? parseInt(checked.value, 10) : 60;
}
// ── Auth redirect helper ─────────────────────────────────────────────────────
function handleUnauth(status) {
if (status === 401) {
window.location.href = '/login';
return true;
}
return false;
}
// ── Send message ─────────────────────────────────────────────────────────────
async function sendMessage() {
const text = msgTextarea.value.trim();
if (!text) {
msgTextarea.focus();
return;
}
const duration = getSelectedDuration();
const body = { text };
if (duration === 0) {
body.persist = true;
} else {
body.duration = duration;
}
sendBtn.disabled = true;
try {
const res = await fetch('/dashboard/message', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
});
if (handleUnauth(res.status)) return;
if (res.ok) {
msgTextarea.value = '';
charNum.textContent = '0';
await refreshStatus();
} else {
const err = await res.text();
setStatus(false, `Error: ${err}`);
}
} catch {
setStatus(false, 'Could not reach server.');
} finally {
sendBtn.disabled = false;
}
}
// ── Clear message ────────────────────────────────────────────────────────────
async function clearMessage() {
try {
const res = await fetch('/dashboard/message', { method: 'DELETE' });
if (handleUnauth(res.status)) return;
if (res.ok) {
await refreshStatus();
}
} catch {
setStatus(false, 'Could not reach server.');
}
}
// ── Status polling ───────────────────────────────────────────────────────────
function setStatus(active, message) {
statusDot.className = 'status-dot' + (active ? ' active' : '');
statusText.textContent = message;
}
async function refreshStatus() {
try {
const res = await fetch('/dashboard/status');
if (handleUnauth(res.status)) return;
const data = await res.json();
if (data.active) {
let detail = '';
if (data.persistent) {
detail = ' — until cleared';
} else if (data.remaining_seconds !== null) {
const secs = Math.round(data.remaining_seconds);
const mins = Math.floor(secs / 60);
detail = mins > 0
? `${mins}m ${secs % 60}s remaining`
: `${secs}s remaining`;
}
setStatus(true, `Showing: "${data.text}"${detail}`);
} else {
setStatus(false, 'Screen is showing the clock.');
}
} catch {
setStatus(false, 'Could not reach server.');
}
}
// ── Wire up events ───────────────────────────────────────────────────────────
sendBtn.addEventListener('click', sendMessage);
clearBtn.addEventListener('click', clearMessage);
// Initial status fetch, then poll every 5 seconds
refreshStatus();
setInterval(refreshStatus, 5000);

73
dashboard/index.html Normal file
View File

@@ -0,0 +1,73 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Sahsa Clock</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<div class="container">
<header>
<h1>Sahsa Clock</h1>
<form method="post" action="/logout">
<button type="submit" class="btn-signout">Sign out</button>
</form>
</header>
<main>
<section class="card">
<h2>Send Message</h2>
<div class="field">
<label for="msg">Message</label>
<textarea
id="msg"
rows="4"
maxlength="300"
placeholder="Type a message to display on the screen…"
></textarea>
<div class="char-count"><span id="char-num">0</span> / 300</div>
</div>
<div class="field">
<label>Duration</label>
<div class="duration-grid">
<label class="dur-option">
<input type="radio" name="duration" value="30">
<span>30 sec</span>
</label>
<label class="dur-option">
<input type="radio" name="duration" value="60" checked>
<span>1 min</span>
</label>
<label class="dur-option">
<input type="radio" name="duration" value="300">
<span>5 min</span>
</label>
<label class="dur-option">
<input type="radio" name="duration" value="0">
<span>Until cleared</span>
</label>
</div>
</div>
<div class="action-row">
<button id="send-btn" class="btn btn-primary">Send to Screen</button>
<button id="clear-btn" class="btn btn-secondary">Clear Screen</button>
</div>
</section>
<section class="status-bar" id="status-bar">
<div class="status-dot" id="status-dot"></div>
<span id="status-text">Checking…</span>
</section>
</main>
</div>
<script src="/static/app.js"></script>
</body>
</html>

206
dashboard/style.css Normal file
View File

@@ -0,0 +1,206 @@
/* ── Reset & tokens ────────────────────────────────────────────────────────── */
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
:root {
--bg: #0f0f0f;
--surface: #1a1a1a;
--surface2: #242424;
--border: #333;
--text: #f0f0f0;
--muted: #888;
--primary: #2563eb;
--primary-h: #1d4ed8;
--secondary-h: #2f2f2f;
--danger: #dc2626;
--success: #16a34a;
--radius: 10px;
}
html { font-size: 16px; }
body {
background: var(--bg);
color: var(--text);
font-family: system-ui, -apple-system, sans-serif;
min-height: 100vh;
line-height: 1.5;
}
/* ── Layout ────────────────────────────────────────────────────────────────── */
.container {
max-width: 640px;
margin: 0 auto;
padding: 1.25rem 1rem;
}
/* ── Header ────────────────────────────────────────────────────────────────── */
header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1.5rem;
padding-bottom: 1rem;
border-bottom: 1px solid var(--border);
}
h1 { font-size: 1.4rem; font-weight: 700; }
.btn-signout {
background: none;
border: 1px solid var(--border);
color: var(--muted);
padding: 0.4rem 0.9rem;
border-radius: 6px;
cursor: pointer;
font-size: 0.9rem;
transition: border-color 0.15s, color 0.15s;
}
.btn-signout:hover { border-color: #666; color: var(--text); }
/* ── Cards ─────────────────────────────────────────────────────────────────── */
.card {
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
padding: 1.5rem;
margin-bottom: 1rem;
}
h2 {
font-size: 0.8rem;
font-weight: 600;
letter-spacing: 0.07em;
text-transform: uppercase;
color: var(--muted);
margin-bottom: 1.25rem;
}
/* ── Form fields ───────────────────────────────────────────────────────────── */
.field { margin-bottom: 1.25rem; }
label {
display: block;
margin-bottom: 0.45rem;
color: var(--muted);
font-size: 0.9rem;
font-weight: 500;
}
textarea {
width: 100%;
background: var(--surface2);
border: 1px solid var(--border);
border-radius: 6px;
color: var(--text);
padding: 0.75rem;
font-size: 1rem;
font-family: inherit;
resize: vertical;
line-height: 1.5;
transition: border-color 0.15s;
}
textarea:focus { outline: none; border-color: var(--primary); }
.char-count {
text-align: right;
color: var(--muted);
font-size: 0.8rem;
margin-top: 0.3rem;
}
/* ── Duration picker ───────────────────────────────────────────────────────── */
.duration-grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 0.5rem;
}
@media (min-width: 440px) {
.duration-grid { grid-template-columns: repeat(4, 1fr); }
}
.dur-option {
display: flex;
align-items: center;
justify-content: center;
background: var(--surface2);
border: 2px solid var(--border);
border-radius: 6px;
padding: 0.65rem 0.25rem;
cursor: pointer;
user-select: none;
transition: border-color 0.15s, background 0.15s;
text-align: center;
}
.dur-option input[type=radio] { display: none; }
.dur-option span { font-size: 0.95rem; font-weight: 500; }
.dur-option:hover { border-color: #555; }
.dur-option:has(input:checked) {
border-color: var(--primary);
background: #1e2d4a;
color: #93c5fd;
}
/* ── Action buttons ────────────────────────────────────────────────────────── */
.action-row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 0.75rem;
}
@media (max-width: 360px) {
.action-row { grid-template-columns: 1fr; }
}
.btn {
padding: 0.9rem 1rem;
border: none;
border-radius: 8px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
width: 100%;
transition: background 0.15s;
/* Minimum touch target */
min-height: 48px;
}
.btn-primary { background: var(--primary); color: #fff; }
.btn-primary:hover { background: var(--primary-h); }
.btn-primary:disabled { background: #374151; color: #6b7280; cursor: not-allowed; }
.btn-secondary {
background: var(--surface2);
color: var(--text);
border: 1px solid var(--border);
}
.btn-secondary:hover { background: var(--secondary-h); border-color: #555; }
/* ── Status bar ────────────────────────────────────────────────────────────── */
.status-bar {
display: flex;
align-items: center;
gap: 0.75rem;
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
padding: 1rem 1.25rem;
}
.status-dot {
width: 10px;
height: 10px;
border-radius: 50%;
flex-shrink: 0;
background: var(--muted);
transition: background 0.3s, box-shadow 0.3s;
}
.status-dot.active {
background: var(--success);
box-shadow: 0 0 7px var(--success);
}
#status-text {
font-size: 0.95rem;
color: var(--muted);
}

259
display.py Normal file
View File

@@ -0,0 +1,259 @@
import os
import threading
import time
from pathlib import Path
import pygame
from config import AppConfig
from state import MessageState
# ── Font discovery ────────────────────────────────────────────────────────────
# Monospace for the clock so digit widths are consistent (no layout shift)
_CLOCK_FONT_CANDIDATES = [
"/usr/share/fonts/truetype/dejavu/DejaVuSansMono-Bold.ttf",
"/usr/share/fonts/truetype/liberation/LiberationMono-Bold.ttf",
"/usr/share/fonts/truetype/freefont/FreeMonoBold.ttf",
"/usr/share/fonts/truetype/noto/NotoSansMono-Bold.ttf",
]
# Bold sans-serif for messages — maximises readability of arbitrary text
_MESSAGE_FONT_CANDIDATES = [
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
"/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
"/usr/share/fonts/truetype/noto/NotoSans-Bold.ttf",
]
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
def _find_font(configured: str, candidates: list[str]) -> str | None:
if configured and Path(configured).exists():
return configured
for p in candidates:
if Path(p).exists():
return p
return None # fall back to pygame's built-in bitmap font
def _load_font(path: str | None, size: int) -> pygame.font.Font:
if path:
try:
return pygame.font.Font(path, size)
except Exception:
pass
return pygame.font.Font(None, size) # pygame built-in fallback
# ── Text layout helpers ───────────────────────────────────────────────────────
def _wrap_text(font: pygame.font.Font, text: str, max_width: int) -> list[str]:
"""Word-wrap text to fit within max_width pixels. Returns list of lines."""
words = text.split()
lines: list[str] = []
current: list[str] = []
for word in words:
test = " ".join(current + [word])
if font.size(test)[0] <= max_width:
current.append(word)
else:
if current:
lines.append(" ".join(current))
current = [word]
else:
lines.append(word) # single word too wide — add it anyway
if current:
lines.append(" ".join(current))
return lines or [""]
def _fit_message_font(
font_path: str | None,
text: str,
max_w: int,
max_h: int,
) -> tuple[pygame.font.Font, list[str]]:
"""
Binary-search for the largest font size where the word-wrapped text fits
within (max_w, max_h). Returns (font, wrapped_lines).
"""
lo, hi = 16, 500
best_font = _load_font(font_path, lo)
best_lines = _wrap_text(best_font, text, max_w)
while lo <= hi:
mid = (lo + hi) // 2
f = _load_font(font_path, mid)
lines = _wrap_text(f, text, max_w)
total_h = len(lines) * f.get_linesize()
max_line_w = max((f.size(ln)[0] for ln in lines), default=0)
if total_h <= max_h and max_line_w <= max_w:
best_font, best_lines = f, lines
lo = mid + 1
else:
hi = mid - 1
return best_font, best_lines
def _build_clock_font(font_path: str | None, screen_w: int, screen_h: int) -> pygame.font.Font:
"""
Find the largest font size where the widest possible time string fits within
88 % of the screen width and 50 % of the screen height.
"""
target_w = int(screen_w * 0.88)
target_h = int(screen_h * 0.50)
sample = "12:00:00 AM" # widest realistic string
lo, hi = 16, 600
best = lo
while lo <= hi:
mid = (lo + hi) // 2
f = _load_font(font_path, mid)
tw, th = f.size(sample)
if tw <= target_w and th <= target_h:
best = mid
lo = mid + 1
else:
hi = mid - 1
return _load_font(font_path, best)
# ── Display thread ────────────────────────────────────────────────────────────
class DisplayThread(threading.Thread):
def __init__(self, state: MessageState, config: AppConfig, dev_mode: bool = False):
super().__init__(daemon=True, name="display")
self.state = state
self.config = config
self.dev_mode = dev_mode
self._stop_event = threading.Event()
def stop(self) -> None:
self._stop_event.set()
def run(self) -> None:
# SDL environment must be set before pygame.init()
if self.dev_mode:
os.environ.setdefault("SDL_VIDEODRIVER", "x11")
else:
os.environ["SDL_VIDEODRIVER"] = "fbcon"
os.environ["SDL_FBDEV"] = "/dev/fb0"
try:
pygame.init()
except Exception as e:
print(f"[display] pygame.init() failed: {e}")
return
try:
screen = self._create_surface()
except Exception as e:
print(f"[display] Failed to create display surface: {e}")
pygame.quit()
return
pygame.mouse.set_visible(False)
clock = pygame.time.Clock()
screen_w, screen_h = screen.get_size()
clock_font_path = _find_font(self.config.clock_font_path, _CLOCK_FONT_CANDIDATES)
msg_font_path = _find_font(self.config.message_font_path, _MESSAGE_FONT_CANDIDATES)
if not clock_font_path:
print("[display] Warning: no TTF font found; falling back to pygame built-in (may look pixelated at large sizes)")
if not msg_font_path:
msg_font_path = clock_font_path # use same font if none found
clock_font = _build_clock_font(clock_font_path, screen_w, screen_h)
# Cache message layout — recomputed only when message text changes
last_msg_text: str | None = None
msg_font: pygame.font.Font | None = None
msg_lines: list[str] = []
while not self._stop_event.is_set():
for event in pygame.event.get():
if event.type == pygame.QUIT:
self._stop_event.set()
break
# Allow Escape to exit in dev mode
if self.dev_mode and event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE:
self._stop_event.set()
break
text, _ = self.state.get()
screen.fill(BLACK)
if text:
if text != last_msg_text:
max_w = int(screen_w * 0.88)
max_h = int(screen_h * 0.88)
msg_font, msg_lines = _fit_message_font(msg_font_path, text, max_w, max_h)
last_msg_text = text
self._draw_message(screen, msg_font, msg_lines, screen_w, screen_h)
else:
last_msg_text = None
self._draw_clock(screen, clock_font, screen_w, screen_h)
pygame.display.flip()
clock.tick(self.config.fps)
pygame.quit()
def _create_surface(self) -> pygame.Surface:
if self.dev_mode:
w = self.config.width or 1280
h = self.config.height or 720
surface = pygame.display.set_mode((w, h))
pygame.display.set_caption("Sahsa Clock [DEV]")
return surface
flags = pygame.FULLSCREEN | pygame.NOFRAME
if self.config.width and self.config.height:
return pygame.display.set_mode((self.config.width, self.config.height), flags)
return pygame.display.set_mode((0, 0), flags)
@staticmethod
def _draw_clock(
screen: pygame.Surface,
font: pygame.font.Font,
screen_w: int,
screen_h: int,
) -> None:
now = time.localtime()
hour = now.tm_hour % 12 or 12
ampm = "AM" if now.tm_hour < 12 else "PM"
time_str = f"{hour}:{now.tm_min:02d}:{now.tm_sec:02d} {ampm}"
surf = font.render(time_str, True, WHITE)
rect = surf.get_rect(center=(screen_w // 2, screen_h // 2))
screen.blit(surf, rect)
@staticmethod
def _draw_message(
screen: pygame.Surface,
font: pygame.font.Font,
lines: list[str],
screen_w: int,
screen_h: int,
) -> None:
line_h = font.get_linesize()
total_h = len(lines) * line_h
y = (screen_h - total_h) // 2
for line in lines:
surf = font.render(line, True, WHITE)
x = (screen_w - surf.get_width()) // 2
screen.blit(surf, (x, y))
y += line_h

47
main.py Normal file
View File

@@ -0,0 +1,47 @@
import argparse
import asyncio
import sys
from config import AppConfig
from server import run_server
from state import MessageState
def main() -> None:
parser = argparse.ArgumentParser(description="Sahsa Clock Display")
parser.add_argument(
"--dev",
action="store_true",
help="Windowed dev mode — renders to an x11 window instead of the framebuffer",
)
parser.add_argument(
"--no-display",
action="store_true",
help="Start the HTTP server only, with no display output (useful for testing the dashboard/API)",
)
parser.add_argument(
"--config",
default="config.toml",
metavar="PATH",
help="Path to config.toml (default: config.toml)",
)
args = parser.parse_args()
config = AppConfig.load(args.config)
state = MessageState()
if not args.no_display:
# Import display only when needed — keeps --no-display working without pygame
from display import DisplayThread
display = DisplayThread(state, config, dev_mode=args.dev)
display.start()
try:
asyncio.run(run_server(state, config))
except KeyboardInterrupt:
print("\n[sahsa-clock] Shutting down.")
sys.exit(0)
if __name__ == "__main__":
main()

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
pygame>=2.5.0
aiohttp>=3.9.0
bcrypt>=4.0.0
# Uncomment if using Python < 3.11 (tomllib is stdlib in 3.11+):
# tomli>=2.0.0

27
sahsa-clock.service Normal file
View File

@@ -0,0 +1,27 @@
[Unit]
Description=Sahsa Clock Display
After=network.target
DefaultDependencies=no
[Service]
Type=simple
User=pi
WorkingDirectory=/opt/sahsa_clock
# Tell SDL to render directly to the Linux framebuffer (no display server needed)
Environment=SDL_VIDEODRIVER=fbcon
Environment=SDL_FBDEV=/dev/fb0
# Disable console blanking so the TV stays on
ExecStartPre=/bin/sh -c 'echo -ne "\033[9;0]" > /dev/tty1'
# If using a virtualenv (recommended):
ExecStart=/opt/sahsa_clock/venv/bin/python3 /opt/sahsa_clock/main.py
# If using system Python instead, replace the line above with:
# ExecStart=/usr/bin/python3 /opt/sahsa_clock/main.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target

401
server.py Normal file
View File

@@ -0,0 +1,401 @@
import asyncio
import json
import secrets
import threading
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
import bcrypt
from aiohttp import web
from config import AppConfig
from state import MessageState
DASHBOARD_DIR = Path(__file__).parent / "dashboard"
SESSION_COOKIE = "sahsa_session"
# ── Session store ─────────────────────────────────────────────────────────────
@dataclass
class _Session:
last_active: float = field(default_factory=time.time)
class SessionStore:
def __init__(self, timeout_hours: int):
self._sessions: dict[str, _Session] = {}
self._lock = threading.Lock()
self._timeout = timeout_hours * 3600.0
def create(self) -> str:
sid = str(uuid.uuid4())
with self._lock:
self._sessions[sid] = _Session()
return sid
def validate(self, sid: str) -> bool:
with self._lock:
s = self._sessions.get(sid)
if s is None:
return False
now = time.time()
if now - s.last_active > self._timeout:
del self._sessions[sid]
return False
s.last_active = now
return True
def delete(self, sid: str) -> None:
with self._lock:
self._sessions.pop(sid, None)
# ── Rate limiter ──────────────────────────────────────────────────────────────
class RateLimiter:
def __init__(self, max_per_minute: int):
self._max = max_per_minute
self._data: dict[str, list[float]] = {}
self._lock = threading.Lock()
def is_allowed(self, ip: str) -> bool:
now = time.time()
cutoff = now - 60.0
with self._lock:
ts = [t for t in self._data.get(ip, []) if t > cutoff]
if len(ts) >= self._max:
self._data[ip] = ts
return False
ts.append(now)
self._data[ip] = ts
return True
# ── Inline HTML pages ─────────────────────────────────────────────────────────
_PAGE_STYLE = """
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
background: #111; color: #f0f0f0;
font-family: system-ui, -apple-system, sans-serif;
display: flex; justify-content: center; align-items: center;
min-height: 100vh; padding: 1rem;
}
.card {
background: #1a1a1a; border: 1px solid #333; border-radius: 10px;
padding: 2rem; width: 100%; max-width: 420px;
}
h1 { font-size: 1.5rem; margin-bottom: 0.5rem; text-align: center; }
.subtitle { color: #888; text-align: center; margin-bottom: 1.75rem; font-size: 0.9rem; }
label { display: block; margin-bottom: 0.4rem; color: #aaa; font-size: 0.9rem; }
input[type=password] {
width: 100%; padding: 0.75rem; border-radius: 6px;
border: 1px solid #444; background: #242424; color: #f0f0f0;
font-size: 1.05rem; margin-bottom: 1.1rem;
}
input[type=password]:focus { outline: none; border-color: #2563eb; }
.btn {
width: 100%; padding: 0.875rem; border: none; border-radius: 8px;
font-size: 1.05rem; font-weight: 600; cursor: pointer;
}
.btn-blue { background: #2563eb; color: #fff; }
.btn-blue:hover { background: #1d4ed8; }
.btn-green { background: #16a34a; color: #fff; }
.btn-green:hover { background: #15803d; }
.error { color: #f87171; margin-bottom: 1rem; text-align: center; font-size: 0.95rem; }
"""
def _login_page(error: str = "") -> str:
err = f'<p class="error">{error}</p>' if error else ""
return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Sahsa Clock — Sign In</title>
<style>{_PAGE_STYLE}</style>
</head>
<body>
<div class="card">
<h1>Sahsa Clock</h1>
<p class="subtitle">Sign in to access the dashboard.</p>
{err}
<form method="post" action="/login">
<label for="pw">Password</label>
<input type="password" id="pw" name="password" autofocus autocomplete="current-password">
<button type="submit" class="btn btn-blue">Sign In</button>
</form>
</div>
</body>
</html>"""
def _setup_page(error: str = "") -> str:
err = f'<p class="error">{error}</p>' if error else ""
return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Sahsa Clock — First Run Setup</title>
<style>{_PAGE_STYLE}</style>
</head>
<body>
<div class="card">
<h1>First Run Setup</h1>
<p class="subtitle">Create a password to protect the dashboard.</p>
{err}
<form method="post" action="/setup">
<label for="pw">Password (6+ characters)</label>
<input type="password" id="pw" name="password" autofocus minlength="6" autocomplete="new-password">
<label for="pw2">Confirm Password</label>
<input type="password" id="pw2" name="confirm" autocomplete="new-password">
<button type="submit" class="btn btn-green">Set Password &amp; Continue</button>
</form>
</div>
</body>
</html>"""
# ── Message body parsing ──────────────────────────────────────────────────────
def _parse_message_body(body: dict, default_duration: float) -> tuple[str, float | None]:
"""
Returns (text, duration_seconds).
duration=None means persistent.
"""
text = body.get("text", "").strip()
if body.get("persist") or body.get("duration") == 0:
duration: float | None = None
elif "duration" in body:
duration = float(body["duration"])
else:
duration = default_duration
return text, duration
# ── Server ────────────────────────────────────────────────────────────────────
class ClockServer:
def __init__(self, state: MessageState, config: AppConfig):
self.state = state
self.config = config
self.sessions = SessionStore(config.session_timeout_hours)
self.rate_limiter = RateLimiter(config.rate_limit)
self.app = self._build_app()
def _build_app(self) -> web.Application:
app = web.Application()
# Static dashboard assets (CSS, JS) — no auth required
app.router.add_static("/static", DASHBOARD_DIR, show_index=False)
# First-run setup
app.router.add_get("/setup", self._handle_setup_get)
app.router.add_post("/setup", self._handle_setup_post)
# Authentication
app.router.add_get("/login", self._handle_login_get)
app.router.add_post("/login", self._handle_login_post)
app.router.add_post("/logout", self._handle_logout)
# Dashboard (session-protected)
app.router.add_get("/", self._handle_root)
app.router.add_post("/dashboard/message", self._handle_dashboard_set)
app.router.add_delete("/dashboard/message", self._handle_dashboard_clear)
app.router.add_get("/dashboard/status", self._handle_dashboard_status)
# API (bearer token + rate limit)
app.router.add_post("/api/message", self._handle_api_set)
app.router.add_delete("/api/message", self._handle_api_clear)
app.router.add_get("/api/status", self._handle_api_status)
return app
# ── Auth helpers ──────────────────────────────────────────────────────────
def _setup_needed(self) -> bool:
return not self.config.password_hash
def _check_session(self, request: web.Request) -> bool:
sid = request.cookies.get(SESSION_COOKIE)
return bool(sid and self.sessions.validate(sid))
def _session_redirect(self) -> web.Response:
"""Redirect to setup or login depending on configuration state."""
if self._setup_needed():
return web.HTTPFound("/setup")
return web.HTTPFound("/login")
def _set_session_cookie(self, response: web.Response, sid: str) -> None:
response.set_cookie(
SESSION_COOKIE,
sid,
httponly=True,
samesite="Lax",
max_age=self.config.session_timeout_hours * 3600,
)
def _require_bearer(self, request: web.Request) -> web.Response | None:
"""Return 401 if bearer token is missing or wrong, else None."""
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return web.Response(status=401, text="Unauthorized")
token = auth[7:].strip()
if not secrets.compare_digest(token, self.config.api_token):
return web.Response(status=401, text="Unauthorized")
return None
def _require_rate_limit(self, request: web.Request) -> web.Response | None:
"""Return 429 if this IP has exceeded the rate limit, else None."""
ip = request.remote or "unknown"
if not self.rate_limiter.is_allowed(ip):
return web.Response(status=429, text="Too Many Requests")
return None
# ── Setup handlers ────────────────────────────────────────────────────────
async def _handle_setup_get(self, request: web.Request) -> web.Response:
if not self._setup_needed():
return web.HTTPFound("/")
return web.Response(content_type="text/html", text=_setup_page())
async def _handle_setup_post(self, request: web.Request) -> web.Response:
if not self._setup_needed():
return web.HTTPFound("/")
data = await request.post()
password = data.get("password", "")
confirm = data.get("confirm", "")
if len(password) < 6:
return web.Response(
content_type="text/html",
text=_setup_page("Password must be at least 6 characters."),
)
if password != confirm:
return web.Response(
content_type="text/html",
text=_setup_page("Passwords do not match."),
)
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
self.config.save_password_hash(hashed)
return web.HTTPFound("/login")
# ── Login / logout handlers ───────────────────────────────────────────────
async def _handle_login_get(self, request: web.Request) -> web.Response:
if self._setup_needed():
return web.HTTPFound("/setup")
return web.Response(content_type="text/html", text=_login_page())
async def _handle_login_post(self, request: web.Request) -> web.Response:
if self._setup_needed():
return web.HTTPFound("/setup")
data = await request.post()
password = data.get("password", "").encode()
if not bcrypt.checkpw(password, self.config.password_hash.encode()):
return web.Response(
content_type="text/html",
text=_login_page("Incorrect password."),
)
sid = self.sessions.create()
response = web.HTTPFound("/")
self._set_session_cookie(response, sid)
return response
async def _handle_logout(self, request: web.Request) -> web.Response:
sid = request.cookies.get(SESSION_COOKIE)
if sid:
self.sessions.delete(sid)
response = web.HTTPFound("/login")
response.del_cookie(SESSION_COOKIE)
return response
# ── Dashboard handlers ────────────────────────────────────────────────────
async def _handle_root(self, request: web.Request) -> web.Response:
if not self._check_session(request):
return self._session_redirect()
return web.FileResponse(DASHBOARD_DIR / "index.html")
async def _handle_dashboard_set(self, request: web.Request) -> web.Response:
if not self._check_session(request):
return web.Response(status=401, text="Unauthorized")
try:
body = await request.json()
except Exception:
return web.Response(status=400, text="Invalid JSON")
text, duration = _parse_message_body(body, self.config.default_duration)
if not text:
return web.Response(status=400, text="Missing or empty 'text' field")
self.state.set(text, duration)
return web.Response(content_type="application/json", text=json.dumps({"ok": True}))
async def _handle_dashboard_clear(self, request: web.Request) -> web.Response:
if not self._check_session(request):
return web.Response(status=401, text="Unauthorized")
self.state.clear()
return web.Response(content_type="application/json", text=json.dumps({"ok": True}))
async def _handle_dashboard_status(self, request: web.Request) -> web.Response:
if not self._check_session(request):
return web.Response(status=401, text="Unauthorized")
return web.Response(
content_type="application/json",
text=json.dumps(self.state.to_dict()),
)
# ── API handlers ──────────────────────────────────────────────────────────
async def _handle_api_set(self, request: web.Request) -> web.Response:
if err := self._require_bearer(request):
return err
if err := self._require_rate_limit(request):
return err
try:
body = await request.json()
except Exception:
return web.Response(status=400, text="Invalid JSON")
text, duration = _parse_message_body(body, self.config.default_duration)
if not text:
return web.Response(status=400, text="Missing or empty 'text' field")
self.state.set(text, duration)
return web.Response(content_type="application/json", text=json.dumps({"ok": True}))
async def _handle_api_clear(self, request: web.Request) -> web.Response:
if err := self._require_bearer(request):
return err
if err := self._require_rate_limit(request):
return err
self.state.clear()
return web.Response(content_type="application/json", text=json.dumps({"ok": True}))
async def _handle_api_status(self, request: web.Request) -> web.Response:
if err := self._require_bearer(request):
return err
return web.Response(
content_type="application/json",
text=json.dumps(self.state.to_dict()),
)
# ── Entry point ───────────────────────────────────────────────────────────────
async def run_server(state: MessageState, config: AppConfig) -> None:
server = ClockServer(state, config)
runner = web.AppRunner(server.app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", config.port)
await site.start()
print(f"[sahsa-clock] Dashboard: http://0.0.0.0:{config.port}")
print(f"[sahsa-clock] API: http://0.0.0.0:{config.port}/api/")
await asyncio.Event().wait() # run forever

44
state.py Normal file
View File

@@ -0,0 +1,44 @@
import threading
import time
from dataclasses import dataclass, field
@dataclass
class MessageState:
_text: str | None = field(default=None)
_expires_at: float | None = field(default=None) # monotonic timestamp
_lock: threading.Lock = field(default_factory=threading.Lock)
def set(self, text: str, duration: float | None) -> None:
"""Set message. duration=None means persistent (no expiry)."""
with self._lock:
self._text = text
self._expires_at = time.monotonic() + duration if duration else None
def clear(self) -> None:
with self._lock:
self._text = None
self._expires_at = None
def get(self) -> tuple[str | None, float | None]:
"""Return (text, remaining_seconds). Both None if no active message."""
with self._lock:
if self._text is None:
return None, None
if self._expires_at is not None and time.monotonic() >= self._expires_at:
self._text = None
self._expires_at = None
return None, None
remaining: float | None = None
if self._expires_at is not None:
remaining = max(0.0, self._expires_at - time.monotonic())
return self._text, remaining
def to_dict(self) -> dict:
text, remaining = self.get()
return {
"active": text is not None,
"text": text,
"remaining_seconds": round(remaining, 1) if remaining is not None else None,
"persistent": text is not None and remaining is None,
}