WindFarms.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import datetime
  5. import multiprocessing
  6. import pandas as pd
  7. from etl.base.PathsAndTable import PathsAndTable
  8. from etl.base.TransParam import TransParam
  9. from etl.step.ClearData import ClearData
  10. from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
  11. from etl.step.SaveToDb import SaveToDb
  12. from etl.step.StatisticsAndSaveFile import StatisticsAndSaveFile
  13. from etl.step.UnzipAndRemove import UnzipAndRemove
  14. from service.plt_service import get_all_wind, update_trans_status_running, \
  15. update_trans_status_success, update_trans_transfer_progress
  16. from service.trans_service import batch_statistics
  17. from utils.df_utils.util import get_time_space
  18. from utils.file.trans_methods import *
  19. class WindFarms(object):
  20. def __init__(self, batch_no=None, batch_name=None, field_code=None, field_name=None, params: TransParam = None,
  21. save_db=True, header=0, trans_param: TransParam = None):
  22. self.batch_no = batch_no
  23. self.batch_name = batch_name
  24. self.field_code = field_code
  25. self.field_name = field_name
  26. self.save_zip = False
  27. self.trans_param = params
  28. self.exist_wind_names = multiprocessing.Manager().list()
  29. self.wind_col_trans = get_all_wind(self.field_code)
  30. self.batch_count = 50000
  31. self.save_path = None
  32. self.save_db = save_db
  33. self.lock = multiprocessing.Manager().Lock()
  34. self.statistics_map = multiprocessing.Manager().dict()
  35. self.header = header
  36. self.trans_param = trans_param
  37. self.trans_param.wind_col_trans = self.wind_col_trans
  38. self.pathsAndTable = PathsAndTable(batch_no, batch_name, self.trans_param.read_path, self.field_name,
  39. self.trans_param.read_type, save_db, save_zip=self.save_zip)
  40. def run(self, step=0, end=4):
  41. begin = datetime.datetime.now()
  42. trans_print("开始执行")
  43. update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
  44. if step <= 0 and end >= 0:
  45. cleanData = ClearData(self.pathsAndTable)
  46. cleanData.run()
  47. if step <= 1 and end >= 1:
  48. # 更新运行状态到运行中
  49. unzipAndRemove = UnzipAndRemove(self.pathsAndTable)
  50. unzipAndRemove.run()
  51. if step <= 2 and end >= 2:
  52. # 更新运行状态到运行中
  53. readAndSaveTmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
  54. readAndSaveTmp.run()
  55. if step <= 3 and end >= 3:
  56. # 保存到正式文件
  57. statisticsAndSaveFile = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map)
  58. statisticsAndSaveFile.run()
  59. if step <= 4 and end >= 4:
  60. if self.save_db:
  61. saveToDb = SaveToDb(self.pathsAndTable)
  62. saveToDb.run()
  63. update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 99, self.save_db)
  64. # 如果end==0 则说明只是进行了验证
  65. if end >= 4:
  66. all_files = read_excel_files(self.pathsAndTable.get_save_path())
  67. if step <= 3:
  68. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  69. len(all_files),
  70. self.statistics_map['time_granularity'],
  71. self.statistics_map['min_date'], self.statistics_map['max_date'],
  72. self.statistics_map['total_count'], self.save_db)
  73. else:
  74. df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
  75. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  76. time_granularity = get_time_space(df, 'time_stamp')
  77. batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
  78. if batch_data is not None:
  79. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  80. len(read_excel_files(self.pathsAndTable.get_save_path())),
  81. time_granularity,
  82. batch_data['min_date'], batch_data['max_date'],
  83. batch_data['total_count'], self.save_db)
  84. else:
  85. update_trans_status_success(self.batch_no, self.trans_param.read_type,
  86. len(read_excel_files(self.pathsAndTable.get_save_path())),
  87. time_granularity,
  88. None, None,
  89. None, self.save_db)
  90. trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
  91. str(datetime.datetime.now() - begin))