overpass_old/main.py

122 lines
4.3 KiB
Python

import logging
from pathlib import Path
from utils import store_to_disk
from overpass import fetch_overpass
from queries.bergbahn import BERGBAHN_QUERY
from queries.restaurant import RESTAURANT_QUERY
import requests
# ---------------------------------------------------------------------------
# Logging konfigurieren
# ---------------------------------------------------------------------------
# Erinnerung: Log-Levels -> DEBUG, INFO, WARNING, ERROR, CRITICAL
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
OUTPUT_DIR = Path("results")
BBOXEN = {
"SW": (45.8, 5.9, 46.8, 8.2),
"SO": (45.8, 8.2, 46.8, 10.5),
"NW": (46.8, 5.9, 47.8, 8.2),
"NO": (46.8, 8.2, 47.8, 10.5)
}
# BBOXEN = {
# 1: (45.8, 5.9, 46.4667, 7.4333),
# 2: (45.8, 7.4333, 46.4667, 8.9667),
# 3: (45.8, 8.9667, 46.4667, 10.5),
# 4: (46.4667, 5.9, 47.1333, 7.4333),
# 5: (46.4667, 7.4333, 47.1333, 8.9667),
# 6: (46.4667, 8.9667, 47.1333, 10.5),
# 7: (47.1333, 5.9, 47.8, 7.4333),
# 8: (47.1333, 7.4333, 47.8, 8.9667),
# 9: (47.1333, 8.9667, 47.8, 10.5)
# }
# BBOXEN = {
# 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
# 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
# 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
# 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
# }
QUERY = {"bergbahn": BERGBAHN_QUERY}
# ---------------------------------------------------------------------------
# Hauptlogik
# ---------------------------------------------------------------------------
def main() -> None:
overall = []
errors = []
query_name = list(QUERY.keys())[0]
for name, bbox in BBOXEN.items():
logger.info(f"Starte Abfrage für Query: {query_name}, '{name}' mit bbox={bbox}")
try:
result = fetch_overpass(overpass_query=QUERY.get(query_name,""), bbox=bbox)
except RuntimeError as e:
errors.append(name)
logger.error(f"API-Fehler bei '{name}': {e}")
continue
except requests.Timeout:
errors.append(name)
logger.error(f"Timeout bei '{name}' — bbox zu gross oder Server überlastet")
continue
elements = result.get("elements", [])
logger.info(f"'{name}': {len(elements)} Elemente gefunden")
overall.extend(elements)
logger.info(f"Total: {len(overall)} Elemente gefunden")
if errors:
logger.info(f"Fehler in Fragmenten: {errors}")
# Ergebnisse speichern
try:
saved_path = store_to_disk(
results=overall,
poi_type=query_name,
output_dir=OUTPUT_DIR,
)
logger.info(f"Ergebnisse gespeichert: {saved_path}")
except OSError as e:
logger.error(f"Fehler beim Speichern: {e}")
logger.info("Fertig.")
if __name__ == "__main__":
main()
# Was ist passiert?
# * Wir haben einen ersten Test im Modul 'tests' geschrieben
# * Der Kern von fetch_overpass() ist ein HTTP-Request — ohne Mocks lässt sich die Funktion selbst kaum sinnvoll testen,
# weil jeder Test auf die echte API angewiesen wäre (langsam, flaky, Netzwerkabhängig).
# -> deshalb war nun der erste Test (zum eigentlich weniger wichtigen) 'store_to_disk' ...!
# TASK:
# * Wir arbeiten nun alle 4,9, 16 Sequenzen hintereinander seriell ab. Wir könnten versuchen den ganzen Prozess zu
# beschleunigen und ihn parallel auszuführen...
# * Es gibt je nach Problem verschiedene Möglichkeiten unseren Code zu parallelisieren -> beide haben Vor- und Nachteile!
# - Multithreating
# - Multiprocessing
# -> wir beginnen mit dem Multiprocessing-Ansatz (multiprocessing). Dazu bauen wir wieder ein neues model namens 'worker'
# wo wir unseren Code für die Parallelisierung ablegen.