# -*- coding: utf-8 -*-
# @Time   : 2024/5/15
# @Author : 魏志亮
import datetime
import multiprocessing

from etl.base.PathsAndTable import PathsAndTable
from etl.base.TransParam import TransParam
from etl.step.ClearData import ClearData
from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
from etl.step.SaveToDb import SaveToDb
from etl.step.StatisticsAndSaveFile import StatisticsAndSaveFile
from etl.step.UnzipAndRemove import UnzipAndRemove
from service.plt_service import get_all_wind, update_trans_status_running, \
    update_trans_status_success, update_trans_transfer_progress
from service.trans_service import batch_statistics
from utils.df_utils.util import get_time_space
from utils.file.trans_methods import *


class WindFarms(object):
    """Orchestrate the transfer (ETL) pipeline for one wind-farm data batch.

    The pipeline is a fixed sequence of numbered steps, each implemented by a
    dedicated step class:

        0. ClearData             - clean previous working data
        1. UnzipAndRemove        - unpack archives, drop the originals
        2. ReadAndSaveTmp        - read raw files into temporary storage
        3. StatisticsAndSaveFile - compute statistics, write final files
        4. SaveToDb              - load final files into the database

    ``run(step, end)`` executes the steps whose index lies in ``[step, end]``,
    then reports progress/success back through the ``plt_service`` functions.
    """

    def __init__(self, batch_no=None, batch_name=None, field_code=None, field_name=None,
                 params: TransParam = None, save_db=True, header=0,
                 trans_param: TransParam = None):
        """
        :param batch_no:    batch identifier used in status updates and table names
        :param batch_name:  human-readable batch name
        :param field_code:  wind-field code, passed to ``get_all_wind``
        :param field_name:  wind-field name, used to build paths
        :param params:      legacy alias for ``trans_param`` (kept for callers
                            that still pass the transfer parameters here)
        :param save_db:     when True, step 4 writes results to the database
        :param header:      header row index for raw file reads
        :param trans_param: transfer parameters (read path, read type, ...)
        """
        self.batch_no = batch_no
        self.batch_name = batch_name
        self.field_code = field_code
        self.field_name = field_name
        self.save_zip = False
        # BUG FIX: the original code assigned ``self.trans_param = params`` and
        # later unconditionally overwrote it with ``trans_param``; a caller
        # supplying only ``params`` ended up with trans_param=None and crashed
        # on the attribute accesses below. Prefer ``trans_param`` but fall back
        # to ``params`` so both call styles keep working.
        self.trans_param = trans_param if trans_param is not None else params
        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
        self.batch_count = 50000
        self.save_path = None
        self.save_db = save_db
        # Shared across worker processes by the step classes.
        self.statistics_map = multiprocessing.Manager().dict()
        self.header = header
        self.trans_param.wind_col_trans = self.wind_col_trans
        self.pathsAndTable = PathsAndTable(batch_no, batch_name, self.trans_param.read_path,
                                           self.field_name, self.trans_param.read_type,
                                           save_db, save_zip=self.save_zip)

    def run(self, step=0, end=4):
        """Execute pipeline steps ``step`` .. ``end`` (inclusive) and report status.

        :param step: index of the first step to run (0-4)
        :param end:  index of the last step to run (0-4); success is only
                     reported when the pipeline ran through to step 4
        """
        begin = datetime.datetime.now()
        trans_print("开始执行")
        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)

        if step <= 0 and end >= 0:
            # Step 0: clean previous working data.
            clean_data = ClearData(self.pathsAndTable)
            clean_data.run()

        if step <= 1 and end >= 1:
            # Step 1: unpack archives and remove the originals.
            unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
            unzip_and_remove.run()

        if step <= 2 and end >= 2:
            # Step 2: read raw files and save to temporary storage.
            read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
            read_and_save_tmp.run()

        if step <= 3 and end >= 3:
            # Step 3: compute statistics and write the final files.
            statistics_and_save_file = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param,
                                                            self.statistics_map,
                                                            self.rated_power_and_cutout_speed_map)
            statistics_and_save_file.run()

        if step <= 4 and end >= 4:
            # Step 4: load the final files into the database (optional).
            if self.save_db:
                save_to_db = SaveToDb(self.pathsAndTable)
                save_to_db.run()

        update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 99, self.save_db)

        # Only report success when the pipeline was asked to run to completion
        # (end >= 4); partial runs are validation/debug runs.
        if end >= 4:
            all_files = read_excel_files(self.pathsAndTable.get_save_path())
            if step <= 3:
                # Step 3 ran in this invocation, so statistics_map was filled
                # by StatisticsAndSaveFile and can be reported directly.
                update_trans_status_success(self.batch_no, self.trans_param.read_type,
                                            len(all_files),
                                            self.statistics_map['time_granularity'],
                                            self.statistics_map['min_date'],
                                            self.statistics_map['max_date'],
                                            self.statistics_map['total_count'], self.save_db)
            else:
                # Only step 4 ran: recover the time granularity from the first
                # saved file and the date range/row count from the database.
                df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
                df['time_stamp'] = pd.to_datetime(df['time_stamp'])
                time_granularity = get_time_space(df, 'time_stamp')
                batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
                # PERF: reuse ``all_files`` instead of re-scanning the save
                # directory with read_excel_files() twice as the original did.
                if batch_data is not None:
                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
                                                len(all_files), time_granularity,
                                                batch_data['min_date'], batch_data['max_date'],
                                                batch_data['total_count'], self.save_db)
                else:
                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
                                                len(all_files), time_granularity,
                                                None, None, None, self.save_db)

        trans_print("结束执行", self.trans_param.read_type, ",总耗时:", str(datetime.datetime.now() - begin))