overpass/TASK.md

2.8 KiB

Task 15 — Concurrency mit ThreadPoolExecutor

Rückblick Task 14: pipeline.py

Ihr habt die Fetch-und-Store-Logik in eine eigene Datei pipeline.py ausgelagert. Die wichtigsten Punkte:

  • Single Responsibility Principle: main() hat jetzt genau eine Aufgabe — Konfiguration lesen und Ausführung orchestrieren. fetch_and_store() kümmert sich um Fetching und Speichern, _fetch_bbox() um eine einzelne Bbox. Jede Funktion hat genau eine Verantwortung.
  • Storage als Parameter: fetch_and_store() instanziiert Storage nicht selbst — sie bekommt ein fertiges Objekt übergeben. Das nennt sich Dependency Injection: die Abhängigkeit wird von aussen hereingegeben, nicht intern erzeugt. Das macht die Funktion unabhängig vom konkreten Backend und leichter testbar.
  • StorageError vs. ValueError: StorageError signalisiert einen Laufzeitfehler im Betrieb (Schreibfehler, DB-Verbindung weg) und wird in fetch_and_store() behandelt. ValueError signalisiert einen Konfigurationsfehler — falscher type-Wert in config.yaml — und soll das Programm sofort zum Absturz bringen (fail fast). Beides auf StorageError zu mappen wäre falsch: ein except StorageError würde sonst auch Konfigurationsfehler stillschweigend schlucken.

Aufgabe

Aktuell werden alle Bboxen seriell abgearbeitet — eine nach der anderen. Da jeder Request auf die Overpass-API wartet (I/O-bound), liegt die CPU die meiste Zeit idle. Mit Parallelisierung lassen sich die Requests gleichzeitig abschicken und die Gesamtlaufzeit deutlich reduzieren.

Konkret:

  1. Füge in pipeline.py eine FetchMode-Enum hinzu:
   class FetchMode(StrEnum):
       SERIAL     = "serial"
       CONCURRENT = "concurrent"
  1. Erweitere fetch_and_store() um einen Parameter fetch_mode: FetchMode und einen max_workers: int = 4.
  2. Implementiere FetchMode.CONCURRENT mit ThreadPoolExecutor und as_completed — die Futures sollen analog zur seriellen Variante Fehler pro Bbox loggen und die Ergebnisse in all_pois sammeln. HINT: Du kannst als Vorlage das Code-Beispiel aus den Unterrichtsfolien nehmen (CodeWars). Es braucht nur ganz punktuelle Anpassungen. Überlege, was die aufzurufende Funktion ist und was für Parameter sie benötigt.
  3. Verwende ein match-Statement für die beiden Modi.
  4. Ergänze fetch_mode in config.yaml und lese ihn in main.py ein.

Fragen zum Nachdenken:

  • Was ist der Unterschied zwischen I/O-bound und CPU-bound — und warum eignen sich Threads für I/O-bound Tasks, aber nicht für CPU-bound?
  • Was ist der Unterschied zwischen executor.map() und as_completed() — wann ist welches besser geeignet?
  • Was passiert, wenn zwei Threads gleichzeitig all_pois.extend() aufrufen — ist das in Python sicher? Warum (nicht)?