StatisticsAndSaveFile.py

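"""ETL step that cleans per-turbine data files, saves them as CSV (optionally
gzip-compressed), and accumulates run statistics (row count, time range, time
granularity) into a shared map guarded by a multiprocessing lock."""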
import datetime
import multiprocessing
import os
import traceback

import numpy as np
import pandas as pd

from etl.base import TransParam
from etl.base.PathsAndTable import PathsAndTable
from etl.step.ClassIdentifier import ClassIdentifier
from service.plt_service import update_trans_transfer_progress
from utils.df_utils.util import get_time_space
from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count


class StatisticsAndSaveFile(object):

    def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam,
                 statistics_map: dict, rated_power_map: dict):
        self.pathsAndTable = pathsAndTable
        self.trans_param = trans_param
        self.statistics_map = statistics_map
        # Manager lock keeps statistics_map updates consistent across worker processes.
        self.lock = multiprocessing.Manager().Lock()
        self.rated_power_map = rated_power_map
    def set_statistics_data(self, df):
        if not df.empty:
            df['time_stamp'] = pd.to_datetime(df['time_stamp'])
            min_date = df['time_stamp'].min()
            max_date = df['time_stamp'].max()
            with self.lock:
                # Track the overall min/max timestamp and total row count across all processed files.
                if 'min_date' in self.statistics_map.keys():
                    if self.statistics_map['min_date'] > min_date:
                        self.statistics_map['min_date'] = min_date
                else:
                    self.statistics_map['min_date'] = min_date
                if 'max_date' in self.statistics_map.keys():
                    if self.statistics_map['max_date'] < max_date:
                        self.statistics_map['max_date'] = max_date
                else:
                    self.statistics_map['max_date'] = max_date
                if 'total_count' in self.statistics_map.keys():
                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
                else:
                    self.statistics_map['total_count'] = df.shape[0]
                if 'time_granularity' not in self.statistics_map.keys():
                    self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
    def save_statistics_file(self):
        save_path = os.path.join(os.path.dirname(self.pathsAndTable.get_save_path()),
                                 self.pathsAndTable.read_type + '_statistics.txt')
        create_file_path(save_path, is_file_path=True)
        with open(save_path, 'w', encoding='utf8') as f:
            f.write("Total records: " + str(self.statistics_map['total_count']) + "\n")
            f.write("Min time: " + str(self.statistics_map['min_date']) + "\n")
            f.write("Max time: " + str(self.statistics_map['max_date']) + "\n")
            f.write("Turbine count: " + str(len(read_excel_files(self.pathsAndTable.get_read_tmp_path()))) + "\n")
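    # The resulting <read_type>_statistics.txt looks like this (values are
    # illustrative only, not from a real run):
    #   Total records: 1052341
    #   Min time: 2023-01-01 00:00:00
    #   Max time: 2023-12-31 23:50:00
    #   Turbine count: 33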
    def check_data_validity(self, df):
        # Placeholder: validity checks are not implemented yet.
        pass
    def save_to_csv(self, filename):
        df = read_file_to_df(filename)
        if self.trans_param.is_vertical_table:
            # A vertical (long) table holds one measurement per row; pivot it
            # into one column per measurement, keeping the max on duplicates.
            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'],
                                columns=self.trans_param.vertical_key,
                                values=self.trans_param.vertical_value,
                                aggfunc='max')
            # Reset the index so time_stamp and wind_turbine_number become ordinary columns again.
            df.reset_index(inplace=True)
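            # Illustration of the pivot above (the measurement column names come
            # from the data; the ones shown here are made up):
            #   time_stamp           wind_turbine_number  key         value
            #   2024-01-01 00:00:00  WT01                 wind_speed  5.2
            #   2024-01-01 00:00:00  WT01                 power       812.0
            # becomes
            #   time_stamp           wind_turbine_number  power  wind_speed
            #   2024-01-01 00:00:00  WT01                 812.0  5.2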
        # Ensure every target column exists, then keep only the target columns, in order.
        for k in self.trans_param.cols_tran.keys():
            if k not in df.columns:
                df[k] = None
        df = df[self.trans_param.cols_tran.keys()]
        # Convert wind turbine names to their configured numbers.
        trans_print("Start converting wind turbine names")
        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
        df['wind_turbine_name'] = df['wind_turbine_number']
        df['wind_turbine_number'] = df['wind_turbine_number'].map(
            self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
        wind_col_name = str(df['wind_turbine_number'].values[0])
        # Parse the time field and add year/month/day columns.
        trans_print(wind_col_name, "has a time field; processing it and adding year/month/day", filename)
        trans_print(wind_col_name, "rows before time cleaning:", df.shape[0])
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        trans_print(wind_col_name, "rows after dropping invalid timestamps:", df.shape[0])
        df['year'] = df['time_stamp'].dt.year
        df['month'] = df['time_stamp'].dt.month
        df['day'] = df['time_stamp'].dt.day
        df.sort_values(by='time_stamp', inplace=True)
        df['time_stamp'] = df['time_stamp'].apply(
            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        trans_print("Finished processing the time field")
        not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                           'param9', 'param10']
        trans_print(wind_col_name, "rows before dropping invalid data:", df.shape[0])
        # Temporarily replace NaN with a sentinel so that, after to_numeric, only
        # genuine conversion failures (non-numeric strings) are NaN and get dropped.
        df.replace(np.nan, -999999999, inplace=True)
        for col in df.columns:
            if col not in not_double_cols:
                if not df[col].isnull().all():
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    # Drop the rows whose value in this column failed to convert.
                    df = df.dropna(subset=[col])
        trans_print(wind_col_name, "rows after dropping invalid data:", df.shape[0])
        # Restore the sentinel back to NaN.
        df.replace(-999999999, np.nan, inplace=True)
        trans_print(wind_col_name, "rows before deduplication:", df.shape[0])
        df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
        trans_print(wind_col_name, "rows after deduplication:", df.shape[0])
        # Classify/filter the rows using the turbine's rated power
        # (renamed from `filter` to avoid shadowing the builtin).
        identifier = ClassIdentifier(origin_df=df, rated_power=self.rated_power_map[str(wind_col_name)])
        df = identifier.run()
        if self.pathsAndTable.save_zip:
            save_path = os.path.join(self.pathsAndTable.get_save_path(), str(wind_col_name) + '.csv.gz')
        else:
            save_path = os.path.join(self.pathsAndTable.get_save_path(), str(wind_col_name) + '.csv')
        create_file_path(save_path, is_file_path=True)
        if self.pathsAndTable.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8')
        self.set_statistics_data(df)
        del df
        trans_print("Saved " + str(wind_col_name) + " successfully")
    def multiprocessing_to_save_file(self):
        # Save the cleaned data to the final files, processing the temp files in parallel.
        trans_print("Start saving to CSV files")
        all_tmp_files = read_excel_files(self.pathsAndTable.get_read_tmp_path())
        split_count = use_files_get_max_cpu_count(all_tmp_files)
        all_arrays = split_array(all_tmp_files, split_count)
        try:
            for index, arr in enumerate(all_arrays):
                with multiprocessing.Pool(split_count) as pool:
                    pool.map(self.save_to_csv, arr)
                # Progress ramps from 50% to 70% as the chunks complete.
                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                               round(50 + 20 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "Error while saving files, system returned: " + str(e)
            raise ValueError(message)
        trans_print("Finished saving to CSV files")
    def run(self):
        trans_print("Start saving data to the final files")
        begin = datetime.datetime.now()
        self.multiprocessing_to_save_file()
        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 70,
                                       self.pathsAndTable.save_db)
        trans_print("Finished saving data to the final files, elapsed:", datetime.datetime.now() - begin)
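

# A minimal, self-contained sketch of the sentinel-based numeric cleaning that
# save_to_csv performs above (the column name and values here are made up):
def _demo_sentinel_cleaning():
    df = pd.DataFrame({'wind_speed': ['5.2', 'bad', None]})
    df.replace(np.nan, -999999999, inplace=True)      # protect pre-existing NaN
    df['wind_speed'] = pd.to_numeric(df['wind_speed'], errors='coerce')
    df = df.dropna(subset=['wind_speed'])             # drop only true garbage ('bad')
    df.replace(-999999999, np.nan, inplace=True)      # restore the protected NaN
    return df  # two rows remain: 5.2 and NaN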