import ast import json import math import numpy as np import pandas as pd from scipy.signal import hilbert from typing import List, Dict, Any from app.config import dataBase from app.database import get_engine from app.logger import logger class CMSAnalyst: def __init__(self, fmin, fmax, table_name, ids): # 从数据库获取原始数据 self.table_name =table_name self.ids = ids self.datas = self._get_by_id(table_name, ids) self.datas = [ df[['id', 'mesure_data', 'time_stamp', 'sampling_frequency', 'wind_turbine_number', 'rotational_speed', 'mesure_point_name']] for df in self.datas ] # 只输入一个id,返回一个[df],所以拿到self.data[0] self.data_filter = self.datas[0] # 取数据列 self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'].iloc[0])) self.envelope_spectrum_m = self.data.shape[0] self.envelope_spectrum_n = 1 # 设置分析参数 self.fs = int(self.data_filter['sampling_frequency'].iloc[0]) self.envelope_spectrum_t = np.arange(self.envelope_spectrum_m) / self.fs self.fmin = fmin if fmin is not None else 0 self.fmax = fmax if fmax is not None else float('inf') self.envelope_spectrum_y = self._bandpass_filter(self.data) self.f, self.HP = self._calculate_envelope_spectrum(self.envelope_spectrum_y) # 设备信息 self.wind_code = self.data_filter['wind_turbine_number'].iloc[0] self.rpm_Gen = self.data_filter['rotational_speed'].iloc[0] self.mesure_point_name = self.data_filter['mesure_point_name'].iloc[0] self.fn_Gen = round(self.rpm_Gen / 60, 2) self.CF = self.Characteristic_Frequency() self.CF = pd.DataFrame(self.CF, index=[0]) n_rolls_m = self.CF['n_rolls'].iloc[0] d_rolls_m = self.CF['d_rolls'].iloc[0] D_diameter_m = self.CF['D_diameter'].iloc[0] theta_deg_m = self.CF['theta_deg'].iloc[0] self.bearing_frequencies = self.calculate_bearing_frequencies(n_rolls_m, d_rolls_m, D_diameter_m, theta_deg_m, self.rpm_Gen) self.bearing_frequencies = pd.DataFrame(self.bearing_frequencies, index=[0]) # frequency_domain_analysis ( self.frequency_domain_analysis_t, self.frequency_domain_analysis_f, self.frequency_domain_analysis_m, self.frequency_domain_analysis_mag, self.frequency_domain_analysis_Xrms, ) = self._calculate_spectrum(self.data) # time_domain_analysis self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs def _get_by_id(self, windcode, ids): engine = get_engine(dataBase.DATA_DB) table_name = windcode + '_wave' ids_str = ','.join(map(str, ids)) sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str}) ORDER BY time_stamp" df = pd.read_sql(sql, engine) grouped = [group.reset_index(drop=True) for _, group in df.groupby('id')] return grouped # envelope_spectrum_analysis 包络谱分析 def _bandpass_filter(self, data): """带通滤波""" m = data.shape[0] ni = round(self.fmin * self.envelope_spectrum_m / self.fs + 1) if self.fmax == float('inf'): na = m else: na = round(self.fmax * m / self.fs + 1) col = 1 y = np.zeros((self.envelope_spectrum_m, col)) z = np.fft.fft(data) a = np.zeros(self.envelope_spectrum_m, dtype=complex) a[ni:na] = z[ni:na] a[self.envelope_spectrum_m - na + 1: self.envelope_spectrum_m - ni + 1] = z[ self.envelope_spectrum_m - na + 1: self.envelope_spectrum_m - ni + 1 ] z = np.fft.ifft(a) y[:, 0] = np.real(z) return y def _calculate_envelope_spectrum(self, y): """计算包络谱""" m, n = y.shape HP = np.zeros((m, n)) col = 1 for p in range(col): H = np.abs(hilbert(y[:, p] - np.mean(y[:, p]))) HP[:, p] = np.abs(np.fft.fft(H - np.mean(H))) * 2 / m f = np.fft.fftfreq(m, d=1 / self.fs) return f, HP def envelope_spectrum(self): """绘制包络谱""" # 只取正频率部分 positive_frequencies = self.f[: self.envelope_spectrum_m // 2] positive_HP = self.HP[: self.envelope_spectrum_m // 2, 0] x = positive_frequencies y = positive_HP title = "包络谱" xaxis = "频率(Hz)" yaxis = "加速度(m/s^2)" # 加速度均方根值(有效值) Xrms = np.sqrt(np.mean(y ** 2)) rpm_Gen = round(self.rpm_Gen, 2) BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2) BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2) BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2) FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2) fn_Gen = round(self.fn_Gen, 2) _3P_1X = round(self.fn_Gen, 2) * 3 result = { "fs": self.fs, "Xrms": round(Xrms, 2), "x": list(x), "y": list(y), "title": title, "xaxis": xaxis, "yaxis": yaxis, "rpm_Gen": round(rpm_Gen, 2), # 转速r/min "BPFI": [{"Xaxis": BPFI_1X, "val": "1BPFI"}, {"Xaxis": BPFI_1X * 2, "val": "2BPFI"}, {"Xaxis": BPFI_1X * 3, "val": "3BPFI"}, {"Xaxis": BPFI_1X * 4, "val": "4BPFI"}, {"Xaxis": BPFI_1X * 5, "val": "5BPFI"}, {"Xaxis": BPFI_1X * 6, "val": "6BPFI"}], "BPFO": [{"Xaxis": BPFO_1X, "val": "1BPFO"}, {"Xaxis": BPFO_1X * 2, "val": "2BPFO"}, {"Xaxis": BPFO_1X * 3, "val": "3BPFO"}, {"Xaxis": BPFO_1X * 4, "val": "4BPFO"}, {"Xaxis": BPFO_1X * 5, "val": "5BPFO"}, {"Xaxis": BPFO_1X * 6, "val": "6BPFO"}], "BSF": [{"Xaxis": BSF_1X, "val": "1BSF"}, {"Xaxis": BSF_1X * 2, "val": "2BSF"}, {"Xaxis": BSF_1X * 3, "val": "3BSF"}, {"Xaxis": BSF_1X * 4, "val": "4BSF"}, {"Xaxis": BSF_1X * 5, "val": "5BSF"}, {"Xaxis": BSF_1X * 6, "val": "6BSF"}], "FTF": [{"Xaxis": FTF_1X, "val": "1FTF"}, {"Xaxis": FTF_1X * 2, "val": "2FTF"}, {"Xaxis": FTF_1X * 3, "val": "3FTF"}, {"Xaxis": FTF_1X * 4, "val": "4FTF"}, {"Xaxis": FTF_1X * 5, "val": "5FTF"}, {"Xaxis": FTF_1X * 6, "val": "6FTF"}], "fn_Gen": [{"Xaxis": fn_Gen, "val": "1X"}, {"Xaxis": fn_Gen * 2, "val": "2X"}, {"Xaxis": fn_Gen * 3, "val": "3X"}, {"Xaxis": fn_Gen * 4, "val": "4X"}, {"Xaxis": fn_Gen * 5, "val": "5X"}, {"Xaxis": fn_Gen * 6, "val": "6X"}], "B3P": _3P_1X, } # result = json.dumps(result, ensure_ascii=False) result = self.replace_nan(result) return result # frequency_domain_analysis 频谱分析 def _calculate_spectrum(self, data): """计算频谱""" m = data.shape[0] n = 1 t = np.arange(m) / self.fs mag = np.zeros((m, n)) Xrms = np.sqrt(np.mean(data ** 2)) # 加速度均方根值(有效值) # col=1 # for p in range(col): mag = np.abs(np.fft.fft(data - np.mean(data))) * 2 / m f = np.fft.fftfreq(m, d=1 / self.fs) return t, f, m, mag, Xrms def frequency_domain(self): """绘制频域波形参数""" # 只取正频率部分 positive_frequencies = self.frequency_domain_analysis_f[ : self.frequency_domain_analysis_m // 2 ] positive_mag = self.frequency_domain_analysis_mag[ : self.frequency_domain_analysis_m // 2 ] x = positive_frequencies y = positive_mag title = "频域信号" xaxis = "频率(Hz)" yaxis = "加速度(m/s^2)" Xrms = self.frequency_domain_analysis_Xrms rpm_Gen = round(self.rpm_Gen, 2) BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2) BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2) BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2) FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2) fn_Gen = round(self.fn_Gen, 2) _3P_1X = round(self.fn_Gen, 2) * 3 result = { "fs": self.fs, "Xrms": round(Xrms, 2), "x": list(x), "y": list(y), "title": title, "xaxis": xaxis, "yaxis": yaxis, "rpm_Gen": round(rpm_Gen, 2), # 转速r/min "BPFI": [{"Xaxis": BPFI_1X, "val": "1BPFI"}, {"Xaxis": BPFI_1X * 2, "val": "2BPFI"}, {"Xaxis": BPFI_1X * 3, "val": "3BPFI"}, {"Xaxis": BPFI_1X * 4, "val": "4BPFI"}, {"Xaxis": BPFI_1X * 5, "val": "5BPFI"}, {"Xaxis": BPFI_1X * 6, "val": "6BPFI"}], "BPFO": [{"Xaxis": BPFO_1X, "val": "1BPFO"}, {"Xaxis": BPFO_1X * 2, "val": "2BPFO"}, {"Xaxis": BPFO_1X * 3, "val": "3BPFO"}, {"Xaxis": BPFO_1X * 4, "val": "4BPFO"}, {"Xaxis": BPFO_1X * 5, "val": "5BPFO"}, {"Xaxis": BPFO_1X * 6, "val": "6BPFO"}], "BSF": [{"Xaxis": BSF_1X, "val": "1BSF"}, {"Xaxis": BSF_1X * 2, "val": "2BSF"}, {"Xaxis": BSF_1X * 3, "val": "3BSF"}, {"Xaxis": BSF_1X * 4, "val": "4BSF"}, {"Xaxis": BSF_1X * 5, "val": "5BSF"}, {"Xaxis": BSF_1X * 6, "val": "6BSF"}], "FTF": [{"Xaxis": FTF_1X, "val": "1FTF"}, {"Xaxis": FTF_1X * 2, "val": "2FTF"}, {"Xaxis": FTF_1X * 3, "val": "3FTF"}, {"Xaxis": FTF_1X * 4, "val": "4FTF"}, {"Xaxis": FTF_1X * 5, "val": "5FTF"}, {"Xaxis": FTF_1X * 6, "val": "6FTF"}], "fn_Gen": [{"Xaxis": fn_Gen, "val": "1X"}, {"Xaxis": fn_Gen * 2, "val": "2X"}, {"Xaxis": fn_Gen * 3, "val": "3X"}, {"Xaxis": fn_Gen * 4, "val": "4X"}, {"Xaxis": fn_Gen * 5, "val": "5X"}, {"Xaxis": fn_Gen * 6, "val": "6X"}], "B3P": _3P_1X, } result = self.replace_nan(result) result = json.dumps(result, ensure_ascii=False) return result # time_domain_analysis 时域分析 def time_domain(self): """绘制时域波形参数""" x = self.time_domain_analysis_t y = self.data rpm_Gen = self.rpm_Gen title = "时间域信号" xaxis = "时间(s)" yaxis = "加速度(m/s^2)" # 图片右侧统计量 # 平均值 mean_value = np.mean(y) # 最大值 max_value = np.max(y) # 最小值 min_value = np.min(y) # 加速度均方根值(有效值) Xrms = np.sqrt(np.mean(y ** 2)) # 峰值(单峰最大值) # 峰值 Xp = (max_value - min_value) / 2 # 峰峰值 Xpp = max_value - min_value # 峰值指标 Cf = Xp / Xrms # 波形指标 Sf = Xrms / mean_value # 脉冲指标 If = Xp / np.mean(np.abs(y)) # 方根幅值 Xr = np.mean(np.sqrt(np.abs(y))) ** 2 # 裕度指标 Ce = Xp / Xr # 计算每个数据点的绝对值减去均值后的三次方,并求和 sum_abs_diff_cubed_3 = np.mean((np.abs(y) - mean_value) ** 3) # 计算偏度指标 Cw = sum_abs_diff_cubed_3 / (Xrms ** 3) # 计算每个数据点的绝对值减去均值后的四次方,并求和 sum_abs_diff_cubed_4 = np.mean((np.abs(y) - mean_value) ** 4) # 计算峭度指标 Cq = sum_abs_diff_cubed_4 / (Xrms ** 4) result = { "x": list(x), "y": list(y), "title": title, "xaxis": xaxis, "yaxis": yaxis, "fs": self.fs, "Xrms": round(Xrms, 2), # 有效值 "mean_value": round(mean_value, 2), # 均值 "max_value": round(max_value, 2), # 最大值 "min_value": round(min_value, 2), # 最小值 "Xp": round(Xp, 2), # 峰值 "Xpp": round(Xpp, 2), # 峰峰值 "Cf": round(Cf, 2), # 峰值指标 "Sf": round(Sf, 2), # 波形因子 "If": round(If, 2), # 脉冲指标 "Ce": round(Ce, 2), # 裕度指标 "Cw": round(Cw, 2), # 偏度指标 "Cq": round(Cq, 2), # 峭度指标 "rpm_Gen": round(rpm_Gen, 2), # 转速r/min } result = self.replace_nan(result) result = json.dumps(result, ensure_ascii=False) return result def trend_analysis(self) -> str: """ 优化后的趋势分析方法(向量化计算统计指标) 返回 JSON 字符串,包含所有时间点的统计结果。 """ for df in self.datas: df['parsed_data'] = df['mesure_data'].apply(json.loads) # 1. 合并所有数据并解析 mesure_data combined_df = pd.concat(self.datas) combined_df['parsed_data'] = combined_df['mesure_data'].apply(json.loads) # 批量解析 JSON # 2. 向量化计算统计指标(避免逐行循环) def calculate_stats(group: pd.DataFrame) -> Dict[str, Any]: data = np.array(group['parsed_data'].iloc[0]) # 提取振动数据数组 fs = int(group['sampling_frequency'].iloc[0]) # 采样频率 dt = 1 / fs # 时间间隔 # 计算时域指标(向量化操作) mean = np.mean(data) max_val = np.max(data) min_val = np.min(data) Xrms = np.sqrt(np.mean(data ** 2)) Xp = (max_val - min_val) / 2 Cf = Xp / Xrms Sf = Xrms / mean if mean != 0 else 0 If = Xp / np.mean(np.abs(data)) # 计算速度和峭度指标 velocity = np.cumsum(data) * dt # 积分计算速度 velocity_rms = np.sqrt(np.mean(velocity ** 2)) Cq = np.mean((data - mean) ** 4) / (Xrms ** 4) if Xrms != 0 else 0 return { "time_stamp": str(group['time_stamp'].iloc[0]), "fs": fs, "Mean": round(mean, 2), "Max": round(max_val, 2), "Min": round(min_val, 2), "Xrms": round(Xrms, 2), "Xp": round(Xp, 2), "Cf": round(Cf, 2), "Sf": round(Sf, 2), "If": round(If, 2), "velocity_rms": round(velocity_rms, 2), "Cq": round(Cq, 2) } # 3. 按 ID 分组并应用统计计算 stats = combined_df.groupby('id').apply(calculate_stats).tolist() # 4. 返回 JSON 格式结果 return json.dumps(stats, ensure_ascii=False) def Characteristic_Frequency(self): """提取轴承、齿轮等参数""" # 1、从测点名称中提取部件名称(计算特征频率的部件) str1 = self.mesure_point_name # 2、连接233的数据库'energy_show',从表'wind_engine_group'查找风机编号'engine_code'对应的机型编号'mill_type_code' engine_code = self.wind_code engine = get_engine(dataBase.PLATFORM_DB) df_sql2 = f"SELECT * FROM wind_engine_group WHERE engine_code = '{engine_code}'" df2 = pd.read_sql(df_sql2, engine) mill_type_code = df2['mill_type_code'].iloc[0] # 3、从相关的表中通过机型编号'mill_type_code'或者齿轮箱编号gearbox_code查找部件'brand'、'model'的参数信息 # unit_bearings主轴承参数表 关键词"main_bearing" if 'main_bearing' in str1: logger.info("main_bearing") # df_sql3 = f"SELECT * FROM {'unit_bearings'} where mill_type_code = {'mill_type_code'} " df_sql3 = f"SELECT * FROM unit_bearings WHERE mill_type_code = '{mill_type_code}' " df3 = pd.read_sql(df_sql3, engine) if df3.empty: logger.info("警告: 没有找到有效的机型信息") if 'front' in str1: brand = 'front_bearing' + '_brand' model = 'front_bearing' + '_model' front_has_value = not pd.isna(df3[brand].iloc[0]) and not pd.isna(df3[model].iloc[0]) if not front_has_value: logger.info("警告: 没有找到有效的品牌信息") elif 'rear' in str1: brand = 'rear_bearing' + '_brand' model = 'rear_bearing' + '_model' end_has_value = not pd.isna(df3[brand].iloc[0]) and not pd.isna(df3[model].iloc[0]) if not end_has_value: logger.info("警告: 没有找到有效的品牌信息") else: # 当没有指定 front 或 end 时,自动选择有值的轴承信息 front_brand_col = 'front_bearing_brand' front_model_col = 'front_bearing_model' rear_brand_col = 'rear_bearing_brand' rear_model_col = 'rear_bearing_model' # 检查 front_bearing 是否有值 front_has_value = not pd.isna(df3[front_brand_col].iloc[0]) and not pd.isna( df3[front_model_col].iloc[0]) # 检查 end_bearing 是否有值 end_has_value = not pd.isna(df3[rear_brand_col].iloc[0]) and not pd.isna(df3[rear_model_col].iloc[0]) # 根据检查结果选择合适的列 if front_has_value and end_has_value: # 如果两者都有值,默认选择 front brand = front_brand_col model = front_model_col elif front_has_value: brand = front_brand_col model = front_model_col elif end_has_value: brand = rear_brand_col model = rear_model_col else: # 如果两者都没有有效值,设置默认值或抛出异常 logger.info("警告: 没有找到有效的轴承信息") brand = front_brand_col # 默认使用 front model = front_model_col # 默认使用 front _brand = df3[brand].iloc[0] _model = df3[model].iloc[0] logger.info(f"brand = {_brand}, model = {_model}") # unit_dynamo 发电机参数表 关键词generator stator elif 'generator' in str1 or 'stator' in str1: logger.info("generator or 'stator'") df_sql3 = f"SELECT * FROM unit_dynamo WHERE mill_type_code = '{mill_type_code}' " df3 = pd.read_sql(df_sql3, engine) if 'non' in str1: brand = 'non_drive_end_bearing' + '_brand' model = 'non_drive_end_bearing' + '_model' else: brand = 'drive_end_bearing' + '_brand' model = 'drive_end_bearing' + '_model' _brand = df3[brand].iloc[0] _model = df3[model].iloc[0] logger.info(f"brand = {_brand}, model = {_model}") # 齿轮箱区分行星轮/平行轮 和 轴承两个表 elif 'gearbox' in str1: logger.info("gearbox") # 根据mill_type_code从unit_gearbox表中获得gearbox_code df_sql3 = f"SELECT * FROM unit_gearbox WHERE mill_type_code = '{mill_type_code}' " df3 = pd.read_sql(df_sql3, engine) gearbox_code = df3['code'].iloc[0] logger.info(gearbox_code) # 如果是行星轮/平行轮 则从unit_gearbox_structure 表中取数据 if 'planet' in str1 or 'sun' in str1: logger.info("'planet' or 'sun' ") gearbox_structure = 1 if 'planet' in str1 else 2 planetary_gear_grade = 1 if 'first' in str1: planetary_gear_grade = 1 elif 'second' in str1: planetary_gear_grade = 2 elif 'third' in str1: planetary_gear_grade = 3 df_sql33 = f""" SELECT bearing_brand, bearing_model FROM unit_gearbox_structure WHERE gearbox_code = '{gearbox_code}' AND gearbox_structure = '{gearbox_structure}' AND planetary_gear_grade = '{planetary_gear_grade}' """ df33 = pd.read_sql(df_sql33, engine) if df33.empty: logger.info("unit_gearbox_structure没有该测点的参数") else: brand = 'bearing' + '_brand' model = 'bearing' + '_model' logger.info(brand) _brand = df33[brand].iloc[0] _model = df33[model].iloc[0] has_value = not pd.isna(df33[brand].iloc[0]) and not pd.isna(df33[model].iloc[0]) if has_value: logger.info(_brand) logger.info(_model) else: logger.info("警告: 没有找到有效的轴承信息") # 如果是齿轮箱轴承 则从unit_gearbox_bearings 表中取数据 elif 'shaft' in str1 or 'input' in str1: logger.info("'shaft'or'input'") # df_sql33 = f"SELECT * FROM unit_gearbox_bearings WHERE gearbox_code = '{gearbox_code}' " # df33 = pd.read_sql(df_sql33, Engine33) # 高速轴 低速中间轴 取bearing_rs/gs均可 parallel_wheel_grade = 1 if 'low_speed' in str1: parallel_wheel_grade = 1 elif 'low_speed_intermediate' in str1: parallel_wheel_grade = 2 elif 'high_speed' in str1: parallel_wheel_grade = 3 df_sql33 = f""" SELECT bearing_rs_brand, bearing_rs_model, bearing_gs_brand, bearing_gs_model FROM unit_gearbox_bearings WHERE gearbox_code = '{gearbox_code}' AND parallel_wheel_grade = '{parallel_wheel_grade}' """ df33 = pd.read_sql(df_sql33, engine) if not df33.empty: if 'high_speed' in str1 or 'low_speed_intermediate' in str1: rs_brand = 'bearing_rs' + '_brand' rs_model = 'bearing_rs' + '_model' gs_brand = 'bearing_gs' + '_brand' gs_model = 'bearing_gs' + '_model' rs_has_value = not pd.isna(df33[rs_brand].iloc[0]) and not pd.isna(df33[rs_model].iloc[0]) gs_has_value = not pd.isna(df33[gs_brand].iloc[0]) and not pd.isna(df33[gs_model].iloc[0]) if rs_has_value and gs_has_value: brand = rs_brand model = rs_model elif rs_has_value: brand = rs_brand model = rs_model elif gs_has_value: brand = gs_brand model = gs_model else: logger.info("警告: 没有找到有效的品牌信息") brand = rs_brand model = rs_model # 低速轴 取bearing_model elif 'low_speed' in str1: brand = 'bearing' + '_brand' model = 'bearing' + '_model' else: logger.info("警告: 没有找到有效的轴承信息") logger.info(f"brand = {brand}") _brand = df33[brand].iloc[0] _model = df33[model].iloc[0] logger.info(f"brand = {_brand}, model = {_model}") # 4、从表'unit_dict_brand_model'中通过'_brand'、'_model'查找部件的参数信息 df_sql4 = f"SELECT * FROM unit_dict_brand_model where manufacture = %s AND model_number = %s" params = [(_brand, _model)] df4 = pd.read_sql(df_sql4, engine, params=params) n_rolls = df4['rolls_number'].iloc[0] d_rolls = df4['rolls_diameter'].iloc[0] D_diameter = df4['circle_diameter'].iloc[0] theta_deg = df4['theta_deg'].iloc[0] result = { "type": 'bearing', "n_rolls": round(n_rolls, 2), "d_rolls": round(d_rolls, 2), "D_diameter": round(D_diameter, 2), "theta_deg": round(theta_deg, 2), } return result def calculate_bearing_frequencies(self, n, d, D, theta_deg, rpm): """ 计算轴承各部件特征频率 参数: n (int): 滚动体数量 d (float): 滚动体直径(单位:mm) D (float): 轴承节圆直径(滚动体中心圆直径,单位:mm) theta_deg (float): 接触角(单位:度) rpm (float): 转速(转/分钟) 返回: dict: 包含各特征频率的字典(单位:Hz) """ # 转换角度为弧度 theta = math.radians(theta_deg) # 转换直径单位为米(保持单位一致性,实际计算中比值抵消单位影响) # 注意:由于公式中使用的是比值,单位可以保持mm不需要转换 ratio = d / D # 基础频率计算(转/秒) f_r = rpm / 60.0 # 计算各特征频率 BPFI = n / 2 * (1 + ratio * math.cos(theta)) * f_r # 内圈故障频率 BPFO = n / 2 * (1 - ratio * math.cos(theta)) * f_r # 外圈故障频率 BSF = (D / (2 * d)) * (1 - (ratio ** 2) * (math.cos(theta) ** 2)) * f_r # 滚动体故障频率 FTF = 0.5 * (1 - ratio * math.cos(theta)) * f_r # 保持架故障频率 return { "BPFI": round(BPFI, 2), "BPFO": round(BPFO, 2), "BSF": round(BSF, 2), "FTF": round(FTF, 2), } #检查返回结果是否有nan 若有,则替换成none def replace_nan(self, obj): if isinstance(obj, dict): return {k: self.replace_nan(v) for k, v in obj.items()} elif isinstance(obj, list): return [self.replace_nan(item) for item in obj] elif isinstance(obj, float) and math.isnan(obj): return None return obj