Compare commits
No commits in common. "main" and "Task_7" have entirely different histories.
38
config.py
38
config.py
@ -1,38 +0,0 @@
|
||||
from pathlib import Path
from queries.bergbahn import BERGBAHN_QUERY
from queries.restaurant import RESTAURANT_QUERY  # currently unused; QUERY below only registers bergbahn


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Directory where fetched Overpass results are written.
OUTPUT_DIR = Path("results")

# Switzerland split into four quadrants. Value order appears to be
# (lat_min, lon_min, lat_max, lon_max) — TODO confirm against fetch_overpass().
BBOXEN = {
    "SW": (45.8, 5.9, 46.8, 8.2),
    "SO": (45.8, 8.2, 46.8, 10.5),
    "NW": (46.8, 5.9, 47.8, 8.2),
    "NO": (46.8, 8.2, 47.8, 10.5)
}

# Alternative 3x3 tiling of the same area, kept for experiments:
# BBOXEN = {
#     1: (45.8, 5.9, 46.4667, 7.4333),
#     2: (45.8, 7.4333, 46.4667, 8.9667),
#     3: (45.8, 8.9667, 46.4667, 10.5),
#     4: (46.4667, 5.9, 47.1333, 7.4333),
#     5: (46.4667, 7.4333, 47.1333, 8.9667),
#     6: (46.4667, 8.9667, 47.1333, 10.5),
#     7: (47.1333, 5.9, 47.8, 7.4333),
#     8: (47.1333, 7.4333, 47.8, 8.9667),
#     9: (47.1333, 8.9667, 47.8, 10.5)
# }

# Alternative 4x4 tiling, kept for experiments:
# BBOXEN = {
#     1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
#     5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
#     9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
#     13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
# }

# Registry of the active query: maps a POI name to its Overpass query string.
QUERY = {"bergbahn": BERGBAHN_QUERY}
|
||||
115
main.py
115
main.py
@ -1,8 +1,10 @@
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from utils import store_to_disk
|
||||
|
||||
from config import QUERY, OUTPUT_DIR
|
||||
from worker import run_seriell, run_threads, run_parallel
|
||||
from overpass import fetch_overpass
|
||||
from queries.bergbahn import BERGBAHN_QUERY
|
||||
from queries.restaurant import RESTAURANT_QUERY
|
||||
import requests
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -19,32 +21,82 @@ logging.basicConfig(
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Konfiguration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
OUTPUT_DIR = Path("results")
|
||||
|
||||
BBOXEN = {
|
||||
"SW": (45.8, 5.9, 46.8, 8.2),
|
||||
"SO": (45.8, 8.2, 46.8, 10.5),
|
||||
"NW": (46.8, 5.9, 47.8, 8.2),
|
||||
"NO": (46.8, 8.2, 47.8, 10.5)
|
||||
}
|
||||
|
||||
# BBOXEN = {
|
||||
# 1: (45.8, 5.9, 46.4667, 7.4333),
|
||||
# 2: (45.8, 7.4333, 46.4667, 8.9667),
|
||||
# 3: (45.8, 8.9667, 46.4667, 10.5),
|
||||
# 4: (46.4667, 5.9, 47.1333, 7.4333),
|
||||
# 5: (46.4667, 7.4333, 47.1333, 8.9667),
|
||||
# 6: (46.4667, 8.9667, 47.1333, 10.5),
|
||||
# 7: (47.1333, 5.9, 47.8, 7.4333),
|
||||
# 8: (47.1333, 7.4333, 47.8, 8.9667),
|
||||
# 9: (47.1333, 8.9667, 47.8, 10.5)
|
||||
# }
|
||||
|
||||
# BBOXEN = {
|
||||
# 1: (45.8, 5.9, 46.3, 7.05), 2: (45.8, 7.05, 46.3, 8.2), 3: (45.8, 8.2, 46.3, 9.35), 4: (45.8, 9.35, 46.3, 10.5),
|
||||
# 5: (46.3, 5.9, 46.8, 7.05), 6: (46.3, 7.05, 46.8, 8.2), 7: (46.3, 8.2, 46.8, 9.35), 8: (46.3, 9.35, 46.8, 10.5),
|
||||
# 9: (46.8, 5.9, 47.3, 7.05), 10: (46.8, 7.05, 47.3, 8.2), 11: (46.8, 8.2, 47.3, 9.35), 12: (46.8, 9.35, 47.3, 10.5),
|
||||
# 13: (47.3, 5.9, 47.8, 7.05), 14: (47.3, 7.05, 47.8, 8.2), 15: (47.3, 8.2, 47.8, 9.35), 16: (47.3, 9.35, 47.8, 10.5)
|
||||
# }
|
||||
|
||||
QUERY = {"bergbahn": BERGBAHN_QUERY}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Hauptlogik
|
||||
# ---------------------------------------------------------------------------
|
||||
def main() -> None:
    """Fetch the configured query with three strategies and persist one result set.

    Runs the same Overpass query over all bounding boxes in BBOXEN three
    times — serially, with a process pool, and with a thread pool — so the
    @timer decorator on each runner logs a timing comparison. The
    multiprocessing result is then written to OUTPUT_DIR.

    Fix: the original body was a corrupted merge of the old serial loop and
    the new runner calls — it passed the keyword ``results=`` twice to
    store_to_disk() (a SyntaxError) and referenced loop variables outside
    their loop.
    """
    # Exactly one query is configured at a time; take its name and text.
    query_name = next(iter(QUERY))
    query = QUERY[query_name]

    logger.info("=== Seriell ===")
    overall_s = run_seriell(query)

    logger.info("=== Multiprocessing ===")
    overall_p = run_parallel(query)

    logger.info("=== ThreadPoolExecutor ===")
    overall_t = run_threads(query)

    # All three runs fetch the same data; report and persist the
    # multiprocessing variant's elements.
    logger.info(f"Total: {len(overall_p)} Elemente gefunden")

    # Ergebnisse speichern
    try:
        saved_path = store_to_disk(
            results=overall_p,
            poi_type=query_name,
            output_dir=OUTPUT_DIR,
        )
        logger.info(f"Ergebnisse gespeichert: {saved_path}")
    except OSError as e:
        logger.error(f"Fehler beim Speichern: {e}")

    logger.info("Fertig.")
|
||||
|
||||
|
||||
@ -53,41 +105,10 @@ if __name__ == "__main__":
|
||||
|
||||
|
||||
# Was ist passiert?
|
||||
# * Wir haben zusätzlich Multithreading-Code implementiert
|
||||
# * Die Dekorator-Funktion in utils.py (timer) stoppt und logt die Zeit der dekorierten Funktionen, ohne deren Code
|
||||
# zu verändern. Das ermöglicht uns einen einfachen Zeitvergleich zwischen den einzelnen Funktionen
|
||||
# * Auslagerung von BBOXEN, OUTPUT_DIR, QUERY nach config.py, weil sie sowohl in main.py als auch in worker.py gebraucht
|
||||
# werden. Wären sie in main.py verblieben, hätten wir Probleme mit einem circular-Import bekommen...
|
||||
|
||||
# Erkenntnisse:
|
||||
|
||||
# Programmfluss:
|
||||
# main() — läuft immer sequenziell
|
||||
# │
|
||||
# ├── run_seriell()
|
||||
# │ ├── fetch SW ──► wartet
|
||||
# │ ├── fetch SO ──► wartet
|
||||
# │ ├── fetch NW ──► wartet
|
||||
# │ └── fetch NO ──► wartet → return → main() macht weiter
|
||||
# │
|
||||
# ├── run_parallel()
|
||||
# │ ├── fetch SW ─┐
|
||||
# │ ├── fetch SO ├─ gleichzeitig
|
||||
# │ ├── fetch NW │ in Prozessen
|
||||
# │ └── fetch NO ─┘
|
||||
# │ Pool.map() blockiert bis ALLE fertig → return → main() macht weiter
|
||||
# │
|
||||
# ├── run_threads()
|
||||
# │ ├── fetch SW ─┐
|
||||
# │ ├── fetch SO ├─ gleichzeitig
|
||||
# │ ├── fetch NW │ in Threads
|
||||
# │ └── fetch NO ─┘
|
||||
# │ as_completed() blockiert bis ALLE fertig → return → main() macht weiter
|
||||
# │
|
||||
# └── store_to_disk() ← erst hier, garantiert
|
||||
|
||||
# * eigenes Modul 'utils' mit Funktion 'store_to_disk()', welche für die Speicherung der Daten zuständig ist.
|
||||
|
||||
# TASK:
|
||||
|
||||
# * Bis jetzt speichern wir die Resultate als .json-File auf unserer Festplatte. Als nächstes wollen wir
|
||||
# die Resultate in einer sqlite-Datenbank ablegen
|
||||
# * Bis jetzt haben wir nur Code geschrieben, aber keine Tests... Das holen wir jetzt nach!
|
||||
# -> Überlegt: Was für Tests könnten geschrieben werden? Was macht Sinn? Schreibt 1-2 eurer Tests. Erstellt
|
||||
# dazu wie gewöhnlich den Ordner tests und speichert darin eure Tests (mit pytest)
|
||||
Binary file not shown.
@ -1,33 +0,0 @@
|
||||
import pytest
|
||||
from utils import store_to_disk
|
||||
|
||||
# tmp_path ist ein pytest-Fixture — d.h. pytest stellt ihn automatisch bereit, ohne dass man ihn importieren muss.
|
||||
# pytest startet
|
||||
# → sieht Parameter "tmp_path" in der Funktionssignatur
|
||||
# → erstellt automatisch ein temporäres Verzeichnis (z.B. /tmp/pytest-123/test_store_0/)
|
||||
# → übergibt es als Path-Objekt an die Testfunktion
|
||||
# → löscht es nach dem Test wieder
|
||||
|
||||
def test_store_to_disk_creates_file(tmp_path):
    """Stellt sicher, dass store_to_disk eine lesbare JSON-Datei erstellt."""
    import json

    sample = [
        {"type": "node", "id": 1, "lat": 46.1835291, "lon": 6.8346732, "tags": {"name": "Jakobshorn"}},
        {"type": "way", "id": 2, "lat": 46.1772269, "lon": 6.8402226, "tags": {"name": "Parsenn"}},
    ]

    written = store_to_disk(
        results=sample,
        poi_type="bergbahn",
        output_dir=tmp_path,
    )

    # File must exist under the expected, deterministic name ...
    assert written.exists()
    assert written.name == "bergbahn_results.json"

    # ... and its JSON content must round-trip back to the input payload.
    with written.open(encoding="utf-8") as handle:
        reloaded = json.load(handle)

    assert reloaded == sample
    assert len(reloaded) == 2
|
||||
30
utils.py
30
utils.py
@ -1,11 +1,5 @@
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from functools import wraps
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def store_to_disk(
|
||||
@ -40,26 +34,4 @@ def store_to_disk(
|
||||
with output_path.open("w", encoding="utf-8") as f:
|
||||
json.dump(results, f, indent=2, ensure_ascii=False)
|
||||
|
||||
return output_path.resolve()
|
||||
|
||||
|
||||
def timer(func):
    """Decorator that measures a function's wall-clock run time and logs it.

    Usage:
        @timer
        def my_function():
            ...
    """
    @wraps(func)  # preserves __name__ / __doc__ of the wrapped function
    def timed(*args, **kwargs):
        started = time.perf_counter()
        outcome = func(*args, **kwargs)
        duration = time.perf_counter() - started
        logger.info(f"{func.__name__}() dauerte {round(duration, 2)} Sekunden\n\n")
        return outcome

    return timed
|
||||
|
||||
# @wraps(func) — ohne diesen Decorator würde run_seriell.__name__ den Namen "wrapper" zurückgeben statt
|
||||
# "run_seriell", was den Log-Output unbrauchbar macht
|
||||
return output_path.resolve()
|
||||
103
worker.py
103
worker.py
@ -1,103 +0,0 @@
|
||||
import logging
|
||||
from multiprocessing import Pool
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
import requests
|
||||
|
||||
from config import BBOXEN
|
||||
from overpass import fetch_overpass
|
||||
from utils import timer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Worker-Funktion (muss auf Modul-Ebene stehen — Pickle-Anforderung!)
|
||||
# ---------------------------------------------------------------------------
|
||||
def fetch_fragment(args: tuple) -> tuple[str, list]:
    """
    Run a single Overpass request for one bbox fragment.

    Returns (fragment_name, elements) — or (fragment_name, []) on failure.

    Executed by Pool.map() in a separate process; logging is not reliable
    there, so print() serves as the fallback.
    """
    fragment_name, bbox, overpass_query = args
    try:
        response = fetch_overpass(overpass_query=overpass_query, bbox=bbox)
    except (RuntimeError, requests.Timeout) as exc:
        print(f"[{fragment_name}] Fehler: {exc}")
        return fragment_name, []

    found = response.get("elements", [])
    print(f"[{fragment_name}] {len(found)} Elemente gefunden")
    return fragment_name, found
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Serielle Variante
|
||||
# ---------------------------------------------------------------------------
|
||||
@timer
def run_seriell(query: str) -> list:
    """Fetch every bbox fragment one after the other and collect all elements.

    Failed fragments are logged and skipped, so the returned list may be a
    partial result.
    """
    overall: list = []
    for name, bbox in BBOXEN.items():
        # Fix: original f-string had a stray closing quote ("{name}' ...")
        # with no matching opening quote around the fragment name.
        logger.info(f"Seriell — Fragment '{name}' ...")
        try:
            result = fetch_overpass(overpass_query=query, bbox=bbox)
            elements = result.get("elements", [])
            logger.info(f"'{name}': {len(elements)} Elemente")
            overall.extend(elements)
        except (RuntimeError, requests.Timeout) as e:
            logger.error(f"Fehler bei '{name}': {e}")
    return overall
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Parallele Variante mit Multiprocessing
|
||||
# ---------------------------------------------------------------------------
|
||||
@timer
def run_parallel(query: str) -> list:
    """Fetch all bbox fragments concurrently, one worker process per fragment."""
    work_items = [(frag, box, query) for frag, box in BBOXEN.items()]
    with Pool(processes=len(work_items)) as worker_pool:
        fragment_results = worker_pool.map(fetch_fragment, work_items)

    collected: list = []
    for frag, elements in fragment_results:
        if not elements:
            logger.warning(f"Keine Elemente für Fragment '{frag}'")
            continue
        collected.extend(elements)
    return collected
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Parallele Variante mit ThreadPoolExecutor
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@timer
def run_threads(query: str) -> list:
    """Fetch all bbox fragments concurrently using a thread pool.

    Failed fragments are logged and collected; successful elements from the
    remaining fragments are still returned.
    """
    collected: list = []
    failed: list = []

    # Unlike multiprocessing, threads have no pickling constraint, so
    # logging works directly inside the workers.
    with ThreadPoolExecutor(max_workers=len(BBOXEN)) as pool:

        # Submit everything up front and map each Future back to its fragment.
        pending = {
            pool.submit(fetch_overpass, query, bbox): frag
            for frag, bbox in BBOXEN.items()
        }

        # as_completed() yields futures in the order they finish.
        for done in as_completed(pending):
            frag = pending[done]
            try:
                payload = done.result()
                found = payload.get("elements", [])
                logger.info(f"[Thread] '{frag}': {len(found)} Elemente")
                collected.extend(found)
            except (RuntimeError, requests.Timeout) as exc:
                logger.error(f"[Thread] Fehler bei '{frag}': {exc}")
                failed.append(frag)

    if failed:
        logger.warning(f"Fehler in Fragmenten: {failed}")
    return collected
|
||||
Loading…
x
Reference in New Issue
Block a user