overpass_old/main.py

134 lines
4.6 KiB
Python

import logging
from pathlib import Path
from utils import store_to_disk
from worker import fetch_fragment
from multiprocessing import Pool
from queries.bergbahn import BERGBAHN_QUERY
from queries.restaurant import RESTAURANT_QUERY
# ---------------------------------------------------------------------------
# Logging konfigurieren
# ---------------------------------------------------------------------------
# Erinnerung: Log-Levels -> DEBUG, INFO, WARNING, ERROR, CRITICAL
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
OUTPUT_DIR = Path("results")
BBOXEN = {
"SW": (45.8, 5.9, 46.8, 8.2),
"SO": (45.8, 8.2, 46.8, 10.5),
"NW": (46.8, 5.9, 47.8, 8.2),
"NO": (46.8, 8.2, 47.8, 10.5)
}
# BBOXEN = {
# 1: (45.8, 5.9, 46.4667, 7.4333),
# 2: (45.8, 7.4333, 46.4667, 8.9667),
# 3: (45.8, 8.9667, 46.4667, 10.5),
# 4: (46.4667, 5.9, 47.1333, 7.4333),
# 5: (46.4667, 7.4333, 47.1333, 8.9667),
# 6: (46.4667, 8.9667, 47.1333, 10.5),
# 7: (47.1333, 5.9, 47.8, 7.4333),
# 8: (47.1333, 7.4333, 47.8, 8.9667),
# 9: (47.1333, 8.9667, 47.8, 10.5)
# }
# BBOXEN = {
# 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
# 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
# 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
# 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
# }
QUERY = {"bergbahn": BERGBAHN_QUERY}
# ---------------------------------------------------------------------------
# Hauptlogik
# ---------------------------------------------------------------------------
def main() -> None:
query_name = list(QUERY.keys())[0]
query = QUERY[query_name]
# Argumente für jeden Worker vorbereiten
tasks = [
(name, bbox, query)
for name, bbox in BBOXEN.items()
]
logger.info(f"Starte parallele Abfrage mit {len(tasks)} Prozessen ...")
# blockiert bis alle Prozesse fertig sind
with Pool(processes=len(tasks)) as pool:
results = pool.map(fetch_fragment, tasks)
# Ergebnisse zusammenführen
overall = []
errors = []
for name, elements in results:
if elements:
overall.extend(elements)
else:
errors.append(name)
logger.info(f"Total:{len(overall)} Elemente gefunden")
if errors:
logger.warning(f"Fehler in Fragmenten: {errors}")
try:
saved_path = store_to_disk(
results=overall,
poi_type=query_name,
output_dir=OUTPUT_DIR,
)
logger.info(f"Ergebnisse gespeichert: {saved_path}")
except OSError as e:
logger.error(f"Fehler beim Speichern: {e}")
logger.info("Fertig.")
if __name__ == "__main__":
main()
# Was ist passiert?
# * Wir haben ein worker-Modul gebaut und dort den Code für den Multiprocessing-Code abgelegt
# Erkenntnisse:
# - Warum print() im Worker statt logging?
# python# Logging-Konfiguration aus dem Hauptprozess wird nicht vererbt →
# logger.info() im Worker-Prozess schweigt einfach
# print() funktioniert immer, ist aber nicht ideal für Produktion
# - Warum muss fetch_fragment auf Modul-Ebene stehen?
# python# multiprocessing serialisiert Funktionen mit pickle
# Lambda und lokale Funktionen sind nicht pickle-bar → AttributeError
# Modul-Ebene = pickle-bar
# - Wenn Pool.map() einen neuen Prozess startet, muss es der neue Prozess wissen, welche Funktion er
# ausführen soll. Das geschieht über Pickle — Python serialisiert die Funktion und schickt sie an
# den neuen Prozess:
# Pool.map(fetch_fragment, tasks)
# → pickle(fetch_fragment) ──► unpickle(fetch_fragment)
# TASK:
# * Wir implementieren nun auch einen Multithreating-Ansatz (ebenfalls im worker-modul)
# * Zusätzlich bauen wir in Modul 'utils' einen Dekorator, welcher die Zeit messen kann (einer Funktion)
# * Wir rufen aus unserer Hauptfunktion alle Working-Funktionen (seriell, multihreating und multiprocessing) auf
# und vergleichen die benötigte Zeit