123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- import datetime
- import traceback
- from etl.common.ArchiveFile import ArchiveFile
- from etl.common.ClearData import ClearData
- from etl.common.PathsAndTable import PathsAndTable
- from etl.common.SaveToDb import SaveToDb
- from etl.common.UnzipAndRemove import UnzipAndRemove
- from service.plt_service import get_all_wind
- from service.trans_conf_service import update_trans_status_success, update_trans_status_error, \
- update_trans_status_running
- from utils.file.trans_methods import read_excel_files
- from utils.log.trans_log import trans_print, set_trance_id
class BaseDataTrans:
    """Template for a multi-step data-transfer task.

    ``run()`` drives a fixed sequence of seven steps (indices 0-6).  A step is
    executed only when ``step <= index <= end``, so a caller can run any
    contiguous sub-range of the sequence.  Subclasses must implement
    ``get_filed_conf``, ``read_and_save_tmp_file``,
    ``statistics_and_save_tmp_formal_file`` and
    ``combine_and_save_formal_file``; the remaining steps delegate to the
    shared ``etl.common`` helpers.
    """

    def __init__(self, data: dict = None, save_db=True, yaml_config=None, step=0, end=999):
        """Read the task description and build the paths/table helper.

        Args:
            data: Task record.  Required despite the ``None`` default; the keys
                ``id``, ``task_name``, ``transfer_type``, ``read_dir``,
                ``wind_farm_code`` and ``wind_farm_name`` are read from it.
            save_db: Forwarded to ``PathsAndTable`` and to the status-update
                service calls.
            yaml_config: Forwarded unchanged to ``PathsAndTable``.
            step: Index of the first step ``run()`` executes (inclusive).
            end: Index of the last step ``run()`` executes (inclusive).

        Raises:
            TypeError: If ``data`` is ``None``.
            KeyError: If ``data`` lacks one of the required keys.
            Exception: Any error raised while building ``PathsAndTable`` is
                logged, recorded through ``update_trans_status_error`` and
                re-raised.
        """
        if data is None:
            # Same exception type that subscripting None would raise, but with
            # a message that tells the caller what is actually missing.
            raise TypeError(
                "data is required: expected a dict with keys id, task_name, "
                "transfer_type, read_dir, wind_farm_code, wind_farm_name"
            )

        self.id = data['id']
        self.task_name = data['task_name']
        self.transfer_type = data['transfer_type']
        self.read_dir = data['read_dir']
        self.wind_farm_code = data['wind_farm_code']
        self.wind_farm_name = data['wind_farm_name']
        self.yaml_config = yaml_config
        self.save_zip = False
        self.step = step
        self.end = end
        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.wind_farm_code)
        self.batch_count = 100000
        self.save_db = save_db
        # Subclass hook; it runs after the attributes above are set, so an
        # implementation may rely on them.
        self.filed_conf = self.get_filed_conf()
        self.update_files = list()
        try:
            self.pathsAndTable = PathsAndTable(self.id, self.task_name, self.read_dir, self.wind_farm_code,
                                               self.wind_farm_name, self.transfer_type, save_db, self.save_zip,
                                               self.yaml_config, self.wind_col_trans)
        except Exception as e:
            trans_print(traceback.format_exc())
            update_trans_status_error(self.id, str(e), self.save_db)
            # Bare raise keeps the original traceback intact.
            raise

    def get_filed_conf(self):
        """Return the field configuration; must be implemented by subclasses."""
        raise NotImplementedError("需要实现 获取点检表 方法")

    def clean_file_and_db(self):
        """Step 0: clean data by running ``ClearData`` on the paths/table helper."""
        clean_data = ClearData(self.pathsAndTable)
        clean_data.run()

    def unzip_or_remove_to_tmp_dir(self):
        """Step 1: unzip / move the input files by running ``UnzipAndRemove``."""
        unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
        unzip_and_remove.run()

    def read_and_save_tmp_file(self):
        """Step 2: read the input and save it to temporary files (subclass hook)."""
        raise NotImplementedError("读取并保存到临时文件未做实现")

    def statistics_and_save_tmp_formal_file(self):
        """Step 3: save to the temporary formal files (subclass hook)."""
        raise NotImplementedError("读取并保存到临时正式文件未做实现")

    def archive_file(self):
        """Step 4: archive files by running ``ArchiveFile``."""
        archive_file = ArchiveFile(self.pathsAndTable, self.id)
        archive_file.run()

    def combine_and_save_formal_file(self):
        """Step 5: merge into the formal files (subclass hook)."""
        raise NotImplementedError("合并到正式文件未做实现")

    def save_to_db(self):
        """Step 6: run ``SaveToDb`` with the collected ``update_files`` and ``batch_count``."""
        save_to_db = SaveToDb(self.pathsAndTable, self.update_files, self.batch_count)
        save_to_db.run()

    def update_exec_progress(self):
        """Mark the task successful, reporting how many files are in the save path.

        The count is the length of ``read_excel_files(save_path)``; the four
        ``None`` arguments are passed through to the service call as-is.
        """
        update_trans_status_success(self.id,
                                    len(read_excel_files(self.pathsAndTable.get_save_path())),
                                    None, None, None, None, self.save_db)

    def _pipeline_steps(self):
        """Return the ordered steps as ``(start_args, action, end_label)`` tuples.

        ``start_args`` is a zero-argument callable so that the arguments of the
        "step started" log line are evaluated only if the step actually runs.
        The tuple position is the step index compared against ``step``/``end``.
        """
        return (
            (lambda: ("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path()),
             self.clean_file_and_db, "清理数据结束,耗时:"),
            (lambda: ("开始解压移动文件",),
             self.unzip_or_remove_to_tmp_dir, "解压移动文件结束:耗时:"),
            (lambda: ("开始保存数据到临时文件",),
             self.read_and_save_tmp_file, "保存数据到临时文件结束,耗时:"),
            (lambda: ("开始保存到临时正式文件",),
             self.statistics_and_save_tmp_formal_file, "保存到临时正式文件结束,耗时:"),
            (lambda: ("开始保存归档文件",),
             self.archive_file, "保存到保存归档文件结束,耗时:"),
            (lambda: ("开始保存数据到正式文件",),
             self.combine_and_save_formal_file, "保存数据到正式文件结束,耗时:"),
            (lambda: ("开始保存到数据库,是否存库:", self.pathsAndTable.save_db),
             self.save_to_db, "保存到数据结束,耗时:"),
        )

    def run(self):
        """Execute the steps whose index lies in ``[step, end]`` and record the outcome.

        The task is marked running first.  On any exception the traceback is
        logged, the task is marked failed through ``update_trans_status_error``
        and the exception is re-raised.  Temporary files are deleted in every
        case.
        """
        total_begin = datetime.datetime.now()
        try:
            trance_id = '-'.join([str(self.id), self.wind_farm_name, self.transfer_type])
            set_trance_id(trance_id)
            update_trans_status_running(self.id, self.save_db)

            for now_index, (start_args, action, end_label) in enumerate(self._pipeline_steps()):
                if not self.step <= now_index <= self.end:
                    continue
                begin = datetime.datetime.now()
                trans_print(*start_args())
                action()
                trans_print(end_label, datetime.datetime.now() - begin, "总耗时:",
                            datetime.datetime.now() - total_begin)

            # NOTE(review): the flattened source does not show whether this call
            # was nested under the step-6 condition; it is placed after the step
            # sequence here (runs whenever no step raised) — confirm.
            self.update_exec_progress()
        except Exception as e:
            trans_print(traceback.format_exc())
            update_trans_status_error(self.id, str(e), self.save_db)
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            self.pathsAndTable.delete_tmp_files()
            # NOTE(review): assumed to belong to the finally clause so the total
            # duration is logged on failure as well — confirm.
            trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))
|