import os
import shutil
from datetime import datetime, timezone, timedelta
from logging import Logger
from urllib.parse import quote, unquote

import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

from common.commonBusiness import CommonBusiness
from common.appConfig import GetBusinessFoundationDbUtil
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from algorithmContract.const import *
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil

Const_FileURL = "url"

class OutputProcessor:
    def __init__(self, conf: Contract, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient) -> None:
        self.conf = conf
        # 1 when the contract requests automatic analysis, 0 for a manual run
        self.autoOrManual = 1 if self.conf.dataContract.autoOrManual == 'automatic' else 0
        self.powerFarmID = self.conf.dataContract.dataFilter.powerFarmID
        self.dataBatchNum = self.conf.dataContract.dataFilter.dataBatchNum
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.common = CommonBusiness()

    def uploadOfMioIO(self, bucketName: str, uploadFiles):
        """
        Upload files to MinIO, creating the bucket and applying its access policy first if it does not exist.
        """
        bucketName = bucketName.lower()
        if not self.minioUtil.bucket_exists(bucketName):
            self.minioUtil.create_bucket(bucketName)
            self.minioUtil.set_bucket_policy(bucketName)
        # Upload files
        return self.minioUtil.upload_files(bucketName, uploadFiles)

    def removeLocalFiles(self, powerFarmID: str, dataBatchNum: str):
        # Remove the local output directory for this farm / data batch once uploads are done.
        directory = f"output/{powerFarmID}/{dataBatchNum}"
        shutil.rmtree(directory)

    def analysisState(self, session: Session, batchNO: str, analysisState: int, errorState: int = ErrorState.NotErr.value, errorCode: str = None, errorInfo: str = None, timestamp: datetime = None, analysisProgress: float = 0, analysis_finish_time: datetime = None):
        """
        Write the processing state to the main table analysis_result.
        Overall charts (one chart covering multiple turbines) go to the child table analysis_general_file,
        per-turbine charts (one chart per turbine) go to the child table analysis_diagram_relation
        (handled by analysisResultForTotal / analysisResultForTurbine below).
        """
        # Compute the timestamp (UTC+8) per call; a default expression in the signature
        # would be evaluated only once, at import time.
        if timestamp is None:
            timestamp = datetime.now(timezone.utc) + timedelta(hours=8)
        sql = text("""
            INSERT INTO analysis_result
            (batch_code, analysis_state, err_state, err_code, err_info, create_time, analysis_progress, analysis_finish_time, update_time)
            VALUES (:batch_code, :analysis_state, :err_state, :err_code, :err_info, :create_time, :analysis_progress, :analysis_finish_time, :update_time)
            ON DUPLICATE KEY UPDATE
            analysis_state=VALUES(analysis_state),
            err_state=VALUES(err_state),
            err_code=VALUES(err_code),
            err_info=VALUES(err_info),
            update_time=VALUES(update_time),
            analysis_progress=VALUES(analysis_progress),
            analysis_finish_time=VALUES(analysis_finish_time);
            """)
        params = {
            "batch_code": None if self.common.isNone(batchNO) else batchNO,
            "analysis_state": None if self.common.isNone(analysisState) else analysisState,
            "err_state": None if self.common.isNone(errorState) else errorState,
            "err_code": None if self.common.isNone(errorCode) else errorCode,
            "err_info": None if self.common.isNone(errorInfo) else errorInfo,
            "create_time": timestamp,
            "update_time": timestamp,
            "analysis_progress": analysisProgress,
            "analysis_finish_time": analysis_finish_time
        }
        session.execute(sql, params)

    def analysisResultForTurbine(self, session: Session, returnDataFrame: pd.DataFrame, timestamp: datetime):
        # Per-turbine chart records: every saved row except the farm-level 'total' rows.
        dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] != 'total') &
                                    (returnDataFrame[Field_Return_IsSaveDatabase])]
        for _, row in dataFrame.iterrows():
            sql = text("""
                INSERT INTO analysis_diagram_relation
                (batch_code, field_engine_code, analysis_type_code, file_addr, auto_analysis, create_time)
                VALUES (:batch_code, :field_engine_code, :analysis_type_code, :file_addr, :auto_analysis, :create_time)
                ON DUPLICATE KEY UPDATE
                field_engine_code=VALUES(field_engine_code),
                analysis_type_code=VALUES(analysis_type_code),
                file_addr=VALUES(file_addr),
                auto_analysis=VALUES(auto_analysis);
                """)
            params = {
                "batch_code": row[Field_Return_BatchCode],
                "field_engine_code": row[Field_CodeOfTurbine],
                "analysis_type_code": row[Field_Return_TypeAnalyst],
                "file_addr": row[Const_FileURL],
                "auto_analysis": self.autoOrManual,
                "create_time": timestamp
            }
            session.execute(sql, params)

    def analysisResultForTotal(self, session: Session, returnDataFrame: pd.DataFrame, timestamp: datetime):
        # Farm-level chart records: only the saved rows flagged with the special turbine code 'total'.
        dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] == 'total') &
                                    (returnDataFrame[Field_Return_IsSaveDatabase])]
        for _, row in dataFrame.iterrows():
            sql = text("""
                INSERT INTO analysis_general_file
                (batch_code, analysis_type_code, engine_type_code, file_addr, auto_analysis, create_time)
                VALUES (:batch_code, :analysis_type_code, :engine_type_code, :file_addr, :auto_analysis, :create_time)
                ON DUPLICATE KEY UPDATE
                analysis_type_code=VALUES(analysis_type_code),
                engine_type_code=VALUES(engine_type_code),
                file_addr=VALUES(file_addr),
                auto_analysis=VALUES(auto_analysis);
                """)
            params = {
                "batch_code": row[Field_Return_BatchCode],
                "analysis_type_code": row[Field_Return_TypeAnalyst],
                "engine_type_code": row[Field_MillTypeCode],
                "file_addr": row[Const_FileURL],
                "auto_analysis": self.autoOrManual,
                "create_time": timestamp
            }
            session.execute(sql, params)

    def process(self, powerFarmID: str, dataBatchNum: str, returnDataFrame: pd.DataFrame, timestamp: datetime = None):
        try:
            # Evaluate the default timestamp (UTC+8) per call rather than once at function definition.
            if timestamp is None:
                timestamp = datetime.now(timezone.utc) + timedelta(hours=8)
            uploadFiles = []
            if not returnDataFrame.empty:
                returnDataFrame[Const_FileURL] = None
                if Field_Return_IsSaveDatabase in returnDataFrame.columns:
                    returnDataFrame[Field_Return_IsSaveDatabase] = returnDataFrame[Field_Return_IsSaveDatabase].fillna(True)
                else:
                    returnDataFrame[Field_Return_IsSaveDatabase] = True

                for index, row in returnDataFrame.iterrows():
                    # Derive the object key inside the bucket from the local output path.
                    directory, fileName = os.path.split(row[Field_Return_FilePath])
                    basePath = f"output/{powerFarmID}"
                    subPath = os.path.relpath(directory, basePath)
                    remoteFilePath = os.path.join(subPath, fileName).replace("\\", "/")
                    # arr=["http://",self.minioUtil.client_pool.get_ip_address(),"/",powerFarmID.lower(),"/",remoteFilePath]
                    arr = [powerFarmID.lower(), "/", remoteFilePath]
                    fileURL = "".join(arr)
                    returnDataFrame.at[index, Const_FileURL] = quote(fileURL)
                    uploadFiles.append((remoteFilePath, row[Field_Return_FilePath]))

            self.uploadOfMioIO(self.powerFarmID, uploadFiles)

            foundationDB = GetBusinessFoundationDbUtil()
            with foundationDB.session_scope() as session:
                # First call: mark the batch as analyzed and set progress to 50%.
                self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
                                   ErrorState.NotErr.value, None, None, timestamp, 50)
                self.analysisResultForTotal(session, returnDataFrame, timestamp)
                self.analysisResultForTurbine(session, returnDataFrame, timestamp)
                # Record the time (UTC+8) at which the analysis finished.
                finish_time = datetime.now(timezone.utc) + timedelta(hours=8)
                # Second call: set progress to 100% and store the finish time.
                self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
                                   ErrorState.NotErr.value, None, None, timestamp, 100, analysis_finish_time=finish_time)

            self.removeLocalFiles(powerFarmID, dataBatchNum)
        except Exception as e:
            self.logger.error(e)
            raise
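

if __name__ == "__main__":
    # A minimal, self-contained sketch of how `process` turns a local output file into the
    # MinIO object key and the URL-quoted value stored in the `url` column. The farm ID and
    # file path below are made-up examples for illustration only; a real run would instead
    # build an OutputProcessor from a populated Contract plus DatabaseUtil and
    # ThreadSafeMinioClient instances and call
    # `processor.process(powerFarmID, dataBatchNum, returnDataFrame)`.
    examplePowerFarmID = "WF001"
    exampleLocalFile = "output/WF001/batch-001/turbine-07/power_curve.png"

    directory, fileName = os.path.split(exampleLocalFile)
    subPath = os.path.relpath(directory, f"output/{examplePowerFarmID}")
    remoteFilePath = os.path.join(subPath, fileName).replace("\\", "/")

    # Object key inside the bucket: batch-001/turbine-07/power_curve.png
    print(remoteFilePath)
    # URL-quoted value stored in the DataFrame: wf001/batch-001/turbine-07/power_curve.png
    print(quote("".join([examplePowerFarmID.lower(), "/", remoteFilePath])))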