import shutil from os import path, sep from conf.constants import Paths from service.trans_service import creat_min_sec_table, create_warn_fault_table from utils.conf.read_conf import * from utils.log.trans_log import info class PathsAndTable(object): """路径和表管理类""" def __init__(self, id: int = None, task_name: str = None, read_dir: str = None, wind_farm_code: str = None, wind_farm_name: str = None, read_type: str = None, save_db: bool = True, save_zip: bool = True, yaml_config: dict = None, wind_col_trans: dict = None): """ 初始化路径和表管理类 Args: id: 任务ID task_name: 任务名称 read_dir: 读取目录 wind_farm_code: 风电场编码 wind_farm_name: 风电场名称 read_type: 读取类型 save_db: 是否保存到数据库 save_zip: 是否保存为压缩文件 yaml_config: YAML配置 wind_col_trans: 风机列转换映射 """ self.id = id self.task_name = task_name self.read_dir = read_dir self.wind_farm_code = wind_farm_code self.wind_farm_name = wind_farm_name self.read_type = read_type self.save_db = save_db self.save_zip = save_zip self.multi_pool_count = 6 self.yaml_config = yaml_config self.wind_col_trans = wind_col_trans save_path_conf = read_conf(yaml_config, "save_path") self.use_tidb = read_conf(yaml_config, 'use_tidb', False) self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", Paths.DEFAULT_TMP_BASE_PATH) if save_path_conf: self.save_path = save_path_conf + sep + self.wind_farm_name else: find_index = read_dir.find(read_conf(yaml_config, 'etl_origin_path_contain', "收资数据")) if find_index == -1: raise Exception("路径未包含原始数据特定字符:" + read_dir) self.save_path = read_dir[0:find_index] + sep + "清理数据" if self.save_path is None: raise Exception("未配置保存路径:" + read_dir) self.archive_path = read_conf(yaml_config, "archive_path", Paths.DEFAULT_ARCHIVE_PATH) def get_save_path(self) -> str: """ 获取保存路径 Returns: 保存路径 """ return path.join(self.save_path, self.read_type) def get_tmp_path(self) -> str: """ 获取临时路径 Returns: 临时路径 """ return str(path.join(self.tmp_base_path, str(self.id) + "_" + self.task_name + "_" + self.read_type)) def get_excel_tmp_path(self) -> str: """ 获取Excel临时路径 Returns: Excel临时路径 """ return path.join(self.get_tmp_path(), 'excel_tmp' + sep) def get_read_tmp_path(self) -> str: """ 获取读取临时路径 Returns: 读取临时路径 """ return path.join(self.get_tmp_path(), 'read_tmp') def get_merge_tmp_path(self, wind_turbine_number=None) -> str: """ 获取合并临时路径 Args: wind_turbine_number: 风机编号 Returns: 合并临时路径 """ if wind_turbine_number is None: return path.join(self.get_tmp_path(), 'merge_tmp') else: return path.join(self.get_tmp_path(), 'merge_tmp', str(wind_turbine_number)) def get_tmp_formal_path(self) -> str: """ 获取正式临时路径 Returns: 正式临时路径 """ return path.join(self.get_tmp_path(), 'formal_tmp') def get_archive_path(self) -> str: """ 获取归档路径 Returns: 归档路径 """ return path.join(self.archive_path, self.wind_farm_name, self.read_type, f'{self.id}_{self.task_name}') def get_table_name(self) -> str: """ 获取表名 Returns: 表名 """ return "_".join([self.wind_farm_code, self.read_type]) def delete_tmp_files(self) -> None: """ 删除临时文件 """ info("开始删除临时文件夹") if path.exists(self.get_tmp_path()): shutil.rmtree(self.get_tmp_path()) info("删除临时文件夹删除成功") def create_wind_farm_db(self) -> None: """ 创建风电场数据库表 """ if self.save_db: info("开始创建表") if self.read_type in ['second', 'minute']: creat_min_sec_table(self.get_table_name(), self.read_type, self.wind_farm_name, self.use_tidb) elif self.read_type in ['fault', 'warn']: create_warn_fault_table(self.get_table_name(), self.wind_farm_name, ) else: raise Exception("不支持的读取类型:" + self.read_type) info("建表结束")