This commit is contained in:
Mobkom21 2026-05-12 21:22:39 +02:00
parent 0403ac21ef
commit 9a12b8b862
5 changed files with 621 additions and 14 deletions

View File

@ -15,7 +15,7 @@ class GPS_DATA:
self.port = port
self.ser = serial.Serial(port, baud, timeout=1)
self.ser.write(b'AT+CGPS=1\r\n')
time.sleep(2)
time.sleep(1)
def __str__(self):
@ -56,26 +56,36 @@ class GPS_DATA:
return result_clean
def get_sim_status(self):
ser.write(b'AT+CPIN?\r\n')
self.ser.write(b'AT+CPIN?\r\n')
time.sleep(0.5)
result = self.ser.read_all().decode()
result_clean = self.LTE_hat_encode_feedback(result)
# lockd: ['AT+CPIN?', '+CPIN: SIM PIN', 'OK']
# unlock: ['AT+CPIN?', '+CPIN: READY', 'OK']
match result_clean[1]:
# print(result_clean)
data = ""
for res in result_clean:
if str(res).startswith("+CPIN"):
data = res
break
if data == "":
return
match data:
case "+CPIN: SIM PIN":
return False
case "+CPIN: READY":
return True
case _:
print("Ein fehler ist aufgetreten. Sim Status konnte nicht abgefragt werden.")
return False
def enter_sim_pin(self):
if self.get_sim_status():
print("Sim bereits entsperrt.")
return
return True
self.ser.write(b'AT+CPIN="3082"\r\n')
time.sleep(1)
@ -85,6 +95,8 @@ class GPS_DATA:
# print(result_clean)
if result_clean[2] == "+CPIN: READY":
print("Sim entsperrt.")
return True
return False
def LTE_hat_disable(self):
self.ser.write(b'AT+CFUN=0\r\n')

View File

@ -10,9 +10,12 @@ import paho.mqtt.client as mqtt
from gps_hat import GPS_DATA
from sens_hat import SENS_HAT
from selecta_mode import SelectaMode
selecta = None
def update_hat_data():
if gps.get_gps_info():
if 0:
# if gps.get_gps_info():
# print(gps)
client.publish("MOBKOM/GPS/N", gps.breitengrad)
client.publish("MOBKOM/GPS/E", gps.laengengrad)
@ -98,6 +101,9 @@ def on_message(client, userdata, message):
payload = message.payload.decode("utf-8").lower()
topic = message.topic
if selecta is not None and selecta.handle_mqtt(topic, payload):
return
match topic:
case "MOBKOM/LED/clear":
print("clear LEDs")
@ -128,6 +134,9 @@ def on_connect(client, userdata, flags, rc):
client.subscribe("MOBKOM/LED/set_matrix")
client.subscribe("MOBKOM/LED/get_matrix")
if selecta is not None:
selecta.subscribe(client)
def mqtt_loop_thread(run):
while not run.is_set():
client.loop()
@ -142,7 +151,7 @@ if __name__ == "__main__":
sens = SENS_HAT()
sens.led_clear()
for _ in range(5):
for _ in range(4):
sens.led_set_pixel(0, 0, 0, 0, 0)
time.sleep(0.5)
sens.led_set_pixel(0, 0, 255, 0, 0)
@ -153,8 +162,20 @@ if __name__ == "__main__":
client.on_message = on_message
client.connect("localhost", 1883)
selecta = SelectaMode(sens, client)
sens.led_set_pixel(1, 0, 255, 0, 0)
# if not gps.enter_sim_pin():
# exit(1)
#
# sens.led_set_pixel(2, 0, 255, 0, 0)
#
# if not gps.LTE_hat_start():
# exit(1)
sens.led_set_pixel(3, 0, 255, 0, 0)
stop_event = threading.Event()
t = threading.Thread(target=mqtt_loop_thread, args=(stop_event,))
t.start()
@ -168,10 +189,7 @@ if __name__ == "__main__":
[[0,255,146],[0,255,255],[0,146,255],[0,36,255],[73,0,255],[182,0,255],[255,0,219],[255,0,109]],
[[0,255,255],[0,146,255],[0,36,255],[73,0,255],[182,0,255],[255,0,219],[255,0,109],[255,0,0]]])
for y, row in enumerate(sens.led_matrix):
for x, col in enumerate(row):
sens.led_set_pixel(x, y, col[0], col[1], col[2])
sens.led_restore_matrix()
try:
while run:
@ -179,6 +197,9 @@ if __name__ == "__main__":
update_hat_data()
update_timer = time.time()
if selecta is not None:
selecta.update()
run = not sens.detect_long_press()
time.sleep(0.01)
@ -190,5 +211,5 @@ if __name__ == "__main__":
t.join() # warten bis der thread fertig ist
gps.ser.close()
client.disconnect()
subprocess.run(["sudo", "shutdown", "-h", "0"])
# subprocess.run(["sudo", "shutdown", "-h", "0"])
sys.exit(0)

561
src/selecta_mode.py Normal file
View File

@ -0,0 +1,561 @@
#!/usr/bin/env python3
"""
Selecta mode for MOBKOM main.py
Usage from main.py:
from selecta_mode import SelectaMode
selecta = SelectaMode(sens, client)
...
selecta.subscribe(client)
...
if selecta.handle_mqtt(topic, payload):
return
...
selecta.update()
Controls:
Sense HAT joystick UP -> activate Selecta mode
Sense HAT joystick DOWN -> deactivate Selecta mode
MQTT topics:
MOBKOM/SELECTA/state
MOBKOM/SELECTA/warning
MOBKOM/SELECTA/refill
MOBKOM/SELECTA/reset
MOBKOM/SELECTA/get_state
MOBKOM/SELECTA/event
"""
import copy
import json
import math
import os
import time
from typing import Any
# -------------------------
# MQTT topics
# -------------------------
TOPIC_STATE = "MOBKOM/SELECTA/state"
TOPIC_WARNING = "MOBKOM/SELECTA/warning"
TOPIC_REFILL = "MOBKOM/SELECTA/refill"
TOPIC_RESET = "MOBKOM/SELECTA/reset"
TOPIC_GET_STATE = "MOBKOM/SELECTA/get_state"
TOPIC_EVENT = "MOBKOM/SELECTA/event"
TOPIC_ACTIVE = "MOBKOM/SELECTA/active"
# -------------------------
# Automat configuration
# -------------------------
MAX_STOCK = 8
WARNING_LIMIT = 3
STOCK_FILE = "selecta_stock.json"
TILT_THRESHOLD = 0.35
NEUTRAL_THRESHOLD = 0.18
ACTION_COOLDOWN = 0.45
# left/right selection uses x acceleration by default
LR_AXIS = "x"
LR_INVERT = False
# forward dispense uses angle relative to the activation position
FORWARD_ANGLE_THRESHOLD = 18.0
FORWARD_NEUTRAL_ANGLE = 7.0
FORWARD_INVERT = False
BLINK_INTERVAL = 0.35
RENDER_INTERVAL = 0.08
DEBUG_IMU = False
BEVERAGES = [
{"id": "redbull", "name": "Red Bull", "color": [0, 80, 255]},
{"id": "orangina", "name": "Orangina", "color": [255, 120, 0]},
{"id": "cola", "name": "Cola", "color": [255, 0, 0]},
{"id": "sprite", "name": "Sprite", "color": [0, 255, 70]},
{"id": "water", "name": "Wasser", "color": [0, 220, 255]},
{"id": "fanta", "name": "Fanta", "color": [255, 200, 0]},
{"id": "icetea", "name": "Ice Tea", "color": [180, 100, 20]},
{"id": "mate", "name": "Mate", "color": [120, 255, 0]},
]
def now_ms() -> int:
return int(time.time() * 1000)
def clamp(v: Any, lo: int, hi: int) -> int:
try:
return max(lo, min(hi, int(v)))
except Exception:
return lo
def angle_diff_deg(current: float, reference: float) -> float:
diff = current - reference
while diff > 180:
diff -= 360
while diff < -180:
diff += 360
return diff
class SelectaMode:
def __init__(self, sens, mqtt_client):
self.sens = sens
self.client = mqtt_client
self.active = False
self.selected = 0
self.stock = self.load_stock()
# LED matrix that was shown before Selecta mode was activated.
# Restored when Selecta mode is deactivated.
self.saved_matrix = None
self.blink_on = True
self.last_blink = 0.0
self.last_render = 0.0
self.last_action = 0.0
self.last_debug = 0.0
self.lr_ready = True
self.forward_ready = True
self.forward_neutral_angle = 0.0
self._pending_activate = False
self._pending_deactivate = False
self._install_joystick_callbacks()
# -------------------------
# Setup / subscriptions
# -------------------------
@staticmethod
def mqtt_topics():
return [TOPIC_REFILL, TOPIC_RESET, TOPIC_GET_STATE, TOPIC_ACTIVE]
def subscribe(self, client=None):
c = client or self.client
for topic in self.mqtt_topics():
c.subscribe(topic)
def _install_joystick_callbacks(self):
stick = self.sens.stick
# stick = None
#
# # Prefer the SenseHat object inside your wrapper if it exists.
# if hasattr(self.sens, "sense") and hasattr(self.sens.sense, "stick"):
# stick = self.sens.sense.stick
# elif hasattr(self.sens, "stick"):
# stick = self.sens.stick
# else:
# try:
# from sense_hat import SenseHat
# stick = SenseHat().stick
# except Exception as e:
# print("Selecta: joystick callbacks not available:", e)
# return
def on_up(event):
if getattr(event, "action", "pressed") == "pressed":
self._pending_activate = True
def on_down(event):
if getattr(event, "action", "pressed") == "pressed":
self._pending_deactivate = True
try:
stick.direction_up = on_up
stick.direction_down = on_down
print("Selecta: UP aktiviert, DOWN deaktiviert")
except Exception as e:
print("Selecta: could not assign joystick callbacks:", e)
# -------------------------
# Stock persistence
# -------------------------
def load_stock(self):
default = {b["id"]: MAX_STOCK for b in BEVERAGES}
if not os.path.exists(STOCK_FILE):
return default
try:
with open(STOCK_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for b in BEVERAGES:
bid = b["id"]
default[bid] = clamp(data.get(bid, MAX_STOCK), 0, MAX_STOCK)
return default
except Exception as e:
print("Selecta: stock file could not be read, using full stock:", e)
return default
def save_stock(self):
try:
with open(STOCK_FILE, "w", encoding="utf-8") as f:
json.dump(self.stock, f, indent=2)
except Exception as e:
print("Selecta: stock could not be saved:", e)
# -------------------------
# MQTT helpers
# -------------------------
def publish_json(self, topic, data, retain=False):
try:
self.client.publish(topic, json.dumps(data), qos=0, retain=retain)
except Exception as e:
print("Selecta MQTT publish error:", e)
def publish_event(self, event_type, message, extra=None):
data = {"type": event_type, "message": message, "timestamp": now_ms()}
if extra:
data.update(extra)
self.publish_json(TOPIC_EVENT, data, retain=False)
def publish_state(self):
drinks = []
for i, b in enumerate(BEVERAGES):
drinks.append({
"column": i,
"id": b["id"],
"name": b["name"],
"color": b["color"],
"stock": self.stock[b["id"]],
"max": MAX_STOCK,
"selected": i == self.selected,
})
self.publish_json(TOPIC_STATE, {
"active": self.active,
"selected": self.selected,
"selected_id": BEVERAGES[self.selected]["id"],
"selected_name": BEVERAGES[self.selected]["name"],
"warning_limit": WARNING_LIMIT,
"drinks": drinks,
"timestamp": now_ms(),
}, retain=True)
def _make_warning(self, beverage):
count = self.stock[beverage["id"]]
severity = "empty" if count <= 0 else "low"
message = (
f"Leer: {beverage['name']} ist ausverkauft."
if severity == "empty"
else f"Warnung: Nur noch {count}x {beverage['name']} übrig."
)
return {
"active": True,
"id": beverage["id"],
"name": beverage["name"],
"stock": count,
"limit": WARNING_LIMIT,
"severity": severity,
"message": message,
}
def publish_warning(self, beverage=None, clear=False):
if clear:
self.publish_json(TOPIC_WARNING, {
"active": False,
"warnings": [],
"message": "",
"timestamp": now_ms(),
}, retain=True)
return
warnings = []
if beverage is not None:
warnings = [self._make_warning(beverage)]
else:
warnings = [self._make_warning(b) for b in BEVERAGES if self.stock[b["id"]] <= WARNING_LIMIT]
self.publish_json(TOPIC_WARNING, {
"active": len(warnings) > 0,
"warnings": warnings,
"message": "\n".join(w["message"] for w in warnings),
"timestamp": now_ms(),
}, retain=True)
def handle_mqtt(self, topic: str, payload: str) -> bool:
if topic == TOPIC_REFILL:
self.handle_refill(payload)
return True
if topic == TOPIC_RESET:
self.reset_all()
return True
if topic == TOPIC_GET_STATE:
self.publish_state()
return True
if topic == TOPIC_ACTIVE:
value = payload.strip().lower()
if value in ["1", "true", "on", "start", "active"]:
self.activate()
elif value in ["0", "false", "off", "stop", "inactive"]:
self.deactivate()
return True
return False
# -------------------------
# Web commands
# -------------------------
def handle_refill(self, payload: str):
try:
data = json.loads(payload)
except json.JSONDecodeError:
data = {"id": payload.strip(), "stock": MAX_STOCK}
beverage_id = data.get("id")
column = data.get("column")
if beverage_id is None and column is not None:
try:
beverage_id = BEVERAGES[int(column)]["id"]
except Exception:
print("Selecta: invalid refill column:", column)
return
if beverage_id not in self.stock:
print("Selecta: invalid refill id:", beverage_id)
return
if "stock" in data:
new_stock = clamp(data["stock"], 0, MAX_STOCK)
elif "amount" in data:
new_stock = clamp(self.stock[beverage_id] + int(data["amount"]), 0, MAX_STOCK)
else:
new_stock = MAX_STOCK
self.stock[beverage_id] = new_stock
self.save_stock()
self.publish_event("refill", f"{beverage_id} auf {new_stock} aufgefüllt", {"id": beverage_id, "stock": new_stock})
self.publish_state()
self._publish_current_warning_state()
if self.active:
self.render(force=True)
def reset_all(self):
for b in BEVERAGES:
self.stock[b["id"]] = MAX_STOCK
self.selected = 0
self.save_stock()
self.publish_event("reset", "Alle Getränke aufgefüllt")
self.publish_state()
self.publish_warning(clear=True)
if self.active:
self.render(force=True)
def _publish_current_warning_state(self):
low = [b for b in BEVERAGES if self.stock[b["id"]] <= WARNING_LIMIT]
if low:
self.publish_warning()
else:
self.publish_warning(clear=True)
# -------------------------
# LED matrix save / restore
# -------------------------
def save_matrix(self):
"""Save the current LED matrix before Selecta takes over the display."""
try:
if hasattr(self.sens.led_matrix, "copy"):
# Works for NumPy arrays and many array-like objects.
self.saved_matrix = self.sens.led_matrix.copy()
else:
# Fallback for nested Python lists.
self.saved_matrix = copy.deepcopy(self.sens.led_matrix)
print("Selecta: LED matrix saved")
except Exception as e:
self.saved_matrix = None
print("Selecta: could not save LED matrix:", e)
def restore_matrix(self):
"""Restore the LED matrix that was shown before Selecta mode."""
if self.saved_matrix is None:
print("Selecta: no saved LED matrix to restore")
return
try:
print("Selecta: restoring LED matrix")
for y, row in enumerate(self.saved_matrix):
if y >= 8:
break
for x, col in enumerate(row):
if x >= 8:
break
r = int(col[0])
g = int(col[1])
b = int(col[2])
self.sens.led_set_pixel(x, y, r, g, b)
except Exception as e:
print("Selecta: could not restore LED matrix:", e)
# -------------------------
# Activation
# -------------------------
def activate(self):
if self.active:
return
self.save_matrix()
self.active = True
self.lr_ready = True
self.forward_ready = True
self.calibrate_forward_neutral()
self.publish_event("active", "Selecta aktiviert")
self.publish_state()
self.render(force=True)
print("Selecta aktiviert")
def deactivate(self):
if not self.active:
return
self.active = False
self.publish_event("inactive", "Selecta deaktiviert")
self.publish_state()
self.restore_matrix()
print("Selecta deaktiviert")
# -------------------------
# Tilt input
# -------------------------
def axis_value(self, axis_name):
return float(getattr(self.sens, axis_name, 0.0))
def read_forward_angle(self):
y = self.axis_value("y")
z = self.axis_value("z")
angle = math.degrees(math.atan2(-y, z))
return -angle if FORWARD_INVERT else angle
def calibrate_forward_neutral(self):
samples = []
for _ in range(12):
self.sens.get_sensor_data()
samples.append(self.read_forward_angle())
time.sleep(0.01)
sin_sum = sum(math.sin(math.radians(a)) for a in samples)
cos_sum = sum(math.cos(math.radians(a)) for a in samples)
self.forward_neutral_angle = math.degrees(math.atan2(sin_sum, cos_sum))
print(f"Selecta forward neutral: {self.forward_neutral_angle:.1f}°")
def select_delta(self, delta):
self.selected = (self.selected + delta) % len(BEVERAGES)
self.publish_state()
self.render(force=True)
def dispense(self):
b = BEVERAGES[self.selected]
bid = b["id"]
if self.stock[bid] <= 0:
self.publish_event("empty", f"{b['name']} ist leer", {"id": bid, "stock": 0})
self._publish_current_warning_state()
return
self.stock[bid] -= 1
count = self.stock[bid]
self.save_stock()
self.publish_event("dispense", f"Ausgabe: {b['name']} ({count} übrig)", {"id": bid, "stock": count})
self.publish_state()
if count <= WARNING_LIMIT:
self._publish_current_warning_state()
self.render(force=True)
def handle_tilt(self):
self.sens.get_sensor_data()
lr = self.axis_value(LR_AXIS)
if LR_INVERT:
lr = -lr
forward_angle = self.read_forward_angle()
forward_delta = angle_diff_deg(forward_angle, self.forward_neutral_angle)
now = time.time()
if DEBUG_IMU and now - self.last_debug > 0.35:
print(f"Selecta: lr={lr:+.2f} forward_delta={forward_delta:+.1f}°")
self.last_debug = now
if abs(lr) < NEUTRAL_THRESHOLD:
self.lr_ready = True
if abs(forward_delta) < FORWARD_NEUTRAL_ANGLE:
self.forward_ready = True
if now - self.last_action < ACTION_COOLDOWN:
return
if self.lr_ready and lr < -TILT_THRESHOLD:
self.select_delta(-1)
self.lr_ready = False
self.last_action = now
return
if self.lr_ready and lr > TILT_THRESHOLD:
self.select_delta(1)
self.lr_ready = False
self.last_action = now
return
if self.forward_ready and forward_delta > FORWARD_ANGLE_THRESHOLD:
self.dispense()
self.forward_ready = False
self.last_action = now
# -------------------------
# LED rendering
# -------------------------
def set_pixel_safe(self, x, y, rgb):
self.sens.led_set_pixel(x, y, clamp(rgb[0], 0, 255), clamp(rgb[1], 0, 255), clamp(rgb[2], 0, 255))
def render(self, force=False):
if not self.active:
return
now = time.time()
if now - self.last_blink >= BLINK_INTERVAL:
self.blink_on = not self.blink_on
self.last_blink = now
force = True
if not force and now - self.last_render < RENDER_INTERVAL:
return
self.last_render = now
matrix = [[[0, 0, 0] for _ in range(8)] for _ in range(8)]
for x, b in enumerate(BEVERAGES):
count = self.stock[b["id"]]
color = b["color"]
for i in range(count):
y = 7 - i
matrix[y][x] = color[:]
if x == self.selected:
if count > 0:
matrix[7][x] = [255, 255, 255] if self.blink_on else color[:]
else:
matrix[7][x] = [255, 0, 0] if self.blink_on else [0, 0, 0]
for y in range(8):
for x in range(8):
self.set_pixel_safe(x, y, matrix[y][x])
# -------------------------
# Called from main loop
# -------------------------
def update(self):
if self._pending_activate:
self._pending_activate = False
self.activate()
if self._pending_deactivate:
self._pending_deactivate = False
self.deactivate()
if self.active:
self.handle_tilt()
self.render()

10
src/selecta_stock.json Normal file
View File

@ -0,0 +1,10 @@
{
"redbull": 8,
"orangina": 8,
"cola": 7,
"sprite": 0,
"water": 4,
"fanta": 0,
"icetea": 8,
"mate": 8
}

View File

@ -66,6 +66,11 @@ class SENS_HAT:
return pixel_data
def led_restore_matrix(self):
for y, row in enumerate(self.led_matrix):
for x, col in enumerate(row):
self.sense.set_pixel(x, y, col[0], col[1], col[2])
def detect_long_press(self):
"""
Returns True wenn der Taster für 4 Sekunden gedrückt werde.
@ -79,9 +84,7 @@ class SENS_HAT:
if event.action == "released":
self.time_down = time.time()
for y, row in enumerate(self.led_matrix):
for x, col in enumerate(row):
self.sense.set_pixel(x, y, col[0], col[1], col[2])
self.led_restore_matrix()
if self.time_down + 0.5 < time.time():