123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- import ast
- import json
- import math
- import numpy as np
- import pandas as pd
- from scipy.signal import hilbert
- from app.config import dataBase
- from app.database import get_engine
- from app.logger import logger
- class CMSAnalyst:
- def __init__(self, fmin, fmax, table_name, ids):
- # 从数据库获取原始数据
- self.datas = self._get_by_id(table_name, ids)
- self.datas = [df[['mesure_data', 'time_stamp', 'sampling_frequency', 'wind_turbine_number', 'rotational_speed',
- 'mesure_point_name']] for df in self.datas]
- # 只输入一个id,返回一个[df],所以拿到self.data[0]
- self.data_filter = self.datas[0]
- # 取数据列
- self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'][0]))
- self.envelope_spectrum_m = self.data.shape[0]
- self.envelope_spectrum_n = 1
- # 设置分析参数
- self.fs = int(self.data_filter['sampling_frequency'].iloc[0])
- self.envelope_spectrum_t = np.arange(self.envelope_spectrum_m) / self.fs
- self.fmin = fmin if fmin is not None else 0
- self.fmax = fmax if fmax is not None else float('inf')
- self.envelope_spectrum_y = self._bandpass_filter(self.data)
- self.f, self.HP = self._calculate_envelope_spectrum(self.envelope_spectrum_y)
- # 设备信息
- self.wind_code = self.data_filter['wind_turbine_number'].iloc[0]
- self.rpm_Gen = self.data_filter['rotational_speed'].iloc[0]
- self.mesure_point_name = self.data_filter['mesure_point_name'].iloc[0]
- self.fn_Gen = round(self.rpm_Gen / 60, 2)
- self.CF = self.Characteristic_Frequency()
- self.CF = pd.DataFrame(self.CF, index=[0])
- n_rolls_m = self.CF['n_rolls'].iloc[0]
- d_rolls_m = self.CF['d_rolls'].iloc[0]
- D_diameter_m = self.CF['D_diameter'].iloc[0]
- theta_deg_m = self.CF['theta_deg'].iloc[0]
- self.bearing_frequencies = self.calculate_bearing_frequencies(n_rolls_m, d_rolls_m, D_diameter_m, theta_deg_m,
- self.rpm_Gen)
- self.bearing_frequencies = pd.DataFrame(self.bearing_frequencies, index=[0])
- # frequency_domain_analysis
- (
- self.frequency_domain_analysis_t,
- self.frequency_domain_analysis_f,
- self.frequency_domain_analysis_m,
- self.frequency_domain_analysis_mag,
- self.frequency_domain_analysis_Xrms,
- ) = self._calculate_spectrum(self.data)
- # time_domain_analysis
- self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs
- def _get_by_id(self, windcode, ids):
- df_res = []
- engine = get_engine(dataBase.DATA_DB)
- for id in ids:
- table_name = windcode + '_wave'
- lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} "
- df = pd.read_sql(lastday_df_sql, engine)
- df_res.append(df)
- return df_res
- # envelope_spectrum_analysis 包络谱分析
- def _bandpass_filter(self, data):
- """带通滤波"""
- m = data.shape[0]
- ni = round(self.fmin * self.envelope_spectrum_m / self.fs + 1)
- if self.fmax == float('inf'):
- na = m
- else:
- na = round(self.fmax * m / self.fs + 1)
- col = 1
- y = np.zeros((self.envelope_spectrum_m, col))
- z = np.fft.fft(data)
- a = np.zeros(self.envelope_spectrum_m, dtype=complex)
- a[ni:na] = z[ni:na]
- a[self.envelope_spectrum_m - na + 1: self.envelope_spectrum_m - ni + 1] = z[
- self.envelope_spectrum_m - na + 1: self.envelope_spectrum_m - ni + 1
- ]
- z = np.fft.ifft(a)
- y[:, 0] = np.real(z)
- return y
- def _calculate_envelope_spectrum(self, y):
- """计算包络谱"""
- m, n = y.shape
- HP = np.zeros((m, n))
- col = 1
- for p in range(col):
- H = np.abs(hilbert(y[:, p] - np.mean(y[:, p])))
- HP[:, p] = np.abs(np.fft.fft(H - np.mean(H))) * 2 / m
- f = np.fft.fftfreq(m, d=1 / self.fs)
- return f, HP
- def envelope_spectrum(self):
- """绘制包络谱"""
- # 只取正频率部分
- positive_frequencies = self.f[: self.envelope_spectrum_m // 2]
- positive_HP = self.HP[: self.envelope_spectrum_m // 2, 0]
- x = positive_frequencies
- y = positive_HP
- title = "包络谱"
- xaxis = "频率(Hz)"
- yaxis = "加速度(m/s^2)"
- # 加速度均方根值(有效值)
- Xrms = np.sqrt(np.mean(y ** 2))
- rpm_Gen = round(self.rpm_Gen, 2)
- BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2)
- BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2)
- BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2)
- FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2)
- fn_Gen = round(self.fn_Gen, 2)
- _3P_1X = round(self.fn_Gen, 2) * 3
- result = {
- "fs": self.fs,
- "Xrms": round(Xrms, 2),
- "x": list(x),
- "y": list(y),
- "title": title,
- "xaxis": xaxis,
- "yaxis": yaxis,
- "rpm_Gen": round(rpm_Gen, 2), # 转速r/min
- "BPFI": [{"Xaxis": BPFI_1X, "val": "1BPFI"}, {"Xaxis": BPFI_1X * 2, "val": "2BPFI"},
- {"Xaxis": BPFI_1X * 3, "val": "3BPFI"}, {"Xaxis": BPFI_1X * 4, "val": "4BPFI"},
- {"Xaxis": BPFI_1X * 5, "val": "5BPFI"}, {"Xaxis": BPFI_1X * 6, "val": "6BPFI"}],
- "BPFO": [{"Xaxis": BPFO_1X, "val": "1BPFO"}, {"Xaxis": BPFO_1X * 2, "val": "2BPFO"},
- {"Xaxis": BPFO_1X * 3, "val": "3BPFO"}, {"Xaxis": BPFO_1X * 4, "val": "4BPFO"},
- {"Xaxis": BPFO_1X * 5, "val": "5BPFO"}, {"Xaxis": BPFO_1X * 6, "val": "6BPFO"}],
- "BSF": [{"Xaxis": BSF_1X, "val": "1BSF"}, {"Xaxis": BSF_1X * 2, "val": "2BSF"},
- {"Xaxis": BSF_1X * 3, "val": "3BSF"}, {"Xaxis": BSF_1X * 4, "val": "4BSF"},
- {"Xaxis": BSF_1X * 5, "val": "5BSF"}, {"Xaxis": BSF_1X * 6, "val": "6BSF"}],
- "FTF": [{"Xaxis": FTF_1X, "val": "1FTF"}, {"Xaxis": FTF_1X * 2, "val": "2FTF"},
- {"Xaxis": FTF_1X * 3, "val": "3FTF"}, {"Xaxis": FTF_1X * 4, "val": "4FTF"},
- {"Xaxis": FTF_1X * 5, "val": "5FTF"}, {"Xaxis": FTF_1X * 6, "val": "6FTF"}],
- "fn_Gen": [{"Xaxis": fn_Gen, "val": "1X"}, {"Xaxis": fn_Gen * 2, "val": "2X"},
- {"Xaxis": fn_Gen * 3, "val": "3X"}, {"Xaxis": fn_Gen * 4, "val": "4X"},
- {"Xaxis": fn_Gen * 5, "val": "5X"}, {"Xaxis": fn_Gen * 6, "val": "6X"}],
- "B3P": _3P_1X,
- }
- # result = json.dumps(result, ensure_ascii=False)
- result = self.replace_nan(result)
- return result
- # frequency_domain_analysis 频谱分析
- def _calculate_spectrum(self, data):
- """计算频谱"""
- m = data.shape[0]
- n = 1
- t = np.arange(m) / self.fs
- mag = np.zeros((m, n))
- Xrms = np.sqrt(np.mean(data ** 2)) # 加速度均方根值(有效值)
- # col=1
- # for p in range(col):
- mag = np.abs(np.fft.fft(data - np.mean(data))) * 2 / m
- f = np.fft.fftfreq(m, d=1 / self.fs)
- return t, f, m, mag, Xrms
- def frequency_domain(self):
- """绘制频域波形参数"""
- # 只取正频率部分
- positive_frequencies = self.frequency_domain_analysis_f[
- : self.frequency_domain_analysis_m // 2
- ]
- positive_mag = self.frequency_domain_analysis_mag[
- : self.frequency_domain_analysis_m // 2
- ]
- x = positive_frequencies
- y = positive_mag
- title = "频域信号"
- xaxis = "频率(Hz)"
- yaxis = "加速度(m/s^2)"
- Xrms = self.frequency_domain_analysis_Xrms
- rpm_Gen = round(self.rpm_Gen, 2)
- BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2)
- BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2)
- BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2)
- FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2)
- fn_Gen = round(self.fn_Gen, 2)
- _3P_1X = round(self.fn_Gen, 2) * 3
- result = {
- "fs": self.fs,
- "Xrms": round(Xrms, 2),
- "x": list(x),
- "y": list(y),
- "title": title,
- "xaxis": xaxis,
- "yaxis": yaxis,
- "rpm_Gen": round(rpm_Gen, 2), # 转速r/min
- "BPFI": [{"Xaxis": BPFI_1X, "val": "1BPFI"}, {"Xaxis": BPFI_1X * 2, "val": "2BPFI"},
- {"Xaxis": BPFI_1X * 3, "val": "3BPFI"}, {"Xaxis": BPFI_1X * 4, "val": "4BPFI"},
- {"Xaxis": BPFI_1X * 5, "val": "5BPFI"}, {"Xaxis": BPFI_1X * 6, "val": "6BPFI"}],
- "BPFO": [{"Xaxis": BPFO_1X, "val": "1BPFO"}, {"Xaxis": BPFO_1X * 2, "val": "2BPFO"},
- {"Xaxis": BPFO_1X * 3, "val": "3BPFO"}, {"Xaxis": BPFO_1X * 4, "val": "4BPFO"},
- {"Xaxis": BPFO_1X * 5, "val": "5BPFO"}, {"Xaxis": BPFO_1X * 6, "val": "6BPFO"}],
- "BSF": [{"Xaxis": BSF_1X, "val": "1BSF"}, {"Xaxis": BSF_1X * 2, "val": "2BSF"},
- {"Xaxis": BSF_1X * 3, "val": "3BSF"}, {"Xaxis": BSF_1X * 4, "val": "4BSF"},
- {"Xaxis": BSF_1X * 5, "val": "5BSF"}, {"Xaxis": BSF_1X * 6, "val": "6BSF"}],
- "FTF": [{"Xaxis": FTF_1X, "val": "1FTF"}, {"Xaxis": FTF_1X * 2, "val": "2FTF"},
- {"Xaxis": FTF_1X * 3, "val": "3FTF"}, {"Xaxis": FTF_1X * 4, "val": "4FTF"},
- {"Xaxis": FTF_1X * 5, "val": "5FTF"}, {"Xaxis": FTF_1X * 6, "val": "6FTF"}],
- "fn_Gen": [{"Xaxis": fn_Gen, "val": "1X"}, {"Xaxis": fn_Gen * 2, "val": "2X"},
- {"Xaxis": fn_Gen * 3, "val": "3X"}, {"Xaxis": fn_Gen * 4, "val": "4X"},
- {"Xaxis": fn_Gen * 5, "val": "5X"}, {"Xaxis": fn_Gen * 6, "val": "6X"}],
- "B3P": _3P_1X,
- }
- result = self.replace_nan(result)
- result = json.dumps(result, ensure_ascii=False)
- return result
- # time_domain_analysis 时域分析
- def time_domain(self):
- """绘制时域波形参数"""
- x = self.time_domain_analysis_t
- y = self.data
- rpm_Gen = self.rpm_Gen
- title = "时间域信号"
- xaxis = "时间(s)"
- yaxis = "加速度(m/s^2)"
- # 图片右侧统计量
- # 平均值
- mean_value = np.mean(y)
- # 最大值
- max_value = np.max(y)
- # 最小值
- min_value = np.min(y)
- # 加速度均方根值(有效值)
- Xrms = np.sqrt(np.mean(y ** 2))
- # 峰值(单峰最大值) # 峰值
- Xp = (max_value - min_value) / 2
- # 峰峰值
- Xpp = max_value - min_value
- # 峰值指标
- Cf = Xp / Xrms
- # 波形指标
- Sf = Xrms / mean_value
- # 脉冲指标
- If = Xp / np.mean(np.abs(y))
- # 方根幅值
- Xr = np.mean(np.sqrt(np.abs(y))) ** 2
- # 裕度指标
- Ce = Xp / Xr
- # 计算每个数据点的绝对值减去均值后的三次方,并求和
- sum_abs_diff_cubed_3 = np.mean((np.abs(y) - mean_value) ** 3)
- # 计算偏度指标
- Cw = sum_abs_diff_cubed_3 / (Xrms ** 3)
- # 计算每个数据点的绝对值减去均值后的四次方,并求和
- sum_abs_diff_cubed_4 = np.mean((np.abs(y) - mean_value) ** 4)
- # 计算峭度指标
- Cq = sum_abs_diff_cubed_4 / (Xrms ** 4)
- result = {
- "x": list(x),
- "y": list(y),
- "title": title,
- "xaxis": xaxis,
- "yaxis": yaxis,
- "fs": self.fs,
- "Xrms": round(Xrms, 2), # 有效值
- "mean_value": round(mean_value, 2), # 均值
- "max_value": round(max_value, 2), # 最大值
- "min_value": round(min_value, 2), # 最小值
- "Xp": round(Xp, 2), # 峰值
- "Xpp": round(Xpp, 2), # 峰峰值
- "Cf": round(Cf, 2), # 峰值指标
- "Sf": round(Sf, 2), # 波形因子
- "If": round(If, 2), # 脉冲指标
- "Ce": round(Ce, 2), # 裕度指标
- "Cw": round(Cw, 2), # 偏度指标
- "Cq": round(Cq, 2), # 峭度指标
- "rpm_Gen": round(rpm_Gen, 2), # 转速r/min
- }
- result = self.replace_nan(result)
- result = json.dumps(result, ensure_ascii=False)
- return result
- # trend_analysis 趋势图
- def trend_analysis(self):
- all_stats = []
- # 定义积分函数
- def _integrate(data, dt):
- return np.cumsum(data) * dt
- # 定义计算统计指标的函数
- def _calculate_stats(data):
- mean_value = np.mean(data)
- max_value = np.max(data)
- min_value = np.min(data)
- Xrms = np.sqrt(np.mean(data ** 2)) # 加速度均方根值(有效值)
- Xp = (max_value - min_value) / 2 # 峰值(单峰最大值) # 峰值
- Cf = Xp / Xrms # 峰值指标
- Sf = Xrms / mean_value # 波形指标
- If = Xp / np.mean(np.abs(data)) # 脉冲指标
- Xr = np.mean(np.sqrt(np.abs(data))) ** 2 # 方根幅值
- Ce = Xp / Xr # 裕度指标
- # 计算每个数据点的绝对值减去均值后的三次方,并求和
- sum_abs_diff_cubed_3 = np.mean((np.abs(data) - mean_value) ** 3)
- # 计算偏度指标
- Cw = sum_abs_diff_cubed_3 / (Xrms ** 3)
- # 计算每个数据点的绝对值减去均值后的四次方,并求和
- sum_abs_diff_cubed_4 = np.mean((np.abs(data) - mean_value) ** 4)
- # 计算峭度指标
- Cq = sum_abs_diff_cubed_4 / (Xrms ** 4)
- #
- return {
- "fs": self.fs, # 采样频率
- "Mean": round(mean_value, 2), # 平均值
- "Max": round(max_value, 2), # 最大值
- "Min": round(min_value, 2), # 最小值
- "Xrms": round(Xrms, 2), # 有效值
- "Xp": round(Xp, 2), # 峰值
- "If": round(If, 2), # 脉冲指标
- "Cf": round(Cf, 2), # 峰值指标
- "Sf": round(Sf, 2), # 波形指标
- "Ce": round(Ce, 2), # 裕度指标
- "Cw": round(Cw, 2), # 偏度指标
- "Cq": round(Cq, 2), # 峭度指标
- # velocity_rms :速度有效值
- # time_stamp:时间戳
- }
- for data in self.datas:
- fs = int(self.data_filter['sampling_frequency'].iloc[0])
- dt = 1 / fs
- time_stamp = data['time_stamp'][0]
- data = np.array(ast.literal_eval(data['mesure_data'][0]))
- velocity = _integrate(data, dt)
- velocity_rms = np.sqrt(np.mean(velocity ** 2))
- stats = _calculate_stats(data)
- # 速度有效值
- stats["velocity_rms"] = round(velocity_rms, 2)
- # 时间戳
- stats["time_stamp"] = str(time_stamp)
- all_stats.append(stats)
- all_stats = [self.replace_nan(stats) for stats in all_stats]
- all_stats = json.dumps(all_stats, ensure_ascii=False)
- return all_stats
- def Characteristic_Frequency(self):
- """提取轴承、齿轮等参数"""
- # 1、从测点名称中提取部件名称(计算特征频率的部件)
- str1 = self.mesure_point_name
- # 2、连接233的数据库'energy_show',从表'wind_engine_group'查找风机编号'engine_code'对应的机型编号'mill_type_code'
- engine_code = self.wind_code
- engine = get_engine(dataBase.PLATFORM_DB)
- df_sql2 = f"SELECT * FROM wind_engine_group WHERE engine_code = '{engine_code}'"
- df2 = pd.read_sql(df_sql2, engine)
- mill_type_code = df2['mill_type_code'].iloc[0]
- # 3、从相关的表中通过机型编号'mill_type_code'或者齿轮箱编号gearbox_code查找部件'brand'、'model'的参数信息
- # unit_bearings主轴承参数表 关键词"main_bearing"
- if 'main_bearing' in str1:
- logger.info("main_bearing")
- # df_sql3 = f"SELECT * FROM {'unit_bearings'} where mill_type_code = {'mill_type_code'} "
- df_sql3 = f"SELECT * FROM unit_bearings WHERE mill_type_code = '{mill_type_code}' "
- df3 = pd.read_sql(df_sql3, engine)
- if df3.empty:
- logger.info("警告: 没有找到有效的机型信息")
- if 'front' in str1:
- brand = 'front_bearing' + '_brand'
- model = 'front_bearing' + '_model'
- front_has_value = not pd.isna(df3[brand].iloc[0]) and not pd.isna(df3[model].iloc[0])
- if not front_has_value:
- logger.info("警告: 没有找到有效的品牌信息")
- elif 'rear' in str1:
- brand = 'rear_bearing' + '_brand'
- model = 'rear_bearing' + '_model'
- end_has_value = not pd.isna(df3[brand].iloc[0]) and not pd.isna(df3[model].iloc[0])
- if not end_has_value:
- logger.info("警告: 没有找到有效的品牌信息")
- else:
- # 当没有指定 front 或 end 时,自动选择有值的轴承信息
- front_brand_col = 'front_bearing_brand'
- front_model_col = 'front_bearing_model'
- rear_brand_col = 'rear_bearing_brand'
- rear_model_col = 'rear_bearing_model'
- # 检查 front_bearing 是否有值
- front_has_value = not pd.isna(df3[front_brand_col].iloc[0]) and not pd.isna(
- df3[front_model_col].iloc[0])
- # 检查 end_bearing 是否有值
- end_has_value = not pd.isna(df3[rear_brand_col].iloc[0]) and not pd.isna(df3[rear_model_col].iloc[0])
- # 根据检查结果选择合适的列
- if front_has_value and end_has_value:
- # 如果两者都有值,默认选择 front
- brand = front_brand_col
- model = front_model_col
- elif front_has_value:
- brand = front_brand_col
- model = front_model_col
- elif end_has_value:
- brand = rear_brand_col
- model = rear_model_col
- else:
- # 如果两者都没有有效值,设置默认值或抛出异常
- logger.info("警告: 没有找到有效的轴承信息")
- brand = front_brand_col # 默认使用 front
- model = front_model_col # 默认使用 front
- _brand = df3[brand].iloc[0]
- _model = df3[model].iloc[0]
- logger.info(f"brand = {_brand}, model = {_model}")
- # unit_dynamo 发电机参数表 关键词generator stator
- elif 'generator' in str1 or 'stator' in str1:
- logger.info("generator or 'stator'")
- df_sql3 = f"SELECT * FROM unit_dynamo WHERE mill_type_code = '{mill_type_code}' "
- df3 = pd.read_sql(df_sql3, engine)
- if 'non' in str1:
- brand = 'non_drive_end_bearing' + '_brand'
- model = 'non_drive_end_bearing' + '_model'
- else:
- brand = 'drive_end_bearing' + '_brand'
- model = 'drive_end_bearing' + '_model'
- _brand = df3[brand].iloc[0]
- _model = df3[model].iloc[0]
- logger.info(f"brand = {_brand}, model = {_model}")
- # 齿轮箱区分行星轮/平行轮 和 轴承两个表
- elif 'gearbox' in str1:
- logger.info("gearbox")
- # 根据mill_type_code从unit_gearbox表中获得gearbox_code
- df_sql3 = f"SELECT * FROM unit_gearbox WHERE mill_type_code = '{mill_type_code}' "
- df3 = pd.read_sql(df_sql3, engine)
- gearbox_code = df3['code'].iloc[0]
- logger.info(gearbox_code)
- # 如果是行星轮/平行轮 则从unit_gearbox_structure 表中取数据
- if 'planet' in str1 or 'sun' in str1:
- logger.info("'planet' or 'sun' ")
- gearbox_structure = 1 if 'planet' in str1 else 2
- planetary_gear_grade = 1
- if 'first' in str1:
- planetary_gear_grade = 1
- elif 'second' in str1:
- planetary_gear_grade = 2
- elif 'third' in str1:
- planetary_gear_grade = 3
- df_sql33 = f"""
- SELECT bearing_brand, bearing_model
- FROM unit_gearbox_structure
- WHERE gearbox_code = '{gearbox_code}'
- AND gearbox_structure = '{gearbox_structure}'
- AND planetary_gear_grade = '{planetary_gear_grade}'
- """
- df33 = pd.read_sql(df_sql33, engine)
- if df33.empty:
- logger.info("unit_gearbox_structure没有该测点的参数")
- else:
- brand = 'bearing' + '_brand'
- model = 'bearing' + '_model'
- logger.info(brand)
- _brand = df33[brand].iloc[0]
- _model = df33[model].iloc[0]
- has_value = not pd.isna(df33[brand].iloc[0]) and not pd.isna(df33[model].iloc[0])
- if has_value:
- logger.info(_brand)
- logger.info(_model)
- else:
- logger.info("警告: 没有找到有效的轴承信息")
- # 如果是齿轮箱轴承 则从unit_gearbox_bearings 表中取数据
- elif 'shaft' in str1 or 'input' in str1:
- logger.info("'shaft'or'input'")
- # df_sql33 = f"SELECT * FROM unit_gearbox_bearings WHERE gearbox_code = '{gearbox_code}' "
- # df33 = pd.read_sql(df_sql33, Engine33)
- # 高速轴 低速中间轴 取bearing_rs/gs均可
- parallel_wheel_grade = 1
- if 'low_speed' in str1:
- parallel_wheel_grade = 1
- elif 'low_speed_intermediate' in str1:
- parallel_wheel_grade = 2
- elif 'high_speed' in str1:
- parallel_wheel_grade = 3
- df_sql33 = f"""
- SELECT bearing_rs_brand, bearing_rs_model, bearing_gs_brand, bearing_gs_model
- FROM unit_gearbox_bearings
- WHERE gearbox_code = '{gearbox_code}'
- AND parallel_wheel_grade = '{parallel_wheel_grade}'
- """
- df33 = pd.read_sql(df_sql33, engine)
- if not df33.empty:
- if 'high_speed' in str1 or 'low_speed_intermediate' in str1:
- rs_brand = 'bearing_rs' + '_brand'
- rs_model = 'bearing_rs' + '_model'
- gs_brand = 'bearing_gs' + '_brand'
- gs_model = 'bearing_gs' + '_model'
- rs_has_value = not pd.isna(df33[rs_brand].iloc[0]) and not pd.isna(df33[rs_model].iloc[0])
- gs_has_value = not pd.isna(df33[gs_brand].iloc[0]) and not pd.isna(df33[gs_model].iloc[0])
- if rs_has_value and gs_has_value:
- brand = rs_brand
- model = rs_model
- elif rs_has_value:
- brand = rs_brand
- model = rs_model
- elif gs_has_value:
- brand = gs_brand
- model = gs_model
- else:
- logger.info("警告: 没有找到有效的品牌信息")
- brand = rs_brand
- model = rs_model
- # 低速轴 取bearing_model
- elif 'low_speed' in str1:
- brand = 'bearing' + '_brand'
- model = 'bearing' + '_model'
- else:
- logger.info("警告: 没有找到有效的轴承信息")
- logger.info(f"brand = {brand}")
- _brand = df33[brand].iloc[0]
- _model = df33[model].iloc[0]
- logger.info(f"brand = {_brand}, model = {_model}")
- # 4、从表'unit_dict_brand_model'中通过'_brand'、'_model'查找部件的参数信息
- df_sql4 = f"SELECT * FROM unit_dict_brand_model where manufacture = %s AND model_number = %s"
- params = [(_brand, _model)]
- df4 = pd.read_sql(df_sql4, engine, params=params)
- n_rolls = df4['rolls_number'].iloc[0]
- d_rolls = df4['rolls_diameter'].iloc[0]
- D_diameter = df4['circle_diameter'].iloc[0]
- theta_deg = df4['theta_deg'].iloc[0]
- result = {
- "type": 'bearing',
- "n_rolls": round(n_rolls, 2),
- "d_rolls": round(d_rolls, 2),
- "D_diameter": round(D_diameter, 2),
- "theta_deg": round(theta_deg, 2),
- }
- return result
- def calculate_bearing_frequencies(self, n, d, D, theta_deg, rpm):
- """
- 计算轴承各部件特征频率
- 参数:
- n (int): 滚动体数量
- d (float): 滚动体直径(单位:mm)
- D (float): 轴承节圆直径(滚动体中心圆直径,单位:mm)
- theta_deg (float): 接触角(单位:度)
- rpm (float): 转速(转/分钟)
- 返回:
- dict: 包含各特征频率的字典(单位:Hz)
- """
- # 转换角度为弧度
- theta = math.radians(theta_deg)
- # 转换直径单位为米(保持单位一致性,实际计算中比值抵消单位影响)
- # 注意:由于公式中使用的是比值,单位可以保持mm不需要转换
- ratio = d / D
- # 基础频率计算(转/秒)
- f_r = rpm / 60.0
- # 计算各特征频率
- BPFI = n / 2 * (1 + ratio * math.cos(theta)) * f_r # 内圈故障频率
- BPFO = n / 2 * (1 - ratio * math.cos(theta)) * f_r # 外圈故障频率
- BSF = (D / (2 * d)) * (1 - (ratio ** 2) * (math.cos(theta) ** 2)) * f_r # 滚动体故障频率
- FTF = 0.5 * (1 - ratio * math.cos(theta)) * f_r # 保持架故障频率
- return {
- "BPFI": round(BPFI, 2),
- "BPFO": round(BPFO, 2),
- "BSF": round(BSF, 2),
- "FTF": round(FTF, 2),
- }
- #检查返回结果是否有nan 若有,则替换成none
- def replace_nan(self, obj):
- if isinstance(obj, dict):
- return {k: self.replace_nan(v) for k, v in obj.items()}
- elif isinstance(obj, list):
- return [self.replace_nan(item) for item in obj]
- elif isinstance(obj, float) and math.isnan(obj):
- return None
- return obj
|