lawarob-rl/sol2.py
#!/usr/bin/env python
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


def initialize_environment():
    """Initialize the FrozenLake environment (stochastic, is_slippery=True)."""
    env = gym.make('FrozenLake-v1', is_slippery=True)
    return env


def initialize_q_table(n_states, n_actions):
    """Initialize the Q-table with zeros."""
    return np.zeros((n_states, n_actions))
def select_action(state, Q, epsilon, n_actions):
    """
    Select an action using the ε-greedy strategy.

    Args:
        state: Current state
        Q: Q-table
        epsilon: Exploration probability
        n_actions: Number of available actions

    Returns:
        Selected action
    """
    if np.random.random() < epsilon:
        # Exploration: random action
        return np.random.randint(n_actions)
    else:
        # Exploitation: best known action
        return np.argmax(Q[state, :])
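# Note on the resulting selection probabilities (illustrative arithmetic, not
# part of the original exercise): with ε = 0.1 and 4 actions, the greedy action
# is chosen with probability (1 - ε) + ε/4 = 0.925, and each non-greedy action
# with probability ε/4 = 0.025, because the random branch can also draw the
# greedy action.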
def q_learning_update(Q, state, action, reward, next_state, alpha, gamma, done):
    """
    Perform a Q-learning update.

    Q(s,a) ← Q(s,a) + α · [r + γ · max_a' Q(s',a') - Q(s,a)]

    Args:
        Q: Q-table
        state: Current state
        action: Action taken
        reward: Reward received
        next_state: Next state
        alpha: Learning rate
        gamma: Discount factor
        done: Episode finished?
    """
    # Best next action
    best_next_action = np.argmax(Q[next_state, :])

    # TD target
    if done:
        td_target = reward  # No future rewards
    else:
        td_target = reward + gamma * Q[next_state, best_next_action]

    # TD error
    td_error = td_target - Q[state, action]

    # Update Q-value
    Q[state, action] += alpha * td_error
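# Worked example of the update rule above (illustrative values, not taken from
# the exercise): suppose Q[s, a] = 0.0, the agent reaches the goal (reward = 1.0,
# done = True) and alpha = 0.1. Then td_target = 1.0, td_error = 1.0 - 0.0 = 1.0,
# and the new value is Q[s, a] = 0.0 + 0.1 * 1.0 = 0.1. With done = False the
# target would instead bootstrap: reward + gamma * max_a' Q[next_state, a'].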
def train_q_learning(env, n_episodes=2000, alpha=0.1, gamma=0.99,
                     epsilon_start=1.0, epsilon_min=0.01, epsilon_decay=0.999):
    """
    Train a Q-learning agent.

    Args:
        env: Gymnasium environment
        n_episodes: Number of training episodes
        alpha: Learning rate
        gamma: Discount factor
        epsilon_start: Initial exploration rate
        epsilon_min: Minimum exploration rate
        epsilon_decay: Exploration decay rate

    Returns:
        Q: Trained Q-table
        rewards: List of episode rewards
    """
    # Initialization
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    Q = initialize_q_table(n_states, n_actions)
    epsilon = epsilon_start
    rewards = []

    # Training loop
    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            # Select action (ε-greedy)
            action = select_action(state, Q, epsilon, n_actions)

            # Execute action
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward

            # Q-learning update
            q_learning_update(Q, state, action, reward, next_state, alpha, gamma, done)

            # Update state
            state = next_state

        # Epsilon decay
        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        rewards.append(episode_reward)

        # Print progress
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards[-100:])
            print(f"Episode {episode+1}/{n_episodes}, avg reward (last 100): {avg_reward:.2f}, ε: {epsilon:.3f}")

    return Q, rewards
def evaluate_policy(env, Q, n_episodes=100):
    """
    Evaluate the learned policy without exploration.

    Args:
        env: Gymnasium environment
        Q: Trained Q-table
        n_episodes: Number of evaluation episodes

    Returns:
        success_rate: Success rate in percent
        avg_reward: Average reward per episode
    """
    successes = 0
    total_reward = 0

    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            # Greedy action (no exploration)
            action = np.argmax(Q[state, :])
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward

        total_reward += episode_reward
        if episode_reward > 0:
            successes += 1

    success_rate = (successes / n_episodes) * 100
    avg_reward = total_reward / n_episodes
    return success_rate, avg_reward
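# Note: with the default FrozenLake-v1 reward (1 only on reaching the goal, 0
# otherwise), avg_reward equals success_rate / 100 here; both values are kept
# because they are reported separately in main().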
def plot_learning_curve(rewards, window=100):
    """
    Plot the learning curve with a moving average.

    Args:
        rewards: List of episode rewards
        window: Window size for the moving average
    """
    plt.figure(figsize=(10, 6))
    plt.plot(rewards, alpha=0.3, label='Episode reward')

    # Moving average
    if len(rewards) >= window:
        moving_avg = np.convolve(rewards, np.ones(window) / window, mode='valid')
        plt.plot(range(window - 1, len(rewards)), moving_avg,
                 label=f'Moving average ({window} episodes)', linewidth=2)

    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title('Learning curve')
    plt.legend(loc='lower right')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
def visualize_q_table(Q):
    """
    Visualize the Q-table as a heatmap.

    Args:
        Q: Q-table
    """
    plt.figure(figsize=(10, 8))
    sns.heatmap(Q, annot=True, fmt='.2f', cmap='YlOrRd',
                xticklabels=['←', '↓', '→', '↑'],
                yticklabels=[str(i) for i in range(Q.shape[0])])
    plt.title('Q-table after training')
    plt.xlabel('Action')
    plt.ylabel('State')
    plt.tight_layout()
    plt.show()
def visualize_policy(Q):
    """
    Visualize the learned policy on the grid.

    Args:
        Q: Q-table
    """
    policy = np.argmax(Q, axis=1)
    action_arrows = {0: '←', 1: '↓', 2: '→', 3: '↑'}

    print("\n" + "=" * 40)
    print("LEARNED POLICY")
    print("=" * 40)
    print("\nFrozenLake grid:")
    print("SFFF")
    print("FHFH")
    print("FFFH")
    print("HFFG")

    print("\nOptimal actions:")
    policy_grid = policy.reshape(4, 4)
    for i, row in enumerate(policy_grid):
        row_str = ' '.join([action_arrows[a] for a in row])
        print(f"Row {i}: {row_str}")

    print("\nState-by-state policy:")
    for state in range(16):
        row, col = state // 4, state % 4
        print(f"State {state:2d} (row {row}, column {col}): {action_arrows[policy[state]]}")
    print("=" * 40 + "\n")
def main():
    """Main function for all tasks of Exercise 2."""
    print("=" * 60)
    print("Q-LEARNING FOR FROZENLAKE (STOCHASTIC)")
    print("Practical Exercise 2 - Solution")
    print("=" * 60)

    # Initialize environment
    env = initialize_environment()
    n_states = env.observation_space.n  # type: ignore
    n_actions = env.action_space.n  # type: ignore
    print("\nEnvironment: FrozenLake-v1 (is_slippery=True)")
    print(f"States: {n_states}")
    print(f"Actions: {n_actions}")

    # ========================================
    # HYPERPARAMETERS - tuned for the stochastic FrozenLake
    # ========================================
    n_episodes = 6000
    alpha = 0.1             # Learning rate
    gamma = 0.99            # Discount factor
    epsilon_start = 1.0
    epsilon_min = 0.01
    epsilon_decay = 0.9985  # Tuned for the stochastic environment
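    # Rough check of the decay schedule (illustrative arithmetic): ε hits the
    # floor of 0.01 once 0.9985**k <= 0.01, i.e. after roughly
    # k ≈ ln(0.01) / ln(0.9985) ≈ 3070 episodes, so about the second half of
    # the 6000 training episodes runs near-greedy with ε = epsilon_min.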
    # Train Q-learning
    print("\n--- Q-Learning Training ---")
    print(f"Hyperparameters: α={alpha}, γ={gamma}, ε_decay={epsilon_decay}")
    Q, rewards = train_q_learning(
        env,
        n_episodes=n_episodes,
        alpha=alpha,
        gamma=gamma,
        epsilon_start=epsilon_start,
        epsilon_min=epsilon_min,
        epsilon_decay=epsilon_decay
    )

    # Plot learning curve
    print("\n--- Learning Curve ---")
    plot_learning_curve(rewards)

    # Evaluate policy
    print("\n--- Policy Evaluation ---")
    success_rate, avg_reward = evaluate_policy(env, Q, n_episodes=100)
    print(f"Success rate (100 episodes): {success_rate:.1f}%")
    print(f"Average reward: {avg_reward:.3f}")

    # Visualizations
    print("\n--- Visualizations ---")
    visualize_q_table(Q)
    visualize_policy(Q)

    print("\n" + "=" * 60)
    print("TRAINING COMPLETE!")
    print("=" * 60)
    print("\nChange the hyperparameters above and run again!")
    print(" - Try α = 0.01 or α = 0.5")
    print(" - Try γ = 0.5 or γ = 0.99")
    print(" - Try epsilon_decay = 1.0 (no decay)")

    env.close()


if __name__ == "__main__":
    main()