import datetime
import json
import multiprocessing
from os.path import basename, dirname

import pandas as pd

from service.plt_service import get_all_wind
from service.trans_service import get_wave_conf, save_df_to_db, get_or_create_wave_table, \
    get_wave_data, delete_exist_wave_data
from utils.file.trans_methods import *
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent


class WaveTrans(object):
    """Import vibration ("wave") CSV files of one wind field into the database.

    Each CSV file is parsed by a ``get_data`` function whose source code is read
    from the field's wave configuration (``base_param_exec``). The parsed rows
    are written to the ``<field_code>_wave`` table, replacing existing rows that
    have the same key.
    """

    def __init__(self, field_code, read_path, save_path: str):
        """
        :param field_code: wind field code; the target table is ``<field_code>_wave``.
        :param read_path: path passed to ``read_files`` to collect the ``.csv`` inputs.
        :param save_path: stored on the instance; not read by any method of this class.
        """
        self.field_code = field_code
        self.read_path = read_path
        self.save_path = save_path
        # Start time used for the elapsed-time prints in run().
        self.begin = datetime.datetime.now()

    def get_data_exec(self, func_code, arg):
        """Execute the configured source ``func_code`` and return ``get_data(arg)``.

        ``func_code`` must define a function named ``get_data``. It runs in a fresh
        namespace seeded with this module's globals, so it can use ``pd``, ``json``
        and the other names imported here, and functions it defines can call each
        other.

        SECURITY: this executes arbitrary code taken from the wave configuration;
        only trusted configuration may ever reach it.

        :param func_code: Python source defining ``get_data``.
        :param arg: argument passed to ``get_data`` (an input file from ``run``).
        :return: whatever ``get_data`` returns.
        :raises KeyError: if ``func_code`` does not define ``get_data``.
        """
        # An explicit namespace is required: since Python 3.13 (PEP 667) names bound
        # by exec() inside a function are not visible through a later locals() call,
        # so looking the function up via locals() no longer works.
        namespace = dict(globals())
        exec(func_code, namespace)
        return namespace['get_data'](arg)

    def del_exists_data(self, df):
        """Delete DB rows that rows of the new batch ``df`` would duplicate.

        A DB row is a duplicate when wind_turbine_name, time_stamp,
        sampling_frequency and mesure_point_name all match a row of ``df``.
        The candidate DB rows are fetched with ``get_wave_data`` using the batch's
        min/max time_stamp (presumably an inclusive range — confirm).

        :param df: batch about to be inserted; must contain the four key columns.
        """
        if df.empty:
            # Nothing will be inserted, so nothing can be duplicated; also avoids
            # querying the DB with NaT bounds.
            return
        table_name = self.field_code + '_wave'
        min_date, max_date = df['time_stamp'].min(), df['time_stamp'].max()
        db_df = get_wave_data(table_name, min_date, max_date)
        if db_df is None or db_df.empty:
            # No existing rows in range (an empty frame may also lack the key columns).
            return
        exists_df = pd.merge(db_df, df,
                             on=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name'],
                             how='inner')
        ids = [int(i) for i in exists_df['id'].to_list()]
        if ids:
            delete_exist_wave_data(table_name, ids)

    @staticmethod
    def _prepare_exec_code(base_param_exec):
        """Normalise the configured parser source and reject unsupported code.

        :raises Exception: if no parser source is configured or it contains ``import ``.
        """
        if not base_param_exec:
            raise Exception("base_param_exec is not configured for this field")
        # Normalise line endings; each tab becomes a single space.
        base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', ' ')
        print(base_param_exec)
        # NOTE(review): a substring check is only a weak guard; the code still runs
        # with this module's globals and builtins (e.g. __import__).
        if 'import ' in base_param_exec:
            raise Exception("方法不支持import方法")
        return base_param_exec

    @staticmethod
    def _build_point_map(wave_conf):
        """Map raw measuring-point names (config values) to DB point names.

        Every non-empty ``conf_<name>`` entry maps its value to ``<name>``; if two
        entries share a value, the later one wins. ``rotational_speed`` always maps
        to itself.
        """
        map_dict = {
            value: key.replace('conf_', '')
            for key, value in wave_conf.items()
            if str(key).startswith('conf_') and value
        }
        map_dict['rotational_speed'] = 'rotational_speed'
        return map_dict

    @staticmethod
    def _build_wave_df(file_datas, map_dict, all_wind):
        """Flatten parsed file results into one DataFrame ready to be saved.

        Each item of ``file_datas`` is read positionally as (wind_turbine_name,
        time_stamp, sampling_frequency, rotational_speed, mesure_point_name,
        mesure_data). For every item whose point name is in ``map_dict`` two rows
        are produced: a ``rotational_speed`` row holding ``[float(rotational_speed)]``
        and the measurement row itself.

        :return: DataFrame with mapped point names, parsed time stamps (unparseable
            ones become NaT), ``wind_turbine_number`` looked up in ``all_wind``
            (falling back to the turbine name) and JSON-encoded ``mesure_data``,
            sorted by time_stamp and point name.
        """
        result_list = []
        for file_data in file_datas:
            (wind_turbine_name, time_stamp, sampling_frequency, rotational_speed,
             mesure_point_name, mesure_data) = file_data[:6]
            if mesure_point_name not in map_dict:
                # Unmapped points would be dropped by the map/dropna below anyway.
                continue
            result_list.append(
                [wind_turbine_name, time_stamp, sampling_frequency, 'rotational_speed', [float(rotational_speed)]])
            result_list.append(
                [wind_turbine_name, time_stamp, sampling_frequency, mesure_point_name, mesure_data])

        df = pd.DataFrame(result_list, columns=['wind_turbine_name', 'time_stamp', 'sampling_frequency',
                                                'mesure_point_name', 'mesure_data'])
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
        df['mesure_point_name'] = df['mesure_point_name'].map(map_dict)
        df.dropna(subset=['mesure_point_name'], inplace=True)
        df['wind_turbine_number'] = df['wind_turbine_name'].map(all_wind).fillna(df['wind_turbine_name'])
        df['mesure_data'] = df['mesure_data'].apply(json.dumps)
        df.sort_values(by=['time_stamp', 'mesure_point_name'], inplace=True)
        return df

    def run(self):
        """Read all CSV files under ``read_path`` and store them in ``<field_code>_wave``.

        Files are parsed in a process pool using the configured ``get_data`` code.
        Existing rows with the same key are deleted before the new rows are saved.

        :raises Exception: if the parser source is missing or contains ``import ``.
        """
        all_files = read_files(self.read_path, ['csv'])
        print(len(all_files))
        # Use at most half of the system's CPUs.
        split_count = get_available_cpu_count_with_percent(1 / 2)
        all_wind, _ = get_all_wind(self.field_code, False)
        get_or_create_wave_table(self.field_code + '_wave')

        wave_conf = get_wave_conf(self.field_code)
        base_param_exec = self._prepare_exec_code(wave_conf['base_param_exec'])
        map_dict = self._build_point_map(wave_conf)

        with multiprocessing.Pool(split_count) as pool:
            file_datas = pool.starmap(self.get_data_exec, [(base_param_exec, i) for i in all_files])
        print("读取文件耗时:", datetime.datetime.now() - self.begin)

        df = self._build_wave_df(file_datas, map_dict, all_wind)
        if not df.empty:
            self.del_exists_data(df)
            save_df_to_db(self.field_code + '_wave', df, batch_count=1000)
        print("总耗时:", datetime.datetime.now() - self.begin)