
分步指南:在 Python 中构建一个拥有 2900 万参数的 LLM,如 ChatGPT,并附实用技巧
逐步指南
免费阅读这个故事:link
快速提示 — 我们将首先训练一个分词器,然后从头开始构建一个 2900 万参数的 LLM。这将给我们一个生成正确句子的模型。接下来,我们将使用(SFT)对其进行微调,以提高其知识和响应风格,使其更像 ChatGPT。
我已经将我训练好的微型模型部署在 huggingface space 上。你可以在那里与它聊天。Web app ink
看看我和我们训练好的 LLM 之间的一些聊天对话。
问候
要求写代码
知识截止 😆
问一个愚蠢的问题
我们将一起编写代码,而不是一次性学习所有理论,以便正确理解所有内容。从数据集到模型权重,所有内容都是可替换的。
GitHub 代码
所有代码以及正确的设置都可以在我的 GitHub 存储库中找到。
[## GitHub - FareedKhan-dev/train-tiny-llm: 从头开始训练一个 29M 参数的 GPT
从头开始训练一个 29M 参数的 GPT。通过创建一个帐户来为 FareedKhan-dev/train-tiny-llm 的开发做出贡献…
github.com](https://github.com/FareedKhan-dev/train-tiny-llm)
代码库的组织结构如下:
.
├── pretrain.py # 预训练脚本
├── requirement.txt # 项目依赖项
├── train_sft.py # 监督微调脚本
├── train_tokenizer.py # 分词器训练脚本
├── web_app.py # Streamlit Web 应用程序
└── transformer/ # 包含核心组件的目录
├── dataset.py # 数据集类 (PretrainDataset, SFTDataset)
├── LMConfig.py # 语言模型配置类 (LMConfig)
└── model.py # Transformer 模型定义 (TransformerLM)
GitHub 代码库更灵活,你可以执行并行训练等等!
目录
- 训练分词器
- Transformer 概述
- RMSNorm
- 位置编码
- 注意力机制
- 前馈神经网络
- Transformer Block
- 组合 Transformer 组件
- 处理预训练数据集
- 预训练循环 (29M LLM)
- PyTorch 模型到 SafeTensors
- 推断预训练 LLM
- SFT 数据集预处理
- SFT 训练循环
- 评估😖
- 结论
训练分词器
分词器是 大型语言模型 (LLM) 的第一个也是最重要的组件,它将文本分解成称为 token(单词、子词或字符)的较小单元。
之所以需要它,是因为 LLM 无法理解原始文本,但它们可以处理数字。
分词器将文本转换为数值表示,以便模型可以分析和学习语言模式。每个 LLM 在训练前都会使用分词,以确保高效的文本处理和理解。
让我们使用 GPT-2 分词器,看看它是怎么想的。
### 加载 GPT-2 分词器
tokenizer = AutoTokenizer.from_pretrained("gpt2")
### 示例文本
text = "Hello, world!"
### 分词过程
tokens = tokenizer.tokenize(text) # 将文本转换为 token
token_ids = tokenizer.convert_tokens_to_ids(tokens) # 将 token 转换为数值 ID
### 输出结果
print("Tokens:", tokens)
print("Token IDs:", token_ids)
#### OUTPUT ###
Tokens: ['Hello', ',', 'Ġworld', '!']
Token IDs: [15496, 11, 995, 0]
它将文本转换为数字形式。GPT-2 分词器是在大量数据上训练的。如果你想使用自己的分词器,比如一个已经训练好的分词器,你可以使用它。
但是我们也可以构建自己的分词器,所以让我们这样做。
要训练分词器,我们需要数据。由于我们正在构建一个英语 LLM,Hugging Face 充满了可用于训练分词器的 大规模数据集。
我将使用 allenai/c4 Hugging Face 数据集,该数据集是从网络抓取创建的。它非常大,但我将使用 200 万个英文行。你可以自己决定数量。所以,让我们开始吧。
### 以流模式加载数据集
ds = load_dataset("allenai/c4", "en", streaming=True)
### 要检索的行数
num_rows_to_get = 2000000
### 输出文件
output_file = "training_data.jsonl"
### 将数据写入文件
with open(output_file, "w", encoding="utf-8") as f:
for i, row in enumerate(iter(ds["train"])):
if i >= num_rows_to_get:
break
json.dump({"text": row["text"]}, f, ensure_ascii=False)
f.write("\n") # 确保每个 JSON 对象都在新的一行上
因此,我们将 200 万个 Hugging Face 数据转换为 JSONL 文件,因为这种格式易于处理。
让我们打印我们训练数据集的样本,看看它是什么样的。
#此代码仅用于显示目的,不需要运行训练
import json
def print_sample_from_jsonl(file_path, num_samples=3):
"""从 JSONL 文件中打印几个样本条目。"""
with open(file_path, "r", encoding="utf-8") as f:
# 迭代最多 num_samples 行
for _, line in zip(range(num_samples), f):
print(json.dumps(json.loads(line), indent=4, ensure_ascii=False))
假设你的训练数据在 ‘training_data.jsonl’ 中
print_sample_from_jsonl("training_data.jsonl")
OUTPUT
{ "text": "Beginners BBQ Class ..." }
{ "text": "Discussion in 'Mac ..." }
{ "text": "Foil plaid lycra an ..." }
这将从我们的 training_data.jsonl
文件中输出几行。每一行都是一个单独的 JSON 对象,包含一个 "text"
字段。
现在,让我们开始构建我们的分词器!我们将使用 tokenizers
库。我们将构建一个字节对编码 (BPE
) 分词器。
BPE
从单个字符开始,并迭代地合并最频繁的对,构建一个子词汇表。BPE
(字节对编码) 是最好的,因为它平衡了词汇量和效率,使其非常适合处理 NLP 任务中罕见和常见的单词。
首先,我们需要一种有效的方法从 JSONL
文件中加载我们的文本数据。我们将为此使用一个生成器。Generators
非常适合大型数据集,因为它们不会一次将所有内容加载到内存中。
def load_texts_from_jsonl(file_path):
"""
Generator function to read and yield text data from a JSONL file.
"""
with open(file_path, "r", encoding="utf-8") as file:
for line in file:
yield json.loads(line)["text"]
这个 load_texts_from_jsonl
函数获取文件路径,逐行读取它,提取 'text'
字段,并产生它。这使得它具有内存效率。
接下来,让我们初始化我们的 tokenizer
并设置 pre-tokenizer
:
### Initialize a new BPE tokenizer
tokenizer = Tokenizer(models.BPE())
### Set up the pre-tokenizer to handle splitting text into bytes
tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False)
我们使用 BPE
模型创建一个 Tokenizer
对象。
pre_tokenizer
将输入文本分割成字节。add_prefix_space=False
很重要,它告诉分词器不要在序列的第一个单词之前添加空格。
现在,让我们定义我们的特殊标记并配置 trainer
:
### Define special tokens
special_tokens = ["<unk>", "<s>", "</s>"]
### Configure the BPE trainer
trainer = trainers.BpeTrainer(
vocab_size=6400, # We'll use a vocabulary size of 6400
special_tokens=special_tokens,
show_progress=True,
initial_alphabet=pre_tokenizers.ByteLevel.alphabet()
)
我们有三个特殊标记(我将在创建预训练数据集时告诉您它们为什么重要)。
<unk>
(用于未知单词)<s>
(序列开始)</s>
(序列结束)。
BpeTrainer
设置了我们想要的 vocab_size=6400
、special_tokens
和一个进度条 (show_progress=True
)。 initial_alphabet
确保所有单个字节都在起始词汇表中。
是时候训练 tokenizer
了。
### Load the text data using our generator
texts = load_texts_from_jsonl("training_data.jsonl") #your file path
### Train the tokenizer
tokenizer.train_from_iterator(texts, trainer=trainer)
tokenizer.train_from_iterator
是学习发生的地方。 tokenizer
分析我们的文本数据并找出要合并的最频繁的字节对。
在 Colab 免费层(CPU)上训练我的分词器需要 1.5 小时。
我们还需要设置 byte_level
解码器。
tokenizer.decoder = decoders.ByteLevel()
我们设置一个 ByteLevel
解码器,以确保当我们将标记 ID 解码回文本时,保留原始格式。
让我们确保我们的特殊标记具有正确的 IDs
:
### Ensure special tokens are assigned correctly
assert tokenizer.token_to_id("<unk>") == 0
assert tokenizer.token_to_id("<s>") == 1
assert tokenizer.token_to_id("</s>") == 2
这些断言检查 <unk>
是否为 0
,<s>
是否为 1
,</s>
是否为 2
。这对一致性至关重要。
现在,让我们保存我们训练好的 tokenizer
:
### Create the directory to save the tokenizer (if it doesn't exist)
tokenizer_save_dir = "my_custom_tokenizer"
os.makedirs(tokenizer_save_dir, exist_ok=True)
保存分词器模型和配置
tokenizer.save(os.path.join(tokenizer_save_dir, "tokenizer.json"))
tokenizer.model.save(tokenizer_save_dir)
我们创建一个目录 (my_custom_tokenizer
),并将 tokenizer
分两部分保存:tokenizer.json
(词汇表和合并规则)和模型文件。
最后,让我们创建一个 tokenizer_config.json
文件。此文件提供额外的配置详细信息,以便我们稍后可以使用 Hugging Face 的 AutoTokenizer
轻松加载 tokenizer
。
它就像我们 tokenizer
的设置文件。我们将逐步构建此配置。
首先,让我们设置一些基本选项:
config = {
"add_bos_token": False, # 不要自动添加序列开始标记
"add_eos_token": False, # 不要自动添加序列结束标记
"add_prefix_space": False, # 第一个单词前没有空格(我们已经设置了)
"bos_token": "<s>", # 定义序列开始标记
"eos_token": "</s>", # 定义序列结束标记
"unk_token": "<unk>", # 定义未知标记
"pad_token": "<unk>", # 使用 <unk> 进行填充(我们稍后将处理填充)
}
这些选项控制 tokenizer
如何处理特殊标记和空格。
我们明确地设置了序列开始 (bos_token
)、序列结束 (eos_token
)、未知 (unk_token
) 和填充 (pad_token
) 标记。
接下来,我们将添加一些更通用的设置:
config.update({
"model_max_length": 32768, # 模型可以处理的最大序列长度
"tokenizer_class": "PreTrainedTokenizerFast", # 指定分词器类
"clean_up_tokenization_spaces": False, # 不要清理多余的空格
"additional_special_tokens": [], # 目前没有额外的特殊标记
"spaces_between_special_tokens": False, # 不要添加特殊标记之间的空格
"sp_model_kwargs": {}, # 没有特殊的模型参数
})
我们设置了 最大序列长度
(model_max_length
),指定了 tokenizer
类 (PreTrainedTokenizerFast
以提高效率),并控制了空格的处理方式。
现在,让我们定义 special tokens
在 decoder
中的表示方式:
config["added_tokens_decoder"] = {
"0": {"content": "<unk>", "lstrip": False, "normalized": False, "rstrip": False, "single_word": False, "special": True},
"1": {"content": "<s>", "lstrip": False, "normalized": False, "rstrip": False, "single_word": False, "special": True},
"2": {"content": "</s>", "lstrip": False, "normalized": False, "rstrip": False, "single_word": False, "special": True}
}
这个 added_tokens_decoder
部分将特殊标记的标记 ID
(0, 1, 2
) 映射到它们的属性。我们告诉 decoder
这些标记是“特殊的”,不应该像普通单词一样对待(没有前导/尾随空格,没有规范化)。
最后,对于对话模型来说,这一点非常重要,我们将添加一个 chat template
:
config["chat_template"] = """
{% if messages[0]['role'] == 'system' %}
{% set system_message = messages[0]['content'] %}
{{ '<s>system\\n' + system_message + '</s>\\n' }}
{% else %}
{{ '<s>system\\nYou are a helpful AI assistant.</s>\\n' }}
{% endif %}
{% for message in messages %}
{% set content = message['content'] %}
{% if message['role'] == 'user' %}
{{ '<s>user\\n' + content + '</s>\\n<s>assistant\\n' }}
{% elif message['role'] == 'assistant' %}
{{ content + '</s>' + '\\n' }}
{% endif %}
{% endfor %}
"""
此 Jinja2
模板使用 <s>
和 </s>
标记来分隔 system
、user
和 assistant
消息,从而为 LLM
格式化聊天。它确保模型理解对话流程。
现在我们已经构建了我们的配置,让我们将其保存到一个文件中:
with open(os.path.join(tokenizer_save_dir, "tokenizer_config.json"), "w", encoding="utf-8") as config_file:
json.dump(config, config_file, ensure_ascii=False, indent=4)
print("Tokenizer training completed and saved successfully.")
我们将 config
字典保存为名为 tokenizer_config.json
的 JSON
文件,该文件位于我们的 tokenizer
目录中。indent=4
使文件具有可读性。
就是这样!我们已经训练了自定义的 BPE 分词器
在下一部分中,我们将从头开始构建我们的 Transformer 架构。
Transformer 概述
Transformer 是现代大型语言模型的核心。与顺序处理文本的循环网络(如 RNN 或 LSTM)不同,Transformer 使用称为 注意力 的机制同时处理输入的所有部分。
通过这种方式,我们可以实现更大的并行性和可扩展性,从而显着缩短训练时间,并能够处理文本中更长范围的依赖关系。
我们的 Transformer 模型将包含几个关键组件:
- 嵌入层:将标记转换为向量
- 位置编码:添加位置信息
- Transformer 块:层堆叠
- 多头注意力:侧重于不同的输入部分
- 前馈网络:处理每个标记
- RMSNorm:规范化层
- 输出层:生成标记概率
原始的 Transformer 架构图很难理解,所以我创建了一个简化的图:
简化的 Transformer(由 Fareed Khan 创建)
此图提供了一个高级概述。Transformer 将文本转换为标记,嵌入它们,并添加位置编码。然后,它通过堆叠的自注意力层来捕获上下文。
输出被归一化,通过线性层进行转换,并通过 softmax 处理以生成下一个标记或类的概率。
让我们逐个构建它的每个组件。
RMSNorm
RMSNORM 工作流程(由 Fareed Khan 创建)
RMSNorm(均方根归一化)是一种层归一化。归一化技术在深度学习中至关重要,可以稳定训练并提高性能。
RMSNorm 通过将每一层的激活值除以激活值的均方根来归一化。添加一个小的常数 epsilon (ε)
,确保数值稳定性(避免除以零)。
这有助于防止激活值变得过大或过小,使训练过程更平滑。它比传统的 LayerNorm 更简单、更快,因为它省略了均值居中步骤。
让我们创建我们的 RMSNorm
类:
class RMSNorm(torch.nn.Module):
def __init__(self, dim: int, eps: float):
super().__init__()
self.eps = eps # Small constant for numerical stability
self.weight = nn.Parameter(torch.ones(dim)) # Learnable scaling parameter
def forward(self, x):
# Calculate the root mean square (RMS) and normalize
return self.weight * (x.float() * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)).type_as(x)
__init__
方法初始化 epsilon
值和一个 可学习的权重
参数(初始化为全 1)。
forward
方法执行归一化:
- 它计算输入
x
的均方根
。 - 添加
epsilon
以确保数值稳定性。 - 取
平方根的倒数
。 - 乘以
可学习的权重
。 - 结果被转换回
x
的原始数据类型。
位置编码
由于 Transformer 一次处理整个输入序列,因此它本身并不知道每个 token 的位置。
我们需要显式地添加此位置信息。这就是位置编码的作用。我们正在使用旋转位置嵌入。
位置编码工作流程(由 Fareed Khan 创建)
旋转位置嵌入 (RoPE) 使用高维空间中的旋转来表示 token 位置。
核心思想是将每个 token 的位置编码为一系列频率。然后,这些频率用于创建复数值嵌入,该嵌入以依赖于其位置的方式旋转 token 的表示。
precompute_pos_cis
函数计算这些嵌入;它是一个辅助函数。
def precompute_pos_cis(dim: int, end: int = int(32 * 1024), theta: float = 1e6):
"""Pre-computes the complex exponentials (cis) for rotary embeddings."""
freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
t = torch.arange(end, device=freqs.device) # Sequence positions
freqs = torch.outer(t, freqs).float() # Outer product of positions and frequencies
pos_cis = torch.polar(torch.ones_like(freqs), freqs) # Convert to complex exponentials
return pos_cis
首先,它根据 维度 (dim)
和缩放因子 (theta
) 计算一组 频率
。
然后,它创建一个 位置 (t)
序列,并计算 t
和 频率
的 外积
。
最后,它使用 torch.polar
将这些值转换为 复指数
,从而得到 pos_cis
张量。
def apply_rotary_emb(xq, xk, pos_cis):
"""Apply rotary embeddings to query (xq) and key (xk) tensors."""
def unite_shape(pos_cis, x):
# Ensure pos_cis has the correct shape and reshape it to match x
assert pos_cis.shape == (x.shape[1], x.shape[-1])
shape = [d if i == 1 or i == x.ndim - 1 else 1 for i, d in enumerate(x.shape)]
return pos_cis.view(*shape)
## Convert to complex for rotation
xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))
## Align pos_cis shape
pos_cis = unite_shape(pos_cis, xq_)
## Apply rotation and convert back
xq_out = torch.view_as_real(xq_ * pos_cis).flatten(3)
xk_out = torch.view_as_real(xk_ * pos_cis).flatten(3)
return xq_out.type_as(xq), xk_out.type_as(xk)
apply_rotary_emb
函数将计算出的 旋转嵌入
应用于注意力机制中的查询 (xq
) 和键 (xk
),从而编码位置信息。unite_shape
重新调整嵌入的形状以实现兼容性。
xq
和 xk
被转换为 复数
,乘以嵌入,然后转换回,保留原始数据类型。
注意力
注意力块(由 Fareed Khan 创建)
注意力
是 Transformer
的关键机制。它允许模型在处理每个 token 时权衡输入序列不同部分的重要性。
我们的实现使用 多头注意力
,这意味着我们在并行(使用不同的学习权重)中多次执行 注意力机制
,允许模型捕获 token 之间关系的不同方面。
我们还结合了 旋转位置嵌入 (RoPE)
来编码位置信息。
键值缓存
用于更快的 自回归解码
(例如在文本生成中)。
class Attention(nn.Module):
def __init__(self, args):
super().__init__()
# Define number of key-value heads (defaults to n_heads if not specified)
self.n_kv_heads = args.n_heads if args.n_kv_heads is None else args.n_kv_heads
assert args.n_heads % self.n_kv_heads == 0 # Ensure heads divide evenly
self.n_local_heads = args.n_heads
self.n_local_kv_heads = self.n_kv_heads
self.n_rep = self.n_local_heads // self.n_local_kv_heads # Replication factor
self.head_dim = args.dim // args.n_heads # Dimension per head
## Linear projections for query, key, and value
self.wq = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)
self.wk = nn.Linear(args.dim, self.n_kv_heads * self.head_dim, bias=False)
self.wv = nn.Linear(args.dim, self.n_kv_heads * self.head_dim, bias=False)
self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False)
## Dropout layers
self.attn_dropout = nn.Dropout(args.dropout)
self.resid_dropout = nn.Dropout(args.dropout)
self.dropout = args.dropout
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from typing import Optional, Tuple
def apply_rotary_emb(xq, xk, cos, sin):
"""将旋转嵌入应用于查询和键张量。
Args:
xq: 查询张量。
xk: 键张量。
cos: 旋转嵌入的余弦值。
sin: 旋转嵌入的正弦值。
Returns:
旋转后的查询和键张量。
"""
# 提取维度
bsz, seqlen, n_head, head_dim = xq.shape
# 应用旋转嵌入
xq = xq.reshape(bsz, seqlen, n_head, -1, 2).transpose(-2, -1)
xk = xk.reshape(bsz, seqlen, n_head, -1, 2).transpose(-2, -1)
xq = apply_rotary_emb_torch(xq, cos, sin)
xk = apply_rotary_emb_torch(xk, cos, sin)
xq = xq.transpose(-2, -1).reshape(bsz, seqlen, n_head, -1)
xk = xk.transpose(-2, -1).reshape(bsz, seqlen, n_head, -1)
return xq, xk
def apply_rotary_emb_torch(x, cos, sin):
"""使用 PyTorch 应用旋转嵌入。
Args:
x: 输入张量。
cos: 余弦值。
sin: 正弦值。
Returns:
旋转后的张量。
"""
# 提取维度
x1, x2 = x[..., 0, :], x[..., 1, :]
o1 = x1 * cos - x2 * sin
o2 = x2 * cos + x1 * sin
return torch.stack([o1, o2], dim=-2)
class Attention(nn.Module):
def __init__(self, args):
super().__init__()
self.n_local_heads = args.n_local_heads
self.n_local_kv_heads = args.n_local_kv_heads
self.n_rep = self.n_local_heads // self.n_local_kv_heads
self.head_dim = args.head_dim
self.wq = nn.Linear(args.dim, args.n_local_heads * args.head_dim, bias=False)
self.wk = nn.Linear(args.dim, args.n_local_kv_heads * args.head_dim, bias=False)
self.wv = nn.Linear(args.dim, args.n_local_kv_heads * args.head_dim, bias=False)
self.wo = nn.Linear(args.n_local_heads * args.head_dim, args.dim, bias=False)
self.attn_dropout = nn.Dropout(args.dropout)
self.resid_dropout = nn.Dropout(args.dropout)
self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention') and args.flash_attn
# 创建因果掩码以防止关注未来标记
mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float("-inf"))
mask = torch.triu(mask, diagonal=1) # 上三角掩码
self.register_buffer("mask", mask, persistent=False)
def forward(self,
x: torch.Tensor,
pos_cis: torch.Tensor,
past_key_value: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
use_cache=False):
# 获取批量大小、序列长度和隐藏维度
bsz, seq_len, _ = x.shape
# 计算查询、键和值投影
xq, xk, xv = self.wq(x), self.wk(x), self.wv(x)
# 重塑为 (batch, seq_len, num_heads, head_dim)
xq = xq.view(bsz, seq_len, self.n_local_heads, self.head_dim)
xk = xk.view(bsz, seq_len, self.n_local_kv_heads, self.head_dim)
xv = xv.view(bsz, seq_len, self.n_local_kv_heads, self.head_dim)
# 应用旋转位置嵌入
xq, xk = apply_rotary_emb(xq, xk, pos_cis)
## Key-Value 缓存实现
if past_key_value is not None:
# 沿序列维度将过去的键/值状态与当前状态连接起来
xk = torch.cat([past_key_value[0], xk], dim=1)
xv = torch.cat([past_key_value[1], xv], dim=1)
## 如果启用了缓存,则存储更新后的键值对
past_kv = (xk, xv) if use_cache else None
## 转置为 (batch, heads, seq_len, head_dim)
xq, xk, xv = (
xq.transpose(1, 2),
repeat_kv(xk, self.n_rep).transpose(1, 2), # 重复 KV 头部并转置
repeat_kv(xv, self.n_rep).transpose(1, 2) # 重复 KV 头部并转置
)
if self.flash and seq_len != 1:
# 如果可用且未处理单个标记,则使用 Flash Attention 以提高效率
dropout_p = self.dropout if self.training else 0.0
output = F.scaled_dot_product_attention(
xq, xk, xv,
attn_mask=None, # 因果掩码由 `is_causal=True` 处理
dropout_p=dropout_p,
is_causal=True
)
else:
# 手动计算缩放点积注意力
scores = (xq @ xk.transpose(-2, -1)) / math.sqrt(self.head_dim) # 按 head_dim 缩放
scores += self.mask[:, :, :seq_len, :seq_len] # 应用因果掩码
scores = F.softmax(scores.float(), dim=-1).type_as(xq) # 归一化分数
scores = self.attn_dropout(scores) # 应用 dropout
output = scores @ xv # 计算值的加权和
## 将输出重塑回 (batch, seq_len, hidden_dim)
output = output.transpose(1, 2).reshape(bsz, seq_len, -1)
## 应用输出投影和残差 dropout
output = self.resid_dropout(self.wo(output))
## 返回最终输出和过去的键值缓存(如果启用了缓存)
return output, past_kv
def repeat_kv(x: torch.Tensor, n_rep: int) -> torch.Tensor:
"""沿头部维度重复键值头 `n_rep` 次。"""
bs, slen, n_kv_heads, head_dim = x.shape # 提取张量维度
if n_rep == 1:
return x # 如果 n_rep 为 1,则不需要重复
# 沿头部维度扩展和重复,然后重塑
return (
x[:, :, :, None, :] # 添加一个新的轴用于重复
.expand(bs, slen, n_kv_heads, n_rep, head_dim) # 重复 n_rep 次
.reshape(bs, slen, n_kv_heads * n_rep, head_dim) # 展平重复的头
)
FeedForward
FeedForward
网络是一个简单的全连接网络,在 attention mechanism
之后独立地应用于每个 token 的表示。它提供了额外的 non-linearity
,并允许模型学习更复杂的表示。
我们的实现使用 SiLU
(Sigmoid Linear Unit) 激活函数。
FFN
用于正则化的 Dropout 层
class FeedForward(nn.Module):
def __init__(self, config: LMConfig):
super().__init__()
# Initialize the linear layers
self.w1 = nn.Linear(config.dim, config.hidden_dim, bias=False)
self.w2 = nn.Linear(config.hidden_dim, config.dim, bias=False)
self.w3 = nn.Linear(config.dim, config.hidden_dim, bias=False)
self.dropout = nn.Dropout(config.dropout)
def forward(self, x):
# Apply the feedforward layers with SILU activation and dropout
return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x)))
__init__
方法初始化了三个 linear layers
:w1
、w2
和 w3
。
w1
和w3
将输入投影到隐藏维度,w2
将其投影回原始维度。hidden dimension
的计算如上所述。- 同时初始化了一个
dropout
层。
forward
方法应用 w1
、SiLU activation
、w3
,将结果相乘,应用 w2
,最后应用 dropout
。
Transformer Block
Transformer Block 工作流程(由 Fareed Khan 创建)
Transformer Block 是 Transformer 模型的基本构建块。它结合了多头注意力 (Multi-Head Attention) 和前馈 (FeedForward) 组件,以及残差连接和层归一化。
残差连接(将输入添加到子层的输出)通过缓解梯度消失问题来帮助训练深度网络。
class TransformerBlock(nn.Module):
def __init__(self, layer_id: int, config: LMConfig):
super().__init__()
# Set model configuration parameters
self.n_heads = config.n_heads
self.dim = config.dim
self.head_dim = config.dim // config.n_heads
# Initialize attention and feed-forward components
self.attention = Attention(config)
# Layer normalization for attention and feed-forward outputs
self.layer_id = layer_id
self.attention_norm = RMSNorm(config.dim, eps=config.norm_eps)
self.ffn_norm = RMSNorm(config.dim, eps=config.norm_eps)
# Feed-forward neural network
self.feed_forward = FeedForward(config)
def forward(self, x, pos_cis, past_key_value=None, use_cache=False):
# Apply attention with normalization and residual connection
h_attn, past_kv = self.attention(
self.attention_norm(x), # Normalize input before attention
pos_cis,
past_key_value=past_key_value,
use_cache=use_cache
)
# Add attention output to the input (residual connection)
h = x + h_attn
# Apply feed-forward network with normalization and residual connection
out = h + self.feed_forward(self.ffn_norm(h))
return out, past_kv
__init__
方法初始化了 Attention
和 FeedForward
模块,以及用于注意力和前馈组件的 RMSNorm
层。
forward
方法:
- 首先应用
attention norm
和multi-head self-attention
。 - 之后,应用
residual connection
。 - 然后,应用
FFN norm
和feed-forward network
,然后是另一个residual connection
。
forward
方法实现了以下序列:
Attention Norm -> Attention -> Residual Connection -> FFN Norm -> FeedForward -> Residual Connection
。
该方法返回 result
和 key-value cache
。
组合 Transformer 组件
让我们将我们讨论过的所有组件组装成一个完整的 Transformer 模型。
我们将首先定义模型的配置,将所有超参数存储在一个字典中。
### Configuration for our Transformer model
model_config = {
"vocab_size": 6400, # Size of the vocabulary
"dim": 512, # Dimensionality of the embeddings and hidden states
"n_heads": 8, # Number of attention heads
"n_kv_heads": 2, # Number of key-value heads (as specified in the LMConfig)
"norm_eps": 1e-5, # Epsilon for RMSNorm
"dropout": 0.0, # Dropout probability
"max_seq_len": 1024, # Maximum sequence length
"rope_theta": 10000.0, # Theta parameter for RoPE
"multiple_of": 64, # Used for hidden dimension calculation in FFN
"hidden_dim": None, # Hidden dimension of the FFN (calculated if None)
"n_layers": 8, # Number of Transformer blocks
"flash_attn": True, # Use flash attention if available
}
这个字典 model_config
包含了所有定义我们的 Transformer
结构和行为的超参数。使用这样的字典很方便,因为它使得在一个地方更改设置变得容易。
现在,让我们定义我们的主 Transformer
模型类。
class MyTransformer(PreTrainedModel):
config_class = dict # We'll use a dictionary for configuration
def __init__(self, config: dict):
super().__init__(config) # Initialize PreTrainedModel
self.config = config # store config
self.vocab_size, self.n_layers = config["vocab_size"], config["n_layers"]
# Token embeddings
self.tok_embeddings = nn.Embedding(config["vocab_size"], config["dim"])
# Dropout layer
self.dropout = nn.Dropout(config["dropout"])
# Transformer blocks
self.layers = nn.ModuleList([TransformerBlock(l, config) for l in range(self.n_layers)])
# Final normalization layer
self.norm = RMSNorm(config["dim"], eps=config["norm_eps"])
# Output layer (linear projection to vocabulary size)
self.output = nn.Linear(config["dim"], config["vocab_size"], bias=False)
# Tie the weights of the embedding and output layers
self.tok_embeddings.weight = self.output.weight
# Precompute rotary positional embeddings
self.register_buffer(
"pos_cis",
precompute_pos_cis(dim=config["dim"] // config["n_heads"], theta=config["rope_theta"]),
persistent=False,
)
self.OUT = CausalLMOutputWithPast()
__init__
方法使用以下组件初始化模型:
- 嵌入层:
self.tok_embeddings
将 token ID 转换为嵌入。 - Dropout:
self.dropout
有助于防止过拟合。 - Transformer Blocks:
self.layers
包含一堆 Transformer block。 - 归一化:
self.norm
在输出之前应用 RMSNorm。 - 输出层:
self.output
将最终隐藏状态投影到词汇空间。 - 权重绑定:嵌入层和输出层的权重被绑定以获得更好的性能。
- 位置嵌入:
self.pos_cis
存储旋转位置嵌入。 - 输出:返回
CausalLMOutputWithPast
用于 logits、损失和缓存。
现在,让我们定义 forward 方法,它执行通过模型的完整前向传递:
def forward(self,
input_ids: Optional[torch.Tensor] = None,
past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
use_cache: bool = False,
**args):
past_key_values = past_key_values or [None] * len(self.layers)
start_pos = args.get('start_pos', 0)
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
import json
from tqdm import tqdm
class RMSNorm(nn.Module):
def __init__(self, dim: int, eps: float = 1e-6):
super().__init__()
self.scale = dim ** 0.5
self.eps = eps
self.g = nn.Parameter(torch.ones(dim))
def _norm(self, x):
return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
def forward(self, x):
return self.g * self._norm(x)
class Attention(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.n_heads = config.n_heads
self.head_dim = config.n_embd // config.n_heads
self.wq = nn.Linear(config.n_embd, config.n_embd, bias=False)
self.wk = nn.Linear(config.n_embd, config.n_embd, bias=False)
self.wv = nn.Linear(config.n_embd, config.n_embd, bias=False)
self.wo = nn.Linear(config.n_embd, config.n_embd, bias=False)
self.attn_dropout = nn.Dropout(config.attn_pdrop)
self.resid_dropout = nn.Dropout(config.resid_pdrop)
def _split_heads(self, x: torch.Tensor) -> torch.Tensor:
"""
Split the tensor to heads.
x: (B, T, C)
return: (B, nh, T, hs)
"""
B, T, C = x.shape
return x.view(B, T, self.n_heads, self.head_dim).transpose(1, 2)
def _merge_heads(self, x: torch.Tensor) -> torch.Tensor:
"""
Merge the heads.
x: (B, nh, T, hs)
return: (B, T, C)
"""
B, nh, T, hs = x.shape
return x.transpose(1, 2).contiguous().view(B, T, C)
def forward(self, x, past_key_value=None, attention_mask=None, layer_past=None):
B, T, C = x.shape
q = self.wq(x)
k = self.wk(x)
v = self.wv(x)
q = self._split_heads(q) # (B, nh, T, hs)
k = self._split_heads(k) # (B, nh, T, hs)
v = self._split_heads(v) # (B, nh, T, hs)
if layer_past is not None:
k = torch.cat((layer_past[0], k), dim=-2)
v = torch.cat((layer_past[1], v), dim=-2)
layer_past = (k, v)
att = q @ k.transpose(-2, -1) # (B, nh, T, T)
att = att / (self.head_dim ** 0.5)
if attention_mask is not None:
att = att + attention_mask # (B, nh, T, T)
att = F.softmax(att, dim=-1) # (B, nh, T, T)
att = self.attn_dropout(att)
y = att @ v # (B, nh, T, hs)
y = self._merge_heads(y) # (B, T, C)
y = self.wo(y) # (B, T, C)
y = self.resid_dropout(y)
return y, layer_past
class MLP(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.c_fc = nn.Linear(config.n_embd, config.n_inner, bias=False)
self.c_proj = nn.Linear(config.n_inner, config.n_embd, bias=False)
self.dropout = nn.Dropout(config.resid_pdrop)
def forward(self, x):
x = self.c_fc(x)
x = F.gelu(x)
x = self.c_proj(x)
x = self.dropout(x)
return x
class Block(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.ln_1 = RMSNorm(config.n_embd, eps=config.layer_norm_epsilon)
self.attn = Attention(config)
self.ln_2 = RMSNorm(config.n_embd, eps=config.layer_norm_epsilon)
self.mlp = MLP(config)
def forward(self, x, pos_cis, past_key_value=None, use_cache=False, attention_mask=None):
residual = x
x = self.ln_1(x)
attn_outputs, layer_past = self.attn(x, past_key_value=past_key_value, attention_mask=attention_mask)
x = residual + attn_outputs
residual = x
x = self.ln_2(x)
x = residual + self.mlp(x)
return x, layer_past
class MOEFeedForward(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.gate = nn.Linear(config.n_embd, config.n_expert)
self.experts = nn.ModuleList([MLP(config) for _ in range(config.n_expert)])
self.dropout = nn.Dropout(config.resid_pdrop)
self.aux_loss = 0
def forward(self, x):
B, T, C = x.shape
gates = self.gate(x) # (B, T, n_expert)
gate_logits = gates.float()
gate_probs = F.softmax(gate_logits, dim=-1)
self.aux_loss = gate_probs.mean() * self.config.n_expert
gate_indices = torch.argmax(gate_probs, dim=-1) # (B, T)
y = torch.zeros_like(x)
for i in range(self.config.n_expert):
expert_mask = (gate_indices == i).unsqueeze(-1).expand_as(x)
expert_output = self.experts[i](x)
y = torch.where(expert_mask, expert_output, y)
y = self.dropout(y)
return y
class CausalLMOutputWithPast(dict):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
self[name] = value
def __delattr__(self, name):
try:
del self[name]
except KeyError:
raise AttributeError(name)
class GPT2LMHeadModel(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.wte = nn.Embedding(config.vocab_size, config.n_embd)
self.wpe = nn.Embedding(config.max_position_embeddings, config.n_embd)
self.drop = nn.Dropout(config.embd_pdrop)
self.h = nn.ModuleList([Block(config) for _ in range(config.n_layer)])
self.ln_f = RMSNorm(config.n_embd, eps=config.layer_
```python
## 准备输入 (X) 和目标 (Y) 序列
X = torch.tensor(input_ids[:-1], dtype=torch.long) # 排除最后一个token作为X
Y = torch.tensor(input_ids[1:], dtype=torch.long) # 排除第一个token作为Y
loss_mask = torch.tensor(loss_mask[1:], dtype=torch.long) # 调整损失掩码
return X, Y, loss_mask # 返回 X, Y 和损失掩码
在我们的 pretraining
类中,我们将 maximum length
设置为 512,因为训练期间的每个输入只包含 512 个字符。
正如我之前提到的,我们还用 <s>training_input</s>
包装了我们的文本。这很重要,因为我们训练好的 tokenizer
知道开始和结束的 token 是什么。用这些标签包装每个 training data input
将帮助模型学习从哪里开始和结束,使用这些标签作为指示符。
现在,是时候编写训练循环的代码了。
我们已经创建了我们的预训练数据集类,现在我们拥有了一切,让我们继续。
预训练循环 (29M LLM)
这是我们的模型实际从数据中学习的部分。基本思想是:
- 从我们的 PretrainDataset 加载批量数据。
- 将输入序列 (X) 馈送到模型。
- 计算模型的预测和目标序列 (Y) 之间的损失。
- 更新模型的权重以最小化损失。
首先,让我们定义一个辅助函数来计算 learning_rate
。我们将使用 cosine learning rate schedule
,它在训练期间逐渐降低 learning_rate
。这通常比使用固定的 learning_rate
产生更好的结果。
def get_lr(current_step, total_steps, lr):
"""使用余弦曲线计算学习率。"""
return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
此函数接受当前的 training_step
、总步数 steps
和初始 learning_rate (lr)
。它使用 cosine
函数和一个小的常数 learning_rate
来计算当前步的 learning_rate
。
现在,让我们设置我们的 model
、optimizer
和 data_loader
:
### 实例化模型
model = MyTransformer(model_config)
model = model.to("cuda") # 如果有 GPU,将模型移动到 GPU
### 加载分词器
tokenizer = AutoTokenizer.from_pretrained("my_custom_tokenizer")
### 创建训练数据集
train_ds = PretrainDataset("petrain_data.jsonl", tokenizer, max_length=model_config["max_seq_len"])
### 创建数据加载器
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, drop_last=False)
### 定义优化器(AdamW 是一个不错的选择)
optimizer = optim.AdamW(model.parameters(), lr=5e-4)
我们实例化了我们的 MyTransformer
模型,加载了我们训练好的 tokenizer
,创建了一个 PretrainDataset
实例,并设置了一个 DataLoader
。
DataLoader
将为我们处理数据的洗牌和批处理。我们还定义了我们的 optimizer
,AdamW
,它是 Adam 的一种变体,通常更适合训练 Transformers。我们将我们的 model
移到 GPU
以加快我们的训练速度。
接下来,我们定义我们的 loss_function
。由于我们正在训练一个 language_model
来预测序列中的下一个 token
,我们将使用 cross-entropy loss
:
### 定义损失函数(交叉熵)
loss_fct = nn.CrossEntropyLoss(reduction='none')
CrossEntropyLoss
结合了 softmax 激活
和 负对数似然损失
。我们将 reduction='none'
设置为,因为我们想分别计算每个 token 的 loss
(我们将自己处理平均)。
现在,主要的 training loop
。
epochs = 1 # 您可以调整 epoch 的数量
iter_per_epoch = len(train_loader) #有多少个批次
accumulation_steps = 8 #用于梯度累积
grad_clip = 1.0 #用于梯度裁剪
for epoch in range(epochs):
start_time = time.time()
for step, (X, Y, loss_mask) in enumerate(train_loader):
X = X.to("cuda")
Y = Y.to("cuda")
loss_mask = loss_mask.to("cuda")
## 计算当前步的学习率
lr = get_lr(epoch * iter_per_epoch + step, epochs * iter_per_epoch, 5e-4)
for param_group in optimizer.param_groups:
param_group['lr'] = lr
## 前向传播
res = model(X)
## 计算损失
loss = loss_fct(
res.logits.view(-1, res.logits.size(-1)),
Y.view(-1)
).view(Y.size())
loss = (loss * loss_mask).sum() / loss_mask.sum()
loss = loss/accumulation_steps # 除以累积步数
## 反向传播
loss.backward()
if (step + 1) % accumulation_steps == 0:
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
# 更新模型的参数
optimizer.step()
# 重置梯度
optimizer.zero_grad()
if step % 100 == 0:
spend_time = time.time() - start_time
print(
'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
epoch + 1,
epochs,
step,
iter_per_epoch,
loss.item() * accumulation_steps,
optimizer.param_groups[-1]['lr'],
spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
让我们分解一下:
- Epochs & Batches: 循环遍历数据集的批次。
- Data to GPU: 将数据移动到 GPU 以加快处理速度。
- Forward Pass & Loss: 获取模型预测并计算损失。
- Backward Pass & Gradients: 计算梯度并累积它们。
- Update & Clipping: 裁剪梯度并更新模型权重。
- Zero Gradients & Logging: 重置梯度并打印指标。
运行代码后,它将开始打印训练损失等。
...
Epoch:[2/10](30/100) loss:0.256 lr:0.000012345679 epoch_Time:13min
...
最后,让我们添加代码来保存我们训练好的模型:
model.eval() # 设置为评估模式
torch.save(model.state_dict(), "pretrain_model.pth") # 保存状态字典
model.train()# 设置回训练模式。
一旦训练完成。
我们将拥有一个具有 26-3000 万个参数的预训练模型。
您绝对可以通过修改配置设置来增加参数数量。
PyTorch 模型到 SafeTensors
现在我们已经训练了我们的 model
,让我们将其转换为 safetensors
格式。
safetensors
格式是存储 tensors
的 PyTorch
默认 基于 pickle
格式的一种更安全、更快的替代方案。
首先,让我们定义一个执行转换的函数。
def convert_and_save_model(torch_path, transformers_path):
"""将 PyTorch 模型转换为 Transformers 格式并保存它。"""
## 加载 PyTorch 模型的 state_dict
state_dict = torch.load(torch_path, map_location="cpu")
## 创建你的模型的实例 (MyTransformer)
model = MyTransformer(model_config)
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
from tqdm import tqdm
device = "cuda" if torch.cuda.is_available() else "cpu"
def export_tokenizer(transformers_path):
"""
Exports the tokenizer to the specified path.
"""
tokenizer = AutoTokenizer.from_pretrained(transformers_path, trust_remote_code=True)
tokenizer.save_pretrained(transformers_path)
print(f"Tokenizer exported to: {transformers_path}")
def convert_and_save_model(model_path, transformers_path):
"""
Converts and saves a custom model to the Hugging Face Transformers format.
Args:
model_path (str): Path to the pre-trained model file.
transformers_path (str): Path to save the model in Transformers format.
"""
# Load the state dictionary
state_dict = torch.load(model_path, map_location=device)
# Instantiate the model
from model import MyTransformer # Assuming MyTransformer is defined in model.py
model = MyTransformer()
model.to(device)
# Load the state_dict into the model
model.load_state_dict(state_dict, strict=False)
# Print the number of parameters
model_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Model parameters: {model_params / 1e6:.3f} Million')
# Save the model in Transformers format
model.save_pretrained(transformers_path, safe_serialization=False) # set safe_serialization=True to save in safetensors format
# Export the tokenizer
export_tokenizer(transformers_path)
print(f"Model saved in Transformers format to: {transformers_path}")
它使用 torch.load
加载状态字典,实例化 MyTransformer
模型,加载权重(strict=False
以获得灵活性),并打印总参数。 该模型使用 safe_serialization=False
保存。
现在,让我们调用此函数来执行转换:
### Convert and save the model
convert_and_save_model("pretrain_model.pth", "my_transformer_hf")
这将创建一个名为 my_transformer_hf
的目录,其中包含 model
文件,准备与 Hugging Face transformers
库一起使用。
推理预训练 LLM
让我们对我们的模型进行推理,看看它根据我们的输入如何响应。 但是,不要期望太多,它只是一个预训练模型。
### Load model and tokenizer
model, tokenizer = (AutoModelForCausalLM.from_pretrained("MiniMind2", trust_remote_code=True).to(device).eval(),
AutoTokenizer.from_pretrained("MiniMind2", use_fast=False, trust_remote_code=True))
### Generate response from model
def generate_response(prompt, max_new_tokens=512, temperature=0.85, top_p=0.9):
input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
output_ids = model.generate(input_ids, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p,
eos_token_id=tokenizer.eos_token_id)
return tokenizer.decode(output_ids[0], skip_special_tokens=True)
让我们尝试不同的输入,看看模型如何响应。
user input: Hi
LLM: I'm sorry, but you haven't provided the information about
the specific data or issues you're using.
user input: how are you?
LLM: Yes, I'm a DAY GPA. However, I can help you with that.
user input: what is 2+3=?
LLM: When you're looking for a 2+3=)
您可以看到 LLM
响应包含适当的单词和更有意义的较短上下文。 但是,我们希望 LLM
的响应更像 ChatGPT
。
例如,当我们问 What is 2 + 3?
时,它应该正确地回答 5
。
SFT 数据集预处理
为了让我们的 LLM 具有知识感知能力,我们需要一个高质量的对话格式数据集。 您可以选择任何数据集,但它应该遵循这种对话风格:
{
"conversations": [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hello!"},
{"role": "user", "content": "Goodbye"},
{"role": "assistant", "content": "Goodbye!"}
]
}
我正在使用 Google 提供的 GooAQ 数据集,其中包含 500 万个问题-答案对。
让我们下载数据并检查它的一个样本。
!wget -O gooaq.jsonl "https://github.com/allenai/gooaq/raw/refs/heads/main/data/gooaq.jsonl"
file_path = "gooaq.jsonl" # Replace with your file path
data = []
### Open file and read line by line with a progress bar
with open(file_path, 'r', encoding='utf-8') as f:
for line in tqdm(f, desc="Loading JSONL"):
try:
data.append(json.loads(line)) # Parse JSON and add to list
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}") # Handle decoding errors
### Print sample
print(data[0])
#### OUTPUT ###
{'id': 1,
'question': 'Who is the president of U.S?',
'short_answer': "Trump",
'answer': None,
'answer_type': 'unknown',
'answer_url': None
}
该格式非常容易理解,但我们需要首先将其转换为对话格式。 让我们这样做。
### Process the data
converted_data = []
for item in tqdm(data_list, desc="Processing Data"):
question = item.get("question")
answer = item.get("answer") or item.get("short_answer") # Use short_answer if answer is missing
if question and answer: # Skip if both are empty
conversation = {
"conversations": [
{"role": "user", "content": question},
{"role": "assistant", "content": answer}
]
}
converted_data.append(conversation)
保存到 JSONL 文件
output_file = "sft_data.jsonl"
with open(output_file, "w", encoding="utf-8") as f:
for entry in tqdm(converted_data, desc="Writing to File"):
f.write(json.dumps(entry, ensure_ascii=False) + "\n") # Write each entry as a JSON line
print(f"Converted data saved to {output_file}")
我们正在检查长答案是否不存在,如果不存在,则使用短答案。然后将训练数据保存为 JSONL
格式,这与我们之前使用的格式相同。
我们需要创建一个 SFTDataset
类来处理此数据集,类似于我们创建预训练数据集类的方式。让我们先这样做,并检查它的工作原理。
这个类的目标是获取我们的会话数据(JSONL 格式),使用聊天模板(还记得我们在分词器配置中定义的那一个吗?)正确地格式化它,并将其转换为我们的模型可以理解的数值输入。
首先,让我们定义 class
的基本结构和 __init__
方法:
class SFTDataset(Dataset):
def __init__(self, jsonl_path, tokenizer, max_length=1024):
super().__init__()
self.tokenizer = tokenizer
self.max_length = max_length
self.samples = self.load_data(jsonl_path)
self.bos_id = tokenizer('<s>assistant\n', add_special_tokens=False).input_ids
self.eos_id = tokenizer('</s>\n', add_special_tokens=False).input_ids
在 __init__
中:
- 存储
tokenizer
和max_length
。 - 将数据从
jsonl_path
加载到self.samples
中。 - 存储序列的开始和结束 ID。
现在,让我们定义 load_data 方法:
此方法只是逐行读取 JSONL 文件
,将每一行解析为 JSON 对象
,并将其添加到 samples
列表中。
def __len__(self):
return len(self.samples)
__len__
方法返回 samples
的长度。
现在,最重要的是:_create_chat_prompt
方法。在这里,我们使用来自 tokenizer
的 chat_template
来正确地格式化对话。
def _create_chat_prompt(self, conversations):
"""Builds a dialogue in ChatML format."""
messages = []
for i, turn in enumerate(conversations):
role = 'user' if i % 2 == 0 else 'assistant'
messages.append({"role": role, "content": turn['content']})
return self.tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=False
)
此方法处理一系列对话回合(例如,[{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello"}]
),并:
- 遍历回合。
- 将它们格式化为结构化列表,确保角色在“user”和“assistant”之间交替。
- 使用
self.tokenizer.apply_chat_template
应用预定义的聊天模板,插入特殊标记(例如,<s>
和</s>
)以分隔回合和角色。 - 设置
tokenize=False
以返回格式化的字符串而不是标记 ID。
现在,让我们创建 _generate_loss_mask 方法。
def _generate_loss_mask(self, input_ids):
loss_mask = [0] * len(input_ids) # Initialize loss mask with zeros
i = 0
while i < len(input_ids):
if input_ids[i:i + len(self.bos_id)] == self.bos_id: # Check for BOS token
start = i + len(self.bos_id)
end = start
while end < len(input_ids):
if input_ids[end:end + len(self.eos_id)] == self.eos_id: # Find EOS token
break
end += 1
# Mark tokens after BOS until EOS for loss calculation
for j in range(start + 1, min(end + len(self.eos_id) + 1, self.max_length)):
loss_mask[j] = 1
i = end + len(self.eos_id) if end < len(input_ids) else len(input_ids) # Move index past EOS
else:
i += 1
return loss_mask # Return the generated loss mask
在此函数中,我们创建一个 mask
来指示在 损失计算
期间应考虑哪些 标记
。目标是仅对 助手
的响应计算 损失
,而不是对 用户的提示
或 特殊标记
计算损失。
它迭代以找到 assistant
和 eos
标记,然后将 1s
分配给 assistant
和 eos
标记,将 0s
分配给其他标记。
最后,__getitem__
方法,当从 dataset
访问一个项目时(如 dataset[i]
),将调用该方法:
def __getitem__(self, index):
sample = self.samples[index] # Get the sample at the given index
prompt = self._create_chat_prompt(sample['conversations']) # Generate chat prompt
encoding = self.tokenizer(prompt, max_length=self.max_length, truncation=True, return_tensors="pt")
input_ids = encoding.input_ids.squeeze() # Extract tokenized input IDs
loss_mask = self._generate_loss_mask(input_ids.tolist()) # Generate loss mask
# Append EOS token and update loss mask
input_ids = input_ids.tolist() + [self.tokenizer.eos_token_id]
loss_mask = loss_mask + [1]
# Truncate or pad input to match max_length
if len(input_ids) > self.max_length:
input_ids = input_ids[:self.max_length]
loss_mask = loss_mask[:self.max_length]
else:
pad_len = self.max_length - len(input_ids)
input_ids += [self.tokenizer.pad_token_id] * pad_len
loss_mask += [0] * pad_len
# Create input (X) and target (Y) tensors for training
X = torch.tensor(input_ids[:-1], dtype=torch.long) # Input sequence
Y = torch.tensor(input_ids[1:], dtype=torch.long) # Shifted target sequence
loss_mask = torch.tensor(loss_mask[1:], dtype=torch.long) # Loss mask for target tokens
return X, Y, loss_mask # Return input, target, and loss mask tensors
以下是 __getitem__
中发生的事情
- 获取样本 →
sample = self.samples[index]
检索给定索引处的对话。 - 创建提示 →
self._create_chat_prompt(...)
格式化对话。 - 分词 →
self.tokenizer(...)
对提示进行分词,截断/填充到max_length
,返回 PyTorch 张量 (return_tensors="pt"
)。 - 损失掩码和 EOS → 生成损失掩码并为 EOS 标记附加
1
。 - 填充/截断 → 通过截断(如果太长)或填充(如果太短)来确保
input_ids
匹配max_length
。 - 创建 X 和 Y → X 是输入序列;Y 是 X 右移一个位置,用于下一个标记预测。
- 返回 → 输出
X
、Y
和loss_mask
。
现在我们已经正确地预处理了 SFT 训练数据集,是时候编写训练循环了。
SFT 训练循环
现在我们已经准备好了 SFTDataset,我们可以使用监督微调 (SFT) 来训练我们的模型。
SFT 训练循环与预训练循环非常相似,但我们使用对话数据,并将损失集中在助手的回复上。
首先,让我们设置模型、优化器和数据加载器。这与之前的大部分内容相同,但我们将加载我们预训练的模型权重:
### 加载分词器(与之前相同)
tokenizer = AutoTokenizer.from_pretrained("my_custom_tokenizer")
### 加载预训练模型
model = MyTransformer(model_config)
model.load_state_dict(torch.load("pretrain_model.pth", map_location="cpu")) # 加载权重!
model = model.to("cuda")
我们加载自定义的 tokenizer
。至关重要的是,我们实例化我们的 MyTransformer
模型,然后使用 load_state_dict
从 pretrain_model.pth
加载权重。我们首先加载到 CPU
,然后将 model
移动到 GPU
。
现在,让我们创建 SFTDataset
和 DataLoader
:
### 创建 SFT 数据集
train_ds = SFTDataset("sft_data.jsonl", tokenizer, max_length=model_config["max_seq_len"])
### 创建数据加载器
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, drop_last=False)
我们使用指向 SFT 数据 (sft_data.jsonl)
的路径、我们的 tokenizer
和 最大序列长度
创建我们的 SFTDataset
实例。DataLoader
的设置与 预训练
类似,启用了 shuffling
。
接下来是 optimizer
和 loss function
:
### 定义优化器 (AdamW 是一个不错的选择)
optimizer = optim.AdamW(model.parameters(), lr=5e-5) # 潜在的不同学习率
### 定义损失函数 (交叉熵)
loss_fct = nn.CrossEntropyLoss(reduction='none')
我们使用 AdamW
,但 SFT
的 学习率
通常低于 预训练
。这里我使用了 5e-5
,但你应该进行实验。我们仍然使用 CrossEntropyLoss
,reduction='none'
。
现在,让我们定义 学习率调度器
。我们将像之前一样使用相同的 cosine schedule
:
def get_lr(current_step, total_steps, lr):
"""使用余弦调度计算学习率。"""
return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
此函数与预训练保持不变。
现在,是训练循环的核心:
for epoch in range(epochs):
start_time = time.time()
for step, (X, Y, loss_mask) in enumerate(train_loader):
X = X.to("cuda")
Y = Y.to("cuda")
loss_mask = loss_mask.to("cuda")
## 计算学习率
lr = get_lr(epoch * iter_per_epoch + step, epochs * iter_per_epoch, 5e-5)
for param_group in optimizer.param_groups:
param_group['lr'] = lr
## 前向传播
res = model(X)
我们启动 epochs
的 外循环
和 batches
的 内循环
。我们将我们的 input
、target
和 loss mask
移动到 GPU
。
我们计算当前步骤的 学习率
,并将其设置为 optimizer
。然后,model
进行预测 (res = model(X)
)。
现在,关键部分:损失计算
。这是我们使用 loss_mask
的地方:
### 计算损失(使用 loss_mask!)
loss = loss_fct(
res.logits.view(-1, res.logits.size(-1)),
Y.view(-1)
).view(Y.size())
loss = (loss * loss_mask).sum() / loss_mask.sum() # 关键区别:使用 loss_mask
loss = loss/accumulation_steps
我们计算交叉熵损失,对其进行掩码以关注助手的 tokens,对掩码损失求和,并按掩码总和进行归一化。在反向传播和参数更新之前应用梯度累积。
接下来是反向传播和参数更新:
### 反向传播
loss.backward()
if (step + 1) % accumulation_steps == 0:
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
# 参数更新
optimizer.step()
# 梯度清零
optimizer.zero_grad()
loss.backward()
计算 梯度
。如果达到 accumulation steps
,我们使用 clip_grad_norm_
裁剪梯度,使用 optimizer.step()
更新模型参数,并使用 optimizer.zero_grad()
清除梯度。
最后是 日志记录
和 保存
:
if step % 100 == 0:
spend_time = time.time() - start_time
print(
'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
epoch + 1,
epochs,
step,
iter_per_epoch,
loss.item() * accumulation_steps,
optimizer.param_groups[-1]['lr'],
spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
model.eval() # 设置为评估模式
torch.save(model.state_dict(), "sft_model.pth") # 保存 SFT 模型
model.train() # 设置为训练模式
我们正在记录损失、学习率和训练时间。当我们开始训练时,损失将以与预训练阶段相同的方式打印出来。
...
Epoch:[5/10](30/100) loss:0.514 lr:0.00002342352 epoch_Time:21min
...
训练后,我们的模型权重被保存。我们可以使用之前的函数轻松地将它们转换为 safetensors
。现在,让我们加载模型,看看它改进了多少。
评估😖
尽管根据基准来判断我们的 LLM 并不理想,因为它只有 3000 万个参数,但我们还是这样做。
我们首先创建了一个预训练模型,然后使用 SFT 对其进行了微调。可以使用 RLHF 和推理进行进一步的改进,我将在未来的博客中介绍。
让我们测试不同的输入,看看它的反应。
加法问题。
user input: 2+3=?
LLM: 当你运行相同数字和相同数字时,
数字是 2 + 3 = 5。
所以,2 + 3 = 5。
因此,2 + 3 = 5。
知识检查。
user input: 世界上最高的山是什么?
LLM: 珠穆朗玛峰,世界最高的山峰 (8,848.86m),
位于尼泊尔-中国边境的喜马拉雅山脉。
user input: 谁发现了万有引力?
LLM: 牛顿在 1687 年提出了万有引力定律,
解释了物体如何通过引力相互吸引。
一般问题。
user input: hi
LLM: Hello
user: 你好吗?
LLM: 很抱歉,我无法帮助您。
但是,我无法提供有关
特定要求或上下文的信息,但我可以提供
您对您可能拥有的内容的一般想法:
一些问题。
user input: 1 KG 是多少克?
LLM: 答案是 `3`
user input: 1 KG 是多少克?
LLM: 答案是 `3`
虽然这些答案是精心挑选的,但该模型仍然需要更多的预训练和 SFT 训练才能进一步改进。
结论
如果你想构建一个严肃的 LLM,你应该使用一个非常高质量的合成数据集。同样的方法也适用于特定领域的 LLM。
确保创建一个大约有 1 亿到 2 亿个参数的 LLM,因为它们在问答任务中表现良好。
SpongeBob (用于标题图片)