178 lines
6.0 KiB
Python
178 lines
6.0 KiB
Python
|
|
"""
|
|||
|
|
USBRead.py
|
|||
|
|
──────────
|
|||
|
|
Reads raw lines from a serial USB device (e.g. Arduino) at the configured
|
|||
|
|
baud rate, parses numeric sensor values, and stores everything in a shared
|
|||
|
|
thread-safe ring-buffer.
|
|||
|
|
|
|||
|
|
Callers import `start_reader()` to launch the background thread, then read
|
|||
|
|
from `get_snapshot()` at any time.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import threading
|
|||
|
|
import time
|
|||
|
|
from collections import deque
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Any
|
|||
|
|
|
|||
|
|
import serial
|
|||
|
|
|
|||
|
|
# ── Module state (populated once by init()) ───────────────────────────────────
# Serialises access to `status` and `_buffer` between the reader thread and
# callers of get_snapshot().
_lock = threading.Lock()

# Reader configuration; stays empty until init() stores the caller's dict.
_cfg: dict[str, Any] = {}

# Parsed entries, oldest first; init() swaps in a bounded deque.
_buffer: deque = deque()

# Functions invoked with every parsed entry, in registration order.
_callbacks: list = []

# Connection state and counters, exposed (as a copy) by get_snapshot().
status: dict[str, Any] = {
    "connected": False,
    "port": "",
    "baud": 0,
    "last_received": None,
    "total_lines": 0,
    "errors": 0,
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Public API ────────────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
def init(usb_cfg: dict[str, Any]) -> None:
    """Configure the module; must be called once before start_reader().

    Args:
        usb_cfg: Reader settings. ``port`` and ``baud_rate`` are required.
            ``buffer_size`` (default 200) is the maximum number of entries
            kept in the buffer.

    Raises:
        KeyError: If ``port`` or ``baud_rate`` is missing. Module state is
            left untouched in that case.
    """
    global _buffer, _cfg

    # Resolve every setting before touching module state, so a bad config
    # cannot leave the module half-initialised.
    port = usb_cfg["port"]
    baud = usb_cfg["baud_rate"]
    fresh_buffer: deque = deque(maxlen=usb_cfg.get("buffer_size", 200))

    # `status` and `_buffer` are read and written under `_lock` everywhere
    # else, so publish the new state under the same lock.
    with _lock:
        _cfg = usb_cfg
        _buffer = fresh_buffer
        status["port"] = port
        status["baud"] = baud
|
|||
|
|
|
|||
|
|
|
|||
|
|
def register_callback(fn) -> None:
    """Register a function(entry) that is called for each new parsed line.

    Args:
        fn: Callable that accepts the parsed entry dict as its only argument.

    Raises:
        TypeError: If *fn* is not callable.
    """
    # Reject bad input here: a non-callable would otherwise only show up as
    # a "Callback error" message printed by the reader thread on every line.
    if not callable(fn):
        raise TypeError(f"callback must be callable, got {type(fn).__name__}")
    _callbacks.append(fn)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_snapshot() -> dict[str, Any]:
    """Copy the current status and buffered entries while holding the lock.

    Returns:
        A dict with two keys: ``"status"`` (a copy of the status dict) and
        ``"entries"`` (a list of the buffered entries). Both containers are
        new objects; the entry dicts themselves are shared with the buffer.
    """
    with _lock:
        status_copy = status.copy()
        buffered = [*_buffer]
    return {"status": status_copy, "entries": buffered}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def start_reader() -> threading.Thread:
    """Launch the serial reader loop in a daemon thread and return the thread.

    The thread is named "usb-reader" and runs `_reader_loop`.
    """
    reader = threading.Thread(
        name="usb-reader",
        target=_reader_loop,
        daemon=True,
    )
    reader.start()
    return reader
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Parsing helpers ───────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
def _split_tokens(text: str) -> list[str]:
    """Split *text* on whitespace, commas and semicolons, dropping empties."""
    return text.replace(",", " ").replace(";", " ").split()


def parse_line(raw: str) -> dict[str, Any]:
    """
    Parse a raw serial line into a structured entry dict.

    Supported Arduino output formats (auto-detected, tried in this order):
      • JSON object (if Arduino outputs JSON):
          '{"temp":23.5,"hum":67.1}' → values: [23.5, 67.1],
                                         labels: ["temp", "hum"]
      • Key=value pairs (whitespace around "=" is tolerated):
          "temp=23.5,hum=67.1"       → values: [23.5, 67.1],
                                         labels: ["temp", "hum"]
          "temp = 23.5; hum = 67.1"  → same result
      • Space / comma / semicolon separated numbers:
          "23.5 67.1 4.92"           → values: [23.5, 67.1, 4.92],
                                         labels: ["ch1", "ch2", "ch3"]

    Anything that ``float()`` cannot convert is skipped, so a line without
    any number yields empty ``values`` / ``labels``.

    Args:
        raw: One line of text, already decoded and stripped.

    Returns:
        Dict with keys ``ts`` (local ISO-8601 timestamp with millisecond
        precision), ``raw`` (the input line), ``values`` (list of floats)
        and ``labels`` (list of strings, parallel to ``values``).
    """
    import json  # function-scope import, as in the original implementation

    ts = datetime.now().isoformat(timespec="milliseconds")
    entry: dict[str, Any] = {"ts": ts, "raw": raw, "values": [], "labels": []}
    values: list[float] = entry["values"]
    labels: list[str] = entry["labels"]

    # 1) JSON object.
    if raw.startswith("{"):
        try:
            obj = json.loads(raw)
        except (ValueError, RecursionError):
            # Not valid JSON after all – fall through to the text formats.
            obj = None
        if isinstance(obj, dict):
            for key, value in obj.items():
                try:
                    number = float(value)
                except (TypeError, ValueError, OverflowError):
                    # Non-numeric member, or an integer too large for a
                    # float: skip it without discarding the other members.
                    continue
                values.append(number)
                labels.append(str(key))
            return entry

    # 2) key=value pairs (e.g. "temp=23.5,hum=67").
    if "=" in raw:
        # Glue "key = value" into "key=value" so the token split below does
        # not separate a key from its value.
        normalized = "=".join(part.strip() for part in raw.split("="))
        for token in _split_tokens(normalized):
            key, sep, text = token.partition("=")
            if not sep:
                continue
            try:
                number = float(text)
            except ValueError:
                continue
            values.append(number)
            labels.append(key)
        if values:
            return entry

    # 3) Fallback: delimited bare numbers, labelled ch1, ch2, … in the order
    #    they were successfully parsed.
    for token in _split_tokens(raw):
        try:
            number = float(token)
        except ValueError:
            continue
        values.append(number)
        labels.append(f"ch{len(values)}")

    return entry
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Internal reader loop ──────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
def _record_failure(label: str, exc: Exception, delay: float) -> None:
    """Mark the link as down, count the error, log it and wait *delay* seconds."""
    with _lock:
        status["connected"] = False
        status["errors"] += 1
    print(f"[USBRead] {label}: {exc} – retry in {delay}s")
    time.sleep(delay)


def _fire_callbacks(entry: dict[str, Any]) -> None:
    """Call every registered callback with *entry*, isolating their failures."""
    # Iterate over a snapshot so a callback registered while dispatching
    # (possibly by another callback) first runs on the next entry.
    for cb in tuple(_callbacks):
        try:
            cb(entry)
        except Exception as exc:
            print(f"[USBRead] Callback error: {exc}")


def _reader_loop() -> None:
    """Read, parse and publish serial lines forever, reconnecting on failure.

    Reads ``port``, ``baud_rate`` and ``reconnect_delay_s`` (default 3) from
    the module configuration once at start-up. Each non-empty line is parsed
    with `parse_line`, appended to the shared buffer and passed to the
    registered callbacks. Any exception raised while the port is being
    opened or read is counted in ``status["errors"]`` and followed by a
    reconnect attempt after the configured delay; the loop never returns.

    Raises:
        KeyError: If the configuration lacks ``port`` or ``baud_rate``
            (e.g. init() was not called).
    """
    port = _cfg["port"]
    baud = _cfg["baud_rate"]
    reconnect_delay = _cfg.get("reconnect_delay_s", 3)

    while True:
        try:
            with serial.Serial(port, baud, timeout=1) as ser:
                with _lock:
                    status["connected"] = True
                print(f"[USBRead] Connected → {port} @ {baud} baud")

                while True:
                    raw_bytes = ser.readline()
                    if not raw_bytes:
                        # Nothing was read (e.g. the 1 s timeout elapsed).
                        continue

                    raw_str = raw_bytes.decode("utf-8", errors="replace").strip()
                    if not raw_str:
                        continue

                    entry = parse_line(raw_str)

                    with _lock:
                        _buffer.append(entry)
                        status["last_received"] = entry["ts"]
                        status["total_lines"] += 1

                    # Fire registered callbacks outside the lock so they may
                    # call get_snapshot() without deadlocking.
                    _fire_callbacks(entry)

        except serial.SerialException as exc:
            _record_failure("SerialException", exc, reconnect_delay)

        except Exception as exc:
            _record_failure("Unexpected error", exc, reconnect_delay)
|