import logging
import os

import torch
from torch.amp import autocast, GradScaler  # use the new torch.amp module (replaces torch.cuda.amp)
from torch.optim import AdamW  # transformers.AdamW is deprecated; use the PyTorch optimizer
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorWithPadding,
    get_linear_schedule_with_warmup,
)

from src.config import Config

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class TextDataset(Dataset):
    """Custom dataset: one training example per line of the text file."""

    def __init__(self, file_path, tokenizer, seq_length):
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        with open(file_path, "r", encoding="utf-8") as f:
            self.lines = f.read().splitlines()

    def __len__(self):
        return len(self.lines)

    def __getitem__(self, idx):
        # Return a dict of input_ids / attention_mask so DataCollatorWithPadding
        # can pad dynamically to the longest sequence in each batch.
        return self.tokenizer(
            self.lines[idx], max_length=self.seq_length, truncation=True
        )


class ModelTrainer:
    def __init__(self):
        """Initialize the model, tokenizer and device."""
        self.modelId = Config.PRETRAINED_MODEL_DIR
        self.device = Config.DEVICE
        logger.info(f"Using device: {self.device}")
        try:
            # Load tokenizer and model
            self.tokenizer = AutoTokenizer.from_pretrained(self.modelId)
            # Some causal LM tokenizers (e.g. GPT-2) define no pad token; padding requires one.
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            self.model = AutoModelForCausalLM.from_pretrained(self.modelId)
            self.model.to(self.device)  # move the model to the device
            logger.info("Pretrained model and tokenizer loaded.")
        except Exception as e:
            logger.error(f"Failed to load pretrained model: {e}")
            raise

    def train(self):
        """Train the model."""
        try:
            # Load the dataset
            dataset = TextDataset(Config.PROCESSED_DATA_PATH, self.tokenizer, Config.SEQ_LENGTH)
            data_collator = DataCollatorWithPadding(self.tokenizer)  # dynamic padding
            dataloader = DataLoader(
                dataset,
                batch_size=Config.BATCH_SIZE,
                shuffle=True,
                num_workers=Config.NUM_WORKERS,
                collate_fn=data_collator,
            )

            # Optimizer and learning-rate scheduler (with weight decay)
            optimizer = AdamW(self.model.parameters(), lr=Config.LEARNING_RATE, weight_decay=0.01)
            # The scheduler steps once per optimizer update, not per batch,
            # so account for gradient accumulation when counting training steps.
            updates_per_epoch = max(1, len(dataloader) // Config.GRADIENT_ACCUMULATION_STEPS)
            total_steps = updates_per_epoch * Config.EPOCHS
            scheduler = get_linear_schedule_with_warmup(
                optimizer, num_warmup_steps=100, num_training_steps=total_steps
            )

            # Mixed-precision training
            amp_device = torch.device(self.device).type  # "cuda" or "cpu"
            scaler = GradScaler(amp_device, enabled=Config.USE_FP16)

            # Training loop
            self.model.train()
            for epoch in range(Config.EPOCHS):
                for i, batch in enumerate(dataloader):
                    # Move the batch to the device
                    inputs = batch["input_ids"].to(self.device)
                    attention_mask = batch["attention_mask"].to(self.device)
                    # Mask padding positions out of the loss
                    labels = inputs.clone()
                    labels[attention_mask == 0] = -100

                    # Forward pass under autocast (mixed precision)
                    with autocast(device_type=amp_device, enabled=Config.USE_FP16):
                        outputs = self.model(inputs, attention_mask=attention_mask, labels=labels)
                        loss = outputs.loss / Config.GRADIENT_ACCUMULATION_STEPS  # normalize for accumulation

                    # Backward pass and optimization
                    scaler.scale(loss).backward()  # scale the loss and backpropagate
                    if (i + 1) % Config.GRADIENT_ACCUMULATION_STEPS == 0:
                        scaler.step(optimizer)   # update parameters
                        scaler.update()          # update the scaler
                        optimizer.zero_grad()
                        scheduler.step()         # update the learning rate

                logger.info(f"Epoch {epoch + 1}/{Config.EPOCHS}, Loss: {loss.item()}")

            # Save the trained model and tokenizer
            os.makedirs(Config.TRAINED_MODEL_DIR, exist_ok=True)
            self.model.save_pretrained(Config.TRAINED_MODEL_DIR)
            self.tokenizer.save_pretrained(Config.TRAINED_MODEL_DIR)
            logger.info(f"Model saved to {Config.TRAINED_MODEL_DIR}")
        except Exception as e:
            logger.error(f"Training failed: {e}")
            raise
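

if __name__ == "__main__":
    # Minimal usage sketch, assuming src.config.Config defines the attributes
    # referenced above (PRETRAINED_MODEL_DIR, DEVICE, PROCESSED_DATA_PATH,
    # SEQ_LENGTH, BATCH_SIZE, NUM_WORKERS, LEARNING_RATE, EPOCHS,
    # GRADIENT_ACCUMULATION_STEPS, USE_FP16, TRAINED_MODEL_DIR).
    trainer = ModelTrainer()
    trainer.train()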