import logging from pathlib import Path from utils import store_to_disk from overpass import fetch_overpass from queries.bergbahn import BERGBAHN_QUERY from queries.restaurant import RESTAURANT_QUERY import requests # --------------------------------------------------------------------------- # Logging konfigurieren # --------------------------------------------------------------------------- # Erinnerung: Log-Levels -> DEBUG, INFO, WARNING, ERROR, CRITICAL logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Konfiguration # --------------------------------------------------------------------------- OUTPUT_DIR = Path("results") BBOXEN = { "SW": (45.8, 5.9, 46.8, 8.2), "SO": (45.8, 8.2, 46.8, 10.5), "NW": (46.8, 5.9, 47.8, 8.2), "NO": (46.8, 8.2, 47.8, 10.5) } # BBOXEN = { # 1: (45.8, 5.9, 46.4667, 7.4333), # 2: (45.8, 7.4333, 46.4667, 8.9667), # 3: (45.8, 8.9667, 46.4667, 10.5), # 4: (46.4667, 5.9, 47.1333, 7.4333), # 5: (46.4667, 7.4333, 47.1333, 8.9667), # 6: (46.4667, 8.9667, 47.1333, 10.5), # 7: (47.1333, 5.9, 47.8, 7.4333), # 8: (47.1333, 7.4333, 47.8, 8.9667), # 9: (47.1333, 8.9667, 47.8, 10.5) # } # BBOXEN = { # 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5), # 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5), # 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5), # 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5) # } QUERY = {"bergbahn": BERGBAHN_QUERY} # --------------------------------------------------------------------------- # Hauptlogik # --------------------------------------------------------------------------- def main() -> None: overall = [] errors = [] query_name = list(QUERY.keys())[0] for name, bbox in BBOXEN.items(): logger.info(f"Starte Abfrage für Query: {query_name}, '{name}' mit bbox={bbox}") try: result = fetch_overpass(overpass_query=QUERY.get(query_name,""), bbox=bbox) except RuntimeError as e: errors.append(name) logger.error(f"API-Fehler bei '{name}': {e}") continue except requests.Timeout: errors.append(name) logger.error(f"Timeout bei '{name}' — bbox zu gross oder Server überlastet") continue elements = result.get("elements", []) logger.info(f"'{name}': {len(elements)} Elemente gefunden") overall.extend(elements) logger.info(f"Total: {len(overall)} Elemente gefunden") if errors: logger.info(f"Fehler in Fragmenten: {errors}") # Ergebnisse speichern try: saved_path = store_to_disk( results=overall, poi_type=query_name, output_dir=OUTPUT_DIR, ) logger.info(f"Ergebnisse gespeichert: {saved_path}") except OSError as e: logger.error(f"Fehler beim Speichern: {e}") logger.info("Fertig.") if __name__ == "__main__": main() # Was ist passiert? # * Wir haben einen ersten Test im Modul 'tests' geschrieben # * Der Kern von fetch_overpass() ist ein HTTP-Request — ohne Mocks lässt sich die Funktion selbst kaum sinnvoll testen, # weil jeder Test auf die echte API angewiesen wäre (langsam, flaky, Netzwerkabhängig). # -> deshalb war nun der erste Test (zum eigentlich weniger wichtigen) 'store_to_disk' ...! # TASK: # * Wir arbeiten nun alle 4,9, 16 Sequenzen hintereinander seriell ab. Wir könnten versuchen den ganzen Prozess zu # beschleunigen und ihn parallel auszuführen... # * Es gibt je nach Problem verschiedene Möglichkeiten unseren Code zu parallelisieren -> beide haben Vor- und Nachteile! # - Multithreating # - Multiprocessing # -> wir beginnen mit dem Multiprocessing-Ansatz (multiprocessing). Dazu bauen wir wieder ein neues model namens 'worker' # wo wir unseren Code für die Parallelisierung ablegen.