From 61f164bc9e469803fb37666ae77a4938d60ff9a4 Mon Sep 17 00:00:00 2001 From: Marco Schmid Date: Tue, 12 May 2026 18:07:45 +0200 Subject: [PATCH] Task_15: Concurrency (ThreadPool) --- TASK.md | 85 ++++++++++++++++++++-------------------- src/overpass/main.py | 30 +++----------- src/overpass/pipeline.py | 45 +++++++++++++++++++++ 3 files changed, 94 insertions(+), 66 deletions(-) create mode 100644 src/overpass/pipeline.py diff --git a/TASK.md b/TASK.md index d11af9c..3cfa883 100644 --- a/TASK.md +++ b/TASK.md @@ -1,56 +1,57 @@ -# Task 14 — pipeline.py: fetch_and_store() +# Task 15 — Concurrency mit ThreadPoolExecutor -## Rückblick Task 13: Storage-Factory +## Rückblick Task 14: pipeline.py -Ihr habt eine Factory-Funktion `build_storage()` eingeführt. Die wichtigsten Punkte: +Ihr habt die Fetch-und-Store-Logik in eine eigene Datei `pipeline.py` ausgelagert. +Die wichtigsten Punkte: -- **Factory-Pattern:** `build_storage()` zentralisiert die Entscheidung, welches - Backend instanziiert wird. `main.py` muss weder `JsonStorage` noch - `PostgresStorage` kennen — es übergibt nur die Config und bekommt ein - fertiges `Storage`-Objekt zurück. -- **`match`-Statement:** Klarer und erweiterbarer als eine `if/elif`-Kette, - besonders wenn neue Storage-Typen dazukommen. Der `case _`-Zweig fängt - ungültige Werte ab. -- **`StorageType`-Enum:** Verhindert Magic Strings in der Factory — ein - ungültiger `type`-Wert in der Config wirft sofort einen `ValueError`. -- **`**params`:** Die Parameter aus der Config werden direkt als Keyword-Argumente - an den Konstruktor übergeben. Das macht `build_storage()` generisch — - sie muss nicht wissen, welche Parameter `JsonStorage` oder `PostgresStorage` - konkret erwarten. -- **`root`-Parameter:** Die Pfadauflösung bleibt in `main.py` verankert — - `storage.py` weiss nichts von der Projektstruktur, was die Wiederverwendbarkeit - erhöht. +- **Single Responsibility Principle:** `main()` hat jetzt genau eine Aufgabe — + Konfiguration lesen und Ausführung orchestrieren. `fetch_and_store()` kümmert + sich um Fetching und Speichern, `_fetch_bbox()` um eine einzelne Bbox. + Jede Funktion hat genau eine Verantwortung. +- **`Storage` als Parameter:** `fetch_and_store()` instanziiert Storage nicht + selbst — sie bekommt ein fertiges Objekt übergeben. Das nennt sich + *Dependency Injection*: die Abhängigkeit wird von aussen hereingegeben, + nicht intern erzeugt. Das macht die Funktion unabhängig vom konkreten + Backend und leichter testbar. +- **`StorageError` vs. `ValueError`:** `StorageError` signalisiert einen + Laufzeitfehler *im Betrieb* (Schreibfehler, DB-Verbindung weg) und wird + in `fetch_and_store()` behandelt. `ValueError` signalisiert einen + Konfigurationsfehler — falscher `type`-Wert in `config.yaml` — und soll + das Programm sofort zum Absturz bringen (*fail fast*). Beides auf + `StorageError` zu mappen wäre falsch: ein `except StorageError` würde + sonst auch Konfigurationsfehler stillschweigend schlucken. ---- ## Aufgabe -In `main.py` ist die Fetch-Schleife über Bboxen direkt in `main()` eingebettet. -Das hat einen Nachteil: Die Logik ist schwer isoliert testbar, und `main()` -wird mit jedem Feature länger und unübersichtlicher. - -Ziel ist es, die gesamte Fetch-und-Store-Logik für einen POI-Typ in eine -eigene Funktion auszulagern. +Aktuell werden alle Bboxen **seriell** abgearbeitet — eine nach der anderen. +Da jeder Request auf die Overpass-API wartet (I/O-bound), liegt die CPU +die meiste Zeit idle. Mit Parallelisierung lassen sich die Requests +gleichzeitig abschicken und die Gesamtlaufzeit deutlich reduzieren. **Konkret:** -1. Lege eine neue Datei `pipeline.py` an. -2. Schreibe darin eine Funktion `fetch_and_store()` mit folgender Signatur: +1. Füge in `pipeline.py` eine `FetchMode`-Enum hinzu: ```python - def fetch_and_store( - poi_type: PoiType, - bboxen: dict, - timeout: int, - maxsize: int, - storage: Storage, - ) -> None: + class FetchMode(StrEnum): + SERIAL = "serial" + CONCURRENT = "concurrent" ``` - Sie soll die bisherige Schleife aus `main.py` übernehmen: - über alle Bboxen iterieren, POIs fetchen, sammeln und am Ende speichern. -3. Vereinfache `main()` so, dass sie nur noch Config liest, Storage baut - und `fetch_and_store()` pro POI-Typ aufruft. +2. Erweitere `fetch_and_store()` um einen Parameter `fetch_mode: FetchMode` + und einen `max_workers: int = 4`. +3. Implementiere `FetchMode.CONCURRENT` mit `ThreadPoolExecutor` und + `as_completed` — die Futures sollen analog zur seriellen Variante + Fehler pro Bbox loggen und die Ergebnisse in `all_pois` sammeln. + **HINT:** Du kannst als Vorlage das Code-Beispiel aus den Unterrichtsfolien nehmen (CodeWars). Es braucht nur ganz punktuelle + Anpassungen. Überlege, was die aufzurufende Funktion ist und was für Parameter sie benötigt. +4. Verwende ein `match`-Statement für die beiden Modi. +5. Ergänze `fetch_mode` in `config.yaml` und lese ihn in `main.py` ein. **Fragen zum Nachdenken:** -- Welche konkreten Vorteile hat eine schlanke `main()`-Funktion? -- Warum bekommt `fetch_and_store()` ein fertiges `Storage`-Objekt - übergeben — statt es selbst zu instanziieren? \ No newline at end of file +- Was ist der Unterschied zwischen I/O-bound und CPU-bound — und warum + eignen sich Threads für I/O-bound Tasks, aber nicht für CPU-bound? +- Was ist der Unterschied zwischen `executor.map()` und + `as_completed()` — wann ist welches besser geeignet? +- Was passiert, wenn zwei Threads gleichzeitig `all_pois.extend()` aufrufen + — ist das in Python sicher? Warum (nicht)? \ No newline at end of file diff --git a/src/overpass/main.py b/src/overpass/main.py index b400300..fdfbf54 100644 --- a/src/overpass/main.py +++ b/src/overpass/main.py @@ -2,9 +2,9 @@ import yaml import logging from pathlib import Path -from .fetcher import load_query, load_pois, OverpassApiError -from .models import POI, PoiType -from .storage import build_storage, StorageError +from .models import PoiType +from .pipeline import fetch_and_store +from .storage import build_storage logging.basicConfig( level=logging.INFO, @@ -13,36 +13,18 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -ROOT = Path(__file__).parent.parent.parent # → project/ +ROOT = Path(__file__).parent.parent.parent def main() -> None: config = yaml.safe_load((Path(__file__).parent / "config.yaml").read_text()) timeout = config["overpass"]["timeout"] maxsize = config["overpass"]["maxsize"] bboxen = config["bboxen"] - poi_types = [PoiType(pt) for pt in config["active_queries"]] storage = build_storage(config["storage"], root=ROOT) + poi_types = [PoiType(pt) for pt in config["active_queries"]] for poi_type in poi_types: - collected_pois = [] - for name, bbox in bboxen.items(): - try: - query = load_query(poi_type, bbox, timeout, maxsize) - pois: list[POI] = load_pois(query=query, poi_type=poi_type) - collected_pois.extend(pois) - except OverpassApiError as exc: - logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}") - continue - logger.info(f"[{poi_type}] {name}: {len(pois)} POIs gefunden") - - if collected_pois: - try: - location = storage.store(collected_pois) - logger.info(f"[{poi_type}] {len(collected_pois)} POIs gespeichert: {location}") - except StorageError as exc: - logger.error(f"[{poi_type}] Fehler beim Speichern: {exc}") - else: - logger.warning(f"[{poi_type}] Nichts zu speichern") + fetch_and_store(poi_type, bboxen, timeout, maxsize, storage) if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/overpass/pipeline.py b/src/overpass/pipeline.py new file mode 100644 index 0000000..2040a6e --- /dev/null +++ b/src/overpass/pipeline.py @@ -0,0 +1,45 @@ +import logging +from .models import PoiType, POI +from .fetcher import load_query, load_pois, OverpassApiError +from .storage import Storage, StorageError + +logger = logging.getLogger(__name__) + + +def fetch_and_store( + poi_type: PoiType, + bboxen: dict, + timeout: int, + maxsize: int, + storage: Storage, +) -> None: + all_pois: list[POI] = [] + + for name, bbox in bboxen.items(): + try: + all_pois.extend(_fetch_bbox(poi_type, name, bbox, timeout, maxsize)) + except (FileNotFoundError, OverpassApiError) as exc: + logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}") + + if not all_pois: + logger.warning(f"[{poi_type}] Nichts zu speichern") + return + + try: + location = storage.store(all_pois) + logger.info(f"[{poi_type}] {len(all_pois)} POIs gespeichert: {location}") + except StorageError as exc: + logger.error(f"[{poi_type}] Fehler beim Speichern: {exc}") + + +def _fetch_bbox( + poi_type: PoiType, + name: str, + bbox: tuple, + timeout: int, + maxsize: int, +) -> list[POI]: + query = load_query(poi_type, bbox, timeout, maxsize) + pois = load_pois(query=query, poi_type=poi_type) + logger.info(f"[{poi_type}] {name}: {len(pois)} POIs") + return pois \ No newline at end of file