schedule_service.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
# -*- coding: utf-8 -*-
# @Time : 2024/6/11
# @Author : 魏志亮
  4. import os
  5. import sys
  6. import traceback
  7. def run_schedule(step=0, end=4):
  8. data = get_exec_data()
  9. if data is None:
  10. trans_print("当前有任务在执行")
  11. elif len(data.keys()) == 0:
  12. trans_print("当前无任务")
  13. else:
  14. batch_no = data['batch_code']
  15. batch_name = data['batch_name']
  16. transfer_type = data['transfer_type']
  17. transfer_file_addr = data['transfer_addr']
  18. field_code = data['field_code']
  19. field_name = data['field_name']
  20. __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
  21. save_db=True)
  22. def run_local(step=0, end=3, batch_no=None, batch_name='', transfer_type=None, transfer_file_addr=None, field_name=None,
  23. field_code="测试", save_db=False):
  24. if batch_no is None or str(batch_no).strip() == '':
  25. return "批次编号不能为空"
  26. if transfer_type not in ['second', 'minute', 'second_1']:
  27. return "查询类型错误"
  28. if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
  29. return "文件路径不能为空"
  30. __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
  31. save_db=save_db)
  32. def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr=None, field_name=None,
  33. field_code="测试",
  34. save_db=False):
  35. trance_id = '-'.join([batch_no, field_name, transfer_type])
  36. set_trance_id(trance_id)
  37. conf_map = get_trans_conf(field_name, transfer_type)
  38. if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
  39. message = f"未找到{field_name}的{transfer_type}配置"
  40. trans_print(message)
  41. update_trans_status_error(batch_no, transfer_type, message, save_db)
  42. else:
  43. resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
  44. wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
  45. wind_full_name = read_conf(conf_map, 'wind_full_name')
  46. is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
  47. merge_columns = read_conf(conf_map, 'merge_columns', False)
  48. vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
  49. index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
  50. vertical_key = read_conf(conf_map, 'vertical_col_key')
  51. vertical_value = read_conf(conf_map, 'vertical_col_value')
  52. need_valid_cols = not merge_columns
  53. begin_header = read_conf(conf_map, 'begin_header', 0)
  54. cols_trans_all = dict()
  55. trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
  56. 'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
  57. 'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
  58. 'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
  59. 'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
  60. 'wind_turbine_status2',
  61. 'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
  62. 'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
  63. 'clockwise_yaw_count',
  64. 'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
  65. 'required_gearbox_speed',
  66. 'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
  67. 'gearbox_high_speed_shaft_bearing_temperature',
  68. 'gearboxmedium_speed_shaftbearing_temperature',
  69. 'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
  70. 'generator_winding2_temperature', 'generator_winding3_temperature',
  71. 'turbulence_intensity', 'param1',
  72. 'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
  73. for col in trans_cols:
  74. cols_trans_all[col] = read_conf(conf_map, col, '')
  75. trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
  76. wind_full_name=wind_full_name, save_db=save_db, header=begin_header)
  77. params = TranseParam(read_type=transfer_type, read_path=transfer_file_addr,
  78. cols_tran=cols_trans_all,
  79. wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
  80. vertical_cols=vertical_cols, vertical_key=vertical_key,
  81. vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
  82. resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
  83. trans_subject.set_trans_param(params)
  84. try:
  85. trans_subject.run(step=step, end=end)
  86. except Exception as e:
  87. trans_print(traceback.format_exc())
  88. message = "系统返回错误:" + str(e)
  89. update_trans_status_error(batch_no, transfer_type, message, save_db)
  90. finally:
  91. set_trance_id("")
  92. trans_subject.delete_tmp_files()
  93. if __name__ == '__main__':
  94. env = None
  95. if len(sys.argv) >= 2:
  96. env = sys.argv[1]
  97. print(sys.argv)
  98. if env is None:
  99. raise Exception("请配置运行环境")
  100. os.environ['env'] = env
  101. from utils.log.trans_log import trans_print, set_trance_id
  102. from etl.base.TranseParam import TranseParam
  103. from etl.base.WindFarms import WindFarms
  104. from service.plt_service import get_exec_data, update_trans_status_error
  105. from service.trans_service import get_trans_conf
  106. from utils.conf.read_conf import read_conf
  107. run_schedule()
  108. # run_local(4, 4, batch_no='WOF053600062-WOB00022', batch_name='test-6-24', transfer_type='second',
  109. # transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据', field_name='招远风电场',
  110. # field_code="测试", save_db=True)