"""Join filtered lidar wind records with turbine power data, one CSV per month.

The script reads the power export files, groups the ``WindSpeed2024*`` lidar
files found in the temporary directory by the two characters that follow that
prefix in the file name, and for every group writes ``<month>.csv`` containing
the lidar rows merged with the power rows that share the same ``Time1`` value.
"""
import os
import sys
from concurrent.futures.thread import ThreadPoolExecutor

# Everything before the first "tmp_file" in this file's absolute path is put at
# the front of sys.path; presumably that is the project root, so the ``utils``
# imports below resolve. This must run before those imports.
sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])

import datetime
import multiprocessing

import pandas as pd

from utils.file.trans_methods import read_files, copy_to_new, read_excel_files, read_file_to_df
from utils.zip.unzip import get_desc_path, unzip


def get_real_path(win_path):
    """Translate a ``Z:`` drive Windows path into its ``/data/download`` form.

    Every ``Z:`` is replaced by ``/data/download`` and every backslash by a
    forward slash; nothing else in the string is touched.
    """
    return win_path.replace(r'Z:', r'/data/download').replace("\\", '/')


def unzip_or_remove(file, tmp_dir):
    """Extract ``file`` into ``tmp_dir`` if its name ends with "zip", else copy it.

    Only referenced by the disabled unzip step in ``main``.
    """
    if str(file).endswith("zip"):
        unzip(file, tmp_dir)
    else:
        # The original passed ``file.replace(file, tmp_dir)``, which always
        # evaluates to ``tmp_dir``; pass it directly.
        copy_to_new(file, tmp_dir)


def read_file_to_df_and_select(file_path):
    """Load one lidar file and keep only the rows usable for the comparison.

    A row is kept when both status columns are above 0.8 and ``Distance`` is
    either 70 or 90.

    :param file_path: path handed to ``read_file_to_df``.
    :return: the filtered DataFrame restricted to the selected columns.
    """
    select_cols = ['Timestamp', 'Los', 'Distance', 'HWS(hub)', 'HWS(hub)status', 'DIR(hub)', 'DIR(hub)status']

    df = read_file_to_df(file_path, read_cols=select_cols)

    speed_ok = df['HWS(hub)status'] > 0.8
    direction_ok = df['DIR(hub)status'] > 0.8
    distance_ok = df['Distance'].isin([70, 90])

    return df[speed_ok & direction_ok & distance_ok]


def read_month_data_and_select(month, files, gonglv_df):
    """Merge one month of lidar files with the power data and write a CSV.

    :param month: label used for the output file name (``<month>.csv``).
    :param files: lidar files belonging to that month; read in a thread pool.
    :param gonglv_df: power rows with ``Time1``, the power column and a helper
        ``month`` column. The frame is not modified.
    :return: None. The merged result is written under the result directory.
    """
    with ThreadPoolExecutor(max_workers=10) as executor:
        dfs = list(executor.map(read_file_to_df_and_select, files))

    df = pd.concat(dfs, ignore_index=True)

    # Cut the fractional part off the timestamp text before parsing. Going
    # through ``astype(str)`` keeps this working when a cell is not a string
    # (for example NaN), which the per-row ``x.split`` could not handle.
    whole_seconds = df['Timestamp'].astype(str).str.split('.', n=1).str[0]
    df['Time1'] = pd.to_datetime(whole_seconds, errors='coerce')

    # Unparsable timestamps become NaT. They can never be aligned to a power
    # sample, and NaT keys would otherwise be matched against each other by
    # the merge, so drop them here.
    df = df.dropna(subset=['Time1'])

    # Move every timestamp that is not on a 10-second mark forward to the next
    # one; timestamps already on a mark get a zero offset.
    seconds_past_mark = df['Time1'].dt.second % 10
    df['Time1'] = df['Time1'] + pd.to_timedelta((10 - seconds_past_mark) % 10, unit='s')

    # Drop the helper column on a copy instead of deleting it from the
    # caller's frame.
    power_df = gonglv_df.drop(columns='month')

    result_df = pd.merge(df, power_df, on='Time1')
    result_df.sort_values(by='Time1', inplace=True)

    save_dir = get_real_path(r'Z:\偏航误差验证数据\整理结果')
    # Local debugging alternative:
    # save_dir = r'D:\data\pianhang\result'
    result_df.to_csv(os.path.join(save_dir, f'{month}.csv'), encoding='utf8', index=False)


def main():
    """Load the power data, group the lidar files by month and process them."""
    # Raw lidar directory; currently unused because the files are read from
    # tmp_dir below.
    read_dir = get_real_path(r'Z:\偏航误差验证数据\新华佳县雷达数据')
    tmp_dir = get_real_path(r'Z:\偏航误差验证数据\tmp_data')
    gonglv_dir = get_real_path(r'Z:\偏航误差验证数据\陕西建工陕西智华\report\output')

    # Local debugging alternatives:
    # read_dir = r'D:\data\pianhang\1'
    # tmp_dir = r'D:\data\pianhang\tmp'
    # gonglv_dir = r'D:\data\pianhang\2'

    gonglv_files = read_excel_files(gonglv_dir)

    with multiprocessing.Pool(20) as pool:
        dfs = pool.starmap(read_file_to_df, [(i, ['collect_time', 'a0216']) for i in gonglv_files])

    gonglv_df = pd.concat(dfs, ignore_index=True)
    gonglv_df.columns = ['Time1', '功率']
    gonglv_df['Time1'] = pd.to_datetime(gonglv_df['Time1'], errors='coerce')
    gonglv_df['month'] = gonglv_df['Time1'].dt.month

    all_files = read_files(tmp_dir)
    all_files = [i for i in all_files if str(os.path.basename(i)).startswith('WindSpeed2024')]

    # Disabled step that extracted/copied the files into tmp_dir:
    # with multiprocessing.Pool(20) as pool:
    #     pool.starmap(unzip_or_remove, [(file, tmp_dir) for file in all_files])

    # 'WindSpeed2024' is 13 characters long, so [13:15] is the two characters
    # right after the prefix; they are used as the month key.
    month_map = dict()
    for file in all_files:
        month = os.path.basename(file)[13:15]
        month_map.setdefault(month, []).append(file)

    with multiprocessing.Pool(5) as pool:
        pool.starmap(read_month_data_and_select,
                     [(month, files, gonglv_df[gonglv_df['month'] == int(month)])
                      for month, files in month_map.items()])


if __name__ == '__main__':
    main()