{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# SQL - Injection\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Vorbereitung" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# Falls noch nicht geschehen können die Pakte importiert werden\n", "import psycopg2\n", "import psycopg2.extras" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "ename": "OperationalError", "evalue": "connection to server on socket \"/tmp/.s.PGSQL.5432\" failed: No such file or directory\n\tIs the server running locally and accepting connections on that socket?\n", "output_type": "error", "traceback": [ "\u001b[31m---------------------------------------------------------------------------\u001b[39m", "\u001b[31mOperationalError\u001b[39m Traceback (most recent call last)", "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 2\u001b[39m\n\u001b[32m 1\u001b[39m \u001b[38;5;66;03m# als nächstes bauen wir unsere Verbindung auf, legen eine Tabelle an und füllen diese mit zwei Usern (bitte eigene Datenbank auswählen):\u001b[39;00m\n\u001b[32m----> \u001b[39m\u001b[32m2\u001b[39m conn = \u001b[43mpsycopg2\u001b[49m\u001b[43m.\u001b[49m\u001b[43mconnect\u001b[49m\u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43mdbname=7Wochen user=postgres password=postgres\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[32m 4\u001b[39m cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)\n\u001b[32m 6\u001b[39m cursor.execute(\u001b[33m\"\"\"\u001b[39m\n\u001b[32m 7\u001b[39m \u001b[33m DROP TABLE IF EXISTS users;\u001b[39m\n\u001b[32m 8\u001b[39m \u001b[33m CREATE TABLE IF NOT EXISTS users (\u001b[39m\n\u001b[32m (...)\u001b[39m\u001b[32m 12\u001b[39m \u001b[33m )\u001b[39m\n\u001b[32m 13\u001b[39m \u001b[33m\"\"\"\u001b[39m)\n", "\u001b[36mFile \u001b[39m\u001b[32m~/anaconda3/lib/python3.13/site-packages/psycopg2/__init__.py:122\u001b[39m, in \u001b[36mconnect\u001b[39m\u001b[34m(dsn, connection_factory, cursor_factory, **kwargs)\u001b[39m\n\u001b[32m 119\u001b[39m kwasync[\u001b[33m'\u001b[39m\u001b[33masync_\u001b[39m\u001b[33m'\u001b[39m] = kwargs.pop(\u001b[33m'\u001b[39m\u001b[33masync_\u001b[39m\u001b[33m'\u001b[39m)\n\u001b[32m 121\u001b[39m dsn = _ext.make_dsn(dsn, **kwargs)\n\u001b[32m--> \u001b[39m\u001b[32m122\u001b[39m conn = \u001b[43m_connect\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mconnection_factory\u001b[49m\u001b[43m=\u001b[49m\u001b[43mconnection_factory\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m*\u001b[49m\u001b[43m*\u001b[49m\u001b[43mkwasync\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 123\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m cursor_factory \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m 124\u001b[39m conn.cursor_factory = cursor_factory\n", "\u001b[31mOperationalError\u001b[39m: connection to server on socket \"/tmp/.s.PGSQL.5432\" failed: No such file or directory\n\tIs the server running locally and accepting connections on that socket?\n" ] } ], "source": [ "# als nächstes bauen wir unsere Verbindung auf, legen eine Tabelle an und füllen diese mit zwei Usern (bitte eigene Datenbank auswählen):\n", "conn = psycopg2.connect (\"dbname=7Wochen user=postgres password=postgres\")\n", "\n", "cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)\n", "\n", "cursor.execute(\"\"\"\n", " DROP TABLE IF EXISTS users;\n", " CREATE TABLE IF NOT EXISTS users (\n", " id SERIAL PRIMARY KEY,\n", " username VARCHAR(255) NOT NULL,\n", " password VARCHAR(255) NOT NULL\n", " )\n", "\"\"\")\n", "\n", "cursor.execute(\"INSERT INTO users (username, password) VALUES (%s, %s)\", (\"user1\", \"password1\"))\n", "cursor.execute(\"INSERT INTO users (username, password) VALUES (%s, %s)\", (\"user2\", \"password2\"))\n", "conn.commit();" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[RealDictRow([('id', 1), ('username', 'user1'), ('password', 'password1')]),\n", " RealDictRow([('id', 2), ('username', 'user2'), ('password', 'password2')])]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Kurz zum überprüfen, ob die Tabelle angelegt wurde\n", "\n", "cursor.execute(\"SELECT * FROM users;\")\n", "result = cursor.fetchall()\n", "result" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Angriffsszenario 1 - direkt Übermittlung von Zugangsdaten" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Login erfolgreich\n" ] } ], "source": [ "# gehen wir nun davon aus, dass sich ein User, zum Beispiel über ein Formularfeld anmelden möchte.\n", "# Und zwar mit den folgenden Zugangsdaten:\n", "username = \"user1\"\n", "password = \"password1\"\n", "\n", "# Das würde dann so ablaufen:\n", "try:\n", " cursor.execute(f\"SELECT * FROM users WHERE username='{username}' AND password='{password}'\")\n", "# Nun holen wir uns eine Ergebniszeile. Gibt es die waren wir mit unserer Anmeldung erfolgreich. Kommt keine Zeile waren unsere Zugangsdaten falsch.\n", " user = cursor.fetchone()\n", "\n", " if user:\n", " print(\"Login erfolgreich\")\n", " else:\n", " print(\"Login fehlgeschlagen\")\n", "except Exception as e:\n", " print(e)\n", " conn.rollback()\n", "\n", "# Wir übergeben also die Zeichenkette aus der Python-Variable direkt an unsere SQL-Datenbank.\n", "# Ändern wir das Passwort sind wir nicht erfolgreich.\n", "# Um es schöner zu machen fügen wir noch ein Rollback ein" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Angriffszenario 2 - Übergabe von SQL-Befehlen, um Fehler zu hervorzurufen. " ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "FEHLER: Zeichenkette in Anführungszeichen nicht abgeschlossen bei »'''«\n", "LINE 1: SELECT * FROM users WHERE username='user1' AND password='''\n", " ^\n", "\n" ] } ], "source": [ "# gehen wir nun davon aus, dass ein User SQL-Befehle eingibt. Zunächst nur ein einfaches Anführungszeichen als Passwort.\n", "# Das Ergebnis ist ein erzeugter SQL-Fehler.\n", "username = \"user1\"\n", "password = \"'\"\n", "\n", "try:\n", " cursor.execute(f\"SELECT * FROM users WHERE username='{username}' AND password='{password}'\")\n", " user = cursor.fetchone()\n", "\n", " if user:\n", " print(\"Login erfolgreich\")\n", " else:\n", " print(\"Login fehlgeschlagen\")\n", "except Exception as e:\n", " print(e)\n", " conn.rollback()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Angriffszenario 3 - Übergabe von SQL-Befehlen, um einen erfolgreichen Login zu generieren." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Login erfolgreich\n" ] } ], "source": [ "# gehen wir nun davon aus, dass ein User SQL-Befehle eingibt. Diesmal die Übergabe eines echten Befehls.\n", "# Dieser erzeugt eine wahre Aussage wodurch die Passwortüberprüfung erfolgreich wird.\n", "username = \"user1\"\n", "password = \"' OR 1=1 --\"\n", "\n", "try:\n", " cursor.execute(f\"SELECT * FROM users WHERE username='{username}' AND password='{password}'\")\n", " user = cursor.fetchone()\n", "\n", " if user:\n", " print(\"Login erfolgreich\")\n", " else:\n", " print(\"Login fehlgeschlagen\")\n", "except Exception as e:\n", " print(e)\n", " conn.rollback()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Abschliessend noch die Variante mit Platzhaltern, die sicherer wäre." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Login fehlgeschlagen\n" ] } ], "source": [ "# hier würde nun die Zeichenkette aus der Passworteingabe übergeben und nicht als String direkt zu SQL-Code werden.\n", "username = \"user1\"\n", "password = \"' OR 1=1 --\"\n", "\n", "try:\n", " cursor.execute(f\"SELECT * FROM users WHERE username=%s AND password=%s\", (username, password))\n", " user = cursor.fetchone()\n", "\n", " if user:\n", " print(\"Login erfolgreich\")\n", " else:\n", " print(\"Login fehlgeschlagen\")\n", "except Exception as e:\n", " print(e)\n", " conn.rollback()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "interpreter": { "hash": "a6b707a736c5fbba452b904aff207ddd250a7524df1f8c74db5bc52ff4a2560b" }, "kernelspec": { "display_name": "Python [conda env:base] *", "language": "python", "name": "conda-base-py" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.9" } }, "nbformat": 4, "nbformat_minor": 4 }