overpass_old/worker.py
2026-04-21 22:24:56 +02:00

103 lines
3.7 KiB
Python

import logging
from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from config import BBOXEN
from overpass import fetch_overpass
from utils import timer
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!)
# ---------------------------------------------------------------------------
def fetch_fragment(args: tuple) -> tuple[str, list]:
    """Execute one Overpass request for a single bbox fragment.

    Must live at module level so it can be pickled by ``Pool.map()``.
    Logging does not work reliably across process boundaries, so plain
    ``print()`` is used as a fallback inside the worker.

    Args:
        args: Tuple of (fragment_name, bbox, query).

    Returns:
        ``(fragment_name, elements)`` on success, ``(fragment_name, [])``
        on a known fetch failure.
    """
    fragment_name, bbox, query = args
    try:
        response = fetch_overpass(overpass_query=query, bbox=bbox)
    except (RuntimeError, requests.Timeout) as exc:
        # Known failure modes: report and hand back an empty fragment.
        print(f"[{fragment_name}] Fehler: {exc}")
        return fragment_name, []
    found = response.get("elements", [])
    print(f"[{fragment_name}] {len(found)} Elemente gefunden")
    return fragment_name, found
# ---------------------------------------------------------------------------
# Serielle Variante
# ---------------------------------------------------------------------------
@timer
def run_seriell(query: str) -> list:
    """Fetch all bbox fragments sequentially, one Overpass request each.

    Iterates over ``BBOXEN`` and concatenates the elements of every
    successful fragment; failed fragments are logged and skipped.

    Args:
        query: Overpass QL query (bbox is supplied per fragment).

    Returns:
        Combined list of OSM elements from all successful fragments.
    """
    overall: list = []
    for name, bbox in BBOXEN.items():
        # Fixed: message previously read "Fragment {name}' ..." with a stray
        # trailing quote; now consistent with the module's '{name}' style.
        logger.info("Seriell — Fragment '%s' ...", name)
        try:
            result = fetch_overpass(overpass_query=query, bbox=bbox)
        except (RuntimeError, requests.Timeout) as e:
            logger.error("Fehler bei '%s': %s", name, e)
            continue
        elements = result.get("elements", [])
        logger.info("'%s': %d Elemente", name, len(elements))
        overall.extend(elements)
    return overall
# ---------------------------------------------------------------------------
# Parallele Variante mit Multiprocessing
# ---------------------------------------------------------------------------
@timer
def run_parallel(query: str) -> list:
    """Fetch all bbox fragments in parallel, one worker process each.

    Uses ``multiprocessing.Pool`` with ``fetch_fragment`` (module-level,
    picklable) as the worker function.

    Args:
        query: Overpass QL query (bbox is supplied per fragment).

    Returns:
        Combined list of OSM elements from all successful fragments.
    """
    tasks = [(name, bbox, query) for name, bbox in BBOXEN.items()]
    if not tasks:
        # Pool(processes=0) raises ValueError; nothing to fetch anyway.
        return []
    with Pool(processes=len(tasks)) as pool:
        results = pool.map(fetch_fragment, tasks)
    overall: list = []
    for name, elements in results:
        if elements:
            overall.extend(elements)
        else:
            # Empty result means either no data or a swallowed worker error.
            logger.warning("Keine Elemente für Fragment '%s'", name)
    return overall
# ---------------------------------------------------------------------------
# Parallele Variante mit ThreadPoolExecutor
# ---------------------------------------------------------------------------
@timer
def run_threads(query: str) -> list:
    """Fetch all bbox fragments concurrently using a thread pool.

    Threads share the interpreter: no pickling is required and the module
    logger works directly, unlike in the multiprocessing variant.

    Args:
        query: Overpass QL query (bbox is supplied per fragment).

    Returns:
        Combined list of OSM elements from all successful fragments.
    """
    if not BBOXEN:
        # ThreadPoolExecutor(max_workers=0) raises ValueError.
        return []
    overall: list = []
    errors: list = []
    with ThreadPoolExecutor(max_workers=len(BBOXEN)) as executor:
        # Submit all tasks up front; map each Future back to its fragment name.
        futures = {
            # Keyword arguments: consistent with the other fetch_overpass
            # call sites and independent of the function's positional order.
            executor.submit(fetch_overpass, overpass_query=query, bbox=bbox): name
            for name, bbox in BBOXEN.items()
        }
        # as_completed() yields futures in completion order, not submission order.
        for future in as_completed(futures):
            name = futures[future]
            try:
                result = future.result()
            except (RuntimeError, requests.Timeout) as e:
                logger.error("[Thread] Fehler bei '%s': %s", name, e)
                errors.append(name)
                continue
            elements = result.get("elements", [])
            logger.info("[Thread] '%s': %d Elemente", name, len(elements))
            overall.extend(elements)
    if errors:
        logger.warning("Fehler in Fragmenten: %s", errors)
    return overall