"""Read raw source files (excel/csv) and spill them into per-turbine temporary CSV files.

The work is fanned out over a ``multiprocessing.Pool``; cross-process
bookkeeping (which turbine files already exist) is kept in
``multiprocessing.Manager`` proxies so appends vs. first-writes are decided
under a shared lock.
"""
import datetime
import multiprocessing
import os
import traceback

import pandas as pd

from etl.base import TransParam
from etl.base.PathsAndTable import PathsAndTable
from service.plt_service import update_trans_transfer_progress
from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
    create_file_path, read_file_to_df
from utils.log.trans_log import trans_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count


class ReadAndSaveTmp(object):
    """Transforms raw input files into normalized per-wind-turbine temp CSVs.

    Two modes (driven by ``trans_param``):
      * ``merge_columns`` mode: files are first grouped/merged per turbine
        (``save_merge_data`` + ``merge_df``), then normalized.
      * plain mode: each file is read (``read_excel_to_df``), renamed via the
        column-translation map, and appended to a per-turbine CSV.
    """

    def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
        # Path/table resolution helper and the column-translation parameters.
        self.pathsAndTable = pathsAndTable
        self.trans_param = trans_param
        # Manager proxies: shared across pool worker processes (pickled with self).
        # exist_wind_names records which output files have been written at least
        # once, so later writers append instead of rewriting the header.
        self.exist_wind_names = multiprocessing.Manager().list()
        self.lock = multiprocessing.Manager().Lock()
        # NOTE(review): file_lock is created but never used in this class —
        # possibly leftover; confirm before removing.
        self.file_lock = multiprocessing.Manager().dict()

    def _save_to_tmp_csv_by_name(self, df: pd.DataFrame, name) -> None:
        """Write (or append) ``df`` to ``<read_tmp_path>/<name>.csv``.

        The shared lock guards the "has this file been written before?" check,
        so exactly one process writes the header for each turbine.
        """
        save_name = str(name) + '.csv'
        save_path = os.path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
        create_file_path(save_path, is_file_path=True)

        with self.lock:
            if name in self.exist_wind_names:
                contains_name = True
            else:
                contains_name = False
                self.exist_wind_names.append(name)

        # Append without header on subsequent writes; first write emits the header.
        # NOTE(review): the to_csv calls themselves are outside the lock, so two
        # processes appending to the same file may interleave rows — TODO confirm
        # whether row interleaving is acceptable downstream.
        if contains_name:
            df.to_csv(save_path, index=False, encoding='utf8', mode='a', header=False)
        else:
            df.to_csv(save_path, index=False, encoding='utf8')

    def save_merge_data(self, file_path) -> None:
        """merge_columns mode, pass 1: read one file and spill it into the
        per-turbine merge-tmp directory, keyed by the sorted column signature.

        Files sharing the same column set end up appended to the same
        ``<cols>.csv`` inside each turbine's merge dir.
        """
        df = self.read_excel_to_df(file_path)
        if self.trans_param.wind_name_exec:
            # NOTE(review): wind_name_exec is a code fragment taken from
            # configuration and run through eval() — trusted-config assumption;
            # do not feed untrusted input here.
            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
            df['wind_turbine_number'] = eval(exec_str)
        names = set(df['wind_turbine_number'].values)
        cols = list(df.columns)
        cols.sort()
        # Column signature: files with identical columns share one CSV name.
        csv_name = "-".join(cols) + ".csv"
        for name in names:
            exist_name = name + '-' + csv_name
            merge_path = self.pathsAndTable.get_merge_tmp_path(name)
            create_file_path(merge_path)
            with self.lock:
                if exist_name in self.exist_wind_names:
                    contains_name = True
                else:
                    contains_name = False
                    self.exist_wind_names.append(exist_name)

            save_path = os.path.join(merge_path, csv_name)
            # NOTE(review): the FULL df is written under every turbine name found
            # in it (not filtered to df[df.wind_turbine_number == name]) — if a
            # file can contain several turbines this duplicates rows; confirm
            # inputs are single-turbine per file.
            if contains_name:
                df.to_csv(save_path, index=False, encoding='utf-8', mode='a', header=False)
            else:
                df.to_csv(save_path, index=False, encoding='utf-8')

    def df_save_to_tmp_file(self, df: pd.DataFrame = pd.DataFrame()) -> None:
        """Normalize a raw frame (rename columns, drop junk rows) and spill it
        to per-turbine temp CSVs via ``save_to_tmp_csv``.

        NOTE(review): mutable default argument ``pd.DataFrame()`` is shared
        across calls — harmless only while callers never mutate the default;
        prefer ``df=None`` + construct inside.
        """
        if self.trans_param.is_vertical_table:
            pass
        else:
            # Column-name translation (source column -> canonical field name).
            same_col = {}
            if self.trans_param.cols_tran:
                cols_tran = self.trans_param.cols_tran
                real_cols_trans = dict()
                for k, v in cols_tran.items():
                    if v and not v.startswith("$"):
                        if v not in real_cols_trans.keys():
                            real_cols_trans[v] = k
                        else:
                            # Several canonical fields map to one source column:
                            # remember the extras so they can be copied after rename.
                            value = real_cols_trans[v]
                            if value in same_col.keys():
                                same_col[value].append(k)
                            else:
                                same_col[value] = [k]

                trans_print("包含转换字段,开始处理转换字段")
                df.rename(columns=real_cols_trans, inplace=True)
                # Duplicate values for canonical fields that share one source column.
                for key in same_col.keys():
                    for col in same_col[key]:
                        df[col] = df[key]

                # Drop any column that is not a canonical field.
                del_keys = set(df.columns) - set(cols_tran.keys())
                for key in del_keys:
                    df.drop(key, axis=1, inplace=True)

        df = del_blank(df, ['wind_turbine_number'])
        # Keep only rows with a timestamp.
        df = df[df['time_stamp'].isna() == False]
        if self.trans_param.wind_name_exec and not self.trans_param.merge_columns:
            # Same trusted-config eval() as in save_merge_data.
            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
            df['wind_turbine_number'] = eval(exec_str)

        # Drop rows where both active power and wind velocity are empty.
        df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)

        self.save_to_tmp_csv(df)

    def save_to_tmp_csv(self, df: pd.DataFrame) -> None:
        """Split ``df`` by turbine number and persist each slice to its temp CSV."""
        names = set(df['wind_turbine_number'].values)
        if names:
            trans_print("开始保存", str(names), "到临时文件")
            for name in names:
                self._save_to_tmp_csv_by_name(df[df['wind_turbine_number'] == name], name)
            del df
            trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))

    def merge_df(self, dir_path) -> pd.DataFrame:
        """merge_columns mode, pass 2: column-wise concat of every file in one
        turbine's merge dir (aligned on time_stamp + wind_turbine_number), then
        normalize and spill via ``df_save_to_tmp_file``.
        """
        all_files = read_excel_files(dir_path)
        df = pd.DataFrame()
        for file in all_files:
            now_df = read_file_to_df(file)
            now_df.dropna(subset=['time_stamp'], inplace=True)
            now_df.drop_duplicates(subset=['time_stamp'], inplace=True)
            # Align on the composite index so concat(axis=1) joins columns by row key.
            now_df.set_index(keys=['time_stamp', 'wind_turbine_number'], inplace=True)
            df = pd.concat([df, now_df], axis=1)
        df.reset_index(inplace=True)
        self.df_save_to_tmp_file(df)
        return df

    def read_file_and_save_tmp(self) -> None:
        """Orchestrate the whole spill: fan work out over process pools and
        report progress (20% -> 50%) after every batch.

        Raises:
            ValueError: wrapping any worker failure (original traceback is logged).
        """
        all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
        split_count = use_files_get_max_cpu_count(all_files)
        all_arrays = split_array(all_files, split_count)

        if self.trans_param.merge_columns:
            # Pass 1: spill every file into per-turbine merge dirs (progress 20-40%).
            for index, arr in enumerate(all_arrays):
                try:
                    with multiprocessing.Pool(split_count) as pool:
                        pool.starmap(self.save_merge_data, [(ar,) for ar in arr])
                except Exception as e:
                    # NOTE(review): broad Exception re-raised as ValueError —
                    # callers lose the original type; traceback is logged first.
                    trans_print(traceback.format_exc())
                    message = "整理临时文件,系统返回错误:" + str(e)
                    raise ValueError(message)
                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                               round(20 + 20 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)

            # Pass 2: merge each turbine dir (progress 20-50%).
            dirs = [os.path.join(self.pathsAndTable.get_merge_tmp_path(), dir_name)
                    for dir_name in os.listdir(self.pathsAndTable.get_merge_tmp_path())]
            # Pool size is derived from the first dir's size only — assumes dirs
            # are roughly uniform; TODO confirm.
            dir_total_size = get_dir_size(dirs[0])
            split_count = max_file_size_get_max_cpu_count(dir_total_size)
            all_arrays = split_array(dirs, split_count)
            for index, arr in enumerate(all_arrays):
                try:
                    with multiprocessing.Pool(split_count) as pool:
                        pool.starmap(self.merge_df, [(ar,) for ar in arr])
                except Exception as e:
                    trans_print(traceback.format_exc())
                    message = "整理临时文件,系统返回错误:" + str(e)
                    raise ValueError(message)
                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                               round(20 + 30 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)
        else:
            # Plain mode: read in parallel, normalize/spill serially in the parent.
            for index, arr in enumerate(all_arrays):
                try:
                    with multiprocessing.Pool(split_count) as pool:
                        dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])

                    for df in dfs:
                        self.df_save_to_tmp_file(df)
                except Exception as e:
                    trans_print(traceback.format_exc())
                    message = "整理临时文件,系统返回错误:" + str(e)
                    raise ValueError(message)
                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                               round(20 + 30 * (index + 1) / len(all_arrays), 2),
                                               self.pathsAndTable.save_db)

    def read_excel_to_df(self, file_path) -> pd.DataFrame:
        """Read one source file into a DataFrame, applying the column-mapping
        rules from ``trans_param.cols_tran``.

        Mapping-value conventions observed in this method:
          * plain value            -> source column to read/rename
          * ``a|b``                -> alternative source column names
          * ``$file`` / ``$file[a:b]`` / ``$file.split(s,i)``
                                   -> derive the value from the file name
          * ``col,$file_date[a:b]``-> prefix a file-name date onto column ``col``
          * ``$folder[n]``         -> take the n-th parent directory name
          * ``$sheet_name``        -> copy the sheet_name column
        """
        # Source columns that are plain (non-$) mappings.
        read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]

        trans_dict = {}
        for k, v in self.trans_param.cols_tran.items():
            if v and not str(v).startswith("$"):
                trans_dict[v] = k

        if self.trans_param.is_vertical_table:
            # Vertical (key/value) layout: filter to mapped keys, then translate them.
            vertical_cols = self.trans_param.vertical_cols
            df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header,
                                 trans_cols=self.trans_param.vertical_cols)
            df = df[df[self.trans_param.vertical_key].isin(read_cols)]
            df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                               self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
            df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
                df[self.trans_param.vertical_key])

            return df
        else:
            # Rebuild trans_dict with only the special ($-prefixed / comma) rules.
            trans_dict = dict()
            trans_cols = []
            for k, v in self.trans_param.cols_tran.items():
                # NOTE(review): precedence here is (v and v.startswith("$")) or
                # (v.find(",") > 0) — and v.find below raises if v is None;
                # apparently cols_tran values are always strings. TODO confirm.
                if v and v.startswith("$") or v.find(",") > 0:
                    trans_dict[v] = k
                if v.find("|") > -1:
                    # "a|b" lists alternative source column names.
                    vs = v.split("|")
                    trans_cols.extend(vs)
                else:
                    trans_cols.append(v)
            trans_cols = list(set(trans_cols))
            if self.trans_param.merge_columns:
                df = read_file_to_df(file_path, header=self.trans_param.header,
                                     trans_cols=trans_cols)
            else:
                if self.trans_param.need_valid_cols:
                    df = read_file_to_df(file_path, read_cols, header=self.trans_param.header,
                                         trans_cols=trans_cols)
                else:
                    df = read_file_to_df(file_path, header=self.trans_param.header,
                                         trans_cols=trans_cols)

            # Resolve column-name prefixes via a config-supplied expression.
            if self.trans_param.resolve_col_prefix:
                # NOTE(review): eval() of config code with `column` in scope —
                # same trusted-config assumption as wind_name_exec.
                columns_dict = dict()
                for column in df.columns:
                    columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
                df.rename(columns=columns_dict, inplace=True)

            if self.trans_param.merge_columns:
                # Keep only the key columns plus mapped source columns.
                select_cols = [self.trans_param.cols_tran['wind_turbine_number'],
                               self.trans_param.cols_tran['time_stamp'],
                               'wind_turbine_number', 'time_stamp']
                select_cols.extend(trans_cols)
                rename_dict = dict()
                wind_turbine_number_col = self.trans_param.cols_tran['wind_turbine_number']
                if wind_turbine_number_col.find("|") > -1:
                    cols = wind_turbine_number_col.split("|")
                    for col in cols:
                        rename_dict[col] = 'wind_turbine_number'
                time_stamp_col = self.trans_param.cols_tran['time_stamp']
                if time_stamp_col.find("|") > -1:
                    cols = time_stamp_col.split("|")
                    for col in cols:
                        rename_dict[col] = 'time_stamp'
                df.rename(columns=rename_dict, inplace=True)
                for col in df.columns:
                    if col not in select_cols:
                        del df[col]

            # Apply the special $-rules collected above.
            for k, v in trans_dict.items():
                if k.startswith("$file"):
                    # File name without its extension.
                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                    if k == "$file":
                        ks = k.split("|")
                        bool_contains = False
                        for k_data in ks:
                            if k_data in df.columns or v in df.columns:
                                bool_contains = True
                        if not bool_contains:
                            df[v] = str(file)
                    elif k.startswith("$file["):
                        # "$file[a:b]" -> slice of the file name.
                        ks = k.split("|")
                        bool_contains = False
                        for k_data in ks:
                            if k_data in df.columns or v in df.columns:
                                bool_contains = True
                        if not bool_contains:
                            datas = str(ks[0].replace("$file", "").replace("[", "").replace("]", "")).split(":")
                            if len(datas) != 2:
                                raise Exception("字段映射出现错误 :" + str(trans_dict))
                            df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
                    elif k.startswith("$file.split"):
                        # "$file.split(sep,i)" -> i-th piece of the split file name.
                        ks = k.split("|")
                        bool_contains = False
                        for k_data in ks:
                            if k_data in df.columns or v in df.columns:
                                bool_contains = True
                        if not bool_contains:
                            datas = str(ks[0]).replace("$file.split(", "").replace(")", "").split(",")
                            split_str = str(datas[0])
                            split_index = int(datas[1])
                            df[v] = str(file.split(split_str)[split_index])
                elif k.find("$file_date") > 0:
                    # "col,$file_date[a:b]" -> "<date-from-filename> <col value>".
                    datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
                    if len(datas) != 2:
                        raise Exception("字段映射出现错误 :" + str(trans_dict))
                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                    date_str = str(file[int(datas[0]):int(datas[1])]).strip()
                    df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
                elif k.startswith("$folder"):
                    # "$folder[n]" -> name of the n-th parent directory of the file.
                    folder = file_path
                    ks = k.split("|")
                    bool_contains = False
                    for k_data in ks:
                        if k_data in df.columns or v in df.columns:
                            bool_contains = True
                    if not bool_contains:
                        cengshu = int(str(ks[0].replace("$folder", "").replace("[", "").replace("]", "")))
                        for i in range(cengshu):
                            folder = os.path.dirname(folder)
                        df[v] = str(str(folder).split(os.sep)[-1]).strip()
                elif k.startswith("$sheet_name"):
                    df[v] = df['sheet_name']

            # Fall back to "|"-listed alternatives if the key columns are still missing.
            if 'time_stamp' not in df.columns:
                cols_trans = [i for i in self.trans_param.cols_tran['time_stamp'].split('|')]
                cols_dict = dict()
                for col in cols_trans:
                    cols_dict[col] = 'time_stamp'
                df.rename(columns=cols_dict, inplace=True)

            if 'wind_turbine_number' not in df.columns:
                cols_trans = [i for i in self.trans_param.cols_tran['wind_turbine_number'].split('|')]
                cols_dict = dict()
                for col in cols_trans:
                    cols_dict[col] = 'wind_turbine_number'
                df.rename(columns=cols_dict, inplace=True)

            return df

    def run(self) -> None:
        """Entry point: spill everything to temp files, then mark progress at 50%."""
        trans_print("开始保存数据到临时文件")
        begin = datetime.datetime.now()
        self.read_file_and_save_tmp()
        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 50,
                                       self.pathsAndTable.save_db)
        trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)