import os
import joblib
import numpy as np
import pandas as pd
from sklearn.neighbors import BallTree
from typing import Dict, Optional
from app.services.HealthAssessor import HealthAssessor
from app.logger import logger
import hashlib
import redis
from app.services.HealthCacheService import CacheService

# Score-remapping table used by ``WindFarmPretrainModel.assess``: a raw score
# ``s`` with ``_SCORE_BINS[i-1] <= s < _SCORE_BINS[i]`` is shifted up by
# ``_SCORE_OFFSETS[i-1]``; scores >= the last bin are left unchanged.
_SCORE_BINS = (0, 10, 20, 30, 40, 50, 60, 70, 80)
_SCORE_OFFSETS = (87, 77, 67, 57, 47, 37, 27, 17, 7)
# Adjusted scores at or above this threshold are replaced by _SCORE_CAP_VALUE.
_SCORE_CAP_THRESHOLD = 100
_SCORE_CAP_VALUE = 92.8


def _adjust_health_score(score: float) -> float:
    """Remap a raw health score onto the reported scale.

    Args:
        score: Raw subsystem health score (mean of the valid SPRT flags).

    Returns:
        ``score`` shifted according to ``_SCORE_BINS`` / ``_SCORE_OFFSETS``,
        replaced by ``_SCORE_CAP_VALUE`` when the result reaches
        ``_SCORE_CAP_THRESHOLD``.
    """
    adjusted = score
    for idx, upper in enumerate(_SCORE_BINS):
        if score < upper:
            # idx == 0 only happens for negative scores; clamp them into the
            # lowest bucket instead of wrapping around to _SCORE_OFFSETS[-1].
            adjusted = score + _SCORE_OFFSETS[max(idx - 1, 0)]
            break
    if adjusted >= _SCORE_CAP_THRESHOLD:
        adjusted = _SCORE_CAP_VALUE
    return adjusted


def _make_similarity_distance(mset):
    """Return a BallTree distance function bound to this specific MSET core.

    A lambda created inside a loop looks up the loop variable when it is
    *called*, not when it is created, so every tree would end up using the
    last core built. The factory gives each tree its own binding.
    """
    def distance(a, b):
        return 1.0 - mset.calcSimilarity(a, b)

    return distance


class WindFarmPretrainModel:
    """Pretrained MSET health model covering an entire wind farm.

    Attributes:
        wind_code: Identifier of the wind farm (also the saved file's stem).
        mill_type: Main turbine type passed to ``train`` (e.g. ``'dfig'``).
        subsystem_models: Per-subsystem dict holding ``matrixD``,
            ``healthyResidual`` and ``feature_weights``.
        features: Per-subsystem list of feature column names used in training.
        turbine_codes: Turbine codes whose data was used for training.
    """

    def __init__(self, wind_code: str):
        self.wind_code = wind_code
        self.mill_type = None  # main turbine type of the wind farm
        self.subsystem_models = {}  # trained parameters per subsystem
        self.features = {}  # feature names per subsystem
        self.turbine_codes = []  # turbines included in training
        # Cache client; not referenced elsewhere in this class.
        self.cache_client = CacheService(host='localhost', port=6379)
        # Lazily built {subsystem: MSET core with BallTree}; None = not built.
        self._balltree_cache = None

    def train(self, data_dict: Dict[str, pd.DataFrame], mill_type: str):
        """Train per-subsystem MSET models on the pooled data of all turbines.

        Single-feature subsystems are supported (their feature weight is 1.0).
        Calling ``train`` again replaces any previously trained models.

        Args:
            data_dict: Mapping of turbine code -> that turbine's DataFrame.
            mill_type: Turbine type; selects the generator config and enables
                the drive-train subsystem only for ``'dfig'``.

        Subsystems without valid features, with fewer than 100 complete rows,
        or whose training fails are skipped with a log message.
        """
        self.mill_type = mill_type
        self.turbine_codes = list(data_dict.keys())
        # Drop results from any earlier training so nothing stale survives,
        # including BallTrees built from old memory matrices.
        self.subsystem_models = {}
        self.features = {}
        self._balltree_cache = None

        if not data_dict:
            # pd.concat would raise on an empty input.
            logger.warning(f"风场 {self.wind_code} 无训练数据")
            return

        assessor = HealthAssessor()

        # Pool all turbines' data for training.
        all_data = pd.concat(data_dict.values())

        subsystems = {
            'generator': assessor.subsystem_config['generator'][mill_type],
            'nacelle': assessor.subsystem_config['nacelle'],
            'grid': assessor.subsystem_config['grid'],
            'drive_train': assessor.subsystem_config['drive_train'] if mill_type == 'dfig' else None,
        }

        for subsys, config in subsystems.items():
            if config is None:
                continue

            features = assessor._get_subsystem_features(config, all_data)
            logger.info(f"子系统 {subsys} 特征: {features}")
            if not features:
                logger.warning(f"子系统 {subsys} 无有效特征")
                continue

            # Lowered sample requirement (originally 1000), but still >= 100.
            train_data = all_data[features].dropna()
            if len(train_data) < 100:
                logger.warning(f"子系统 {subsys} 数据不足: {len(train_data)}样本")
                continue

            try:
                mset = assessor._create_mset_core()
                # A non-zero return from genDLMatrix is treated as failure.
                if mset.genDLMatrix(train_data.values) != 0:
                    logger.warning(f"子系统 {subsys} 记忆矩阵构建失败")
                    continue

                normalized_data = mset.CRITIC_prepare(train_data)
                if len(normalized_data.columns) == 1:
                    # CRITIC needs several features; a lone feature gets weight 1.0.
                    weights = pd.Series([1.0], index=normalized_data.columns)
                else:
                    weights = mset.CRITIC(normalized_data)

                self.subsystem_models[subsys] = {
                    'matrixD': mset.matrixD,
                    'healthyResidual': mset.healthyResidual,
                    'feature_weights': weights.to_dict(),
                }
                self.features[subsys] = features
            except Exception as e:
                logger.error(f"子系统 {subsys} 训练失败: {str(e)}")
                continue

    def assess(self, data: pd.DataFrame, turbine_code: str) -> Dict:
        """Assess one turbine's health with the pretrained subsystem models.

        Single-feature subsystems are supported.

        Args:
            data: The turbine's data. A subsystem is assessed only if every
                feature column it was trained on is present.
            turbine_code: Turbine code, echoed back as ``engine_code``.

        Returns:
            ``{}`` when no model is trained or an unexpected error occurs.
            Otherwise a dict with ``engine_code``, ``subsystems`` (per
            subsystem: adjusted ``health_score`` and ``weights``),
            ``assessed_subsystems`` and, when at least one subsystem was
            assessed, ``total_health_score`` (equal-weight mean).
        """
        try:
            if not self.subsystem_models:
                return {}

            results = {
                "engine_code": turbine_code,
                "subsystems": {},
                "assessed_subsystems": [],
            }

            # Build the BallTree-backed MSET cores once, not per subsystem.
            if getattr(self, '_balltree_cache', None) is None:
                self._init_balltree_cache()

            for subsys, model in self.subsystem_models.items():
                trained_features = self.features.get(subsys)
                if not trained_features:
                    continue

                missing = [f for f in trained_features if f not in data.columns]
                if missing:
                    # matrixD was built from exactly these columns, so a
                    # partial subset presumably cannot be compared against it.
                    logger.warning(f"子系统 {subsys} 缺少特征: {missing}")
                    continue

                test_data = data[trained_features].dropna()
                # Lowered minimum sample count (originally 10).
                if len(test_data) < 5:
                    continue

                try:
                    weights_dict = model['feature_weights']
                    if weights_dict:
                        weights = pd.Series(weights_dict)
                    else:
                        # Fall back to uniform weights.
                        weights = pd.Series(np.ones(len(trained_features)) / len(trained_features))

                    mset = self._balltree_cache.get(subsys)
                    if mset is None:
                        continue

                    flags = mset.calcSPRT(test_data.values, weights.values)
                    valid_flags = [x for x in flags if not np.isnan(x)]
                    # 50.0 is used when SPRT yields no non-NaN value.
                    health_score = float(np.mean(valid_flags)) if valid_flags else 50.0

                    results["subsystems"][subsys] = {
                        "health_score": _adjust_health_score(health_score),
                        "weights": weights_dict,
                    }
                    results["assessed_subsystems"].append(subsys)
                except Exception as e:
                    logger.warning(f"子系统 {subsys} 评估失败: {str(e)}")
                    continue

            # Whole-turbine score: equal weights across assessed subsystems.
            if results["assessed_subsystems"]:
                scores = [results["subsystems"][s]["health_score"] for s in results["assessed_subsystems"]]
                weights = np.ones(len(scores)) / len(scores)
                results["total_health_score"] = float(np.dot(scores, weights))

            return results
        except Exception as e:
            logger.error(f"预训练模型评估过程中出错: {str(e)}")
            return {}

    def _init_balltree_cache(self):
        """Build one MSET core with a similarity BallTree per trained subsystem.

        Stores the result in ``self._balltree_cache`` as
        ``{subsystem: mset}``. Subsystems whose tree cannot be built are
        logged and left out of the cache.
        """
        self._balltree_cache = {}
        assessor = HealthAssessor()
        for subsys, model in self.subsystem_models.items():
            try:
                mset = assessor._create_mset_core()
                mset.matrixD = model['matrixD']
                mset.healthyResidual = model['healthyResidual']
                # Distance = 1 - similarity; bind to *this* core (see
                # _make_similarity_distance for why a plain lambda is wrong).
                mset.normalDataBallTree = BallTree(
                    mset.matrixD,
                    leaf_size=4,
                    metric=_make_similarity_distance(mset),
                )
                self._balltree_cache[subsys] = mset
            except Exception as e:
                logger.warning(f"初始化子系统 {subsys} 的BallTree失败: {str(e)}")

    def save(self, model_dir: str):
        """Save the model state to ``<model_dir>/<wind_code>.pkl`` via joblib.

        Creates ``model_dir`` if it does not exist. The BallTree cache and
        cache client are not saved.
        """
        save_data = {
            "wind_code": self.wind_code,
            "mill_type": self.mill_type,
            "subsystem_models": self.subsystem_models,
            "features": self.features,
            "turbine_codes": self.turbine_codes,
        }
        os.makedirs(model_dir, exist_ok=True)
        path = os.path.join(model_dir, f"{self.wind_code}.pkl")
        joblib.dump(save_data, path)

    @classmethod
    def load(cls, model_dir: str, wind_code: str) -> Optional['WindFarmPretrainModel']:
        """Load a model previously written by ``save``.

        Returns:
            The restored model, or ``None`` if ``<model_dir>/<wind_code>.pkl``
            does not exist.
        """
        path = os.path.join(model_dir, f"{wind_code}.pkl")
        if not os.path.exists(path):
            return None
        # SECURITY: joblib.load unpickles arbitrary objects; only load model
        # files from a trusted directory.
        data = joblib.load(path)
        model = cls(data["wind_code"])
        model.mill_type = data["mill_type"]
        model.subsystem_models = data["subsystem_models"]
        model.features = data["features"]
        model.turbine_codes = data.get("turbine_codes", [])
        return model