From 6f5867f0853024a41d7e98ac29e566238bb90875 Mon Sep 17 00:00:00 2001 From: Marco Schmid Date: Tue, 12 May 2026 18:20:24 +0200 Subject: [PATCH] Task_16: Lock und Timer --- TASK.md | 97 ++++++++++++++++++++++------------------ src/overpass/config.yaml | 13 +++++- src/overpass/main.py | 8 ++-- src/overpass/pipeline.py | 46 ++++++++++++++----- 4 files changed, 105 insertions(+), 59 deletions(-) diff --git a/TASK.md b/TASK.md index 3cfa883..c047d5d 100644 --- a/TASK.md +++ b/TASK.md @@ -1,57 +1,66 @@ -# Task 15 — Concurrency mit ThreadPoolExecutor +# Task 16 — Lock und @timer-Decorator -## Rückblick Task 14: pipeline.py +## Rückblick Task 15: Concurrency -Ihr habt die Fetch-und-Store-Logik in eine eigene Datei `pipeline.py` ausgelagert. -Die wichtigsten Punkte: +Ihr habt `ThreadPoolExecutor` und `as_completed` eingeführt. Die wichtigsten Punkte: -- **Single Responsibility Principle:** `main()` hat jetzt genau eine Aufgabe — - Konfiguration lesen und Ausführung orchestrieren. `fetch_and_store()` kümmert - sich um Fetching und Speichern, `_fetch_bbox()` um eine einzelne Bbox. - Jede Funktion hat genau eine Verantwortung. -- **`Storage` als Parameter:** `fetch_and_store()` instanziiert Storage nicht - selbst — sie bekommt ein fertiges Objekt übergeben. Das nennt sich - *Dependency Injection*: die Abhängigkeit wird von aussen hereingegeben, - nicht intern erzeugt. Das macht die Funktion unabhängig vom konkreten - Backend und leichter testbar. -- **`StorageError` vs. `ValueError`:** `StorageError` signalisiert einen - Laufzeitfehler *im Betrieb* (Schreibfehler, DB-Verbindung weg) und wird - in `fetch_and_store()` behandelt. `ValueError` signalisiert einen - Konfigurationsfehler — falscher `type`-Wert in `config.yaml` — und soll - das Programm sofort zum Absturz bringen (*fail fast*). Beides auf - `StorageError` zu mappen wäre falsch: ein `except StorageError` würde - sonst auch Konfigurationsfehler stillschweigend schlucken. +- **I/O-bound vs. CPU-bound:** Overpass-Requests sind I/O-bound — die CPU wartet + auf die Netzwerkantwort. Threads sind dafür ideal, weil Python während des + Wartens (I/O) den GIL freigibt und andere Threads laufen lässt. Bei CPU-bound + Tasks (z.B. Bildverarbeitung, ML-Training) hilft Threading nicht — + dort braucht man `multiprocessing`. +- **`executor.map()` vs. `as_completed()`:** `map()` ist einfacher, liefert + Ergebnisse aber in der Reihenfolge der Inputs — auch wenn spätere Futures + früher fertig sind. `as_completed()` liefert Ergebnisse sobald sie fertig + sind, was bei unterschiedlichen Antwortzeiten effizienter ist und + pro Future individuelles Error-Handling erlaubt. +- **`all_pois.extend()` aus mehreren Threads:** In Python ist `list.extend()` + durch den GIL (Global Interpreter Lock) de facto atomar für einfache + Operationen — ein echter Race Condition-Crash ist unwahrscheinlich. Aber: + die **Reihenfolge** der Ergebnisse ist nicht deterministisch, und bei + komplexeren Operationen (read-modify-write) wäre ein Lock nötig. ## Aufgabe -Aktuell werden alle Bboxen **seriell** abgearbeitet — eine nach der anderen. -Da jeder Request auf die Overpass-API wartet (I/O-bound), liegt die CPU -die meiste Zeit idle. Mit Parallelisierung lassen sich die Requests -gleichzeitig abschicken und die Gesamtlaufzeit deutlich reduzieren. +Zwei Erweiterungen stehen an — eine zur Illustration von Thread-Safety, +eine zur Laufzeitmessung. -**Konkret:** +**Teil A — `FetchMode.CONCURRENT_LOCKED`:** -1. Füge in `pipeline.py` eine `FetchMode`-Enum hinzu: +Der bisherige `CONCURRENT`-Modus sammelt Ergebnisse ohne explizite +Synchronisation. Füge einen dritten Modus hinzu, der zeigt, wie man +`all_pois.extend()` mit einem `Lock` absichert. + +1. Ergänze `FetchMode` um `CONCURRENT_LOCKED = "concurrent_locked"`. +2. Implementiere den neuen Modus in `fetch_and_store()` analog zu + `CONCURRENT`, aber mit einem `threading.Lock`: ```python - class FetchMode(StrEnum): - SERIAL = "serial" - CONCURRENT = "concurrent" + with lock: + all_pois.extend(future.result()) ``` -2. Erweitere `fetch_and_store()` um einen Parameter `fetch_mode: FetchMode` - und einen `max_workers: int = 4`. -3. Implementiere `FetchMode.CONCURRENT` mit `ThreadPoolExecutor` und - `as_completed` — die Futures sollen analog zur seriellen Variante - Fehler pro Bbox loggen und die Ergebnisse in `all_pois` sammeln. - **HINT:** Du kannst als Vorlage das Code-Beispiel aus den Unterrichtsfolien nehmen (CodeWars). Es braucht nur ganz punktuelle - Anpassungen. Überlege, was die aufzurufende Funktion ist und was für Parameter sie benötigt. -4. Verwende ein `match`-Statement für die beiden Modi. -5. Ergänze `fetch_mode` in `config.yaml` und lese ihn in `main.py` ein. +3. Ergänze `config.yaml` — setze `fetch_mode: concurrent_locked`. + +**Teil B — `@timer`-Decorator:** + +1. Lege eine neue Datei `utils.py` an. +2. Schreibe darin einen `@timer`-Decorator, der die Laufzeit einer + Funktion misst und per `logger.info()` ausgibt. + + ```python + def timer(func): + @wraps(func) + def wrapper(*args, **kwargs): + start = time.perf_counter() + result = func(*args, **kwargs) + elapsed = time.perf_counter() - start + logger.info(f"[timer] {func.__name__} → {elapsed:.2f}s") + return result + return wrapper + ``` + +3. Dekoriere `main()` in `main.py` mit `@timer`. **Fragen zum Nachdenken:** -- Was ist der Unterschied zwischen I/O-bound und CPU-bound — und warum - eignen sich Threads für I/O-bound Tasks, aber nicht für CPU-bound? -- Was ist der Unterschied zwischen `executor.map()` und - `as_completed()` — wann ist welches besser geeignet? -- Was passiert, wenn zwei Threads gleichzeitig `all_pois.extend()` aufrufen - — ist das in Python sicher? Warum (nicht)? \ No newline at end of file +- `list.extend()` ist in CPython durch den GIL geschützt — warum + empfiehlt es sich trotzdem, einen Lock zu verwenden? \ No newline at end of file diff --git a/src/overpass/config.yaml b/src/overpass/config.yaml index 931eb17..e53e011 100644 --- a/src/overpass/config.yaml +++ b/src/overpass/config.yaml @@ -1,10 +1,19 @@ +fetch_mode: concurrent # serial | concurrent + overpass: timeout: 25 maxsize: 5000000 bboxen: - davos: [46.72, 9.70, 46.92, 10.00] - schweiz: [45.8, 5.9, 47.8, 10.5] + "1": [45.8, 5.9, 46.4667, 7.4333] + "2": [45.8, 7.4333, 46.4667, 8.9667] + "3": [45.8, 8.9667, 46.4667, 10.5] + "4": [46.4667, 5.9, 47.1333, 7.4333] + "5": [46.4667, 7.4333, 47.1333, 8.9667] + "6": [46.4667, 8.9667, 47.1333, 10.5] + "7": [47.1333, 5.9, 47.8, 7.4333] + "8": [47.1333, 7.4333, 47.8, 8.9667] + "9": [47.1333, 8.9667, 47.8, 10.5] active_queries: - bergbahn diff --git a/src/overpass/main.py b/src/overpass/main.py index fdfbf54..4dc1568 100644 --- a/src/overpass/main.py +++ b/src/overpass/main.py @@ -3,7 +3,7 @@ import logging from pathlib import Path from .models import PoiType -from .pipeline import fetch_and_store +from .pipeline import fetch_and_store, FetchMode from .storage import build_storage logging.basicConfig( @@ -13,7 +13,7 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -ROOT = Path(__file__).parent.parent.parent +ROOT = Path(__file__).parent.parent.parent # → project/ def main() -> None: config = yaml.safe_load((Path(__file__).parent / "config.yaml").read_text()) @@ -22,9 +22,11 @@ def main() -> None: bboxen = config["bboxen"] storage = build_storage(config["storage"], root=ROOT) poi_types = [PoiType(pt) for pt in config["active_queries"]] + mode = FetchMode(config["fetch_mode"]) + logger.info(f"Fetch mode: {mode}") for poi_type in poi_types: - fetch_and_store(poi_type, bboxen, timeout, maxsize, storage) + fetch_and_store(poi_type, bboxen, timeout, maxsize, storage, mode) if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/overpass/pipeline.py b/src/overpass/pipeline.py index 2040a6e..62fa8e0 100644 --- a/src/overpass/pipeline.py +++ b/src/overpass/pipeline.py @@ -1,4 +1,7 @@ import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +from enum import StrEnum + from .models import PoiType, POI from .fetcher import load_query, load_pois, OverpassApiError from .storage import Storage, StorageError @@ -6,20 +9,43 @@ from .storage import Storage, StorageError logger = logging.getLogger(__name__) +class FetchMode(StrEnum): + SERIAL = "serial" + CONCURRENT = "concurrent" + + def fetch_and_store( - poi_type: PoiType, - bboxen: dict, - timeout: int, - maxsize: int, - storage: Storage, + poi_type: PoiType, + bboxen: dict, + timeout: int, + maxsize: int, + storage: Storage, + fetch_mode: FetchMode = FetchMode.SERIAL, + max_workers: int = 4, ) -> None: all_pois: list[POI] = [] - for name, bbox in bboxen.items(): - try: - all_pois.extend(_fetch_bbox(poi_type, name, bbox, timeout, maxsize)) - except (FileNotFoundError, OverpassApiError) as exc: - logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}") + match fetch_mode: + + case FetchMode.SERIAL: + for name, bbox in bboxen.items(): + try: + all_pois.extend(_fetch_bbox(poi_type, name, bbox, timeout, maxsize)) + except (FileNotFoundError, OverpassApiError) as exc: + logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}") + + case FetchMode.CONCURRENT: + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit(_fetch_bbox, poi_type, name, bbox, timeout, maxsize): name + for name, bbox in bboxen.items() + } + for future in as_completed(futures): + name = futures[future] + try: + all_pois.extend(future.result()) + except (FileNotFoundError, OverpassApiError) as exc: + logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}") if not all_pois: logger.warning(f"[{poi_type}] Nichts zu speichern")