LLAMA

  • LLaMA(LargeLanguage ModelMeta AI)是由MetaAI开发的一系列大语言模型

  • 在模型架构上,LLaMA借鉴了GPT系列的设计理念,同时在技术细节上进行了创新和优化

  • LLaMA与GPT系列的主要区别在于:GPT系列的升级主线聚焦于模型规模与预训练语料的同步提升,而LLaMA则在模型规模上保持相对稳定,更专注于提升预训练数据的规模

image.png


LLaMA1模型

  • LLaMA1是 Meta AI 于 2023 年 2 月推出的首个大语言模型

  • 在Chinchilla扩展法则的指引下,实践“小模型+大数据”的理念,旨在以大规模的优质数据训练相对较小的模型

  • 相对较小的参数规模可以赋能更快的推理速度,使其可以更好的应对计算资源有限的场景

  • 在预训练语料方面,LLaMA1的预训练数据涵盖了大规模网页数据,总数据量高达5TB

  • 在模型架构方面,LLaMA1采用了与GPT系列同样的网络架构。但是其在
    Transformer 原始词嵌入模块、注意力模块和全连接前馈模块上进行了优化:

    1. 在词嵌入模块上,为了提高词嵌入质量,LLaMA1参考了GPTNeo的做法,使用旋转位置编码(Rotary Positional Embeddings, RoPE)替代了原有的绝对位置编码,从而增强位置编码的表达能力,增强了模型对序列顺序的理解
    2. 在注意力模块上,LLaMA1参考了PaLM的做法,将Transformer中的RELU激活函数改为层正则化SwiGLU 激活函数。并且LLaMA1在进行自注意力操作之前对查询(query)以及键(key)添加旋转位置编码
    3. 在全连接前馈模块上,LLaMA1借鉴了GPT-3中的Pre-Norm层正则化策略,将正则化应用于自注意力和前馈网络的输入

RMSNorm

LayerNorm

  • LayerNorm 通过对输入和权重矩进行重新中心化和重新缩放(re-centering 和re-scaling,即减均值和除方差,也称平移不变性和缩放不变性),来帮助稳定训练并加速模型收敛

  • LayerNorm(LN) 主要用于 NLP 领域,它对每个 token 的特征向量进行归一化计算。设某个 token 的特征向量为 x ,LN 运算如下:

image.png

  • 注意我们在方差估计值中添加一个小的常量 epsilon ,以确保我们永远不会尝试除以零

RMSNorm

  • RMSNorm(Root Mean Square Layer Normalization)论文假设 LayerNorm 中的重新中心化不再是必须的(平移不变性不重要),并提出了一种新的归一化方法:均方根层归一化(RMSNorm)

  • RMSNorm 通过均方根(RMS)对每一层神经元的输入进行归一化,使模型具备重新缩放不变性和隐式学习率调整的能力

LayerNorm 和 RMSNorm 都主要用于 NLP 领域,它对每个 token 的特征向量进行归一化计算。设某个 token 的特征向量为 x ,RMSNorm 的计算如下:

image.png

  • 以下是 RMSNorm 在 PyTorch 中的简单实现,使用了 RMS(均方根)来对输入进行归一化处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import torch
import torch.nn as nn

class RMSNorm(nn.Module):
"""nlp 领域"""
def __init__(self, dim):
"""
:param dim: 输入的维度
"""
super(RMSNorm, self).__init__()
self.scale = nn.Parameter(torch.ones(dim)) # 可学习的缩放参数

def forward(self, x):
# x 的形状为 [batch_size, seq_len, dim]

# 计算均方根 (RMS) shape is [2, 4, 1]
rms = torch.sqrt(torch.mean(x ** 2, dim=-1, keepdim=True))

# 归一化,并应用缩放参数
return x / rms * self.scale

# 测试 RMSNorm
batch_size, seq_len, dim = 2, 4, 8
x = torch.randn(batch_size, seq_len, dim)

rmsnorm = RMSNorm(dim)
output = rmsnorm(x)

# nn.RMSNorm 如果传入的是单个整数,则会将其视为一个单元素列表,
# 模块会对最后一个维度进行归一化,并且该维度的大小应该符合这个指定值。
rmsnorm_pytorch = nn.RMSNorm(dim)
output_pytorch = rmsnorm_pytorch(x)

print("输入和输出的形状: ", x.shape, output.shape)
if torch.allclose(output, output_pytorch, atol=1e-6):
print("结果验证通过: 自己实的 RMSNorm 和 pytorch nn.RMSNorm 结果一致!")
else:
print("结果验证失败: 自己实的 RMSNorm 和 pytorch nn.RMSNorm 结果不一致。")

旋转位置编码

  • 旋转位置编码(Rotary Position Embedding,简称 RoPE)是一种用于自注意力机制的位置编码方法,旨在为模型提供序列中每个位置的相对位置信息

  • RoPE 的核心思想是通过旋转矩阵将位置信息编码到查询(Query)和键(Key)向量中,从而在不增加额外参数的情况下,捕捉序列中元素的相对位置关系

  • RoPE 的核心思想是通过旋转矩阵将位置信息编码到查询和键向量中:

    1. 对于序列中的第 m 个位置的查询向量qm和键向量kn,RoPE 通过旋转矩阵Rm和Rn将它们分别旋转到不同的角度
    2. 旋转后的查询和键向量会携带位置信息,从而在计算注意力得分时能够捕捉到相对位置关系
  • 总结结合 RoPE 的 self-attention 操作的流程如下:

    1. 首先,对于 token 序列中的每个词嵌入向量,都计算其对应的 query 和 key 向量;
    2. 然后在得到 query 和 key 向量的基础上,应用公式对每个 token 位置都计算对应的旋转位置编码
    3. 接着对每个 token 位置的 query 和 key 向量的元素按照两两一组应用旋转变换
    4. 最后再计算 query 和 key 之间的内积得到 self-attention 的计算结果

image.png

SwiGLU

  • SwiGLU 激活函数 是一种改进的激活函数,结合了 Swish 激活函数 和 GLU(Gated Linear Unit,门控线性单元)的思想

  • SwiGLU 的主要优势在于它能够更好地捕捉非线性特征,同时保持较高的计算效率

  • SwiGLU 的核心思想是将 Swish 激活函数 和 GLU 门控机制 结合起来。它的公式如下:
    image.png

  • SwiGLU的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import torch
import torch.nn as nn
import torch.nn.functional as F

class SwiGLU(nn.Module):
def __init__(self, dim: int, hidden_dim: int):
super().__init__()
# 定义线性变换层
self.w1 = nn.Linear(dim, hidden_dim) # 主分支
self.w2 = nn.Linear(dim, hidden_dim) # 门控分支

def forward(self, x):
# 主分支:Swish 激活函数
swish = F.silu(self.w1(x)) # silu 是 Swish 的 PyTorch 实现
# 门控分支:线性变换
gate = self.w2(x)
# 逐元素相乘
return swish * gate
  • LLaMA1模型的结构图:

image.png

image.png


LLaMA1 model.py

整体框架说明

1
2
3
4
5
6
7
8
9
10
11
"""
LLaMA1 核心组件包含:
1. 词嵌入层(ParallelEmbedding)
2. 多个Transformer块(TransformerBlock)
3. 输出投影层(ColumnParallelLinear)

每个Transformer块包含:
- 自注意力机制(Multi-Head Attention)
- 前馈网络(FeedForward,使用SwiGLU激活)
- RMSNorm归一化层
"""

关键模块解析

  • 模型参数定义(ModelArgs):
1
2
3
4
5
6
7
8
9
10
@dataclass
class ModelArgs:
dim: int = 512 # 隐藏层维度
n_layers: int = 8 # Transformer层数
n_heads: int = 8 # 注意力头数
vocab_size: int = -1 # 词表大小(由tokenizer决定)
multiple_of: int = 256 # SwiGLU隐藏层维度对齐基数
norm_eps: float = 1e-5 # RMSNorm的epsilon值
max_batch_size: int = 32 # 最大批处理量
max_seq_len: int = 2048 # 最大序列长度
  • RMSNorm(改进的层归一化):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class RMSNorm(nn.Module):
def __init__(self, dim: int, eps: float = 1e-6):
super().__init__()
self.eps = eps # 数值稳定性的小常数
self.weight = nn.Parameter(torch.ones(dim)) # 可学习的缩放参数

def _norm(self, x):
# 计算均方根值进行归一化(相比LayerNorm去除了均值中心化)
return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

def forward(self, x):
# 归一化并应用缩放参数
output = self._norm(x.float()).type_as(x) # 转换为float计算后还原类型
return output * self.weight # 应用可学习的缩放参数
  • 旋转位置编码(RoPE):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def precompute_freqs_cis(dim: int, end: int, theta: float = 10000.0):
# 计算频率向量
# theta 是频率的基数,dim 是向量的维度
# torch.arange(0, dim, 2) 生成从 0 到 dim-1 的偶数索引
# [: (dim // 2)] 取前 dim//2 个元素
# 最终 freqs 是一个长度为 dim//2 的向量,表示每个维度的频率
freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))

# 生成位置索引 t,从 0 到 end-1
# freqs.device 确保 t 和 freqs 在同一个设备上(如 CPU 或 GPU)
t = torch.arange(end, device=freqs.device) # type: ignore

# 计算外积,生成位置-频率矩阵
# freqs 是频率向量,t 是位置向量
# torch.outer(t, freqs) 生成一个形状为 (end, dim//2) 的矩阵
freqs = torch.outer(t, freqs).float() # type: ignore

# 将频率矩阵转换为复数形式
# torch.polar 将幅度和相位转换为复数
# torch.ones_like(freqs) 是幅度(全为 1),freqs 是相位
# 最终 freqs_cis 是一个形状为 (end, dim//2) 的复数矩阵
freqs_cis = torch.polar(torch.ones_like(freqs), freqs) # complex64

# 返回预计算的频率矩阵
return freqs_cis

def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):
# 获取输入张量 x 的维度数
ndim = x.ndim

# 检查 x 的维度是否合法(至少有两个维度)
assert 0 <= 1 < ndim

# 检查 freqs_cis 的形状是否与 x 的第 1 维和最后一维匹配
# freqs_cis 的形状应为 (x.shape[1], x.shape[-1])
assert freqs_cis.shape == (x.shape[1], x.shape[-1])

# 生成新的形状,用于广播
# 对于 x 的每个维度:
# - 如果维度是第 1 维或最后一维,则保持不变
# - 否则设置为 1(便于广播)
shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]

# 将 freqs_cis 调整为新的形状
return freqs_cis.view(*shape)

def apply_rotary_emb(
xq: torch.Tensor, # 查询向量
xk: torch.Tensor, # 键向量
freqs_cis: torch.Tensor, # 预计算的频率矩阵
) -> Tuple[torch.Tensor, torch.Tensor]:
# 将查询向量 xq 转换为复数形式
# xq.float() 将 xq 转换为浮点数
# reshape(*xq.shape[:-1], -1, 2) 将最后一维拆分为两部分(实部和虚部)
# torch.view_as_complex 将实部和虚部组合为复数
xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))

# 将键向量 xk 转换为复数形式(与 xq 相同)
xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))

# 调整 freqs_cis 的形状,使其能够广播到 xq_ 和 xk_ 的形状
freqs_cis = reshape_for_broadcast(freqs_cis, xq_)

# 对查询向量应用旋转位置编码
# xq_ * freqs_cis 是复数乘法,相当于旋转
# torch.view_as_real 将复数转换回实数形式
# flatten(3) 将最后一维展平
xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3)

# 对键向量应用旋转位置编码(与 xq 相同)
xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3)

# 返回旋转后的查询和键向量,并确保数据类型与输入一致
return xq_out.type_as(xq), xk_out.type_as(xk)
  • 自注意力机制(Multi-Head Attention):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Attention(nn.Module):
def __init__(self, args: ModelArgs):
super().__init__()
# 计算本地注意力头数(模型并行化)
self.n_local_heads = args.n_heads // fs_init.get_model_parallel_world_size()
self.head_dim = args.dim // args.n_heads # 每个注意力头的维度

# 定义线性变换层(并行化)
self.wq = ColumnParallelLinear(args.dim, args.n_heads * self.head_dim, bias=False) # 查询投影
self.wk = ColumnParallelLinear(args.dim, args.n_heads * self.head_dim, bias=False) # 键投影
self.wv = ColumnParallelLinear(args.dim, args.n_heads * self.head_dim, bias=False) # 值投影
self.wo = RowParallelLinear(args.n_heads * self.head_dim, args.dim, bias=False) # 输出投影

# 初始化KV缓存
self.cache_k = torch.zeros((args.max_batch_size, args.max_seq_len, self.n_local_heads, self.head_dim)).cuda()
self.cache_v = torch.zeros((args.max_batch_size, args.max_seq_len, self.n_local_heads, self.head_dim)).cuda()

def forward(self, x: torch.Tensor, start_pos: int, freqs_cis: torch.Tensor, mask: Optional[torch.Tensor]):
bsz, seqlen, _ = x.shape # 获取输入形状
xq, xk, xv = self.wq(x), self.wk(x), self.wv(x) # 线性变换得到Q/K/V

# 调整形状为多头注意力格式
xq = xq.view(bsz, seqlen, self.n_local_heads, self.head_dim)
xk = xk.view(bsz, seqlen, self.n_local_heads, self.head_dim)
xv = xv.view(bsz, seqlen, self.n_local_heads, self.head_dim)

# 应用旋转位置编码
xq, xk = apply_rotary_emb(xq, xk, freqs_cis=freqs_cis)

# 更新KV缓存
self.cache_k = self.cache_k.to(xq)
self.cache_v = self.cache_v.to(xq)
self.cache_k[:bsz, start_pos : start_pos + seqlen] = xk
self.cache_v[:bsz, start_pos : start_pos + seqlen] = xv

# 获取缓存的K/V
keys = self.cache_k[:bsz, : start_pos + seqlen]
values = self.cache_v[:bsz, : start_pos + seqlen]

# 计算注意力得分
xq = xq.transpose(1, 2) # 调整形状为 (bs, n_local_heads, seqlen, head_dim)
keys = keys.transpose(1, 2)
values = values.transpose(1, 2)
scores = torch.matmul(xq, keys.transpose(2, 3)) / math.sqrt(self.head_dim) # 缩放点积
if mask is not None:
scores = scores + mask # 应用掩码
scores = F.softmax(scores.float(), dim=-1).type_as(xq) # Softmax归一化
output = torch.matmul(scores, values) # 加权求和
output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1) # 调整形状
return self.wo(output) # 输出投影
  • 前馈网络(SwiGLU激活):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Attention(nn.Module):
def __init__(self, args: ModelArgs):
super().__init__()
# 计算本地注意力头数(模型并行化)
self.n_local_heads = args.n_heads // fs_init.get_model_parallel_world_size()
self.head_dim = args.dim // args.n_heads # 每个注意力头的维度

# 定义线性变换层(并行化)
self.wq = ColumnParallelLinear(args.dim, args.n_heads * self.head_dim, bias=False) # 查询投影
self.wk = ColumnParallelLinear(args.dim, args.n_heads * self.head_dim, bias=False) # 键投影
self.wv = ColumnParallelLinear(args.dim, args.n_heads * self.head_dim, bias=False) # 值投影
self.wo = RowParallelLinear(args.n_heads * self.head_dim, args.dim, bias=False) # 输出投影

# 初始化KV缓存
self.cache_k = torch.zeros((args.max_batch_size, args.max_seq_len, self.n_local_heads, self.head_dim)).cuda()
self.cache_v = torch.zeros((args.max_batch_size, args.max_seq_len, self.n_local_heads, self.head_dim)).cuda()

def forward(self, x: torch.Tensor, start_pos: int, freqs_cis: torch.Tensor, mask: Optional[torch.Tensor]):
bsz, seqlen, _ = x.shape # 获取输入形状
xq, xk, xv = self.wq(x), self.wk(x), self.wv(x) # 线性变换得到Q/K/V

# 调整形状为多头注意力格式
xq = xq.view(bsz, seqlen, self.n_local_heads, self.head_dim)
xk = xk.view(bsz, seqlen, self.n_local_heads, self.head_dim)
xv = xv.view(bsz, seqlen, self.n_local_heads, self.head_dim)

# 应用旋转位置编码
xq, xk = apply_rotary_emb(xq, xk, freqs_cis=freqs_cis)

# 更新KV缓存
self.cache_k = self.cache_k.to(xq)
self.cache_v = self.cache_v.to(xq)
self.cache_k[:bsz, start_pos : start_pos + seqlen] = xk
self.cache_v[:bsz, start_pos : start_pos + seqlen] = xv

# 获取缓存的K/V
keys = self.cache_k[:bsz, : start_pos + seqlen]
values = self.cache_v[:bsz, : start_pos + seqlen]

# 计算注意力得分
xq = xq.transpose(1, 2) # 调整形状为 (bs, n_local_heads, seqlen, head_dim)
keys = keys.transpose(1, 2)
values = values.transpose(1, 2)
scores = torch.matmul(xq, keys.transpose(2, 3)) / math.sqrt(self.head_dim) # 缩放点积
if mask is not None:
scores = scores + mask # 应用掩码
scores = F.softmax(scores.float(), dim=-1).type_as(xq) # Softmax归一化
output = torch.matmul(scores, values) # 加权求和
output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1) # 调整形状
return self.wo(output) # 输出投影
  • Transformer块结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class TransformerBlock(nn.Module):
def __init__(self, layer_id: int, args: ModelArgs):
super().__init__()
self.n_heads = args.n_heads # 注意力头数
self.dim = args.dim # 隐藏层维度
self.head_dim = args.dim // args.n_heads # 每个注意力头的维度
self.attention = Attention(args) # 自注意力模块
self.feed_forward = FeedForward(dim=args.dim, hidden_dim=4 * args.dim, multiple_of=args.multiple_of) # 前馈网络
self.layer_id = layer_id # 当前层ID
self.attention_norm = RMSNorm(args.dim, eps=args.norm_eps) # 注意力前归一化
self.ffn_norm = RMSNorm(args.dim, eps=args.norm_eps) # 前馈前归一化

def forward(self, x: torch.Tensor, start_pos: int, freqs_cis: torch.Tensor, mask: Optional[torch.Tensor]):
# 残差连接结构
h = x + self.attention.forward(self.attention_norm(x), start_pos, freqs_cis, mask) # 自注意力
out = h + self.feed_forward.forward(self.ffn_norm(h)) # 前馈网络
return out
  • 完整Transformer模型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Transformer(nn.Module):
def __init__(self, params: ModelArgs):
super().__init__()
self.params = params # 模型参数
self.vocab_size = params.vocab_size # 词表大小
self.n_layers = params.n_layers # Transformer层数

# 词嵌入层(并行化)
self.tok_embeddings = ParallelEmbedding(params.vocab_size, params.dim, init_method=lambda x: x)

# 堆叠Transformer块
self.layers = torch.nn.ModuleList()
for layer_id in range(params.n_layers):
self.layers.append(TransformerBlock(layer_id, params))

# 最终归一化层
self.norm = RMSNorm(params.dim, eps=params.norm_eps)
# 输出投影层(并行化)
self.output = ColumnParallelLinear(params.dim, params.vocab_size, bias=False, init_method=lambda x: x)

# 预计算旋转位置编码
self.freqs_cis = precompute_freqs_cis(self.params.dim // self.params.n_heads, self.params.max_seq_len * 2)

@torch.inference_mode()
def forward(self, tokens: torch.Tensor, start_pos: int):
_bsz, seqlen = tokens.shape # 获取输入形状
h = self.tok_embeddings(tokens) # 词嵌入
self.freqs_cis = self.freqs_cis.to(h.device) # 将频率矩阵移动到相同设备
freqs_cis = self.freqs_cis[start_pos : start_pos + seqlen] # 截取当前序列的频率

# 生成注意力掩码(防止未来信息泄露)
mask = None
if seqlen > 1:
mask = torch.full((1, 1, seqlen, seqlen), float("-inf"), device=tokens.device)
mask = torch.triu(mask, diagonal=start_pos + 1).type_as(h)

# 逐层通过Transformer块
for layer in self.layers:
h = layer(h, start_pos, freqs_cis, mask)
h = self.norm(h) # 最终归一化
output = self.output(h[:, -1, :]) # 输出最后一个token的logits
return output.float()

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# model.py
# Copyright (c) Meta Platforms, Inc. and affiliates.
# This software may be used and distributed according to the terms of the GNU General Public License version 3.

from typing import Optional, Tuple
from dataclasses import dataclass
import math

import torch
from torch import nn
import torch.nn.functional as F

import fairscale.nn.model_parallel.initialize as fs_init
from fairscale.nn.model_parallel.layers import (
ParallelEmbedding,
RowParallelLinear,
ColumnParallelLinear,
)


@dataclass
class ModelArgs:
dim: int = 512
n_layers: int = 8
n_heads: int = 8
vocab_size: int = -1 # defined later by tokenizer
multiple_of: int = 256 # make SwiGLU hidden layer size multiple of large power of 2
norm_eps: float = 1e-5

max_batch_size: int = 32
max_seq_len: int = 2048


class RMSNorm(torch.nn.Module):
def __init__(self, dim: int, eps: float = 1e-6):
super().__init__()
self.eps = eps
self.weight = nn.Parameter(torch.ones(dim))

def _norm(self, x):
return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

def forward(self, x):
output = self._norm(x.float()).type_as(x)
return output * self.weight


def precompute_freqs_cis(dim: int, end: int, theta: float = 10000.0):
freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
t = torch.arange(end, device=freqs.device) # type: ignore
freqs = torch.outer(t, freqs).float() # type: ignore
freqs_cis = torch.polar(torch.ones_like(freqs), freqs) # complex64
return freqs_cis


def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):
ndim = x.ndim
assert 0 <= 1 < ndim
assert freqs_cis.shape == (x.shape[1], x.shape[-1])
shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]
return freqs_cis.view(*shape)


def apply_rotary_emb(
xq: torch.Tensor,
xk: torch.Tensor,
freqs_cis: torch.Tensor,
) -> Tuple[torch.Tensor, torch.Tensor]:
xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))
freqs_cis = reshape_for_broadcast(freqs_cis, xq_)
xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(3)
xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(3)
return xq_out.type_as(xq), xk_out.type_as(xk)


class Attention(nn.Module):
def __init__(self, args: ModelArgs):
super().__init__()

self.n_local_heads = args.n_heads // fs_init.get_model_parallel_world_size()
self.head_dim = args.dim // args.n_heads

self.wq = ColumnParallelLinear(
args.dim,
args.n_heads * self.head_dim,
bias=False,
gather_output=False,
init_method=lambda x: x,
)
self.wk = ColumnParallelLinear(
args.dim,
args.n_heads * self.head_dim,
bias=False,
gather_output=False,
init_method=lambda x: x,
)
self.wv = ColumnParallelLinear(
args.dim,
args.n_heads * self.head_dim,
bias=False,
gather_output=False,
init_method=lambda x: x,
)
self.wo = RowParallelLinear(
args.n_heads * self.head_dim,
args.dim,
bias=False,
input_is_parallel=True,
init_method=lambda x: x,
)

self.cache_k = torch.zeros(
(args.max_batch_size, args.max_seq_len, self.n_local_heads, self.head_dim)
).cuda()
self.cache_v = torch.zeros(
(args.max_batch_size, args.max_seq_len, self.n_local_heads, self.head_dim)
).cuda()

def forward(self, x: torch.Tensor, start_pos: int, freqs_cis: torch.Tensor, mask: Optional[torch.Tensor]):
bsz, seqlen, _ = x.shape
xq, xk, xv = self.wq(x), self.wk(x), self.wv(x)

xq = xq.view(bsz, seqlen, self.n_local_heads, self.head_dim)
xk = xk.view(bsz, seqlen, self.n_local_heads, self.head_dim)
xv = xv.view(bsz, seqlen, self.n_local_heads, self.head_dim)

xq, xk = apply_rotary_emb(xq, xk, freqs_cis=freqs_cis)

self.cache_k = self.cache_k.to(xq)
self.cache_v = self.cache_v.to(xq)

self.cache_k[:bsz, start_pos : start_pos + seqlen] = xk
self.cache_v[:bsz, start_pos : start_pos + seqlen] = xv

keys = self.cache_k[:bsz, : start_pos + seqlen]
values = self.cache_v[:bsz, : start_pos + seqlen]

xq = xq.transpose(1, 2)
keys = keys.transpose(1, 2)
values = values.transpose(1, 2)
scores = torch.matmul(xq, keys.transpose(2, 3)) / math.sqrt(self.head_dim)
if mask is not None:
scores = scores + mask # (bs, n_local_heads, slen, cache_len + slen)
scores = F.softmax(scores.float(), dim=-1).type_as(xq)
output = torch.matmul(scores, values) # (bs, n_local_heads, slen, head_dim)
output = output.transpose(
1, 2
).contiguous().view(bsz, seqlen, -1)

return self.wo(output)


class FeedForward(nn.Module):
def __init__(
self,
dim: int,
hidden_dim: int,
multiple_of: int,
):
super().__init__()
hidden_dim = int(2 * hidden_dim / 3)
hidden_dim = multiple_of * ((hidden_dim + multiple_of - 1) // multiple_of)

self.w1 = ColumnParallelLinear(
dim, hidden_dim, bias=False, gather_output=False, init_method=lambda x: x
)
self.w2 = RowParallelLinear(
hidden_dim, dim, bias=False, input_is_parallel=True, init_method=lambda x: x
)
self.w3 = ColumnParallelLinear(
dim, hidden_dim, bias=False, gather_output=False, init_method=lambda x: x
)

def forward(self, x):
return self.w2(F.silu(self.w1(x)) * self.w3(x))


class TransformerBlock(nn.Module):
def __init__(self, layer_id: int, args: ModelArgs):
super().__init__()
self.n_heads = args.n_heads
self.dim = args.dim
self.head_dim = args.dim // args.n_heads
self.attention = Attention(args)
self.feed_forward = FeedForward(
dim=args.dim, hidden_dim=4 * args.dim, multiple_of=args.multiple_of
)
self.layer_id = layer_id
self.attention_norm = RMSNorm(args.dim, eps=args.norm_eps)
self.ffn_norm = RMSNorm(args.dim, eps=args.norm_eps)

def forward(self, x: torch.Tensor, start_pos: int, freqs_cis: torch.Tensor, mask: Optional[torch.Tensor]):
h = x + self.attention.forward(self.attention_norm(x), start_pos, freqs_cis, mask)
out = h + self.feed_forward.forward(self.ffn_norm(h))
return out


class Transformer(nn.Module):
def __init__(self, params: ModelArgs):
super().__init__()
self.params = params
self.vocab_size = params.vocab_size
self.n_layers = params.n_layers

self.tok_embeddings = ParallelEmbedding(
params.vocab_size, params.dim, init_method=lambda x: x
)

self.layers = torch.nn.ModuleList()
for layer_id in range(params.n_layers):
self.layers.append(TransformerBlock(layer_id, params))

self.norm = RMSNorm(params.dim, eps=params.norm_eps)
self.output = ColumnParallelLinear(
params.dim, params.vocab_size, bias=False, init_method=lambda x: x
)

self.freqs_cis = precompute_freqs_cis(
self.params.dim // self.params.n_heads, self.params.max_seq_len * 2
)

@torch.inference_mode()
def forward(self, tokens: torch.Tensor, start_pos: int):
_bsz, seqlen = tokens.shape
h = self.tok_embeddings(tokens)
self.freqs_cis = self.freqs_cis.to(h.device)
freqs_cis = self.freqs_cis[start_pos : start_pos + seqlen]

mask = None
if seqlen > 1:
mask = torch.full((1, 1, seqlen, seqlen), float("-inf"), device=tokens.device)
mask = torch.triu(mask, diagonal=start_pos + 1).type_as(h)

for layer in self.layers:
h = layer(h, start_pos, freqs_cis, mask)
h = self.norm(h)
output = self.output(h[:, -1, :]) # only compute last logits
return output.float()

LLaMA1 tokenizer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Copyright (c) Meta Platforms, Inc. and affiliates.
# This software may be used and distributed according to the terms of the GNU General Public License version 3.

from sentencepiece import SentencePieceProcessor
from logging import getLogger
from typing import List
import os

# 初始化日志记录器
logger = getLogger() # 获取日志记录器实例

# 定义 Tokenizer 类
class Tokenizer:
def __init__(self, model_path: str):
# 初始化 Tokenizer 类,加载 SentencePiece 模型
# 参数 model_path 是 SentencePiece 模型文件的路径

# 检查模型文件是否存在
assert os.path.isfile(model_path), model_path

# 加载 SentencePiece 模型
self.sp_model = SentencePieceProcessor(model_file=model_path)

# 记录日志:模型加载成功
logger.info(f"Reloaded SentencePiece model from {model_path}")

# 获取词汇表大小
self.n_words: int = self.sp_model.vocab_size()

# 获取 BOS(Begin of Sentence)标记的 ID
self.bos_id: int = self.sp_model.bos_id()

# 获取 EOS(End of Sentence)标记的 ID
self.eos_id: int = self.sp_model.eos_id()

# 获取 PAD(Padding)标记的 ID
self.pad_id: int = self.sp_model.pad_id()

# 记录日志:词汇表大小、BOS ID、EOS ID
logger.info(
f"#words: {self.n_words} - BOS ID: {self.bos_id} - EOS ID: {self.eos_id}"
)

# 检查词汇表大小是否与模型的分词单元数量一致
assert self.sp_model.vocab_size() == self.sp_model.get_piece_size()

# encode 方法
def encode(self, s: str, bos: bool, eos: bool) -> List[int]:
# 将输入字符串编码为 token ID 列表
# 参数:
# s: 输入字符串
# bos: 是否在开头添加 BOS 标记
# eos: 是否在结尾添加 EOS 标记
# 返回值:token ID 列表

# 检查输入是否为字符串
assert type(s) is str

# 使用 SentencePiece 模型将字符串编码为 token ID 列表
t = self.sp_model.encode(s)

# 如果 bos 为 True,在开头添加 BOS 标记
if bos:
t = [self.bos_id] + t

# 如果 eos 为 True,在结尾添加 EOS 标记
if eos:
t = t + [self.eos_id]

# 返回编码后的 token ID 列表
return t

# decode 方法
def decode(self, t: List[int]) -> str:
# 将 token ID 列表解码为字符串
# 参数:
# t: token ID 列表
# 返回值:解码后的字符串

# 使用 SentencePiece 模型将 token ID 列表解码为字符串
return self.sp_model.decode(t)

LLaMA1 generation.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Copyright (c) Meta Platforms, Inc. and affiliates.
# This software may be used and distributed according to the terms of the GNU General Public License version 3.

from typing import List

import torch

from llama.tokenizer import Tokenizer
from llama.model import Transformer

# 定义 LLaMA 类
class LLaMA:
def __init__(self, model: Transformer, tokenizer: Tokenizer):
# 初始化 LLaMA 类
# 参数:
# model: Transformer 模型实例
# tokenizer: Tokenizer 实例
self.model = model # 保存模型
self.tokenizer = tokenizer # 保存分词器

# generate 方法
def generate(
self,
prompts: List[str], # 输入的提示文本列表
max_gen_len: int, # 生成文本的最大长度
temperature: float = 0.8, # 温度参数,控制生成文本的随机性
top_p: float = 0.95, # Top-p 采样参数,控制生成文本的多样性
) -> List[str]: # 返回生成的文本列表
bsz = len(prompts) # 获取批处理大小(即提示文本的数量)
params = self.model.params # 获取模型的参数
# 检查批处理大小是否超过模型支持的最大批处理大小
assert bsz <= params.max_batch_size, (bsz, params.max_batch_size)

# 将提示文本编码为 token ID 列表
# bos=True 表示在开头添加 BOS 标记,eos=False 表示不添加 EOS 标记
prompt_tokens = [self.tokenizer.encode(x, bos=True, eos=False) for x in prompts]

# 计算提示文本的最小长度和最大长度
min_prompt_size = min([len(t) for t in prompt_tokens])
max_prompt_size = max([len(t) for t in prompt_tokens])

# 计算生成文本的总长度(不超过模型的最大序列长度)
total_len = min(params.max_seq_len, max_gen_len + max_prompt_size)

# 初始化 token 张量,填充为 pad_id
tokens = torch.full((bsz, total_len), self.tokenizer.pad_id).cuda().long()
# 将提示文本的 token ID 填充到 tokens 张量中
for k, t in enumerate(prompt_tokens):
tokens[k, : len(t)] = torch.tensor(t).long()

# 创建输入文本的掩码(标记哪些位置是真实 token,哪些是填充 token)
input_text_mask = tokens != self.tokenizer.pad_id

# 初始化生成位置
start_pos = min_prompt_size # 从提示文本的最小长度开始生成
prev_pos = 0 # 上一个生成位置

# 自回归生成文本
for cur_pos in range(start_pos, total_len):
# 获取当前位置的 logits(未归一化的预测分数)
logits = self.model.forward(tokens[:, prev_pos:cur_pos], prev_pos)

# 根据温度参数调整 logits
if temperature > 0:
probs = torch.softmax(logits / temperature, dim=-1) # 使用 softmax 归一化
next_token = sample_top_p(probs, top_p) # 使用 Top-p 采样
else:
next_token = torch.argmax(logits, dim=-1) # 直接取概率最大的 token

# 调整 next_token 的形状
next_token = next_token.reshape(-1)

# 如果当前位置是提示文本部分,则保留原始 token;否则使用生成的 token
next_token = torch.where(
input_text_mask[:, cur_pos], tokens[:, cur_pos], next_token
)

# 将生成的 token 填充到 tokens 张量中
tokens[:, cur_pos] = next_token
prev_pos = cur_pos # 更新上一个生成位置

# 解码生成的 token ID 列表为文本
decoded = []
for i, t in enumerate(tokens.tolist()):
# 截取生成的文本(不超过提示文本长度 + 最大生成长度)
t = t[: len(prompt_tokens[i]) + max_gen_len]
# 如果遇到 EOS 标记,则截断
try:
t = t[: t.index(self.tokenizer.eos_id)]
except ValueError:
pass
# 解码为文本并添加到结果列表中
decoded.append(self.tokenizer.decode(t))

# 返回生成的文本列表
return decoded

# sample_top_p 函数
def sample_top_p(probs, p):
# 对概率分布进行 Top-p 采样
# 参数:
# probs: 概率分布张量
# p: Top-p 采样的概率阈值

# 对概率分布进行降序排序,并获取排序后的索引
probs_sort, probs_idx = torch.sort(probs, dim=-1, descending=True)

# 计算累积概率
probs_sum = torch.cumsum(probs_sort, dim=-1)

# 创建掩码,过滤掉累积概率超过 p 的部分
mask = probs_sum - probs_sort > p
probs_sort[mask] = 0.0 # 将超过 p 的部分置为 0

# 重新归一化概率分布
probs_sort.div_(probs_sort.sum(dim=-1, keepdim=True))

# 从归一化后的概率分布中采样一个 token
next_token = torch.multinomial(probs_sort, num_samples=1)

# 根据采样结果获取原始索引
next_token = torch.gather(probs_idx, -1, next_token)

# 返回采样结果
return next_token