|
import json
import math
import re
from typing import List, Dict, Optional

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text

from DiagnosisLib import DiagnosisLib
class Auto_diag:
    """Rule-based vibration diagnosis for a batch of wind-turbine waveform records.

    Construction loads the waveform rows named by ``ids``, derives the speed
    ratio, looks up the bearing characteristic frequencies (and, for gearbox
    points, the tooth count) and pre-computes the spectra consumed by the
    ``*_diag`` methods.

    Status codes produced by the ``*_diag`` methods:
        -1  record not diagnosable (shaft speed is zero, or no records at all)
         0  no fault detected (also the fallback when a library call fails)
         1  fault detected with severity below 6
         2  fault detected with severity 6 or above

    The ``print`` traces of the original implementation are kept so that
    console output stays comparable.
    """

    # NOTE(review): credentials are embedded in source; consider moving them
    # to configuration.
    VIBRATION_DB_URL = 'mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod'
    # Alternate endpoint listed in the original source:
    # 'mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod'
    PARAMETER_DB_URL = 'mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show'
    # Alternate endpoint listed in the original source:
    # 'mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show'

    STATUS_NOT_DIAGNOSABLE = -1
    MIN_SAMPLING_FREQUENCY = 12800
    FREQ_TOLERANCE = 0.05          # relative search window passed as fRange ("Per" mode)
    VEL_ALERT_MMS_RMS = 4          # velocity alert threshold
    VEL_DANGER_MMS_RMS = 7         # velocity danger threshold
    GE_ALERT = 3                   # envelope alert threshold (gE)
    GE_DANGER = 10                 # envelope danger threshold (gE)

    def __init__(self, ids: List[int], wind_code: str, engine_code: str):
        """Load data and pre-compute everything the diagnoses need.

        Args:
            ids: Row ids of the waveform records to analyse.
            wind_code: Wind-farm code; also the prefix of the waveform table.
            engine_code: Turbine code used to look up component parameters.

        Raises:
            Exception: If the vibration data cannot be loaded.
            ValueError: If component parameters cannot be resolved or the
                sampling frequency of the first record is below 12800.
        """
        self.ids = ids
        self.wind_code = wind_code
        self.engine_code = engine_code
        self.myDiagnosisLib = DiagnosisLib()
        self.diagnosis_results = []      # collected diagnosis results
        self.bearing_frequencies = {}    # record id -> characteristic frequencies
        self.teeth_count = {}            # record id -> gear tooth count
        # Order matters: the speed ratio needs the measure-point names, the
        # characteristic frequencies need the speed ratio.
        self.load_vibration_data()
        self.calculate_speed_ratio()
        self.Characteristic_Frequency()
        self.calculate_spectrums()

    # ------------------------------------------------------------------
    # Data loading and spectra
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_samples(raw):
        """Decode one stored waveform string into a float32 array.

        JSON is tried first; a plain ``[a, b, c]`` style string that is not
        valid JSON is split on commas instead.
        """
        try:
            values = json.loads(raw)
        except json.JSONDecodeError:
            values = [float(tok.strip()) for tok in raw.strip('[]').split(',') if tok.strip()]
        return np.array(values, dtype=np.float32)

    def load_vibration_data(self):
        """Fetch the waveform rows for ``self.ids`` and unpack them.

        Sets ``df_data``, ``fspd`` (rotational speed), ``fs_acc`` (sampling
        frequency), ``mesure_point_names``, ``Acc`` (one array per row) and
        ``t_acc`` (matching time axes).

        Raises:
            Exception: Wrapping any failure, with the original error chained.
        """
        engine = None   # stays None if create_engine itself fails
        try:
            table_name = self.wind_code + '_wave'
            # The table name cannot be a bound parameter, so restrict it to
            # identifier characters before interpolating it into SQL.
            if not re.fullmatch(r'[A-Za-z0-9_]+', table_name):
                raise ValueError(f"Invalid wind_code for table lookup: {self.wind_code!r}")
            ids_str = ','.join(str(int(record_id)) for record_id in self.ids)

            engine = create_engine(self.VIBRATION_DB_URL)
            with engine.connect() as conn:
                # One query for all ids.
                # NOTE(review): the rows are not ordered to match self.ids.
                sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str})"
                self.df_data = pd.read_sql(sql, conn)

            self.fspd = self.df_data['rotational_speed'].astype(float).values
            self.fs_acc = self.df_data['sampling_frequency'].astype(float).values
            self.mesure_point_names = self.df_data['mesure_point_name'].values

            self.Acc = []
            self.t_acc = []
            for pos, raw in enumerate(self.df_data['mesure_data']):
                samples = self._parse_samples(raw)
                self.Acc.append(samples)
                # Sample index / fs always yields exactly len(samples) points;
                # np.arange with a float step may be one element off.
                self.t_acc.append(np.arange(len(samples)) / self.fs_acc[pos])
        except Exception as e:
            raise Exception(f"Failed to load vibration data: {str(e)}") from e
        finally:
            if engine is not None:
                engine.dispose()

    def calculate_spectrums(self):
        """Compute acceleration, velocity and envelope spectra per record.

        Fills ``fAcc``/``AccSpec``, ``VelSpec``, ``gE_10`` and
        ``fAcc_gE``/``gESpec_10``, one entry per element of ``self.Acc``.

        Raises:
            ValueError: If the first record's sampling frequency is below 12800.
        """
        print("计算频谱")
        self.fAcc = []
        self.AccSpec = []
        self.VelSpec = []
        self.gE_10 = []
        self.fAcc_gE = []
        self.gESpec_10 = []
        # NOTE(review): only the first record's sampling rate is validated.
        if self.fs_acc[0] < self.MIN_SAMPLING_FREQUENCY:
            print("当前采集频率不适合进行诊断分析")
            raise ValueError("当前采集频率不适合进行诊断分析")

        lowcut = 500          # fixed lower edge of the envelope band-pass
        fmax_env = 1000       # analysis range of the envelope spectrum
        for i, acc in enumerate(self.Acc):
            lib = self.myDiagnosisLib
            fs_acc = self.fs_acc[i]
            fmax_acc = fs_acc / 2.56   # also the upper band-pass edge
            print(fs_acc)
            print(acc)

            # Acceleration spectrum
            freqs, spectrum = lib.Spectrum(acc, fs_acc)
            self.fAcc.append(freqs)
            self.AccSpec.append(spectrum)

            # Velocity spectrum by frequency-domain integration
            self.VelSpec.append(lib.FreqIntegration(spectrum, fmax_acc, 0.1))

            # Envelope signal and its spectrum
            band = lib.BandPassButter(acc, lowcut, fmax_acc, fs_acc, 6)
            down_sample_rate = int(fmax_acc / fmax_env)
            envelope = lib.EnvLopInTime(band, down_sample_rate, Method="SKF")
            self.gE_10.append(envelope)
            env_freqs, env_spectrum = lib.Spectrum(envelope, 2.56 * fmax_env)
            self.fAcc_gE.append(env_freqs)
            self.gESpec_10.append(env_spectrum)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _status_code(detection, severity):
        """Map a (detection, severity) pair onto status code 0, 1 or 2."""
        if not detection:
            return 0
        return 1 if severity < 6 else 2

    @classmethod
    def _max_status(cls, status_codes):
        """Largest status code, or -1 when there are no records."""
        return max(status_codes, default=cls.STATUS_NOT_DIAGNOSABLE)

    def _harmonic_feature(self, spectra, index, fmax, f_target, detection, num_harmonics):
        """Call ``FixedFreqFeature`` on ``spectra[index]``.

        Any failure is deliberately swallowed (as in the original code) and
        replaced by a list of ``num_harmonics + 1`` zeros.
        """
        try:
            return self.myDiagnosisLib.FixedFreqFeature(
                spectra[index], fmax, f_target, self.FREQ_TOLERANCE, "Per", detection, num_harmonics
            )
        except Exception:
            return [0] * (num_harmonics + 1)

    def _summarize_components(self, results):
        """Add ``max_status`` to every component and return pooled counts."""
        counts = {"count_0": 0, "count_1": 0, "count_2": 0}
        for entry in results.values():
            codes = entry["status_codes"]
            entry["max_status"] = self._max_status(codes)
            for level in (0, 1, 2):
                counts[f"count_{level}"] += codes.count(level)
        return counts

    # ------------------------------------------------------------------
    # Running-speed faults (unbalance / misalignment / looseness)
    # ------------------------------------------------------------------
    def _running_speed_diag(self, defect_type, verbose=False):
        """Diagnose one running-speed defect type for every record.

        Args:
            defect_type: Name passed to ``RunningSpd_Fault_Diag`` and returned
                as ``"type"``.
            verbose: Also print the velocity spectrum and the status code
                (the unbalance diagnosis did this originally).

        Returns:
            dict with ``type``, ``results`` (one status code per record) and
            ``statistics`` (``max_status`` and counts of codes 0/1/2).
        """
        status_codes = []
        for i in range(len(self.ids)):
            shaft_speed = self.fspd[i] * self.speed_ratio
            if shaft_speed == 0:
                status_codes.append(self.STATUS_NOT_DIAGNOSABLE)
                continue
            try:
                f_target = shaft_speed / 60   # running frequency in Hz
                print('ShaftSpd', shaft_speed)
                print('fTarget', f_target)
                if verbose:
                    print(self.VelSpec[i])
                harmonics = self._harmonic_feature(self.VelSpec, i, 1000, f_target, "RMS", 4)
                print('Vel_RunningSpd_HM', harmonics)
                try:
                    detection, severity = self.myDiagnosisLib.RunningSpd_Fault_Diag(
                        harmonics, self.VEL_ALERT_MMS_RMS, self.VEL_DANGER_MMS_RMS, defect_type
                    )
                except Exception:
                    # Best effort: a failing library call counts as "no fault".
                    detection, severity = False, 0
                print('severity', severity)
                status_code = self._status_code(detection, severity)
            except Exception:
                status_code = 0
            status_codes.append(status_code)
            if verbose:
                print("status_code", status_code)

        return {
            "type": defect_type,
            "results": status_codes,
            "statistics": {
                "max_status": self._max_status(status_codes),
                "count_0": status_codes.count(0),
                "count_1": status_codes.count(1),
                "count_2": status_codes.count(2),
            },
        }

    def Unbalance_diag(self):
        """Unbalance diagnosis; see ``_running_speed_diag`` for the result shape."""
        print("不平衡诊断")
        return self._running_speed_diag('Unbalance', verbose=True)

    def Misalignment_diag(self):
        """Misalignment diagnosis; see ``_running_speed_diag`` for the result shape."""
        print("不对中诊断")
        return self._running_speed_diag('Misalignment')

    def Looseness_diag(self):
        """Looseness diagnosis; see ``_running_speed_diag`` for the result shape."""
        print("松动诊断")
        return self._running_speed_diag('Looseness')

    # ------------------------------------------------------------------
    # Bearing faults
    # ------------------------------------------------------------------
    def _bearing_low_band(self, tag):
        """Return ``(threshold_hz, envelope_fmax)`` for low target frequencies.

        Wind farm ``WOF091200030`` uses 1 Hz / 10; elsewhere the threshold is
        10 Hz with an envelope range of 100 (10 for the cage frequency FTF).
        """
        if self.wind_code == 'WOF091200030':
            return 1, 10
        return 10, (10 if tag == "FTF" else 100)

    def _bearing_component_status(self, index, tag, label, f_target, shaft_speed,
                                  fs_acc, fmax_vel, verbose=False):
        """Diagnose one bearing component for one record.

        Returns:
            ``(status_code, fmax_vel)``. ``fmax_vel`` is handed back because the
            original code lets a narrowed velocity range carry over to the
            components evaluated after it within the same record.
        """
        try:
            print(label)
            # Choose the analysis ranges from THIS component's own target
            # frequency (the original BSF section used the stale BPFI target).
            low_threshold, low_fmax = self._bearing_low_band(tag)
            if f_target > 100:
                fmax_env = 10000
            elif f_target < low_threshold:
                fmax_env = low_fmax
                fmax_vel = min(1000, fs_acc / 2.56)
            else:
                fmax_env = 1000

            print(tag, f_target)
            print('ShaftSpd', shaft_speed)
            if verbose:
                print('speed_ratio', self.speed_ratio)
            print('fTarget', f_target)
            if verbose:
                print('VelSpec', self.VelSpec[index])
                print('gESpec_10', self.gESpec_10[index])

            # NOTE(review): the "running speed" harmonics are extracted at the
            # defect frequency, exactly like the defect harmonics below; the
            # original header comment suggests the running frequency was
            # intended. Behaviour is kept as found.
            vel_running = self._harmonic_feature(self.VelSpec, index, fmax_vel, f_target, "RMS", 4)
            print('Vel_RunningSpd_HM\n', vel_running)
            vel_defect = self._harmonic_feature(self.VelSpec, index, fmax_vel, f_target, "RMS", 4)
            print('Vel_Bearing_Defect_HM\n', vel_defect)
            env_defect = self._harmonic_feature(self.gESpec_10, index, fmax_env, f_target, "Peak", 4)
            print('gE_Bearing_Defect_HM\n', env_defect)

            try:
                detection, severity = self.myDiagnosisLib.Bearing_Fault_Diag(
                    env_defect, vel_defect, vel_running, self.GE_ALERT, self.GE_DANGER,
                    self.VEL_ALERT_MMS_RMS, self.VEL_DANGER_MMS_RMS, tag
                )
            except Exception:
                detection, severity = False, 0
            if verbose:
                print("BPFOseverity", severity)
            status = self._status_code(detection, severity)
        except Exception:
            status = 0
        return status, fmax_vel

    def Bearing_diag(self):
        """Diagnose outer race, inner race, rolling element and cage faults.

        The characteristic frequencies stored for the first record id are used
        for every record (they already include the shaft speed).

        Returns:
            dict with ``type`` ``"Bearing"``, ``results`` (per component:
            ``status_codes`` and ``max_status``) and ``statistics`` (counts of
            codes 0/1/2 pooled over all components).
        """
        print("轴承诊断")
        components = (
            ("BPFO", "外圈"),
            ("BPFI", "内圈"),
            ("BSF", "滚动体"),
            ("FTF", "保持架"),
        )
        results = {tag: {"status_codes": []} for tag, _ in components}
        bearing_freq = self.bearing_frequencies.get(self.ids[0], {})

        for i in range(len(self.ids)):
            shaft_speed = self.fspd[i] * self.speed_ratio
            if shaft_speed == 0:
                for tag, _ in components:
                    results[tag]["status_codes"].append(self.STATUS_NOT_DIAGNOSABLE)
                continue
            fs_acc = self.fs_acc[i]
            # A fixed 1000 Hz range could leave the target outside the search
            # window, hence the wider default.
            fmax_vel = max(1000, fs_acc / 2.56)
            for tag, label in components:
                status, fmax_vel = self._bearing_component_status(
                    i, tag, label, bearing_freq.get(tag, 0), shaft_speed,
                    fs_acc, fmax_vel, verbose=(tag == "BPFO")
                )
                results[tag]["status_codes"].append(status)

        return {
            "type": "Bearing",
            "results": results,
            "statistics": self._summarize_components(results),
        }

    # ------------------------------------------------------------------
    # Gear faults
    # ------------------------------------------------------------------
    @staticmethod
    def _gear_envelope_band(f_target):
        """Return ``(envelope_fmax, narrow_velocity_range)`` for a target frequency."""
        if f_target > 100:
            return 10000, False
        if f_target < 0.1:
            return 10, False
        if f_target < 1:
            return 100, True
        return 1000, False

    def Gear_diag(self):
        """Diagnose gear tooth wear and tooth crack/breakage.

        Returns:
            dict with ``type`` ``"Gear"``, ``results`` (``wear`` and ``crack``,
            each with ``status_codes`` and ``max_status``) and ``statistics``.

        Raises:
            ValueError: If the first measure point is not a gearbox point.
        """
        print('齿轮诊断')
        point_name = self.mesure_point_names[0].lower()
        if 'gearbox' not in point_name:
            print("Can not perform gearbox diagnosis")
            raise ValueError("Can not perform gearbox diagnosis")
        results = {
            "wear": {"status_codes": []},
            "crack": {"status_codes": []},
        }
        num_teeth = int(self.teeth_count.get(self.ids[0], 0))
        print(num_teeth)

        for i in range(len(self.ids)):
            shaft_speed = self.fspd[i] * self.speed_ratio
            print('speed_ratio', self.speed_ratio)
            print('ShaftSpd', shaft_speed)
            if shaft_speed == 0:
                results["wear"]["status_codes"].append(self.STATUS_NOT_DIAGNOSABLE)
                results["crack"]["status_codes"].append(self.STATUS_NOT_DIAGNOSABLE)
                # Without this the record would receive a second status below.
                continue
            try:
                fs_acc = self.fs_acc[i]
                fmax_acc = fs_acc / 2.56
                fmax_vel = max(1000, fs_acc / 2.56)

                # Gear mesh frequency and its harmonics
                gmf = shaft_speed * num_teeth / 60
                print('fTarget', gmf)
                fmax_env, narrow_velocity = self._gear_envelope_band(gmf)
                if narrow_velocity:
                    fmax_vel = min(1000, fs_acc / 2.56)
                vel_gmf = self._harmonic_feature(self.VelSpec, i, fmax_vel, gmf, "RMS", 2)
                print('Vel_GMF_HM', vel_gmf)
                acc_gmf = self._harmonic_feature(self.AccSpec, i, fmax_acc, gmf, "Peak", 2)
                print('Acc_GMF_HM', acc_gmf)
                env_gmf = self._harmonic_feature(self.gESpec_10, i, fmax_env, gmf, "Peak", 2)
                print('gE_GMF_HM', env_gmf)

                # Tooth impact frequency = running frequency
                f_running = shaft_speed / 60
                print('fTarget', f_running)
                fmax_env, _ = self._gear_envelope_band(f_running)
                env_gtif = self._harmonic_feature(self.gESpec_10, i, fmax_env, f_running, "Peak", 4)
                print('gE_GTIF_HM', env_gtif)

                try:
                    wear_detected, wear_sev, crack_detected, crack_sev = \
                        self.myDiagnosisLib.Gear_Fault_Diag(
                            acc_gmf, vel_gmf, env_gmf, env_gtif,
                            0.3, 0.5,      # acceleration alert / danger (g peak)
                            4.0, 7.0,      # velocity alert / danger (mm/s RMS)
                            3.0, 10.0,     # wear envelope alert / danger (gE)
                            3.0, 10.0,     # broken-tooth envelope alert / danger (gE)
                        )
                except Exception:
                    wear_detected = crack_detected = False
                    wear_sev = crack_sev = 0
                print('齿轮磨损', wear_sev, '齿轮断齿', crack_sev)
                wear_status = self._status_code(wear_detected, wear_sev)
                crack_status = self._status_code(crack_detected, crack_sev)
            except Exception:
                wear_status = 0
                crack_status = 0
            results["wear"]["status_codes"].append(wear_status)
            results["crack"]["status_codes"].append(crack_status)

        return {
            "type": "Gear",
            "results": results,
            "statistics": self._summarize_components(results),
        }

    # ------------------------------------------------------------------
    # Component parameters and characteristic frequencies
    # ------------------------------------------------------------------
    @staticmethod
    def _query(engine, sql, **params):
        """Run a SELECT with named bound parameters and return a DataFrame."""
        return pd.read_sql(text(sql), engine, params=params)

    @staticmethod
    def _has_value(table, brand_col, model_col):
        """True when the first row has both a brand and a model."""
        return not pd.isna(table[brand_col].iloc[0]) and not pd.isna(table[model_col].iloc[0])

    def _main_bearing_columns(self, table, point_name):
        """Pick the brand/model columns for a main-bearing measure point."""
        front = ('front_bearing_brand', 'front_bearing_model')
        rear = ('rear_bearing_brand', 'rear_bearing_model')
        if 'front' in point_name:
            if not self._has_value(table, *front):
                print("警告: 没有找到有效的品牌信息")
            return front
        if 'rear' in point_name:
            if not self._has_value(table, *rear):
                print("警告: 没有找到有效的品牌信息")
            return rear
        # Side not named: prefer whichever side has data, front first.
        if self._has_value(table, *front):
            return front
        if self._has_value(table, *rear):
            return rear
        print("警告: 没有找到有效的轴承信息")
        return front

    def _gearbox_component(self, engine, mill_type_code, point_name):
        """Resolve the parameter row for a gearbox measure point.

        Returns:
            ``(table, brand_col, model_col, teeth)`` where ``teeth`` is ``None``
            when no tooth count applies to the measure point.

        Raises:
            ValueError: If no matching parameters exist.
        """
        gearbox = self._query(
            engine, "SELECT * FROM unit_gearbox WHERE mill_type_code = :code",
            code=str(mill_type_code),
        )
        if gearbox.empty:
            raise ValueError(f"No unit_gearbox row for mill_type_code {mill_type_code!r}")
        gearbox_code = gearbox['code'].iloc[0]
        print(gearbox_code)

        # Planet / sun stages live in unit_gearbox_structure.
        if 'planet' in point_name or 'sun' in point_name:
            print("'planet' or 'sun' ")
            structure = 1 if 'planet' in point_name else 2
            grade = 1
            if 'first' in point_name:
                grade = 1
            elif 'second' in point_name:
                grade = 2
            elif 'third' in point_name:
                grade = 3
            table = self._query(
                engine,
                "SELECT bearing_brand, bearing_model, gear_ring_teeth_count "
                "FROM unit_gearbox_structure "
                "WHERE gearbox_code = :code "
                "AND gearbox_structure = :structure "
                "AND planetary_gear_grade = :grade",
                code=str(gearbox_code), structure=str(structure), grade=str(grade),
            )
            if table.empty:
                print("unit_gearbox_structure没有该测点的参数")
                raise ValueError(f"No unit_gearbox_structure row for measure point {point_name!r}")
            teeth = table['gear_ring_teeth_count'].iloc[0]
            print('teeth_count', teeth)
            if not self._has_value(table, 'bearing_brand', 'bearing_model'):
                print("警告: 没有找到有效的轴承信息")
            return table, 'bearing_brand', 'bearing_model', teeth

        # Shaft bearings live in unit_gearbox_bearings.
        if 'shaft' in point_name or 'input' in point_name:
            print("'shaft'or'input'")
            is_intermediate = 'low_speed_intermediate' in point_name
            is_high_speed = 'high_speed' in point_name
            # Test the longer name first: 'low_speed' is a substring of
            # 'low_speed_intermediate' and used to shadow it.
            if is_intermediate:
                grade = 2
            elif 'low_speed' in point_name:
                grade = 1
            elif is_high_speed:
                grade = 3
            else:
                grade = 1
            # SELECT * so that the low-speed columns are available as well.
            table = self._query(
                engine,
                "SELECT * FROM unit_gearbox_bearings "
                "WHERE gearbox_code = :code AND parallel_wheel_grade = :grade",
                code=str(gearbox_code), grade=str(grade),
            )
            if table.empty:
                print("警告: 没有找到有效的轴承信息")
                raise ValueError(f"No unit_gearbox_bearings row for measure point {point_name!r}")
            if is_high_speed or is_intermediate:
                # Either side (rs / gs) is acceptable; prefer rs.
                rs = ('bearing_rs_brand', 'bearing_rs_model')
                gs = ('bearing_gs_brand', 'bearing_gs_model')
                if self._has_value(table, *rs):
                    columns = rs
                elif self._has_value(table, *gs):
                    columns = gs
                else:
                    print("警告: 没有找到有效的品牌信息")
                    columns = rs
            elif 'low_speed' in point_name:
                # NOTE(review): assumes unit_gearbox_bearings has
                # bearing_brand / bearing_model columns - confirm schema.
                columns = ('bearing_brand', 'bearing_model')
            else:
                raise ValueError(f"Cannot tell which shaft bearing {point_name!r} refers to")
            teeth = None
            if is_high_speed:
                teeth = table['gear_ring_teeth_count'].iloc[0]
                print('teeth_count', teeth)
            return table, columns[0], columns[1], teeth

        raise ValueError(f"Unsupported gearbox measure point {point_name!r}")

    def _resolve_component(self, engine, point_name):
        """Return ``(brand, model, teeth)`` for the component behind a measure point."""
        group = self._query(
            engine, "SELECT * FROM wind_engine_group WHERE engine_code = :code",
            code=str(self.engine_code),
        )
        if group.empty:
            raise ValueError(f"No wind_engine_group row for engine_code {self.engine_code!r}")
        mill_type_code = group['mill_type_code'].iloc[0]
        print(mill_type_code)

        teeth = None
        if 'main_bearing' in point_name:
            print("main_bearing")
            table = self._query(
                engine, "SELECT * FROM unit_bearings WHERE mill_type_code = :code",
                code=str(mill_type_code),
            )
            if table.empty:
                print("警告: 没有找到有效的机型信息")
                raise ValueError(f"No unit_bearings row for mill_type_code {mill_type_code!r}")
            brand_col, model_col = self._main_bearing_columns(table, point_name)
        elif 'generator' in point_name or 'stator' in point_name:
            print("generator or 'stator'")
            table = self._query(
                engine, "SELECT * FROM unit_dynamo WHERE mill_type_code = :code",
                code=str(mill_type_code),
            )
            if table.empty:
                raise ValueError(f"No unit_dynamo row for mill_type_code {mill_type_code!r}")
            prefix = 'non_drive_end_bearing' if 'non' in point_name else 'drive_end_bearing'
            brand_col, model_col = prefix + '_brand', prefix + '_model'
        elif 'gearbox' in point_name:
            print("gearbox")
            table, brand_col, model_col, teeth = self._gearbox_component(
                engine, mill_type_code, point_name
            )
        else:
            raise ValueError(f"Unsupported measure point {point_name!r}")

        print(brand_col)
        brand = table[brand_col].iloc[0]
        model = table[model_col].iloc[0]
        print(brand)
        print(model)
        return brand, model, teeth

    def Characteristic_Frequency(self):
        """Look up component parameters and compute characteristic frequencies.

        All requested ids share the same component, so the lookup is done once
        (from the first record) and the result stored for every id in
        ``bearing_frequencies``; for gearbox points with a known tooth count
        that count is stored in ``teeth_count`` as well. Does nothing when
        ``bearing_frequencies`` is already populated.

        Raises:
            ValueError: If any required parameter row or value is missing
                (previously surfaced as ``NameError`` / ``IndexError``).
        """
        if self.bearing_frequencies:
            return
        point_name = self.mesure_point_names[0]
        rpm = self.fspd[0] * self.speed_ratio
        print(point_name)
        print(self.engine_code)

        engine = create_engine(self.PARAMETER_DB_URL)
        try:
            brand, model, teeth = self._resolve_component(engine, point_name)
            if pd.isna(brand) or pd.isna(model):
                raise ValueError(f"No bearing brand/model recorded for measure point {point_name!r}")
            geometry = self._query(
                engine,
                "SELECT * FROM unit_dict_brand_model "
                "WHERE manufacture = :brand AND model_number = :model",
                brand=brand, model=model,
            )
        finally:
            engine.dispose()
        if geometry.empty:
            raise ValueError(f"No unit_dict_brand_model row for {brand!r} / {model!r}")

        bearing_freq = self.calculate_bearing_frequencies(
            geometry['rolls_number'].iloc[0],
            geometry['rolls_diameter'].iloc[0],
            geometry['circle_diameter'].iloc[0],
            geometry['theta_deg'].iloc[0],
            rpm,
        )
        print(bearing_freq)

        store_teeth = 'gearbox' in point_name and teeth is not None and not pd.isna(teeth)
        for record_id in self.ids:
            self.bearing_frequencies[record_id] = bearing_freq
            print('aaaa')   # debug trace kept from the original
            if store_teeth:
                self.teeth_count[record_id] = teeth

    def calculate_bearing_frequencies(self, n, d, D, theta_deg, rpm):
        """Compute the bearing characteristic frequencies in Hz.

        Args:
            n: Number of rolling elements.
            d: Rolling element diameter.
            D: Pitch diameter, in the same unit as ``d`` (only the ratio is used).
            theta_deg: Contact angle in degrees.
            rpm: Shaft speed in revolutions per minute.

        Returns:
            dict with ``BPFI`` (inner race), ``BPFO`` (outer race), ``BSF``
            (rolling element) and ``FTF`` (cage), each rounded to 2 decimals.
        """
        cos_theta = math.cos(math.radians(theta_deg))
        ratio = d / D
        shaft_hz = rpm / 60.0   # revolutions per second

        raw = {
            "BPFI": (n / 2) * (1 + ratio * cos_theta) * shaft_hz,
            "BPFO": (n / 2) * (1 - ratio * cos_theta) * shaft_hz,
            "BSF": (D / (2 * d)) * (1 - (ratio ** 2) * (cos_theta ** 2)) * shaft_hz,
            "FTF": 0.5 * (1 - ratio * cos_theta) * shaft_hz,
        }
        return {name: round(value, 2) for name, value in raw.items()}

    def calculate_speed_ratio(self):
        """Set ``self.speed_ratio`` from the wind farm and first measure point.

        The ratio is 1.0 except for wind farm ``WOF046400029``, where:
            generator points              -> 1
            gearbox, high-speed shaft     -> 1
            gearbox, first planet stage   -> 1 / (5.3 * 5.5 * 3.8)
            gearbox, second planet stage  -> 1 / (5.5 * 3.8)
            other gearbox points          -> 1 (default)
            everything else               -> 1 / (5.3 * 5.5 * 3.8)

        NOTE(review): the original nesting of the gearbox / generator / else
        branches was ambiguous; this explicit precedence follows the original
        comments ("non-gearbox measure point" on the final branch) - confirm.
        """
        self.speed_ratio = 1.0
        if self.wind_code != "WOF046400029":
            return

        name = self.mesure_point_names[0]
        if "generator" in name:
            self.speed_ratio = 1.0
        elif "gearbox" in name:
            if "high_speed" in name:
                self.speed_ratio = 1.0
            elif "first" in name:
                self.speed_ratio = 1 / (5.3 * 5.5 * 3.8)
            elif "second" in name:
                self.speed_ratio = 1 / (5.5 * 3.8)
        else:
            self.speed_ratio = 1 / (5.3 * 5.5 * 3.8)
-
-
-
-
|