import datetime
import multiprocessing
import os
import warnings

import pandas as pd

warnings.filterwarnings("ignore")


def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively walk ``path``, collecting file paths into ``directory_dict``.

    :param directory_dict: dict mapping each directory to the list of files in it
    :param path: root directory to walk
    :param filter_types: optional collection of file extensions to keep;
        when None/empty every file is kept
    """
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            directory_dict.setdefault(path, [])
            if not filter_types:
                directory_dict[path].append(item_path)
            elif str(item_path).split(".")[-1] in filter_types:
                # skip MS-Office lock/temp files such as '~$report.xlsx'
                if str(item_path).count("~$") == 0:
                    directory_dict[path].append(item_path)


def read_excel_files(read_path):
    """Return every data file (xls/xlsx/csv/gz) found under ``read_path``.

    If ``read_path`` is itself a file it is returned as a one-element list.
    """
    if os.path.isfile(read_path):
        return [read_path]
    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
    return [path for paths in directory_dict.values() for path in paths if path]


def create_file_path(path, is_file_path=False):
    """Create a directory (and its parents) if it does not already exist.

    :param path: directory to create
    :param is_file_path: when True, ``path`` includes a file name and only
        its parent directory is created
    """
    if is_file_path:
        path = os.path.dirname(path)
    # Fix: dirname() of a bare file name is '' and os.makedirs('') raises,
    # so guard against an empty path before creating it.
    if path and not os.path.exists(path):
        os.makedirs(path, exist_ok=True)


def boolean_is_check_data(df_cols):
    """Return True when the column names mark a fault/event ("check") sheet.

    Columns look like '<wind_name>_<field>'; only the field suffix is matched
    against the known fault-related field names.
    """
    fault_list = ['快速停机', '故障名称', '故障代码', '故障停机']
    suffixes = [str(i).split('_')[-1] for i in df_cols]
    return any(fault in suffixes for fault in fault_list)


def read_fle_to_df(file_path):
    """Read one Excel file and normalise its columns.

    Column names look like '<wind_name>_<field>': the turbine-name prefix is
    stripped from every column and stored in a new 'wind_name' column instead.

    NOTE(review): read_excel_files() also collects .csv/.gz files, but this
    always calls pd.read_excel — confirm those files really hold Excel data.

    :return: tuple (is_check_data, wind_name, dataframe)
    """
    df = pd.read_excel(file_path)
    wind_name = [i for i in df.columns if i.find('_') > -1][0].split('_')[0]
    df.columns = [i.split('_')[-1] for i in df.columns]
    df['wind_name'] = wind_name
    return boolean_is_check_data(df.columns), wind_name, df


def save_to_file(dfs, wind_name, save_path='', param='', is_check=False,
                 all_cols=None, result_data_list=None):
    """Combine the per-file frames of one turbine and record its column list.

    Fault/event ("check") frames are concatenated row-wise; measurement frames
    are outer-merged on ['采样时间', 'wind_name'].  The sorted column list,
    prefixed with '<save dir>-<param>' and the turbine name, is appended to
    ``result_data_list`` for the final loss_col.csv report.

    Fix: the original defaults were mutable (``all_cols=list()``) and, worse,
    ``result_data_list=multiprocessing.Manager().list()`` spawned a manager
    server process at import time in every worker; both are None-sentinels now.
    Callers always pass both arguments explicitly, so behaviour is unchanged.
    """
    if all_cols is None:
        all_cols = []
    if result_data_list is None:
        result_data_list = []
    try:
        if is_check:
            df = pd.concat(dfs)
        else:
            df = dfs[0]
            for index, now_df in enumerate(dfs):
                if index > 0:
                    df = pd.merge(df, now_df, on=['采样时间', 'wind_name'], how='outer')
    except Exception as e:
        print(wind_name, e)
        # re-raise with the original traceback intact (was 'raise e')
        raise
    df.reset_index(inplace=True)
    df.drop_duplicates(inplace=True, subset=['采样时间', 'wind_name'])
    if 'index' in df.columns:
        del df['index']
    create_file_path(save_path)
    df.sort_values(by='采样时间', inplace=True)
    loss_cols = sorted(i for i in df.columns if i != 'wind_name')
    loss_cols.insert(0, wind_name)
    loss_cols.insert(0, os.path.basename(save_path) + '-' + param)
    result_data_list.append(loss_cols)
    # Writing the merged data is currently disabled; only the column report
    # above is produced.
    # for col in set(all_cols):
    #     if col not in df.columns:
    #         df[col] = np.nan
    # df.to_csv(os.path.join(save_path, param, wind_name + '.csv'), encoding='utf8', index=False)


if __name__ == '__main__':
    begin = datetime.datetime.now()
    # dir1 = r'D:\data\新华水电\测试'
    # save_path = r'D:\data\新华水电\整理数据'
    result_datas = [
        (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/8月风机数据',
         r'/data/download/collection_data/1进行中/新华水电/整理数据/8月'),
        (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/9月风机数据',
         r'/data/download/collection_data/1进行中/新华水电/整理数据/9月'),
    ]

    # Shared proxy list so save_to_file calls in worker processes can report
    # their column lists back to the parent.
    result_data_list = multiprocessing.Manager().list()

    for dir1, save_path in result_datas:
        files = read_excel_files(dir1)
        with multiprocessing.Pool(30) as pool:
            datas = pool.starmap(read_fle_to_df, [(file,) for file in files])

        data_wind_name = dict()
        check_wind_name = dict()
        data_all_cols = list()
        check_all_cols = list()
        for check_data, wind_name, df in datas:
            # frames containing '工作模式' are skipped entirely
            if '工作模式' not in df.columns:
                # df.reset_index(inplace=True)
                # df.set_index(keys=['采样时间'], inplace=True)
                if check_data:
                    check_all_cols.extend(list(df.columns))
                    check_wind_name.setdefault(wind_name, []).append(df)
                else:
                    data_all_cols.extend(list(df.columns))
                    data_wind_name.setdefault(wind_name, []).append(df)

        with multiprocessing.Pool(30) as pool:
            pool.starmap(save_to_file,
                         [(dfs, wind_name, save_path, "事件数据", True, check_all_cols, result_data_list)
                          for wind_name, dfs in check_wind_name.items()])

        with multiprocessing.Pool(30) as pool:
            pool.starmap(save_to_file,
                         [(dfs, wind_name, save_path, "数据", False, data_all_cols, result_data_list)
                          for wind_name, dfs in data_wind_name.items()])

        print(datetime.datetime.now() - begin)

    normal_list = list(result_data_list)
    # Sort by '<save dir>-<param>' then by the numeric tail of the turbine name.
    # NOTE(review): assumes wind_name is '<2-char prefix><number>' — confirm.
    normal_list.sort(key=lambda x: (x[0], int(x[1][2:])))
    with open('loss_col.csv', 'w', encoding='utf8') as f:
        for row in normal_list:
            f.write(",".join(row))
            f.write('\n')

    print(datetime.datetime.now() - begin)