# BaseDataTrans.py
import datetime
import traceback

from etl.common.ArchiveFile import ArchiveFile
from etl.common.ClearData import ClearData
from etl.common.PathsAndTable import PathsAndTable
from etl.common.SaveToDb import SaveToDb
from etl.common.UnzipAndRemove import UnzipAndRemove
from service.plt_service import get_all_wind
from service.trans_conf_service import update_trans_status_success, update_trans_status_error, \
    update_trans_status_running
from utils.file.trans_methods import read_excel_files
from utils.log.trans_log import trans_print, set_trance_id


  13. class BaseDataTrans(object):
  14. def __init__(self, data: dict = None, save_db=True, yaml_config=None, step=0, end=999):
  15. self.id = data['id']
  16. self.task_name = data['task_name']
  17. self.transfer_type = data['transfer_type']
  18. self.read_dir = data['read_dir']
  19. self.wind_farm_code = data['wind_farm_code']
  20. self.wind_farm_name = data['wind_farm_name']
  21. self.yaml_config = yaml_config
  22. self.save_zip = False
  23. self.step = step
  24. self.end = end
  25. self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.wind_farm_code)
  26. self.batch_count = 100000
  27. self.save_db = save_db
  28. self.filed_conf = self.get_filed_conf()
  29. self.update_files = list()
  30. try:
  31. self.pathsAndTable = PathsAndTable(self.id, self.task_name, self.read_dir, self.wind_farm_code,
  32. self.wind_farm_name, self.transfer_type, save_db, self.save_zip,
  33. self.yaml_config, self.wind_col_trans)
  34. except Exception as e:
  35. trans_print(traceback.format_exc())
  36. update_trans_status_error(self.id, str(e), self.save_db)
  37. raise e
  38. def get_filed_conf(self):
  39. raise NotImplementedError("需要实现 获取点检表 方法")
  40. # 清理数据
  41. def clean_file_and_db(self):
  42. clean_data = ClearData(self.pathsAndTable)
  43. clean_data.run()
  44. # 解压 移动到临时文件
  45. def unzip_or_remove_to_tmp_dir(self):
  46. # 解压并删除
  47. unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
  48. unzip_and_remove.run()
  49. # 读取并保存到临时文件
  50. def read_and_save_tmp_file(self):
  51. raise NotImplementedError("读取并保存到临时文件未做实现")
  52. # 读取并保存到临时正式文件
  53. def statistics_and_save_tmp_formal_file(self):
  54. raise NotImplementedError("读取并保存到临时正式文件未做实现")
  55. # 归档文件
  56. def archive_file(self):
  57. archive_file = ArchiveFile(self.pathsAndTable, self.id)
  58. archive_file.run()
  59. # 合并到正式文件
  60. def combine_and_save_formal_file(self):
  61. raise NotImplementedError("合并到正式文件未做实现")
  62. # 保存到数据库
  63. def save_to_db(self):
  64. save_to_db = SaveToDb(self.pathsAndTable, self.update_files, self.batch_count)
  65. save_to_db.run()
  66. # 最后更新执行程度
  67. def update_exec_progress(self):
  68. update_trans_status_success(self.id,
  69. len(read_excel_files(self.pathsAndTable.get_save_path())),
  70. None, None, None, None, self.save_db)
  71. def run(self):
  72. total_begin = datetime.datetime.now()
  73. try:
  74. trance_id = '-'.join([str(self.id), self.wind_farm_name, self.transfer_type])
  75. set_trance_id(trance_id)
  76. update_trans_status_running(self.id, self.save_db)
  77. now_index = 0
  78. # 0
  79. if self.step <= now_index <= self.end:
  80. begin = datetime.datetime.now()
  81. trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
  82. self.clean_file_and_db()
  83. trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  84. datetime.datetime.now() - total_begin)
  85. now_index = now_index + 1
  86. # 1
  87. if self.step <= now_index <= self.end:
  88. begin = datetime.datetime.now()
  89. trans_print("开始解压移动文件")
  90. self.unzip_or_remove_to_tmp_dir()
  91. trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:",
  92. datetime.datetime.now() - total_begin)
  93. now_index = now_index + 1
  94. # 2
  95. if self.step <= now_index <= self.end:
  96. begin = datetime.datetime.now()
  97. trans_print("开始保存数据到临时文件")
  98. self.read_and_save_tmp_file()
  99. trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  100. datetime.datetime.now() - total_begin)
  101. now_index = now_index + 1
  102. # 3
  103. if self.step <= now_index <= self.end:
  104. begin = datetime.datetime.now()
  105. trans_print("开始保存到临时正式文件")
  106. self.statistics_and_save_tmp_formal_file()
  107. trans_print("保存到临时正式文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  108. datetime.datetime.now() - total_begin)
  109. now_index = now_index + 1
  110. # 4
  111. if self.step <= now_index <= self.end:
  112. begin = datetime.datetime.now()
  113. trans_print("开始保存归档文件")
  114. self.archive_file()
  115. trans_print("保存到保存归档文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  116. datetime.datetime.now() - total_begin)
  117. now_index = now_index + 1
  118. # 5
  119. if self.step <= now_index <= self.end:
  120. begin = datetime.datetime.now()
  121. trans_print("开始保存数据到正式文件")
  122. self.combine_and_save_formal_file()
  123. trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  124. datetime.datetime.now() - total_begin)
  125. now_index = now_index + 1
  126. # 6
  127. if self.step <= now_index <= self.end:
  128. begin = datetime.datetime.now()
  129. trans_print("开始保存到数据库,是否存库:", self.pathsAndTable.save_db)
  130. self.save_to_db()
  131. trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
  132. datetime.datetime.now() - total_begin)
  133. self.update_exec_progress()
  134. except Exception as e:
  135. trans_print(traceback.format_exc())
  136. update_trans_status_error(self.id, str(e), self.save_db)
  137. raise e
  138. finally:
  139. self.pathsAndTable.delete_tmp_files()
  140. trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))