PathsAndTable.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import shutil
  2. from os import path, sep
  3. from service.trans_service import creat_min_sec_table, create_warn_fault_table
  4. from utils.conf.read_conf import *
  5. from utils.log.trans_log import trans_print
  6. class PathsAndTable(object):
  7. def __init__(self, id=None, task_name=None, read_dir=None, wind_farm_code=None, wind_farm_name=None,
  8. read_type=None, save_db=True, save_zip=True, yaml_config=None, wind_col_trans=None):
  9. self.id = id
  10. self.task_name = task_name
  11. self.read_dir = read_dir
  12. self.wind_farm_code = wind_farm_code
  13. self.wind_farm_name = wind_farm_name
  14. self.read_type = read_type
  15. self.save_db = save_db
  16. self.save_zip = save_zip
  17. self.multi_pool_count = 6
  18. self.yaml_config = yaml_config
  19. self.wind_col_trans = wind_col_trans
  20. save_path_conf = read_conf(yaml_config, "save_path")
  21. self.use_tidb = read_conf(yaml_config, 'use_tidb', False)
  22. self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", "/tmp")
  23. if save_path_conf:
  24. self.save_path = save_path_conf + sep + self.wind_farm_name
  25. else:
  26. find_index = read_dir.find(read_conf(yaml_config, 'etl_origin_path_contain', "etl_origin_path_contain"))
  27. if find_index == -1:
  28. raise Exception("路径未包含原始数据特定字符:" + read_dir)
  29. self.save_path = read_dir[0:find_index] + sep + "清理数据"
  30. if self.save_path is None:
  31. raise Exception("未配置保存路径:" + read_dir)
  32. self.archive_path = read_conf(yaml_config, "archive_path", "/tmp/archive")
  33. def get_save_path(self):
  34. return path.join(self.save_path, self.read_type)
  35. def get_tmp_path(self):
  36. return str(path.join(self.tmp_base_path, str(self.id) + "_" + self.task_name + "_" + self.read_type))
  37. def get_excel_tmp_path(self):
  38. return path.join(self.get_tmp_path(), 'excel_tmp' + sep)
  39. def get_read_tmp_path(self):
  40. return path.join(self.get_tmp_path(), 'read_tmp')
  41. def get_merge_tmp_path(self, wind_turbine_number=None):
  42. if wind_turbine_number is None:
  43. return path.join(self.get_tmp_path(), 'merge_tmp')
  44. else:
  45. return path.join(self.get_tmp_path(), 'merge_tmp', str(wind_turbine_number))
  46. def get_tmp_formal_path(self):
  47. return path.join(self.get_tmp_path(), 'formal_tmp')
  48. def get_archive_path(self):
  49. return path.join(self.archive_path, self.wind_farm_name, self.read_type, f'{self.id}_{self.task_name}')
  50. def get_table_name(self):
  51. return "_".join([self.wind_farm_code, self.read_type])
  52. def delete_tmp_files(self):
  53. trans_print("开始删除临时文件夹")
  54. if path.exists(self.get_tmp_path()):
  55. shutil.rmtree(self.get_tmp_path())
  56. trans_print("删除临时文件夹删除成功")
  57. def create_wind_farm_db(self):
  58. if self.save_db:
  59. trans_print("开始创建表")
  60. if self.read_type in ['second', 'minute']:
  61. creat_min_sec_table(self.get_table_name(), self.read_type, self.use_tidb)
  62. elif self.read_type in ['fault', 'warn']:
  63. create_warn_fault_table(self.get_table_name())
  64. else:
  65. raise Exception("不支持的读取类型:" + self.read_type)
  66. trans_print("建表结束")