Compare commits
No commits in common. "main" and "Task_3" have entirely different histories.
38
config.py
38
config.py
@ -1,38 +0,0 @@
|
|||||||
from pathlib import Path

from queries.bergbahn import BERGBAHN_QUERY
from queries.restaurant import RESTAURANT_QUERY

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Directory all result files are written to.
OUTPUT_DIR = Path("results")

# Switzerland split into four quadrant bounding boxes.
# Each value is (south, west, north, east) in decimal degrees.
BBOXEN = {
    "SW": (45.8, 5.9, 46.8, 8.2),
    "SO": (45.8, 8.2, 46.8, 10.5),
    "NW": (46.8, 5.9, 47.8, 8.2),
    "NO": (46.8, 8.2, 47.8, 10.5),
}

# Alternative 3x3 split, kept for experimentation:
# BBOXEN = {
#     1: (45.8, 5.9, 46.4667, 7.4333),
#     2: (45.8, 7.4333, 46.4667, 8.9667),
#     3: (45.8, 8.9667, 46.4667, 10.5),
#     4: (46.4667, 5.9, 47.1333, 7.4333),
#     5: (46.4667, 7.4333, 47.1333, 8.9667),
#     6: (46.4667, 8.9667, 47.1333, 10.5),
#     7: (47.1333, 5.9, 47.8, 7.4333),
#     8: (47.1333, 7.4333, 47.8, 8.9667),
#     9: (47.1333, 8.9667, 47.8, 10.5)
# }

# Alternative 4x4 split, kept for experimentation:
# BBOXEN = {
#     1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
#     5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
#     9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
#     13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
# }

# Mapping of POI type name -> Overpass QL query template ({bbox} placeholder).
QUERY = {"bergbahn": BERGBAHN_QUERY}
|
|
||||||
163
main.py
163
main.py
@ -1,93 +1,108 @@
|
|||||||
import logging

from utils import store_to_disk
from config import QUERY, OUTPUT_DIR
from worker import run_seriell, run_threads, run_parallel


# ---------------------------------------------------------------------------
# Configure logging
# ---------------------------------------------------------------------------

# Reminder: log levels -> DEBUG, INFO, WARNING, ERROR, CRITICAL
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Main logic
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the serial, multiprocessing and thread-pool variants and store results.

    All three variants fetch the same query; the @timer decorator on the
    workers logs their runtimes so the approaches can be compared. Only the
    multiprocessing result is persisted to disk.
    """
    query_name = list(QUERY.keys())[0]
    query = QUERY[query_name]

    logger.info("=== Seriell ===")
    overall_s = run_seriell(query)  # result unused; call kept for the timing comparison

    logger.info("=== Multiprocessing ===")
    overall_p = run_parallel(query)

    logger.info("=== ThreadPoolExecutor ===")
    overall_t = run_threads(query)  # result unused; call kept for the timing comparison

    try:
        saved_path = store_to_disk(
            results=overall_p,
            poi_type=query_name,
            output_dir=OUTPUT_DIR,
        )
        logger.info(f"Ergebnisse gespeichert: {saved_path}")
    except OSError as e:
        logger.error(f"Fehler beim Speichern:{e}")
    logger.info("Fertig.")


if __name__ == "__main__":
    main()


# What happened?
# * We additionally implemented multithreading code.
# * The decorator function in utils.py (timer) measures and logs the runtime of the
#   decorated functions without changing their code. That gives us a simple timing
#   comparison between the individual functions.
# * BBOXEN, OUTPUT_DIR and QUERY were moved to config.py because they are needed in
#   both main.py and worker.py. Had they stayed in main.py, we would have run into a
#   circular-import problem...

# Insights:

# Program flow:
# main() — always runs sequentially
# │
# ├── run_seriell()
# │     ├── fetch SW ──► waits
# │     ├── fetch SO ──► waits
# │     ├── fetch NW ──► waits
# │     └── fetch NO ──► waits → return → main() continues
# │
# ├── run_parallel()
# │     ├── fetch SW ─┐
# │     ├── fetch SO  ├─ concurrently
# │     ├── fetch NW  │  in processes
# │     └── fetch NO ─┘
# │     Pool.map() blocks until ALL are done → return → main() continues
# │
# ├── run_threads()
# │     ├── fetch SW ─┐
# │     ├── fetch SO  ├─ concurrently
# │     ├── fetch NW  │  in threads
# │     └── fetch NO ─┘
# │     as_completed() blocks until ALL are done → return → main() continues
# │
# └── store_to_disk()  ← only here, guaranteed


# TASK:
# * So far we store the results as a .json file on disk. Next we want to
#   store the results in a sqlite database.
|
||||||
97
overpass.py
97
overpass.py
@ -1,97 +0,0 @@
|
|||||||
import requests
|
|
||||||
from pprint import pprint
|
|
||||||
|
|
||||||
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
|
|
||||||
|
|
||||||
def fetch_overpass(overpass_query: str, bbox: tuple) -> dict:
    """Query the Overpass API with an arbitrary query for POIs inside a bounding box.

    Sends an HTTP POST request to the Overpass API and returns the parsed
    JSON response. The function checks the HTTP status code, a possible
    error reported in the response body, and the JSON structure of the
    answer.

    Args:
        overpass_query (str): Overpass QL query containing the placeholder {bbox}.
            Example:
                '[out:json][timeout:5];
                 (node["aerialway"="station"]({bbox}););
                 out center body;'
        bbox (tuple): Bounding box as a 4-tuple in decimal degrees:
            (south, west, north, east)
            Example Davos:       (46.72, 9.70, 46.92, 10.00)
            Example Switzerland: (45.8, 5.9, 47.8, 10.5)

    Returns:
        dict: Parsed JSON response of the Overpass API. Under the key
            "elements" it contains a list of OSM objects (nodes and ways)
            with their tags and coordinates.
            Example:
                {
                    "elements": [
                        {
                            "type": "node",
                            "id": 123456,
                            "lat": 46.8, "lon": 9.8,
                            "tags": {"aerialway": "station", "name": "Jakobshorn"}
                        },
                        ...
                    ]
                }

    Raises:
        RuntimeError: If the API returns an HTTP error code
            (e.g. 429 Too Many Requests, 504 Gateway Timeout).
        RuntimeError: If the response body is not valid JSON
            (e.g. empty body after a server-side timeout).
        RuntimeError: If Overpass reports a domain-level error
            (HTTP 200, but "remark" in the body, e.g. memory or
            timeout limit exceeded).
        requests.Timeout: If the API does not answer within the
            client-side timeout.
    """
    bbox_str = ",".join(map(str, bbox))
    query = overpass_query.format(bbox=bbox_str)
    resp = requests.post(
        OVERPASS_URL,
        data={"data": query},
        timeout=40,  # client-side timeout — distinct from the server-side timeout set inside the query itself (e.g. 3 seconds)
        headers={"User-Agent": "GeoService/1.0 (poi-generator)"},
    )
    if resp.status_code != 200:
        raise RuntimeError(
            f"Overpass API Fehler: {resp.status_code}\n{resp.text}"
        )
    try:
        data = resp.json()
    except requests.exceptions.JSONDecodeError as err:
        # Chain the original decode error so the traceback keeps the root cause.
        raise RuntimeError(
            f"Antwort ist kein gültiges JSON:\n{resp.text[:200]}"
        ) from err

    # HTTP 200 does not always mean success — Overpass reports errors in the body!
    if "remark" in data:
        raise RuntimeError(
            f"Overpass Query-Fehler: {data['remark']}"
        )

    return data
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
    # Smoke test: fetch mountain-railway POIs for a small area around Davos.
    BERGBAHN_QUERY = """
    [out:json][timeout:2][maxsize:500000];
    (
      node["aerialway"="station"]({bbox});
      way["aerialway"="station"]({bbox});
      node["railway"="funicular"]({bbox});
      way["railway"="funicular"]({bbox});
      node["railway"="station"]["funicular"="yes"]({bbox});
    );
    out center body;
    """

    bbox = (46.72, 9.70, 46.92, 10.00)
    # bbox = (45.8, 5.9, 47.8, 10.5)  # whole of Switzerland
    result = fetch_overpass(overpass_query=BERGBAHN_QUERY, bbox=bbox)
    pprint(result)
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,12 +0,0 @@
|
|||||||
|
|
||||||
# Overpass QL query for mountain railways: aerial-way stations and funiculars.
# The {bbox} placeholder is filled in by fetch_overpass() with
# "south,west,north,east" in decimal degrees.
BERGBAHN_QUERY = """
[out:json][timeout:10][maxsize:500000];
(
  node["aerialway"="station"]({bbox});
  way["aerialway"="station"]({bbox});
  node["railway"="funicular"]({bbox});
  way["railway"="funicular"]({bbox});
  node["railway"="station"]["funicular"="yes"]({bbox});
);
out center body;
"""
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
|
|
||||||
# Overpass QL query for restaurants and cafes.
# The {bbox} placeholder is filled in by fetch_overpass() with
# "south,west,north,east" in decimal degrees.
RESTAURANT_QUERY = """
[out:json][timeout:10][maxsize:500000];
(
  node["amenity"="restaurant"]({bbox});
  way["amenity"="restaurant"]({bbox});
  node["amenity"="cafe"]({bbox});
  way["amenity"="cafe"]({bbox});
);
out center body;
"""
|
|
||||||
Binary file not shown.
@ -1,33 +0,0 @@
|
|||||||
import pytest
|
|
||||||
from utils import store_to_disk
|
|
||||||
|
|
||||||
# tmp_path ist ein pytest-Fixture — d.h. pytest stellt ihn automatisch bereit, ohne dass man ihn importieren muss.
|
|
||||||
# pytest startet
|
|
||||||
# → sieht Parameter "tmp_path" in der Funktionssignatur
|
|
||||||
# → erstellt automatisch ein temporäres Verzeichnis (z.B. /tmp/pytest-123/test_store_0/)
|
|
||||||
# → übergibt es als Path-Objekt an die Testfunktion
|
|
||||||
# → löscht es nach dem Test wieder
|
|
||||||
|
|
||||||
def test_store_to_disk_creates_file(tmp_path):
    """Ensure store_to_disk writes a readable JSON file with the given elements."""
    import json

    elements = [
        {"type": "node", "id": 1, "lat": 46.1835291, "lon": 6.8346732, "tags": {"name": "Jakobshorn"}},
        {"type": "way", "id": 2, "lat": 46.1772269, "lon": 6.8402226, "tags": {"name": "Parsenn"}},
    ]

    saved_path = store_to_disk(
        results=elements,
        poi_type="bergbahn",
        output_dir=tmp_path,
    )

    # The file name is derived from poi_type.
    assert saved_path.exists()
    assert saved_path.name == "bergbahn_results.json"

    with saved_path.open(encoding="utf-8") as f:
        loaded = json.load(f)

    assert loaded == elements  # content round-trips unchanged
    assert len(loaded) == 2
|
|
||||||
65
utils.py
65
utils.py
@ -1,65 +0,0 @@
|
|||||||
import json
|
|
||||||
import time
|
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
|
||||||
from functools import wraps
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def store_to_disk(
    results: list[dict],
    poi_type: str = "overpass",
    output_dir: Path = Path("."),
) -> Path:
    """Store a list of OSM elements as a JSON file on disk.

    The file name is derived from the poi_type parameter:
    e.g. poi_type="bergbahn" -> "bergbahn_results.json"

    Args:
        results (list[dict]): List of OSM elements, as returned by the
            Overpass API under "elements".
        poi_type (str): Label for the POI type; determines the file
            name. Default: "overpass"
        output_dir (Path): Target directory. Created if it does not
            exist. Default: current directory.

    Returns:
        Path: Absolute path of the stored file.

    Raises:
        OSError: If the directory cannot be created or the file cannot
            be written (e.g. missing write permissions).
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{poi_type}_results.json"

    # ensure_ascii=False keeps umlauts etc. readable in the output file.
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    return output_path.resolve()
|
|
||||||
|
|
||||||
|
|
||||||
def timer(func):
    """Decorator that measures and logs a function's execution time.

    Usage:
        @timer
        def my_function():
            ...
    """
    @wraps(func)  # preserves __name__, __doc__ of the original function
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # Lazy %-style arguments: the message is only formatted if the
        # record actually passes the logger's level filter.
        logger.info("%s() dauerte %s Sekunden\n\n", func.__name__, round(elapsed, 2))
        return result
    return wrapper


# @wraps(func) — without this decorator, run_seriell.__name__ would report
# "wrapper" instead of "run_seriell", which would make the log output useless.
|
|
||||||
103
worker.py
103
worker.py
@ -1,103 +0,0 @@
|
|||||||
import logging
|
|
||||||
from multiprocessing import Pool
|
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
||||||
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from config import BBOXEN
|
|
||||||
from overpass import fetch_overpass
|
|
||||||
from utils import timer
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!)
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
def fetch_fragment(args: tuple) -> tuple[str, list]:
    """Execute a single Overpass request for one bounding-box fragment.

    Returns (fragment_name, elements) — or (fragment_name, []) on error.

    Executed by Pool.map() in a separate process (hence it must live at
    module level — pickling requirement). Logging is not reliable across
    process boundaries, so print() is used as a fallback.
    """
    name, bbox, query = args
    try:
        result = fetch_overpass(overpass_query=query, bbox=bbox)
        elements = result.get("elements", [])
        print(f"[{name}] {len(elements)} Elemente gefunden")
        return name, elements
    except (RuntimeError, requests.Timeout) as e:
        # Swallow expected fetch errors so one failing fragment does not
        # abort the whole pool run; the empty list marks the failure.
        print(f"[{name}] Fehler: {e}")
        return name, []
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Serielle Variante
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
@timer
def run_seriell(query: str) -> list:
    """Fetch all BBOXEN fragments one after another and return the merged elements."""
    overall = []
    for name, bbox in BBOXEN.items():
        # NOTE: fixed the unbalanced quote in the original message ("...{name}' ...").
        logger.info(f"Seriell — Fragment '{name}' ...")
        try:
            result = fetch_overpass(overpass_query=query, bbox=bbox)
            elements = result.get("elements", [])
            logger.info(f"'{name}': {len(elements)} Elemente")
            overall.extend(elements)
        except (RuntimeError, requests.Timeout) as e:
            # A failing fragment is logged and skipped; the rest still runs.
            logger.error(f"Fehler bei '{name}': {e}")
    return overall
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Parallele Variante mit Multiprocessing
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
@timer
def run_parallel(query: str) -> list:
    """Fetch all BBOXEN fragments concurrently in separate processes.

    Uses multiprocessing.Pool; Pool.map() blocks until every fragment is done.
    """
    tasks = [(name, bbox, query) for name, bbox in BBOXEN.items()]
    with Pool(processes=len(tasks)) as pool:
        results = pool.map(fetch_fragment, tasks)
    overall = []
    for name, elements in results:
        if elements:
            overall.extend(elements)
        else:
            logger.warning(f"Keine Elemente für Fragment '{name}'")
    return overall
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Parallele Variante mit ThreadPoolExecutor
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
@timer
def run_threads(query: str) -> list:
    """Fetch all BBOXEN fragments concurrently with a ThreadPoolExecutor."""
    overall = []
    errors = []

    # ThreadPoolExecutor: no pickling required -> logging works directly!
    with ThreadPoolExecutor(max_workers=len(BBOXEN)) as executor:

        # Submit all tasks at once -> get Future objects back
        futures = {
            executor.submit(fetch_overpass, query, bbox): name
            for name, bbox in BBOXEN.items()
        }

        # as_completed() yields the futures in the order they finish
        for future in as_completed(futures):
            name = futures[future]
            try:
                result = future.result()
                elements = result.get("elements", [])
                logger.info(f"[Thread] '{name}': {len(elements)} Elemente")
                overall.extend(elements)
            except (RuntimeError, requests.Timeout) as e:
                logger.error(f"[Thread] Fehler bei '{name}': {e}")
                errors.append(name)

    if errors:
        logger.warning(f"Fehler in Fragmenten: {errors}")
    return overall
|
|
||||||
Loading…
x
Reference in New Issue
Block a user