Prechádzať zdrojové kódy

健康评估增加预训练和缓存功能

wangjiaojiao 3 týždňov pred
rodič
commit
4f570b6cdc

+ 173 - 162
app/services/HealthAssessor.py

@@ -1,12 +1,11 @@
-from functools import lru_cache
-from typing import Dict, List
-
 import numpy as np
 import pandas as pd
 from sklearn.neighbors import BallTree
-
-from app.logger import logger
-
+from typing import Dict, List
+from functools import lru_cache
+import hashlib
+import json
+import redis
 
 class HealthAssessor:
     def __init__(self):
@@ -16,8 +15,8 @@ class HealthAssessor:
                 # 双馈
                 'dfig': {
                     'fixed': ['generator_winding1_temperature', 'generator_winding2_temperature',
-                              'generator_winding3_temperature', 'generatordrive_end_bearing_temperature',
-                              'generatornon_drive_end_bearing_temperature'],
+                             'generator_winding3_temperature', 'generatordrive_end_bearing_temperature',
+                             'generatornon_drive_end_bearing_temperature'],
                 },
                 # 直驱
                 'direct': {
@@ -49,7 +48,6 @@ class HealthAssessor:
 
     def _create_mset_core(self):
         """创建MSET核心计算模块"""
-
         class MSETCore:
             def __init__(self):
                 self.matrixD = None
@@ -61,7 +59,7 @@ class HealthAssessor:
                 diff = np.array(x) - np.array(y)
                 return 1 / (1 + np.sqrt(np.sum(diff ** 2)))
 
-            def genDLMatrix(self, trainDataset, dataSize4D=60, dataSize4L=5):
+            def genDLMatrix(self, trainDataset, dataSize4D=15, dataSize4L=5):
                 """优化矩阵生成过程"""
                 m, n = trainDataset.shape
 
@@ -81,7 +79,7 @@ class HealthAssessor:
                 # 使用与源代码一致的BallTree参数
                 self.normalDataBallTree = BallTree(
                     self.matrixD,
-                    leaf_size=4,
+                    leaf_size=40,
                     metric=lambda i, j: 1 - self.calcSimilarity(i, j)  # 自定义相似度
                 )
 
@@ -107,88 +105,81 @@ class HealthAssessor:
 
             def calcSPRT(self, newsStates, feature_weight, alpha=0.1, beta=0.1, decisionGroup=1):
                 """优化SPRT计算"""
-                try:
-                    stateResidual = self.calcResidualByLocallyWeightedLR(newsStates)
-                    weightedStateResidual = np.dot(stateResidual, feature_weight)
-                    weightedHealthyResidual = np.dot(self.healthyResidual, feature_weight)
+                stateResidual = self.calcResidualByLocallyWeightedLR(newsStates)
+                weightedStateResidual = np.dot(stateResidual, feature_weight)
+                weightedHealthyResidual = np.dot(self.healthyResidual, feature_weight)
 
-                    mu0 = np.mean(weightedHealthyResidual)
-                    sigma0 = np.std(weightedHealthyResidual)
-                    # 处理标准差为零的情况
-                    if sigma0 < 1e-5:
-                        sigma0 = 1.0  # 设为安
+                mu0 = np.mean(weightedHealthyResidual)
+                sigma0 = np.std(weightedHealthyResidual)
 
-                    # 向量化计算
-                    n = len(newsStates)
-                    if n < decisionGroup:
-                        return [50]  # 中性值
+                # 向量化计算
+                n = len(newsStates)
+                if n < decisionGroup:
+                    return [50]  # 中性值
 
-                    rolling_mean = np.convolve(weightedStateResidual, np.ones(decisionGroup) / decisionGroup, 'valid')
-                    si = (rolling_mean - mu0) * (rolling_mean + mu0 - 2 * mu0) / (2 * sigma0 ** 2)
+                rolling_mean = np.convolve(weightedStateResidual, np.ones(decisionGroup) / decisionGroup, 'valid')
+                si = (rolling_mean - mu0) * (rolling_mean + mu0 - 2 * mu0) / (2 * sigma0 ** 2)
 
-                    lowThres = np.log(beta / (1 - alpha))
-                    highThres = np.log((1 - beta) / alpha)
+                lowThres = np.log(beta / (1 - alpha))
+                highThres = np.log((1 - beta) / alpha)
 
-                    si = np.clip(si, lowThres, highThres)
-                    si = np.where(si > 0, si / highThres, si / lowThres)
-                    flag = 100 - si * 100
+                si = np.clip(si, lowThres, highThres)
+                si = np.where(si > 0, si / highThres, si / lowThres)
+                flag = 100 - si * 100
 
-                    # 填充不足的部分
-                    if len(flag) < n:
-                        flag = np.pad(flag, (0, n - len(flag)), mode='edge')
+                # 填充不足的部分
+                if len(flag) < n:
+                    flag = np.pad(flag, (0, n - len(flag)), mode='edge')
 
-                    return flag.tolist()
-                except Exception as e:
-                    logger.error(f"SPRT计算错误: {str(e)}")
-                    # 返回中性值
-                    return [50] * len(newsStates)
+                return flag.tolist()
 
             def CRITIC_prepare(self, data, flag=1):
                 """标准化处理"""
                 data = data.astype(float)
                 numeric_cols = data.select_dtypes(include=[np.number]).columns
-
-                # 处理全零或常数列
-                for col in numeric_cols:
-                    # 所有值相同
-                    if data[col].nunique() == 1:
-                        # 设为中性值
-                        data[col] = 0.5
-                        continue
-
-                # 负向标准化(温度等指标)
-                negative_cols = [col for col in numeric_cols if 'temperature' in col]
-                for col in negative_cols:
-                    col_min = data[col].min()
-                    col_max = data[col].max()
-                    range_val = col_max - col_min
-                    if range_val < 1e-5:  # 防止除零
-                        range_val = 1.0
-                    data[col] = (col_max - data[col]) / range_val
-
-                # 正向标准化(其他指标)
+                negative_cols = [col for col in numeric_cols
+                                 if any(kw in col for kw in ['temperature'])]
                 positive_cols = list(set(numeric_cols) - set(negative_cols))
-                for col in positive_cols:
-                    col_min = data[col].min()
-                    col_max = data[col].max()
-                    range_val = col_max - col_min
-                    # 防止除零
-                    if range_val < 1e-5:
-                        range_val = 1.0
-                    data[col] = (data[col] - col_min) / range_val
+
+                # 负向标准化
+                if negative_cols:
+                    max_val = data[negative_cols].max()
+                    min_val = data[negative_cols].min()
+                    data[negative_cols] = (max_val - data[negative_cols]) / (max_val - min_val).replace(0, 1e-5)
+
+                # 正向标准化
+                if positive_cols:
+                    max_val = data[positive_cols].max()
+                    min_val = data[positive_cols].min()
+                    data[positive_cols] = (data[positive_cols] - min_val) / (max_val - min_val).replace(0, 1e-5)
 
                 return data
 
             def CRITIC(self, data):
-                """CRITIC权重计算"""
-                data_norm = self.CRITIC_prepare(data.copy())
-                std = data_norm.std(ddof=0).clip(lower=0.01)
-                corr = np.abs(np.corrcoef(data_norm.T))
-                np.fill_diagonal(corr, 0)
-                conflict = np.sum(1 - corr, axis=1)
-                info = std * conflict
-                weights = info / info.sum()
-                return pd.Series(weights, index=data.columns)
+                """CRITIC权重计算(支持单特征)"""
+                try:
+                    # 处理单特征情况
+                    if len(data.columns) == 1:
+                        return pd.Series([1.0], index=data.columns)
+                        
+                    data_norm = self.CRITIC_prepare(data.copy())
+                    std = data_norm.std(ddof=0).clip(lower=0.01)
+                    
+                    # 计算相关系数矩阵(添加异常处理)
+                    try:
+                        corr = np.abs(np.corrcoef(data_norm.T))
+                        np.fill_diagonal(corr, 0)
+                        conflict = np.sum(1 - corr, axis=1)
+                    except:
+                        # 如果计算相关系数失败,使用等权重
+                        return pd.Series(np.ones(len(data.columns))/len(data.columns))
+                        
+                    info = std * conflict
+                    weights = info / info.sum()
+                    return pd.Series(weights, index=data.columns)
+                except Exception as e:
+                    print(f"CRITIC计算失败: {str(e)}")
+                    return pd.Series(np.ones(len(data.columns))/len(data.columns))
 
             def ahp(self, matrix):
                 """AHP权重计算"""
@@ -200,85 +191,92 @@ class HealthAssessor:
         return MSETCore()
 
     def assess_turbine(self, engine_code, data, mill_type, wind_turbine_name):
-        """评估单个风机
-        """
-        results = {
-            "engine_code": engine_code,
-            "wind_turbine_name": wind_turbine_name,
-            "mill_type": mill_type,
-            "total_health_score": None,
-            "subsystems": {},
-            "assessed_subsystems": []
-        }
-
-        # 各子系统评估
-        subsystems_to_assess = [
-            ('generator', self.subsystem_config['generator'][mill_type], 1),
-            ('nacelle', self.subsystem_config['nacelle'], 1),
-            ('grid', self.subsystem_config['grid'], 1),
-            ('drive_train', self.subsystem_config['drive_train'] if mill_type == 'dfig' else None, 1)
-        ]
-
-        for subsystem, config, min_features in subsystems_to_assess:
-            if config is None:
-                continue
-
-            features = self._get_subsystem_features(config, data)
 
-            # 功能1:无论特征数量是否足够都输出结果
-            if len(features) >= min_features:
-                assessment = self._assess_subsystem(data[features])
-            else:
-                assessment = {
-                    'health_score': -1,  # 特征不足时输出'-'
-                    'weights': {},
-                    'message': f'Insufficient features (required {min_features}, got {len(features)})'
-                }
-
-            # 功能3:删除features内容
-            if 'features' in assessment:
-                del assessment['features']
-
-            # 最终清理:确保没有NaN值
-            for sys, result in results["subsystems"].items():
-                if isinstance(result['health_score'], float) and np.isnan(result['health_score']):
-                    result['health_score'] = -1
-                    result['message'] = (result.get('message') or '') + '; NaN detected'
+        # 生成缓存键
+        cache_key = f"assessment:{engine_code}:{data.shape[0]}:{hashlib.sha256(pd.util.hash_pandas_object(data).values.tobytes()).hexdigest()}" 
+        try:
+            # 尝试从缓存获取
+            cached_result = self.cache_client.get(cache_key)
+            if cached_result:
+                return json.loads(cached_result)                   
+            """评估单个风机"""
+            results = {
+                "engine_code": engine_code,
+                "wind_turbine_name": wind_turbine_name,
+                "mill_type": mill_type,
+                "total_health_score": None,
+                "subsystems": {},
+                "assessed_subsystems": []
+            }
 
-            if isinstance(results["total_health_score"], float) and np.isnan(results["total_health_score"]):
-                results["total_health_score"] = -1
+            # 各子系统评估
+            subsystems_to_assess = [
+                ('generator', self.subsystem_config['generator'][mill_type], 1),
+                ('nacelle', self.subsystem_config['nacelle'], 1),
+                ('grid', self.subsystem_config['grid'], 1),
+                ('drive_train', self.subsystem_config['drive_train'] if mill_type == 'dfig' else None, 1)
+            ]
 
-            results["subsystems"][subsystem] = assessment
+            for subsystem, config, min_features in subsystems_to_assess:
+                if config is None:
+                    continue
+
+                features = self._get_subsystem_features(config, data)
+
+                # 功能1:无论特征数量是否足够都输出结果
+                if len(features) >= min_features:
+                    assessment = self._assess_subsystem(data[features])
+                else:
+                    assessment = {
+                        'health_score': -1,  # 特征不足时输出'-'
+                        'weights': {},
+                        'message': f'Insufficient features (required {min_features}, got {len(features)})'
+                    }
+                print('结果打印',assessment)
+                # 功能3:删除features内容
+                if 'features' in assessment:
+                    del assessment['features']
+
+                # 最终清理:确保没有NaN值
+                for sys, result in results["subsystems"].items():
+                    if isinstance(result['health_score'], float) and np.isnan(result['health_score']):
+                        result['health_score'] = -1
+                        result['message'] = (result.get('message') or '') + '; NaN detected'
+                
+                if isinstance(results["total_health_score"], float) and np.isnan(results["total_health_score"]):
+                    results["total_health_score"] = -1               
+                results["subsystems"][subsystem] = assessment
+
+            # 计算整机健康度(使用新字段名)
+            if results["subsystems"]:
+                # 只计算健康值为数字的子系统
+                valid_subsystems = [
+                    k for k, v in results["subsystems"].items()
+                    if isinstance(v['health_score'], (int, float)) and v['health_score'] >= 0
+                ]
 
-        # 计算整机健康度(使用新字段名)
-        if results["subsystems"]:
-            # 只计算健康值为数字的子系统
-            valid_subsystems = [
-                k for k, v in results["subsystems"].items()
-                if isinstance(v['health_score'], (int, float)) and v['health_score'] >= 0
-            ]
+                if valid_subsystems:
+                    weights = self._get_subsystem_weights(valid_subsystems)
+                    health_scores = [results["subsystems"][sys]['health_score'] for sys in valid_subsystems]
+                    results["total_health_score"] = float(np.dot(health_scores, weights))
+                    results["assessed_subsystems"] = valid_subsystems
 
-            if valid_subsystems:
-                weights = self._get_subsystem_weights(valid_subsystems)
-                health_scores = [results["subsystems"][sys]['health_score'] for sys in valid_subsystems]
-                results["total_health_score"] = float(np.dot(health_scores, weights))
-                results["assessed_subsystems"] = valid_subsystems
-        logger.info(f"评估结果:{results}")
-        return results
+            return results
+        except Exception as e:
+            print('评估出错')
 
-    def _get_all_possible_features(self, assessor, mill_type, available_columns):
+    def _get_all_possible_features(self, mill_type, available_columns):
         """
         获取所有可能的特征列(基于实际存在的列)
-
+        
         参数:
-            assessor: HealthAssessor实例
             mill_type: 机型类型
             available_columns: 数据库实际存在的列名列表
         """
         features = []
         available_columns_lower = [col.lower() for col in available_columns]  # 不区分大小写匹配
-
-        for subsys_name, subsys_config in assessor.subsystem_config.items():
+        # for subsys_name, subsys_config in assessor.subsystem_config.items():
+        for subsys_name, subsys_config in self.subsystem_config.items():
             # 处理子系统配置
             if subsys_name == 'generator':
                 config = subsys_config.get(mill_type, {})
@@ -324,38 +322,54 @@ class HealthAssessor:
             for f in config['fixed']:
                 if f in data.columns and data[f].notna().mean() > 0.1:
                     available_features.append(f)
-        # logger.info(f"匹配到的固定特征: {available_features}")
+        print(f"匹配到的固定特征: {available_features}")
         # 关键词特征检查
         if 'keywords' in config:
             for rule in config['keywords']:
                 matched = [
                     col for col in data.columns
                     if all(kw.lower() in col.lower() for kw in rule['include'])
-                       and not any(ex.lower() in col.lower() for ex in rule.get('exclude', []))
-                       and data[col].notna().mean() > 0.1  # 数据有效性检查
+                    and not any(ex.lower() in col.lower() for ex in rule.get('exclude', []))
+                    and data[col].notna().mean() > 0.1  # 数据有效性检查
                 ]
                 if len(matched) >= rule.get('min_count', 1):
                     available_features.extend(matched)
-        # logger.info(f"匹配到的关键词特征: {available_features}")
+        print(f"匹配到的关键词特征: {available_features}")
         return list(set(available_features))
-
+    
     def _assess_subsystem(self, data: pd.DataFrame) -> Dict:
-        """评估子系统(与源代码逻辑完全一致)"""
+        """评估子系统(支持单特征)"""
         # 数据清洗
         clean_data = data.dropna()
-        if len(clean_data) < 20:  # 数据量不足
+        if len(clean_data) < 10:  # 降低最小样本量要求(原为20)
             return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': 'Insufficient data'}
 
         try:
             # 标准化
             normalized_data = self.mset.CRITIC_prepare(clean_data)
 
-            # 计算权重
-            weights = self.mset.CRITIC(normalized_data)
+            # 计算权重 - 处理单特征情况
+            if len(normalized_data.columns) == 1:
+                weights = pd.Series([1.0], index=normalized_data.columns)
+            else:
+                weights = self.mset.CRITIC(normalized_data)
 
             # MSET评估
             health_score = self._run_mset_assessment(normalized_data.values, weights.values)
 
+            bins = [0, 10, 20, 30, 40, 50, 60, 70, 80]
+            adjust_values = [87, 77, 67, 57, 47, 37, 27, 17, 7]
+
+            def adjust_score(score):
+                for i in range(len(bins)):
+                    if score < bins[i]:
+                        return score + adjust_values[i-1]
+                return score  #
+
+            adjusted_score = adjust_score(health_score)  #
+            if adjusted_score >= 100:
+                adjusted_score = 92.8
+
             return {
                 'health_score': float(health_score),
                 'weights': weights.to_dict(),
@@ -363,7 +377,7 @@ class HealthAssessor:
             }
         except Exception as e:
             return {'health_score': -1, 'weights': {}, 'features': list(data.columns), 'message': str(e)}
-
+        
     @lru_cache(maxsize=10)
     def _get_mset_model(self, train_data: tuple):
         """缓存MSET模型"""
@@ -377,8 +391,7 @@ class HealthAssessor:
         """执行MSET评估"""
         # 检查权重有效性
         if np.isnan(weights).any() or np.isinf(weights).any():
-            # 重置为等权重
-            weights = np.ones_like(weights) / len(weights)
+            weights = np.ones_like(weights) / len(weights)  # 重置为等权重 
 
         # 分割训练集和测试集
         split_idx = len(data) // 2
@@ -389,19 +402,17 @@ class HealthAssessor:
         try:
             model = self._get_mset_model(tuple(map(tuple, train_data)))
             flags = model.calcSPRT(test_data, weights)
-
+            
             # 过滤NaN值并计算均值
             valid_flags = [x for x in flags if not np.isnan(x)]
             if not valid_flags:
-                # 默认中性值
-                return 50.0
-
+                return 50.0  # 默认中性值
+                
             return float(np.mean(valid_flags))
         except Exception as e:
-            logger.error(f"MSET评估失败: {str(e)}")
-            # 默认中性值
-            return 50.0
-
+            print(f"MSET评估失败: {str(e)}")
+            return 50.0  # 默认中性值
+    
     def _get_subsystem_weights(self, subsystems: List[str]) -> np.ndarray:
         """生成等权重的子系统权重向量"""
         n = len(subsystems)
@@ -409,4 +420,4 @@ class HealthAssessor:
             return np.array([])
 
         # 直接返回等权重向量
-        return np.ones(n) / n
+        return np.ones(n) / n

+ 87 - 0
app/services/HealthCacheService.py

@@ -0,0 +1,87 @@
+# cache_service.py
+import json
+import hashlib
+import logging
+import redis
+from typing import Optional, Dict, Any
+
+class CacheService:
+    def __init__(self, host: str = 'localhost', port: int = 6379, 
+                 db: int = 0, password: str = None, ttl: int = 2592000):
+        """
+        初始化Redis缓存服务
+        :param ttl: 默认缓存时间(秒),默认24小时
+        """
+        self.client = redis.Redis(
+            host=host,
+            port=port,
+            db=db,
+            password=password,
+            decode_responses=True,
+            socket_connect_timeout=5,
+            socket_timeout=5
+        )
+        self.ttl = ttl
+        self.logger = logging.getLogger(__name__)
+        
+    def _generate_cache_key(self, request_body: Dict[str, Any]) -> str:
+        """
+        根据请求体生成唯一的缓存键
+        """
+        request_str = json.dumps(request_body, sort_keys=True)
+        return f"health:cache:{hashlib.sha256(request_str.encode()).hexdigest()}"
+    
+    def get_cached_response(self, request_body: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+        """
+        获取缓存结果
+        :return: 如果存在则返回缓存结果,否则返回None
+        """
+        cache_key = self._generate_cache_key(request_body)
+        try:
+            cached = self.client.get(cache_key)
+            if cached:
+                self.logger.info(f"缓存命中: {cache_key}")
+                return json.loads(cached)
+            return None
+        except Exception as e:
+            self.logger.error(f"获取缓存失败: {str(e)}")
+            return None
+    
+    def set_cached_response(self, request_body: Dict[str, Any], 
+                          response_data: Dict[str, Any], 
+                          ttl: int = None) -> bool:
+        """
+        设置缓存结果
+        """
+        cache_key = self._generate_cache_key(request_body)
+        try:
+            expire_time = ttl if ttl is not None else self.ttl
+            self.client.setex(
+                cache_key,
+                expire_time,
+                json.dumps(response_data)
+            )
+            self.logger.info(f"缓存已设置: {cache_key} (TTL: {expire_time}s)")
+            return True
+        except Exception as e:
+            self.logger.error(f"设置缓存失败: {str(e)}")
+            return False
+    
+    def clear_cache(self, request_body: Dict[str, Any]) -> bool:
+        """清除指定请求的缓存"""
+        cache_key = self._generate_cache_key(request_body)
+        try:
+            self.client.delete(cache_key)
+            self.logger.info(f"缓存已清除: {cache_key}")
+            return True
+        except Exception as e:
+            self.logger.error(f"清除缓存失败: {str(e)}")
+            return False
+
+    def ping(self) -> bool:
+        """检查Redis连接是否正常"""
+        try:
+            return self.client.ping()
+        except Exception as e:
+            self.logger.error(f"Redis连接测试失败: {str(e)}")
+            return False

+ 58 - 21
app/services/HealthDataFetcher.py

@@ -1,8 +1,8 @@
 import traceback
-
 import pandas as pd
-from sqlalchemy import inspect
-
+from sqlalchemy import  inspect
+from functools import lru_cache
+from typing import List, Dict
 from app.config import dataBase
 from app.database import get_engine
 from app.logger import logger
@@ -10,13 +10,22 @@ from app.logger import logger
 
 class DataFetcher:
 
+    @lru_cache(maxsize=10)
+    def get_turbine_columns_cached(self, windcode: str):
+        return self.get_turbine_columns(windcode)
     def get_turbine_columns(self, windcode):
         """
         获取指定风场数据表的所有列名
         :param windcode: 风场编号 (如 "WF001")
         :return: 列名列表 (如 ["timestamp", "temp1", "vibration1"])
         """
-        table_name = f"{windcode}_minute"
+        # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号        
+        special_wind_farms = {
+            #训练诺木洪风场时 删掉反引号
+            "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+           # "WOF093400005": f"{windcode}-WOB000001_minute"  
+        }
+        table_name = special_wind_farms.get(windcode, f"{windcode}_minute")        
         try:
             inspector = inspect(get_engine(dataBase.DATA_DB))
             columns = inspector.get_columns(table_name)
@@ -67,10 +76,9 @@ class DataFetcher:
             pd.DataFrame: 包含查询结果的DataFrame
         """
         try:
-            # 1. 转换并验证月份格式
-            year_month_int = int(month.replace('-', ''))
-            if not 100001 <= year_month_int <= 999912:  # 基本格式验证
-                raise ValueError("月份格式应为YYYY-MM")
+            # 将month格式从yyyy-mm转换为单独的年份和月份
+            year, month = month.split('-')
+
 
             # 2. 验证特征列名安全性
             safe_features = []
@@ -85,34 +93,63 @@ class DataFetcher:
                 return pd.DataFrame()
 
             # 3. 构建参数化查询
+            # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号        
+            special_wind_farms = {
+                "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+                #"WOF093400005": f"{windcode}-WOB000001_minute"                 
+            }
+            table_name = special_wind_farms.get(windcode, f"{windcode}_minute")  
             query = f"""
-            SELECT `year_month`, {','.join(safe_features)} 
-            FROM `{windcode}_minute` 
+            SELECT `year`,`mont`, {','.join(safe_features)} 
+            FROM {table_name} 
             WHERE `wind_turbine_number` = %s
-            AND `year_month` = %s
+            AND `year` = %s 
+            AND `month` = %s
             """
 
-            logger.info(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year_month_int})")
+            logger.info(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year},{month})")
 
             # 4. 执行参数化查询
             return pd.read_sql(query, get_engine(dataBase.DATA_DB),
-                               params=(engine_code, year_month_int))
+                               params=(engine_code,year,month))
 
         except ValueError as e:
             logger.error(f"输入参数错误: {traceback.print_exc()}")
             return pd.DataFrame()
         except Exception as e:
             logger.error(f"数据库查询失败: {traceback.print_exc()}")
+            traceback.print_exc()
             return pd.DataFrame()
 
 
-    def get_turbine_columns(self, windcode):
-        """获取指定风场数据表的所有列名"""
-        table_name = f"{windcode}_minute"
+    def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
+        """批量获取风场下所有风机数据"""
         try:
-            inspector = inspect(get_engine(dataBase.DATA_DB))
-            columns = inspector.get_columns(table_name)
-            return [col['name'] for col in columns]
+            # 安全特征检查
+            safe_features = [f'`{f}`' for f in features if isinstance(f, str) and all(c.isalnum() or c == '_' for c in f)]
+            if not safe_features:
+                return {}
+            
+            # 将month格式从yyyy-mm转换为单独的年份和月份
+            year, month = month.split('-')
+            # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
+            special_wind_farms = {
+                "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+                #"WOF093400005": f"{windcode}-WOB000001_minute"                  
+            }
+            table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
+            # 单次查询获取所有风机数据
+            query = f"""
+            SELECT `wind_turbine_number`, {','.join(safe_features)} 
+            FROM {table_name}
+            WHERE `year` = %s AND `month` = %s
+            """
+            # 执行查询
+            all_data = pd.read_sql(query, get_engine(dataBase.DATA_DB), params=(year, month))
+            # 按风机分组
+            return {turbine: group.drop(columns=['wind_turbine_number']) 
+                    for turbine, group in all_data.groupby('wind_turbine_number')}
+                    
         except Exception as e:
-            logger.error(f"获取列名失败: {traceback.print_exc()}")
-            return []
+            print(f"批量查询失败: {str(e)}")
+            return {}   

+ 214 - 0
app/services/HealthPretrain.py

@@ -0,0 +1,214 @@
+import os
+import joblib
+import numpy as np
+import pandas as pd
+from sklearn.neighbors import BallTree
+from typing import Dict, Optional
+from app.services.HealthAssessor import HealthAssessor
+from app.logger import logger
+import hashlib
+import redis
+from app.services.HealthCacheService import CacheService
+class WindFarmPretrainModel:
+    """整个风场的预训练模型"""
+    
+    def __init__(self, wind_code: str):
+        self.wind_code = wind_code
+        self.mill_type = None  # 风场主要机型
+        self.subsystem_models = {}  # 各子系统模型
+        self.features = {}  # 各子系统使用的特征
+        self.turbine_codes = []  # 包含的风机列表
+        self.cache_client = CacheService(host='localhost', port=6379)  # 添加缓存客户端        
+    def train(self, data_dict: Dict[str, pd.DataFrame], mill_type: str):
+        """训练风场模型(支持单特征子系统)"""
+        self.mill_type = mill_type
+        self.turbine_codes = list(data_dict.keys())
+        assessor = HealthAssessor()
+        
+        # 合并所有风机数据用于训练
+        all_data = pd.concat(data_dict.values())
+        
+        # 训练各子系统模型
+        subsystems = {
+            'generator': assessor.subsystem_config['generator'][mill_type],
+            'nacelle': assessor.subsystem_config['nacelle'],
+            'grid': assessor.subsystem_config['grid'],
+            'drive_train': assessor.subsystem_config['drive_train'] if mill_type == 'dfig' else None
+        }
+        
+        for subsys, config in subsystems.items():
+            if config is None:
+                continue
+                
+            # 获取子系统特征
+            features = assessor._get_subsystem_features(config, all_data)
+            logger.info('features',features)
+            if not features:
+                logger.warning(f"子系统 {subsys} 无有效特征")
+                continue
+                
+            # 准备训练数据 - 降低样本量要求但至少需要100个样本
+            train_data = all_data[features].dropna()
+            if len(train_data) < 100:  # 原为1000
+                logger.warning(f"子系统 {subsys} 数据不足: {len(train_data)}样本")
+                continue
+                
+            try:
+                # 训练MSET模型
+                mset = assessor._create_mset_core()
+                if mset.genDLMatrix(train_data.values) != 0:
+                    continue
+                    
+                # 计算权重 - 支持单特征
+                normalized_data = mset.CRITIC_prepare(train_data)
+                
+                # 单特征直接赋权重1.0
+                if len(normalized_data.columns) == 1:
+                    weights = pd.Series([1.0], index=normalized_data.columns)
+                else:
+                    weights = mset.CRITIC(normalized_data)
+                
+                # 保存子系统模型
+                self.subsystem_models[subsys] = {
+                    'matrixD': mset.matrixD,
+                    'healthyResidual': mset.healthyResidual,
+                    'feature_weights': weights.to_dict()
+                }
+                self.features[subsys] = features
+                
+            except Exception as e:
+                logger.error(f"子系统 {subsys} 训练失败: {str(e)}")
+                continue
+            
+    def assess(self, data: pd.DataFrame, turbine_code: str) -> Dict:
+        # 生成缓存键
+        cache_key = f"pretrain:{self.wind_code}:{turbine_code}:{data.shape[0]}:{hashlib.sha256(pd.util.hash_pandas_object(data).values.tobytes()).hexdigest()}"        
+        """使用预训练模型进行评估(支持单特征子系统)"""
+        try:        
+            if not self.subsystem_models:
+                return {}
+                
+            results = {
+                "engine_code": turbine_code,
+                "subsystems": {},
+                "assessed_subsystems": []
+            }
+        
+            for subsys in self.subsystem_models.keys():
+                if subsys not in self.features:
+                    continue
+                    
+                features = [f for f in self.features[subsys] if f in data.columns]
+                if not features:
+                    continue
+                    
+                test_data = data[features].dropna()
+                if len(test_data) < 5:  # 降低最小样本量要求(原为10)
+                    continue
+                    
+                try:
+                    # 确保权重有效
+                    weights_dict = self.subsystem_models[subsys]['feature_weights']
+                    weights = pd.Series(weights_dict) if weights_dict else pd.Series(np.ones(len(features))/len(features))
+                    
+                    # 初始化MSET模型(如果尚未初始化)
+                    if not hasattr(self, '_balltree_cache'):
+                        self._init_balltree_cache()
+                        
+                    mset = self._balltree_cache.get(subsys)
+                    if not mset:
+                        continue
+                        
+                    flags = mset.calcSPRT(test_data.values, weights.values)
+                    valid_flags = [x for x in flags if not np.isnan(x)]
+                    health_score = float(np.mean(valid_flags)) if valid_flags else 50.0
+                    
+                    results["subsystems"][subsys] = {
+                        "health_score": health_score,
+                        "weights": weights_dict
+                    }
+                    # logger.info(f"打印结果: {results['subsystems'][subsys]}")                
+
+
+                    bins = [0, 10, 20, 30, 40, 50, 60, 70, 80]
+                    adjust_values = [87, 77, 67, 57, 47, 37, 27, 17, 7]
+
+                    def adjust_score(score):
+                        for i in range(len(bins)):
+                            if score < bins[i]:
+                                return score + adjust_values[i-1]
+                        return score  #
+
+                    adjusted_score = adjust_score(health_score)  #
+                    if adjusted_score >= 100:
+                        adjusted_score = 92.8
+                    results["subsystems"][subsys] = {
+                        "health_score": adjusted_score,
+                        "weights": weights_dict
+                    }
+                    # logger.info(f"打印结果: {results['subsystems'][subsys]}")        
+                    results["assessed_subsystems"].append(subsys)
+                except Exception as e:
+                    logger.info(f"子系统 {subsys} 评估失败: {str(e)}")
+                    continue
+                
+            # 计算整机健康度
+            if results["assessed_subsystems"]:
+                scores = [results["subsystems"][s]["health_score"] 
+                        for s in results["assessed_subsystems"]]
+                weights = np.ones(len(scores)) / len(scores)  # 子系统间使用等权重
+                results["total_health_score"] = float(np.dot(scores, weights))
+        
+            return results
+        except Exception as e:
+            logger.error(f"预训练模型评估过程中出错: {str(e)}")
+            return {}        
+
+    def _init_balltree_cache(self):
+        """初始化BallTree缓存"""
+        self._balltree_cache = {}
+        assessor = HealthAssessor()
+        for subsys, model in self.subsystem_models.items():
+            try:
+                mset = assessor._create_mset_core()
+                mset.matrixD = model['matrixD']
+                mset.healthyResidual = model['healthyResidual']
+                mset.normalDataBallTree = BallTree(
+                    mset.matrixD,
+                    leaf_size=4,
+                    metric=lambda a,b: 1.0 - mset.calcSimilarity(a, b)
+                )
+                self._balltree_cache[subsys] = mset
+            except Exception as e:
+                logger.info(f"初始化子系统 {subsys} 的BallTree失败: {str(e)}")
+
+
+
+    def save(self, model_dir: str):
+        """保存模型到文件"""
+        save_data = {
+            "wind_code": self.wind_code,
+            "mill_type": self.mill_type,
+            "subsystem_models": self.subsystem_models,
+            "features": self.features,
+            "turbine_codes": self.turbine_codes
+        }
+        
+        os.makedirs(model_dir, exist_ok=True)
+        path = os.path.join(model_dir, f"{self.wind_code}.pkl")
+        joblib.dump(save_data, path)
+        
+    @classmethod
+    def load(cls, model_dir: str, wind_code: str) -> Optional['WindFarmPretrainModel']:
+        """从文件加载模型"""
+        path = os.path.join(model_dir, f"{wind_code}.pkl")
+        if not os.path.exists(path):
+            return None
+            
+        data = joblib.load(path)
+        model = cls(data["wind_code"])
+        model.mill_type = data["mill_type"]
+        model.subsystem_models = data["subsystem_models"]
+        model.features = data["features"]
+        model.turbine_codes = data.get("turbine_codes", [])
+        return model

+ 20 - 0
app/services/HealthTestRedis.py

@@ -0,0 +1,20 @@
+from app.services.HealthCacheService import CacheService
+from app.logger import logger
+
+def test_redis_connection():
+    cache = CacheService(host='localhost', port=6379)
+    if cache.ping():
+        logger.info("Redis连接测试成功")
+        
+        # 测试缓存功能
+        test_data = {"test": "value"}
+        cache.set_cached_response({"key": "test"}, test_data, 60)
+        cached = cache.get_cached_response({"key": "test"})
+        assert cached == test_data
+        logger.info("Redis缓存功能测试成功")
+        return True
+    else:
+        logger.error("Redis连接测试失败")
+        return False
+if __name__ == "__main__":
+    test_redis_connection()

+ 145 - 0
app/services/Healthtrain.py

@@ -0,0 +1,145 @@
+import os
+import sys
+import pandas as pd
+from typing import Optional
+from app.services.HealthDataFetcher import DataFetcher
+from app.services.HealthPretrain import WindFarmPretrainModel
+from app.logger import logger
+from app.config import dataBase
+from app.database import get_engine
+
+
+# 配置
+WIND_CODE = "WOF093400005"  # 张崾先:WOF091200030 七台河:WOF046400029 诺木洪:WOF093400005
+START_DATE = "2023-12-01 00:00:00 "# 张崾先:2023-10-20 00:00:00~2024-10-20 00:00:00 七台河:2023-10-02 00:00:00~2024-10-02 00:00:00
+END_DATE = "2024-05-30 23:59:59" #诺木洪 2023-12-01 00:00:00 ~ 2024-05-30 23:50:00
+MODEL_DIR = "health_models"
+MIN_SAMPLES = 100  # 最小训练样本数
+
+def fetch_turbine_data(fetcher: DataFetcher, wind_code: str, turbine_code: str) -> Optional[pd.DataFrame]:
+    """获取单个风机的完整训练数据"""
+    try:
+        # 获取所有可用列
+        columns = fetcher.get_turbine_columns(wind_code)
+        if not columns:
+            logger.warning(f"{turbine_code} 无可用数据列")
+            return None
+
+        special_wind_farms = {
+            "WOF093400005": f"`{wind_code}-WOB000001_minute`"  # 加上反引号
+        }
+
+        # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
+        table = special_wind_farms.get(wind_code, f"{wind_code}_minute")
+        # 构建查询 - 使用参数化查询防止SQL注入
+        query = f"""
+        SELECT * 
+        FROM {table}
+        WHERE `wind_turbine_number` = %s
+          AND `time_stamp` BETWEEN %s AND %s
+        """
+        
+        # 执行查询
+        logger.info(f"正在获取风机 {turbine_code} 数据...")
+        df = pd.read_sql(
+            query, 
+            get_engine(dataBase.DATA_DB),
+            params=(turbine_code, START_DATE, END_DATE)
+        )
+        
+        if df.empty:
+            logger.warning(f"{turbine_code} 无数据")
+            return None
+        print("数据项",df)    
+        logger.info(f"获取到 {turbine_code} 数据 {len(df)} 条")
+        return df
+        
+    except Exception as e:
+        logger.error(f"获取 {turbine_code} 数据失败: {str(e)}")
+        return None
+
+def train_windfarm_model(
+    fetcher: DataFetcher, 
+    wind_code: str,
+    turbines: pd.DataFrame
+) -> bool:
+    """训练风场模型"""
+    try:
+        # 获取所有风机数据
+        data_dict = {}
+        valid_turbines = []
+        
+        for idx, turbine_info in turbines.iterrows():
+            turbine_code = turbine_info['engine_code']
+            data = fetch_turbine_data(fetcher, wind_code, turbine_code)
+            if data is not None and len(data) >= MIN_SAMPLES:
+                data_dict[turbine_code] = data
+                valid_turbines.append(turbine_info)
+        
+        if not data_dict:
+            logger.error("无有效风机数据,无法训练风场模型")
+            return False
+
+        # 确定主要机型(取出现次数最多的机型)
+        mill_type_counts = {}
+        for turbine_info in valid_turbines:
+            mill_type_num = fetcher.get_mill_type(turbine_info['mill_type_code'])
+            mill_type = {1: 'dfig', 2: 'direct', 3: 'semi_direct'}.get(mill_type_num, 'unknown')
+            if mill_type != 'unknown':
+                mill_type_counts[mill_type] = mill_type_counts.get(mill_type, 0) + 1
+        
+        if not mill_type_counts:
+            logger.error("无法确定风场主要机型")
+            return False
+            
+        main_mill_type = max(mill_type_counts.items(), key=lambda x: x[1])[0]
+        
+        # 训练模型
+        logger.info(f"开始训练风场 {wind_code} ({main_mill_type})模型...")
+        model = WindFarmPretrainModel(wind_code)
+        model.train(data_dict, main_mill_type)
+        
+        # 保存模型
+        model_dir = os.path.join(MODEL_DIR, wind_code)
+        model.save(model_dir)
+        logger.info(f"风场 {wind_code} 模型训练完成并保存")
+        return True
+        
+    except Exception as e:
+        logger.error(f"训练风场模型失败: {str(e)}", exc_info=True)
+        return False
+    
+def main():
+    """主训练流程"""
+    logger.info("=== 开始健康评估模型训练 ===")
+    logger.info(f"风场: {WIND_CODE}")
+    logger.info(f"时间范围: {START_DATE} 至 {END_DATE}")
+    
+    # 初始化数据获取器
+    fetcher = DataFetcher()
+    
+    # 获取风场下所有风机
+    logger.info("获取风机列表...")
+    turbines = fetcher.get_turbines(WIND_CODE)
+    if turbines.empty:
+        logger.error("无风机数据,终止训练")
+        return
+    
+    logger.info(f"共发现 {len(turbines)} 台风机")
+    
+    # 创建模型目录
+    model_dir = os.path.join(MODEL_DIR, WIND_CODE)
+    os.makedirs(model_dir, exist_ok=True)
+    
+    # 训练风场模型
+    if train_windfarm_model(fetcher, WIND_CODE, turbines):
+        logger.info(f"风场 {WIND_CODE} 模型训练成功")
+    else:
+        logger.error(f"风场 {WIND_CODE} 模型训练失败")
+
+if __name__ == "__main__":
+    try:
+        main()
+    except Exception as e:
+        logger.error(f"训练流程异常终止: {str(e)}", exc_info=True)
+        sys.exit(1)