# test_run_local.py
# -*- coding: utf-8 -*-
# @Time : 2024/6/11
# @Author : 魏志亮
import os
import sys
from typing import Optional
  6. def get_exec_data(batch_no=None, read_type=None, run_count=1):
  7. if batch_no and read_type:
  8. data = get_data_by_batch_no_and_type(batch_no, read_type)
  9. if data is None:
  10. raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
  11. else:
  12. data = get_batch_exec_data(run_count)
  13. if data is None:
  14. trans_print("当前有任务在执行")
  15. sys.exit(0)
  16. elif len(data.keys()) == 0:
  17. trans_print("当前无任务")
  18. sys.exit(0)
  19. return data
  20. def run(data: dict = dict(), save_db=False):
  21. exec_process = None
  22. if data['transfer_type'] in ['second', 'minute']:
  23. exec_process = MinSecTrans(data=data, save_db=save_db)
  24. if data['transfer_type'] in ['fault', 'warn']:
  25. exec_process = FaultWarnTrans(data=data, save_db=save_db)
  26. if exec_process is None:
  27. raise Exception("No exec process")
  28. exec_process.run()
  29. if __name__ == '__main__':
  30. env = None
  31. if len(sys.argv) >= 2:
  32. env = sys.argv[1]
  33. else:
  34. env = 'prod'
  35. print(sys.argv)
  36. if env is None:
  37. raise Exception("请配置运行环境")
  38. os.environ['env'] = env
  39. run_count = 1
  40. if len(sys.argv) >= 3:
  41. run_count = int(sys.argv[2])
  42. conf_path = '/data/config/etl_config.yaml'
  43. if len(sys.argv) >= 4:
  44. conf_path = sys.argv[3]
  45. os.environ['ETL_CONF'] = conf_path
  46. from utils.log.trans_log import trans_print
  47. from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
  48. from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
  49. from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
  50. data = dict()
  51. data['batch_code'] = "test"
  52. data['batch_name'] = "test"
  53. data['transfer_type'] = "fault"
  54. # data['transfer_addr'] = r"D:\报警\唐珍风电2023年报警信息.xlsx"
  55. data['transfer_addr'] = r"D:\故障\故障数据\故障记录_20230101_20240101.csv"
  56. data['field_code'] = "test"
  57. data['field_name'] = "唐珍风电故障"
  58. run(data=data, save_db=False)