# StatisticsAndSaveTmpFormalFile.py
import math  # kept importable for dynamically executed transform code (see the commented-out exec block below)
import multiprocessing
import traceback
from os import path

import numpy as np
import pandas as pd

from etl.common.PathsAndTable import PathsAndTable
from etl.wind_power.min_sec import TransParam
from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
from etl.wind_power.min_sec.FilterValidData import FilterValidData
from service.trans_conf_service import update_trans_transfer_progress
from utils.conf.read_conf import read_conf
from utils.df_utils.util import get_time_space
from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count


class StatisticsAndSaveTmpFormalFile(object):

    def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map,
                 rated_power_and_cutout_speed_map):
        self.paths_and_table = paths_and_table
        self.trans_param = trans_param
        self.statistics_map = statistics_map
        self.lock = multiprocessing.Manager().Lock()
        self.rated_power_and_cutout_speed_map = rated_power_and_cutout_speed_map
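
    # Note: `statistics_map` is shared across worker processes (presumably a
    # multiprocessing.Manager().dict() created by the caller), so every update
    # in set_statistics_data() below is serialized through the Manager lock.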

    def set_statistics_data(self, df):
        if not df.empty:
            df['time_stamp'] = pd.to_datetime(df['time_stamp'])
            min_date = df['time_stamp'].min()
            max_date = df['time_stamp'].max()
            with self.lock:
                if 'min_date' in self.statistics_map.keys():
                    if self.statistics_map['min_date'] > min_date:
                        self.statistics_map['min_date'] = min_date
                else:
                    self.statistics_map['min_date'] = min_date
                if 'max_date' in self.statistics_map.keys():
                    if self.statistics_map['max_date'] < max_date:
                        self.statistics_map['max_date'] = max_date
                else:
                    self.statistics_map['max_date'] = max_date
                if 'total_count' in self.statistics_map.keys():
                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
                else:
                    self.statistics_map['total_count'] = df.shape[0]
                if 'time_granularity' not in self.statistics_map.keys():
                    self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
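
    # save_to_csv() is the per-file pipeline: read a temp file, (optionally)
    # unpivot a vertical table, map turbine names, coerce stray text columns to
    # numbers, deduplicate and sort by timestamp, (optionally) aggregate seconds
    # to 10-minute means, normalize active-power units, filter invalid rows,
    # label the data, and finally write one CSV per year (or year-month).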

    def save_to_csv(self, filename):
        df = read_file_to_df(filename)
        if self.trans_param.is_vertical_table:
            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
                                values=self.trans_param.vertical_value,
                                aggfunc='max')
            # Reset the index to get ordinary columns back
            df.reset_index(inplace=True)

        # Map the original turbine name to its standardized number
        origin_wind_name = str(df['wind_turbine_number'].values[0])
        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
        # df['wind_turbine_name'] = df['wind_turbine_number']
        df['wind_turbine_number'] = df['wind_turbine_number'].map(
            self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
        wind_col_name = str(df['wind_turbine_number'].values[0])

        not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                           'param9', 'param10']

        # Drop rows where active power or wind speed is null
        df.dropna(subset=['active_power', 'wind_velocity'], how='any', inplace=True)
        trans_print(origin_wind_name, wind_col_name, "shape after dropping rows with null active power or wind speed:",
                    df.shape)

        df.replace(np.nan, -999999999, inplace=True)
        number_cols = df.select_dtypes(include=['number']).columns.tolist()
        for col in df.columns:
            if col not in not_double_cols and col not in number_cols:
                if not df[col].isnull().all():
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    # Drop rows containing NaN (i.e. rows where this column failed to convert)
                    df = df.dropna(subset=[col])
                    trans_print(origin_wind_name, wind_col_name, "dropped rows with non-numeric values in column:", col)
        df.replace(-999999999, np.nan, inplace=True)
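
        # The -999999999 sentinel above keeps originally-null cells from being
        # discarded by the coercion pass: after the replace, to_numeric(errors='coerce')
        # only yields NaN for genuinely non-numeric text, so dropna(subset=[col])
        # removes just those rows before the sentinel is swapped back to NaN.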

        df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        df.sort_values(by='time_stamp', inplace=True)
        df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]

        # Drop every row that still contains a null value (2025-3-24)
        origin_count = df.shape[0]
        df = df.dropna()
        trans_print(f'original row count: {origin_count}, row count after dropping NA: {df.shape[0]}')

        # Aggregate second-level data up to 10-minute granularity if configured
        # TODO: add second-to-minute conversion
        if self.trans_param.boolean_sec_to_min:
            df['time_stamp'] = df['time_stamp'].apply(lambda x: x + pd.Timedelta(minutes=(10 - x.minute % 10) % 10))
            df['time_stamp'] = df['time_stamp'].dt.floor('10min')
            df = df.groupby(['wind_turbine_number', 'time_stamp']).mean().reset_index()
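
        # A worked example of the rounding above (timestamps assumed for illustration):
        #   12:03:20 -> minute % 10 = 3 -> add (10 - 3) % 10 = 7 min -> 12:10:20 -> floor('10min') -> 12:10:00
        #   12:10:05 -> minute % 10 = 0 -> add 0 min                -> 12:10:05 -> floor('10min') -> 12:10:00
        # i.e. each timestamp maps to the 10-minute boundary at or after its minute,
        # and the groupby-mean then averages all rows that share a bucket.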

        trans_print('first 10 active power values:', df.head(10)['active_power'].values)
        power_df = df[df['active_power'] > 0]
        trans_print(origin_wind_name, wind_col_name, "rows with active power > 0:", power_df.shape)
        # Estimate the median from a ~1% sample; guard against empty or tiny frames,
        # where int(shape / 100) would be 0 and sample() would return nothing
        if power_df.empty:
            power = np.nan
        else:
            power = power_df.sample(max(1, power_df.shape[0] // 100))['active_power'].median()
        trans_print(origin_wind_name, wind_col_name, 'active power median:', power)
        if power > 100000:
            df['active_power'] = df['active_power'] / 1000
        # Normalize active power before running the data checks
        # df = df[df['active_power'] < 50000]
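
        # The > 100000 threshold is a unit heuristic: a sampled median that large
        # presumably means the source recorded watts, so dividing by 1000 brings
        # the column back to kilowatts (an inference about intent, not configurable here).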

        rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
        if rated_power_and_cutout_speed_tuple is None:
            rated_power_and_cutout_speed_tuple = (None, None)

        trans_print('data size before filtering:', df.shape)
        filter_valid_data = FilterValidData(df, rated_power_and_cutout_speed_tuple[0])
        df = filter_valid_data.run()
        trans_print('data size after filtering:', df.shape)

        # If custom transform code is configured, run it first, then apply labels
        # exec_code = get_trans_exec_code(self.paths_and_table.exec_id, self.paths_and_table.read_type)
        # if exec_code:
        #     if 'import ' in exec_code:
        #         raise Exception("exec code may not import packages")
        #     exec(exec_code)

        if power_df.shape[0] == 0:
            df.loc[:, 'lab'] = -1
        else:
            class_identifier = ClassIdentifier(wind_turbine_number=origin_wind_name, origin_df=df,
                                               rated_power=rated_power_and_cutout_speed_tuple[0],
                                               cut_out_speed=rated_power_and_cutout_speed_tuple[1])
            df = class_identifier.run()
        del power_df

        df['year'] = df['time_stamp'].dt.year
        df['month'] = df['time_stamp'].dt.month
        df['day'] = df['time_stamp'].dt.day
        df['time_stamp'] = df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        df['wind_turbine_name'] = str(origin_wind_name)
        df['year_month'] = df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2), axis=1)

        cols = df.columns
        if self.paths_and_table.read_type == 'second':
            type_col = 'year_month'
        else:
            type_col = 'year'
        date_strs = df[type_col].unique().tolist()
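
        # One output file per partition: second-level data is partitioned by
        # year-month, minute-level data by year, landing at
        # <tmp_formal_path>/<partition>/<turbine>.csv (plus .gz when save_zip is set)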
        for date_str in date_strs:
            save_path = path.join(self.paths_and_table.get_tmp_formal_path(), str(date_str),
                                  str(origin_wind_name) + '.csv')
            create_file_path(save_path, is_file_path=True)
            now_df = df[df[type_col] == date_str][cols]
            if self.paths_and_table.save_zip:
                save_path = save_path + '.gz'
                now_df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
            else:
                now_df.to_csv(save_path, index=False, encoding='utf-8')
            del now_df

        self.set_statistics_data(df)
        del df
        trans_print("saved " + str(wind_col_name) + " successfully")
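
    # Files are processed in batches: the temp files are split into
    # len(all_arrays) groups sized by use_files_get_max_cpu_count(), and the
    # transfer progress advances from 50 toward 65 as each batch completes,
    # matching the final value that run() reports below.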

    def multiprocessing_to_save_file(self):
        # Start saving to the formal files
        all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
        # split_count = self.paths_and_table.multi_pool_count
        split_count = use_files_get_max_cpu_count(all_tmp_files)
        all_arrays = split_array(all_tmp_files, split_count)
        try:
            for index, arr in enumerate(all_arrays):
                with multiprocessing.Pool(split_count) as pool:
                    pool.starmap(self.save_to_csv, [(i,) for i in arr])
                update_trans_transfer_progress(self.paths_and_table.id,
                                               round(50 + 15 * (index + 1) / len(all_arrays), 2),
                                               self.paths_and_table.save_db)
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "error while saving files, system returned: " + str(e)
            raise ValueError(message)

    def run(self):
        self.multiprocessing_to_save_file()
        update_trans_transfer_progress(self.paths_and_table.id, 65,
                                       self.paths_and_table.save_db)
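
# A minimal usage sketch (assumptions: how PathsAndTable, TransParam and the
# config maps are built is project-specific, and statistics_map is expected to
# be a shared dict such as multiprocessing.Manager().dict(); the variable names
# below are hypothetical placeholders, not part of this module):
#
#     manager = multiprocessing.Manager()
#     statistics_map = manager.dict()
#     handler = StatisticsAndSaveTmpFormalFile(paths_and_table, trans_param,
#                                              statistics_map,
#                                              rated_power_and_cutout_speed_map)
#     handler.run()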