123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/6/11
- # @Author : 魏志亮
- from etl.base.TranseParam import TranseParam
- from etl.base.WindFarms import WindFarms
- from service.plt_service import get_exec_data, update_trans_status_error
- from service.trans_service import get_trans_conf
- from utils.conf.read_conf import read_conf
- from utils.log.trans_log import init_log, trans_print, logger
- def run_schedule(step=0, end=3):
- data = get_exec_data()
- if data is None:
- trans_print("当前有任务在执行")
- elif len(data.keys()) == 0:
- trans_print("当前无任务")
- else:
- batch_no = data['batch_code']
- transfer_type = data['transfer_type']
- transfer_file_addr = data['transfer_addr']
- field_code = data['field_code']
- field_name = data['field_name']
- __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, schedule_exec=True)
- def run_local(step=0, end=3, batch_no=None, transfer_type=None, transfer_file_addr=None, field_name=None,
- field_code="测试"):
- if batch_no is None or str(batch_no).strip() == '':
- return "批次编号不能为空"
- if transfer_type not in ['second', 'minute']:
- return "查询类型错误"
- if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
- return "文件路径不能为空"
- __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, schedule_exec=False)
- def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, field_name=None, field_code="测试",
- schedule_exec=False):
- init_log(batch_no, field_name, transfer_type)
- conf_map = get_trans_conf(field_name, transfer_type)
- if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
- message = f"未找到{field_name}的{transfer_type}配置"
- trans_print(message)
- update_trans_status_error(batch_no, transfer_type, message, schedule_exec)
- else:
- resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
- wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
- wind_full_name = read_conf(conf_map, 'wind_full_name')
- is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
- merge_columns = read_conf(conf_map, 'merge_columns', False)
- vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
- index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
- vertical_key = read_conf(conf_map, 'vertical_col_key')
- vertical_value = read_conf(conf_map, 'vertical_col_value')
- need_valid_cols = not merge_columns
- cols_trans_all = dict()
- trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
- 'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
- 'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
- 'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
- 'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
- 'wind_turbine_status2',
- 'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
- 'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
- 'clockwise_yaw_count',
- 'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
- 'required_gearbox_speed',
- 'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
- 'gearbox_high_speed_shaft_bearing_temperature',
- 'gearboxmedium_speed_shaftbearing_temperature',
- 'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
- 'generator_winding2_temperature', 'generator_winding3_temperature',
- 'turbulence_intensity', 'param1',
- 'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
- for col in trans_cols:
- cols_trans_all[col] = read_conf(conf_map, col, '')
- trans_subject = WindFarms(field_name, batch_no=batch_no, field_code=field_code,
- wind_full_name=wind_full_name, schedule_exec=schedule_exec)
- params = TranseParam(read_type=transfer_type, read_path=transfer_file_addr,
- cols_tran=cols_trans_all,
- wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
- vertical_cols=vertical_cols, vertical_key=vertical_key,
- vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
- resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
- trans_subject.set_trans_param(params)
- try:
- trans_subject.run(step=step, end=end)
- except Exception as e:
- logger.exception(e)
- message = "系统返回错误:" + str(e)
- update_trans_status_error(batch_no, transfer_type, message, schedule_exec)
- if __name__ == '__main__':
- step = 0
- end = 3
- batch_no = 'hongtiguan-test'
- transfer_type = 'second'
- transfer_file_addr = r'/data/download/collection_data/1进行中/虹梯官风电场-山西-大唐/收资数据/秒级数据/20240527秒级数据'
- field_name = '虹梯官风电场'
- field_code = "测试"
- run_local(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code)
|