Browse Source

增加对nan值的处理,解决取值下标异常问题,优化趋势分析功能

wangjiaojiao 1 day ago
parent
commit
efd4f54244
1 changed files with 106 additions and 142 deletions
  1. 106 142
      cms_class_20241201.py

+ 106 - 142
cms_class_20241201.py

@@ -5,6 +5,7 @@ import plotly.graph_objs as go
 import pandas as pd
 from sqlalchemy import create_engine, text
 import sqlalchemy
+from typing import Dict,Any
 import json
 import ast
 import math
@@ -61,14 +62,22 @@ df_data = []
 # 主要的类
 class CMSAnalyst:
     def __init__(self, fmin, fmax, table_name, ids):
-
+        self.table_name =table_name
+        self.ids = ids
         # envelope_spectrum_analysis
         # datas是[df1,df2,.....]
         #从数据库获取原始数据
         self.datas = self._get_by_id(table_name,ids)
-        self.datas = [df[['mesure_data','time_stamp','sampling_frequency','wind_turbine_number','rotational_speed','mesure_point_name']] for df in self.datas]
+        self.datas = [
+            df[['id', 'mesure_data', 'time_stamp', 'sampling_frequency', 
+                'wind_turbine_number', 'rotational_speed', 'mesure_point_name']]
+            for df in self.datas
+        ]       
+
         # 只输入一个id,返回一个[df],所以拿到self.data[0]
         self.data_filter = self.datas[0]
+        # print("mesure_data sample:", self.data_filter['mesure_data'].iloc[0])  # 打印第一条数据
+        self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'].iloc[0]))        
         # print(self.data_filter)
         # 取数据列
         self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'][0]))
@@ -117,17 +126,39 @@ class CMSAnalyst:
         # time_domain_analysis
         self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs
         
+    # def _get_by_id(self, windcode, ids):
+    #     df_res = []
+    #     #engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
+    #     engine = create_engine('mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod')
+    #     for id in ids:
+    #         table_name=windcode+'_wave'
+    #         lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} "
+    #         # print(lastday_df_sql)
+    #         df = pd.read_sql(lastday_df_sql, engine)
+    #         df_res.append(df)
+    #     return df_res
+    
+    # def _get_by_id(self, windcode, ids):
+    #     #engine = create_engine('mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod')
+    #     engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
+    #     table_name = windcode + '_wave'
+    #     ids_str = ','.join(map(str, ids))
+    #     sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str}) ORDER BY time_stamp"
+    #     df = pd.read_sql(sql, engine)
+        
+    #     # 按ID分组返回
+    #     grouped = [group for _, group in df.groupby('id')]
+    #     return grouped        
     def _get_by_id(self, windcode, ids):
+        """Load waveform rows for ``ids`` from the ``<windcode>_wave`` table.
+
+        Args:
+            windcode: Prefix of the table name; ``'_wave'`` is appended.
+            ids: Iterable of integer-like record ids.
+
+        Returns:
+            A list with one DataFrame per requested id that was found, in the
+            order the ids were requested. Every frame has its index reset, so
+            label access such as ``df['mesure_data'][0]`` and positional
+            access (``.iloc[0]``) refer to the same row.
+
+        Raises:
+            ValueError: If an id cannot be converted to ``int``.
+        """
+        # Coercing to int keeps arbitrary text out of the SQL string below.
+        requested_ids = [int(record_id) for record_id in ids]
+        if not requested_ids:
+            # "IN ()" is a SQL syntax error, so skip the query entirely.
+            return []
+        # NOTE(review): connection credentials are hard-coded in source;
+        # consider moving them to configuration.
-        df_res = []
         engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
-     #   engine = create_engine('mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod')
-        for id in ids:
-            table_name=windcode+'_wave'
-            lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} "
-            # print(lastday_df_sql)
-            df = pd.read_sql(lastday_df_sql, engine)
-            df_res.append(df)
-        return df_res
+        # NOTE(review): the table name is interpolated because identifiers
+        # cannot be bound as parameters; windcode must come from a trusted source.
+        table_name = windcode + '_wave'
+        ids_str = ','.join(map(str, requested_ids))
+        sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str}) ORDER BY time_stamp"
+        df = pd.read_sql(sql, engine)
+        # groupby yields groups sorted by id and keeps the query's row labels,
+        # so re-key the groups and reset each index before handing them out.
+        frames_by_id = {
+            str(key): frame.reset_index(drop=True)
+            for key, frame in df.groupby('id')
+        }
+        # Return frames in the caller's order; ids with no row are skipped.
+        return [
+            frames_by_id[str(record_id)]
+            for record_id in requested_ids
+            if str(record_id) in frames_by_id
+        ]
 
     # envelope_spectrum_analysis 包络谱分析
     def _bandpass_filter(self, data):
@@ -215,7 +246,7 @@ class CMSAnalyst:
             "B3P":_3P_1X,
         }
         # result = json.dumps(result, ensure_ascii=False)
-        reslut = self.replace_nan(reslut)
+        result = self.replace_nan(result)
 
         return result
 
@@ -287,7 +318,7 @@ class CMSAnalyst:
                         {"Xaxis": fn_Gen*5, "val": "5X"}, {"Xaxis": fn_Gen*6, "val": "6X"}],
             "B3P":_3P_1X,
         }
-        reslut = self.replace_nan(reslut)       
+        result = self.replace_nan(result)       
         result = json.dumps(result, ensure_ascii=False)
         return result
 
@@ -343,118 +374,84 @@ class CMSAnalyst:
             "rpm_Gen": round(rpm_Gen, 2),  # 转速r/min
 
         }
-        reslut = self.replace_nan(reslut)
+        result = self.replace_nan(result)
         result = json.dumps(result, ensure_ascii=False)
 
         return result
 
+    def trend_analysis(self) -> str:
+        """Compute time-domain trend statistics for every loaded waveform record.
+
+        One statistics dictionary is produced per non-empty DataFrame in
+        ``self.datas`` (the first row of each frame is used), in the order the
+        frames are stored. ``self.datas`` itself is not modified.
+
+        Returns:
+            A JSON string encoding a list of dictionaries with the keys
+            ``fs``, ``Mean``, ``Max``, ``Min``, ``Xrms``, ``Xp``, ``If``,
+            ``Cf``, ``Sf``, ``Ce``, ``Cw``, ``Cq``, ``velocity_rms`` and
+            ``time_stamp``. Each dictionary is passed through
+            ``self.replace_nan`` before serialisation, as the other analysis
+            methods of this class do.
+        """

-    # trend_analysis 趋势图
-
-    def trend_analysis(self):
-
-        all_stats = []
-
-        # 定义积分函数
-        def _integrate(data, dt):
-            return np.cumsum(data) * dt
-
-        # 定义计算统计指标的函数
-        def _calculate_stats(data):
-            mean_value = np.mean(data)
-            max_value = np.max(data)
-            min_value = np.min(data)
-            Xrms = np.sqrt(np.mean(data**2))  # 加速度均方根值(有效值)
-            # Xrms = filtered_acceleration_rms  # 加速度均方根值(有效值)
-            Xp = (max_value - min_value) / 2  # 峰值(单峰最大值) # 峰值
-            Cf = Xp / Xrms  # 峰值指标
-            Sf = Xrms / mean_value  # 波形指标
-            If = Xp / np.mean(np.abs(data))  # 脉冲指标
-            Xr = np.mean(np.sqrt(np.abs(data))) ** 2  # 方根幅值
-            Ce = Xp / Xr  # 裕度指标
-
-            # 计算每个数据点的绝对值减去均值后的三次方,并求和
-            sum_abs_diff_cubed_3 = np.mean((np.abs(data) - mean_value) ** 3)
-            # 计算偏度指标
-            Cw = sum_abs_diff_cubed_3 / (Xrms**3)
-            # 计算每个数据点的绝对值减去均值后的四次方,并求和
-            sum_abs_diff_cubed_4 = np.mean((np.abs(data) - mean_value) ** 4)
-            # 计算峭度指标
-            Cq = sum_abs_diff_cubed_4 / (Xrms**4)
-            #
-

+        def ratio(numerator, denominator):
+            """Divide, yielding 0 when the denominator is exactly zero."""
+            return numerator / denominator if denominator != 0 else 0
+
+        def calculate_stats(group: pd.DataFrame) -> Dict[str, Any]:
+            """Return the statistics dictionary for the first row of ``group``."""
+            # Parse the JSON-encoded sample array once; forcing float keeps
+            # Max/Min JSON-serialisable even when the samples are integers.
+            data = np.asarray(json.loads(group['mesure_data'].iloc[0]), dtype=float)
+            fs = int(group['sampling_frequency'].iloc[0])  # sampling frequency
+            dt = 1 / fs  # sample interval
+
+            mean = np.mean(data)
+            max_val = np.max(data)
+            min_val = np.min(data)
+            Xrms = np.sqrt(np.mean(data**2))  # root-mean-square value
+            Xp = (max_val - min_val) / 2  # half of the peak-to-peak range
+            Xr = np.mean(np.sqrt(np.abs(data))) ** 2  # square-root amplitude
+
+            # Every ratio is guarded so an all-zero signal yields 0 rather
+            # than inf/NaN.
+            Cf = ratio(Xp, Xrms)  # crest factor
+            # NOTE(review): waveform factor is often defined with mean(|x|);
+            # this keeps the file's existing Xrms / mean - confirm intent.
+            Sf = ratio(Xrms, mean)
+            If = ratio(Xp, np.mean(np.abs(data)))  # impulse factor
+            Ce = ratio(Xp, Xr)  # clearance (margin) factor
+            Cw = ratio(np.mean((data - mean) ** 3), Xrms ** 3)  # skewness index
+            Cq = ratio(np.mean((data - mean) ** 4), Xrms ** 4)  # kurtosis index
+
+            # Velocity is approximated by cumulative summation of the samples.
+            velocity = np.cumsum(data) * dt
+            velocity_rms = np.sqrt(np.mean(velocity**2))
             return {
-                "fs":self.fs,#采样频率
-                "Mean": round(mean_value, 2),#平均值
-                "Max": round(max_value, 2),#最大值
-                "Min": round(min_value, 2),#最小值
-                "Xrms": round(Xrms, 2),#有效值
-                "Xp": round(Xp, 2),#峰值
-                "If": round(If, 2), # 脉冲指标
-                "Cf": round(Cf, 2),#峰值指标
-                "Sf": round(Sf, 2),#波形指标
-                "Ce": round(Ce, 2),#裕度指标
-                "Cw": round(Cw, 2) ,#偏度指标
-                "Cq": round(Cq, 2),#峭度指标
-                #velocity_rms :速度有效值
-                #time_stamp:时间戳
+                "fs": fs,
+                "Mean": round(mean, 2),
+                "Max": round(max_val, 2),
+                "Min": round(min_val, 2),
+                "Xrms": round(Xrms, 2),
+                "Xp": round(Xp, 2),
+                "If": round(If, 2),
+                "Cf": round(Cf, 2),
+                "Sf": round(Sf, 2),
+                "Ce": round(Ce, 2),
+                "Cw": round(Cw, 2),
+                "Cq": round(Cq, 2),
+                "velocity_rms": round(velocity_rms, 2),
+                "time_stamp": str(group['time_stamp'].iloc[0])
             }
+
+        # self.datas already holds one frame per record id, so iterate it
+        # directly instead of concatenating and regrouping; empty frames are
+        # skipped because they have no first row to analyse.
+        stats = [
+            self.replace_nan(calculate_stats(frame))
+            for frame in self.datas
+            if not frame.empty
+        ]
+        return json.dumps(stats, ensure_ascii=False)
 
-        for data in self.datas:
-            fs=int(self.data_filter['sampling_frequency'].iloc[0])
-            dt = 1 / fs
-            time_stamp=data['time_stamp'][0]
-            print(time_stamp)
-            data=np.array(ast.literal_eval(data['mesure_data'][0]))
 
-            velocity = _integrate(data, dt)
-            velocity_rms = np.sqrt(np.mean(velocity**2))
-            stats = _calculate_stats(data)
-            stats["velocity_rms"] = round(velocity_rms, 2)#速度有效值
-            stats["time_stamp"] = str(time_stamp)#时间戳
-
-            all_stats.append(stats)
-
-            # df = pd.DataFrame(all_stats)
-        all_stats = [self.replace_nan(stats) for stats in all_stats]
-        all_stats = json.dumps(all_stats,  ensure_ascii=False)
-        return all_stats
-
-    # def Characteristic_Frequency(self):
-    #     """提取轴承、齿轮等参数"""
-    #     # 1、从测点名称中提取部件名称(计算特征频率的部件)
-    #     str1 = self.mesure_point_name
-    #     # str2 = ["main_bearing", "front_main_bearing", "rear_main_bearing", "generator_non_drive_end"]
-    #     str2 = ["main_bearing", "front_main_bearing", "rear_main_bearing", "generator","stator","gearbox"]
-    #     for str in str2:
-    #         if str in str1:
-    #             parts = str
-    #             if parts == "front_main_bearing":
-    #                 parts = "front_bearing"
-    #             elif parts == "rear_main_bearing":
-    #                 parts = "rear_bearing"
-
-    #     print(parts)
     def Characteristic_Frequency(self):
         """提取轴承、齿轮等参数"""
-        # 1、从测点名称中提取部件名称(计算特征频率的部件)
         str1 = self.mesure_point_name
-        # str2 = ["main_bearing", "front_main_bearing", "rear_main_bearing", "generator_non_drive_end"]
-        # str2 = ["main_bearing", "front_main_bearing", "rear_main_bearing", "generator","stator","gearbox"]
-        # for str in str2:
-        #     if str in str1:
-        #         parts = str
-        #         # if parts == "front_main_bearing":
-        #         #     parts = "front_bearing"
-        #         # elif parts == "rear_main_bearing":
-        #         #     parts = "rear_bearing"
         print(str1)
-
         # 2、连接233的数据库'energy_show',从表'wind_engine_group'查找风机编号'engine_code'对应的机型编号'mill_type_code'
         engine_code = self.wind_code
         print(engine_code)
         Engine = create_engine('mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show')
-        #Engine2 = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')        
+        #Engine = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')        
         # df_sql2 = f"SELECT * FROM {'wind_engine_group'} where engine_code = {'engine_code'} "
         df_sql2 = f"SELECT * FROM wind_engine_group WHERE engine_code = '{engine_code}'"
         df2 = pd.read_sql(df_sql2, Engine)
@@ -462,25 +459,10 @@ class CMSAnalyst:
         print(mill_type_code)
 
         # # 3、从表'unit_bearings'中通过机型编号'mill_type_code'查找部件'brand'、'model'的参数信息
-        # Engine3 = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')
-        # #unit_bearings是主轴承的参数 
-        # df_sql3 = f"SELECT * FROM {'unit_bearings'} where mill_type_code = {'mill_type_code'} "
-        # df3 = pd.read_sql(df_sql3, Engine3)
-        # brand = 'front_bearing' + '_brand'  # parts代替'front_bearing'
-        # model = 'front_bearing' + '_model'  # parts代替'front_bearing'
-        # print(brand)
-        # _brand = df3[brand].iloc[0]
-        # _model = df3[model].iloc[0]
-        # print(_brand)
-        # print(_model)
-
         # 3、从相关的表中通过机型编号'mill_type_code'或者齿轮箱编号gearbox_code查找部件'brand'、'model'的参数信息
-
-#      Engine3 = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')
         #unit_bearings主轴承参数表 关键词"main_bearing"
         if 'main_bearing' in str1:
             print("main_bearing")
-            # df_sql3 = f"SELECT * FROM {'unit_bearings'} where mill_type_code = {'mill_type_code'} "
             df_sql3 = f"SELECT * FROM unit_bearings WHERE mill_type_code = '{mill_type_code}' "            
             df3 = pd.read_sql(df_sql3, Engine)
             if df3.empty:
@@ -643,25 +625,7 @@ class CMSAnalyst:
                 print(_brand)
                 print(_model)   
 
-        # # 4、从表'unit_dict_brand_model'中通过'_brand'、'_model'查找部件的参数信息
-        # Engine4 = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')
-        # df_sql4 = f"SELECT * FROM unit_dict_brand_model where manufacture = %s AND model_number = %s"
-        # params = [(_brand, _model)]
-        # df4 = pd.read_sql(df_sql4, Engine4, params=params)
-        # if 'bearing' in parts:
-        #     n_rolls = df4['rolls_number'].iloc[0]
-        #     d_rolls = df4['rolls_diameter'].iloc[0]
-        #     D_diameter = df4['circle_diameter'].iloc[0]
-        #     theta_deg = df4['theta_deg'].iloc[0]
-        #     result = {
-        #         "type":'bearing',
-        #         "n_rolls":round(n_rolls, 2),
-        #         "d_rolls":round(d_rolls, 2),
-        #         "D_diameter":round(D_diameter, 2),
-        #         "theta_deg":round(theta_deg, 2),
-        #     }
-        # # result = json.dumps(result, ensure_ascii=False)
-        #     return result
+
         # 4、从表'unit_dict_brand_model'中通过'_brand'、'_model'查找部件的参数信息
 
         df_sql4 = f"SELECT * FROM unit_dict_brand_model where manufacture = %s AND model_number = %s"