import datetime import multiprocessing import os import traceback import pandas as pd import numpy as np from etl.base import TransParam from etl.base.PathsAndTable import PathsAndTable from etl.step.ClassIdentifier import ClassIdentifier from service.plt_service import update_trans_transfer_progress from utils.conf.read_conf import read_conf from utils.df_utils.util import get_time_space from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array from utils.log.trans_log import trans_print from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, print_memory_usage class StatisticsAndSaveFile(object): def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map, rated_power_and_cutout_speed_map): self.paths_and_table = paths_and_table self.trans_param = trans_param self.statistics_map = statistics_map self.lock = multiprocessing.Manager().Lock() self.rated_power_and_cutout_speed_map = rated_power_and_cutout_speed_map def set_statistics_data(self, df): if not df.empty: df['time_stamp'] = pd.to_datetime(df['time_stamp']) min_date = df['time_stamp'].min() max_date = df['time_stamp'].max() with self.lock: if 'min_date' in self.statistics_map.keys(): if self.statistics_map['min_date'] > min_date: self.statistics_map['min_date'] = min_date else: self.statistics_map['min_date'] = min_date if 'max_date' in self.statistics_map.keys(): if self.statistics_map['max_date'] < max_date: self.statistics_map['max_date'] = max_date else: self.statistics_map['max_date'] = max_date if 'total_count' in self.statistics_map.keys(): self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0] else: self.statistics_map['total_count'] = df.shape[0] if 'time_granularity' not in self.statistics_map.keys(): self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp') def save_statistics_file(self): save_path = os.path.join(os.path.dirname(self.paths_and_table.get_save_path()), self.paths_and_table.read_type + '_statistics.txt') create_file_path(save_path, is_file_path=True) with open(save_path, 'w', encoding='utf8') as f: f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n") f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n") f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n") f.write("风机数量:" + str(len(read_excel_files(self.paths_and_table.get_read_tmp_path()))) + "\n") def check_data_validity(self, df): pass def save_to_csv(self, filename): print_memory_usage("开始读取csv:" + os.path.basename(filename)) df = read_file_to_df(filename) if self.trans_param.is_vertical_table: df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key, values=self.trans_param.vertical_value, aggfunc='max') # 重置索引以得到普通的列 df.reset_index(inplace=True) print_memory_usage("结束读取csv,:" + os.path.basename(filename)) # 转化风机名称 trans_print("开始转化风机名称") df['wind_turbine_number'] = df['wind_turbine_number'].astype('str') df['wind_turbine_name'] = df['wind_turbine_number'] df['wind_turbine_number'] = df['wind_turbine_number'].map( self.trans_param.wind_col_trans).fillna(df['wind_turbine_number']) wind_col_name = str(df['wind_turbine_number'].values[0]) print_memory_usage("转化风机名称结束:" + wind_col_name) not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8', 'param9', 'param10'] solve_time_begin = datetime.datetime.now() trans_print(wind_col_name, "去掉非法数据前大小:", df.shape[0]) df.replace(np.nan, -999999999, inplace=True) number_cols = df.select_dtypes(include=['number']).columns.tolist() for col in df.columns: if col not in not_double_cols and col not in number_cols: if not df[col].isnull().all(): df[col] = pd.to_numeric(df[col], errors='coerce') # 删除包含NaN的行(即那些列A转换失败的行) df = df.dropna(subset=[col]) trans_print(wind_col_name, "去掉非法数据后大小:", df.shape[0]) df.replace(-999999999, np.nan, inplace=True) print_memory_usage("处理非法数据大小结束:" + wind_col_name) trans_print(wind_col_name, "去掉重复数据前大小:", df.shape[0]) df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True) trans_print(wind_col_name, "去掉重复数据后大小:", df.shape[0]) trans_print("处理非法重复数据结束,耗时:", datetime.datetime.now() - solve_time_begin) print_memory_usage("处理重复数据结束:" + wind_col_name) # 添加年月日 solve_time_begin = datetime.datetime.now() trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename) trans_print(wind_col_name, "时间原始大小:", df.shape[0]) # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)] # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0]) df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce") df.dropna(subset=['time_stamp'], inplace=True) trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0]) df.sort_values(by='time_stamp', inplace=True) trans_print("处理时间字段结束,耗时:", datetime.datetime.now() - solve_time_begin) print_memory_usage("处理时间结果:" + wind_col_name) df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]] print_memory_usage("删减无用字段后内存占用:" + wind_col_name) rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name)) if rated_power_and_cutout_speed_tuple is None: rated_power_and_cutout_speed_tuple = (None, None) print_memory_usage("打标签前内存占用:" + wind_col_name) class_identifiler = ClassIdentifier(wind_turbine_number=wind_col_name, origin_df=df, rated_power=rated_power_and_cutout_speed_tuple[0], cut_out_speed=rated_power_and_cutout_speed_tuple[1]) df = class_identifiler.run() print_memory_usage("打标签后内存占用:" + wind_col_name) df['year'] = df['time_stamp'].dt.year df['month'] = df['time_stamp'].dt.month df['day'] = df['time_stamp'].dt.day df['time_stamp'] = df['time_stamp'].apply( lambda x: x.strftime('%Y-%m-%d %H:%M:%S')) print_memory_usage("添加年月日后:" + wind_col_name) if self.paths_and_table.save_zip: save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv.gz') else: save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv') create_file_path(save_path, is_file_path=True) if self.paths_and_table.save_zip: df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8') else: df.to_csv(save_path, index=False, encoding='utf-8') self.set_statistics_data(df) del df trans_print("保存" + str(wind_col_name) + "成功") def mutiprocessing_to_save_file(self): print_memory_usage("开始执行,占用内存") # 开始保存到正式文件 trans_print("开始保存到excel文件") all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path()) # split_count = self.pathsAndTable.multi_pool_count split_count = use_files_get_max_cpu_count(all_tmp_files) all_arrays = split_array(all_tmp_files, split_count) try: for index, arr in enumerate(all_arrays): with multiprocessing.Pool(split_count) as pool: pool.starmap(self.save_to_csv, [(i,) for i in arr]) update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, round(50 + 20 * (index + 1) / len(all_arrays), 2), self.paths_and_table.save_db) except Exception as e: trans_print(traceback.format_exc()) message = "保存文件错误,系统返回错误:" + str(e) raise ValueError(message) trans_print("结束保存到excel文件") def run(self): trans_print("开始保存数据到正式文件") begin = datetime.datetime.now() self.mutiprocessing_to_save_file() update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, 70, self.paths_and_table.save_db) trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin)