app.py 4.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/17
  3. # @Author : 魏志亮
  4. import datetime
  5. import pandas as pd
  6. from base.TranseParam import TranseParam
  7. from base.WindFarms import WindFarms
  8. from utils.conf.read_conf import read_yaml_file, read_param_from_yaml_file
  9. from utils.db.trans_mysql import get_exec_data
  10. from utils.log.trans_log import trans_print, logger, init_log
  11. def init_params() -> list[WindFarms]:
  12. df = get_exec_data()
  13. print(df)
  14. exec_subjects = list()
  15. path = r'D:\transdata\test\虹梯官风电场-山西-大唐\高频数据2.zip'
  16. changshang_name = '虹梯官风电场'
  17. # read_type = 'minute'
  18. read_type = 'second'
  19. batch_no = 'batch_' + str(datetime.datetime.now().strftime('%Y%m%d'))
  20. df = pd.DataFrame(columns=['batch_no', 'transfer_type', 'transfer_file_addr', 'field_code', 'field_name'],
  21. data=[
  22. [batch_no, read_type, path, 'WOF01000002', changshang_name]])
  23. if df is None:
  24. trans_print("当前任务正在执行")
  25. if df.empty:
  26. trans_print("当前没有需要执行的任务")
  27. else:
  28. for batch_no, transfer_type, transfer_file_addr, field_code, field_name in zip(df['batch_no'],
  29. df['transfer_type'],
  30. df['transfer_file_addr'],
  31. df['field_code'],
  32. df['field_name']):
  33. init_log(batch_no, field_name, transfer_type)
  34. yaml_datas = read_yaml_file(field_name, transfer_type)
  35. time_col = read_param_from_yaml_file(yaml_datas, 'time_col')
  36. wind_col = read_param_from_yaml_file(yaml_datas, 'wind_col')
  37. wind_name_exec = read_param_from_yaml_file(yaml_datas, 'wind_name_exec', None)
  38. cols_trans_all = read_param_from_yaml_file(yaml_datas, 'trans_col')
  39. wind_full_name = read_param_from_yaml_file(yaml_datas, 'wind_full_name')
  40. is_vertical_table = read_param_from_yaml_file(yaml_datas, 'is_vertical_table', False)
  41. vertical_cols = read_param_from_yaml_file(yaml_datas['vertical_table_conf'], 'read_cols', list())
  42. vertical_key = read_param_from_yaml_file(yaml_datas['vertical_table_conf'], 'col_key')
  43. vertical_value = read_param_from_yaml_file(yaml_datas['vertical_table_conf'], 'col_value')
  44. index_cols = read_param_from_yaml_file(yaml_datas['vertical_table_conf'], 'index_cols')
  45. merge_columns = read_param_from_yaml_file(yaml_datas, 'merge_columns', False)
  46. trans_col_exec = read_param_from_yaml_file(yaml_datas, 'trans_col_exec')
  47. need_valid_cols = read_param_from_yaml_file(yaml_datas, 'need_valid_cols', True)
  48. trans_subject = WindFarms(field_name, batch_no=batch_no, field_code=field_code,
  49. wind_full_name=wind_full_name)
  50. params = TranseParam(read_type=transfer_type, read_path=transfer_file_addr,
  51. cols_tran=cols_trans_all, time_col=time_col, wind_col=wind_col,
  52. wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
  53. vertical_cols=vertical_cols, vertical_key=vertical_key,
  54. vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
  55. trans_col_exec=trans_col_exec, need_valid_cols=need_valid_cols)
  56. trans_subject.set_trans_param(params)
  57. exec_subjects.append(trans_subject)
  58. return exec_subjects
  59. if __name__ == '__main__':
  60. exec_subjects = init_params()
  61. for exec_subject in exec_subjects:
  62. try:
  63. exec_subject.run()
  64. # exec_subject.delete_batch_db()
  65. # exec_subject.mutiprocessing_to_save_db()
  66. except Exception as e:
  67. print(e)
  68. logger.exception(e)