Task_17: PostgresStorage

This commit is contained in:
Marco Schmid 2026-05-13 08:58:25 +02:00
parent 6f5867f085
commit d154b15280
5 changed files with 98 additions and 53 deletions

108
TASK.md
View File

@ -1,66 +1,76 @@
# Task 16 — Lock und @timer-Decorator
# Task 17 — PostgresStorage
## Rückblick Task 15: Concurrency
## Rückblick Task 16: Lock und @timer-Decorator
Ihr habt `ThreadPoolExecutor` und `as_completed` eingeführt. Die wichtigsten Punkte:
Ihr habt `CONCURRENT_LOCKED` und den `@timer`-Decorator eingeführt.
Die wichtigsten Punkte:
- **I/O-bound vs. CPU-bound:** Overpass-Requests sind I/O-bound — die CPU wartet
auf die Netzwerkantwort. Threads sind dafür ideal, weil Python während des
Wartens (I/O) den GIL freigibt und andere Threads laufen lässt. Bei CPU-bound
Tasks (z.B. Bildverarbeitung, ML-Training) hilft Threading nicht —
dort braucht man `multiprocessing`.
- **`executor.map()` vs. `as_completed()`:** `map()` ist einfacher, liefert
Ergebnisse aber in der Reihenfolge der Inputs — auch wenn spätere Futures
früher fertig sind. `as_completed()` liefert Ergebnisse sobald sie fertig
sind, was bei unterschiedlichen Antwortzeiten effizienter ist und
pro Future individuelles Error-Handling erlaubt.
- **`all_pois.extend()` aus mehreren Threads:** In Python ist `list.extend()`
durch den GIL (Global Interpreter Lock) de facto atomar für einfache
Operationen — ein echter Race Condition-Crash ist unwahrscheinlich. Aber:
die **Reihenfolge** der Ergebnisse ist nicht deterministisch, und bei
komplexeren Operationen (read-modify-write) wäre ein Lock nötig.
- **Warum Lock, obwohl der GIL schützt?** Der GIL verhindert echte
Parallelität auf CPU-Ebene, aber er gibt keine Garantien über die
Reihenfolge von Operationen oder die Konsistenz komplexerer
Datenstrukturen. Ein expliziter `Lock` macht die Absicht klar,
ist portabel (auch ohne GIL, z.B. in PyPy oder zukünftigen
Python-Versionen) und schützt bei read-modify-write-Operationen
zuverlässig.
- **`time.perf_counter()`** ist präziser als `time.time()` für
Laufzeitmessungen, weil er eine hochauflösende Systemuhr
verwendet und nicht durch Systemzeit-Anpassungen beeinflusst wird.
## Aufgabe
Zwei Erweiterungen stehen an — eine zur Illustration von Thread-Safety,
eine zur Laufzeitmessung.
Bisher speichern wir POIs in JSON-Dateien. Für grössere Datenmengen
und spätere Abfragen ist eine Datenbank besser geeignet. Ziel ist es,
`PostgresStorage` als zweites konkretes Backend zu implementieren —
ohne `main.py`, `pipeline.py` oder `fetcher.py` anzufassen.
**Teil A — `FetchMode.CONCURRENT_LOCKED`:**
**Vorbereitung:**
- Stelle sicher, dass eine PostgreSQL-Datenbank läuft und erreichbar ist.
- Erstelle in der PostgreSQL-Datenbank eine neue Datenbank. Das kannst Du z.B. in PGAdmin oder aus dem Terminal erreichen.
Gebt der neuen Datenbank einen Namen (z.B. overpass) und ordnet ihr einen Nutzer (z.B. postgres) inkl. Passwort zu.
- Installiere `psycopg2`: `pip install psycopg2-binary`
- Ergänze den Connection-String in `config.yaml` ("postgresql://user:password@localhost:5432/dbname").
Der bisherige `CONCURRENT`-Modus sammelt Ergebnisse ohne explizite
Synchronisation. Füge einen dritten Modus hinzu, der zeigt, wie man
`all_pois.extend()` mit einem `Lock` absichert.
**Konkret:**
1. Ergänze `FetchMode` um `CONCURRENT_LOCKED = "concurrent_locked"`.
2. Implementiere den neuen Modus in `fetch_and_store()` analog zu
`CONCURRENT`, aber mit einem `threading.Lock`:
1. Ergänze in `models.py` zwei Methoden in der `POI`-Dataclass:
```python
with lock:
all_pois.extend(future.result())
def to_row(self) -> tuple:
"""POI-Objekt → DB-Zeile (Schreiben)"""
...
@classmethod
def from_row(cls, row: dict) -> POI:
"""DB-Zeile → POI-Objekt (Lesen)"""
...
```
3. Ergänze `config.yaml` — setze `fetch_mode: concurrent_locked`.
Die `tags` in POI sollen als JSONB gespeichert werden —
verwende dafür `psycopg2.extras.Json`.
**Teil B — `@timer`-Decorator:**
2. Implementiere `PostgresStorage(Storage)` in `storage.py`:
- Nimmt `connection_string: str` und `table: str = "pois"` entgegen.
- `_ensure_table()`: Erstellt die Tabelle, falls sie nicht existiert
(Primary Key: `(id, poi_type)`).
- `store()`: Schreibt alle POIs per UPSERT in die Tabelle
(`ON CONFLICT DO UPDATE`). Verwende `execute_values` aus
`psycopg2.extras` für effizientes Bulk-Insert.
- Verwende `with conn:` als Context-Manager für Transaktionen
(automatisches commit/rollback).
- Für die Bildung des SQL-Statements könnt ihr gerne in den Unterrichtsunterlagen (Folien) 'spicken'.
1. Lege eine neue Datei `utils.py` an.
2. Schreibe darin einen `@timer`-Decorator, der die Laufzeit einer
Funktion misst und per `logger.info()` ausgibt.
```python
def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
logger.info(f"[timer] {func.__name__} → {elapsed:.2f}s")
return result
return wrapper
```
3. Ergänze `build_storage()` — der `POSTGRES`-Case soll nun
`PostgresStorage(**params)` zurückgeben statt `NotImplementedError`.
3. Dekoriere `main()` in `main.py` mit `@timer`.
4. Passe `config.yaml` an:
```yaml
storage:
type: postgres
params:
connection_string: "postgresql://user:password@localhost:5432/dbname"
```
**Fragen zum Nachdenken:**
- `list.extend()` ist in CPython durch den GIL geschützt — warum
empfiehlt es sich trotzdem, einen Lock zu verwenden?
- Warum `ON CONFLICT DO UPDATE` (UPSERT) statt einem einfachen
`INSERT` — was passiert ohne es beim zweiten Ausführen?
- Was macht `with conn:` als Context-Manager — was passiert bei
einem Fehler innerhalb des Blocks?

View File

@ -1,4 +1,4 @@
fetch_mode: concurrent # serial | concurrent
fetch_mode: concurrent_locked # serial | concurrent | concurrent_locked
overpass:
timeout: 25

View File

@ -5,6 +5,7 @@ from pathlib import Path
from .models import PoiType
from .pipeline import fetch_and_store, FetchMode
from .storage import build_storage
from .utils import timer
logging.basicConfig(
level=logging.INFO,
@ -13,8 +14,9 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
ROOT = Path(__file__).parent.parent.parent # → project/
ROOT = Path(__file__).parent.parent.parent
@timer
def main() -> None:
config = yaml.safe_load((Path(__file__).parent / "config.yaml").read_text())
timeout = config["overpass"]["timeout"]

View File

@ -1,6 +1,7 @@
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from enum import StrEnum
from threading import Lock
from .models import PoiType, POI
from .fetcher import load_query, load_pois, OverpassApiError
@ -10,8 +11,9 @@ logger = logging.getLogger(__name__)
class FetchMode(StrEnum):
SERIAL = "serial"
CONCURRENT = "concurrent"
SERIAL = "serial"
CONCURRENT = "concurrent"
CONCURRENT_LOCKED = "concurrent_locked"
def fetch_and_store(
@ -47,6 +49,21 @@ def fetch_and_store(
except (FileNotFoundError, OverpassApiError) as exc:
logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}")
case FetchMode.CONCURRENT_LOCKED:
lock = Lock()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(_fetch_bbox, poi_type, name, bbox, timeout, maxsize): name
for name, bbox in bboxen.items()
}
for future in as_completed(futures):
name = futures[future]
try:
with lock:
all_pois.extend(future.result())
except (FileNotFoundError, OverpassApiError) as exc:
logger.error(f"[{poi_type}] Fehler bei '{name}': {exc}")
if not all_pois:
logger.warning(f"[{poi_type}] Nichts zu speichern")
return

16
src/overpass/utils.py Normal file
View File

@ -0,0 +1,16 @@
import time
import logging
from functools import wraps
logger = logging.getLogger(__name__)
def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
logger.info(f"[timer] {func.__name__}{elapsed:.2f}s")
return result
return wrapper