| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- import pandas as pd
- import matplotlib.pyplot as plt
- import warnings
- from pandas.errors import SettingWithCopyWarning
- from scipy.signal import butter, filtfilt
- from scipy.signal import welch
- from scipy.fft import fft, fftfreq
- import numpy as np
- from sympy import symbols, solve, Eq, simplify, nsolve
- import itertools
- import multiprocessing
- from concurrent.futures import ThreadPoolExecutor
- warnings.filterwarnings("ignore", category=SettingWithCopyWarning)
- plt.rcParams['font.sans-serif'] = ['SimHei']
- plt.rcParams['axes.unicode_minus'] = False
- def butter_lowpass_filter(data, cutoff, fs, order=5):
- nyq = 0.5 * fs # Nyquist频率
- normal_cutoff = cutoff / nyq
- b, a = butter(order, normal_cutoff, btype='low', analog=False)
- y = filtfilt(b, a, data)
- return y
- def butter_bandpass_filter(data, cutoff_low, cutoff_high, fs, order=5):
- nyq = 0.5 * fs # Nyquist频率
- high_cutoff = cutoff_high / nyq
- low_cutoff = cutoff_low / nyq
- b, a = butter(order, [low_cutoff, high_cutoff], btype='band', analog=False)
- y = filtfilt(b, a, data)
- return y
- def apply_fft(x, fs):
- n = len(x)
- t = 1.0 / fs
- fft_cof = fft(x - np.mean(x)) # Obtain FFT coefficients after removing the mean (DC component) from the signal.
- xf = fftfreq(n, t)[:n // 2] # Taking positive spectrum only.
- # Multiply abs(FFT coefficients) by 2 to compensate for positive spectrum and normalize by signal length.
- fft_positive = 2.0 / n * np.abs(fft_cof[0:n // 2])
- return xf, fft_positive
- # Function to compute PSD
- def compute_psd(signal, length, fs):
- seg_length = length / 10000 # Segment length.
- overlap = seg_length / 20 # overlap between segments (in number of samples)
- nfft_length = 2 ** 14 # FFT length
- frequencies, psd = welch(signal, fs=fs, window='han', nperseg=seg_length, noverlap=overlap, nfft=nfft_length)
- return frequencies, psd
- def tower_cal(data, start_points, end_points, fs):
- """
- 计算测量叶片数据的塔筒测量距离
- :param data: cycle_calculate计算完成后的数据。
- :param start_points: 所有每个周期开始点,叶片前缘突变点。
- :param end_points: 叶片后缘突变点。
- :param fs: 采样频率。
- """
- cutoff_low = 0.01 * fs # 设置低通滤波截止频率
- combined_df_sorted = pd.concat([start_points, end_points]).sort_values(by='time') # 合并DataFrame并按 'time' 列排序
- # 检查排序后的数据从end开始,start结束
- if combined_df_sorted.iloc[0].equals(start_points.iloc[0]):
- combined_df_sorted = combined_df_sorted.iloc[1:]
- if combined_df_sorted.iloc[-1].equals(end_points.iloc[-1]):
- combined_df_sorted = combined_df_sorted.iloc[:-1]
- combined_df_sorted.reset_index(drop=True, inplace=True)
- # 将 start_points 中的时间点转换为列表
- start_times = combined_df_sorted['time'].tolist()
- fft_positive_filters = []
- tower_dists = []
- # 遍历所有起始时间点
- for i in range(0, len(start_times), 2):
- # 获取当前起始和结束时间点
- start_time = start_times[i] + 0.01 * (start_times[i + 1] - start_times[i])
- end_time = start_times[i + 1] - 0.01 * (start_times[i + 1] - start_times[i])
- # 根据当前起始时间点和结束时间点对数据进行分段
- segment = data[(data['time'] > start_time) & (data['time'] <= end_time)]
- mean_point = segment['distance'].mean()
- tower_dists.append(mean_point)
- tower_dist = np.mean(tower_dists)
- return tower_dist
- def process_fft(data, fs):
- """
- 塔筒数据每组低通滤波,求平均得振动分析;
- 叶片数据分组高通滤波
- :param data: tower_cal计算完成后的数据。
- :param fs: 采样频率。
- """
- cutoff_low = 0.001 * fs # 设置低通滤波截止频率
- segment = data.head(int(len(data) * 0.95))
- # 确保segment的长度是2的n次幂
- desired_length = 2**((len(segment) - 1).bit_length() - 1)
- segment = segment.head(desired_length)
- segment.loc[:, 'distance_filtered'] = butter_lowpass_filter(segment['distance'], cutoff_low, fs)
- # 提取时间序列数据
- time_series_filter = segment['distance_filtered'].values # 使用滤波后的距离数据
- xf_filter, fft_positive_filter = apply_fft(time_series_filter, fs)
- fft_positive_filters_truncated = fft_positive_filter[0:1000]
- xf_filter_truncated = xf_filter[0:1000]
- # 将 NumPy 数组转换为 Python 列表
- fft_y = fft_positive_filters_truncated.tolist()
- fft_y_scaled = [x * 1000 for x in fft_y]
- fft_y_scaled = [0] + fft_y_scaled
- fft_x = xf_filter_truncated.tolist()
- fft_x = [0] + fft_x
- fft_y_scaled = [0 if a_val < 0.1 else b_val for a_val, b_val in zip(fft_x, fft_y_scaled)]
- max_value = max(fft_y_scaled)
- max_index = fft_y_scaled.index(max_value)
- '''
- plt.plot(fft_x, fft_y_scaled, label='Filtered Signal')
- plt.xlabel('频率 (Hz)', fontsize=8)
- plt.ylabel('振幅 (m)', fontsize=8)
- plt.tick_params('x', labelsize=8)
- plt.tick_params('y', labelsize=8)
- plt.title('Frequency Spectrum', fontsize=12)
- plt.legend(fontsize=7)
- plt.grid(True)
- plt.savefig(f"filter_fft1.png", dpi=110, pil_kwargs={"icc_profile": False})
- plt.close()
- plt.plot(segment['time'], segment['distance_filtered'], label='Filtered Signal')
- plt.xlabel('时间 (s)', fontsize=8)
- plt.ylabel('振幅 (m)', fontsize=8)
- plt.tick_params('x', labelsize=8)
- plt.tick_params('y', labelsize=8)
- plt.title('Time-Domain Waveform', fontsize=12)
- plt.legend(fontsize=7)
- plt.grid(True)
- plt.savefig(f"filter_fft2.png", dpi=110, pil_kwargs={"icc_profile": False})
- plt.close()
- '''
- return segment, fft_x, fft_y_scaled, round(fft_x[max_index], 2), round(max_value, 2)
- def process_equations(list_param):
- """
- 处理方程组并求解
- 参数:
- a, b, p: 坐标点列表,P是轮毂中心
- d, e, f: 轴向倾角、相位角、锥角
- 返回:
- solutions: 包含所有有效解的列表
- positive_angles: 包含所有正角度值的列表
- """
- a, b, p, d, e, f = [list_param[i] for i in range(6)]
- # 定义符号
- ya, yb, za, zb, x, y = symbols('ya yb za zb x y', real=True)
- # 计算全局表达式
- za_expr = solve(Eq(a[0] - p[0] + ya * (a[1] - p[1]) + za * (a[2] - p[2]), 0), za)[0]
- zb_expr = solve(Eq(b[0] - p[0] + yb * (b[1] - p[1]) + zb * (b[2] - p[2]), 0), zb)[0]
- # 解线性方程组得到 x_expr, y_expr
- sol_xy = solve([Eq(x + ya * y + za_expr * d, 0), Eq(x + yb * y + zb_expr * d, 0)], [x, y], dict=True)
- if not sol_xy:
- raise RuntimeError("解线性方程组求 x,y 失败(意外),请检查输入符号。")
- x_expr = simplify(sol_xy[0][x])
- y_expr = simplify(sol_xy[0][y])
- # 最终方程
- f1 = simplify(Eq(1 + ya * yb + za_expr * zb_expr + e * (1 + ya ** 2 + za_expr ** 2) ** 0.5 * (
- 1 + yb ** 2 + zb_expr ** 2) ** 0.5, 0).lhs)
- f2 = simplify(Eq(x_expr ** 2 + y_expr ** 2 + d ** 2 - 1, 0).lhs)
- # 存储正角度值的列表
- positive_angles = []
- # 处理单个解的函数
- def process_solution(args):
- guess, index, total, solutions, positive_angles = args
- try:
- # 尝试求解
- solution = nsolve([f1, f2], [ya, yb], guess, dict=True, prec=6)
- # 获取解的值
- ya_val = solution[0][ya]
- yb_val = solution[0][yb]
- # 计算方程残差(误差)
- f1_error = abs(f1.subs({ya: ya_val, yb: yb_val}))
- f2_error = abs(f2.subs({ya: ya_val, yb: yb_val}))
- err_tol = 1e-1
- is_new = True
- if f1_error > err_tol or f2_error > err_tol:
- is_new = False
- # 检查解是否已经存在(允许小的数值误差)
- for existing_sol in solutions:
- if (abs(solution[0][ya] - existing_sol[ya]) < 1e-3 and
- abs(solution[0][yb] - existing_sol[yb]) < 1e-3):
- is_new = False
- break
- if is_new:
- solution_dict = solution[0]
- solutions.append(solution_dict)
- # 计算其他值
- ya_val = simplify(solution_dict[ya])
- yb_val = simplify(solution_dict[yb])
- x_val = simplify(x_expr.subs({ya: ya_val, yb: yb_val}))
- y_val = simplify(y_expr.subs({ya: ya_val, yb: yb_val}))
- za_val = simplify(za_expr.subs({ya: ya_val}))
- zb_val = simplify(zb_expr.subs({yb: yb_val}))
- angle = np.degrees(np.arctan(float(y_val) / float(x_val)))
- # 检查角度是否为正
- if angle > 0:
- positive_angles.append(angle)
- print(f"初始值 {guess} 的解:")
- print(f" x = {x_val}")
- print(f" y = {y_val}")
- print(f" 方程 f1 残差: {f1_error}")
- print(f" 方程 f2 残差: {f2_error}")
- print(f" angle = {angle}°")
- print("-" * 50)
- return 1
- else:
- return 0
- except Exception as e:
- if index % 1000 == 0:
- print(f"已完成 {index} 个")
- return 0
- # 设置Windows下的多进程启动方法
- multiprocessing.freeze_support()
- # 存储结果的列表
- manager = multiprocessing.Manager()
- solutions = manager.list()
- positive_angles = manager.list()
- # 初始猜测值
- initial_values = np.arange(-10, 10, 0.1)
- initial_guesses = list(itertools.product(initial_values, repeat=2))
- # 使用进程池并行处理
- print(f"总共尝试 {len(initial_guesses)} 个初始组合...")
- num_processes = int(multiprocessing.cpu_count() - 1)
- with ThreadPoolExecutor(max_workers=num_processes) as executor:
- args_list = [(guess, i, len(initial_guesses), solutions, positive_angles)
- for i, guess in enumerate(initial_guesses)]
- results = list(executor.map(process_solution, args_list))
- # 统计结果
- success_count = sum(results)
- print(f"\n计算完成:")
- print(f"- 成功找到解: {success_count} 个")
- print(f"- 总共找到 {len(solutions)} 个不同的解")
- print(f"- 找到 {len(positive_angles)} 个正角度值")
- # 打印符号求解得到的 (ya,yb) 解集
- if solutions:
- print("\n符号求解得到的 (ya,yb) 解集:")
- for idx, sol in enumerate(solutions, 1):
- ya_val = simplify(sol[ya])
- yb_val = simplify(sol[yb])
- x_val = simplify(x_expr.subs({ya: ya_val, yb: yb_val}))
- y_val = simplify(y_expr.subs({ya: ya_val, yb: yb_val}))
- za_val = simplify(za_expr.subs({ya: ya_val}))
- zb_val = simplify(zb_expr.subs({yb: yb_val}))
- angle = np.degrees(np.arctan(float(y_val) / float(x_val)))
- print(f"解 {idx}:")
- print(f" x = {x_val}")
- print(f" y = {y_val}")
- print(f" angle = {angle}°")
- print("-" * 50)
- return list(positive_angles)
|