|
- import os
- import joblib
- import numpy as np
- import pandas as pd
- from sklearn.neighbors import BallTree
- from typing import Dict, Optional, List
- from health_evalution_class import HealthAssessor
- import logging
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
class WindFarmPretrainModel:
    """Pre-trained health model covering every turbine of one wind farm.

    ``train`` fits one MSET core per subsystem (generator, nacelle, grid and,
    for ``'dfig'`` machines, drive train) on the pooled data of all turbines.
    ``assess`` then scores a single turbine against those stored models.
    ``save`` / ``load`` persist the trained state with joblib.
    """

    # Minimum number of complete (NaN-free) rows required per subsystem.
    _MIN_TRAIN_SAMPLES = 100
    _MIN_ASSESS_SAMPLES = 5

    # Raw score used when calcSPRT yields no non-NaN value.
    _DEFAULT_HEALTH_SCORE = 50.0

    # (exclusive upper bound, offset) pairs applied to the raw health score;
    # the first bound greater than the score wins, scores >= 80 are kept.
    # NOTE(review): the (0, 7) entry reproduces the original table lookup,
    # where a negative score indexed the *last* offset through ``[i - 1]``
    # wrap-around. That looks accidental - confirm the intended offset.
    _SCORE_ADJUSTMENTS = (
        (0, 7),
        (10, 87),
        (20, 77),
        (30, 67),
        (40, 57),
        (50, 47),
        (60, 37),
        (70, 27),
        (80, 17),
    )
    # Adjusted scores at or above the ceiling are replaced by a fixed value.
    _SCORE_CEILING = 100
    _SCORE_CEILING_REPLACEMENT = 92.8

    def __init__(self, wind_code: str):
        """Create an empty, untrained model for the wind farm ``wind_code``."""
        self.wind_code = wind_code
        self.mill_type = None  # main turbine type of the wind farm
        self.subsystem_models = {}  # subsystem name -> trained model parts
        self.features = {}  # subsystem name -> feature columns used
        self.turbine_codes = []  # turbines whose data was used for training
        # subsystem name -> MSET core rebuilt from subsystem_models;
        # None means "not built yet / invalidated".
        self._balltree_cache = None

    def train(self, data_dict: Dict[str, pd.DataFrame], mill_type: str):
        """Train the per-subsystem models (single-feature subsystems allowed).

        Args:
            data_dict: Mapping of turbine code to that turbine's DataFrame.
                All frames are concatenated and trained on together.
            mill_type: Turbine type; used as the key into the generator
                configuration, and ``'dfig'`` additionally enables the
                drive-train subsystem.

        Raises:
            ValueError: If ``data_dict`` is empty.

        Subsystems without usable features, with fewer than
        ``_MIN_TRAIN_SAMPLES`` complete rows, or whose training fails are
        logged and skipped; they do not abort the remaining subsystems.
        """
        if not data_dict:
            # pd.concat would raise an opaque ValueError here; fail early,
            # with the same exception type, before any state is modified.
            raise ValueError(
                "data_dict must contain at least one turbine DataFrame"
            )

        self.mill_type = mill_type
        self.turbine_codes = list(data_dict.keys())
        # Models are about to change, so cores cached by assess() are stale.
        self._balltree_cache = None
        assessor = HealthAssessor()

        # Pool the data of every turbine for training.
        all_data = pd.concat(data_dict.values())

        subsystems = {
            'generator': assessor.subsystem_config['generator'][mill_type],
            'nacelle': assessor.subsystem_config['nacelle'],
            'grid': assessor.subsystem_config['grid'],
            'drive_train': (
                assessor.subsystem_config['drive_train']
                if mill_type == 'dfig' else None
            ),
        }

        for subsys, config in subsystems.items():
            if config is None:
                continue

            features = assessor._get_subsystem_features(config, all_data)
            logger.info("features: %s", features)
            if not features:
                logger.warning("子系统 %s 无有效特征", subsys)
                continue

            # Only rows that are complete for this subsystem's features.
            train_data = all_data[features].dropna()
            if len(train_data) < self._MIN_TRAIN_SAMPLES:
                logger.warning(
                    "子系统 %s 数据不足: %s样本", subsys, len(train_data)
                )
                continue

            try:
                mset = assessor._create_mset_core()
                status = mset.genDLMatrix(train_data.values)
                if status != 0:
                    # A non-zero status means the subsystem is skipped.
                    logger.warning(
                        "子系统 %s genDLMatrix 返回非零状态: %s",
                        subsys, status,
                    )
                    continue

                normalized_data = mset.CRITIC_prepare(train_data)

                # A single feature gets weight 1.0 directly instead of
                # going through CRITIC.
                if len(normalized_data.columns) == 1:
                    weights = pd.Series([1.0], index=normalized_data.columns)
                else:
                    weights = mset.CRITIC(normalized_data)

                self.subsystem_models[subsys] = {
                    'matrixD': mset.matrixD,
                    'healthyResidual': mset.healthyResidual,
                    'feature_weights': weights.to_dict(),
                }
                self.features[subsys] = features
            except Exception as e:
                logger.error("子系统 %s 训练失败: %s", subsys, e)
                continue

    @classmethod
    def _adjust_score(cls, score: float) -> float:
        """Apply the offset table and the ceiling rule to a raw score."""
        adjusted = score
        for upper_bound, offset in cls._SCORE_ADJUSTMENTS:
            if score < upper_bound:
                adjusted = score + offset
                break
        if adjusted >= cls._SCORE_CEILING:
            adjusted = cls._SCORE_CEILING_REPLACEMENT
        return adjusted

    def assess(self, data: pd.DataFrame, turbine_code: str) -> Dict:
        """Score one turbine with the pre-trained subsystem models.

        Args:
            data: Measurements of the turbine; only columns that were used
                as training features are read.
            turbine_code: Identifier echoed back as ``"engine_code"``.

        Returns:
            ``{}`` when no subsystem model exists. Otherwise a dict with
            ``"engine_code"``, ``"subsystems"`` (per subsystem the adjusted
            ``"health_score"`` and the ``"weights"`` used),
            ``"assessed_subsystems"`` and, if at least one subsystem was
            assessed, ``"total_health_score"`` (equal-weight average).

        Subsystems with no matching columns, fewer than
        ``_MIN_ASSESS_SAMPLES`` complete rows, no cached core, or a failing
        evaluation are skipped.
        """
        if not self.subsystem_models:
            return {}

        results = {
            "engine_code": turbine_code,
            "subsystems": {},
            "assessed_subsystems": [],
        }

        for subsys, model in self.subsystem_models.items():
            if subsys not in self.features:
                continue

            features = [f for f in self.features[subsys] if f in data.columns]
            if not features:
                continue

            test_data = data[features].dropna()
            if len(test_data) < self._MIN_ASSESS_SAMPLES:
                continue

            try:
                # Fall back to equal weights if none were stored.
                weights_dict = model['feature_weights']
                if weights_dict:
                    weights = pd.Series(weights_dict)
                else:
                    weights = pd.Series(np.ones(len(features)) / len(features))

                # Build the cores lazily; kept inside the try so a failure
                # here is handled like any other evaluation error.
                if getattr(self, '_balltree_cache', None) is None:
                    self._init_balltree_cache()

                mset = self._balltree_cache.get(subsys)
                if mset is None:
                    continue

                flags = mset.calcSPRT(test_data.values, weights.values)
                valid_flags = [x for x in flags if not np.isnan(x)]
                if valid_flags:
                    health_score = float(np.mean(valid_flags))
                else:
                    health_score = self._DEFAULT_HEALTH_SCORE

                results["subsystems"][subsys] = {
                    "health_score": self._adjust_score(health_score),
                    "weights": weights_dict,
                }
                results["assessed_subsystems"].append(subsys)
            except Exception as e:
                logger.error("子系统 %s 评估失败: %s", subsys, e)
                continue

        # Whole-turbine health: equal weight for every assessed subsystem.
        if results["assessed_subsystems"]:
            scores = [results["subsystems"][s]["health_score"]
                      for s in results["assessed_subsystems"]]
            weights = np.ones(len(scores)) / len(scores)
            results["total_health_score"] = float(np.dot(scores, weights))

        return results

    @staticmethod
    def _similarity_distance(mset):
        """Return a distance callable bound to this particular ``mset``.

        A lambda written inside the loop would look ``mset`` up when it is
        called, i.e. every stored metric would end up using whichever core
        the loop variable referenced last.
        """
        def distance(a, b):
            return 1.0 - mset.calcSimilarity(a, b)

        return distance

    def _init_balltree_cache(self):
        """Rebuild one MSET core (with its BallTree) per stored subsystem.

        The cache is reset to an empty dict first; a subsystem whose core
        cannot be built is logged and left out of the cache.
        """
        self._balltree_cache = {}
        assessor = HealthAssessor()
        for subsys, model in self.subsystem_models.items():
            try:
                mset = assessor._create_mset_core()
                mset.matrixD = model['matrixD']
                mset.healthyResidual = model['healthyResidual']
                mset.normalDataBallTree = BallTree(
                    mset.matrixD,
                    leaf_size=4,
                    metric=self._similarity_distance(mset),
                )
                self._balltree_cache[subsys] = mset
            except Exception as e:
                logger.error("初始化子系统 %s 的BallTree失败: %s", subsys, e)

    def save(self, model_dir: str):
        """Write the model to ``<model_dir>/<wind_code>.pkl`` using joblib.

        ``model_dir`` is created if it does not exist.
        """
        save_data = {
            "wind_code": self.wind_code,
            "mill_type": self.mill_type,
            "subsystem_models": self.subsystem_models,
            "features": self.features,
            "turbine_codes": self.turbine_codes,
        }

        os.makedirs(model_dir, exist_ok=True)
        path = os.path.join(model_dir, f"{self.wind_code}.pkl")
        joblib.dump(save_data, path)

    @classmethod
    def load(cls, model_dir: str, wind_code: str) -> Optional['WindFarmPretrainModel']:
        """Load the model stored at ``<model_dir>/<wind_code>.pkl``.

        Returns:
            The restored model, or ``None`` when the file does not exist.
        """
        path = os.path.join(model_dir, f"{wind_code}.pkl")
        if not os.path.exists(path):
            return None

        # SECURITY: joblib.load unpickles the file, which can execute
        # arbitrary code. Only load model files from trusted locations.
        data = joblib.load(path)
        model = cls(data["wind_code"])
        model.mill_type = data["mill_type"]
        model.subsystem_models = data["subsystem_models"]
        model.features = data["features"]
        model.turbine_codes = data.get("turbine_codes", [])
        return model
|