import os import torch import torch.distributed as dist from unsloth import FastLanguageModel from unsloth import is_bfloat16_supported from trl import SFTTrainer, GRPOConfig, GRPOTrainer from datasets import load_dataset from transformers import TrainingArguments import re from datasets import load_dataset, Dataset from modelscope.msdatasets import MsDataset # Load and prep dataset SYSTEM_PROMPT = """ Respond in the following format: ... ... """ XML_COT_FORMAT = """\ {reasoning} {answer} """ def extract_xml_answer(text: str) -> str: answer = text.split("")[-1] answer = answer.split("")[0] return answer.strip() def extract_hash_answer(text: str) -> str | None: if "####" not in text: return None return text.split("####")[1].strip() # uncomment middle messages for 1-shot prompting def get_gsm8k_questions(split = "train") -> Dataset: # data = load_dataset('https://huggingface.co/datasets/openai/gsm8k', 'main')[split] # type: ignore data = MsDataset.load('openai-mirror/gsm8k', subset_name='main', split=split) data = data.map(lambda x: { # type: ignore 'prompt': [ {'role': 'system', 'content': SYSTEM_PROMPT}, {'role': 'user', 'content': x['question']} ], 'answer': extract_hash_answer(x['answer']) }) # type: ignore return data # type: ignore dataset = get_gsm8k_questions() # 方法 1:使用 datasets 库的 to_json 方法 dataset.to_json(os.path.join("..","data","backup", "gsm8k_dataset_for_train.jsonl"), orient="records", lines=True, force_ascii=False) # Reward functions def correctness_reward_func(prompts, completions, answer, **kwargs) -> list[float]: responses = [completion[0]['content'] for completion in completions] q = prompts[0][-1]['content'] extracted_responses = [extract_xml_answer(r) for r in responses] print('-'*20, f"Question:\n{q}", f"\nAnswer:\n{answer[0]}", f"\nResponse:\n{responses[0]}", f"\nExtracted:\n{extracted_responses[0]}") return [2.0 if r == a else 0.0 for r, a in zip(extracted_responses, answer)] def int_reward_func(completions, **kwargs) -> list[float]: responses = [completion[0]['content'] for completion in completions] extracted_responses = [extract_xml_answer(r) for r in responses] return [0.5 if r.isdigit() else 0.0 for r in extracted_responses] def strict_format_reward_func(completions, **kwargs) -> list[float]: """Reward function that checks if the completion has a specific format.""" pattern = r"^\n.*?\n\n\n.*?\n\n$" responses = [completion[0]["content"] for completion in completions] matches = [re.match(pattern, r) for r in responses] return [0.5 if match else 0.0 for match in matches] def soft_format_reward_func(completions, **kwargs) -> list[float]: """Reward function that checks if the completion has a specific format.""" pattern = r".*?\s*.*?" responses = [completion[0]["content"] for completion in completions] matches = [re.match(pattern, r) for r in responses] return [0.5 if match else 0.0 for match in matches] def count_xml(text) -> float: count = 0.0 if text.count("\n") == 1: count += 0.125 if text.count("\n\n") == 1: count += 0.125 if text.count("\n\n") == 1: count += 0.125 count -= len(text.split("\n\n")[-1])*0.001 if text.count("\n") == 1: count += 0.125 count -= (len(text.split("\n")[-1]) - 1)*0.001 return count def xmlcount_reward_func(completions, **kwargs) -> list[float]: contents = [completion[0]["content"] for completion in completions] return [count_xml(c) for c in contents] class ModelTrainer: def __init__(self, model_name, max_seq_length, dtype, load_in_4bit,lora_rank=32): # 初始化 ModelTrainer 类,设置模型名称、最大序列长度、数据类型和是否以4位加载 self.model_name = model_name self.max_seq_length = max_seq_length self.dtype = dtype # dtype: 数据类型,如 torch.float16 或 torch.bfloat16 self.load_in_4bit = load_in_4bit # load_in_4bit: 是否以4位精度加载模型,用于节省显存 self.lora_rank=lora_rank #Larger rank = smarter, but slower def load_model(self,lora_rank=64): # 加载预训练模型和分词器 model, tokenizer = FastLanguageModel.from_pretrained( model_name=self.model_name, max_seq_length=self.max_seq_length, load_in_4bit=self.load_in_4bit, # 值为True 以 4 bit量化进行微调,为False LoRA 16bit。这将内存使用量减少了 4 倍,使我们能够在免费的 16GB 内存 GPU 中实际进行微调。4 位量化本质上将权重转换为一组有限的数字以减少内存使用量。这样做的缺点是准确度会下降 1-2%。如果您想要这种微小的额外准确度,请在较大的 GPU(如 H100)上将其设置为 False。 dtype=self.dtype, fast_inference = True, # Enable vLLM fast inference max_lora_rank = lora_rank, gpu_memory_utilization=0.005, # 0.6 # Reduce if out of memory ) # 将模型移动到设备上 model = model.to_empty(device='cuda') # 使用 to_empty 而不是 to # 初始化模型的权重 for param in model.parameters(): if param.is_meta: param.data = torch.randn_like(param) # 随机初始化 # 添加 LoRA 适配器 model = FastLanguageModel.get_peft_model( model, max_seq_length=self.max_seq_length, # 最大上下文(序列)长度 r=lora_rank, # Choose any number > 0 ! Suggested 8, 16, 32, 64, 128 target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"], # 应用 LoRA 的目标模块 lora_alpha=16, # LoRA 的 alpha 参数,控制适配器的缩放 lora_dropout=0, # LoRA 的 dropout 率,设置为0以优化性能 bias="none", # 是否在 LoRA 中添加偏置,设置为 "none" 以优化性能 use_gradient_checkpointing="unsloth", # 使用梯度检查点以节省显存,对于非常长的上下文,使用 True 或 "unsloth" random_state=3407, # 随机种子,确保实验可复现 use_rslora=False, # 是否使用 rank stabilized LoRA,当前不支持 loftq_config=None, # LoftQ 配置,当前不支持 ) return model, tokenizer def load_data(self, train_data_path): # 加载训练集和测试集 with open(train_data_path, 'r') as f: train_dataset = load_dataset("json", data_files={"train": train_data_path}, split="train") # train_data_path: 训练数据路径,格式为 JSONL return train_dataset def train(self, model, tokenizer, train_dataset): print("is_bfloat16_supported()=", is_bfloat16_supported()) # 监控显存使用情况 print(f"Reserved memory: {torch.cuda.memory_reserved()}") print(f"Allocated memory: {torch.cuda.memory_allocated()}") # 启用 pin_memory 2025年3月10日未能验证通过 train_loader = torch.utils.data.DataLoader( train_dataset, batch_size=1, shuffle=True, pin_memory=True ) # 释放未使用的显存 torch.cuda.empty_cache() training_args = GRPOConfig( use_vllm = True, # use vLLM for fast inference! learning_rate = 5e-6, adam_beta1 = 0.9, adam_beta2 = 0.99, weight_decay = 0.1, warmup_ratio = 0.1, lr_scheduler_type = "cosine", optim ="adamw_8bit", # "adamw_8bit" if device == "cuda" else "adamw_torch", # CPU 使用 adamw_torch logging_steps = 1, bf16 = is_bfloat16_supported(), fp16 = not is_bfloat16_supported(), per_device_train_batch_size = 1, gradient_accumulation_steps = 1, # Increase to 4 for smoother training num_generations = 4, # 8 # 每次生成 输出 个数 max_prompt_length = 256, # 256 # 输入提示的最大长度 max_completion_length = 200,# 200 # 生成内容的最大长度 num_train_epochs = 1, # Set to 1 for a full training run max_steps = 250, # 250 save_steps = 250, # 250 max_grad_norm = 0.1, report_to = "none", # Can use Weights & Biases output_dir = os.path.join('..', 'models',"outputs"), ) # 初始化 SFTTrainer trainer = GRPOTrainer( model = model, processing_class = tokenizer, reward_funcs = [ xmlcount_reward_func, soft_format_reward_func, strict_format_reward_func, int_reward_func, correctness_reward_func, ], args = training_args, train_dataset = train_dataset, ) # 训练模型 trainer.train() return model def save_model(self, model, tokenizer, save_path): # 保存模型和分词器 model.save_pretrained(save_path) tokenizer.save_pretrained(save_path) print(f"Model saved to {save_path}") if __name__ == "__main__": # 配置参数 model_name = os.path.join('..', 'models', 'pretrained', 'DeepSeek-R1-Distill-Qwen-1.5B') # model_name: 预训练模型的路径 max_seq_length = 512 # 单次会话(single session) 的最大 token 长度,一个token大约3-4 字节(Byte) dtype = torch.float16 # 数据类型 load_in_4bit = True # 是否以4位精度加载模型 lora_rank=16 # 定义训练集和测试集路径 train_data_path = os.path.join('..', 'data', 'processed', 'train.jsonl') # train_data_path: 训练数据路径 try: # 设置环境变量 # 单机多卡 os.environ['RANK'] = '0' # 第一张卡的 rank os.environ['WORLD_SIZE'] = '1' # 总共有 1 张卡 os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '12345' # 多机多卡 # export RANK=0 # 第一台机器的 rank # export WORLD_SIZE=4 # 总共有 4 台机器 # export MASTER_ADDR=<主节点 IP> # export MASTER_PORT=12345 # 初始化进程组 # dist.init_process_group(backend='nccl', init_method='env://') # 初始化 ModelTrainer trainer = ModelTrainer(model_name, max_seq_length, dtype, load_in_4bit,lora_rank) # 加载模型和分词器 model, tokenizer = trainer.load_model(lora_rank) # 加载数据集 train_dataset = trainer.load_data(train_data_path) # 训练模型 model = trainer.train(model, tokenizer, train_dataset) # 保存模型 save_path = os.path.join('..', 'models', 'trained', 'DeepSeek-R1-Distill-Qwen-1.5B-GRPO') trainer.save_model(model, tokenizer, save_path) finally: # 确保进程组被销毁 if dist.is_initialized(): dist.destroy_process_group() print("train finally")