import json
import multiprocessing
import datetime
import traceback
from typing import List, Optional, Tuple

import pandas as pd

from conf.constants import ParallelProcessing, Types
from service.plt_service import get_all_wind
from service.trans_conf_service import update_trans_status_running, update_trans_transfer_progress, \
    update_trans_status_success, update_trans_status_error
from service.trans_service import get_wave_conf, save_df_to_db, get_or_create_wave_table, \
    get_wave_data, delete_exist_wave_data
from utils.file.trans_methods import *
from utils.log.trans_log import set_trance_id, info, error
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent

# Previously these were wrapped in exec("...") calls, which hides them from
# linters and static analysis for no benefit; plain imports are equivalent.
# NOTE(review): kept AFTER the trans_methods wildcard import so any name
# shadowing matches the original exec-based execution order.
from os.path import *
import re


class WaveTrans(object):
    """Wave (vibration waveform) data transfer task.

    Reads raw waveform files (txt/csv) from a directory, parses them in a
    process pool using a parser function supplied by configuration, and
    writes the normalized rows into the `<farm>_wave` database table,
    reporting progress/status through the trans_conf service.
    """

    def __init__(self, id: int, wind_farm_code: str, read_dir: str):
        """
        Initialize the wave transfer task.

        Args:
            id: Task ID used for status/progress reporting.
                (Name shadows the builtin `id`, but is kept for caller
                compatibility.)
            wind_farm_code: Wind farm code; also the DB table name prefix.
            read_dir: Directory to scan for input files.
        """
        self.id = id
        self.wind_farm_code = wind_farm_code
        self.read_dir = read_dir
        self.begin = datetime.datetime.now()  # wall-clock start, for timing logs
        self.engine_count = 0
        # Earliest/latest time_stamp seen across all parsed files.
        self.min_date = None
        self.max_date = None
        # Total number of rows written.
        self.data_count = 0

    def get_data_exec(self, func_code: str, filepath: str, measupoint_names: List[str]) -> Optional[Tuple]:
        """
        Compile and run the configured parser against one file.

        `func_code` is expected to define a function named `get_data`; it is
        exec'd in this frame and the resulting function is called.

        SECURITY NOTE(review): `exec` runs arbitrary code taken from the wave
        configuration. The only guard is the naive `'import '` substring
        check in `run()`; the configuration source must be trusted.

        Args:
            func_code: Python source defining `get_data(filepath, points)`.
            filepath: Path of the data file to parse.
            measupoint_names: Configured raw measurement-point names.

        Returns:
            Whatever the configured `get_data` returns — a 7-tuple of
            (wind_turbine_name, time_stamp, sampling_frequency,
            rotational_speed, mesure_point_name, type, mesure_data) per the
            unpacking in `run()`, or a falsy value when the file has no data.
        """
        exec(func_code)
        return locals()['get_data'](filepath, measupoint_names)

    def del_exists_data(self, df: pd.DataFrame):
        """
        Delete rows already present in the DB for `df`'s time range.

        Rows are matched on (wind_turbine_name, time_stamp,
        sampling_frequency, mesure_point_name) and removed by DB id.

        NOTE(review): currently unused — the call site in `run()` is
        commented out.

        Args:
            df: Batch of parsed rows about to be written.
        """
        min_date, max_date = df['time_stamp'].min(), df['time_stamp'].max()
        db_df = get_wave_data(self.wind_farm_code + '_wave', min_date, max_date)
        exists_df = pd.merge(db_df, df,
                             on=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name'],
                             how='inner')
        ids = [int(i) for i in exists_df['id'].to_list()]
        if ids:
            delete_exist_wave_data(self.wind_farm_code + "_wave", ids)

    def run(self):
        """Run the wave data transfer end to end.

        Flow: mark running -> scan files -> load config -> parse files in a
        process pool, batch by batch -> write each batch to the DB ->
        mark success (or error on any parse failure).
        """
        update_trans_status_running(self.id)
        trance_id = '-'.join([self.wind_farm_code, 'wave'])
        set_trance_id(trance_id)
        all_files = read_files(self.read_dir, ['txt', 'csv'])
        update_trans_transfer_progress(self.id, 5)
        # Use at most half of the system CPUs, capped by the configured limit.
        split_count = get_available_cpu_count_with_percent(1 / 2)
        split_count = min(split_count, ParallelProcessing.MAX_PROCESSES)
        all_wind, _ = get_all_wind(self.wind_farm_code, False)
        get_or_create_wave_table(self.wind_farm_code + '_wave')
        wave_conf = get_wave_conf(self.wind_farm_code)
        # Parser source code supplied by configuration; exec'd per file in
        # get_data_exec(). SECURITY: see note on that method.
        base_param_exec = wave_conf.get('base_param_exec', '')
        map_dict = {}
        if base_param_exec:
            base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', ' ')
            info(base_param_exec)
            # Naive guard against imports inside the configured code.
            if 'import ' in base_param_exec:
                raise Exception("方法不支持import方法")
        # conf_* entries map raw point names -> canonical point names.
        mesure_poins = [key for key, value in wave_conf.items() if str(key).startswith('conf_') and value]
        for point in mesure_poins:
            map_dict[wave_conf[point].strip()] = point.replace('conf_', '')
        wind_turbine_name_set = set()
        # Batch size tuned to keep each pool round roughly 10 files per worker.
        batch_size = split_count * 10
        all_array = split_array(all_files, batch_size)
        total_index = len(all_array)
        for index, now_array in enumerate(all_array):
            index_begin = datetime.datetime.now()
            with multiprocessing.Pool(split_count) as pool:
                try:
                    file_datas = pool.starmap(self.get_data_exec,
                                              [(base_param_exec, i, list(map_dict.keys())) for i in now_array])
                    info(f'总数:{len(now_array)},返回个数{len(file_datas)}')
                except Exception as e:
                    message = str(e)
                    error(traceback.format_exc())
                    # Status column is limited; truncate the message to 100 chars.
                    update_trans_status_error(self.id, message[:100])
                    raise
            # Reading accounts for 20%..80% of the reported progress.
            update_trans_transfer_progress(self.id, 20 + int(index / total_index * 60))
            info("读取文件耗时:", datetime.datetime.now() - self.begin)
            result_list = []
            for file_data in file_datas:
                if file_data:
                    wind_turbine_name, time_stamp, sampling_frequency, rotational_speed, mesure_point_name, \
                        mesure_type, mesure_data = file_data[0], file_data[1], file_data[2], file_data[3], \
                        file_data[4], file_data[5], file_data[6]
                    # Only points present in the configuration are kept.
                    if mesure_point_name in map_dict:
                        wind_turbine_name_set.add(wind_turbine_name)
                        if self.min_date is None or self.min_date > time_stamp:
                            self.min_date = time_stamp
                        if self.max_date is None or self.max_date < time_stamp:
                            self.max_date = time_stamp
                        # NOTE: column order intentionally swaps
                        # rotational_speed/sampling_frequency relative to the
                        # tuple, matching the DataFrame columns below.
                        result_list.append(
                            [wind_turbine_name, time_stamp, rotational_speed, sampling_frequency,
                             mesure_point_name, mesure_type, mesure_data])
            if result_list:
                self.data_count += len(result_list)
                df = pd.DataFrame(result_list,
                                  columns=['wind_turbine_name', 'time_stamp', 'rotational_speed',
                                           'sampling_frequency', 'mesure_point_name', 'type', 'mesure_data'])
                df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
                # Map raw point names to canonical names; drop unmapped rows.
                df['mesure_point_name'] = df['mesure_point_name'].map(map_dict)
                df.dropna(subset=['mesure_point_name'], inplace=True)
                # Fall back to the raw turbine name when no number mapping exists.
                df['wind_turbine_number'] = df['wind_turbine_name'].map(all_wind).fillna(df['wind_turbine_name'])
                # Serialize the waveform payload once per row for DB storage.
                df['mesure_data'] = df['mesure_data'].apply(lambda x: json.dumps(x))
                df.sort_values(by=['time_stamp', 'mesure_point_name'], inplace=True)
                # self.del_exists_data(df)
                save_df_to_db(self.wind_farm_code + '_wave', df, batch_count=400)
            info(f"总共{total_index}组,当前{index + 1}", "本次写入耗时:", datetime.datetime.now() - index_begin,
                 "总耗时:", datetime.datetime.now() - self.begin)
        update_trans_status_success(self.id, len(wind_turbine_name_set), Types.WAVE, self.min_date, self.max_date,
                                    self.data_count)
        info("总耗时:", datetime.datetime.now() - self.begin)