# StatisticsAndSaveFile.py
  1. import datetime
  2. import multiprocessing
  3. import traceback
  4. from os import path
  5. import numpy as np
  6. import pandas as pd
  7. from etl.common.PathsAndTable import PathsAndTable
  8. from etl.wind_power.min_sec import TransParam
  9. from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
  10. from service.plt_service import update_trans_transfer_progress
  11. from service.trans_service import get_trans_exec_code
  12. from utils.conf.read_conf import read_conf
  13. from utils.df_utils.util import get_time_space
  14. from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
  15. from utils.log.trans_log import trans_print
  16. from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
  17. exec("import math")
  18. class StatisticsAndSaveFile(object):
  19. def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map,
  20. rated_power_and_cutout_speed_map):
  21. self.paths_and_table = paths_and_table
  22. self.trans_param = trans_param
  23. self.statistics_map = statistics_map
  24. self.lock = multiprocessing.Manager().Lock()
  25. self.rated_power_and_cutout_speed_map = rated_power_and_cutout_speed_map
  26. def set_statistics_data(self, df):
  27. if not df.empty:
  28. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  29. min_date = df['time_stamp'].min()
  30. max_date = df['time_stamp'].max()
  31. with self.lock:
  32. if 'min_date' in self.statistics_map.keys():
  33. if self.statistics_map['min_date'] > min_date:
  34. self.statistics_map['min_date'] = min_date
  35. else:
  36. self.statistics_map['min_date'] = min_date
  37. if 'max_date' in self.statistics_map.keys():
  38. if self.statistics_map['max_date'] < max_date:
  39. self.statistics_map['max_date'] = max_date
  40. else:
  41. self.statistics_map['max_date'] = max_date
  42. if 'total_count' in self.statistics_map.keys():
  43. self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
  44. else:
  45. self.statistics_map['total_count'] = df.shape[0]
  46. if 'time_granularity' not in self.statistics_map.keys():
  47. self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
  48. def save_to_csv(self, filename):
  49. df = read_file_to_df(filename)
  50. if self.trans_param.is_vertical_table:
  51. df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
  52. values=self.trans_param.vertical_value,
  53. aggfunc='max')
  54. # 重置索引以得到普通的列
  55. df.reset_index(inplace=True)
  56. # 转化风机名称
  57. origin_wind_name = str(df['wind_turbine_number'].values[0])
  58. df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
  59. # df['wind_turbine_name'] = df['wind_turbine_number']
  60. df['wind_turbine_number'] = df['wind_turbine_number'].map(
  61. self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
  62. wind_col_name = str(df['wind_turbine_number'].values[0])
  63. not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
  64. 'param9', 'param10']
  65. # 删除 有功功率 和 风速均为空的情况
  66. df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
  67. trans_print(wind_col_name, "删除有功功率和风速均为空的情况后:", df.shape)
  68. df.replace(np.nan, -999999999, inplace=True)
  69. number_cols = df.select_dtypes(include=['number']).columns.tolist()
  70. for col in df.columns:
  71. if col not in not_double_cols and col not in number_cols:
  72. if not df[col].isnull().all():
  73. df[col] = pd.to_numeric(df[col], errors='coerce')
  74. # 删除包含NaN的行(即那些列A转换失败的行)
  75. df = df.dropna(subset=[col])
  76. trans_print(wind_col_name, "删除非数值列名:", col)
  77. df.replace(-999999999, np.nan, inplace=True)
  78. df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
  79. df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
  80. df.dropna(subset=['time_stamp'], inplace=True)
  81. df.sort_values(by='time_stamp', inplace=True)
  82. df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
  83. # 如果秒级有可能合并到分钟级
  84. # TODO add 秒转分钟
  85. if self.trans_param.boolean_sec_to_min:
  86. df['time_stamp'] = df['time_stamp'].apply(lambda x: x + pd.Timedelta(minutes=(10 - x.minute % 10) % 10))
  87. df['time_stamp'] = df['time_stamp'].dt.floor('10T')
  88. df = df.groupby(['wind_turbine_number', 'time_stamp']).mean().reset_index()
  89. trans_print('有功功率前10个', df.head(10)['active_power'].values)
  90. power_df = df[df['active_power'] > 0]
  91. trans_print(wind_col_name, "功率大于0的数量:", power_df.shape)
  92. power = power_df.sample(int(power_df.shape[0] / 100))['active_power'].median()
  93. del power_df
  94. trans_print(wind_col_name, '有功功率,中位数', power)
  95. if power > 100000:
  96. df['active_power'] = df['active_power'] / 1000
  97. ## 做数据检测前,羡强行处理有功功率
  98. # df = df[df['active_power'] < 50000]
  99. rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
  100. if rated_power_and_cutout_speed_tuple is None:
  101. rated_power_and_cutout_speed_tuple = (None, None)
  102. # 如果有需要处理的,先进行代码处理,在进行打标签
  103. exec_code = get_trans_exec_code(self.paths_and_table.batch_no, self.paths_and_table.read_type)
  104. if exec_code:
  105. if 'import ' in exec_code:
  106. raise Exception("执行代码不支持导入包")
  107. exec(exec_code)
  108. class_identifiler = ClassIdentifier(wind_turbine_number=wind_col_name, origin_df=df,
  109. rated_power=rated_power_and_cutout_speed_tuple[0],
  110. cut_out_speed=rated_power_and_cutout_speed_tuple[1])
  111. df = class_identifiler.run()
  112. df['year'] = df['time_stamp'].dt.year
  113. df['month'] = df['time_stamp'].dt.month
  114. df['day'] = df['time_stamp'].dt.day
  115. df['time_stamp'] = df['time_stamp'].apply(
  116. lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  117. df['wind_turbine_name'] = str(origin_wind_name)
  118. if self.paths_and_table.save_zip:
  119. save_path = path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv.gz')
  120. else:
  121. save_path = path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv')
  122. create_file_path(save_path, is_file_path=True)
  123. if self.paths_and_table.save_zip:
  124. df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
  125. else:
  126. df.to_csv(save_path, index=False, encoding='utf-8')
  127. self.set_statistics_data(df)
  128. del df
  129. trans_print("保存" + str(wind_col_name) + "成功")
  130. def mutiprocessing_to_save_file(self):
  131. # 开始保存到正式文件
  132. all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
  133. # split_count = self.pathsAndTable.multi_pool_count
  134. split_count = use_files_get_max_cpu_count(all_tmp_files)
  135. all_arrays = split_array(all_tmp_files, split_count)
  136. try:
  137. for index, arr in enumerate(all_arrays):
  138. with multiprocessing.Pool(split_count) as pool:
  139. pool.starmap(self.save_to_csv, [(i,) for i in arr])
  140. update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type,
  141. round(50 + 20 * (index + 1) / len(all_arrays), 2),
  142. self.paths_and_table.save_db)
  143. except Exception as e:
  144. trans_print(traceback.format_exc())
  145. message = "保存文件错误,系统返回错误:" + str(e)
  146. raise ValueError(message)
  147. def run(self):
  148. self.mutiprocessing_to_save_file()
  149. update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, 70,
  150. self.paths_and_table.save_db)