# StatisticsAndSaveFile.py
  1. import datetime
  2. import multiprocessing
  3. import os
  4. import traceback
  5. import pandas as pd
  6. import numpy as np
  7. from etl.base import TransParam
  8. from etl.base.PathsAndTable import PathsAndTable
  9. from etl.step.ClassIdentifier import ClassIdentifier
  10. from service.plt_service import update_trans_transfer_progress
  11. from utils.conf.read_conf import read_conf
  12. from utils.df_utils.util import get_time_space
  13. from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
  14. from utils.log.trans_log import trans_print
  15. from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, print_memory_usage
  16. class StatisticsAndSaveFile(object):
  17. def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map,
  18. rated_power_and_cutout_speed_map):
  19. self.paths_and_table = paths_and_table
  20. self.trans_param = trans_param
  21. self.statistics_map = statistics_map
  22. self.lock = multiprocessing.Manager().Lock()
  23. self.rated_power_and_cutout_speed_map = rated_power_and_cutout_speed_map
  24. def set_statistics_data(self, df):
  25. if not df.empty:
  26. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  27. min_date = df['time_stamp'].min()
  28. max_date = df['time_stamp'].max()
  29. with self.lock:
  30. if 'min_date' in self.statistics_map.keys():
  31. if self.statistics_map['min_date'] > min_date:
  32. self.statistics_map['min_date'] = min_date
  33. else:
  34. self.statistics_map['min_date'] = min_date
  35. if 'max_date' in self.statistics_map.keys():
  36. if self.statistics_map['max_date'] < max_date:
  37. self.statistics_map['max_date'] = max_date
  38. else:
  39. self.statistics_map['max_date'] = max_date
  40. if 'total_count' in self.statistics_map.keys():
  41. self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
  42. else:
  43. self.statistics_map['total_count'] = df.shape[0]
  44. if 'time_granularity' not in self.statistics_map.keys():
  45. self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
  46. def save_statistics_file(self):
  47. save_path = os.path.join(os.path.dirname(self.paths_and_table.get_save_path()),
  48. self.paths_and_table.read_type + '_statistics.txt')
  49. create_file_path(save_path, is_file_path=True)
  50. with open(save_path, 'w', encoding='utf8') as f:
  51. f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
  52. f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
  53. f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
  54. f.write("风机数量:" + str(len(read_excel_files(self.paths_and_table.get_read_tmp_path()))) + "\n")
  55. def check_data_validity(self, df):
  56. pass
  57. def save_to_csv(self, filename):
  58. print_memory_usage("开始读取csv:" + os.path.basename(filename))
  59. df = read_file_to_df(filename)
  60. if self.trans_param.is_vertical_table:
  61. df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
  62. values=self.trans_param.vertical_value,
  63. aggfunc='max')
  64. # 重置索引以得到普通的列
  65. df.reset_index(inplace=True)
  66. print_memory_usage("结束读取csv,:" + os.path.basename(filename))
  67. # 转化风机名称
  68. trans_print("开始转化风机名称")
  69. df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
  70. df['wind_turbine_name'] = df['wind_turbine_number']
  71. df['wind_turbine_number'] = df['wind_turbine_number'].map(
  72. self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
  73. wind_col_name = str(df['wind_turbine_number'].values[0])
  74. print_memory_usage("转化风机名称结束:" + wind_col_name)
  75. not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
  76. 'param9', 'param10']
  77. solve_time_begin = datetime.datetime.now()
  78. trans_print(wind_col_name, "去掉非法数据前大小:", df.shape[0])
  79. df.replace(np.nan, -999999999, inplace=True)
  80. number_cols = df.select_dtypes(include=['number']).columns.tolist()
  81. for col in df.columns:
  82. if col not in not_double_cols and col not in number_cols:
  83. if not df[col].isnull().all():
  84. df[col] = pd.to_numeric(df[col], errors='coerce')
  85. # 删除包含NaN的行(即那些列A转换失败的行)
  86. df = df.dropna(subset=[col])
  87. trans_print(wind_col_name, "去掉非法数据后大小:", df.shape[0])
  88. df.replace(-999999999, np.nan, inplace=True)
  89. print_memory_usage("处理非法数据大小结束:" + wind_col_name)
  90. trans_print(wind_col_name, "去掉重复数据前大小:", df.shape[0])
  91. df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
  92. trans_print(wind_col_name, "去掉重复数据后大小:", df.shape[0])
  93. trans_print("处理非法重复数据结束,耗时:", datetime.datetime.now() - solve_time_begin)
  94. print_memory_usage("处理重复数据结束:" + wind_col_name)
  95. # 添加年月日
  96. solve_time_begin = datetime.datetime.now()
  97. trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
  98. trans_print(wind_col_name, "时间原始大小:", df.shape[0])
  99. # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
  100. # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
  101. df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
  102. df.dropna(subset=['time_stamp'], inplace=True)
  103. trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
  104. df.sort_values(by='time_stamp', inplace=True)
  105. trans_print("处理时间字段结束,耗时:", datetime.datetime.now() - solve_time_begin)
  106. print_memory_usage("处理时间结果:" + wind_col_name)
  107. df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
  108. print_memory_usage("删减无用字段后内存占用:" + wind_col_name)
  109. rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
  110. if rated_power_and_cutout_speed_tuple is None:
  111. rated_power_and_cutout_speed_tuple = (None, None)
  112. print_memory_usage("打标签前内存占用:" + wind_col_name)
  113. class_identifiler = ClassIdentifier(wind_turbine_number=wind_col_name, origin_df=df,
  114. rated_power=rated_power_and_cutout_speed_tuple[0],
  115. cut_out_speed=rated_power_and_cutout_speed_tuple[1])
  116. df = class_identifiler.run()
  117. print_memory_usage("打标签后内存占用:" + wind_col_name)
  118. df['year'] = df['time_stamp'].dt.year
  119. df['month'] = df['time_stamp'].dt.month
  120. df['day'] = df['time_stamp'].dt.day
  121. df['time_stamp'] = df['time_stamp'].apply(
  122. lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  123. print_memory_usage("添加年月日后:" + wind_col_name)
  124. if self.paths_and_table.save_zip:
  125. save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv.gz')
  126. else:
  127. save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv')
  128. create_file_path(save_path, is_file_path=True)
  129. if self.paths_and_table.save_zip:
  130. df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
  131. else:
  132. df.to_csv(save_path, index=False, encoding='utf-8')
  133. self.set_statistics_data(df)
  134. del df
  135. trans_print("保存" + str(wind_col_name) + "成功")
  136. def mutiprocessing_to_save_file(self):
  137. print_memory_usage("开始执行,占用内存")
  138. # 开始保存到正式文件
  139. trans_print("开始保存到excel文件")
  140. all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
  141. # split_count = self.pathsAndTable.multi_pool_count
  142. split_count = use_files_get_max_cpu_count(all_tmp_files)
  143. all_arrays = split_array(all_tmp_files, split_count)
  144. try:
  145. for index, arr in enumerate(all_arrays):
  146. with multiprocessing.Pool(split_count) as pool:
  147. pool.starmap(self.save_to_csv, [(i,) for i in arr])
  148. update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type,
  149. round(50 + 20 * (index + 1) / len(all_arrays), 2),
  150. self.paths_and_table.save_db)
  151. except Exception as e:
  152. trans_print(traceback.format_exc())
  153. message = "保存文件错误,系统返回错误:" + str(e)
  154. raise ValueError(message)
  155. trans_print("结束保存到excel文件")
  156. def run(self):
  157. trans_print("开始保存数据到正式文件")
  158. begin = datetime.datetime.now()
  159. self.mutiprocessing_to_save_file()
  160. update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, 70,
  161. self.paths_and_table.save_db)
  162. trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin)