Add files via upload
This commit is contained in:
177
USBRead.py
Normal file
177
USBRead.py
Normal file
@@ -0,0 +1,177 @@
|
||||
"""
|
||||
USBRead.py
|
||||
──────────
|
||||
Reads raw lines from a serial USB device (e.g. Arduino) at the configured
|
||||
baud rate, parses numeric sensor values, and stores everything in a shared
|
||||
thread-safe ring-buffer.
|
||||
|
||||
Callers import `start_reader()` to launch the background thread, then read
|
||||
from `get_snapshot()` at any time.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import serial
|
||||
|
||||
# ── Populated once by init() ──────────────────────────────────────────────────
|
||||
_cfg: dict[str, Any] = {}
|
||||
_lock = threading.Lock()
|
||||
_buffer: deque = deque()
|
||||
|
||||
status: dict[str, Any] = {
|
||||
"connected": False,
|
||||
"port": "",
|
||||
"baud": 0,
|
||||
"last_received": None,
|
||||
"total_lines": 0,
|
||||
"errors": 0,
|
||||
}
|
||||
|
||||
# Callbacks: list of callables(entry) invoked after every parsed entry
|
||||
_callbacks: list = []
|
||||
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────────
|
||||
|
||||
def init(usb_cfg: dict[str, Any]) -> None:
|
||||
"""Must be called once before start_reader()."""
|
||||
global _buffer, _cfg
|
||||
_cfg = usb_cfg
|
||||
_buffer = deque(maxlen=usb_cfg.get("buffer_size", 200))
|
||||
status["port"] = usb_cfg["port"]
|
||||
status["baud"] = usb_cfg["baud_rate"]
|
||||
|
||||
|
||||
def register_callback(fn) -> None:
|
||||
"""Register a function(entry) that is called for each new parsed line."""
|
||||
_callbacks.append(fn)
|
||||
|
||||
|
||||
def get_snapshot() -> dict[str, Any]:
|
||||
"""Return a thread-safe snapshot of the current buffer and status."""
|
||||
with _lock:
|
||||
return {
|
||||
"status": dict(status),
|
||||
"entries": list(_buffer),
|
||||
}
|
||||
|
||||
|
||||
def start_reader() -> threading.Thread:
|
||||
"""Start the background serial-reader thread (daemon)."""
|
||||
t = threading.Thread(target=_reader_loop, daemon=True, name="usb-reader")
|
||||
t.start()
|
||||
return t
|
||||
|
||||
|
||||
# ── Parsing helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
def parse_line(raw: str) -> dict[str, Any]:
|
||||
"""
|
||||
Parse a raw serial line into a structured entry dict.
|
||||
|
||||
Supported Arduino output formats (auto-detected):
|
||||
• Space / comma / semicolon separated numbers:
|
||||
"23.5 67.1 4.92" → values: [23.5, 67.1, 4.92]
|
||||
• Key=value pairs:
|
||||
"temp=23.5,hum=67.1" → values: [23.5, 67.1],
|
||||
labels: ["temp", "hum"]
|
||||
• JSON (if Arduino outputs JSON):
|
||||
'{"temp":23.5,"hum":67.1}' → values: [23.5, 67.1],
|
||||
labels: ["temp", "hum"]
|
||||
"""
|
||||
ts = datetime.now().isoformat(timespec="milliseconds")
|
||||
entry: dict[str, Any] = {"ts": ts, "raw": raw, "values": [], "labels": []}
|
||||
|
||||
# Try JSON first
|
||||
if raw.startswith("{"):
|
||||
try:
|
||||
import json
|
||||
obj = json.loads(raw)
|
||||
for k, v in obj.items():
|
||||
try:
|
||||
entry["values"].append(float(v))
|
||||
entry["labels"].append(str(k))
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
return entry
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Try key=value pairs (e.g. "temp=23.5,hum=67")
|
||||
if "=" in raw:
|
||||
for token in raw.replace(",", " ").replace(";", " ").split():
|
||||
if "=" in token:
|
||||
k, _, v = token.partition("=")
|
||||
try:
|
||||
entry["values"].append(float(v))
|
||||
entry["labels"].append(k.strip())
|
||||
except ValueError:
|
||||
pass
|
||||
if entry["values"]:
|
||||
return entry
|
||||
|
||||
# Fallback: space / comma / semicolon delimited numbers
|
||||
for token in raw.replace(",", " ").replace(";", " ").split():
|
||||
try:
|
||||
entry["values"].append(float(token))
|
||||
entry["labels"].append(f"ch{len(entry['values'])}")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return entry
|
||||
|
||||
|
||||
# ── Internal reader loop ──────────────────────────────────────────────────────
|
||||
|
||||
def _reader_loop() -> None:
|
||||
port = _cfg["port"]
|
||||
baud = _cfg["baud_rate"]
|
||||
reconnect_delay = _cfg.get("reconnect_delay_s", 3)
|
||||
|
||||
while True:
|
||||
try:
|
||||
with serial.Serial(port, baud, timeout=1) as ser:
|
||||
with _lock:
|
||||
status["connected"] = True
|
||||
print(f"[USBRead] Connected → {port} @ {baud} baud")
|
||||
|
||||
while True:
|
||||
raw_bytes = ser.readline()
|
||||
if not raw_bytes:
|
||||
continue
|
||||
|
||||
raw_str = raw_bytes.decode("utf-8", errors="replace").strip()
|
||||
if not raw_str:
|
||||
continue
|
||||
|
||||
entry = parse_line(raw_str)
|
||||
|
||||
with _lock:
|
||||
_buffer.append(entry)
|
||||
status["last_received"] = entry["ts"]
|
||||
status["total_lines"] += 1
|
||||
|
||||
# Fire registered callbacks outside the lock
|
||||
for cb in _callbacks:
|
||||
try:
|
||||
cb(entry)
|
||||
except Exception as exc:
|
||||
print(f"[USBRead] Callback error: {exc}")
|
||||
|
||||
except serial.SerialException as exc:
|
||||
with _lock:
|
||||
status["connected"] = False
|
||||
status["errors"] += 1
|
||||
print(f"[USBRead] SerialException: {exc} – retry in {reconnect_delay}s")
|
||||
time.sleep(reconnect_delay)
|
||||
|
||||
except Exception as exc:
|
||||
with _lock:
|
||||
status["connected"] = False
|
||||
status["errors"] += 1
|
||||
print(f"[USBRead] Unexpected error: {exc} – retry in {reconnect_delay}s")
|
||||
time.sleep(reconnect_delay)
|
||||
Reference in New Issue
Block a user