StatisticsAndSaveFile.py

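"""Statistics-and-save step of the ETL transfer pipeline.

Cleans the temporary per-turbine files, runs ClassIdentifier on each, writes
one CSV (optionally gzipped) per wind turbine, and aggregates dataset-wide
statistics (total rows, min/max timestamp, time granularity) into a shared map.
"""
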
import datetime
import multiprocessing
import os
import traceback

import numpy as np
import pandas as pd

from etl.base import TransParam
from etl.base.PathsAndTable import PathsAndTable
from etl.step.ClassIdentifier import ClassIdentifier
from service.plt_service import update_trans_transfer_progress
from utils.conf.read_conf import read_conf
from utils.df_utils.util import get_time_space
from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count


class StatisticsAndSaveFile(object):

    def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map, rated_power_map):
        self.paths_and_table = paths_and_table
        self.trans_param = trans_param
        self.statistics_map = statistics_map
        # Manager lock so pool workers can update the shared statistics map safely.
        self.lock = multiprocessing.Manager().Lock()
        self.rated_power_map = rated_power_map

    def set_statistics_data(self, df):
        """Merge this DataFrame's row count and time range into the shared statistics map."""
        if not df.empty:
            df['time_stamp'] = pd.to_datetime(df['time_stamp'])
            min_date = df['time_stamp'].min()
            max_date = df['time_stamp'].max()
            with self.lock:
                if 'min_date' in self.statistics_map.keys():
                    if self.statistics_map['min_date'] > min_date:
                        self.statistics_map['min_date'] = min_date
                else:
                    self.statistics_map['min_date'] = min_date

                if 'max_date' in self.statistics_map.keys():
                    if self.statistics_map['max_date'] < max_date:
                        self.statistics_map['max_date'] = max_date
                else:
                    self.statistics_map['max_date'] = max_date

                if 'total_count' in self.statistics_map.keys():
                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
                else:
                    self.statistics_map['total_count'] = df.shape[0]

                if 'time_granularity' not in self.statistics_map.keys():
                    self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
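
    # Note: statistics_map is shared across the pool workers (presumably a
    # multiprocessing.Manager proxy dict -- an assumption from how this class is
    # used, not visible in this file). Compound read-modify-write updates on a
    # proxy dict are not atomic, which is why every update above happens under
    # self.lock.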

    def save_statistics_file(self):
        """Write the aggregated statistics to <read_type>_statistics.txt next to the save path."""
        save_path = os.path.join(os.path.dirname(self.paths_and_table.get_save_path()),
                                 self.paths_and_table.read_type + '_statistics.txt')
        create_file_path(save_path, is_file_path=True)
        with open(save_path, 'w', encoding='utf8') as f:
            f.write("Total records: " + str(self.statistics_map['total_count']) + "\n")
            f.write("Earliest timestamp: " + str(self.statistics_map['min_date']) + "\n")
            f.write("Latest timestamp: " + str(self.statistics_map['max_date']) + "\n")
            f.write("Turbine count: " + str(len(read_excel_files(self.paths_and_table.get_read_tmp_path()))) + "\n")

    def check_data_validity(self, df):
        # Placeholder for future data-validity checks.
        pass

    def save_to_csv(self, filename):
        """Clean one temporary file and save it as a per-turbine CSV; runs in a pool worker."""
        df = read_file_to_df(filename)
        if self.trans_param.is_vertical_table:
            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
                                values=self.trans_param.vertical_value,
                                aggfunc='max')
            # Reset the index to turn the pivot's MultiIndex back into ordinary columns.
            df.reset_index(inplace=True)

        for k in self.trans_param.cols_tran.keys():
            if k not in df.columns:
                df[k] = None
        df = df[list(self.trans_param.cols_tran.keys())]

        # Map raw wind turbine names to canonical turbine numbers.
        trans_print("Start mapping wind turbine names")
        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
        df['wind_turbine_name'] = df['wind_turbine_number']
        df['wind_turbine_number'] = df['wind_turbine_number'].map(
            self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
        wind_col_name = str(df['wind_turbine_number'].values[0])

        # Parse the time field and add year/month/day columns.
        trans_print(wind_col_name, "has a time field; start processing it and adding year/month/day", filename)
        trans_print(wind_col_name, "row count before time cleaning:", df.shape[0])
        # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
        # trans_print(wind_col_name, "row count after dropping invalid timestamps:", df.shape[0])
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        trans_print(wind_col_name, "row count after dropping invalid timestamps:", df.shape[0])
        df['year'] = df['time_stamp'].dt.year
        df['month'] = df['time_stamp'].dt.month
        df['day'] = df['time_stamp'].dt.day
        df.sort_values(by='time_stamp', inplace=True)
        df['time_stamp'] = df['time_stamp'].apply(
            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        trans_print("Finished processing the time field")

        not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                           'param9', 'param10']

        trans_print(wind_col_name, "row count before dropping invalid values:", df.shape[0])
        # Temporarily replace NaN with a sentinel so genuinely missing values survive
        # the numeric coercion below; the sentinel is restored to NaN afterwards.
        df.replace(np.nan, -999999999, inplace=True)
        for col in df.columns:
            if col not in not_double_cols:
                if not df[col].isnull().all():
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    # Drop rows where this column failed the numeric conversion.
                    df = df.dropna(subset=[col])
        trans_print(wind_col_name, "row count after dropping invalid values:", df.shape[0])
        df.replace(-999999999, np.nan, inplace=True)

        trans_print(wind_col_name, "row count before de-duplication:", df.shape[0])
        df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
        trans_print(wind_col_name, "row count after de-duplication:", df.shape[0])

        class_identifier = ClassIdentifier(origin_df=df,
                                           rated_power=read_conf(self.rated_power_map, str(wind_col_name)))
        df = class_identifier.run()

        if self.paths_and_table.save_zip:
            save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv.gz')
        else:
            save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv')
        create_file_path(save_path, is_file_path=True)
        if self.paths_and_table.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8')

        self.set_statistics_data(df)
        del df
        trans_print("Saved " + str(wind_col_name) + " successfully")

    def multiprocessing_to_save_file(self):
        """Split the temporary files across a process pool and save each as a final CSV."""
        # Start writing the final output files.
        trans_print("Start saving to csv files")
        all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
        # split_count = self.pathsAndTable.multi_pool_count
        split_count = use_files_get_max_cpu_count(all_tmp_files)
        all_arrays = split_array(all_tmp_files, split_count)
        try:
            for index, arr in enumerate(all_arrays):
                with multiprocessing.Pool(split_count) as pool:
                    pool.starmap(self.save_to_csv, [(i,) for i in arr])
                update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type,
                                               round(50 + 20 * (index + 1) / len(all_arrays), 2),
                                               self.paths_and_table.save_db)
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "Error while saving files, system returned: " + str(e)
            raise ValueError(message)
        trans_print("Finished saving to csv files")
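
    # Design note: split_array pre-splits the temporary file list and a fresh
    # Pool is created for each chunk, so transfer progress can be reported
    # incrementally (from 50% up to 70%) after every chunk completes, at the
    # cost of repeated pool startup overhead.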

    def run(self):
        trans_print("Start saving data to the final files")
        begin = datetime.datetime.now()
        self.multiprocessing_to_save_file()
        update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, 70,
                                       self.paths_and_table.save_db)
        trans_print("Finished saving data to the final files, elapsed:", datetime.datetime.now() - begin)
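
# Usage sketch (illustrative only: PathsAndTable and TransParam are built
# elsewhere in the ETL pipeline with project-specific configuration, and the
# variable names below are placeholders, not real call signatures):
#
#     statistics_map = multiprocessing.Manager().dict()
#     step = StatisticsAndSaveFile(paths_and_table, trans_param, statistics_map, rated_power_map)
#     step.run()                   # one CSV (or .csv.gz) per turbine; updates statistics_map
#     step.save_statistics_file()  # writes <read_type>_statistics.txt beside the save path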