# organize_xinhua_files.py
# Organize Xinhua hydro-power wind-turbine SCADA Excel exports into merged,
# per-turbine datasets and record a column inventory (loss_col.csv).
  1. import multiprocessing
  2. import datetime
  3. import os
  4. import warnings
  5. import pandas as pd
  6. warnings.filterwarnings("ignore")
  7. def __build_directory_dict(directory_dict, path, filter_types=None):
  8. # 遍历目录下的所有项
  9. for item in os.listdir(path):
  10. item_path = os.path.join(path, item)
  11. if os.path.isdir(item_path):
  12. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  13. elif os.path.isfile(item_path):
  14. if path not in directory_dict:
  15. directory_dict[path] = []
  16. if filter_types is None or len(filter_types) == 0:
  17. directory_dict[path].append(item_path)
  18. elif str(item_path).split(".")[-1] in filter_types:
  19. if str(item_path).count("~$") == 0:
  20. directory_dict[path].append(item_path)
  21. # 读取路径下所有的excel文件
  22. def read_excel_files(read_path):
  23. if os.path.isfile(read_path):
  24. return [read_path]
  25. directory_dict = {}
  26. __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
  27. return [path for paths in directory_dict.values() for path in paths if path]
  28. # 创建路径
  29. def create_file_path(path, is_file_path=False):
  30. """
  31. 创建路径
  32. :param path:创建文件夹的路径
  33. :param is_file_path: 传入的path是否包含具体的文件名
  34. """
  35. if is_file_path:
  36. path = os.path.dirname(path)
  37. if not os.path.exists(path):
  38. os.makedirs(path, exist_ok=True)
  39. def boolean_is_check_data(df_cols):
  40. fault_list = ['快速停机', '故障名称', '故障代码', '故障停机']
  41. df_cols = [str(i).split('_')[-1] for i in df_cols]
  42. for fault in fault_list:
  43. if fault in df_cols:
  44. return True
  45. return False
  46. def read_fle_to_df(file_path):
  47. df = pd.read_excel(file_path)
  48. wind_name = [i for i in df.columns if i.find('_') > -1][0].split('_')[0]
  49. df.columns = [i.split('_')[-1] for i in df.columns]
  50. df['wind_name'] = wind_name
  51. return boolean_is_check_data(df.columns), wind_name, df
  52. def save_to_file(dfs, wind_name, save_path='', param='', is_check=False, all_cols=list(),
  53. result_data_list=multiprocessing.Manager().list()):
  54. try:
  55. if is_check:
  56. df = pd.concat(dfs)
  57. else:
  58. df = dfs[0]
  59. for index, now_df in enumerate(dfs):
  60. if index > 0:
  61. df = pd.merge(df, now_df, on=['采样时间', 'wind_name'], how='outer')
  62. except Exception as e:
  63. print(wind_name, e)
  64. raise e
  65. df.reset_index(inplace=True)
  66. df.drop_duplicates(inplace=True, subset=['采样时间', 'wind_name'])
  67. if 'index' in df.columns:
  68. del df['index']
  69. create_file_path(save_path)
  70. df.sort_values(by='采样时间', inplace=True)
  71. loss_cols = list([i for i in df.columns if i != 'wind_name'])
  72. loss_cols.sort()
  73. loss_cols.insert(0, wind_name)
  74. loss_cols.insert(0, os.path.basename(save_path) + '-' + param)
  75. result_data_list.append(loss_cols)
  76. # for col in set(all_cols):
  77. # if col not in df.columns:
  78. # df[col] = np.nan
  79. # df.to_csv(os.path.join(save_path, param, wind_name + '.csv'), encoding='utf8', index=False)
  80. if __name__ == '__main__':
  81. begin = datetime.datetime.now()
  82. # dir1 = r'D:\data\新华水电\测试'
  83. # save_path = r'D:\data\新华水电\整理数据'
  84. result_datas = [
  85. (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/8月风机数据',
  86. r'/data/download/collection_data/1进行中/新华水电/整理数据/8月'),
  87. (r'/data/download/collection_data/1进行中/新华水电/风机SCADA数据/9月风机数据',
  88. r'/data/download/collection_data/1进行中/新华水电/整理数据/9月')
  89. ]
  90. result_data_list = multiprocessing.Manager().list()
  91. for dir1, save_path in result_datas:
  92. files = read_excel_files(dir1)
  93. with multiprocessing.Pool(30) as pool:
  94. datas = pool.starmap(read_fle_to_df, [(file,) for file in files])
  95. data_wind_name = dict()
  96. check_wind_name = dict()
  97. data_all_cols = list()
  98. check_all_cols = list()
  99. for data in datas:
  100. check_data, wind_name, df = data[0], data[1], data[2]
  101. if '工作模式' not in df.columns:
  102. # df.reset_index(inplace=True)
  103. # df.set_index(keys=['采样时间'], inplace=True)
  104. if check_data:
  105. check_all_cols.extend(list(df.columns))
  106. if wind_name in check_wind_name.keys():
  107. check_wind_name[wind_name].append(df)
  108. else:
  109. check_wind_name[wind_name] = [df]
  110. else:
  111. data_all_cols.extend(list(df.columns))
  112. if wind_name in data_wind_name.keys():
  113. data_wind_name[wind_name].append(df)
  114. else:
  115. data_wind_name[wind_name] = [df]
  116. with multiprocessing.Pool(30) as pool:
  117. pool.starmap(save_to_file,
  118. [(dfs, wind_name, save_path, "事件数据", True, check_all_cols, result_data_list) for wind_name, dfs
  119. in
  120. check_wind_name.items()])
  121. with multiprocessing.Pool(30) as pool:
  122. pool.starmap(save_to_file,
  123. [(dfs, wind_name, save_path, "数据", False, data_all_cols, result_data_list) for wind_name, dfs
  124. in
  125. data_wind_name.items()])
  126. print(datetime.datetime.now() - begin)
  127. normal_list = list(result_data_list)
  128. normal_list.sort(key=lambda x: (x[0], int(x[1][2:])))
  129. with open('loss_col.csv', 'w', encoding='utf8') as f:
  130. for datas in normal_list:
  131. f.write(",".join(datas))
  132. f.write('\n')
  133. print(datetime.datetime.now() - begin)