diff --git a/TASK.md b/TASK.md index c047d5d..4440f2b 100644 --- a/TASK.md +++ b/TASK.md @@ -1,66 +1,76 @@ -# Task 16 — Lock und @timer-Decorator +# Task 17 — PostgresStorage -## Rückblick Task 15: Concurrency +## Rückblick Task 16: Lock und @timer-Decorator -Ihr habt `ThreadPoolExecutor` und `as_completed` eingeführt. Die wichtigsten Punkte: +Ihr habt `CONCURRENT_LOCKED` und den `@timer`-Decorator eingeführt. +Die wichtigsten Punkte: -- **I/O-bound vs. CPU-bound:** Overpass-Requests sind I/O-bound — die CPU wartet - auf die Netzwerkantwort. Threads sind dafür ideal, weil Python während des - Wartens (I/O) den GIL freigibt und andere Threads laufen lässt. Bei CPU-bound - Tasks (z.B. Bildverarbeitung, ML-Training) hilft Threading nicht — - dort braucht man `multiprocessing`. -- **`executor.map()` vs. `as_completed()`:** `map()` ist einfacher, liefert - Ergebnisse aber in der Reihenfolge der Inputs — auch wenn spätere Futures - früher fertig sind. `as_completed()` liefert Ergebnisse sobald sie fertig - sind, was bei unterschiedlichen Antwortzeiten effizienter ist und - pro Future individuelles Error-Handling erlaubt. -- **`all_pois.extend()` aus mehreren Threads:** In Python ist `list.extend()` - durch den GIL (Global Interpreter Lock) de facto atomar für einfache - Operationen — ein echter Race Condition-Crash ist unwahrscheinlich. Aber: - die **Reihenfolge** der Ergebnisse ist nicht deterministisch, und bei - komplexeren Operationen (read-modify-write) wäre ein Lock nötig. +- **Warum Lock, obwohl der GIL schützt?** Der GIL verhindert echte + Parallelität auf CPU-Ebene, aber er gibt keine Garantien über die + Reihenfolge von Operationen oder die Konsistenz komplexerer + Datenstrukturen. Ein expliziter `Lock` macht die Absicht klar, + ist portabel (auch ohne GIL, z.B. in PyPy oder zukünftigen + Python-Versionen) und schützt bei read-modify-write-Operationen + zuverlässig. +- **`time.perf_counter()`** ist präziser als `time.time()` für + Laufzeitmessungen, weil er eine hochauflösende Systemuhr + verwendet und nicht durch Systemzeit-Anpassungen beeinflusst wird. ## Aufgabe -Zwei Erweiterungen stehen an — eine zur Illustration von Thread-Safety, -eine zur Laufzeitmessung. +Bisher speichern wir POIs in JSON-Dateien. Für grössere Datenmengen +und spätere Abfragen ist eine Datenbank besser geeignet. Ziel ist es, +`PostgresStorage` als zweites konkretes Backend zu implementieren — +ohne `main.py`, `pipeline.py` oder `fetcher.py` anzufassen. -**Teil A — `FetchMode.CONCURRENT_LOCKED`:** +**Vorbereitung:** +- Stelle sicher, dass eine PostgreSQL-Datenbank läuft und erreichbar ist. +- Erstelle in der PostgreSQL-Datenbank eine neue Datenbank. Das kannst Du z.B. in PGAdmin oder aus dem Terminal erreichen. + Gebt der neuen Datenbank einen Namen (z.B. overpass) und ordnet ihr einen Nutzer (z.B. postgres) inkl. Passwort zu. +- Installiere `psycopg2`: `pip install psycopg2-binary` +- Ergänze den Connection-String in `config.yaml` ("postgresql://user:password@localhost:5432/dbname"). -Der bisherige `CONCURRENT`-Modus sammelt Ergebnisse ohne explizite -Synchronisation. Füge einen dritten Modus hinzu, der zeigt, wie man -`all_pois.extend()` mit einem `Lock` absichert. +**Konkret:** -1. Ergänze `FetchMode` um `CONCURRENT_LOCKED = "concurrent_locked"`. -2. Implementiere den neuen Modus in `fetch_and_store()` analog zu - `CONCURRENT`, aber mit einem `threading.Lock`: +1. Ergänze in `models.py` zwei Methoden in der `POI`-Dataclass: ```python - with lock: - all_pois.extend(future.result()) + def to_row(self) -> tuple: + """POI-Objekt → DB-Zeile (Schreiben)""" + ... + + @classmethod + def from_row(cls, row: dict) -> POI: + """DB-Zeile → POI-Objekt (Lesen)""" + ... ``` -3. Ergänze `config.yaml` — setze `fetch_mode: concurrent_locked`. + Die `tags` in POI sollen als JSONB gespeichert werden — + verwende dafür `psycopg2.extras.Json`. -**Teil B — `@timer`-Decorator:** +2. Implementiere `PostgresStorage(Storage)` in `storage.py`: + - Nimmt `connection_string: str` und `table: str = "pois"` entgegen. + - `_ensure_table()`: Erstellt die Tabelle, falls sie nicht existiert + (Primary Key: `(id, poi_type)`). + - `store()`: Schreibt alle POIs per UPSERT in die Tabelle + (`ON CONFLICT DO UPDATE`). Verwende `execute_values` aus + `psycopg2.extras` für effizientes Bulk-Insert. + - Verwende `with conn:` als Context-Manager für Transaktionen + (automatisches commit/rollback). + - Für die Bildung des SQL-Statements könnt ihr gerne in den Unterrichtsunterlagen (Folien) 'spicken'. -1. Lege eine neue Datei `utils.py` an. -2. Schreibe darin einen `@timer`-Decorator, der die Laufzeit einer - Funktion misst und per `logger.info()` ausgibt. - - ```python - def timer(func): - @wraps(func) - def wrapper(*args, **kwargs): - start = time.perf_counter() - result = func(*args, **kwargs) - elapsed = time.perf_counter() - start - logger.info(f"[timer] {func.__name__} → {elapsed:.2f}s") - return result - return wrapper - ``` +3. Ergänze `build_storage()` — der `POSTGRES`-Case soll nun + `PostgresStorage(**params)` zurückgeben statt `NotImplementedError`. -3. Dekoriere `main()` in `main.py` mit `@timer`. +4. Passe `config.yaml` an: +```yaml + storage: + type: postgres + params: + connection_string: "postgresql://user:password@localhost:5432/dbname" +``` **Fragen zum Nachdenken:** -- `list.extend()` ist in CPython durch den GIL geschützt — warum - empfiehlt es sich trotzdem, einen Lock zu verwenden? \ No newline at end of file +- Warum `ON CONFLICT DO UPDATE` (UPSERT) statt einem einfachen + `INSERT` — was passiert ohne es beim zweiten Ausführen? +- Was macht `with conn:` als Context-Manager — was passiert bei + einem Fehler innerhalb des Blocks? \ No newline at end of file diff --git a/src/overpass/config.yaml b/src/overpass/config.yaml index e53e011..f531c3d 100644 --- a/src/overpass/config.yaml +++ b/src/overpass/config.yaml @@ -1,4 +1,4 @@ -fetch_mode: concurrent # serial | concurrent +fetch_mode: concurrent_locked # serial | concurrent | concurrent_locked overpass: timeout: 25 diff --git a/src/overpass/main.py b/src/overpass/main.py index 4dc1568..f7e86c9 100644 --- a/src/overpass/main.py +++ b/src/overpass/main.py @@ -5,6 +5,7 @@ from pathlib import Path from .models import PoiType from .pipeline import fetch_and_store, FetchMode from .storage import build_storage +from .utils import timer logging.basicConfig( level=logging.INFO, @@ -13,8 +14,9 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -ROOT = Path(__file__).parent.parent.parent # → project/ +ROOT = Path(__file__).parent.parent.parent +@timer def main() -> None: config = yaml.safe_load((Path(__file__).parent / "config.yaml").read_text()) timeout = config["overpass"]["timeout"] diff --git a/src/overpass/pipeline.py b/src/overpass/pipeline.py index 62fa8e0..c0fb917 100644 --- a/src/overpass/pipeline.py +++ b/src/overpass/pipeline.py @@ -1,6 +1,7 @@ import logging from concurrent.futures import ThreadPoolExecutor, as_completed from enum import StrEnum +from threading import Lock from .models import PoiType, POI from .fetcher import load_query, load_pois, OverpassApiError @@ -10,8 +11,9 @@ logger = logging.getLogger(__name__) class FetchMode(StrEnum): - SERIAL = "serial" - CONCURRENT = "concurrent" + SERIAL = "serial" + CONCURRENT = "concurrent" + CONCURRENT_LOCKED = "concurrent_locked" def fetch_and_store( @@ -47,6 +49,21 @@ def fetch_and_store( except (FileNotFoundError, OverpassApiError) as exc: logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}") + case FetchMode.CONCURRENT_LOCKED: + lock = Lock() + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit(_fetch_bbox, poi_type, name, bbox, timeout, maxsize): name + for name, bbox in bboxen.items() + } + for future in as_completed(futures): + name = futures[future] + try: + with lock: + all_pois.extend(future.result()) + except (FileNotFoundError, OverpassApiError) as exc: + logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}") + if not all_pois: logger.warning(f"[{poi_type}] Nichts zu speichern") return diff --git a/src/overpass/utils.py b/src/overpass/utils.py new file mode 100644 index 0000000..2e7e4bb --- /dev/null +++ b/src/overpass/utils.py @@ -0,0 +1,16 @@ +import time +import logging +from functools import wraps + +logger = logging.getLogger(__name__) + + +def timer(func): + @wraps(func) + def wrapper(*args, **kwargs): + start = time.perf_counter() + result = func(*args, **kwargs) + elapsed = time.perf_counter() - start + logger.info(f"[timer] {func.__name__} → {elapsed:.2f}s") + return result + return wrapper \ No newline at end of file