# -*- coding: utf-8 -*- # @Time : 2024/5/17 # @Author : 魏志亮 import datetime import pandas as pd from base.TranseParam import TranseParam from base.WindFarms import WindFarms from utils.conf.read_conf import read_yaml_file, read_param_from_yaml_file from utils.db.trans_mysql import get_exec_data from utils.log.trans_log import trans_print, logger, init_log def init_params() -> list[WindFarms]: df = get_exec_data() print(df) exec_subjects = list() path = r'D:\transdata\test\虹梯官风电场-山西-大唐\高频数据2.zip' changshang_name = '虹梯官风电场' # read_type = 'minute' read_type = 'second' batch_no = 'batch_' + str(datetime.datetime.now().strftime('%Y%m%d')) df = pd.DataFrame(columns=['batch_no', 'transfer_type', 'transfer_file_addr', 'field_code', 'field_name'], data=[ [batch_no, read_type, path, 'WOF01000002', changshang_name]]) if df is None: trans_print("当前任务正在执行") if df.empty: trans_print("当前没有需要执行的任务") else: for batch_no, transfer_type, transfer_file_addr, field_code, field_name in zip(df['batch_no'], df['transfer_type'], df['transfer_file_addr'], df['field_code'], df['field_name']): init_log(batch_no, field_name, transfer_type) yaml_datas = read_yaml_file(field_name, transfer_type) time_col = read_param_from_yaml_file(yaml_datas, 'time_col') wind_col = read_param_from_yaml_file(yaml_datas, 'wind_col') wind_name_exec = read_param_from_yaml_file(yaml_datas, 'wind_name_exec', None) cols_trans_all = read_param_from_yaml_file(yaml_datas, 'trans_col') wind_full_name = read_param_from_yaml_file(yaml_datas, 'wind_full_name') is_vertical_table = read_param_from_yaml_file(yaml_datas, 'is_vertical_table', False) vertical_cols = read_param_from_yaml_file(yaml_datas['vertical_table_conf'], 'read_cols', list()) vertical_key = read_param_from_yaml_file(yaml_datas['vertical_table_conf'], 'col_key') vertical_value = read_param_from_yaml_file(yaml_datas['vertical_table_conf'], 'col_value') index_cols = read_param_from_yaml_file(yaml_datas['vertical_table_conf'], 'index_cols') merge_columns = read_param_from_yaml_file(yaml_datas, 'merge_columns', False) trans_col_exec = read_param_from_yaml_file(yaml_datas, 'trans_col_exec') need_valid_cols = read_param_from_yaml_file(yaml_datas, 'need_valid_cols', True) trans_subject = WindFarms(field_name, batch_no=batch_no, field_code=field_code, wind_full_name=wind_full_name) params = TranseParam(read_type=transfer_type, read_path=transfer_file_addr, cols_tran=cols_trans_all, time_col=time_col, wind_col=wind_col, wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table, vertical_cols=vertical_cols, vertical_key=vertical_key, vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns, trans_col_exec=trans_col_exec, need_valid_cols=need_valid_cols) trans_subject.set_trans_param(params) exec_subjects.append(trans_subject) return exec_subjects if __name__ == '__main__': exec_subjects = init_params() for exec_subject in exec_subjects: try: exec_subject.run() # exec_subject.delete_batch_db() # exec_subject.mutiprocessing_to_save_db() except Exception as e: print(e) logger.exception(e)