123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- import numpy as np
- import pandas as pd
- from sklearn.neighbors import BallTree
- from typing import Dict, List, Optional, Union
- from functools import lru_cache
- class HealthAssessor:
- def __init__(self):
- self.subsystem_config = {
- # 发电机
- 'generator': {
- # 双馈
- 'dfig': {
- 'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
- 'generator_winding3_temperature', 'generatordrive_end_bearing_temperature',
- 'generatornon_drive_end_bearing_temperature'],
- },
- # 直驱
- 'direct': {
- 'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
- 'generator_winding3_temperature', 'main_bearing_temperature'],
- }
- },
- # 机舱系统
- 'nacelle': {
- 'fixed': ['front_back_vibration_of_the_cabin', 'side_to_side_vibration_of_the_cabin',
- 'cabin_position', 'cabin_temperature'],
- },
- # 电网环境
- 'grid': {
- 'fixed': ['reactive_power', 'active_power', 'grid_a_phase_current',
- 'grid_b_phase_current', 'grid_c_phase_current'],
- },
- # 传动系统
- 'drive_train': {
- 'fixed': ['main_bearing_temperature'],
- 'keywords': [
- {'include': ['gearbox', 'temperature'], 'exclude': [], 'min_count': 2},
- ]
- }
- }
-
- # 嵌入源代码的MSET实现
- self.mset = self._create_mset_core()
- def _create_mset_core(self):
- """创建MSET核心计算模块"""
- class MSETCore:
- def __init__(self):
- self.matrixD = None
- self.normalDataBallTree = None
- self.healthyResidual = None
- def calcSimilarity(self, x, y):
- """优化后的相似度计算"""
- diff = np.array(x) - np.array(y)
- return 1/(1 + np.sqrt(np.sum(diff**2)))
- def genDLMatrix(self, trainDataset, dataSize4D=60, dataSize4L=5):
- """优化矩阵生成过程"""
- m, n = trainDataset.shape
-
- # 快速选择极值点
- min_indices = np.argmin(trainDataset, axis=0)
- max_indices = np.argmax(trainDataset, axis=0)
- unique_indices = np.unique(np.concatenate([min_indices, max_indices]))
- self.matrixD = trainDataset[unique_indices].copy()
-
- # 快速填充剩余点
- remaining_indices = np.setdiff1d(np.arange(m), unique_indices)
- np.random.shuffle(remaining_indices)
- needed = max(0, dataSize4D - len(unique_indices))
- if needed > 0:
- self.matrixD = np.vstack([self.matrixD, trainDataset[remaining_indices[:needed]]])
-
- # 使用与源代码一致的BallTree参数
- self.normalDataBallTree = BallTree(
- self.matrixD,
- leaf_size=4,
- metric=lambda i,j: 1-self.calcSimilarity(i,j) # 自定义相似度
- )
-
- # 使用所有数据计算残差
- self.healthyResidual = self.calcResidualByLocallyWeightedLR(trainDataset)
- return 0
- def calcResidualByLocallyWeightedLR(self, newStates):
- """优化残差计算"""
- if len(newStates.shape) == 1:
- newStates = newStates.reshape(-1, 1)
-
- dist, iList = self.normalDataBallTree.query(
- newStates,
- k=min(10, len(self.matrixD)),
- return_distance=True
- )
- weights = 1/(dist + 1e-5)
- weights /= weights.sum(axis=1)[:, np.newaxis]
-
- est_X = np.sum(weights[:, :, np.newaxis] * self.matrixD[iList[0]], axis=1)
- return est_X - newStates
- def calcSPRT(self, newsStates, feature_weight, alpha=0.1, beta=0.1, decisionGroup=1):
- """优化SPRT计算"""
- stateResidual = self.calcResidualByLocallyWeightedLR(newsStates)
- weightedStateResidual = np.dot(stateResidual, feature_weight)
- weightedHealthyResidual = np.dot(self.healthyResidual, feature_weight)
- mu0 = np.mean(weightedHealthyResidual)
- sigma0 = np.std(weightedHealthyResidual)
-
- # 向量化计算
- n = len(newsStates)
- if n < decisionGroup:
- return [50] # 中性值
-
- rolling_mean = np.convolve(weightedStateResidual, np.ones(decisionGroup)/decisionGroup, 'valid')
- si = (rolling_mean - mu0) * (rolling_mean + mu0 - 2*mu0) / (2*sigma0**2)
-
- lowThres = np.log(beta/(1-alpha))
- highThres = np.log((1-beta)/alpha)
-
- si = np.clip(si, lowThres, highThres)
- si = np.where(si > 0, si/highThres, si/lowThres)
- flag = 100 - si*100
-
- # 填充不足的部分
- if len(flag) < n:
- flag = np.pad(flag, (0, n-len(flag)), mode='edge')
-
- return flag.tolist()
- def CRITIC_prepare(self, data, flag=1):
- """标准化处理"""
- data = data.astype(float)
- numeric_cols = data.select_dtypes(include=[np.number]).columns
- #需要确认哪些指标是正向标准化 哪些是负向标准化
- negative_cols = [col for col in numeric_cols
- if any(kw in col for kw in ['temperature'])]
- positive_cols = list(set(numeric_cols) - set(negative_cols))
-
- # 负向标准化
- if negative_cols:
- max_val = data[negative_cols].max()
- min_val = data[negative_cols].min()
- data[negative_cols] = (max_val - data[negative_cols]) / (max_val - min_val).replace(0, 1e-5)
-
- # 正向标准化
- if positive_cols:
- max_val = data[positive_cols].max()
- min_val = data[positive_cols].min()
- data[positive_cols] = (data[positive_cols] - min_val) / (max_val - min_val).replace(0, 1e-5)
-
- return data
- def CRITIC(self, data):
- """CRITIC权重计算"""
- data_norm = self.CRITIC_prepare(data.copy())
- std = data_norm.std(ddof=0).clip(lower=0.01)
- corr = np.abs(np.corrcoef(data_norm.T))
- np.fill_diagonal(corr, 0)
- conflict = np.sum(1 - corr, axis=1)
- info = std * conflict
- weights = info / info.sum()
- return pd.Series(weights, index=data.columns)
- def ahp(self, matrix):
- """AHP权重计算"""
- eigenvalue, eigenvector = np.linalg.eig(matrix)
- max_idx = np.argmax(eigenvalue)
- weight = eigenvector[:, max_idx].real
- return weight / weight.sum(), eigenvalue[max_idx].real
- return MSETCore()
- def assess_turbine(self, engine_code, data, mill_type, wind_turbine_name):
- """评估单个风机
- """
- results = {
- "engine_code": engine_code,
- "wind_turbine_name": wind_turbine_name,
- "mill_type": mill_type,
- "total_health_score": None,
- "subsystems": {},
- "assessed_subsystems": []
- }
- # 各子系统评估
- subsystems_to_assess = [
- ('generator', self.subsystem_config['generator'][mill_type], 1),
- ('nacelle', self.subsystem_config['nacelle'],1),
- ('grid', self.subsystem_config['grid'], 1),
- ('drive_train', self.subsystem_config['drive_train'] if mill_type == 'dfig' else None,1)
- ]
- for subsystem, config, min_features in subsystems_to_assess:
- if config is None:
- continue
-
- features = self._get_subsystem_features(config, data)
-
- # 功能1:无论特征数量是否足够都输出结果
- if len(features) >= min_features:
- assessment = self._assess_subsystem(data[features])
- else:
- assessment = {
- 'health_score': -1, # 特征不足时输出'-'
- 'weights': {},
- 'message': f'Insufficient features (required {min_features}, got {len(features)})'
- }
-
- # 功能3:删除features内容
- if 'features' in assessment:
- del assessment['features']
-
- results["subsystems"][subsystem] = assessment
- # 计算整机健康度(使用新字段名)
- if results["subsystems"]:
- # 只计算健康值为数字的子系统
- valid_subsystems = [
- k for k, v in results["subsystems"].items()
- if isinstance(v['health_score'], (int, float)) and v['health_score'] >= 0
- ]
-
- if valid_subsystems:
- weights = self._get_subsystem_weights(valid_subsystems)
- health_scores = [results["subsystems"][sys]['health_score'] for sys in valid_subsystems]
- results["total_health_score"] = float(np.dot(health_scores, weights))
- results["assessed_subsystems"] = valid_subsystems
- return results
-
- def _get_all_possible_features(self,assessor, mill_type, available_columns):
- """
- 获取所有可能的特征列(基于实际存在的列)
-
- 参数:
- assessor: HealthAssessor实例
- mill_type: 机型类型
- available_columns: 数据库实际存在的列名列表
- """
- features = []
- available_columns_lower = [col.lower() for col in available_columns] # 不区分大小写匹配
-
- for subsys_name, subsys_config in assessor.subsystem_config.items():
- # 处理子系统配置
- if subsys_name == 'generator':
- config = subsys_config.get(mill_type, {})
- elif subsys_name == 'drive_train' and mill_type != 'dfig':
- continue
- else:
- config = subsys_config
-
- # 处理固定特征
- if 'fixed' in config:
- for f in config['fixed']:
- if f in available_columns:
- features.append(f)
-
- # 处理关键词特征
- if 'keywords' in config:
- for rule in config['keywords']:
- matched = []
- include_kws = [kw.lower() for kw in rule['include']]
- exclude_kws = [ex.lower() for ex in rule.get('exclude', [])]
-
- for col in available_columns:
- col_lower = col.lower()
- # 检查包含关键词
- include_ok = all(kw in col_lower for kw in include_kws)
- # 检查排除关键词
- exclude_ok = not any(ex in col_lower for ex in exclude_kws)
-
- if include_ok and exclude_ok:
- matched.append(col)
-
- if len(matched) >= rule.get('min_count', 1):
- features.extend(matched)
-
- return list(set(features)) # 去重
- def _get_subsystem_features(self, config: Dict, data: pd.DataFrame) -> List[str]:
- """最终版特征获取方法"""
- available_features = []
-
- # 固定特征检查(要求至少10%非空)
- if 'fixed' in config:
- for f in config['fixed']:
- if f in data.columns and data[f].notna().mean() > 0.1:
- available_features.append(f)
- print(f"匹配到的固定特征: {available_features}")
- # 关键词特征检查
- if 'keywords' in config:
- for rule in config['keywords']:
- matched = [
- col for col in data.columns
- if all(kw.lower() in col.lower() for kw in rule['include'])
- and not any(ex.lower() in col.lower() for ex in rule.get('exclude', []))
- and data[col].notna().mean() > 0.1 # 数据有效性检查
- ]
- if len(matched) >= rule.get('min_count', 1):
- available_features.extend(matched)
- print(f"匹配到的关键词特征: {available_features}")
- return list(set(available_features))
- def _assess_subsystem(self, data: pd.DataFrame) -> Dict:
- """评估子系统(与源代码逻辑完全一致)"""
- # 数据清洗
- clean_data = data.dropna()
- if len(clean_data) < 20: # 数据量不足
- return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': 'Insufficient data'}
-
- try:
- # 标准化
- normalized_data = self.mset.CRITIC_prepare(clean_data)
-
- # 计算权重
- weights = self.mset.CRITIC(normalized_data)
-
- # MSET评估
- health_score = self._run_mset_assessment(normalized_data.values, weights.values)
-
- return {
- 'health_score': float(health_score),
- 'weights': weights.to_dict(),
- 'features': list(data.columns)
- }
- except Exception as e:
- return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': str(e)}
- @lru_cache(maxsize=10)
- def _get_mset_model(self, train_data: tuple):
- """缓存MSET模型"""
- # 注意:由于lru_cache需要可哈希参数,这里使用元组
- arr = np.array(train_data)
- model = self._create_mset_core()
- model.genDLMatrix(arr)
- return model
- def _run_mset_assessment(self, data: np.ndarray, weights: np.ndarray) -> float:
- """执行MSET评估"""
- # 分割训练集和测试集
- split_idx = len(data) // 2
- train_data = data[:split_idx]
- test_data = data[split_idx:]
-
- # 使用缓存模型
- model = self._get_mset_model(tuple(map(tuple, train_data))) # 转换为可哈希的元组
-
- # 计算SPRT标志
- flags = model.calcSPRT(test_data, weights)
- return np.mean(flags)
-
- def _get_subsystem_weights(self, subsystems: List[str]) -> np.ndarray:
- """生成等权重的子系统权重向量"""
- n = len(subsystems)
- if n == 0:
- return np.array([])
-
- # 直接返回等权重向量
- return np.ones(n) / n
- # def _get_subsystem_weights(self, subsystems: List[str]) -> np.ndarray:
- # """动态生成子系统权重矩阵"""
- # n = len(subsystems)
- # if n == 0:
- # return np.array([])
-
- # # 定义子系统优先级
- # priority_order = ['generator', 'drive_train', 'nacelle', 'grid']
- # priority = {sys: idx for idx, sys in enumerate(priority_order) if sys in subsystems}
-
- # # 构建比较矩阵
- # matrix = np.ones((n, n))
- # for i in range(n):
- # for j in range(n):
- # if subsystems[i] in priority and subsystems[j] in priority:
- # if priority[subsystems[i]] < priority[subsystems[j]]:
- # matrix[i,j] = 3
- # elif priority[subsystems[i]] > priority[subsystems[j]]:
- # matrix[i,j] = 1/3
-
- # # AHP计算权重
- # weights, _ = self.mset.ahp(matrix)
- # return weights
|