| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- import pandas as pd
- import matplotlib.pyplot as plt
- import warnings
- from pandas.errors import SettingWithCopyWarning
- from scipy.signal import butter, filtfilt
- from scipy.signal import welch
- from scipy.fft import fft, fftfreq
- import numpy as np
- warnings.filterwarnings("ignore", category=SettingWithCopyWarning)
- plt.rcParams['font.sans-serif'] = ['SimHei']
- plt.rcParams['axes.unicode_minus'] = False
- def butter_lowpass_filter(data, cutoff, fs, order=5):
- nyq = 0.5 * fs # Nyquist频率
- normal_cutoff = cutoff / nyq
- b, a = butter(order, normal_cutoff, btype='low', analog=False)
- y = filtfilt(b, a, data)
- return y
- def butter_bandpass_filter(data, cutoff_low, cutoff_high, fs, order=5):
- nyq = 0.5 * fs # Nyquist频率
- high_cutoff = cutoff_high / nyq
- low_cutoff = cutoff_low / nyq
- b, a = butter(order, [low_cutoff, high_cutoff], btype='band', analog=False)
- y = filtfilt(b, a, data)
- return y
- def apply_fft(x, fs):
- n = len(x)
- t = 1.0 / fs
- fft_cof = fft(x - np.mean(x)) # Obtain FFT coefficients after removing the mean (DC component) from the signal.
- xf = fftfreq(n, t)[:n // 2] # Taking positive spectrum only.
- # Multiply abs(FFT coefficients) by 2 to compensate for positive spectrum and normalize by signal length.
- fft_positive = 2.0 / n * np.abs(fft_cof[0:n // 2])
- return xf, fft_positive
- # Function to compute PSD
- def compute_psd(signal, length, fs):
- seg_length = length / 10000 # Segment length.
- overlap = seg_length / 20 # overlap between segments (in number of samples)
- nfft_length = 2 ** 14 # FFT length
- frequencies, psd = welch(signal, fs=fs, window='han', nperseg=seg_length, noverlap=overlap, nfft=nfft_length)
- return frequencies, psd
- def tower_cal(data, start_points, end_points, fs):
- """
- 计算测量叶片数据的塔筒测量距离
- :param data: cycle_calculate计算完成后的数据。
- :param start_points: 所有每个周期开始点,叶片前缘突变点。
- :param end_points: 叶片后缘突变点。
- :param fs: 采样频率。
- """
- cutoff_low = 0.01 * fs # 设置低通滤波截止频率
- combined_df_sorted = pd.concat([start_points, end_points]).sort_values(by='time') # 合并DataFrame并按 'time' 列排序
- # 检查排序后的数据从end开始,start结束
- if combined_df_sorted.iloc[0].equals(start_points.iloc[0]):
- combined_df_sorted = combined_df_sorted.iloc[1:]
- if combined_df_sorted.iloc[-1].equals(end_points.iloc[-1]):
- combined_df_sorted = combined_df_sorted.iloc[:-1]
- combined_df_sorted.reset_index(drop=True, inplace=True)
- # 将 start_points 中的时间点转换为列表
- start_times = combined_df_sorted['time'].tolist()
- fft_positive_filters = []
- tower_dists = []
- # 遍历所有起始时间点
- for i in range(0, len(start_times), 2):
- # 获取当前起始和结束时间点
- start_time = start_times[i] + 0.01 * (start_times[i + 1] - start_times[i])
- end_time = start_times[i + 1] - 0.01 * (start_times[i + 1] - start_times[i])
- # 根据当前起始时间点和结束时间点对数据进行分段
- segment = data[(data['time'] > start_time) & (data['time'] <= end_time)]
- mean_point = segment['distance'].mean()
- tower_dists.append(mean_point)
- tower_dist = np.mean(tower_dists)
- return tower_dist
- def process_fft(data, fs):
- """
- 塔筒数据每组低通滤波,求平均得振动分析;
- 叶片数据分组高通滤波
- :param data: tower_cal计算完成后的数据。
- :param fs: 采样频率。
- """
- cutoff_low = 0.001 * fs # 设置低通滤波截止频率
- segment = data.head(int(len(data) * 0.95))
- # 确保segment的长度是2的n次幂
- desired_length = 2**((len(segment) - 1).bit_length() - 1)
- segment = segment.head(desired_length)
- segment.loc[:, 'distance_filtered'] = butter_lowpass_filter(segment['distance'], cutoff_low, fs)
- # 提取时间序列数据
- time_series_filter = segment['distance_filtered'].values # 使用滤波后的距离数据
- xf_filter, fft_positive_filter = apply_fft(time_series_filter, fs)
- fft_positive_filters_truncated = fft_positive_filter[0:1000]
- xf_filter_truncated = xf_filter[0:1000]
- # 将 NumPy 数组转换为 Python 列表
- fft_y = fft_positive_filters_truncated.tolist()
- fft_y_scaled = [x * 1000 for x in fft_y]
- fft_y_scaled = [0] + fft_y_scaled
- fft_x = xf_filter_truncated.tolist()
- fft_x = [0] + fft_x
- fft_y_scaled = [0 if a_val < 0.1 else b_val for a_val, b_val in zip(fft_x, fft_y_scaled)]
- max_value = max(fft_y_scaled)
- max_index = fft_y_scaled.index(max_value)
- '''
- plt.plot(fft_x, fft_y_scaled, label='Filtered Signal')
- plt.xlabel('频率 (Hz)', fontsize=8)
- plt.ylabel('振幅 (m)', fontsize=8)
- plt.tick_params('x', labelsize=8)
- plt.tick_params('y', labelsize=8)
- plt.title('Frequency Spectrum', fontsize=12)
- plt.legend(fontsize=7)
- plt.grid(True)
- plt.savefig(f"filter_fft1.png", dpi=110, pil_kwargs={"icc_profile": False})
- plt.close()
- plt.plot(segment['time'], segment['distance_filtered'], label='Filtered Signal')
- plt.xlabel('时间 (s)', fontsize=8)
- plt.ylabel('振幅 (m)', fontsize=8)
- plt.tick_params('x', labelsize=8)
- plt.tick_params('y', labelsize=8)
- plt.title('Time-Domain Waveform', fontsize=12)
- plt.legend(fontsize=7)
- plt.grid(True)
- plt.savefig(f"filter_fft2.png", dpi=110, pil_kwargs={"icc_profile": False})
- plt.close()
- '''
- return segment, fft_x, fft_y_scaled, round(fft_x[max_index], 2), round(max_value, 2)
|