# -*- coding: utf-8 -*-
# @Time   : 2024/5/15
# @Author : 魏志亮
import datetime
import multiprocessing
import tempfile
import traceback

from etl.base.TranseParam import TranseParam
from service.plt_service import get_all_wind, update_trans_status_error, update_trans_status_running, \
    update_trans_status_success
from service.trans_service import creat_table_and_add_partition, rename_table, save_file_to_db, drop_table
from utils.file.trans_methods import *
from utils.zip.unzip import unzip, unrar, get_desc_path


class WindFarms(object):

    def __init__(self, batch_no=None, field_code=None, params: TranseParam = None, wind_full_name=None,
                 save_db=True, header=0):
        self.batch_no = batch_no
        self.field_code = field_code
        self.wind_full_name = wind_full_name
        self.save_zip = False
        self.trans_param = params
        self.exist_wind_names = multiprocessing.Manager().list()
        self.wind_col_trans = get_all_wind(self.field_code)
        self.batch_count = 50000
        self.save_path = None
        self.save_db = save_db
        self.lock = multiprocessing.Manager().Lock()
        self.statistics_map = multiprocessing.Manager().dict()
        self.header = header

    def set_trans_param(self, params: TranseParam):
        self.trans_param = params
        read_path = str(params.read_path)
        if read_path.find(self.wind_full_name) == -1:
            message = "读取路径与配置路径不匹配:" + self.trans_param.read_path + ",配置文件为:" + self.wind_full_name
            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
            raise ValueError(message)
        self.save_path = os.path.join(read_path[0:read_path.find(self.wind_full_name)], self.wind_full_name, "清理数据")

    def params_valid(self, not_null_list=list()):
        for arg in not_null_list:
            if arg is None or arg == '':
                raise Exception("Invalid param set :" + str(arg))

    def get_save_path(self):
        return os.path.join(self.save_path, self.batch_no, self.trans_param.read_type)

    def get_save_tmp_path(self):
        return os.path.join(tempfile.gettempdir(), self.wind_full_name, self.batch_no, self.trans_param.read_type)

    def get_excel_tmp_path(self):
        return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)

    def get_read_tmp_path(self):
        return os.path.join(self.get_save_tmp_path(), 'read_tmp')

    def df_save_to_tmp_file(self, df=pd.DataFrame(), file=None):
        if not self.trans_param.is_vertical_table:
            # Rename the mapped source columns to the standard names, then drop
            # every column that is not part of the mapping.
            if self.trans_param.cols_tran:
                cols_tran = self.trans_param.cols_tran
                real_cols_trans = dict()
                for k, v in cols_tran.items():
                    if v and not v.startswith("$"):
                        real_cols_trans[v] = k

                trans_print("包含转换字段,开始处理转换字段")
                df.rename(columns=real_cols_trans, inplace=True)
                del_keys = set(df.columns) - set(cols_tran.keys())
                for key in del_keys:
                    df.drop(key, axis=1, inplace=True)

        df = del_blank(df, ['wind_turbine_number'])
        df = df[df['time_stamp'].notna()]
        if self.trans_param.wind_name_exec:
            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
            df['wind_turbine_number'] = eval(exec_str)
        self.save_to_tmp_csv(df, file)

    def get_and_remove(self, file, thread_local=None):
        to_path = self.get_excel_tmp_path()
        if str(file).endswith("zip"):
            if str(file).endswith("csv.zip"):
                copy_to_new(file, file.replace(self.trans_param.read_path, to_path).replace("csv.zip", 'csv.gz'))
            else:
                desc_path = file.replace(self.trans_param.read_path, to_path)
                is_success, e = unzip(file, get_desc_path(desc_path))
                self.trans_param.has_zip = True
                if not is_success:
                    # raise e
                    pass
        elif str(file).endswith("rar"):
            desc_path = file.replace(self.trans_param.read_path, to_path)
            is_success, e = unrar(file, get_desc_path(desc_path))
            self.trans_param.has_zip = True
            if not is_success:
                # raise e
                pass
        else:
            copy_to_new(file, file.replace(self.trans_param.read_path, to_path))
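    # How get_and_remove stages source files into the temp tree, assuming a
    # hypothetical read_path '/data/raw' (paths below are illustration only):
    #
    #   /data/raw/f1/t.csv      -> <tmp>/excel_tmp/f1/t.csv     (plain copy)
    #   /data/raw/f1/t.csv.zip  -> <tmp>/excel_tmp/f1/t.csv.gz  (copied under a .csv.gz name)
    #   /data/raw/f1/t.zip      -> <tmp>/excel_tmp/f1/t/...     (unpacked by unzip)
    #   /data/raw/f1/t.rar      -> <tmp>/excel_tmp/f1/t/...     (unpacked by unrar)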
    def read_excel_to_df(self, file_path):
        read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]

        trans_dict = {}
        for k, v in self.trans_param.cols_tran.items():
            if v and not str(v).startswith("$"):
                trans_dict[v] = k

        if self.trans_param.is_vertical_table:
            vertical_cols = self.trans_param.vertical_cols
            df = read_file_to_df(file_path, vertical_cols, header=self.header)
            df = df[df[self.trans_param.vertical_key].isin(read_cols)]
            df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                               self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
            df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
                df[self.trans_param.vertical_key])
            return df
        else:
            # Collect the "$"-directive and "col,$directive" mappings.
            trans_dict = dict()
            for k, v in self.trans_param.cols_tran.items():
                if v and (v.startswith("$") or v.find(",") > 0):
                    trans_dict[v] = k

            if self.trans_param.merge_columns:
                df = read_file_to_df(file_path, header=self.header)
            else:
                if self.trans_param.need_valid_cols:
                    df = read_file_to_df(file_path, read_cols, header=self.header)
                else:
                    df = read_file_to_df(file_path, header=self.header)

            # Resolve column-name prefixes if configured.
            if self.trans_param.resolve_col_prefix:
                columns_dict = dict()
                for column in df.columns:
                    columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
                df.rename(columns=columns_dict, inplace=True)

            for k, v in trans_dict.items():
                if k.startswith("$file"):
                    # File name without its extension.
                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                    if k == "$file":
                        df[v] = str(file)
                    elif k.startswith("$file["):
                        datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
                        if len(datas) != 2:
                            raise Exception("字段映射出现错误 :" + str(trans_dict))
                        df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
                elif k.find("$file_date") > 0:
                    # The key looks like "time_col,$file_date[a:b]"; slice the date
                    # from the file name and prepend it to the time column.
                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                    datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
                    if len(datas) != 2:
                        raise Exception("字段映射出现错误 :" + str(trans_dict))
                    date_str = str(file[int(datas[0]):int(datas[1])]).strip()
                    df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
                elif k.startswith("$folder"):
                    # "$folder[n]" takes the directory name n levels above the file.
                    folder = file_path
                    cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
                    for i in range(cengshu):
                        folder = os.path.dirname(folder)
                    df[v] = str(str(folder).split(os.sep)[-1]).strip()

            return df

    def _save_to_tmp_csv_by_name(self, df, name):
        save_name = str(name) + '.csv'
        save_path = os.path.join(self.get_read_tmp_path(), save_name)
        create_file_path(save_path, is_file_path=True)

        with self.lock:
            if name in self.exist_wind_names:
                contains_name = True
            else:
                contains_name = False
                self.exist_wind_names.append(name)

        if contains_name:
            df.to_csv(save_path, index=False, encoding='utf8', mode='a', header=False)
        else:
            df.to_csv(save_path, index=False, encoding='utf8')

    def save_to_tmp_csv(self, df, file):
        trans_print("开始保存", str(file), "到临时文件")
        names = set(df['wind_turbine_number'].values)
        with multiprocessing.Pool(6) as pool:
            pool.starmap(self._save_to_tmp_csv_by_name,
                         [(df[df['wind_turbine_number'] == name], name) for name in names])
        del df
        trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
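    # A hypothetical cols_tran mapping, to illustrate the conventions parsed in
    # read_excel_to_df above (all column names here are invented):
    #
    #   cols_tran = {
    #       'wind_turbine_number': '风机号',                 # plain rename: source column -> standard name
    #       'time_stamp':          '时间,$file_date[0:8]',   # date sliced from the file name, prepended to 时间
    #       'active_power':        '有功功率',
    #       'wind_farm_code':      '$file[0:4]',             # slice [0:4] of the file name (extension stripped)
    #       'collector_line':      '$folder[2]',             # directory name two levels above the file
    #   }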
    def set_statistics_data(self, df):
        if not df.empty:
            min_date = pd.to_datetime(df['time_stamp']).min()
            max_date = pd.to_datetime(df['time_stamp']).max()
            with self.lock:
                if 'min_date' in self.statistics_map.keys():
                    if self.statistics_map['min_date'] > min_date:
                        self.statistics_map['min_date'] = min_date
                else:
                    self.statistics_map['min_date'] = min_date

                if 'max_date' in self.statistics_map.keys():
                    if self.statistics_map['max_date'] < max_date:
                        self.statistics_map['max_date'] = max_date
                else:
                    self.statistics_map['max_date'] = max_date

                if 'total_count' in self.statistics_map.keys():
                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
                else:
                    self.statistics_map['total_count'] = df.shape[0]

    def save_statistics_file(self):
        save_path = os.path.join(os.path.dirname(self.get_save_path()),
                                 self.trans_param.read_type + '_statistics.txt')
        create_file_path(save_path, is_file_path=True)
        with open(save_path, 'w', encoding='utf8') as f:
            f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
            f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
            f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
            f.write("风机数量:" + str(len(read_excel_files(self.get_read_tmp_path()))) + "\n")

    def save_to_csv(self, filename):
        df = read_file_to_df(filename)
        if self.trans_param.is_vertical_table:
            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
                                values=self.trans_param.vertical_value, aggfunc='max')
            # Reset the index so time_stamp / wind_turbine_number become plain columns.
            df.reset_index(inplace=True)

        for k in self.trans_param.cols_tran.keys():
            if k not in df.columns:
                df[k] = None

        df = df[self.trans_param.cols_tran.keys()]

        # Map turbine names to their standard codes.
        trans_print("开始转化风机名称")
        # if self.trans_param.wind_name_exec:
        #     exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
        #     df['wind_turbine_number'] = eval(exec_str)
        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
        df['wind_turbine_number'] = df['wind_turbine_number'].map(self.wind_col_trans).fillna(
            df['wind_turbine_number'])
        wind_col_name = str(df['wind_turbine_number'].values[0])

        # Drop rows with malformed timestamps, then add year/month/day columns.
        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
        df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
        df['time_stamp'] = pd.to_datetime(df['time_stamp'])
        df['year'] = df['time_stamp'].dt.year
        df['month'] = df['time_stamp'].dt.month
        df['day'] = df['time_stamp'].dt.day
        df.sort_values(by='time_stamp', inplace=True)
        df['time_stamp'] = df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        trans_print("处理时间字段结束")

        # Drop any row that contains a '*' in any string cell.
        trans_print(wind_col_name, "过滤星号前大小:", df.shape[0])
        mask = ~df.applymap(lambda x: isinstance(x, str) and '*' in x).any(axis=1)
        df = df[mask]
        trans_print(wind_col_name, "过滤星号后大小:", df.shape[0])
        trans_print(wind_col_name, "转化风机名称结束")

        if self.save_zip:
            save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv.gz')
        else:
            save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv')
        create_file_path(save_path, is_file_path=True)
        if self.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8')

        self.set_statistics_data(df)
        del df
        trans_print("保存" + str(filename) + ".csv成功")
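    # The timestamp cleanup above, reduced to a standalone pandas sketch
    # (illustrative data only):
    #
    #   df = pd.DataFrame({'time_stamp': ['2024-05-15 10:00:00', 'bad', None]})
    #   df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
    #   df['time_stamp'] = pd.to_datetime(df['time_stamp'])   # only the first row survives
    #   df['year'] = df['time_stamp'].dt.year                 # 2024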
    def remove_file_to_tmp_path(self):
        # Stage every source file into the temp tree.
        try:
            if os.path.isfile(self.trans_param.read_path):
                all_files = [self.trans_param.read_path]
            else:
                all_files = read_files(self.trans_param.read_path)

            with multiprocessing.Pool(6) as pool:
                pool.starmap(self.get_and_remove, [(i,) for i in all_files])

            all_files = read_excel_files(self.get_excel_tmp_path())
            trans_print('读取文件数量:', len(all_files))
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e)
            raise ValueError(message)
        return all_files

    def read_file_and_save_tmp(self):
        all_files = read_excel_files(self.get_excel_tmp_path())
        if self.trans_param.merge_columns:
            dfs_list = list()
            index_keys = [self.trans_param.cols_tran['time_stamp']]
            wind_col = self.trans_param.cols_tran['wind_turbine_number']
            if str(wind_col).startswith("$"):
                wind_col = 'wind_turbine_number'
            index_keys.append(wind_col)

            df_map = dict()
            with multiprocessing.Pool(6) as pool:
                dfs = pool.starmap(self.read_excel_to_df, [(file,) for file in all_files])

            # Group the frames by column signature, then join the groups side by side.
            for df in dfs:
                key = '-'.join(df.columns)
                if key in df_map.keys():
                    df_map[key] = pd.concat([df_map[key], df])
                else:
                    df_map[key] = df

            for k, df in df_map.items():
                df.drop_duplicates(inplace=True)
                df.set_index(keys=index_keys, inplace=True)
                df = df[~df.index.duplicated(keep='first')]
                dfs_list.append(df)

            df = pd.concat(dfs_list, axis=1)
            df.reset_index(inplace=True)
            try:
                self.df_save_to_tmp_file(df, "")
            except Exception as e:
                trans_print(traceback.format_exc())
                message = "合并列出现错误:" + str(e)
                raise ValueError(message)
        else:
            split_count = 6
            all_arrays = split_array(all_files, split_count)
            for arr in all_arrays:
                with multiprocessing.Pool(split_count) as pool:
                    dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
                try:
                    for df in dfs:
                        self.df_save_to_tmp_file(df)
                except Exception as e:
                    trans_print(traceback.format_exc())
                    message = "整理临时文件,系统返回错误:" + str(e)
                    raise ValueError(message)

    def multiprocessing_to_save_file(self):
        # Save the per-turbine temp files to the final output files.
        trans_print("开始保存到excel文件")
        all_tmp_files = read_excel_files(self.get_read_tmp_path())
        try:
            with multiprocessing.Pool(6) as pool:
                pool.starmap(self.save_to_csv, [(file,) for file in all_tmp_files])
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "保存文件错误,系统返回错误:" + str(e)
            raise ValueError(message)
        trans_print("结束保存到excel文件")

    def multiprocessing_to_save_db(self):
        # Load the final files into the database, one partition per file.
        trans_print("开始保存到数据库文件")
        all_saved_files = read_excel_files(self.get_save_path())
        table_name = self.batch_no + "_" + self.trans_param.read_type
        creat_table_and_add_partition(table_name, len(all_saved_files), self.trans_param.read_type)
        try:
            with multiprocessing.Pool(6) as pool:
                pool.starmap(save_file_to_db,
                             [(table_name, file, self.batch_count) for file in all_saved_files])
        except Exception as e:
            trans_print(traceback.format_exc())
            message = "保存到数据库错误,系统返回错误:" + str(e)
            raise ValueError(message)
        trans_print("结束保存到数据库文件")

    def _rename_file(self):
        # Renumber the saved files sequentially as F001.csv.gz, F002.csv.gz, ...
        save_path = self.get_save_path()
        files = os.listdir(save_path)
        files.sort(key=lambda x: int(str(x).split(os.sep)[-1].split(".")[0][1:]))
        for index, file in enumerate(files):
            file_path = os.path.join(save_path, 'F' + str(index + 1).zfill(3) + ".csv.gz")
            os.rename(os.path.join(save_path, file), file_path)

    def delete_batch_files(self):
        trans_print("开始删除已存在的批次文件夹")
        if os.path.exists(self.get_save_path()):
            shutil.rmtree(self.get_save_path())
        trans_print("删除已存在的批次文件夹成功")

    def delete_tmp_files(self):
        trans_print("开始删除临时文件夹")
        if os.path.exists(self.get_save_tmp_path()):
            shutil.rmtree(self.get_save_tmp_path())
        trans_print("临时文件夹删除成功")

    def delete_batch_db(self):
        if self.save_db:
            table_name = "_".join([self.batch_no, self.trans_param.read_type])
            # Previously the table was renamed aside instead of dropped:
            # renamed_table_name = "del_" + table_name + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
            # rename_table(table_name, renamed_table_name, self.save_db)
            drop_table(table_name, self.save_db)
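    # run(step, end) gates the pipeline stages below; a stage executes when
    # step <= N and end >= N:
    #
    #   0: initialize / delete previous output and the batch table
    #   1: stage raw files into the temp tree     (remove_file_to_tmp_path)
    #   2: split into per-turbine temp csv files  (read_file_and_save_tmp)
    #   3: write final csv files + statistics     (multiprocessing_to_save_file)
    #   4: load into the database                 (multiprocessing_to_save_db, only when save_db)
    #
    # e.g. run(step=2, end=3) re-runs just the split and final-save stages.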
    def run(self, step=0, end=3):
        begin = datetime.datetime.now()
        trans_print("开始执行")
        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)

        if step <= 0 and end >= 0:
            tmp_begin = datetime.datetime.now()
            trans_print("开始初始化字段")
            self.delete_batch_files()
            self.delete_batch_db()
            self.params_valid([self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
                               self.trans_param.read_path, self.wind_full_name])
            # if self.trans_param.resolve_col_prefix:
            #     column = "测试"
            #     eval(self.trans_param.resolve_col_prefix)
            #
            # if self.trans_param.wind_name_exec:
            #     wind_name = "测试"
            #     eval(self.trans_param.wind_name_exec)
            trans_print("初始化字段结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                        str(datetime.datetime.now() - begin))

        if step <= 1 and end >= 1:
            # Stage the raw files into the temp tree.
            tmp_begin = datetime.datetime.now()
            self.delete_tmp_files()
            trans_print("开始保存到临时路径")
            self.remove_file_to_tmp_path()
            trans_print("保存到临时路径结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                        str(datetime.datetime.now() - begin))

        if step <= 2 and end >= 2:
            # Read the staged data and split it into per-turbine temp files.
            tmp_begin = datetime.datetime.now()
            trans_print("开始保存到临时文件")
            self.read_file_and_save_tmp()
            trans_print("保存到临时文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                        str(datetime.datetime.now() - begin))

        if step <= 3 and end >= 3:
            tmp_begin = datetime.datetime.now()
            trans_print("开始保存到文件")
            self.multiprocessing_to_save_file()
            self.save_statistics_file()
            trans_print("保存到文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                        str(datetime.datetime.now() - begin))

        if step <= 4 and end >= 4:
            if self.save_db:
                trans_print("开始保存到数据库")
                tmp_begin = datetime.datetime.now()
                self.multiprocessing_to_save_db()
                trans_print("保存到数据库结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                            str(datetime.datetime.now() - begin))

        # end == 0 means this run only performed validation.
        if end != 0:
            update_trans_status_success(self.batch_no, self.trans_param.read_type,
                                        len(read_excel_files(self.get_read_tmp_path())), self.save_db)

        self.delete_tmp_files()
        trans_print("结束执行", self.trans_param.read_type, ",总耗时:", str(datetime.datetime.now() - begin))
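
# A minimal usage sketch (hypothetical identifiers; constructing TranseParam is
# project-specific and elided here):
#
#     trans_param = ...  # a configured TranseParam with read_type / read_path / cols_tran
#     farm = WindFarms(batch_no='B20240515', field_code='F0001',
#                      wind_full_name='XX风电场', save_db=False)
#     farm.set_trans_param(trans_param)
#     farm.run(step=0, end=3)  # stages 0-3; pass end=4 to also load the database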