# -*- coding: utf-8 -*-
# @Time    : 2024/5/15
# @Author  : 魏志亮
import datetime
import os
import shutil

import pandas as pd

from base.TranseParam import TranseParam
from utils.db.trans_mysql import *
from utils.log.trans_log import logger
from utils.trans_methods import *
from utils.zip.unzip import unzip


class WindFarms(object):
    """Read raw data files for one batch, normalise them and write one CSV per
    distinct value of the configured ``wind_col`` column.

    The conversion is driven by a ``TranseParam`` object; this class reads its
    ``read_path``, ``read_cols``, ``read_type``, ``wind_col``, ``time_col`` and
    ``cols_tran`` attributes.  Progress and failures are reported through
    ``update_transe_status`` keyed by ``batch_no`` and ``read_type``.
    """

    def __init__(self, name, batch_no=None, save_path=None, params: TranseParam = None):
        """Store the run configuration.

        Args:
            name: Name used as a directory component of the output path.
            batch_no: Batch identifier; also a directory component of the
                output path and the table name used by ``save_to_db``.
            save_path: Root directory for all output.
            params: Conversion parameters; may also be supplied later through
                ``set_trans_param``.
        """
        self.name = name
        self.batch_no = batch_no
        self.save_path = save_path
        self.begin = datetime.datetime.now()
        self.next_time = datetime.datetime.now()
        # When True, ``read_path`` is unpacked into a temporary directory first.
        self.is_zip = False
        # When True, CSV output is gzip-compressed and named ``*.csv.gz``.
        self.save_zip = False
        self.trans_param = params

    def set_trans_param(self, params: TranseParam):
        """Replace the conversion parameters used by subsequent calls."""
        self.trans_param = params

    def __params_valid(self, not_null_list=None):
        """Raise ``Exception`` if any value in ``not_null_list`` is ``None`` or ``''``.

        The offending value is rendered with ``repr`` so that a ``None`` entry
        produces a readable validation error instead of a ``TypeError`` from
        string concatenation.
        """
        for position, arg in enumerate(not_null_list or ()):
            if arg is None or arg == '':
                raise Exception(
                    f"Invalid param set : required argument at position {position} is {arg!r}")

    def __get_save_path(self):
        """Return ``<save_path>/<name>/<batch_no>``."""
        return os.path.join(self.save_path, self.name, self.batch_no)

    def __get_zip_tmp_path(self):
        """Return the temporary directory that archives are unpacked into."""
        return os.path.join(self.__get_save_path(), 'save_tmp')

    def __get_read_tmp_path(self):
        """Return the ``read_tmp`` directory under the batch output path."""
        return os.path.join(self.__get_save_path(), 'read_tmp')

    def __report_error(self, message, error):
        """Log ``error`` and mark the batch as failed with ``message``.

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can attach the active traceback.
        """
        logger.exception(error)
        update_transe_status(self.batch_no, self.trans_param.read_type, "error", message)

    def get_excel_files(self):
        """Return the input files found under ``trans_param.read_path``.

        When ``is_zip`` is set, ``read_path`` is first unpacked into the
        temporary directory and ``trans_param.read_path`` is re-pointed at it.

        Raises:
            The error object returned by ``unzip`` when unpacking fails.
        """
        if self.is_zip:
            is_success, e = unzip(self.trans_param.read_path, self.__get_zip_tmp_path())
            if is_success:
                self.trans_param.read_path = self.__get_zip_tmp_path()
            else:
                raise e
        return read_excel_files(self.trans_param.read_path)

    def read_excel_to_df(self, file):
        """Load one input file via ``read_file_to_df`` using ``trans_param.read_cols``."""
        return read_file_to_df(file, self.trans_param.read_cols)

    def save_to_csv(self, df, filename):
        """Write the rows of ``df`` whose ``wind_col`` equals ``filename`` to CSV.

        The file is written to
        ``<save_path>/<name>/<batch_no>/<read_type>/<filename>.csv``; when
        ``save_zip`` is set the file is gzip-compressed and named
        ``<filename>.csv.gz`` instead.

        Args:
            df: Frame containing (at least) the rows to write.
            filename: Value of ``wind_col`` to select; also the output file stem.
        """
        # The suffix and the compression must agree: pandas infers compression
        # from the file name, so a ``.gz`` suffix always means gzip content.
        save_name = str(filename) + ('.csv.gz' if self.save_zip else '.csv')
        save_path = os.path.join(self.__get_save_path(), self.trans_param.read_type, save_name)
        create_file_path(save_path, is_file_path=True)

        selected = df[df[self.trans_param.wind_col] == filename]
        selected.to_csv(save_path, compression='gzip' if self.save_zip else None, index=False)
        trans_print("保存" + save_name + "成功")

    def save_to_db(self, df, filename):
        """Append ``df`` to the table named after ``batch_no``.

        Args:
            df: Rows to append.
            filename: Used only in the progress message.
        """
        # NOTE(review): ``engine`` comes from the star import of
        # utils.db.trans_mysql and is presumed to be a SQLAlchemy engine --
        # confirm.  The ``with`` block ensures the connection is released.
        with engine.connect() as conn:
            df.to_sql(name=str(self.batch_no), con=conn, index=False,
                      if_exists='append', chunksize=1000000)
        trans_print("文件:", filename, "保存数据库成功")

    def __read_all_files(self):
        """List the input files and load them into a single DataFrame.

        Any failure is logged, recorded through ``update_transe_status`` and
        re-raised.
        """
        try:
            all_files = self.get_excel_files()
            trans_print('读取文件数量:', len(all_files))
        except Exception as e:
            self.__report_error(
                "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e), e)
            raise

        # Collect the frames and concatenate once: concatenating inside the
        # loop re-copies everything read so far for every file.
        frames = []
        for file in all_files:
            try:
                frame = self.read_excel_to_df(file)
            except Exception as e:
                self.__report_error("读取文件错误:" + file + ",系统返回错误:" + str(e), e)
                raise
            # ``pd.concat`` silently drops ``None`` entries but rejects a list
            # made up only of ``None``; skip them so the result stays an empty
            # frame in that case.
            if frame is not None:
                frames.append(frame)

        return pd.concat(frames) if frames else pd.DataFrame()

    def __translate_columns(self, df):
        """Rename source columns to their target names, in place.

        ``trans_param.cols_tran`` maps target name -> source name.  Targets
        with no source, or whose source is absent from ``df``, are added as
        all-``None`` columns.  If ``wind_col`` names a source column it is
        updated on ``trans_param`` to the corresponding target name.
        """
        cols_tran = self.trans_param.cols_tran
        if not cols_tran:
            return

        # Invert to source -> target, ignoring targets without a source.
        real_cols_trans = {source: target for target, source in cols_tran.items() if source}

        logger.info("包含转换字段,开始处理转换字段")
        df.rename(columns=real_cols_trans, inplace=True)
        if self.trans_param.wind_col in real_cols_trans:
            self.trans_param.wind_col = real_cols_trans[self.trans_param.wind_col]

        for target in cols_tran:
            if target not in df.columns:
                df[target] = None

    def __add_date_columns(self, df):
        """Parse ``time_col`` as datetime, add ``year``/``month``/``day`` and
        sort ``df`` by ``time_col``, all in place."""
        time_col = self.trans_param.time_col
        if not time_col:
            return

        logger.info("包含时间字段,开始处理时间字段,添加年月日")
        df[time_col] = pd.to_datetime(df[time_col])
        df['year'] = df[time_col].dt.year
        df['month'] = df[time_col].dt.month
        df['day'] = df[time_col].dt.day
        df.sort_values(by=time_col, inplace=True)
        logger.info("处理时间字段结束")

    def __save_all(self, df):
        """Write one CSV per distinct ``wind_col`` value.

        Any failure is logged, recorded through ``update_transe_status`` and
        re-raised.
        """
        try:
            wind_col = self.trans_param.wind_col
            names = set(df[wind_col])
            trans_print(names, wind_col)
            for filename in names:
                subset = df[df[wind_col] == filename]
                self.save_to_csv(subset, filename)
                # Database persistence is currently disabled:
                # self.save_to_db(subset, filename)
        except Exception as e:
            self.__report_error("保存文件错误:" + self.save_path + ",系统返回错误:" + str(e), e)
            raise

    def run(self):
        """Execute the whole conversion: validate, read, normalise and save.

        On success the batch status is set to ``"success"`` and, when
        ``is_zip`` is set, the temporary unpack directory is removed.

        Raises:
            Exception: If a required parameter is ``None`` or empty, or if any
                read/save stage fails (the batch status is set to ``"error"``
                before the stage error is re-raised).
        """
        trans_print("开始执行", self.name, self.trans_param.read_type)
        self.__params_valid([self.name, self.batch_no, self.save_path,
                             self.trans_param.read_type, self.trans_param.read_path,
                             self.trans_param.time_col, self.trans_param.wind_col])

        df = self.__read_all_files()
        self.__translate_columns(df)
        self.__add_date_columns(df)
        self.__save_all(df)

        update_transe_status(self.batch_no, self.trans_param.read_type, "success", "")

        if self.is_zip:
            trans_print("开始删除解压进临时文件夹")
            shutil.rmtree(self.__get_zip_tmp_path())
            trans_print("删除解压进临时文件夹删除成功")


if __name__ == '__main__':
    # NOTE(review): no TranseParam is supplied here, so ``run`` fails on its
    # first ``trans_param`` access; the second positional argument is
    # ``batch_no``.
    aa = WindFarms("test", "test_path")
    aa.run()