"""Collect per-second wind-farm exports into one time-sorted CSV per unit.

The script is organised as three stages, driven from the ``__main__`` guard:

1. ``get_and_remove`` stages every raw file from ``read_path`` into
   ``tmp_dir`` (unpacking zip/rar archives on the way).
2. ``get_resolve`` reads each staged file, keeps the ``read_cols`` columns and
   appends the rows to ``<save_dir>/<wind_name>.csv``.
3. ``sort_data`` rewrites each of those CSV files ordered by ``Time``.

Only stage 3 is currently enabled; stages 1 and 2 are kept as commented-out
code in the ``__main__`` section.
"""
import datetime
import multiprocessing
import os
import sys

import pandas as pd

# Make the project root importable: everything before "tmp_file" in this
# file's absolute path is put at the front of sys.path. This must run before
# the ``utils`` imports below.
sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])

from utils.file.trans_methods import read_excel_files, copy_to_new, read_file_to_df
from utils.zip.unzip import unzip, get_desc_path, unrar

# Columns read from every input file; also the column order of the output CSV.
read_cols = ['Time', '设备主要状态', '功率曲线风速', '湍流强度', '实际风速', '有功功率', '桨叶角度A', '桨叶角度B',
             '桨叶角度C', '机舱内温度', '机舱外温度', '绝对风向', '机舱绝对位置', '叶轮转速', '发电机转速',
             '瞬时风速', '有功设定反馈', '当前理论可发最大功率', '空气密度', '偏航误差', '发电机扭矩', '瞬时功率',
             '风向1s', '偏航压力', '桨叶1速度', '桨叶2速度', '桨叶3速度', '桨叶1角度给定', '桨叶2角度给定',
             '桨叶3角度给定', '轴1电机电流', '轴2电机电流', '轴3电机电流', '轴1电机温度', '轴2电机温度',
             '轴3电机温度', '待机', '启动', '偏航', '并网', '限功率', '正常发电', '故障', '计入功率曲线',
             '运行发电机冷却风扇1', '运行发电机冷却风扇2', '激活偏航解缆阀', '激活偏航刹车阀', '激活风轮刹车阀',
             '激活顺时针偏航', '激活逆时针偏航', '电缆扭角']

# Input directory, output directory and staging directory.
read_path = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/收资数据/sec'
save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/点检表以外测点儿-20241210'
tmp_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/tmp/second/excel_tmp'

# Alternative (Windows) locations, kept for local runs:
# read_path = r'D:\data\张崾先风电场\6'
# save_dir = r'D:\data\张崾先风电场\点检表以外测点儿-20241209'
# tmp_dir = r'D:\data\张崾先风电场\tmp'

os.makedirs(tmp_dir, exist_ok=True)
os.makedirs(save_dir, exist_ok=True)


def get_and_remove(file):
    """Stage one raw input file under ``tmp_dir``.

    The target path is the source path with the ``read_path`` prefix replaced
    by ``tmp_dir``. Handling depends on the file name suffix:

    * ``csv.zip``   -> ``copy_to_new`` with the target suffix changed to ``csv.gz``
    * other ``zip`` -> ``unzip`` into ``get_desc_path(target)``
    * ``rar``       -> ``unrar`` into ``get_desc_path(target)``
    * anything else -> ``copy_to_new`` to the target path

    A failed ``unzip`` does not abort the run; the failure is reported on
    stderr and the file is skipped.

    Args:
        file: Path of the source file (a string located under ``read_path``).
    """
    to_path = tmp_dir
    name = str(file)
    target = file.replace(read_path, to_path)

    if name.endswith("csv.zip"):
        copy_to_new(file, target.replace("csv.zip", 'csv.gz'))
    elif name.endswith("zip"):
        # unzip reports failure through its return value instead of raising.
        is_success, e = unzip(file, get_desc_path(target))
        if not is_success:
            # Best-effort: keep going, but do not hide the failure.
            print(f"unzip failed, skipped: {file}: {e}", file=sys.stderr)
    elif name.endswith("rar"):
        unrar(file, get_desc_path(target))
    else:
        copy_to_new(file, target)


def get_resolve(file_path, exist_wind_names, map_lock):
    """Append the selected columns of one staged file to its per-unit CSV.

    The first two characters of the file's base name are used as
    ``wind_name`` (the output file stem) and characters 14..23 as the date,
    which is prefixed to every ``Time`` value.
    NOTE(review): this assumes a fixed file-name layout and that the ``Time``
    column holds strings (time of day) - confirm against the real exports.

    Args:
        file_path: Path of the staged data file.
        exist_wind_names: Shared list of ``wind_name`` values whose CSV has
            already been created (header written).
        map_lock: Mapping from ``wind_name`` to a lock guarding that CSV.

    Raises:
        KeyError: If ``map_lock`` has no entry for this file's ``wind_name``.
    """
    begin = datetime.datetime.now()
    base_name = os.path.basename(file_path)

    df = read_file_to_df(file_path, read_cols=read_cols)
    wind_name = str(base_name[0:2])
    date = base_name[14:24]
    df['Time'] = df['Time'].apply(lambda x: date + ' ' + x)
    # Enforce one fixed column order so appended rows line up with the header.
    df = df[read_cols]

    out_path = os.path.join(save_dir, wind_name + '.csv')
    # The existence check and the write must happen under the same lock so the
    # header is written exactly once per output file.
    with map_lock[str(wind_name)]:
        if wind_name in exist_wind_names:
            df.to_csv(out_path, mode='a', index=False, header=False, encoding='utf8')
        else:
            df.to_csv(out_path, index=False, encoding='utf8')
            exist_wind_names.append(wind_name)

    print(base_name, '执行完成,耗时:', get_haoshi(begin))


def sort_data(file_path):
    """Rewrite a CSV file in place with its rows ordered by ``Time``.

    ``Time`` is parsed with ``errors='coerce'``, so values that cannot be
    parsed become ``NaT`` instead of raising; ``sort_values`` places them last.

    Args:
        file_path: Path of the CSV file to sort; it is overwritten.
    """
    df = pd.read_csv(file_path, encoding='utf8')
    # The keyword is ``errors``; the former ``error=`` raised TypeError.
    df['Time'] = pd.to_datetime(df['Time'], errors='coerce')
    df = df.sort_values(by=['Time'])
    df.to_csv(file_path, index=False, encoding='utf8')


def get_haoshi(begin):
    """Return the time elapsed since ``begin`` as a ``datetime.timedelta``."""
    return datetime.datetime.now() - begin


if __name__ == '__main__':
    begin = datetime.datetime.now()

    # --- Stage 1 (disabled): stage raw files into tmp_dir ------------------
    # NOTE(review): read_files, get_available_cpu_count_with_percent and
    # split_array are not imported in this file as shown; import them before
    # re-enabling this stage.
    # all_files = read_files(read_path)
    # split_count = get_available_cpu_count_with_percent(1 / 2)
    # all_arrays = split_array(all_files, split_count)
    #
    # for index, arr in enumerate(all_arrays):
    #     with multiprocessing.Pool(10) as pool:
    #         pool.starmap(get_and_remove, [(i,) for i in arr])
    #
    # print("移动完成,耗时:", get_haoshi(begin))

    # --- Stage 2 (disabled): merge staged files into per-unit CSVs ---------
    # exist_wind_names = multiprocessing.Manager().list()
    #
    # map_lock = dict()
    # for i in range(26, 42):
    #     map_lock[str(i)] = multiprocessing.Manager().Lock()
    #
    # all_files = read_excel_files(tmp_dir)
    # with multiprocessing.Pool(16) as pool:
    #     pool.starmap(get_resolve, [(i, exist_wind_names, map_lock) for i in all_files])
    #
    # print("整理完成,耗时:", get_haoshi(begin))

    # --- Stage 3: sort every merged CSV by time ----------------------------
    all_files = read_excel_files(save_dir)
    with multiprocessing.Pool(4) as pool:
        pool.map(sort_data, all_files)

    print("排序完成,耗时:", get_haoshi(begin))

    # --- Cleanup (disabled; needs ``import shutil``) -----------------------
    # shutil.rmtree(tmp_dir)
    # print("移除临时文件完成,耗时:", get_haoshi(begin))