# PathsAndTable.py
  1. import os
  2. import shutil
  3. from service.trans_service import drop_table, creat_table_and_add_partition
  4. from utils.conf.read_conf import *
  5. from utils.log.trans_log import trans_print
  6. class PathsAndTable(object):
  7. def __init__(self, batch_no=None, batch_name=None, read_path=None, field_name=None, read_type=None,
  8. save_db=True, save_zip=True):
  9. self.batch_no = batch_no
  10. self.batch_name = batch_name
  11. self.read_path = read_path
  12. self.field_name = field_name
  13. self.read_type = read_type
  14. self.save_db = save_db
  15. self.save_zip = save_zip
  16. self.multi_pool_count = 6
  17. yaml_config = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
  18. self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", "/tmp")
  19. save_path_conf = read_conf(yaml_config, "save_path")
  20. if save_path_conf:
  21. self.save_path = save_path_conf + os.sep + self.field_name
  22. else:
  23. find_index = read_path.find(read_conf(yaml_config, 'etl_origin_path_contain', "etl_origin_path_contain"))
  24. if find_index == -1:
  25. raise Exception("路径未包含原始数据特定字符:" + read_path)
  26. self.save_path = read_path[0:find_index] + os.sep + "清理数据"
  27. if self.save_path is None:
  28. raise Exception("未配置保存路径:" + read_path)
  29. def get_save_path(self):
  30. return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
  31. def get_save_tmp_path(self):
  32. return os.path.join(self.tmp_base_path, self.field_name, self.batch_no + "_" + self.batch_name,
  33. self.read_type)
  34. def get_excel_tmp_path(self):
  35. return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
  36. def get_read_tmp_path(self):
  37. return os.path.join(self.get_save_tmp_path(), 'read_tmp')
  38. def get_merge_tmp_path(self, wind_turbine_number=None):
  39. if wind_turbine_number is None:
  40. return os.path.join(self.get_save_tmp_path(), 'merge_tmp')
  41. else:
  42. return os.path.join(self.get_save_tmp_path(), 'merge_tmp', str(wind_turbine_number))
  43. def get_table_name(self):
  44. return "_".join([self.batch_no, self.read_type])
  45. def delete_batch_files(self):
  46. trans_print("开始删除已存在的批次文件夹")
  47. if os.path.exists(self.get_save_path()):
  48. shutil.rmtree(self.get_save_path())
  49. trans_print("删除已存在的批次文件夹")
  50. def delete_tmp_files(self):
  51. trans_print("开始删除临时文件夹")
  52. if os.path.exists(self.get_save_tmp_path()):
  53. shutil.rmtree(self.get_save_tmp_path())
  54. trans_print("删除临时文件夹删除成功")
  55. def delete_batch_db(self):
  56. if self.save_db:
  57. trans_print("开始删除表")
  58. table_name = self.get_table_name()
  59. drop_table(table_name, self.save_db)
  60. trans_print("删除表结束")
  61. def create_batch_db(self, wind_names=list()):
  62. if self.save_db:
  63. trans_print("开始创建表")
  64. creat_table_and_add_partition(self.get_table_name(), wind_names, self.read_type)
  65. trans_print("建表结束")