import os
import joblib
import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
from typing import Dict, Optional, List
from health_evalution_class import HealthAssessor
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WindFarmPretrainModel:
    """Pre-trained health model for an entire wind farm.

    One model is kept per subsystem (generator, nacelle, grid and, for
    ``dfig`` machines, drive train).  Each is fitted on the pooled data of
    every turbine passed to :meth:`train` and can afterwards be persisted
    with :meth:`save` / :meth:`load` and applied with :meth:`assess`.
    """

    # Minimum number of complete rows required to train / assess a subsystem.
    _MIN_TRAIN_SAMPLES = 100
    _MIN_ASSESS_SAMPLES = 5

    # Score returned when the SPRT flags contain no finite value at all.
    _DEFAULT_HEALTH_SCORE = 50.0

    # Lookup tables used by ``_adjust_score``.
    _SCORE_BINS = (0, 10, 20, 30, 40, 50, 60, 70, 80)
    _SCORE_OFFSETS = (87, 77, 67, 57, 47, 37, 27, 17, 7)

    # Adjusted scores at or above the ceiling are replaced by the fixed value.
    _SCORE_CEILING = 100
    _SCORE_CEILING_VALUE = 92.8

    def __init__(self, wind_code: str):
        self.wind_code = wind_code
        self.mill_type = None  # main turbine type of the wind farm
        self.subsystem_models = {}  # per-subsystem model parameters
        self.features = {}  # feature columns used by each subsystem
        self.turbine_codes = []  # turbines the model was trained on
        # Lazily built {subsystem: mset object}; None means "not built yet".
        self._balltree_cache: Optional[Dict] = None

    def train(self, data_dict: Dict[str, pd.DataFrame], mill_type: str) -> None:
        """Train the per-subsystem models (single-feature subsystems allowed).

        Args:
            data_dict: Mapping of turbine code to that turbine's data frame.
                All frames are concatenated and trained on together.
            mill_type: Turbine type; used as key into the generator
                configuration.  The drive-train subsystem is only trained
                when it equals ``'dfig'``.

        Raises:
            ValueError: If ``data_dict`` is empty.
        """
        if not data_dict:
            # pd.concat would raise a less helpful ValueError here.
            raise ValueError("data_dict is empty: no turbine data to train on")

        self.mill_type = mill_type
        self.turbine_codes = list(data_dict.keys())
        # Models are about to change, so any cached evaluators are stale.
        self._balltree_cache = None
        assessor = HealthAssessor()

        # Pool the data of all turbines for training.
        all_data = pd.concat(data_dict.values())

        subsystems = {
            'generator': assessor.subsystem_config['generator'][mill_type],
            'nacelle': assessor.subsystem_config['nacelle'],
            'grid': assessor.subsystem_config['grid'],
            'drive_train': (
                assessor.subsystem_config['drive_train']
                if mill_type == 'dfig'
                else None
            ),
        }

        for subsys, config in subsystems.items():
            if config is None:
                continue

            # Resolve the feature columns of this subsystem.
            features = assessor._get_subsystem_features(config, all_data)
            logger.info("features: %s", features)
            if not features:
                logger.warning(f"子系统 {subsys} 无有效特征")
                continue

            # Only rows with every feature present are usable.
            train_data = all_data[features].dropna()
            if len(train_data) < self._MIN_TRAIN_SAMPLES:
                logger.warning(f"子系统 {subsys} 数据不足: {len(train_data)}样本")
                continue

            try:
                mset = assessor._create_mset_core()
                # A non-zero status is treated as failure for this subsystem.
                if mset.genDLMatrix(train_data.values) != 0:
                    logger.warning(f"子系统 {subsys} genDLMatrix 返回非零状态,跳过训练")
                    continue

                normalized_data = mset.CRITIC_prepare(train_data)
                if len(normalized_data.columns) == 1:
                    # A single feature simply gets the full weight.
                    weights = pd.Series([1.0], index=normalized_data.columns)
                else:
                    weights = mset.CRITIC(normalized_data)

                self.subsystem_models[subsys] = {
                    'matrixD': mset.matrixD,
                    'healthyResidual': mset.healthyResidual,
                    'feature_weights': weights.to_dict(),
                }
                self.features[subsys] = features
            except Exception as e:
                # Best effort: one failing subsystem must not abort the rest.
                logger.error(f"子系统 {subsys} 训练失败: {e}")
                continue

    @classmethod
    def _adjust_score(cls, score):
        """Shift a raw score using the bin/offset tables.

        The first bin edge strictly greater than ``score`` selects the
        offset at the *previous* index, so raw scores in ``[0, 80)`` land in
        ``[87, 97)``.  Scores of 80 or more (and NaN) are returned unchanged.

        NOTE(review): for ``score < 0`` the index ``-1`` wraps around to the
        last offset (7).  Kept as-is to preserve behaviour; confirm intent.
        """
        for i, upper in enumerate(cls._SCORE_BINS):
            if score < upper:
                return score + cls._SCORE_OFFSETS[i - 1]
        return score

    def assess(self, data: pd.DataFrame, turbine_code: str) -> Dict:
        """Assess one turbine with the pre-trained subsystem models.

        Args:
            data: Data of the turbine to assess.
            turbine_code: Identifier echoed back as ``engine_code``.

        Returns:
            ``{}`` when no subsystem model exists.  Otherwise a dict with
            ``engine_code``, ``subsystems`` (per subsystem ``health_score``
            and ``weights``), ``assessed_subsystems`` and, if at least one
            subsystem was assessed, ``total_health_score`` (equal-weight
            average of the subsystem scores).
        """
        if not self.subsystem_models:
            return {}

        results = {
            "engine_code": turbine_code,
            "subsystems": {},
            "assessed_subsystems": [],
        }

        for subsys in self.subsystem_models:
            if subsys not in self.features:
                continue

            features = [f for f in self.features[subsys] if f in data.columns]
            if not features:
                continue

            test_data = data[features].dropna()
            if len(test_data) < self._MIN_ASSESS_SAMPLES:
                continue

            try:
                # Fall back to uniform weights when none were stored.
                weights_dict = self.subsystem_models[subsys]['feature_weights']
                if weights_dict:
                    weights = pd.Series(weights_dict)
                else:
                    weights = pd.Series(np.ones(len(features)) / len(features))

                # Build the evaluators on first use (or after retraining).
                if getattr(self, '_balltree_cache', None) is None:
                    self._init_balltree_cache()

                mset = self._balltree_cache.get(subsys)
                if mset is None:
                    # Evaluator could not be built for this subsystem.
                    continue

                flags = mset.calcSPRT(test_data.values, weights.values)
                valid_flags = [x for x in flags if not np.isnan(x)]
                if valid_flags:
                    health_score = float(np.mean(valid_flags))
                else:
                    health_score = self._DEFAULT_HEALTH_SCORE

                adjusted_score = self._adjust_score(health_score)
                if adjusted_score >= self._SCORE_CEILING:
                    adjusted_score = self._SCORE_CEILING_VALUE

                results["subsystems"][subsys] = {
                    "health_score": adjusted_score,
                    "weights": weights_dict,
                }
                results["assessed_subsystems"].append(subsys)
            except Exception as e:
                # Best effort: skip this subsystem, keep assessing the others.
                logger.error(f"子系统 {subsys} 评估失败: {e}")
                continue

        # Overall turbine health: subsystems are weighted equally.
        if results["assessed_subsystems"]:
            scores = [
                results["subsystems"][s]["health_score"]
                for s in results["assessed_subsystems"]
            ]
            weights = np.ones(len(scores)) / len(scores)
            results["total_health_score"] = float(np.dot(scores, weights))

        return results

    @staticmethod
    def _make_distance(mset):
        """Return a BallTree metric bound to this specific ``mset`` object.

        A lambda written inline in the loop would look ``mset`` up when the
        metric is *called*, i.e. it would use whichever object the loop
        variable referenced last.  The closure created here pins the object.
        """
        def distance(a, b):
            return 1.0 - mset.calcSimilarity(a, b)

        return distance

    def _init_balltree_cache(self):
        """Build one evaluator (with its BallTree) per trained subsystem.

        Subsystems whose evaluator cannot be built are logged and left out
        of the cache; ``assess`` then skips them.
        """
        self._balltree_cache = {}
        assessor = HealthAssessor()

        for subsys, model in self.subsystem_models.items():
            try:
                mset = assessor._create_mset_core()
                mset.matrixD = model['matrixD']
                mset.healthyResidual = model['healthyResidual']
                mset.normalDataBallTree = BallTree(
                    mset.matrixD,
                    leaf_size=4,
                    metric=self._make_distance(mset),
                )
                self._balltree_cache[subsys] = mset
            except Exception as e:
                logger.error(f"初始化子系统 {subsys} 的BallTree失败: {e}")

    def save(self, model_dir: str):
        """Persist the model to ``<model_dir>/<wind_code>.pkl``.

        The directory is created if necessary.  The BallTree cache is not
        stored; it is rebuilt on demand after loading.
        """
        save_data = {
            "wind_code": self.wind_code,
            "mill_type": self.mill_type,
            "subsystem_models": self.subsystem_models,
            "features": self.features,
            "turbine_codes": self.turbine_codes,
        }

        os.makedirs(model_dir, exist_ok=True)
        path = os.path.join(model_dir, f"{self.wind_code}.pkl")
        joblib.dump(save_data, path)

    @classmethod
    def load(cls, model_dir: str, wind_code: str) -> Optional['WindFarmPretrainModel']:
        """Load a model previously written by :meth:`save`.

        Returns:
            The restored model, or ``None`` when
            ``<model_dir>/<wind_code>.pkl`` does not exist.
        """
        path = os.path.join(model_dir, f"{wind_code}.pkl")
        if not os.path.exists(path):
            return None

        # SECURITY: joblib.load unpickles arbitrary objects and can execute
        # code while doing so.  Only load model files from trusted sources.
        data = joblib.load(path)
        model = cls(data["wind_code"])
        model.mill_type = data["mill_type"]
        model.subsystem_models = data["subsystem_models"]
        model.features = data["features"]
        # Older files may lack the turbine list.
        model.turbine_codes = data.get("turbine_codes", [])
        return model