""" USBRead.py ────────── Reads raw lines from a serial USB device (e.g. Arduino) at the configured baud rate, parses numeric sensor values, and stores everything in a shared thread-safe ring-buffer. Callers import `start_reader()` to launch the background thread, then read from `get_snapshot()` at any time. """ import threading import time from collections import deque from datetime import datetime from typing import Any import serial # ── Populated once by init() ────────────────────────────────────────────────── _cfg: dict[str, Any] = {} _lock = threading.Lock() _buffer: deque = deque() status: dict[str, Any] = { "connected": False, "port": "", "baud": 0, "last_received": None, "total_lines": 0, "errors": 0, } # Callbacks: list of callables(entry) invoked after every parsed entry _callbacks: list = [] # ── Public API ──────────────────────────────────────────────────────────────── def init(usb_cfg: dict[str, Any]) -> None: """Must be called once before start_reader().""" global _buffer, _cfg _cfg = usb_cfg _buffer = deque(maxlen=usb_cfg.get("buffer_size", 200)) status["port"] = usb_cfg["port"] status["baud"] = usb_cfg["baud_rate"] def register_callback(fn) -> None: """Register a function(entry) that is called for each new parsed line.""" _callbacks.append(fn) def get_snapshot() -> dict[str, Any]: """Return a thread-safe snapshot of the current buffer and status.""" with _lock: return { "status": dict(status), "entries": list(_buffer), } def start_reader() -> threading.Thread: """Start the background serial-reader thread (daemon).""" t = threading.Thread(target=_reader_loop, daemon=True, name="usb-reader") t.start() return t # ── Parsing helpers ─────────────────────────────────────────────────────────── def parse_line(raw: str) -> dict[str, Any]: """ Parse a raw serial line into a structured entry dict. Supported Arduino output formats (auto-detected): • Space / comma / semicolon separated numbers: "23.5 67.1 4.92" → values: [23.5, 67.1, 4.92] • Key=value pairs: "temp=23.5,hum=67.1" → values: [23.5, 67.1], labels: ["temp", "hum"] • JSON (if Arduino outputs JSON): '{"temp":23.5,"hum":67.1}' → values: [23.5, 67.1], labels: ["temp", "hum"] """ ts = datetime.now().isoformat(timespec="milliseconds") entry: dict[str, Any] = {"ts": ts, "raw": raw, "values": [], "labels": []} # Try JSON first if raw.startswith("{"): try: import json obj = json.loads(raw) for k, v in obj.items(): try: entry["values"].append(float(v)) entry["labels"].append(str(k)) except (TypeError, ValueError): pass return entry except Exception: pass # Try key=value pairs (e.g. "temp=23.5,hum=67") if "=" in raw: for token in raw.replace(",", " ").replace(";", " ").split(): if "=" in token: k, _, v = token.partition("=") try: entry["values"].append(float(v)) entry["labels"].append(k.strip()) except ValueError: pass if entry["values"]: return entry # Fallback: space / comma / semicolon delimited numbers for token in raw.replace(",", " ").replace(";", " ").split(): try: entry["values"].append(float(token)) entry["labels"].append(f"ch{len(entry['values'])}") except ValueError: pass return entry # ── Internal reader loop ────────────────────────────────────────────────────── def _reader_loop() -> None: port = _cfg["port"] baud = _cfg["baud_rate"] reconnect_delay = _cfg.get("reconnect_delay_s", 3) while True: try: with serial.Serial(port, baud, timeout=1) as ser: with _lock: status["connected"] = True print(f"[USBRead] Connected → {port} @ {baud} baud") while True: raw_bytes = ser.readline() if not raw_bytes: continue raw_str = raw_bytes.decode("utf-8", errors="replace").strip() if not raw_str: continue entry = parse_line(raw_str) with _lock: _buffer.append(entry) status["last_received"] = entry["ts"] status["total_lines"] += 1 # Fire registered callbacks outside the lock for cb in _callbacks: try: cb(entry) except Exception as exc: print(f"[USBRead] Callback error: {exc}") except serial.SerialException as exc: with _lock: status["connected"] = False status["errors"] += 1 print(f"[USBRead] SerialException: {exc} – retry in {reconnect_delay}s") time.sleep(reconnect_delay) except Exception as exc: with _lock: status["connected"] = False status["errors"] += 1 print(f"[USBRead] Unexpected error: {exc} – retry in {reconnect_delay}s") time.sleep(reconnect_delay)