Below is a simple example of a Transformer model implemented in PyTorch, with detailed comments for each step. Note that this example focuses on the model implementation only; it does not include data processing or training.
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


# Positional encoder
class PositionalEncoder(nn.Module):
    def __init__(self, d_model, max_len=512):
        super(PositionalEncoder, self).__init__()
        self.d_model = d_model
        self.max_len = max_len
        # Precompute the positional encoding table
        pe = torch.zeros(max_len, d_model)  # (max_len, d_model) tensor that will hold the encodings
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1) tensor of positions
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))  # frequency terms
        pe[:, 0::2] = torch.sin(position * div_term)  # sine on even dimensions
        pe[:, 1::2] = torch.cos(position * div_term)  # cosine on odd dimensions
        pe = pe.unsqueeze(0)  # add a batch dimension
        self.register_buffer('pe', pe)  # register as a buffer: moves with the model but is not trained

    def forward(self, x):
        # x has shape (batch_size, seq_len, d_model)
        x = x * math.sqrt(self.d_model)  # scale the embeddings so the positional encodings do not dominate them
        seq_len = x.size(1)  # length of the input sequence
        x = x + self.pe[:, :seq_len]  # add the positional encoding at each position
        return x


# Multi-head attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads  # dimension of each head
        # Linear projections
        self.W_q = nn.Linear(d_model, d_model)  # query projection
        self.W_k = nn.Linear(d_model, d_model)  # key projection
        self.W_v = nn.Linear(d_model, d_model)  # value projection
        self.W_o = nn.Linear(d_model, d_model)  # output projection

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        # Project the inputs and split them into heads: (batch_size, num_heads, seq_len, head_dim)
        Q = self.W_q(query).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        K = self.W_k(key).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        V = self.W_v(value).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        # Scaled dot-product attention scores
        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))  # block masked positions
        attention_weights = F.softmax(attention_scores, dim=-1)  # attention weights
        # Weighted sum of the values
        attention_output = torch.matmul(attention_weights, V)
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)  # merge heads
        # Final linear projection
        output = self.W_o(attention_output)
        return output


# Position-wise feed-forward network
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.d_model = d_model
        self.d_ff = d_ff
        # Two linear layers
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = F.relu(self.linear1(x))  # non-linearity after the first projection
        x = self.linear2(x)  # project back to d_model
        return x


# Transformer encoder
class Transformer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers):
        super(Transformer, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.num_layers = num_layers
        # Stack of encoder layers
        self.encoder_layers = nn.ModuleList([
            nn.ModuleList([
                MultiHeadAttention(d_model, num_heads),
                nn.LayerNorm(d_model),
                FeedForward(d_model, d_ff),
                nn.LayerNorm(d_model)
            ]) for _ in range(num_layers)
        ])

    def forward(self, src, mask=None):
        x = src
        # Pass the input through each encoder layer
        for i in range(self.num_layers):
            # Multi-head attention sub-layer with residual connection
            attention = self.encoder_layers[i][0]
            norm1 = self.encoder_layers[i][1]
            x = x + attention(x, x, x, mask=mask)
            x = norm1(x)
            # Feed-forward sub-layer with residual connection
            feed_forward = self.encoder_layers[i][2]
            norm2 = self.encoder_layers[i][3]
            x = x + feed_forward(x)
            x = norm2(x)
        return x


# Test the Transformer model
if __name__ == "__main__":
    # Input of shape (batch_size, seq_len, d_model) = (16, 20, 512)
    input_tensor = torch.randn(16, 20, 512)
    transformer_model = Transformer(d_model=512, num_heads=8, d_ff=2048, num_layers=6)
    output = transformer_model(input_tensor)
    print(output.shape)  # torch.Size([16, 20, 512])
```
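Note that the `Transformer` class above only stacks the encoder layers; the `PositionalEncoder` defined at the top of the snippet is never applied. The sketch below is one way the pieces could be wired together, assuming the classes defined above; the class name `SimpleEncoderModel` and the vocabulary size are hypothetical, chosen only for illustration.

```python
# A minimal sketch (hypothetical names) combining a token embedding,
# the PositionalEncoder, and the Transformer encoder stack defined above.
class SimpleEncoderModel(nn.Module):
    def __init__(self, vocab_size, d_model=512, num_heads=8, d_ff=2048, num_layers=6, max_len=512):
        super(SimpleEncoderModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)               # token ids -> vectors
        self.positional_encoder = PositionalEncoder(d_model, max_len)    # defined above
        self.encoder = Transformer(d_model, num_heads, d_ff, num_layers)  # defined above

    def forward(self, token_ids, mask=None):
        # token_ids: (batch_size, seq_len) integer tensor
        x = self.embedding(token_ids)      # (batch_size, seq_len, d_model)
        x = self.positional_encoder(x)     # add sinusoidal position information
        return self.encoder(x, mask=mask)  # contextualised representations


# Example usage with random token ids (vocab_size = 10000 is an arbitrary illustrative value)
token_ids = torch.randint(0, 10000, (16, 20))
model = SimpleEncoderModel(vocab_size=10000)
print(model(token_ids).shape)  # torch.Size([16, 20, 512])
```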
Below is a simplified PyTorch implementation of a Longformer-style model, which replaces full self-attention with sliding-window (local) attention.
```python
import torch
import torch.nn as nn
import math


# Positional encoder (same as in the Transformer example)
class PositionalEncoder(nn.Module):
    def __init__(self, d_model, max_len=512):
        super(PositionalEncoder, self).__init__()
        self.d_model = d_model
        self.max_len = max_len
        # Precompute the positional encoding table
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x * math.sqrt(self.d_model)
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len]
        return x


# Local (sliding-window) attention
class LocalAttention(nn.Module):
    def __init__(self, d_model, local_window, num_heads=1):
        super(LocalAttention, self).__init__()
        self.d_model = d_model
        self.local_window = local_window
        self.attention = nn.MultiheadAttention(d_model, num_heads)

    def forward(self, x, mask=None):
        # x: (batch_size, seq_len, d_model); mask: (batch_size, seq_len) with 1 for real tokens, 0 for padding
        batch_size, seq_len, _ = x.size()
        # Band mask: each position may only attend to neighbours within the local window
        local_mask = torch.zeros(seq_len, seq_len, device=x.device)
        for i in range(seq_len):
            local_mask[i, max(0, i - self.local_window):i + self.local_window + 1] = 1
        # nn.MultiheadAttention expects attn_mask with True at *disallowed* positions
        attn_mask = local_mask == 0
        # key_padding_mask expects True at padding positions
        key_padding_mask = (mask == 0) if mask is not None else None
        # nn.MultiheadAttention (without batch_first) expects (seq_len, batch_size, d_model)
        x = x.permute(1, 0, 2)
        attn_output, _ = self.attention(x, x, x, attn_mask=attn_mask, key_padding_mask=key_padding_mask)
        return attn_output.permute(1, 0, 2)  # back to (batch_size, seq_len, d_model)


# Longformer-style model
class Longformer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, local_window):
        super(Longformer, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.num_layers = num_layers
        self.local_window = local_window
        # Positional encoder and a single local-attention module shared by every encoder layer
        self.positional_encoder = PositionalEncoder(d_model)
        self.local_attention = LocalAttention(d_model, local_window, num_heads)
        # Stack of encoder layers (layer norms and feed-forward sub-layers)
        self.encoder_layers = nn.ModuleList([
            nn.ModuleList([
                nn.LayerNorm(d_model),
                nn.Linear(d_model, d_ff),
                nn.ReLU(),
                nn.Linear(d_ff, d_model),
                nn.LayerNorm(d_model)
            ]) for _ in range(num_layers)
        ])

    def forward(self, src, mask=None):
        x = self.positional_encoder(src)
        for i in range(self.num_layers):
            norm1 = self.encoder_layers[i][0]
            linear1 = self.encoder_layers[i][1]
            relu = self.encoder_layers[i][2]
            linear2 = self.encoder_layers[i][3]
            norm2 = self.encoder_layers[i][4]
            # Local attention sub-layer with residual connection
            x = x + self.local_attention(x, mask=mask)
            x = norm1(x)
            # Feed-forward sub-layer with residual connection
            x = linear2(relu(linear1(x))) + x
            x = norm2(x)
        return x


# Test the Longformer model
if __name__ == "__main__":
    input_tensor = torch.randn(16, 512, 512)  # (batch_size, seq_len, d_model)
    mask = torch.ones(16, 512)  # 1 marks a real (non-padding) token
    longformer_model = Longformer(d_model=512, num_heads=8, d_ff=2048, num_layers=6, local_window=128)
    output = longformer_model(input_tensor, mask=mask)
    print(output.shape)  # torch.Size([16, 512, 512])
```
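To make the effect of the sliding window concrete, the short sketch below (illustrative values only) builds the same band-shaped mask as `LocalAttention` for a tiny sequence and compares the number of allowed query-key pairs against full self-attention. Each token attends to at most `2 * local_window + 1` neighbours, so the attention cost grows linearly with sequence length rather than quadratically.

```python
import torch

# Illustrative values: a 10-token sequence with a local window of 2
seq_len, local_window = 10, 2

# Same band-shaped mask construction as LocalAttention above
local_mask = torch.zeros(seq_len, seq_len)
for i in range(seq_len):
    local_mask[i, max(0, i - local_window):i + local_window + 1] = 1

print(local_mask.int())                               # band of allowed positions around the diagonal
print("local pairs:", int(local_mask.sum().item()))   # at most seq_len * (2 * local_window + 1)
print("full pairs: ", seq_len * seq_len)              # quadratic in seq_len for full self-attention
```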