PathsAndTable.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. import shutil
  2. from os import path, sep
  3. from conf.constants import Paths
  4. from service.trans_service import creat_min_sec_table, create_warn_fault_table
  5. from utils.conf.read_conf import *
  6. from utils.log.trans_log import info
  7. class PathsAndTable(object):
  8. """路径和表管理类"""
  9. def __init__(self, id: int = None, task_name: str = None, read_dir: str = None, wind_farm_code: str = None,
  10. wind_farm_name: str = None, read_type: str = None, save_db: bool = True,
  11. save_zip: bool = True, yaml_config: dict = None, wind_col_trans: dict = None):
  12. """
  13. 初始化路径和表管理类
  14. Args:
  15. id: 任务ID
  16. task_name: 任务名称
  17. read_dir: 读取目录
  18. wind_farm_code: 风电场编码
  19. wind_farm_name: 风电场名称
  20. read_type: 读取类型
  21. save_db: 是否保存到数据库
  22. save_zip: 是否保存为压缩文件
  23. yaml_config: YAML配置
  24. wind_col_trans: 风机列转换映射
  25. """
  26. self.id = id
  27. self.task_name = task_name
  28. self.read_dir = read_dir
  29. self.wind_farm_code = wind_farm_code
  30. self.wind_farm_name = wind_farm_name
  31. self.read_type = read_type
  32. self.save_db = save_db
  33. self.save_zip = save_zip
  34. self.multi_pool_count = 6
  35. self.yaml_config = yaml_config
  36. self.wind_col_trans = wind_col_trans
  37. save_path_conf = read_conf(yaml_config, "save_path")
  38. self.use_tidb = read_conf(yaml_config, 'use_tidb', False)
  39. self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", Paths.DEFAULT_TMP_BASE_PATH)
  40. if save_path_conf:
  41. self.save_path = save_path_conf + sep + self.wind_farm_name
  42. else:
  43. find_index = read_dir.find(read_conf(yaml_config, 'etl_origin_path_contain', "收资数据"))
  44. if find_index == -1:
  45. raise Exception("路径未包含原始数据特定字符:" + read_dir)
  46. self.save_path = read_dir[0:find_index] + sep + "清理数据"
  47. if self.save_path is None:
  48. raise Exception("未配置保存路径:" + read_dir)
  49. self.archive_path = read_conf(yaml_config, "archive_path", Paths.DEFAULT_ARCHIVE_PATH)
  50. def get_save_path(self) -> str:
  51. """
  52. 获取保存路径
  53. Returns:
  54. 保存路径
  55. """
  56. return path.join(self.save_path, self.read_type)
  57. def get_tmp_path(self) -> str:
  58. """
  59. 获取临时路径
  60. Returns:
  61. 临时路径
  62. """
  63. return str(path.join(self.tmp_base_path, str(self.id) + "_" + self.task_name + "_" + self.read_type))
  64. def get_excel_tmp_path(self) -> str:
  65. """
  66. 获取Excel临时路径
  67. Returns:
  68. Excel临时路径
  69. """
  70. return path.join(self.get_tmp_path(), 'excel_tmp' + sep)
  71. def get_read_tmp_path(self) -> str:
  72. """
  73. 获取读取临时路径
  74. Returns:
  75. 读取临时路径
  76. """
  77. return path.join(self.get_tmp_path(), 'read_tmp')
  78. def get_merge_tmp_path(self, wind_turbine_number=None) -> str:
  79. """
  80. 获取合并临时路径
  81. Args:
  82. wind_turbine_number: 风机编号
  83. Returns:
  84. 合并临时路径
  85. """
  86. if wind_turbine_number is None:
  87. return path.join(self.get_tmp_path(), 'merge_tmp')
  88. else:
  89. return path.join(self.get_tmp_path(), 'merge_tmp', str(wind_turbine_number))
  90. def get_tmp_formal_path(self) -> str:
  91. """
  92. 获取正式临时路径
  93. Returns:
  94. 正式临时路径
  95. """
  96. return path.join(self.get_tmp_path(), 'formal_tmp')
  97. def get_archive_path(self) -> str:
  98. """
  99. 获取归档路径
  100. Returns:
  101. 归档路径
  102. """
  103. return path.join(self.archive_path, self.wind_farm_name, self.read_type, f'{self.id}_{self.task_name}')
  104. def get_table_name(self) -> str:
  105. """
  106. 获取表名
  107. Returns:
  108. 表名
  109. """
  110. return "_".join([self.wind_farm_code, self.read_type])
  111. def delete_tmp_files(self) -> None:
  112. """
  113. 删除临时文件
  114. """
  115. info("开始删除临时文件夹")
  116. if path.exists(self.get_tmp_path()):
  117. shutil.rmtree(self.get_tmp_path())
  118. info("删除临时文件夹删除成功")
  119. def create_wind_farm_db(self) -> None:
  120. """
  121. 创建风电场数据库表
  122. """
  123. if self.save_db:
  124. info("开始创建表")
  125. if self.read_type in ['second', 'minute']:
  126. creat_min_sec_table(self.get_table_name(), self.read_type, self.wind_farm_name, self.use_tidb)
  127. elif self.read_type in ['fault', 'warn']:
  128. create_warn_fault_table(self.get_table_name(), self.wind_farm_name, )
  129. else:
  130. raise Exception("不支持的读取类型:" + self.read_type)
  131. info("建表结束")