import shutil from os import path, sep from service.trans_service import creat_min_sec_table, create_warn_fault_table from utils.conf.read_conf import * from utils.log.trans_log import trans_print class PathsAndTable(object): def __init__(self, id=None, task_name=None, read_dir=None, wind_farm_code=None, wind_farm_name=None, read_type=None, save_db=True, save_zip=True, yaml_config=None, wind_col_trans=None): self.id = id self.task_name = task_name self.read_dir = read_dir self.wind_farm_code = wind_farm_code self.wind_farm_name = wind_farm_name self.read_type = read_type self.save_db = save_db self.save_zip = save_zip self.multi_pool_count = 6 self.yaml_config = yaml_config self.wind_col_trans = wind_col_trans save_path_conf = read_conf(yaml_config, "save_path") self.use_tidb = read_conf(yaml_config, 'use_tidb', False) self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", "/tmp") if save_path_conf: self.save_path = save_path_conf + sep + self.wind_farm_name else: find_index = read_dir.find(read_conf(yaml_config, 'etl_origin_path_contain', "etl_origin_path_contain")) if find_index == -1: raise Exception("路径未包含原始数据特定字符:" + read_dir) self.save_path = read_dir[0:find_index] + sep + "清理数据" if self.save_path is None: raise Exception("未配置保存路径:" + read_dir) self.archive_path = read_conf(yaml_config, "archive_path", "/tmp/archive") def get_save_path(self): return path.join(self.save_path, self.read_type) def get_tmp_path(self): return str(path.join(self.tmp_base_path, str(self.id) + "_" + self.task_name + "_" + self.read_type)) def get_excel_tmp_path(self): return path.join(self.get_tmp_path(), 'excel_tmp' + sep) def get_read_tmp_path(self): return path.join(self.get_tmp_path(), 'read_tmp') def get_merge_tmp_path(self, wind_turbine_number=None): if wind_turbine_number is None: return path.join(self.get_tmp_path(), 'merge_tmp') else: return path.join(self.get_tmp_path(), 'merge_tmp', str(wind_turbine_number)) def get_tmp_formal_path(self): return path.join(self.get_tmp_path(), 'formal_tmp') def get_archive_path(self): return path.join(self.archive_path, self.wind_farm_name, self.read_type, f'{self.id}_{self.task_name}') def get_table_name(self): return "_".join([self.wind_farm_code, self.read_type]) def delete_tmp_files(self): trans_print("开始删除临时文件夹") if path.exists(self.get_tmp_path()): shutil.rmtree(self.get_tmp_path()) trans_print("删除临时文件夹删除成功") def create_wind_farm_db(self): if self.save_db: trans_print("开始创建表") if self.read_type in ['second', 'minute']: creat_min_sec_table(self.get_table_name(), self.read_type, self.use_tidb) elif self.read_type in ['fault', 'warn']: create_warn_fault_table(self.get_table_name()) else: raise Exception("不支持的读取类型:" + self.read_type) trans_print("建表结束")