Files
smart-mirror/raspi/USBRead.py
2026-04-27 08:32:24 +02:00

178 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
USBRead.py
──────────
Reads raw lines from a serial USB device (e.g. Arduino) at the configured
baud rate, parses numeric sensor values, and stores everything in a shared
thread-safe ring-buffer.
Callers call `init()` once with the USB configuration, then `start_reader()`
to launch the background thread, and may read from `get_snapshot()` at any time.
"""
import threading
import time
from collections import deque
from datetime import datetime
from typing import Any
import serial
# ── Module state (configured by init()) ───────────────────────────────────────
# USB configuration mapping handed to init(); empty until init() runs.
_cfg: dict[str, Any] = {}

# Parsed entries in arrival order; init() replaces this with a bounded deque.
_buffer: deque = deque()

# Callbacks: callables(entry) invoked after every parsed entry.
_callbacks: list = []

# Taken by the reader loop and get_snapshot() around `status` / `_buffer`.
_lock = threading.Lock()

# Connection state and counters, exposed to callers through get_snapshot().
status: dict[str, Any] = {
    "connected": False,      # True while the reader loop holds an open port
    "port": "",              # filled in by init()
    "baud": 0,               # filled in by init()
    "last_received": None,   # "ts" of the most recently parsed entry
    "total_lines": 0,        # number of lines parsed so far
    "errors": 0,             # number of exceptions caught by the reader loop
}
# ── Public API ────────────────────────────────────────────────────────────────
def init(usb_cfg: dict[str, Any]) -> None:
    """Configure the module from *usb_cfg*; must be called once before start_reader().

    Keys read from *usb_cfg*:
        "port"        (required) – stored in ``status["port"]``.
        "baud_rate"   (required) – stored in ``status["baud"]``.
        "buffer_size" (optional, default 200) – ``maxlen`` of the entry buffer.

    The mapping itself is kept (not copied) as the module configuration, and
    the entry buffer is replaced by a fresh, empty deque.

    Raises:
        KeyError: if "port" or "baud_rate" is missing. Module state is left
            untouched in that case.
    """
    global _buffer, _cfg

    # Resolve every setting before touching module state, so a bad config
    # cannot leave a half-applied setup behind.
    port = usb_cfg["port"]
    baud = usb_cfg["baud_rate"]
    fresh_buffer: deque = deque(maxlen=usb_cfg.get("buffer_size", 200))

    # Commit under the same lock the reader loop and get_snapshot() use, so a
    # concurrent snapshot never sees the new buffer paired with the old status.
    with _lock:
        _cfg = usb_cfg
        _buffer = fresh_buffer
        status["port"] = port
        status["baud"] = baud
def register_callback(fn) -> None:
    """Add *fn* to the list of callbacks run for every newly parsed line.

    Each registered callable is later invoked as ``fn(entry)`` with the parsed
    entry dict. Callbacks are kept in registration order.
    """
    _callbacks.append(fn)
def get_snapshot() -> dict[str, Any]:
    """Return a copy of the current status and buffered entries.

    Both copies are taken while holding the module lock. The result has two
    keys: "status" (a new dict) and "entries" (a new list, in buffer order).
    The copies are shallow: the entry dicts in the list are the same objects
    that sit in the buffer.
    """
    with _lock:
        status_copy = dict(status)
        entries_copy = list(_buffer)
    return {"status": status_copy, "entries": entries_copy}
def start_reader() -> threading.Thread:
    """Launch the serial-reader loop on a daemon thread and return the thread.

    The thread is named "usb-reader" and is already started when returned.
    """
    reader = threading.Thread(name="usb-reader", target=_reader_loop, daemon=True)
    reader.start()
    return reader
# ── Parsing helpers ───────────────────────────────────────────────────────────
def parse_line(raw: str) -> dict[str, Any]:
    """Parse one raw serial line into a structured entry dict.

    The returned dict has the keys "ts" (local ISO-8601 timestamp with
    millisecond precision), "raw" (the input unchanged), "values" (list of
    floats) and "labels" (list of str, same length as "values").

    Supported formats, tried in this order:
      1. JSON object (line starts with "{"):
           '{"temp":23.5,"hum":67.1}' → values [23.5, 67.1], labels ["temp", "hum"]
         Members whose value cannot be converted to float are skipped. A line
         that is a valid JSON object is never re-parsed by the later formats,
         even if it yields no values.
      2. key=value pairs separated by spaces, commas or semicolons:
           "temp=23.5,hum=67.1"       → values [23.5, 67.1], labels ["temp", "hum"]
         Used only if at least one pair has a numeric value.
      3. Plain numbers separated by spaces, commas or semicolons:
           "23.5 67.1 4.92"           → values [23.5, 67.1, 4.92],
                                        labels ["ch1", "ch2", "ch3"]
         Non-numeric tokens are ignored.

    A line matching none of the formats yields empty "values" and "labels".
    """
    # Local import: json is not among this module's file-level imports.
    import json

    values: list[float] = []
    labels: list[str] = []
    entry: dict[str, Any] = {
        "ts": datetime.now().isoformat(timespec="milliseconds"),
        "raw": raw,
        "values": values,
        "labels": labels,
    }

    # 1) JSON object
    if raw.startswith("{"):
        try:
            obj = json.loads(raw)
        except (ValueError, RecursionError):
            # Not valid JSON (JSONDecodeError is a ValueError) or nested too
            # deeply to decode — fall through to the text formats.
            pass
        else:
            for key, value in obj.items():
                try:
                    number = float(value)
                except (TypeError, ValueError, OverflowError):
                    # null, nested containers, non-numeric strings, or ints
                    # too large for a float: skip this member only.
                    continue
                values.append(number)
                labels.append(str(key))
            return entry

    # Both text formats share the same tokenization, so do it once.
    tokens = raw.replace(",", " ").replace(";", " ").split()

    # 2) key=value pairs (e.g. "temp=23.5,hum=67")
    if "=" in raw:
        for token in tokens:
            key, sep, text = token.partition("=")
            if not sep:
                continue
            try:
                number = float(text)
            except ValueError:
                continue
            values.append(number)
            labels.append(key.strip())
        if values:
            return entry

    # 3) Fallback: bare numbers, labelled ch1, ch2, … by order of appearance
    for token in tokens:
        try:
            number = float(token)
        except ValueError:
            continue
        values.append(number)
        labels.append(f"ch{len(values)}")
    return entry
# ── Internal reader loop ──────────────────────────────────────────────────────
def _reader_loop() -> None:
    """Thread target: read lines from the serial port forever, reconnecting on error.

    Reads "port", "baud_rate" and the optional "reconnect_delay_s" (default 3)
    from the module configuration once, on entry. Each non-empty line is
    decoded as UTF-8 (undecodable bytes replaced), stripped, parsed with
    parse_line(), appended to the shared buffer and passed to every
    registered callback. Any exception closes the port, is counted in
    ``status["errors"]``, and is followed by a sleep before reopening.
    This function never returns.
    """
    port, baud = _cfg["port"], _cfg["baud_rate"]
    reconnect_delay = _cfg.get("reconnect_delay_s", 3)

    def _record_failure() -> None:
        # Mark the link as down and bump the error counter under the lock.
        with _lock:
            status["connected"] = False
            status["errors"] += 1

    while True:
        try:
            with serial.Serial(port, baud, timeout=1) as ser:
                with _lock:
                    status["connected"] = True
                print(f"[USBRead] Connected → {port} @ {baud} baud")

                while True:
                    chunk = ser.readline()
                    # An empty read (timeout) and a whitespace-only line are
                    # both skipped.
                    text = chunk.decode("utf-8", errors="replace").strip() if chunk else ""
                    if not text:
                        continue

                    entry = parse_line(text)
                    with _lock:
                        _buffer.append(entry)
                        status["last_received"] = entry["ts"]
                        status["total_lines"] += 1

                    # Callbacks run outside the lock; a failing callback is
                    # reported and does not stop the others.
                    for callback in _callbacks:
                        try:
                            callback(entry)
                        except Exception as exc:
                            print(f"[USBRead] Callback error: {exc}")
        except serial.SerialException as exc:
            _record_failure()
            print(f"[USBRead] SerialException: {exc} retry in {reconnect_delay}s")
            time.sleep(reconnect_delay)
        except Exception as exc:
            _record_failure()
            print(f"[USBRead] Unexpected error: {exc} retry in {reconnect_delay}s")
            time.sleep(reconnect_delay)