# -*- coding: utf-8 -*-
# File: MinSecTrans.py
# @Time : 2024/5/15
# @Author : 魏志亮
  4. import multiprocessing
  5. import os.path
  6. from typing import Optional
  7. from etl.common.BaseDataTrans import BaseDataTrans
  8. from etl.common.CombineAndSaveFormalFile import CombineAndSaveFormalFile
  9. from etl.wind_power.min_sec.ReadAndSaveTmp import ReadAndSaveTmp
  10. from etl.wind_power.min_sec.StatisticsAndSaveTmpFormalFile import StatisticsAndSaveTmpFormalFile
  11. from etl.wind_power.min_sec.TransParam import TransParam
  12. from service.trans_conf_service import update_trans_status_success, update_trans_status_error
  13. from service.trans_service import get_min_sec_conf
  14. from utils.conf.read_conf import read_conf
  15. from utils.log.trans_log import error
  16. class MinSecTrans(BaseDataTrans):
  17. """分钟/秒级数据转换类"""
  18. # 转换列名列表
  19. TRANS_COLS = [
  20. 'wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
  21. 'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
  22. 'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
  23. 'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
  24. 'generatornon_drive_end_bearing_temperature', 'wind_turbine_status', 'wind_turbine_status2',
  25. 'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
  26. 'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
  27. 'clockwise_yaw_count', 'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
  28. 'required_gearbox_speed',
  29. 'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
  30. 'main_bearing_temperature_2', 'gearbox_high_speed_shaft_bearing_temperature',
  31. 'gearboxmedium_speed_shaftbearing_temperature',
  32. 'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
  33. 'generator_winding2_temperature', 'generator_winding3_temperature',
  34. 'turbulence_intensity', 'grid_a_phase_current', 'grid_b_phase_current',
  35. 'grid_c_phase_current', 'reactive_power', 'param1', 'param2', 'param3', 'param4', 'param5',
  36. 'param6', 'param7', 'param8', 'param9', 'param10'
  37. ]
  38. def __init__(self, data: dict = None, save_db: bool = True, yaml_config: dict = None, step: int = 0,
  39. end: int = 999):
  40. """
  41. 初始化分钟/秒级数据转换类
  42. Args:
  43. data: 任务数据字典
  44. save_db: 是否保存到数据库
  45. yaml_config: YAML配置
  46. step: 开始步骤
  47. end: 结束步骤
  48. """
  49. super(MinSecTrans, self).__init__(data, save_db, yaml_config, step, end)
  50. self.statistics_map = multiprocessing.Manager().dict()
  51. self.trans_param = self.get_trans_param()
  52. self.trans_param.wind_col_trans = self.wind_col_trans
  53. def get_filed_conf(self):
  54. """获取配置"""
  55. return get_min_sec_conf(self.wind_farm_code, self.transfer_type)
  56. def get_trans_param(self) -> Optional[TransParam]:
  57. """
  58. 获取转换参数
  59. Returns:
  60. TransParam对象
  61. """
  62. conf_map = self.get_filed_conf()
  63. if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
  64. message = f"未找到{self.id}的{self.transfer_type}配置"
  65. error(message)
  66. update_trans_status_error(self.id, message, self.save_db)
  67. return None
  68. else:
  69. resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
  70. wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
  71. is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
  72. merge_columns = read_conf(conf_map, 'merge_columns', False)
  73. vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
  74. index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
  75. vertical_key = read_conf(conf_map, 'vertical_col_key')
  76. vertical_value = read_conf(conf_map, 'vertical_col_value')
  77. need_valid_cols = not merge_columns
  78. boolean_sec_to_min = read_conf(conf_map, 'boolean_sec_to_min', 0)
  79. boolean_sec_to_min = int(boolean_sec_to_min) == 1
  80. cols_trans_all = dict()
  81. for col in self.TRANS_COLS:
  82. cols_trans_all[col] = read_conf(conf_map, col, '')
  83. return TransParam(read_type=self.transfer_type, read_path=self.read_dir,
  84. cols_tran=cols_trans_all,
  85. wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
  86. vertical_cols=vertical_cols, vertical_key=vertical_key,
  87. vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
  88. resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols,
  89. boolean_sec_to_min=boolean_sec_to_min)
  90. def read_and_save_tmp_file(self):
  91. """第三步:读取并保存到临时文件"""
  92. read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
  93. read_and_save_tmp.run()
  94. def statistics_and_save_tmp_formal_file(self):
  95. """第四步:统计并保存到正式文件"""
  96. # 保存到正式文件
  97. statistics_and_save_tmp_formal_file = StatisticsAndSaveTmpFormalFile(self.pathsAndTable, self.trans_param,
  98. self.statistics_map,
  99. self.rated_power_and_cutout_speed_map)
  100. statistics_and_save_tmp_formal_file.run()
  101. def combine_and_save_formal_file(self):
  102. """合并并保存正式文件"""
  103. combine_and_save_formal_file = CombineAndSaveFormalFile(self.pathsAndTable)
  104. self.update_files = combine_and_save_formal_file.run()
  105. def update_exec_progress(self):
  106. """最后更新执行进度"""
  107. all_files = set([os.path.basename(i) for i in self.update_files])
  108. update_trans_status_success(self.id, len(all_files),
  109. self.statistics_map['time_granularity'],
  110. self.statistics_map['min_date'], self.statistics_map['max_date'],
  111. self.statistics_map['total_count'], self.save_db)