2.8 KiB
2.8 KiB
Task 15 — Concurrency mit ThreadPoolExecutor
Rückblick Task 14: pipeline.py
Ihr habt die Fetch-und-Store-Logik in eine eigene Datei pipeline.py ausgelagert.
Die wichtigsten Punkte:
- Single Responsibility Principle:
main()hat jetzt genau eine Aufgabe — Konfiguration lesen und Ausführung orchestrieren.fetch_and_store()kümmert sich um Fetching und Speichern,_fetch_bbox()um eine einzelne Bbox. Jede Funktion hat genau eine Verantwortung. Storageals Parameter:fetch_and_store()instanziiert Storage nicht selbst — sie bekommt ein fertiges Objekt übergeben. Das nennt sich Dependency Injection: die Abhängigkeit wird von aussen hereingegeben, nicht intern erzeugt. Das macht die Funktion unabhängig vom konkreten Backend und leichter testbar.StorageErrorvs.ValueError:StorageErrorsignalisiert einen Laufzeitfehler im Betrieb (Schreibfehler, DB-Verbindung weg) und wird infetch_and_store()behandelt.ValueErrorsignalisiert einen Konfigurationsfehler — falschertype-Wert inconfig.yaml— und soll das Programm sofort zum Absturz bringen (fail fast). Beides aufStorageErrorzu mappen wäre falsch: einexcept StorageErrorwürde sonst auch Konfigurationsfehler stillschweigend schlucken.
Aufgabe
Aktuell werden alle Bboxen seriell abgearbeitet — eine nach der anderen. Da jeder Request auf die Overpass-API wartet (I/O-bound), liegt die CPU die meiste Zeit idle. Mit Parallelisierung lassen sich die Requests gleichzeitig abschicken und die Gesamtlaufzeit deutlich reduzieren.
Konkret:
- Füge in
pipeline.pyeineFetchMode-Enum hinzu:
class FetchMode(StrEnum):
SERIAL = "serial"
CONCURRENT = "concurrent"
- Erweitere
fetch_and_store()um einen Parameterfetch_mode: FetchModeund einenmax_workers: int = 4. - Implementiere
FetchMode.CONCURRENTmitThreadPoolExecutorundas_completed— die Futures sollen analog zur seriellen Variante Fehler pro Bbox loggen und die Ergebnisse inall_poissammeln. HINT: Du kannst als Vorlage das Code-Beispiel aus den Unterrichtsfolien nehmen (CodeWars). Es braucht nur ganz punktuelle Anpassungen. Überlege, was die aufzurufende Funktion ist und was für Parameter sie benötigt. - Verwende ein
match-Statement für die beiden Modi. - Ergänze
fetch_modeinconfig.yamlund lese ihn inmain.pyein.
Fragen zum Nachdenken:
- Was ist der Unterschied zwischen I/O-bound und CPU-bound — und warum eignen sich Threads für I/O-bound Tasks, aber nicht für CPU-bound?
- Was ist der Unterschied zwischen
executor.map()undas_completed()— wann ist welches besser geeignet? - Was passiert, wenn zwei Threads gleichzeitig
all_pois.extend()aufrufen — ist das in Python sicher? Warum (nicht)?