import pandas as pd import matplotlib.pyplot as plt import warnings from pandas.errors import SettingWithCopyWarning from scipy.signal import butter, filtfilt from scipy.signal import welch from scipy.fft import fft, fftfreq import numpy as np warnings.filterwarnings("ignore", category=SettingWithCopyWarning) plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False def butter_lowpass_filter(data, cutoff, fs, order=5): nyq = 0.5 * fs # Nyquist频率 normal_cutoff = cutoff / nyq b, a = butter(order, normal_cutoff, btype='low', analog=False) y = filtfilt(b, a, data) return y def butter_bandpass_filter(data, cutoff_low, cutoff_high, fs, order=5): nyq = 0.5 * fs # Nyquist频率 high_cutoff = cutoff_high / nyq low_cutoff = cutoff_low / nyq b, a = butter(order, [low_cutoff, high_cutoff], btype='band', analog=False) y = filtfilt(b, a, data) return y def apply_fft(x, fs): n = len(x) t = 1.0 / fs fft_cof = fft(x - np.mean(x)) # Obtain FFT coefficients after removing the mean (DC component) from the signal. xf = fftfreq(n, t)[:n // 2] # Taking positive spectrum only. # Multiply abs(FFT coefficients) by 2 to compensate for positive spectrum and normalize by signal length. fft_positive = 2.0 / n * np.abs(fft_cof[0:n // 2]) return xf, fft_positive # Function to compute PSD def compute_psd(signal, length, fs): seg_length = length / 10000 # Segment length. overlap = seg_length / 20 # overlap between segments (in number of samples) nfft_length = 2 ** 14 # FFT length frequencies, psd = welch(signal, fs=fs, window='han', nperseg=seg_length, noverlap=overlap, nfft=nfft_length) return frequencies, psd def tower_cal(data, start_points, end_points, fs): """ 计算测量叶片数据的塔筒测量距离 :param data: cycle_calculate计算完成后的数据。 :param start_points: 所有每个周期开始点,叶片前缘突变点。 :param end_points: 叶片后缘突变点。 :param fs: 采样频率。 """ cutoff_low = 0.01 * fs # 设置低通滤波截止频率 combined_df_sorted = pd.concat([start_points, end_points]).sort_values(by='time') # 合并DataFrame并按 'time' 列排序 # 检查排序后的数据从end开始,start结束 if combined_df_sorted.iloc[0].equals(start_points.iloc[0]): combined_df_sorted = combined_df_sorted.iloc[1:] if combined_df_sorted.iloc[-1].equals(end_points.iloc[-1]): combined_df_sorted = combined_df_sorted.iloc[:-1] combined_df_sorted.reset_index(drop=True, inplace=True) # 将 start_points 中的时间点转换为列表 start_times = combined_df_sorted['time'].tolist() fft_positive_filters = [] tower_dists = [] # 遍历所有起始时间点 for i in range(0, len(start_times), 2): # 获取当前起始和结束时间点 start_time = start_times[i] + 0.01 * (start_times[i + 1] - start_times[i]) end_time = start_times[i + 1] - 0.01 * (start_times[i + 1] - start_times[i]) # 根据当前起始时间点和结束时间点对数据进行分段 segment = data[(data['time'] > start_time) & (data['time'] <= end_time)] mean_point = segment['distance'].mean() tower_dists.append(mean_point) tower_dist = np.mean(tower_dists) return tower_dist def process_fft(data, fs): """ 塔筒数据每组低通滤波,求平均得振动分析; 叶片数据分组高通滤波 :param data: tower_cal计算完成后的数据。 :param fs: 采样频率。 """ cutoff_low = 0.001 * fs # 设置低通滤波截止频率 segment = data.head(int(len(data) * 0.95)) # 确保segment的长度是2的n次幂 desired_length = 2**((len(segment) - 1).bit_length() - 1) segment = segment.head(desired_length) segment.loc[:, 'distance_filtered'] = butter_lowpass_filter(segment['distance'], cutoff_low, fs) # 提取时间序列数据 time_series_filter = segment['distance_filtered'].values # 使用滤波后的距离数据 xf_filter, fft_positive_filter = apply_fft(time_series_filter, fs) fft_positive_filters_truncated = fft_positive_filter[0:1000] xf_filter_truncated = xf_filter[0:1000] # 将 NumPy 数组转换为 Python 列表 fft_y = fft_positive_filters_truncated.tolist() fft_y_scaled = [x * 1000 for x in fft_y] fft_y_scaled = [0] + fft_y_scaled fft_x = xf_filter_truncated.tolist() fft_x = [0] + fft_x fft_y_scaled = [0 if a_val < 0.1 else b_val for a_val, b_val in zip(fft_x, fft_y_scaled)] max_value = max(fft_y_scaled) max_index = fft_y_scaled.index(max_value) ''' plt.plot(fft_x, fft_y_scaled, label='Filtered Signal') plt.xlabel('频率 (Hz)', fontsize=8) plt.ylabel('振幅 (m)', fontsize=8) plt.tick_params('x', labelsize=8) plt.tick_params('y', labelsize=8) plt.title('Frequency Spectrum', fontsize=12) plt.legend(fontsize=7) plt.grid(True) plt.savefig(f"filter_fft1.png", dpi=110, pil_kwargs={"icc_profile": False}) plt.close() plt.plot(segment['time'], segment['distance_filtered'], label='Filtered Signal') plt.xlabel('时间 (s)', fontsize=8) plt.ylabel('振幅 (m)', fontsize=8) plt.tick_params('x', labelsize=8) plt.tick_params('y', labelsize=8) plt.title('Time-Domain Waveform', fontsize=12) plt.legend(fontsize=7) plt.grid(True) plt.savefig(f"filter_fft2.png", dpi=110, pil_kwargs={"icc_profile": False}) plt.close() ''' return segment, fft_x, fft_y_scaled, round(fft_x[max_index], 2), round(max_value, 2)