# -*- coding: utf-8 -*- # @Time : 2024/6/11 # @Author : 魏志亮 import os import sys import traceback def run_schedule(step=0, end=4, run_count=1): # 更新超时任务 update_timeout_trans_data() data = get_exec_data(run_count) if data is None: trans_print("当前有任务在执行") elif len(data.keys()) == 0: trans_print("当前无任务") else: batch_no = data['batch_code'] batch_name = data['batch_name'] transfer_type = data['transfer_type'] transfer_file_addr = data['transfer_addr'] field_code = data['field_code'] field_name = data['field_name'] __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code, save_db=True) def run_local(step=0, end=3, batch_no=None, batch_name='', transfer_type=None, transfer_file_addr=None, field_name=None, field_code="测试", save_db=False): if batch_no is None or str(batch_no).strip() == '': return "批次编号不能为空" if transfer_type not in ['second', 'minute', 'second_1']: return "查询类型错误" if transfer_file_addr is None or str(transfer_file_addr).strip() == '': return "文件路径不能为空" __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code, save_db=save_db) def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr=None, field_name=None, field_code="测试", save_db=False): trance_id = '-'.join([batch_no, field_name, transfer_type]) set_trance_id(trance_id) conf_map = get_trans_conf(field_code, field_name, transfer_type) if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0: message = f"未找到{field_name}的{transfer_type}配置" trans_print(message) update_trans_status_error(batch_no, transfer_type, message, save_db) else: resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix') wind_name_exec = read_conf(conf_map, 'wind_name_exec', None) is_vertical_table = read_conf(conf_map, 'is_vertical_table', False) merge_columns = read_conf(conf_map, 'merge_columns', False) vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',') index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',') vertical_key = read_conf(conf_map, 'vertical_col_key') vertical_value = read_conf(conf_map, 'vertical_col_value') need_valid_cols = not merge_columns begin_header = read_conf(conf_map, 'begin_header', 0) cols_trans_all = dict() trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed', 'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3', 'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power', 'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature', 'generatornon_drive_end_bearing_temperature', 'wind_turbine_status', 'wind_turbine_status2', 'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin', 'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque', 'clockwise_yaw_count', 'counterclockwise_yaw_count', 'unusable', 'power_curve_available', 'required_gearbox_speed', 'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature', 'gearbox_high_speed_shaft_bearing_temperature', 'gearboxmedium_speed_shaftbearing_temperature', 'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature', 'generator_winding2_temperature', 'generator_winding3_temperature', 'turbulence_intensity', 'param1', 'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10'] for col in trans_cols: cols_trans_all[col] = read_conf(conf_map, col, '') params = TransParam(read_type=transfer_type, read_path=transfer_file_addr, cols_tran=cols_trans_all, wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table, vertical_cols=vertical_cols, vertical_key=vertical_key, vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns, resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols, header=begin_header) try: trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code, field_name=field_name, save_db=save_db, header=begin_header, trans_param=params) trans_subject.run(step=step, end=end) except Exception as e: trans_print(traceback.format_exc()) message = "系统返回错误:" + str(e) update_trans_status_error(batch_no, transfer_type, message, save_db) finally: set_trance_id("") # trans_subject.pathsAndTable.delete_tmp_files() if __name__ == '__main__': env = None if len(sys.argv) >= 2: env = sys.argv[1] else: env = 'dev' print(sys.argv) if env is None: raise Exception("请配置运行环境") os.environ['env'] = env run_count = 1 if len(sys.argv) >= 3: run_count = int(sys.argv[2]) from utils.log.trans_log import trans_print, set_trance_id from etl.base.TransParam import TransParam from etl.base.WindFarms import WindFarms from service.plt_service import get_exec_data, update_trans_status_error, update_timeout_trans_data from service.trans_service import get_trans_conf from utils.conf.read_conf import read_conf # run_schedule(run_count=run_count) # run_local(0, 3, batch_no='test_11', batch_name='test', transfer_type='minute', # transfer_file_addr=r'D:\trans_data\密马风电场\收资数据\minute', field_name='密马风电场', # field_code="WOF035200003", save_db=False) run_local(4, 4, batch_no='WOF053600062-WOB000010', batch_name='ZYFDC000013', transfer_type='second', transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据', field_name='招远风电场', field_code="WOF053600062", save_db=True) # run_local(0, 3, batch_no='WOF043600007-WOB000001', batch_name='XALFDC0814', transfer_type='second', # transfer_file_addr=r'D:\trans_data\新艾里风电场\收资数据\1号风机', field_name='新艾里风电场', # field_code="WOF043600007", save_db=False)