# PathsAndTable.py
  1. import os
  2. import shutil
  3. import tempfile
  4. import yaml
  5. from service.trans_service import drop_table, creat_table_and_add_partition
  6. from utils.log.trans_log import trans_print
  7. from utils.conf.read_conf import *
  8. class PathsAndTable(object):
  9. def __init__(self, batch_no=None, batch_name=None, read_path=None, field_name=None, read_type=None,
  10. save_db=True, save_zip=True):
  11. self.batch_no = batch_no
  12. self.batch_name = batch_name
  13. self.read_path = read_path
  14. self.field_name = field_name
  15. self.read_type = read_type
  16. self.save_db = save_db
  17. self.save_zip = save_zip
  18. self.multi_pool_count = 6
  19. yaml_config = yaml_conf(r"/data/config/etl_config.yaml")
  20. save_path_conf = read_conf(yaml_config, "save_path")
  21. if save_path_conf:
  22. self.save_path = save_path_conf + os.sep + self.field_name
  23. else:
  24. find_index = read_path.find(read_conf(yaml_config, 'etl_origin_path_contain', "etl_origin_path_contain"))
  25. if find_index == -1:
  26. raise Exception("路径未包含原始数据特定字符:" + read_path)
  27. self.save_path = read_path[0:find_index] + os.sep + "清理数据"
  28. if self.save_path is None:
  29. raise Exception("未配置保存路径:" + read_path)
  30. def get_save_path(self):
  31. return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
  32. def get_save_tmp_path(self):
  33. return os.path.join(tempfile.gettempdir(), self.field_name, self.batch_no + "_" + self.batch_name,
  34. self.read_type)
  35. def get_excel_tmp_path(self):
  36. return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
  37. def get_read_tmp_path(self):
  38. return os.path.join(self.get_save_tmp_path(), 'read_tmp')
  39. def get_merge_tmp_path(self, wind_turbine_number=None):
  40. if wind_turbine_number is None:
  41. return os.path.join(self.get_save_tmp_path(), 'merge_tmp')
  42. else:
  43. return os.path.join(self.get_save_tmp_path(), 'merge_tmp', str(wind_turbine_number))
  44. def get_table_name(self):
  45. return "_".join([self.batch_no, self.read_type])
  46. def delete_batch_files(self):
  47. trans_print("开始删除已存在的批次文件夹")
  48. if os.path.exists(self.get_save_path()):
  49. shutil.rmtree(self.get_save_path())
  50. trans_print("删除已存在的批次文件夹")
  51. def delete_tmp_files(self):
  52. trans_print("开始删除临时文件夹")
  53. if os.path.exists(self.get_save_tmp_path()):
  54. shutil.rmtree(self.get_save_tmp_path())
  55. trans_print("删除临时文件夹删除成功")
  56. def delete_batch_db(self):
  57. if self.save_db:
  58. trans_print("开始删除表")
  59. table_name = self.get_table_name()
  60. drop_table(table_name, self.save_db)
  61. trans_print("删除表结束")
  62. def create_batch_db(self, wind_names=list()):
  63. if self.save_db:
  64. trans_print("开始创建表")
  65. creat_table_and_add_partition(self.get_table_name(), wind_names, self.read_type)
  66. trans_print("建表结束")