Task_15: Concurrency (ThreadPool)

This commit is contained in:
Marco Schmid 2026-05-12 18:07:45 +02:00
parent bd443fd02b
commit 61f164bc9e
3 changed files with 94 additions and 66 deletions

85
TASK.md
View File

@ -1,56 +1,57 @@
# Task 14 — pipeline.py: fetch_and_store() # Task 15 — Concurrency mit ThreadPoolExecutor
## Rückblick Task 13: Storage-Factory ## Rückblick Task 14: pipeline.py
Ihr habt eine Factory-Funktion `build_storage()` eingeführt. Die wichtigsten Punkte: Ihr habt die Fetch-und-Store-Logik in eine eigene Datei `pipeline.py` ausgelagert.
Die wichtigsten Punkte:
- **Factory-Pattern:** `build_storage()` zentralisiert die Entscheidung, welches - **Single Responsibility Principle:** `main()` hat jetzt genau eine Aufgabe —
Backend instanziiert wird. `main.py` muss weder `JsonStorage` noch Konfiguration lesen und Ausführung orchestrieren. `fetch_and_store()` kümmert
`PostgresStorage` kennen — es übergibt nur die Config und bekommt ein sich um Fetching und Speichern, `_fetch_bbox()` um eine einzelne Bbox.
fertiges `Storage`-Objekt zurück. Jede Funktion hat genau eine Verantwortung.
- **`match`-Statement:** Klarer und erweiterbarer als eine `if/elif`-Kette, - **`Storage` als Parameter:** `fetch_and_store()` instanziiert Storage nicht
besonders wenn neue Storage-Typen dazukommen. Der `case _`-Zweig fängt selbst — sie bekommt ein fertiges Objekt übergeben. Das nennt sich
ungültige Werte ab. *Dependency Injection*: die Abhängigkeit wird von aussen hereingegeben,
- **`StorageType`-Enum:** Verhindert Magic Strings in der Factory — ein nicht intern erzeugt. Das macht die Funktion unabhängig vom konkreten
ungültiger `type`-Wert in der Config wirft sofort einen `ValueError`. Backend und leichter testbar.
- **`**params`:** Die Parameter aus der Config werden direkt als Keyword-Argumente - **`StorageError` vs. `ValueError`:** `StorageError` signalisiert einen
an den Konstruktor übergeben. Das macht `build_storage()` generisch — Laufzeitfehler *im Betrieb* (Schreibfehler, DB-Verbindung weg) und wird
sie muss nicht wissen, welche Parameter `JsonStorage` oder `PostgresStorage` in `fetch_and_store()` behandelt. `ValueError` signalisiert einen
konkret erwarten. Konfigurationsfehler — falscher `type`-Wert in `config.yaml` — und soll
- **`root`-Parameter:** Die Pfadauflösung bleibt in `main.py` verankert — das Programm sofort zum Absturz bringen (*fail fast*). Beides auf
`storage.py` weiss nichts von der Projektstruktur, was die Wiederverwendbarkeit `StorageError` zu mappen wäre falsch: ein `except StorageError` würde
erhöht. sonst auch Konfigurationsfehler stillschweigend schlucken.
---
## Aufgabe ## Aufgabe
In `main.py` ist die Fetch-Schleife über Bboxen direkt in `main()` eingebettet. Aktuell werden alle Bboxen **seriell** abgearbeitet — eine nach der anderen.
Das hat einen Nachteil: Die Logik ist schwer isoliert testbar, und `main()` Da jeder Request auf die Overpass-API wartet (I/O-bound), liegt die CPU
wird mit jedem Feature länger und unübersichtlicher. die meiste Zeit idle. Mit Parallelisierung lassen sich die Requests
gleichzeitig abschicken und die Gesamtlaufzeit deutlich reduzieren.
Ziel ist es, die gesamte Fetch-und-Store-Logik für einen POI-Typ in eine
eigene Funktion auszulagern.
**Konkret:** **Konkret:**
1. Lege eine neue Datei `pipeline.py` an. 1. Füge in `pipeline.py` eine `FetchMode`-Enum hinzu:
2. Schreibe darin eine Funktion `fetch_and_store()` mit folgender Signatur:
```python ```python
def fetch_and_store( class FetchMode(StrEnum):
poi_type: PoiType, SERIAL = "serial"
bboxen: dict, CONCURRENT = "concurrent"
timeout: int,
maxsize: int,
storage: Storage,
) -> None:
``` ```
Sie soll die bisherige Schleife aus `main.py` übernehmen: 2. Erweitere `fetch_and_store()` um einen Parameter `fetch_mode: FetchMode`
über alle Bboxen iterieren, POIs fetchen, sammeln und am Ende speichern. und einen `max_workers: int = 4`.
3. Vereinfache `main()` so, dass sie nur noch Config liest, Storage baut 3. Implementiere `FetchMode.CONCURRENT` mit `ThreadPoolExecutor` und
und `fetch_and_store()` pro POI-Typ aufruft. `as_completed` — die Futures sollen analog zur seriellen Variante
Fehler pro Bbox loggen und die Ergebnisse in `all_pois` sammeln.
**HINT:** Du kannst als Vorlage das Code-Beispiel aus den Unterrichtsfolien nehmen (CodeWars). Es braucht nur ganz punktuelle
Anpassungen. Überlege, was die aufzurufende Funktion ist und was für Parameter sie benötigt.
4. Verwende ein `match`-Statement für die beiden Modi.
5. Ergänze `fetch_mode` in `config.yaml` und lese ihn in `main.py` ein.
**Fragen zum Nachdenken:** **Fragen zum Nachdenken:**
- Welche konkreten Vorteile hat eine schlanke `main()`-Funktion? - Was ist der Unterschied zwischen I/O-bound und CPU-bound — und warum
- Warum bekommt `fetch_and_store()` ein fertiges `Storage`-Objekt eignen sich Threads für I/O-bound Tasks, aber nicht für CPU-bound?
übergeben — statt es selbst zu instanziieren? - Was ist der Unterschied zwischen `executor.map()` und
`as_completed()` — wann ist welches besser geeignet?
- Was passiert, wenn zwei Threads gleichzeitig `all_pois.extend()` aufrufen
— ist das in Python sicher? Warum (nicht)?

View File

@ -2,9 +2,9 @@ import yaml
import logging import logging
from pathlib import Path from pathlib import Path
from .fetcher import load_query, load_pois, OverpassApiError from .models import PoiType
from .models import POI, PoiType from .pipeline import fetch_and_store
from .storage import build_storage, StorageError from .storage import build_storage
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@ -13,36 +13,18 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ROOT = Path(__file__).parent.parent.parent # → project/ ROOT = Path(__file__).parent.parent.parent
def main() -> None: def main() -> None:
config = yaml.safe_load((Path(__file__).parent / "config.yaml").read_text()) config = yaml.safe_load((Path(__file__).parent / "config.yaml").read_text())
timeout = config["overpass"]["timeout"] timeout = config["overpass"]["timeout"]
maxsize = config["overpass"]["maxsize"] maxsize = config["overpass"]["maxsize"]
bboxen = config["bboxen"] bboxen = config["bboxen"]
poi_types = [PoiType(pt) for pt in config["active_queries"]]
storage = build_storage(config["storage"], root=ROOT) storage = build_storage(config["storage"], root=ROOT)
poi_types = [PoiType(pt) for pt in config["active_queries"]]
for poi_type in poi_types: for poi_type in poi_types:
collected_pois = [] fetch_and_store(poi_type, bboxen, timeout, maxsize, storage)
for name, bbox in bboxen.items():
try:
query = load_query(poi_type, bbox, timeout, maxsize)
pois: list[POI] = load_pois(query=query, poi_type=poi_type)
collected_pois.extend(pois)
except OverpassApiError as exc:
logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}")
continue
logger.info(f"[{poi_type}] {name}: {len(pois)} POIs gefunden")
if collected_pois:
try:
location = storage.store(collected_pois)
logger.info(f"[{poi_type}] {len(collected_pois)} POIs gespeichert: {location}")
except StorageError as exc:
logger.error(f"[{poi_type}] Fehler beim Speichern: {exc}")
else:
logger.warning(f"[{poi_type}] Nichts zu speichern")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

45
src/overpass/pipeline.py Normal file
View File

@ -0,0 +1,45 @@
import logging
from .models import PoiType, POI
from .fetcher import load_query, load_pois, OverpassApiError
from .storage import Storage, StorageError
logger = logging.getLogger(__name__)
def fetch_and_store(
poi_type: PoiType,
bboxen: dict,
timeout: int,
maxsize: int,
storage: Storage,
) -> None:
all_pois: list[POI] = []
for name, bbox in bboxen.items():
try:
all_pois.extend(_fetch_bbox(poi_type, name, bbox, timeout, maxsize))
except (FileNotFoundError, OverpassApiError) as exc:
logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}")
if not all_pois:
logger.warning(f"[{poi_type}] Nichts zu speichern")
return
try:
location = storage.store(all_pois)
logger.info(f"[{poi_type}] {len(all_pois)} POIs gespeichert: {location}")
except StorageError as exc:
logger.error(f"[{poi_type}] Fehler beim Speichern: {exc}")
def _fetch_bbox(
poi_type: PoiType,
name: str,
bbox: tuple,
timeout: int,
maxsize: int,
) -> list[POI]:
query = load_query(poi_type, bbox, timeout, maxsize)
pois = load_pois(query=query, poi_type=poi_type)
logger.info(f"[{poi_type}] {name}: {len(pois)} POIs")
return pois