Files
smart-mirror/raspi/Notification.py

290 lines
11 KiB
Python
Raw Permalink Normal View History

2026-05-02 20:54:53 +02:00
"""
Notification.py
Monitors incoming sensor entries from USBRead and dispatches alerts via:
  - SMTP e-mail (plain text + HTML)
  - WhatsApp (Twilio Business API or CallMeBot free API)

Two WhatsApp providers are supported and selected via settings.json:
  - "provider": "twilio"     requires the twilio package and a Twilio account
  - "provider": "callmebot"  free, no extra package needed (HTTP GET)

Usage
-----
import Notification
Notification.init(settings) # pass full settings dict
Notification.attach_to_usb() # registers USBRead callback
"""
import smtplib
import threading
import time
import urllib.parse
import urllib.request
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any
import USBRead
# ── Module-level state ────────────────────────────────────────────────────────
# Configuration taken from the settings dict by init(): the "smtp" section,
# the "whatsapp" section and the list of sensor rules ("sensors").
_smtp_cfg: dict[str, Any] = {}
_whatsapp_cfg: dict[str, Any] = {}
_sensors: list[dict[str, Any]] = []
# Per-channel cooldown in seconds between two alerts for the same sensor name.
# init() overwrites both from the "cooldown_s" key of the matching section.
_smtp_cooldown_s: float = 300.0
_wa_cooldown_s: float = 300.0
# Cooldown trackers: sensor name -> time.monotonic() value recorded when the
# last alert for that sensor passed the cooldown check on that channel.
_last_email: dict[str, float] = {}
_last_whatsapp: dict[str, float] = {}
# Held by _on_entry while it reads and updates the two cooldown trackers.
_lock = threading.Lock()
# ── Public API ────────────────────────────────────────────────────────────────
def init(settings: dict[str, Any]) -> None:
    """Load the notification configuration from the full settings dict.

    Reads the "smtp", "whatsapp" and "sensors" entries of *settings* into the
    module-level state, derives both per-channel cooldowns (default 300 s)
    and prints a one-line summary of the active channels.
    """
    global _smtp_cfg, _whatsapp_cfg, _sensors, _smtp_cooldown_s, _wa_cooldown_s

    _smtp_cfg = settings.get("smtp", {})
    _whatsapp_cfg = settings.get("whatsapp", {})
    _sensors = settings.get("sensors", [])
    _smtp_cooldown_s = float(_smtp_cfg.get("cooldown_s", 300))
    _wa_cooldown_s = float(_whatsapp_cfg.get("cooldown_s", 300))

    # SMTP counts as enabled unless switched off; WhatsApp is opt-in.
    active = ["SMTP"] if _smtp_cfg.get("enabled", True) else []
    if _whatsapp_cfg.get("enabled", False):
        provider = _whatsapp_cfg.get("provider", "?")
        active.append(f"WhatsApp/{provider}")

    channel_list = ", ".join(active)
    print(
        f"[Notification] {len(_sensors)} Sensor-Regeln geladen. "
        f"Kanäle: {channel_list}. "
        f"Cooldown E-Mail={_smtp_cooldown_s}s WA={_wa_cooldown_s}s"
    )
def attach_to_usb() -> None:
    """Register _on_entry (the threshold checker) as a callback with USBRead."""
    handler = _on_entry
    USBRead.register_callback(handler)
    print("[Notification] An USBRead-Callback angehängt.")
def send_alert(subject: str, body: str) -> None:
    """
    Send one alert synchronously on every enabled channel.

    E-mail receives the full body; WhatsApp receives the subject followed by
    the first 400 characters of the body. No cooldown is applied here.
    The call blocks until all channels are done, so run it from a daemon
    thread to avoid blocking USBRead.
    """
    # Requesting both channels lets _dispatch apply the same "enabled"
    # checks and the same WhatsApp truncation used for threshold alerts.
    _dispatch(subject, body, True, True)
# ── Threshold checker (USBRead callback) ──────────────────────────────────────
def _on_entry(entry: dict[str, Any]) -> None:
    """Check one entry against every sensor rule and fire alerts.

    For each rule in ``_sensors`` the value at ``field_index`` is compared
    with ``threshold_high`` / ``threshold_low`` (high takes precedence).
    When a threshold is crossed and the per-channel cooldown for that sensor
    name has elapsed, the alert is sent from a daemon thread so this callback
    returns quickly.

    Args:
        entry: dict with a "values" list; "ts" and "raw" are used for the
            message text when present.
    """
    values = entry.get("values", [])
    if not values:
        return

    # Only channels that are switched on take part in the cooldown check;
    # otherwise a disabled channel would consume its cooldown slot, log
    # misleading "Cooldown aktiv" lines and spawn threads that do nothing.
    email_enabled = bool(_smtp_cfg.get("enabled", True))
    whatsapp_enabled = bool(_whatsapp_cfg.get("enabled", False))
    if not (email_enabled or whatsapp_enabled):
        return

    now = time.monotonic()
    timestamp = entry.get("ts", "")
    raw_line = entry.get("raw", "")

    for sensor in _sensors:
        idx = sensor.get("field_index", 0)
        if idx >= len(values):
            continue
        # NOTE(review): the value types USBRead delivers are not visible here;
        # coerce defensively and skip anything that is not a number instead of
        # crashing the callback on comparison or formatting.
        try:
            value = float(values[idx])
        except (TypeError, ValueError):
            continue

        name = sensor.get("name", f"Sensor {idx}")
        unit = sensor.get("unit", "")
        th_high = sensor.get("threshold_high")
        th_low = sensor.get("threshold_low")

        if sensor.get("notify_on_high", True) and th_high is not None and value > th_high:
            direction, threshold = "HOCH", th_high
        elif sensor.get("notify_on_low", True) and th_low is not None and value < th_low:
            direction, threshold = "NIEDRIG", th_low
        else:
            continue

        subject = f"[Arduino Alert] {name} zu {direction}: {value:.2f}{unit}"
        body = (
            f"Sensor-Alarm {timestamp}\n\n"
            f" Sensor : {name}\n"
            f" Messwert : {value:.4f} {unit}\n"
            f" Schwelle : {threshold} {unit} ({direction})\n"
            f" Rohzeile : {raw_line}\n\n"
            f"Nächste Benachrichtigung frühestens nach {int(max(_smtp_cooldown_s, _wa_cooldown_s))}s."
        )

        # Per-channel cooldown check; the lock keeps check-and-update atomic.
        with _lock:
            send_email_now = email_enabled and _check_cooldown(
                _last_email, name, now, _smtp_cooldown_s
            )
            send_whatsapp_now = whatsapp_enabled and _check_cooldown(
                _last_whatsapp, name, now, _wa_cooldown_s
            )

        if send_email_now or send_whatsapp_now:
            threading.Thread(
                target=_dispatch,
                args=(subject, body, send_email_now, send_whatsapp_now),
                daemon=True,
            ).start()
def _check_cooldown(tracker: dict, name: str, now: float, cooldown: float) -> bool:
    """Return True and record *now* in *tracker* if the cooldown has elapsed.

    Args:
        tracker: maps sensor name -> monotonic time of the last allowed alert.
        name: sensor name used as the tracker key.
        now: current ``time.monotonic()`` value.
        cooldown: minimum number of seconds between two alerts.

    Returns:
        True when no alert was recorded yet for *name* or at least *cooldown*
        seconds have passed since the recorded one; False otherwise (a
        "cooldown active" line is printed in that case).

    NOT thread-safe on its own; the caller must hold the lock.
    """
    last = tracker.get(name)
    # "Never sent" must be tracked explicitly: time.monotonic() has an
    # undefined reference point (often system boot), so treating a missing
    # entry as 0.0 would suppress the first alert whenever the clock value is
    # still smaller than the cooldown, e.g. shortly after power-up.
    if last is None or now - last >= cooldown:
        tracker[name] = now
        return True
    remaining = int(cooldown - (now - last))
    print(f"[Notification] Cooldown aktiv für '{name}' (noch {remaining}s)")
    return False
def _dispatch(subject: str, body: str, do_email: bool, do_whatsapp: bool) -> None:
    """Send the alert on each requested channel that is also enabled.

    E-mail gets the full body; WhatsApp gets the subject plus the first 400
    characters of the body.
    """
    email_wanted = do_email and _smtp_cfg.get("enabled", True)
    if email_wanted:
        _send_email(subject, body)

    if not do_whatsapp:
        return
    if not _whatsapp_cfg.get("enabled", False):
        return
    compact = "{}\n\n{}".format(subject, body[:400])
    _send_whatsapp(compact)
# ── E-Mail ────────────────────────────────────────────────────────────────────
def _send_email(subject: str, body: str) -> None:
    """Send the alert as a multipart (plain text + HTML) e-mail via SMTP.

    Connection settings come from ``_smtp_cfg``. With ``use_tls`` true
    (default) a plain connection is upgraded with STARTTLS; otherwise an
    implicit-TLS ``SMTP_SSL`` connection is used. Best effort: every failure,
    including an incomplete configuration, is printed and never raised.
    """
    cfg = _smtp_cfg
    try:
        # Config access happens inside the try so that a missing key is
        # reported like any other send failure instead of raising.
        recipients = cfg["to_addresses"]
        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = cfg["from_address"]
        msg["To"] = ", ".join(recipients)
        msg.attach(MIMEText(body, "plain", "utf-8"))
        msg.attach(MIMEText(_build_html(subject, body), "html", "utf-8"))

        starttls = cfg.get("use_tls", True)
        if starttls:
            client = smtplib.SMTP(cfg["host"], cfg["port"], timeout=10)
        else:
            client = smtplib.SMTP_SSL(cfg["host"], cfg["port"], timeout=10)

        # The context manager issues QUIT and closes the socket even when
        # starttls/login/sendmail raise, so no connection is leaked.
        with client as srv:
            if starttls:
                srv.ehlo()
                srv.starttls()
            srv.login(cfg["username"], cfg["password"])
            srv.sendmail(cfg["from_address"], recipients, msg.as_string())
        print(f"[Notification] ✉ E-Mail gesendet: {subject}")
    except KeyError as exc:
        print(f"[Notification] ✗ SMTP Fehler: fehlender Konfigurationsschlüssel {exc}")
    except Exception as exc:
        print(f"[Notification] ✗ SMTP Fehler: {exc}")
# ── WhatsApp ──────────────────────────────────────────────────────────────────
def _send_whatsapp(text: str) -> None:
    """Forward *text* to the WhatsApp provider selected in the configuration.

    The provider name is matched case-insensitively and defaults to
    "twilio"; an unknown name is reported and nothing is sent.
    """
    provider = _whatsapp_cfg.get("provider", "twilio").lower()
    if provider not in ("twilio", "callmebot"):
        print(f"[Notification] ✗ Unbekannter WhatsApp-Provider: '{provider}'")
        return
    if provider == "twilio":
        sender = _send_whatsapp_twilio
    else:
        sender = _send_whatsapp_callmebot
    sender(text)
def _send_whatsapp_twilio(text: str) -> None:
    """
    Send *text* to every configured recipient via the Twilio WhatsApp API.

    Requires: pip install twilio
    Sandbox: Join https://www.twilio.com/console/sms/whatsapp/sandbox first.

    Reads ``account_sid``, ``auth_token``, ``from_number`` and ``to_numbers``
    from the "twilio" section of the WhatsApp configuration. Best effort:
    failures are printed, never raised.
    """
    cfg = _whatsapp_cfg.get("twilio", {})
    try:
        from twilio.rest import Client  # type: ignore
    except ImportError:
        print("[Notification] ✗ Twilio nicht installiert: pip install twilio")
        return

    try:
        client = Client(cfg["account_sid"], cfg["auth_token"])
    except Exception as exc:
        print(f"[Notification] ✗ Twilio Fehler: {exc}")
        return

    for to in cfg.get("to_numbers", []):
        # Errors are isolated per recipient so that one bad number does not
        # keep the remaining recipients from being notified.
        try:
            msg = client.messages.create(
                from_=cfg["from_number"],
                to=to,
                body=text,
            )
            print(f"[Notification] 📱 Twilio WA gesendet → {to} SID={msg.sid}")
        except Exception as exc:
            print(f"[Notification] ✗ Twilio Fehler ({to}): {exc}")
def _send_whatsapp_callmebot(text: str) -> None:
    """
    Send *text* to every configured recipient via CallMeBot (HTTP GET).

    Free, no account needed beyond one-time activation.
    Activate at: https://www.callmebot.com/blog/free-api-whatsapp-messages/
    No extra packages required.

    Reads ``api_key`` and ``to_numbers`` from the "callmebot" section of the
    WhatsApp configuration. Best effort: failures are printed per recipient,
    never raised.
    """
    cfg = _whatsapp_cfg.get("callmebot", {})
    api_key = cfg.get("api_key", "")
    for number in cfg.get("to_numbers", []):
        try:
            # urlencode percent-encodes every parameter (including the API
            # key) and stringifies non-string values such as a phone number
            # stored as an integer; quote_via=quote keeps spaces as %20.
            query = urllib.parse.urlencode(
                {"phone": number, "text": text, "apikey": api_key},
                quote_via=urllib.parse.quote,
            )
            url = f"https://api.callmebot.com/whatsapp.php?{query}"
            with urllib.request.urlopen(url, timeout=10) as resp:
                status_code = resp.status
            print(f"[Notification] 📱 CallMeBot WA → {number} HTTP {status_code}")
        except Exception as exc:
            print(f"[Notification] ✗ CallMeBot Fehler ({number}): {exc}")
# ── HTML e-mail builder ───────────────────────────────────────────────────────
def _build_html(subject: str, body: str) -> str:
    """Render the alert as a small, self-contained HTML e-mail document.

    Each line of *body* becomes one table row: an empty line is a spacer,
    a line containing ":" is split at the first colon into a key cell and a
    value cell, and any other line spans both columns. *subject* is used as
    the heading.

    Subject and body text are HTML-escaped, because the body can carry the
    raw line received from the device and must not be interpreted as markup.

    Returns:
        The complete HTML document as a string.
    """
    from html import escape

    rows = []
    for line in body.splitlines():
        line = line.strip()
        if not line:
            rows.append("<tr><td colspan='2' style='padding:5px 0'></td></tr>")
        elif ":" in line:
            key, _, val = line.partition(":")
            rows.append(
                "<tr>"
                "<td style='padding:4px 14px 4px 0;color:#888;white-space:nowrap;"
                f"font-size:12px'>{escape(key.strip())}</td>"
                "<td style='padding:4px 0;font-family:monospace;font-size:12px'>"
                f"{escape(val.strip())}</td></tr>"
            )
        else:
            rows.append(
                "<tr><td colspan='2' style='padding:4px 0;color:#ccc;"
                f"font-size:13px'>{escape(line)}</td></tr>"
            )
    rows_html = "".join(rows)
    safe_subject = escape(subject)
    return f"""<!DOCTYPE html>
<html><head><meta charset="UTF-8"/></head>
<body style="margin:0;padding:0;background:#080b10;font-family:'Segoe UI',sans-serif;color:#c8d6e5">
<table width="100%" cellpadding="0" cellspacing="0"
style="max-width:520px;margin:40px auto">
<tr>
<td style="background:#0f1318;border:1px solid rgba(255,255,255,.08);
border-radius:10px;padding:32px">
<p style="margin:0 0 6px;font-size:10px;letter-spacing:.22em;
text-transform:uppercase;color:#4a5568">Smart Mirror · Arduino Alert</p>
<h1 style="margin:0 0 22px;font-size:17px;color:#e2e8f0;font-weight:600;
line-height:1.3">{safe_subject}</h1>
<table cellpadding="0" cellspacing="0" style="width:100%">{rows_html}</table>
<p style="margin:24px 0 0;font-size:10px;color:#4a5568">
Automatisch generiert · bitte nicht antworten.
</p>
</td>
</tr>
</table>
</body></html>"""