1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import os
- import shutil
- import tempfile
- import yaml
- from service.trans_service import drop_table, creat_table_and_add_partition
- from utils.log.trans_log import trans_print
- from utils.conf.read_conf import *
- class PathsAndTable(object):
- def __init__(self, batch_no=None, batch_name=None, read_path=None, field_name=None, read_type=None,
- save_db=True, save_zip=True):
- self.batch_no = batch_no
- self.batch_name = batch_name
- self.read_path = read_path
- self.field_name = field_name
- self.read_type = read_type
- self.save_db = save_db
- self.save_zip = save_zip
- self.multi_pool_count = 6
- yaml_config = yaml_conf(r"/data/config/etl_config.yaml")
- save_path_conf = read_conf(yaml_config, "save_path")
- if save_path_conf:
- self.save_path = save_path_conf + os.sep + self.field_name
- else:
- find_index = read_path.find(read_conf(yaml_config, 'etl_origin_path_contain', "etl_origin_path_contain"))
- if find_index == -1:
- raise Exception("路径未包含原始数据特定字符:" + read_path)
- self.save_path = read_path[0:find_index] + os.sep + "清理数据"
- if self.save_path is None:
- raise Exception("未配置保存路径:" + read_path)
- def get_save_path(self):
- return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
- def get_save_tmp_path(self):
- return os.path.join(tempfile.gettempdir(), self.field_name, self.batch_no + "_" + self.batch_name,
- self.read_type)
- def get_excel_tmp_path(self):
- return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
- def get_read_tmp_path(self):
- return os.path.join(self.get_save_tmp_path(), 'read_tmp')
- def get_table_name(self):
- return "_".join([self.batch_no, self.read_type])
- def delete_batch_files(self):
- trans_print("开始删除已存在的批次文件夹")
- if os.path.exists(self.get_save_path()):
- shutil.rmtree(self.get_save_path())
- trans_print("删除已存在的批次文件夹")
- def delete_tmp_files(self):
- trans_print("开始删除临时文件夹")
- if os.path.exists(self.get_save_tmp_path()):
- shutil.rmtree(self.get_save_tmp_path())
- trans_print("删除临时文件夹删除成功")
- def delete_batch_db(self):
- if self.save_db:
- trans_print("开始删除表")
- table_name = self.get_table_name()
- drop_table(table_name, self.save_db)
- trans_print("删除表结束")
- def create_batch_db(self, count=1):
- if self.save_db:
- trans_print("开始创建表")
- creat_table_and_add_partition(self.get_table_name(), count, self.read_type)
- trans_print("建表结束")
|