90 lines
3.2 KiB
Python
90 lines
3.2 KiB
Python
import requests
|
|
from pprint import pprint
|
|
|
|
|
|
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
|
|
|
|
|
|
class OverpassApiError(Exception):
|
|
pass
|
|
|
|
|
|
def fetch_overpass(overpass_query: str, bbox: tuple) -> dict:
|
|
"""
|
|
Fragt die Overpass API nach Bergbahnen in der angegebenen Bounding Box ab.
|
|
Sendet einen HTTP-POST-Request an die Overpass API und gibt die geparste
|
|
JSON-Antwort zurück.
|
|
|
|
Args:
|
|
overpass_query (str): Overpass-QL-Query mit dem Platzhalter {bbox}.
|
|
Beispiel:
|
|
'[out:json][timeout:5];
|
|
(node["aerialway"="station"]({bbox}););
|
|
out center body;'
|
|
bbox (tuple): Bounding Box als 4-Tuple in Dezimalgrad:
|
|
(south, west, north, east)
|
|
Beispiel Davos: (46.72, 9.70, 46.92, 10.00)
|
|
Beispiel Schweiz: (45.8, 5.9, 47.8, 10.5)
|
|
|
|
Returns:
|
|
dict: Geparste JSON-Antwort der Overpass API. Die Antwort enthält
|
|
unter dem Schlüssel "elements" eine Liste von OSM-Objekten
|
|
(nodes und ways) mit ihren Tags und Koordinaten.
|
|
Beispiel:
|
|
{
|
|
"elements": [
|
|
{
|
|
"type": "node",
|
|
"id": 123456,
|
|
"lat": 46.8, "lon": 9.8,
|
|
"tags": {"aerialway": "station", "name": "Jakobshorn"}
|
|
},
|
|
...
|
|
]
|
|
}
|
|
|
|
Raises:
|
|
OverpassApiError: Wenn die API nicht innerhalb des gesetzten Timeouts
|
|
antwortet (clientseitig, unabhängig vom serverseitigen
|
|
Timeout im Query).
|
|
OverpassApiError: Wenn der Request aus einem anderen Grund fehlschlägt
|
|
(z.B. 429 Too Many Requests, 504 Gateway Timeout,
|
|
Netzwerkfehler).
|
|
"""
|
|
|
|
bbox_str = ",".join(map(str, bbox))
|
|
query = overpass_query.format(bbox=bbox_str)
|
|
|
|
try:
|
|
response = requests.post(
|
|
OVERPASS_URL,
|
|
data={"data": query},
|
|
timeout=5,
|
|
headers={"User-Agent": "CDS Exercise"},
|
|
)
|
|
response.raise_for_status() # prüft den HTTP-Statuscode der Antwort und wirft eine Exception, wenn es ein Fehler war (requests.HTTPError)
|
|
except requests.Timeout as exc:
|
|
raise OverpassApiError("Overpass-API Timeout") from exc
|
|
except requests.RequestException as exc:
|
|
raise OverpassApiError("Overpass-API Request fehlgeschlagen") from exc
|
|
return response.json()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
BERGBAHN_QUERY = """
|
|
[out:json][timeout:2][maxsize:500000];
|
|
(
|
|
node["aerialway"="station"]({bbox});
|
|
way["aerialway"="station"]({bbox});
|
|
node["railway"="funicular"]({bbox});
|
|
way["railway"="funicular"]({bbox});
|
|
node["railway"="station"]["funicular"="yes"]({bbox});
|
|
);
|
|
out center body;
|
|
"""
|
|
|
|
bbox = (46.72, 9.70, 46.92, 10.00)
|
|
# bbox = (45.8, 5.9, 47.8, 10.5)
|
|
result = fetch_overpass(overpass_query=BERGBAHN_QUERY, bbox=bbox)
|
|
pprint(result) |