Task 9: multithreading, time-Dekorator

This commit is contained in:
Marco Schmid 2026-04-21 22:00:56 +02:00
parent 76482aca50
commit cc365a98b4
2 changed files with 69 additions and 33 deletions

78
main.py
View File

@ -1,10 +1,11 @@
import logging import logging
from pathlib import Path from pathlib import Path
from utils import store_to_disk from utils import store_to_disk
from overpass import fetch_overpass from worker import fetch_fragment
from multiprocessing import Pool
from queries.bergbahn import BERGBAHN_QUERY from queries.bergbahn import BERGBAHN_QUERY
from queries.restaurant import RESTAURANT_QUERY from queries.restaurant import RESTAURANT_QUERY
import requests
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -61,32 +62,34 @@ QUERY = {"bergbahn": BERGBAHN_QUERY}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def main() -> None: def main() -> None:
overall = []
errors = []
query_name = list(QUERY.keys())[0] query_name = list(QUERY.keys())[0]
query = QUERY[query_name]
for name, bbox in BBOXEN.items(): # Argumente für jeden Worker vorbereiten
logger.info(f"Starte Abfrage für Query: {query_name}, '{name}' mit bbox={bbox}") tasks = [
try: (name, bbox, query)
result = fetch_overpass(overpass_query=QUERY.get(query_name,""), bbox=bbox) for name, bbox in BBOXEN.items()
except RuntimeError as e: ]
logger.info(f"Starte parallele Abfrage mit {len(tasks)} Prozessen ...")
# blockiert bis alle Prozesse fertig sind
with Pool(processes=len(tasks)) as pool:
results = pool.map(fetch_fragment, tasks)
# Ergebnisse zusammenführen
overall = []
errors = []
for name, elements in results:
if elements:
overall.extend(elements)
else:
errors.append(name) errors.append(name)
logger.error(f"API-Fehler bei '{name}': {e}")
continue
except requests.Timeout:
errors.append(name)
logger.error(f"Timeout bei '{name}' — bbox zu gross oder Server überlastet")
continue
elements = result.get("elements", []) logger.info(f"Total:{len(overall)} Elemente gefunden")
logger.info(f"'{name}': {len(elements)} Elemente gefunden")
overall.extend(elements)
logger.info(f"Total: {len(overall)} Elemente gefunden")
if errors: if errors:
logger.info(f"Fehler in Fragmenten: {errors}") logger.warning(f"Fehler in Fragmenten: {errors}")
# Ergebnisse speichern
try: try:
saved_path = store_to_disk( saved_path = store_to_disk(
results=overall, results=overall,
@ -105,18 +108,27 @@ if __name__ == "__main__":
# Was ist passiert? # Was ist passiert?
# * Wir haben einen ersten Test im Modul 'tests' geschrieben # * Wir haben ein worker-Modul gebaut und dort den Code für den Multiprocessing-Code abgelegt
# * Der Kern von fetch_overpass() ist ein HTTP-Request — ohne Mocks lässt sich die Funktion selbst kaum sinnvoll testen,
# weil jeder Test auf die echte API angewiesen wäre (langsam, flaky, Netzwerkabhängig). # Erkenntnisse:
# -> deshalb war nun der erste Test (zum eigentlich weniger wichtigen) 'store_to_disk' ...! # - Warum print() im Worker statt logging?
# python# Logging-Konfiguration aus dem Hauptprozess wird nicht vererbt →
# logger.info() im Worker-Prozess schweigt einfach
# print() funktioniert immer, ist aber nicht ideal für Produktion
# - Warum muss fetch_fragment auf Modul-Ebene stehen?
# python# multiprocessing serialisiert Funktionen mit pickle
# Lambda und lokale Funktionen sind nicht pickle-bar → AttributeError
# Modul-Ebene = pickle-bar
# - Wenn Pool.map() einen neuen Prozess startet, muss es der neue Prozess wissen, welche Funktion er
# ausführen soll. Das geschieht über Pickle — Python serialisiert die Funktion und schickt sie an
# den neuen Prozess:
# Pool.map(fetch_fragment, tasks)
# → pickle(fetch_fragment) ──► unpickle(fetch_fragment)
# TASK: # TASK:
# * Wir arbeiten nun alle 4,9, 16 Sequenzen hintereinander seriell ab. Wir könnten versuchen den ganzen Prozess zu # * Wir implementieren nun auch einen Multithreating-Ansatz (ebenfalls im worker-modul)
# beschleunigen und ihn parallel auszuführen... # * Zusätzlich bauen wir in Modul 'utils' einen Dekorator, welcher die Zeit messen kann (einer Funktion)
# * Es gibt je nach Problem verschiedene Möglichkeiten unseren Code zu parallelisieren -> beide haben Vor- und Nachteile! # * Wir rufen aus unserer Hauptfunktion alle Working-Funktionen (seriell, multihreating und multiprocessing) auf
# - Multithreating # und vergleichen die benötigte Zeit
# - Multiprocessing
# -> wir beginnen mit dem Multiprocessing-Ansatz (multiprocessing). Dazu bauen wir wieder ein neues model namens 'worker'
# wo wir unseren Code für die Parallelisierung ablegen.

24
worker.py Normal file
View File

@ -0,0 +1,24 @@
import requests
from overpass import fetch_overpass
# ---------------------------------------------------------------------------
# Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!)
# ---------------------------------------------------------------------------
def fetch_fragment(args: tuple) -> tuple[str, list]:
"""
Führt einen einzelnen Overpass-Request aus.
Gibt (fragment_name, elements) zurück oder (fragment_name, []) bei Fehler.
Wird von Pool.map() in einem separaten Prozess ausgeführt.
Logging funktioniert hier nicht zuverlässig print() als Fallback.
"""
name, bbox, query = args
try:
result = fetch_overpass(overpass_query=query, bbox=bbox)
elements = result.get("elements", [])
print(f"[{name}] {len(elements)} Elemente gefunden")
return name, elements
except (RuntimeError, requests.Timeout) as e:
print(f"[{name}] Fehler: {e}")
return name, []