import datetime
import traceback

from etl.common.ArchiveFile import ArchiveFile
from etl.common.ClearData import ClearData
from etl.common.PathsAndTable import PathsAndTable
from etl.common.SaveToDb import SaveToDb
from etl.common.UnzipAndRemove import UnzipAndRemove
from service.plt_service import get_all_wind
from service.trans_conf_service import (
    update_trans_status_success,
    update_trans_status_error,
    update_trans_status_running,
)
from utils.file.trans_methods import read_excel_files
from utils.log.trans_log import trans_print, set_trance_id


class BaseDataTrans(object):
    """Template for a staged data-transfer task.

    ``run`` executes up to seven stages in a fixed order (indices 0-6):
    clean, unzip/move, read to temp files, build temp formal files, archive,
    combine into formal files, save to the database. Only stages whose index
    lies in the inclusive window ``[step, end]`` are executed.

    Subclasses must implement ``get_filed_conf``, ``read_and_save_tmp_file``,
    ``statistics_and_save_tmp_formal_file`` and
    ``combine_and_save_formal_file``; the base versions raise
    ``NotImplementedError``.
    """

    def __init__(self, data: dict = None, save_db=True, yaml_config=None, step=0, end=6):
        """Read the task description and build the paths/table helper.

        Args:
            data: Task record. The keys ``id``, ``task_name``,
                ``transfer_type``, ``read_dir``, ``wind_farm_code`` and
                ``wind_farm_name`` are read. Although the parameter has a
                ``None`` default, a value is required.
            save_db: Forwarded to ``PathsAndTable`` and to the status-update
                service calls.
            yaml_config: Forwarded to ``PathsAndTable``.
            step: Index of the first stage ``run`` executes (inclusive).
            end: Index of the last stage ``run`` executes (inclusive).

        Raises:
            TypeError: If ``data`` is ``None``.
            NotImplementedError: From ``get_filed_conf`` when the subclass
                does not override it.
            Exception: Whatever ``PathsAndTable`` raises; the task is marked
                as failed before the exception propagates.
        """
        if data is None:
            # Subscripting None would raise an opaque TypeError a few lines
            # below; fail with the same exception type but a clear message.
            raise TypeError("data must be a dict describing the task, got None")

        self.id = data['id']
        self.task_name = data['task_name']
        self.transfer_type = data['transfer_type']
        self.read_dir = data['read_dir']
        self.wind_farm_code = data['wind_farm_code']
        self.wind_farm_name = data['wind_farm_name']
        self.yaml_config = yaml_config
        self.save_zip = False
        self.step = step
        self.end = end
        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.wind_farm_code)
        self.batch_count = 100000
        self.save_db = save_db
        # Called before pathsAndTable exists, so overrides must not rely on it.
        self.filed_conf = self.get_filed_conf()
        self.update_files = list()
        try:
            self.pathsAndTable = PathsAndTable(self.id, self.task_name, self.read_dir, self.wind_farm_code,
                                               self.wind_farm_name, self.transfer_type, save_db, self.save_zip,
                                               self.yaml_config, self.wind_col_trans)
        except Exception as e:
            trans_print(traceback.format_exc())
            update_trans_status_error(self.id, str(e), self.save_db)
            raise

    def get_filed_conf(self):
        """Return the field configuration; must be overridden by subclasses."""
        raise NotImplementedError("需要实现 获取点检表 方法")

    # Clean up data
    def clean_file_and_db(self):
        """Stage 0: run ``ClearData`` against this task's paths/table helper."""
        clean_data = ClearData(self.pathsAndTable)
        clean_data.run()

    # Unzip / move into the temporary directory
    def unzip_or_remove_to_tmp_dir(self):
        """Stage 1: run ``UnzipAndRemove`` against this task's paths/table helper."""
        # Unzip and remove
        unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
        unzip_and_remove.run()

    # Read and save to temporary files
    def read_and_save_tmp_file(self):
        """Stage 2: must be overridden by subclasses."""
        raise NotImplementedError("读取并保存到临时文件未做实现")

    # Read and save to temporary formal files
    def statistics_and_save_tmp_formal_file(self):
        """Stage 3: must be overridden by subclasses."""
        raise NotImplementedError("读取并保存到临时正式文件未做实现")

    # Archive files
    def archive_file(self):
        """Stage 4: run ``ArchiveFile`` for this task."""
        archive_file = ArchiveFile(self.pathsAndTable, self.id)
        archive_file.run()

    # Merge into formal files
    def combine_and_save_formal_file(self):
        """Stage 5: must be overridden by subclasses."""
        raise NotImplementedError("合并到正式文件未做实现")

    # Save to the database
    def save_to_db(self):
        """Stage 6: run ``SaveToDb`` with ``update_files`` and ``batch_count``."""
        save_to_db = SaveToDb(self.pathsAndTable, self.update_files, self.batch_count)
        save_to_db.run()

    # Final update of the execution progress
    def update_exec_progress(self):
        """Mark the task as successful.

        Passes the length of what ``read_excel_files`` returns for the save
        path as the second argument of ``update_trans_status_success``.
        """
        update_trans_status_success(self.id, len(read_excel_files(self.pathsAndTable.get_save_path())),
                                    None, None, None, None, self.save_db)

    def _stages(self):
        """Return the ordered stage table used by ``run``.

        Each entry is ``(start_log_args, action, end_log_label)``.
        ``start_log_args`` is a zero-argument callable so that values shown in
        the start message are evaluated when the stage actually starts, not
        when the table is built. The position in the tuple is the stage index
        compared against ``step`` / ``end``.
        """
        return (
            # 0
            (lambda: ("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path()),
             self.clean_file_and_db,
             "清理数据结束,耗时:"),
            # 1
            (lambda: ("开始解压移动文件",),
             self.unzip_or_remove_to_tmp_dir,
             "解压移动文件结束:耗时:"),
            # 2
            (lambda: ("开始保存数据到临时文件",),
             self.read_and_save_tmp_file,
             "保存数据到临时文件结束,耗时:"),
            # 3
            (lambda: ("开始保存到临时正式文件",),
             self.statistics_and_save_tmp_formal_file,
             "保存到临时正式文件结束,耗时:"),
            # 4
            (lambda: ("开始保存归档文件",),
             self.archive_file,
             "保存到保存归档文件结束,耗时:"),
            # 5
            (lambda: ("开始保存数据到正式文件",),
             self.combine_and_save_formal_file,
             "保存数据到正式文件结束,耗时:"),
            # 6
            (lambda: ("开始保存到数据库,是否存库:", self.pathsAndTable.save_db),
             self.save_to_db,
             "保存到数据结束,耗时:"),
        )

    def run(self):
        """Execute every stage whose index is within ``[step, end]``.

        Sets the trace id, marks the task as running, runs the selected
        stages in order while logging per-stage and cumulative elapsed time,
        then marks the task as successful. On any exception the traceback is
        logged, the task is marked as failed and the exception is re-raised.
        Temporary files are deleted in all cases.

        Raises:
            Exception: Whatever a stage or a status update raises.
        """
        total_begin = datetime.datetime.now()
        try:
            trance_id = '-'.join([str(self.id), self.wind_farm_name, self.transfer_type])
            set_trance_id(trance_id)
            update_trans_status_running(self.id, self.save_db)

            for stage_index, (start_log_args, action, end_label) in enumerate(self._stages()):
                if not self.step <= stage_index <= self.end:
                    continue
                begin = datetime.datetime.now()
                trans_print(*start_log_args())
                action()
                trans_print(end_label, datetime.datetime.now() - begin, "总耗时:",
                            datetime.datetime.now() - total_begin)

            # NOTE(review): the flattened source does not show whether this
            # call sat inside the stage-6 branch; it is treated here as running
            # after the last stage regardless of `end` — confirm.
            self.update_exec_progress()
        except Exception as e:
            trans_print(traceback.format_exc())
            update_trans_status_error(self.id, str(e), self.save_db)
            raise
        finally:
            self.pathsAndTable.delete_tmp_files()
            # NOTE(review): assumed to be part of `finally` (logged on failure
            # too); the flattened source is ambiguous — confirm.
            trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))