PathsAndTable.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import os
  2. import shutil
  3. import tempfile
  4. import yaml
  5. from service.trans_service import drop_table, creat_table_and_add_partition
  6. from utils.log.trans_log import trans_print
  7. from utils.conf.read_conf import *
  8. class PathsAndTable(object):
  9. def __init__(self, batch_no=None, batch_name=None, read_path=None, field_name=None, read_type=None,
  10. save_db=True, save_zip=True):
  11. self.batch_no = batch_no
  12. self.batch_name = batch_name
  13. self.read_path = read_path
  14. self.field_name = field_name
  15. self.read_type = read_type
  16. self.save_db = save_db
  17. self.save_zip = save_zip
  18. self.multi_pool_count = 6
  19. yaml_config = yaml_conf(r"/data/config/etl_config.yaml")
  20. save_path_conf = read_conf(yaml_config, "save_path")
  21. if save_path_conf:
  22. self.save_path = save_path_conf + os.sep + self.field_name
  23. else:
  24. find_index = read_path.find(read_conf(yaml_config, 'etl_origin_path_contain', "etl_origin_path_contain"))
  25. if find_index == -1:
  26. raise Exception("路径未包含原始数据特定字符:" + read_path)
  27. self.save_path = read_path[0:find_index] + os.sep + "清理数据"
  28. if self.save_path is None:
  29. raise Exception("未配置保存路径:" + read_path)
  30. def get_save_path(self):
  31. return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
  32. def get_save_tmp_path(self):
  33. return os.path.join(tempfile.gettempdir(), self.field_name, self.batch_no + "_" + self.batch_name,
  34. self.read_type)
  35. def get_excel_tmp_path(self):
  36. return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
  37. def get_read_tmp_path(self):
  38. return os.path.join(self.get_save_tmp_path(), 'read_tmp')
  39. def get_table_name(self):
  40. return "_".join([self.batch_no, self.read_type])
  41. def delete_batch_files(self):
  42. trans_print("开始删除已存在的批次文件夹")
  43. if os.path.exists(self.get_save_path()):
  44. shutil.rmtree(self.get_save_path())
  45. trans_print("删除已存在的批次文件夹")
  46. def delete_tmp_files(self):
  47. trans_print("开始删除临时文件夹")
  48. if os.path.exists(self.get_save_tmp_path()):
  49. shutil.rmtree(self.get_save_tmp_path())
  50. trans_print("删除临时文件夹删除成功")
  51. def delete_batch_db(self):
  52. if self.save_db:
  53. trans_print("开始删除表")
  54. table_name = self.get_table_name()
  55. drop_table(table_name, self.save_db)
  56. trans_print("删除表结束")
  57. def create_batch_db(self, count=1):
  58. if self.save_db:
  59. trans_print("开始创建表")
  60. creat_table_and_add_partition(self.get_table_name(), count, self.read_type)
  61. trans_print("建表结束")