
Implementing PPO in PyTorch


1. Import the necessary libraries

python
import torch 
import torch.nn as nn 
import torch.optim as optim 
import torch.nn.functional as F 
import numpy as np

2. Define the policy network

python

class PolicyNetwork(nn.Module):
    """Maps a state to a probability distribution over discrete actions."""
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # Softmax over the last dimension yields per-action probabilities
        return F.softmax(self.fc3(x), dim=-1)
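
Here is a quick, minimal sketch of how the policy network can be used to pick an action (the state and action dimensions below are made up for illustration). torch.distributions.Categorical also exposes log_prob, which is handy if you later compute the PPO ratio in log space:

python

import torch
from torch.distributions import Categorical

policy = PolicyNetwork(state_dim=4, action_dim=2)  # dimensions are illustrative
state = torch.rand(4)                              # dummy state

probs = policy(state)             # shape (2,), sums to 1 thanks to the softmax
dist = Categorical(probs)
action = dist.sample()            # a scalar tensor, e.g. tensor(1)
log_prob = dist.log_prob(action)  # log pi(a|s), useful for ratio computations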

3. Define the value network

python

class ValueNetwork(nn.Module):
    """Estimates the state value V(s); outputs one scalar per state."""
    def __init__(self, state_dim, hidden_dim=64):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
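
A small usage sketch (batch size and state dimension are illustrative): the value network returns a (batch, 1) tensor, which is why the update code in step 4 squeezes the last dimension before mixing values with 1-D reward and done tensors:

python

import torch

value_net = ValueNetwork(state_dim=4)  # state_dim is illustrative
states = torch.rand(8, 4)              # a dummy batch of 8 states

values = value_net(states)             # shape (8, 1)
values = values.squeeze(-1)            # shape (8,), matches rewards/dones
print(values.shape)                    # torch.Size([8])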

4. Define the PPO algorithm

python

class PPO:
    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99, epsilon=0.2, epochs=10):
        self.policy_net = PolicyNetwork(state_dim, action_dim)
        self.value_net = ValueNetwork(state_dim)
        self.optimizer = optim.Adam(list(self.policy_net.parameters()) + list(self.value_net.parameters()), lr=lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epochs = epochs
        
    def update(self, states, actions, rewards, next_states, dones):
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones)
        
        # Compute one-step returns and advantages; no gradients are needed here,
        # and squeezing the value outputs keeps everything 1-D
        with torch.no_grad():
            values = self.value_net(states).squeeze(-1)
            next_values = self.value_net(next_states).squeeze(-1)
            returns = rewards + self.gamma * next_values * (1 - dones)
            advantages = returns - values
            # Action probabilities under the old (behavior) policy, computed
            # once and held fixed so the ratio below is meaningful across epochs
            old_probs = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(-1)
        
        # Update the policy and value networks
        for _ in range(self.epochs):
            # Action probabilities under the current policy
            new_probs = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(-1)
            
            # Probability ratio and clipped surrogate loss
            ratio = new_probs / old_probs
            clipped_ratio = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon)
            policy_loss = -torch.min(ratio * advantages, clipped_ratio * advantages).mean()
            
            # Value-function loss against the fixed return targets
            value_loss = F.mse_loss(self.value_net(states).squeeze(-1), returns)
            
            # Total loss
            loss = policy_loss + value_loss
            
            # Backpropagate and optimize
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
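
The update above uses a simple one-step advantage (r + γV(s') − V(s)). A common refinement is Generalized Advantage Estimation (GAE); below is a minimal, self-contained sketch that could replace the returns/advantages computation. The function name and the lam (GAE λ) parameter are choices made here for illustration, not part of the code above:

python

import torch

def compute_gae(rewards, values, next_values, dones, gamma=0.99, lam=0.95):
    """Generalized Advantage Estimation over one trajectory of 1-D tensors."""
    advantages = torch.zeros_like(rewards)
    gae = 0.0
    for t in reversed(range(len(rewards))):
        # TD error at step t
        delta = rewards[t] + gamma * next_values[t] * (1 - dones[t]) - values[t]
        # Exponentially decayed sum of future TD errors
        gae = delta + gamma * lam * (1 - dones[t]) * gae
        advantages[t] = gae
    returns = advantages + values
    return advantages, returns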

5. Training with PPO

python

# Assumes you already have an environment
env = ...  # your environment

# Initialize PPO
ppo = PPO(state_dim=env.observation_space.shape[0], action_dim=env.action_space.n)

# Training loop (classic gym API: reset() returns the state, step() returns 4 values)
for episode in range(1000):
    state = env.reset()
    done = False
    states, actions, rewards, next_states, dones = [], [], [], [], []
    
    while not done:
        # Sample an action from the current policy; no gradients are needed while acting
        with torch.no_grad():
            action_probs = ppo.policy_net(torch.FloatTensor(state))
        action = torch.multinomial(action_probs, 1).item()
        
        next_state, reward, done, _ = env.step(action)
        
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        next_states.append(next_state)
        dones.append(done)
        
        state = next_state
    
    # Update PPO with the collected episode
    ppo.update(states, actions, rewards, next_states, dones)
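
After training, a quick way to gauge the learned policy is a greedy evaluation loop. Below is a sketch assuming the same classic gym API used above; evaluate and its episodes parameter are names chosen here for illustration:

python

import torch

def evaluate(env, policy_net, episodes=5):
    """Average return of the greedy policy over a few episodes."""
    total = 0.0
    for _ in range(episodes):
        state = env.reset()
        done = False
        while not done:
            with torch.no_grad():
                probs = policy_net(torch.FloatTensor(state))
            action = probs.argmax().item()  # greedy: take the most likely action
            state, reward, done, _ = env.step(action)
            total += reward
    return total / episodes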

6. Summary

  • Policy network: produces a probability distribution over actions.
  • Value network: estimates the state-value function, which is used to compute advantages.
  • PPO update: clips the policy ratio so each update stays close to the old policy, keeping training stable. The objective is written out below.
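
For reference, the clipped update described above is the standard PPO surrogate objective (Schulman et al., 2017), which the code maximizes by minimizing its negative:

$$
L^{\mathrm{CLIP}}(\theta) = \mathbb{E}_t\!\left[\min\!\left(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}\big(r_t(\theta),\,1-\epsilon,\,1+\epsilon\big)\,\hat{A}_t\right)\right],
\qquad r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t \mid s_t)}
$$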

Appendix: what can env be?

1. Many reinforcement-learning tasks can use an existing environment library directly, for example:

OpenAI Gym
  • Overview: OpenAI Gym is the most widely used reinforcement-learning environment library, providing many predefined environments such as classic control tasks and Atari games.
  • Installation
    pip install gym
  • Usage example (classic gym API; see the note after this list for gym >= 0.26)
    import gym
    
    # Create the environment
    env = gym.make('CartPole-v1')
    
    # Reset the environment
    state = env.reset()
    
    # Interact with the environment
    done = False
    while not done:
        action = env.action_space.sample()  # pick a random action
        next_state, reward, done, info = env.step(action)
        print(f"State: {next_state}, Reward: {reward}, Done: {done}")
    
    # Close the environment
    env.close()
PyBullet
  • Overview: PyBullet is a physics engine that supports more complex 3D environments.
  • Installation
    pip install pybullet
  • Usage example
    import gym
    import pybullet_envs  # registers the Bullet environments with gym
    
    env = gym.make('AntBulletEnv-v0')
Unity ML-Agents
  • Overview: Unity ML-Agents provides reinforcement-learning environments built on the Unity engine, suited to more complex 3D scenes.
  • Installation: requires Unity plus the ML-Agents plugin.
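
Note: the Gym example above uses the classic API (gym < 0.26). In newer gym releases and in gymnasium, reset() returns (obs, info) and step() returns five values; the same random-action loop under the new API looks like this:

import gym

env = gym.make('CartPole-v1')
state, info = env.reset()                  # new API: reset returns (obs, info)

done = False
while not done:
    action = env.action_space.sample()
    next_state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated         # the episode ends on either flag
env.close()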

2. Custom environments

If you need to solve a specific problem, you can define your own environment. The basic steps:

Step 1: Define the environment class

Inherit from gym.Env and implement the required methods:

  • __init__: initialize the environment.
  • reset: reset the environment to its initial state.
  • step: apply an action and return the next state, the reward, whether the episode is done, and extra info.
  • render (optional): visualize the environment.
Step 2: Example code
import gym
from gym import spaces
import numpy as np

class CustomEnv(gym.Env):
    def __init__(self):
        super(CustomEnv, self).__init__()
        # Define the action space and observation space
        self.action_space = spaces.Discrete(2)  # 2 discrete actions
        self.observation_space = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)  # observation space

    def reset(self):
        # Reset the environment to its initial state
        self.state = np.array([0.5], dtype=np.float32)
        return self.state

    def step(self, action):
        # Apply the action
        if action == 0:
            self.state -= 0.1
        else:
            self.state += 0.1

        # Compute the reward and termination flag
        # (index into the 1-element array to get a plain float)
        s = float(self.state[0])
        reward = 1 if s > 0.9 else 0
        done = s >= 1.0 or s <= 0.0

        return self.state, reward, done, {}

    def render(self, mode='human'):
        # Visualize the environment (optional)
        print(f"State: {self.state}")

# Use the custom environment
env = CustomEnv()
state = env.reset()
done = False
while not done:
    action = env.action_space.sample()
    next_state, reward, done, info = env.step(action)
    print(f"State: {next_state}, Reward: {reward}, Done: {done}")

3. Connecting this to the code above

In the training code, env can be your custom environment or one created through gym. For example:

# Create an environment with OpenAI Gym
env = gym.make('CartPole-v1')

# Use the environment in the training loop
state = env.reset()
done = False
while not done:
    action = env.action_space.sample()  # pick a random action
    next_state, reward, done, info = env.step(action)
    print(f"State: {next_state}, Reward: {reward}, Done: {done}")
