Procházet zdrojové kódy

生产环境版本修改

wangjiaojiao před 2 měsíci
rodič
revize
dc708a117b

+ 1 - 1
app/config.py

@@ -11,7 +11,7 @@ class PlatformSettings:
     db_user: str = "admin"
     db_password: str = "admin123456"
     db_host: str = "192.168.50.233:3306"
-    db_name: str = "energy_show"
+    db_name: str = "energy_prod"
 
 
 # 数据库连接池配置信息

+ 1 - 1
app/routers/health.py

@@ -23,7 +23,7 @@ REALTIME_ASSESSOR = HealthAssessor()
 CACHE_SERVICE = CacheService(
 host='192.168.50.233',
 port=6379,
-db =10,
+db =11,
 password=123456,
 ttl=None #单位秒
 )

+ 48 - 0
app/routers/temperature.py

@@ -10,6 +10,7 @@ from app.logger import logger
 from app.models.AutoDiagModel import AutoDiagInput
 from app.models.TemperatureInput import TemperatureInput
 from app.models.TemperatureThresholdInput import TemperatureThresholdInput
+from app.models.TemperatureDataQueryInput import TemperatureDataQueryInput
 from app.services.Auto_diag import Auto_diag
 from app.services.MSET_Temp import MSET_Temp
 
@@ -194,7 +195,54 @@ if __name__ == "__main__":
 
     uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
 
+@router.post("/temperature/dataquery")
+async def query_data(inp: TemperatureDataQueryInput):
+    """
+    查询指定风机在特定时间点前后各50个时间点的数据
+    输入:
+    {
+      "windCode": "WOF091200030",
+      "windTurbineNumber": "WOG01355",
+      "timestamp": "2024-06-01 00:00:00"
+    }
+    输出:
+    {
+      "data": {
+        "wind_turbine_number": "WOG01355",
+        "record_count": 101,
+        "records": [
+          {"时间戳": "2024-05-31 23:10:00", "主轴承温度": 65.2, ...},
+          {"时间戳": "2024-05-31 23:15:00", "主轴承温度": 65.5, ...},
+          ...
+          {"时间戳": "2024-06-01 00:50:00", "主轴承温度": 66.1, ...}
+        ]
+      },
+      "code": 200,
+      "message": "success"
+    }
+    """
+    try:
+        analyzer = MSET_Temp(inp.windCode, [inp.windTurbineNumber], "", "")
+        result = analyzer.query_surrounding_data(inp.timestamp,minutes_around = 250)
+        if result['record_count'] == 0:
+            return JSONResponse(
+                content={"code": 405, "message": "未找到数据"},
+                status_code=200
+            )
 
+        return {
+            "data": {
+                "wind_turbine_number": inp.windTurbineNumber,
+                "records": result['records']
+            },
+            "code": 200,
+            "message": "success"
+        }
+        
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+    
+    
 @router.post("/autodiag/{autodiagType}")
 async def perform_diagnosis(autodiagType: str, input_data: AutoDiagInput):
     """

+ 81 - 74
app/services/CMSAnalyst.py

@@ -5,7 +5,7 @@ import math
 import numpy as np
 import pandas as pd
 from scipy.signal import hilbert
-
+from typing import List, Dict, Any
 from app.config import dataBase
 from app.database import get_engine
 from app.logger import logger
@@ -14,13 +14,18 @@ from app.logger import logger
 class CMSAnalyst:
     def __init__(self, fmin, fmax, table_name, ids):
         # 从数据库获取原始数据
+        self.table_name =table_name
+        self.ids = ids        
         self.datas = self._get_by_id(table_name, ids)
-        self.datas = [df[['mesure_data', 'time_stamp', 'sampling_frequency', 'wind_turbine_number', 'rotational_speed',
-                          'mesure_point_name']] for df in self.datas]
+        self.datas = [
+            df[['id', 'mesure_data', 'time_stamp', 'sampling_frequency', 
+                'wind_turbine_number', 'rotational_speed', 'mesure_point_name']]
+            for df in self.datas
+        ] 
         # 只输入一个id,返回一个[df],所以拿到self.data[0]
         self.data_filter = self.datas[0]
         # 取数据列
-        self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'][0]))
+        self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'].iloc[0]))  
         self.envelope_spectrum_m = self.data.shape[0]
         self.envelope_spectrum_n = 1
         # 设置分析参数
@@ -56,16 +61,16 @@ class CMSAnalyst:
         # time_domain_analysis
         self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs
 
+    
     def _get_by_id(self, windcode, ids):
-        df_res = []
         engine = get_engine(dataBase.DATA_DB)
-        for id in ids:
-            table_name = windcode + '_wave'
-            lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} "
-            df = pd.read_sql(lastday_df_sql, engine)
-            df_res.append(df)
-        return df_res
-
+        table_name = windcode + '_wave'
+        ids_str = ','.join(map(str, ids))
+        sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str}) ORDER BY time_stamp"
+        df = pd.read_sql(sql, engine)
+        grouped = [group.reset_index(drop=True) for _, group in df.groupby('id')] 
+        return grouped
+      
     # envelope_spectrum_analysis 包络谱分析
     def _bandpass_filter(self, data):
         """带通滤波"""
@@ -148,6 +153,8 @@ class CMSAnalyst:
             "B3P": _3P_1X,
         }
         # result = json.dumps(result, ensure_ascii=False)
+        result = self.replace_nan(result)
+
         return result
 
         # frequency_domain_analysis 频谱分析
@@ -216,6 +223,7 @@ class CMSAnalyst:
                        {"Xaxis": fn_Gen * 5, "val": "5X"}, {"Xaxis": fn_Gen * 6, "val": "6X"}],
             "B3P": _3P_1X,
         }
+        result = self.replace_nan(result)       
         result = json.dumps(result, ensure_ascii=False)
         return result
 
@@ -283,83 +291,73 @@ class CMSAnalyst:
             "rpm_Gen": round(rpm_Gen, 2),  # 转速r/min
 
         }
-
+        result = self.replace_nan(result)
         result = json.dumps(result, ensure_ascii=False)
 
         return result
 
-    # trend_analysis 趋势图
-
-    def trend_analysis(self):
-        all_stats = []
-
-        # 定义积分函数
-        def _integrate(data, dt):
-            return np.cumsum(data) * dt
-
-        # 定义计算统计指标的函数
-        def _calculate_stats(data):
-            mean_value = np.mean(data)
-            max_value = np.max(data)
-            min_value = np.min(data)
-            Xrms = np.sqrt(np.mean(data ** 2))  # 加速度均方根值(有效值)
-            Xp = (max_value - min_value) / 2  # 峰值(单峰最大值) # 峰值
-            Cf = Xp / Xrms  # 峰值指标
-            Sf = Xrms / mean_value  # 波形指标
-            If = Xp / np.mean(np.abs(data))  # 脉冲指标
-            Xr = np.mean(np.sqrt(np.abs(data))) ** 2  # 方根幅值
-            Ce = Xp / Xr  # 裕度指标
-
-            # 计算每个数据点的绝对值减去均值后的三次方,并求和
-            sum_abs_diff_cubed_3 = np.mean((np.abs(data) - mean_value) ** 3)
-            # 计算偏度指标
-            Cw = sum_abs_diff_cubed_3 / (Xrms ** 3)
-            # 计算每个数据点的绝对值减去均值后的四次方,并求和
-            sum_abs_diff_cubed_4 = np.mean((np.abs(data) - mean_value) ** 4)
-            # 计算峭度指标
-            Cq = sum_abs_diff_cubed_4 / (Xrms ** 4)
-            #
+    def trend_analysis(self) -> str:
+        """
+        优化后的趋势分析方法(向量化计算统计指标)
+        返回 JSON 字符串,包含所有时间点的统计结果。
+        """
+        for df in self.datas:
+            df['parsed_data'] = df['mesure_data'].apply(json.loads)            
+        # 1. 合并所有数据并解析 mesure_data
+        combined_df = pd.concat(self.datas)
+        combined_df['parsed_data'] = combined_df['mesure_data'].apply(json.loads)  # 批量解析 JSON
+
+        # 2. 向量化计算统计指标(避免逐行循环)
+        def calculate_stats(group: pd.DataFrame) -> Dict[str, Any]:
+            data = np.array(group['parsed_data'].iloc[0])  # 提取振动数据数组
+            fs = int(group['sampling_frequency'].iloc[0])  # 采样频率
+            dt = 1 / fs  # 时间间隔
+
+            # 计算时域指标(向量化操作)
+            mean = np.mean(data)
+            max_val = np.max(data)
+            min_val = np.min(data)
+            Xrms = np.sqrt(np.mean(data ** 2))
+            Xp = (max_val - min_val) / 2
+            Cf = Xp / Xrms
+            Sf = Xrms / mean if mean != 0 else 0
+            If = Xp / np.mean(np.abs(data))
+
+            # 计算速度和峭度指标
+            velocity = np.cumsum(data) * dt  # 积分计算速度
+            velocity_rms = np.sqrt(np.mean(velocity ** 2))
+            Cq = np.mean((data - mean) ** 4) / (Xrms ** 4) if Xrms != 0 else 0
 
             return {
-                "fs": self.fs,  # 采样频率
-                "Mean": round(mean_value, 2),  # 平均值
-                "Max": round(max_value, 2),  # 最大值
-                "Min": round(min_value, 2),  # 最小值
-                "Xrms": round(Xrms, 2),  # 有效值
-                "Xp": round(Xp, 2),  # 峰值
-                "If": round(If, 2),  # 脉冲指标
-                "Cf": round(Cf, 2),  # 峰值指标
-                "Sf": round(Sf, 2),  # 波形指标
-                "Ce": round(Ce, 2),  # 裕度指标
-                "Cw": round(Cw, 2),  # 偏度指标
-                "Cq": round(Cq, 2),  # 峭度指标
-                # velocity_rms :速度有效值
-                # time_stamp:时间戳
+                "time_stamp": str(group['time_stamp'].iloc[0]),
+                "fs": fs,
+                "Mean": round(mean, 2),
+                "Max": round(max_val, 2),
+                "Min": round(min_val, 2),
+                "Xrms": round(Xrms, 2),
+                "Xp": round(Xp, 2),
+                "Cf": round(Cf, 2),
+                "Sf": round(Sf, 2),
+                "If": round(If, 2),
+                "velocity_rms": round(velocity_rms, 2),
+                "Cq": round(Cq, 2)
             }
 
-        for data in self.datas:
-            fs = int(self.data_filter['sampling_frequency'].iloc[0])
-            dt = 1 / fs
-            time_stamp = data['time_stamp'][0]
-            data = np.array(ast.literal_eval(data['mesure_data'][0]))
-
-            velocity = _integrate(data, dt)
-            velocity_rms = np.sqrt(np.mean(velocity ** 2))
-            stats = _calculate_stats(data)
-            # 速度有效值
-            stats["velocity_rms"] = round(velocity_rms, 2)
-            # 时间戳
-            stats["time_stamp"] = str(time_stamp)
-            all_stats.append(stats)
-        all_stats = json.dumps(all_stats, ensure_ascii=False)
-        return all_stats
+        # 3. 按 ID 分组并应用统计计算
+        stats = combined_df.groupby('id').apply(calculate_stats).tolist()
 
+        # 4. 返回 JSON 格式结果
+        return json.dumps(stats, ensure_ascii=False) 
+ 
     def Characteristic_Frequency(self):
         """提取轴承、齿轮等参数"""
         # 1、从测点名称中提取部件名称(计算特征频率的部件)
         str1 = self.mesure_point_name
         # 2、连接233的数据库'energy_show',从表'wind_engine_group'查找风机编号'engine_code'对应的机型编号'mill_type_code'
         engine_code = self.wind_code
+        # print("engine_code:", engine_code)
+        logger.info("!!!")
+        logger.info(f"engine_code {engine_code} ")
         engine = get_engine(dataBase.PLATFORM_DB)
         df_sql2 = f"SELECT * FROM wind_engine_group WHERE engine_code = '{engine_code}'"
         df2 = pd.read_sql(df_sql2, engine)
@@ -578,3 +576,12 @@ class CMSAnalyst:
             "FTF": round(FTF, 2),
 
         }
+    #检查返回结果是否有nan 若有,则替换成none
+    def replace_nan(self, obj):
+        if isinstance(obj, dict):
+            return {k: self.replace_nan(v) for k, v in obj.items()}
+        elif isinstance(obj, list):
+            return [self.replace_nan(item) for item in obj]
+        elif isinstance(obj, float) and math.isnan(obj):
+            return None
+        return obj

+ 1 - 1
app/services/HealthCacheService.py

@@ -7,7 +7,7 @@ from typing import Optional, Dict, Any
 
 class CacheService:
     def __init__(self, host: str = '192.168.50.233', port: int = 6379,
-                 db: int = 10, password: int = 123456, ttl: Optional[int] = None):
+                 db: int = 11, password: int = 123456, ttl: Optional[int] = None):
         """
         初始化Redis缓存服务
         :param ttl: 默认缓存时间(秒)

+ 1 - 1
app/services/HealthPretrain.py

@@ -18,7 +18,7 @@ class WindFarmPretrainModel:
         self.subsystem_models = {}  # 各子系统模型
         self.features = {}  # 各子系统使用的特征
         self.turbine_codes = []  # 包含的风机列表
-        self.cache_client = CacheService(host='192.168.50.233', port=6379,db =10,password=123456,ttl=2592000 )  # 添加缓存客户端
+        self.cache_client = CacheService(host='192.168.50.233', port=6379,db =11,password=123456,ttl=2592000 )  # 添加缓存客户端
     def train(self, data_dict: Dict[str, pd.DataFrame], mill_type: str):
         """训练风场模型(支持单特征子系统)"""
         self.mill_type = mill_type

+ 96 - 1
app/services/MSET_Temp.py

@@ -9,7 +9,7 @@ from sklearn.neighbors import BallTree
 
 from app.config import dataBase
 from app.database import get_engine
-
+from typing import Dict
 
 class MSET_Temp:
     """
@@ -184,3 +184,98 @@ class MSET_Temp:
             metric=lambda a, b: 1.0 - inst.calcSimilarity(a, b)
         )
         return inst
+
+    def query_surrounding_data(self, timestamp: str, minutes_around: int = 250) -> Dict:
+        """
+        查询指定时间点前后50个点的数据
+        参数:
+            timestamp: 中心时间点,格式为 'yyyy-mm-dd HH:MM:SS'
+            minutes_around: 查询前后多少分钟的数据
+        返回:
+            {
+                'record_count': int,
+                'records': List[Dict],
+                'columns_mapping': Dict[str, str]  # 字段中英文映射
+            }
+        """
+        # 中英文映射字典
+        cn_map = {
+            'wind_turbine_name':'风机名称',
+            'time_stamp': '时间',
+            'active_power': '有功功率(kW)',
+            'rotor_speed': '风轮转速(rpm)',
+            'generator_speed':'发电机转速(rpm)',
+            'wind_velocity': '风速(m/s)',
+            'pitch_angle_blade_1':'桨距角1(°)',
+            'pitch_angle_blade_2':'桨距角2(°)',  
+            'pitch_angle_blade_3':'桨距角3(°)',
+            'cabin_position':'机舱位置(°)',   
+            'true_wind_direction':'绝对风向(°)',
+            'yaw_error1':'对风角度(°)',     
+            'set_value_of_active_power':'有功功率设定值(kW)',
+            'gearbox_oil_temperature':'齿轮箱油温(℃)',     
+            'generatordrive_end_bearing_temperature':'发电机驱动端轴承温度(℃)',
+            'generatornon_drive_end_bearing_temperature':'发电机非驱动端轴承温度(℃)',     
+            'cabin_temperature':'机舱内温度(℃)',
+            'twisted_cable_angle':'扭缆角度(°)',     
+            'outside_cabin_temperature':'环境温度(℃)',
+            'main_bearing_temperature':'主轴承轴承温度(℃)',     
+            'main_bearing_temperature_2': '主轴承轴承温度2(℃)',            
+            'gearbox_high_speed_shaft_bearing_temperature':'齿轮箱高速轴轴承温度(℃)',
+            'gearboxmedium_speed_shaftbearing_temperature':'齿轮箱中速轴轴承温度(℃)',     
+            'gearbox_low_speed_shaft_bearing_temperature':'齿轮箱低速轴轴承温度(℃)',
+            'generator_winding1_temperature':'发电机绕组1温度(℃)',     
+            'generator_winding2_temperature':'发电机绕组2温度(℃)',
+            'generator_winding3_temperature':'发电机绕组3温度(℃)',     
+            'grid_a_phase_current':'电网A相电流(A)',     
+            'grid_b_phase_current': '电网B相电流(A)',
+            'grid_c_phase_current': '电网C相电流(A)'
+        }
+
+        table = f"{self.windCode}_minute"
+        engine = get_engine(dataBase.DATA_DB)
+
+        # 查询数据
+        sql = text(f"""
+            SELECT *
+            FROM {table}
+            WHERE wind_turbine_number IN ({','.join([f"'{t}'" for t in self.windTurbineNumberList])})
+            AND time_stamp BETWEEN 
+                DATE_SUB(:timestamp, INTERVAL :minutes MINUTE) 
+                AND DATE_ADD(:timestamp, INTERVAL :minutes MINUTE)
+            ORDER BY time_stamp ASC
+        """)
+        
+        df = pd.read_sql(sql, engine, params={
+            "timestamp": timestamp,
+            "minutes": minutes_around
+        })
+
+        # 打印查询到的数据条数
+        record_count = len(df)
+        print(f"查询到 {record_count} 条数据")
+
+        if df.empty:
+            return {
+                'record_count': 0,
+                'records': [],
+                'columns_mapping': {}
+            }
+
+        # 删除空列和不需要的列
+        cols_to_drop = ['wind_turbine_number', 'reactive_power','lab', 'year', 'month','day','year_month','front_back_vibration_of_the_cabin','side_to_side_vibration_of_the_cabin',
+                        'actual_torque','given_torque','clockwise_yaw_count','counterclockwise_yaw_count','unusable','power_curve_available','required_gearbox_speed','inverter_speed_master_control',
+                        'wind_turbine_status','wind_turbine_status2','turbulence_intensity'
+                        ]
+        cols_to_drop = [col for col in cols_to_drop if col in df.columns]
+        df = df.drop(columns=cols_to_drop)
+        df = df.dropna(axis=1, how='all')
+
+        # 转换字段名和格式
+        df['time_stamp'] = df['time_stamp'].astype(str)
+        records = df.rename(columns=cn_map).to_dict('records')
+
+        return {
+            'record_count': record_count,
+            'records': records
+        }