理解 huggingface transformers 中 GPT2 模型

2023-08-06 日 17:20 2023-12-16 六 20:44

修改历史

  • [2023-11-06 一 23:56] 丰富 beam search 部分,加入具体例子、BeamSearchScorer.process 和 finalize 等细节
  • [2023-09-26 二 12:53] 重写 past_key_values 部分,按照调用顺序梳理

本文是对源代码的注解,文中提到的源码基于 transformers 4.30.2 版本(不同版本之间的某些辅助接口会有所差异,比如 4.32 版本里 EncoderDecoderModel 就没有 adjust_logits_during_generation 函数了,但主体部分不会变)。 阅读本文时,可以考虑先用 IDE (如 vscode) 打开一个 python 文件,在其中临时添加以下(部分)代码,然后借助 IDE 的跳转功能查看对应的函数源代码,以下也是本文所覆盖的各函数和类的概览。

from transformers import (
    EncoderDecoderModel,
    GPT2LMHeadModel,
    GPT2Model,
)
GPT2Model.forward
GPT2LMHeadModel.forward
EncoderDecoderModel.forward
EncoderDecoderModel.generate
GPT2LMHeadModel.prepare_inputs_for_generation

from transformers.models.gpt2.modeling_gpt2 import (
    GPT2Block, GPT2Attention,
)
GPT2Block.forward
GPT2Attention.forward
GPT2Attention._attn

from transformers.generation import GenerationMixin
GenerationMixin.beam_search
GenerationMixin.greedy_search
GenerationMixin._prepare_model_inputs
GenerationMixin._prepare_attention_mask_for_generation
GenerationMixin._prepare_encoder_decoder_kwargs_for_generation
GenerationMixin._prepare_decoder_input_ids_for_generation
GenerationMixin._extract_past_from_model_output

GPT2LMHedModel 类解析

总体架构

llm_transformer_decoder.svg

transformer 原始论文 [1706.03762] Attention Is All You Need 中的 decoder 结构如上图 a) 所示,其中每个 transformer block 的计算可以用以下伪代码表示:

def block(x: pos_word_embedding):
    x = layer_norm(x + self_attention(x))  # Multi-head att
    x = layer_norm(x + cross_attention(x)) 
    x = layer_norm(x + feedfoward(x)) 

block 里各模块的顺序是:

transformer_block = ["SelfAttention", "Add", "LayerNorm", "CrossAttention", "Add", "LayerNorm", "FeedForward", "Add", "LayerNorm"]

block 叠加 N 次(比如 12)后最终输入到 Linear 和 Softmax 层。

huggingface 的 GPT2LMHeadModel 实现如图 b) 所示,该类对 GPT2Model 类和线性层(词预测头)进行了封装, GPT2Model 里有多个 GPT2Block, 它与原始的 transformer block 的区别是:

  • 把每一次 attention 后的 LayerNorm 都移动到了 attention 层之前,伪代码如下:

    def gpt_block(x: pos_word_embedding):
        x = x + self_attention(layer_norm(x)) 
        x = x + cross_attention(layer_norm(x)) 
        x = x + feedfoward(layer_norm(x)) 
    
  • 或者换个角度,把 block 内最后一个 LayerNorm 层移动到了 block 的第一层,因此其模块的顺序是:

    gpt_block = [transformer_block[-1]] + transformer_block[:-1]
    print(gpt_block)
    
    ['LayerNorm', 'SelfAttention', 'Add', 'LayerNorm', 'CrossAttention', 'Add', 'LayerNorm', 'FeedForward', 'Add']
    

由于 GPT2Model 在多个 GPT2Block 堆叠后又加了 LayerNorm 层, 因此如果考虑整个 decoder 结构的话, 二者的差别仅仅是 GPT2Model 在第一层 attention layer 前新增了一层 LayerNorm . 这些差别可能不会对模型最终训练结果有多少影响,但在编程实践中还是需要理清楚。

此外,由于 GPT2 没有 encoder, 因此其中 cross_attention 层默认是不计算的。

FeedForward 层: GPT2MLP 类

GPT2Model 里的 FeedForward 层是用 GPT2MLP 类实现的,它的 forward 函数内容如下:

hidden_states = self.c_fc(hidden_states) # e.g [32, 128, 768] --> [32, 128, 2048] 
hidden_states = self.act(hidden_states) # GeLU or ReLU
hidden_states = self.c_proj(hidden_states) # e.g [32, 128, 2048]  --> [32, 128, 768]
hidden_states = self.dropout(hidden_states)

其中 c_fc 和 c_proj 都是 Conv1D 实现, 和全连接层功能几乎一样,细节上差别在于:

  • Conv1D(output_dim, input_dim) 是带 bias 的,无法通过参数控制;对参数矩阵用 nn.init.normal_(self.weight, std=0.02) 初始化, bias 初始为 0; 类的 init 参数是先 output 维度后 input 维度。
  • Linear(input_dim, output_dim, bias=False) 的 bias 是可选的,比如 GPT2LMHeadModel 模型的 lm_head 就没有使用偏置;用 kaiming_uniform_ 方法初始化参数;类的 init 参数是先 input 维度后 output 维度;

全连接层没有太特别的意义,基本只能解释成在做非线性变换或者增加模型参数以"扩容", 用两层的原因是第二层需要把维度处理成和输入一样,便于构建残差层。另外从计算效率上看,本层涉及大矩阵乘法,计算量较大, Lora 算法就针对大矩阵运算的效率问题在微调过程进行了优化。

对于激活层则有一些选择的权衡和理论解释性(故事性),GPT2 使用的是 GELU, 它与 RELU 类似,但从计算上来看,它在 0 点左侧的梯度不为 0 且更加平滑,这可以缓解 dead neuron 的问题,即在初始化或者某次大梯度更新后导致部分 bias 权重过小,导致任何输入都无法再激活这些权重所作用的神经元的问题。

不过 GLUE 全名是 Gaussian Error Linear Unit, 其设计灵感来自按照高斯分布进行自适应 droupout 的启发(考虑到各层神经元一般都会做 normalization),它的理论公式是以标准正太分布为先验概率的伯努利随机变量的期望: \(\text{GELU}(x) = x \Phi (x) \), \( \Phi \) 是高斯分布的累积概率函数,一般会用近似公式来实现,比如:

\[ \text{GELU}(x) = 0.5x \left(1 + \tanh\left(\sqrt{\frac{2}{\pi}} \left(x + 0.044715x^3\right)\right)\right) \]

LayerNorm 类

实现公式如下:

\[ y = \frac{x - \mathrm{E}[x]}{ \sqrt{\mathrm{Var}[x] + \epsilon}} * \gamma + \beta \]

它的 forward 函数中直接调用的是 F.layer_norm, 而该函数调用应该是 C++ 或 cuda 实现,跳转不到最终的 python 代码,但在 nn.LayerNorm 的 init 函数里有以下代码:

if self.elementwise_affine: # default True
    self.weight = Parameter(torch.empty(self.normalized_shape, **factory_kwargs))
    self.bias = Parameter(torch.empty(self.normalized_shape, **factory_kwargs))

形参 normalized_shape 对应的实参是 hidden_size , GPT2 small 默认为 768:

self.ln_1 = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)

这说明在序列生成中, LayerNorm 它是对所有词向量维进行归一化,每层的参数个数为 2x768.

统计角度看,该层的 \( \gamma, \beta \) 代表了所有训练样本的 token 在不同特征层的均值和标准差(分布性质)。LayerNorm 是把 batch 里的所有 token 看作同一个随机变量的采样值, 而 BatchNorm 则把 batch 里每个样本看作同一个随机变量的采样值。

从计算角度看,归一化可能有利于控制梯度的大小,增加训练稳定性。

GPT2Attention 类

GPT2Attention 类的 init 函数中初始化了几个核心的类属性:

self.register_buffer(
    "bias",
    torch.tril(torch.ones((max_positions, max_positions), dtype=torch.bool)).view(
        1, 1, max_positions, max_positions
    ),
    persistent=False,
)
self.register_buffer("masked_bias", torch.tensor(-1e4), persistent=False)
self.c_proj = Conv1D(self.embed_dim, self.embed_dim)
self.attn_dropout = nn.Dropout(config.attn_pdrop)
self.resid_dropout = nn.Dropout(config.resid_pdrop)
  • self.biasself.masked_bias 都是 register_buffer,它们是不带梯度的模型参数, self.masked_bias 没有被使用; 但 self.bias 很重要,它是一个下三角为 True 其余为 False 的矩阵,之后用做自注意力的 attention mask, 一个简单案例展示如下:

    import torch
    max_positions = 3
    bias = torch.tril(torch.ones((max_positions, max_positions), dtype=torch.bool)).view(
        1, 1, max_positions, max_positions
    )
    print(bias)
    
    tensor([[[[ True, False, False],
              [ True,  True, False],
              [ True,  True,  True]]]])
    
  • c_proj 是计算完 attention 加权后的线性层

如果不需要 crossAttention, 那么所有的 q,k,v 都来自 decoder 的隐藏层,以下是自注意力的 attention 头,一个输入分出 3 部分。

self.c_attn = Conv1D(3 * self.embed_dim, self.embed_dim)

如果要做交叉注意力,那么 q 和 k,v 需要分离,因为 k,v 是来自 encoder 的隐藏层:

self.c_attn = Conv1D(2 * self.embed_dim, self.embed_dim)
self.q_attn = Conv1D(self.embed_dim, self.embed_dim)

自注意力的计算过程如下:

  • 切分出多个注意力头:
query, key, value = self.c_attn(hidden_states).split(self.split_size, dim=2)
query = self._split_heads(query, self.num_heads, self.head_dim)
key = self._split_heads(key, self.num_heads, self.head_dim)
value = self._split_heads(value, self.num_heads, self.head_dim)

用一个具体的例子来说明, q,k,v 在 split 后形状都是 torch.Size([10, 12, 6, 64]), 表示 batch 为 10, 每个 token 对应 12 个 heads, 序列长度为 6, 每个 head 是 64 维度

  • 进入 self._attn ,这是真正的计算 \( \text{softmax}(\frac{QK^{T}}{\sqrt{d_{k}}})V \) 的函数,后文梳理 attention mask 时会详细介绍
attn_output, attn_weights = self._attn(query, key, value, attention_mask, head_mask)
  • 最后把各个注意力头计算的结果合并再输入线性层 c_proj 和 dropout:
attn_output = self._merge_heads(attn_output, self.num_heads, self.head_dim)
attn_output = self.c_proj(attn_output)
attn_output = self.resid_dropout(attn_output)
  • 对比 _split_heads_merge_heads, 这是两个互逆的过程:

    def _split_heads(self, tensor, num_heads, attn_head_size):
        new_shape = tensor.size()[:-1] + (num_heads, attn_head_size)
        tensor = tensor.view(new_shape)
        return tensor.permute(0, 2, 1, 3) # (batch, head, seq_length, head_features)
    
    def _merge_heads(self, tensor, num_heads, attn_head_size):
        tensor = tensor.permute(0, 2, 1, 3).contiguous() # (batch, seq_length, head, head_features)
        new_shape = tensor.size()[:-2] + (num_heads * attn_head_size,)
        return tensor.view(new_shape) # (batch, seq_length, hidden_size)
    

注意力的计算流程

计算注意力核心要素之一就是 attention mask,本节梳理自注意力层和交叉注意力层的 attention mask 的计算流程,说明 attention mask 在各个函数调用之间是如何传递和转换形状,最终影响注意力得分的。

自注意力层 attention mask 的生成

作为解码器,GPT2 在预测 token 时只同比自身位置更前的 token 计算相似度,且每一层 GPT2Block 都是遵循同样的原则。

一般来所,用 tokenizer 生成的 attention_mask 默认是形状为 (batch,seq) 的全 1 矩阵, 后文假设用户传入的 attention_mask 要么是全 1, 要么是 None;

回到最外层的 GPT2LMHeadModel 类,它的 forward 函数获得 attention_mask 参数后直接传递给 GPT2Model.foward , 中间没有对 attenton_mask 做额外操作。

GPT2Model.forward 中有以下代码:

# GPT2Attention mask.
if attention_mask is not None:
    if batch_size <= 0:
        raise ValueError("batch_size has to be defined and > 0")
    attention_mask = attention_mask.view(batch_size, -1)
    # We create a 3D attention mask from a 2D tensor mask.
    # Sizes are [batch_size, 1, 1, to_seq_length]
    # So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length]
    # this attention mask is more simple than the triangular masking of causal attention
    # used in OpenAI GPT, we just need to prepare the broadcast dimension here.
    attention_mask = attention_mask[:, None, None, :]

    # Since attention_mask is 1.0 for positions we want to attend and 0.0 for
    # masked positions, this operation will create a tensor which is 0.0 for
    # positions we want to attend and the dtype's smallest value for masked positions.
    # Since we are adding it to the raw scores before the softmax, this is
    # effectively the same as removing these entirely.
    attention_mask = attention_mask.to(dtype=self.dtype)  # fp16 compatibility
    attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min

首先,根据注释可以进一步确认用户输入的 attention_mask 应该是一个 (batch, length) 的二维 tensor, 其中只包括 0 和 1 ; 此外 attention_mask 会被展开成形状为 (batch, 1, 1, length) 的四维矩阵;

最后一句中,原始矩阵中的 1 会变成 0, 0 则变成接近 -inf, 因此它的意图是用加法(广播)来对注意力得分做 mask 后再计算 softmax, 使得那些 -inf 的值会得到接近 0 的注意力权重。

注意本文用 -inf 表示 torch 能表示的最接近负无穷的数,它的真实值如下:

print(torch.finfo(torch.float32).min)
print(torch.finfo(torch.float16).min)
print(torch.finfo().min)
-3.4028234663852886e+38
-65504.0
-3.4028234663852886e+38

如果用 -inf 符号去计算,可能会出现数值问题,应该避免。

torch.tensor(float("-inf")) + 12.2
tensor(-inf)

经过以上计算,当前的 attention_mask 要么是就是形状为 (batch,1,1,length) 的全 0 矩阵,要么还是 None.

GPT2Model.forward 会调用循环调用 GPT2Block.forward 继而调用 GPT2Attention.forward ,这三个函数都没有对 attention_mask 做额外处理,因此直接进入到最核心的 GPT2Attention._attn 函数

GPT2Attention._attn 函数

该函数首先计算完整的 scaled Dot-Product attention weight 值,也就 \( \text{softmax}(\frac{QK^{T}}{\sqrt{d_{k}}})V \) 中的 \( \frac{QK^{T}}{\sqrt{d_{k}}} \):

attn_weights = torch.matmul(query, key.transpose(-1, -2))

 if self.scale_attn_weights:
     attn_weights = attn_weights / torch.full(
         [], value.size(-1) ** 0.5, dtype=attn_weights.dtype, device=attn_weights.device
     )

假设输入序列长度是 6, batch 是 2, 采用默认 12 个注意力头,那么:

  • attn_weights 的形状是 torch.Size([2, 12, 6, 6]): 表示有 2 个样本,每个样本 12 个头,当前有 6 个 token, 因此得到的权重是 6x6
  • q,k,v 形状为 torch.Size([2, 12, 6, 64]): 表示每个 token 对应 12 个 heads, 每个 head 是 64 维度
  • attention_mask 的形状为 torch.Size([2, 1, 1, 6]), 内容都是 0 。(或为 None)

接着下来涉及对 attention mask 操作的代码是:

if not self.is_cross_attention:
    # if only "normal" attention layer implements causal mask
    query_length, key_length = query.size(-2), key.size(-2)
    causal_mask = self.bias[:, :, key_length - query_length : key_length, :key_length]
    mask_value = torch.finfo(attn_weights.dtype).min
    # Need to be a tensor, otherwise we get error: `RuntimeError: expected scalar type float but found double`.
    # Need to be on the same device, otherwise `RuntimeError: ..., x and y to be on the same device`
    mask_value = torch.full([], mask_value, dtype=attn_weights.dtype).to(attn_weights.device)
    attn_weights = torch.where(causal_mask, attn_weights.to(attn_weights.dtype), mask_value)

if attention_mask is not None:
    # Apply the attention mask
    attn_weights = attn_weights + attention_mask

attn_weights = nn.functional.softmax(attn_weights, dim=-1)

对于自注意力层,以上第一个分支内代码会被执行,前文中就已经知道 self.bias 是一个三角矩阵, 形状是 [1,1,max_position,max_position], max_positions 为 6 时如下:

tensor([[[[ True, False, False, False, False, False],
        [ True,  True, False, False, False, False],
        [ True,  True,  True, False, False, False],
        [ True,  True,  True,  True, False, False],
        [ True,  True,  True,  True,  True, False],
        [ True,  True,  True,  True,  True,  True]]]])

但如果当前输入的序列长度只有 3 的话,只需要选择左上角 3x3 的矩阵,也就是 bias[:,:,:3,:3]. 因此第三第四维度最多只需要取前 key_length 个元素,这才有如下代码:

causal_mask = self.bias[:, :, key_length - query_length : key_length, :key_length] 

不过在推理时,每次模型只是自回归式地解码出一个词汇作为下个时刻的输入,query_length 始终是 1, 因此每一步只需要对单个 token 计算注意力,也就是取出 key_length - 1 : key_length 这一行注意力 mask,形状是 [1,1,1,key_length], 它会在执行 torch.where 时从 (batch,heads, 1,key_length)attn_weights 上 mask 掉无效值。

注意 torch.where 也会广播,但这个场景中不需要广播

torch.where(torch.tensor([[1,0]]).bool(), torch.tensor([[1,2],[4,5]]), 100)
tensor([[  1, 100],
        [  4, 100]])

执行完判断分支里的代码后会得到了一个与 attention_mask 无关的 attn_weights ,这说明:

  • 如果 attention_mask 是 None 的话,以下语句不会触发, attention_mask 对注意力计算没有任何贡献。

    if attention_mask is not None:
        attn_weights = attn_weights + attention_mask
    
  • 如果 attention_mask 不是 None, 那么它就是 (batch,1,1,length) 形状的,需要和形状为 (batch, heads, query_length , key_length) 的 attn_weights 进行广播求和,结果矩阵的形状和后者保持一致。

结论是: attention_mask 只是用来 mask 某个 token 的,如 pad_token;

假设训练时,输入两个长度为 6 的序列, 那么 causal_mask 的形状就是 (1, 1, 6, 6)。如果不想要关注第 2 个 token, 那么输入的 attention_mask 形状为 (2,6) 的矩阵,其中每一行可以是:

[1,0,1,1,1,1]

经过 GPT2Model.forward 的处理后,变成了形状为 [2,1,1,6] 的矩阵,其中最后一维变成了

[0,-inf,0,0,0,0]

一旦广播,最后两维度就变成:

[
    [0,-inf,0,0,0,0],
    [0,-inf,0,0,0,0],
    [0,-inf,0,0,0,0],
    [0,-inf,0,0,0,0],
    [0,-inf,0,0,0,0],
    [0,-inf,0,0,0,0],
 ]

再与 attn_weights 相加的话,每个 token (行)和第 2 个(列) token 的注意力权重都变成 0 了。

接着对 attention 做 softmax 变成概率权重,然后对 value 做加权平均:

attn_weights = nn.functional.softmax(attn_weights, dim=-1)
attn_weights = self.attn_dropout(attn_weights)
attn_output = torch.matmul(attn_weights, value)

本节给工程实践上带来的启示是:如果想要修改自注意力的下三角 mask 矩阵,只能修改 GPT2Attention._attn (要么继承覆写,要么 monkey patch)函数,模型没有留出对外的注意力定制接口。

交叉注意力 attention mask 的生成

首先要清楚的是,GPT2 默认是纯 decoder 结构,没有 encoder 提供外部上下文, 因此理论上是不需要实现交叉注意力的,然而工程上没必要把该接口删除,以保持随时可以把 GPT2 和其他 encoder 模型组合起来变成 encoder-decoder 结构的灵活性。

GPT2Model.forward 中,如果有设置 add_cross_attentionTrue 以及提供了 encoder 的隐藏层,那么会激活对交叉注意力的准备流程:

# If a 2D or 3D attention mask is provided for the cross-attention
# we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length]
if self.config.add_cross_attention and encoder_hidden_states is not None:
    encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size()
    encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length)
    if encoder_attention_mask is None:
        encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)
    encoder_attention_mask = self.invert_attention_mask(encoder_attention_mask)
else:
    encoder_attention_mask = None

从这里可以看到,首先 encoder_attention_mask 是 forward 参数提供的,假设 encoder 是一个 bert 类的模型,那么即使提供该参数,它也是形状为 (batch, seq_len) 的全 1 矩阵(不考虑 padding token),因此它和以下语句生成的 tensor 是一样的(所以如果不想定制交叉注意力 mask, 可以不用传入该参数)

if encoder_attention_mask is None:
    encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)

接着调用了 self.invert_attention_mask 函数,它来自 ModuleUtilsMixin 类(GPT2Model 继承到了它)其实现如下:

def invert_attention_mask(...):
    if encoder_attention_mask.dim() == 3:
        encoder_extended_attention_mask = encoder_attention_mask[:, None, :, :]
    if encoder_attention_mask.dim() == 2:
        encoder_extended_attention_mask = encoder_attention_mask[:, None, None, :]
    encoder_extended_attention_mask = encoder_extended_attention_mask.to(dtype=self.dtype)  # fp16 compatibility
    encoder_extended_attention_mask = (1.0 - encoder_extended_attention_mask) * torch.finfo(self.dtype).min
    return encoder_extended_attention_mask

这些操作和自注意力的 attention_mask 的处理手段是一样的,都是先扩展维度,然后把 1 转为 0, 把 0 转为 -inf.

有一点不同的是,在第一个判断分支中表明 encoder_attention_mask 可以是三维结构,也就是说,用户可以通过函数参数来手动设置更复杂的交叉注意力 mask 。

接着调用 GPT2Block.forward ,它并没有对 mask 做额外处理。

再进入 GPT2Attention.forward, 在真正计算 attention 前,把 attenton_mask 赋值为 encoder_attention_mask, 也就是统一交叉注意力和自注意力的计算接口,区别只在于 key, value 的不同。

if encoder_hidden_states is not None:
    query = self.q_attn(hidden_states)
    key, value = self.c_attn(encoder_hidden_states).split(self.split_size, dim=2)
    attention_mask = encoder_attention_mask

之后就是调用 GPT2Attention._attn ,上文已经分析过该函数。交叉注意力场景下的区别在于,以下第一个条件分支是不会执行的, 不会构造 causal_mask, 对注意力的控制都是通过 attention_mask (也就是 encoder_attention_mask )完成的。

if not self.is_cross_attention:
    # all skip

if attention_mask is not None:
    attn_weights = attn_weights + attention_mask

attn_weights = nn.functional.softmax(attn_weights, dim=-1)

本节给工程实践带来的启示是:如果要修改交叉注意力 mask 矩阵,可以直接在 GPT2Model.forward 参数里传入二维或三维的 encoder_attention_mask 矩阵:

  • 二维矩阵可以使得编码器输入的某个 token 被 decoder 完全忽视
  • 三维矩阵则可以定制 decoder 里各 token 对 encoder 中各 token 的关注程度。

在后文介绍 EncoderDecoderModel 时还会再次从编码器解码交互的角度梳理一遍交叉注意力的计算过程。

其他层的说明

  • Dropout 层, GPT2LMHeadModel 出现 dropout 的地方:
    • embedding 计算之后
    • 注意力 softamx 之后
    • Feedfoward 层
    • GPT2MLP 最后一层

他们的特点都是经历了比较大的矩阵乘法之后做类似加噪/正则化的操作

  • 位置编码:

    GPT2 使用的是 pos embedding 的形式,位置编码是学习出来的。原始 transformers 用的是固定的编码。

模型解码过程解析

transformers 库的设计原则

在介绍 generate 函数前,有必要先说明该函数是如何与模型建立连接的,因此梳理 transfomers 库在架构上的设计原则:

  • 在模块的文件组织上,由于深度学习模型迭代非常快, huggingface 不遵循一般的软件产品开发中提倡的尽量复用的原则,而是选择了 Don't Repeat Yourself 准则,一个模型所有 layer 都写在一个文件里,尽管有些层(比如 attention)在其他文件里有实现,但最好的做法并不是去调用该函数,而是复制一份到当前文件里。

    这种方法的好处是,一篇论文的核心细节都在一个文件中,即便 GPT2 的维护人员(可能是任意开源贡献者)修改了某些模块也不会影响其他文件中模型的功能。

    本文讨论的 GPT2LMHeadModelGPT2ModelGPT2BlockGPT2Attention GPT2MLP 都放在 transformers 库的 models/gpt2/modeling_gpt2.py 中。如果想编写一个基于 GPT2 魔改的模型,一种最”暴力“的做法就是在当前目录下直接复制一个新的名如 modeling_gpt2_myversion.py 的文件,然后只修改需要的部分。

  • 在模块的抽象层级上,处于抽象层中间的是 PreTrainedModel 类, GPT2LMHeadModelGPT2Model 都是它的后代,它们先继承 GPT2PreTrainedModel, 该类做的事情比较少,主要是模型参数的初始化策略。而 GPT2PreTrainedModel 则继承了 PreTrainedModel

    class GPT2LMHeadModel(GPT2PreTrainedModel):
        # 带输出头的结构实现
    
    class GPT2PreTrainedModel(PreTrainedModel):
        # 模型参数初始化
    

    PreTrainedModel 继承了以下几个类,分别给 Model 提供 torch 模型、常用辅助函数、输入输出处理函数和 huggingface 模型上传接口。

    class PreTrainedModel(nn.Module, ModuleUtilsMixin, GenerationMixin, PushToHubMixin):
    

    GenerationMixin 定义了对输入输出格式进行转换处理的接口, generation 函数就来自于此,它包含各类解码算法的实现。

    连同上一章提到的类,绘制出的类继承关系如下图,每个类中包括了最典型几个接口的名称:

llm_transformer_inheritance.svg

其中没提到的 EncoderDecoderModel 会在后文中介绍。

参考:How to add a model to 🤗 Transformers?

generate 函数

根据上节可知,在 GPT2ModelGPT2LMHeadModel 或者任何继承自 PreTrainedModel 类的模型里都可以调用 generate 函数,它一般是在模型训练好之后做推理时调用,最简单的执行例子:

model_inputs = tokenizer('I enjoy walking with my cute dog', return_tensors='pt').to(torch_device)
greedy_output = model.generate(**model_inputs, max_new_tokens=40)
print(tokenizer.decode(greedy_output[0], skip_special_tokens=True))

默认情况下 generate 使用贪心策略解码。 但 generation 函数是一个很通用的入口,它可以解析传入的各类参数以满足不同的解码策略,比如 contrastive_search, beam_search 等等,它的功能可以用以下伪代码表示:

def generate(self, **kwargs):
    if meaning(kwargs) == "greedy_search":
        self.greedy_search(kwargs)
    elif meaning(kwargs) == "beam_search":
        self.beam_search(kwargs)
    elif #...

以下是执行 beam_search 的例子,由于参数 num_beams 不为 None ,所以触发了调用 self.beam_search

output_ids = self.model.generate(
    input_ids=torch.Tensor(context_ids)
    .long()
    .unsqueeze(0)
    .to(self.model.device),
    num_beams=10,
    num_return_sequences=9,
    logits_processor=self.processors,
)
  • 输入的准备工作

尽管 generate 函数就是一个解码策略分发器,但它还是会做一些通用的准备,都是以 _prepare* 命名的,以下分别是对 inputs_idattention_mask 的预处理:

inputs_tensor, model_input_name, model_kwargs = self._prepare_model_inputs(
            inputs, generation_config.bos_token_id, model_kwargs
        )

#...

accepts_attention_mask = "attention_mask" in set(inspect.signature(self.forward).parameters.keys())
requires_attention_mask = "encoder_outputs" not in model_kwargs

if model_kwargs.get("attention_mask", None) is None and requires_attention_mask and accepts_attention_mask:
    model_kwargs["attention_mask"] = self._prepare_attention_mask_for_generation(
        inputs_tensor, generation_config.pad_token_id, generation_config.eos_token_id
    )

后文对此会有更详细说明

解码缓存机制: past_key_values

前文介绍到 causal_mask 时简单提到过,训练阶段 GPT2 对每个时刻 token 的计算是并行的,而预测阶段采用的是自回归方式,当前时刻的 token 是上一时刻生成的。因此,预测时每次只要对最后一个 token 进行前向传播即可,但该 token 的每一层特征都需要与同一层的之前的所有特征计算 attention 值(类似一个 bigram 模型,当前 token 依赖上一个 token, 只不过 gpt 里上一个 token 是所有历史 token 的加权平均), 因此要把之前的用于计算 attention 的 key 和 value 缓存起来,保存在 past_key_values 变量中,其文档描述如下:

List of torch.FloatTensor of length config.n_layers, with each tensor of shape
(2, batch_size, num_attn_heads, decoder_sequence_length, embed_size_per_head).

这是一个长度等于 GPT2Model 层数也就是 GPT2Block 个数的 tuple, 其中每个元素都是一个 rank 为 5 的形状为 (2, batch_size, num_attn_heads, decoder_sequence_length, embed_size_per_head) 的 tensor 。

注意 2 这个维度可以不放到 tensor 里,也就是说,也可以表示成 2 个形状为 (batch_size, num_attn_heads, decoder_sequence_length, embed_size_per_head) 的 tensor, 因为该维度只是表示 key,value 是一对的, 用于存取,不会去做矩阵运算,因此用任何容器(list,tuple,tensor)来保存都可以。

例如,用 GPT-2 base 模型做 beam search ,当 beam_num=10 且准备预测第 6 个 token 的时候, past_key_values 是一个长度为 12 的列表,形状为 (2, 10, 12, 5, 64). 12 个注意力头,每个头 64 维,5 表示前五个 token,后文将继续用这个例子辅助说明。

64 * 12
768

input_ids 的准备

由于 past_key_values 的存在,在解码时,每次只需要传入最后一个 token_id, GPT2LMHeadModelprepare_inputs_for_generation 函数对此进行了处理:

def prepare_inputs_for_generation(self, input_ids, past_key_values=None, inputs_embeds=None, **kwargs):
    token_type_ids = kwargs.get("token_type_ids", None)
    # only last token for inputs_ids if past is defined in kwargs
    if past_key_values:
        input_ids = input_ids[:, -1].unsqueeze(-1)
        if token_type_ids is not None:
            token_type_ids = token_type_ids[:, -1].unsqueeze(-1)

    attention_mask = kwargs.get("attention_mask", None)
    position_ids = kwargs.get("position_ids", None)

    if attention_mask is not None and position_ids is None:
        # create position_ids on the fly for batch generation
        position_ids = attention_mask.long().cumsum(-1) - 1
        position_ids.masked_fill_(attention_mask == 0, 1)
        if past_key_values:
            position_ids = position_ids[:, -1].unsqueeze(-1)

    # skip
    model_inputs.update(
        {
            "past_key_values": past_key_values,
            "use_cache": kwargs.get("use_cache"),
            "position_ids": position_ids,
            "attention_mask": attention_mask,
            "token_type_ids": token_type_ids,
        }
    )
    return model_inputs

past_key_values 存在的时候,下一次推理的输出只会取最后一个 token 的最后一项: input_ids[:, -1].unsqueeze(-1). 同样如果 position_ids 存在,也只取最后一个元素: position_ids[:, -1].unsqueeze(-1)

与之对应的, GPT2Attention._attn 函数 一节提到,在做自注意力的时候, causal_mask 矩阵不再是一个方阵,它从形状从 (1, 1, key_length, key_length) 变成了 (1, 1, 1, key_length), 后者在参与计算时核心是以下两段代码:

attn_weights = torch.where(causal_mask, attn_weights.to(attn_weights.dtype), mask_value)
# skip...

if attention_mask is not None:
    attn_weights = attn_weights + attention_mask

causal_mask 的形状是 (1,1,1,key_length), attention_mask 是 None 则不会参与计算,如果不是 None 那么形状是 (batch,1,1,key_length), atten_weights 形状为 (batch, 12, 1, key_length) 广播出来还是 (batch, 12, 1, key_length), 这些形状和传入的 input_ids 和 posint_ids 都是匹配的。

prepare_inputs_for_generation 函数是在具体的解码策略函数里调用,比如以下是从 GenerationMixin.greedy_search 函数中提取出的相关部分,在循环解码的每一步,先调用输入准备函数,然后执行 self(*kwargs) 从而触发前向传播

while True:
    # ...
    # prepare model inputs
    model_inputs = self.prepare_inputs_for_generation(input_ids, **model_kwargs)

    # forward pass to get next token
    outputs = self(
        **model_inputs,
        return_dict=True,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
    )
    next_token_logits = outputs.logits[:, -1, :]
    next_tokens_scores = logits_processor(input_ids, next_token_logits)
    next_tokens = torch.argmax(next_tokens_scores, dim=-1)
    input_ids = torch.cat([input_ids, next_tokens[:, None]], dim=-1)
    model_kwargs = self._update_model_kwargs_for_generation(
                outputs, model_kwargs, is_encoder_decoder=self.config.is_encoder_decoder
            )

首先要清楚,解码时第一次的时候传入 prepare_inputs_for_generation 的 **model_kwargs 中是没有 past_key_values 的,也就是为默认值 None.

有了解码的结果 outputs 后通过以下两个函数对其进行后处理:

def _update_model_kwargs_for_generation(self, outputs: ModelOutput, model_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    # update past_key_values
    model_kwargs["past_key_values"] = self._extract_past_from_model_output(
        outputs, standardize_cache_format=standardize_cache_format
    )
    #skip
def _extract_past_from_model_output(self, outputs: ModelOutput, standardize_cache_format: bool = False):
    past_key_values = None
    if "past_key_values" in outputs:
        past_key_values = outputs.past_key_values
    #skip
    return past_key_values

这个时候,outputs 中已经有了非 None 的 past_key_values, 它从 outputs 中提取出,通过 _update_model_kwargs_for_generation 函数又放回到 model_kwargs 中。

past_key_values 传入 GPT2LMHeadModel

past_key_values 是在执行以下命令后返回到 outputs 里的

outputs = self(
    **model_inputs,
    return_dict=True,
    output_attentions=output_attentions,
    output_hidden_states=output_hidden_states,
)

这里 self 是 GPT2LMHeadModel, 但 GPT2LMHeadModel 的 forward 里没有对 past_key_values 做处理,直接就传入到 self.transformer, 也就是 GPT2Model.forward 中

transformer_outputs = self.transformer(
    input_ids,
    past_key_values=past_key_values,
    #skip
    )

past_key_values 传入 GPT2Model

GPT2Model.forward 中相关代码如下:

if past_key_values is None:
    past_length = 0
    past_key_values = tuple([None] * len(self.h))
else:
    past_length = past_key_values[0][0].size(-2)
if position_ids is None:
    position_ids = torch.arange(past_length, input_shape[-1] + past_length, dtype=torch.long, device=device)
    position_ids = position_ids.unsqueeze(0).view(-1, input_shape[-1])

这里如果 past_key_value 为 None, 那么会创建一个长度等于 len(self.h) 的全为 None 的 tuple,

接着检查如果没有传入 position_ids (大部分时候都不会手动传入)的话,会根据 past_key_values 维度创建 position_ids , past_key_values[0][0].size(-2) 是已经生成的序列的长度。考虑两种情况:

  • 训练的时候,那么每次前向传播只是手动输入 input_ids 和 labels, 其他参数默认, 因此每次 past_key_values 都是 None, past_length 始终 0, 于是 position_ids 计算如下

    print(torch.arange(0, 3))
    print(torch.arange(0, 3).unsqueeze(0).view(-1, 3))
    
    tensor([0, 1, 2])
    tensor([[0, 1, 2]])
    

    也就是覆盖了整个输入序列。

  • 解码的时候,假设 past_key_values 中 key 和 value 已经存了 2 个 token 的缓存,那么 past_length 等于 2. 当前为了预测第三个 token, 传入的 input_id 实际是一个 (batch,1) 形状的矩阵(后文会提到是哪里对输入进行切分的),因此 input_shape[-1] 等于 1, position_ids 计算为:

    print(torch.arange(2, 1 + 2))
    print(torch.arange(2, 1 + 2).unsqueeze(0).view(-1, 1))
    
    tensor([2])
    tensor([[2]])
    

    计算出来的 position_ids 是最后这个 token 的,形状为 (batch, 1) 。

回到 past_key_values, 在 GPT2Model.forward 中,它的以 layer_past 之名分别输入到了各层 GPT2Block 的 forward 中:

for i, (block, layer_past) in enumerate(zip(self.h, past_key_values)):
    #skip
    outputs = block(layer_past, ...)

past_key_values[i] 传入 GPT2Block

在 GPT2Block 中,layer_past 传递给了自注意力层 GPT2Attention 的 forward 里:

attn_outputs = self.attn(
            hidden_states,
            layer_past=layer_past,
            attention_mask=attention_mask,
            head_mask=head_mask,
            use_cache=use_cache,
            output_attentions=output_attentions,
        )

而在计算交叉注意力时,没有传入 layer_past

outputs = attn_outputs[1:] # 获得 present (attentions)

if encoder_hidden_states is not None:
    # skip
    residual = hidden_states
    hidden_states = self.ln_cross_attn(hidden_states)
    cross_attn_outputs = self.crossattention(
        hidden_states,
        attention_mask=attention_mask,
        head_mask=head_mask,
        encoder_hidden_states=encoder_hidden_states,
        encoder_attention_mask=encoder_attention_mask,
        output_attentions=output_attentions,
    )
    attn_output = cross_attn_outputs[0]

由于 GPT2Attention.forward 中 layer_past 的默认值为 None, 这大致说明交叉注意力是不需要 key value 缓存的,具体细节见下节。

past_key_values[i] 传入 GPT2Attention

GPT2Attention.forward 形参如下:

# GPT2Attention
def forward(
     self,
     hidden_states: Optional[Tuple[torch.FloatTensor]],
     layer_past: Optional[Tuple[torch.Tensor]] = None,
     # skip

接着是 layer_past 相关代码:

query = self._split_heads(query, self.num_heads, self.head_dim)
key = self._split_heads(key, self.num_heads, self.head_dim)
value = self._split_heads(value, self.num_heads, self.head_dim)

if layer_past is not None:
    past_key, past_value = layer_past
    key = torch.cat((past_key, key), dim=-2)
    value = torch.cat((past_value, value), dim=-2)

if use_cache is True:
    present = (key, value)
else:
    present = None

# skip
attn_output, attn_weights = self._attn(query, key, value, attention_mask, head_mask)

在切分出 query,key,value 后,如果 layer_past 不是 None, 就取出 past_keypast_value, 继续把当前计算出的 key value 在 dim=-2 维度拼接,该维度的意义是 decoder_sequence_length, 拼接完维度就是 (10,12,6,64) 。

解码时 use_cache 默认为 True, 因此将 key,value 放到 present 变量继而放到 outputs[1] 中保存下来, present 的形状可以看作是 (2,10,12,6,64), 尽管它是个 tuple.

layer_past 会为 None 的情况再次回顾:

  • 训练时,我们直接调用 GPT2LMHeadModel 的 forward 函数,且根本不会手动传入 past_key_values 参数,又上文可知 past_key_values 就是一个全 None 的 tuple, layer_past 也就是 None
  • 交叉注意力层,不会传入 layer_past, 因此用默认值 None
  • 解码第一个 token 的时候

从 GPT2Attention 中返回

GPT2Attention 的 foward 中计算完自注意力后, (key, value) 会放到 outputs 的第二个元素里返回到 GPT2Block 中

#skip merge_heads, c_proj and  dropout
outputs = (attn_output, present)
if output_attentions:
    outputs += (attn_weights,)
return outputs  # a, present, (attentions)

从 GPT2Block 中返回

GPT2Attention.forward 返回的 outputs 会在 GPT2Block 中以 attn_outputs 变量会接收,然后其第二个元素之后的内容放到 outputs 中,注意 attn_outputs[1] 就是 present, 也就是本层的 (key, value) 缓存对。

attn_outputs = self.attn(layer_past=layer_past,)
outputs = attn_outputs[1:]

然后执行以下 use_cache 为 True 的分支

if use_cache:
    outputs = (hidden_states,) + outputs
else:
    outputs = (hidden_states,) + outputs[1:]

return outputs  # hidden_states, present, (attentions, cross_attentions)

也就是说 GPT2Block 返回的结果的第二个是本曾的 (key, value) 缓存对

从 GPT2Model 中返回

GPT2Model.forward 中所有与 past_key_values 相关代码如下:

if past_key_values is None:
    past_length = 0
    past_key_values = tuple([None] * len(self.h))
# skip
presents = () if use_cache else None

for i, (block, layer_past) in enumerate(zip(self.h, past_key_values)):
    # skip
    outputs = block(layer_past, ...)
    if use_cache is True:
        presents = presents + (outputs[1],)
# skip
# 当前得到 n_layers 个 (key,value) 对, 放在结果第二个位置被返回
return BaseModelOutputWithPastAndCrossAttentions(
            last_hidden_state=hidden_states,
            past_key_values=presents,
            hidden_states=all_hidden_states,
            attentions=all_self_attentions,
            cross_attentions=all_cross_attentions,
        )

GPT2Model.forward 拿到每一层 GPT2Block 的返回值后取出了 outputs[1] 加入到 presents 变量,构成了一个长度为 len(self.h) 的 tuple, 然后包装在 BaseModelOutputWithPastAndCrossAttentions 的第二个变量 past_key_values 中。

从 GPT2LMHeadModel 中返回

GPT2LMHeadModel.forward 中的 GPT2Model 是以 self.transformer 对象执行的,其返回的 transformer_outputs 变量的各个参数又被包装到 CausalLMOutputWithCrossAttentions 里然后再次返回

transformer_outputs = self.transformer(
    input_ids,
    past_key_values=past_key_values,
    #skip
)

#skip
return CausalLMOutputWithCrossAttentions(
    loss=loss,
    logits=lm_logits,
    past_key_values=transformer_outputs.past_key_values,
    hidden_states=transformer_outputs.hidden_states,
    attentions=transformer_outputs.attentions,
    cross_attentions=transformer_outputs.cross_attentions,
)

从 generate 循环中返回

我们再回到第一节调用 Model.generate 的场景,代码如下:

while True:
    # ...
    # prepare model inputs
    model_inputs = self.prepare_inputs_for_generation(input_ids, **model_kwargs)

    # forward pass to get next token
    outputs = self(
        **model_inputs,
        return_dict=True,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
    )
    next_token_logits = outputs.logits[:, -1, :]
    next_tokens_scores = logits_processor(input_ids, next_token_logits)
    next_tokens = torch.argmax(next_tokens_scores, dim=-1)
    input_ids = torch.cat([input_ids, next_tokens[:, None]], dim=-1)
    model_kwargs = self._update_model_kwargs_for_generation(
                outputs, model_kwargs, is_encoder_decoder=self.config.is_encoder_decoder
            )

outputs 就是一个 CausalLMOutputWithCrossAttentions 类型的对象。其中包括 past_key_values 变量,它在 _update_model_kwargs_for_generation 函数里被取出又放回到 model_kwargs 之中,进入下一次循环采样(如不熟悉,可以回顾本章第一节 input_ids 的准备 的内容来重新看清整个流程。)

以 greedy_search 为例,以上循环结束最后返回结果是:

if return_dict_in_generate:
    if self.config.is_encoder_decoder:
        return GreedySearchEncoderDecoderOutput(
            sequences=input_ids,
            scores=scores,
            encoder_attentions=encoder_attentions,
            encoder_hidden_states=encoder_hidden_states,
            decoder_attentions=decoder_attentions,
            cross_attentions=cross_attentions,
            decoder_hidden_states=decoder_hidden_states,
        )
    else:
        return GreedySearchDecoderOnlyOutput(
            sequences=input_ids,
            scores=scores,
            attentions=decoder_attentions,
            hidden_states=decoder_hidden_states,
        )
else:
    return input_ids

两个 if 分支里不会再包含 past_key_values, 因此最后缓存是不会被 generate 函数返回的,在循环采样结束后会自动释放掉。

另外,以上的 input_ids 实际是解码出来的结果 token id 序列,之所以叫做 input_ids, 是因为循环中它不断作为下一次解码的输入,因此直到停止也叫做 input_ids。而我们最后通过 outputs.sequences 就可以取出最终的结果

beam_search 过程

本节梳理序列生成任务中比较常用也是各类采样策略中较复杂的 beam search 部分的代码实现

beam search 的算法逻辑写在 GenerationMixin.beam_search 函数中,不过其涉及的数据结构是在 transformers 库的 generation/beam_search.py 文件里,核心是以下几个类:

from transformers.generation.beam_search import (
    BeamHypotheses
    BeamScorer,
    BeamSearchScorer,
    ConstrainedBeamSearchScorer,
)

BeamSearchScorer.process
BeamSearchScorer.finalize

本文只关注 BeamSearchScorer 类中 self.num_beam_groups=1 情况下的代码执行逻辑,这是最通用的 beam search 实现, ConstrainedBeamSearchScorer 等其他变体也不在梳理范围。

按算法分类的话, beam search 是一种剪枝的宽度优先树路径搜索算法,beam_num = 2 意味着要找出 2 条路径(beam 在后文中等于搜索路径)。 下图中,编号表示遍历的顺序编号,绿色加粗的是搜索出的路径。左图是普通的宽度优先搜索,它需要访问每一个节点,对每条路径打分,然后选出最优的两条路径,因此如果每个节点都有 k 个邻居,采样了 n 步,那么时间复杂度是指数级别 \( O(k^{n}) \);右图是 beam search, 在每一层搜索后,只保留分数最高的 2 条路径继续往下搜索,其搜索空间大大减小,基本是 \( O(2nk) \) 。放在序列解码的任务场景中,k 就是字典大小,n 是生成序列的长度。

llm_beam_search.svg

beam_search 前的准备

generate 函数中,如果有 num_beams 参数(以及没有其他采样策略选项),那么会调用 beam_search 函数,具体的触发条件如下:

is_beam_gen_mode = (
    (generation_config.num_beams > 1)
    and (generation_config.num_beam_groups == 1)
    and generation_config.do_sample is False
    and not is_constraint_gen_mode
    and not is_contrastive_search_gen_mode
)

为了能够维持路径得分,先实例化一个 BeamSearchScorer 类的对象,它继承自 BeamScorer 抽象类,用于跟踪搜索过程中的状态:

beam_scorer = BeamSearchScorer(
    batch_size=batch_size,
    num_beams=generation_config.num_beams,
    device=inputs_tensor.device,
    length_penalty=generation_config.length_penalty,
    do_early_stopping=generation_config.early_stopping,
    num_beam_hyps_to_keep=generation_config.num_return_sequences,
    max_length=generation_config.max_length,
)

该类对 batch 里每个样本构建一个 BeamHypotheses 对象来追踪 beam 的状态,这些对象在属性 _beam_hyps 中:

self._beam_hyps = [
            BeamHypotheses(
                num_beams=self.num_beams,
                length_penalty=self.length_penalty,
                early_stopping=self.do_early_stopping,
                max_length=max_length,
            )
            for _ in range(batch_size)
        ]

注意每个样本搜索时要在 num_beams 个"平行世界" 里同时进行探索, 因此接下来调用 _expand_inputs_for_generation 对 input_ids 、attention_mask 等 tensor 类型的模型输入做扩充:

def _expand_inputs_for_generation(
    expand_size: int = 1,
    is_encoder_decoder: bool = False,
    input_ids: Optional[torch.LongTensor] = None,
    **model_kwargs,
) -> Tuple[torch.LongTensor, Dict[str, Any]]:
    """Expands tensors from [batch_size, ...] to [batch_size * expand_size, ...]"""

    def _expand_dict_for_generation(dict_to_expand):
        for key in dict_to_expand:
            if dict_to_expand[key] is not None and isinstance(dict_to_expand[key], torch.Tensor):
                dict_to_expand[key] = dict_to_expand[key].repeat_interleave(expand_size, dim=0)
        return dict_to_expand

    if input_ids is not None:
        input_ids = input_ids.repeat_interleave(expand_size, dim=0)

    model_kwargs = _expand_dict_for_generation(model_kwargs)

    if is_encoder_decoder:
        # Skip some code
        model_kwargs["encoder_outputs"] = _expand_dict_for_generation(model_kwargs["encoder_outputs"])

    return input_ids, model_kwargs

例如 beam_num = 2 的时候,需要对输入中所有 tensor 都复制出额外的 1 份从而扩大 batch 为 2,如果原本是 [1,2,3] 当前会变成 [1,1,2,2,3,3]. 这样就有 beam_num 个单独的向前传播路径。 当然,输入的一般是形状为 (batch, seq_len) ,并且第一个 token_id 一般是 101, 对应 CLS token (注:CLS 一般是在 BERT 类编码器模型中用来做句子分类的 token 的代号, 在 GPT 类生成模型中,一般用 bos, begin of sentence 来表示,但本文还是沿用 CLS 这个标记了, 另外,基于 gpt-2 的模型有很多,不同模型可能使用不同的分词方式和词表,因此 bos 对应的 id 也不同,gpt2 官方实现里该 token 编码是 50256,但本文用的是另一个以 101 作为 bos id 的模型进行示例的) 。

num_beams = 2
print(torch.tensor([1,2,3]).repeat_interleave(num_beams, dim=0))
print(torch.tensor([[101, 23]]).repeat_interleave(num_beams, dim=0))
tensor([1, 1, 2, 2, 3, 3])
tensor([[101,  23],
        [101,  23]])

被扩充后的 input_ids 的第一个维度被称为 batch_beam_size, 它等于 batch_size*num_beams

问题:为什么预测第一个 token 时候也要复制 beam_num 份样本扩充?这不会导致采样结果完全一样吗?想象一下,在 beam_num = 2 的情况下,刚开始就复制了两份输入,因此两份输入的预测结果是完全一样的,采样得到的最高得分的两个 token 也就完全一样了。

答:确实是这样的,如果不对得分做处理,这种方式会预测出完全相同的序列,transformers 的解决方法是在计算得分的时候给那些复制出的 beam_num - 1 个样本的初始得分设置得很低(下节有具体代码的展示),这样第一次采样时复制样本的预测就不会被考虑到了。 这样实现是为了统一输入的格式,使得每次循环的输入都有 batch*beam_num 个样本,第一次也不例外。

扩充 input_ids 之后,则真正调用 beam_search 函数:

# 13. run beam search
return self.beam_search(
    input_ids,
    beam_scorer,
    logits_processor=logits_processor,
    stopping_criteria=stopping_criteria,
    pad_token_id=generation_config.pad_token_id,
    eos_token_id=generation_config.eos_token_id,
    output_scores=generation_config.output_scores,
    return_dict_in_generate=generation_config.return_dict_in_generate,
    synced_gpus=synced_gpus,
    **model_kwargs,
)

循环解码核心

beam_search 函数的核心代码为可以分成三部分来看:

  1. 初始化得分矩阵记录每个路径得分,进入循环,将输入传到模型 forward 中计算出下一时刻各个 token 的得分
  2. 用下一时刻各 token 得分更新当前的得分矩阵
  3. 修剪当前的得分矩阵, 只保留得分最高的 id 对应的那些路径

以下是第一个阶段:

  • 首先注释里解释了把被复制的样本的得分设置为 -1e9 使得第一次计算的时候它们的得分不会被考虑进去,这是解决上一节中提到的问题的代码实现。
  • prepare_inputs_for_generation 已经在上文提到过,主要逻辑是,如果 past_key_values 存在,则只取 input_ids 、position_ids 里最后一个 token 进行推理。
# initialise score of first beam with 0 and the rest with -1e9. This makes sure that only
# tokens of the first beam are considered to avoid sampling the exact same tokens across all beams.
beam_scores = torch.zeros((batch_size, num_beams), dtype=torch.float, device=input_ids.device)
beam_scores[:, 1:] = -1e9
beam_scores = beam_scores.view((batch_size * num_beams,))
while True:
    model_inputs = self.prepare_inputs_for_generation(input_ids, **model_kwargs)

    outputs = self(
        **model_inputs,
        return_dict=True,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
    )

    # skip part2: score calculate
    # skip part3: reshape

用一个简单例子来说明(batch=3, beam_size = 2, 词库大小为 5,初始输入为 CLS )

batch_size, num_beams = 3, 2
beam_scores = torch.zeros((batch_size, num_beams), dtype=torch.float)
beam_scores[:, 1:] = -1e9
beam_scores = beam_scores.view((batch_size * num_beams,))
print(beam_scores)
tensor([ 0.0000e+00, -1.0000e+09,  0.0000e+00, -1.0000e+09,  0.0000e+00,
        -1.0000e+09])

这里可以看到,每个样本的第二 beam 得分都非常小。

有了模型的 outputs 输出,关键就是获取下一个时刻各个 token 的得分、累加到历史得分里并找出最高的 num_beam 个得分

while True:
    # skip 
    outputs = self(**model_inputs,...)# model forward

    next_token_logits = outputs.logits[:, -1, :]
    next_token_scores = nn.functional.log_softmax(
        next_token_logits, dim=-1
    )  # (batch_size * num_beams, vocab_size)

    next_token_scores_processed = logits_processor(input_ids, next_token_scores)
    next_token_scores = next_token_scores_processed + beam_scores[:, None].expand_as(next_token_scores)

    vocab_size = next_token_scores.shape[-1]
    next_token_scores = next_token_scores.view(batch_size, num_beams * vocab_size)

    # Sample 2 next tokens for each beam (so we have some spare tokens and match output of beam search)
    next_token_scores, next_tokens = torch.topk(
        next_token_scores, 2 * num_beams, dim=1, largest=True, sorted=True
    )

    next_indices = torch.div(next_tokens, vocab_size, rounding_mode="floor")
    next_tokens = next_tokens % vocab_size

    # skip part3: reshape

用上文中的例子来解释每一行代码:

  • 第一次循环时,outputs.logits 是一个形状为 [6, 1, 5] 的 tensor, next_token_logits 的形状为 [6, 5] , 计算 softmax 并求 log 之后得到的 next_token_scores 继续保持 [6, 5] 的形状。 假设是如下得分(注意由于第一次循环时 input_ids 都是 CLS, 因此组内各 beam 得分是一样的):

    torch.manual_seed(1)
    input_ids = torch.tensor([[101]]*6)
    next_token_scores = torch.randn((3, 5)).repeat_interleave(num_beams, dim=0)
    print(input_ids)
    print(next_token_scores)
    
    tensor([[101],
            [101],
            [101],
            [101],
            [101],
            [101]])
    tensor([[ 0.6614,  0.2669,  0.0617,  0.6213, -0.4519],
            [ 0.6614,  0.2669,  0.0617,  0.6213, -0.4519],
            [-0.1661, -1.5228,  0.3817, -1.0276, -0.5631],
            [-0.1661, -1.5228,  0.3817, -1.0276, -0.5631],
            [-0.8923, -0.0583, -0.1955, -0.9656,  0.4224],
            [-0.8923, -0.0583, -0.1955, -0.9656,  0.4224]])
    

    肉眼上看,各样本最高得分的两个 token 下标分别是: (0, 3), (2, 0), (4, 1);

  • logits_processor 用于对 logit 做修改,可以用来限制采样,比如假设有一些敏感词汇要过滤,那么可以通过 processor 来把这些词的得分变得很低。这里假设过滤掉最后一个词,那么 logits_processor 可以看作以下函数(尽管它是一个类,但用 __call__ 方法转成了带参数的函数)

    def logits_processor(input_ids, next_token_scores):
        next_token_scores[:,-1] = -1e9
        return next_token_scores
    
    next_token_scores_processed = logits_processor(input_ids, next_token_scores)
    print(next_token_scores_processed)
    
    tensor([[ 6.6135e-01,  2.6692e-01,  6.1677e-02,  6.2132e-01, -1.0000e+09],
            [ 6.6135e-01,  2.6692e-01,  6.1677e-02,  6.2132e-01, -1.0000e+09],
            [-1.6613e-01, -1.5228e+00,  3.8168e-01, -1.0276e+00, -1.0000e+09],
            [-1.6613e-01, -1.5228e+00,  3.8168e-01, -1.0276e+00, -1.0000e+09],
            [-8.9229e-01, -5.8250e-02, -1.9551e-01, -9.6564e-01, -1.0000e+09],
            [-8.9229e-01, -5.8250e-02, -1.9551e-01, -9.6564e-01, -1.0000e+09]])
    

    next_token_scores_processed 的形状仍然是 [6, 5], 不过由于最后一个元素被过滤掉了,此时各样本最高得分的两个 token 下标分别是: (0, 3), (2, 0), (1, 2);

  • 接着把以上得分广播并累加到路径得分 beam_scores 上:

    next_token_scores_processed + beam_scores[:, None].expand_as(next_token_scores)
    
    tensor([[ 6.6135e-01,  2.6692e-01,  6.1677e-02,  6.2132e-01, -1.0000e+09],
            [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09],
            [-1.6613e-01, -1.5228e+00,  3.8168e-01, -1.0276e+00, -1.0000e+09],
            [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09],
            [-8.9229e-01, -5.8250e-02, -1.9551e-01, -9.6564e-01, -1.0000e+09],
            [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09]])
    

    这实际可以简写成:

    next_token_scores_processed + beam_scores[:, None]
    
    tensor([[ 6.6135e-01,  2.6692e-01,  6.1677e-02,  6.2132e-01, -1.0000e+09],
            [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09],
            [-1.6613e-01, -1.5228e+00,  3.8168e-01, -1.0276e+00, -1.0000e+09],
            [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09],
            [-8.9229e-01, -5.8250e-02, -1.9551e-01, -9.6564e-01, -1.0000e+09],
            [-1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09]])
    

    还是保留原始代码:

    next_token_scores = next_token_scores_processed + beam_scores[:, None].expand_as(next_token_scores)
    

    此时 next_token_scores 还是形状为 [6,5] 的 tensor, 第一次循环中,每个样本中额外复制出来的 beam 的得分都是无穷小,只有第一个 beam 保存了各个 token 的得分

  • 为了对所有路径中的节点统计得分,要把 [6,5] 维度变成 [3, 10], 也就是每个样本的所有 beam 里的 token 得分都放在一个维度:

    vocab_size = next_token_scores.shape[-1] # 5
    next_token_scores = next_token_scores.view(batch_size, num_beams * vocab_size)
    print(next_token_scores)
    
    tensor([[ 6.6135e-01,  2.6692e-01,  6.1677e-02,  6.2132e-01, -1.0000e+09,
             -1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09],
            [-1.6613e-01, -1.5228e+00,  3.8168e-01, -1.0276e+00, -1.0000e+09,
             -1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09],
            [-8.9229e-01, -5.8250e-02, -1.9551e-01, -9.6564e-01, -1.0000e+09,
             -1.0000e+09, -1.0000e+09, -1.0000e+09, -1.0000e+09, -2.0000e+09]])
    
  • 接着对每个样本取出得分最高的两个元素的得分以及对应下标,正常思路就是在第二个维度上进行排序并取前 2 个,。 不过这里取了 2*num_beams 个最高得分,有一倍的冗余。

    next_token_scores, next_tokens = torch.topk(
          next_token_scores, 2 * num_beams, dim=1, largest=True, sorted=True
      )
    print(next_token_scores)
    print(next_tokens)
    
    tensor([[ 0.6614,  0.6213,  0.2669,  0.0617],
            [ 0.3817, -0.1661, -1.0276, -1.5228],
            [-0.0583, -0.1955, -0.8923, -0.9656]])
    tensor([[0, 3, 1, 2],
            [2, 0, 3, 1],
            [1, 2, 0, 3]])
    

    next_token_scores 维度变成了 [3, 4] 也就是 [batch_size, 2*num_beams], 另外next_tokens 每行的前 2 个和上文中肉眼识别的结果 (0, 3), (2, 0), (1, 2) 是一样的。

  • 为了确定最高得分来自于哪个 beam 束,用 torch.div 和求模的方式来获得 beam_idx 和 token_id. (不过在以上例子中,由于是第一次循环,其他 beam 的值都接近负无穷,因此 topk 都来自于 beam_idx=0 的路径)

    next_indices = torch.div(next_tokens, vocab_size, rounding_mode="floor")
    next_tokens = next_tokens % vocab_size
    print(next_indices)
    print(next_tokens)
    
    tensor([[0, 0, 0, 0],
            [0, 0, 0, 0],
            [0, 0, 0, 0]])
    tensor([[0, 3, 1, 2],
            [2, 0, 3, 1],
            [1, 2, 0, 3]])
    

    这两个 tensor 形状都是 [batch_size, 2*num_beams]

更新 beam_scores: BeamScorer.process

上一节内容已经展示了 beam search 算法核心的 80% 了,但还有很多工程上的工作,例如要根据以上得到的各种得分以及排序结果来更新 beam_scores 、input_ids 等进入下一次循环所需要的变量,还要判断各个 beam 中是否已经采样出了结束符号(EOS)从而要停止某些 beam 的采样。

以下代码中先调用 beam_scorer.process 来更新 beam_scores ,源代码里有 "#stateless" 注释表明该函数的执行是无状态的,意味着函数的返回值 beam_outputs 完全是由函数输入所决定的,不需要 beam_scorer 对象里额外的其他信息。 因为各个 beam 的得分已经都广播到 next_token_scores 中了,这里要做的是从该变量里”提取“出新的 beam_scores 。(最初看代码的时候我以为是 beam_scorer 维持了各个路径的历史得分,然后通过 process 函数去去更新路径得分,但这种做法就不是 stateless, 而是 stateful 了。 beam_scorer 实际的主要作用是跟踪 beam 是否停止,后文将具体介绍)

while True:
    # skip 

    # stateless
    beam_outputs = beam_scorer.process(
        input_ids, #(batch_size, seq_len)
        next_token_scores, #(batch_size, 2*num_beams)
        next_tokens, #(batch_size, 2*num_beams)
        next_indices, #(batch_size, 2*num_beams)
        pad_token_id=pad_token_id,
        eos_token_id=eos_token_id,
        beam_indices=beam_indices, # None
    )
    beam_scores = beam_outputs["next_beam_scores"]
    beam_next_tokens = beam_outputs["next_beam_tokens"]
    beam_idx = beam_outputs["next_beam_indices"]

beam_scorer.process 中先初始化 beam 的 scores, tokens 和 indices, 如下,这里 self.group_size 默认等于 num_beams

cur_len = input_ids.shape[-1] + 1  # add up to the length which the next_scores is calculated on
batch_size = len(self._beam_hyps)
next_beam_scores = torch.zeros((batch_size, self.group_size)) # (batch_size, num_beam)
next_beam_tokens = torch.zeros((batch_size, self.group_size))
next_beam_indices = torch.zeros((batch_size, self.group_size))

接着对 batch 里每个样本进行处理:

for batch_idx, beam_hyp in enumerate(self._beam_hyps): # #batch_size
    if self._done[batch_idx]:
        next_beam_scores[batch_idx, :] = 0
        next_beam_tokens[batch_idx, :] = pad_token_id
        next_beam_indices[batch_idx, :] = 0
        continue

以上先检查当前样本是否可以停止采样,判断标准是 self._done[batch_idx], 该变量在 init 里初始化为:

self._done = torch.tensor([False for _ in range(batch_size)], dtype=torch.bool)

接着对每个样本的每个搜索路径 beam 进行检查:

for batch_idx, beam_hyp in enumerate(self._beam_hyps):
    #skip
    # next tokens for this sentence
    beam_idx = 0
    for beam_token_rank, (next_token, next_score, next_index) in enumerate(
        zip(next_tokens[batch_idx], next_scores[batch_idx], next_indices[batch_idx])
    ):
        batch_beam_idx = batch_idx * self.group_size + next_index
        # add to generated hypotheses if end of sentence
        if (eos_token_id is not None) and (next_token.item() in eos_token_id):
            #skip
        else:
            # add next predicted token since it is not eos_token
            next_beam_scores[batch_idx, beam_idx] = next_score
            next_beam_tokens[batch_idx, beam_idx] = next_token
            next_beam_indices[batch_idx, beam_idx] = batch_beam_idx
            beam_idx += 1

        # once the beam for next step is full, don't add more tokens to it.
        if beam_idx == self.group_size:
            break

以上核心是 if (eos_token_id 判断的两个分支,本节先分析 if 判断失败的情况,该分支表明当前采样出的并不是 eos, 因此采样继续循环,所以这里做的就是把 socre, token 和 beam_idx 重新添加回到对应的 next_beam_* tensor 中,而 if beam_idx == self.group_size: 判断意味着,只有前 num_beam 个元素会被加入到这些 tensor 中(这与 next_beam_scores 等变量形状是一样的),而 process 函数返回类型如下:

return UserDict(
     {
         "next_beam_scores": next_beam_scores.view(-1),
         "next_beam_tokens": next_beam_tokens.view(-1),
         "next_beam_indices": next_beam_indices.view(-1),
     }
 )
beam_scores = beam_outputs["next_beam_scores"]
beam_next_tokens = beam_outputs["next_beam_tokens"]
beam_idx = beam_outputs["next_beam_indices"]

这意味着经过 process 函数后,如果每个搜索路径都没有遇到终点, 那么以上三个变量经历的操作类似于:

beam_scores = next_token_scores[:, :num_beams].reshape(-1)
beam_next_tokens = next_tokens[:, :num_beams].reshape(-1)
beam_idx = next_indices[:, :num_beams].reshape(-1)
print(beam_scores)
print(beam_next_tokens)
print(beam_idx)
tensor([ 0.6614,  0.6213,  0.3817, -0.1661, -0.0583, -0.1955])
tensor([0, 3, 2, 0, 1, 2])
tensor([0, 0, 0, 0, 0, 0])

以上结果是合理的,比如所有 tensor 都回到了 (batch_size * num_beams,) 形状,它们表示的都是当前 step 下的各类状态的切片,因此第二个时间维度形状为 1.

更新 input_ids 和 past_key_values

while True:
    # skip part1 model forward
    # skip part2 score calculate

    input_ids = torch.cat([input_ids[beam_idx, :], beam_next_tokens.unsqueeze(-1)], dim=-1)

    model_kwargs = self._update_model_kwargs_for_generation(
        outputs, model_kwargs, is_encoder_decoder=self.config.is_encoder_decoder
    )
    if model_kwargs["past_key_values"] is not None:
        model_kwargs["past_key_values"] = self._reorder_cache(model_kwargs["past_key_values"], beam_idx)

本节的主角是 beam_idx, 当前的例子中,由于是第一次循环,各个样本上最高分的 beam 都来自原始的第一个 beam, 因此是一个全 0 的 tensor, 每个位置的意义表示的是当前的"胜利者" beam 的下标,而只有胜利者路径会参与下一次采样。 在后续的采样中,各个 beam 都有自己的得分,那么其他 beam 可能会有更高的得分,这个时候 beam_idx 就不是全 0, 而可以是 0 到 num_beam-1 中任何一个整数值。

拼接完的 input_ids

input_ids = torch.cat([input_ids[beam_idx, :], beam_next_tokens.unsqueeze(-1)], dim=-1)
print(input_ids)
tensor([[101,   0],
        [101,   3],
        [101,   2],
        [101,   0],
        [101,   1],
        [101,   2]])

接着调用 _update_model_kwargs_for_generation ,该函数上文提到过,后文也继续会提到,这里要知道的它的关键作用就是给 attention_mask 、 token_type_ids 拼接一个 tensor([1]) ,使得长度变长。

最后,和 input_ids 类似,要对 past_key_values 也根据 beam_idx 进行重新选择,只留下"胜利者"所属的历史:

@staticmethod
def _reorder_cache(
    past_key_values: Tuple[Tuple[torch.Tensor]], beam_idx: torch.Tensor
) -> Tuple[Tuple[torch.Tensor]]:
    return tuple(
        tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past)
        for layer_past in past_key_values
    )

该函数是在模型类里具体实现的,比如在 GPT2LMHeadModelGPT2DoubleHeadsModel 中都有。

为什么要对 past_key_values 进行 reorder?

还记得 past_key_values 的形状或者维度吗?它的类型是 (tuple(torch.FloatTensor))

外层 tuple 长度为 config.n_layers ,内部是 key 和 value 对, 维度是 (2, batch_size, num_heads, sequence_length - 1, embed_size_per_head)) 在 beam search 的每一步采样中,会从 beam_num*vocab_size 个 token 中选择概率最高的 beam_num 个 beam, 而这 beam_num*vocab_size 个 token 所在 batch 的编号都是在 0 到 beam_num-1 中,每次选择会打乱 beam 的编号(也就是 batch 的编号),所以需要改变 key 和 value 里 batch 维度的顺序

单路径结束、单样本结束和整个 batch 的结束

前文都是理解 beam search 是如何不断循环采样的,本节则理解算法如何停止,这比如何继续采样更复杂一点,所有逻辑都在 BeamSearchScorer 中,要考虑:

  • 有些 beam 如果先遇到了结束符号,如何和其他 beam 一起继续采样还是自己先停止?

    回到上文提到的 BeamSearchScorer.proces 里遇到 eos 的 if 分支,代码如下:

    # if beam_token does not belong to top num_beams tokens, it should not be added
    is_beam_token_worse_than_top_num_beams = beam_token_rank >= self.group_size
    if is_beam_token_worse_than_top_num_beams:
        continue
    if beam_indices is not None:
        beam_index = beam_indices[batch_beam_idx]
        beam_index = beam_index + (batch_beam_idx,)
    else:
        beam_index = None
    
    beam_hyp.add(
        input_ids[batch_beam_idx].clone(),
        next_score.item(),
        beam_indices=beam_index,
    )
    

    第一行表示如果这个 token 不是在前 num_beam 里,那么不考虑(因为前文把 2*num_beams 个最高分 token 都保留了)。而 beam_indices 默认就是 None, 因此真正执行的是 beam_hyp.add 函数:

    def add(self, hyp: torch.LongTensor, sum_logprobs: float, beam_indices: Optional[torch.LongTensor] = None):
        score = sum_logprobs / (hyp.shape[-1] ** self.length_penalty)
        if len(self) < self.num_beams or score > self.worst_score:
            self.beams.append((score, hyp, beam_indices))
            if len(self) > self.num_beams:
                sorted_next_scores = sorted([(s, idx) for idx, (s, _, _) in enumerate(self.beams)])
                del self.beams[sorted_next_scores[0][1]]
                self.worst_score = sorted_next_scores[1][0]
            else:
                self.worst_score = min(score, self.worst_score)
    

    这里首先计算加了长度惩罚后的得分,length_penalty 越大,对长句得分惩罚越大,越小的话得分鼓励长句(可以小于 0),默认是 1. 接着之后的代码实际是一个最大堆的更新算法(只不过每次更新是重新调用一次排序,而不是进行 heapfy,不过 beam size 一般不大,效率瓶颈不在这里而是在神经网络,因此可能没必要过度优化),self.beams 中最多只能有 num_beams 个元素,并且只保留得分最高的元素,维持 worst_score 是为了快速过滤低分的 beam.

    这里看到 self.beams 保存的是某个样本中已经结束的最高得分的 beam. 在这种情况下,next_beam_* 不会更新,也就是得分已经定格了,不过上一轮的 token 还是会拼接到 input_ids 上 比如,有样本的某个 beam 的 input_ids 是 [1,2,3], 接着它预测出了 102, 这是一个结束符号(eos),那么这条路径会被加入到 beam_hyp.beams 中封存起来,但由于其他 beam 还没有结束,为了保持 tensor 形状,input_ids 变成 [1,2,3,3], 仍然继续采样下去。但最后我们从 beam_hyp 里取最终返回的序列。

  • 某个样本里所有 beam 都结束了,但 batch 里其他样本没结束,如何继续采样?

    相关函数:BeamSearchScorer.is_done

  • 所有样本都结束了,如何返回结果

    相关函数:BeamSearchScorer.finalize

编码解码模型: EncoderDecoderModel 类

EncoderDecoderModel 类似一个协议层,把编码和解码的接口协调起来。

一种灵活初始该类的方式如下,先分别实例化 encoder 和 decoder ,然后将他们合并,这种方式用户可以对各种现有的 encoder (如 Bert) 和 decoder (如 GPT2)进行组合。

model = EncoderDecoderModel(config=None, encoder=encoder, decoder=decoder)

对于 transformer 类模型,编码和解码器的核心连接是 cross attention, 因此后文前四节都是围绕一个问题展开的: "如何定制的交叉注意力 mask", 或者更具体的,如何把 cross attention mask 改成随机 0/1 矩阵,而不影响 encoder 和 decoder 的自注意力机制。

forward 中注意力 mask

EncoderDecoderModel.forward 的参数中,与注意力控制有关的参数是 attention_maskdecoder_attention_mask

其中 attention_mask 分饰两角:

  • 传给 encoder.forwardattention_mask 形参作为编码器自注意力的 mask
  • 传给 decoder.fowardencoder_attention_mask 形参作为解码器的交叉注意力的 mask

也就是说,原生的接口无法区分编码器注意力 mask 和交叉注意力 mask; 如果想要交叉注意力 mask 区别于编码器的自注意力 mask, 有两种处理方法:

  1. 先手动执行 encoder.forward (注意力 mask 自己单独设置)获得编码器的最后一层输出, 再把它作为参数传给 EncoderDecoderModel.forwardencoder_outputs 形参, EncoderDecoderModel.forward 中由于有了 encoder_outputs 就不会再对 encoder 做前向传播,因此 attention_mask 成了专门为交叉注意力定制的参数。
  2. 覆盖 EncoderDecoderModel.foward 函数,在其中添加一个新的 cross_attention_mask 参数,默认为 None,直接把这个参数传给 decoder.foward 里的 encoder_attention_mask, 如下所示:

    self.decoder(input_ids=decoder_input_ids,
                 attention_mask=decoder_attention_mask,
                 encoder_attention_mask=cross_attention_mask)
    

    个人选择第二种方案,因为接口更加统一。

    根据 [BROKEN LINK: 20230812T203211.351626] 中对 invert_attention_mask 函数(调用场景如下)的分析中提到, encoder_attention_mask 可以传入 2 维或者 3 维的 mask, 但前提是 GPT2Model.forward 要求 self.config.add_cross_attention 要为 True, 且给 GPT2 提供了 encoder 的输出层:

    if self.config.add_cross_attention and encoder_hidden_states is not None:
        encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size()
        encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length)
        if encoder_attention_mask is None:
            encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)
        encoder_attention_mask = self.invert_attention_mask(encoder_attention_mask)
    else:
        encoder_attention_mask = None
    

    以上的 encoder_hidden_states 是在 EncoderDecoderModel.forward 中从 encoder_outputs 变量里获取的:

    encoder_hidden_states = encoder_outputs[0]
    

    于是还需要手动处理的就是把 config.add_cross_attention 设置为 True 。因此要理清把该变量设置为 True 的后果和时机:

    • 由于 GPT2Block 的初始化函数中有以下代码,因此如果在初始化 GPT2LMHeadModel 时就把 add_cross_attention 设置为 True, 那么每个 GPT2Block 都会新增交叉注意力层,此时的 GPT2 和原始 Transformers 里的 decoder 就非常接近了。

      class GPT2Block(nn.Module):
          def __init__(self, config, layer_idx=None):
              # skip
              if config.add_cross_attention:
                  self.crossattention = GPT2Attention(config, is_cross_attention=True, layer_idx=layer_idx)
                  self.ln_cross_attn = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)
      
    • 如果不想把对每一层 GPT2Attention 都加上交叉注意力层,比如只想添加最上层 block 的交叉注意力或者在原 GPT2 结构上新增一个纯交叉注意力模块,那么就不应该在初始化前把该变量打开,这类场景在后文会单独介绍。

接着进入到真正计算交叉注意力的 GPT2Attention 层,其 forward 中有以下判断:

if encoder_hidden_states is not None:
    # add one self-attention block for cross-attention
    if not hasattr(self, "crossattention"):
        raise ValueError(
            f"If `encoder_hidden_states` are passed, {self} has to be instantiated with "
            "cross-attention layers by setting `config.add_cross_attention=True`"
        )
    residual = hidden_states
    hidden_states = self.ln_cross_attn(hidden_states)
    cross_attn_outputs = self.crossattention(
        hidden_states,
        attention_mask=attention_mask,
        head_mask=head_mask,
        encoder_hidden_states=encoder_hidden_states,
        encoder_attention_mask=encoder_attention_mask,
        output_attentions=output_attentions,
    )
    attn_output = cross_at

可以看到,只要提供了编码器的输出就会激活交叉注意力的计算,因此如果只提供了编码器输出,但初始化的时候没有将 config.add_cross_attention 设置为 True, 执行会报错。考虑到刚才提及的只想开启部分 GPT2Block 的需求,我们就需要修改这部分代码,将这类检查操作删除掉,同样这类问题会在后文具体的修改案例中讨论。

generate 中的注意力 mask

上一节介绍了如何修改 EncoderDecoderModel.forward 的函数接口使得能够接受新的 cross_attention_mask 参数,但问题还没有结束,因为我们在推理的时候不会直接调用该 forward 而是调用 generate 我们需要保证 generate 函数会如实地把 cross_attention_mask 参数传递给 EncoderDecoderModel.foward, 而这两者中间实际还隔着一个解码策略函数,如 beam_search.

在默认的 generate 中,并没有显式的 attenton_mask 相关的参数,额外的参数都打包在可变参数 kwargs 字典里,之后该字典会被更新到 model_kwargs 变量中。

首先,进行存在性检查,如果没有检测到 attention_mask 那么会调用 _prepare_attention_mask_for_generation 自动生成:

if model_kwargs.get("attention_mask", None) is None and requires_attention_mask and accepts_attention_mask:
      model_kwargs["attention_mask"] = self._prepare_attention_mask_for_generation(
          inputs_tensor, generation_config.pad_token_id, generation_config.eos_token_id
      )

具体函数实现如下,大体是生成和 inputs 形状一样(或去掉 padding)的全 1 矩阵。

def _prepare_attention_mask_for_generation(
    self,
    inputs: torch.Tensor,
    pad_token_id: Optional[int],
    eos_token_id: Optional[Union[int, List[int]]],
) -> torch.LongTensor:
    is_input_ids = len(inputs.shape) == 2 and inputs.dtype in [torch.int, torch.long]
    is_pad_token_in_inputs = (pad_token_id is not None) and (pad_token_id in inputs)
    if isinstance(eos_token_id, int):
        eos_token_id = [eos_token_id]
    is_pad_token_not_equal_to_eos_token_id = (eos_token_id is None) or (pad_token_id not in eos_token_id)

    # Check if input is input_ids and padded -> only then is attention_mask defined
    if is_input_ids and is_pad_token_in_inputs and is_pad_token_not_equal_to_eos_token_id:
        return inputs.ne(pad_token_id).long()
    else:
        return torch.ones(inputs.shape[:2], dtype=torch.long, device=inputs.device)

如果检测到是 encoder-decoder 模型且参数中没有传入 "encoder_outputs",那么会调用 _prepare_encoder_decoder_kwargs_for_generation 函数对 encoder 做前向传播,因为对于 encoder 来说,并不依赖解码策略,该函数执行后会给 model_kwargs 加入编码器输出层 encoder_outputs

接着对于解码器,调用 _prepare_decoder_input_ids_for_generation 准备解码器的输入。和 attention_mask 有关的代码如下:

def _prepare_decoder_input_ids_for_generation(...,model_kwargs,..):
     # no user input -> use decoder_start_token_id as decoder_input_ids
     if decoder_input_ids is None:
         decoder_input_ids = decoder_input_ids_start
         # skip...
     elif (decoder_input_ids[:, 0] != decoder_start_token_id).all().item():
         decoder_input_ids = torch.cat([decoder_input_ids_start, decoder_input_ids], dim=-1)
         if "decoder_attention_mask" in model_kwargs:
             decoder_attention_mask = model_kwargs["decoder_attention_mask"]
             decoder_attention_mask = torch.cat(
                 (torch.ones_like(decoder_attention_mask)[:, :1], decoder_attention_mask),
                 dim=-1,
             )
             model_kwargs["decoder_attention_mask"] = decoder_attention_mask

     return decoder_input_ids, model_kwargs

这里检查 decoder_input_ids 的第一个字符是不是 start_token, 如果不是,会给解码器输入添加上开始字符,同时添加一个 decoder_attention_mask

该函数并不会删掉放在 model_kwargs 里的 cross_attention_mask, 因此不需要修改。

generate 会调用具体的解码策略函数如 greedy_searchbeam_search 时,这些函数的循环解码中,每一步在真正调用 forwrd 前都会调用 self.prepare_inputs_for_generation, 此时的 self 是 EncoderDecoderModel 而不是前文讲解过的 GPT2LMHeadModel, 该函数实现如下:

def prepare_inputs_for_generation(
    self, input_ids, past_key_values=None, attention_mask=None, use_cache=None, encoder_outputs=None, **kwargs
):
    decoder_inputs = self.decoder.prepare_inputs_for_generation(input_ids, past_key_values=past_key_values)
    decoder_attention_mask = decoder_inputs["attention_mask"] if "attention_mask" in decoder_inputs else None
    input_dict = {
        "attention_mask": attention_mask,
        "decoder_attention_mask": decoder_attention_mask,
        "decoder_input_ids": decoder_inputs["input_ids"],
        "encoder_outputs": encoder_outputs,
        "past_key_values": decoder_inputs["past_key_values"],
        "use_cache": use_cache,
    }
    return input_dict

首先,这里对 input_dict 是白名单模式,因此会删除额外添加的参数,这意味着我们需要覆盖该函数。

其次,以上调用了 self.decoder 的 prepare_inputs_for_generation 这实际触发的是 GPT2LMHeadModel ,它的实现在前文中介绍过,但那时没有提到其返回值,如下:

# GPT2LMHeadModel
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, inputs_embeds=None, **kwargs):
    # skip
    model_inputs.update({
        "past_key_values": past_key_values,
        "use_cache": kwargs.get("use_cache"),
        "position_ids": position_ids,
        "attention_mask": attention_mask,
        "token_type_ids": token_type_ids,
    })
    return model_inputs

可以看到它是无害的,不会过滤掉 generate 里自定义的参数,也不会根据是否有 past_key_values 对注意力进行修剪,但会对 input_ids 等进行修剪。

最后由于在解码函数中,都是用以下方式做前向传播:

outputs = self(
    **model_inputs,
    return_dict=True,
    output_attentions=output_attentions,
    output_hidden_states=output_hidden_states)

self() 调用的是 EncoderDecoderModel.forward, 因此可以覆盖 prepare_inputs_for_generation 函数,在最后的返回 input_dict 中加入 cross_attention_mask

# EncoderDecoderModel
def prepare_inputs_for_generation():
    #skip
    input_dict = {
        "attention_mask": attention_mask,
        "decoder_attention_mask": decoder_attention_mask,
        "cross_attention_mask": kwargs.get("cross_attention_mask", None),
        "decoder_input_ids": decoder_inputs["input_ids"],
        "encoder_outputs": encoder_outputs,
        "past_key_values": decoder_inputs["past_key_values"],
        "use_cache": use_cache,
    }
    return input_dict

还需注意的是,此时的 self() 参数中已经有了编码器的结果。

调用完 forward 后还会执行 _update_model_kwargs_for_generation, 在下一节梳理

交叉注意力 mask 的形状

上节只说到手动传入的交叉注意力可以是 2 维或者 3 维矩阵,但我们还不清楚其各个维度的长度值,对于自注意力,它的 mask 是一个方阵,但交叉注意力并不是,如果编码器输出的长度和解码器长度不同,相应的维度也要不同,本节梳理这部分代码。

GPT2Attention._attn 函数 中分析过注意力计算,对于交叉注意力,只会执行以下语句,对于输入的 attention_mask 可能是掩盖 decoder_inputs 中的 padding 的也可能是掩盖 encoder_inputs 中的 padding 的

if attention_mask is not None:
    # Apply the attention mask
    attn_weights = attn_weights + attention_mask

如果这个 Attention 是交叉层的,那么 attention_mask 就是最外层传入的 cross_attention_mask, 它的维度要和 attn_weights 一致,是 (batch, heads, key_length, query_length),在交叉注意力场景下, key_lengthquery_length 长度是不同的。

最初输入的 cross_attention_mask 如果是一个二维矩阵,那么形状要被展开成 (batch, 1,1 key_length), 如果传入 3 维矩阵则要展开成 encoder_attention_mask[:, None, :, :] 形状, 这些都是在 GPT2Model.forward 调用 ModuleUtilsMixininvert_attention_mask 函数内进行的。

如果要自己额外添加一个用于交叉注意力 GPT2BlockGPT2Model 之上(从 12 层变 13 层)的话,那么需要在调用 block 的 forward 之前把 encoder_attention_mask 进行升维,如下

if encoder_attention_mask is not None:
    encoder_attention_mask = self.invert_attention_mask(encoder_attention_mask)
cross_outputs = self.cross_block(*) # new GPT2Block

否则 attention_mask 是 2 维或 3 维,无法和 attn_weights 相加。

在训练阶段,只需要根据编码器和解码器的长度来构造 mask 即可,它们在一个样本的训练周期里是固定的。 但在推理阶段,在对解码器的 context 进行前向传播的时候(第一次推理), query_length 等于 上下文的长度,这仅仅是在做上下文特征填充,然而在每次采样出新的 token 后,只会计算最后一个 token 的特征,并且将该特征的 key value 与 past_key_value 拼接,query 的长度增加了 1,于是 encoder_attention_mask 的尺寸需要是 (1,1,query_length + 1,key_length), 那么这个过程中如何动态调整 encoder_attention_mask

负责处理的是 _update_model_kwargs_for_generation 函数,它来自 GenrerationMixin 类,而不是 GPT2Model 或者 EncoderDecoderModel, 其中处理 attention 代码如下:

def _update_model_kwargs_for_generation(
    self,
    outputs: ModelOutput,
    model_kwargs: Dict[str, Any],
    is_encoder_decoder: bool = False,
    standardize_cache_format: bool = False,
) -> Dict[str, Any]:
    # skip
    if not is_encoder_decoder:
        # update attention mask
        if "attention_mask" in model_kwargs:
            attention_mask = model_kwargs["attention_mask"]
            model_kwargs["attention_mask"] = torch.cat(
                [attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))], dim=-1
            )
    else:
        # update decoder attention mask
        if "decoder_attention_mask" in model_kwargs:
            decoder_attention_mask = model_kwargs["decoder_attention_mask"]
            model_kwargs["decoder_attention_mask"] = torch.cat(
                [decoder_attention_mask, decoder_attention_mask.new_ones((decoder_attention_mask.shape[0], 1))],
                dim=-1,
            )

    return model_kwargs

可以看到,它每次对上一次的 decoder_attention_mask 最后一维拼接一个全 1 的向量,因此如果要控制生成部分的 mask, 需要覆盖这个函数,比如把 new_ones 改成 random 之类的函数。然而这里没有处理 cross_attention_mask 的机制,

要注意的是,这里适合的 attenion_mask 应该是 2 维的:

decoder_attention_mask = torch.tensor([[1,2,3]])
print(decoder_attention_mask.new_ones((decoder_attention_mask.shape[0], 1)))
torch.cat([decoder_attention_mask,
           decoder_attention_mask.new_ones((decoder_attention_mask.shape[0], 1))],
           dim=-1)
tensor([[1]])
tensor([[1, 2, 3, 1]])

对于 3 维无法拼接:

decoder_attention_mask = torch.tensor([[[1,2],[3,4]]])
torch.cat([decoder_attention_mask,
           decoder_attention_mask.new_ones((decoder_attention_mask.shape[0], 1))],
          dim=-1)

RuntimeErrorTraceback (most recent call last)
<ipython-input-54-01886092c5ce> in <module>
      1 decoder_attention_mask = torch.tensor([[[1,2],[3,4]]])
----> 2 torch.cat([decoder_attention_mask,
      3            decoder_attention_mask.new_ones((decoder_attention_mask.shape[0], 1))],
      4           dim=-1)

RuntimeError: Tensors must have same number of dimensions: got 3 and 2

因此实际在这个函数中不能用拼接的方式定制精细的叉注意力 mask。只能直接给出一个形状为 (batch,length) 或 (1,length) 的注意力矩阵(比如随机矩阵)。

另外一种做法是修改 prepare_inputs_for_generation, 因为每次调用 EncoderDecoderModel 或 DecoderModel 的 forward 前都会执行该函数,可以通过额外传入一个 idx 的方式来指示这是第几次前向传播,如果是第 0 次,那么就只如上一节对该函数的扩充一样,在返回结果中直接获得:

"cross_attention_mask": kwargs.get(
            "cross_attention_mask", None
        ),  # add custom cross_attention

否则根据 idx 更新,类似以下形式:

idx = kwargs.get("idx", -1)
cross_attention_mask = kwargs.get("cross_attention_mask", None)
if idx > 0 and attention_mask is not None:
    cross_attention_mask = torch.ones((1, attention_mask.shape[1])).to(
        attention_mask.device
    )
    cross_attention_mask[:, : idx - 1] = 0

另外,如果传入的 cross_attention_mask 为 None, 且所有交叉注意力通路都开启了,那么就不会对交叉注意力权重进行 mask, 也就是所有时间步的编码都会被考虑。

注意力 mask 的完整路径

本节对从 EncoderDecoder 的训练和解码过程 attention_mask 做一次完整的梳理,

首先调用 generate

  1. 如果没有声明 attention_mask 那么会调用 _prepare_attention_mask_for_generation 生成一个大小和 input_ids 一样的除 padding token 位置为 0 ,其余都为 1 的矩阵
  2. 调用 _prepare_decoder_input_ids_for_generation, 检查输入第一个元素是否是 start_token, 如果不是,那么补上一个字符以及该字符对应的 decoder_attention_mask
  3. 在 greedy 或 beam 解码循环中,调用 EncoderDecoderModel 的 prepare_inputs_for_generation 函数,实质是调用具体 decoder 的 prepare_inputs_for_generation 函数,然后用白名单过滤额外参数,只留下 "attention_mask" 和 "decoder_attention_mask", 然而顾名思义,不管是编码解码器还是单独解码器的该函数都不会对 attention_mask 做修改操作,他们是处理 inputs 的,尤其是作为优化,每次解码时只要对新生成的最后一个 token 进行向前传播。

    如果要在 forward 中传入额外的参数,需要修改 EncoderDecoderModel 的 prepare_inputs_for_generation, 使得该参数不会被白名单过滤掉。

  4. 接着就进入到了 EncoderDecoder 的 forward 函数(如果死训练则直接从该函数开始)

    它把 decoder_attention_mask 分配给 decoder 的 attenion_mask 参数, attention_mask 则分配给 decoder 的 encoder_attention_mask 参数和 encoder 的 attenion_mask.

  5. decoder 以 GPT2LMHeadModel 为例,以上两个 mask 参数原封不动继续传给 GPT2Model.forward,
    • 先对 attention_mask 做升维度(只能从 2 维到 4 维)
    • 对 encoder_attention_mask 调用 ModuleUtilsMixin 的 invert_attention_mask 做升维度(从 2 或 3 维到 4 维),前提是 self.config.add_cross_attention 为 True
  6. 接着进入 GPT2Block, 原封不动传给 GPT2Attention.forward, 注意该函数的签名里同时可以接收以上两种 mask, 其中:
    • 如果 encoder_hidden_states 不为 None, 那么 attention_mask 参数会被 encoder_attention_mask 覆盖掉(同时 k v 也来自 encoder_hidden_states),
    • 如果是自注意力层,则直接使用 attention_mask
    • 然后都是把 q,k,v 连同 attention_mask 传给 GPT2Attention._attn
  7. 在 GPT2Attention._attn 中,如果是不是交叉注意力层,会 hardwire 计算一个下三角(或一行) causal_mask 做自注意力计算。并且只要 attention_mask 不空,最终都执行:

    attn_weights = attn_weights + attention_mask
    
  8. 如果在 generate 中继续生成下一个 token, 那么还会调用 GenerationMixin 的 _update_model_kwargs_for_generation 函数,如果不是 encoder_decoder 模型,那么对 attention_mask 的长度增加一,否则对 decoder_attention_mask 长度增加 1. 然后回到步骤 3

input_ids, label_ids 的对齐问题

解码阶段

由于训练场景遵循和解码一样的输入要求,因此先考虑解码场景。

GenerationMixin.generate 函数有一个 inputs 参数,它会根据是纯解码器模型调用还是编码解码器调用来决定它是给编码器还是解码器的,纯解码器结构,它对应的就是解码器的 input_ids, 对于编码解码结构则默认是给编码器的输入序列。

不过为了防止混乱,可以不使用该参数,而是手动指定 input_ids 和 decoder_input_ids 明确表示是分别给编码器和解码器的,如下:

output_ids = model.generate(
            input_ids=encoder_input_ids
            pad_token_id=decoder_tokenizer.pad_token_id,
            eos_token_id=decoder_tokenizer.eos_token_id,
            decoder_input_ids=decode_input_ids
            logits_processor=processors,)

相关函数执行顺序:

  1. _prepare_model_inputs: 它是处理编码器的输入的函数, 其核心代码如下:

    inputs_tensor, model_input_name, model_kwargs = self._prepare_model_inputs(
                inputs, generation_config.bos_token_id, model_kwargs)
    

    注意 _prepare_model_inputs 输入的是 generate 的 inputs 参数,但是在 _prepare_model_inputs 中有一段代码如下:

    model_kwargs = {k: v for k, v in model_kwargs.items() if v is not None or k != input_name}
    inputs_kwarg = model_kwargs.pop(input_name, None)
    if inputs_kwarg is not None and inputs is not None:
        raise ValueError(
            f"`inputs`: {inputs}` were passed alongside {input_name} which is not allowed."
            f"Make sure to either pass {inputs} or {input_name}=..."
        )
    elif inputs_kwarg is not None:
        inputs = inputs_kwarg
    

    这里由于 input_ids 是在 model_kwargs 中的,而编码器的 input_name 是 input_ids, 因此 inputs 就被赋值成了 input_ids

    接着,_prepare_model_inputs 会调用 _maybe_initialize_input_ids_for_generation, 其核心函数如下:

    if inputs is not None:
        return inputs
    
    encoder_outputs = model_kwargs.get("encoder_outputs")
    if self.config.is_encoder_decoder and encoder_outputs is not None:
        # make dummy input_ids with value -100, as a sanity check ensuring that they won't be used for encoding
        shape = encoder_outputs.last_hidden_state.size()[:-1]
        return torch.ones(shape, dtype=torch.long, device=self.device) * -100
    
    if bos_token_id is None:
        raise ValueError("`bos_token_id` has to be defined when no `input_ids` are provided.")
    
    #skip some codes
    return torch.ones((batch_size, 1), dtype=torch.long, device=self.device) * bos_token_id
    

    可以看到,如果用户输入了 inputs 或 (inputs_ids) 那么直接返回,但是如果用户手动传入了编码器输出(而不是自动执行 encoder 得到的),那么会生成一个全是 -100 的矩阵,形状是 encoder 的 (batch, seq_len), 这里完全是作为一个 placeholder, 因为既然有了 encoder_outputs, input_ids 是不需要执行的。

    更重要的是 encoder_outputs 没有给定的情况下,会返回一个 (batch, 1) 形状的 bos_token_id 矩阵,这里说明,编码器第一个元素默认需要一个初始 token, generation_config.bos_token_id 默认是 101 也就是 [CLS], 而只要用户给定了 input_ids,那么不会对其进行修改。

    另外,如果当前不是 EncoderDecoderModel 而是 GPT2LMHeadModel 调用 generate, 那么在没有输入的情况下,会自动使用 CLS 作为输入, 这是合理的,因为要预测第一个元素必须要有一个启动指示 token. 但如果 inputs 不是空,那么不会做处理。

  2. 接着是 _prepare_decoder_input_ids_for_generation 来对 decoder 的输入进行处理,调用接口如下:

    if self.config.is_encoder_decoder:
       input_ids, model_kwargs = self._prepare_decoder_input_ids_for_generation(
                       batch_size=batch_size,
                       model_input_name=model_input_name,
                       model_kwargs=model_kwargs,
                       decoder_start_token_id=generation_config.decoder_start_token_id,
                       bos_token_id=generation_config.bos_token_id,
                       device=inputs_tensor.device,
                   )
    

    首先,要注意,只有 encoder decoder 结构才会触发以上函数,如果直接是 GPT2LMHeadModel 调用 generate 并不会做这个处理。

    另外 generation_config.decoder_start_token_id 和 generation_config.bos_token_id 一般都是 CLS, 在构建模型的时候,还可以用以下语句来设置 token

    decoder.config.bos_token_id = decoder_tokenizer.cls_token_id
    

    进入该函数后,首先有一个参数优先级判断:

    if model_kwargs is not None and "decoder_input_ids" in model_kwargs:
        decoder_input_ids = model_kwargs.pop("decoder_input_ids")
    elif "input_ids" in model_kwargs and model_input_name != "input_ids":
        decoder_input_ids = model_kwargs.pop("input_ids")
    else:
        decoder_input_ids = None
    

    这里, decoder_input_ids 是比 input_ids 高的,选用 decoder_input_ids 作为解码输入。 然后以下代码给现有的输入都加上了 CLS token

     #skip
     decoder_input_ids_start = torch.ones((batch_size, 1), dtype=torch.long, device=device) * decoder_start_token_id
    if decoder_input_ids is None:
       decoder_input_ids = decoder_input_ids_start
    elif (decoder_input_ids[:, 0] != decoder_start_token_id).all().item():
       decoder_input_ids = torch.cat([decoder_input_ids_start, decoder_input_ids], dim=-1)
       #skip
    

    总的来说, generate 中同时输入 input_ids 和 decoder_input_ids 的话, input_ids 是给编码器的,如果不是 None, 那么不会对这个输入有任何修改,decoder_input_ids 则是给解码器的,如果没有手动添加 CLS, 那么 generate 会自动添加 CLS, 等进入到 beam_search 或 greey_search 之后,decoder 输入会自动添加 101.

    但是对于纯 decoder 模型,generate 中输入的 input_ids 就是给解码器的,如果它是 None ,那么会被 _maybe_initialize_input_ids_for_generation 修正为 CLS. 当然一般通过 tokenizer 的 encode 或 encoder_plus 对序列进行编码时会自动在开头加上 CLS 结尾加上 SEP.

    在 generate 函数中,不会对 position_ids 进行处理,它是在具体的模型里准备的,因此放到训练阶段来说明

训练阶段

考虑训练的场景,首先对于编码器,EncoderDecoderModel 不会对 encoder 的输入进行任何进行额外处理,就是把 input_ids 传给 encoder 的 forward.

对于解码器,由于 GPT2 采用自监督的学习方式,因此训练的时候,理论上只给定 labels_ids, 例如假设在训练中只给定了 lable_ids 为 [1,2,3], 那么模型应该可以自动计算出 input_is 为 [101, 1, 2]

在 EncoderDecoderModel 的 forward 中计算完 encoder 后有以下代码 :

if (labels is not None) and (decoder_input_ids is None and decoder_inputs_embeds is None):
    decoder_input_ids = shift_tokens_right(labels, self.config.pad_token_id, self.config.decoder_start_token_id)

    # Decode
decoder_outputs = self.decoder(
    input_ids=decoder_input_ids,
    attention_mask=decoder_attention_mask,
    encoder_hidden_states=encoder_hidden_states,
    encoder_attention_mask=attention_mask,
    inputs_embeds=decoder_inputs_embeds,
    output_attentions=output_attentions,
    output_hidden_states=output_hidden_states,
    use_cache=use_cache,
    past_key_values=past_key_values,
    return_dict=return_dict,
    **kwargs_decoder,
)

# Compute loss independent from decoder (as some shift the logits inside them)
loss = None
if labels is not None:
    warnings.warn(DEPRECATION_WARNING, FutureWarning)
    logits = decoder_outputs.logits if return_dict else decoder_outputs[0]
    loss_fct = CrossEntropyLoss()
    loss = loss_fct(logits.reshape(-1, self.decoder.config.vocab_size), labels.view(-1))

总的来看有三步:构造出 decoder_input_ids (如果没有传入的话), decoder.forward, 计算损失。

此处的注意点是:

  • 不使用 decoder_outputs 里的损失,而是重新计算一遍。

接着来看第一步:在调用 decoder.forward 前,如果只给定 labels, 那么先调用 shift_tokens_right 函数构造出解码器输入,其实现如下

def shift_tokens_right(input_ids: torch.Tensor, pad_token_id: int, decoder_start_token_id: int):
    """
    Shift input ids one token to the right.
    """
    shifted_input_ids = input_ids.new_zeros(input_ids.shape)
    shifted_input_ids[:, 1:] = input_ids[:, :-1].clone()
    if decoder_start_token_id is None:
        raise ValueError("Make sure to set the decoder_start_token_id attribute of the model's configuration.")
    shifted_input_ids[:, 0] = decoder_start_token_id

    if pad_token_id is None:
        raise ValueError("Make sure to set the pad_token_id attribute of the model's configuration.")
    # replace possible -100 values in labels by `pad_token_id`
    shifted_input_ids.masked_fill_(shifted_input_ids == -100, pad_token_id)

    return shifted_input_ids

该函数先构造一个形状同 labels 的全 0 矩阵 shifted_input_ids ,假设 labels 长度都是 n,那么 shifted_input_ids 后 n-1 个元素会被赋予 labels 前 n-1 个元素的值,而第一个值赋值为 CLS 编码(101),还是以之前的例子来看的话,得到的是 [101,1,2], 最后一句还对 shifted_input_ids 中值为 -100 的 token_id 重新赋值为 pad_token_id, label 中的 -100 是告诉损失函数不计算该 token 处的损失的。因此如果 labels 是 [1,2,3,-100,4], 同时没有给定 decoder_input_ids, 那么会得到 [101, 1,2,3,0]

最后进入到 decoder 前向传播部分,首先 decoder_input_ids 被赋值给了 input_ids, 而 GPT2LMHeadModel 的 forward 中没有对 input_ids 的处理,但在执行完 self.transformerself.lm_head(hidden_states) 后会对 得到的 logits 做以下处理并计算损失:

if labels is not None:
    # move labels to correct device to enable model parallelism
    labels = labels.to(lm_logits.device)
    # Shift so that tokens < n predict n
    shift_logits = lm_logits[..., :-1, :].contiguous()
    shift_labels = labels[..., 1:].contiguous()
    # Flatten the tokens
    loss_fct = CrossEntropyLoss()
    loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))

从这里可以看到 labels 会被截断,所以如果它是 [1,2,3,-100,4] 形式,shift_labels 就是 [2,3,-100, 4] shift_logits 对应的 id 就是 [1, 2, 3 0]. 可以看到这里无法计算到预测第一个 token 的损失。好在上文知道 EncoderDecoderModel 不会用此处计算的损失。

因此这里的注意点是:

  • 如果直接基于 GPT2LMHeadModel 训练,那么输入的 labels 应该手动加上 CLS, 并且 input_ids 应该是和 labels 一样的,这样损失才是合理的。
  • 如果 GPT2LMHeadModel 只是作为 EncoderDecoderModel 中的解码器,那么 labels 应该是不带 CLS 的, 如果不希望 labels 为 -100 位置对应的 decoder_input_ids 变成 0, 可以手动设置 decoder_input_ids, 其第一个元素得是 CLS, 剩余为 labels[:-1]

最后,对于 position_ids, 会在 GPT2Model.forward 中计算(也就是输入 embedding 层之前),如下:

if position_ids is None:
    position_ids = torch.arange(past_length, input_shape[-1] + past_length, dtype=torch.long, device=device)
    position_ids = position_ids.unsqueeze(0).view(-1, input_shape[-1])

如何开启 GPT2Model 的最后 k 层交叉注意力

在理解了上述概念之后,本节将介绍如何将这些知识应用于实际模型修改的场景中:考虑要把图片的特征加入到 GPT2, 然后微调模型进行看图说话(image caption)的功能。一种比较直接的想法是把 GPT2Model 中的最上面几层 GPT2Block 中的 交叉注意力层打开,使得它可以去对图片编码器做 attention, 融合图片特征。

默认的 GPT2Model 中初始化 GPT2Block 的方式如下:

class GPT2Model(GPT2PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        #skip
        self.h = nn.ModuleList([GPT2Block(config, layer_idx=i) for i in range(config.num_hidden_layers)])

这里,所有的 GPT2Block 都用的是同一个 config, 而 GPT2Block 里决定是否添加 crossattention 层的初始化代码如下:

class GPT2Block(nn.Module):
    def __init__(self, config, layer_idx=None):
        # skip
        if config.add_cross_attention:
            self.crossattention = GPT2Attention(config, is_cross_attention=True, layer_idx=layer_idx)
            self.ln_cross_attn = nn.LayerNorm(hidden_size, eps=config.layer_norm_epsilon)

如果 config 中 add_cross_attention 为 False 的话,所有 GPT2Block 都是不带交叉注意力的。

那么为了能够灵活设置,可以初始化模型后,直接修改最上面几层的 GPT2Block, 给他们添加额外的层,如下:

decoder = GPT2LMHeadModel.from_pretrained(model_name_or_path)
for param in decoder.parameters():
    param.requires_grad = False
config = decoder.config

for layer_idx in range(
    config.num_hidden_layers - 1,
    config.num_hidden_layers - open_layers - 1,
    -1,
):
    decoder.transformer.h[layer_idx].crossattention = GPT2Attention(
        config, is_cross_attention=True, layer_idx=layer_idx
    )
    decoder.transformer.h[layer_idx].ln_cross_attn = nn.LayerNorm(
        config.hidden_size, eps=config.layer_norm_epsilon
    )

这样做的好处是完全不会影响读取预训练参数。

EncoderDecoderModel.forward 中注意力 mask 中提到,在 GPT2Block.forward 中,只要 encoder_hidden_states 不是 None 就会触发交叉注意执行,并且如果发现本 block 中没有交叉注意力层,直接报错。

这个对于本节的场景就过于严格了,假设只对最后一层打开交叉注意力,因为 encoder_hidden_states 是要给每一层都传递的,因此前 11 层不应该报错,只是不执行,因此要把 GPT2Block.forward 覆盖,把以下代码

if encoder_hidden_states is not None:
    # add one self-attention block for cross-attention
    if not hasattr(self, "crossattention"):
        raise ValueError(
            f"If `encoder_hidden_states` are passed, {self} has to be instantiated with "
            "cross-attention layers by setting `config.add_cross_attention=True`"
        )

改成:

if encoder_hidden_states is not None and hasattr(self, "crossattention"):: 

一种做法如下,gpt2block_loose_forward 就是修改后的 forward 函数

decoder = GPT2LMHeadModel.from_pretrained(model_name_or_path)
for layer_idx in range(config.num_hidden_layers):
    decoder.transformer.h[layer_idx].forward = gpt2block_loose_forward

注意,尽管 GPT2Model.forward 中有以下代码,但由于我们不对 encoder_attention_mask 进行定制,保持为 None 就可以保证交叉注意力会作用到所有 encoder 输出上,因此不需要覆盖 GPT2Model.forward

if self.config.add_cross_attention and encoder_hidden_states is not None:
    #skip
    encoder_attention_mask = self.invert_attention_mask(encoder_attention_mask)
else:
    encoder_attention_mask = None

交叉熵损失

前文中我们都是在分析前向传播的过程以及具体的模型实现细节,本节讨论 GPT2LMHeadModel 的训练中需要考虑的损失计算问题,这实际是语言模型的核心,也就是说 GPT 这样的模型到底在对什么建模的问题。

这里再次引用本文开篇的结构图,图 b) 中最上方的 CrossEntropy 就本节的关注对象

llm_transformer_decoder.svg

交叉熵损失计算

交叉熵损失可以认为是处于 softmax 之上的最后一层网络结构,但由于标签是 one-hot 编码(分布),在计算交叉熵 \( H(p,q) = -\sum_{i} q(i)\log (p(i)) \) 的时候,标签 q 只有一个是 1 其他都是 0, 假设当前时刻第 i 个 token 为真实值,那么交叉熵就等于: \[ -\log (p_{i}) = -\log (\frac{\exp (o_{i})}{\sum_{j=0}^{n}\exp (o_{j})}) = -o_{i} + \log \sum \exp (o_{j})\]

其中 o 是上图最后 Linear 层的输出,代码中一般用 logits 变量表示。该公式意味着,计算完线性层后不需要对所有输出都进行归一化,只需要对所有 logits 计算指数、求和再取对数(称为 logsumexp),这样做的好处是,我们不需要单独维护 softmax 层的输出,而是从 logits 直接计算出损失,节省了 softmax 中间层的存储。

接着考虑反向传播,计算出以上式子的梯度得到: \( p - q \) ,假设第 i 个 token 是标签,那么该位置的梯度是 \( p_{i} - 1 \), 对于不等于 i 的其他位置 j, 梯度就是 \( p_{j} \). 因此在反向传播时还是要计算出 softmax, 但计算完成后可以马上释放掉,不需要持久存储。

前文提到过,在 EncoderDecoderModel 的 forward 中计算了损失,如下:

if labels is not None:
    # move labels to correct device to enable model parallelism
    labels = labels.to(lm_logits.device)
    # Shift so that tokens < n predict n
    shift_logits = lm_logits[..., :-1, :].contiguous()
    shift_labels = labels[..., 1:].contiguous()
    # Flatten the tokens
    loss_fct = CrossEntropyLoss()
    loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))

这里 CrossEntropyLoss() 用的都是默认参数,它返回的是平均的交叉熵损失。 labels 是一个 batch 并且其中可能有许多是 -100. 它告诉 CrossEntropyLoss 应该忽略该损失。注意 -100 是 pytorch 的 CrossEntropyLoss 默认约定的,而不是 huggingface 模型约定的,当然你可以通过 CrossEntropyLoss(ignore_index=-101) 来修改这个值(几乎没有必要)

比如以下的例子,输入的 labels 的 batch 是 2, 对齐长度后是 5, 第一个句子实际长度是 2, 最后三个 token 都是 -100,

1 2 -100 -100 -100
5 3 11 29 4

那么这个 batch 计算出来的 loss 实际是 10 个 token 里 7 个有效的 token 的损失的平均值。可以用简单的例子来验证: 以下例子最后三个被忽视:

import torch
import torch.nn as nn

labels = torch.tensor([1, 2, -100, -100, -100])
logits = torch.tensor([[0.1, 0.2, 0.3],
                       [0.4, 0.5, 0.6],
                       [0.7, 0.8, 0.9],
                       [1.0, 1.1, 1.2],
                       [1.3, 1.4, 1.5]])

loss_fn = nn.CrossEntropyLoss()

print("平均损失:", loss_fn(logits, labels).item())
print("平均损失::", loss_fn(logits[:2, :], labels[:2]).item())
损失: 1.0519428253173828
损失: 1.0519428253173828

可以看到,计算完整五个 token 的平均损失和只计算前两个有效 token 损失是一样的。

如果把 reduction 改成 "sum" 则返回有效 token 的损失的和:

loss_fn = nn.CrossEntropyLoss( reduction="sum")

print("总损失:", loss_fn(logits, labels).item())
print("总损失::", loss_fn(logits[:2, :], labels[:2]).item())
总损失: 2.1038856506347656
总损失:: 2.1038856506347656

如果把 reduction 改成 "none" 则返回每个 token 的损失,被 mask 的 token 损失为 0:

loss_fn = nn.CrossEntropyLoss(reduction="none")

print("每个 token 的损失:", loss_fn(logits, labels))
print("每个 token 的损失::", loss_fn(logits[:2, :], labels[:2]))
每个 token 的损失: tensor([1.1019, 1.0019, 0.0000, 0.0000, 0.0000])
每个 token 的损失:: tensor([1.1019, 1.0019])

注意平均损失不是每个 token 损失求和后除以损失长度,而是除以有效的 token 个数

loss_fn = nn.CrossEntropyLoss(reduction="none")
print("总损失:", sum(loss_fn(logits, labels)))
print("平均损失:", sum(loss_fn(logits, labels))/sum(loss_fn(logits, labels)>0))
总损失: tensor(2.1039)
平均损失: tensor(1.0519)

Perplexity 的量纲

Perplexity 可以认为是交叉熵的指数,以 2 或者 e 为底,但因为求 softmax 是以 e 为底,因此这里也需要统一:

\( \text{PPL} = e^{H(p,q)} \)

由于单个 token 的交叉熵是 \( -log(p_i) \), 因此对于某个 token 预测的 PPL 就等于 \( \frac{1}{p_{i}} \), 如果是均匀分布,那么 1/p 就是字典里词的个数,这可以看作模型试图预测下一个词时,它的不确定性大致相当于从所有词中随机选择一个词。 而如果 PPL 很低,意味着模型更加确定其预测,就像贾岛只需要推敲是用 “鸟宿池边树,僧敲月下门。” 还是 “鸟宿池边树,僧推月下门。“,可以认为贾岛在预测这个词此时的 PPL 等于 2.

上一节我们计算出的是 token 级别的平均损失,那么此时的 Perplexity 就可以理解成在每个时刻模型考虑的"合理的" token 的平均个数。因此该场景下 PPL 的量纲就是 token 数。

本节只是从交叉熵损失延伸出对 ppl 的一种理解,很不严格,一般 PPL 是句子级别的估计值而不是以上提到的词级别的。

另外 PPL 是一种内部指标,可以看作一种开发者用的调试工具 (dev-tool), 训练语言模型的时候可以关注该值是否稳定下降,但最终决定语言模型好坏的还是终端任务上的外部指标。

如要深入理解 PPL 可以阅读:

Exposure bias 问题

在训练阶段,采用的是 teacher forcing 的方式,也就是说,每次都是提供正确的上下文给 GPT2 看,它只需要预测接下来的一个 token, 因此即便是再长的输入,训练时 GPT2 本质都是在做一个个多分类任务。 但在预测的时候,我们实际想要解决的是序列的最优解码问题,或者让模型生成我们所期望的某些特点的句子(如悲伤或是快乐)

因此,严格来说,预训练和预测实际是在做两个不同的任务。针对这类问题的总结可以参考: Controllable Neural Text Generation | Lil'Log

编程实践技巧拾遗

本节列出在看代码中遇到的一些工程上技巧性代码:

  • init 中用自省模块来检查函数的参数,从而限制可选参数的范围:
decoder_signature = set(inspect.signature(self.decoder.forward).parameters.keys())
if "encoder_hidden_states" not in decoder_signature:
    raise ValueError(
        "The selected decoder is not prepared for the encoder hidden states to be passed. Please see the "
        "following discussion on GitHub: https://github.com/huggingface/transformers/issues/23350"
    )
  • 对数据处理预留接口: 比如以下是 beam_search 函数中的处理当前 token 得分的一行,作者预留了一个 adjust_logits_during_generation 函数,如果用户想要对 logits 再做一些修改,则可以手动实现该函数。(不过后续版本又删除了该接口)

    next_token_logits = self.adjust_logits_during_generation(next_token_logits, cur_len=cur_len)
    

如对本文有任何疑问,欢迎通过 github issue 邮件 metaescape at foxmail dot com 进行反馈