import os
import shutil
from datetime import datetime, timezone, timedelta
from logging import Logger
from urllib.parse import quote

import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

from common.commonBusiness import CommonBusiness
from common.appConfig import GetBusinessFoundationDbUtil
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from algorithmContract.const import *
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil

Const_FileURL = "url"


class OutputProcessor:
    def __init__(self, conf: Contract, logger: Logger, dbUtil: dict[str, DatabaseUtil],
                 minioUtil: ThreadSafeMinioClient) -> None:
        self.conf = conf
        self.autoOrManual = 1 if self.conf.dataContract.autoOrManual == 'automatic' else 0
        self.powerFarmID = self.conf.dataContract.dataFilter.powerFarmID
        self.dataBatchNum = self.conf.dataContract.dataFilter.dataBatchNum
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.common = CommonBusiness()

    def uploadOfMinIO(self, bucketName: str, uploadFiles):
        """Upload files to MinIO, creating the bucket and its access policy on first use."""
        bucketName = bucketName.lower()
        if not self.minioUtil.bucket_exists(bucketName):
            self.minioUtil.create_bucket(bucketName)
            self.minioUtil.set_bucket_policy(bucketName)

        # Upload files
        return self.minioUtil.upload_files(bucketName, uploadFiles)

    def removeLocalFiles(self, powerFarmID: str, dataBatchNum: str):
        """Remove the local output directory of the given batch after upload."""
        directory = f"output/{powerFarmID}/{dataBatchNum}"
        shutil.rmtree(directory)

    def analysisState(self, session: Session, batchNO: str, analysisState: int,
                      errorState: int = ErrorState.NotErr.value, errorCode: str = None,
                      errorInfo: str = None, analysisProgress: float = 0,
                      analysis_finish_time: datetime = None):
        """
        Write the processing state to the main table analysis_result.
        Farm-wide charts (one chart covering multiple turbines) are written to the
        child table analysis_general_file; per-turbine charts (one chart per turbine)
        are written to the child table analysis_diagram_relation.
        """
        sql = text("""
            INSERT INTO analysis_result
            (batch_code, analysis_state, err_state, err_code, err_info, analysis_progress, analysis_finish_time)
            VALUES (:batch_code, :analysis_state, :err_state, :err_code, :err_info, :analysis_progress, :analysis_finish_time)
            ON DUPLICATE KEY UPDATE
            analysis_state=VALUES(analysis_state),
            err_state=VALUES(err_state),
            err_code=VALUES(err_code),
            err_info=VALUES(err_info),
            analysis_progress=VALUES(analysis_progress),
            analysis_finish_time=VALUES(analysis_finish_time);
            """)
        params = {
            "batch_code": None if self.common.isNone(batchNO) else batchNO,
            "analysis_state": None if self.common.isNone(analysisState) else analysisState,
            "err_state": None if self.common.isNone(errorState) else errorState,
            "err_code": None if self.common.isNone(errorCode) else errorCode,
            "err_info": None if self.common.isNone(errorInfo) else errorInfo,
            "analysis_progress": analysisProgress,
            "analysis_finish_time": analysis_finish_time
        }
        session.execute(sql, params)
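    # Note on the statements used by analysisState, analysisResultForTurbine and
    # analysisResultForTotal: MySQL's "INSERT ... ON DUPLICATE KEY UPDATE
    # col=VALUES(col)" is an idempotent upsert. VALUES(col) refers to the value
    # the INSERT would have written, so re-running an analysis for the same
    # unique key (e.g. batch_code) updates the existing row instead of failing.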
    def analysisResultForTurbine(self, session: Session, returnDataFrame: pd.DataFrame):
        """Write per-turbine chart records to the child table analysis_diagram_relation."""
        dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] != 'total') &
                                    (returnDataFrame[Field_Return_IsSaveDatabase])]

        for _, row in dataFrame.iterrows():
            sql = text("""
                INSERT INTO analysis_diagram_relation
                (batch_code, field_engine_code, analysis_type_code, file_addr, auto_analysis)
                VALUES (:batch_code, :field_engine_code, :analysis_type_code, :file_addr, :auto_analysis)
                ON DUPLICATE KEY UPDATE
                field_engine_code=VALUES(field_engine_code),
                analysis_type_code=VALUES(analysis_type_code),
                file_addr=VALUES(file_addr),
                auto_analysis=VALUES(auto_analysis);
                """)
            params = {
                "batch_code": row[Field_Return_BatchCode],
                "field_engine_code": row[Field_CodeOfTurbine],
                "analysis_type_code": row[Field_Return_TypeAnalyst],
                "file_addr": row[Const_FileURL],
                "auto_analysis": self.autoOrManual
            }
            session.execute(sql, params)

    def analysisResultForTotal(self, session: Session, returnDataFrame: pd.DataFrame):
        """Write farm-wide chart records to the child table analysis_general_file."""
        dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] == 'total') &
                                    (returnDataFrame[Field_Return_IsSaveDatabase])]

        for _, row in dataFrame.iterrows():
            sql = text("""
                INSERT INTO analysis_general_file
                (batch_code, analysis_type_code, engine_type_code, file_addr, auto_analysis)
                VALUES (:batch_code, :analysis_type_code, :engine_type_code, :file_addr, :auto_analysis)
                ON DUPLICATE KEY UPDATE
                analysis_type_code=VALUES(analysis_type_code),
                engine_type_code=VALUES(engine_type_code),
                file_addr=VALUES(file_addr),
                auto_analysis=VALUES(auto_analysis);
                """)
            params = {
                "batch_code": row[Field_Return_BatchCode],
                "analysis_type_code": row[Field_Return_TypeAnalyst],
                "engine_type_code": row[Field_MillTypeCode],
                "file_addr": row[Const_FileURL],
                "auto_analysis": self.autoOrManual
            }
            session.execute(sql, params)

    def process(self, powerFarmID: str, dataBatchNum: str, returnDataFrame: pd.DataFrame,
                timestamp: datetime = None):  # timestamp: reserved, not used in the current flow
        try:
            uploadFiles = []
            if not returnDataFrame.empty:
                returnDataFrame[Const_FileURL] = None
                if Field_Return_IsSaveDatabase in returnDataFrame.columns:
                    # Assign the filled column back instead of using inplace fillna on a column view.
                    returnDataFrame[Field_Return_IsSaveDatabase] = \
                        returnDataFrame[Field_Return_IsSaveDatabase].fillna(True)
                else:
                    returnDataFrame[Field_Return_IsSaveDatabase] = True

            for index, row in returnDataFrame.iterrows():
                directory, fileName = os.path.split(row[Field_Return_FilePath])
                basePath = f"output/{powerFarmID}"
                subPath = os.path.relpath(directory, basePath)
                remoteFilePath = os.path.join(subPath, fileName).replace("\\", "/")
                # arr=["http://",self.minioUtil.client_pool.get_ip_address(),"/",powerFarmID.lower(),"/",remoteFilePath]
                arr = [powerFarmID.lower(), "/", remoteFilePath]
                fileURL = "".join(arr)
                returnDataFrame.at[index, Const_FileURL] = quote(fileURL)

                uploadFiles.append((remoteFilePath, row[Field_Return_FilePath]))

            self.uploadOfMinIO(powerFarmID, uploadFiles)

            foundationDB = GetBusinessFoundationDbUtil()

            with foundationDB.session_scope() as session:
                # First write: mark the batch as analyzed, progress 50%.
                self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
                                   ErrorState.NotErr.value, None, None, 50)
                self.analysisResultForTotal(session, returnDataFrame)
                self.analysisResultForTurbine(session, returnDataFrame)
                # Capture the analysis completion time (UTC+8).
                finish_time = datetime.now(timezone.utc) + timedelta(hours=8)
                # Second write: progress 100% with the recorded finish time.
                self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
                                   ErrorState.NotErr.value, None, None, 100,
                                   analysis_finish_time=finish_time)

            self.removeLocalFiles(powerFarmID, dataBatchNum)
        except Exception as e:
            self.logger.error(e)
            raise
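
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the production pipeline).
# It reproduces the object-path / URL construction that process() applies to
# each result row, using made-up values for the farm ID and local file path so
# it can be run without MinIO or a database connection.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    samplePowerFarmID = "WF001"  # hypothetical farm ID
    sampleLocalFile = "output/WF001/batch01/turbine01/power_curve.png"  # hypothetical path

    sampleDirectory, sampleFileName = os.path.split(sampleLocalFile)
    sampleBasePath = f"output/{samplePowerFarmID}"
    sampleSubPath = os.path.relpath(sampleDirectory, sampleBasePath)
    sampleRemotePath = os.path.join(sampleSubPath, sampleFileName).replace("\\", "/")

    # The URL stored in the result tables is "<bucket>/<object path>", percent-encoded.
    sampleFileURL = quote("".join([samplePowerFarmID.lower(), "/", sampleRemotePath]))
    print(sampleRemotePath)  # batch01/turbine01/power_curve.png
    print(sampleFileURL)     # wf001/batch01/turbine01/power_curve.png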