Task 9: multithreading, time-Dekorator
This commit is contained in:
parent
76482aca50
commit
b93e1a7667
76
main.py
76
main.py
@ -1,10 +1,11 @@
|
|||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from utils import store_to_disk
|
from utils import store_to_disk
|
||||||
from overpass import fetch_overpass
|
from worker import fetch_fragment
|
||||||
|
from multiprocessing import Pool
|
||||||
from queries.bergbahn import BERGBAHN_QUERY
|
from queries.bergbahn import BERGBAHN_QUERY
|
||||||
from queries.restaurant import RESTAURANT_QUERY
|
from queries.restaurant import RESTAURANT_QUERY
|
||||||
import requests
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -61,32 +62,34 @@ QUERY = {"bergbahn": BERGBAHN_QUERY}
|
|||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
|
|
||||||
|
query_name = list(QUERY.keys())[0]
|
||||||
|
query = QUERY[query_name]
|
||||||
|
|
||||||
|
# Argumente für jeden Worker vorbereiten
|
||||||
|
tasks = [
|
||||||
|
(name, bbox, query)
|
||||||
|
for name, bbox in BBOXEN.items()
|
||||||
|
]
|
||||||
|
|
||||||
|
logger.info(f"Starte parallele Abfrage mit {len(tasks)} Prozessen ...")
|
||||||
|
|
||||||
|
# blockiert bis alle Prozesse fertig sind
|
||||||
|
with Pool(processes=len(tasks)) as pool:
|
||||||
|
results = pool.map(fetch_fragment, tasks)
|
||||||
|
|
||||||
|
# Ergebnisse zusammenführen
|
||||||
overall = []
|
overall = []
|
||||||
errors = []
|
errors = []
|
||||||
query_name = list(QUERY.keys())[0]
|
for name, elements in results:
|
||||||
|
if elements:
|
||||||
for name, bbox in BBOXEN.items():
|
|
||||||
logger.info(f"Starte Abfrage für Query: {query_name}, '{name}' mit bbox={bbox}")
|
|
||||||
try:
|
|
||||||
result = fetch_overpass(overpass_query=QUERY.get(query_name,""), bbox=bbox)
|
|
||||||
except RuntimeError as e:
|
|
||||||
errors.append(name)
|
|
||||||
logger.error(f"API-Fehler bei '{name}': {e}")
|
|
||||||
continue
|
|
||||||
except requests.Timeout:
|
|
||||||
errors.append(name)
|
|
||||||
logger.error(f"Timeout bei '{name}' — bbox zu gross oder Server überlastet")
|
|
||||||
continue
|
|
||||||
|
|
||||||
elements = result.get("elements", [])
|
|
||||||
logger.info(f"'{name}': {len(elements)} Elemente gefunden")
|
|
||||||
overall.extend(elements)
|
overall.extend(elements)
|
||||||
|
else:
|
||||||
|
errors.append(name)
|
||||||
|
|
||||||
logger.info(f"Total:{len(overall)} Elemente gefunden")
|
logger.info(f"Total:{len(overall)} Elemente gefunden")
|
||||||
if errors:
|
if errors:
|
||||||
logger.info(f"Fehler in Fragmenten: {errors}")
|
logger.warning(f"Fehler in Fragmenten: {errors}")
|
||||||
|
|
||||||
# Ergebnisse speichern
|
|
||||||
try:
|
try:
|
||||||
saved_path = store_to_disk(
|
saved_path = store_to_disk(
|
||||||
results=overall,
|
results=overall,
|
||||||
@ -105,18 +108,27 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
|
|
||||||
# Was ist passiert?
|
# Was ist passiert?
|
||||||
# * Wir haben einen ersten Test im Modul 'tests' geschrieben
|
# * Wir haben ein worker-Modul gebaut und dort den Code für den Multiprocessing-Code abgelegt
|
||||||
# * Der Kern von fetch_overpass() ist ein HTTP-Request — ohne Mocks lässt sich die Funktion selbst kaum sinnvoll testen,
|
|
||||||
# weil jeder Test auf die echte API angewiesen wäre (langsam, flaky, Netzwerkabhängig).
|
# Erkenntnisse:
|
||||||
# -> deshalb war nun der erste Test (zum eigentlich weniger wichtigen) 'store_to_disk' ...!
|
# - Warum print() im Worker statt logging?
|
||||||
|
# python# Logging-Konfiguration aus dem Hauptprozess wird nicht vererbt →
|
||||||
|
# logger.info() im Worker-Prozess schweigt einfach
|
||||||
|
# print() funktioniert immer, ist aber nicht ideal für Produktion
|
||||||
|
# - Warum muss fetch_fragment auf Modul-Ebene stehen?
|
||||||
|
# python# multiprocessing serialisiert Funktionen mit pickle
|
||||||
|
# Lambda und lokale Funktionen sind nicht pickle-bar → AttributeError
|
||||||
|
# Modul-Ebene = pickle-bar
|
||||||
|
# - Wenn Pool.map() einen neuen Prozess startet, muss es der neue Prozess wissen, welche Funktion er
|
||||||
|
# ausführen soll. Das geschieht über Pickle — Python serialisiert die Funktion und schickt sie an
|
||||||
|
# den neuen Prozess:
|
||||||
|
# Pool.map(fetch_fragment, tasks)
|
||||||
|
# → pickle(fetch_fragment) ──► unpickle(fetch_fragment)
|
||||||
|
|
||||||
|
|
||||||
# TASK:
|
# TASK:
|
||||||
|
|
||||||
# * Wir arbeiten nun alle 4,9, 16 Sequenzen hintereinander seriell ab. Wir könnten versuchen den ganzen Prozess zu
|
# * Wir implementieren nun auch einen Multithreating-Ansatz (ebenfalls im worker-modul)
|
||||||
# beschleunigen und ihn parallel auszuführen...
|
# * Zusätzlich bauen wir in Modul 'utils' einen Dekorator, welcher die Zeit messen kann (einer Funktion)
|
||||||
# * Es gibt je nach Problem verschiedene Möglichkeiten unseren Code zu parallelisieren -> beide haben Vor- und Nachteile!
|
# * Wir rufen aus unserer Hauptfunktion alle Working-Funktionen (seriell, multihreating und multiprocessing) auf
|
||||||
# - Multithreating
|
# und vergleichen die benötigte Zeit
|
||||||
# - Multiprocessing
|
|
||||||
# -> wir beginnen mit dem Multiprocessing-Ansatz (multiprocessing). Dazu bauen wir wieder ein neues model namens 'worker'
|
|
||||||
# wo wir unseren Code für die Parallelisierung ablegen.
|
|
||||||
24
worker.py
Normal file
24
worker.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
import requests
|
||||||
|
from overpass import fetch_overpass
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def fetch_fragment(args: tuple) -> tuple[str, list]:
|
||||||
|
"""
|
||||||
|
Führt einen einzelnen Overpass-Request aus.
|
||||||
|
Gibt (fragment_name, elements) zurück — oder (fragment_name, []) bei Fehler.
|
||||||
|
|
||||||
|
Wird von Pool.map() in einem separaten Prozess ausgeführt.
|
||||||
|
Logging funktioniert hier nicht zuverlässig → print() als Fallback.
|
||||||
|
"""
|
||||||
|
name, bbox, query = args
|
||||||
|
try:
|
||||||
|
result = fetch_overpass(overpass_query=query, bbox=bbox)
|
||||||
|
elements = result.get("elements", [])
|
||||||
|
print(f"[{name}] {len(elements)} Elemente gefunden")
|
||||||
|
return name, elements
|
||||||
|
except (RuntimeError, requests.Timeout) as e:
|
||||||
|
print(f"[{name}] Fehler: {e}")
|
||||||
|
return name, []
|
||||||
Loading…
x
Reference in New Issue
Block a user