import logging from pathlib import Path from utils import store_to_disk from worker import fetch_fragment from multiprocessing import Pool from queries.bergbahn import BERGBAHN_QUERY from queries.restaurant import RESTAURANT_QUERY # --------------------------------------------------------------------------- # Logging konfigurieren # --------------------------------------------------------------------------- # Erinnerung: Log-Levels -> DEBUG, INFO, WARNING, ERROR, CRITICAL logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Konfiguration # --------------------------------------------------------------------------- OUTPUT_DIR = Path("results") BBOXEN = { "SW": (45.8, 5.9, 46.8, 8.2), "SO": (45.8, 8.2, 46.8, 10.5), "NW": (46.8, 5.9, 47.8, 8.2), "NO": (46.8, 8.2, 47.8, 10.5) } # BBOXEN = { # 1: (45.8, 5.9, 46.4667, 7.4333), # 2: (45.8, 7.4333, 46.4667, 8.9667), # 3: (45.8, 8.9667, 46.4667, 10.5), # 4: (46.4667, 5.9, 47.1333, 7.4333), # 5: (46.4667, 7.4333, 47.1333, 8.9667), # 6: (46.4667, 8.9667, 47.1333, 10.5), # 7: (47.1333, 5.9, 47.8, 7.4333), # 8: (47.1333, 7.4333, 47.8, 8.9667), # 9: (47.1333, 8.9667, 47.8, 10.5) # } # BBOXEN = { # 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5), # 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5), # 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5), # 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5) # } QUERY = {"bergbahn": BERGBAHN_QUERY} # --------------------------------------------------------------------------- # Hauptlogik # --------------------------------------------------------------------------- def main() -> None: query_name = list(QUERY.keys())[0] query = QUERY[query_name] # Argumente für jeden Worker vorbereiten tasks = [ (name, bbox, query) for name, bbox in BBOXEN.items() ] logger.info(f"Starte parallele Abfrage mit {len(tasks)} Prozessen ...") # blockiert bis alle Prozesse fertig sind with Pool(processes=len(tasks)) as pool: results = pool.map(fetch_fragment, tasks) # Ergebnisse zusammenführen overall = [] errors = [] for name, elements in results: if elements: overall.extend(elements) else: errors.append(name) logger.info(f"Total:{len(overall)} Elemente gefunden") if errors: logger.warning(f"Fehler in Fragmenten: {errors}") try: saved_path = store_to_disk( results=overall, poi_type=query_name, output_dir=OUTPUT_DIR, ) logger.info(f"Ergebnisse gespeichert: {saved_path}") except OSError as e: logger.error(f"Fehler beim Speichern: {e}") logger.info("Fertig.") if __name__ == "__main__": main() # Was ist passiert? # * Wir haben ein worker-Modul gebaut und dort den Code für den Multiprocessing-Code abgelegt # Erkenntnisse: # - Warum print() im Worker statt logging? # python# Logging-Konfiguration aus dem Hauptprozess wird nicht vererbt → # logger.info() im Worker-Prozess schweigt einfach # print() funktioniert immer, ist aber nicht ideal für Produktion # - Warum muss fetch_fragment auf Modul-Ebene stehen? # python# multiprocessing serialisiert Funktionen mit pickle # Lambda und lokale Funktionen sind nicht pickle-bar → AttributeError # Modul-Ebene = pickle-bar # - Wenn Pool.map() einen neuen Prozess startet, muss es der neue Prozess wissen, welche Funktion er # ausführen soll. Das geschieht über Pickle — Python serialisiert die Funktion und schickt sie an # den neuen Prozess: # Pool.map(fetch_fragment, tasks) # → pickle(fetch_fragment) ──► unpickle(fetch_fragment) # TASK: # * Wir implementieren nun auch einen Multithreating-Ansatz (ebenfalls im worker-modul) # * Zusätzlich bauen wir in Modul 'utils' einen Dekorator, welcher die Zeit messen kann (einer Funktion) # * Wir rufen aus unserer Hauptfunktion alle Working-Funktionen (seriell, multihreating und multiprocessing) auf # und vergleichen die benötigte Zeit