MinSecTrans.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import multiprocessing
  5. import pandas as pd
  6. from etl.common.BaseDataTrans import BaseDataTrans
  7. from etl.wind_power.min_sec.ReadAndSaveTmp import ReadAndSaveTmp
  8. from etl.wind_power.min_sec.StatisticsAndSaveFile import StatisticsAndSaveFile
  9. from etl.wind_power.min_sec.TransParam import TransParam
  10. from service.plt_service import update_trans_status_success, update_trans_status_error
  11. from service.trans_service import batch_statistics, get_min_sec_conf
  12. from utils.conf.read_conf import read_conf
  13. from utils.df_utils.util import get_time_space
  14. from utils.file.trans_methods import read_excel_files, read_file_to_df
  15. from utils.log.trans_log import trans_print
  16. class MinSecTrans(BaseDataTrans):
  17. def __init__(self, data: dict = None, save_db=True, step=0, end=4):
  18. super(MinSecTrans, self).__init__(data, save_db, step, end)
  19. self.statistics_map = multiprocessing.Manager().dict()
  20. self.trans_param = self.get_trans_param()
  21. self.trans_param.wind_col_trans = self.wind_col_trans
  22. def get_filed_conf(self):
  23. return get_min_sec_conf(self.field_code, self.read_type)
  24. def get_trans_param(self):
  25. conf_map = self.get_filed_conf()
  26. if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
  27. message = f"未找到{self.batch_no}的{self.read_type}配置"
  28. trans_print(message)
  29. update_trans_status_error(self.batch_no, self.read_type, message, self.save_db)
  30. else:
  31. resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
  32. wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
  33. is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
  34. merge_columns = read_conf(conf_map, 'merge_columns', False)
  35. vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
  36. index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
  37. vertical_key = read_conf(conf_map, 'vertical_col_key')
  38. vertical_value = read_conf(conf_map, 'vertical_col_value')
  39. need_valid_cols = not merge_columns
  40. boolean_sec_to_min = read_conf(conf_map, 'boolean_sec_to_min', 0)
  41. boolean_sec_to_min = int(boolean_sec_to_min) == 1
  42. # self.boolean_sec_to_min = int(data['boolean_sec_to_min']) == 1 if 'boolean_sec_to_min' in data.keys() else False
  43. cols_trans_all = dict()
  44. trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
  45. 'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
  46. 'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
  47. 'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
  48. 'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
  49. 'wind_turbine_status2',
  50. 'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
  51. 'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
  52. 'clockwise_yaw_count',
  53. 'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
  54. 'required_gearbox_speed',
  55. 'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
  56. 'gearbox_high_speed_shaft_bearing_temperature',
  57. 'gearboxmedium_speed_shaftbearing_temperature',
  58. 'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
  59. 'generator_winding2_temperature', 'generator_winding3_temperature',
  60. 'turbulence_intensity', 'param1',
  61. 'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
  62. for col in trans_cols:
  63. cols_trans_all[col] = read_conf(conf_map, col, '')
  64. return TransParam(read_type=self.read_type, read_path=self.read_path,
  65. cols_tran=cols_trans_all,
  66. wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
  67. vertical_cols=vertical_cols, vertical_key=vertical_key,
  68. vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
  69. resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols,
  70. boolean_sec_to_min=boolean_sec_to_min)
  71. # 第三步 读取 并 保存到临时文件
  72. def read_and_save_tmp_file(self):
  73. read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
  74. read_and_save_tmp.run()
  75. # 第四步 统计 并 保存到正式文件
  76. def statistics_and_save_to_file(self):
  77. # 保存到正式文件
  78. statistics_and_save_file = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
  79. self.rated_power_and_cutout_speed_map)
  80. statistics_and_save_file.run()
  81. # 最后更新执行程度
  82. def update_exec_progress(self):
  83. if self.end >= 4:
  84. all_files = read_excel_files(self.pathsAndTable.get_save_path())
  85. if self.step <= 3:
  86. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  87. len(all_files),
  88. self.statistics_map['time_granularity'],
  89. self.statistics_map['min_date'], self.statistics_map['max_date'],
  90. self.statistics_map['total_count'], self.save_db)
  91. else:
  92. df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
  93. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  94. time_granularity = get_time_space(df, 'time_stamp')
  95. batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
  96. if batch_data is not None:
  97. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  98. len(read_excel_files(self.pathsAndTable.get_save_path())),
  99. time_granularity,
  100. batch_data['min_date'], batch_data['max_date'],
  101. batch_data['total_count'], self.save_db)
  102. else:
  103. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  104. len(read_excel_files(self.pathsAndTable.get_save_path())),
  105. time_granularity,
  106. None, None,
  107. None, self.save_db)