123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- import datetime
- import multiprocessing
- import warnings
- from os import *
- import numpy as np
- import pandas as pd
- warnings.filterwarnings("ignore")
- def __build_directory_dict(directory_dict, path, filter_types=None):
- # 遍历目录下的所有项
- for item in listdir(path):
- item_path = path.join(path, item)
- if path.isdir(item_path):
- __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
- elif path.isfile(item_path):
- if path not in directory_dict:
- directory_dict[path] = []
- if filter_types is None or len(filter_types) == 0:
- directory_dict[path].append(item_path)
- elif str(item_path).split(".")[-1] in filter_types:
- if str(item_path).count("~$") == 0:
- directory_dict[path].append(item_path)
- # 读取路径下所有的excel文件
- def read_excel_files(read_path):
- if path.isfile(read_path):
- return [read_path]
- directory_dict = {}
- __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
- return [path for paths in directory_dict.values() for path in paths if path]
- # 创建路径
- def create_file_path(path, is_file_path=False):
- """
- 创建路径
- :param path:创建文件夹的路径
- :param is_file_path: 传入的path是否包含具体的文件名
- """
- if is_file_path:
- path = path.dirname(path)
- if not path.exists(path):
- makedirs(path, exist_ok=True)
- def boolean_is_check_data(df_cols):
- fault_list = ['快速停机', '故障名称', '故障代码', '故障停机', '人工停机', '风机紧急停机', '工作模式', '风机自身故障停机', '限功率运行状态']
- df_cols = [str(i).split('_')[-1] for i in df_cols]
- for fault in fault_list:
- if fault in df_cols:
- return True
- return False
- def read_fle_to_df(file_path):
- df = pd.read_excel(file_path)
- wind_name = [i for i in df.columns if i.find('_') > -1][0].split('_')[0]
- df.columns = [i.split('_')[-1] for i in df.columns]
- df['wind_name'] = wind_name
- return boolean_is_check_data(df.columns), wind_name, df
- def save_to_file(dfs, wind_name, save_path='', param='', is_check=False, all_cols=list(),
- result_data_list=multiprocessing.Manager().list()):
- try:
- if is_check:
- df = pd.concat(dfs)
- else:
- df = dfs[0]
- for index, now_df in enumerate(dfs):
- if index > 0:
- df = pd.merge(df, now_df, on=['采样时间', 'wind_name'], how='outer')
- except Exception as e:
- print(wind_name, e)
- raise e
- df.reset_index(inplace=True)
- df.drop_duplicates(inplace=True, subset=['采样时间', 'wind_name'])
- if 'index' in df.columns:
- del df['index']
- create_file_path(save_path)
- df.sort_values(by='采样时间', inplace=True)
- loss_cols = list([i for i in df.columns if i != 'wind_name'])
- loss_cols.sort()
- loss_cols.insert(0, wind_name)
- loss_cols.insert(0, path.basename(save_path) + '-' + param)
- result_data_list.append(loss_cols)
- for col in set(all_cols):
- if col not in df.columns:
- df[col] = np.nan
- df.to_csv(path.join(save_path, param, wind_name + '.csv'), encoding='utf8', index=False)
- if __name__ == '__main__':
- begin = datetime.datetime.now()
- # dir1 = r'D:\data\新华水电\测试'
- # save_path = r'D:\data\新华水电\整理数据'
- result_datas = [
- (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/8月风机数据',
- r'/data/download/collection_data/1进行中/新华水电/整理数据/8月'),
- (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/9月风机数据',
- r'/data/download/collection_data/1进行中/新华水电/整理数据/9月')
- ]
- result_data_list = multiprocessing.Manager().list()
- for dir1, save_path in result_datas:
- files = read_excel_files(dir1)
- with multiprocessing.Pool(30) as pool:
- datas = pool.starmap(read_fle_to_df, [(file,) for file in files])
- data_wind_name = dict()
- check_wind_name = dict()
- data_all_cols = list()
- check_all_cols = list()
- for data in datas:
- check_data, wind_name, df = data[0], data[1], data[2]
- if '工作模式' not in df.columns:
- # df.reset_index(inplace=True)
- # df.set_index(keys=['采样时间'], inplace=True)
- if check_data:
- check_all_cols.extend(list(df.columns))
- if wind_name in check_wind_name.keys():
- check_wind_name[wind_name].append(df)
- else:
- check_wind_name[wind_name] = [df]
- else:
- data_all_cols.extend(list(df.columns))
- if wind_name in data_wind_name.keys():
- data_wind_name[wind_name].append(df)
- else:
- data_wind_name[wind_name] = [df]
- # with multiprocessing.Pool(30) as pool:
- # pool.starmap(combine_df,
- # [(dfs, wind_name, save_path, "事件数据", True, check_all_cols, result_data_list) for wind_name, dfs
- # in
- # check_wind_name.items()])
- with multiprocessing.Pool(30) as pool:
- pool.starmap(save_to_file,
- [(dfs, wind_name, save_path, "数据", False, data_all_cols, result_data_list) for wind_name, dfs
- in
- data_wind_name.items()])
- print(datetime.datetime.now() - begin)
- normal_list = list(result_data_list)
- normal_list.sort(key=lambda x: (x[0], int(x[1][2:])))
- with open('loss_col.csv', 'w', encoding='utf8') as f:
- for datas in normal_list:
- f.write(",".join(datas))
- f.write('\n')
- print(datetime.datetime.now() - begin)
|