#!/usr/bin/env python
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


def initialize_environment():
    """Initialize the FrozenLake environment (deterministic)."""
    env = gym.make('FrozenLake-v1', is_slippery=False)
    return env


def initialize_q_table(n_states, n_actions):
    """Initialize the Q-table with zeros."""
    return np.zeros((n_states, n_actions))


def select_action(state, Q, epsilon, n_actions):
    """
    Select an action using the ε-greedy strategy.

    Args:
        state: Current state
        Q: Q-table
        epsilon: Exploration probability
        n_actions: Number of available actions

    Returns:
        Selected action
    """
    if np.random.random() < epsilon:
        # Exploration: random action
        return np.random.randint(n_actions)
    else:
        # Exploitation: best known action
        return np.argmax(Q[state, :])


def q_learning_update(Q, state, action, reward, next_state, alpha, gamma, done):
    """
    Perform a Q-learning update.

    Q(s,a) ← Q(s,a) + α · [r + γ · max_a' Q(s',a') - Q(s,a)]

    Args:
        Q: Q-table
        state: Current state
        action: Executed action
        reward: Received reward
        next_state: Next state
        alpha: Learning rate
        gamma: Discount factor
        done: Episode finished?
    """
    # Best next action
    best_next_action = np.argmax(Q[next_state, :])

    # TD target
    if done:
        td_target = reward  # No future rewards
    else:
        td_target = reward + gamma * Q[next_state, best_next_action]

    # TD error
    td_error = td_target - Q[state, action]

    # Update Q-value
    Q[state, action] += alpha * td_error


def train_q_learning(env, n_episodes=2000, alpha=0.1, gamma=0.99,
                     epsilon_start=1.0, epsilon_min=0.01, epsilon_decay=0.999):
    """
    Train a Q-learning agent.

    Args:
        env: Gymnasium environment
        n_episodes: Number of training episodes
        alpha: Learning rate
        gamma: Discount factor
        epsilon_start: Initial exploration rate
        epsilon_min: Minimum exploration rate
        epsilon_decay: Exploration decay rate

    Returns:
        Q: Trained Q-table
        rewards: List of episode rewards
    """
    # Initialization
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    Q = initialize_q_table(n_states, n_actions)
    epsilon = epsilon_start
    rewards = []

    # Training loop
    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            # Select action (ε-greedy)
            action = select_action(state, Q, epsilon, n_actions)

            # Execute action
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward

            # Q-learning update
            q_learning_update(Q, state, action, reward, next_state,
                              alpha, gamma, done)

            # Update state
            state = next_state

        # Epsilon decay
        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        rewards.append(episode_reward)

        # Print progress
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards[-100:])
            print(f"Episode {episode+1}/{n_episodes}, "
                  f"avg reward (last 100): {avg_reward:.2f}, ε: {epsilon:.3f}")

    return Q, rewards
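
# A minimal sanity check of the update rule (a sketch added for illustration,
# not part of the original exercise). With Q(s,a) = 0, r = 1, done = True and
# alpha = 0.1, a single update moves Q(s,a) by alpha * r to 0.1, because the
# TD target at a terminal transition is just the reward.
def _demo_single_update():
    Q_demo = np.zeros((2, 2))  # hypothetical 2-state, 2-action table
    q_learning_update(Q_demo, state=0, action=1, reward=1.0,
                      next_state=1, alpha=0.1, gamma=0.99, done=True)
    assert np.isclose(Q_demo[0, 1], 0.1)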
def evaluate_policy(env, Q, n_episodes=100):
    """
    Evaluate the learned policy without exploration.

    Args:
        env: Gymnasium environment
        Q: Trained Q-table
        n_episodes: Number of evaluation episodes

    Returns:
        success_rate: Success rate in percent
        avg_reward: Average reward per episode
    """
    successes = 0
    total_reward = 0

    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            # Greedy action (no exploration)
            action = np.argmax(Q[state, :])
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward

        total_reward += episode_reward
        if episode_reward > 0:
            successes += 1

    success_rate = (successes / n_episodes) * 100
    avg_reward = total_reward / n_episodes
    return success_rate, avg_reward


def plot_learning_curve(rewards, window=100):
    """
    Plot the learning curve with a moving average.

    Args:
        rewards: List of episode rewards
        window: Window size for the moving average
    """
    plt.figure(figsize=(10, 6))
    plt.plot(rewards, alpha=0.3, label='Episode reward')

    # Moving average
    if len(rewards) >= window:
        moving_avg = np.convolve(rewards, np.ones(window)/window, mode='valid')
        plt.plot(range(window-1, len(rewards)), moving_avg,
                 label=f'Moving average ({window} episodes)', linewidth=2)

    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title('Learning curve')
    plt.legend(loc='lower right')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


def visualize_q_table(Q):
    """
    Visualize the Q-table as a heatmap.

    Args:
        Q: Q-table
    """
    plt.figure(figsize=(10, 8))
    sns.heatmap(Q, annot=True, fmt='.2f', cmap='YlOrRd',
                xticklabels=['←', '↓', '→', '↑'],
                yticklabels=[str(i) for i in range(Q.shape[0])])
    plt.title('Q-table after training')
    plt.xlabel('Action')
    plt.ylabel('State')
    plt.tight_layout()
    plt.show()
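
# Optional companion view (a sketch, not part of the original exercise): the
# greedy state values V(s) = max_a Q(s, a), reshaped onto the grid. This
# assumes the default 4x4 FrozenLake map used throughout this script; the
# function name is hypothetical.
def visualize_state_values(Q, grid_shape=(4, 4)):
    """Visualize V(s) = max_a Q(s, a) as a heatmap over the grid."""
    V = np.max(Q, axis=1).reshape(grid_shape)
    plt.figure(figsize=(6, 5))
    sns.heatmap(V, annot=True, fmt='.2f', cmap='YlGnBu')
    plt.title('State values V(s) = max_a Q(s, a)')
    plt.tight_layout()
    plt.show()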
def visualize_policy(Q):
    """
    Visualize the learned policy on the grid.

    Args:
        Q: Q-table
    """
    policy = np.argmax(Q, axis=1)
    action_arrows = {0: '←', 1: '↓', 2: '→', 3: '↑'}

    print("\n" + "="*40)
    print("LEARNED POLICY")
    print("="*40)
    print("\nFrozenLake grid:")
    print("SFFF")
    print("FHFH")
    print("FFFH")
    print("HFFG")

    print("\nOptimal actions:")
    policy_grid = policy.reshape(4, 4)
    for i, row in enumerate(policy_grid):
        row_str = ' '.join([action_arrows[a] for a in row])
        print(f"Row {i}: {row_str}")

    print("\nState-by-state policy:")
    for state in range(16):
        row, col = state // 4, state % 4
        print(f"State {state:2d} (row {row}, col {col}): {action_arrows[policy[state]]}")
    print("="*40 + "\n")


def main():
    """Main function for all tasks from exercise 1."""
    print("="*60)
    print("Q-LEARNING FOR FROZENLAKE (DETERMINISTIC)")
    print("Practical Exercise 1 - Solution")
    print("="*60)

    # Initialize environment
    env = initialize_environment()
    n_states = env.observation_space.n  # type: ignore
    n_actions = env.action_space.n  # type: ignore
    print("\nEnvironment: FrozenLake-v1 (is_slippery=False)")
    print(f"States: {n_states}")
    print(f"Actions: {n_actions}")

    # ========================================
    # HYPERPARAMETERS - tuned for deterministic FrozenLake
    # ========================================
    n_episodes = 3000
    alpha = 0.4            # Learning rate
    gamma = 0.99           # Discount factor
    epsilon_start = 1.0
    epsilon_min = 0.01
    epsilon_decay = 0.999  # Works well for the deterministic environment

    # Train Q-learning
    print("\n--- Q-learning training ---")
    print(f"Hyperparameters: α={alpha}, γ={gamma}, ε_decay={epsilon_decay}")
    Q, rewards = train_q_learning(
        env, n_episodes=n_episodes, alpha=alpha, gamma=gamma,
        epsilon_start=epsilon_start, epsilon_min=epsilon_min,
        epsilon_decay=epsilon_decay
    )

    # Plot learning curve
    print("\n--- Learning curve ---")
    plot_learning_curve(rewards)

    # Evaluate policy
    print("\n--- Policy evaluation ---")
    success_rate, avg_reward = evaluate_policy(env, Q, n_episodes=100)
    print(f"Success rate (100 episodes): {success_rate:.1f}%")
    print(f"Average reward: {avg_reward:.3f}")

    # Visualizations
    print("\n--- Visualizations ---")
    visualize_q_table(Q)
    visualize_policy(Q)

    print("\n" + "="*60)
    print("TRAINING COMPLETE!")
    print("="*60)
    print("\nChange the hyperparameters above and run again!")
    print("  - Try α = 0.01 or α = 0.5")
    print("  - Try γ = 0.5 or γ = 0.9")
    print("  - Try epsilon_decay = 1.0 (no decay)")

    env.close()


if __name__ == "__main__":
    main()
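
# Follow-up experiment (an assumption, not part of the original exercise):
# the same training loop also handles the stochastic variant. Expect a lower
# success rate and slower convergence, e.g. with more episodes and a smaller
# learning rate:
#
#   env = gym.make('FrozenLake-v1', is_slippery=True)
#   Q, rewards = train_q_learning(env, n_episodes=10000, alpha=0.1)
#   print(evaluate_policy(env, Q))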