123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import datetime
- import multiprocessing
- import os
- import sys
- sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
- from utils.file.trans_methods import read_excel_files, copy_to_new, read_file_to_df
- from utils.zip.unzip import unzip, get_desc_path, unrar
- import pandas as pd
# Columns requested from every source file and kept, in this order, in the
# per-turbine CSV output. 'Time' is required: get_resolve prefixes it with the
# date taken from the file name and sort_data orders rows by it.
read_cols = [
    'Time', '设备主要状态', '功率曲线风速', '湍流强度', '实际风速', '有功功率',
    '桨叶角度A', '桨叶角度B', '桨叶角度C',
    '机舱内温度', '机舱外温度', '绝对风向', '机舱绝对位置',
    '叶轮转速', '发电机转速', '瞬时风速',
    '有功设定反馈', '当前理论可发最大功率', '空气密度', '偏航误差',
    '发电机扭矩', '瞬时功率', '风向1s', '偏航压力',
    '桨叶1速度', '桨叶2速度', '桨叶3速度',
    '桨叶1角度给定', '桨叶2角度给定', '桨叶3角度给定',
    '轴1电机电流', '轴2电机电流', '轴3电机电流',
    '轴1电机温度', '轴2电机温度', '轴3电机温度',
    '待机', '启动', '偏航', '并网', '限功率', '正常发电', '故障', '计入功率曲线',
    '运行发电机冷却风扇1', '运行发电机冷却风扇2',
    '激活偏航解缆阀', '激活偏航刹车阀', '激活风轮刹车阀',
    '激活顺时针偏航', '激活逆时针偏航', '电缆扭角',
]

# read_path: root of the raw collected files.
# save_dir:  where the merged per-turbine CSV files are written.
# tmp_dir:   staging area that receives copied / unpacked files.
read_path = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/收资数据/sec'
save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/点检表以外测点儿-20241210'
tmp_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/tmp/second/excel_tmp'

# Alternative locations for a local Windows run (disabled):
# read_path = r'D:\data\张崾先风电场\6'
# save_dir = r'D:\data\张崾先风电场\点检表以外测点儿-20241209'
# tmp_dir = r'D:\data\张崾先风电场\tmp'

# Both working directories must exist before any stage runs.
for _directory in (tmp_dir, save_dir):
    os.makedirs(_directory, exist_ok=True)
def get_and_remove(file):
    """Stage one source file into ``tmp_dir``, unpacking archives on the way.

    The destination is ``file`` with ``read_path`` replaced by ``tmp_dir``.
    Handling is chosen by file-name suffix:

    * ``csv.zip`` -- copied with ``copy_to_new``; ``csv.zip`` in the
      destination path becomes ``csv.gz``.
    * any other ``zip`` -- extracted with ``unzip``.
    * ``rar`` -- extracted with ``unrar``.
    * anything else -- copied unchanged with ``copy_to_new``.

    Args:
        file: Path of the source file (a string under ``read_path``).

    Returns:
        None. A failed ``unzip`` is reported on stdout and otherwise ignored,
        so a single bad archive does not abort the whole batch.
    """
    name = str(file)
    staged_path = file.replace(read_path, tmp_dir)

    if name.endswith("csv.zip"):
        # NOTE(review): only the suffix in the destination name changes here;
        # confirm copy_to_new / downstream readers expect that for these files.
        copy_to_new(file, staged_path.replace("csv.zip", 'csv.gz'))
    elif name.endswith("zip"):
        is_success, e = unzip(file, get_desc_path(staged_path))
        if not is_success:
            # Previously swallowed silently; surface the failure so missing
            # data can be traced back to the archive that caused it.
            print(os.path.basename(name), '解压失败:', e)
    elif name.endswith("rar"):
        unrar(file, get_desc_path(staged_path))
    else:
        copy_to_new(file, staged_path)
def get_resolve(file_path, exist_wind_names, map_lock):
    """Read one staged file and append its rows to a per-prefix CSV in ``save_dir``.

    The first two characters of the file name select the output file
    (``<save_dir>/<prefix>.csv``) and characters 14-23 are prepended to every
    ``Time`` value (presumably turbine id and date -- verify against the
    source file naming).

    Args:
        file_path: Path of the file to load with ``read_file_to_df``.
        exist_wind_names: Shared list of prefixes whose output file has
            already been started; this function appends to it.
        map_lock: Mapping from prefix to a lock guarding that prefix's
            output file.

    Returns:
        None. Prints the file name and elapsed time when done.
    """
    started_at = datetime.datetime.now()
    frame = read_file_to_df(file_path, read_cols=read_cols)

    file_name = os.path.basename(file_path)
    wind_name = str(file_name[0:2])
    day = file_name[14:24]

    frame['Time'] = frame['Time'].apply(lambda clock: day + ' ' + clock)
    frame = frame[read_cols]

    target = f"{save_dir}/{wind_name}.csv"
    with map_lock[wind_name]:
        # The first writer for a prefix creates the file with a header row;
        # every later writer appends rows without one.
        already_started = wind_name in exist_wind_names
        frame.to_csv(
            target,
            mode='a' if already_started else 'w',
            header=not already_started,
            index=False,
            encoding='utf8',
        )
        if not already_started:
            exist_wind_names.append(wind_name)

    print(file_name, '执行完成,耗时:', get_haoshi(started_at))
def sort_data(file_path):
    """Sort one CSV file by its ``Time`` column and overwrite it in place.

    Args:
        file_path: Path of a UTF-8 CSV file that has a ``Time`` column.

    Returns:
        None. The file at ``file_path`` is rewritten with rows in ascending
        ``Time`` order; rows whose ``Time`` cannot be parsed come last.
    """
    df = pd.read_csv(file_path, encoding='utf8')
    # The keyword is ``errors`` (plural). With ``error=`` pandas raises
    # TypeError and nothing gets sorted. 'coerce' turns unparseable values
    # into NaT, which sort_values places after all valid timestamps.
    df['Time'] = pd.to_datetime(df['Time'], errors='coerce')
    df = df.sort_values(by='Time')
    df.to_csv(file_path, index=False, encoding='utf8')
def get_haoshi(begin):
    """Return the time elapsed since ``begin``.

    Args:
        begin: A naive ``datetime.datetime`` taken earlier with
            ``datetime.datetime.now()``.

    Returns:
        A ``datetime.timedelta`` equal to the current local time minus
        ``begin``.
    """
    now = datetime.datetime.now()
    elapsed = now - begin
    return elapsed
if __name__ == '__main__':
    # Pipeline driver. Only the final sorting stage is currently enabled; the
    # earlier stages are kept below as commented-out code so they can be
    # switched back on for a full run.
    started_at = datetime.datetime.now()

    # --- Stage 1 (disabled): copy / unpack raw files into tmp_dir -----------
    # all_files = read_files(read_path)
    # split_count = get_available_cpu_count_with_percent(1 / 2)
    # all_arrays = split_array(all_files, split_count)
    #
    # for index, arr in enumerate(all_arrays):
    #     with multiprocessing.Pool(10) as pool:
    #         pool.starmap(get_and_remove, [(i,) for i in arr])
    #
    # print("移动完成,耗时:", get_haoshi(started_at))

    # --- Stage 2 (disabled): merge staged files into per-prefix CSVs --------
    # exist_wind_names = multiprocessing.Manager().list()
    #
    # map_lock = dict()
    # for i in range(26, 42):
    #     map_lock[str(i)] = multiprocessing.Manager().Lock()
    #
    # all_files = read_excel_files(tmp_dir)
    # with multiprocessing.Pool(16) as pool:
    #     pool.starmap(get_resolve, [(i, exist_wind_names, map_lock) for i in all_files])
    #
    # print("整理完成,耗时:", get_haoshi(started_at))

    # --- Stage 3 (active): sort every file found in save_dir by Time --------
    saved_files = read_excel_files(save_dir)
    with multiprocessing.Pool(4) as workers:
        workers.map(sort_data, saved_files)
    print("排序完成,耗时:", get_haoshi(started_at))

    # --- Cleanup (disabled): remove the staging directory -------------------
    # shutil.rmtree(tmp_dir)
    # print("移除临时文件完成,耗时:", get_haoshi(started_at))
|