185 lines
6.4 KiB
Python
Executable File
185 lines
6.4 KiB
Python
Executable File
"""
|
||
USBRead.py
|
||
──────────
|
||
Reads raw lines from a serial USB device (e.g. Arduino) at the configured
|
||
baud rate, parses numeric sensor values, and stores everything in a shared
|
||
thread-safe ring-buffer.
|
||
|
||
Callers call `init()` once with the USB configuration, then `start_reader()`
to launch the background thread, and read from `get_snapshot()` at any time.
|
||
"""
|
||
|
||
import threading
|
||
import time
|
||
from collections import deque
|
||
from datetime import datetime
|
||
from typing import Any
|
||
|
||
import serial
|
||
|
||
# ── Shared module state ───────────────────────────────────────────────────────
# `_cfg` and `_buffer` are (re)assigned by init(); `_lock` is held by the
# reader loop while it updates the buffer/status and by get_snapshot() while
# it copies them.
_lock = threading.Lock()
_cfg: dict[str, Any] = {}
_buffer: deque = deque()

# Connection state and counters, exposed to callers through get_snapshot()
status: dict[str, Any] = dict(
    connected=False,
    port="",
    baud=0,
    last_received=None,
    total_lines=0,
    errors=0,
)

# Callbacks: callables(entry) that the reader loop invokes after every
# parsed entry
_callbacks: list = []
|
||
|
||
|
||
# ── Public API ────────────────────────────────────────────────────────────────
|
||
|
||
def init(usb_cfg: dict[str, Any]) -> None:
    """Store the USB configuration and create a fresh entry buffer.

    Must be called once before start_reader().

    Keys read from *usb_cfg*:
      • "port", "baud_rate" – required (KeyError if absent); copied into
        the module-level `status` dict.
      • "buffer_size"       – optional maximum number of buffered entries,
        defaults to 200.
    """
    global _cfg, _buffer

    _cfg = usb_cfg
    capacity = usb_cfg.get("buffer_size", 200)
    _buffer = deque(maxlen=capacity)

    # Mirror the connection parameters into the status dict, in order
    for status_key, cfg_key in (("port", "port"), ("baud", "baud_rate")):
        status[status_key] = usb_cfg[cfg_key]
|
||
|
||
|
||
def register_callback(fn) -> None:
    """Add *fn* to the callbacks invoked as fn(entry) for each parsed line."""
    _callbacks.extend([fn])
|
||
|
||
|
||
def get_snapshot() -> dict[str, Any]:
    """Copy the current status and buffered entries while holding the lock.

    Returns a dict with:
      • "status"  – a copy of the module-level status dict
      • "entries" – a list of the buffered entries (the list is new; the
        entry dicts themselves are the same objects held in the buffer)
    """
    with _lock:
        status_copy = status.copy()
        buffered = [*_buffer]
    return {"status": status_copy, "entries": buffered}
|
||
|
||
|
||
def start_reader() -> threading.Thread:
    """Launch `_reader_loop` in a daemon thread named "usb-reader".

    Returns the already-started Thread object.
    """
    reader = threading.Thread(name="usb-reader", target=_reader_loop, daemon=True)
    reader.start()
    return reader
|
||
|
||
|
||
# ── Parsing helpers ───────────────────────────────────────────────────────────
|
||
|
||
def parse_line(raw: str) -> dict[str, Any]:
    """
    Parse a raw serial line into a structured entry dict.

    Supported Arduino output formats (auto-detected, tried in this order):
      • JSON object (nested objects are flattened to dotted labels):
          '{"temp":23.5,"env":{"hum":67.1}}'  →  values: [23.5, 67.1],
                                                  labels: ["temp", "env.hum"]
      • Key=value pairs:
          "temp=23.5,hum=67.1"  →  values: [23.5, 67.1],
                                   labels: ["temp", "hum"]
      • Space / comma / semicolon separated numbers:
          "23.5 67.1 4.92"      →  values: [23.5, 67.1, 4.92],
                                   labels: ["ch1", "ch2", "ch3"]

    Items that float() cannot convert are skipped. If a line starting with
    "{" is not valid JSON, or a line containing "=" yields no numeric
    key=value pair, parsing falls through to the next format.

    Returns:
        A dict with keys "ts" (local-time ISO-8601 timestamp with millisecond
        precision), "raw" (the input string), "values" (list of floats) and
        "labels" (list of strings, same length as "values").
    """
    import json

    ts = datetime.now().isoformat(timespec="milliseconds")
    entry: dict[str, Any] = {"ts": ts, "raw": raw, "values": [], "labels": []}

    # Try JSON first
    if raw.startswith("{"):
        # Collect into locals so that a failure part-way through never leaks
        # half-parsed JSON values into the fallback formats below.
        json_values: list[float] = []
        json_labels: list[str] = []

        def _flatten(obj, prefix=""):
            for k, v in obj.items():
                key = f"{prefix}.{k}" if prefix else k
                if isinstance(v, dict):
                    _flatten(v, key)
                    continue
                try:
                    number = float(v)
                except (TypeError, ValueError, OverflowError):
                    # Non-numeric member (string, list, null) or an integer
                    # too large for a float: skip just this member.
                    continue
                json_values.append(number)
                json_labels.append(key)

        try:
            _flatten(json.loads(raw))
        except (ValueError, RecursionError):
            # Malformed (json.JSONDecodeError is a ValueError) or too deeply
            # nested: treat the line as non-JSON and try the other formats.
            pass
        else:
            entry["values"] = json_values
            entry["labels"] = json_labels
            return entry

    # Both remaining formats share the same tokenisation
    tokens = raw.replace(",", " ").replace(";", " ").split()

    # Try key=value pairs (e.g. "temp=23.5,hum=67")
    if "=" in raw:
        for token in tokens:
            if "=" not in token:
                continue
            k, _, v = token.partition("=")
            try:
                number = float(v)
            except ValueError:
                continue
            entry["values"].append(number)
            entry["labels"].append(k.strip())
        if entry["values"]:
            return entry

    # Fallback: space / comma / semicolon delimited numbers
    for token in tokens:
        try:
            number = float(token)
        except ValueError:
            continue
        entry["values"].append(number)
        # Channels are numbered from 1 in order of appearance
        entry["labels"].append(f"ch{len(entry['values'])}")

    return entry
|
||
|
||
|
||
# ── Internal reader loop ──────────────────────────────────────────────────────
|
||
|
||
def _reader_loop() -> None:
    """Read and parse serial lines forever, reconnecting after any failure.

    Uses "port", "baud_rate" and the optional "reconnect_delay_s" (default
    3 seconds) from the module configuration. Each non-empty line is parsed
    with parse_line(), appended to the shared buffer under the lock, and then
    passed to every registered callback. Never returns.
    """
    port, baud = _cfg["port"], _cfg["baud_rate"]
    reconnect_delay = _cfg.get("reconnect_delay_s", 3)

    def _mark_disconnected() -> None:
        # Shared bookkeeping for both failure paths
        with _lock:
            status["connected"] = False
            status["errors"] += 1

    while True:
        try:
            with serial.Serial(port, baud, timeout=1) as ser:
                with _lock:
                    status["connected"] = True
                print(f"[USBRead] Connected → {port} @ {baud} baud")

                while True:
                    chunk = ser.readline()
                    text = chunk.decode("utf-8", errors="replace").strip() if chunk else ""
                    if not text:
                        # Nothing usable: empty read or whitespace-only line
                        continue

                    entry = parse_line(text)

                    with _lock:
                        _buffer.append(entry)
                        status["last_received"] = entry["ts"]
                        status["total_lines"] += 1

                    # Callbacks run outside the lock; a failing callback is
                    # reported and does not stop the reader.
                    for callback in _callbacks:
                        try:
                            callback(entry)
                        except Exception as exc:
                            print(f"[USBRead] Callback error: {exc}")

        except serial.SerialException as exc:
            _mark_disconnected()
            print(f"[USBRead] SerialException: {exc} – retry in {reconnect_delay}s")
            time.sleep(reconnect_delay)

        except Exception as exc:
            _mark_disconnected()
            print(f"[USBRead] Unexpected error: {exc} – retry in {reconnect_delay}s")
            time.sleep(reconnect_delay)
|