update code
This commit is contained in:
parent
cb22f6fd5f
commit
5712bf54e3
BIN
src/__pycache__/gps_hat.cpython-313.pyc
Normal file
BIN
src/__pycache__/gps_hat.cpython-313.pyc
Normal file
Binary file not shown.
BIN
src/__pycache__/sens_hat.cpython-313.pyc
Normal file
BIN
src/__pycache__/sens_hat.cpython-313.pyc
Normal file
Binary file not shown.
BIN
src/__pycache__/sens_hat.cpython-314.pyc
Normal file
BIN
src/__pycache__/sens_hat.cpython-314.pyc
Normal file
Binary file not shown.
@ -1,5 +1,7 @@
|
||||
import serial
|
||||
import time
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
class GPS_DATA:
|
||||
breitengrad = ""
|
||||
@ -41,13 +43,82 @@ class GPS_DATA:
|
||||
else:
|
||||
print("Suche Satelliten...")
|
||||
|
||||
def get_sat_count(self):
|
||||
# self.ser.write(b'AT+CPSI?\r\n')
|
||||
self.ser.write(b'AT+CGPSGSV\r\n')
|
||||
time.sleep(0.1)
|
||||
result = self.ser.read_all().decode()
|
||||
print(result)
|
||||
def LTE_hat_encode_feedback(self, result_org):
|
||||
result_clean = []
|
||||
result_pars = result_org.strip().split("\n")
|
||||
|
||||
for result in result_pars:
|
||||
if result == "\r":
|
||||
continue
|
||||
|
||||
result_clean.append(result.strip())
|
||||
|
||||
return result_clean
|
||||
|
||||
def get_sim_status(self):
|
||||
ser.write(b'AT+CPIN?\r\n')
|
||||
time.sleep(0.5)
|
||||
result = self.ser.read_all().decode()
|
||||
result_clean = self.LTE_hat_encode_feedback(result)
|
||||
|
||||
# lockd: ['AT+CPIN?', '+CPIN: SIM PIN', 'OK']
|
||||
# unlock: ['AT+CPIN?', '+CPIN: READY', 'OK']
|
||||
match result_clean[1]:
|
||||
case "+CPIN: SIM PIN":
|
||||
return False
|
||||
case "+CPIN: READY":
|
||||
return True
|
||||
case _:
|
||||
print("Ein fehler ist aufgetreten. Sim Status konnte nicht abgefragt werden.")
|
||||
|
||||
|
||||
def enter_sim_pin(self):
|
||||
if self.get_sim_status():
|
||||
print("Sim bereits entsperrt.")
|
||||
return
|
||||
|
||||
self.ser.write(b'AT+CPIN="3082"\r\n')
|
||||
time.sleep(1)
|
||||
result = self.ser.read_all().decode()
|
||||
result_clean = self.LTE_hat_encode_feedback(result)
|
||||
# ['AT+CPIN="3082"', 'OK', '+CPIN: READY', 'SMS DONE']
|
||||
# print(result_clean)
|
||||
if result_clean[2] == "+CPIN: READY":
|
||||
print("Sim entsperrt.")
|
||||
|
||||
def LTE_hat_disable(self):
|
||||
self.ser.write(b'AT+CFUN=0\r\n')
|
||||
|
||||
def LTE_hat_enabel(self):
|
||||
self.ser.write(b'AT+CFUN=1\r\n')
|
||||
|
||||
def LTE_hat_start(self):
|
||||
print("Starte LTE Verbindung (Swisscom)...")
|
||||
try:
|
||||
subprocess.run(["pon", "swisscom"], check=True)
|
||||
|
||||
print("Warte auf IP-Zuweisung...")
|
||||
for _ in range(10):
|
||||
time.sleep(2)
|
||||
if os.path.exists("/sys/class/net/ppp0"):
|
||||
print("LTE erfolgreich gestartet. Interface ppp0 ist aktiv.")
|
||||
return True
|
||||
|
||||
print("Fehler: Verbindung konnte nicht in der vorgegebenen Zeit aufgebaut werden.")
|
||||
return False
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Fehler beim Starten von pon: {e}")
|
||||
return False
|
||||
|
||||
def LTE_hat_stop(self):
|
||||
print("Beende LTE Verbindung...")
|
||||
try:
|
||||
subprocess.run(["poff", "swisscom"], check=True)
|
||||
print("LTE Verbindung getrennt.")
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Fehler beim Stoppen von poff: {e}")
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
gps = GPS_DATA("/dev/ttyUSB0")
|
||||
|
||||
43
src/main.py
43
src/main.py
@ -32,8 +32,9 @@ def encode_led_set_payload(payload):
|
||||
x = int(data["pos"][0])
|
||||
y = int(data["pos"][1])
|
||||
r = int(data["rgb"][0])
|
||||
b = int(data["rgb"][1])
|
||||
g = int(data["rgb"][2])
|
||||
g = int(data["rgb"][1])
|
||||
b = int(data["rgb"][2])
|
||||
print("set Pixel: ", end="")
|
||||
print(data)
|
||||
sens.led_set_pixel(x, y, r, g, b)
|
||||
except json.JSONDecodeError:
|
||||
@ -43,13 +44,34 @@ def encode_led_set_payload(payload):
|
||||
except ValueError:
|
||||
print("Werte müssen int sein:\n" + payload)
|
||||
|
||||
def encode_led_set_matrix_payload(payload):
|
||||
try:
|
||||
data = json.loads(payload)
|
||||
matrix = data["matrix"]
|
||||
print("set matrix: ")
|
||||
# print(matrix)
|
||||
|
||||
for y, row in enumerate(matrix):
|
||||
for x, col in enumerate(row):
|
||||
sens.led_set_pixel(x, y, col[0], col[1], col[2])
|
||||
except json.JSONDecodeError:
|
||||
print("Ungültiges JSON formatt:\n" + payload)
|
||||
except KeyError:
|
||||
print("Ungültige Keys:\n" + payload)
|
||||
except ValueError:
|
||||
print("Werte müssen int sein:\n" + payload)
|
||||
|
||||
def encode_led_get_payload(payload):
|
||||
try:
|
||||
data = json.loads(payload)
|
||||
x = int(data["pos"][0])
|
||||
y = int(data["pos"][1])
|
||||
print("get Pixel: ", end="")
|
||||
print(data)
|
||||
# sens.led_set_pixel(x, y)
|
||||
color = sens.led_get_pixel(x, y)
|
||||
pixel_data = [{"pos": (x, y), "rgb": color}]
|
||||
# print(pixel_data)
|
||||
client.publish("MOBKOM/LED/state", json.dumps(pixel_data))
|
||||
except json.JSONDecodeError:
|
||||
print("Ungültiges JSON formatt:\n" + payload)
|
||||
except KeyError:
|
||||
@ -70,6 +92,11 @@ def on_message(client, userdata, message):
|
||||
encode_led_set_payload(payload)
|
||||
case "MOBKOM/LED/get_pixel":
|
||||
encode_led_get_payload(payload)
|
||||
case "MOBKOM/LED/set_matrix":
|
||||
encode_led_set_matrix_payload(payload)
|
||||
case "MOBKOM/LED/get_matrix":
|
||||
client.publish("MOBKOM/LED/state_matrix", json.dumps({"matrix": sens.led_matrix.tolist()}))
|
||||
pass
|
||||
case _:
|
||||
print(f"Eingehend auf {topic}: {payload}")
|
||||
|
||||
@ -84,7 +111,8 @@ def on_connect(client, userdata, flags, rc):
|
||||
client.subscribe("MOBKOM/LED/set_pixel")
|
||||
client.subscribe("MOBKOM/LED/get_pixel")
|
||||
client.subscribe("MOBKOM/LED/clear")
|
||||
client.on_message = on_message
|
||||
client.subscribe("MOBKOM/LED/set_matrix")
|
||||
client.subscribe("MOBKOM/LED/get_matrix")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# gps = GPS_DATA("/dev/ttyUSB0")
|
||||
@ -92,11 +120,12 @@ if __name__ == "__main__":
|
||||
sens = SENS_HAT()
|
||||
|
||||
client = mqtt.Client()
|
||||
client.on_connect = on_connect
|
||||
client.on_message = on_message
|
||||
client.connect("localhost", 1883)
|
||||
|
||||
client.on_connect = on_connect
|
||||
|
||||
update_timer = 0
|
||||
update_timer = 0 # for mqtt update send
|
||||
|
||||
try:
|
||||
while True:
|
||||
@ -105,7 +134,7 @@ if __name__ == "__main__":
|
||||
update_timer = time.time()
|
||||
|
||||
client.loop()
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
except KeyboardInterrupt:
|
||||
|
||||
@ -14,7 +14,7 @@ class SENS_HAT:
|
||||
y = 0
|
||||
z = 0
|
||||
|
||||
led_matrix = np.zeros((7, 7, 3), dtype=np.uint8)
|
||||
led_matrix = np.zeros((8, 8, 3), dtype=np.uint8)
|
||||
|
||||
def __init__(self):
|
||||
self.sense = SenseHat()
|
||||
@ -47,9 +47,22 @@ class SENS_HAT:
|
||||
return
|
||||
|
||||
self.sense.set_pixel(x, y, r, g, b)
|
||||
self.led_matrix[y][x] = (r, g, b)
|
||||
|
||||
def led_get_pixel(self, x, y):
|
||||
if not (0 <= x <= 7) or not (0 <= y <= 7):
|
||||
print("x und y müssen zwischen 0 und 7 sein!")
|
||||
return
|
||||
return self.led_matrix[x][y]
|
||||
|
||||
# umwandlung in ein normales python array da json keine numpy array mag.
|
||||
pixel_data = []
|
||||
for color in self.led_matrix[x][y]:
|
||||
pixel_data.append(int(color))
|
||||
|
||||
return pixel_data
|
||||
|
||||
if __name__ == "__main__":
|
||||
sens = SENS_HAT()
|
||||
|
||||
print(sens.led_matrix)
|
||||
print(sens.led_matrix.tolist())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user