pytorch-21强化学习(dqn,deepqlearning)教程(代码片段)

ScorpioDoctor ScorpioDoctor     2022-12-05     634

关键词:

要查看图文并茂的教程,请移步: http://studyai.com/pytorch-1.4/intermediate/reinforcement_q_learning.html

本教程演示如何使用PyTorch在 OpenAI Gym 的手推车连杆(CartPole-v0)任务 上训练深度Q-学习的智能体(Deep Q Learning(DQN)agent)。 任务(Task)

智能体(agent)必须在两个动作(action)之间做出决定——向左或向右移动手推车(cart)——这样连在手推车上的杆子(pole)就可以保持直立。 你可以在 Gym 网站 上找到一个包含各种算法和可视化的官方排行榜。 cartpole

上图显示了手推车连杆的运行画面(cartpole)

当智能体观察环境的当前状态(state)并选择一个动作时,环境将迁移(transitions)到一个新状态, 并返回一个表明该动作所造成的结果的奖励(reward)。在这项任务中,每增加一个时间步,奖励为+1, 如果杆子掉得太远或推车偏离中心超过2.4个单位距离,则环境终止。 这意味着表现更好的情景将持续更长的时间,积累更大的回报。

手推车连杆(CartPole)任务的设计使得对智能体的输入是4个表示环境状态(位置、速度等)的实数值(real values)。 然而,神经网络完全可以通过观察场景(looking at the scene)来解决任务,因此我们将使用从屏幕上扣下来的 以购物车为中心的图像块(image patch)作为输入。正因为如此,我们的结果无法直接与官方排行榜的结果相比 ——我们的任务要困难得多。不幸的是,这会减慢训练速度,因为我们必须渲染(render)所有帧。

严格地说,我们将把状态(state)表示为当前屏幕上扣取的图像块和上一个屏幕上扣取的图像块之间的差分(difference)。 这将允许智能体从一个图像中把连杆的速度也考虑进去。

依赖包(Packages)

首先,我们导入依赖包. 第一个依赖包是 gym ,用于产生手推车连杆环境(environment), 安装方式为(pip install gym)。其他的依赖包来自于PyTorch:

神经网络 (torch.nn)
优化 (torch.optim)
自动微分 (torch.autograd)
视觉任务工具集 (torchvision - a separate package).
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T


env = gym.make('CartPole-v0').unwrapped

# 设置 matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# 查看 GPU 是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

回放记忆/内存(Replay Memory)

我们将使用经验回放记忆(experience replay memory)来训练我们的DQN。 它存储智能体观察到的状态迁移(transitions),允许我们稍后重用这些数据。 通过对过往经验随机抽样,可以构造一个不相关的训练批次。结果表明,该方法大大稳定和改进了DQN训练过程。

为了实现上述功能, 我们需要定义两个类:

Transition - 一个命名元组(named tuple)用于表示环境中的单次状态迁移(single transition)。
    该类的作用本质上是将状态-动作对[(state, action) pairs]映射到他们的下一个结果,即[(next_state, action) pairs], 其中的 状态(state)是指从屏幕上获得的差分图像块(screen diifference image)。

ReplayMemory - 一个大小有限的循环缓冲区,用于保存最近观察到的迁移(transition)。 该类还实现了一个采样方法 .sample() 用来在训练过程中随机的选择一个迁移批次(batch of transitions)。
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

现在,让我们定义我们的模型。但首先,让我们快速回顾一下 DQN 是什么。

DQN 算法

我们的环境是确定性的,所以为了简单起见,这里给出的所有方程也都是确定性的。 在强化学习文献中,它们还包含对环境中随机迁移(stochastic transitions)的期望。

我们的目标是训练一个策略,该策略能使打折后的累计奖励最大化。 Rt0=∑∞t=t0γt−t0rt , 其中 Rt0 也被称之为 回报(return). 折扣因子, γ, 应该是一个 0 到 1

之间的常量,以保证累计求和是可收敛的。 对我们的智能体来说,折扣因子使得来自遥远未来的奖励(far future rewards)不如即将到来的奖励(near future rewards)重要。 因为遥远未来的奖励的不确定性要大于即将到来的奖励的不确定性。

Q-learning 背后的思想是: 如果我们有一个函数 Q∗:State×Action→R

能够 告诉我们可以获得的回报是多少, 那么如果要在某个给定的状态上采取一个最优动作,只需要简单的构建一个能够使可获得的回报最大化的策略即可: π∗(s)=argmaxa Q∗(s,a)

然而, 我们并不知道外部世界环境的所有完整信息,所以我们没有机会得到 Q∗ 。 但是,由于神经网络是通用函数逼近器,我们可以简单地创建一个神经网络并训练它,使它与 Q∗

趋同。

对于我们的训练更新规则,我们将使用一个事实,即某个策略的每一个 Q

函数都遵循 贝尔曼方程: Qπ(s,a)=r+γQπ(s′,π(s′))

等式两边的差称为时间差误差(temporal difference error),δ

: δ=Q(s,a)−(r+γmaxaQ(s′,a))

为了最小化这个误差, 我们将使用的损失函数为: Huber loss. 当误差很小时,Huber损失的作用类似于均方误差;但当误差较大时,它的作用类似于平均绝对误差—— 这使得当 Q 的估计值带有非常大的噪声时,损失对异常值更加稳健鲁棒。 我们通过从回放记忆/缓存(replay memory)中采样的一个批次的迁移样本(transition samples) B

, 来计算Huber损失: L=1|B|∑(s,a,s′,r) ∈ BL(δ) whereL(δ)=12δ2|δ|−12for |δ|≤1,otherwise. Q-网络(Q-network)

我们的模型将是一个卷积神经网络,它以当前屏幕图像块和以前屏幕图像块之间的差分作为输入。 它有两个输出,表示 Q(s,left) 和 Q(s,right) (其中 s

是网络的输入)。 实际上,该网络正试图预测在给定当前输入的情况下,采取每项行动的预期回报(expected return)。

class DQN(nn.Module):

    def __init__(self, h, w, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        # 线性层的输入连接数取决于conv2d层的输出以及输入图像的尺寸,
        # 因此需要计算出来:linear_input_size
        def conv2d_size_out(size, kernel_size = 5, stride = 2):
            return (size - (kernel_size - 1) - 1) // stride  + 1
        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        linear_input_size = convw * convh * 32
        self.head = nn.Linear(linear_input_size, outputs)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))

输入抽取(Input extraction)

下面的代码是从环境中提取和处理渲染图像的. 它使用了 torchvision 包, 该包的使用使得 组合不同的图像变换变得很容易。 运行该cell后,它将显示提取的图像块(patch)。

resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.CUBIC),
                    T.ToTensor()])


def get_cart_location(screen_width):
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART

def get_screen():
    # Returned screen requested by gym is 400x600x3, but is sometimes larger
    # such as 800x1200x3. Transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Cart is in the lower half, so strip off the top and bottom of the screen
    _, screen_height, screen_width = screen.shape
    screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
    view_width = int(screen_width * 0.6)
    cart_location = get_cart_location(screen_width)
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    # Strip off the edges, so that we have a square image centered on a cart
    screen = screen[:, :, slice_range]
    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).to(device)


env.reset()
plt.figure()
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
           interpolation='none')
plt.title('Example extracted screen')
plt.show()

训练(Training)

超参数与辅助函数

下面的代码实现了我们的模型及其优化器,并且定义了一些实用工具函数:

select_action - 将根据epsilon贪婪策略选择动作。简单地说,我们有时会使用我们的模型来选择动作, 有时我们只是在所有可能的动作集合中均匀采样一个。 通过均匀采样随机选择一个动作的概率将从 EPS_START 开始,并呈指数衰减,朝 EPS_END 结束。 EPS_DECAY 控制着衰减速率。
plot_durations - 该辅助函数用来绘制每集剧情的持续时间,以及过去的最近100集(last 100 episodes)的平均持续时间(官方评估中使用的度量标准)。 绘图将在包含主训练循环的单元下面,并且将在每一集之后更新(update after every episode)。
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10

# 获取屏幕大小,以便我们可以根据Gym返回的形状(shape)正确初始化网络层。
# 此时的典型尺寸接近 3x40x90,这是 get_screen() 中压缩和缩小渲染缓冲区的结果
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape

# 从 Gym 的动作空间获得动作的数量
n_actions = env.action_space.n

policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \\
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        display.clear_output(wait=True)
        display.display(plt.gcf())

训练循环(Training loop)

最后,给出训练模型的代码.

下面, 函数 optimize_model 将执行一个单步优化。 它首先采样一个批次的样本(batch), 将所有张量连接成一个张量 并计算 Q(st,at) 和 V(st+1)=maxaQ(st+1,a), 然后将它们组合进我们的损失(loss)中. 根据定义,如果 s 是一个终止状态,则设定 V(s)=0 。 为了给算法增加稳定性,我们还使用一个目标网络(target network)来计算 V(st+1)

。 目标网络的权重大部分时间保持冻结状态,但每隔一段时间就会用策略网络(policy network)的权重更新一次。 更新间隔通常是若干优化步(optimizition steps),但为了简单起见,我们将使用剧集(episodes)为更新间隔单位。

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (看 https://stackoverflow.com/a/19343/3343043 详细解释
    # ). 把 Transitions 的 batch-array 转换为 batch-arrays 的 Transition 。
    batch = Transition(*zip(*transitions))

    # 计算非最终状态的mask 并把批次样本串接(concantecate)起来
    # (一个最终状态(final state)是指在该状态上(仿真)游戏就结束了)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # 计算 Q(s_t, a) - 模型计算 Q(s_t), 然后我们在动作列中选择动作
    # 这些是根据策略网络(policy_net)对batch中每个状态所采取的操作
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # 计算所有下一个状态的 V(s_t+1)
    # 对非最终下一个状态的动作的期望值是基于“旧的”target_net进行计算的;
    # selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # 计算期望 Q 值
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # 计算 Huber loss
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    # 优化模型
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

下面,你可以找到主要的训练循环。在开始时,我们重置环境并初始化状态张量。 然后,我们采样一个动作,执行它,观察下一个屏幕和奖励(总是1),并优化我们的模型一次。 当一次episode结束时(我们的模型失败,game over),我们重新启动循环。

下面的代码中, num_episodes 设置的较小. 你应该下载notebook并运行更多的epsiodes, 比如300+以获得有意义的持续时间改进。

num_episodes = 50
for i_episode in range(num_episodes):
    # 初始化环境与状态
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen - last_screen
    for t in count():
        # 选择并执行一个动作
        action = select_action(state)
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        # 观察一个新的状态
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None

        # 将状态转移保存到记忆内存(memory)中
        memory.push(state, action, next_state, reward)

        # 移动到下一个状态
        state = next_state

        # 执行一步优化过程(on the target network)
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break
    # 更新目标网络, copying all weights and biases in DQN
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()

这是一个图表,说明了整个数据流是如何产生的。 ../_images/reinforcement_learning_diagram.jpg

动作(Actions)可以随机选择,也可以基于策略选择, u接着从gym环境中获得下一步到达的状态. 我们将结果记录在回放记忆/内存(replay memory)中,并在每次迭代中运行优化步骤。 优化器从replay memory中随机选取一个批次的样本来进行新策略的训练。 “旧的” target_net 也会被用于优化中以计算期望的Q值;它被偶尔更新以保持其最新。

dqn强化学习

pytorch比tenserflow简单。所以我们模仿用tensorflow写的强化学习。学习资料:本节的全部代码Tensorflow的100行DQN代码我制作的DQN动画简介我的DQNTensorflow教程我的强化学习教程PyTorch官网论文 PlayingAtariwithDeepReinforcementLearning要点Torch是... 查看详情

强化学习学习资料汇总强化学习:q-learning与dqn(deepqnetwork)

python机器学习四(强化学习)DQN算法流程​​https://www.jianshu.com/p/42507aa63b05/https://www.jianshu.com/p/42507aa63b05/​​基于深度强化学习的智能体系结构参数调优​​基于深度强化学习的智能体系结构参数调优-知乎CAPES(ComputerAutomatedPerformanc... 查看详情

004-dqn

什么是DQN今天我们会来说说强化学习中的一种强大武器,DeepQNetwork简称为DQN.GoogleDeepmind团队就是靠着这DQN使计算机玩电动玩得比我们还厉害.  强化学习与神经网络 之前我们所谈论到的强化学习方法都是比较传统的方式,... 查看详情

chatgpt使用拓展资料:强化学习dqn单模型双模型doubledqnduelingdqn

ChatGPT使用拓展资料:强化学习DQN单模型、双模型目录强化学习基础Q-LearningDQN单模型DQN双模型DoubleDQNDuelingDQNTransformers拓展资料强化学习基础在开始学习DQN之前,我们需要了解一些强化学习的基础知识。强化学习是一种机器学习方... 查看详情

强化学习库tianshou——dqn使用(代码片段)

强化学习库tianshou——DQN使用tianshou是清华大学学生开源编写的强化学习库。本人因为一些比赛的原因,有使用到强化学习,但是因为过于紧张与没有尝试快速复现强化学习的代码,并没有获得很好的成绩,故尝试... 查看详情

强化学习库tianshou——dqn使用(代码片段)

强化学习库tianshou——DQN使用tianshou是清华大学学生开源编写的强化学习库。本人因为一些比赛的原因,有使用到强化学习,但是因为过于紧张与没有尝试快速复现强化学习的代码,并没有获得很好的成绩,故尝试... 查看详情

dqn(deepq-learning)入门教程(零)之教程介绍

...,可以一试,然后发现,要用到DQN,再然后,又发现其是强化学习的知识,然后,就入坑了。别人的学习流程是强化学习——>Q-learning——>DQN——>??Fly,我学习流程TM是??Fly——>CNN——>DQN——>Q-learning——>强化学... 查看详情

强化学习dqn经验回放是什么

经验重放:强化学习由于state之间的相关性存在稳定性的问题,因为智能体去探索环境时采集到的样本是一个时间序列,样本之间具有连续性,所以需要打破时间相关性,解决的办法是在训练的时候存储当前训... 查看详情

强化学习dqn经验回放是什么

经验重放:强化学习由于state之间的相关性存在稳定性的问题,因为智能体去探索环境时采集到的样本是一个时间序列,样本之间具有连续性,所以需要打破时间相关性,解决的办法是在训练的时候存储当前训... 查看详情

强化学习-q-learningsarsa和dqn的理解

本文用于基本入门理解。 强化学习的基本理论:R,S,A这些就不说了。 先设想两个场景: 一。1个5x5的格子图,里面有一个目标点, 2个死亡点二。一个迷宫,  一个出发点, 3处分叉点,5个死角,1条活... 查看详情

dqn处理cartpole问题——使用强化学习,本质上是训练mlp,预测每一个动作的得分

代码:#-*-coding:utf-8-*-importrandomimportgymimportnumpyasnpfromcollectionsimportdequefromkeras.modelsimportSequentialfromkeras.layersimportDensefromkeras.optimizersimportAdamfromkeras.utils.vis_utilsim 查看详情

基于pytorch的强化学习(dqn)之baseline基本概念

目录1.引言2.数学推导2.1引理2.2改进的策略梯度2.3蒙特卡罗模拟3.baseline的选择1.引言我们前面讲过策略梯度下降算法 ,现在来介绍一种加快收敛速度的方法:设置Baseline。2.数学推导我们之前推导过状态价值函数梯度的公... 查看详情

强化学习(十三)策略梯度(policygradient)(代码片段)

    在前面讲到的DQN系列强化学习算法中,我们主要对价值函数进行了近似表示,基于价值来学习。这种ValueBased强化学习方法在很多领域都得到比较好的应用,但是ValueBased强化学习方法也有很多局限性,因此在另一些场景... 查看详情

深度强化学习(deepreinforcementlearning)入门:rlbase&dqn-ddpg-a3cintroduction

转自https://zhuanlan.zhihu.com/p/25239682过去的一段时间在深度强化学习领域投入了不少精力,工作中也在应用DRL解决业务问题。子曰:温故而知新,在进一步深入研究和应用DRL前,阶段性的整理下相关知识点。本文集中在DRL的model-free... 查看详情

强化学习笔记experiencereplay经验回放

 1回顾:DQNDQN笔记State-actionValueFunction(Q-function)_UQI-LIUWJ的博客-CSDN博客DQN是希望通过神经网络来学习Q(s,a)的结果,我们输入一个人状态s,通过DQN可以得到各个action对应的Q(s,a)通常用TD来求解DQN其中rt是实际进行交互得到... 查看详情

深度强化学习-dqn算法原理与代码(代码片段)

DQN算法是DeepMind团队提出的一种深度强化学习算法,在许多电动游戏中达到人类玩家甚至超越人类玩家的水准,本文就带领大家了解一下这个算法,论文的链接见下方。论文:Human-levelcontrolthroughdeepreinforcementlearning... 查看详情

tensorflow2.0|基于深度强化学习(dqn)实现动态柔性作业车间调度问题(dfjsp)(代码片段)

...bydeepreinforcementlearning》论文详情可见:论文阅读|《用强化学习求解带插单的动态FJSP》其他相关阅读可见个人CSDN专栏之《论文阅读与实现》,如:论文阅读|《基于加权Q 查看详情

(pytorch复现)基于深度强化学习(cnn+duelingnetwork/dqn/ddqn/d3qn/per)的自适应车间调度(jsp)

为了深入学习各种深度学习网络和强化学习的结合,实现了一下下列文章:ResearchonAdaptiveJobShopSchedulingProblemsBasedonDuelingDoubleDQN|IEEEJournals&Magazine|IEEEXplore状态、动作、奖励函数及实验的简单介绍可参考: 查看详情