diff --git a/sol1.py b/sol1.py
new file mode 100755
index 0000000..0259e10
--- /dev/null
+++ b/sol1.py
@@ -0,0 +1,316 @@
+#!/usr/bin/env python
+import gymnasium as gym
+import numpy as np
+import matplotlib.pyplot as plt
+import seaborn as sns
+
+
+def initialize_environment():
+    """Initialize the FrozenLake environment (deterministic)."""
+    env = gym.make('FrozenLake-v1', is_slippery=False)
+    return env
+
+
+def initialize_q_table(n_states, n_actions):
+    """Initialize the Q-table with zeros."""
+    return np.zeros((n_states, n_actions))
+
+
+def select_action(state, Q, epsilon, n_actions):
+    """
+    Select an action using an ε-greedy strategy.
+
+    Args:
+        state: Current state
+        Q: Q-table
+        epsilon: Exploration probability
+        n_actions: Number of possible actions
+
+    Returns:
+        Selected action
+    """
+    if np.random.random() < epsilon:
+        # Exploration: random action
+        return np.random.randint(n_actions)
+    else:
+        # Exploitation: best known action
+        return np.argmax(Q[state, :])
+
+
+def q_learning_update(Q, state, action, reward, next_state, alpha, gamma, done):
+    """
+    Perform a Q-learning update.
+
+    Q(s,a) ← Q(s,a) + α · [r + γ · max Q(s',a') - Q(s,a)]
+
+    Args:
+        Q: Q-table
+        state: Current state
+        action: Executed action
+        reward: Received reward
+        next_state: Next state
+        alpha: Learning rate
+        gamma: Discount factor
+        done: Episode finished?
+    """
+    # Best next action
+    best_next_action = np.argmax(Q[next_state, :])
+
+    # TD target
+    if done:
+        td_target = reward  # No future rewards
+    else:
+        td_target = reward + gamma * Q[next_state, best_next_action]
+
+    # TD error
+    td_error = td_target - Q[state, action]
+
+    # Update Q-value
+    Q[state, action] += alpha * td_error
+
+
+def train_q_learning(env, n_episodes=2000, alpha=0.1, gamma=0.99,
+                     epsilon_start=1.0, epsilon_min=0.01, epsilon_decay=0.999):
+    """
+    Train a Q-learning agent.
+
+    Args:
+        env: Gymnasium environment
+        n_episodes: Number of training episodes
+        alpha: Learning rate
+        gamma: Discount factor
+        epsilon_start: Initial exploration rate
+        epsilon_min: Minimum exploration rate
+        epsilon_decay: Exploration decay rate
+
+    Returns:
+        Q: Trained Q-table
+        rewards: List of episode rewards
+    """
+    # Initialization
+    n_states = env.observation_space.n
+    n_actions = env.action_space.n
+    Q = initialize_q_table(n_states, n_actions)
+
+    epsilon = epsilon_start
+    rewards = []
+
+    # Training loop
+    for episode in range(n_episodes):
+        state, _ = env.reset()
+        done = False
+        episode_reward = 0
+
+        while not done:
+            # Select action (ε-greedy)
+            action = select_action(state, Q, epsilon, n_actions)
+
+            # Execute action
+            next_state, reward, terminated, truncated, _ = env.step(action)
+            done = terminated or truncated
+            episode_reward += reward
+
+            # Q-learning update
+            q_learning_update(Q, state, action, reward, next_state, alpha, gamma, done)
+
+            # Update state
+            state = next_state
+
+        # Epsilon decay
+        epsilon = max(epsilon_min, epsilon * epsilon_decay)
+        rewards.append(episode_reward)
+
+        # Report progress
+        if (episode + 1) % 100 == 0:
+            avg_reward = np.mean(rewards[-100:])
+            print(f"Episode {episode+1}/{n_episodes}, avg reward (last 100): {avg_reward:.2f}, ε: {epsilon:.3f}")
+
+    return Q, rewards
+
+
+def evaluate_policy(env, Q, n_episodes=100):
+    """
+    Evaluate the learned policy without exploration.
+
+    Args:
+        env: Gymnasium environment
+        Q: Trained Q-table
+        n_episodes: Number of evaluation episodes
+
+    Returns:
+        success_rate: Success rate in percent
+        avg_reward: Average reward per episode
+    """
+    successes = 0
+    total_reward = 0
+
+    for episode in range(n_episodes):
+        state, _ = env.reset()
+        done = False
+        episode_reward = 0
+
+        while not done:
+            # Greedy action (no exploration)
+            action = np.argmax(Q[state, :])
+            state, reward, terminated, truncated, _ = env.step(action)
+            done = terminated or truncated
+            episode_reward += reward
+
+        total_reward += episode_reward
+        if episode_reward > 0:
+            successes += 1
+
+    success_rate = (successes / n_episodes) * 100
+    avg_reward = total_reward / n_episodes
+
+    return success_rate, avg_reward
+
+
+def plot_learning_curve(rewards, window=100):
+    """
+    Plot the learning curve with a moving average.
+
+    Args:
+        rewards: List of episode rewards
+        window: Window size for the moving average
+    """
+    plt.figure(figsize=(10, 6))
+
+    plt.plot(rewards, alpha=0.3, label='Episode reward')
+
+    # Moving average
+    if len(rewards) >= window:
+        moving_avg = np.convolve(rewards, np.ones(window)/window, mode='valid')
+        plt.plot(range(window-1, len(rewards)), moving_avg,
+                 label=f'Moving average ({window} episodes)', linewidth=2)
+
+    plt.xlabel('Episode')
+    plt.ylabel('Reward')
+    plt.title('Learning curve')
+    plt.legend(loc='lower right')
+    plt.grid(True, alpha=0.3)
+
+    plt.tight_layout()
+    plt.show()
+
+
+def visualize_q_table(Q):
+    """
+    Visualize the Q-table as a heatmap.
+
+    Args:
+        Q: Q-table
+    """
+    plt.figure(figsize=(10, 8))
+    sns.heatmap(Q, annot=True, fmt='.2f', cmap='YlOrRd',
+                xticklabels=['←', '↓', '→', '↑'],
+                yticklabels=[str(i) for i in range(Q.shape[0])])
+    plt.title('Q-table after training')
+    plt.xlabel('Action')
+    plt.ylabel('State')
+    plt.tight_layout()
+    plt.show()
+
+
+def visualize_policy(Q):
+    """
+    Visualize the learned policy on the grid.
+
+    Args:
+        Q: Q-table
+    """
+    policy = np.argmax(Q, axis=1)
+    action_arrows = {0: '←', 1: '↓', 2: '→', 3: '↑'}
+
+    print("\n" + "="*40)
+    print("LEARNED POLICY")
+    print("="*40)
+    print("\nFrozenLake grid:")
+    print("SFFF")
+    print("FHFH")
+    print("FFFH")
+    print("HFFG")
+    print("\nOptimal actions:")
+
+    policy_grid = policy.reshape(4, 4)
+    for i, row in enumerate(policy_grid):
+        row_str = ' '.join([action_arrows[a] for a in row])
+        print(f"Row {i}: {row_str}")
+
+    print("\nState-by-state policy:")
+    for state in range(16):
+        row, col = state // 4, state % 4
+        print(f"State {state:2d} (row {row}, column {col}): {action_arrows[policy[state]]}")
+    print("="*40 + "\n")
+
+
+def main():
+    """Main function for all tasks of Exercise 1."""
+
+    print("="*60)
+    print("Q-LEARNING FOR FROZENLAKE (DETERMINISTIC)")
+    print("Practical Exercise 1 - Solution")
+    print("="*60)
+
+    # Initialize environment
+    env = initialize_environment()
+    n_states = env.observation_space.n  # type: ignore
+    n_actions = env.action_space.n  # type: ignore
+    print("\nEnvironment: FrozenLake-v1 (is_slippery=False)")
+    print(f"States: {n_states}")
+    print(f"Actions: {n_actions}")
+
+    # ========================================
+    # HYPERPARAMETERS - tuned for deterministic FrozenLake
+    # ========================================
+    n_episodes = 3000
+    alpha = 0.4            # Learning rate
+    gamma = 0.99           # Discount factor
+    epsilon_start = 1.0
+    epsilon_min = 0.01
+    epsilon_decay = 0.999  # Works well for the deterministic environment
+
+    # Train with Q-learning
+    print("\n--- Q-Learning Training ---")
+    print(f"Hyperparameters: α={alpha}, γ={gamma}, ε_decay={epsilon_decay}")
+    Q, rewards = train_q_learning(
+        env,
+        n_episodes=n_episodes,
+        alpha=alpha,
+        gamma=gamma,
+        epsilon_start=epsilon_start,
+        epsilon_min=epsilon_min,
+        epsilon_decay=epsilon_decay
+    )
+
+    # Plot the learning curve
+    print("\n--- Learning Curve ---")
+    plot_learning_curve(rewards)
+
+    # Evaluate the policy
+    print("\n--- Policy Evaluation ---")
+    success_rate, avg_reward = evaluate_policy(env, Q, n_episodes=100)
+    print(f"Success rate (100 episodes): {success_rate:.1f}%")
+    print(f"Average reward: {avg_reward:.3f}")
+
+    # Visualizations
+    print("\n--- Visualizations ---")
+    visualize_q_table(Q)
+    visualize_policy(Q)
+
+    print("\n" + "="*60)
+    print("TRAINING COMPLETE!")
+    print("="*60)
+    print("\nChange the hyperparameters above and run again!")
+    print("  - Try α = 0.01 or α = 0.5")
+    print("  - Try γ = 0.5 or γ = 0.99")
+    print("  - Try epsilon_decay = 1.0 (no decay)")
+
+    env.close()
+
+
+if __name__ == "__main__":
+    main()
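For reference, a minimal sketch of the hyperparameter experiments suggested by the closing hints in main(). It reuses initialize_environment, train_q_learning and evaluate_policy (assuming sol1.py is importable as sol1); the sweep() helper and the exact value grid are illustrative assumptions, not part of the submitted script.

# hyperparameter_sweep.py -- illustrative sketch, not part of sol1.py
# Compares evaluation success rates for a few of the settings mentioned
# in the closing hints of main() (alpha, gamma, epsilon_decay).
from sol1 import initialize_environment, train_q_learning, evaluate_policy


def sweep():
    env = initialize_environment()
    # (alpha, gamma, epsilon_decay) -- values taken from the printed hints
    settings = [
        (0.01, 0.99, 0.999),
        (0.5,  0.99, 0.999),
        (0.4,  0.5,  0.999),
        (0.4,  0.99, 1.0),   # no epsilon decay: agent keeps exploring
    ]
    for alpha, gamma, eps_decay in settings:
        Q, _ = train_q_learning(env, n_episodes=3000, alpha=alpha, gamma=gamma,
                                epsilon_start=1.0, epsilon_min=0.01,
                                epsilon_decay=eps_decay)
        success_rate, avg_reward = evaluate_policy(env, Q, n_episodes=100)
        print(f"alpha={alpha}, gamma={gamma}, eps_decay={eps_decay}: "
              f"{success_rate:.1f}% success, avg reward {avg_reward:.3f}")
    env.close()


if __name__ == "__main__":
    sweep()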