import os
import shutil

from service.trans_service import drop_table, creat_table_and_add_partition
from utils.conf.read_conf import *
from utils.log.trans_log import trans_print


class PathsAndTable(object):
    """Derive the output paths, temp paths and table name for one ETL batch.

    The instance stores the batch identifiers passed to the constructor and
    resolves ``save_path`` and ``tmp_base_path`` from the YAML configuration
    file. The remaining methods build paths from those values, remove
    previously produced folders, and drop/create the batch table.
    """

    def __init__(self, batch_no=None, batch_name=None, read_path=None, field_name=None, read_type=None,
                 save_db=True, save_zip=True):
        """Store the batch settings and resolve the save/temp base paths.

        Args:
            batch_no: Batch number; concatenated with ``batch_name`` to form
                folder names and used as the table-name prefix.
            batch_name: Batch name; concatenated with ``batch_no``.
            read_path: Path of the raw input data. Only consulted when the
                configuration has no ``save_path`` entry.
            field_name: Name appended to the configured ``save_path`` and
                used as a folder level under the temp base path.
            read_type: Data type label; last folder level of the save/temp
                paths and suffix of the table name.
            save_db: When truthy, ``delete_batch_db`` / ``create_batch_db``
                act on the database; otherwise they do nothing.
            save_zip: Stored on the instance; not used inside this class.

        Raises:
            Exception: If no ``save_path`` is configured and ``read_path``
                does not contain the configured origin-path marker.
        """
        self.batch_no = batch_no
        self.batch_name = batch_name
        self.read_path = read_path
        self.field_name = field_name
        self.read_type = read_type
        self.save_db = save_db
        self.save_zip = save_zip
        # Fixed value; presumably the worker count for multiprocessing pools
        # used by callers -- TODO confirm.
        self.multi_pool_count = 6

        # The config file location can be overridden with the ETL_CONF
        # environment variable.
        yaml_config = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))

        # Third argument of read_conf is presumably the fallback used when the
        # key is missing -- verify against utils.conf.read_conf.
        self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", "/tmp")

        save_path_conf = read_conf(yaml_config, "save_path")
        if save_path_conf:
            self.save_path = save_path_conf + os.sep + self.field_name
        else:
            # No configured save path: derive one from read_path by cutting it
            # at the configured marker and appending a fixed folder name.
            find_index = read_path.find(
                read_conf(yaml_config, 'etl_origin_path_contain', "etl_origin_path_contain"))
            if find_index == -1:
                raise Exception("路径未包含原始数据特定字符:" + read_path)
            self.save_path = read_path[0:find_index] + os.sep + "清理数据"

        # Defensive guard only: both branches above either assign a string or
        # raise, so this condition is not expected to be true.
        if self.save_path is None:
            raise Exception("未配置保存路径:" + read_path)

    def get_save_path(self):
        """Return ``<save_path>/<batch_no>_<batch_name>/<read_type>``."""
        return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)

    def get_save_tmp_path(self):
        """Return ``<tmp_base_path>/<field_name>/<batch_no>_<batch_name>/<read_type>``."""
        return os.path.join(self.tmp_base_path, self.field_name, self.batch_no + "_" + self.batch_name,
                            self.read_type)

    def get_excel_tmp_path(self):
        """Return the ``excel_tmp`` folder under the batch temp path.

        The returned path ends with a trailing path separator.
        """
        return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)

    def get_read_tmp_path(self):
        """Return the ``read_tmp`` folder under the batch temp path."""
        return os.path.join(self.get_save_tmp_path(), 'read_tmp')

    def get_merge_tmp_path(self, wind_turbine_number=None):
        """Return the ``merge_tmp`` folder under the batch temp path.

        Args:
            wind_turbine_number: Optional identifier; when given, its string
                form is appended as a sub-folder of ``merge_tmp``.
        """
        if wind_turbine_number is None:
            return os.path.join(self.get_save_tmp_path(), 'merge_tmp')
        else:
            return os.path.join(self.get_save_tmp_path(), 'merge_tmp', str(wind_turbine_number))

    def get_table_name(self):
        """Return the table name ``<batch_no>_<read_type>``."""
        return "_".join([self.batch_no, self.read_type])

    def delete_batch_files(self):
        """Remove the batch save folder recursively if it exists."""
        trans_print("开始删除已存在的批次文件夹")
        if os.path.exists(self.get_save_path()):
            shutil.rmtree(self.get_save_path())
        trans_print("删除已存在的批次文件夹")

    def delete_tmp_files(self):
        """Remove the batch temp folder recursively if it exists."""
        trans_print("开始删除临时文件夹")
        if os.path.exists(self.get_save_tmp_path()):
            shutil.rmtree(self.get_save_tmp_path())
        trans_print("删除临时文件夹删除成功")

    def delete_batch_db(self):
        """Drop the batch table; does nothing when ``save_db`` is falsy."""
        if self.save_db:
            trans_print("开始删除表")
            table_name = self.get_table_name()
            # NOTE(review): the second argument is self.save_db, which is
            # always truthy on this path -- confirm what drop_table expects.
            drop_table(table_name, self.save_db)
            trans_print("删除表结束")

    def create_batch_db(self, wind_names=None):
        """Create the batch table and its partitions.

        Does nothing when ``save_db`` is falsy.

        Args:
            wind_names: Names forwarded to ``creat_table_and_add_partition``;
                presumably one partition per name -- verify against
                service.trans_service. Defaults to an empty list.
        """
        # A fresh list per call avoids sharing one mutable default object
        # across invocations.
        if wind_names is None:
            wind_names = []
        if self.save_db:
            trans_print("开始创建表")
            creat_table_and_add_partition(self.get_table_name(), wind_names, self.read_type)
            trans_print("建表结束")