import datetime
import json
import multiprocessing
from os.path import basename, dirname

import pandas as pd

from service.trans_service import get_wave_conf, save_file_to_db, save_df_to_db
from utils.file.trans_methods import *
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent


class WaveTrans(object):
    """Convert wave (vibration) CSV files into rows and save them to the database.

    The per-file parsing logic is not hard-coded here. It is loaded from the
    field's wave configuration (the ``base_param_exec`` entry returned by
    ``get_wave_conf``). That entry is Python source which must define a
    function ``get_data(file_path)``. ``run`` expects ``get_data`` to return a
    sequence whose first six items are::

        (wind_turbine_name, time_stamp, sampling_frequency,
         rotational_speed, mesure_point_name, mesure_data)
    """

    def __init__(self, field_code, read_path, save_path: str):
        """
        :param field_code: field code used to look up the wave configuration.
        :param read_path: location scanned (via ``read_files``) for ``csv`` files.
        :param save_path: stored on the instance; not used by ``run``.
        """
        self.field_code = field_code
        self.read_path = read_path
        self.save_path = save_path
        # Start time for the total elapsed time printed at the end of ``run``.
        self.begin = datetime.datetime.now()

    def get_data_exec(self, func_code, arg):
        """Execute the configured parser source and apply its ``get_data`` to ``arg``.

        :param func_code: Python source that defines ``get_data(file_path)``.
        :param arg: argument passed to ``get_data`` (a file path in ``run``).
        :return: whatever ``get_data(arg)`` returns.
        :raises ValueError: if the source does not define a callable ``get_data``.
        """
        # SECURITY NOTE(review): this executes code read from the configuration
        # store. Anyone who can edit that configuration can run arbitrary code here.
        #
        # Use an explicit namespace instead of reading ``locals()`` after a bare
        # ``exec``. Since Python 3.13 (PEP 667), ``locals()`` inside a function
        # returns a fresh snapshot, so names defined by ``exec`` are lost. A
        # single namespace also lets ``get_data`` see imports/helpers defined
        # alongside it. Module globals are copied in so configured code can
        # still use names imported by this module.
        namespace = dict(globals())
        exec(func_code, namespace)
        get_data = namespace.get('get_data')
        if not callable(get_data):
            raise ValueError("configured wave parser code does not define a callable 'get_data'")
        return get_data(arg)

    def run(self, map_dict=None, table_name='SKF001_wave'):
        """Parse every CSV file under ``read_path`` in parallel and save the rows.

        Each parsed file produces two rows: one for the rotational speed (stored
        as a one-element list under the point name ``'rotational_speed'``) and
        one for the measured data. ``mesure_data`` is JSON-encoded before saving.

        :param map_dict: optional extra mapping ``{raw point name: stored name}``
            applied to ``mesure_point_name``. Entries derived from the ``conf_*``
            keys of the wave configuration take precedence. The caller's dict
            is not modified.
        :param table_name: destination table passed to ``save_df_to_db``.
            NOTE(review): the default is fixed regardless of ``field_code`` --
            confirm whether it should be derived from it.
        :raises ValueError: if no wave configuration or parser code is found.
        """
        all_files = list(read_files(self.read_path, ['csv']))
        print(len(all_files))
        if not all_files:
            # Nothing to parse: skip spinning up a process pool and writing an empty frame.
            print("no csv files found under", self.read_path)
            return

        wave_conf = get_wave_conf(self.field_code)
        if wave_conf is None:
            raise ValueError(f"no wave configuration found for field_code={self.field_code!r}")
        base_param_exec = wave_conf['base_param_exec']
        if not base_param_exec:
            # Fail here with a clear message rather than inside every worker process.
            raise ValueError(f"wave configuration for field_code={self.field_code!r} has no parser code")
        # Normalize Windows line endings and replace tabs with single spaces before exec.
        base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', ' ')
        print(base_param_exec)

        # Configured ``conf_*`` entries map a raw point name (the value) to the stored name (the key).
        point_names = dict(map_dict) if map_dict else {}
        for key, value in wave_conf.items():
            if str(key).startswith('conf_') and value:
                point_names[value] = key

        # Use at most half of the available CPUs, at least one process, and no more than there are files.
        split_count = max(1, min(get_available_cpu_count_with_percent(1 / 2), len(all_files)))
        with multiprocessing.Pool(split_count) as pool:
            file_datas = pool.starmap(self.get_data_exec, [(base_param_exec, file_path) for file_path in all_files])

        result_list = []
        for file_data in file_datas:
            (wind_turbine_name, time_stamp, sampling_frequency,
             rotational_speed, mesure_point_name, mesure_data) = file_data[:6]
            result_list.append(
                [wind_turbine_name, time_stamp, sampling_frequency, 'rotational_speed', [float(rotational_speed)]])
            result_list.append(
                [wind_turbine_name, time_stamp, sampling_frequency, mesure_point_name, mesure_data])

        df = pd.DataFrame(result_list, columns=['wind_turbine_name', 'time_stamp', 'sampling_frequency',
                                                'mesure_point_name', 'mesure_data'])
        # Rename mapped points; unmapped names are kept as-is.
        df['mesure_point_name'] = df['mesure_point_name'].map(point_names).fillna(df['mesure_point_name'])
        df['mesure_data'] = df['mesure_data'].apply(json.dumps)
        save_df_to_db(table_name, df, batch_count=1000)
        print("总耗时:", datetime.datetime.now() - self.begin)