Type something to search...
分步指南:在 Python 中构建一个拥有 2900 万参数的 LLM,如 ChatGPT,并附实用技巧

分步指南:在 Python 中构建一个拥有 2900 万参数的 LLM,如 ChatGPT,并附实用技巧

逐步指南

免费阅读这个故事:link

快速提示 — 我们将首先训练一个分词器,然后从头开始构建一个 2900 万参数的 LLM。这将给我们一个生成正确句子的模型。接下来,我们将使用(SFT)对其进行微调,以提高其知识和响应风格,使其更像 ChatGPT

我已经将我训练好的微型模型部署在 huggingface space 上。你可以在那里与它聊天。Web app ink

看看我和我们训练好的 LLM 之间的一些聊天对话。

问候

要求写代码

知识截止 😆

问一个愚蠢的问题

我们将一起编写代码,而不是一次性学习所有理论,以便正确理解所有内容。从数据集到模型权重,所有内容都是可替换的。

GitHub 代码

所有代码以及正确的设置都可以在我的 GitHub 存储库中找到。

[## GitHub - FareedKhan-dev/train-tiny-llm: 从头开始训练一个 29M 参数的 GPT

从头开始训练一个 29M 参数的 GPT。通过创建一个帐户来为 FareedKhan-dev/train-tiny-llm 的开发做出贡献…

github.com](https://github.com/FareedKhan-dev/train-tiny-llm)

代码库的组织结构如下:

.
├── pretrain.py              # 预训练脚本
├── requirement.txt          # 项目依赖项
├── train_sft.py             # 监督微调脚本
├── train_tokenizer.py       # 分词器训练脚本
├── web_app.py               # Streamlit Web 应用程序
└── transformer/             # 包含核心组件的目录
    ├── dataset.py           # 数据集类 (PretrainDataset, SFTDataset)
    ├── LMConfig.py          # 语言模型配置类 (LMConfig)
    └── model.py             # Transformer 模型定义 (TransformerLM)

GitHub 代码库更灵活,你可以执行并行训练等等!

目录

训练分词器

分词器大型语言模型 (LLM) 的第一个也是最重要的组件,它将文本分解成称为 token(单词、子词或字符)的较小单元。

之所以需要它,是因为 LLM 无法理解原始文本,但它们可以处理数字。

分词器将文本转换为数值表示,以便模型可以分析和学习语言模式。每个 LLM 在训练前都会使用分词,以确保高效的文本处理和理解。

让我们使用 GPT-2 分词器,看看它是怎么想的。

### 加载 GPT-2 分词器
tokenizer = AutoTokenizer.from_pretrained("gpt2")

### 示例文本
text = "Hello, world!"

### 分词过程
tokens = tokenizer.tokenize(text)  # 将文本转换为 token
token_ids = tokenizer.convert_tokens_to_ids(tokens)  # 将 token 转换为数值 ID

### 输出结果
print("Tokens:", tokens)
print("Token IDs:", token_ids)

#### OUTPUT ###
Tokens: ['Hello', ',', 'Ġworld', '!']
Token IDs: [15496, 11, 995, 0]

它将文本转换为数字形式。GPT-2 分词器是在大量数据上训练的。如果你想使用自己的分词器,比如一个已经训练好的分词器,你可以使用它。

但是我们也可以构建自己的分词器,所以让我们这样做。

要训练分词器,我们需要数据。由于我们正在构建一个英语 LLM,Hugging Face 充满了可用于训练分词器的 大规模数据集

我将使用 allenai/c4 Hugging Face 数据集,该数据集是从网络抓取创建的。它非常大,但我将使用 200 万个英文行。你可以自己决定数量。所以,让我们开始吧。

### 以流模式加载数据集
ds = load_dataset("allenai/c4", "en", streaming=True)

### 要检索的行数
num_rows_to_get = 2000000

### 输出文件
output_file = "training_data.jsonl"

### 将数据写入文件
with open(output_file, "w", encoding="utf-8") as f:
    for i, row in enumerate(iter(ds["train"])):
        if i >= num_rows_to_get:
            break
        json.dump({"text": row["text"]}, f, ensure_ascii=False)
        f.write("\n")  # 确保每个 JSON 对象都在新的一行上

因此,我们将 200 万个 Hugging Face 数据转换为 JSONL 文件,因为这种格式易于处理。

让我们打印我们训练数据集的样本,看看它是什么样的。

#此代码仅用于显示目的,不需要运行训练
import json

def print_sample_from_jsonl(file_path, num_samples=3):
    """从 JSONL 文件中打印几个样本条目。"""
    with open(file_path, "r", encoding="utf-8") as f:
        # 迭代最多 num_samples 行
        for _, line in zip(range(num_samples), f):
            print(json.dumps(json.loads(line), indent=4, ensure_ascii=False))

假设你的训练数据在 ‘training_data.jsonl’ 中

print_sample_from_jsonl("training_data.jsonl")

OUTPUT

{ "text": "Beginners BBQ Class ..." }
{ "text": "Discussion in 'Mac  ..." }
{ "text": "Foil plaid lycra an ..." }

这将从我们的 training_data.jsonl 文件中输出几行。每一行都是一个单独的 JSON 对象,包含一个 "text" 字段。

现在,让我们开始构建我们的分词器!我们将使用 tokenizers 库。我们将构建一个字节对编码 (BPE) 分词器。

BPE 从单个字符开始,并迭代地合并最频繁的对,构建一个子词汇表。BPE (字节对编码) 是最好的,因为它平衡了词汇量和效率,使其非常适合处理 NLP 任务中罕见和常见的单词。

首先,我们需要一种有效的方法从 JSONL 文件中加载我们的文本数据。我们将为此使用一个生成器。Generators 非常适合大型数据集,因为它们不会一次将所有内容加载到内存中。

def load_texts_from_jsonl(file_path):
    """
    Generator function to read and yield text data from a JSONL file.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            yield json.loads(line)["text"]

这个 load_texts_from_jsonl 函数获取文件路径,逐行读取它,提取 'text' 字段,并产生它。这使得它具有内存效率。

接下来,让我们初始化我们的 tokenizer 并设置 pre-tokenizer

### Initialize a new BPE tokenizer
tokenizer = Tokenizer(models.BPE())

### Set up the pre-tokenizer to handle splitting text into bytes
tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False)

我们使用 BPE 模型创建一个 Tokenizer 对象。

pre_tokenizer 将输入文本分割成字节。add_prefix_space=False 很重要,它告诉分词器不要在序列的第一个单词之前添加空格。

现在,让我们定义我们的特殊标记并配置 trainer

### Define special tokens
special_tokens = ["<unk>", "<s>", "</s>"]

### Configure the BPE trainer
trainer = trainers.BpeTrainer(
    vocab_size=6400,  # We'll use a vocabulary size of 6400
    special_tokens=special_tokens,
    show_progress=True,
    initial_alphabet=pre_tokenizers.ByteLevel.alphabet()
)

我们有三个特殊标记(我将在创建预训练数据集时告诉您它们为什么重要)。

  • <unk>(用于未知单词)
  • <s>(序列开始)
  • </s>(序列结束)。

BpeTrainer 设置了我们想要的 vocab_size=6400special_tokens 和一个进度条 (show_progress=True)。 initial_alphabet 确保所有单个字节都在起始词汇表中。

是时候训练 tokenizer 了。

### Load the text data using our generator
texts = load_texts_from_jsonl("training_data.jsonl") #your file path

### Train the tokenizer
tokenizer.train_from_iterator(texts, trainer=trainer)

tokenizer.train_from_iterator 是学习发生的地方。 tokenizer 分析我们的文本数据并找出要合并的最频繁的字节对。

在 Colab 免费层(CPU)上训练我的分词器需要 1.5 小时。

我们还需要设置 byte_level 解码器。

tokenizer.decoder = decoders.ByteLevel()

我们设置一个 ByteLevel 解码器,以确保当我们将标记 ID 解码回文本时,保留原始格式。

让我们确保我们的特殊标记具有正确的 IDs

### Ensure special tokens are assigned correctly
assert tokenizer.token_to_id("<unk>") == 0
assert tokenizer.token_to_id("<s>") == 1
assert tokenizer.token_to_id("</s>") == 2

这些断言检查 <unk> 是否为 0<s> 是否为 1</s> 是否为 2。这对一致性至关重要。

现在,让我们保存我们训练好的 tokenizer

### Create the directory to save the tokenizer (if it doesn't exist)
tokenizer_save_dir = "my_custom_tokenizer"
os.makedirs(tokenizer_save_dir, exist_ok=True)

保存分词器模型和配置

tokenizer.save(os.path.join(tokenizer_save_dir, "tokenizer.json"))
tokenizer.model.save(tokenizer_save_dir)

我们创建一个目录 (my_custom_tokenizer),并将 tokenizer 分两部分保存:tokenizer.json(词汇表和合并规则)和模型文件。

最后,让我们创建一个 tokenizer_config.json 文件。此文件提供额外的配置详细信息,以便我们稍后可以使用 Hugging Face 的 AutoTokenizer 轻松加载 tokenizer

它就像我们 tokenizer 的设置文件。我们将逐步构建此配置。

首先,让我们设置一些基本选项:

config = {
    "add_bos_token": False,  # 不要自动添加序列开始标记
    "add_eos_token": False,  # 不要自动添加序列结束标记
    "add_prefix_space": False, # 第一个单词前没有空格(我们已经设置了)
    "bos_token": "<s>",      # 定义序列开始标记
    "eos_token": "</s>",     # 定义序列结束标记
    "unk_token": "<unk>",     # 定义未知标记
    "pad_token": "<unk>",      # 使用 <unk> 进行填充(我们稍后将处理填充)
}

这些选项控制 tokenizer 如何处理特殊标记和空格。

我们明确地设置了序列开始 (bos_token)、序列结束 (eos_token)、未知 (unk_token) 和填充 (pad_token) 标记。

接下来,我们将添加一些更通用的设置:

config.update({
    "model_max_length": 32768,          # 模型可以处理的最大序列长度
    "tokenizer_class": "PreTrainedTokenizerFast", # 指定分词器类
    "clean_up_tokenization_spaces": False, # 不要清理多余的空格
    "additional_special_tokens": [],     # 目前没有额外的特殊标记
    "spaces_between_special_tokens": False, # 不要添加特殊标记之间的空格
    "sp_model_kwargs": {},                # 没有特殊的模型参数
})

我们设置了 最大序列长度 (model_max_length),指定了 tokenizer 类 (PreTrainedTokenizerFast 以提高效率),并控制了空格的处理方式。

现在,让我们定义 special tokensdecoder 中的表示方式:

config["added_tokens_decoder"] = {
    "0": {"content": "<unk>", "lstrip": False, "normalized": False, "rstrip": False, "single_word": False, "special": True},
    "1": {"content": "<s>", "lstrip": False, "normalized": False, "rstrip": False, "single_word": False, "special": True},
    "2": {"content": "</s>", "lstrip": False, "normalized": False, "rstrip": False, "single_word": False, "special": True}
}

这个 added_tokens_decoder 部分将特殊标记的标记 ID (0, 1, 2) 映射到它们的属性。我们告诉 decoder 这些标记是“特殊的”,不应该像普通单词一样对待(没有前导/尾随空格,没有规范化)。

最后,对于对话模型来说,这一点非常重要,我们将添加一个 chat template

config["chat_template"] = """
{% if messages[0]['role'] == 'system' %}
    {% set system_message = messages[0]['content'] %}
    {{ '<s>system\\n' + system_message + '</s>\\n' }}
{% else %}
    {{ '<s>system\\nYou are a helpful AI assistant.</s>\\n' }}
{% endif %}
{% for message in messages %}
    {% set content = message['content'] %}
    {% if message['role'] == 'user' %}
        {{ '<s>user\\n' + content + '</s>\\n<s>assistant\\n' }}
    {% elif message['role'] == 'assistant' %}
        {{ content + '</s>' + '\\n' }}
    {% endif %}
{% endfor %}
"""

Jinja2 模板使用 <s></s> 标记来分隔 systemuserassistant 消息,从而为 LLM 格式化聊天。它确保模型理解对话流程。

现在我们已经构建了我们的配置,让我们将其保存到一个文件中:

with open(os.path.join(tokenizer_save_dir, "tokenizer_config.json"), "w", encoding="utf-8") as config_file:
    json.dump(config, config_file, ensure_ascii=False, indent=4)

print("Tokenizer training completed and saved successfully.")

我们将 config 字典保存为名为 tokenizer_config.jsonJSON 文件,该文件位于我们的 tokenizer 目录中。indent=4 使文件具有可读性。

就是这样!我们已经训练了自定义的 BPE 分词器

在下一部分中,我们将从头开始构建我们的 Transformer 架构。

Transformer 概述

Transformer 是现代大型语言模型的核心。与顺序处理文本的循环网络(如 RNN 或 LSTM)不同,Transformer 使用称为 注意力 的机制同时处理输入的所有部分。

通过这种方式,我们可以实现更大的并行性和可扩展性,从而显着缩短训练时间,并能够处理文本中更长范围的依赖关系。

我们的 Transformer 模型将包含几个关键组件:

  • 嵌入层:将标记转换为向量
  • 位置编码:添加位置信息
  • Transformer 块:层堆叠
  • 多头注意力:侧重于不同的输入部分
  • 前馈网络:处理每个标记
  • RMSNorm:规范化层
  • 输出层:生成标记概率

原始的 Transformer 架构图很难理解,所以我创建了一个简化的图:

简化的 Transformer(由 Fareed Khan 创建)

此图提供了一个高级概述。Transformer 将文本转换为标记,嵌入它们,并添加位置编码。然后,它通过堆叠的自注意力层来捕获上下文。

输出被归一化,通过线性层进行转换,并通过 softmax 处理以生成下一个标记或类的概率。

让我们逐个构建它的每个组件。

RMSNorm

RMSNORM 工作流程(由 Fareed Khan 创建)

RMSNorm(均方根归一化)是一种层归一化。归一化技术在深度学习中至关重要,可以稳定训练并提高性能。

RMSNorm 通过将每一层的激活值除以激活值的均方根来归一化。添加一个小的常数 epsilon (ε),确保数值稳定性(避免除以零)。

这有助于防止激活值变得过大或过小,使训练过程更平滑。它比传统的 LayerNorm 更简单、更快,因为它省略了均值居中步骤。

让我们创建我们的 RMSNorm 类:

class RMSNorm(torch.nn.Module):
    def __init__(self, dim: int, eps: float):
        super().__init__()
        self.eps = eps  # Small constant for numerical stability
        self.weight = nn.Parameter(torch.ones(dim)) # Learnable scaling parameter

def forward(self, x):
        # Calculate the root mean square (RMS) and normalize
        return self.weight * (x.float() * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)).type_as(x)

__init__ 方法初始化 epsilon 值和一个 可学习的权重 参数(初始化为全 1)。

forward 方法执行归一化:

  • 它计算输入 x均方根
  • 添加 epsilon 以确保数值稳定性。
  • 平方根的倒数
  • 乘以 可学习的权重
  • 结果被转换回 x 的原始数据类型。

位置编码

由于 Transformer 一次处理整个输入序列,因此它本身并不知道每个 token 的位置。

我们需要显式地添加此位置信息。这就是位置编码的作用。我们正在使用旋转位置嵌入。

位置编码工作流程(由 Fareed Khan 创建)

旋转位置嵌入 (RoPE) 使用高维空间中的旋转来表示 token 位置。

核心思想是将每个 token 的位置编码为一系列频率。然后,这些频率用于创建复数值嵌入,该嵌入以依赖于其位置的方式旋转 token 的表示。

precompute_pos_cis 函数计算这些嵌入;它是一个辅助函数。

def precompute_pos_cis(dim: int, end: int = int(32 * 1024), theta: float = 1e6):
    """Pre-computes the complex exponentials (cis) for rotary embeddings."""
    freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))
    t = torch.arange(end, device=freqs.device)  # Sequence positions
    freqs = torch.outer(t, freqs).float()  # Outer product of positions and frequencies
    pos_cis = torch.polar(torch.ones_like(freqs), freqs)  # Convert to complex exponentials
    return pos_cis

首先,它根据 维度 (dim) 和缩放因子 (theta) 计算一组 频率

然后,它创建一个 位置 (t) 序列,并计算 t频率外积

最后,它使用 torch.polar 将这些值转换为 复指数,从而得到 pos_cis 张量。

def apply_rotary_emb(xq, xk, pos_cis):
    """Apply rotary embeddings to query (xq) and key (xk) tensors."""
    def unite_shape(pos_cis, x):
        # Ensure pos_cis has the correct shape and reshape it to match x
        assert pos_cis.shape == (x.shape[1], x.shape[-1])
        shape = [d if i == 1 or i == x.ndim - 1 else 1 for i, d in enumerate(x.shape)]
        return pos_cis.view(*shape)

## Convert to complex for rotation
    xq_ = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
    xk_ = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))

## Align pos_cis shape
    pos_cis = unite_shape(pos_cis, xq_)

## Apply rotation and convert back
    xq_out = torch.view_as_real(xq_ * pos_cis).flatten(3)
    xk_out = torch.view_as_real(xk_ * pos_cis).flatten(3)

return xq_out.type_as(xq), xk_out.type_as(xk)

apply_rotary_emb 函数将计算出的 旋转嵌入 应用于注意力机制中的查询 (xq) 和键 (xk),从而编码位置信息。unite_shape 重新调整嵌入的形状以实现兼容性。

xqxk 被转换为 复数,乘以嵌入,然后转换回,保留原始数据类型。

注意力

注意力块(由 Fareed Khan 创建)

注意力Transformer 的关键机制。它允许模型在处理每个 token 时权衡输入序列不同部分的重要性。

我们的实现使用 多头注意力,这意味着我们在并行(使用不同的学习权重)中多次执行 注意力机制,允许模型捕获 token 之间关系的不同方面。

我们还结合了 旋转位置嵌入 (RoPE) 来编码位置信息。

键值缓存 用于更快的 自回归解码(例如在文本生成中)。

class Attention(nn.Module):
    def __init__(self, args):
        super().__init__()
        # Define number of key-value heads (defaults to n_heads if not specified)
        self.n_kv_heads = args.n_heads if args.n_kv_heads is None else args.n_kv_heads
        assert args.n_heads % self.n_kv_heads == 0  # Ensure heads divide evenly

self.n_local_heads = args.n_heads
        self.n_local_kv_heads = self.n_kv_heads
        self.n_rep = self.n_local_heads // self.n_local_kv_heads  # Replication factor
        self.head_dim = args.dim // args.n_heads  # Dimension per head

## Linear projections for query, key, and value
        self.wq = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)
        self.wk = nn.Linear(args.dim, self.n_kv_heads * self.head_dim, bias=False)
        self.wv = nn.Linear(args.dim, self.n_kv_heads * self.head_dim, bias=False)
        self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False)

## Dropout layers
        self.attn_dropout = nn.Dropout(args.dropout)
        self.resid_dropout = nn.Dropout(args.dropout)
        self.dropout = args.dropout
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from typing import Optional, Tuple

def apply_rotary_emb(xq, xk, cos, sin):
    """将旋转嵌入应用于查询和键张量。

    Args:
        xq: 查询张量。
        xk: 键张量。
        cos: 旋转嵌入的余弦值。
        sin: 旋转嵌入的正弦值。

    Returns:
        旋转后的查询和键张量。
    """
    # 提取维度
    bsz, seqlen, n_head, head_dim = xq.shape

    # 应用旋转嵌入
    xq = xq.reshape(bsz, seqlen, n_head, -1, 2).transpose(-2, -1)
    xk = xk.reshape(bsz, seqlen, n_head, -1, 2).transpose(-2, -1)
    xq = apply_rotary_emb_torch(xq, cos, sin)
    xk = apply_rotary_emb_torch(xk, cos, sin)
    xq = xq.transpose(-2, -1).reshape(bsz, seqlen, n_head, -1)
    xk = xk.transpose(-2, -1).reshape(bsz, seqlen, n_head, -1)
    return xq, xk

def apply_rotary_emb_torch(x, cos, sin):
    """使用 PyTorch 应用旋转嵌入。

    Args:
        x: 输入张量。
        cos: 余弦值。
        sin: 正弦值。

    Returns:
        旋转后的张量。
    """
    # 提取维度
    x1, x2 = x[..., 0, :], x[..., 1, :]
    o1 = x1 * cos - x2 * sin
    o2 = x2 * cos + x1 * sin
    return torch.stack([o1, o2], dim=-2)

class Attention(nn.Module):
    def __init__(self, args):
        super().__init__()

        self.n_local_heads = args.n_local_heads
        self.n_local_kv_heads = args.n_local_kv_heads
        self.n_rep = self.n_local_heads // self.n_local_kv_heads
        self.head_dim = args.head_dim

        self.wq = nn.Linear(args.dim, args.n_local_heads * args.head_dim, bias=False)
        self.wk = nn.Linear(args.dim, args.n_local_kv_heads * args.head_dim, bias=False)
        self.wv = nn.Linear(args.dim, args.n_local_kv_heads * args.head_dim, bias=False)
        self.wo = nn.Linear(args.n_local_heads * args.head_dim, args.dim, bias=False)

        self.attn_dropout = nn.Dropout(args.dropout)
        self.resid_dropout = nn.Dropout(args.dropout)

        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention') and args.flash_attn

        # 创建因果掩码以防止关注未来标记
        mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float("-inf"))
        mask = torch.triu(mask, diagonal=1)  # 上三角掩码
        self.register_buffer("mask", mask, persistent=False)

    def forward(self,
                x: torch.Tensor,
                pos_cis: torch.Tensor,
                past_key_value: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
                use_cache=False):
        # 获取批量大小、序列长度和隐藏维度
        bsz, seq_len, _ = x.shape

        # 计算查询、键和值投影
        xq, xk, xv = self.wq(x), self.wk(x), self.wv(x)

        # 重塑为 (batch, seq_len, num_heads, head_dim)
        xq = xq.view(bsz, seq_len, self.n_local_heads, self.head_dim)
        xk = xk.view(bsz, seq_len, self.n_local_kv_heads, self.head_dim)
        xv = xv.view(bsz, seq_len, self.n_local_kv_heads, self.head_dim)

        # 应用旋转位置嵌入
        xq, xk = apply_rotary_emb(xq, xk, pos_cis)

        ## Key-Value 缓存实现
        if past_key_value is not None:
            # 沿序列维度将过去的键/值状态与当前状态连接起来
            xk = torch.cat([past_key_value[0], xk], dim=1)
            xv = torch.cat([past_key_value[1], xv], dim=1)

        ## 如果启用了缓存,则存储更新后的键值对
        past_kv = (xk, xv) if use_cache else None

        ## 转置为 (batch, heads, seq_len, head_dim)
        xq, xk, xv = (
            xq.transpose(1, 2),
            repeat_kv(xk, self.n_rep).transpose(1, 2),  # 重复 KV 头部并转置
            repeat_kv(xv, self.n_rep).transpose(1, 2)  # 重复 KV 头部并转置
        )

        if self.flash and seq_len != 1:
            # 如果可用且未处理单个标记,则使用 Flash Attention 以提高效率
            dropout_p = self.dropout if self.training else 0.0
            output = F.scaled_dot_product_attention(
                xq, xk, xv,
                attn_mask=None,  # 因果掩码由 `is_causal=True` 处理
                dropout_p=dropout_p,
                is_causal=True
            )
        else:
            # 手动计算缩放点积注意力
            scores = (xq @ xk.transpose(-2, -1)) / math.sqrt(self.head_dim)  # 按 head_dim 缩放
            scores += self.mask[:, :, :seq_len, :seq_len]  # 应用因果掩码
            scores = F.softmax(scores.float(), dim=-1).type_as(xq)  # 归一化分数
            scores = self.attn_dropout(scores)  # 应用 dropout
            output = scores @ xv  # 计算值的加权和

        ## 将输出重塑回 (batch, seq_len, hidden_dim)
        output = output.transpose(1, 2).reshape(bsz, seq_len, -1)

        ## 应用输出投影和残差 dropout
        output = self.resid_dropout(self.wo(output))

        ## 返回最终输出和过去的键值缓存(如果启用了缓存)
        return output, past_kv

def repeat_kv(x: torch.Tensor, n_rep: int) -> torch.Tensor:
    """沿头部维度重复键值头 `n_rep` 次。"""
    bs, slen, n_kv_heads, head_dim = x.shape  # 提取张量维度
    if n_rep == 1:
        return x  # 如果 n_rep 为 1,则不需要重复
    # 沿头部维度扩展和重复,然后重塑
    return (
        x[:, :, :, None, :]  # 添加一个新的轴用于重复
        .expand(bs, slen, n_kv_heads, n_rep, head_dim)  # 重复 n_rep 次
        .reshape(bs, slen, n_kv_heads * n_rep, head_dim)  # 展平重复的头
    )

FeedForward

FeedForward 网络是一个简单的全连接网络,在 attention mechanism 之后独立地应用于每个 token 的表示。它提供了额外的 non-linearity,并允许模型学习更复杂的表示。

我们的实现使用 SiLU (Sigmoid Linear Unit) 激活函数。

FFN

用于正则化的 Dropout 层

class FeedForward(nn.Module):
    def __init__(self, config: LMConfig):
        super().__init__()
        # Initialize the linear layers
        self.w1 = nn.Linear(config.dim, config.hidden_dim, bias=False)
        self.w2 = nn.Linear(config.hidden_dim, config.dim, bias=False)
        self.w3 = nn.Linear(config.dim, config.hidden_dim, bias=False)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        # Apply the feedforward layers with SILU activation and dropout
        return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x)))

__init__ 方法初始化了三个 linear layersw1w2w3

  • w1w3 将输入投影到隐藏维度,w2 将其投影回原始维度。
  • hidden dimension 的计算如上所述。
  • 同时初始化了一个 dropout 层。

forward 方法应用 w1SiLU activationw3,将结果相乘,应用 w2,最后应用 dropout

Transformer Block

Transformer Block 工作流程(由 Fareed Khan 创建)

Transformer Block 是 Transformer 模型的基本构建块。它结合了多头注意力 (Multi-Head Attention) 和前馈 (FeedForward) 组件,以及残差连接和层归一化。

残差连接(将输入添加到子层的输出)通过缓解梯度消失问题来帮助训练深度网络。

class TransformerBlock(nn.Module):
    def __init__(self, layer_id: int, config: LMConfig):
        super().__init__()

        # Set model configuration parameters
        self.n_heads = config.n_heads
        self.dim = config.dim
        self.head_dim = config.dim // config.n_heads

        # Initialize attention and feed-forward components
        self.attention = Attention(config)

        # Layer normalization for attention and feed-forward outputs
        self.layer_id = layer_id
        self.attention_norm = RMSNorm(config.dim, eps=config.norm_eps)
        self.ffn_norm = RMSNorm(config.dim, eps=config.norm_eps)

        # Feed-forward neural network
        self.feed_forward = FeedForward(config)

    def forward(self, x, pos_cis, past_key_value=None, use_cache=False):
        # Apply attention with normalization and residual connection
        h_attn, past_kv = self.attention(
            self.attention_norm(x),  # Normalize input before attention
            pos_cis,
            past_key_value=past_key_value,
            use_cache=use_cache
        )

        # Add attention output to the input (residual connection)
        h = x + h_attn

        # Apply feed-forward network with normalization and residual connection
        out = h + self.feed_forward(self.ffn_norm(h))

        return out, past_kv

__init__ 方法初始化了 AttentionFeedForward 模块,以及用于注意力和前馈组件的 RMSNorm 层。

forward 方法:

  1. 首先应用 attention normmulti-head self-attention
  2. 之后,应用 residual connection
  3. 然后,应用 FFN normfeed-forward network,然后是另一个 residual connection

forward 方法实现了以下序列: Attention Norm -> Attention -> Residual Connection -> FFN Norm -> FeedForward -> Residual Connection

该方法返回 resultkey-value cache

组合 Transformer 组件

让我们将我们讨论过的所有组件组装成一个完整的 Transformer 模型。

我们将首先定义模型的配置,将所有超参数存储在一个字典中。

### Configuration for our Transformer model
model_config = {
    "vocab_size": 6400,       # Size of the vocabulary
    "dim": 512,               # Dimensionality of the embeddings and hidden states
    "n_heads": 8,             # Number of attention heads
    "n_kv_heads": 2,          # Number of key-value heads (as specified in the LMConfig)
    "norm_eps": 1e-5,         # Epsilon for RMSNorm
    "dropout": 0.0,           # Dropout probability
    "max_seq_len": 1024,      # Maximum sequence length
    "rope_theta": 10000.0,    # Theta parameter for RoPE
    "multiple_of": 64,        # Used for hidden dimension calculation in FFN
    "hidden_dim": None,       # Hidden dimension of the FFN (calculated if None)
    "n_layers": 8,            # Number of Transformer blocks
    "flash_attn": True,       # Use flash attention if available
}

这个字典 model_config 包含了所有定义我们的 Transformer 结构和行为的超参数。使用这样的字典很方便,因为它使得在一个地方更改设置变得容易。

现在,让我们定义我们的主 Transformer 模型类。

class MyTransformer(PreTrainedModel):
    config_class = dict  # We'll use a dictionary for configuration

    def __init__(self, config: dict):
        super().__init__(config)  # Initialize PreTrainedModel
        self.config = config # store config
        self.vocab_size, self.n_layers = config["vocab_size"], config["n_layers"]

        # Token embeddings
        self.tok_embeddings = nn.Embedding(config["vocab_size"], config["dim"])

        # Dropout layer
        self.dropout = nn.Dropout(config["dropout"])

        # Transformer blocks
        self.layers = nn.ModuleList([TransformerBlock(l, config) for l in range(self.n_layers)])

        # Final normalization layer
        self.norm = RMSNorm(config["dim"], eps=config["norm_eps"])

        # Output layer (linear projection to vocabulary size)
        self.output = nn.Linear(config["dim"], config["vocab_size"], bias=False)

        # Tie the weights of the embedding and output layers
        self.tok_embeddings.weight = self.output.weight

        # Precompute rotary positional embeddings
        self.register_buffer(
            "pos_cis",
            precompute_pos_cis(dim=config["dim"] // config["n_heads"], theta=config["rope_theta"]),
            persistent=False,
        )
        self.OUT = CausalLMOutputWithPast()

__init__ 方法使用以下组件初始化模型:

  • 嵌入层self.tok_embeddings 将 token ID 转换为嵌入。
  • Dropoutself.dropout 有助于防止过拟合。
  • Transformer Blocksself.layers 包含一堆 Transformer block。
  • 归一化self.norm 在输出之前应用 RMSNorm。
  • 输出层self.output 将最终隐藏状态投影到词汇空间。
  • 权重绑定:嵌入层和输出层的权重被绑定以获得更好的性能。
  • 位置嵌入self.pos_cis 存储旋转位置嵌入。
  • 输出:返回 CausalLMOutputWithPast 用于 logits、损失和缓存。

现在,让我们定义 forward 方法,它执行通过模型的完整前向传递:

    def forward(self,
                input_ids: Optional[torch.Tensor] = None,
                past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
                use_cache: bool = False,
                **args):

        past_key_values = past_key_values or [None] * len(self.layers)
        start_pos = args.get('start_pos', 0)
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
import json
from tqdm import tqdm
class RMSNorm(nn.Module):
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.scale = dim ** 0.5
        self.eps = eps
        self.g = nn.Parameter(torch.ones(dim))

    def _norm(self, x):
        return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)

    def forward(self, x):
        return self.g * self._norm(x)
class Attention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.n_heads = config.n_heads
        self.head_dim = config.n_embd // config.n_heads
        self.wq = nn.Linear(config.n_embd, config.n_embd, bias=False)
        self.wk = nn.Linear(config.n_embd, config.n_embd, bias=False)
        self.wv = nn.Linear(config.n_embd, config.n_embd, bias=False)
        self.wo = nn.Linear(config.n_embd, config.n_embd, bias=False)
        self.attn_dropout = nn.Dropout(config.attn_pdrop)
        self.resid_dropout = nn.Dropout(config.resid_pdrop)

    def _split_heads(self, x: torch.Tensor) -> torch.Tensor:
        """
        Split the tensor to heads.
        x: (B, T, C)
        return: (B, nh, T, hs)
        """
        B, T, C = x.shape
        return x.view(B, T, self.n_heads, self.head_dim).transpose(1, 2)

    def _merge_heads(self, x: torch.Tensor) -> torch.Tensor:
        """
        Merge the heads.
        x: (B, nh, T, hs)
        return: (B, T, C)
        """
        B, nh, T, hs = x.shape
        return x.transpose(1, 2).contiguous().view(B, T, C)

    def forward(self, x, past_key_value=None, attention_mask=None, layer_past=None):
        B, T, C = x.shape

        q = self.wq(x)
        k = self.wk(x)
        v = self.wv(x)

        q = self._split_heads(q)  # (B, nh, T, hs)
        k = self._split_heads(k)  # (B, nh, T, hs)
        v = self._split_heads(v)  # (B, nh, T, hs)

        if layer_past is not None:
            k = torch.cat((layer_past[0], k), dim=-2)
            v = torch.cat((layer_past[1], v), dim=-2)

        layer_past = (k, v)

        att = q @ k.transpose(-2, -1)  # (B, nh, T, T)
        att = att / (self.head_dim ** 0.5)
        if attention_mask is not None:
            att = att + attention_mask  # (B, nh, T, T)
        att = F.softmax(att, dim=-1)  # (B, nh, T, T)
        att = self.attn_dropout(att)
        y = att @ v  # (B, nh, T, hs)
        y = self._merge_heads(y)  # (B, T, C)
        y = self.wo(y)  # (B, T, C)
        y = self.resid_dropout(y)
        return y, layer_past
class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.c_fc = nn.Linear(config.n_embd, config.n_inner, bias=False)
        self.c_proj = nn.Linear(config.n_inner, config.n_embd, bias=False)
        self.dropout = nn.Dropout(config.resid_pdrop)

    def forward(self, x):
        x = self.c_fc(x)
        x = F.gelu(x)
        x = self.c_proj(x)
        x = self.dropout(x)
        return x
class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.ln_1 = RMSNorm(config.n_embd, eps=config.layer_norm_epsilon)
        self.attn = Attention(config)
        self.ln_2 = RMSNorm(config.n_embd, eps=config.layer_norm_epsilon)
        self.mlp = MLP(config)

    def forward(self, x, pos_cis, past_key_value=None, use_cache=False, attention_mask=None):
        residual = x
        x = self.ln_1(x)
        attn_outputs, layer_past = self.attn(x, past_key_value=past_key_value, attention_mask=attention_mask)
        x = residual + attn_outputs
        residual = x
        x = self.ln_2(x)
        x = residual + self.mlp(x)
        return x, layer_past
class MOEFeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.gate = nn.Linear(config.n_embd, config.n_expert)
        self.experts = nn.ModuleList([MLP(config) for _ in range(config.n_expert)])
        self.dropout = nn.Dropout(config.resid_pdrop)
        self.aux_loss = 0

    def forward(self, x):
        B, T, C = x.shape
        gates = self.gate(x)  # (B, T, n_expert)
        gate_logits = gates.float()
        gate_probs = F.softmax(gate_logits, dim=-1)
        self.aux_loss = gate_probs.mean() * self.config.n_expert
        gate_indices = torch.argmax(gate_probs, dim=-1)  # (B, T)
        y = torch.zeros_like(x)
        for i in range(self.config.n_expert):
            expert_mask = (gate_indices == i).unsqueeze(-1).expand_as(x)
            expert_output = self.experts[i](x)
            y = torch.where(expert_mask, expert_output, y)
        y = self.dropout(y)
        return y
class CausalLMOutputWithPast(dict):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        try:
            del self[name]
        except KeyError:
            raise AttributeError(name)
class GPT2LMHeadModel(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.wte = nn.Embedding(config.vocab_size, config.n_embd)
        self.wpe = nn.Embedding(config.max_position_embeddings, config.n_embd)
        self.drop = nn.Dropout(config.embd_pdrop)
        self.h = nn.ModuleList([Block(config) for _ in range(config.n_layer)])
        self.ln_f = RMSNorm(config.n_embd, eps=config.layer_

```python
## 准备输入 (X) 和目标 (Y) 序列
        X = torch.tensor(input_ids[:-1], dtype=torch.long)  # 排除最后一个token作为X
        Y = torch.tensor(input_ids[1:], dtype=torch.long)   # 排除第一个token作为Y

loss_mask = torch.tensor(loss_mask[1:], dtype=torch.long)  # 调整损失掩码

return X, Y, loss_mask  # 返回 X, Y 和损失掩码

在我们的 pretraining 类中,我们将 maximum length 设置为 512,因为训练期间的每个输入只包含 512 个字符。

正如我之前提到的,我们还用 <s>training_input</s> 包装了我们的文本。这很重要,因为我们训练好的 tokenizer 知道开始和结束的 token 是什么。用这些标签包装每个 training data input 将帮助模型学习从哪里开始和结束,使用这些标签作为指示符。

现在,是时候编写训练循环的代码了。

我们已经创建了我们的预训练数据集类,现在我们拥有了一切,让我们继续。

预训练循环 (29M LLM)

这是我们的模型实际从数据中学习的部分。基本思想是:

  1. 从我们的 PretrainDataset 加载批量数据。
  2. 将输入序列 (X) 馈送到模型。
  3. 计算模型的预测和目标序列 (Y) 之间的损失。
  4. 更新模型的权重以最小化损失。

首先,让我们定义一个辅助函数来计算 learning_rate。我们将使用 cosine learning rate schedule,它在训练期间逐渐降低 learning_rate。这通常比使用固定的 learning_rate 产生更好的结果。

def get_lr(current_step, total_steps, lr):
    """使用余弦曲线计算学习率。"""
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))

此函数接受当前的 training_step、总步数 steps 和初始 learning_rate (lr)。它使用 cosine 函数和一个小的常数 learning_rate 来计算当前步的 learning_rate

现在,让我们设置我们的 modeloptimizerdata_loader

### 实例化模型
model = MyTransformer(model_config)
model = model.to("cuda") # 如果有 GPU,将模型移动到 GPU

### 加载分词器
tokenizer = AutoTokenizer.from_pretrained("my_custom_tokenizer")

### 创建训练数据集
train_ds = PretrainDataset("petrain_data.jsonl", tokenizer, max_length=model_config["max_seq_len"])

### 创建数据加载器
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, drop_last=False)

### 定义优化器(AdamW 是一个不错的选择)
optimizer = optim.AdamW(model.parameters(), lr=5e-4)

我们实例化了我们的 MyTransformer 模型,加载了我们训练好的 tokenizer,创建了一个 PretrainDataset 实例,并设置了一个 DataLoader

DataLoader 将为我们处理数据的洗牌和批处理。我们还定义了我们的 optimizerAdamW,它是 Adam 的一种变体,通常更适合训练 Transformers。我们将我们的 model 移到 GPU 以加快我们的训练速度。

接下来,我们定义我们的 loss_function。由于我们正在训练一个 language_model 来预测序列中的下一个 token,我们将使用 cross-entropy loss

### 定义损失函数(交叉熵)
loss_fct = nn.CrossEntropyLoss(reduction='none')

CrossEntropyLoss 结合了 softmax 激活负对数似然损失。我们将 reduction='none' 设置为,因为我们想分别计算每个 token 的 loss(我们将自己处理平均)。

现在,主要的 training loop

epochs = 1 # 您可以调整 epoch 的数量
iter_per_epoch = len(train_loader) #有多少个批次
accumulation_steps = 8 #用于梯度累积
grad_clip = 1.0 #用于梯度裁剪

for epoch in range(epochs):
    start_time = time.time()
    for step, (X, Y, loss_mask) in enumerate(train_loader):
        X = X.to("cuda")
        Y = Y.to("cuda")
        loss_mask = loss_mask.to("cuda")

## 计算当前步的学习率
        lr = get_lr(epoch * iter_per_epoch + step, epochs * iter_per_epoch, 5e-4)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

## 前向传播
        res = model(X)

## 计算损失
        loss = loss_fct(
            res.logits.view(-1, res.logits.size(-1)),
            Y.view(-1)
        ).view(Y.size())
        loss = (loss * loss_mask).sum() / loss_mask.sum()
        loss = loss/accumulation_steps # 除以累积步数

## 反向传播
        loss.backward()

        if (step + 1) % accumulation_steps == 0:
            # 梯度裁剪
            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            # 更新模型的参数
            optimizer.step()
            # 重置梯度
            optimizer.zero_grad()

        if step % 100 == 0:
            spend_time = time.time() - start_time
            print(
                'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
                    epoch + 1,
                    epochs,
                    step,
                    iter_per_epoch,
                    loss.item() * accumulation_steps,
                    optimizer.param_groups[-1]['lr'],
                    spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))

让我们分解一下:

  • Epochs & Batches: 循环遍历数据集的批次。
  • Data to GPU: 将数据移动到 GPU 以加快处理速度。
  • Forward Pass & Loss: 获取模型预测并计算损失。
  • Backward Pass & Gradients: 计算梯度并累积它们。
  • Update & Clipping: 裁剪梯度并更新模型权重。
  • Zero Gradients & Logging: 重置梯度并打印指标。

运行代码后,它将开始打印训练损失等。

...

Epoch:[2/10](30/100) loss:0.256 lr:0.000012345679 epoch_Time:13min

...

最后,让我们添加代码来保存我们训练好的模型:

model.eval() # 设置为评估模式
torch.save(model.state_dict(), "pretrain_model.pth") # 保存状态字典
model.train()# 设置回训练模式。

一旦训练完成。

我们将拥有一个具有 26-3000 万个参数的预训练模型。

您绝对可以通过修改配置设置来增加参数数量。

PyTorch 模型到 SafeTensors

现在我们已经训练了我们的 model,让我们将其转换为 safetensors 格式。

safetensors 格式是存储 tensorsPyTorch 默认 基于 pickle 格式的一种更安全、更快的替代方案。

首先,让我们定义一个执行转换的函数。

def convert_and_save_model(torch_path, transformers_path):
    """将 PyTorch 模型转换为 Transformers 格式并保存它。"""

## 加载 PyTorch 模型的 state_dict
    state_dict = torch.load(torch_path, map_location="cpu")

## 创建你的模型的实例 (MyTransformer)
    model = MyTransformer(model_config)
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
from tqdm import tqdm

device = "cuda" if torch.cuda.is_available() else "cpu"

def export_tokenizer(transformers_path):
    """
    Exports the tokenizer to the specified path.
    """
    tokenizer = AutoTokenizer.from_pretrained(transformers_path, trust_remote_code=True)
    tokenizer.save_pretrained(transformers_path)
    print(f"Tokenizer exported to: {transformers_path}")

def convert_and_save_model(model_path, transformers_path):
    """
    Converts and saves a custom model to the Hugging Face Transformers format.

    Args:
        model_path (str): Path to the pre-trained model file.
        transformers_path (str): Path to save the model in Transformers format.
    """
    # Load the state dictionary
    state_dict = torch.load(model_path, map_location=device)

    # Instantiate the model
    from model import MyTransformer  # Assuming MyTransformer is defined in model.py
    model = MyTransformer()
    model.to(device)

    # Load the state_dict into the model
    model.load_state_dict(state_dict, strict=False)
    # Print the number of parameters
    model_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f'Model parameters: {model_params / 1e6:.3f} Million')

    # Save the model in Transformers format
    model.save_pretrained(transformers_path, safe_serialization=False) # set safe_serialization=True to save in safetensors format

    # Export the tokenizer
    export_tokenizer(transformers_path)
    print(f"Model saved in Transformers format to: {transformers_path}")

它使用 torch.load 加载状态字典,实例化 MyTransformer 模型,加载权重(strict=False 以获得灵活性),并打印总参数。 该模型使用 safe_serialization=False 保存。

现在,让我们调用此函数来执行转换:

### Convert and save the model
convert_and_save_model("pretrain_model.pth", "my_transformer_hf")

这将创建一个名为 my_transformer_hf 的目录,其中包含 model 文件,准备与 Hugging Face transformers 库一起使用。

推理预训练 LLM

让我们对我们的模型进行推理,看看它根据我们的输入如何响应。 但是,不要期望太多,它只是一个预训练模型。

### Load model and tokenizer
model, tokenizer = (AutoModelForCausalLM.from_pretrained("MiniMind2", trust_remote_code=True).to(device).eval(),
                    AutoTokenizer.from_pretrained("MiniMind2", use_fast=False, trust_remote_code=True))

### Generate response from model
def generate_response(prompt, max_new_tokens=512, temperature=0.85, top_p=0.9):
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
    output_ids = model.generate(input_ids, max_new_tokens=max_new_tokens, temperature=temperature, top_p=top_p,
                                eos_token_id=tokenizer.eos_token_id)
    return tokenizer.decode(output_ids[0], skip_special_tokens=True)

让我们尝试不同的输入,看看模型如何响应。

user input: Hi
LLM: I'm sorry, but you haven't provided the information about
the specific data or issues you're using.

user input: how are you?
LLM: Yes, I'm a DAY GPA. However, I can help you with that.

user input: what is 2+3=?
LLM: When you're looking for a 2+3=)

您可以看到 LLM 响应包含适当的单词和更有意义的较短上下文。 但是,我们希望 LLM 的响应更像 ChatGPT

例如,当我们问 What is 2 + 3? 时,它应该正确地回答 5

SFT 数据集预处理

为了让我们的 LLM 具有知识感知能力,我们需要一个高质量的对话格式数据集。 您可以选择任何数据集,但它应该遵循这种对话风格:

{
    "conversations": [
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hello!"},
        {"role": "user", "content": "Goodbye"},
        {"role": "assistant", "content": "Goodbye!"}
    ]
}

我正在使用 Google 提供的 GooAQ 数据集,其中包含 500 万个问题-答案对。

让我们下载数据并检查它的一个样本。

!wget -O gooaq.jsonl "https://github.com/allenai/gooaq/raw/refs/heads/main/data/gooaq.jsonl"
file_path = "gooaq.jsonl"  # Replace with your file path
data = []

### Open file and read line by line with a progress bar
with open(file_path, 'r', encoding='utf-8') as f:
    for line in tqdm(f, desc="Loading JSONL"):
        try:
            data.append(json.loads(line))  # Parse JSON and add to list
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")  # Handle decoding errors

### Print sample
print(data[0])

#### OUTPUT ###
{'id': 1,
 'question': 'Who is the president of U.S?',
 'short_answer': "Trump",
 'answer': None,
 'answer_type': 'unknown',
 'answer_url': None
}

该格式非常容易理解,但我们需要首先将其转换为对话格式。 让我们这样做。

### Process the data
converted_data = []
for item in tqdm(data_list, desc="Processing Data"):
    question = item.get("question")
    answer = item.get("answer") or item.get("short_answer")  # Use short_answer if answer is missing

    if question and answer:  # Skip if both are empty
        conversation = {
            "conversations": [
                {"role": "user", "content": question},
                {"role": "assistant", "content": answer}
            ]
        }
        converted_data.append(conversation)

保存到 JSONL 文件

output_file = "sft_data.jsonl"
with open(output_file, "w", encoding="utf-8") as f:
    for entry in tqdm(converted_data, desc="Writing to File"):
        f.write(json.dumps(entry, ensure_ascii=False) + "\n")  # Write each entry as a JSON line

print(f"Converted data saved to {output_file}")

我们正在检查长答案是否不存在,如果不存在,则使用短答案。然后将训练数据保存为 JSONL 格式,这与我们之前使用的格式相同。

我们需要创建一个 SFTDataset 类来处理此数据集,类似于我们创建预训练数据集类的方式。让我们先这样做,并检查它的工作原理。

这个类的目标是获取我们的会话数据(JSONL 格式),使用聊天模板(还记得我们在分词器配置中定义的那一个吗?)正确地格式化它,并将其转换为我们的模型可以理解的数值输入。

首先,让我们定义 class 的基本结构和 __init__ 方法:

class SFTDataset(Dataset):
    def __init__(self, jsonl_path, tokenizer, max_length=1024):
        super().__init__()
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.samples = self.load_data(jsonl_path)
        self.bos_id = tokenizer('<s>assistant\n', add_special_tokens=False).input_ids
        self.eos_id = tokenizer('</s>\n', add_special_tokens=False).input_ids

__init__ 中:

  • 存储 tokenizermax_length
  • 将数据从 jsonl_path 加载到 self.samples 中。
  • 存储序列的开始和结束 ID。

现在,让我们定义 load_data 方法:

此方法只是逐行读取 JSONL 文件,将每一行解析为 JSON 对象,并将其添加到 samples 列表中。

def __len__(self):
    return len(self.samples)

__len__ 方法返回 samples 的长度。

现在,最重要的是:_create_chat_prompt 方法。在这里,我们使用来自 tokenizerchat_template 来正确地格式化对话。

def _create_chat_prompt(self, conversations):
        """Builds a dialogue in ChatML format."""
        messages = []
        for i, turn in enumerate(conversations):
            role = 'user' if i % 2 == 0 else 'assistant'
            messages.append({"role": role, "content": turn['content']})
        return self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=False
        )

此方法处理一系列对话回合(例如,[{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello"}]),并:

  • 遍历回合。
  • 将它们格式化为结构化列表,确保角色在“user”和“assistant”之间交替。
  • 使用 self.tokenizer.apply_chat_template 应用预定义的聊天模板,插入特殊标记(例如,<s></s>)以分隔回合和角色。
  • 设置 tokenize=False 以返回格式化的字符串而不是标记 ID。

现在,让我们创建 _generate_loss_mask 方法。

def _generate_loss_mask(self, input_ids):
    loss_mask = [0] * len(input_ids)  # Initialize loss mask with zeros
    i = 0
    while i < len(input_ids):
        if input_ids[i:i + len(self.bos_id)] == self.bos_id:  # Check for BOS token
            start = i + len(self.bos_id)
            end = start
            while end < len(input_ids):
                if input_ids[end:end + len(self.eos_id)] == self.eos_id:  # Find EOS token
                    break
                end += 1
            # Mark tokens after BOS until EOS for loss calculation
            for j in range(start + 1, min(end + len(self.eos_id) + 1, self.max_length)):
                loss_mask[j] = 1
            i = end + len(self.eos_id) if end < len(input_ids) else len(input_ids)  # Move index past EOS
        else:
            i += 1
    return loss_mask  # Return the generated loss mask

在此函数中,我们创建一个 mask 来指示在 损失计算 期间应考虑哪些 标记。目标是仅对 助手 的响应计算 损失,而不是对 用户的提示特殊标记 计算损失。

它迭代以找到 assistanteos 标记,然后将 1s 分配给 assistanteos 标记,将 0s 分配给其他标记。

最后,__getitem__ 方法,当从 dataset 访问一个项目时(如 dataset[i]),将调用该方法:

def __getitem__(self, index):
    sample = self.samples[index]  # Get the sample at the given index
    prompt = self._create_chat_prompt(sample['conversations'])  # Generate chat prompt
    encoding = self.tokenizer(prompt, max_length=self.max_length, truncation=True, return_tensors="pt")
    input_ids = encoding.input_ids.squeeze()  # Extract tokenized input IDs

    loss_mask = self._generate_loss_mask(input_ids.tolist())  # Generate loss mask

    # Append EOS token and update loss mask
    input_ids = input_ids.tolist() + [self.tokenizer.eos_token_id]
    loss_mask = loss_mask + [1]

    # Truncate or pad input to match max_length
    if len(input_ids) > self.max_length:
        input_ids = input_ids[:self.max_length]
        loss_mask = loss_mask[:self.max_length]
    else:
        pad_len = self.max_length - len(input_ids)
        input_ids += [self.tokenizer.pad_token_id] * pad_len
        loss_mask += [0] * pad_len

    # Create input (X) and target (Y) tensors for training
    X = torch.tensor(input_ids[:-1], dtype=torch.long)  # Input sequence
    Y = torch.tensor(input_ids[1:], dtype=torch.long)  # Shifted target sequence
    loss_mask = torch.tensor(loss_mask[1:], dtype=torch.long)  # Loss mask for target tokens

    return X, Y, loss_mask  # Return input, target, and loss mask tensors

以下是 __getitem__ 中发生的事情

  1. 获取样本sample = self.samples[index] 检索给定索引处的对话。
  2. 创建提示self._create_chat_prompt(...) 格式化对话。
  3. 分词self.tokenizer(...) 对提示进行分词,截断/填充到 max_length,返回 PyTorch 张量 (return_tensors="pt")。
  4. 损失掩码和 EOS → 生成损失掩码并为 EOS 标记附加 1
  5. 填充/截断 → 通过截断(如果太长)或填充(如果太短)来确保 input_ids 匹配 max_length
  6. 创建 X 和 Y → X 是输入序列;Y 是 X 右移一个位置,用于下一个标记预测。
  7. 返回 → 输出 XYloss_mask

现在我们已经正确地预处理了 SFT 训练数据集,是时候编写训练循环了。

SFT 训练循环

现在我们已经准备好了 SFTDataset,我们可以使用监督微调 (SFT) 来训练我们的模型。

SFT 训练循环与预训练循环非常相似,但我们使用对话数据,并将损失集中在助手的回复上。

首先,让我们设置模型、优化器和数据加载器。这与之前的大部分内容相同,但我们将加载我们预训练的模型权重:

### 加载分词器(与之前相同)
tokenizer = AutoTokenizer.from_pretrained("my_custom_tokenizer")

### 加载预训练模型
model = MyTransformer(model_config)
model.load_state_dict(torch.load("pretrain_model.pth", map_location="cpu")) # 加载权重!
model = model.to("cuda")

我们加载自定义的 tokenizer。至关重要的是,我们实例化我们的 MyTransformer 模型,然后使用 load_state_dictpretrain_model.pth 加载权重。我们首先加载到 CPU,然后将 model 移动到 GPU

现在,让我们创建 SFTDatasetDataLoader

### 创建 SFT 数据集
train_ds = SFTDataset("sft_data.jsonl", tokenizer, max_length=model_config["max_seq_len"])

### 创建数据加载器
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, drop_last=False)

我们使用指向 SFT 数据 (sft_data.jsonl) 的路径、我们的 tokenizer最大序列长度 创建我们的 SFTDataset 实例。DataLoader 的设置与 预训练 类似,启用了 shuffling

接下来是 optimizerloss function

### 定义优化器 (AdamW 是一个不错的选择)
optimizer = optim.AdamW(model.parameters(), lr=5e-5) # 潜在的不同学习率

### 定义损失函数 (交叉熵)
loss_fct = nn.CrossEntropyLoss(reduction='none')

我们使用 AdamW,但 SFT学习率 通常低于 预训练。这里我使用了 5e-5,但你应该进行实验。我们仍然使用 CrossEntropyLossreduction='none'

现在,让我们定义 学习率调度器。我们将像之前一样使用相同的 cosine schedule

def get_lr(current_step, total_steps, lr):
    """使用余弦调度计算学习率。"""
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))

此函数与预训练保持不变。

现在,是训练循环的核心:

for epoch in range(epochs):
    start_time = time.time()
    for step, (X, Y, loss_mask) in enumerate(train_loader):
        X = X.to("cuda")
        Y = Y.to("cuda")
        loss_mask = loss_mask.to("cuda")

## 计算学习率
        lr = get_lr(epoch * iter_per_epoch + step, epochs * iter_per_epoch, 5e-5)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

## 前向传播
        res = model(X)

我们启动 epochs外循环batches内循环。我们将我们的 inputtargetloss mask 移动到 GPU

我们计算当前步骤的 学习率,并将其设置为 optimizer。然后,model 进行预测 (res = model(X))。

现在,关键部分:损失计算。这是我们使用 loss_mask 的地方:

### 计算损失(使用 loss_mask!)
        loss = loss_fct(
            res.logits.view(-1, res.logits.size(-1)),
            Y.view(-1)
        ).view(Y.size())
        loss = (loss * loss_mask).sum() / loss_mask.sum() # 关键区别:使用 loss_mask
        loss = loss/accumulation_steps

我们计算交叉熵损失,对其进行掩码以关注助手的 tokens,对掩码损失求和,并按掩码总和进行归一化。在反向传播和参数更新之前应用梯度累积。

接下来是反向传播和参数更新:

### 反向传播
        loss.backward()
        if (step + 1) % accumulation_steps == 0:
            # 梯度裁剪
            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            # 参数更新
            optimizer.step()
            # 梯度清零
            optimizer.zero_grad()

loss.backward() 计算 梯度。如果达到 accumulation steps,我们使用 clip_grad_norm_ 裁剪梯度,使用 optimizer.step() 更新模型参数,并使用 optimizer.zero_grad() 清除梯度。

最后是 日志记录保存

if step % 100 == 0:
            spend_time = time.time() - start_time
            print(
                'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
                    epoch + 1,
                    epochs,
                    step,
                    iter_per_epoch,
                    loss.item() * accumulation_steps,
                    optimizer.param_groups[-1]['lr'],
                    spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))

model.eval() # 设置为评估模式
torch.save(model.state_dict(), "sft_model.pth") # 保存 SFT 模型
model.train() # 设置为训练模式

我们正在记录损失、学习率和训练时间。当我们开始训练时,损失将以与预训练阶段相同的方式打印出来。

...

Epoch:[5/10](30/100) loss:0.514 lr:0.00002342352 epoch_Time:21min

...

训练后,我们的模型权重被保存。我们可以使用之前的函数轻松地将它们转换为 safetensors。现在,让我们加载模型,看看它改进了多少。

评估😖

尽管根据基准来判断我们的 LLM 并不理想,因为它只有 3000 万个参数,但我们还是这样做。

我们首先创建了一个预训练模型,然后使用 SFT 对其进行了微调。可以使用 RLHF 和推理进行进一步的改进,我将在未来的博客中介绍。

让我们测试不同的输入,看看它的反应。

加法问题。

user input: 2+3=?

LLM: 当你运行相同数字和相同数字时,
数字是 2 + 3 = 5。

所以,2 + 3 = 5。

因此,2 + 3 = 5。

知识检查。

user input: 世界上最高的山是什么?

LLM: 珠穆朗玛峰,世界最高的山峰 (8,848.86m),
位于尼泊尔-中国边境的喜马拉雅山脉。
user input: 谁发现了万有引力?

LLM: 牛顿在 1687 年提出了万有引力定律,
解释了物体如何通过引力相互吸引。

一般问题。

user input: hi
LLM: Hello

user: 你好吗?
LLM: 很抱歉,我无法帮助您。
但是,我无法提供有关
特定要求或上下文的信息,但我可以提供
您对您可能拥有的内容的一般想法:

一些问题。

user input: 1 KG 是多少克?
LLM: 答案是 `3`
user input: 1 KG 是多少克?
LLM: 答案是 `3`

虽然这些答案是精心挑选的,但该模型仍然需要更多的预训练和 SFT 训练才能进一步改进。

结论

如果你想构建一个严肃的 LLM,你应该使用一个非常高质量的合成数据集。同样的方法也适用于特定领域的 LLM。

确保创建一个大约有 1 亿到 2 亿个参数的 LLM,因为它们在问答任务中表现良好。

SpongeBob (用于标题图片)

Related Posts

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

AI 研究报告和论文写作 合并两个系统指令以获得两个模型的最佳效果 Perplexity AI 的 Deep Research 工具提供专家级的研究报告,而 OpenAI 的 ChatGPT-o3-mini-high 擅长推理。我发现你可以将它们结合起来生成令人难以置信的论文,这些论文比任何一个模型单独撰写的都要好。你只需要将这个一次性提示复制到 **

阅读更多
让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

Non members click here作为一名软件开发人员,多年来的一个发现总是让我感到惊讶,那就是人们还在 Excel

阅读更多
使用 ChatGPT 搜索网络功能的 10 种创意方法

使用 ChatGPT 搜索网络功能的 10 种创意方法

例如,提示和输出 你知道可以使用 ChatGPT 的“搜索网络”功能来完成许多任务,而不仅仅是基本的网络搜索吗? 对于那些不知道的人,ChatGPT 新的“搜索网络”功能提供实时信息。 截至撰写此帖时,该功能仅对使用 ChatGPT 4o 和 4o-mini 的付费会员开放。 ![](https://images.weserv.nl/?url=https://cdn-im

阅读更多
掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

10 个常见问题解答 本文是我推出的一个名为“10 个常见问题解答”的新系列的一部分。在本系列中,我旨在通过回答关于该主题的十个最常见问题来分解复杂的概念。我的目标是使用简单的语言和相关的类比,使这些想法易于理解。 图片来自 [Solen Feyissa](https://unsplash.com/@solenfeyissa?utm_source=medium&utm_medi

阅读更多
在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和科技这样一个动态的行业中,保持领先意味着不断提升你的技能。无论你是希望深入了解人工智能模型性能、掌握数据分析,还是希望通过人工智能转变传统领域如法律,这些课程都是你成功的捷径。以下是一个精心策划的高价值课程列表,可以助力你的职业发展,并让你始终处于创新的前沿。 1. 生成性人工智能简介课程: [生成性人工智能简介](https://genai.works

阅读更多
揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

在AI军备竞赛中分辨事实与虚构 DeepSeek AI真的是它所宣传的游戏规则改变者,还是仅仅聪明的营销和战略炒作?👀 虽然一些人将其视为AI效率的革命性飞跃,但另一些人则认为它的成功建立在借用(甚至窃取的)创新和可疑的做法之上。传言称,DeepSeek的首席执行官在疫情期间像囤积卫生纸一样囤积Nvidia芯片——这只是冰山一角。 从其声称的550万美元培训预算到使用Open

阅读更多
Type something to search...