test_run_local.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/11
  3. # @Author : 魏志亮
  4. import datetime
  5. import os
  6. import sys
  7. import traceback
  8. from utils.conf.read_conf import yaml_conf, read_conf
  9. def get_exec_data(batch_no=None, read_type=None, run_count=1):
  10. if batch_no and read_type:
  11. data = get_data_by_batch_no_and_type(batch_no, read_type)
  12. if data is None:
  13. raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
  14. else:
  15. data = get_batch_exec_data(run_count)
  16. if data is None:
  17. trans_print("当前有任务在执行")
  18. sys.exit(0)
  19. elif len(data.keys()) == 0:
  20. trans_print("当前无任务")
  21. sys.exit(0)
  22. return data
  23. def run(data: dict = dict(), save_db=False, step=0, end=4):
  24. exec_process = None
  25. if data['transfer_type'] in ['second', 'minute']:
  26. exec_process = MinSecTrans(data=data, save_db=save_db, step=step, end=end)
  27. if data['transfer_type'] in ['fault', 'warn']:
  28. exec_process = FaultWarnTrans(data=data, save_db=save_db, step=step, end=end)
  29. if exec_process is None:
  30. raise Exception("No exec process")
  31. exec_process.run()
  32. if __name__ == '__main__':
  33. env = 'dev'
  34. if len(sys.argv) >= 2:
  35. env = sys.argv[1]
  36. conf_path = os.path.abspath(f"./conf/etl_config_{env}.yaml")
  37. os.environ['ETL_CONF'] = conf_path
  38. yaml_config = yaml_conf(conf_path)
  39. os.environ['env'] = env
  40. run_count = int(read_conf(yaml_config, "run_batch_count", 1))
  41. from utils.log.trans_log import trans_print
  42. from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
  43. from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
  44. from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
  45. begin = datetime.datetime.now()
  46. data = dict()
  47. data['batch_code'] = 'xinhuashuidian'
  48. data['batch_name'] = '新华水电故障'
  49. data['transfer_type'] = 'fault'
  50. data['transfer_addr'] = r'D:\data\新华水电\收资数据\故障告警\汇能机组数据-故障'
  51. data['field_code'] = 'xinhuashuidian'
  52. data['field_name'] = '新华水电'
  53. try:
  54. run(data=data, save_db=False, step=0, end=3)
  55. except Exception as e:
  56. trans_print(traceback.format_exc())
  57. trans_print("执行结束,总耗时:", datetime.datetime.now() - begin)