|
@@ -5,7 +5,7 @@ import math
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
from scipy.signal import hilbert
|
|
|
-
|
|
|
+from typing import List, Dict, Any
|
|
|
from app.config import dataBase
|
|
|
from app.database import get_engine
|
|
|
from app.logger import logger
|
|
@@ -14,13 +14,18 @@ from app.logger import logger
|
|
|
class CMSAnalyst:
|
|
|
def __init__(self, fmin, fmax, table_name, ids):
|
|
|
# 从数据库获取原始数据
|
|
|
+ self.table_name =table_name
|
|
|
+ self.ids = ids
|
|
|
self.datas = self._get_by_id(table_name, ids)
|
|
|
- self.datas = [df[['mesure_data', 'time_stamp', 'sampling_frequency', 'wind_turbine_number', 'rotational_speed',
|
|
|
- 'mesure_point_name']] for df in self.datas]
|
|
|
+ self.datas = [
|
|
|
+ df[['id', 'mesure_data', 'time_stamp', 'sampling_frequency',
|
|
|
+ 'wind_turbine_number', 'rotational_speed', 'mesure_point_name']]
|
|
|
+ for df in self.datas
|
|
|
+ ]
|
|
|
# 只输入一个id,返回一个[df],所以拿到self.data[0]
|
|
|
self.data_filter = self.datas[0]
|
|
|
# 取数据列
|
|
|
- self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'][0]))
|
|
|
+ self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'].iloc[0]))
|
|
|
self.envelope_spectrum_m = self.data.shape[0]
|
|
|
self.envelope_spectrum_n = 1
|
|
|
# 设置分析参数
|
|
@@ -56,16 +61,16 @@ class CMSAnalyst:
|
|
|
# time_domain_analysis
|
|
|
self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs
|
|
|
|
|
|
+
|
|
|
def _get_by_id(self, windcode, ids):
|
|
|
- df_res = []
|
|
|
engine = get_engine(dataBase.DATA_DB)
|
|
|
- for id in ids:
|
|
|
- table_name = windcode + '_wave'
|
|
|
- lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} "
|
|
|
- df = pd.read_sql(lastday_df_sql, engine)
|
|
|
- df_res.append(df)
|
|
|
- return df_res
|
|
|
-
|
|
|
+ table_name = windcode + '_wave'
|
|
|
+ ids_str = ','.join(map(str, ids))
|
|
|
+ sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str}) ORDER BY time_stamp"
|
|
|
+ df = pd.read_sql(sql, engine)
|
|
|
+ grouped = [group.reset_index(drop=True) for _, group in df.groupby('id')]
|
|
|
+ return grouped
|
|
|
+
|
|
|
# envelope_spectrum_analysis 包络谱分析
|
|
|
def _bandpass_filter(self, data):
|
|
|
"""带通滤波"""
|
|
@@ -291,73 +296,59 @@ class CMSAnalyst:
|
|
|
|
|
|
return result
|
|
|
|
|
|
- # trend_analysis 趋势图
|
|
|
-
|
|
|
- def trend_analysis(self):
|
|
|
- all_stats = []
|
|
|
-
|
|
|
- # 定义积分函数
|
|
|
- def _integrate(data, dt):
|
|
|
- return np.cumsum(data) * dt
|
|
|
-
|
|
|
- # 定义计算统计指标的函数
|
|
|
- def _calculate_stats(data):
|
|
|
- mean_value = np.mean(data)
|
|
|
- max_value = np.max(data)
|
|
|
- min_value = np.min(data)
|
|
|
- Xrms = np.sqrt(np.mean(data ** 2)) # 加速度均方根值(有效值)
|
|
|
- Xp = (max_value - min_value) / 2 # 峰值(单峰最大值) # 峰值
|
|
|
- Cf = Xp / Xrms # 峰值指标
|
|
|
- Sf = Xrms / mean_value # 波形指标
|
|
|
- If = Xp / np.mean(np.abs(data)) # 脉冲指标
|
|
|
- Xr = np.mean(np.sqrt(np.abs(data))) ** 2 # 方根幅值
|
|
|
- Ce = Xp / Xr # 裕度指标
|
|
|
-
|
|
|
- # 计算每个数据点的绝对值减去均值后的三次方,并求和
|
|
|
- sum_abs_diff_cubed_3 = np.mean((np.abs(data) - mean_value) ** 3)
|
|
|
- # 计算偏度指标
|
|
|
- Cw = sum_abs_diff_cubed_3 / (Xrms ** 3)
|
|
|
- # 计算每个数据点的绝对值减去均值后的四次方,并求和
|
|
|
- sum_abs_diff_cubed_4 = np.mean((np.abs(data) - mean_value) ** 4)
|
|
|
- # 计算峭度指标
|
|
|
- Cq = sum_abs_diff_cubed_4 / (Xrms ** 4)
|
|
|
- #
|
|
|
+ def trend_analysis(self) -> str:
|
|
|
+ """
|
|
|
+ 优化后的趋势分析方法(向量化计算统计指标)
|
|
|
+ 返回 JSON 字符串,包含所有时间点的统计结果。
|
|
|
+ """
|
|
|
+ for df in self.datas:
|
|
|
+ df['parsed_data'] = df['mesure_data'].apply(json.loads)
|
|
|
+ # 1. 合并所有数据并解析 mesure_data
|
|
|
+ combined_df = pd.concat(self.datas)
|
|
|
+ combined_df['parsed_data'] = combined_df['mesure_data'].apply(json.loads) # 批量解析 JSON
|
|
|
+
|
|
|
+ # 2. 向量化计算统计指标(避免逐行循环)
|
|
|
+ def calculate_stats(group: pd.DataFrame) -> Dict[str, Any]:
|
|
|
+ data = np.array(group['parsed_data'].iloc[0]) # 提取振动数据数组
|
|
|
+ fs = int(group['sampling_frequency'].iloc[0]) # 采样频率
|
|
|
+ dt = 1 / fs # 时间间隔
|
|
|
+
|
|
|
+ # 计算时域指标(向量化操作)
|
|
|
+ mean = np.mean(data)
|
|
|
+ max_val = np.max(data)
|
|
|
+ min_val = np.min(data)
|
|
|
+ Xrms = np.sqrt(np.mean(data ** 2))
|
|
|
+ Xp = (max_val - min_val) / 2
|
|
|
+ Cf = Xp / Xrms
|
|
|
+ Sf = Xrms / mean if mean != 0 else 0
|
|
|
+ If = Xp / np.mean(np.abs(data))
|
|
|
+
|
|
|
+ # 计算速度和峭度指标
|
|
|
+ velocity = np.cumsum(data) * dt # 积分计算速度
|
|
|
+ velocity_rms = np.sqrt(np.mean(velocity ** 2))
|
|
|
+ Cq = np.mean((data - mean) ** 4) / (Xrms ** 4) if Xrms != 0 else 0
|
|
|
|
|
|
return {
|
|
|
- "fs": self.fs, # 采样频率
|
|
|
- "Mean": round(mean_value, 2), # 平均值
|
|
|
- "Max": round(max_value, 2), # 最大值
|
|
|
- "Min": round(min_value, 2), # 最小值
|
|
|
- "Xrms": round(Xrms, 2), # 有效值
|
|
|
- "Xp": round(Xp, 2), # 峰值
|
|
|
- "If": round(If, 2), # 脉冲指标
|
|
|
- "Cf": round(Cf, 2), # 峰值指标
|
|
|
- "Sf": round(Sf, 2), # 波形指标
|
|
|
- "Ce": round(Ce, 2), # 裕度指标
|
|
|
- "Cw": round(Cw, 2), # 偏度指标
|
|
|
- "Cq": round(Cq, 2), # 峭度指标
|
|
|
- # velocity_rms :速度有效值
|
|
|
- # time_stamp:时间戳
|
|
|
+ "time_stamp": str(group['time_stamp'].iloc[0]),
|
|
|
+ "fs": fs,
|
|
|
+ "Mean": round(mean, 2),
|
|
|
+ "Max": round(max_val, 2),
|
|
|
+ "Min": round(min_val, 2),
|
|
|
+ "Xrms": round(Xrms, 2),
|
|
|
+ "Xp": round(Xp, 2),
|
|
|
+ "Cf": round(Cf, 2),
|
|
|
+ "Sf": round(Sf, 2),
|
|
|
+ "If": round(If, 2),
|
|
|
+ "velocity_rms": round(velocity_rms, 2),
|
|
|
+ "Cq": round(Cq, 2)
|
|
|
}
|
|
|
|
|
|
- for data in self.datas:
|
|
|
- fs = int(self.data_filter['sampling_frequency'].iloc[0])
|
|
|
- dt = 1 / fs
|
|
|
- time_stamp = data['time_stamp'][0]
|
|
|
- data = np.array(ast.literal_eval(data['mesure_data'][0]))
|
|
|
-
|
|
|
- velocity = _integrate(data, dt)
|
|
|
- velocity_rms = np.sqrt(np.mean(velocity ** 2))
|
|
|
- stats = _calculate_stats(data)
|
|
|
- # 速度有效值
|
|
|
- stats["velocity_rms"] = round(velocity_rms, 2)
|
|
|
- # 时间戳
|
|
|
- stats["time_stamp"] = str(time_stamp)
|
|
|
- all_stats.append(stats)
|
|
|
- all_stats = [self.replace_nan(stats) for stats in all_stats]
|
|
|
- all_stats = json.dumps(all_stats, ensure_ascii=False)
|
|
|
- return all_stats
|
|
|
+ # 3. 按 ID 分组并应用统计计算
|
|
|
+ stats = combined_df.groupby('id').apply(calculate_stats).tolist()
|
|
|
|
|
|
+ # 4. 返回 JSON 格式结果
|
|
|
+ return json.dumps(stats, ensure_ascii=False)
|
|
|
+
|
|
|
def Characteristic_Frequency(self):
|
|
|
"""提取轴承、齿轮等参数"""
|
|
|
# 1、从测点名称中提取部件名称(计算特征频率的部件)
|