import datetime
import multiprocessing
import os
import warnings

import pandas as pd

warnings.filterwarnings("ignore")


def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively walk `path`, grouping file paths by their parent directory.
    If `filter_types` is given, keep only files with those extensions."""
    # Iterate over every item in the directory
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            if path not in directory_dict:
                directory_dict[path] = []
            if filter_types is None or len(filter_types) == 0:
                directory_dict[path].append(item_path)
            elif str(item_path).split(".")[-1] in filter_types:
                # Skip Office temporary files such as "~$xxx.xlsx"
                if str(item_path).count("~$") == 0:
                    directory_dict[path].append(item_path)


# Read all Excel files under the given path
def read_excel_files(read_path):
    if os.path.isfile(read_path):
        return [read_path]

    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])

    return [path for paths in directory_dict.values() for path in paths if path]


# Create a directory if it does not exist
def create_file_path(path, is_file_path=False):
    """
    Create the directory.
    :param path: directory to create
    :param is_file_path: whether `path` already includes a file name
    """
    if is_file_path:
        path = os.path.dirname(path)

    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)


def boolean_is_check_data(df_cols, need_valid=True):
    """Return True if the columns contain any fault/alarm flag (only when `need_valid`)."""
    if not need_valid:
        return True
    fault_list = ['快速停机', '故障名称', '故障代码', '故障停机', '人工停机',
                  '风机紧急停机', '风机自身故障停机', '限功率运行状态']
    df_cols = [str(i).split('_')[-1] for i in df_cols]
    for fault in fault_list:
        if fault in df_cols:
            return True

    return False


def read_fle_to_df(file_path):
    """Read one SCADA Excel file: derive the turbine name from the column prefix,
    strip the prefixes, and round the sampling timestamp up to the minute."""
    df = pd.read_excel(file_path)
    wind_name = [i for i in df.columns if i.find('_') > -1][0].split('_')[0]
    df.columns = [i.split('_')[-1] for i in df.columns]
    df['wind_name'] = wind_name
    df['采样时间'] = pd.to_datetime(df['采样时间'])
    df['采样时间'] = df['采样时间'].dt.ceil('T')

    return boolean_is_check_data(df.columns, file_path.find('批次') > -1), wind_name, df


def read_guzhangbaojing(file_path):
    """Read one fault/alarm Excel file and keep only records sampled in 2024-08/09."""
    try:
        df = pd.read_excel(file_path)
        df.rename(columns={'风机名': 'wind_name'}, inplace=True)
        df['采样时间'] = pd.to_datetime(df['采样时间'])
        df['采样时间'] = df['采样时间'].dt.ceil('T')
        df = df[(df['采样时间'] >= '2024-08-01 00:00:00') & (df['采样时间'] < '2024-10-01 00:00:00')]
        return df
    except Exception as e:
        print(file_path, e)
        raise e


def combine_df(dfs, wind_name, save_path=''):
    """Outer-merge all frames of one turbine on (采样时间, wind_name).
    Columns that already exist get a '__<n>' suffix so they can be reconciled later."""
    print(wind_name)
    cols = list()
    col_map = dict()
    try:
        df = dfs[0]
        cols.extend(df.columns)
        for index, now_df in enumerate(dfs):
            if index > 0:
                for col in now_df.columns:
                    if col in cols and col not in ['采样时间', 'wind_name']:
                        if col in col_map.keys():
                            count = col_map[col]
                            col_map[col] = count + 1
                        else:
                            count = 1
                            col_map[col] = 1
                        now_df.rename(columns={col: col + '__' + str(count)}, inplace=True)

                df = pd.merge(df, now_df, on=['采样时间', 'wind_name'], how='outer')
                cols.extend(now_df.columns)
    except Exception as e:
        print(wind_name, e)
        raise e
    df.reset_index(inplace=True)
    df.drop_duplicates(inplace=True, subset=['采样时间', 'wind_name'])
    if 'index' in df.columns:
        del df['index']
    create_file_path(save_path)
    df.sort_values(by='采样时间', inplace=True)
    df.set_index(keys=['采样时间', 'wind_name'], inplace=True)

    return wind_name, df


def sae_to_csv(wind_name, df):
    """Collapse '__<n>' duplicate columns back into one (numeric: row-wise max,
    otherwise: first non-empty value) and write the result to CSV.
    Uses the module-level `save_path` set in the __main__ block (inherited by forked workers)."""
    try:
        col_tuples = [(col.split('__')[0], col) for col in df.columns if col.find('__') > -1]
        col_dict = dict()
        for origin, col in col_tuples:
            if origin in col_dict.keys():
                col_dict[origin].add(col)
            else:
                col_dict[origin] = {col}

        for origin, cols in col_dict.items():
            print(wind_name, origin, cols)
            if pd.api.types.is_numeric_dtype(df[origin]):
                df[origin] = df[list(cols)].max(axis=1)
            else:
                df[origin] = df[list(cols)].apply(lambda x: [i for i in x.values if i][0], axis=1)
            for col in cols:
                if col != origin:
                    del df[col]

        df.to_csv(os.path.join(save_path, wind_name + '.csv'), encoding='utf8')
    except Exception as e:
        print(wind_name, df.columns)
        raise e


if __name__ == '__main__':
    begin = datetime.datetime.now()
    base_path = r'/data/download/collection_data/1进行中/新华水电/收资数据/风机SCADA数据'
    dir1 = base_path + r'/data'
    dir2 = base_path + r'/故障报警/汇能机组数据-故障'
    dir3 = base_path + r'/故障报警/报警'
    save_path = r'/data/download/collection_data/1进行中/新华水电/清理数据/合并批次1-2故障报警'
    create_file_path(save_path)

    # result_datas = [
    #     (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据',
    #      r'/data/download/collection_data/1进行中/新华水电/整理数据/批次1-2合并'),
    # ]

    # Step 1: read all SCADA files in parallel and group the frames by turbine
    data_wind_name = dict()
    files = read_excel_files(dir1)
    with multiprocessing.Pool(30) as pool:
        datas = pool.starmap(read_fle_to_df, [(file,) for file in files])
    for data in datas:
        check_data, wind_name, df = data[0], data[1], data[2]
        if wind_name in data_wind_name.keys():
            data_wind_name[wind_name].append(df)
        else:
            data_wind_name[wind_name] = [df]

    # Step 2: merge each turbine's frames into one wide frame
    with multiprocessing.Pool(30) as pool:
        data_dfs = pool.starmap(combine_df,
                                [(dfs, wind_name, save_path) for wind_name, dfs in data_wind_name.items()])

    result_data_dict = dict()
    for wind_name, df in data_dfs:
        result_data_dict[wind_name] = df

    # Step 3: append the fault and alarm data to each turbine's frame
    for dir4 in [dir2, dir3]:
        guzhang_files = read_excel_files(dir4)
        with multiprocessing.Pool(30) as pool:
            guzhang_datas = pool.starmap(read_guzhangbaojing, [(file,) for file in guzhang_files])
        guzhang_df = pd.DataFrame()
        for df in guzhang_datas:
            if not df.empty:
                guzhang_df = pd.concat([guzhang_df, df])
        wind_names = set(list(guzhang_df['wind_name'].values))
        for wind_name in wind_names:
            now_df = guzhang_df[guzhang_df['wind_name'] == wind_name]
            if wind_name in result_data_dict.keys():
                now_df.reset_index(inplace=True)
                now_df.drop_duplicates(inplace=True, subset=['采样时间', 'wind_name'])
                if 'index' in now_df.columns:
                    del now_df['index']
                now_df.sort_values(by='采样时间', inplace=True)
                now_df.set_index(keys=['采样时间', 'wind_name'], inplace=True)
                res_df = result_data_dict[wind_name]
                result_data_dict[wind_name] = pd.concat([res_df, now_df], axis=1)

    # Step 4: resolve duplicated columns and write one CSV per turbine
    with multiprocessing.Pool(30) as pool:
        pool.starmap(sae_to_csv, [(wind_name, df) for wind_name, df in result_data_dict.items()])

    print(datetime.datetime.now() - begin)