1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- # -*- coding: utf-8 -*-
- # @Time : 2024/5/15
- # @Author : 魏志亮
- import datetime
- import multiprocessing
- from etl.base.PathsAndTable import PathsAndTable
- from etl.base.TransParam import TransParam
- from etl.step.ClearData import ClearData
- from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
- from etl.step.SaveToDb import SaveToDb
- from etl.step.StatisticsAndSaveFile import StatisticsAndSaveFile
- from etl.step.UnzipAndRemove import UnzipAndRemove
- from service.plt_service import get_all_wind, update_trans_status_running, \
- update_trans_status_success, update_trans_transfer_progress
- from service.trans_service import batch_statistics
- from utils.df_utils.util import get_time_space
- from utils.file.trans_methods import *
class WindFarms(object):
    """End-to-end ETL pipeline for one wind-farm data batch.

    Runs up to five sequential steps — clean (0), unzip/remove (1),
    read-to-tmp (2), statistics + save final files (3), save to DB (4) —
    and reports progress and final success back to the platform service.
    """

    def __init__(self, batch_no=None, batch_name=None, field_code=None, field_name=None,
                 params: TransParam = None, save_db=True, header=0,
                 trans_param: TransParam = None):
        """
        :param batch_no: batch identifier reported to the platform service
        :param batch_name: human-readable batch name
        :param field_code: wind-field code used to look up turbine metadata
        :param field_name: wind-field name, passed through to PathsAndTable
        :param params: legacy alias for ``trans_param`` (kept for backward compatibility)
        :param save_db: when True, step 4 writes the results to the database
        :param header: header row index for source files
        :param trans_param: transfer parameters (read path/type, column mapping, ...)
        """
        self.batch_no = batch_no
        self.batch_name = batch_name
        self.field_code = field_code
        self.field_name = field_name
        self.save_zip = False
        # BUGFIX: the original assigned `params` and then unconditionally
        # overwrote it with `trans_param`, so a caller passing only `params`
        # ended up with None and crashed below at `.read_path`. Prefer
        # `trans_param` (original winner) and fall back to `params`.
        self.trans_param = trans_param if trans_param is not None else params
        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
        self.batch_count = 50000
        self.save_path = None
        self.save_db = save_db
        # Shared across worker processes, hence a Manager-backed dict.
        self.statistics_map = multiprocessing.Manager().dict()
        self.header = header
        self.trans_param.wind_col_trans = self.wind_col_trans
        self.pathsAndTable = PathsAndTable(batch_no, batch_name, self.trans_param.read_path, self.field_name,
                                           self.trans_param.read_type, save_db, save_zip=self.save_zip)

    def run(self, step=0, end=4):
        """Execute pipeline steps ``step`` through ``end`` (inclusive, 0-4).

        :param step: first step to run (default 0, i.e. start from cleaning)
        :param end: last step to run (default 4; an ``end`` below 4 means a
            partial/validation run, so final success is not reported)
        """
        begin = datetime.datetime.now()
        trans_print("开始执行")
        # Mark the batch as "running" on the platform before any work starts.
        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
        if step <= 0 and end >= 0:
            clean_data = ClearData(self.pathsAndTable)
            clean_data.run()
        if step <= 1 and end >= 1:
            unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
            unzip_and_remove.run()
        if step <= 2 and end >= 2:
            read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
            read_and_save_tmp.run()
        if step <= 3 and end >= 3:
            # Save to the final files and collect per-batch statistics.
            statistics_and_save_file = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param,
                                                             self.statistics_map,
                                                             self.rated_power_and_cutout_speed_map)
            statistics_and_save_file.run()
        if step <= 4 and end >= 4:
            if self.save_db:
                save_to_db = SaveToDb(self.pathsAndTable)
                save_to_db.run()
        update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 99, self.save_db)
        # Only report final success when the run went all the way to step 4;
        # a smaller `end` means this was just a validation/partial run.
        if end >= 4:
            all_files = read_excel_files(self.pathsAndTable.get_save_path())
            if step <= 3:
                # Step 3 ran in this process, so statistics_map is populated.
                update_trans_status_success(self.batch_no, self.trans_param.read_type,
                                            len(all_files),
                                            self.statistics_map['time_granularity'],
                                            self.statistics_map['min_date'], self.statistics_map['max_date'],
                                            self.statistics_map['total_count'], self.save_db)
            else:
                # Step 3 was skipped: derive the statistics from the saved
                # files and the database instead.
                df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
                df['time_stamp'] = pd.to_datetime(df['time_stamp'])
                time_granularity = get_time_space(df, 'time_stamp')
                batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
                if batch_data is not None:
                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
                                                len(all_files),
                                                time_granularity,
                                                batch_data['min_date'], batch_data['max_date'],
                                                batch_data['total_count'], self.save_db)
                else:
                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
                                                len(all_files),
                                                time_granularity,
                                                None, None,
                                                None, self.save_db)
        trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
                    str(datetime.datetime.now() - begin))
|