# organize_xinhua_files.py
# Organize Xinhua hydro-power wind-turbine SCADA spreadsheet exports into
# per-turbine CSV files and record which columns each turbine is missing.
import datetime
import multiprocessing
import os
import warnings
from os import *

import numpy as np
import pandas as pd
  7. warnings.filterwarnings("ignore")
  8. def __build_directory_dict(directory_dict, path, filter_types=None):
  9. # 遍历目录下的所有项
  10. for item in listdir(path):
  11. item_path = path.join(path, item)
  12. if path.isdir(item_path):
  13. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  14. elif path.isfile(item_path):
  15. if path not in directory_dict:
  16. directory_dict[path] = []
  17. if filter_types is None or len(filter_types) == 0:
  18. directory_dict[path].append(item_path)
  19. elif str(item_path).split(".")[-1] in filter_types:
  20. if str(item_path).count("~$") == 0:
  21. directory_dict[path].append(item_path)
  22. # 读取路径下所有的excel文件
  23. def read_excel_files(read_path):
  24. if path.isfile(read_path):
  25. return [read_path]
  26. directory_dict = {}
  27. __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
  28. return [path for paths in directory_dict.values() for path in paths if path]
  29. # 创建路径
  30. def create_file_path(path, is_file_path=False):
  31. """
  32. 创建路径
  33. :param path:创建文件夹的路径
  34. :param is_file_path: 传入的path是否包含具体的文件名
  35. """
  36. if is_file_path:
  37. path = path.dirname(path)
  38. if not path.exists(path):
  39. makedirs(path, exist_ok=True)
  40. def boolean_is_check_data(df_cols):
  41. fault_list = ['快速停机', '故障名称', '故障代码', '故障停机', '人工停机', '风机紧急停机', '工作模式', '风机自身故障停机', '限功率运行状态']
  42. df_cols = [str(i).split('_')[-1] for i in df_cols]
  43. for fault in fault_list:
  44. if fault in df_cols:
  45. return True
  46. return False
  47. def read_fle_to_df(file_path):
  48. df = pd.read_excel(file_path)
  49. wind_name = [i for i in df.columns if i.find('_') > -1][0].split('_')[0]
  50. df.columns = [i.split('_')[-1] for i in df.columns]
  51. df['wind_name'] = wind_name
  52. return boolean_is_check_data(df.columns), wind_name, df
  53. def save_to_file(dfs, wind_name, save_path='', param='', is_check=False, all_cols=list(),
  54. result_data_list=multiprocessing.Manager().list()):
  55. try:
  56. if is_check:
  57. df = pd.concat(dfs)
  58. else:
  59. df = dfs[0]
  60. for index, now_df in enumerate(dfs):
  61. if index > 0:
  62. df = pd.merge(df, now_df, on=['采样时间', 'wind_name'], how='outer')
  63. except Exception as e:
  64. print(wind_name, e)
  65. raise e
  66. df.reset_index(inplace=True)
  67. df.drop_duplicates(inplace=True, subset=['采样时间', 'wind_name'])
  68. if 'index' in df.columns:
  69. del df['index']
  70. create_file_path(save_path)
  71. df.sort_values(by='采样时间', inplace=True)
  72. loss_cols = list([i for i in df.columns if i != 'wind_name'])
  73. loss_cols.sort()
  74. loss_cols.insert(0, wind_name)
  75. loss_cols.insert(0, path.basename(save_path) + '-' + param)
  76. result_data_list.append(loss_cols)
  77. for col in set(all_cols):
  78. if col not in df.columns:
  79. df[col] = np.nan
  80. df.to_csv(path.join(save_path, param, wind_name + '.csv'), encoding='utf8', index=False)
  81. if __name__ == '__main__':
  82. begin = datetime.datetime.now()
  83. # dir1 = r'D:\data\新华水电\测试'
  84. # save_path = r'D:\data\新华水电\整理数据'
  85. result_datas = [
  86. (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/8月风机数据',
  87. r'/data/download/collection_data/1进行中/新华水电/整理数据/8月'),
  88. (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/9月风机数据',
  89. r'/data/download/collection_data/1进行中/新华水电/整理数据/9月')
  90. ]
  91. result_data_list = multiprocessing.Manager().list()
  92. for dir1, save_path in result_datas:
  93. files = read_excel_files(dir1)
  94. with multiprocessing.Pool(30) as pool:
  95. datas = pool.starmap(read_fle_to_df, [(file,) for file in files])
  96. data_wind_name = dict()
  97. check_wind_name = dict()
  98. data_all_cols = list()
  99. check_all_cols = list()
  100. for data in datas:
  101. check_data, wind_name, df = data[0], data[1], data[2]
  102. if '工作模式' not in df.columns:
  103. # df.reset_index(inplace=True)
  104. # df.set_index(keys=['采样时间'], inplace=True)
  105. if check_data:
  106. check_all_cols.extend(list(df.columns))
  107. if wind_name in check_wind_name.keys():
  108. check_wind_name[wind_name].append(df)
  109. else:
  110. check_wind_name[wind_name] = [df]
  111. else:
  112. data_all_cols.extend(list(df.columns))
  113. if wind_name in data_wind_name.keys():
  114. data_wind_name[wind_name].append(df)
  115. else:
  116. data_wind_name[wind_name] = [df]
  117. # with multiprocessing.Pool(30) as pool:
  118. # pool.starmap(combine_df,
  119. # [(dfs, wind_name, save_path, "事件数据", True, check_all_cols, result_data_list) for wind_name, dfs
  120. # in
  121. # check_wind_name.items()])
  122. with multiprocessing.Pool(30) as pool:
  123. pool.starmap(save_to_file,
  124. [(dfs, wind_name, save_path, "数据", False, data_all_cols, result_data_list) for wind_name, dfs
  125. in
  126. data_wind_name.items()])
  127. print(datetime.datetime.now() - begin)
  128. normal_list = list(result_data_list)
  129. normal_list.sort(key=lambda x: (x[0], int(x[1][2:])))
  130. with open('loss_col.csv', 'w', encoding='utf8') as f:
  131. for datas in normal_list:
  132. f.write(",".join(datas))
  133. f.write('\n')
  134. print(datetime.datetime.now() - begin)