Compare commits

1 commit

| Author | SHA1 | Date |
|---|---|---|
| | 28a1f54abf | |
config.py (new file, 38 lines)

```diff
@@ -0,0 +1,38 @@
+from pathlib import Path
+
+from queries.bergbahn import BERGBAHN_QUERY
+from queries.restaurant import RESTAURANT_QUERY
+
+
+# ---------------------------------------------------------------------------
+# Configuration
+# ---------------------------------------------------------------------------
+
+OUTPUT_DIR = Path("results")
+
+BBOXEN = {
+    "SW": (45.8, 5.9, 46.8, 8.2),
+    "SO": (45.8, 8.2, 46.8, 10.5),
+    "NW": (46.8, 5.9, 47.8, 8.2),
+    "NO": (46.8, 8.2, 47.8, 10.5)
+}
+
+# BBOXEN = {
+#     1: (45.8, 5.9, 46.4667, 7.4333),
+#     2: (45.8, 7.4333, 46.4667, 8.9667),
+#     3: (45.8, 8.9667, 46.4667, 10.5),
+#     4: (46.4667, 5.9, 47.1333, 7.4333),
+#     5: (46.4667, 7.4333, 47.1333, 8.9667),
+#     6: (46.4667, 8.9667, 47.1333, 10.5),
+#     7: (47.1333, 5.9, 47.8, 7.4333),
+#     8: (47.1333, 7.4333, 47.8, 8.9667),
+#     9: (47.1333, 8.9667, 47.8, 10.5)
+# }
+
+# BBOXEN = {
+#     1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
+#     5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
+#     9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
+#     13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
+# }
+
+QUERY = {"bergbahn": BERGBAHN_QUERY}
```
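The active BBOXEN splits the Swiss bounding box (lat 45.8 to 47.8, lon 5.9 to 10.5) into a 2x2 grid; the commented-out variants are the 3x3 and 4x4 tilings of the same box. As a sketch, such tilings could also be generated instead of typed out; the helper `split_bbox` below is hypothetical and not part of the repo:

```python
def split_bbox(south, west, north, east, rows, cols):
    """Tile (south, west, north, east) into a rows x cols grid of sub-boxes."""
    dlat = (north - south) / rows
    dlon = (east - west) / cols
    return {
        r * cols + c + 1: (
            round(south + r * dlat, 4), round(west + c * dlon, 4),
            round(south + (r + 1) * dlat, 4), round(west + (c + 1) * dlon, 4),
        )
        for r in range(rows)
        for c in range(cols)
    }

# split_bbox(45.8, 5.9, 47.8, 10.5, 3, 3) reproduces the commented-out 9-box variant.
```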
main.py (127 changed lines)

```diff
@@ -1,11 +1,8 @@
 import logging
-from pathlib import Path
 from utils import store_to_disk
-from worker import fetch_fragment
-from multiprocessing import Pool
-from queries.bergbahn import BERGBAHN_QUERY
-from queries.restaurant import RESTAURANT_QUERY
 
+from config import QUERY, OUTPUT_DIR
+from worker import run_seriell, run_threads, run_parallel
 
 
 # ---------------------------------------------------------------------------
@@ -22,84 +19,32 @@ logging.basicConfig(
 logger = logging.getLogger(__name__)
 
 
-# ---------------------------------------------------------------------------
-# Configuration
-# ---------------------------------------------------------------------------
-
-OUTPUT_DIR = Path("results")
-
-BBOXEN = {
-    "SW": (45.8, 5.9, 46.8, 8.2),
-    "SO": (45.8, 8.2, 46.8, 10.5),
-    "NW": (46.8, 5.9, 47.8, 8.2),
-    "NO": (46.8, 8.2, 47.8, 10.5)
-}
-
-# BBOXEN = {
-#     1: (45.8, 5.9, 46.4667, 7.4333),
-#     2: (45.8, 7.4333, 46.4667, 8.9667),
-#     3: (45.8, 8.9667, 46.4667, 10.5),
-#     4: (46.4667, 5.9, 47.1333, 7.4333),
-#     5: (46.4667, 7.4333, 47.1333, 8.9667),
-#     6: (46.4667, 8.9667, 47.1333, 10.5),
-#     7: (47.1333, 5.9, 47.8, 7.4333),
-#     8: (47.1333, 7.4333, 47.8, 8.9667),
-#     9: (47.1333, 8.9667, 47.8, 10.5)
-# }
-
-# BBOXEN = {
-#     1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
-#     5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
-#     9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
-#     13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
-# }
-
-QUERY = {"bergbahn": BERGBAHN_QUERY}
-
-
 # ---------------------------------------------------------------------------
 # Main logic
 # ---------------------------------------------------------------------------
 def main() -> None:
 
     query_name = list(QUERY.keys())[0]
     query = QUERY[query_name]
 
-    # Prepare the arguments for each worker
-    tasks = [
-        (name, bbox, query)
-        for name, bbox in BBOXEN.items()
-    ]
-
-    logger.info(f"Starting parallel query with {len(tasks)} processes ...")
-
-    # blocks until all processes are finished
-    with Pool(processes=len(tasks)) as pool:
-        results = pool.map(fetch_fragment, tasks)
-
-    # Merge the results
-    overall = []
-    errors = []
-    for name, elements in results:
-        if elements:
-            overall.extend(elements)
-        else:
-            errors.append(name)
-
-    logger.info(f"Total: {len(overall)} elements found")
-    if errors:
-        logger.warning(f"Errors in fragments: {errors}")
+    logger.info("=== Serial ===")
+    overall_s = run_seriell(query)
+
+    logger.info("=== Multiprocessing ===")
+    overall_p = run_parallel(query)
+
+    logger.info("=== ThreadPoolExecutor ===")
+    overall_t = run_threads(query)
 
     try:
         saved_path = store_to_disk(
-            results=overall,
+            results=overall_p,
             poi_type=query_name,
            output_dir=OUTPUT_DIR,
         )
         logger.info(f"Results saved: {saved_path}")
     except OSError as e:
         logger.error(f"Error while saving: {e}")
 
     logger.info("Done.")
@@ -108,27 +53,41 @@ if __name__ == "__main__":
 
 
 # What happened?
-# * We built a worker module and moved the multiprocessing code into it
+# * We additionally implemented multithreading code
+# * The decorator function in utils.py (timer) times and logs the decorated functions without changing
+#   their code. That gives us a simple runtime comparison between the individual functions
+# * BBOXEN, OUTPUT_DIR and QUERY were moved out to config.py because they are needed in main.py as well
+#   as in worker.py. Had they stayed in main.py, we would have run into a circular import ...
 
 # Lessons learned:
-# - Why print() in the worker instead of logging?
-#   The logging configuration from the main process is not inherited →
-#   logger.info() in a worker process is simply silent
-#   print() always works, but is not ideal for production
-# - Why must fetch_fragment live at module level?
-#   multiprocessing serializes functions with pickle
-#   Lambdas and local functions are not picklable → AttributeError
-#   Module level = picklable
-# - When Pool.map() starts a new process, the new process has to know which function it
-#   should execute. That happens via pickle — Python serializes the function and sends it to
-#   the new process:
-#   Pool.map(fetch_fragment, tasks)
-#   → pickle(fetch_fragment) ──► unpickle(fetch_fragment)
+# Program flow:
+# main() — always runs sequentially
+# │
+# ├── run_seriell()
+# │   ├── fetch SW ──► waits
+# │   ├── fetch SO ──► waits
+# │   ├── fetch NW ──► waits
+# │   └── fetch NO ──► waits → return → main() continues
+# │
+# ├── run_parallel()
+# │   ├── fetch SW ─┐
+# │   ├── fetch SO  ├─ concurrently
+# │   ├── fetch NW  │  in processes
+# │   └── fetch NO ─┘
+# │   Pool.map() blocks until ALL are done → return → main() continues
+# │
+# ├── run_threads()
+# │   ├── fetch SW ─┐
+# │   ├── fetch SO  ├─ concurrently
+# │   ├── fetch NW  │  in threads
+# │   └── fetch NO ─┘
+# │   as_completed() blocks until ALL are done → return → main() continues
+# │
+# └── store_to_disk() ← only runs here, guaranteed
 
 
 # TASK:
 
-# * We now also implement a multithreading approach (again in the worker module)
-# * In addition, we build a decorator in the utils module that can measure a function's runtime
-# * We call all worker functions (serial, multithreading and multiprocessing) from our main function
-#   and compare the time they take
+# * Until now we have been saving the results as a .json file on disk. Next we want to
+#   store the results in a sqlite database
```
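The removed "Lessons learned" notes describe the pickle constraint: Pool.map() has to serialize the worker function so the child processes know what to execute, which is why fetch_fragment must live at module level. A minimal, self-contained sketch of that constraint (standalone toy code, not from the repo):

```python
from multiprocessing import Pool

def double(x):  # module-level, therefore picklable
    return 2 * x

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        print(pool.map(double, [1, 2, 3]))  # [2, 4, 6]
        # The same call with a lambda fails, because pickle cannot
        # serialize it for the child process:
        # pool.map(lambda x: 2 * x, [1, 2, 3])  # raises a pickling error
```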
utils.py (28 changed lines)

```diff
@@ -1,5 +1,11 @@
 import json
+import time
+import logging
 from pathlib import Path
+from functools import wraps
 
 
+logger = logging.getLogger(__name__)
+
+
 def store_to_disk(
@@ -35,3 +41,25 @@ def store_to_disk(
         json.dump(results, f, indent=2, ensure_ascii=False)
 
     return output_path.resolve()
+
+
+def timer(func):
+    """
+    Decorator that measures and logs a function's execution time.
+
+    Usage:
+        @timer
+        def my_function():
+            ...
+    """
+    @wraps(func)  # preserves the original function's __name__ and __doc__
+    def wrapper(*args, **kwargs):
+        start = time.perf_counter()
+        result = func(*args, **kwargs)
+        elapsed = time.perf_counter() - start
+        logger.info(f"{func.__name__}() took {round(elapsed, 2)} seconds\n\n")
+        return result
+    return wrapper
+
+# @wraps(func) — without this decorator, run_seriell.__name__ would return the name "wrapper" instead
+# of "run_seriell", which would make the log output useless
```
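To illustrate what the timer decorator and @wraps give you, here is a small hypothetical usage sketch; `slow_noop` is a made-up stand-in for the real run_* functions:

```python
import logging
import time

from utils import timer  # the decorator added in the diff above

logging.basicConfig(level=logging.INFO)

@timer
def slow_noop():
    """Stand-in for the real run_* functions."""
    time.sleep(0.5)

slow_noop()                 # logs: slow_noop() took 0.5 seconds
print(slow_noop.__name__)   # "slow_noop" thanks to @wraps; without it: "wrapper"
print(slow_noop.__doc__)    # the original docstring also survives
```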
worker.py (83 changed lines)

```diff
@@ -1,6 +1,14 @@
-import requests
-from overpass import fetch_overpass
+import logging
+from multiprocessing import Pool
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+import requests
+
+from config import BBOXEN
+from overpass import fetch_overpass
+from utils import timer
+
+logger = logging.getLogger(__name__)
 
 # ---------------------------------------------------------------------------
 # Worker function (must live at module level — pickle requirement!)
@@ -22,3 +30,74 @@ def fetch_fragment(args: tuple) -> tuple[str, list]:
     except (RuntimeError, requests.Timeout) as e:
         print(f"[{name}] Error: {e}")
         return name, []
+
+
+# ---------------------------------------------------------------------------
+# Serial variant
+# ---------------------------------------------------------------------------
+@timer
+def run_seriell(query: str) -> list:
+    overall = []
+    for name, bbox in BBOXEN.items():
+        logger.info(f"Serial — fragment '{name}' ...")
+        try:
+            result = fetch_overpass(overpass_query=query, bbox=bbox)
+            elements = result.get("elements", [])
+            logger.info(f"'{name}': {len(elements)} elements")
+            overall.extend(elements)
+        except (RuntimeError, requests.Timeout) as e:
+            logger.error(f"Error in '{name}': {e}")
+    return overall
+
+
+# ---------------------------------------------------------------------------
+# Parallel variant with multiprocessing
+# ---------------------------------------------------------------------------
+@timer
+def run_parallel(query: str) -> list:
+    tasks = [(name, bbox, query) for name, bbox in BBOXEN.items()]
+    with Pool(processes=len(tasks)) as pool:
+        results = pool.map(fetch_fragment, tasks)
+    overall = []
+    for name, elements in results:
+        if elements:
+            overall.extend(elements)
+        else:
+            logger.warning(f"No elements for fragment '{name}'")
+    return overall
+
+
+# ---------------------------------------------------------------------------
+# Parallel variant with ThreadPoolExecutor
+# ---------------------------------------------------------------------------
+@timer
+def run_threads(query: str) -> list:
+    overall = []
+    errors = []
+
+    # ThreadPoolExecutor: no pickling required → logging works directly!
+    with ThreadPoolExecutor(max_workers=len(BBOXEN)) as executor:
+
+        # Submit all tasks at once → get Future objects back
+        futures = {
+            executor.submit(fetch_overpass, query, bbox): name
+            for name, bbox in BBOXEN.items()
+        }
+
+        # as_completed() yields futures in the order they finish
+        for future in as_completed(futures):
+            name = futures[future]
+            try:
+                result = future.result()
+                elements = result.get("elements", [])
+                logger.info(f"[Thread] '{name}': {len(elements)} elements")
+                overall.extend(elements)
+            except (RuntimeError, requests.Timeout) as e:
+                logger.error(f"[Thread] Error in '{name}': {e}")
+                errors.append(name)
+
+    if errors:
+        logger.warning(f"Errors in fragments: {errors}")
+    return overall
```
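Both parallel variants pay off here because the Overpass requests are I/O-bound: while one thread waits on the network, the others keep running despite the GIL. A self-contained sketch of that effect (toy code, with time.sleep() standing in for the HTTP wait):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(name: str) -> str:
    time.sleep(1.0)  # stands in for the Overpass network round trip
    return name

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fake_fetch, n) for n in ("SW", "SO", "NW", "NO")]
    finished = [f.result() for f in as_completed(futures)]
print(f"4 fragments in {time.perf_counter() - start:.1f}s")  # ~1.0s instead of ~4.0s
```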