|
před 5 měsíci | |
---|---|---|
data | před 5 měsíci | |
resource | před 5 měsíci | |
src | před 5 měsíci | |
.gitignore | před 5 měsíci | |
README.MD | před 5 měsíci | |
main.py | před 5 měsíci | |
runModel.py | před 5 měsíci | |
trainModel.py | před 5 měsíci |
MoE(Mixture of Experts)架构
一、原理
专家模块
MoE由多个“专家”(expert)组成。这些专家通常是一些较小的神经网络,例如多层感知机(MLP)。每个专家都有自己独特的参数和处理能力。
门控机制
伴随专家模块的是一个门控网络。当输入数据到来时,门控网络会根据输入的特征决定将该输入路由到哪个专家或者哪些专家的组合上进行处理。这种路由决策是基于数据驱动的,旨在把不同类型的输入分配给最适合处理它们的专家。
二、优点
高效性
在处理大规模数据和复杂任务时,可以有效地利用计算资源。因为不必让所有的数据都通过一个庞大而统一的模型,而是将不同部分的数据分发给擅长处理它们的专家,从而避免了在一些不相关任务上浪费计算资源。
模型容量大
通过组合多个专家,可以增加模型整体的容量。这意味着它能够学习到更多的模式和关系,对于复杂任务有更好的表现能力。
三、缺点
训练复杂性
需要仔细地设计门控机制,确保正确的输入被路由到合适的专家。不当的路由可能导致某些专家被过度使用,而其他专家则未得到充分利用,影响模型性能。而且训练过程中需要平衡各个专家之间的权重更新等问题。
通信开销
在分布式训练的情况下,如果多个专家分布在不同的计算设备上,数据在设备之间的路由会带来一定的通信开销,这可能会影响训练速度和效率。
Dense(密集连接)架构
一、原理
全连接特性
在Dense架构的神经网络中,每一层的神经元都与它之前的所有层中的神经元相连。例如,在一个简单的三层神经网络中,第二层的每个神经元都接收来自第一层所有神经元的输入,第三层的每个神经元又接收来自第一和第二层所有神经元的输入。
特征重复利用
由于这种全连接的方式,前面层提取的特征可以被后续层多次利用。这有助于深度神经网络逐步从原始数据中学习到更高级、更抽象的特征表示。
二、优点
信息传递完全
保证了信息在网络中的最大程度流动。前面层的任何微小变化都能传播到后面的所有层,使得模型能够充分利用输入数据中的所有信息,有利于学习复杂的函数映射。
特征学习深度
能够深入挖掘数据中的特征。随着网络层数的增加,它可以不断地在前序特征基础上构建新的特征,适合处理一些具有高度非线性关系的任务,如图像识别、自然语言处理中的语义理解等。
三、缺点
参数量大
由于每层都与之前所有层全连接,当网络层数增加和输入维度增大时,模型的参数数量会急剧增加。这不仅会导致训练时间长,还容易产生过拟合现象,尤其在数据量相对较小时更为明显。
计算复杂度高
在计算每一层的输出时,由于大量的连接,需要进行大量的乘法和加法运算,这对计算资源(如GPU内存和计算速度)提出了很高的要求,限制了模型在一些计算资源有限设备上的应用。
在 DeepSeek-R1 模型中,SFT 和 GRPO 是两种重要的技术或方法,分别用于模型训练和优化。以下是它们的详细介绍:
SFT 是一种通过使用标注数据对预训练模型进行微调的技术。它通常在预训练模型的基础上,针对特定任务(如文本分类、问答等)进行进一步训练。
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
from datasets import load_dataset
# 加载预训练模型和分词器
model = AutoModelForSequenceClassification.from_pretrained("deepseek-r1")
tokenizer = AutoTokenizer.from_pretrained("deepseek-r1")
# 加载数据集
dataset = load_dataset("your_dataset")
# 数据预处理
def preprocess_function(examples):
return tokenizer(examples["text"], truncation=True, padding="max_length")
tokenized_datasets = dataset.map(preprocess_function, batched=True)
# 定义训练参数
training_args = TrainingArguments(
output_dir="./results",
evaluation_strategy="epoch",
learning_rate=2e-5,
per_device_train_batch_size=16,
num_train_epochs=3,
weight_decay=0.01,
)
# 定义 Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_datasets["train"],
eval_dataset=tokenized_datasets["validation"],
)
# 微调模型
trainer.train()
GRPO 是一种用于强化学习(RL)或策略优化的技术。它通过反转梯度方向来优化策略,从而在复杂任务中实现更好的性能。
import torch
import torch.nn as nn
import torch.optim as optim
# 定义策略模型
class PolicyModel(nn.Module):
def __init__(self):
super(PolicyModel, self).__init__()
self.fc = nn.Linear(128, 2) # 假设输入维度为 128,输出为 2 个动作
def forward(self, x):
return torch.softmax(self.fc(x), dim=-1)
# 定义 GRPO 优化器
class GRPOptimizer:
def __init__(self, model, lr=1e-3):
self.model = model
self.optimizer = optim.Adam(model.parameters(), lr=lr)
def step(self, loss):
# 反向传播
loss.backward()
# 梯度反转
for param in self.model.parameters():
param.grad = -param.grad # 反转梯度
# 更新参数
self.optimizer.step()
self.optimizer.zero_grad()
# 训练循环
policy_model = PolicyModel()
grp_optimizer = GRPOptimizer(policy_model)
for epoch in range(10):
state = torch.randn(1, 128) # 假设状态维度为 128
action_probs, _ = policy_model(state)
action = torch.argmax(action_probs).item()
# 假设奖励为 1(实际任务中需根据环境计算)
reward = 1
loss = -torch.log(action_probs[0, action]) * reward # 策略梯度损失
# GRPO 优化
grp_optimizer.step(loss)
特性 | SFT(监督微调) | GRPO(基于梯度反转的策略优化) |
---|---|---|
应用场景 | 监督学习任务(如分类、生成) | 强化学习任务(如策略优化) |
数据需求 | 需要标注数据 | 需要环境和奖励信号 |
优化目标 | 最小化监督损失(如交叉熵) | 最大化累积奖励 |
技术核心 | 微调预训练模型 | 梯度反转和策略优化 |
如果你有更多关于 DeepSeek-R1 或相关技术的问题,欢迎继续提问!
SFT GRPO
python库: PyTorch+CUDA
运行环境:Windows11 环境
import torch
print(torch.__version__) # 查看 PyTorch 版本
# 检查 CUDA 是否可用
if torch.cuda.is_available():
print("CUDA 可用")
print("CUDA 版本:", torch.version.cuda)
print("GPU 名称:", torch.cuda.get_device_name(0))
else:
print("CUDA 不可用")
安装包下载链接: https://developer.nvidia.com/cuda-downloads
命令:nvcc --version 验证是否安装成功
安装成功信息示例:
nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2023 NVIDIA Corporation
Built on ...
Cuda compilation tools, release 12.3, V12.3.107
1. 卸载当前 PyTorch 执行命令:pip uninstall torch
2. 生成安装支持CUDA的PyTorch命令,命令生成链接:https://pytorch.org/get-started/locally/
如:pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126
源自Deepseek-R1对话获取
组件 | 推荐配置 |
---|---|
GPU | 16 × NVIDIA A100 80GB(通过 NVLink 互联) |
CPU | 2 × AMD EPYC 7763(64 核,128 线程) |
内存 | 2TB DDR4 ECC |
存储 | 20TB NVMe SSD(分布式文件系统,如 Lustre) |
网络 | 100GbE InfiniBand |
操作系统 | Ubuntu 20.04 LTS |
深度学习框架 | PyTorch 2.0+,DeepSpeed,Megatron-LM |
组件 | 推荐配置 |
---|---|
GPU | 8 × NVIDIA A100 80GB(通过 NVLink 互联) |
CPU | 2 × AMD EPYC 7763(64 核,128 线程) |
内存 | 2TB DDR4 ECC |
存储 | 20TB NVMe SSD(分布式文件系统,如 Lustre) |
网络 | 100GbE InfiniBand |
操作系统 | Ubuntu 20.04 LTS |
深度学习框架 | PyTorch 2.0+,DeepSpeed,Megatron-LM |
估算: 并发用户数:8 × A100 80GB的配置可以支持10-50个并发用户。 每秒输出tokens数:每秒可以输出10-50个tokens。 建议: 如果需要支持更多并发用户或更高的token输出速度,可以增加GPU数量(如16 × A100 80GB)或使用更高效的推理框架优化(如FasterTransformer)。 对于更高性能需求,可以考虑使用专门优化的推理硬件(如NVIDIA H100)或分布式推理集群
尚待服务器环境具备时验证
以下是支持多节点和多 GPU 分布式训练的代码:
# model_trainer_distribute.py
import torch
import torch.distributed as dist
from torch.utils.data import Dataset, DataLoader
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from transformers import AutoTokenizer, AutoModelForCausalLM, AdamW
from src.config import Config
import logging
import os
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TextDataset(Dataset):
"""自定义数据集类"""
def __init__(self, file_path, tokenizer, seq_length):
self.tokenizer = tokenizer
self.seq_length = seq_length
with open(file_path, "r", encoding="utf-8") as f:
self.lines = f.read().splitlines()
def __len__(self):
return len(self.lines)
def __getitem__(self, idx):
line = self.lines[idx]
tokens = self.tokenizer.encode(
line,
max_length=self.seq_length,
truncation=True,
padding="max_length"
)
return torch.tensor(tokens) # 返回 CPU 上的张量
class ModelTrainer:
def __init__(self, local_rank, world_size):
"""初始化模型、分词器和优化器"""
self.local_rank = local_rank
self.world_size = world_size
self.device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {self.device}")
try:
# 加载分词器和模型
self.tokenizer = AutoTokenizer.from_pretrained(Config.PRETRAINED_MODEL_DIR)
self.model = AutoModelForCausalLM.from_pretrained(Config.PRETRAINED_MODEL_DIR)
self.model.to(self.device) # 将模型移动到设备
# 使用 DistributedDataParallel 包装模型
self.model = DDP(self.model, device_ids=[local_rank], output_device=local_rank)
logger.info("Pretrained model and tokenizer loaded.")
except Exception as e:
logger.error(f"Failed to load pretrained model: {e}")
raise
def train(self):
"""训练模型"""
try:
# 加载数据集
dataset = TextDataset(Config.PROCESSED_DATA_PATH, self.tokenizer, Config.SEQ_LENGTH)
sampler = DistributedSampler(dataset, num_replicas=self.world_size, rank=self.local_rank)
dataloader = DataLoader(
dataset,
batch_size=Config.BATCH_SIZE,
sampler=sampler,
num_workers=4
)
# 初始化优化器
optimizer = AdamW(self.model.parameters(), lr=Config.LEARNING_RATE)
# 训练循环
self.model.train()
for epoch in range(Config.EPOCHS):
sampler.set_epoch(epoch) # 设置 epoch 以打乱数据
for batch in dataloader:
# 将输入数据移动到设备
inputs = batch.to(self.device)
# 前向传播
outputs = self.model(inputs, labels=inputs)
loss = outputs.loss
# 反向传播和优化
loss.backward()
optimizer.step()
optimizer.zero_grad()
if self.local_rank == 0: # 仅主进程记录日志
logger.info(f"Epoch {epoch + 1}/{Config.EPOCHS}, Loss: {loss.item()}")
# 保存训练后的模型(仅主进程保存)
if self.local_rank == 0:
os.makedirs(Config.TRAINED_MODEL_DIR, exist_ok=True)
self.model.module.save_pretrained(Config.TRAINED_MODEL_DIR)
self.tokenizer.save_pretrained(Config.TRAINED_MODEL_DIR)
logger.info(f"Model saved to {Config.TRAINED_MODEL_DIR}")
except Exception as e:
logger.error(f"Training failed: {e}")
raise
def setup_distributed():
"""初始化分布式训练环境"""
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
return local_rank, world_size
def cleanup_distributed():
"""清理分布式训练环境"""
dist.destroy_process_group()
def main():
# 初始化分布式训练
local_rank, world_size = setup_distributed()
try:
# 初始化训练器
trainer = ModelTrainer(local_rank, world_size)
trainer.train()
finally:
# 清理分布式训练环境
cleanup_distributed()
if __name__ == "__main__":
main()
部署方案
torchrun --nproc_per_node=4 --nnodes=1 --node_rank=0 --master_addr=localhost --master_port=12345 model_trainer.py
参数说明 --nproc_per_node:每个节点的 GPU 数量。 --nnodes:节点总数(单节点为 1)。 --node_rank:当前节点的 rank(单节点为 0)。 --master_addr 和 --master_port:主节点的地址和端口。
启动命令 假设有 2 台服务器,每台服务器有 4 张 GPU。
主节点(Node 0)
torchrun \
--nproc_per_node=4 \
--nnodes=2 \
--node_rank=0 \
--master_addr=<主节点IP> \
--master_port=12345 \
model_trainer.py
从节点(Node 1)
torchrun \
--nproc_per_node=4 \
--nnodes=2 \
--node_rank=1 \
--master_addr=<主节点IP> \
--master_port=12345 \
model_trainer.py
参数说明 --nproc_per_node:每个节点的 GPU 数量。 --nnodes:节点总数。 --node_rank:当前节点的 rank(主节点为 0,从节点为 1)。 --master_addr:主节点的 IP 地址。 --master_port:主节点的端口。
执行训练方法
单服务器多 GPU 训练 确保所有 GPU 可用:
nvidia-smi
启动训练:
torchrun --nproc_per_node=4 --nnodes=1 --node_rank=0 --master_addr=localhost --master_port=12345 model_trainer.py
多服务器多 GPU 训练 在主节点和从节点上分别启动训练:
主节点:
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=0 --master_addr=<主节点IP> --master_port=12345 model_trainer.py
从节点:
torchrun --nproc_per_node=4 --nnodes=2 --node_rank=1 --master_addr=<主节点IP> --master_port=12345 model_trainer.py
确保所有节点可以互相访问,并且端口 12345 未被占用。
总结 通过上述重构和部署方案,代码可以支持: 单服务器多 GPU 的分布式训练。 多服务器多 GPU 的分布式训练。
关键点: 使用 DistributedDataParallel 实现模型并行。 使用 DistributedSampler 实现数据并行。 使用 torchrun 启动分布式训练。
在昇腾910B NPU上训练DeepSeek-R1 671B模型,需要结合昇腾AI处理器(如Ascend 910B)的硬件特性和华为的AI软件栈(如MindSpore)进行适配和优化。以下是详细的步骤和注意事项:
软件环境 操作系统:支持Ascend 910B的操作系统(如Ubuntu 18.04/20.04或CentOS 7.6)。 驱动和固件:安装昇腾910B的驱动和固件。 MindSpore:华为的深度学习框架,支持昇腾910B。 安装MindSpore(推荐使用与Ascend 910B兼容的版本,如MindSpore 2.0+)。
安装命令示例:
pip install mindspore-ascend
CANN:华为的异构计算架构,用于优化昇腾NPU的性能。 安装CANN工具包(与MindSpore版本匹配)。
数据集 准备训练DeepSeek-R1 671B所需的数据集,并确保数据格式与MindSpore兼容。
分布式训练支持 DeepSeek-R1 671B模型规模巨大,通常需要分布式训练。 使用MindSpore的分布式训练功能(如mindspore.set_auto_parallel_context)配置多机多卡训练。
示例代码:
import mindspore as ms
from mindspore import context
from mindspore.communication import init
# 设置运行模式为图模式
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# 初始化分布式训练
init()
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, device_num=8)
混合精度训练 为了提升训练效率,可以使用混合精度训练(FP16)。 在MindSpore中启用混合精度:
from mindspore import amp
# 设置混合精度
model = amp.build_train_network(model, optimizer, loss_fn, level="O2")
示例代码:
from mindspore.dataset import GeneratorDataset
dataset = GeneratorDataset(source=data_generator, column_names=["data", "label"])
dataset = dataset.batch(batch_size=32)
模型定义 使用MindSpore定义DeepSeek-R1 671B模型。 示例代码:
import mindspore.nn as nn
class DeepSeekR1(nn.Cell):
def __init__(self):
super(DeepSeekR1, self).__init__()
# 定义模型结构
self.layer1 = nn.Dense(1024, 2048)
self.layer2 = nn.Dense(2048, 4096)
# 添加更多层...
def construct(self, x):
x = self.layer1(x)
x = self.layer2(x)
# 添加更多操作...
return x
损失函数和优化器 定义损失函数和优化器。 示例代码:
from mindspore import nn
# 定义损失函数
loss_fn = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
# 定义优化器
optimizer = nn.Adam(params=model.trainable_params(), learning_rate=0.001)
训练循环 使用MindSpore的Model API进行训练。 示例代码:
from mindspore import Model
# 创建模型
model = Model(network=DeepSeekR1(), loss_fn=loss_fn, optimizer=optimizer)
# 开始训练
model.train(epoch=10, train_dataset=dataset)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.AUTO_PARALLEL,
device_num=8,
full_batch=True,
gradient_aggregation_group=4)
内存优化 使用MindSpore的内存优化功能(如grad_accumulation)减少显存占用。 示例代码:
ms.set_auto_parallel_context(grad_accumulation_step=4)
通信优化 使用华为的HCCL(Huawei Collective Communication Library)优化多机多卡通信。 确保网络带宽和延迟满足分布式训练的需求。
import logging
logging.basicConfig(level=logging.INFO)
性能监控 使用华为的Ascend工具(如Ascend Performance Analyzer)监控训练性能。 分析瓶颈(如计算、通信、I/O)并进行优化。
注意事项 硬件资源:671B规模的模型需要大量计算资源,确保硬件环境足够。 分布式训练:多机多卡训练需要良好的网络环境和通信优化。 版本兼容性:确保MindSpore、CANN和驱动版本兼容。 文档支持:参考华为官方文档(如MindSpore和Ascend文档)获取最新信息。
通过以上步骤,你可以在昇腾910B NPU上成功训练DeepSeek-R1 671B模型。如果遇到问题,可以联系华为技术支持或社区获取帮助。