# -*- coding: utf-8 -*-
# @Time    : 2024/5/15
# @Author  : 魏志亮

import multiprocessing
import os.path
from typing import Optional

from etl.common.BaseDataTrans import BaseDataTrans
from etl.common.CombineAndSaveFormalFile import CombineAndSaveFormalFile
from etl.wind_power.min_sec.ReadAndSaveTmp import ReadAndSaveTmp
from etl.wind_power.min_sec.StatisticsAndSaveTmpFormalFile import StatisticsAndSaveTmpFormalFile
from etl.wind_power.min_sec.TransParam import TransParam
from service.trans_conf_service import update_trans_status_success, update_trans_status_error
from service.trans_service import get_min_sec_conf
from utils.conf.read_conf import read_conf
from utils.log.trans_log import error


class MinSecTrans(BaseDataTrans):
    """Minute/second-level data transformation.

    Builds a ``TransParam`` from the min/sec configuration returned by
    ``get_min_sec_conf`` and wires it into the read, statistics and combine
    helpers.
    """

    # Standard column names whose mapping is looked up in the configuration.
    TRANS_COLS = [
        'wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed',
        'generator_speed', 'wind_velocity', 'pitch_angle_blade_1',
        'pitch_angle_blade_2', 'pitch_angle_blade_3', 'cabin_position',
        'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
        'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
        'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
        'wind_turbine_status2', 'cabin_temperature', 'twisted_cable_angle',
        'front_back_vibration_of_the_cabin',
        'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
        'clockwise_yaw_count', 'counterclockwise_yaw_count', 'unusable',
        'power_curve_available', 'required_gearbox_speed',
        'inverter_speed_master_control', 'outside_cabin_temperature',
        'main_bearing_temperature', 'main_bearing_temperature_2',
        'gearbox_high_speed_shaft_bearing_temperature',
        'gearboxmedium_speed_shaftbearing_temperature',
        'gearbox_low_speed_shaft_bearing_temperature',
        'generator_winding1_temperature', 'generator_winding2_temperature',
        'generator_winding3_temperature', 'turbulence_intensity',
        'grid_a_phase_current', 'grid_b_phase_current', 'grid_c_phase_current',
        'reactive_power', 'param1', 'param2', 'param3', 'param4', 'param5',
        'param6', 'param7', 'param8', 'param9', 'param10'
    ]

    def __init__(self, data: Optional[dict] = None, save_db: bool = True,
                 yaml_config: Optional[dict] = None, step: int = 0, end: int = 999):
        """Initialise the minute/second-level transformation.

        Args:
            data: Task data dictionary.
            save_db: Whether to save to the database.
            yaml_config: YAML configuration.
            step: First step to run.
            end: Last step to run.

        Raises:
            ValueError: If no usable configuration was found, i.e.
                ``get_trans_param()`` returned ``None``.
        """
        super().__init__(data, save_db, yaml_config, step, end)
        # Manager-backed dict proxy; handed to StatisticsAndSaveTmpFormalFile
        # and read back in update_exec_progress().
        self.statistics_map = multiprocessing.Manager().dict()
        self.trans_param = self.get_trans_param()
        if self.trans_param is None:
            # get_trans_param() has already logged the problem and marked the
            # task as failed. Fail here with an explicit message instead of
            # the opaque AttributeError the attribute assignment below would
            # raise on None.
            raise ValueError(
                f"No {self.transfer_type} transformation config found for task {self.id}")
        self.trans_param.wind_col_trans = self.wind_col_trans

    def get_filed_conf(self):
        """Fetch the configuration for this wind farm and transfer type."""
        return get_min_sec_conf(self.wind_farm_code, self.transfer_type)

    def get_trans_param(self) -> Optional[TransParam]:
        """Build the transformation parameters from the configuration.

        When the configuration is ``None``, a tuple, or has no keys, the
        problem is logged, the task status is updated to error and ``None``
        is returned.

        Returns:
            A ``TransParam`` object, or ``None`` if no usable configuration
            was found.
        """
        conf_map = self.get_filed_conf()
        if conf_map is None or isinstance(conf_map, tuple) or len(conf_map.keys()) == 0:
            message = f"未找到{self.id}的{self.transfer_type}配置"
            error(message)
            update_trans_status_error(self.id, message, self.save_db)
            return None

        resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
        wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
        is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
        merge_columns = read_conf(conf_map, 'merge_columns', False)
        # NOTE(review): ''.split(',') yields [''] rather than []; if read_conf
        # returns the '' default here, confirm downstream code tolerates that.
        vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
        index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
        vertical_key = read_conf(conf_map, 'vertical_col_key')
        vertical_value = read_conf(conf_map, 'vertical_col_value')
        need_valid_cols = not merge_columns

        # The flag is read with a default of 0; only the value 1 enables it.
        boolean_sec_to_min = int(read_conf(conf_map, 'boolean_sec_to_min', 0)) == 1

        # One entry per standard column; '' when the configuration has no
        # value for that column.
        cols_trans_all = {col: read_conf(conf_map, col, '') for col in self.TRANS_COLS}

        return TransParam(read_type=self.transfer_type,
                          read_path=self.read_dir,
                          cols_tran=cols_trans_all,
                          wind_name_exec=wind_name_exec,
                          is_vertical_table=is_vertical_table,
                          vertical_cols=vertical_cols,
                          vertical_key=vertical_key,
                          vertical_value=vertical_value,
                          index_cols=index_cols,
                          merge_columns=merge_columns,
                          resolve_col_prefix=resolve_col_prefix,
                          need_valid_cols=need_valid_cols,
                          boolean_sec_to_min=boolean_sec_to_min)

    def read_and_save_tmp_file(self):
        """Step 3: read the data and save it to temporary files."""
        read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
        read_and_save_tmp.run()

    def statistics_and_save_tmp_formal_file(self):
        """Step 4: compute statistics and save to the formal files."""
        # Save to the formal files.
        statistics_and_save_tmp_formal_file = StatisticsAndSaveTmpFormalFile(
            self.pathsAndTable, self.trans_param, self.statistics_map,
            self.rated_power_and_cutout_speed_map)
        statistics_and_save_tmp_formal_file.run()

    def combine_and_save_formal_file(self):
        """Combine and save the formal files; keep run()'s result in ``self.update_files``."""
        combine_and_save_formal_file = CombineAndSaveFormalFile(self.pathsAndTable)
        self.update_files = combine_and_save_formal_file.run()

    def update_exec_progress(self):
        """Final step: report success together with the collected statistics.

        Passes the number of distinct base names in ``self.update_files`` and
        the ``time_granularity``, ``min_date``, ``max_date`` and
        ``total_count`` entries of ``self.statistics_map`` to
        ``update_trans_status_success``.
        """
        all_files = {os.path.basename(i) for i in self.update_files}
        update_trans_status_success(self.id, len(all_files),
                                    self.statistics_map['time_granularity'],
                                    self.statistics_map['min_date'],
                                    self.statistics_map['max_date'],
                                    self.statistics_map['total_count'],
                                    self.save_db)