cds-104_Datenbanken/Block-2/SQL_Injectionn_mit_Code.ipynb

288 lines
10 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# SQL - Injection\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Vorbereitung"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Falls noch nicht geschehen können die Pakte importiert werden\n",
"import psycopg2\n",
"import psycopg2.extras"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"ename": "OperationalError",
"evalue": "connection to server on socket \"/tmp/.s.PGSQL.5432\" failed: No such file or directory\n\tIs the server running locally and accepting connections on that socket?\n",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mOperationalError\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 2\u001b[39m\n\u001b[32m 1\u001b[39m \u001b[38;5;66;03m# als nächstes bauen wir unsere Verbindung auf, legen eine Tabelle an und füllen diese mit zwei Usern (bitte eigene Datenbank auswählen):\u001b[39;00m\n\u001b[32m----> \u001b[39m\u001b[32m2\u001b[39m conn = \u001b[43mpsycopg2\u001b[49m\u001b[43m.\u001b[49m\u001b[43mconnect\u001b[49m\u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43mdbname=7Wochen user=postgres password=postgres\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[32m 4\u001b[39m cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)\n\u001b[32m 6\u001b[39m cursor.execute(\u001b[33m\"\"\"\u001b[39m\n\u001b[32m 7\u001b[39m \u001b[33m DROP TABLE IF EXISTS users;\u001b[39m\n\u001b[32m 8\u001b[39m \u001b[33m CREATE TABLE IF NOT EXISTS users (\u001b[39m\n\u001b[32m (...)\u001b[39m\u001b[32m 12\u001b[39m \u001b[33m )\u001b[39m\n\u001b[32m 13\u001b[39m \u001b[33m\"\"\"\u001b[39m)\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/anaconda3/lib/python3.13/site-packages/psycopg2/__init__.py:122\u001b[39m, in \u001b[36mconnect\u001b[39m\u001b[34m(dsn, connection_factory, cursor_factory, **kwargs)\u001b[39m\n\u001b[32m 119\u001b[39m kwasync[\u001b[33m'\u001b[39m\u001b[33masync_\u001b[39m\u001b[33m'\u001b[39m] = kwargs.pop(\u001b[33m'\u001b[39m\u001b[33masync_\u001b[39m\u001b[33m'\u001b[39m)\n\u001b[32m 121\u001b[39m dsn = _ext.make_dsn(dsn, **kwargs)\n\u001b[32m--> \u001b[39m\u001b[32m122\u001b[39m conn = \u001b[43m_connect\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mconnection_factory\u001b[49m\u001b[43m=\u001b[49m\u001b[43mconnection_factory\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m*\u001b[49m\u001b[43m*\u001b[49m\u001b[43mkwasync\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 123\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m cursor_factory \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m 124\u001b[39m conn.cursor_factory = cursor_factory\n",
"\u001b[31mOperationalError\u001b[39m: connection to server on socket \"/tmp/.s.PGSQL.5432\" failed: No such file or directory\n\tIs the server running locally and accepting connections on that socket?\n"
]
}
],
"source": [
"# als nächstes bauen wir unsere Verbindung auf, legen eine Tabelle an und füllen diese mit zwei Usern (bitte eigene Datenbank auswählen):\n",
"conn = psycopg2.connect (\"dbname=7Wochen user=postgres password=postgres\")\n",
"\n",
"cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)\n",
"\n",
"cursor.execute(\"\"\"\n",
" DROP TABLE IF EXISTS users;\n",
" CREATE TABLE IF NOT EXISTS users (\n",
" id SERIAL PRIMARY KEY,\n",
" username VARCHAR(255) NOT NULL,\n",
" password VARCHAR(255) NOT NULL\n",
" )\n",
"\"\"\")\n",
"\n",
"cursor.execute(\"INSERT INTO users (username, password) VALUES (%s, %s)\", (\"user1\", \"password1\"))\n",
"cursor.execute(\"INSERT INTO users (username, password) VALUES (%s, %s)\", (\"user2\", \"password2\"))\n",
"conn.commit();"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[RealDictRow([('id', 1), ('username', 'user1'), ('password', 'password1')]),\n",
" RealDictRow([('id', 2), ('username', 'user2'), ('password', 'password2')])]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Kurz zum überprüfen, ob die Tabelle angelegt wurde\n",
"\n",
"cursor.execute(\"SELECT * FROM users;\")\n",
"result = cursor.fetchall()\n",
"result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Angriffsszenario 1 - direkt Übermittlung von Zugangsdaten"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Login erfolgreich\n"
]
}
],
"source": [
"# gehen wir nun davon aus, dass sich ein User, zum Beispiel über ein Formularfeld anmelden möchte.\n",
"# Und zwar mit den folgenden Zugangsdaten:\n",
"username = \"user1\"\n",
"password = \"password1\"\n",
"\n",
"# Das würde dann so ablaufen:\n",
"try:\n",
" cursor.execute(f\"SELECT * FROM users WHERE username='{username}' AND password='{password}'\")\n",
"# Nun holen wir uns eine Ergebniszeile. Gibt es die waren wir mit unserer Anmeldung erfolgreich. Kommt keine Zeile waren unsere Zugangsdaten falsch.\n",
" user = cursor.fetchone()\n",
"\n",
" if user:\n",
" print(\"Login erfolgreich\")\n",
" else:\n",
" print(\"Login fehlgeschlagen\")\n",
"except Exception as e:\n",
" print(e)\n",
" conn.rollback()\n",
"\n",
"# Wir übergeben also die Zeichenkette aus der Python-Variable direkt an unsere SQL-Datenbank.\n",
"# Ändern wir das Passwort sind wir nicht erfolgreich.\n",
"# Um es schöner zu machen fügen wir noch ein Rollback ein"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Angriffszenario 2 - Übergabe von SQL-Befehlen, um Fehler zu hervorzurufen. "
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"FEHLER: Zeichenkette in Anführungszeichen nicht abgeschlossen bei »'''«\n",
"LINE 1: SELECT * FROM users WHERE username='user1' AND password='''\n",
" ^\n",
"\n"
]
}
],
"source": [
"# gehen wir nun davon aus, dass ein User SQL-Befehle eingibt. Zunächst nur ein einfaches Anführungszeichen als Passwort.\n",
"# Das Ergebnis ist ein erzeugter SQL-Fehler.\n",
"username = \"user1\"\n",
"password = \"'\"\n",
"\n",
"try:\n",
" cursor.execute(f\"SELECT * FROM users WHERE username='{username}' AND password='{password}'\")\n",
" user = cursor.fetchone()\n",
"\n",
" if user:\n",
" print(\"Login erfolgreich\")\n",
" else:\n",
" print(\"Login fehlgeschlagen\")\n",
"except Exception as e:\n",
" print(e)\n",
" conn.rollback()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Angriffszenario 3 - Übergabe von SQL-Befehlen, um einen erfolgreichen Login zu generieren."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Login erfolgreich\n"
]
}
],
"source": [
"# gehen wir nun davon aus, dass ein User SQL-Befehle eingibt. Diesmal die Übergabe eines echten Befehls.\n",
"# Dieser erzeugt eine wahre Aussage wodurch die Passwortüberprüfung erfolgreich wird.\n",
"username = \"user1\"\n",
"password = \"' OR 1=1 --\"\n",
"\n",
"try:\n",
" cursor.execute(f\"SELECT * FROM users WHERE username='{username}' AND password='{password}'\")\n",
" user = cursor.fetchone()\n",
"\n",
" if user:\n",
" print(\"Login erfolgreich\")\n",
" else:\n",
" print(\"Login fehlgeschlagen\")\n",
"except Exception as e:\n",
" print(e)\n",
" conn.rollback()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Abschliessend noch die Variante mit Platzhaltern, die sicherer wäre."
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Login fehlgeschlagen\n"
]
}
],
"source": [
"# hier würde nun die Zeichenkette aus der Passworteingabe übergeben und nicht als String direkt zu SQL-Code werden.\n",
"username = \"user1\"\n",
"password = \"' OR 1=1 --\"\n",
"\n",
"try:\n",
" cursor.execute(f\"SELECT * FROM users WHERE username=%s AND password=%s\", (username, password))\n",
" user = cursor.fetchone()\n",
"\n",
" if user:\n",
" print(\"Login erfolgreich\")\n",
" else:\n",
" print(\"Login fehlgeschlagen\")\n",
"except Exception as e:\n",
" print(e)\n",
" conn.rollback()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"interpreter": {
"hash": "a6b707a736c5fbba452b904aff207ddd250a7524df1f8c74db5bc52ff4a2560b"
},
"kernelspec": {
"display_name": "Python [conda env:base] *",
"language": "python",
"name": "conda-base-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}