#!/usr/bin/env python
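"""Q-learning on the deterministic FrozenLake-v1 environment (solution to Practical Exercise 1)."""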
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


def initialize_environment():
    """Initialize the FrozenLake environment (deterministic)."""
    env = gym.make('FrozenLake-v1', is_slippery=False)
    return env


def initialize_q_table(n_states, n_actions):
    """Initialize the Q-table with zeros."""
    return np.zeros((n_states, n_actions))


def select_action(state, Q, epsilon, n_actions):
    """
    Select an action with the ε-greedy strategy.

    Args:
        state: Current state
        Q: Q-table
        epsilon: Exploration probability
        n_actions: Number of possible actions

    Returns:
        Selected action
    """
    if np.random.random() < epsilon:
        # Exploration: random action
        return np.random.randint(n_actions)
    else:
        # Exploitation: best known action
        return np.argmax(Q[state, :])
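# Note: np.argmax returns the first index of the maximum, so with an all-zero
# initial Q-table the greedy branch always picks action 0 (LEFT) until the
# estimates start to differ; early progress therefore relies on exploration.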


def q_learning_update(Q, state, action, reward, next_state, alpha, gamma, done):
    """
    Perform one Q-learning update.

    Q(s,a) ← Q(s,a) + α · [r + γ · max_a' Q(s',a') - Q(s,a)]

    Args:
        Q: Q-table
        state: Current state
        action: Action taken
        reward: Reward received
        next_state: Next state
        alpha: Learning rate
        gamma: Discount factor
        done: Episode finished?
    """
    # Best next action
    best_next_action = np.argmax(Q[next_state, :])

    # TD target
    if done:
        td_target = reward  # no future rewards
    else:
        td_target = reward + gamma * Q[next_state, best_next_action]

    # TD error
    td_error = td_target - Q[state, action]

    # Update Q-value
    Q[state, action] += alpha * td_error
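# Worked example (hypothetical numbers, purely illustrative): with alpha=0.5,
# gamma=0.9, Q[s, a] = 0.0, reward = 1.0 and max_a' Q[s', a'] = 0.5, the TD
# target is 1.0 + 0.9 * 0.5 = 1.45, the TD error is 1.45 - 0.0 = 1.45, and the
# updated entry becomes Q[s, a] = 0.0 + 0.5 * 1.45 = 0.725.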


def train_q_learning(env, n_episodes=2000, alpha=0.1, gamma=0.99,
                     epsilon_start=1.0, epsilon_min=0.01, epsilon_decay=0.999):
    """
    Train a Q-learning agent.

    Args:
        env: Gymnasium environment
        n_episodes: Number of training episodes
        alpha: Learning rate
        gamma: Discount factor
        epsilon_start: Initial exploration rate
        epsilon_min: Minimum exploration rate
        epsilon_decay: Exploration decay rate

    Returns:
        Q: Trained Q-table
        rewards: List of episode rewards
    """
    # Initialization
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    Q = initialize_q_table(n_states, n_actions)

    epsilon = epsilon_start
    rewards = []

    # Training loop
    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            # Choose action (ε-greedy)
            action = select_action(state, Q, epsilon, n_actions)

            # Execute action
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward

            # Q-learning update
            q_learning_update(Q, state, action, reward, next_state, alpha, gamma, done)

            # Move to the next state
            state = next_state

        # Epsilon decay (once per episode)
        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        rewards.append(episode_reward)

        # Report progress
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(rewards[-100:])
            print(f"Episode {episode+1}/{n_episodes}, avg reward (last 100): {avg_reward:.2f}, ε: {epsilon:.3f}")

    return Q, rewards


def evaluate_policy(env, Q, n_episodes=100):
    """
    Evaluate the learned policy without exploration.

    Args:
        env: Gymnasium environment
        Q: Trained Q-table
        n_episodes: Number of evaluation episodes

    Returns:
        success_rate: Success rate in percent
        avg_reward: Average reward per episode
    """
    successes = 0
    total_reward = 0

    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            # Greedy action (no exploration)
            action = np.argmax(Q[state, :])
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward

        total_reward += episode_reward
        if episode_reward > 0:
            successes += 1

    success_rate = (successes / n_episodes) * 100
    avg_reward = total_reward / n_episodes

    return success_rate, avg_reward
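# In the default FrozenLake reward scheme the only non-zero reward is +1 for
# reaching the goal, so "episode_reward > 0" is equivalent to a successful
# episode and avg_reward equals success_rate / 100.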


def plot_learning_curve(rewards, window=100):
    """
    Plot the learning curve with a moving average.

    Args:
        rewards: List of episode rewards
        window: Window size for the moving average
    """
    plt.figure(figsize=(10, 6))

    plt.plot(rewards, alpha=0.3, label='Episode reward')

    # Moving average (np.convolve in 'valid' mode yields len(rewards) - window + 1 points)
    if len(rewards) >= window:
        moving_avg = np.convolve(rewards, np.ones(window)/window, mode='valid')
        plt.plot(range(window-1, len(rewards)), moving_avg,
                 label=f'Moving average ({window} episodes)', linewidth=2)

    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title('Learning curve')
    plt.legend(loc='lower right')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()


def visualize_q_table(Q):
    """
    Visualize the Q-table as a heatmap.

    Args:
        Q: Q-table
    """
    plt.figure(figsize=(10, 8))
    sns.heatmap(Q, annot=True, fmt='.2f', cmap='YlOrRd',
                xticklabels=['←', '↓', '→', '↑'],
                yticklabels=[str(i) for i in range(Q.shape[0])])
    plt.title('Q-table after training')
    plt.xlabel('Action')
    plt.ylabel('State')
    plt.tight_layout()
    plt.show()
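# The arrow tick labels assume FrozenLake's action encoding: 0=LEFT, 1=DOWN, 2=RIGHT, 3=UP.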


def visualize_policy(Q):
    """
    Visualize the learned policy on the grid.

    Args:
        Q: Q-table
    """
    policy = np.argmax(Q, axis=1)
    action_arrows = {0: '←', 1: '↓', 2: '→', 3: '↑'}

    print("\n" + "="*40)
    print("LEARNED POLICY")
    print("="*40)
    print("\nFrozenLake grid:")
    print("SFFF")
    print("FHFH")
    print("FFFH")
    print("HFFG")
    print("\nOptimal actions:")

    policy_grid = policy.reshape(4, 4)
    for i, row in enumerate(policy_grid):
        row_str = ' '.join([action_arrows[a] for a in row])
        print(f"Row {i}: {row_str}")

    print("\nState-by-state policy:")
    for state in range(16):
        row, col = state // 4, state % 4
        print(f"State {state:2d} (row {row}, column {col}): {action_arrows[policy[state]]}")
    print("="*40 + "\n")


def main():
    """Main function for all tasks from Exercise 1."""

    print("="*60)
    print("Q-LEARNING FOR FROZENLAKE (DETERMINISTIC)")
    print("Practical Exercise 1 - Solution")
    print("="*60)

    # Initialize environment
    env = initialize_environment()
    n_states = env.observation_space.n  # type: ignore
    n_actions = env.action_space.n  # type: ignore
    print(f"\nEnvironment: FrozenLake-v1 (is_slippery=False)")
    print(f"States: {n_states}")
    print(f"Actions: {n_actions}")

    # ========================================
    # HYPERPARAMETERS - tuned for the deterministic FrozenLake
    # ========================================
    n_episodes = 3000
    alpha = 0.4            # Learning rate
    gamma = 0.99           # Discount factor
    epsilon_start = 1.0
    epsilon_min = 0.01
    epsilon_decay = 0.999  # Works well for the deterministic environment
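    # Rough estimate: 0.999**3000 ≈ exp(-3) ≈ 0.05, so with this schedule epsilon
    # decays to about 0.05 over the run and never reaches the epsilon_min floor of 0.01.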

    # Train with Q-learning
    print("\n--- Q-learning training ---")
    print(f"Hyperparameters: α={alpha}, γ={gamma}, ε_decay={epsilon_decay}")
    Q, rewards = train_q_learning(
        env,
        n_episodes=n_episodes,
        alpha=alpha,
        gamma=gamma,
        epsilon_start=epsilon_start,
        epsilon_min=epsilon_min,
        epsilon_decay=epsilon_decay
    )

    # Plot learning curve
    print("\n--- Learning curve ---")
    plot_learning_curve(rewards)

    # Evaluate policy
    print("\n--- Policy evaluation ---")
    success_rate, avg_reward = evaluate_policy(env, Q, n_episodes=100)
    print(f"Success rate (100 episodes): {success_rate:.1f}%")
    print(f"Average reward: {avg_reward:.3f}")

    # Visualizations
    print("\n--- Visualizations ---")
    visualize_q_table(Q)
    visualize_policy(Q)

    print("\n" + "="*60)
    print("TRAINING FINISHED!")
    print("="*60)
    print("\nChange the hyperparameters above and run again!")
    print(" - Try α = 0.01 or α = 0.5")
    print(" - Try γ = 0.5 or γ = 0.99")
    print(" - Try epsilon_decay = 1.0 (no decay)")

    env.close()


if __name__ == "__main__":
    main()