123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- import os
- import torch
- import torch.distributed as dist
- from unsloth import FastLanguageModel
- from unsloth import is_bfloat16_supported
- from trl import GRPOConfig, GRPOTrainer
- from datasets import load_dataset
- from conf_train import Config ,load_config # 导入配置文件
- import re
- class ModelTrainer:
- def __init__(self, config:Config):
- """
- 初始化 ModelTrainer 类,加载配置参数。
- :param config: 配置对象,包含模型训练所需的参数
- """
- self.config = config
- self.model_name = config.model_name
- self.max_seq_length = config.max_seq_length
- self.dtype = torch.float16 if config.dtype == "float16" else torch.bfloat16
- self.load_in_4bit = config.load_in_4bit
- self.lora_rank = config.lora_rank
- self.gpu_memory_utilization=config.gpu_memory_utilization
- def load_model(self):
- """
- 加载预训练模型和分词器。
- :return: 返回加载的模型和分词器
- """
- model, tokenizer = FastLanguageModel.from_pretrained(
- model_name=self.model_name,
- max_seq_length=self.max_seq_length,
- load_in_4bit=self.load_in_4bit,
- dtype=self.dtype,
- fast_inference=False,
- max_lora_rank=self.lora_rank,
- gpu_memory_utilization=0.6,
- )
- model = model.to_empty(device='cuda')
- # 初始化模型的权重
- for param in model.parameters():
- if param.is_meta:
- param.data = torch.randn_like(param)
- # 添加 LoRA 适配器
- model = FastLanguageModel.get_peft_model(
- model,
- max_seq_length=self.max_seq_length,
- r=self.lora_rank,
- target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
- "gate_proj", "up_proj", "down_proj"],
- lora_alpha=16,
- lora_dropout=0,
- bias="none",
- use_gradient_checkpointing="unsloth",
- random_state=3407,
- use_rslora=False,
- loftq_config=None,
- )
- return model, tokenizer
- def load_data(self, train_data_path):
- """
- 加载训练数据集。
- :param train_data_path: 训练数据路径
- :return: 返回加载的训练数据集
- """
- with open(train_data_path, 'r') as f:
- train_dataset = load_dataset("json", data_files={"train": train_data_path}, split="train")
- return train_dataset
- def train(self, model, tokenizer, train_dataset):
- """
- 训练模型。
- :param model: 预训练模型
- :param tokenizer: 分词器
- :param train_dataset: 训练数据集
- :return: 返回训练后的模型
- """
- print("is_bfloat16_supported()=", is_bfloat16_supported())
- print(f"Reserved memory: {torch.cuda.memory_reserved()}")
- print(f"Allocated memory: {torch.cuda.memory_allocated()}")
- train_loader = torch.utils.data.DataLoader(
- train_dataset, batch_size=1, shuffle=True, pin_memory=True
- )
- torch.cuda.empty_cache()
- training_args = GRPOConfig(
- use_vllm=False,
- learning_rate=self.config.learning_rate,
- adam_beta1=self.config.adam_beta1,
- adam_beta2=self.config.adam_beta2,
- weight_decay=self.config.weight_decay,
- warmup_ratio=self.config.warmup_ratio,
- lr_scheduler_type=self.config.lr_scheduler_type,
- optim=self.config.optim,
- logging_steps=self.config.logging_steps,
- bf16=is_bfloat16_supported(),
- fp16=not is_bfloat16_supported(),
- per_device_train_batch_size=self.config.per_device_train_batch_size,
- gradient_accumulation_steps=self.config.gradient_accumulation_steps,
- num_generations=self.config.num_generations,
- max_prompt_length=self.config.max_prompt_length,
- max_completion_length=self.config.max_completion_length,
- num_train_epochs=self.config.num_train_epochs,
- max_steps=self.config.max_steps,
- save_steps=self.config.save_steps,
- max_grad_norm=self.config.max_grad_norm,
- report_to=self.config.report_to,
- output_dir=self.config.output_dir,
- )
- trainer = GRPOTrainer(
- model=model,
- processing_class=tokenizer,
- reward_funcs=[
- self.xmlcount_reward_func,
- self.soft_format_reward_func,
- self.strict_format_reward_func,
- self.int_reward_func,
- self.correctness_reward_func,
- ],
- args=training_args,
- train_dataset=train_dataset,
- )
- trainer.train()
- return model
- def save_model(self, model, tokenizer, save_path):
- """
- 保存训练后的模型和分词器。
- :param model: 训练后的模型
- :param tokenizer: 分词器
- :param save_path: 保存路径
- """
- model.save_pretrained(save_path)
- tokenizer.save_pretrained(save_path)
- print(f"Model saved to {save_path}")
- @staticmethod
- def extract_xml_answer(text: str) -> str:
- answer = text.split("<answer>")[-1]
- answer = answer.split("</answer>")[0]
- return answer.strip()
-
- @staticmethod
- def count_xml(text) -> float:
- count = 0.0
- if text.count("<reasoning>\n") == 1:
- count += 0.125
- if text.count("\n</reasoning>\n") == 1:
- count += 0.125
- if text.count("\n<answer>\n") == 1:
- count += 0.125
- count -= len(text.split("\n</answer>\n")[-1])*0.001
- if text.count("\n</answer>") == 1:
- count += 0.125
- count -= (len(text.split("\n</answer>")[-1]) - 1)*0.001
- return count
- @staticmethod
- def xmlcount_reward_func(completions, **kwargs):
- """
- Reward function that counts XML tags in the completion.
- """
- contents = [completion[0]["content"] for completion in completions]
- return [ModelTrainer.count_xml(c) for c in contents]
- @staticmethod
- def soft_format_reward_func(completions, **kwargs):
- """
- Reward function that checks if the completion has a specific format.
- """
- pattern = r"<reasoning>.*?</reasoning>\s*<answer>.*?</answer>"
- responses = [completion[0]["content"] for completion in completions]
- matches = [re.match(pattern, r) for r in responses]
- return [0.5 if match else 0.0 for match in matches]
- @staticmethod
- def strict_format_reward_func(completions, **kwargs):
- """
- Reward function that checks if the completion has a specific format.
- """
- pattern = r"^<reasoning>\n.*?\n</reasoning>\n<answer>\n.*?\n</answer>\n$"
- responses = [completion[0]["content"] for completion in completions]
- matches = [re.match(pattern, r) for r in responses]
- return [0.5 if match else 0.0 for match in matches]
- @staticmethod
- def int_reward_func(completions, **kwargs):
- """
- Reward function that checks if the completion contains an integer.
- """
- responses = [completion[0]['content'] for completion in completions]
- extracted_responses = [ModelTrainer.extract_xml_answer(r) for r in responses]
- return [0.5 if r.isdigit() else 0.0 for r in extracted_responses]
- @staticmethod
- def correctness_reward_func(prompts, completions, answer, **kwargs):
- """
- Reward function that checks if the completion matches the correct answer.
- """
- responses = [completion[0]['content'] for completion in completions]
- q = prompts[0][-1]['content']
- extracted_responses = [ModelTrainer.extract_xml_answer(r) for r in responses]
- print('-'*20, f"Question:\n{q}", f"\nAnswer:\n{answer[0]}", f"\nResponse:\n{responses[0]}", f"\nExtracted:\n{extracted_responses[0]}")
- return [2.0 if r == a else 0.0 for r, a in zip(extracted_responses, answer)]
- if __name__ == "__main__":
- # 加载配置文件
- config = load_config()
- # 设置环境变量
- """
- # 多机多卡
- # export RANK=0 # 第一台机器的 rank
- # export WORLD_SIZE=4 # 总共有 4 台机器
- # export MASTER_ADDR=<主节点 IP>
- # export MASTER_PORT=12345
- """
- # 单机多卡
- os.environ['RANK'] = '0'
- os.environ['WORLD_SIZE'] = '1'
- os.environ['MASTER_ADDR'] = 'localhost'
- os.environ['MASTER_PORT'] = '12345'
-
- # 初始化进程组
- dist.init_process_group(backend='nccl', init_method='env://')
- # 初始化 ModelTrainer
- trainer = ModelTrainer(config)
- # 加载模型和分词器
- model, tokenizer = trainer.load_model()
- # 加载数据集
- train_dataset = trainer.load_data(config.train_data_path)
- # 训练模型
- model = trainer.train(model, tokenizer, train_dataset)
- # 保存模型
- trainer.save_model(model, tokenizer, config.save_path)
- # 确保进程组被销毁
- if dist.is_initialized():
- dist.destroy_process_group()
- print("Training completed.")
|