WindFarms.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import datetime
  5. import multiprocessing
  6. import pandas as pd
  7. from etl.base.PathsAndTable import PathsAndTable
  8. from etl.base.TransParam import TransParam
  9. from etl.step.ClassIdentifier import ClassIdentifier
  10. from etl.step.ClearData import ClearData
  11. from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
  12. from etl.step.SaveToDb import SaveToDb
  13. from etl.step.StatisticsAndSaveFile import StatisticsAndSaveFile
  14. from etl.step.UnzipAndRemove import UnzipAndRemove
  15. from service.plt_service import get_all_wind, update_trans_status_running, \
  16. update_trans_status_success, update_trans_transfer_progress
  17. from service.trans_service import batch_statistics
  18. from utils.df_utils.util import get_time_space
  19. from utils.file.trans_methods import *
  20. class WindFarms(object):
  21. def __init__(self, batch_no=None, batch_name=None, field_code=None, field_name=None, params: TransParam = None,
  22. save_db=True, header=0, trans_param: TransParam = None):
  23. self.batch_no = batch_no
  24. self.batch_name = batch_name
  25. self.field_code = field_code
  26. self.field_name = field_name
  27. self.save_zip = False
  28. self.trans_param = params
  29. self.exist_wind_names = multiprocessing.Manager().list()
  30. self.wind_col_trans, self.rated_power_map = get_all_wind(self.field_code)
  31. self.batch_count = 50000
  32. self.save_path = None
  33. self.save_db = save_db
  34. self.lock = multiprocessing.Manager().Lock()
  35. self.statistics_map = multiprocessing.Manager().dict()
  36. self.header = header
  37. self.trans_param = trans_param
  38. self.trans_param.wind_col_trans = self.wind_col_trans
  39. self.pathsAndTable = PathsAndTable(batch_no, batch_name, self.trans_param.read_path, self.field_name,
  40. self.trans_param.read_type, save_db, save_zip=self.save_zip)
  41. def run(self, step=0, end=4):
  42. begin = datetime.datetime.now()
  43. trans_print("开始执行")
  44. update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
  45. if step <= 0 and end >= 0:
  46. cleanData = ClearData(self.pathsAndTable)
  47. cleanData.run()
  48. if step <= 1 and end >= 1:
  49. # 更新运行状态到运行中
  50. unzipAndRemove = UnzipAndRemove(self.pathsAndTable)
  51. unzipAndRemove.run()
  52. if step <= 2 and end >= 2:
  53. readAndSaveTmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
  54. readAndSaveTmp.run()
  55. if step <= 3 and end >= 3:
  56. # 保存到正式文件
  57. statisticsAndSaveFile = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
  58. self.rated_power_map)
  59. statisticsAndSaveFile.run()
  60. if step <= 4 and end >= 4:
  61. if self.save_db:
  62. saveToDb = SaveToDb(self.pathsAndTable)
  63. saveToDb.run()
  64. update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 99, self.save_db)
  65. # 如果end==0 则说明只是进行了验证
  66. if end >= 4:
  67. all_files = read_excel_files(self.pathsAndTable.get_save_path())
  68. if step <= 3:
  69. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  70. len(all_files),
  71. self.statistics_map['time_granularity'],
  72. self.statistics_map['min_date'], self.statistics_map['max_date'],
  73. self.statistics_map['total_count'], self.save_db)
  74. else:
  75. df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
  76. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  77. time_granularity = get_time_space(df, 'time_stamp')
  78. batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
  79. if batch_data is not None:
  80. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  81. len(read_excel_files(self.pathsAndTable.get_save_path())),
  82. time_granularity,
  83. batch_data['min_date'], batch_data['max_date'],
  84. batch_data['total_count'], self.save_db)
  85. else:
  86. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  87. len(read_excel_files(self.pathsAndTable.get_save_path())),
  88. time_granularity,
  89. None, None,
  90. None, self.save_db)
  91. trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
  92. str(datetime.datetime.now() - begin))