WindFarms.py 4.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import datetime
  5. import multiprocessing
  6. from etl.base.PathsAndTable import PathsAndTable
  7. from etl.base.TransParam import TransParam
  8. from etl.step.ClearData import ClearData
  9. from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
  10. from etl.step.SaveToDb import SaveToDb
  11. from etl.step.StatisticsAndSaveFile import StatisticsAndSaveFile
  12. from etl.step.UnzipAndRemove import UnzipAndRemove
  13. from service.plt_service import get_all_wind, update_trans_status_running, \
  14. update_trans_status_success, update_trans_transfer_progress
  15. from service.trans_service import batch_statistics
  16. from utils.df_utils.util import get_time_space
  17. from utils.file.trans_methods import *
  class WindFarms(object):
      """Drive the staged transfer pipeline for one batch of wind-farm data.

      The stages are executed by :meth:`run` in this order:
      ``ClearData`` (0), ``UnzipAndRemove`` (1), ``ReadAndSaveTmp`` (2),
      ``StatisticsAndSaveFile`` (3) and ``SaveToDb`` (4).
      """

      def __init__(self, batch_no=None, batch_name=None, field_code=None, field_name=None, params: TransParam = None,
                   save_db=True, header=0, trans_param: TransParam = None):
          """Collect the batch settings and build the shared ``PathsAndTable``.

          Args:
              batch_no: Batch identifier, forwarded to the status-update services.
              batch_name: Batch name, forwarded to ``PathsAndTable``.
              field_code: Wind-farm code passed to ``get_all_wind``.
              field_name: Wind-farm name, forwarded to ``PathsAndTable``.
              params: Fallback ``TransParam``; used only when ``trans_param`` is None.
              save_db: Forwarded to ``PathsAndTable`` and the status-update
                  services; also gates the ``SaveToDb`` stage.
              header: Stored on the instance; not read by this class.
              trans_param: Transfer parameters. Takes precedence over ``params``.

          Raises:
              ValueError: If neither ``trans_param`` nor ``params`` is supplied.
          """
          # Previously ``params`` was overwritten unconditionally by ``trans_param``,
          # so a caller that passed only ``params`` crashed with AttributeError.
          if trans_param is None:
              trans_param = params
          if trans_param is None:
              raise ValueError("WindFarms requires a TransParam via 'trans_param' or 'params'")

          self.batch_no = batch_no
          self.batch_name = batch_name
          self.field_code = field_code
          self.field_name = field_name
          self.save_zip = False
          self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
          self.batch_count = 50000  # not read within this class
          self.save_path = None  # not read within this class
          self.save_db = save_db
          # Manager-backed dict handed to StatisticsAndSaveFile; run() later reads
          # 'time_granularity', 'min_date', 'max_date' and 'total_count' from it.
          self.statistics_map = multiprocessing.Manager().dict()
          self.header = header
          self.trans_param = trans_param
          self.trans_param.wind_col_trans = self.wind_col_trans
          self.pathsAndTable = PathsAndTable(batch_no, batch_name, self.trans_param.read_path, self.field_name,
                                             self.trans_param.read_type, save_db, save_zip=self.save_zip)

      def run(self, step=0, end=4):
          """Run every stage whose index lies in ``[step, end]``.

          Args:
              step: Index of the first stage to execute (0-4).
              end: Index of the last stage to execute (0-4). The batch is only
                  marked successful when ``end >= 4``.
          """
          begin = datetime.datetime.now()
          trans_print("开始执行")
          read_type = self.trans_param.read_type
          # Flag the batch as running before any stage starts.
          update_trans_status_running(self.batch_no, read_type, self.save_db)

          if step <= 0 <= end:
              ClearData(self.pathsAndTable).run()

          if step <= 1 <= end:
              UnzipAndRemove(self.pathsAndTable).run()

          if step <= 2 <= end:
              ReadAndSaveTmp(self.pathsAndTable, self.trans_param).run()

          if step <= 3 <= end:
              # Save to the final files.
              StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
                                    self.rated_power_and_cutout_speed_map).run()

          if step <= 4 <= end and self.save_db:
              SaveToDb(self.pathsAndTable).run()

          # NOTE(review): the listing this was recovered from had lost its
          # indentation; this call is assumed to run regardless of ``end`` - confirm.
          update_trans_transfer_progress(self.batch_no, read_type, 99, self.save_db)

          # If end == 0, only validation was performed, so success is not recorded.
          if end >= 4:
              self._update_success_status(step)

          trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
                      str(datetime.datetime.now() - begin))

      def _update_success_status(self, step):
          """Mark the batch successful together with its summary statistics."""
          read_type = self.trans_param.read_type
          # List the output files once and reuse the result for the file count.
          all_files = read_excel_files(self.pathsAndTable.get_save_path())

          if step <= 3:
              # The statistics stage ran in this invocation, so use its results.
              time_granularity = self.statistics_map['time_granularity']
              min_date = self.statistics_map['min_date']
              max_date = self.statistics_map['max_date']
              total_count = self.statistics_map['total_count']
          else:
              # Statistics stage was skipped: derive the granularity from the first
              # saved file and fetch the remaining figures via batch_statistics.
              # NOTE(review): raises IndexError when no output file exists - confirm
              # whether an empty save path can legitimately occur here.
              df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
              df['time_stamp'] = pd.to_datetime(df['time_stamp'])
              time_granularity = get_time_space(df, 'time_stamp')
              batch_data = batch_statistics("_".join([self.batch_no, read_type]))
              if batch_data is not None:
                  min_date = batch_data['min_date']
                  max_date = batch_data['max_date']
                  total_count = batch_data['total_count']
              else:
                  min_date = max_date = total_count = None

          update_trans_status_success(self.batch_no, read_type,
                                      len(all_files),
                                      time_granularity,
                                      min_date, max_date,
                                      total_count, self.save_db)