134 lines
4.6 KiB
Python
134 lines
4.6 KiB
Python
import logging
|
|
from pathlib import Path
|
|
from utils import store_to_disk
|
|
from worker import fetch_fragment
|
|
from multiprocessing import Pool
|
|
from queries.bergbahn import BERGBAHN_QUERY
|
|
from queries.restaurant import RESTAURANT_QUERY
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logging konfigurieren
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Erinnerung: Log-Levels -> DEBUG, INFO, WARNING, ERROR, CRITICAL
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
datefmt="%H:%M:%S",
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Konfiguration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
OUTPUT_DIR = Path("results")
|
|
|
|
BBOXEN = {
|
|
"SW": (45.8, 5.9, 46.8, 8.2),
|
|
"SO": (45.8, 8.2, 46.8, 10.5),
|
|
"NW": (46.8, 5.9, 47.8, 8.2),
|
|
"NO": (46.8, 8.2, 47.8, 10.5)
|
|
}
|
|
|
|
# BBOXEN = {
|
|
# 1: (45.8, 5.9, 46.4667, 7.4333),
|
|
# 2: (45.8, 7.4333, 46.4667, 8.9667),
|
|
# 3: (45.8, 8.9667, 46.4667, 10.5),
|
|
# 4: (46.4667, 5.9, 47.1333, 7.4333),
|
|
# 5: (46.4667, 7.4333, 47.1333, 8.9667),
|
|
# 6: (46.4667, 8.9667, 47.1333, 10.5),
|
|
# 7: (47.1333, 5.9, 47.8, 7.4333),
|
|
# 8: (47.1333, 7.4333, 47.8, 8.9667),
|
|
# 9: (47.1333, 8.9667, 47.8, 10.5)
|
|
# }
|
|
|
|
# BBOXEN = {
|
|
# 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
|
|
# 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
|
|
# 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
|
|
# 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
|
|
# }
|
|
|
|
QUERY = {"bergbahn": BERGBAHN_QUERY}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hauptlogik
|
|
# ---------------------------------------------------------------------------
|
|
def main() -> None:
|
|
|
|
query_name = list(QUERY.keys())[0]
|
|
query = QUERY[query_name]
|
|
|
|
# Argumente für jeden Worker vorbereiten
|
|
tasks = [
|
|
(name, bbox, query)
|
|
for name, bbox in BBOXEN.items()
|
|
]
|
|
|
|
logger.info(f"Starte parallele Abfrage mit {len(tasks)} Prozessen ...")
|
|
|
|
# blockiert bis alle Prozesse fertig sind
|
|
with Pool(processes=len(tasks)) as pool:
|
|
results = pool.map(fetch_fragment, tasks)
|
|
|
|
# Ergebnisse zusammenführen
|
|
overall = []
|
|
errors = []
|
|
for name, elements in results:
|
|
if elements:
|
|
overall.extend(elements)
|
|
else:
|
|
errors.append(name)
|
|
|
|
logger.info(f"Total:{len(overall)} Elemente gefunden")
|
|
if errors:
|
|
logger.warning(f"Fehler in Fragmenten: {errors}")
|
|
|
|
try:
|
|
saved_path = store_to_disk(
|
|
results=overall,
|
|
poi_type=query_name,
|
|
output_dir=OUTPUT_DIR,
|
|
)
|
|
logger.info(f"Ergebnisse gespeichert: {saved_path}")
|
|
except OSError as e:
|
|
logger.error(f"Fehler beim Speichern: {e}")
|
|
|
|
logger.info("Fertig.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|
|
# Was ist passiert?
|
|
# * Wir haben ein worker-Modul gebaut und dort den Code für den Multiprocessing-Code abgelegt
|
|
|
|
# Erkenntnisse:
|
|
# - Warum print() im Worker statt logging?
|
|
# python# Logging-Konfiguration aus dem Hauptprozess wird nicht vererbt →
|
|
# logger.info() im Worker-Prozess schweigt einfach
|
|
# print() funktioniert immer, ist aber nicht ideal für Produktion
|
|
# - Warum muss fetch_fragment auf Modul-Ebene stehen?
|
|
# python# multiprocessing serialisiert Funktionen mit pickle
|
|
# Lambda und lokale Funktionen sind nicht pickle-bar → AttributeError
|
|
# Modul-Ebene = pickle-bar
|
|
# - Wenn Pool.map() einen neuen Prozess startet, muss es der neue Prozess wissen, welche Funktion er
|
|
# ausführen soll. Das geschieht über Pickle — Python serialisiert die Funktion und schickt sie an
|
|
# den neuen Prozess:
|
|
# Pool.map(fetch_fragment, tasks)
|
|
# → pickle(fetch_fragment) ──► unpickle(fetch_fragment)
|
|
|
|
|
|
# TASK:
|
|
|
|
# * Wir implementieren nun auch einen Multithreating-Ansatz (ebenfalls im worker-modul)
|
|
# * Zusätzlich bauen wir in Modul 'utils' einen Dekorator, welcher die Zeit messen kann (einer Funktion)
|
|
# * Wir rufen aus unserer Hauptfunktion alle Working-Funktionen (seriell, multihreating und multiprocessing) auf
|
|
# und vergleichen die benötigte Zeit |