BaseDataTrans.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import datetime
  2. import traceback
  3. from etl.common.ClearData import ClearData
  4. from etl.common.PathsAndTable import PathsAndTable
  5. from etl.common.SaveToDb import SaveToDb
  6. from etl.common.UnzipAndRemove import UnzipAndRemove
  7. from service.plt_service import get_all_wind, update_trans_status_success, update_trans_status_error, \
  8. update_trans_status_running
  9. from utils.file.trans_methods import read_excel_files
  10. from utils.log.trans_log import trans_print, set_trance_id
  11. class BaseDataTrans(object):
  12. def __init__(self, data: dict = None, save_db=True, step=0, end=4):
  13. self.batch_no = data['batch_code']
  14. self.batch_name = data['batch_name']
  15. self.read_type = data['transfer_type']
  16. self.read_path = data['transfer_addr']
  17. self.field_code = data['field_code']
  18. self.field_name = data['field_name']
  19. self.save_zip = False
  20. self.step = step
  21. self.end = end
  22. self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
  23. self.batch_count = 100000
  24. self.save_db = save_db
  25. self.filed_conf = self.get_filed_conf()
  26. # trans_print("是否是秒转分钟:", self.boolean_sec_to_min)
  27. try:
  28. self.pathsAndTable = PathsAndTable(self.batch_no, self.batch_name, self.read_path, self.field_name,
  29. self.read_type, save_db, self.save_zip)
  30. except Exception as e:
  31. trans_print(traceback.format_exc())
  32. update_trans_status_error(self.batch_no, self.read_type, str(e), self.save_db)
  33. raise e
  34. def get_filed_conf(self):
  35. raise NotImplementedError("需要实现 获取点检表 方法")
  36. # 第一步 清理数据
  37. def clean_file_and_db(self):
  38. clean_data = ClearData(self.pathsAndTable)
  39. clean_data.run()
  40. # 第二步 解压 移动到临时文件
  41. def unzip_or_remove_to_tmp_dir(self):
  42. # 解压并删除
  43. unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
  44. unzip_and_remove.run()
  45. # 第三步 读取 并 保存到临时文件
  46. def read_and_save_tmp_file(self):
  47. raise NotImplementedError("第三步未做实现")
  48. # 第四步 统计 并 保存到正式文件
  49. def statistics_and_save_to_file(self):
  50. raise NotImplementedError("第四步未做实现")
  51. # 第五步 保存到数据库
  52. def save_to_db(self):
  53. save_to_db = SaveToDb(self.pathsAndTable, self.batch_count)
  54. save_to_db.run()
  55. # 最后更新执行程度
  56. def update_exec_progress(self):
  57. update_trans_status_success(self.batch_no, self.read_type,
  58. len(read_excel_files(self.pathsAndTable.get_save_path())),
  59. None, None, None, None, self.save_db)
  60. def run(self):
  61. total_begin = datetime.datetime.now()
  62. try:
  63. trance_id = '-'.join([self.batch_no, self.field_name, self.read_type])
  64. set_trance_id(trance_id)
  65. update_trans_status_running(self.batch_no, self.read_type, self.save_db)
  66. if self.step <= 0 and self.end >= 0:
  67. begin = datetime.datetime.now()
  68. trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
  69. self.clean_file_and_db()
  70. trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  71. datetime.datetime.now() - total_begin)
  72. if self.step <= 1 and self.end >= 1:
  73. begin = datetime.datetime.now()
  74. trans_print("开始解压移动文件")
  75. self.unzip_or_remove_to_tmp_dir()
  76. trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:",
  77. datetime.datetime.now() - total_begin)
  78. if self.step <= 2 and self.end >= 2:
  79. begin = datetime.datetime.now()
  80. trans_print("开始保存数据到临时文件")
  81. self.read_and_save_tmp_file()
  82. trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  83. datetime.datetime.now() - total_begin)
  84. if self.step <= 3 and self.end >= 3:
  85. begin = datetime.datetime.now()
  86. trans_print("开始保存数据到正式文件")
  87. self.statistics_and_save_to_file()
  88. trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  89. datetime.datetime.now() - total_begin)
  90. if self.step <= 4 and self.end >= 4:
  91. begin = datetime.datetime.now()
  92. trans_print("开始保存到数据库,是否存库:", self.pathsAndTable.save_db)
  93. self.save_to_db()
  94. trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  95. datetime.datetime.now() - total_begin)
  96. self.update_exec_progress()
  97. except Exception as e:
  98. trans_print(traceback.format_exc())
  99. update_trans_status_error(self.batch_no, self.read_type, str(e), self.save_db)
  100. raise e
  101. finally:
  102. self.pathsAndTable.delete_tmp_files()
  103. trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))
# Ad-hoc manual entry point: builds a job with save_db disabled and runs it.
# NOTE(review): BaseDataTrans.__init__ accepts (data, save_db, step, end); the
# batch_no / read_type keywords used below are not parameters of it, so this
# call raises TypeError as written. The batch details presumably belong in the
# `data` dict -- confirm the intended values before relying on this block.
if __name__ == '__main__':
    test = BaseDataTrans(save_db=False, batch_no="WOF053600062-WOB000010", read_type="fault")
    test.run()