From 28a1f54abf1951a6f092839c3c75f4eda36dd78f Mon Sep 17 00:00:00 2001 From: Marco Schmid Date: Tue, 21 Apr 2026 22:24:56 +0200 Subject: [PATCH] Task 10: sqlite-db --- config.py | 38 ++++++++++++++++ main.py | 127 ++++++++++++++++++------------------------------------ utils.py | 30 ++++++++++++- worker.py | 85 ++++++++++++++++++++++++++++++++++-- 4 files changed, 192 insertions(+), 88 deletions(-) create mode 100644 config.py diff --git a/config.py b/config.py new file mode 100644 index 0000000..2fdfd5a --- /dev/null +++ b/config.py @@ -0,0 +1,38 @@ +from pathlib import Path +from queries.bergbahn import BERGBAHN_QUERY +from queries.restaurant import RESTAURANT_QUERY + + +# --------------------------------------------------------------------------- +# Konfiguration +# --------------------------------------------------------------------------- + +OUTPUT_DIR = Path("results") + +BBOXEN = { + "SW": (45.8, 5.9, 46.8, 8.2), + "SO": (45.8, 8.2, 46.8, 10.5), + "NW": (46.8, 5.9, 47.8, 8.2), + "NO": (46.8, 8.2, 47.8, 10.5) +} + +# BBOXEN = { +# 1: (45.8, 5.9, 46.4667, 7.4333), +# 2: (45.8, 7.4333, 46.4667, 8.9667), +# 3: (45.8, 8.9667, 46.4667, 10.5), +# 4: (46.4667, 5.9, 47.1333, 7.4333), +# 5: (46.4667, 7.4333, 47.1333, 8.9667), +# 6: (46.4667, 8.9667, 47.1333, 10.5), +# 7: (47.1333, 5.9, 47.8, 7.4333), +# 8: (47.1333, 7.4333, 47.8, 8.9667), +# 9: (47.1333, 8.9667, 47.8, 10.5) +# } + +# BBOXEN = { +# 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5), +# 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5), +# 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5), +# 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5) +# } + +QUERY = {"bergbahn": BERGBAHN_QUERY} \ No newline at end of file diff --git a/main.py b/main.py index bd65fa5..94ffc88 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,8 @@ import logging -from pathlib import Path from utils import store_to_disk -from worker import fetch_fragment -from multiprocessing import Pool -from queries.bergbahn import BERGBAHN_QUERY -from queries.restaurant import RESTAURANT_QUERY +from config import QUERY, OUTPUT_DIR +from worker import run_seriell, run_threads, run_parallel # --------------------------------------------------------------------------- @@ -22,84 +19,32 @@ logging.basicConfig( logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# Konfiguration -# --------------------------------------------------------------------------- - -OUTPUT_DIR = Path("results") - -BBOXEN = { - "SW": (45.8, 5.9, 46.8, 8.2), - "SO": (45.8, 8.2, 46.8, 10.5), - "NW": (46.8, 5.9, 47.8, 8.2), - "NO": (46.8, 8.2, 47.8, 10.5) -} - -# BBOXEN = { -# 1: (45.8, 5.9, 46.4667, 7.4333), -# 2: (45.8, 7.4333, 46.4667, 8.9667), -# 3: (45.8, 8.9667, 46.4667, 10.5), -# 4: (46.4667, 5.9, 47.1333, 7.4333), -# 5: (46.4667, 7.4333, 47.1333, 8.9667), -# 6: (46.4667, 8.9667, 47.1333, 10.5), -# 7: (47.1333, 5.9, 47.8, 7.4333), -# 8: (47.1333, 7.4333, 47.8, 8.9667), -# 9: (47.1333, 8.9667, 47.8, 10.5) -# } - -# BBOXEN = { -# 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5), -# 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5), -# 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5), -# 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5) -# } - -QUERY = {"bergbahn": BERGBAHN_QUERY} - - # --------------------------------------------------------------------------- # Hauptlogik # --------------------------------------------------------------------------- def main() -> None: query_name = list(QUERY.keys())[0] - query = QUERY[query_name] + query = QUERY[query_name] - # Argumente für jeden Worker vorbereiten - tasks = [ - (name, bbox, query) - for name, bbox in BBOXEN.items() - ] + logger.info("=== Seriell ===") + overall_s = run_seriell(query) - logger.info(f"Starte parallele Abfrage mit {len(tasks)} Prozessen ...") + logger.info("=== Multiprocessing ===") + overall_p = run_parallel(query) - # blockiert bis alle Prozesse fertig sind - with Pool(processes=len(tasks)) as pool: - results = pool.map(fetch_fragment, tasks) - - # Ergebnisse zusammenführen - overall = [] - errors = [] - for name, elements in results: - if elements: - overall.extend(elements) - else: - errors.append(name) - - logger.info(f"Total:{len(overall)} Elemente gefunden") - if errors: - logger.warning(f"Fehler in Fragmenten: {errors}") + logger.info("=== ThreadPoolExecutor ===") + overall_t = run_threads(query) try: saved_path = store_to_disk( - results=overall, + results=overall_p, poi_type=query_name, output_dir=OUTPUT_DIR, ) logger.info(f"Ergebnisse gespeichert: {saved_path}") except OSError as e: - logger.error(f"Fehler beim Speichern: {e}") - + logger.error(f"Fehler beim Speichern:{e}") logger.info("Fertig.") @@ -108,27 +53,41 @@ if __name__ == "__main__": # Was ist passiert? - # * Wir haben ein worker-Modul gebaut und dort den Code für den Multiprocessing-Code abgelegt + # * Wir haben zusätzlich Multithreating-Code implementiert + # * Die Dekorator-Funktion in utils.py (timer) stoppt und logt die Zeit der dekorierten Funktionen, ohne deren Code + # zu verändern. Das ermöglicht uns einen einfachen Zeitvergleich zwischen den einzelnen Funktionen + # * Auslagerung von BBOXEN, OUTPUT_DIR, QUERY nach config.py, weil sie sowohl in main.py als auch in worker.py gebraucht + # werden. Wären sie in main.poy verblieben, hätten wir Probleme mit einem circular-Import bekommen... # Erkenntnisse: - # - Warum print() im Worker statt logging? - # python# Logging-Konfiguration aus dem Hauptprozess wird nicht vererbt → - # logger.info() im Worker-Prozess schweigt einfach - # print() funktioniert immer, ist aber nicht ideal für Produktion - # - Warum muss fetch_fragment auf Modul-Ebene stehen? - # python# multiprocessing serialisiert Funktionen mit pickle - # Lambda und lokale Funktionen sind nicht pickle-bar → AttributeError - # Modul-Ebene = pickle-bar - # - Wenn Pool.map() einen neuen Prozess startet, muss es der neue Prozess wissen, welche Funktion er - # ausführen soll. Das geschieht über Pickle — Python serialisiert die Funktion und schickt sie an - # den neuen Prozess: - # Pool.map(fetch_fragment, tasks) - # → pickle(fetch_fragment) ──► unpickle(fetch_fragment) + + # Programmfluss: + # main() — läuft immer sequenziell + # │ + # ├── run_seriell() + # │ ├── fetch SW ──► wartet + # │ ├── fetch SO ──► wartet + # │ ├── fetch NW ──► wartet + # │ └── fetch NO ──► wartet → return → main() macht weiter + # │ + # ├── run_parallel() + # │ ├── fetch SW ─┐ + # │ ├── fetch SO ├─ gleichzeitig + # │ ├── fetch NW │ in Prozessen + # │ └── fetch NO ─┘ + # │ Pool.map() blockiert bis ALLE fertig → return → main() macht weiter + # │ + # ├── run_threads() + # │ ├── fetch SW ─┐ + # │ ├── fetch SO ├─ gleichzeitig + # │ ├── fetch NW │ in Threads + # │ └── fetch NO ─┘ + # │ as_completed() blockiert bis ALLE fertig → return → main() macht weiter + # │ + # └── store_to_disk() ← erst hier, garantiert # TASK: - # * Wir implementieren nun auch einen Multithreating-Ansatz (ebenfalls im worker-modul) - # * Zusätzlich bauen wir in Modul 'utils' einen Dekorator, welcher die Zeit messen kann (einer Funktion) - # * Wir rufen aus unserer Hauptfunktion alle Working-Funktionen (seriell, multihreating und multiprocessing) auf - # und vergleichen die benötigte Zeit \ No newline at end of file + # * Bis jetzt speichern wir die Resultate als .json-File auf unserer Festplatte. Als nächstes wollen wir + # die Resultate in einer sqlite-Datenbank ablegen \ No newline at end of file diff --git a/utils.py b/utils.py index 249316f..09894b2 100644 --- a/utils.py +++ b/utils.py @@ -1,5 +1,11 @@ import json +import time +import logging from pathlib import Path +from functools import wraps + + +logger = logging.getLogger(__name__) def store_to_disk( @@ -34,4 +40,26 @@ def store_to_disk( with output_path.open("w", encoding="utf-8") as f: json.dump(results, f, indent=2, ensure_ascii=False) - return output_path.resolve() \ No newline at end of file + return output_path.resolve() + + +def timer(func): + """ + Decorator der die Ausführungszeit einer Funktion misst und loggt. + + Verwendung: + @timer + def meine_funktion(): + ... + """ + @wraps(func) # erhält __name__, __doc__ der originalen Funktion + def wrapper(*args, **kwargs): + start = time.perf_counter() + result = func(*args, **kwargs) + elapsed = time.perf_counter() - start + logger.info(f"{func.__name__}() dauerte {round(elapsed, 2)} Sekunden\n\n") + return result + return wrapper + +# @wraps(func) — ohne diesen Decorator würde run_seriell.__name__ den Namen "wrapper" zurückgeben statt +# "run_seriell", was den Log-Output unbrauchbar macht \ No newline at end of file diff --git a/worker.py b/worker.py index 7d3ba10..6175e5b 100644 --- a/worker.py +++ b/worker.py @@ -1,6 +1,14 @@ -import requests -from overpass import fetch_overpass +import logging +from multiprocessing import Pool +from concurrent.futures import ThreadPoolExecutor, as_completed +import requests + +from config import BBOXEN +from overpass import fetch_overpass +from utils import timer + +logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!) @@ -21,4 +29,75 @@ def fetch_fragment(args: tuple) -> tuple[str, list]: return name, elements except (RuntimeError, requests.Timeout) as e: print(f"[{name}] Fehler: {e}") - return name, [] \ No newline at end of file + return name, [] + + +# --------------------------------------------------------------------------- +# Serielle Variante +# --------------------------------------------------------------------------- +@timer +def run_seriell(query: str) -> list: + overall = [] + for name, bbox in BBOXEN.items(): + logger.info(f"Seriell — Fragment {name}' ...") + try: + result = fetch_overpass(overpass_query=query, bbox=bbox) + elements = result.get("elements", []) + logger.info(f"'{name}': {len(elements)} Elemente") + overall.extend(elements) + except (RuntimeError, requests.Timeout) as e: + logger.error(f"Fehler bei '{name}': {e}") + return overall + + +# --------------------------------------------------------------------------- +# Parallele Variante mit Multiprocessing +# --------------------------------------------------------------------------- +@timer +def run_parallel(query: str) -> list: + tasks = [(name, bbox, query) for name, bbox in BBOXEN.items()] + with Pool(processes=len(tasks)) as pool: + results = pool.map(fetch_fragment, tasks) + overall = [] + for name, elements in results: + if elements: + overall.extend(elements) + else: + logger.warning(f"Keine Elemente für Fragment '{name}'") + return overall + + + +# --------------------------------------------------------------------------- +# Parallele Variante mit ThreadPoolExecutor +# --------------------------------------------------------------------------- + +@timer +def run_threads(query: str) -> list: + overall = [] + errors = [] + + # ThreadPoolExecutor: kein Pickle-Zwang → logging funktioniert direkt! + with ThreadPoolExecutor(max_workers=len(BBOXEN)) as executor: + + # Alle Tasks auf einmal einreichen → Future-Objekte zurück + futures = { + executor.submit(fetch_overpass, query, bbox): name + for name, bbox in BBOXEN.items() + } + + # as_completed() liefert Futures in der Reihenfolge, in der sie fertig werden + for future in as_completed(futures): + name = futures[future] + try: + result = future.result() + elements = result.get("elements", []) + logger.info(f"[Thread] '{name}': {len(elements)} Elemente") + overall.extend(elements) + except (RuntimeError, requests.Timeout) as e: + logger.error(f"[Thread] Fehler bei '{name}': {e}") + errors.append(name) + + if errors: + logger.warning(f"Fehler in Fragmenten: {errors}") + return overall \ No newline at end of file