organize_xinhua_files_data.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. import multiprocessing
  2. import datetime
  3. import os
  4. import warnings
  5. import numpy as np
  6. import pandas as pd
  7. warnings.filterwarnings("ignore")
  def __build_directory_dict(directory_dict, path, filter_types=None):
      """
      Recursively walk ``path`` and record the files found, grouped by their directory.

      :param directory_dict: dict that is filled in place; maps a directory path to the
          list of accepted file paths directly inside it
      :param path: directory to scan
      :param filter_types: accepted file extensions without the dot; ``None`` or an
          empty collection accepts every file
      """
      for entry in os.listdir(path):
          full_path = os.path.join(path, entry)

          if os.path.isdir(full_path):
              # Descend first; sub-directories get their own key in directory_dict.
              __build_directory_dict(directory_dict, full_path, filter_types=filter_types)
              continue
          if not os.path.isfile(full_path):
              continue

          # The directory is registered as soon as it holds any regular file,
          # even when that file is later rejected by the extension filter.
          bucket = directory_dict.setdefault(path, [])

          if filter_types is None or len(filter_types) == 0:
              bucket.append(full_path)
              continue

          extension = str(full_path).split(".")[-1]
          # Paths containing "~$" are skipped when filtering (presumably Office lock files).
          if extension in filter_types and "~$" not in str(full_path):
              bucket.append(full_path)
  # Collect every spreadsheet-like file (xls / xlsx / csv / gz) found under a path.
  def read_excel_files(read_path):
      """
      Return the data files reachable from ``read_path``.

      :param read_path: a single file, or a directory that is scanned recursively
      :return: ``[read_path]`` when it is a file; otherwise a flat list of every
          xls / xlsx / csv / gz file found below the directory
      """
      if not os.path.isfile(read_path):
          grouped = {}
          __build_directory_dict(grouped, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
          collected = []
          for file_list in grouped.values():
              collected.extend(file_path for file_path in file_list if file_path)
          return collected
      return [read_path]
  # Create a directory path.
  def create_file_path(path, is_file_path=False):
      """
      Make sure a directory exists, creating missing parent directories as needed.

      :param path: directory to create, or a file path whose parent directory
          should be created
      :param is_file_path: True when ``path`` includes a file name; only its
          parent directory is then created
      """
      if is_file_path:
          path = os.path.dirname(path)
      # An empty path (e.g. the dirname of a bare file name) refers to the current
      # directory: there is nothing to create and os.makedirs('') would raise
      # FileNotFoundError.
      if not path:
          return
      if not os.path.exists(path):
          # exist_ok guards against another process creating it in between.
          os.makedirs(path, exist_ok=True)
  def boolean_is_check_data(df_cols, need_valid=True):
      """
      Tell whether a set of column names contains at least one fault/stop column.

      :param df_cols: iterable of column labels; only the part after the last
          ``_`` of each label is compared
      :param need_valid: when False the check is skipped and True is returned
      :return: True if validation is disabled or any known fault column is present
      """
      if need_valid:
          fault_markers = ('快速停机', '故障名称', '故障代码', '故障停机', '人工停机', '风机紧急停机', '风机自身故障停机', '限功率运行状态')
          suffixes = [str(label).split('_')[-1] for label in df_cols]
          return any(marker in suffixes for marker in fault_markers)
      return True
  def read_fle_to_df(file_path):
      """
      Load one SCADA export and normalise its columns.

      Column labels are expected to look like ``<turbine>_<field>``; the turbine
      prefix of the first such label becomes ``wind_name`` and every label is
      reduced to the part after its last ``_``. The ``采样时间`` column is parsed
      as datetime and rounded up to the next whole minute.

      :param file_path: path of an Excel file, or of a ``.csv`` / ``.gz`` file
      :return: ``(has_fault_columns, wind_name, df)``; the fault-column check is
          only enforced when ``file_path`` contains ``批次``
      :raises IndexError: when no column label contains ``_``
      """
      # read_excel_files also selects csv/gz files, which pd.read_excel cannot parse.
      # NOTE(review): assumes .gz files are gzip-compressed CSV - confirm.
      if str(file_path).lower().endswith(('.csv', '.gz')):
          df = pd.read_csv(file_path)
      else:
          df = pd.read_excel(file_path)

      # Labels may be non-string (e.g. numeric headers); coerce before splitting.
      labels = [str(label) for label in df.columns]
      wind_name = [label for label in labels if '_' in label][0].split('_')[0]
      df.columns = [label.split('_')[-1] for label in labels]
      df['wind_name'] = wind_name

      # 'min' is the supported alias; the older 'T' spelling is deprecated in pandas.
      df['采样时间'] = pd.to_datetime(df['采样时间']).dt.ceil('min')
      return boolean_is_check_data(df.columns, file_path.find('批次') > -1), wind_name, df
  def read_guzhangbaojing(file_path, start='2024-08-01 00:00:00', end='2024-10-01 00:00:00'):
      """
      Load one fault/alarm export and keep the rows inside a time window.

      The ``风机名`` column is renamed to ``wind_name`` and ``采样时间`` is parsed
      as datetime and rounded up to the next whole minute before filtering.

      :param file_path: path of an Excel file, or of a ``.csv`` / ``.gz`` file
      :param start: inclusive lower bound of the window (anything pd.Timestamp accepts)
      :param end: exclusive upper bound of the window
      :return: DataFrame restricted to ``start <= 采样时间 < end``
      :raises Exception: any read/parse error is printed with the file path and re-raised
      """
      try:
          # NOTE(review): assumes .gz files are gzip-compressed CSV - confirm.
          if str(file_path).lower().endswith(('.csv', '.gz')):
              df = pd.read_csv(file_path)
          else:
              df = pd.read_excel(file_path)
          df = df.rename(columns={'风机名': 'wind_name'})
          # 'min' is the supported alias; the older 'T' spelling is deprecated in pandas.
          df['采样时间'] = pd.to_datetime(df['采样时间']).dt.ceil('min')
          in_window = (df['采样时间'] >= pd.Timestamp(start)) & (df['采样时间'] < pd.Timestamp(end))
          return df[in_window]
      except Exception as e:
          # Runs inside pool workers: print the offending file before propagating.
          print(file_path, e)
          raise
  def combine_df(dfs, wind_name, save_path=''):
      """
      Outer-merge all frames of one turbine on ``采样时间`` and ``wind_name``.

      A non-key column that already appeared in an earlier frame is renamed to
      ``<col>__<k>``, where ``k`` counts the repeats of that column (1, 2, ...),
      so every copy keeps a unique name. The input frames are not modified.

      :param dfs: non-empty list of DataFrames, each holding the two key columns
      :param wind_name: turbine name; printed for progress and returned unchanged
      :param save_path: output directory to create; skipped when empty
      :return: ``(wind_name, merged)`` where ``merged`` has no duplicate keys, is
          sorted by ``采样时间`` and indexed by ``(采样时间, wind_name)``
      :raises Exception: any merge error is printed with the turbine name and re-raised
      """
      print(wind_name)
      key_cols = ['采样时间', 'wind_name']
      seen_cols = set()
      repeat_counts = dict()
      try:
          merged = dfs[0]
          seen_cols.update(merged.columns)
          for now_df in dfs[1:]:
              renames = dict()
              for col in now_df.columns:
                  if col in seen_cols and col not in key_cols:
                      # Each repeat gets the next suffix; reusing a suffix would make
                      # pandas silently fall back to its own _x/_y names.
                      count = repeat_counts.get(col, 0) + 1
                      repeat_counts[col] = count
                      renames[col] = col + '__' + str(count)
              # rename returns a copy, so the caller's frame keeps its columns.
              now_df = now_df.rename(columns=renames)
              merged = pd.merge(merged, now_df, on=key_cols, how='outer')
              seen_cols.update(now_df.columns)
      except Exception as e:
          print(wind_name, e)
          raise

      merged = merged.drop_duplicates(subset=key_cols)
      # An empty save_path means "no directory to prepare".
      if save_path:
          create_file_path(save_path)
      merged = merged.sort_values(by='采样时间')
      merged = merged.set_index(keys=key_cols)
      return wind_name, merged
  def _first_valid_value(values):
      """Return the first non-null, truthy item of ``values``, or NaN if there is none."""
      for value in values:
          if pd.notna(value) and value:
              return value
      return np.nan


  def sae_to_csv(wind_name, df, save_path=None):
      """
      Collapse the ``<col>__<k>`` copies back into ``<col>`` and write the frame to CSV.

      For every group of copies the original column (when present) and its copies
      are combined row by row: the maximum for all-numeric groups, otherwise the
      first non-null truthy value. The copies are then dropped. ``df`` is modified
      in place.

      :param wind_name: turbine name; the file is written as ``<wind_name>.csv``
      :param df: merged DataFrame whose columns are strings
      :param save_path: output directory; when None the module-level ``save_path``
          assigned by the ``__main__`` script is used
      :raises Exception: any error is re-raised after printing the turbine name and columns
      """
      if save_path is None:
          # Legacy callers pass two arguments and rely on the module-level name.
          save_path = globals()['save_path']
      try:
          groups = dict()
          for col in df.columns:
              if col.find('__') > -1:
                  groups.setdefault(col.split('__')[0], []).append(col)

          for origin, copies in groups.items():
              print(wind_name, origin, copies)
              # Include the original column: rows present only in the first frame
              # have NaN in every copy and would otherwise be wiped out.
              candidates = ([origin] if origin in df.columns else []) + copies
              if all(pd.api.types.is_numeric_dtype(df[col]) for col in candidates):
                  df[origin] = df[candidates].max(axis=1)
              else:
                  rows = df[candidates].itertuples(index=False, name=None)
                  df[origin] = [_first_valid_value(row) for row in rows]
              df.drop(columns=[col for col in copies if col != origin], inplace=True)

          df.to_csv(os.path.join(save_path, wind_name + '.csv'), encoding='utf8')
      except Exception:
          print(wind_name, df.columns)
          raise
  if __name__ == '__main__':
      # Batch job: merge the SCADA exports of each turbine, attach the fault and
      # alarm records, and write one CSV per turbine.
      begin = datetime.datetime.now()
      base_path = r'/data/download/collection_data/1进行中/新华水电/收资数据/风机SCADA数据'
      dir1 = base_path + r'/data'
      dir2 = base_path + r'/故障报警/汇能机组数据-故障'
      dir3 = base_path + r'/故障报警/报警'
      # Must stay a module-level name: sae_to_csv reads save_path as a global.
      save_path = r'/data/download/collection_data/1进行中/新华水电/清理数据/合并批次1-2故障报警'
      create_file_path(save_path)

      # Stage 1: read every SCADA file in parallel and group the frames by turbine.
      files = read_excel_files(dir1)
      with multiprocessing.Pool(30) as pool:
          datas = pool.starmap(read_fle_to_df, [(file,) for file in files])

      data_wind_name = dict()
      for _check_data, wind_name, df in datas:
          data_wind_name.setdefault(wind_name, []).append(df)

      # Stage 2: merge the frames of each turbine into one indexed frame.
      with multiprocessing.Pool(30) as pool:
          data_dfs = pool.starmap(combine_df,
                                  [(dfs, wind_name, save_path) for wind_name, dfs in data_wind_name.items()])
      result_data_dict = dict(data_dfs)

      # Stage 3: attach fault records, then alarm records, column-wise.
      for dir4 in [dir2, dir3]:
          guzhang_files = read_excel_files(dir4)
          with multiprocessing.Pool(30) as pool:
              guzhang_datas = pool.starmap(read_guzhangbaojing, [(file,) for file in guzhang_files])

          non_empty = [df for df in guzhang_datas if not df.empty]
          if not non_empty:
              # No rows for this directory: there is no 'wind_name' column to group by.
              continue
          # Concatenate once instead of growing a frame inside the loop.
          guzhang_df = pd.concat(non_empty)

          for wind_name in set(guzhang_df['wind_name'].values):
              # Turbines without SCADA data are ignored.
              if wind_name not in result_data_dict:
                  continue
              # Copy so the following steps never operate on a view of guzhang_df.
              now_df = guzhang_df[guzhang_df['wind_name'] == wind_name].copy()
              now_df = now_df.drop_duplicates(subset=['采样时间', 'wind_name'])
              now_df = now_df.sort_values(by='采样时间')
              now_df = now_df.set_index(keys=['采样时间', 'wind_name'])
              res_df = result_data_dict[wind_name]
              result_data_dict[wind_name] = pd.concat([res_df, now_df], axis=1)

      # Stage 4: collapse duplicated columns and write one CSV per turbine.
      with multiprocessing.Pool(30) as pool:
          pool.starmap(sae_to_csv, [(wind_name, df) for wind_name, df in result_data_dict.items()])

      print(datetime.datetime.now() - begin)