BaseDataTrans.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import datetime
  2. import sys
  3. import traceback
  4. from etl.common.PathsAndTable import PathsAndTable
  5. from etl.common.ClearData import ClearData
  6. from etl.common.SaveToDb import SaveToDb
  7. from etl.common.UnzipAndRemove import UnzipAndRemove
  8. from service.plt_service import get_all_wind, get_batch_exec_data, get_data_by_batch_no_and_type, \
  9. update_trans_status_success, update_trans_status_error, update_trans_status_running
  10. from tmp_file.power_derating_for_chunlin import read_excel_files
  11. from utils.log.trans_log import trans_print, set_trance_id
  12. class BaseDataTrans(object):
  13. def __init__(self, data: dict = None, save_db=True, step=0, end=4):
  14. self.batch_no = data['batch_code']
  15. self.batch_name = data['batch_name']
  16. self.read_type = data['transfer_type']
  17. self.read_path = data['transfer_addr']
  18. self.field_code = data['field_code']
  19. self.field_name = data['field_name']
  20. self.save_zip = False
  21. self.step = step
  22. self.end = end
  23. self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
  24. self.batch_count = 100000
  25. self.save_db = save_db
  26. self.filed_conf = self.get_filed_conf()
  27. self.pathsAndTable = PathsAndTable(self.batch_no, self.batch_name, self.read_path, self.field_name,
  28. self.read_type, save_db, self.save_zip)
  29. def get_filed_conf(self):
  30. raise NotImplementedError("需要实现 获取点检表 方法")
  31. # 第一步 清理数据
  32. def clean_file_and_db(self):
  33. clean_data = ClearData(self.pathsAndTable)
  34. clean_data.run()
  35. # 第二步 解压 移动到临时文件
  36. def unzip_or_remove_to_tmp_dir(self):
  37. # 解压并删除
  38. unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
  39. unzip_and_remove.run()
  40. # 第三步 读取 并 保存到临时文件
  41. def read_and_save_tmp_file(self):
  42. raise NotImplementedError("第三步未做实现")
  43. # 第四步 统计 并 保存到正式文件
  44. def statistics_and_save_to_file(self):
  45. raise NotImplementedError("第四步未做实现")
  46. # 第五步 保存到数据库
  47. def save_to_db(self):
  48. save_to_db = SaveToDb(self.pathsAndTable, self.batch_count)
  49. save_to_db.run()
  50. # 最后更新执行程度
  51. def update_exec_progress(self):
  52. update_trans_status_success(self.batch_no, self.read_type,
  53. len(read_excel_files(self.pathsAndTable.get_save_path())),
  54. None, None, None, None, self.save_db)
  55. def run(self):
  56. total_begin = datetime.datetime.now()
  57. try:
  58. trance_id = '-'.join([self.batch_no, self.field_name, self.read_type])
  59. set_trance_id(trance_id)
  60. update_trans_status_running(self.batch_no, self.read_type, self.save_db)
  61. if self.step <= 0 and self.end >= 0:
  62. begin = datetime.datetime.now()
  63. trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
  64. self.clean_file_and_db()
  65. trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
  66. if self.step <= 1 and self.end >= 1:
  67. begin = datetime.datetime.now()
  68. trans_print("开始解压移动文件")
  69. self.unzip_or_remove_to_tmp_dir()
  70. trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
  71. if self.step <= 2 and self.end >= 2:
  72. begin = datetime.datetime.now()
  73. trans_print("开始保存数据到临时文件")
  74. self.read_and_save_tmp_file()
  75. trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
  76. if self.step <= 3 and self.end >= 3:
  77. begin = datetime.datetime.now()
  78. trans_print("开始保存数据到正式文件")
  79. self.statistics_and_save_to_file()
  80. trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
  81. if self.step <= 4 and self.end >= 4:
  82. begin = datetime.datetime.now()
  83. trans_print("开始保存到数据库,是否存库:", self.pathsAndTable.save_db)
  84. self.save_to_db()
  85. trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
  86. self.update_exec_progress()
  87. except Exception as e:
  88. trans_print(traceback.format_exc())
  89. update_trans_status_error(self.batch_no, self.read_type, str(e), self.save_db)
  90. finally:
  91. self.pathsAndTable.delete_tmp_files()
  92. trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))
if __name__ == '__main__':
    # Manual smoke test: build a transfer for a single batch with database
    # saving disabled, then run it.
    # NOTE(review): BaseDataTrans.__init__ takes (data, save_db, step, end) and
    # has no ``batch_no`` or ``read_type`` parameter, so this call raises
    # TypeError as written. The constructor also calls get_filed_conf(), which
    # the base class does not implement. Presumably a concrete subclass and a
    # ``data`` dict for this batch were intended -- TODO confirm.
    test = BaseDataTrans(save_db=False, batch_no="WOF053600062-WOB000010", read_type="fault")
    test.run()