# StatisticsAndSaveTmpFormalFile.py
import multiprocessing
import traceback
from os import path

import numpy as np
import pandas as pd

from conf.constants import DataProcessing, ParallelProcessing, Types
from etl.common.PathsAndTable import PathsAndTable
from etl.wind_power.min_sec import TransParam
from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
from etl.wind_power.min_sec.FilterValidData import FilterValidData
from service.trans_conf_service import update_trans_transfer_progress
from utils.conf.read_conf import read_conf
from utils.df_utils.util import estimate_time_interval as get_time_space
from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df
from utils.log.trans_log import debug, error
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
  17. exec("import math")
  18. class StatisticsAndSaveTmpFormalFile(object):
  19. def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map,
  20. rated_power_and_cutout_speed_map):
  21. self.paths_and_table = paths_and_table
  22. self.trans_param = trans_param
  23. self.statistics_map = statistics_map
  24. self.lock = multiprocessing.Manager().Lock()
  25. self.rated_power_and_cutout_speed_map = rated_power_and_cutout_speed_map
  26. def set_statistics_data(self, df):
  27. if not df.empty:
  28. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  29. min_date = df['time_stamp'].min()
  30. max_date = df['time_stamp'].max()
  31. with self.lock:
  32. if 'min_date' in self.statistics_map.keys():
  33. if self.statistics_map['min_date'] > min_date:
  34. self.statistics_map['min_date'] = min_date
  35. else:
  36. self.statistics_map['min_date'] = min_date
  37. if 'max_date' in self.statistics_map.keys():
  38. if self.statistics_map['max_date'] < max_date:
  39. self.statistics_map['max_date'] = max_date
  40. else:
  41. self.statistics_map['max_date'] = max_date
  42. if 'total_count' in self.statistics_map.keys():
  43. self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
  44. else:
  45. self.statistics_map['total_count'] = df.shape[0]
  46. if 'time_granularity' not in self.statistics_map.keys():
  47. self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
  48. def save_to_csv(self, filename):
  49. df = read_file_to_df(filename)
  50. if self.trans_param.is_vertical_table:
  51. df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
  52. values=self.trans_param.vertical_value,
  53. aggfunc='max')
  54. # 重置索引以得到普通的列
  55. df.reset_index(inplace=True)
  56. # 转化风机名称
  57. origin_wind_name = str(df['wind_turbine_number'].values[0])
  58. df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
  59. # df['wind_turbine_name'] = df['wind_turbine_number']
  60. df['wind_turbine_number'] = df['wind_turbine_number'].map(
  61. self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
  62. wind_col_name = str(df['wind_turbine_number'].values[0])
  63. not_double_cols = DataProcessing.NOT_DOUBLE_COLS
  64. # 删除 有功功率 和 风速均为空的情况
  65. df.dropna(subset=['active_power', 'wind_velocity'], how='any', inplace=True)
  66. debug(origin_wind_name, wind_col_name, "删除有功功率和风速有空的情况后:", df.shape)
  67. df.replace(np.nan, DataProcessing.NAN_REPLACE_VALUE, inplace=True)
  68. number_cols = df.select_dtypes(include=['number']).columns.tolist()
  69. for col in df.columns:
  70. if col not in not_double_cols and col not in number_cols:
  71. if not df[col].isnull().all():
  72. df[col] = pd.to_numeric(df[col], errors='coerce')
  73. # 删除包含NaN的行(即那些列A转换失败的行)
  74. df = df.dropna(subset=[col])
  75. debug(origin_wind_name, wind_col_name, "删除非数值列名:", col)
  76. df.replace(DataProcessing.NAN_REPLACE_VALUE, np.nan, inplace=True)
  77. df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
  78. df['time_stamp'] = df['time_stamp'].str.strip()
  79. df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
  80. df.dropna(subset=['time_stamp'], inplace=True)
  81. df.sort_values(by='time_stamp', inplace=True)
  82. df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
  83. # 删除每行有空值的行(2025-3-24)
  84. # origin_count = df.shape[0]
  85. # df = df.dropna()
  86. # trans_print(f"原始数据量:{origin_count},去除na后数据量:{df.shape[0]}")
  87. # 如果秒级有可能合并到分钟级
  88. # TODO add 秒转分钟
  89. if self.trans_param.boolean_sec_to_min:
  90. df['time_stamp'] = df['time_stamp'].apply(lambda x: x + pd.Timedelta(minutes=(10 - x.minute % 10) % 10))
  91. df['time_stamp'] = df['time_stamp'].dt.floor(DataProcessing.TIME_INTERVAL)
  92. df = df.groupby(['wind_turbine_number', 'time_stamp']).mean().reset_index()
  93. debug('有功功率前10个', df.head(10)['active_power'].values)
  94. power_df = df[df['active_power'] > 0]
  95. debug(origin_wind_name, wind_col_name, "功率大于0的数量:", power_df.shape)
  96. power = power_df.sample(int(power_df.shape[0] / 100))['active_power'].median()
  97. debug(origin_wind_name, wind_col_name, '有功功率,中位数', power)
  98. if power > DataProcessing.POWER_UNIT_THRESHOLD:
  99. df['active_power'] = df['active_power'] / 1000
  100. # 做数据检测前,羡强行处理有功功率
  101. # df = df[df['active_power'] < 50000]
  102. rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
  103. if rated_power_and_cutout_speed_tuple is None:
  104. # rated_power_and_cutout_speed_tuple = (None, None)
  105. error(origin_wind_name, '未从平台匹配到额定功率')
  106. else:
  107. debug(origin_wind_name, '过滤数据前数据大小', df.shape)
  108. debug(origin_wind_name, '额定功率', rated_power_and_cutout_speed_tuple[0])
  109. # trans_print(origin_wind_name, '\n', df.head(10))
  110. filter_valid_data = FilterValidData(df, rated_power_and_cutout_speed_tuple[0])
  111. try:
  112. df = filter_valid_data.run()
  113. except:
  114. error(origin_wind_name, '过滤数据异常', filename)
  115. raise
  116. debug(origin_wind_name, '过滤数据后数据大小', df.shape)
  117. # 如果有需要处理的,先进行代码处理,在进行打标签
  118. # exec_code = get_trans_exec_code(self.paths_and_table.exec_id, self.paths_and_table.read_type)
  119. # if exec_code:
  120. # if 'import ' in exec_code:
  121. # raise Exception("执行代码不支持导入包")
  122. # exec(exec_code)
  123. if power_df.shape[0] == 0:
  124. df.loc[:, 'lab'] = -1
  125. else:
  126. class_identifier = ClassIdentifier(wind_turbine_number=origin_wind_name, origin_df=df,
  127. rated_power=rated_power_and_cutout_speed_tuple[0],
  128. cut_out_speed=rated_power_and_cutout_speed_tuple[1])
  129. df = class_identifier.run()
  130. del power_df
  131. df['year'] = df['time_stamp'].dt.year
  132. df['month'] = df['time_stamp'].dt.month
  133. df['day'] = df['time_stamp'].dt.day
  134. df['time_stamp'] = df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  135. df['wind_turbine_name'] = str(origin_wind_name)
  136. df['year_month'] = df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2), axis=1)
  137. cols = df.columns
  138. if self.paths_and_table.read_type == Types.SECOND:
  139. type_col = 'year_month'
  140. else:
  141. type_col = 'year'
  142. date_strs = df[type_col].unique().tolist()
  143. for date_str in date_strs:
  144. save_path = path.join(self.paths_and_table.get_tmp_formal_path(), str(date_str),
  145. str(origin_wind_name) + '.csv')
  146. create_file_path(save_path, is_file_path=True)
  147. now_df = df[df[type_col] == date_str][cols]
  148. if self.paths_and_table.save_zip:
  149. save_path = save_path + '.gz'
  150. now_df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
  151. else:
  152. now_df.to_csv(save_path, index=False, encoding='utf-8')
  153. del now_df
  154. self.set_statistics_data(df)
  155. del df
  156. debug("保存" + str(wind_col_name) + "成功")
  157. def multiprocessing_to_save_file(self):
  158. # 开始保存到正式文件
  159. all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
  160. if not all_tmp_files:
  161. debug("没有临时文件需要处理")
  162. return
  163. # 计算最佳进程数
  164. max_processes = use_files_get_max_cpu_count(all_tmp_files)
  165. max_processes = min(max_processes, len(all_tmp_files), ParallelProcessing.MAX_PROCESSES) # 限制最大进程数
  166. try:
  167. # 创建一个进程池处理所有文件
  168. with multiprocessing.Pool(max_processes) as pool:
  169. # 分批次处理并更新进度
  170. batch_size = max(1, len(all_tmp_files) // ParallelProcessing.MAX_BATCHES) # 最多10个批次
  171. for i in range(0, len(all_tmp_files), batch_size):
  172. batch_files = all_tmp_files[i:i + batch_size]
  173. pool.starmap(self.save_to_csv, [(file,) for file in batch_files])
  174. # 更新进度
  175. progress = 50 + 15 * (i + len(batch_files)) / len(all_tmp_files)
  176. update_trans_transfer_progress(self.paths_and_table.id,
  177. round(progress, 2),
  178. self.paths_and_table.save_db)
  179. except Exception as e:
  180. error(traceback.format_exc())
  181. message = "保存文件错误,系统返回错误:" + str(e)
  182. raise ValueError(message)
  183. def run(self):
  184. self.multiprocessing_to_save_file()
  185. update_trans_transfer_progress(self.paths_and_table.id, 65,
  186. self.paths_and_table.save_db)