import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool

import requests

from config import BBOXEN
from overpass import fetch_overpass
from utils import timer

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Worker function (must live at module level — pickling requirement!)
# ---------------------------------------------------------------------------
def fetch_fragment(args: tuple) -> tuple[str, list]:
    """Execute a single Overpass request in a worker process.

    Args:
        args: Packed tuple ``(fragment_name, bbox, query)`` — packed into
            one argument because ``Pool.map()`` passes exactly one item
            per task.

    Returns:
        ``(fragment_name, elements)`` on success, ``(fragment_name, [])``
        on error — the worker never raises, so ``Pool.map()`` always
        yields a complete result list.

    Note:
        Runs in a separate process via ``Pool.map()``; logging is not
        reliable there, hence ``print()`` as a fallback.
    """
    name, bbox, query = args
    try:
        result = fetch_overpass(overpass_query=query, bbox=bbox)
        elements = result.get("elements", [])
        print(f"[{name}] {len(elements)} Elemente gefunden")
        return name, elements
    except (RuntimeError, requests.Timeout) as e:
        print(f"[{name}] Fehler: {e}")
        return name, []


# ---------------------------------------------------------------------------
# Serial variant
# ---------------------------------------------------------------------------
@timer
def run_seriell(query: str) -> list:
    """Fetch all bbox fragments one after another.

    Failed fragments are logged and skipped; their elements are simply
    absent from the returned list.
    """
    overall: list = []
    for name, bbox in BBOXEN.items():
        # BUGFIX: original message was "Fragment {name}' ..." — the opening
        # quote before the name was missing.
        logger.info("Seriell — Fragment '%s' ...", name)
        try:
            result = fetch_overpass(overpass_query=query, bbox=bbox)
            elements = result.get("elements", [])
            logger.info("'%s': %d Elemente", name, len(elements))
            overall.extend(elements)
        except (RuntimeError, requests.Timeout) as e:
            logger.error("Fehler bei '%s': %s", name, e)
    return overall


# ---------------------------------------------------------------------------
# Parallel variant with multiprocessing
# ---------------------------------------------------------------------------
@timer
def run_parallel(query: str) -> list:
    """Fetch all bbox fragments in parallel — one process per fragment.

    Relies on ``fetch_fragment`` swallowing its own errors, so ``map()``
    returns one ``(name, elements)`` pair per task.
    """
    tasks = [(name, bbox, query) for name, bbox in BBOXEN.items()]
    with Pool(processes=len(tasks)) as pool:
        results = pool.map(fetch_fragment, tasks)
    overall: list = []
    for name, elements in results:
        if elements:
            overall.extend(elements)
        else:
            # Covers both "genuinely empty fragment" and "request failed"
            # — fetch_fragment maps errors to an empty list.
            logger.warning("Keine Elemente für Fragment '%s'", name)
    return overall


# ---------------------------------------------------------------------------
# Parallel variant with ThreadPoolExecutor
# ---------------------------------------------------------------------------
@timer
def run_threads(query: str) -> list:
    """Fetch all bbox fragments in parallel using one thread each.

    Threads share the interpreter (no pickling requirement), so logging
    works directly inside the workers.
    """
    overall: list = []
    errors: list = []
    with ThreadPoolExecutor(max_workers=len(BBOXEN)) as executor:
        # Submit every task at once and map the Future back to its name.
        # Consistency fix: use keyword arguments like every other call
        # site instead of relying on fetch_overpass's positional order.
        futures = {
            executor.submit(fetch_overpass, overpass_query=query, bbox=bbox): name
            for name, bbox in BBOXEN.items()
        }
        # as_completed() yields futures in the order they finish.
        for future in as_completed(futures):
            name = futures[future]
            try:
                result = future.result()
                elements = result.get("elements", [])
                logger.info("[Thread] '%s': %d Elemente", name, len(elements))
                overall.extend(elements)
            except (RuntimeError, requests.Timeout) as e:
                logger.error("[Thread] Fehler bei '%s': %s", name, e)
                errors.append(name)
    if errors:
        logger.warning("Fehler in Fragmenten: %s", errors)
    return overall