import numpy as np from scipy.signal import hilbert from scipy.fft import ifft import plotly.graph_objs as go import pandas as pd from sqlalchemy import create_engine, text import sqlalchemy import json import ast import math ''' # 输入: { "ids":[12345,121212], "windCode":"xxxx", "analysisType":"xxxxx", "fmin":int(xxxx) (None), "fmax":"int(xxxx) (None), } [{id:xxxx,"time":xxx},{}] id[123456] # 通过id,读取mysql,获取数据 engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data') def get_by_id(table_name,id): lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} " # print(lastday_df_sql) df = pd.read_sql(lastday_df_sql, engine) return df select distinct id, timeStamp from table_name group by ids ids time 1 xxx 2 xxxx df_data = [] # for id in ids: # sql_data = get_by_id('SKF001_wave',id) # df_data.append(sql_data) # print(sql_data) [df1,df2] ''' ''' 数据库字段: "samplingFrequency" "timeStamp" "mesureData" ''' # %% # %% # 主要的类 class CMSAnalyst: def __init__(self, fmin, fmax, table_name, ids): # envelope_spectrum_analysis # datas是[df1,df2,.....] self.datas = self._get_by_id(table_name,ids) self.datas = [df[['mesure_data','time_stamp','sampling_frequency','wind_turbine_number','rotational_speed','mesure_point_name']] for df in self.datas] # 只输入一个id,返回一个[df],所以拿到self.data[0] self.data_filter = self.datas[0] # print(self.data_filter) # 取数据列 self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'][0])) self.envelope_spectrum_m = self.data.shape[0] self.envelope_spectrum_n = 1 self.fs = int(self.data_filter['sampling_frequency'].iloc[0]) self.envelope_spectrum_t = np.arange(self.envelope_spectrum_m) / self.fs self.fmin = fmin if fmin is not None else 0 self.fmax = fmax if fmax is not None else float('inf') self.envelope_spectrum_y = self._bandpass_filter(self.data) self.f, self.HP = self._calculate_envelope_spectrum(self.envelope_spectrum_y) self.wind_code = self.data_filter['wind_turbine_number'].iloc[0] self.rpm_Gen = self.data_filter['rotational_speed'].iloc[0] self.mesure_point_name = self.data_filter['mesure_point_name'].iloc[0] self.fn_Gen = round(self.rpm_Gen/60,2) self.CF = self.Characteristic_Frequency() print(self.CF) self.CF = pd.DataFrame(self.CF,index=[0]) print(self.CF) print(self.rpm_Gen) if self.CF['type'].iloc[0] == 'bearing': n_rolls_m = self.CF['n_rolls'].iloc[0] d_rolls_m = self.CF['d_rolls'].iloc[0] D_diameter_m = self.CF['D_diameter'].iloc[0] theta_deg_m = self.CF['theta_deg'].iloc[0] print(n_rolls_m) print(d_rolls_m) print(D_diameter_m) print(theta_deg_m) self.bearing_frequencies = self.calculate_bearing_frequencies(n_rolls_m, d_rolls_m, D_diameter_m, theta_deg_m, self.rpm_Gen) print(self.bearing_frequencies) self.bearing_frequencies = pd.DataFrame(self.bearing_frequencies,index=[0]) print(self.bearing_frequencies) # frequency_domain_analysis ( self.frequency_domain_analysis_t, self.frequency_domain_analysis_f, self.frequency_domain_analysis_m, self.frequency_domain_analysis_mag, self.frequency_domain_analysis_Xrms, ) = self._calculate_spectrum(self.data) # time_domain_analysis self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs def _get_by_id(self, windcode, ids): df_res = [] engine = create_engine('mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod') for id in ids: table_name=windcode+'_wave' lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} " # print(lastday_df_sql) df = pd.read_sql(lastday_df_sql, engine) df_res.append(df) return df_res # envelope_spectrum_analysis def _bandpass_filter(self, data): """带通滤波""" m= data.shape[0] ni = round(self.fmin * self.envelope_spectrum_m / self.fs + 1) # na = round(self.fmax * self.envelope_spectrum_m / self.fs + 1) if self.fmax == float('inf'): na = m else: na = round(self.fmax * m / self.fs + 1) col = 1 y = np.zeros((self.envelope_spectrum_m, col)) # for p in range(col): # print(data.shape,p) z = np.fft.fft(data) a = np.zeros(self.envelope_spectrum_m, dtype=complex) a[ni:na] = z[ni:na] a[self.envelope_spectrum_m - na + 1 : self.envelope_spectrum_m - ni + 1] = z[ self.envelope_spectrum_m - na + 1 : self.envelope_spectrum_m - ni + 1 ] z = np.fft.ifft(a) y[:, 0] = np.real(z) return y def _calculate_envelope_spectrum(self, y): """计算包络谱""" m, n = y.shape HP = np.zeros((m, n)) col = 1 for p in range(col): H = np.abs(hilbert(y[:, p] - np.mean(y[:, p]))) HP[:, p] = np.abs(np.fft.fft(H - np.mean(H))) * 2 / m f = np.fft.fftfreq(m, d=1 / self.fs) return f, HP def envelope_spectrum(self): """绘制包络谱""" # 只取正频率部分 positive_frequencies = self.f[: self.envelope_spectrum_m // 2] positive_HP = self.HP[: self.envelope_spectrum_m // 2, 0] x = positive_frequencies y = positive_HP title = "包络谱" xaxis = "频率(Hz)" yaxis = "加速度(m/s^2)" Xrms = np.sqrt(np.mean(y**2)) # 加速度均方根值(有效值) rpm_Gen = round(self.rpm_Gen, 2) BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2) BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2) BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2) FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2) fn_Gen = round(self.fn_Gen, 2) _3P_1X = round(self.fn_Gen, 2) * 3 if self.CF['type'].iloc[0] == 'bearing': result = { "fs":self.fs, "Xrms":round(Xrms, 2), "x":list(x), "y":list(y), "title":title, "xaxis":xaxis, "yaxis":yaxis, "rpm_Gen": round(rpm_Gen, 2), # 转速r/min "BPFI": [{"Xaxis": BPFI_1X ,"val": "1BPFI"},{"Xaxis": BPFI_1X*2 ,"val": "2BPFI"}, {"Xaxis": BPFI_1X*3, "val": "3BPFI"}, {"Xaxis": BPFI_1X*4, "val": "4BPFI"}, {"Xaxis": BPFI_1X*5, "val": "5BPFI"}, {"Xaxis": BPFI_1X*6, "val": "6BPFI"}], "BPFO": [{"Xaxis": BPFO_1X ,"val": "1BPFO"},{"Xaxis": BPFO_1X*2 ,"val": "2BPFO"}, {"Xaxis": BPFO_1X*3, "val": "3BPFO"}, {"Xaxis": BPFO_1X*4, "val": "4BPFO"}, {"Xaxis": BPFO_1X*5, "val": "5BPFO"}, {"Xaxis": BPFO_1X*6, "val": "6BPFO"}], "BSF": [{"Xaxis": BSF_1X ,"val": "1BSF"},{"Xaxis": BSF_1X*2 ,"val": "2BSF"}, {"Xaxis": BSF_1X*3, "val": "3BSF"}, {"Xaxis": BSF_1X*4, "val": "4BSF"}, {"Xaxis": BSF_1X*5, "val": "5BSF"}, {"Xaxis": BSF_1X*6, "val": "6BSF"}], "FTF": [{"Xaxis": FTF_1X ,"val": "1FTF"},{"Xaxis": FTF_1X*2 ,"val": "2FTF"}, {"Xaxis": FTF_1X*3, "val": "3FTF"}, {"Xaxis": FTF_1X*4, "val": "4FTF"}, {"Xaxis": FTF_1X*5, "val": "5FTF"}, {"Xaxis": FTF_1X*6, "val": "6FTF"}], "fn_Gen":[{"Xaxis": fn_Gen ,"val": "1X"},{"Xaxis": fn_Gen*2 ,"val": "2X"}, {"Xaxis": fn_Gen*3, "val": "3X"}, {"Xaxis": fn_Gen*4, "val": "4X"}, {"Xaxis": fn_Gen*5, "val": "5X"}, {"Xaxis": fn_Gen*6, "val": "6X"}], "B3P":_3P_1X, } # result = json.dumps(result, ensure_ascii=False) return result # frequency_domain_analysis def _calculate_spectrum(self, data): """计算频谱""" m = data.shape[0] n = 1 t = np.arange(m) / self.fs mag = np.zeros((m, n)) Xrms = np.sqrt(np.mean(data**2)) # 加速度均方根值(有效值) # col=1 # for p in range(col): mag = np.abs(np.fft.fft(data - np.mean(data))) * 2 / m f = np.fft.fftfreq(m, d=1 / self.fs) return t, f, m, mag, Xrms def frequency_domain(self): """绘制频域波形参数""" # 只取正频率部分 positive_frequencies = self.frequency_domain_analysis_f[ : self.frequency_domain_analysis_m // 2 ] positive_mag = self.frequency_domain_analysis_mag[ : self.frequency_domain_analysis_m // 2 ] x = positive_frequencies y = positive_mag title = "频域信号" xaxis = "频率(Hz)" yaxis = "加速度(m/s^2)" Xrms = self.frequency_domain_analysis_Xrms rpm_Gen = round(self.rpm_Gen, 2) BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2) BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2) BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2) FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2) fn_Gen = round(self.fn_Gen, 2) _3P_1X = round(self.fn_Gen, 2) * 3 if self.CF['type'].iloc[0] == 'bearing': result = { "fs":self.fs, "Xrms":round(Xrms, 2), "x":list(x), "y":list(y), "title":title, "xaxis":xaxis, "yaxis":yaxis, "rpm_Gen": round(rpm_Gen, 2), # 转速r/min "BPFI": [{"Xaxis": BPFI_1X ,"val": "1BPFI"},{"Xaxis": BPFI_1X*2 ,"val": "2BPFI"}, {"Xaxis": BPFI_1X*3, "val": "3BPFI"}, {"Xaxis": BPFI_1X*4, "val": "4BPFI"}, {"Xaxis": BPFI_1X*5, "val": "5BPFI"}, {"Xaxis": BPFI_1X*6, "val": "6BPFI"}], "BPFO": [{"Xaxis": BPFO_1X ,"val": "1BPFO"},{"Xaxis": BPFO_1X*2 ,"val": "2BPFO"}, {"Xaxis": BPFO_1X*3, "val": "3BPFO"}, {"Xaxis": BPFO_1X*4, "val": "4BPFO"}, {"Xaxis": BPFO_1X*5, "val": "5BPFO"}, {"Xaxis": BPFO_1X*6, "val": "6BPFO"}], "BSF": [{"Xaxis": BSF_1X ,"val": "1BSF"},{"Xaxis": BSF_1X*2 ,"val": "2BSF"}, {"Xaxis": BSF_1X*3, "val": "3BSF"}, {"Xaxis": BSF_1X*4, "val": "4BSF"}, {"Xaxis": BSF_1X*5, "val": "5BSF"}, {"Xaxis": BSF_1X*6, "val": "6BSF"}], "FTF": [{"Xaxis": FTF_1X ,"val": "1FTF"},{"Xaxis": FTF_1X*2 ,"val": "2FTF"}, {"Xaxis": FTF_1X*3, "val": "3FTF"}, {"Xaxis": FTF_1X*4, "val": "4FTF"}, {"Xaxis": FTF_1X*5, "val": "5FTF"}, {"Xaxis": FTF_1X*6, "val": "6FTF"}], "fn_Gen":[{"Xaxis": fn_Gen ,"val": "1X"},{"Xaxis": fn_Gen*2 ,"val": "2X"}, {"Xaxis": fn_Gen*3, "val": "3X"}, {"Xaxis": fn_Gen*4, "val": "4X"}, {"Xaxis": fn_Gen*5, "val": "5X"}, {"Xaxis": fn_Gen*6, "val": "6X"}], "B3P":_3P_1X, } result = json.dumps(result, ensure_ascii=False) return result # time_domain_analysis def time_domain(self): """绘制时域波形参数""" x = self.time_domain_analysis_t y = self.data rpm_Gen =self.rpm_Gen title = "时间域信号" xaxis = "时间(s)" yaxis = "加速度(m/s^2)" # 图片右侧统计量 mean_value = np.mean(y)#平均值 max_value = np.max(y)#最大值 min_value = np.min(y)#最小值 Xrms = np.sqrt(np.mean(y**2)) # 加速度均方根值(有效值) Xp = (max_value - min_value) / 2 # 峰值(单峰最大值) # 峰值 Xpp=max_value-min_value#峰峰值 Cf = Xp / Xrms # 峰值指标 Sf = Xrms / mean_value # 波形指标 If = Xp / np.mean(np.abs(y)) # 脉冲指标 Xr = np.mean(np.sqrt(np.abs(y))) ** 2 # 方根幅值 Ce = Xp / Xr # 裕度指标 # 计算每个数据点的绝对值减去均值后的三次方,并求和 sum_abs_diff_cubed_3 = np.mean((np.abs(y) - mean_value) ** 3) # 计算偏度指标 Cw = sum_abs_diff_cubed_3 / (Xrms**3) # 计算每个数据点的绝对值减去均值后的四次方,并求和 sum_abs_diff_cubed_4 = np.mean((np.abs(y) - mean_value) ** 4) # 计算峭度指标 Cq = sum_abs_diff_cubed_4 / (Xrms**4) result = { "x":list(x), "y":list(y), "title":title, "xaxis":xaxis, "yaxis":yaxis, "fs":self.fs, "Xrms":round(Xrms, 2),#有效值 "mean_value":round(mean_value, 2),# 均值 "max_value":round(max_value, 2),# 最大值 "min_value":round(min_value, 2), # 最小值 "Xp":round(Xp, 2),# 峰值 "Xpp":round(Xpp, 2),# 峰峰值 "Cf":round(Cf, 2),# 峰值指标 "Sf":round(Sf, 2),# 波形因子 "If":round(If, 2),# 脉冲指标 "Ce":round(Ce, 2),# 裕度指标 "Cw":round(Cw, 2) ,# 偏度指标 "Cq":round(Cq, 2) ,# 峭度指标 "rpm_Gen": round(rpm_Gen, 2), # 转速r/min } result = json.dumps(result, ensure_ascii=False) return result # trend_analysis def trend_analysis(self): all_stats = [] # 定义积分函数 def _integrate(data, dt): return np.cumsum(data) * dt # 定义计算统计指标的函数 def _calculate_stats(data): mean_value = np.mean(data) max_value = np.max(data) min_value = np.min(data) Xrms = np.sqrt(np.mean(data**2)) # 加速度均方根值(有效值) # Xrms = filtered_acceleration_rms # 加速度均方根值(有效值) Xp = (max_value - min_value) / 2 # 峰值(单峰最大值) # 峰值 Cf = Xp / Xrms # 峰值指标 Sf = Xrms / mean_value # 波形指标 If = Xp / np.mean(np.abs(data)) # 脉冲指标 Xr = np.mean(np.sqrt(np.abs(data))) ** 2 # 方根幅值 Ce = Xp / Xr # 裕度指标 # 计算每个数据点的绝对值减去均值后的三次方,并求和 sum_abs_diff_cubed_3 = np.mean((np.abs(data) - mean_value) ** 3) # 计算偏度指标 Cw = sum_abs_diff_cubed_3 / (Xrms**3) # 计算每个数据点的绝对值减去均值后的四次方,并求和 sum_abs_diff_cubed_4 = np.mean((np.abs(data) - mean_value) ** 4) # 计算峭度指标 Cq = sum_abs_diff_cubed_4 / (Xrms**4) # return { "fs":self.fs,#采样频率 "Mean": round(mean_value, 2),#平均值 "Max": round(max_value, 2),#最大值 "Min": round(min_value, 2),#最小值 "Xrms": round(Xrms, 2),#有效值 "Xp": round(Xp, 2),#峰值 "If": round(If, 2), # 脉冲指标 "Cf": round(Cf, 2),#峰值指标 "Sf": round(Sf, 2),#波形指标 "Ce": round(Ce, 2),#裕度指标 "Cw": round(Cw, 2) ,#偏度指标 "Cq": round(Cq, 2),#峭度指标 #velocity_rms :速度有效值 #time_stamp:时间戳 } for data in self.datas: fs=int(self.data_filter['sampling_frequency'].iloc[0]) dt = 1 / fs time_stamp=data['time_stamp'][0] print(time_stamp) data=np.array(ast.literal_eval(data['mesure_data'][0])) velocity = _integrate(data, dt) velocity_rms = np.sqrt(np.mean(velocity**2)) stats = _calculate_stats(data) stats["velocity_rms"] = round(velocity_rms, 2)#速度有效值 stats["time_stamp"] = str(time_stamp)#时间戳 all_stats.append(stats) # df = pd.DataFrame(all_stats) all_stats = json.dumps(all_stats, ensure_ascii=False) return all_stats def Characteristic_Frequency(self): """提取轴承、齿轮等参数""" # 1、从测点名称中提取部件名称(计算特征频率的部件) str1 = self.mesure_point_name str2 = ["main_bearing", "front_main_bearing", "rear_main_bearing", "generator_non_drive_end"] for str in str2: if str in str1: parts = str if parts == "front_main_bearing": parts = "front_bearing" elif parts == "rear_main_bearing": parts = "rear_bearing" print(parts) # 2、连接233的数据库'energy_show',从表'wind_engine_group'查找风机编号'engine_code'对应的机型编号'mill_type_code' engine_code = self.wind_code Engine2 = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show') df_sql2 = f"SELECT * FROM {'wind_engine_group'} where engine_code = {'engine_code'} " df2 = pd.read_sql(df_sql2, Engine2) mill_type_code = df2['mill_type_code'].iloc[0] print(mill_type_code) # 3、从表'unit_bearings'中通过机型编号'mill_type_code'查找部件'brand'、'model'的参数信息 Engine3 = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show') df_sql3 = f"SELECT * FROM {'unit_bearings'} where mill_type_code = {'mill_type_code'} " df3 = pd.read_sql(df_sql3, Engine3) brand = 'front_bearing' + '_brand' # parts代替'front_bearing' model = 'front_bearing' + '_model' # parts代替'front_bearing' print(brand) _brand = df3[brand].iloc[0] _model = df3[model].iloc[0] print(_brand) print(_model) # 4、从表'unit_dict_brand_model'中通过'_brand'、'_model'查找部件的参数信息 Engine4 = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show') df_sql4 = f"SELECT * FROM unit_dict_brand_model where manufacture = %s AND model_number = %s" params = [(_brand, _model)] df4 = pd.read_sql(df_sql4, Engine4, params=params) if 'bearing' in parts: n_rolls = df4['rolls_number'].iloc[0] d_rolls = df4['rolls_diameter'].iloc[0] D_diameter = df4['circle_diameter'].iloc[0] theta_deg = df4['theta_deg'].iloc[0] result = { "type":'bearing', "n_rolls":round(n_rolls, 2), "d_rolls":round(d_rolls, 2), "D_diameter":round(D_diameter, 2), "theta_deg":round(theta_deg, 2), } # result = json.dumps(result, ensure_ascii=False) return result def calculate_bearing_frequencies(self, n, d, D, theta_deg, rpm): """ 计算轴承各部件特征频率 参数: n (int): 滚动体数量 d (float): 滚动体直径(单位:mm) D (float): 轴承节圆直径(滚动体中心圆直径,单位:mm) theta_deg (float): 接触角(单位:度) rpm (float): 转速(转/分钟) 返回: dict: 包含各特征频率的字典(单位:Hz) """ # 转换角度为弧度 theta = math.radians(theta_deg) # 转换直径单位为米(保持单位一致性,实际计算中比值抵消单位影响) # 注意:由于公式中使用的是比值,单位可以保持mm不需要转换 ratio = d / D # 基础频率计算(转/秒) f_r = rpm / 60.0 # 计算各特征频率 BPFI = n / 2 * (1 + ratio * math.cos(theta)) * f_r # 内圈故障频率 BPFO = n / 2 * (1 - ratio * math.cos(theta)) * f_r # 外圈故障频率 BSF = (D / (2 * d)) * (1 - (ratio ** 2) * (math.cos(theta) ** 2)) * f_r # 滚动体故障频率 FTF = 0.5 * (1 - ratio * math.cos(theta)) * f_r # 保持架故障频率 return { "BPFI": round(BPFI, 2), "BPFO": round(BPFO, 2), "BSF": round(BSF, 2), "FTF": round(FTF, 2), } if __name__ == "__main__": # table_name = "SKF001_wave" # ids = [67803,67804] # fmin, fmax = None, None cms = CMSAnalyst(fmin, fmax,table_name,ids) time_domain = cms.time_domain() # print(time_domain) ''' trace = go.Scatter( x=time_domain['x'], y=time_domain['y'], mode="lines", name=time_domain['title'], ) layout = go.Layout( title= time_domain['title'], xaxis=dict(title=time_domain["xaxis"]), yaxis=dict(title=time_domain["yaxis"]), ) fig = go.Figure(data=[trace], layout=layout) fig.show() ''' # data_path_lsit = ["test1.csv", "test2.csv"] # trend_analysis_test = cms.trend_analysis(data_path_lsit, fmin, fmax) # print(trend_analysis_test)