diff --git a/src/gps_hat.py b/src/gps_hat.py index e5322f3..ad807da 100644 --- a/src/gps_hat.py +++ b/src/gps_hat.py @@ -15,7 +15,7 @@ class GPS_DATA: self.port = port self.ser = serial.Serial(port, baud, timeout=1) self.ser.write(b'AT+CGPS=1\r\n') - time.sleep(2) + time.sleep(1) def __str__(self): @@ -56,26 +56,36 @@ class GPS_DATA: return result_clean def get_sim_status(self): - ser.write(b'AT+CPIN?\r\n') + self.ser.write(b'AT+CPIN?\r\n') time.sleep(0.5) result = self.ser.read_all().decode() result_clean = self.LTE_hat_encode_feedback(result) # lockd: ['AT+CPIN?', '+CPIN: SIM PIN', 'OK'] # unlock: ['AT+CPIN?', '+CPIN: READY', 'OK'] - match result_clean[1]: + # print(result_clean) + data = "" + for res in result_clean: + if str(res).startswith("+CPIN"): + data = res + break + if data == "": + return + + match data: case "+CPIN: SIM PIN": return False case "+CPIN: READY": return True case _: print("Ein fehler ist aufgetreten. Sim Status konnte nicht abgefragt werden.") + return False def enter_sim_pin(self): if self.get_sim_status(): print("Sim bereits entsperrt.") - return + return True self.ser.write(b'AT+CPIN="3082"\r\n') time.sleep(1) @@ -85,6 +95,8 @@ class GPS_DATA: # print(result_clean) if result_clean[2] == "+CPIN: READY": print("Sim entsperrt.") + return True + return False def LTE_hat_disable(self): self.ser.write(b'AT+CFUN=0\r\n') diff --git a/src/main.py b/src/main.py index 6621cc9..376f2a6 100644 --- a/src/main.py +++ b/src/main.py @@ -10,9 +10,12 @@ import paho.mqtt.client as mqtt from gps_hat import GPS_DATA from sens_hat import SENS_HAT +from selecta_mode import SelectaMode +selecta = None def update_hat_data(): - if gps.get_gps_info(): + if 0: + # if gps.get_gps_info(): # print(gps) client.publish("MOBKOM/GPS/N", gps.breitengrad) client.publish("MOBKOM/GPS/E", gps.laengengrad) @@ -98,6 +101,9 @@ def on_message(client, userdata, message): payload = message.payload.decode("utf-8").lower() topic = message.topic + if selecta is not None and selecta.handle_mqtt(topic, payload): + return + match topic: case "MOBKOM/LED/clear": print("clear LEDs") @@ -128,6 +134,9 @@ def on_connect(client, userdata, flags, rc): client.subscribe("MOBKOM/LED/set_matrix") client.subscribe("MOBKOM/LED/get_matrix") + if selecta is not None: + selecta.subscribe(client) + def mqtt_loop_thread(run): while not run.is_set(): client.loop() @@ -142,7 +151,7 @@ if __name__ == "__main__": sens = SENS_HAT() sens.led_clear() - for _ in range(5): + for _ in range(4): sens.led_set_pixel(0, 0, 0, 0, 0) time.sleep(0.5) sens.led_set_pixel(0, 0, 255, 0, 0) @@ -153,8 +162,20 @@ if __name__ == "__main__": client.on_message = on_message client.connect("localhost", 1883) + selecta = SelectaMode(sens, client) + sens.led_set_pixel(1, 0, 255, 0, 0) + # if not gps.enter_sim_pin(): + # exit(1) + # + # sens.led_set_pixel(2, 0, 255, 0, 0) + # + # if not gps.LTE_hat_start(): + # exit(1) + + sens.led_set_pixel(3, 0, 255, 0, 0) + stop_event = threading.Event() t = threading.Thread(target=mqtt_loop_thread, args=(stop_event,)) t.start() @@ -168,10 +189,7 @@ if __name__ == "__main__": [[0,255,146],[0,255,255],[0,146,255],[0,36,255],[73,0,255],[182,0,255],[255,0,219],[255,0,109]], [[0,255,255],[0,146,255],[0,36,255],[73,0,255],[182,0,255],[255,0,219],[255,0,109],[255,0,0]]]) - for y, row in enumerate(sens.led_matrix): - for x, col in enumerate(row): - sens.led_set_pixel(x, y, col[0], col[1], col[2]) - + sens.led_restore_matrix() try: while run: @@ -179,6 +197,9 @@ if __name__ == "__main__": update_hat_data() update_timer = time.time() + if selecta is not None: + selecta.update() + run = not sens.detect_long_press() time.sleep(0.01) @@ -190,5 +211,5 @@ if __name__ == "__main__": t.join() # warten bis der thread fertig ist gps.ser.close() client.disconnect() - subprocess.run(["sudo", "shutdown", "-h", "0"]) + # subprocess.run(["sudo", "shutdown", "-h", "0"]) sys.exit(0) diff --git a/src/selecta_mode.py b/src/selecta_mode.py new file mode 100644 index 0000000..ce764b6 --- /dev/null +++ b/src/selecta_mode.py @@ -0,0 +1,561 @@ +#!/usr/bin/env python3 +""" +Selecta mode for MOBKOM main.py + +Usage from main.py: + from selecta_mode import SelectaMode + + selecta = SelectaMode(sens, client) + ... + selecta.subscribe(client) + ... + if selecta.handle_mqtt(topic, payload): + return + ... + selecta.update() + +Controls: + Sense HAT joystick UP -> activate Selecta mode + Sense HAT joystick DOWN -> deactivate Selecta mode + +MQTT topics: + MOBKOM/SELECTA/state + MOBKOM/SELECTA/warning + MOBKOM/SELECTA/refill + MOBKOM/SELECTA/reset + MOBKOM/SELECTA/get_state + MOBKOM/SELECTA/event +""" + +import copy +import json +import math +import os +import time +from typing import Any + +# ------------------------- +# MQTT topics +# ------------------------- +TOPIC_STATE = "MOBKOM/SELECTA/state" +TOPIC_WARNING = "MOBKOM/SELECTA/warning" +TOPIC_REFILL = "MOBKOM/SELECTA/refill" +TOPIC_RESET = "MOBKOM/SELECTA/reset" +TOPIC_GET_STATE = "MOBKOM/SELECTA/get_state" +TOPIC_EVENT = "MOBKOM/SELECTA/event" +TOPIC_ACTIVE = "MOBKOM/SELECTA/active" + +# ------------------------- +# Automat configuration +# ------------------------- +MAX_STOCK = 8 +WARNING_LIMIT = 3 +STOCK_FILE = "selecta_stock.json" + +TILT_THRESHOLD = 0.35 +NEUTRAL_THRESHOLD = 0.18 +ACTION_COOLDOWN = 0.45 + +# left/right selection uses x acceleration by default +LR_AXIS = "x" +LR_INVERT = False + +# forward dispense uses angle relative to the activation position +FORWARD_ANGLE_THRESHOLD = 18.0 +FORWARD_NEUTRAL_ANGLE = 7.0 +FORWARD_INVERT = False + +BLINK_INTERVAL = 0.35 +RENDER_INTERVAL = 0.08 +DEBUG_IMU = False + +BEVERAGES = [ + {"id": "redbull", "name": "Red Bull", "color": [0, 80, 255]}, + {"id": "orangina", "name": "Orangina", "color": [255, 120, 0]}, + {"id": "cola", "name": "Cola", "color": [255, 0, 0]}, + {"id": "sprite", "name": "Sprite", "color": [0, 255, 70]}, + {"id": "water", "name": "Wasser", "color": [0, 220, 255]}, + {"id": "fanta", "name": "Fanta", "color": [255, 200, 0]}, + {"id": "icetea", "name": "Ice Tea", "color": [180, 100, 20]}, + {"id": "mate", "name": "Mate", "color": [120, 255, 0]}, +] + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def clamp(v: Any, lo: int, hi: int) -> int: + try: + return max(lo, min(hi, int(v))) + except Exception: + return lo + + +def angle_diff_deg(current: float, reference: float) -> float: + diff = current - reference + while diff > 180: + diff -= 360 + while diff < -180: + diff += 360 + return diff + + +class SelectaMode: + def __init__(self, sens, mqtt_client): + self.sens = sens + self.client = mqtt_client + + self.active = False + self.selected = 0 + self.stock = self.load_stock() + + # LED matrix that was shown before Selecta mode was activated. + # Restored when Selecta mode is deactivated. + self.saved_matrix = None + + self.blink_on = True + self.last_blink = 0.0 + self.last_render = 0.0 + self.last_action = 0.0 + self.last_debug = 0.0 + + self.lr_ready = True + self.forward_ready = True + self.forward_neutral_angle = 0.0 + + self._pending_activate = False + self._pending_deactivate = False + self._install_joystick_callbacks() + + # ------------------------- + # Setup / subscriptions + # ------------------------- + @staticmethod + def mqtt_topics(): + return [TOPIC_REFILL, TOPIC_RESET, TOPIC_GET_STATE, TOPIC_ACTIVE] + + def subscribe(self, client=None): + c = client or self.client + for topic in self.mqtt_topics(): + c.subscribe(topic) + + def _install_joystick_callbacks(self): + stick = self.sens.stick + # stick = None + # + # # Prefer the SenseHat object inside your wrapper if it exists. + # if hasattr(self.sens, "sense") and hasattr(self.sens.sense, "stick"): + # stick = self.sens.sense.stick + # elif hasattr(self.sens, "stick"): + # stick = self.sens.stick + # else: + # try: + # from sense_hat import SenseHat + # stick = SenseHat().stick + # except Exception as e: + # print("Selecta: joystick callbacks not available:", e) + # return + + def on_up(event): + if getattr(event, "action", "pressed") == "pressed": + self._pending_activate = True + + def on_down(event): + if getattr(event, "action", "pressed") == "pressed": + self._pending_deactivate = True + + try: + stick.direction_up = on_up + stick.direction_down = on_down + print("Selecta: UP aktiviert, DOWN deaktiviert") + except Exception as e: + print("Selecta: could not assign joystick callbacks:", e) + + # ------------------------- + # Stock persistence + # ------------------------- + def load_stock(self): + default = {b["id"]: MAX_STOCK for b in BEVERAGES} + if not os.path.exists(STOCK_FILE): + return default + try: + with open(STOCK_FILE, "r", encoding="utf-8") as f: + data = json.load(f) + for b in BEVERAGES: + bid = b["id"] + default[bid] = clamp(data.get(bid, MAX_STOCK), 0, MAX_STOCK) + return default + except Exception as e: + print("Selecta: stock file could not be read, using full stock:", e) + return default + + def save_stock(self): + try: + with open(STOCK_FILE, "w", encoding="utf-8") as f: + json.dump(self.stock, f, indent=2) + except Exception as e: + print("Selecta: stock could not be saved:", e) + + # ------------------------- + # MQTT helpers + # ------------------------- + def publish_json(self, topic, data, retain=False): + try: + self.client.publish(topic, json.dumps(data), qos=0, retain=retain) + except Exception as e: + print("Selecta MQTT publish error:", e) + + def publish_event(self, event_type, message, extra=None): + data = {"type": event_type, "message": message, "timestamp": now_ms()} + if extra: + data.update(extra) + self.publish_json(TOPIC_EVENT, data, retain=False) + + def publish_state(self): + drinks = [] + for i, b in enumerate(BEVERAGES): + drinks.append({ + "column": i, + "id": b["id"], + "name": b["name"], + "color": b["color"], + "stock": self.stock[b["id"]], + "max": MAX_STOCK, + "selected": i == self.selected, + }) + + self.publish_json(TOPIC_STATE, { + "active": self.active, + "selected": self.selected, + "selected_id": BEVERAGES[self.selected]["id"], + "selected_name": BEVERAGES[self.selected]["name"], + "warning_limit": WARNING_LIMIT, + "drinks": drinks, + "timestamp": now_ms(), + }, retain=True) + + def _make_warning(self, beverage): + count = self.stock[beverage["id"]] + severity = "empty" if count <= 0 else "low" + message = ( + f"Leer: {beverage['name']} ist ausverkauft." + if severity == "empty" + else f"Warnung: Nur noch {count}x {beverage['name']} übrig." + ) + return { + "active": True, + "id": beverage["id"], + "name": beverage["name"], + "stock": count, + "limit": WARNING_LIMIT, + "severity": severity, + "message": message, + } + + def publish_warning(self, beverage=None, clear=False): + if clear: + self.publish_json(TOPIC_WARNING, { + "active": False, + "warnings": [], + "message": "", + "timestamp": now_ms(), + }, retain=True) + return + + warnings = [] + if beverage is not None: + warnings = [self._make_warning(beverage)] + else: + warnings = [self._make_warning(b) for b in BEVERAGES if self.stock[b["id"]] <= WARNING_LIMIT] + + self.publish_json(TOPIC_WARNING, { + "active": len(warnings) > 0, + "warnings": warnings, + "message": "\n".join(w["message"] for w in warnings), + "timestamp": now_ms(), + }, retain=True) + + def handle_mqtt(self, topic: str, payload: str) -> bool: + if topic == TOPIC_REFILL: + self.handle_refill(payload) + return True + if topic == TOPIC_RESET: + self.reset_all() + return True + if topic == TOPIC_GET_STATE: + self.publish_state() + return True + if topic == TOPIC_ACTIVE: + value = payload.strip().lower() + if value in ["1", "true", "on", "start", "active"]: + self.activate() + elif value in ["0", "false", "off", "stop", "inactive"]: + self.deactivate() + return True + return False + + # ------------------------- + # Web commands + # ------------------------- + def handle_refill(self, payload: str): + try: + data = json.loads(payload) + except json.JSONDecodeError: + data = {"id": payload.strip(), "stock": MAX_STOCK} + + beverage_id = data.get("id") + column = data.get("column") + + if beverage_id is None and column is not None: + try: + beverage_id = BEVERAGES[int(column)]["id"] + except Exception: + print("Selecta: invalid refill column:", column) + return + + if beverage_id not in self.stock: + print("Selecta: invalid refill id:", beverage_id) + return + + if "stock" in data: + new_stock = clamp(data["stock"], 0, MAX_STOCK) + elif "amount" in data: + new_stock = clamp(self.stock[beverage_id] + int(data["amount"]), 0, MAX_STOCK) + else: + new_stock = MAX_STOCK + + self.stock[beverage_id] = new_stock + self.save_stock() + self.publish_event("refill", f"{beverage_id} auf {new_stock} aufgefüllt", {"id": beverage_id, "stock": new_stock}) + self.publish_state() + self._publish_current_warning_state() + if self.active: + self.render(force=True) + + def reset_all(self): + for b in BEVERAGES: + self.stock[b["id"]] = MAX_STOCK + self.selected = 0 + self.save_stock() + self.publish_event("reset", "Alle Getränke aufgefüllt") + self.publish_state() + self.publish_warning(clear=True) + if self.active: + self.render(force=True) + + def _publish_current_warning_state(self): + low = [b for b in BEVERAGES if self.stock[b["id"]] <= WARNING_LIMIT] + if low: + self.publish_warning() + else: + self.publish_warning(clear=True) + + # ------------------------- + # LED matrix save / restore + # ------------------------- + def save_matrix(self): + """Save the current LED matrix before Selecta takes over the display.""" + try: + if hasattr(self.sens.led_matrix, "copy"): + # Works for NumPy arrays and many array-like objects. + self.saved_matrix = self.sens.led_matrix.copy() + else: + # Fallback for nested Python lists. + self.saved_matrix = copy.deepcopy(self.sens.led_matrix) + print("Selecta: LED matrix saved") + except Exception as e: + self.saved_matrix = None + print("Selecta: could not save LED matrix:", e) + + def restore_matrix(self): + """Restore the LED matrix that was shown before Selecta mode.""" + if self.saved_matrix is None: + print("Selecta: no saved LED matrix to restore") + return + + try: + print("Selecta: restoring LED matrix") + for y, row in enumerate(self.saved_matrix): + if y >= 8: + break + for x, col in enumerate(row): + if x >= 8: + break + r = int(col[0]) + g = int(col[1]) + b = int(col[2]) + self.sens.led_set_pixel(x, y, r, g, b) + except Exception as e: + print("Selecta: could not restore LED matrix:", e) + + # ------------------------- + # Activation + # ------------------------- + def activate(self): + if self.active: + return + self.save_matrix() + self.active = True + self.lr_ready = True + self.forward_ready = True + self.calibrate_forward_neutral() + self.publish_event("active", "Selecta aktiviert") + self.publish_state() + self.render(force=True) + print("Selecta aktiviert") + + def deactivate(self): + if not self.active: + return + self.active = False + self.publish_event("inactive", "Selecta deaktiviert") + self.publish_state() + self.restore_matrix() + print("Selecta deaktiviert") + + # ------------------------- + # Tilt input + # ------------------------- + def axis_value(self, axis_name): + return float(getattr(self.sens, axis_name, 0.0)) + + def read_forward_angle(self): + y = self.axis_value("y") + z = self.axis_value("z") + angle = math.degrees(math.atan2(-y, z)) + return -angle if FORWARD_INVERT else angle + + def calibrate_forward_neutral(self): + samples = [] + for _ in range(12): + self.sens.get_sensor_data() + samples.append(self.read_forward_angle()) + time.sleep(0.01) + sin_sum = sum(math.sin(math.radians(a)) for a in samples) + cos_sum = sum(math.cos(math.radians(a)) for a in samples) + self.forward_neutral_angle = math.degrees(math.atan2(sin_sum, cos_sum)) + print(f"Selecta forward neutral: {self.forward_neutral_angle:.1f}°") + + def select_delta(self, delta): + self.selected = (self.selected + delta) % len(BEVERAGES) + self.publish_state() + self.render(force=True) + + def dispense(self): + b = BEVERAGES[self.selected] + bid = b["id"] + + if self.stock[bid] <= 0: + self.publish_event("empty", f"{b['name']} ist leer", {"id": bid, "stock": 0}) + self._publish_current_warning_state() + return + + self.stock[bid] -= 1 + count = self.stock[bid] + self.save_stock() + + self.publish_event("dispense", f"Ausgabe: {b['name']} ({count} übrig)", {"id": bid, "stock": count}) + self.publish_state() + + if count <= WARNING_LIMIT: + self._publish_current_warning_state() + + self.render(force=True) + + def handle_tilt(self): + self.sens.get_sensor_data() + + lr = self.axis_value(LR_AXIS) + if LR_INVERT: + lr = -lr + + forward_angle = self.read_forward_angle() + forward_delta = angle_diff_deg(forward_angle, self.forward_neutral_angle) + + now = time.time() + + if DEBUG_IMU and now - self.last_debug > 0.35: + print(f"Selecta: lr={lr:+.2f} forward_delta={forward_delta:+.1f}°") + self.last_debug = now + + if abs(lr) < NEUTRAL_THRESHOLD: + self.lr_ready = True + if abs(forward_delta) < FORWARD_NEUTRAL_ANGLE: + self.forward_ready = True + + if now - self.last_action < ACTION_COOLDOWN: + return + + if self.lr_ready and lr < -TILT_THRESHOLD: + self.select_delta(-1) + self.lr_ready = False + self.last_action = now + return + + if self.lr_ready and lr > TILT_THRESHOLD: + self.select_delta(1) + self.lr_ready = False + self.last_action = now + return + + if self.forward_ready and forward_delta > FORWARD_ANGLE_THRESHOLD: + self.dispense() + self.forward_ready = False + self.last_action = now + + # ------------------------- + # LED rendering + # ------------------------- + def set_pixel_safe(self, x, y, rgb): + self.sens.led_set_pixel(x, y, clamp(rgb[0], 0, 255), clamp(rgb[1], 0, 255), clamp(rgb[2], 0, 255)) + + def render(self, force=False): + if not self.active: + return + + now = time.time() + if now - self.last_blink >= BLINK_INTERVAL: + self.blink_on = not self.blink_on + self.last_blink = now + force = True + + if not force and now - self.last_render < RENDER_INTERVAL: + return + + self.last_render = now + matrix = [[[0, 0, 0] for _ in range(8)] for _ in range(8)] + + for x, b in enumerate(BEVERAGES): + count = self.stock[b["id"]] + color = b["color"] + + for i in range(count): + y = 7 - i + matrix[y][x] = color[:] + + if x == self.selected: + if count > 0: + matrix[7][x] = [255, 255, 255] if self.blink_on else color[:] + else: + matrix[7][x] = [255, 0, 0] if self.blink_on else [0, 0, 0] + + for y in range(8): + for x in range(8): + self.set_pixel_safe(x, y, matrix[y][x]) + + # ------------------------- + # Called from main loop + # ------------------------- + def update(self): + if self._pending_activate: + self._pending_activate = False + self.activate() + + if self._pending_deactivate: + self._pending_deactivate = False + self.deactivate() + + if self.active: + self.handle_tilt() + self.render() diff --git a/src/selecta_stock.json b/src/selecta_stock.json new file mode 100644 index 0000000..8e876ff --- /dev/null +++ b/src/selecta_stock.json @@ -0,0 +1,10 @@ +{ + "redbull": 8, + "orangina": 8, + "cola": 7, + "sprite": 0, + "water": 4, + "fanta": 0, + "icetea": 8, + "mate": 8 +} \ No newline at end of file diff --git a/src/sens_hat.py b/src/sens_hat.py index f570ed4..ed0cf5f 100644 --- a/src/sens_hat.py +++ b/src/sens_hat.py @@ -66,6 +66,11 @@ class SENS_HAT: return pixel_data + def led_restore_matrix(self): + for y, row in enumerate(self.led_matrix): + for x, col in enumerate(row): + self.sense.set_pixel(x, y, col[0], col[1], col[2]) + def detect_long_press(self): """ Returns True wenn der Taster für 4 Sekunden gedrückt werde. @@ -79,9 +84,7 @@ class SENS_HAT: if event.action == "released": self.time_down = time.time() - for y, row in enumerate(self.led_matrix): - for x, col in enumerate(row): - self.sense.set_pixel(x, y, col[0], col[1], col[2]) + self.led_restore_matrix() if self.time_down + 0.5 < time.time():