# test_run_local_piliang.py (3.3 KB)
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/6/11
  3. # @Author : 魏志亮
  4. import datetime
  5. import os
  6. import sys
  7. import traceback
  8. import pandas as pd
  9. from utils.conf.read_conf import yaml_conf, read_conf
  10. def get_exec_data(batch_no=None, read_type=None, run_count=1):
  11. if batch_no and read_type:
  12. data = get_data_by_batch_no_and_type(batch_no, read_type)
  13. if data is None:
  14. raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
  15. else:
  16. data = get_batch_exec_data(run_count)
  17. if data is None:
  18. trans_print("当前有任务在执行")
  19. sys.exit(0)
  20. elif len(data.keys()) == 0:
  21. trans_print("当前无任务")
  22. sys.exit(0)
  23. return data
  24. def run(data: dict = dict(), save_db=False):
  25. exec_process = None
  26. if data['transfer_type'] in ['second', 'minute']:
  27. exec_process = MinSecTrans(data=data, save_db=save_db)
  28. if data['transfer_type'] in ['fault', 'warn']:
  29. exec_process = FaultWarnTrans(data=data, save_db=save_db)
  30. if exec_process is None:
  31. raise Exception("No exec process")
  32. exec_process.run()
  33. if __name__ == '__main__':
  34. env = 'dev'
  35. if len(sys.argv) >= 2:
  36. env = sys.argv[1]
  37. conf_path = os.path.abspath(f"./conf/etl_config_{env}.yaml")
  38. os.environ['ETL_CONF'] = conf_path
  39. yaml_config = yaml_conf(conf_path)
  40. os.environ['env'] = env
  41. run_count = int(read_conf(yaml_config, "run_batch_count", 1))
  42. from utils.log.trans_log import trans_print
  43. from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
  44. from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
  45. from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
  46. from utils.file.trans_methods import read_file_to_df
  47. begin = datetime.datetime.now()
  48. df = read_file_to_df("tmp_file/rebuild_data.csv")
  49. results = list()
  50. data = dict()
  51. for batch_code, batch_name, transfer_type, transfer_addr, field_code, field_name \
  52. in zip(df['batch_code'], df['batch_name'], df['transfer_type'], df['transfer_addr'], df['field_code'],
  53. df['field_name']):
  54. batch_begin = datetime.datetime.now()
  55. transfer_addr = transfer_addr.replace(r"/data/download/collection_data",
  56. r"/data/download/datang_shangxian")
  57. trans_print("开始执行批次:", batch_code, batch_name, transfer_type, field_code, field_name)
  58. trans_print("批次路径:", transfer_addr)
  59. data['batch_code'] = batch_code
  60. data['batch_name'] = batch_name
  61. data['transfer_type'] = transfer_type
  62. data['transfer_addr'] = transfer_addr
  63. data['field_code'] = field_code
  64. data['field_name'] = field_name
  65. try:
  66. run(data=data, save_db=True)
  67. results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'success'))
  68. except Exception as e:
  69. results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'error'))
  70. trans_print(traceback.format_exc())
  71. finally:
  72. trans_print("执行结束,耗时:", datetime.datetime.now() - batch_begin, "总耗时:", datetime.datetime.now() - begin)
  73. for data in results:
  74. trans_print(data)
  75. trans_print("执行结束,总耗时:", datetime.datetime.now() - begin)