Kaynağa Gözat

修改CMS索引越界问题

Xmia 2 ay önce
ebeveyn
işleme
bc5840dda8
1 değiştirilmiş dosya ile 508 ekleme ve 501 silme
  1. 508 501
      cms_class_20241201.py

+ 508 - 501
cms_class_20241201.py

@@ -59,62 +59,80 @@ df_data = []
 
 # %%
 
-# 主要的类
 class CMSAnalyst:
     def __init__(self, fmin, fmax, table_name, ids):
-        self.table_name =table_name
+        """
+        table_name: 当前代码实际传入的是 windcode(例如 SKF001),内部会拼 _wave
+        ids: [id1, id2, ...]
+        """
+        self.table_name = table_name
         self.ids = ids
-        # envelope_spectrum_analysis
-        # datas是[df1,df2,.....]
-        #从数据库获取原始数据
-        self.datas = self._get_by_id(table_name,ids)
+
+        # 1) 从数据库获取原始数据(按 id 分组)
+        self.datas = self._get_by_id(table_name, ids)
+        if not self.datas:
+            raise ValueError(f"[ERROR] 未查到任何数据 ids={ids}")
+
+        # 2) 只保留需要字段
         self.datas = [
-            df[['id', 'mesure_data', 'time_stamp', 'sampling_frequency', 
+            df[['id', 'mesure_data', 'time_stamp', 'sampling_frequency',
                 'wind_turbine_number', 'rotational_speed', 'mesure_point_name']]
-            for df in self.datas
-        ]       
+            for df in self.datas if df is not None and not df.empty
+        ]
+        if not self.datas:
+            raise ValueError("[ERROR] 分组后 DataFrame 全为空")
 
-        # 只输入一个id,返回一个[df],所以拿到self.data[0]
+        # 3) 单 id 情况:取第一个 df
         self.data_filter = self.datas[0]
-        # print("mesure_data sample:", self.data_filter['mesure_data'].iloc[0])  # 打印第一条数据
-        self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'].iloc[0]))        
-        # print(self.data_filter)
-        # 取数据列
-        self.data = np.array(ast.literal_eval(self.data_filter['mesure_data'][0]))
+        if self.data_filter.empty:
+            raise ValueError("[ERROR] data_filter 为空,无法读取数据")
+
+        # 4) 解析 mesure_data
+        raw_md = self.data_filter['mesure_data'].iloc[0]
+        self.data = self._parse_mesure_data(raw_md)
+        if self.data is None or len(self.data) == 0:
+            raise ValueError("[ERROR] mesure_data 解析失败或为空")
+
+        self.data = np.asarray(self.data, dtype=float)
+
+        # 5) 采样频率
+        self.fs = int(self.data_filter['sampling_frequency'].iloc[0])
+
+        # 6) 分析参数
         self.envelope_spectrum_m = self.data.shape[0]
         self.envelope_spectrum_n = 1
-        #设置分析参数
-        self.fs = int(self.data_filter['sampling_frequency'].iloc[0])
         self.envelope_spectrum_t = np.arange(self.envelope_spectrum_m) / self.fs
-        self.fmin = fmin if fmin is not None else 0 
-        self.fmax = fmax if fmax is not None else float('inf') 
-        self.envelope_spectrum_y = self._bandpass_filter(self.data)
-        self.f, self.HP = self._calculate_envelope_spectrum(self.envelope_spectrum_y)
-        #设备信息
+        self.fmin = fmin if fmin is not None else 0
+        self.fmax = fmax if fmax is not None else float('inf')
+
+        # 7) 设备信息
         self.wind_code = self.data_filter['wind_turbine_number'].iloc[0]
         self.rpm_Gen = self.data_filter['rotational_speed'].iloc[0]
         self.mesure_point_name = self.data_filter['mesure_point_name'].iloc[0]
-        self.fn_Gen = round(self.rpm_Gen/60,2)
-
-        self.CF = self.Characteristic_Frequency()
-        print(self.CF)
-        self.CF = pd.DataFrame(self.CF,index=[0])
-        print(self.CF)
-        print(self.rpm_Gen)
-        #if self.CF['type'].iloc[0] == 'bearing':
+        self.fn_Gen = round(self.rpm_Gen / 60, 2)
+
+        # 8) 包络谱预计算
+        self.envelope_spectrum_y = self._bandpass_filter(self.data)
+        self.f, self.HP = self._calculate_envelope_spectrum(self.envelope_spectrum_y)
+
+        # 9) 特征频率 & 轴承频率
+        cf_dict = self.Characteristic_Frequency()
+        self.CF = pd.DataFrame([cf_dict])
+
         n_rolls_m = self.CF['n_rolls'].iloc[0]
         d_rolls_m = self.CF['d_rolls'].iloc[0]
         D_diameter_m = self.CF['D_diameter'].iloc[0]
         theta_deg_m = self.CF['theta_deg'].iloc[0]
-        print(n_rolls_m)
-        print(d_rolls_m)
-        print(D_diameter_m)
-        print(theta_deg_m)
-        self.bearing_frequencies = self.calculate_bearing_frequencies(n_rolls_m, d_rolls_m, D_diameter_m, theta_deg_m, self.rpm_Gen)
-        print(self.bearing_frequencies)
-        self.bearing_frequencies = pd.DataFrame(self.bearing_frequencies,index=[0])
-        print(self.bearing_frequencies)
-        # frequency_domain_analysis
+
+        if any(v is None for v in [n_rolls_m, d_rolls_m, D_diameter_m, theta_deg_m]):
+            self.bearing_frequencies = None
+        else:
+            self.bearing_frequencies = self.calculate_bearing_frequencies(
+                n_rolls_m, d_rolls_m, D_diameter_m, theta_deg_m, self.rpm_Gen
+            )
+            self.bearing_frequencies = pd.DataFrame([self.bearing_frequencies])
+
+        # 10) 频谱预计算
         (
             self.frequency_domain_analysis_t,
             self.frequency_domain_analysis_f,
@@ -123,301 +141,288 @@ class CMSAnalyst:
             self.frequency_domain_analysis_Xrms,
         ) = self._calculate_spectrum(self.data)
 
-        # time_domain_analysis
+        # 11) 时域时间轴
         self.time_domain_analysis_t = np.arange(self.data.shape[0]) / self.fs
-        
-    # def _get_by_id(self, windcode, ids):
-    #     df_res = []
-    #     #engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
-    #     engine = create_engine('mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod')
-    #     for id in ids:
-    #         table_name=windcode+'_wave'
-    #         lastday_df_sql = f"SELECT * FROM {table_name} where id = {id} "
-    #         # print(lastday_df_sql)
-    #         df = pd.read_sql(lastday_df_sql, engine)
-    #         df_res.append(df)
-    #     return df_res
-    
-    # def _get_by_id(self, windcode, ids):
-    #     #engine = create_engine('mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod')
-    #     engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
-    #     table_name = windcode + '_wave'
-    #     ids_str = ','.join(map(str, ids))
-    #     sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str}) ORDER BY time_stamp"
-    #     df = pd.read_sql(sql, engine)
-        
-    #     # 按ID分组返回
-    #     grouped = [group for _, group in df.groupby('id')]
-    #     return grouped        
+
+    # ==========================================================
+    # 工具:解析 mesure_data(兼容 json 字符串 / python list 字符串 / list)
+    # ==========================================================
+    def _parse_mesure_data(self, raw):
+        if raw is None:
+            return None
+
+        # 已经是 list/np.array
+        if isinstance(raw, (list, tuple, np.ndarray)):
+            return list(raw)
+
+        # 字符串:可能是 JSON 或 python list 字符串
+        if isinstance(raw, str):
+            s = raw.strip()
+            # 优先 json.loads(如果是标准 JSON)
+            try:
+                v = json.loads(s)
+                if isinstance(v, list):
+                    return v
+            except Exception:
+                pass
+
+            # 再尝试 ast.literal_eval(如果是 python 格式 list)
+            try:
+                v = ast.literal_eval(s)
+                if isinstance(v, (list, tuple, np.ndarray)):
+                    return list(v)
+            except Exception:
+                return None
+
+        return None
+
+    # ==========================================================
+    # DB:按 id 拉取波形
+    # ==========================================================
     def _get_by_id(self, windcode, ids):
         engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
-        table_name = windcode + '_wave'
+        table_name = f"{windcode}_wave"
         ids_str = ','.join(map(str, ids))
         sql = f"SELECT * FROM {table_name} WHERE id IN ({ids_str}) ORDER BY time_stamp"
-        print("Executing SQL:", sql)  # 打印 SQL
         df = pd.read_sql(sql, engine)
-        print("Returned DataFrame shape:", df.shape)  # 检查返回的数据量
+        if df.empty:
+            return []
         grouped = [group for _, group in df.groupby('id')]
         return grouped
 
-    # envelope_spectrum_analysis 包络谱分析
+    # ==========================================================
+    # 包络谱:带通滤波(FFT 截频)
+    # ==========================================================
     def _bandpass_filter(self, data):
-        """带通滤波"""
-        
-        m= data.shape[0]
-        ni = round(self.fmin * self.envelope_spectrum_m / self.fs + 1)
-        # na = round(self.fmax * self.envelope_spectrum_m / self.fs + 1)
+        m = data.shape[0]
+
+        # index 保护:fmin/fmax 可能超范围
+        ni = int(round(self.fmin * m / self.fs + 1))
+        ni = max(0, min(ni, m))
+
         if self.fmax == float('inf'):
             na = m
         else:
-            na = round(self.fmax * m / self.fs + 1)
-        col = 1
-        y = np.zeros((self.envelope_spectrum_m, col))
-        # for p in range(col):
-        #     print(data.shape,p)
+            na = int(round(self.fmax * m / self.fs + 1))
+            na = max(0, min(na, m))
+
+        if na <= ni:
+            # 退化情况:不做滤波
+            y = np.zeros((m, 1))
+            y[:, 0] = data
+            return y
+
         z = np.fft.fft(data)
-        a = np.zeros(self.envelope_spectrum_m, dtype=complex)
+        a = np.zeros(m, dtype=complex)
+
         a[ni:na] = z[ni:na]
-        a[self.envelope_spectrum_m - na + 1 : self.envelope_spectrum_m - ni + 1] = z[
-            self.envelope_spectrum_m - na + 1 : self.envelope_spectrum_m - ni + 1
-        ]
-        z = np.fft.ifft(a)
-        y[:, 0] = np.real(z)
+        # 对称频段
+        a[m - na + 1: m - ni + 1] = z[m - na + 1: m - ni + 1]
 
+        x_ifft = np.fft.ifft(a)
+        y = np.zeros((m, 1))
+        y[:, 0] = np.real(x_ifft)
         return y
 
     def _calculate_envelope_spectrum(self, y):
-        """计算包络谱"""
         m, n = y.shape
         HP = np.zeros((m, n))
-        col = 1
-        for p in range(col):
+        for p in range(n):
             H = np.abs(hilbert(y[:, p] - np.mean(y[:, p])))
             HP[:, p] = np.abs(np.fft.fft(H - np.mean(H))) * 2 / m
         f = np.fft.fftfreq(m, d=1 / self.fs)
         return f, HP
 
-
     def envelope_spectrum(self):
-        """绘制包络谱"""
-        # 只取正频率部分
         positive_frequencies = self.f[: self.envelope_spectrum_m // 2]
         positive_HP = self.HP[: self.envelope_spectrum_m // 2, 0]
 
         x = positive_frequencies
         y = positive_HP
-        title = "包络谱"
-        xaxis = "频率(Hz)"
-        yaxis = "加速度(m/s^2)"
-        Xrms = np.sqrt(np.mean(y**2))  # 加速度均方根值(有效值)
-        rpm_Gen = round(self.rpm_Gen, 2)
-        BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2)
-        BPFO_1X = round(self.bearing_frequencies['BPFO'].iloc[0], 2)
-        BSF_1X = round(self.bearing_frequencies['BSF'].iloc[0], 2)
-        FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2)
-        fn_Gen = round(self.fn_Gen, 2)
-        _3P_1X = round(self.fn_Gen, 2) * 3
-
-        # if self.CF['type'].iloc[0] == 'bearing':
-        result = {
-            "fs":self.fs,
-            "Xrms":round(Xrms, 2),
-            "x":list(x),
-            "y":list(y),
-            "title":title,
-            "xaxis":xaxis,
-            "yaxis":yaxis,
-            "rpm_Gen": round(rpm_Gen, 2),  # 转速r/min
-            "BPFI":  [{"Xaxis": BPFI_1X ,"val": "1BPFI"},{"Xaxis": BPFI_1X*2 ,"val": "2BPFI"},
-                        {"Xaxis": BPFI_1X*3, "val": "3BPFI"}, {"Xaxis": BPFI_1X*4, "val": "4BPFI"},
-                        {"Xaxis": BPFI_1X*5, "val": "5BPFI"}, {"Xaxis": BPFI_1X*6, "val": "6BPFI"}],
-            "BPFO": [{"Xaxis": BPFO_1X ,"val": "1BPFO"},{"Xaxis": BPFO_1X*2 ,"val": "2BPFO"},
-                        {"Xaxis": BPFO_1X*3, "val": "3BPFO"}, {"Xaxis": BPFO_1X*4, "val": "4BPFO"},
-                        {"Xaxis": BPFO_1X*5, "val": "5BPFO"}, {"Xaxis": BPFO_1X*6, "val": "6BPFO"}],
-            "BSF": [{"Xaxis": BSF_1X ,"val": "1BSF"},{"Xaxis": BSF_1X*2 ,"val": "2BSF"},
-                        {"Xaxis": BSF_1X*3, "val": "3BSF"}, {"Xaxis": BSF_1X*4, "val": "4BSF"},
-                        {"Xaxis": BSF_1X*5, "val": "5BSF"}, {"Xaxis": BSF_1X*6, "val": "6BSF"}],
-            "FTF": [{"Xaxis": FTF_1X ,"val": "1FTF"},{"Xaxis": FTF_1X*2 ,"val": "2FTF"},
-                        {"Xaxis": FTF_1X*3, "val": "3FTF"}, {"Xaxis": FTF_1X*4, "val": "4FTF"},
-                        {"Xaxis": FTF_1X*5, "val": "5FTF"}, {"Xaxis": FTF_1X*6, "val": "6FTF"}],
-            "fn_Gen":[{"Xaxis": fn_Gen ,"val": "1X"},{"Xaxis": fn_Gen*2 ,"val": "2X"},
-                        {"Xaxis": fn_Gen*3, "val": "3X"}, {"Xaxis": fn_Gen*4, "val": "4X"},
-                        {"Xaxis": fn_Gen*5, "val": "5X"}, {"Xaxis": fn_Gen*6, "val": "6X"}],
-            "B3P":_3P_1X,
-        }
-        # result = json.dumps(result, ensure_ascii=False)
-        result = self.replace_nan(result)
 
-        return result
+        Xrms = float(np.sqrt(np.mean(y ** 2)))
+        rpm_Gen = round(float(self.rpm_Gen), 2)
 
+        if self.bearing_frequencies is None:
+            BPFI_1X = BPFO_1X = BSF_1X = FTF_1X = None
+        else:
+            BPFI_1X = round(float(self.bearing_frequencies['BPFI'].iloc[0]), 2)
+            BPFO_1X = round(float(self.bearing_frequencies['BPFO'].iloc[0]), 2)
+            BSF_1X = round(float(self.bearing_frequencies['BSF'].iloc[0]), 2)
+            FTF_1X = round(float(self.bearing_frequencies['FTF'].iloc[0]), 2)
+
+        fn_Gen = round(float(self.fn_Gen), 2)
+        _3P_1X = fn_Gen * 3
 
-    # frequency_domain_analysis 频谱分析
+        result = {
+            "fs": int(self.fs),
+            "Xrms": round(Xrms, 2),
+            "x": list(x),
+            "y": list(y),
+            "title": "包络谱",
+            "xaxis": "频率(Hz)",
+            "yaxis": "加速度(m/s^2)",
+            "rpm_Gen": rpm_Gen,
+            "BPFI": [{"Xaxis": (None if BPFI_1X is None else BPFI_1X * k), "val": f"{k}BPFI"} for k in range(1, 7)],
+            "BPFO": [{"Xaxis": (None if BPFO_1X is None else BPFO_1X * k), "val": f"{k}BPFO"} for k in range(1, 7)],
+            "BSF":  [{"Xaxis": (None if BSF_1X is None else BSF_1X * k),  "val": f"{k}BSF"}  for k in range(1, 7)],
+            "FTF":  [{"Xaxis": (None if FTF_1X is None else FTF_1X * k),  "val": f"{k}FTF"}  for k in range(1, 7)],
+            "fn_Gen": [{"Xaxis": fn_Gen * k, "val": f"{k}X"} for k in range(1, 7)],
+            "B3P": _3P_1X,
+        }
 
+        return self.replace_nan(result)
+
+    # ==========================================================
+    # 频谱
+    # ==========================================================
     def _calculate_spectrum(self, data):
-        """计算频谱"""
         m = data.shape[0]
-        n = 1
         t = np.arange(m) / self.fs
-        mag = np.zeros((m, n))
-        Xrms = np.sqrt(np.mean(data**2))  # 加速度均方根值(有效值)
-        # col=1
-        # for p in range(col):
         mag = np.abs(np.fft.fft(data - np.mean(data))) * 2 / m
         f = np.fft.fftfreq(m, d=1 / self.fs)
+        Xrms = float(np.sqrt(np.mean(data ** 2)))
         return t, f, m, mag, Xrms
 
     def frequency_domain(self):
-        """绘制频域波形参数"""
-        # 只取正频率部分
-        positive_frequencies = self.frequency_domain_analysis_f[
-            : self.frequency_domain_analysis_m // 2
-        ]
-        positive_mag = self.frequency_domain_analysis_mag[
-            : self.frequency_domain_analysis_m // 2
-        ]
+        positive_frequencies = self.frequency_domain_analysis_f[: self.frequency_domain_analysis_m // 2]
+        positive_mag = self.frequency_domain_analysis_mag[: self.frequency_domain_analysis_m // 2]
 
         x = positive_frequencies
         y = positive_mag
-        title = "频域信号"
-        xaxis = "频率(Hz)"
-        yaxis = "加速度(m/s^2)"
-        Xrms = self.frequency_domain_analysis_Xrms
-
-        rpm_Gen = round(self.rpm_Gen, 2)
-        BPFI_1X = round(self.bearing_frequencies['BPFI'].iloc[0], 2)
-        BPFO_1X =  round(self.bearing_frequencies['BPFO'].iloc[0], 2)
-        BSF_1X =  round(self.bearing_frequencies['BSF'].iloc[0], 2)
-        FTF_1X = round(self.bearing_frequencies['FTF'].iloc[0], 2)
-        fn_Gen = round(self.fn_Gen, 2)
-        _3P_1X = round(self.fn_Gen, 2) * 3
-
-        # if self.CF['type'].iloc[0] == 'bearing':
+
+        Xrms = float(self.frequency_domain_analysis_Xrms)
+        rpm_Gen = round(float(self.rpm_Gen), 2)
+
+        if self.bearing_frequencies is None:
+            BPFI_1X = BPFO_1X = BSF_1X = FTF_1X = None
+        else:
+            BPFI_1X = round(float(self.bearing_frequencies['BPFI'].iloc[0]), 2)
+            BPFO_1X = round(float(self.bearing_frequencies['BPFO'].iloc[0]), 2)
+            BSF_1X = round(float(self.bearing_frequencies['BSF'].iloc[0]), 2)
+            FTF_1X = round(float(self.bearing_frequencies['FTF'].iloc[0]), 2)
+
+        fn_Gen = round(float(self.fn_Gen), 2)
+        _3P_1X = fn_Gen * 3
+
         result = {
-            "fs":self.fs,
-            "Xrms":round(Xrms, 2),
-            "x":list(x),
-            "y":list(y),
-            "title":title,
-            "xaxis":xaxis,
-            "yaxis":yaxis,
-            "rpm_Gen": round(rpm_Gen, 2),  # 转速r/min
-            "BPFI":  [{"Xaxis": BPFI_1X ,"val": "1BPFI"},{"Xaxis": BPFI_1X*2 ,"val": "2BPFI"},
-                        {"Xaxis": BPFI_1X*3, "val": "3BPFI"}, {"Xaxis": BPFI_1X*4, "val": "4BPFI"},
-                        {"Xaxis": BPFI_1X*5, "val": "5BPFI"}, {"Xaxis": BPFI_1X*6, "val": "6BPFI"}],
-            "BPFO": [{"Xaxis": BPFO_1X ,"val": "1BPFO"},{"Xaxis": BPFO_1X*2 ,"val": "2BPFO"},
-                        {"Xaxis": BPFO_1X*3, "val": "3BPFO"}, {"Xaxis": BPFO_1X*4, "val": "4BPFO"},
-                        {"Xaxis": BPFO_1X*5, "val": "5BPFO"}, {"Xaxis": BPFO_1X*6, "val": "6BPFO"}],
-            "BSF": [{"Xaxis": BSF_1X ,"val": "1BSF"},{"Xaxis": BSF_1X*2 ,"val": "2BSF"},
-                        {"Xaxis": BSF_1X*3, "val": "3BSF"}, {"Xaxis": BSF_1X*4, "val": "4BSF"},
-                        {"Xaxis": BSF_1X*5, "val": "5BSF"}, {"Xaxis": BSF_1X*6, "val": "6BSF"}],
-            "FTF": [{"Xaxis": FTF_1X ,"val": "1FTF"},{"Xaxis": FTF_1X*2 ,"val": "2FTF"},
-                        {"Xaxis": FTF_1X*3, "val": "3FTF"}, {"Xaxis": FTF_1X*4, "val": "4FTF"},
-                        {"Xaxis": FTF_1X*5, "val": "5FTF"}, {"Xaxis": FTF_1X*6, "val": "6FTF"}],
-            "fn_Gen":[{"Xaxis": fn_Gen ,"val": "1X"},{"Xaxis": fn_Gen*2 ,"val": "2X"},
-                        {"Xaxis": fn_Gen*3, "val": "3X"}, {"Xaxis": fn_Gen*4, "val": "4X"},
-                        {"Xaxis": fn_Gen*5, "val": "5X"}, {"Xaxis": fn_Gen*6, "val": "6X"}],
-            "B3P":_3P_1X,
+            "fs": int(self.fs),
+            "Xrms": round(Xrms, 2),
+            "x": list(x),
+            "y": list(y),
+            "title": "频域信号",
+            "xaxis": "频率(Hz)",
+            "yaxis": "加速度(m/s^2)",
+            "rpm_Gen": rpm_Gen,
+            "BPFI": [{"Xaxis": (None if BPFI_1X is None else BPFI_1X * k), "val": f"{k}BPFI"} for k in range(1, 7)],
+            "BPFO": [{"Xaxis": (None if BPFO_1X is None else BPFO_1X * k), "val": f"{k}BPFO"} for k in range(1, 7)],
+            "BSF":  [{"Xaxis": (None if BSF_1X is None else BSF_1X * k),  "val": f"{k}BSF"}  for k in range(1, 7)],
+            "FTF":  [{"Xaxis": (None if FTF_1X is None else FTF_1X * k),  "val": f"{k}FTF"}  for k in range(1, 7)],
+            "fn_Gen": [{"Xaxis": fn_Gen * k, "val": f"{k}X"} for k in range(1, 7)],
+            "B3P": _3P_1X,
         }
-        result = self.replace_nan(result)       
-        result = json.dumps(result, ensure_ascii=False)
-        return result
 
-    # time_domain_analysis 时域分析
+        result = self.replace_nan(result)
+        return json.dumps(result, ensure_ascii=False)
+
+    # ==========================================================
+    # 时域
+    # ==========================================================
     def time_domain(self):
-        """绘制时域波形参数"""        
         x = self.time_domain_analysis_t
         y = self.data
-        rpm_Gen =self.rpm_Gen
-        title = "时间域信号"
-        xaxis = "时间(s)"
-        yaxis = "加速度(m/s^2)"
-        # 图片右侧统计量
-        mean_value = np.mean(y)#平均值
-        max_value = np.max(y)#最大值
-        min_value = np.min(y)#最小值
-        Xrms = np.sqrt(np.mean(y**2))  # 加速度均方根值(有效值)
-        Xp = (max_value - min_value) / 2  # 峰值(单峰最大值) # 峰值
-        Xpp=max_value-min_value#峰峰值
-        Cf = Xp / Xrms  # 峰值指标
-        Sf = Xrms / mean_value  # 波形指标
-        If = Xp / np.mean(np.abs(y))  # 脉冲指标
-        Xr = np.mean(np.sqrt(np.abs(y))) ** 2  # 方根幅值
-        Ce = Xp / Xr  # 裕度指标
-        # 计算每个数据点的绝对值减去均值后的三次方,并求和
-        sum_abs_diff_cubed_3 = np.mean((np.abs(y) - mean_value) ** 3)
-        # 计算偏度指标
-        Cw = sum_abs_diff_cubed_3 / (Xrms**3)
-        # 计算每个数据点的绝对值减去均值后的四次方,并求和
-        sum_abs_diff_cubed_4 = np.mean((np.abs(y) - mean_value) ** 4)
-        # 计算峭度指标
-        Cq = sum_abs_diff_cubed_4 / (Xrms**4)
-        result = {
-                 
-            "x":list(x),
-            "y":list(y),
-            "title":title,
-            "xaxis":xaxis,
-            "yaxis":yaxis,
-            "fs":self.fs,
-            "Xrms":round(Xrms, 2),#有效值
-            "mean_value":round(mean_value, 2),# 均值 
-            "max_value":round(max_value, 2),# 最大值 
-            "min_value":round(min_value, 2),  # 最小值          
-            "Xp":round(Xp, 2),# 峰值
-            "Xpp":round(Xpp, 2),# 峰峰值
-            "Cf":round(Cf, 2),# 峰值指标
-            "Sf":round(Sf, 2),# 波形因子
-            "If":round(If, 2),# 脉冲指标
-            "Ce":round(Ce, 2),# 裕度指标
-            "Cw":round(Cw, 2) ,# 偏度指标
-            "Cq":round(Cq, 2) ,# 峭度指标
-            "rpm_Gen": round(rpm_Gen, 2),  # 转速r/min
 
+        mean_value = float(np.mean(y))
+        max_value = float(np.max(y))
+        min_value = float(np.min(y))
+        Xrms = float(np.sqrt(np.mean(y ** 2)))
+
+        Xp = float((max_value - min_value) / 2)
+        Xpp = float(max_value - min_value)
+
+        Cf = (Xp / Xrms) if Xrms != 0 else 0.0
+        Sf = (Xrms / mean_value) if mean_value != 0 else 0.0
+        If = (Xp / float(np.mean(np.abs(y)))) if np.mean(np.abs(y)) != 0 else 0.0
+
+        Xr = float(np.mean(np.sqrt(np.abs(y))) ** 2)
+        Ce = (Xp / Xr) if Xr != 0 else 0.0
+
+        # 偏度/峭度
+        Cw = float(np.mean((np.abs(y) - mean_value) ** 3) / (Xrms ** 3)) if Xrms != 0 else 0.0
+        Cq = float(np.mean((np.abs(y) - mean_value) ** 4) / (Xrms ** 4)) if Xrms != 0 else 0.0
+
+        result = {
+            "x": list(x),
+            "y": list(y),
+            "title": "时间域信号",
+            "xaxis": "时间(s)",
+            "yaxis": "加速度(m/s^2)",
+            "fs": int(self.fs),
+            "Xrms": round(Xrms, 2),
+            "mean_value": round(mean_value, 2),
+            "max_value": round(max_value, 2),
+            "min_value": round(min_value, 2),
+            "Xp": round(Xp, 2),
+            "Xpp": round(Xpp, 2),
+            "Cf": round(Cf, 2),
+            "Sf": round(Sf, 2),
+            "If": round(If, 2),
+            "Ce": round(Ce, 2),
+            "Cw": round(Cw, 2),
+            "Cq": round(Cq, 2),
+            "rpm_Gen": round(float(self.rpm_Gen), 2),
         }
-        result = self.replace_nan(result)
-        result = json.dumps(result, ensure_ascii=False)
 
-        return result
+        result = self.replace_nan(result)
+        return json.dumps(result, ensure_ascii=False)
 
+    # ==========================================================
+    # 趋势分析(按id统计一条数据,取每个id的第一条)
+    # ==========================================================
     def trend_analysis(self) -> str:
-        """
-        优化后的趋势分析方法(向量化计算统计指标)
-        返回 JSON 字符串,包含所有时间点的统计结果。
-        """
-        for df in self.datas:
-            df['parsed_data'] = df['mesure_data'].apply(json.loads)            
-        # 1. 合并所有数据并解析 mesure_data
-        combined_df = pd.concat(self.datas)
-        
-
-        combined_df['parsed_data'] = combined_df['mesure_data'].apply(json.loads)  # 批量解析 JSON
-        
-        # 2. 向量化计算统计指标(避免逐行循环)
-        def calculate_stats(group: pd.DataFrame) -> Dict[str, Any]:
-            data = np.array(group['parsed_data'].iloc[0])  # 提取振动数据数组
-            fs = int(group['sampling_frequency'].iloc[0])  # 采样频率
-            dt = 1 / fs  # 时间间隔
-
-            # 计算时域指标(向量化操作)
-            mean = np.mean(data)
-            max_val = np.max(data)
-            min_val = np.min(data)
-            Xrms = np.sqrt(np.mean(data**2))
-            Xp = (max_val - min_val) / 2
-            Cf = Xp / Xrms
-            Sf = Xrms / mean if mean != 0 else 0
-            If = Xp / np.mean(np.abs(data))
-            Xr = np.mean(np.sqrt(np.abs(data))) ** 2
-            Ce = Xp / Xr
-            
-            # 计算偏度和峭度
-
- 
-            # 计算速度有效值
+        combined_df = pd.concat(self.datas, ignore_index=True)
+        if combined_df.empty:
+            return json.dumps([], ensure_ascii=False)
+
+        # 统一解析 mesure_data
+        def parse_cell(x):
+            v = self._parse_mesure_data(x)
+            return v
+
+        combined_df['parsed_data'] = combined_df['mesure_data'].apply(parse_cell)
+
+        def calculate_stats(group: pd.DataFrame) -> Optional[Dict[str, Any]]:
+            if group.empty:
+                return None
+
+            arr = group['parsed_data'].iloc[0]
+            if arr is None or len(arr) == 0:
+                return None
+
+            data = np.asarray(arr, dtype=float)
+            fs = int(group['sampling_frequency'].iloc[0])
+            dt = 1 / fs
+
+            mean = float(np.mean(data))
+            max_val = float(np.max(data))
+            min_val = float(np.min(data))
+            Xrms = float(np.sqrt(np.mean(data ** 2)))
+
+            Xp = float((max_val - min_val) / 2)
+            Cf = (Xp / Xrms) if Xrms != 0 else 0.0
+            Sf = (Xrms / mean) if mean != 0 else 0.0
+            If = (Xp / float(np.mean(np.abs(data)))) if np.mean(np.abs(data)) != 0 else 0.0
+
+            Xr = float(np.mean(np.sqrt(np.abs(data))) ** 2)
+            Ce = (Xp / Xr) if Xr != 0 else 0.0
+
+            # 速度有效值
             velocity = np.cumsum(data) * dt
-            velocity_rms = np.sqrt(np.mean(velocity**2))
-            Cw = np.mean((data - mean) ** 3) / (Xrms ** 3) if Xrms != 0 else 0
-            Cq = np.mean((data - mean) ** 4) / (Xrms ** 4) if Xrms != 0 else 0                     
+            velocity_rms = float(np.sqrt(np.mean(velocity ** 2)))
+
+            Cw = float(np.mean((data - mean) ** 3) / (Xrms ** 3)) if Xrms != 0 else 0.0
+            Cq = float(np.mean((data - mean) ** 4) / (Xrms ** 4)) if Xrms != 0 else 0.0
+
             return {
                 "fs": fs,
                 "Mean": round(mean, 2),
@@ -432,263 +437,265 @@ class CMSAnalyst:
                 "Cw": round(Cw, 2),
                 "Cq": round(Cq, 2),
                 "velocity_rms": round(velocity_rms, 2),
-                "time_stamp": str(group['time_stamp'].iloc[0])
+                "time_stamp": str(group['time_stamp'].iloc[0]),
+                "id": int(group['id'].iloc[0]),
             }
-        
- 
-        # 3. 按 ID 分组并应用统计计算
-        stats = combined_df.groupby('id').apply(calculate_stats).tolist()
 
-        # 4. 返回 JSON 格式结果
-        return json.dumps(stats, ensure_ascii=False) 
+        stats = [
+            s for s in combined_df.groupby('id', sort=True).apply(calculate_stats).tolist()
+            if s is not None
+        ]
 
+        stats = self.replace_nan(stats)
+        return json.dumps(stats, ensure_ascii=False)
 
+    # ==========================================================
+    # 特征频率
+    # ==========================================================
     def Characteristic_Frequency(self):
-        """提取轴承、齿轮等参数"""
-        str1 = self.mesure_point_name
-        print(str1)
-        # 2、连接233的数据库'energy_show',从表'wind_engine_group'查找风机编号'engine_code'对应的机型编号'mill_type_code'
-        engine_code = self.wind_code
-        print(engine_code)
+        """
+        目标:拿到 _brand/_model -> unit_dict_brand_model -> rolls_number 等参数
+        任意失败:返回 None 字段(不会炸)
+        """
+        def empty_result():
+            return {
+                "type": "bearing",
+                "n_rolls": None,
+                "d_rolls": None,
+                "D_diameter": None,
+                "theta_deg": None,
+            }
+
+        str1 = str(self.mesure_point_name or "")
+        engine_code = str(self.wind_code or "")
+
         Engine = create_engine('mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show')
-        #Engine = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')        
-        # df_sql2 = f"SELECT * FROM {'wind_engine_group'} where engine_code = {'engine_code'} "
-        df_sql2 = f"SELECT * FROM wind_engine_group WHERE engine_code = '{engine_code}'"
-        df2 = pd.read_sql(df_sql2, Engine)
+
+        df2 = pd.read_sql(
+            f"SELECT * FROM wind_engine_group WHERE engine_code = '{engine_code}'",
+            Engine
+        )
+        if df2.empty or 'mill_type_code' not in df2.columns:
+            return empty_result()
+
         mill_type_code = df2['mill_type_code'].iloc[0]
-        print(mill_type_code)
+        _brand = None
+        _model = None
 
-        # # 3、从表'unit_bearings'中通过机型编号'mill_type_code'查找部件'brand'、'model'的参数信息
-        # 3、从相关的表中通过机型编号'mill_type_code'或者齿轮箱编号gearbox_code查找部件'brand'、'model'的参数信息
-        #unit_bearings主轴承参数表 关键词"main_bearing"
+        # --------------------------
+        # main_bearing
+        # --------------------------
         if 'main_bearing' in str1:
-            print("main_bearing")
-            df_sql3 = f"SELECT * FROM unit_bearings WHERE mill_type_code = '{mill_type_code}' "            
-            df3 = pd.read_sql(df_sql3, Engine)
+            df3 = pd.read_sql(
+                f"SELECT * FROM unit_bearings WHERE mill_type_code = '{mill_type_code}'",
+                Engine
+            )
             if df3.empty:
-                print("警告: 没有找到有效的机型信息")   
+                return empty_result()
+
+            # front/rear/自动选择
             if 'front' in str1:
-                brand = 'front_bearing' + '_brand'  
-                model = 'front_bearing' + '_model'  
-                front_has_value = not pd.isna(df3[brand].iloc[0]) and not pd.isna(df3[model].iloc[0]) 
-                if not  front_has_value:
-                    print("警告: 没有找到有效的品牌信息")
+                brand_col = 'front_bearing_brand'
+                model_col = 'front_bearing_model'
             elif 'rear' in str1:
-                brand = 'rear_bearing' + '_brand'  
-                model = 'rear_bearing' + '_model'  
-                end_has_value = not pd.isna(df3[brand].iloc[0]) and not pd.isna(df3[model].iloc[0])       
-                if not  end_has_value:
-                    print("警告: 没有找到有效的品牌信息")                
+                brand_col = 'rear_bearing_brand'
+                model_col = 'rear_bearing_model'
             else:
-                # 当没有指定 front 或 end 时,自动选择有值的轴承信息
-                front_brand_col = 'front_bearing_brand'
-                front_model_col = 'front_bearing_model'
-                rear_brand_col = 'rear_bearing_brand'
-                rear_model_col = 'rear_bearing_model'
-                # 检查 front_bearing 是否有值
-                front_has_value = not pd.isna(df3[front_brand_col].iloc[0]) and not pd.isna(df3[front_model_col].iloc[0])                
-                # 检查 end_bearing 是否有值
-                end_has_value = not pd.isna(df3[rear_brand_col].iloc[0]) and not pd.isna(df3[rear_model_col].iloc[0])               
-                # 根据检查结果选择合适的列
-                if front_has_value and end_has_value:
-                    # 如果两者都有值,默认选择 front
-                    brand = front_brand_col
-                    model = front_model_col
-                elif front_has_value:
-                    brand = front_brand_col
-                    model = front_model_col
-                elif end_has_value:
-                    brand = rear_brand_col
-                    model = rear_model_col
-                else:
-                    # 如果两者都没有有效值,设置默认值或抛出异常
-                    print("警告: 没有找到有效的轴承信息")
-                    brand = front_brand_col  # 默认使用 front
-                    model = front_model_col  # 默认使用 front
-            print(brand)
-            _brand = df3[brand].iloc[0]
-            _model = df3[model].iloc[0]
-            print(_brand)
-            print(_model)     
-        #unit_dynamo 发电机参数表 关键词generator stator
-        elif 'generator'in str1 or 'stator' in str1:
-            print("generator or 'stator'")
-            # df_sql3 = f"SELECT * FROM {'unit_dynamo'} where mill_type_code = {'mill_type_code'} "
-            df_sql3 = f"SELECT * FROM unit_dynamo WHERE mill_type_code = '{mill_type_code}' "
-            df3 = pd.read_sql(df_sql3, Engine)
+                candidates = [
+                    ('front_bearing_brand', 'front_bearing_model'),
+                    ('rear_bearing_brand', 'rear_bearing_model')
+                ]
+                brand_col = model_col = None
+                for b, m in candidates:
+                    if b in df3.columns and m in df3.columns:
+                        if pd.notna(df3[b].iloc[0]) and pd.notna(df3[m].iloc[0]):
+                            brand_col, model_col = b, m
+                            break
+                if brand_col is None:
+                    return empty_result()
+
+            if brand_col not in df3.columns or model_col not in df3.columns:
+                return empty_result()
+
+            _brand = df3[brand_col].iloc[0]
+            _model = df3[model_col].iloc[0]
+
+        # --------------------------
+        # generator / stator
+        # --------------------------
+        elif 'generator' in str1 or 'stator' in str1:
+            df3 = pd.read_sql(
+                f"SELECT * FROM unit_dynamo WHERE mill_type_code = '{mill_type_code}'",
+                Engine
+            )
+            if df3.empty:
+                return empty_result()
+
             if 'non' in str1:
-                brand = 'non_drive_end_bearing' + '_brand'  
-                model = 'non_drive_end_bearing' + '_model'  
+                brand_col = 'non_drive_end_bearing_brand'
+                model_col = 'non_drive_end_bearing_model'
             else:
-                brand = 'drive_end_bearing' + '_brand'  
-                model = 'drive_end_bearing' + '_model'              
-            print(brand)
-            _brand = df3[brand].iloc[0]
-            _model = df3[model].iloc[0]
-            print(_brand)
-            print(_model)       
-
-         #齿轮箱区分行星轮/平行轮 和 轴承两个表
+                brand_col = 'drive_end_bearing_brand'
+                model_col = 'drive_end_bearing_model'
+
+            if brand_col not in df3.columns or model_col not in df3.columns:
+                return empty_result()
+
+            _brand = df3[brand_col].iloc[0]
+            _model = df3[model_col].iloc[0]
+
+        # --------------------------
+        # gearbox
+        # --------------------------
         elif 'gearbox' in str1:
-            print("gearbox")
-            #根据mill_type_code从unit_gearbox表中获得gearbox_code
-            df_sql3 = f"SELECT * FROM unit_gearbox WHERE mill_type_code = '{mill_type_code}' "
-            df3 = pd.read_sql(df_sql3, Engine)
-            gearbox_code =df3['code'].iloc[0]
-            print(gearbox_code)         
-            #Engine33 = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')
-             #如果是行星轮/平行轮 则从unit_gearbox_structure 表中取数据
-            if 'planet'in str1 or 'sun' in str1:
-                print("'planet' or 'sun' ")
-                gearbox_structure =1 if 'planet'in str1 else 2
-                planetary_gear_grade =1
-                if 'first' in str1:
-                    planetary_gear_grade =1
-                elif 'second'in str1:
-                    planetary_gear_grade =2
-                elif 'third'in str1:
-                    planetary_gear_grade =3
-                # df_sql33 = f"SELECT * FROM unit_gearbox_structure WHERE gearbox_code = '{gearbox_code}' "
-                df_sql33 = f"""
-                SELECT bearing_brand, bearing_model 
-                FROM unit_gearbox_structure 
-                    WHERE gearbox_code = '{gearbox_code}' 
-                    AND gearbox_structure = '{gearbox_structure}'
-                    AND planetary_gear_grade = '{planetary_gear_grade}'
-                    """
-                df33 = pd.read_sql(df_sql33, Engine)
+            df3 = pd.read_sql(
+                f"SELECT * FROM unit_gearbox WHERE mill_type_code = '{mill_type_code}'",
+                Engine
+            )
+            if df3.empty or 'code' not in df3.columns:
+                return empty_result()
+
+            gearbox_code = df3['code'].iloc[0]
+
+            # 行星轮/太阳轮:unit_gearbox_structure
+            if ('planet' in str1) or ('sun' in str1):
+                gearbox_structure = 1 if 'planet' in str1 else 2
+                planetary_gear_grade = 1
+                if 'second' in str1:
+                    planetary_gear_grade = 2
+                elif 'third' in str1:
+                    planetary_gear_grade = 3
+
+                df33 = pd.read_sql(
+                    f"""
+                    SELECT bearing_brand, bearing_model
+                    FROM unit_gearbox_structure
+                    WHERE gearbox_code = '{gearbox_code}'
+                      AND gearbox_structure = '{gearbox_structure}'
+                      AND planetary_gear_grade = '{planetary_gear_grade}'
+                    """,
+                    Engine
+                )
                 if df33.empty:
-                    print("unit_gearbox_structure没有该测点的参数")
-                else:
-                    brand = 'bearing' + '_brand'  
-                    model = 'bearing' + '_model'  
-                    print(brand)
-                    _brand = df33[brand].iloc[0]
-                    _model = df33[model].iloc[0]
-                    has_value = not pd.isna(df33[brand].iloc[0]) and not pd.isna(df33[model].iloc[0])
-                    if has_value:   
-                        print(_brand)
-                        print(_model)   
-                    else:
-                        print("警告: 没有找到有效的轴承信息")
-             #如果是齿轮箱轴承 则从unit_gearbox_bearings 表中取数据
-            elif 'shaft' in str1 or'input' in str1:
-                print("'shaft'or'input'")
-                # df_sql33 = f"SELECT * FROM unit_gearbox_bearings WHERE gearbox_code = '{gearbox_code}' "
-                # df33 = pd.read_sql(df_sql33, Engine33)
-                #高速轴 低速中间轴 取bearing_rs/gs均可
-                parallel_wheel_grade=1
-                if 'low_speed' in str1:
-                    parallel_wheel_grade =1                
-                elif 'low_speed_intermediate' in str1:
-                    parallel_wheel_grade =2
+                    return empty_result()
+
+                if 'bearing_brand' not in df33.columns or 'bearing_model' not in df33.columns:
+                    return empty_result()
+
+                _brand = df33['bearing_brand'].iloc[0]
+                _model = df33['bearing_model'].iloc[0]
+
+            # 轴承:unit_gearbox_bearings
+            elif ('shaft' in str1) or ('input' in str1) or ('low_speed' in str1) or ('high_speed' in str1):
+                parallel_wheel_grade = 1
+                if 'low_speed_intermediate' in str1:
+                    parallel_wheel_grade = 2
                 elif 'high_speed' in str1:
-                    parallel_wheel_grade =3    
-                # df_sql33 = f"SELECT * FROM unit_gearbox_bearings WHERE gearbox_code = '{gearbox_code}' "
-                df_sql33 = f"""
-                SELECT bearing_rs_brand, bearing_rs_model, bearing_gs_brand, bearing_gs_model 
-                FROM unit_gearbox_bearings 
-                WHERE gearbox_code = '{gearbox_code}'
-                AND parallel_wheel_grade = '{parallel_wheel_grade}'
-                """
-                df33 = pd.read_sql(df_sql33, Engine)
-                if not df33.empty:
-                    if 'high_speed' in str1 or 'low_speed_intermediate' in str1:
-                        rs_brand = 'bearing_rs' + '_brand'  
-                        rs_model = 'bearing_rs' + '_model'  
-                        gs_brand = 'bearing_gs' + '_brand'  
-                        gs_model = 'bearing_gs' + '_model'  
-                        rs_has_value = not pd.isna(df33[rs_brand].iloc[0]) and not pd.isna(df33[rs_model].iloc[0])     
-                        gs_has_value = not pd.isna(df33[gs_brand].iloc[0]) and not pd.isna(df33[gs_model].iloc[0])    
-                        if rs_has_value and gs_has_value:
-                            brand = rs_brand
-                            model = rs_model
-                        elif rs_has_value:
-                            brand = rs_brand
-                            model = rs_model
-                        elif gs_has_value:
-                            brand = gs_brand
-                            model = gs_model
-                        else:
-                            print("警告: 没有找到有效的品牌信息")
-                            brand = rs_brand 
-                            model = rs_model 
-                    #低速轴 取bearing_model          
-                    elif 'low_speed'in str1:
-                        brand = 'bearing' + '_brand'  
-                        model = 'bearing' + '_model'
+                    parallel_wheel_grade = 3
+
+                df33 = pd.read_sql(
+                    f"""
+                    SELECT bearing_rs_brand, bearing_rs_model, bearing_gs_brand, bearing_gs_model, bearing_brand, bearing_model
+                    FROM unit_gearbox_bearings
+                    WHERE gearbox_code = '{gearbox_code}'
+                      AND parallel_wheel_grade = '{parallel_wheel_grade}'
+                    """,
+                    Engine
+                )
+                if df33.empty:
+                    return empty_result()
+
+                # 高速/中间优先 rs/gs;低速取 bearing_brand/model
+                if ('high_speed' in str1) or ('low_speed_intermediate' in str1):
+                    # rs/gs 选有值的
+                    candidates = [
+                        ('bearing_rs_brand', 'bearing_rs_model'),
+                        ('bearing_gs_brand', 'bearing_gs_model')
+                    ]
+                    for b, m in candidates:
+                        if b in df33.columns and m in df33.columns:
+                            if pd.notna(df33[b].iloc[0]) and pd.notna(df33[m].iloc[0]):
+                                _brand = df33[b].iloc[0]
+                                _model = df33[m].iloc[0]
+                                break
+                    if _brand is None:
+                        return empty_result()
                 else:
-                    print("警告: 没有找到有效的轴承信息")     
-                print(brand)
-                _brand = df33[brand].iloc[0]
-                _model = df33[model].iloc[0]
-                print(_brand)
-                print(_model)   
-
-
-        # 4、从表'unit_dict_brand_model'中通过'_brand'、'_model'查找部件的参数信息
-
-        df_sql4 = f"SELECT * FROM unit_dict_brand_model where manufacture = %s AND model_number = %s"
-        params = [(_brand, _model)]
-        df4 = pd.read_sql(df_sql4, Engine, params=params)
-        n_rolls = df4['rolls_number'].iloc[0]
-        d_rolls = df4['rolls_diameter'].iloc[0]
-        D_diameter = df4['circle_diameter'].iloc[0]
-        theta_deg = df4['theta_deg'].iloc[0]
-        result = {
-            "type":'bearing',
-            "n_rolls":round(n_rolls, 2),
-            "d_rolls":round(d_rolls, 2),
-            "D_diameter":round(D_diameter, 2),
-            "theta_deg":round(theta_deg, 2),
-        }
-    # result = json.dumps(result, ensure_ascii=False)
-        return result
-    def calculate_bearing_frequencies(self, n, d, D, theta_deg, rpm):
-        """
-        计算轴承各部件特征频率
+                    # low_speed:bearing_brand/model
+                    if 'bearing_brand' in df33.columns and 'bearing_model' in df33.columns:
+                        if pd.notna(df33['bearing_brand'].iloc[0]) and pd.notna(df33['bearing_model'].iloc[0]):
+                            _brand = df33['bearing_brand'].iloc[0]
+                            _model = df33['bearing_model'].iloc[0]
+                        else:
+                            return empty_result()
+                    else:
+                        return empty_result()
 
-        参数:
-        n (int): 滚动体数量
-        d (float): 滚动体直径(单位:mm)
-        D (float): 轴承节圆直径(滚动体中心圆直径,单位:mm)
-        theta_deg (float): 接触角(单位:度)
-        rpm (float): 转速(转/分钟)
+            else:
+                return empty_result()
 
-        返回:
-        dict: 包含各特征频率的字典(单位:Hz)
-        """
-        # 转换角度为弧度
-        theta = math.radians(theta_deg)
+        else:
+            # 其它测点:直接返回空,不炸
+            return empty_result()
+
+        # 最终检查
+        if _brand is None or _model is None or (pd.isna(_brand) or pd.isna(_model)):
+            return empty_result()
+
+        # --------------------------
+        # unit_dict_brand_model 查询
+        # --------------------------
+        df4 = pd.read_sql(
+            "SELECT * FROM unit_dict_brand_model WHERE manufacture = %s AND model_number = %s",
+            Engine,
+            params=(str(_brand), str(_model))
+        )
+        if df4.empty:
+            return empty_result()
+
+        # 字段安全读取
+        needed = ['rolls_number', 'rolls_diameter', 'circle_diameter', 'theta_deg']
+        for col in needed:
+            if col not in df4.columns:
+                return empty_result()
 
-        # 转换直径单位为米(保持单位一致性,实际计算中比值抵消单位影响)
-        # 注意:由于公式中使用的是比值,单位可以保持mm不需要转换
-        ratio = d / D
+        return {
+            "type": "bearing",
+            "n_rolls": None if pd.isna(df4['rolls_number'].iloc[0]) else float(df4['rolls_number'].iloc[0]),
+            "d_rolls": None if pd.isna(df4['rolls_diameter'].iloc[0]) else float(df4['rolls_diameter'].iloc[0]),
+            "D_diameter": None if pd.isna(df4['circle_diameter'].iloc[0]) else float(df4['circle_diameter'].iloc[0]),
+            "theta_deg": None if pd.isna(df4['theta_deg'].iloc[0]) else float(df4['theta_deg'].iloc[0]),
+        }
 
-        # 基础频率计算(转/秒)
+    # ==========================================================
+    # 轴承频率公式
+    # ==========================================================
+    def calculate_bearing_frequencies(self, n, d, D, theta_deg, rpm):
+        theta = math.radians(theta_deg)
+        ratio = d / D
         f_r = rpm / 60.0
 
-        # 计算各特征频率
-        BPFI = n / 2 * (1 + ratio * math.cos(theta)) * f_r  # 内圈故障频率
-        BPFO = n / 2 * (1 - ratio * math.cos(theta)) * f_r  # 外圈故障频率
-        BSF = (D / (2 * d)) * (1 - (ratio ** 2) * (math.cos(theta) ** 2)) * f_r  # 滚动体故障频率
-        FTF = 0.5 * (1 - ratio * math.cos(theta)) * f_r  # 保持架故障频率
+        BPFI = n / 2 * (1 + ratio * math.cos(theta)) * f_r
+        BPFO = n / 2 * (1 - ratio * math.cos(theta)) * f_r
+        BSF = (D / (2 * d)) * (1 - (ratio ** 2) * (math.cos(theta) ** 2)) * f_r
+        FTF = 0.5 * (1 - ratio * math.cos(theta)) * f_r
 
         return {
-            "BPFI": round(BPFI, 2),
-            "BPFO": round(BPFO, 2),
-            "BSF": round(BSF, 2),
-            "FTF": round(FTF, 2),
-
+            "BPFI": round(float(BPFI), 2),
+            "BPFO": round(float(BPFO), 2),
+            "BSF": round(float(BSF), 2),
+            "FTF": round(float(FTF), 2),
         }
-    
-    #检查返回结果是否有nan 若有,则替换成none
+
+    # ==========================================================
+    # NaN 替换
+    # ==========================================================
     def replace_nan(self, obj):
         if isinstance(obj, dict):
             return {k: self.replace_nan(v) for k, v in obj.items()}
-        elif isinstance(obj, list):
-            return [self.replace_nan(item) for item in obj]
-        elif isinstance(obj, float) and math.isnan(obj):
+        if isinstance(obj, list):
+            return [self.replace_nan(x) for x in obj]
+        if isinstance(obj, float) and math.isnan(obj):
             return None
         return obj