import json
import multiprocessing
import traceback
from typing import Tuple

from conf.constants import ParallelProcessing, Types
from service.plt_service import get_all_wind
from service.trans_conf_service import update_trans_status_running, update_trans_transfer_progress, \
    update_trans_status_success, update_trans_status_error
from service.trans_service import get_wave_conf, save_df_to_db, get_or_create_wave_table, \
    get_wave_data, delete_exist_wave_data
from utils.file.trans_methods import *
from utils.log.trans_log import set_trance_id, info, error
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent

# These were previously wrapped in exec("..."), which hides them from static
# analysis and serves no purpose. They are kept LAST deliberately: the star
# import must keep shadowing any same-named helpers imported above, and these
# names must stay in module globals for the user-configured parsing code run
# by WaveTrans.get_data_exec.
from os.path import *
import re
class WaveTrans(object):
    """Wave (vibration waveform) data transfer task.

    Scans ``read_dir`` for raw wave files (txt/csv), parses them in a
    multiprocessing pool using user-configured parsing code from the wave
    configuration, and bulk-writes the rows into the
    ``<wind_farm_code>_wave`` database table while reporting progress and
    status through the trans_conf service.
    """

    def __init__(self, id: int, wind_farm_code: str, read_dir: str):
        """Initialize the transfer task.

        Args:
            id: Task id used for status/progress reporting.
            wind_farm_code: Wind farm code; also the table-name prefix.
            read_dir: Directory to scan for raw wave files.
        """
        self.id = id
        self.wind_farm_code = wind_farm_code
        self.read_dir = read_dir
        self.begin = datetime.datetime.now()  # start time, for duration logging
        self.engine_count = 0
        self.min_date = None  # earliest time_stamp seen across all files
        self.max_date = None  # latest time_stamp seen across all files
        self.data_count = 0  # total number of rows written to the DB

    def get_data_exec(self, func_code: str, filepath: str, measupoint_names: List[str]) -> Optional[Tuple]:
        """Run the user-configured parsing code against a single file.

        ``func_code`` must define ``get_data(filepath, measupoint_names)``.

        SECURITY NOTE: ``func_code`` comes from the wave configuration and is
        executed verbatim — it must only ever be sourced from trusted
        administrators.

        Args:
            func_code: Source code that defines a ``get_data`` function.
            filepath: Path of the raw file to parse.
            measupoint_names: Raw measurement-point labels to extract.

        Returns:
            Whatever ``get_data`` returns (``run`` expects a 7-tuple), or
            ``None`` when the file yields no data.
        """
        # Execute in a copy of the module globals so the configured code can
        # use the names imported by this module (pd, re, os.path helpers, ...)
        # without mutating them. An explicit namespace is required for
        # correctness: the previous ``exec(func_code)`` + ``locals()`` pattern
        # relied on CPython frame-locals behavior that no longer works under
        # PEP 667 (Python 3.13+).
        namespace = dict(globals())
        exec(func_code, namespace)
        return namespace['get_data'](filepath, measupoint_names)

    def del_exists_data(self, df: pd.DataFrame):
        """Delete rows already present in the wave table for ``df``'s time range.

        Matches the database on (wind_turbine_name, time_stamp,
        sampling_frequency, mesure_point_name) and deletes the matched ids.

        Args:
            df: Newly parsed data frame.
        """
        min_date, max_date = df['time_stamp'].min(), df['time_stamp'].max()
        db_df = get_wave_data(self.wind_farm_code + '_wave', min_date, max_date)
        exists_df = pd.merge(db_df, df,
                             on=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name'],
                             how='inner')
        ids = [int(i) for i in exists_df['id'].to_list()]
        if ids:
            delete_exist_wave_data(self.wind_farm_code + "_wave", ids)

    def run(self):
        """Run the full transfer: read files, parse in parallel, save to DB."""
        update_trans_status_running(self.id)
        trance_id = '-'.join([self.wind_farm_code, 'wave'])
        set_trance_id(trance_id)
        all_files = read_files(self.read_dir, ['txt', 'csv'])
        update_trans_transfer_progress(self.id, 5)
        # Use at most half of the system CPUs, capped by the configured maximum.
        split_count = get_available_cpu_count_with_percent(1 / 2)
        split_count = min(split_count, ParallelProcessing.MAX_PROCESSES)
        all_wind, _ = get_all_wind(self.wind_farm_code, False)
        get_or_create_wave_table(self.wind_farm_code + '_wave')
        wave_conf = get_wave_conf(self.wind_farm_code)
        base_param_exec = wave_conf.get('base_param_exec', '')
        map_dict = {}
        if base_param_exec:
            base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', ' ')
            info(base_param_exec)
            # The configured code may not import anything; reject it outright.
            if 'import ' in base_param_exec:
                raise Exception("方法不支持import方法")
        # 'conf_<name>' entries map raw measurement-point labels to short names.
        mesure_poins = [key for key, value in wave_conf.items() if str(key).startswith('conf_') and value]
        for point in mesure_poins:
            map_dict[wave_conf[point].strip()] = point.replace('conf_', '')
        wind_turbine_name_set = set()
        # Batch size: roughly ten files per worker process per round.
        batch_size = split_count * 10
        all_array = split_array(all_files, batch_size)
        total_index = len(all_array)
        for index, now_array in enumerate(all_array):
            index_begin = datetime.datetime.now()
            with multiprocessing.Pool(split_count) as pool:
                try:
                    file_datas = pool.starmap(self.get_data_exec,
                                              [(base_param_exec, i, list(map_dict.keys())) for i in now_array])
                    info(f'总数:{len(now_array)},返回个数{len(file_datas)}')
                except Exception as e:
                    message = str(e)
                    error(traceback.format_exc())
                    # The status column holds at most 100 characters.
                    update_trans_status_error(self.id, message[:100])
                    raise e
            # Reading spans progress 20..80, proportional to finished batches.
            update_trans_transfer_progress(self.id, 20 + int(index / total_index * 60))
            info("读取文件耗时:", datetime.datetime.now() - self.begin)
            result_list = list()
            for file_data in file_datas:
                if file_data:
                    # Expected 7-tuple produced by the configured get_data.
                    # 'mesure_type' avoids shadowing the builtin 'type'.
                    (wind_turbine_name, time_stamp, sampling_frequency, rotational_speed,
                     mesure_point_name, mesure_type, mesure_data) = file_data[0:7]
                    # Skip measurement points that are not configured.
                    if mesure_point_name in map_dict:
                        wind_turbine_name_set.add(wind_turbine_name)
                        if self.min_date is None or self.min_date > time_stamp:
                            self.min_date = time_stamp
                        if self.max_date is None or self.max_date < time_stamp:
                            self.max_date = time_stamp
                        result_list.append(
                            [wind_turbine_name, time_stamp, rotational_speed, sampling_frequency,
                             mesure_point_name, mesure_type, mesure_data])
            if result_list:
                self.data_count += len(result_list)
                df = pd.DataFrame(result_list,
                                  columns=['wind_turbine_name', 'time_stamp', 'rotational_speed',
                                           'sampling_frequency', 'mesure_point_name', 'type', 'mesure_data'])
                df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
                # Translate raw labels to configured short names; drop unmapped.
                df['mesure_point_name'] = df['mesure_point_name'].map(map_dict)
                df.dropna(subset=['mesure_point_name'], inplace=True)
                # Map turbine display names to numbers; fall back to the raw name.
                df['wind_turbine_number'] = df['wind_turbine_name'].map(all_wind).fillna(df['wind_turbine_name'])
                # Serialize waveform samples to JSON for storage.
                df['mesure_data'] = df['mesure_data'].apply(json.dumps)
                df.sort_values(by=['time_stamp', 'mesure_point_name'], inplace=True)
                # self.del_exists_data(df)  # DB-level dedup intentionally disabled
                save_df_to_db(self.wind_farm_code + '_wave', df, batch_count=400)
            info(f"总共{total_index}组,当前{index + 1}", "本次写入耗时:", datetime.datetime.now() - index_begin,
                 "总耗时:", datetime.datetime.now() - self.begin)
        update_trans_status_success(self.id, len(wind_turbine_name_set), Types.WAVE,
                                    self.min_date, self.max_date, self.data_count)
        info("总耗时:", datetime.datetime.now() - self.begin)