import numpy as np from scipy.signal import hilbert from scipy.fft import ifft import plotly.graph_objs as go import pandas as pd from sqlalchemy import create_engine, text import sqlalchemy from typing import Dict,Any import json import ast import math ''' # 输入: { "ids":[12345,121212], "windCode":"xxxx", "analysisType":"xxxxx", "fmin":int(xxxx) (None), "fmax":"int(xxxx) (None), } [{id:xxxx,"time":xxx},{}] id[123456] # 通过id,读取mysql,获取数据 engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data') def get_by_id(table_name,id): lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} " # print(lastday_df_sql) df = pd.read_sql(lastday_df_sql, engine) return df select distinct id, timeStamp from table_name group by ids ids time 1 xxx 2 xxxx df_data = [] # for id in ids: # sql_data = get_by_id('SKF001_wave',id) # df_data.append(sql_data) # print(sql_data) [df1,df2] ''' ''' 数据库字段: "samplingFrequency" "timeStamp" "mesureData" ''' # %% # %% class CMSAnalyst: def __init__(self, fmin, fmax, table_name, ids): """ table_name: 当前代码实际传入的是 windcode(例如 SKF001),内部会拼 _wave ids: [id1, id2, ...] """ self.table_name = table_name self.ids = ids # 1) 从数据库获取原始数据(按 id 分组) self.datas = self._get_by_id(table_name, ids) if not self.datas: raise ValueError(f"[ERROR] 未查到任何数据 ids={ids}") # 2) 只保留需要字段 self.datas = [ df[['id', 'mesure_data', 'time_stamp', 'sampling_frequency', 'wind_turbine_number', 'rotational_speed', 'mesure_point_name']] for df in self.datas if df is not None and not df.empty ] if not self.datas: raise ValueError("[ERROR] 分组后 DataFrame 全为空") # 3) 单 id 情况:取第一个 df self.data_filter = self.datas[0] if self.data_filter.empty: raise ValueError("[ERROR] data_filter 为空,无法读取数据") # 4) 解析 mesure_data raw_md = self.data_filter['mesure_data'].iloc[0] self.data = self._parse_mesure_data(raw_md) if self.data is None or len(self.data) == 0: raise ValueError("[ERROR] mesure_data 解析失败或为空") self.data = np.asarray(self.data, dtype=float) # 5) 采样频率 self.fs = int(self.data_filter['sampling_frequency'].iloc[0]) # 6) 分析参数 self.envelope_spectrum_m = self.data.shape[0] self.envelope_spectrum_n = 1 self.envelope_spectrum_t = np.arange(self.envelope_spectrum_m) / self.fs self.fmin = fmin if fmin is not None else 0 self.fmax = fmax if fmax is not None else float('inf') # 7) 设备信息 self.wind_code = self.data_filter['wind_turbine_number'].iloc[0] self.rpm_Gen = self.data_filter['rotational_speed'].iloc[0] self.mesure_point_name = self.data_filter['mesure_point_name'].iloc[0] self.fn_Gen = round(self.rpm_Gen / 60, 2) # 8) 包络谱预计算 self.envelope_spectrum_y = self._bandpass_filter(self.data) self.f, self.HP = self._calculate_envelope_spectrum(self.envelope_spectrum_y) # 9) 特征频率 & 轴承频率 cf_dict = self.Characteristic_Frequency() self.CF = pd.DataFrame([cf_dict]) n_rolls_m = self.CF['n_rolls'].iloc[0] d_rolls_m = self.CF['d_rolls'].iloc[0] D_diameter_m = self.CF['D_diameter'].iloc[0] theta_deg_m = self.CF['theta_deg'].iloc[0] if any(v is None for v in [n_rolls_m, d_rolls_m, D_diameter_m, theta_deg_m]): self.bearing_frequencies = None else: self.bearing_frequencies = self.calculate_bearing_frequencies( n_rolls_m, d_rolls_m, D_diameter_m, theta_deg_m, self.rpm_Gen ) self.bearing_frequencies = pd.DataFrame([self.bearing_frequencies]) # 10) 频谱预计算 ( self.frequency_domain_analysis_t, self.frequency_domain_analysis_f, self.frequency_domain_analysis_m, self.frequency_domain_analysis_mag, self.frequency_domain_analysis_Xrms, ) = self._calculate_spectrum(self.data) # 11) 时域时间轴 self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs # ========================================================== # 工具:解析 mesure_data(兼容 json 字符串 / python list 字符串 / list) # ========================================================== def _parse_mesure_data(self, raw): if raw is None: return None # 已经是 list/np.array if isinstance(raw, (list, tuple, np.ndarray)): return list(raw) # 字符串:可能是 JSON 或 python list 字符串 if isinstance(raw, str): s = raw.strip() # 优先 json.loads(如果是标准 JSON) try: v = json.loads(s) if isinstance(v, list): return v except Exception: pass # 再尝试 ast.literal_eval(如果是 python 格式 list) try: v = ast.literal_eval(s) if isinstance(v, (list, tuple, np.ndarray)): return list(v) except Exception: return None return None # ========================================================== # DB:按 id 拉取波形 # ========================================================== def _get_by_id(self, windcode, ids): engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod') table_name = f"{windcode}_wave" ids_str = ','.join(map(str, ids)) sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str}) ORDER BY time_stamp" df = pd.read_sql(sql, engine) if df.empty: return [] grouped = [group for _, group in df.groupby('id')] return grouped # ========================================================== # 包络谱:带通滤波(FFT 截频) # ========================================================== def _bandpass_filter(self, data): m = data.shape[0] # index 保护:fmin/fmax 可能超范围 ni = int(round(self.fmin * m / self.fs + 1)) ni = max(0, min(ni, m)) if self.fmax == float('inf'): na = m else: na = int(round(self.fmax * m / self.fs + 1)) na = max(0, min(na, m)) if na <= ni: # 退化情况:不做滤波 y = np.zeros((m, 1)) y[:, 0] = data return y z = np.fft.fft(data) a = np.zeros(m, dtype=complex) a[ni:na] = z[ni:na] # 对称频段 a[m - na + 1: m - ni + 1] = z[m - na + 1: m - ni + 1] x_ifft = np.fft.ifft(a) y = np.zeros((m, 1)) y[:, 0] = np.real(x_ifft) return y def _calculate_envelope_spectrum(self, y): m, n = y.shape HP = np.zeros((m, n)) for p in range(n): H = np.abs(hilbert(y[:, p] - np.mean(y[:, p]))) HP[:, p] = np.abs(np.fft.fft(H - np.mean(H))) * 2 / m f = np.fft.fftfreq(m, d=1 / self.fs) return f, HP def envelope_spectrum(self): positive_frequencies = self.f[: self.envelope_spectrum_m // 2] positive_HP = self.HP[: self.envelope_spectrum_m // 2, 0] x = positive_frequencies y = positive_HP Xrms = float(np.sqrt(np.mean(y ** 2))) rpm_Gen = round(float(self.rpm_Gen), 2) if self.bearing_frequencies is None: BPFI_1X = BPFO_1X = BSF_1X = FTF_1X = None else: BPFI_1X = round(float(self.bearing_frequencies['BPFI'].iloc[0]), 2) BPFO_1X = round(float(self.bearing_frequencies['BPFO'].iloc[0]), 2) BSF_1X = round(float(self.bearing_frequencies['BSF'].iloc[0]), 2) FTF_1X = round(float(self.bearing_frequencies['FTF'].iloc[0]), 2) fn_Gen = round(float(self.fn_Gen), 2) _3P_1X = fn_Gen * 3 result = { "fs": int(self.fs), "Xrms": round(Xrms, 2), "x": list(x), "y": list(y), "title": "包络谱", "xaxis": "频率(Hz)", "yaxis": "加速度(m/s^2)", "rpm_Gen": rpm_Gen, "BPFI": [{"Xaxis": (None if BPFI_1X is None else BPFI_1X * k), "val": f"{k}BPFI"} for k in range(1, 7)], "BPFO": [{"Xaxis": (None if BPFO_1X is None else BPFO_1X * k), "val": f"{k}BPFO"} for k in range(1, 7)], "BSF": [{"Xaxis": (None if BSF_1X is None else BSF_1X * k), "val": f"{k}BSF"} for k in range(1, 7)], "FTF": [{"Xaxis": (None if FTF_1X is None else FTF_1X * k), "val": f"{k}FTF"} for k in range(1, 7)], "fn_Gen": [{"Xaxis": fn_Gen * k, "val": f"{k}X"} for k in range(1, 7)], "B3P": _3P_1X, } return self.replace_nan(result) # ========================================================== # 频谱 # ========================================================== def _calculate_spectrum(self, data): m = data.shape[0] t = np.arange(m) / self.fs mag = np.abs(np.fft.fft(data - np.mean(data))) * 2 / m f = np.fft.fftfreq(m, d=1 / self.fs) Xrms = float(np.sqrt(np.mean(data ** 2))) return t, f, m, mag, Xrms def frequency_domain(self): positive_frequencies = self.frequency_domain_analysis_f[: self.frequency_domain_analysis_m // 2] positive_mag = self.frequency_domain_analysis_mag[: self.frequency_domain_analysis_m // 2] x = positive_frequencies y = positive_mag Xrms = float(self.frequency_domain_analysis_Xrms) rpm_Gen = round(float(self.rpm_Gen), 2) if self.bearing_frequencies is None: BPFI_1X = BPFO_1X = BSF_1X = FTF_1X = None else: BPFI_1X = round(float(self.bearing_frequencies['BPFI'].iloc[0]), 2) BPFO_1X = round(float(self.bearing_frequencies['BPFO'].iloc[0]), 2) BSF_1X = round(float(self.bearing_frequencies['BSF'].iloc[0]), 2) FTF_1X = round(float(self.bearing_frequencies['FTF'].iloc[0]), 2) fn_Gen = round(float(self.fn_Gen), 2) _3P_1X = fn_Gen * 3 result = { "fs": int(self.fs), "Xrms": round(Xrms, 2), "x": list(x), "y": list(y), "title": "频域信号", "xaxis": "频率(Hz)", "yaxis": "加速度(m/s^2)", "rpm_Gen": rpm_Gen, "BPFI": [{"Xaxis": (None if BPFI_1X is None else BPFI_1X * k), "val": f"{k}BPFI"} for k in range(1, 7)], "BPFO": [{"Xaxis": (None if BPFO_1X is None else BPFO_1X * k), "val": f"{k}BPFO"} for k in range(1, 7)], "BSF": [{"Xaxis": (None if BSF_1X is None else BSF_1X * k), "val": f"{k}BSF"} for k in range(1, 7)], "FTF": [{"Xaxis": (None if FTF_1X is None else FTF_1X * k), "val": f"{k}FTF"} for k in range(1, 7)], "fn_Gen": [{"Xaxis": fn_Gen * k, "val": f"{k}X"} for k in range(1, 7)], "B3P": _3P_1X, } result = self.replace_nan(result) return json.dumps(result, ensure_ascii=False) # ========================================================== # 时域 # ========================================================== def time_domain(self): x = self.time_domain_analysis_t y = self.data mean_value = float(np.mean(y)) max_value = float(np.max(y)) min_value = float(np.min(y)) Xrms = float(np.sqrt(np.mean(y ** 2))) Xp = float((max_value - min_value) / 2) Xpp = float(max_value - min_value) Cf = (Xp / Xrms) if Xrms != 0 else 0.0 Sf = (Xrms / mean_value) if mean_value != 0 else 0.0 If = (Xp / float(np.mean(np.abs(y)))) if np.mean(np.abs(y)) != 0 else 0.0 Xr = float(np.mean(np.sqrt(np.abs(y))) ** 2) Ce = (Xp / Xr) if Xr != 0 else 0.0 # 偏度/峭度 Cw = float(np.mean((np.abs(y) - mean_value) ** 3) / (Xrms ** 3)) if Xrms != 0 else 0.0 Cq = float(np.mean((np.abs(y) - mean_value) ** 4) / (Xrms ** 4)) if Xrms != 0 else 0.0 result = { "x": list(x), "y": list(y), "title": "时间域信号", "xaxis": "时间(s)", "yaxis": "加速度(m/s^2)", "fs": int(self.fs), "Xrms": round(Xrms, 2), "mean_value": round(mean_value, 2), "max_value": round(max_value, 2), "min_value": round(min_value, 2), "Xp": round(Xp, 2), "Xpp": round(Xpp, 2), "Cf": round(Cf, 2), "Sf": round(Sf, 2), "If": round(If, 2), "Ce": round(Ce, 2), "Cw": round(Cw, 2), "Cq": round(Cq, 2), "rpm_Gen": round(float(self.rpm_Gen), 2), } result = self.replace_nan(result) return json.dumps(result, ensure_ascii=False) # ========================================================== # 趋势分析(按id统计一条数据,取每个id的第一条) # ========================================================== def trend_analysis(self) -> str: combined_df = pd.concat(self.datas, ignore_index=True) if combined_df.empty: return json.dumps([], ensure_ascii=False) # 统一解析 mesure_data def parse_cell(x): v = self._parse_mesure_data(x) return v combined_df['parsed_data'] = combined_df['mesure_data'].apply(parse_cell) def calculate_stats(group: pd.DataFrame) -> Optional[Dict[str, Any]]: if group.empty: return None arr = group['parsed_data'].iloc[0] if arr is None or len(arr) == 0: return None data = np.asarray(arr, dtype=float) fs = int(group['sampling_frequency'].iloc[0]) dt = 1 / fs mean = float(np.mean(data)) max_val = float(np.max(data)) min_val = float(np.min(data)) Xrms = float(np.sqrt(np.mean(data ** 2))) Xp = float((max_val - min_val) / 2) Cf = (Xp / Xrms) if Xrms != 0 else 0.0 Sf = (Xrms / mean) if mean != 0 else 0.0 If = (Xp / float(np.mean(np.abs(data)))) if np.mean(np.abs(data)) != 0 else 0.0 Xr = float(np.mean(np.sqrt(np.abs(data))) ** 2) Ce = (Xp / Xr) if Xr != 0 else 0.0 # 速度有效值 velocity = np.cumsum(data) * dt velocity_rms = float(np.sqrt(np.mean(velocity ** 2))) Cw = float(np.mean((data - mean) ** 3) / (Xrms ** 3)) if Xrms != 0 else 0.0 Cq = float(np.mean((data - mean) ** 4) / (Xrms ** 4)) if Xrms != 0 else 0.0 return { "fs": fs, "Mean": round(mean, 2), "Max": round(max_val, 2), "Min": round(min_val, 2), "Xrms": round(Xrms, 2), "Xp": round(Xp, 2), "If": round(If, 2), "Cf": round(Cf, 2), "Sf": round(Sf, 2), "Ce": round(Ce, 2), "Cw": round(Cw, 2), "Cq": round(Cq, 2), "velocity_rms": round(velocity_rms, 2), "time_stamp": str(group['time_stamp'].iloc[0]), "id": int(group['id'].iloc[0]), } stats = [ s for s in combined_df.groupby('id', sort=True).apply(calculate_stats).tolist() if s is not None ] stats = self.replace_nan(stats) return json.dumps(stats, ensure_ascii=False) # ========================================================== # 特征频率 # ========================================================== def Characteristic_Frequency(self): """ 目标:拿到 _brand/_model -> unit_dict_brand_model -> rolls_number 等参数 任意失败:返回 None 字段(不会炸) """ def empty_result(): return { "type": "bearing", "n_rolls": None, "d_rolls": None, "D_diameter": None, "theta_deg": None, } str1 = str(self.mesure_point_name or "") engine_code = str(self.wind_code or "") Engine = create_engine('mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show') df2 = pd.read_sql( f"SELECT * FROM wind_engine_group WHERE engine_code = '{engine_code}'", Engine ) if df2.empty or 'mill_type_code' not in df2.columns: return empty_result() mill_type_code = df2['mill_type_code'].iloc[0] _brand = None _model = None # -------------------------- # main_bearing # -------------------------- if 'main_bearing' in str1: df3 = pd.read_sql( f"SELECT * FROM unit_bearings WHERE mill_type_code = '{mill_type_code}'", Engine ) if df3.empty: return empty_result() # front/rear/自动选择 if 'front' in str1: brand_col = 'front_bearing_brand' model_col = 'front_bearing_model' elif 'rear' in str1: brand_col = 'rear_bearing_brand' model_col = 'rear_bearing_model' else: candidates = [ ('front_bearing_brand', 'front_bearing_model'), ('rear_bearing_brand', 'rear_bearing_model') ] brand_col = model_col = None for b, m in candidates: if b in df3.columns and m in df3.columns: if pd.notna(df3[b].iloc[0]) and pd.notna(df3[m].iloc[0]): brand_col, model_col = b, m break if brand_col is None: return empty_result() if brand_col not in df3.columns or model_col not in df3.columns: return empty_result() _brand = df3[brand_col].iloc[0] _model = df3[model_col].iloc[0] # -------------------------- # generator / stator # -------------------------- elif 'generator' in str1 or 'stator' in str1: df3 = pd.read_sql( f"SELECT * FROM unit_dynamo WHERE mill_type_code = '{mill_type_code}'", Engine ) if df3.empty: return empty_result() if 'non' in str1: brand_col = 'non_drive_end_bearing_brand' model_col = 'non_drive_end_bearing_model' else: brand_col = 'drive_end_bearing_brand' model_col = 'drive_end_bearing_model' if brand_col not in df3.columns or model_col not in df3.columns: return empty_result() _brand = df3[brand_col].iloc[0] _model = df3[model_col].iloc[0] # -------------------------- # gearbox # -------------------------- elif 'gearbox' in str1: df3 = pd.read_sql( f"SELECT * FROM unit_gearbox WHERE mill_type_code = '{mill_type_code}'", Engine ) if df3.empty or 'code' not in df3.columns: return empty_result() gearbox_code = df3['code'].iloc[0] # 行星轮/太阳轮:unit_gearbox_structure if ('planet' in str1) or ('sun' in str1): gearbox_structure = 1 if 'planet' in str1 else 2 planetary_gear_grade = 1 if 'second' in str1: planetary_gear_grade = 2 elif 'third' in str1: planetary_gear_grade = 3 df33 = pd.read_sql( f""" SELECT bearing_brand, bearing_model FROM unit_gearbox_structure WHERE gearbox_code = '{gearbox_code}' AND gearbox_structure = '{gearbox_structure}' AND planetary_gear_grade = '{planetary_gear_grade}' """, Engine ) if df33.empty: return empty_result() if 'bearing_brand' not in df33.columns or 'bearing_model' not in df33.columns: return empty_result() _brand = df33['bearing_brand'].iloc[0] _model = df33['bearing_model'].iloc[0] # 轴承:unit_gearbox_bearings elif ('shaft' in str1) or ('input' in str1) or ('low_speed' in str1) or ('high_speed' in str1): parallel_wheel_grade = 1 if 'low_speed_intermediate' in str1: parallel_wheel_grade = 2 elif 'high_speed' in str1: parallel_wheel_grade = 3 df33 = pd.read_sql( f""" SELECT bearing_rs_brand, bearing_rs_model, bearing_gs_brand, bearing_gs_model, bearing_brand, bearing_model FROM unit_gearbox_bearings WHERE gearbox_code = '{gearbox_code}' AND parallel_wheel_grade = '{parallel_wheel_grade}' """, Engine ) if df33.empty: return empty_result() # 高速/中间优先 rs/gs;低速取 bearing_brand/model if ('high_speed' in str1) or ('low_speed_intermediate' in str1): # rs/gs 选有值的 candidates = [ ('bearing_rs_brand', 'bearing_rs_model'), ('bearing_gs_brand', 'bearing_gs_model') ] for b, m in candidates: if b in df33.columns and m in df33.columns: if pd.notna(df33[b].iloc[0]) and pd.notna(df33[m].iloc[0]): _brand = df33[b].iloc[0] _model = df33[m].iloc[0] break if _brand is None: return empty_result() else: # low_speed:bearing_brand/model if 'bearing_brand' in df33.columns and 'bearing_model' in df33.columns: if pd.notna(df33['bearing_brand'].iloc[0]) and pd.notna(df33['bearing_model'].iloc[0]): _brand = df33['bearing_brand'].iloc[0] _model = df33['bearing_model'].iloc[0] else: return empty_result() else: return empty_result() else: return empty_result() else: # 其它测点:直接返回空,不炸 return empty_result() # 最终检查 if _brand is None or _model is None or (pd.isna(_brand) or pd.isna(_model)): return empty_result() # -------------------------- # unit_dict_brand_model 查询 # -------------------------- df4 = pd.read_sql( "SELECT * FROM unit_dict_brand_model WHERE manufacture = %s AND model_number = %s", Engine, params=(str(_brand), str(_model)) ) if df4.empty: return empty_result() # 字段安全读取 needed = ['rolls_number', 'rolls_diameter', 'circle_diameter', 'theta_deg'] for col in needed: if col not in df4.columns: return empty_result() return { "type": "bearing", "n_rolls": None if pd.isna(df4['rolls_number'].iloc[0]) else float(df4['rolls_number'].iloc[0]), "d_rolls": None if pd.isna(df4['rolls_diameter'].iloc[0]) else float(df4['rolls_diameter'].iloc[0]), "D_diameter": None if pd.isna(df4['circle_diameter'].iloc[0]) else float(df4['circle_diameter'].iloc[0]), "theta_deg": None if pd.isna(df4['theta_deg'].iloc[0]) else float(df4['theta_deg'].iloc[0]), } # ========================================================== # 轴承频率公式 # ========================================================== def calculate_bearing_frequencies(self, n, d, D, theta_deg, rpm): theta = math.radians(theta_deg) ratio = d / D f_r = rpm / 60.0 BPFI = n / 2 * (1 + ratio * math.cos(theta)) * f_r BPFO = n / 2 * (1 - ratio * math.cos(theta)) * f_r BSF = (D / (2 * d)) * (1 - (ratio ** 2) * (math.cos(theta) ** 2)) * f_r FTF = 0.5 * (1 - ratio * math.cos(theta)) * f_r return { "BPFI": round(float(BPFI), 2), "BPFO": round(float(BPFO), 2), "BSF": round(float(BSF), 2), "FTF": round(float(FTF), 2), } # ========================================================== # NaN 替换 # ========================================================== def replace_nan(self, obj): if isinstance(obj, dict): return {k: self.replace_nan(v) for k, v in obj.items()} if isinstance(obj, list): return [self.replace_nan(x) for x in obj] if isinstance(obj, float) and math.isnan(obj): return None return obj if __name__ == "__main__": # table_name = "SKF001_wave" # ids = [67803,67804] # fmin, fmax = None, None cms = CMSAnalyst(fmin, fmax,table_name,ids) time_domain = cms.time_domain() # print(time_domain) ''' trace = go.Scatter( x=time_domain['x'], y=time_domain['y'], mode="lines", name=time_domain['title'], ) layout = go.Layout( title= time_domain['title'], xaxis=dict(title=time_domain["xaxis"]), yaxis=dict(title=time_domain["yaxis"]), ) fig = go.Figure(data=[trace], layout=layout) fig.show() ''' # data_path_lsit = ["test1.csv", "test2.csv"] # trend_analysis_test = cms.trend_analysis(data_path_lsit, fmin, fmax) # print(trend_analysis_test)