import pandas as pd import matplotlib.pyplot as plt import warnings from pandas.errors import SettingWithCopyWarning from scipy.signal import butter, filtfilt from scipy.signal import welch from scipy.fft import fft, fftfreq import numpy as np from sympy import symbols, solve, Eq, simplify, nsolve import itertools import multiprocessing from concurrent.futures import ThreadPoolExecutor warnings.filterwarnings("ignore", category=SettingWithCopyWarning) plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False def butter_lowpass_filter(data, cutoff, fs, order=5): nyq = 0.5 * fs # Nyquist频率 normal_cutoff = cutoff / nyq b, a = butter(order, normal_cutoff, btype='low', analog=False) y = filtfilt(b, a, data) return y def butter_bandpass_filter(data, cutoff_low, cutoff_high, fs, order=5): nyq = 0.5 * fs # Nyquist频率 high_cutoff = cutoff_high / nyq low_cutoff = cutoff_low / nyq b, a = butter(order, [low_cutoff, high_cutoff], btype='band', analog=False) y = filtfilt(b, a, data) return y def apply_fft(x, fs): n = len(x) t = 1.0 / fs fft_cof = fft(x - np.mean(x)) # Obtain FFT coefficients after removing the mean (DC component) from the signal. xf = fftfreq(n, t)[:n // 2] # Taking positive spectrum only. # Multiply abs(FFT coefficients) by 2 to compensate for positive spectrum and normalize by signal length. fft_positive = 2.0 / n * np.abs(fft_cof[0:n // 2]) return xf, fft_positive # Function to compute PSD def compute_psd(signal, length, fs): seg_length = length / 10000 # Segment length. overlap = seg_length / 20 # overlap between segments (in number of samples) nfft_length = 2 ** 14 # FFT length frequencies, psd = welch(signal, fs=fs, window='han', nperseg=seg_length, noverlap=overlap, nfft=nfft_length) return frequencies, psd def tower_cal(data, start_points, end_points, fs): """ 计算测量叶片数据的塔筒测量距离 :param data: cycle_calculate计算完成后的数据。 :param start_points: 所有每个周期开始点,叶片前缘突变点。 :param end_points: 叶片后缘突变点。 :param fs: 采样频率。 """ cutoff_low = 0.01 * fs # 设置低通滤波截止频率 combined_df_sorted = pd.concat([start_points, end_points]).sort_values(by='time') # 合并DataFrame并按 'time' 列排序 # 检查排序后的数据从end开始,start结束 if combined_df_sorted.iloc[0].equals(start_points.iloc[0]): combined_df_sorted = combined_df_sorted.iloc[1:] if combined_df_sorted.iloc[-1].equals(end_points.iloc[-1]): combined_df_sorted = combined_df_sorted.iloc[:-1] combined_df_sorted.reset_index(drop=True, inplace=True) # 将 start_points 中的时间点转换为列表 start_times = combined_df_sorted['time'].tolist() fft_positive_filters = [] tower_dists = [] # 遍历所有起始时间点 for i in range(0, len(start_times), 2): # 获取当前起始和结束时间点 start_time = start_times[i] + 0.01 * (start_times[i + 1] - start_times[i]) end_time = start_times[i + 1] - 0.01 * (start_times[i + 1] - start_times[i]) # 根据当前起始时间点和结束时间点对数据进行分段 segment = data[(data['time'] > start_time) & (data['time'] <= end_time)] mean_point = segment['distance'].mean() tower_dists.append(mean_point) tower_dist = np.mean(tower_dists) return tower_dist def process_fft(data, fs): """ 塔筒数据每组低通滤波,求平均得振动分析; 叶片数据分组高通滤波 :param data: tower_cal计算完成后的数据。 :param fs: 采样频率。 """ cutoff_low = 0.001 * fs # 设置低通滤波截止频率 segment = data.head(int(len(data) * 1)) # 确保segment的长度是2的n次幂 desired_length = 2**((len(segment) - 1).bit_length() - 1) segment = segment.head(desired_length) segment.loc[:, 'distance_filtered'] = butter_lowpass_filter(segment['distance'], cutoff_low, fs) # 提取时间序列数据 time_series_filter = segment['distance_filtered'].values # 使用滤波后的距离数据 xf_filter, fft_positive_filter = apply_fft(time_series_filter, fs) fft_positive_filters_truncated = fft_positive_filter[0:1000] xf_filter_truncated = xf_filter[0:1000] # 将 NumPy 数组转换为 Python 列表 fft_y = fft_positive_filters_truncated.tolist() fft_y_scaled = [x * 1000 for x in fft_y] fft_y_scaled = [0] + fft_y_scaled fft_x = xf_filter_truncated.tolist() fft_x = [0] + fft_x fft_y_scaled = [0 if a_val < 0.04 else b_val for a_val, b_val in zip(fft_x, fft_y_scaled)] max_value = max(fft_y_scaled) max_index = fft_y_scaled.index(max_value) ''' plt.plot(fft_x, fft_y_scaled, label='Filtered Signal') plt.xlabel('频率 (Hz)', fontsize=8) plt.ylabel('振幅 (m)', fontsize=8) plt.tick_params('x', labelsize=8) plt.tick_params('y', labelsize=8) plt.title('Frequency Spectrum', fontsize=12) plt.legend(fontsize=7) plt.grid(True) plt.savefig(f"filter_fft1.png", dpi=110, pil_kwargs={"icc_profile": False}) plt.close() plt.plot(segment['time'], segment['distance_filtered'], label='Filtered Signal') plt.xlabel('时间 (s)', fontsize=8) plt.ylabel('振幅 (m)', fontsize=8) plt.tick_params('x', labelsize=8) plt.tick_params('y', labelsize=8) plt.title('Time-Domain Waveform', fontsize=12) plt.legend(fontsize=7) plt.grid(True) plt.savefig(f"filter_fft2.png", dpi=110, pil_kwargs={"icc_profile": False}) plt.close() ''' return segment, fft_x, fft_y_scaled, round(fft_x[max_index], 2), round(max_value, 2) def process_equations(list_param): """ 处理方程组并求解 参数: a, b, p: 坐标点列表,P是轮毂中心 d, e, f: 轴向倾角、相位角、锥角 返回: solutions: 包含所有有效解的列表 positive_angles: 包含所有正角度值的列表 """ a, b, p, d, e, f = [list_param[i] for i in range(6)] # 定义符号 ya, yb, za, zb, x, y = symbols('ya yb za zb x y', real=True) # 计算全局表达式 za_expr = solve(Eq(a[0] - p[0] + ya * (a[1] - p[1]) + za * (a[2] - p[2]), 0), za)[0] zb_expr = solve(Eq(b[0] - p[0] + yb * (b[1] - p[1]) + zb * (b[2] - p[2]), 0), zb)[0] # 解线性方程组得到 x_expr, y_expr sol_xy = solve([Eq(x + ya * y + za_expr * d, 0), Eq(x + yb * y + zb_expr * d, 0)], [x, y], dict=True) if not sol_xy: raise RuntimeError("解线性方程组求 x,y 失败(意外),请检查输入符号。") x_expr = simplify(sol_xy[0][x]) y_expr = simplify(sol_xy[0][y]) # 最终方程 f1 = simplify(Eq(1 + ya * yb + za_expr * zb_expr + e * (1 + ya ** 2 + za_expr ** 2) ** 0.5 * ( 1 + yb ** 2 + zb_expr ** 2) ** 0.5, 0).lhs) f2 = simplify(Eq(x_expr ** 2 + y_expr ** 2 + d ** 2 - 1, 0).lhs) # 存储正角度值的列表 positive_angles = [] # 处理单个解的函数 def process_solution(args): guess, index, total, solutions, positive_angles = args try: # 尝试求解 solution = nsolve([f1, f2], [ya, yb], guess, dict=True, prec=6) # 获取解的值 ya_val = solution[0][ya] yb_val = solution[0][yb] # 计算方程残差(误差) f1_error = abs(f1.subs({ya: ya_val, yb: yb_val})) f2_error = abs(f2.subs({ya: ya_val, yb: yb_val})) err_tol = 1e-1 is_new = True if f1_error > err_tol or f2_error > err_tol: is_new = False # 检查解是否已经存在(允许小的数值误差) for existing_sol in solutions: if (abs(solution[0][ya] - existing_sol[ya]) < 1e-3 and abs(solution[0][yb] - existing_sol[yb]) < 1e-3): is_new = False break if is_new: solution_dict = solution[0] solutions.append(solution_dict) # 计算其他值 ya_val = simplify(solution_dict[ya]) yb_val = simplify(solution_dict[yb]) x_val = simplify(x_expr.subs({ya: ya_val, yb: yb_val})) y_val = simplify(y_expr.subs({ya: ya_val, yb: yb_val})) za_val = simplify(za_expr.subs({ya: ya_val})) zb_val = simplify(zb_expr.subs({yb: yb_val})) angle = np.degrees(np.arctan(float(y_val) / float(x_val))) # 检查角度是否为正 if angle > 0: positive_angles.append(angle) print(f"初始值 {guess} 的解:") print(f" x = {x_val}") print(f" y = {y_val}") print(f" 方程 f1 残差: {f1_error}") print(f" 方程 f2 残差: {f2_error}") print(f" angle = {angle}°") print("-" * 50) return 1 else: return 0 except Exception as e: if index % 1000 == 0: print(f"已完成 {index} 个") return 0 # 设置Windows下的多进程启动方法 multiprocessing.freeze_support() # 存储结果的列表 manager = multiprocessing.Manager() solutions = manager.list() positive_angles = manager.list() # 初始猜测值 initial_values = np.arange(-10, 10, 0.1) initial_guesses = list(itertools.product(initial_values, repeat=2)) # 使用进程池并行处理 print(f"总共尝试 {len(initial_guesses)} 个初始组合...") num_processes = int(multiprocessing.cpu_count() - 1) with ThreadPoolExecutor(max_workers=num_processes) as executor: args_list = [(guess, i, len(initial_guesses), solutions, positive_angles) for i, guess in enumerate(initial_guesses)] results = list(executor.map(process_solution, args_list)) # 统计结果 success_count = sum(results) print(f"\n计算完成:") print(f"- 成功找到解: {success_count} 个") print(f"- 总共找到 {len(solutions)} 个不同的解") print(f"- 找到 {len(positive_angles)} 个正角度值") # 打印符号求解得到的 (ya,yb) 解集 if solutions: print("\n符号求解得到的 (ya,yb) 解集:") for idx, sol in enumerate(solutions, 1): ya_val = simplify(sol[ya]) yb_val = simplify(sol[yb]) x_val = simplify(x_expr.subs({ya: ya_val, yb: yb_val})) y_val = simplify(y_expr.subs({ya: ya_val, yb: yb_val})) za_val = simplify(za_expr.subs({ya: ya_val})) zb_val = simplify(zb_expr.subs({yb: yb_val})) angle = np.degrees(np.arctan(float(y_val) / float(x_val))) print(f"解 {idx}:") print(f" x = {x_val}") print(f" y = {y_val}") print(f" angle = {angle}°") print("-" * 50) return list(positive_angles) def median_filter_correction(df, column_index, window_size): """ 对指定列进行中值滤波修正 参数: df: pandas DataFrame,包含数据的DataFrame column_index: int,要处理的列索引(从0开始) window_size: int,窗口大小 返回: 修正后的DataFrame """ df_corrected = df.copy() gray_values = df.iloc[:, column_index].values corrected_values = gray_values.copy() # 对每window_size个点计算中值,并赋值给第一个点 # 最后window_size-1个点不做处理 for i in range(len(gray_values) - window_size + 1): window_data = gray_values[i:i + window_size] median_value = np.median(window_data) corrected_values[i] = median_value df_corrected.iloc[:, column_index] = corrected_values return df_corrected