Task_16: Lock und Timer
This commit is contained in:
parent
61f164bc9e
commit
6f5867f085
97
TASK.md
97
TASK.md
@ -1,57 +1,66 @@
|
|||||||
# Task 15 — Concurrency mit ThreadPoolExecutor
|
# Task 16 — Lock und @timer-Decorator
|
||||||
|
|
||||||
## Rückblick Task 14: pipeline.py
|
## Rückblick Task 15: Concurrency
|
||||||
|
|
||||||
Ihr habt die Fetch-und-Store-Logik in eine eigene Datei `pipeline.py` ausgelagert.
|
Ihr habt `ThreadPoolExecutor` und `as_completed` eingeführt. Die wichtigsten Punkte:
|
||||||
Die wichtigsten Punkte:
|
|
||||||
|
|
||||||
- **Single Responsibility Principle:** `main()` hat jetzt genau eine Aufgabe —
|
- **I/O-bound vs. CPU-bound:** Overpass-Requests sind I/O-bound — die CPU wartet
|
||||||
Konfiguration lesen und Ausführung orchestrieren. `fetch_and_store()` kümmert
|
auf die Netzwerkantwort. Threads sind dafür ideal, weil Python während des
|
||||||
sich um Fetching und Speichern, `_fetch_bbox()` um eine einzelne Bbox.
|
Wartens (I/O) den GIL freigibt und andere Threads laufen lässt. Bei CPU-bound
|
||||||
Jede Funktion hat genau eine Verantwortung.
|
Tasks (z.B. Bildverarbeitung, ML-Training) hilft Threading nicht —
|
||||||
- **`Storage` als Parameter:** `fetch_and_store()` instanziiert Storage nicht
|
dort braucht man `multiprocessing`.
|
||||||
selbst — sie bekommt ein fertiges Objekt übergeben. Das nennt sich
|
- **`executor.map()` vs. `as_completed()`:** `map()` ist einfacher, liefert
|
||||||
*Dependency Injection*: die Abhängigkeit wird von aussen hereingegeben,
|
Ergebnisse aber in der Reihenfolge der Inputs — auch wenn spätere Futures
|
||||||
nicht intern erzeugt. Das macht die Funktion unabhängig vom konkreten
|
früher fertig sind. `as_completed()` liefert Ergebnisse sobald sie fertig
|
||||||
Backend und leichter testbar.
|
sind, was bei unterschiedlichen Antwortzeiten effizienter ist und
|
||||||
- **`StorageError` vs. `ValueError`:** `StorageError` signalisiert einen
|
pro Future individuelles Error-Handling erlaubt.
|
||||||
Laufzeitfehler *im Betrieb* (Schreibfehler, DB-Verbindung weg) und wird
|
- **`all_pois.extend()` aus mehreren Threads:** In Python ist `list.extend()`
|
||||||
in `fetch_and_store()` behandelt. `ValueError` signalisiert einen
|
durch den GIL (Global Interpreter Lock) de facto atomar für einfache
|
||||||
Konfigurationsfehler — falscher `type`-Wert in `config.yaml` — und soll
|
Operationen — ein echter Race Condition-Crash ist unwahrscheinlich. Aber:
|
||||||
das Programm sofort zum Absturz bringen (*fail fast*). Beides auf
|
die **Reihenfolge** der Ergebnisse ist nicht deterministisch, und bei
|
||||||
`StorageError` zu mappen wäre falsch: ein `except StorageError` würde
|
komplexeren Operationen (read-modify-write) wäre ein Lock nötig.
|
||||||
sonst auch Konfigurationsfehler stillschweigend schlucken.
|
|
||||||
|
|
||||||
|
|
||||||
## Aufgabe
|
## Aufgabe
|
||||||
|
|
||||||
Aktuell werden alle Bboxen **seriell** abgearbeitet — eine nach der anderen.
|
Zwei Erweiterungen stehen an — eine zur Illustration von Thread-Safety,
|
||||||
Da jeder Request auf die Overpass-API wartet (I/O-bound), liegt die CPU
|
eine zur Laufzeitmessung.
|
||||||
die meiste Zeit idle. Mit Parallelisierung lassen sich die Requests
|
|
||||||
gleichzeitig abschicken und die Gesamtlaufzeit deutlich reduzieren.
|
|
||||||
|
|
||||||
**Konkret:**
|
**Teil A — `FetchMode.CONCURRENT_LOCKED`:**
|
||||||
|
|
||||||
1. Füge in `pipeline.py` eine `FetchMode`-Enum hinzu:
|
Der bisherige `CONCURRENT`-Modus sammelt Ergebnisse ohne explizite
|
||||||
|
Synchronisation. Füge einen dritten Modus hinzu, der zeigt, wie man
|
||||||
|
`all_pois.extend()` mit einem `Lock` absichert.
|
||||||
|
|
||||||
|
1. Ergänze `FetchMode` um `CONCURRENT_LOCKED = "concurrent_locked"`.
|
||||||
|
2. Implementiere den neuen Modus in `fetch_and_store()` analog zu
|
||||||
|
`CONCURRENT`, aber mit einem `threading.Lock`:
|
||||||
```python
|
```python
|
||||||
class FetchMode(StrEnum):
|
with lock:
|
||||||
SERIAL = "serial"
|
all_pois.extend(future.result())
|
||||||
CONCURRENT = "concurrent"
|
|
||||||
```
|
```
|
||||||
2. Erweitere `fetch_and_store()` um einen Parameter `fetch_mode: FetchMode`
|
3. Ergänze `config.yaml` — setze `fetch_mode: concurrent_locked`.
|
||||||
und einen `max_workers: int = 4`.
|
|
||||||
3. Implementiere `FetchMode.CONCURRENT` mit `ThreadPoolExecutor` und
|
**Teil B — `@timer`-Decorator:**
|
||||||
`as_completed` — die Futures sollen analog zur seriellen Variante
|
|
||||||
Fehler pro Bbox loggen und die Ergebnisse in `all_pois` sammeln.
|
1. Lege eine neue Datei `utils.py` an.
|
||||||
**HINT:** Du kannst als Vorlage das Code-Beispiel aus den Unterrichtsfolien nehmen (CodeWars). Es braucht nur ganz punktuelle
|
2. Schreibe darin einen `@timer`-Decorator, der die Laufzeit einer
|
||||||
Anpassungen. Überlege, was die aufzurufende Funktion ist und was für Parameter sie benötigt.
|
Funktion misst und per `logger.info()` ausgibt.
|
||||||
4. Verwende ein `match`-Statement für die beiden Modi.
|
|
||||||
5. Ergänze `fetch_mode` in `config.yaml` und lese ihn in `main.py` ein.
|
```python
|
||||||
|
def timer(func):
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
start = time.perf_counter()
|
||||||
|
result = func(*args, **kwargs)
|
||||||
|
elapsed = time.perf_counter() - start
|
||||||
|
logger.info(f"[timer] {func.__name__} → {elapsed:.2f}s")
|
||||||
|
return result
|
||||||
|
return wrapper
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Dekoriere `main()` in `main.py` mit `@timer`.
|
||||||
|
|
||||||
**Fragen zum Nachdenken:**
|
**Fragen zum Nachdenken:**
|
||||||
- Was ist der Unterschied zwischen I/O-bound und CPU-bound — und warum
|
- `list.extend()` ist in CPython durch den GIL geschützt — warum
|
||||||
eignen sich Threads für I/O-bound Tasks, aber nicht für CPU-bound?
|
empfiehlt es sich trotzdem, einen Lock zu verwenden?
|
||||||
- Was ist der Unterschied zwischen `executor.map()` und
|
|
||||||
`as_completed()` — wann ist welches besser geeignet?
|
|
||||||
- Was passiert, wenn zwei Threads gleichzeitig `all_pois.extend()` aufrufen
|
|
||||||
— ist das in Python sicher? Warum (nicht)?
|
|
||||||
@ -1,10 +1,19 @@
|
|||||||
|
fetch_mode: concurrent # serial | concurrent
|
||||||
|
|
||||||
overpass:
|
overpass:
|
||||||
timeout: 25
|
timeout: 25
|
||||||
maxsize: 5000000
|
maxsize: 5000000
|
||||||
|
|
||||||
bboxen:
|
bboxen:
|
||||||
davos: [46.72, 9.70, 46.92, 10.00]
|
"1": [45.8, 5.9, 46.4667, 7.4333]
|
||||||
schweiz: [45.8, 5.9, 47.8, 10.5]
|
"2": [45.8, 7.4333, 46.4667, 8.9667]
|
||||||
|
"3": [45.8, 8.9667, 46.4667, 10.5]
|
||||||
|
"4": [46.4667, 5.9, 47.1333, 7.4333]
|
||||||
|
"5": [46.4667, 7.4333, 47.1333, 8.9667]
|
||||||
|
"6": [46.4667, 8.9667, 47.1333, 10.5]
|
||||||
|
"7": [47.1333, 5.9, 47.8, 7.4333]
|
||||||
|
"8": [47.1333, 7.4333, 47.8, 8.9667]
|
||||||
|
"9": [47.1333, 8.9667, 47.8, 10.5]
|
||||||
|
|
||||||
active_queries:
|
active_queries:
|
||||||
- bergbahn
|
- bergbahn
|
||||||
|
|||||||
@ -3,7 +3,7 @@ import logging
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from .models import PoiType
|
from .models import PoiType
|
||||||
from .pipeline import fetch_and_store
|
from .pipeline import fetch_and_store, FetchMode
|
||||||
from .storage import build_storage
|
from .storage import build_storage
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
@ -13,7 +13,7 @@ logging.basicConfig(
|
|||||||
)
|
)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
ROOT = Path(__file__).parent.parent.parent
|
ROOT = Path(__file__).parent.parent.parent # → project/
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
config = yaml.safe_load((Path(__file__).parent / "config.yaml").read_text())
|
config = yaml.safe_load((Path(__file__).parent / "config.yaml").read_text())
|
||||||
@ -22,9 +22,11 @@ def main() -> None:
|
|||||||
bboxen = config["bboxen"]
|
bboxen = config["bboxen"]
|
||||||
storage = build_storage(config["storage"], root=ROOT)
|
storage = build_storage(config["storage"], root=ROOT)
|
||||||
poi_types = [PoiType(pt) for pt in config["active_queries"]]
|
poi_types = [PoiType(pt) for pt in config["active_queries"]]
|
||||||
|
mode = FetchMode(config["fetch_mode"])
|
||||||
|
|
||||||
|
logger.info(f"Fetch mode: {mode}")
|
||||||
for poi_type in poi_types:
|
for poi_type in poi_types:
|
||||||
fetch_and_store(poi_type, bboxen, timeout, maxsize, storage)
|
fetch_and_store(poi_type, bboxen, timeout, maxsize, storage, mode)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
@ -1,4 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
from enum import StrEnum
|
||||||
|
|
||||||
from .models import PoiType, POI
|
from .models import PoiType, POI
|
||||||
from .fetcher import load_query, load_pois, OverpassApiError
|
from .fetcher import load_query, load_pois, OverpassApiError
|
||||||
from .storage import Storage, StorageError
|
from .storage import Storage, StorageError
|
||||||
@ -6,20 +9,43 @@ from .storage import Storage, StorageError
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class FetchMode(StrEnum):
|
||||||
|
SERIAL = "serial"
|
||||||
|
CONCURRENT = "concurrent"
|
||||||
|
|
||||||
|
|
||||||
def fetch_and_store(
|
def fetch_and_store(
|
||||||
poi_type: PoiType,
|
poi_type: PoiType,
|
||||||
bboxen: dict,
|
bboxen: dict,
|
||||||
timeout: int,
|
timeout: int,
|
||||||
maxsize: int,
|
maxsize: int,
|
||||||
storage: Storage,
|
storage: Storage,
|
||||||
|
fetch_mode: FetchMode = FetchMode.SERIAL,
|
||||||
|
max_workers: int = 4,
|
||||||
) -> None:
|
) -> None:
|
||||||
all_pois: list[POI] = []
|
all_pois: list[POI] = []
|
||||||
|
|
||||||
for name, bbox in bboxen.items():
|
match fetch_mode:
|
||||||
try:
|
|
||||||
all_pois.extend(_fetch_bbox(poi_type, name, bbox, timeout, maxsize))
|
case FetchMode.SERIAL:
|
||||||
except (FileNotFoundError, OverpassApiError) as exc:
|
for name, bbox in bboxen.items():
|
||||||
logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}")
|
try:
|
||||||
|
all_pois.extend(_fetch_bbox(poi_type, name, bbox, timeout, maxsize))
|
||||||
|
except (FileNotFoundError, OverpassApiError) as exc:
|
||||||
|
logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}")
|
||||||
|
|
||||||
|
case FetchMode.CONCURRENT:
|
||||||
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||||
|
futures = {
|
||||||
|
executor.submit(_fetch_bbox, poi_type, name, bbox, timeout, maxsize): name
|
||||||
|
for name, bbox in bboxen.items()
|
||||||
|
}
|
||||||
|
for future in as_completed(futures):
|
||||||
|
name = futures[future]
|
||||||
|
try:
|
||||||
|
all_pois.extend(future.result())
|
||||||
|
except (FileNotFoundError, OverpassApiError) as exc:
|
||||||
|
logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}")
|
||||||
|
|
||||||
if not all_pois:
|
if not all_pois:
|
||||||
logger.warning(f"[{poi_type}] Nichts zu speichern")
|
logger.warning(f"[{poi_type}] Nichts zu speichern")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user