# StatisticsAndSaveFile.py
  1. import datetime
  2. import multiprocessing
  3. import traceback
  4. from os import path
  5. import numpy as np
  6. import pandas as pd
  7. from etl.common.PathsAndTable import PathsAndTable
  8. from etl.wind_power.min_sec import TransParam
  9. from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
  10. from service.plt_service import update_trans_transfer_progress
  11. from service.trans_service import get_trans_exec_code
  12. from utils.conf.read_conf import read_conf
  13. from utils.df_utils.util import get_time_space
  14. from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
  15. from utils.log.trans_log import trans_print
  16. from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
  17. class StatisticsAndSaveFile(object):
  18. def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map,
  19. rated_power_and_cutout_speed_map):
  20. self.paths_and_table = paths_and_table
  21. self.trans_param = trans_param
  22. self.statistics_map = statistics_map
  23. self.lock = multiprocessing.Manager().Lock()
  24. self.rated_power_and_cutout_speed_map = rated_power_and_cutout_speed_map
  25. def set_statistics_data(self, df):
  26. if not df.empty:
  27. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  28. min_date = df['time_stamp'].min()
  29. max_date = df['time_stamp'].max()
  30. with self.lock:
  31. if 'min_date' in self.statistics_map.keys():
  32. if self.statistics_map['min_date'] > min_date:
  33. self.statistics_map['min_date'] = min_date
  34. else:
  35. self.statistics_map['min_date'] = min_date
  36. if 'max_date' in self.statistics_map.keys():
  37. if self.statistics_map['max_date'] < max_date:
  38. self.statistics_map['max_date'] = max_date
  39. else:
  40. self.statistics_map['max_date'] = max_date
  41. if 'total_count' in self.statistics_map.keys():
  42. self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
  43. else:
  44. self.statistics_map['total_count'] = df.shape[0]
  45. if 'time_granularity' not in self.statistics_map.keys():
  46. self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
  47. def save_to_csv(self, filename):
  48. df = read_file_to_df(filename)
  49. if self.trans_param.is_vertical_table:
  50. df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
  51. values=self.trans_param.vertical_value,
  52. aggfunc='max')
  53. # 重置索引以得到普通的列
  54. df.reset_index(inplace=True)
  55. # 转化风机名称
  56. origin_wind_name = str(df['wind_turbine_number'].values[0])
  57. df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
  58. # df['wind_turbine_name'] = df['wind_turbine_number']
  59. df['wind_turbine_number'] = df['wind_turbine_number'].map(
  60. self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
  61. wind_col_name = str(df['wind_turbine_number'].values[0])
  62. not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
  63. 'param9', 'param10']
  64. # 删除 有功功率 和 风速均为空的情况
  65. df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
  66. df.replace(np.nan, -999999999, inplace=True)
  67. number_cols = df.select_dtypes(include=['number']).columns.tolist()
  68. for col in df.columns:
  69. if col not in not_double_cols and col not in number_cols:
  70. if not df[col].isnull().all():
  71. df[col] = pd.to_numeric(df[col], errors='coerce')
  72. # 删除包含NaN的行(即那些列A转换失败的行)
  73. df = df.dropna(subset=[col])
  74. df.replace(-999999999, np.nan, inplace=True)
  75. df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
  76. # 添加年月日
  77. solve_time_begin = datetime.datetime.now()
  78. # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
  79. # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
  80. # df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce", format='%d-%m-%Y %H:%M:%S')
  81. df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
  82. df.dropna(subset=['time_stamp'], inplace=True)
  83. df.sort_values(by='time_stamp', inplace=True)
  84. df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
  85. # 如果秒级有可能合并到分钟级
  86. # TODO add 秒转分钟
  87. if self.trans_param.boolean_sec_to_min:
  88. df['time_stamp'] = df['time_stamp'].apply(lambda x: x + pd.Timedelta(minutes=(10 - x.minute % 10) % 10))
  89. df['time_stamp'] = df['time_stamp'].dt.floor('10T')
  90. df = df.groupby(['wind_turbine_number', 'time_stamp']).mean().reset_index()
  91. power = df.sample(int(df.shape[0] / 100))['active_power'].median()
  92. if power > 10000:
  93. df['active_power'] = df['active_power'] / 1000
  94. ## 做数据检测前,羡强行处理有功功率
  95. df = df[df['active_power'] < 5000]
  96. rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
  97. if rated_power_and_cutout_speed_tuple is None:
  98. rated_power_and_cutout_speed_tuple = (None, None)
  99. # 如果有需要处理的,先进行代码处理,在进行打标签
  100. exec_code = get_trans_exec_code(self.paths_and_table.batch_no, self.paths_and_table.read_type)
  101. if exec_code:
  102. if 'import ' in exec_code:
  103. raise Exception("执行代码不支持导入包")
  104. exec(exec_code)
  105. class_identifiler = ClassIdentifier(wind_turbine_number=wind_col_name, origin_df=df,
  106. rated_power=rated_power_and_cutout_speed_tuple[0],
  107. cut_out_speed=rated_power_and_cutout_speed_tuple[1])
  108. df = class_identifiler.run()
  109. df['year'] = df['time_stamp'].dt.year
  110. df['month'] = df['time_stamp'].dt.month
  111. df['day'] = df['time_stamp'].dt.day
  112. df['time_stamp'] = df['time_stamp'].apply(
  113. lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  114. df['wind_turbine_name'] = str(origin_wind_name)
  115. if self.paths_and_table.save_zip:
  116. save_path = path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv.gz')
  117. else:
  118. save_path = path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv')
  119. create_file_path(save_path, is_file_path=True)
  120. if self.paths_and_table.save_zip:
  121. df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
  122. else:
  123. df.to_csv(save_path, index=False, encoding='utf-8')
  124. self.set_statistics_data(df)
  125. del df
  126. trans_print("保存" + str(wind_col_name) + "成功")
  127. def mutiprocessing_to_save_file(self):
  128. # 开始保存到正式文件
  129. all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
  130. # split_count = self.pathsAndTable.multi_pool_count
  131. split_count = use_files_get_max_cpu_count(all_tmp_files)
  132. all_arrays = split_array(all_tmp_files, split_count)
  133. try:
  134. for index, arr in enumerate(all_arrays):
  135. with multiprocessing.Pool(split_count) as pool:
  136. pool.starmap(self.save_to_csv, [(i,) for i in arr])
  137. update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type,
  138. round(50 + 20 * (index + 1) / len(all_arrays), 2),
  139. self.paths_and_table.save_db)
  140. except Exception as e:
  141. trans_print(traceback.format_exc())
  142. message = "保存文件错误,系统返回错误:" + str(e)
  143. raise ValueError(message)
  144. def run(self):
  145. self.mutiprocessing_to_save_file()
  146. update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, 70,
  147. self.paths_and_table.save_db)