From b93e1a7667c84b72110cd1b6a53cb44a124b8b08 Mon Sep 17 00:00:00 2001 From: Marco Schmid Date: Tue, 21 Apr 2026 22:00:56 +0200 Subject: [PATCH] Task 9: multithreading, time-Dekorator --- main.py | 78 ++++++++++++++++++++++++++++++++----------------------- worker.py | 24 +++++++++++++++++ 2 files changed, 69 insertions(+), 33 deletions(-) create mode 100644 worker.py diff --git a/main.py b/main.py index 5df8d3c..bd65fa5 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,11 @@ import logging from pathlib import Path from utils import store_to_disk -from overpass import fetch_overpass +from worker import fetch_fragment +from multiprocessing import Pool from queries.bergbahn import BERGBAHN_QUERY from queries.restaurant import RESTAURANT_QUERY -import requests + # --------------------------------------------------------------------------- @@ -61,32 +62,34 @@ QUERY = {"bergbahn": BERGBAHN_QUERY} # --------------------------------------------------------------------------- def main() -> None: - overall = [] - errors = [] query_name = list(QUERY.keys())[0] + query = QUERY[query_name] - for name, bbox in BBOXEN.items(): - logger.info(f"Starte Abfrage für Query: {query_name}, '{name}' mit bbox={bbox}") - try: - result = fetch_overpass(overpass_query=QUERY.get(query_name,""), bbox=bbox) - except RuntimeError as e: + # Argumente für jeden Worker vorbereiten + tasks = [ + (name, bbox, query) + for name, bbox in BBOXEN.items() + ] + + logger.info(f"Starte parallele Abfrage mit {len(tasks)} Prozessen ...") + + # blockiert bis alle Prozesse fertig sind + with Pool(processes=len(tasks)) as pool: + results = pool.map(fetch_fragment, tasks) + + # Ergebnisse zusammenführen + overall = [] + errors = [] + for name, elements in results: + if elements: + overall.extend(elements) + else: errors.append(name) - logger.error(f"API-Fehler bei '{name}': {e}") - continue - except requests.Timeout: - errors.append(name) - logger.error(f"Timeout bei '{name}' — bbox zu gross oder Server überlastet") - continue - elements = result.get("elements", []) - logger.info(f"'{name}': {len(elements)} Elemente gefunden") - overall.extend(elements) - - logger.info(f"Total: {len(overall)} Elemente gefunden") + logger.info(f"Total:{len(overall)} Elemente gefunden") if errors: - logger.info(f"Fehler in Fragmenten: {errors}") + logger.warning(f"Fehler in Fragmenten: {errors}") - # Ergebnisse speichern try: saved_path = store_to_disk( results=overall, @@ -105,18 +108,27 @@ if __name__ == "__main__": # Was ist passiert? - # * Wir haben einen ersten Test im Modul 'tests' geschrieben - # * Der Kern von fetch_overpass() ist ein HTTP-Request — ohne Mocks lässt sich die Funktion selbst kaum sinnvoll testen, - # weil jeder Test auf die echte API angewiesen wäre (langsam, flaky, Netzwerkabhängig). - # -> deshalb war nun der erste Test (zum eigentlich weniger wichtigen) 'store_to_disk' ...! + # * Wir haben ein worker-Modul gebaut und dort den Code für den Multiprocessing-Code abgelegt + + # Erkenntnisse: + # - Warum print() im Worker statt logging? + # python# Logging-Konfiguration aus dem Hauptprozess wird nicht vererbt → + # logger.info() im Worker-Prozess schweigt einfach + # print() funktioniert immer, ist aber nicht ideal für Produktion + # - Warum muss fetch_fragment auf Modul-Ebene stehen? + # python# multiprocessing serialisiert Funktionen mit pickle + # Lambda und lokale Funktionen sind nicht pickle-bar → AttributeError + # Modul-Ebene = pickle-bar + # - Wenn Pool.map() einen neuen Prozess startet, muss es der neue Prozess wissen, welche Funktion er + # ausführen soll. Das geschieht über Pickle — Python serialisiert die Funktion und schickt sie an + # den neuen Prozess: + # Pool.map(fetch_fragment, tasks) + # → pickle(fetch_fragment) ──► unpickle(fetch_fragment) # TASK: - # * Wir arbeiten nun alle 4,9, 16 Sequenzen hintereinander seriell ab. Wir könnten versuchen den ganzen Prozess zu - # beschleunigen und ihn parallel auszuführen... - # * Es gibt je nach Problem verschiedene Möglichkeiten unseren Code zu parallelisieren -> beide haben Vor- und Nachteile! - # - Multithreating - # - Multiprocessing - # -> wir beginnen mit dem Multiprocessing-Ansatz (multiprocessing). Dazu bauen wir wieder ein neues model namens 'worker' - # wo wir unseren Code für die Parallelisierung ablegen. \ No newline at end of file + # * Wir implementieren nun auch einen Multithreating-Ansatz (ebenfalls im worker-modul) + # * Zusätzlich bauen wir in Modul 'utils' einen Dekorator, welcher die Zeit messen kann (einer Funktion) + # * Wir rufen aus unserer Hauptfunktion alle Working-Funktionen (seriell, multihreating und multiprocessing) auf + # und vergleichen die benötigte Zeit \ No newline at end of file diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..7d3ba10 --- /dev/null +++ b/worker.py @@ -0,0 +1,24 @@ +import requests +from overpass import fetch_overpass + + +# --------------------------------------------------------------------------- +# Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!) +# --------------------------------------------------------------------------- +def fetch_fragment(args: tuple) -> tuple[str, list]: + """ + Führt einen einzelnen Overpass-Request aus. + Gibt (fragment_name, elements) zurück — oder (fragment_name, []) bei Fehler. + + Wird von Pool.map() in einem separaten Prozess ausgeführt. + Logging funktioniert hier nicht zuverlässig → print() als Fallback. + """ + name, bbox, query = args + try: + result = fetch_overpass(overpass_query=query, bbox=bbox) + elements = result.get("elements", []) + print(f"[{name}] {len(elements)} Elemente gefunden") + return name, elements + except (RuntimeError, requests.Timeout) as e: + print(f"[{name}] Fehler: {e}") + return name, [] \ No newline at end of file