Compare commits

...

1 Commits
Task_9 ... main

Author SHA1 Message Date
28a1f54abf Task 10: sqlite-db 2026-04-21 22:24:56 +02:00
4 changed files with 192 additions and 88 deletions

38
config.py Normal file
View File

@ -0,0 +1,38 @@
from pathlib import Path
from queries.bergbahn import BERGBAHN_QUERY
from queries.restaurant import RESTAURANT_QUERY
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
OUTPUT_DIR = Path("results")
BBOXEN = {
"SW": (45.8, 5.9, 46.8, 8.2),
"SO": (45.8, 8.2, 46.8, 10.5),
"NW": (46.8, 5.9, 47.8, 8.2),
"NO": (46.8, 8.2, 47.8, 10.5)
}
# BBOXEN = {
# 1: (45.8, 5.9, 46.4667, 7.4333),
# 2: (45.8, 7.4333, 46.4667, 8.9667),
# 3: (45.8, 8.9667, 46.4667, 10.5),
# 4: (46.4667, 5.9, 47.1333, 7.4333),
# 5: (46.4667, 7.4333, 47.1333, 8.9667),
# 6: (46.4667, 8.9667, 47.1333, 10.5),
# 7: (47.1333, 5.9, 47.8, 7.4333),
# 8: (47.1333, 7.4333, 47.8, 8.9667),
# 9: (47.1333, 8.9667, 47.8, 10.5)
# }
# BBOXEN = {
# 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
# 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
# 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
# 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
# }
QUERY = {"bergbahn": BERGBAHN_QUERY}

127
main.py
View File

@ -1,11 +1,8 @@
import logging import logging
from pathlib import Path
from utils import store_to_disk from utils import store_to_disk
from worker import fetch_fragment
from multiprocessing import Pool
from queries.bergbahn import BERGBAHN_QUERY
from queries.restaurant import RESTAURANT_QUERY
from config import QUERY, OUTPUT_DIR
from worker import run_seriell, run_threads, run_parallel
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -22,84 +19,32 @@ logging.basicConfig(
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
OUTPUT_DIR = Path("results")
BBOXEN = {
"SW": (45.8, 5.9, 46.8, 8.2),
"SO": (45.8, 8.2, 46.8, 10.5),
"NW": (46.8, 5.9, 47.8, 8.2),
"NO": (46.8, 8.2, 47.8, 10.5)
}
# BBOXEN = {
# 1: (45.8, 5.9, 46.4667, 7.4333),
# 2: (45.8, 7.4333, 46.4667, 8.9667),
# 3: (45.8, 8.9667, 46.4667, 10.5),
# 4: (46.4667, 5.9, 47.1333, 7.4333),
# 5: (46.4667, 7.4333, 47.1333, 8.9667),
# 6: (46.4667, 8.9667, 47.1333, 10.5),
# 7: (47.1333, 5.9, 47.8, 7.4333),
# 8: (47.1333, 7.4333, 47.8, 8.9667),
# 9: (47.1333, 8.9667, 47.8, 10.5)
# }
# BBOXEN = {
# 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
# 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
# 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
# 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
# }
QUERY = {"bergbahn": BERGBAHN_QUERY}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Hauptlogik # Hauptlogik
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def main() -> None: def main() -> None:
query_name = list(QUERY.keys())[0] query_name = list(QUERY.keys())[0]
query = QUERY[query_name] query = QUERY[query_name]
# Argumente für jeden Worker vorbereiten logger.info("=== Seriell ===")
tasks = [ overall_s = run_seriell(query)
(name, bbox, query)
for name, bbox in BBOXEN.items()
]
logger.info(f"Starte parallele Abfrage mit {len(tasks)} Prozessen ...") logger.info("=== Multiprocessing ===")
overall_p = run_parallel(query)
# blockiert bis alle Prozesse fertig sind logger.info("=== ThreadPoolExecutor ===")
with Pool(processes=len(tasks)) as pool: overall_t = run_threads(query)
results = pool.map(fetch_fragment, tasks)
# Ergebnisse zusammenführen
overall = []
errors = []
for name, elements in results:
if elements:
overall.extend(elements)
else:
errors.append(name)
logger.info(f"Total:{len(overall)} Elemente gefunden")
if errors:
logger.warning(f"Fehler in Fragmenten: {errors}")
try: try:
saved_path = store_to_disk( saved_path = store_to_disk(
results=overall, results=overall_p,
poi_type=query_name, poi_type=query_name,
output_dir=OUTPUT_DIR, output_dir=OUTPUT_DIR,
) )
logger.info(f"Ergebnisse gespeichert: {saved_path}") logger.info(f"Ergebnisse gespeichert: {saved_path}")
except OSError as e: except OSError as e:
logger.error(f"Fehler beim Speichern: {e}") logger.error(f"Fehler beim Speichern:{e}")
logger.info("Fertig.") logger.info("Fertig.")
@ -108,27 +53,41 @@ if __name__ == "__main__":
# Was ist passiert? # Was ist passiert?
# * Wir haben ein worker-Modul gebaut und dort den Code für den Multiprocessing-Code abgelegt # * Wir haben zusätzlich Multithreating-Code implementiert
# * Die Dekorator-Funktion in utils.py (timer) stoppt und logt die Zeit der dekorierten Funktionen, ohne deren Code
# zu verändern. Das ermöglicht uns einen einfachen Zeitvergleich zwischen den einzelnen Funktionen
# * Auslagerung von BBOXEN, OUTPUT_DIR, QUERY nach config.py, weil sie sowohl in main.py als auch in worker.py gebraucht
# werden. Wären sie in main.poy verblieben, hätten wir Probleme mit einem circular-Import bekommen...
# Erkenntnisse: # Erkenntnisse:
# - Warum print() im Worker statt logging?
# python# Logging-Konfiguration aus dem Hauptprozess wird nicht vererbt → # Programmfluss:
# logger.info() im Worker-Prozess schweigt einfach # main() — läuft immer sequenziell
# print() funktioniert immer, ist aber nicht ideal für Produktion # │
# - Warum muss fetch_fragment auf Modul-Ebene stehen? # ├── run_seriell()
# python# multiprocessing serialisiert Funktionen mit pickle # │ ├── fetch SW ──► wartet
# Lambda und lokale Funktionen sind nicht pickle-bar → AttributeError # │ ├── fetch SO ──► wartet
# Modul-Ebene = pickle-bar # │ ├── fetch NW ──► wartet
# - Wenn Pool.map() einen neuen Prozess startet, muss es der neue Prozess wissen, welche Funktion er # │ └── fetch NO ──► wartet → return → main() macht weiter
# ausführen soll. Das geschieht über Pickle — Python serialisiert die Funktion und schickt sie an # │
# den neuen Prozess: # ├── run_parallel()
# Pool.map(fetch_fragment, tasks) # │ ├── fetch SW ─┐
# → pickle(fetch_fragment) ──► unpickle(fetch_fragment) # │ ├── fetch SO ├─ gleichzeitig
# │ ├── fetch NW │ in Prozessen
# │ └── fetch NO ─┘
# │ Pool.map() blockiert bis ALLE fertig → return → main() macht weiter
# │
# ├── run_threads()
# │ ├── fetch SW ─┐
# │ ├── fetch SO ├─ gleichzeitig
# │ ├── fetch NW │ in Threads
# │ └── fetch NO ─┘
# │ as_completed() blockiert bis ALLE fertig → return → main() macht weiter
# │
# └── store_to_disk() ← erst hier, garantiert
# TASK: # TASK:
# * Wir implementieren nun auch einen Multithreating-Ansatz (ebenfalls im worker-modul) # * Bis jetzt speichern wir die Resultate als .json-File auf unserer Festplatte. Als nächstes wollen wir
# * Zusätzlich bauen wir in Modul 'utils' einen Dekorator, welcher die Zeit messen kann (einer Funktion) # die Resultate in einer sqlite-Datenbank ablegen
# * Wir rufen aus unserer Hauptfunktion alle Working-Funktionen (seriell, multihreating und multiprocessing) auf
# und vergleichen die benötigte Zeit

View File

@ -1,5 +1,11 @@
import json import json
import time
import logging
from pathlib import Path from pathlib import Path
from functools import wraps
logger = logging.getLogger(__name__)
def store_to_disk( def store_to_disk(
@ -34,4 +40,26 @@ def store_to_disk(
with output_path.open("w", encoding="utf-8") as f: with output_path.open("w", encoding="utf-8") as f:
json.dump(results, f, indent=2, ensure_ascii=False) json.dump(results, f, indent=2, ensure_ascii=False)
return output_path.resolve() return output_path.resolve()
def timer(func):
"""
Decorator der die Ausführungszeit einer Funktion misst und loggt.
Verwendung:
@timer
def meine_funktion():
...
"""
@wraps(func) # erhält __name__, __doc__ der originalen Funktion
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
logger.info(f"{func.__name__}() dauerte {round(elapsed, 2)} Sekunden\n\n")
return result
return wrapper
# @wraps(func) — ohne diesen Decorator würde run_seriell.__name__ den Namen "wrapper" zurückgeben statt
# "run_seriell", was den Log-Output unbrauchbar macht

View File

@ -1,6 +1,14 @@
import requests import logging
from overpass import fetch_overpass from multiprocessing import Pool
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from config import BBOXEN
from overpass import fetch_overpass
from utils import timer
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!) # Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!)
@ -21,4 +29,75 @@ def fetch_fragment(args: tuple) -> tuple[str, list]:
return name, elements return name, elements
except (RuntimeError, requests.Timeout) as e: except (RuntimeError, requests.Timeout) as e:
print(f"[{name}] Fehler: {e}") print(f"[{name}] Fehler: {e}")
return name, [] return name, []
# ---------------------------------------------------------------------------
# Serielle Variante
# ---------------------------------------------------------------------------
@timer
def run_seriell(query: str) -> list:
overall = []
for name, bbox in BBOXEN.items():
logger.info(f"Seriell — Fragment {name}' ...")
try:
result = fetch_overpass(overpass_query=query, bbox=bbox)
elements = result.get("elements", [])
logger.info(f"'{name}': {len(elements)} Elemente")
overall.extend(elements)
except (RuntimeError, requests.Timeout) as e:
logger.error(f"Fehler bei '{name}': {e}")
return overall
# ---------------------------------------------------------------------------
# Parallele Variante mit Multiprocessing
# ---------------------------------------------------------------------------
@timer
def run_parallel(query: str) -> list:
tasks = [(name, bbox, query) for name, bbox in BBOXEN.items()]
with Pool(processes=len(tasks)) as pool:
results = pool.map(fetch_fragment, tasks)
overall = []
for name, elements in results:
if elements:
overall.extend(elements)
else:
logger.warning(f"Keine Elemente für Fragment '{name}'")
return overall
# ---------------------------------------------------------------------------
# Parallele Variante mit ThreadPoolExecutor
# ---------------------------------------------------------------------------
@timer
def run_threads(query: str) -> list:
overall = []
errors = []
# ThreadPoolExecutor: kein Pickle-Zwang → logging funktioniert direkt!
with ThreadPoolExecutor(max_workers=len(BBOXEN)) as executor:
# Alle Tasks auf einmal einreichen → Future-Objekte zurück
futures = {
executor.submit(fetch_overpass, query, bbox): name
for name, bbox in BBOXEN.items()
}
# as_completed() liefert Futures in der Reihenfolge, in der sie fertig werden
for future in as_completed(futures):
name = futures[future]
try:
result = future.result()
elements = result.get("elements", [])
logger.info(f"[Thread] '{name}': {len(elements)} Elemente")
overall.extend(elements)
except (RuntimeError, requests.Timeout) as e:
logger.error(f"[Thread] Fehler bei '{name}': {e}")
errors.append(name)
if errors:
logger.warning(f"Fehler in Fragmenten: {errors}")
return overall