StatisticsAndSaveFile.py

import datetime
import multiprocessing
import os
import traceback

import numpy as np
import pandas as pd

from etl.common.PathsAndTable import PathsAndTable
from etl.wind_power.min_sec import TransParam
from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
from service.plt_service import update_trans_transfer_progress
from utils.conf.read_conf import read_conf
from utils.df_utils.util import get_time_space
from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
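

# Reads per-turbine temporary files, normalizes and classifies the data,
# writes one CSV (optionally gzip-compressed) per wind turbine, and
# accumulates shared statistics (date range, row count, time granularity).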
class StatisticsAndSaveFile(object):

    def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map,
                 rated_power_and_cutout_speed_map):
        self.paths_and_table = paths_and_table
        self.trans_param = trans_param
        self.statistics_map = statistics_map
        # Manager lock so worker processes can update statistics_map safely.
        self.lock = multiprocessing.Manager().Lock()
        self.rated_power_and_cutout_speed_map = rated_power_and_cutout_speed_map
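
    # Merge this DataFrame's date range, row count and time granularity into
    # the shared statistics_map under the multiprocessing lock.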
    def set_statistics_data(self, df):
        if not df.empty:
            df['time_stamp'] = pd.to_datetime(df['time_stamp'])
            min_date = df['time_stamp'].min()
            max_date = df['time_stamp'].max()
            with self.lock:
                if 'min_date' in self.statistics_map:
                    if self.statistics_map['min_date'] > min_date:
                        self.statistics_map['min_date'] = min_date
                else:
                    self.statistics_map['min_date'] = min_date
                if 'max_date' in self.statistics_map:
                    if self.statistics_map['max_date'] < max_date:
                        self.statistics_map['max_date'] = max_date
                else:
                    self.statistics_map['max_date'] = max_date
                if 'total_count' in self.statistics_map:
                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
                else:
                    self.statistics_map['total_count'] = df.shape[0]
                if 'time_granularity' not in self.statistics_map:
                    self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
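
    # Full per-file pipeline: read a temporary file, unpivot vertical tables,
    # map turbine names, coerce measurement columns to numeric, de-duplicate,
    # classify operating state, then write one CSV per turbine.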
    def save_to_csv(self, filename):
        df = read_file_to_df(filename)
        if self.trans_param.is_vertical_table:
            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
                                values=self.trans_param.vertical_value,
                                aggfunc='max')
            # Reset the index to get ordinary columns.
            df.reset_index(inplace=True)
        # Map the original wind turbine name to its standard number.
        origin_wind_name = str(df['wind_turbine_number'].values[0])
        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
        # df['wind_turbine_name'] = df['wind_turbine_number']
        df['wind_turbine_number'] = df['wind_turbine_number'].map(
            self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
        wind_col_name = str(df['wind_turbine_number'].values[0])

        not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                           'param9', 'param10']
        # Temporarily replace NaN with a sentinel so existing values survive the numeric coercion below.
        df.replace(np.nan, -999999999, inplace=True)
        number_cols = df.select_dtypes(include=['number']).columns.tolist()
        for col in df.columns:
            if col not in not_double_cols and col not in number_cols:
                if not df[col].isnull().all():
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    # Drop rows containing NaN (i.e., rows where this column failed to convert).
                    df = df.dropna(subset=[col])
        df.replace(-999999999, np.nan, inplace=True)
        df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
        # Parse timestamps and add year/month/day columns.
        solve_time_begin = datetime.datetime.now()
        # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
        # trans_print(wind_col_name, "size after dropping invalid timestamps:", df.shape[0])
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        df.sort_values(by='time_stamp', inplace=True)
        df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]

        rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
        if rated_power_and_cutout_speed_tuple is None:
            rated_power_and_cutout_speed_tuple = (None, None)

        class_identifier = ClassIdentifier(wind_turbine_number=wind_col_name, origin_df=df,
                                           rated_power=rated_power_and_cutout_speed_tuple[0],
                                           cut_out_speed=rated_power_and_cutout_speed_tuple[1])
        df = class_identifier.run()

        df['year'] = df['time_stamp'].dt.year
        df['month'] = df['time_stamp'].dt.month
        df['day'] = df['time_stamp'].dt.day
        df['time_stamp'] = df['time_stamp'].apply(
            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        df['wind_turbine_name'] = str(origin_wind_name)

        if self.paths_and_table.save_zip:
            save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv.gz')
        else:
            save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv')
        create_file_path(save_path, is_file_path=True)
        if self.paths_and_table.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8')

        self.set_statistics_data(df)
        del df
        trans_print("Saved " + str(wind_col_name) + " successfully")
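
    # Fan the temporary files out across a process pool, chunk by chunk,
    # reporting transfer progress (50% -> 70%) after each chunk completes.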
    def multiprocessing_to_save_file(self):
        # Start saving to the final files.
        all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
        # split_count = self.pathsAndTable.multi_pool_count
        split_count = use_files_get_max_cpu_count(all_tmp_files)
        all_arrays = split_array(all_tmp_files, split_count)
        try:
            for index, arr in enumerate(all_arrays):
                with multiprocessing.Pool(split_count) as pool:
                    pool.starmap(self.save_to_csv, [(i,) for i in arr])
                update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type,
                                               round(50 + 20 * (index + 1) / len(all_arrays), 2),
                                               self.paths_and_table.save_db)
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "Error saving files, system returned: " + str(e)
            raise ValueError(message)
    def run(self):
        self.multiprocessing_to_save_file()
        update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, 70,
                                       self.paths_and_table.save_db)
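

# A minimal usage sketch (not part of the module). It assumes the surrounding
# ETL pipeline constructs PathsAndTable and TransParam elsewhere, and that
# statistics_map is a multiprocessing.Manager dict so updates made inside the
# pool workers propagate back to the parent process. The variable names below
# are illustrative, not guaranteed constructor signatures.
#
#     import multiprocessing
#
#     if __name__ == '__main__':
#         statistics_map = multiprocessing.Manager().dict()
#         step = StatisticsAndSaveFile(paths_and_table, trans_param, statistics_map,
#                                      rated_power_and_cutout_speed_map)
#         step.run()  # writes per-turbine CSVs, then reports 70% progress
#         print(statistics_map.get('total_count'), statistics_map.get('time_granularity'))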