# StatisticsAndSaveFile.py

import datetime
import multiprocessing
import os
import traceback

import numpy as np
import pandas as pd

from etl.base import TransParam
from etl.base.PathsAndTable import PathsAndTable
from service.plt_service import update_trans_transfer_progress
from utils.df_utils.util import get_time_space
from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count


class StatisticsAndSaveFile(object):

    def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam, statistics_map):
        self.pathsAndTable = pathsAndTable
        self.trans_param = trans_param
        self.statistics_map = statistics_map
        # Keep a reference to the Manager itself: if only the Lock proxy is
        # stored, the manager process can be garbage-collected and shut down,
        # which would invalidate the lock.
        self.manager = multiprocessing.Manager()
        self.lock = self.manager.Lock()

    def set_statistics_data(self, df):
        if not df.empty:
            df['time_stamp'] = pd.to_datetime(df['time_stamp'])
            min_date = df['time_stamp'].min()
            max_date = df['time_stamp'].max()
            # The statistics map is shared across worker processes, so every
            # read-modify-write happens under the lock.
            with self.lock:
                if 'min_date' in self.statistics_map:
                    if self.statistics_map['min_date'] > min_date:
                        self.statistics_map['min_date'] = min_date
                else:
                    self.statistics_map['min_date'] = min_date

                if 'max_date' in self.statistics_map:
                    if self.statistics_map['max_date'] < max_date:
                        self.statistics_map['max_date'] = max_date
                else:
                    self.statistics_map['max_date'] = max_date

                if 'total_count' in self.statistics_map:
                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
                else:
                    self.statistics_map['total_count'] = df.shape[0]

                if 'time_granularity' not in self.statistics_map:
                    self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')

    def save_statistics_file(self):
        save_path = os.path.join(os.path.dirname(self.pathsAndTable.get_save_path()),
                                 self.pathsAndTable.read_type + '_statistics.txt')
        create_file_path(save_path, is_file_path=True)
        with open(save_path, 'w', encoding='utf8') as f:
            f.write("Total rows: " + str(self.statistics_map['total_count']) + "\n")
            f.write("Earliest timestamp: " + str(self.statistics_map['min_date']) + "\n")
            f.write("Latest timestamp: " + str(self.statistics_map['max_date']) + "\n")
            f.write("Turbine count: " + str(len(read_excel_files(self.pathsAndTable.get_read_tmp_path()))) + "\n")
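
    # Example of the statistics file this method writes (values here are
    # purely illustrative):
    #
    #     Total rows: 1048576
    #     Earliest timestamp: 2024-01-01 00:00:00
    #     Latest timestamp: 2024-06-30 23:50:00
    #     Turbine count: 33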

    def check_data_validity(self, df):
        # Placeholder for validity checks; intentionally a no-op for now.
        pass

    def save_to_csv(self, filename):
        df = read_file_to_df(filename)
        if self.trans_param.is_vertical_table:
            # Pivot a long (key/value) table into one column per measurement.
            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'],
                                columns=self.trans_param.vertical_key,
                                values=self.trans_param.vertical_value,
                                aggfunc='max')
            # Reset the index to get ordinary columns back.
            df.reset_index(inplace=True)

        # Ensure every target column exists, then keep only the target columns.
        for k in self.trans_param.cols_tran.keys():
            if k not in df.columns:
                df[k] = None
        df = df[list(self.trans_param.cols_tran.keys())]

        # Map raw turbine identifiers to canonical turbine numbers.
        trans_print("Start converting wind turbine names")
        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
        df['wind_turbine_name'] = df['wind_turbine_number']
        df['wind_turbine_number'] = df['wind_turbine_number'].map(
            self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
        wind_col_name = str(df['wind_turbine_number'].values[0])

        # Add year/month/day columns derived from the timestamp.
        trans_print(wind_col_name, "contains a time field; processing it and adding year/month/day", filename)
        trans_print(wind_col_name, "row count before time filtering:", df.shape[0])
        # Drop rows whose raw timestamp string lacks a date ('-') or time (':') separator.
        df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
        trans_print(wind_col_name, "row count after dropping invalid timestamps:", df.shape[0])
        df['time_stamp'] = pd.to_datetime(df['time_stamp'])
        df['year'] = df['time_stamp'].dt.year
        df['month'] = df['time_stamp'].dt.month
        df['day'] = df['time_stamp'].dt.day
        df.sort_values(by='time_stamp', inplace=True)
        df['time_stamp'] = df['time_stamp'].apply(
            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        trans_print("Finished processing the time field")

        # Columns that stay as strings; every other column is coerced to numeric.
        not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                           'param9', 'param10']
        trans_print(wind_col_name, "row count before dropping invalid data:", df.shape[0])
        # Temporarily replace NaN with a sentinel so that pre-existing gaps are
        # not confused with values that fail numeric conversion below.
        df.replace(np.nan, -999999999, inplace=True)
        for col in df.columns:
            if col not in not_double_cols:
                if not df[col].isnull().all():
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    # Drop rows containing NaN (i.e. rows where this column failed to convert).
                    df = df.dropna(subset=[col])
        trans_print(wind_col_name, "row count after dropping invalid data:", df.shape[0])
        df.replace(-999999999, np.nan, inplace=True)

        trans_print(wind_col_name, "row count before deduplication:", df.shape[0])
        df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
        trans_print(wind_col_name, "row count after deduplication:", df.shape[0])

        if self.pathsAndTable.save_zip:
            save_path = os.path.join(self.pathsAndTable.get_save_path(), str(wind_col_name) + '.csv.gz')
        else:
            save_path = os.path.join(self.pathsAndTable.get_save_path(), str(wind_col_name) + '.csv')
        create_file_path(save_path, is_file_path=True)
        if self.pathsAndTable.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8')

        self.set_statistics_data(df)
        del df
        trans_print("Saved " + str(wind_col_name) + " successfully")
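
    # A self-contained illustration of the long-to-wide pivot used above.
    # The column names 'point_name'/'point_value' are invented stand-ins for
    # trans_param.vertical_key / trans_param.vertical_value:
    #
    #     long_df = pd.DataFrame({
    #         'time_stamp': ['2024-01-01 00:00:00'] * 2,
    #         'wind_turbine_number': ['WT01'] * 2,
    #         'point_name': ['power', 'wind_speed'],
    #         'point_value': [1520.0, 7.9],
    #     })
    #     wide_df = long_df.pivot_table(index=['time_stamp', 'wind_turbine_number'],
    #                                   columns='point_name', values='point_value',
    #                                   aggfunc='max').reset_index()
    #     # wide_df: one row per (time_stamp, turbine), with 'power' and
    #     # 'wind_speed' as ordinary columns.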

    def multiprocessing_to_save_file(self):
        # Save the cleaned data to the final output files, one pool chunk at a
        # time so that progress can be reported between chunks.
        trans_print("Start saving to CSV files")
        all_tmp_files = read_excel_files(self.pathsAndTable.get_read_tmp_path())
        # split_count = self.pathsAndTable.multi_pool_count
        split_count = use_files_get_max_cpu_count(all_tmp_files)
        all_arrays = split_array(all_tmp_files, split_count)
        try:
            for index, arr in enumerate(all_arrays):
                with multiprocessing.Pool(split_count) as pool:
                    pool.starmap(self.save_to_csv, [(i,) for i in arr])
                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                               round(50 + 20 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "Failed to save files, system error: " + str(e)
            raise ValueError(message)
        trans_print("Finished saving to CSV files")
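
    # Progress arithmetic: with e.g. 4 chunks, the reported progress after
    # each chunk is 55.0, 60.0, 65.0 and 70.0, i.e. the save stage spans the
    # 50% -> 70% window of the overall transfer.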

    def run(self):
        trans_print("Start saving data to final files")
        begin = datetime.datetime.now()
        self.multiprocessing_to_save_file()
        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 70,
                                       self.pathsAndTable.save_db)
        trans_print("Finished saving data to final files, elapsed:", datetime.datetime.now() - begin)
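

# --- Usage sketch (illustrative, not part of the original pipeline) ---
# A minimal sketch of how this class is typically driven, assuming
# `paths_and_table` (a PathsAndTable) and `trans_param` (a TransParam) have
# already been built by the surrounding ETL code; their constructors are
# project-specific and not shown here. The statistics map must be a Manager
# dict so that updates made inside pool workers propagate back to the parent:
#
#     if __name__ == '__main__':
#         statistics_map = multiprocessing.Manager().dict()
#         saver = StatisticsAndSaveFile(paths_and_table, trans_param, statistics_map)
#         saver.run()                    # one CSV (or .csv.gz) per turbine, progress 50% -> 70%
#         saver.save_statistics_file()   # writes <read_type>_statistics.txt next to the save dir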