Bladeren bron

Merge branch 'dev-wjj'

# Conflicts:
#	app/routers/health.py
#	app/services/HealthCacheService.py
#	app/services/HealthDataFetcher.py
#	app/services/HealthPretrain.py
#	app/services/HealthTestRedis.py
#	app/services/Healthtrain.py
#	app/services/MSET_Temp.py
chenhongyan1989 3 weken geleden
bovenliggende
commit
1a334a8f72

+ 14 - 0
app/routers/health.py

@@ -7,6 +7,7 @@ import time
 import os
 from app.logger import logger
 from app.models.HealthEvaluationReqAndResp import AssessmentResult, AssessmentRequest, SubsystemResult
+
 from app.services.HealthAssessor import HealthAssessor
 from app.services.HealthDataFetcher import DataFetcher
 import concurrent.futures
@@ -346,3 +347,16 @@ def _format_result(assessment: Dict, model_type: str = None) -> AssessmentResult
         model_type=model_type
     )
 
+
+def clean_nans(obj):
+    """递归清理字典和列表中的NaN值"""
+    if isinstance(obj, dict):
+        return {k: clean_nans(v) for k, v in obj.items()}
+    elif isinstance(obj, list):
+        return [clean_nans(item) for item in obj]
+    elif isinstance(obj, float) and math.isnan(obj):
+        return -1.0  # 替换为-1表示无效值
+    elif isinstance(obj, float) and not math.isfinite(obj):
+        return -1.0  # 处理无穷大值
+    else:
+        return obj

+ 16 - 11
app/services/HealthCacheService.py

@@ -5,13 +5,12 @@ import logging
 import redis
 from typing import Optional, Dict, Any
 
-
 class CacheService:
     def __init__(self, host: str = '192.168.50.233', port: int = 6379,
-                 db: int = 10, password: int = 123456, ttl: int = None):
+                 db: int = 10, password: int = 123456, ttl: Optional[int] = None):
         """
         初始化Redis缓存服务
-        :param ttl: 默认缓存时间(秒),默认24小时
+        :param ttl: 默认缓存时间(秒)
         """
         self.client = redis.Redis(
             host=host,
@@ -48,21 +47,27 @@ class CacheService:
             self.logger.error(f"获取缓存失败: {str(e)}")
             return None
 
+
     def set_cached_response(self, request_body: Dict[str, Any],
-                            response_data: Dict[str, Any],
-                            ttl: int = None) -> bool:
+                        response_data: Dict[str, Any],
+                        ttl: Optional[int] = None) -> bool:
         """
         设置缓存结果
+        :param ttl: 如果为 None 或 0,则永久缓存
         """
         cache_key = self._generate_cache_key(request_body)
         try:
             expire_time = ttl if ttl is not None else self.ttl
-            self.client.setex(
-                cache_key,
-                expire_time,
-                json.dumps(response_data)
-            )
-            self.logger.info(f"缓存已设置: {cache_key} (TTL: {expire_time}s)")
+
+            if expire_time is None or expire_time <= 0:
+                # 永久缓存
+                self.client.set(cache_key, json.dumps(response_data))
+                self.logger.info(f"缓存已设置(永久): {cache_key}")
+            else:
+                # 带过期时间的缓存
+                self.client.setex(cache_key, expire_time, json.dumps(response_data))
+                self.logger.info(f"缓存已设置(TTL: {expire_time}s): {cache_key}")
+
             return True
         except Exception as e:
             self.logger.error(f"设置缓存失败: {str(e)}")

+ 19 - 7
app/services/HealthDataFetcher.py

@@ -19,8 +19,13 @@ class DataFetcher:
         :param windcode: 风场编号 (如 "WF001")
         :return: 列名列表 (如 ["timestamp", "temp1", "vibration1"])
         """
-
-        table_name = f"{windcode}_minute"
+        # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
+        special_wind_farms = {
+            #训练诺木洪风场时 删掉反引号
+            "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+           # "WOF093400005": f"{windcode}-WOB000001_minute"
+        }
+        table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
         try:
             inspector = inspect(get_engine(dataBase.DATA_DB))
             columns = inspector.get_columns(table_name)
@@ -88,9 +93,12 @@ class DataFetcher:
                 return pd.DataFrame()
 
             # 3. 构建参数化查询
-
-
-            table_name = f"{windcode}_minute"
+            # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
+            special_wind_farms = {
+                "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+                #"WOF093400005": f"{windcode}-WOB000001_minute"
+            }
+            table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
             query = f"""
             SELECT `year`,`mont`, {','.join(safe_features)} 
             FROM {table_name} 
@@ -124,8 +132,12 @@ class DataFetcher:
 
             # 将month格式从yyyy-mm转换为单独的年份和月份
             year, month = month.split('-')
-
-            table_name =f"{windcode}_minute"
+            # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
+            special_wind_farms = {
+                "WOF093400005": f"`{windcode}-WOB000001_minute`"  # 加上反引号
+                #"WOF093400005": f"{windcode}-WOB000001_minute"
+            }
+            table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
             # 单次查询获取所有风机数据
             query = f"""
             SELECT `wind_turbine_number`, {','.join(safe_features)} 

+ 34 - 34
app/services/HealthPretrain.py

@@ -11,23 +11,23 @@ import redis
 from app.services.HealthCacheService import CacheService
 class WindFarmPretrainModel:
     """整个风场的预训练模型"""
-    
+
     def __init__(self, wind_code: str):
         self.wind_code = wind_code
         self.mill_type = None  # 风场主要机型
         self.subsystem_models = {}  # 各子系统模型
         self.features = {}  # 各子系统使用的特征
         self.turbine_codes = []  # 包含的风机列表
-        self.cache_client = CacheService(host='localhost', port=6379)  # 添加缓存客户端        
+        self.cache_client = CacheService(host='192.168.50.233', port=6379,db =10,password=123456,ttl=2592000 )  # 添加缓存客户端
     def train(self, data_dict: Dict[str, pd.DataFrame], mill_type: str):
         """训练风场模型(支持单特征子系统)"""
         self.mill_type = mill_type
         self.turbine_codes = list(data_dict.keys())
         assessor = HealthAssessor()
-        
+
         # 合并所有风机数据用于训练
         all_data = pd.concat(data_dict.values())
-        
+
         # 训练各子系统模型
         subsystems = {
             'generator': assessor.subsystem_config['generator'][mill_type],
@@ -35,39 +35,39 @@ class WindFarmPretrainModel:
             'grid': assessor.subsystem_config['grid'],
             'drive_train': assessor.subsystem_config['drive_train'] if mill_type == 'dfig' else None
         }
-        
+
         for subsys, config in subsystems.items():
             if config is None:
                 continue
-                
+
             # 获取子系统特征
             features = assessor._get_subsystem_features(config, all_data)
             logger.info('features',features)
             if not features:
                 logger.warning(f"子系统 {subsys} 无有效特征")
                 continue
-                
+
             # 准备训练数据 - 降低样本量要求但至少需要100个样本
             train_data = all_data[features].dropna()
             if len(train_data) < 100:  # 原为1000
                 logger.warning(f"子系统 {subsys} 数据不足: {len(train_data)}样本")
                 continue
-                
+
             try:
                 # 训练MSET模型
                 mset = assessor._create_mset_core()
                 if mset.genDLMatrix(train_data.values) != 0:
                     continue
-                    
+
                 # 计算权重 - 支持单特征
                 normalized_data = mset.CRITIC_prepare(train_data)
-                
+
                 # 单特征直接赋权重1.0
                 if len(normalized_data.columns) == 1:
                     weights = pd.Series([1.0], index=normalized_data.columns)
                 else:
                     weights = mset.CRITIC(normalized_data)
-                
+
                 # 保存子系统模型
                 self.subsystem_models[subsys] = {
                     'matrixD': mset.matrixD,
@@ -75,59 +75,59 @@ class WindFarmPretrainModel:
                     'feature_weights': weights.to_dict()
                 }
                 self.features[subsys] = features
-                
+
             except Exception as e:
                 logger.error(f"子系统 {subsys} 训练失败: {str(e)}")
                 continue
-            
+
     def assess(self, data: pd.DataFrame, turbine_code: str) -> Dict:
         # 生成缓存键
-        cache_key = f"pretrain:{self.wind_code}:{turbine_code}:{data.shape[0]}:{hashlib.sha256(pd.util.hash_pandas_object(data).values.tobytes()).hexdigest()}"        
+        cache_key = f"pretrain:{self.wind_code}:{turbine_code}:{data.shape[0]}:{hashlib.sha256(pd.util.hash_pandas_object(data).values.tobytes()).hexdigest()}"
         """使用预训练模型进行评估(支持单特征子系统)"""
-        try:        
+        try:
             if not self.subsystem_models:
                 return {}
-                
+
             results = {
                 "engine_code": turbine_code,
                 "subsystems": {},
                 "assessed_subsystems": []
             }
-        
+
             for subsys in self.subsystem_models.keys():
                 if subsys not in self.features:
                     continue
-                    
+
                 features = [f for f in self.features[subsys] if f in data.columns]
                 if not features:
                     continue
-                    
+
                 test_data = data[features].dropna()
                 if len(test_data) < 5:  # 降低最小样本量要求(原为10)
                     continue
-                    
+
                 try:
                     # 确保权重有效
                     weights_dict = self.subsystem_models[subsys]['feature_weights']
                     weights = pd.Series(weights_dict) if weights_dict else pd.Series(np.ones(len(features))/len(features))
-                    
+
                     # 初始化MSET模型(如果尚未初始化)
                     if not hasattr(self, '_balltree_cache'):
                         self._init_balltree_cache()
-                        
+
                     mset = self._balltree_cache.get(subsys)
                     if not mset:
                         continue
-                        
+
                     flags = mset.calcSPRT(test_data.values, weights.values)
                     valid_flags = [x for x in flags if not np.isnan(x)]
                     health_score = float(np.mean(valid_flags)) if valid_flags else 50.0
-                    
+
                     results["subsystems"][subsys] = {
                         "health_score": health_score,
                         "weights": weights_dict
                     }
-                    # logger.info(f"打印结果: {results['subsystems'][subsys]}")                
+                    # logger.info(f"打印结果: {results['subsystems'][subsys]}")
 
 
                     bins = [0, 10, 20, 30, 40, 50, 60, 70, 80]
@@ -146,23 +146,23 @@ class WindFarmPretrainModel:
                         "health_score": adjusted_score,
                         "weights": weights_dict
                     }
-                    # logger.info(f"打印结果: {results['subsystems'][subsys]}")        
+                    # logger.info(f"打印结果: {results['subsystems'][subsys]}")
                     results["assessed_subsystems"].append(subsys)
                 except Exception as e:
                     logger.info(f"子系统 {subsys} 评估失败: {str(e)}")
                     continue
-                
+
             # 计算整机健康度
             if results["assessed_subsystems"]:
-                scores = [results["subsystems"][s]["health_score"] 
+                scores = [results["subsystems"][s]["health_score"]
                         for s in results["assessed_subsystems"]]
                 weights = np.ones(len(scores)) / len(scores)  # 子系统间使用等权重
                 results["total_health_score"] = float(np.dot(scores, weights))
-        
+
             return results
         except Exception as e:
             logger.error(f"预训练模型评估过程中出错: {str(e)}")
-            return {}        
+            return {}
 
     def _init_balltree_cache(self):
         """初始化BallTree缓存"""
@@ -193,22 +193,22 @@ class WindFarmPretrainModel:
             "features": self.features,
             "turbine_codes": self.turbine_codes
         }
-        
+
         os.makedirs(model_dir, exist_ok=True)
         path = os.path.join(model_dir, f"{self.wind_code}.pkl")
         joblib.dump(save_data, path)
-        
+
     @classmethod
     def load(cls, model_dir: str, wind_code: str) -> Optional['WindFarmPretrainModel']:
         """从文件加载模型"""
         path = os.path.join(model_dir, f"{wind_code}.pkl")
         if not os.path.exists(path):
             return None
-            
+
         data = joblib.load(path)
         model = cls(data["wind_code"])
         model.mill_type = data["mill_type"]
         model.subsystem_models = data["subsystem_models"]
         model.features = data["features"]
         model.turbine_codes = data.get("turbine_codes", [])
-        return model
+        return model

+ 3 - 3
app/services/HealthTestRedis.py

@@ -2,10 +2,10 @@ from app.services.HealthCacheService import CacheService
 from app.logger import logger
 
 def test_redis_connection():
-    cache = CacheService(host='localhost', port=6379)
+    cache = CacheService(host='192.168.50.233', port=6379)
     if cache.ping():
         logger.info("Redis连接测试成功")
-        
+
         # 测试缓存功能
         test_data = {"test": "value"}
         cache.set_cached_response({"key": "test"}, test_data, 60)
@@ -17,4 +17,4 @@ def test_redis_connection():
         logger.error("Redis连接测试失败")
         return False
 if __name__ == "__main__":
-    test_redis_connection()
+    test_redis_connection()