# train_model_grpo_v1.py
"""GRPO fine-tuning script: loads a model through Unsloth, attaches LoRA
adapters and trains it with TRL's ``GRPOTrainer`` using a set of format,
correctness and semantic-similarity reward functions.
"""
import os
import re
import traceback

import numpy as np
import torch
import torch.distributed as dist
# unsloth stays ahead of trl/transformers, as in the original import order.
from unsloth import FastLanguageModel
from unsloth import is_bfloat16_supported
from trl import GRPOConfig, GRPOTrainer
from datasets import load_dataset
from transformers import LongformerTokenizer, LongformerModel  # tokenizer/model handle at most 4096 tokens

from conf_train import Config, load_config  # training configuration

# Local checkpoint used for the semantic-similarity reward.
_LONGFORMER_PATH = "../models/allenai/longformer-base-4096"
_LONGFORMER_MAX_TOKENS = 4096

# NOTE(review): in the reviewed copy of this file the angle-bracket tag names
# were missing from every string/regex literal (e.g. `text.split("")`, which
# raises ValueError). They are restored below as <reasoning>/<answer>, inferred
# from the local variable names and docstrings -- confirm against the prompt
# template that was used to build the training data.
_SOFT_FORMAT_RE = re.compile(
    r"<reasoning>.*?</reasoning>\s*<answer>.*?</answer>", re.DOTALL
)
_STRICT_FORMAT_RE = re.compile(
    r"^<reasoning>\n(.+?)\n</reasoning>\n<answer>\n(.+?)\n</answer>\n$", re.DOTALL
)
_REASONING_RE = re.compile(r"<reasoning>\n(.+?)\n</reasoning>", re.DOTALL)


class ModelTrainer:
    """Bundle model loading, data loading, GRPO training and reward functions."""

    def __init__(self, config: Config):
        """
        Store the training configuration and load the Longformer encoder.

        :param config: configuration object holding the training parameters
        """
        self.config: Config = config
        self.model_name = config.model_name
        self.max_seq_length = config.max_seq_length
        # Anything other than the literal "float16" falls back to bfloat16.
        self.dtype = torch.float16 if config.dtype == "float16" else torch.bfloat16
        self.load_in_4bit = config.load_in_4bit
        self.fast_inference = config.fast_inference
        self.lora_rank = config.lora_rank
        self.gpu_memory_utilization = config.gpu_memory_utilization

        # Longformer tokenizer/model, used only by the semantic reward function.
        # NOTE: `self.tokenizer` is the Longformer tokenizer, not the tokenizer
        # of the model being trained (that one is returned by `load_model`).
        self.tokenizer = LongformerTokenizer.from_pretrained(_LONGFORMER_PATH)
        self.longformer_model = LongformerModel.from_pretrained(_LONGFORMER_PATH)
        self.longformer_model.eval()  # inference only

    def load_model(self):
        """
        Load the pretrained model and its tokenizer, then attach LoRA adapters.

        :return: ``(model, tokenizer)``
        :raises RuntimeError: if some parameters are still on the meta device
            after loading (i.e. the checkpoint was not fully materialised)
        """
        model, tokenizer = FastLanguageModel.from_pretrained(
            model_name=self.model_name,
            max_seq_length=self.max_seq_length,
            load_in_4bit=self.load_in_4bit,  # False for LoRA 16bit
            dtype=self.dtype,
            fast_inference=self.fast_inference,
            max_lora_rank=self.lora_rank,
            gpu_memory_utilization=self.gpu_memory_utilization,
        )

        # The previous version called `model.to_empty(device='cuda')` here.
        # `to_empty` re-allocates every parameter WITHOUT copying its data, so
        # it threw away the pretrained weights that were just loaded. It has
        # been removed; if weights are genuinely missing we fail loudly rather
        # than train on random values.
        meta_params = [name for name, p in model.named_parameters() if p.is_meta]
        if meta_params:
            raise RuntimeError(
                "Model parameters were left on the meta device after loading "
                f"(first few: {meta_params[:5]}); the checkpoint was not fully loaded."
            )

        # Attach the LoRA adapters.
        model = FastLanguageModel.get_peft_model(
            model,
            max_seq_length=self.max_seq_length,
            r=self.lora_rank,  # Choose any number > 0! Suggested 8, 16, 32, 64, 128
            target_modules=[
                "q_proj", "k_proj", "v_proj", "o_proj",
                "gate_proj", "up_proj", "down_proj",
            ],  # Remove QKVO if out of memory
            lora_alpha=16,
            lora_dropout=0,  # Supports any, but = 0 is optimized
            bias="none",  # Supports any, but = "none" is optimized
            use_gradient_checkpointing="unsloth",  # True or "unsloth" for very long context
            random_state=3407,
            use_rslora=False,  # We support rank stabilized LoRA
            loftq_config=None,  # And LoftQ
        )
        return model, tokenizer

    def load_data(self, train_data_path):
        """
        Load the training dataset from a JSON file.

        :param train_data_path: path of the training data file
        :return: the loaded training dataset
        :raises FileNotFoundError: if ``train_data_path`` is not an existing file
        """
        # Explicit existence check instead of opening (and never using) the file.
        if not os.path.isfile(train_data_path):
            raise FileNotFoundError(f"Training data file not found: {train_data_path}")

        train_dataset = load_dataset(
            "json", data_files={"train": train_data_path}, split="train"
        )
        print("train_dataset -->\n", train_dataset)
        # Show the first record so the format can be checked by eye.
        if len(train_dataset) > 0:
            print("First example in train_dataset:", train_dataset[0])
        return train_dataset

    def train(self, model, tokenizer, train_dataset):
        """
        Train the model with GRPO.

        :param model: model with LoRA adapters attached
        :param tokenizer: tokenizer of the model being trained
        :param train_dataset: training dataset
        :return: the trained model
        """
        bf16_ok = is_bfloat16_supported()
        print("is_bfloat16_supported()=", bf16_ok)
        print(f"Reserved memory: {torch.cuda.memory_reserved()}")
        print(f"Allocated memory: {torch.cuda.memory_allocated()}")

        # The trainer builds its own data loader from `train_dataset`; the
        # unused DataLoader that used to be created here has been dropped.
        torch.cuda.empty_cache()

        learning_rate = float(self.config.learning_rate)
        print("self.config.learning_rate=", learning_rate)
        training_args = GRPOConfig(
            use_vllm=self.config.use_vllm,
            learning_rate=learning_rate,
            adam_beta1=self.config.adam_beta1,
            adam_beta2=self.config.adam_beta2,
            weight_decay=self.config.weight_decay,
            warmup_ratio=self.config.warmup_ratio,
            lr_scheduler_type=self.config.lr_scheduler_type,
            optim=self.config.optim,
            logging_steps=self.config.logging_steps,
            bf16=bf16_ok,
            fp16=not bf16_ok,
            per_device_train_batch_size=self.config.per_device_train_batch_size,
            gradient_accumulation_steps=self.config.gradient_accumulation_steps,
            num_generations=self.config.num_generations,
            max_prompt_length=self.config.max_prompt_length,
            max_completion_length=self.config.max_completion_length,
            num_train_epochs=self.config.num_train_epochs,
            max_steps=self.config.max_steps,
            save_steps=self.config.save_steps,
            max_grad_norm=self.config.max_grad_norm,
            report_to=self.config.report_to,
            output_dir=self.config.output_dir,
        )

        # NOTE(review): `combined_reward_func` re-computes the strict-format,
        # semantic and correctness scores that are also listed individually,
        # so those signals are counted twice (and Longformer runs twice per
        # batch). Kept as-is to preserve the existing reward mix -- confirm
        # this is intended.
        trainer = GRPOTrainer(
            model=model,
            processing_class=tokenizer,  # tokenizer used to process the input text
            reward_funcs=[
                self.xmlcount_reward_func,  # XML tag completeness
                self.soft_format_reward_func,  # soft format
                self.strict_format_reward_func,  # strict format
                self.int_reward_func,  # integer answer
                self.correctness_reward_func,  # exact-match correctness
                self.semantic_correctness_reward_func,  # semantic similarity
                self.reasoning_quality_reward_func,  # reasoning quality
                self.combined_reward_func,  # weighted combination
            ],
            args=training_args,  # training hyper-parameters defined above
            train_dataset=train_dataset,
        )

        trainer.train()
        return model

    def save_model(self, model, tokenizer, save_path):
        """
        Save the trained model and tokenizer.

        :param model: trained model
        :param tokenizer: tokenizer
        :param save_path: output directory
        """
        model.save_pretrained(save_path)
        tokenizer.save_pretrained(save_path)
        print(f"Model saved to {save_path}")

    @staticmethod
    def cosine_similarity(vec1, vec2):
        """
        Compute the cosine similarity of two vectors.

        :param vec1: first vector; singleton dimensions are squeezed away
        :param vec2: second vector; singleton dimensions are squeezed away
        :return: cosine similarity as a Python float; 0.0 when either vector
            has zero norm (instead of NaN from a division by zero)
        """
        vec1 = np.asarray(vec1).squeeze()
        vec2 = np.asarray(vec2).squeeze()
        denom = np.linalg.norm(vec1) * np.linalg.norm(vec2)
        if denom == 0:
            return 0.0
        return float(np.dot(vec1, vec2) / denom)

    def _embed_text(self, text):
        """Return the mean-pooled Longformer embedding of ``text`` as a numpy array."""
        # Tokenise once with truncation. The previous encode -> decode ->
        # tokenise round trip turned the special tokens into literal text and
        # then wrapped them in a second set of special tokens.
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=_LONGFORMER_MAX_TOKENS,
        )
        device = self.longformer_model.device
        inputs = {key: value.to(device) for key, value in inputs.items()}
        with torch.no_grad():
            hidden = self.longformer_model(**inputs).last_hidden_state
        # Mean over the sequence dimension -> shape (1, hidden_size).
        return hidden.mean(dim=1).cpu().numpy()

    def semantic_correctness_reward_func(self, prompts, completions, answer, **kwargs):
        """
        Score each completion by the Longformer cosine similarity between its
        extracted answer and the reference answer.

        :param prompts: input prompts (unused)
        :param completions: generated completions
        :param answer: reference answers, one per completion
        :return: list of similarity scores (Python floats)
        """
        responses = [completion[0]['content'] for completion in completions]
        extracted_responses = [self.extract_xml_answer(r) for r in responses]

        scores = []
        for resp, ans in zip(extracted_responses, answer):
            similarity = self.cosine_similarity(
                self._embed_text(resp), self._embed_text(ans)
            )
            scores.append(similarity)
        return scores

    def combined_reward_func(self, prompts, completions, answer, **kwargs):
        """
        Combine several reward functions with weights that depend on correctness.

        :param prompts: input prompts
        :param completions: generated completions
        :param answer: reference answers
        :return: list of combined scores
        """
        format_score = self.strict_format_reward_func(completions)
        semantic_score = self.semantic_correctness_reward_func(prompts, completions, answer)
        correctness_score = self.correctness_reward_func(prompts, completions, answer)

        combined_scores = []
        for fs, ss, cs in zip(format_score, semantic_score, correctness_score):
            if cs == 2.0:
                # Exact match: correctness dominates.
                combined_scores.append(fs * 0.2 + ss * 0.3 + cs * 0.5)
            else:
                # Not an exact match: lean on format and semantic similarity.
                combined_scores.append(fs * 0.4 + ss * 0.4 + cs * 0.2)
        return combined_scores

    @staticmethod
    def reasoning_quality_reward_func(completions, **kwargs):
        """
        Score the quality of the reasoning section.

        :param completions: generated completions
        :return: per completion, 1.0 if the reasoning section contains both
            keywords, 0.5 if the section exists without them, 0.0 if there is
            no reasoning section
        """
        responses = [completion[0]["content"] for completion in completions]
        scores = []
        for response in responses:
            reasoning_match = _REASONING_RE.search(response)
            if not reasoning_match:
                scores.append(0.0)
                continue
            reasoning_content = reasoning_match.group(1).strip()
            # Simple keyword check on the reasoning text.
            if "诊断" in reasoning_content and "原因" in reasoning_content:
                scores.append(1.0)
            else:
                scores.append(0.5)
        return scores

    @staticmethod
    def extract_xml_answer(text: str) -> str:
        """
        Extract the answer from XML-formatted text.

        :param text: text expected to contain an ``<answer>...</answer>`` section
        :return: the stripped content of the last ``<answer>`` section; when the
            tags are absent, the text after the last keyword occurrence or the
            whole stripped text; ``""`` if ``text`` is not a string
        """
        try:
            print("text -> \n", text)
            if "<answer>" in text and "</answer>" in text:
                answer = text.split("<answer>")[-1]
                answer = answer.split("</answer>")[0]
                return answer.strip()

            print("Warning: <answer> tag not found in response.")
            # Fall back to other meaningful parts of the text.
            if "诊断" in text:
                return text.split("诊断")[-1].strip()
            if "排查建议" in text:
                return text.split("排查建议")[-1].strip()
            return text.strip()  # raw text as a last resort
        except (TypeError, AttributeError) as e:
            # Non-string input (e.g. None): keep the best-effort empty result.
            print(f"Error extracting XML answer: {e}")
            return ""

    @staticmethod
    def count_xml(text) -> float:
        """
        Score the presence and placement of the XML tags.

        :param text: text expected to contain the XML tags
        :return: 0.125 per tag that appears exactly once on its own line, minus
            a small penalty for any text trailing the closing answer tag
        """
        count = 0.0
        if text.count("<reasoning>\n") == 1:
            count += 0.125
        if text.count("\n</reasoning>\n") == 1:
            count += 0.125
        if text.count("\n<answer>\n") == 1:
            count += 0.125
            count -= len(text.split("\n</answer>\n")[-1]) * 0.001
        if text.count("\n</answer>") == 1:
            count += 0.125
            count -= (len(text.split("\n</answer>")[-1]) - 1) * 0.001
        return count

    @staticmethod
    def xmlcount_reward_func(completions, **kwargs):
        """
        Compute the XML tag completeness score of each completion.

        :param completions: generated completions
        :return: list of tag completeness scores
        """
        contents = [completion[0]["content"] for completion in completions]
        return [ModelTrainer.count_xml(c) for c in contents]

    @staticmethod
    def soft_format_reward_func(completions, **kwargs):
        """
        Check whether each completion follows the loose format.

        :param completions: generated completions
        :return: 0.5 for each completion that starts with a reasoning section
            followed by an answer section, else 0.0
        """
        responses = [completion[0]["content"] for completion in completions]
        # DOTALL so that multi-line reasoning/answer bodies can match.
        return [0.5 if _SOFT_FORMAT_RE.match(r) else 0.0 for r in responses]

    @staticmethod
    def strict_format_reward_func(completions, **kwargs):
        """
        Check whether each completion follows the strict XML layout and has
        non-empty tag contents.

        :param completions: generated completions
        :return: 1.0 when layout and contents are fine, 0.5 when the layout
            matches but a section is blank, 0.0 when the layout does not match
        """
        responses = [completion[0]["content"] for completion in completions]
        scores = []
        for response in responses:
            match = _STRICT_FORMAT_RE.match(response)
            if not match:
                scores.append(0.0)  # layout mismatch
                continue
            reasoning_content = match.group(1).strip()
            answer_content = match.group(2).strip()
            if reasoning_content and answer_content:
                scores.append(1.0)  # layout and contents both fine
            else:
                scores.append(0.5)  # layout fine, contents blank
        return scores

    @staticmethod
    def int_reward_func(completions, **kwargs):
        """
        Check whether the extracted answer of each completion is an integer.

        :param completions: generated completions
        :return: 0.5 for each all-digit extracted answer, else 0.0
        """
        responses = [completion[0]['content'] for completion in completions]
        extracted_responses = [ModelTrainer.extract_xml_answer(r) for r in responses]
        return [0.5 if r.isdigit() else 0.0 for r in extracted_responses]

    @staticmethod
    def correctness_reward_func(prompts, completions, answer, **kwargs):
        """
        Check whether the extracted answer equals the reference answer.

        :param prompts: input prompts
        :param completions: generated completions
        :param answer: reference answers
        :return: 2.0 for each exact match, else 0.0
        """
        print("completions : \n ", completions)
        responses = [completion[0]['content'] for completion in completions]
        extracted_responses = [ModelTrainer.extract_xml_answer(r) for r in responses]
        # Debug dump of the first sample; skipped on an empty batch instead of
        # raising IndexError.
        if prompts and responses and answer:
            q = prompts[0][-1]['content']
            print('-' * 20, f"Question:\n{q}", f"\nAnswer:\n{answer[0]}",
                  f"\nResponse:\n{responses[0]}", f"\nExtracted:\n{extracted_responses[0]}")
        return [2.0 if r == a else 0.0 for r, a in zip(extracted_responses, answer)]


def main():
    """Run the full pipeline: load config, model and data, train, then save."""
    try:
        # Load the configuration file.
        config = load_config("../conf/conf_train.yaml")
        print("train config:\n", config)

        trainer = ModelTrainer(config)

        # Load the model and tokenizer.
        model, tokenizer = trainer.load_model()

        # Load the dataset.
        train_dataset = trainer.load_data(config.train_data_path)

        # Train the model.
        model = trainer.train(model, tokenizer, train_dataset)

        # Save the model.
        trainer.save_model(model, tokenizer, config.save_path)

        print("Training completed.")
    except Exception as e:
        # Report the full traceback and re-raise so the process exits with a
        # non-zero status instead of silently "succeeding".
        print("exception \n ", e)
        traceback.print_exc()
        raise
    finally:
        print("end")


if __name__ == "__main__":
    main()