outputProcessor.py

import os
import shutil
from datetime import datetime, timezone, timedelta
from logging import Logger
from urllib.parse import quote

import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

from common.commonBusiness import CommonBusiness
from common.appConfig import GetBusinessFoundationDbUtil
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from algorithmContract.const import *
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil

Const_FileURL = "url"


class OutputProcessor:
    def __init__(self, conf: Contract, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient) -> None:
        self.conf = conf
        self.autoOrManual = 1 if self.conf.dataContract.autoOrManual == 'automatic' else 0
        self.powerFarmID = self.conf.dataContract.dataFilter.powerFarmID
        self.dataBatchNum = self.conf.dataContract.dataFilter.dataBatchNum
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.common = CommonBusiness()

    def uploadOfMinIO(self, bucketName: str, uploadFiles):
        """
        Upload files to MinIO, creating the bucket (and its access policy) on first use.
        """
        bucketName = bucketName.lower()
        if not self.minioUtil.bucket_exists(bucketName):
            self.minioUtil.create_bucket(bucketName)
            self.minioUtil.set_bucket_policy(bucketName)
        # Upload the files and hand the per-file results back to the caller
        return self.minioUtil.upload_files(bucketName, uploadFiles)
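
    # Illustrative only: `uploadFiles` is a list of (remote_path, local_path)
    # pairs, matching what process() assembles below. The farm/batch names here
    # are made-up examples, not values from the real system:
    #   self.uploadOfMinIO("farm-001", [
    #       ("batch-01/chart.png", "output/farm-001/batch-01/chart.png"),
    #   ])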

    def removeLocalFiles(self, powerFarmID: str, dataBatchNum: str):
        directory = f"output/{powerFarmID}/{dataBatchNum}"
        shutil.rmtree(directory)

    def analysisState(self, session: Session, batchNO: str, analysisState: int,
                      errorState: int = ErrorState.NotErr.value, errorCode: str = None,
                      errorInfo: str = None, timestamp: datetime = None,
                      analysisProgress: float = 0, analysis_finish_time: datetime = None):
        """
        Write the processing state to the main table analysis_result.
        Farm-wide charts (one chart covering all turbines) are recorded in the child table analysis_general_file;
        per-turbine charts (one chart per turbine) are recorded in the child table analysis_diagram_relation.
        """
        # Default arguments are evaluated once at import time, so the UTC+8
        # timestamp must be computed per call rather than in the signature.
        if timestamp is None:
            timestamp = datetime.now(timezone.utc) + timedelta(hours=8)
        sql = text("""
            INSERT INTO analysis_result
            (batch_code, analysis_state, err_state, err_code, err_info, create_time, analysis_progress, analysis_finish_time, update_time)
            VALUES (:batch_code, :analysis_state, :err_state, :err_code, :err_info, :create_time, :analysis_progress, :analysis_finish_time, :update_time)
            ON DUPLICATE KEY UPDATE
                analysis_state=VALUES(analysis_state),
                err_state=VALUES(err_state),
                err_code=VALUES(err_code),
                err_info=VALUES(err_info),
                update_time=VALUES(update_time),
                analysis_progress=VALUES(analysis_progress),
                analysis_finish_time=VALUES(analysis_finish_time);
            """)
        params = {
            "batch_code": None if self.common.isNone(batchNO) else batchNO,
            "analysis_state": None if self.common.isNone(analysisState) else analysisState,
            "err_state": None if self.common.isNone(errorState) else errorState,
            "err_code": None if self.common.isNone(errorCode) else errorCode,
            "err_info": None if self.common.isNone(errorInfo) else errorInfo,
            "create_time": timestamp,
            "update_time": timestamp,
            "analysis_progress": analysisProgress,
            "analysis_finish_time": analysis_finish_time
        }
        session.execute(sql, params)
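
    # Illustrative call (hypothetical batch code; the enum members are the ones
    # process() uses below): mark a batch as analyzed at 50% progress.
    #   self.analysisState(session, "BATCH-20240101", AnalysisState.Analyzed.value,
    #                      analysisProgress=50)
    # Because of the ON DUPLICATE KEY clause, repeated calls for the same
    # batch_code update the existing analysis_result row instead of inserting
    # a second one.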

    def analysisResultForTurbine(self, session: Session, returnDataFrame: pd.DataFrame, timestamp: datetime):
        # Per-turbine rows: everything except the farm-wide 'total' rows, filtered to those flagged for persistence
        dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] != 'total')
                                    & (returnDataFrame[Field_Return_IsSaveDatabase])]
        sql = text("""
            INSERT INTO analysis_diagram_relation
            (batch_code, field_engine_code, analysis_type_code, file_addr, auto_analysis, create_time)
            VALUES (:batch_code, :field_engine_code, :analysis_type_code, :file_addr, :auto_analysis, :create_time)
            ON DUPLICATE KEY UPDATE
                field_engine_code=VALUES(field_engine_code),
                analysis_type_code=VALUES(analysis_type_code),
                file_addr=VALUES(file_addr),
                auto_analysis=VALUES(auto_analysis);
            """)
        for _, row in dataFrame.iterrows():
            params = {
                "batch_code": row[Field_Return_BatchCode],
                "field_engine_code": row[Field_CodeOfTurbine],
                "analysis_type_code": row[Field_Return_TypeAnalyst],
                "file_addr": row[Const_FileURL],
                "auto_analysis": self.autoOrManual,
                "create_time": timestamp
            }
            session.execute(sql, params)

    def analysisResultForTotal(self, session: Session, returnDataFrame: pd.DataFrame, timestamp: datetime):
        # Farm-wide rows: the 'total' pseudo-turbine rows, filtered to those flagged for persistence
        dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] == 'total')
                                    & (returnDataFrame[Field_Return_IsSaveDatabase])]
        sql = text("""
            INSERT INTO analysis_general_file
            (batch_code, analysis_type_code, engine_type_code, file_addr, auto_analysis, create_time)
            VALUES (:batch_code, :analysis_type_code, :engine_type_code, :file_addr, :auto_analysis, :create_time)
            ON DUPLICATE KEY UPDATE
                analysis_type_code=VALUES(analysis_type_code),
                engine_type_code=VALUES(engine_type_code),
                file_addr=VALUES(file_addr),
                auto_analysis=VALUES(auto_analysis);
            """)
        for _, row in dataFrame.iterrows():
            params = {
                "batch_code": row[Field_Return_BatchCode],
                "analysis_type_code": row[Field_Return_TypeAnalyst],
                "engine_type_code": row[Field_MillTypeCode],
                "file_addr": row[Const_FileURL],
                "auto_analysis": self.autoOrManual,
                "create_time": timestamp
            }
            session.execute(sql, params)
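
    # Both writers above assume returnDataFrame carries at least:
    # Field_Return_BatchCode, Field_CodeOfTurbine ('total' marks farm-wide rows),
    # Field_Return_TypeAnalyst, Field_MillTypeCode (total rows only),
    # Field_Return_IsSaveDatabase, and the Const_FileURL column that process()
    # fills in below before these methods run.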

    def process(self, powerFarmID: str, dataBatchNum: str, returnDataFrame: pd.DataFrame, timestamp: datetime = None):
        try:
            # As in analysisState, compute the UTC+8 timestamp per call instead
            # of in a default argument evaluated once at import time.
            if timestamp is None:
                timestamp = datetime.now(timezone.utc) + timedelta(hours=8)
            uploadFiles = []
            if not returnDataFrame.empty:
                returnDataFrame[Const_FileURL] = None
                if Field_Return_IsSaveDatabase in returnDataFrame.columns:
                    returnDataFrame[Field_Return_IsSaveDatabase] = returnDataFrame[Field_Return_IsSaveDatabase].fillna(True)
                else:
                    returnDataFrame[Field_Return_IsSaveDatabase] = True
            for index, row in returnDataFrame.iterrows():
                directory, fileName = os.path.split(row[Field_Return_FilePath])
                basePath = f"output/{powerFarmID}"
                subPath = os.path.relpath(directory, basePath)
                remoteFilePath = os.path.join(subPath, fileName).replace("\\", "/")
                # arr = ["http://", self.minioUtil.client_pool.get_ip_address(), "/", powerFarmID.lower(), "/", remoteFilePath]
                arr = [powerFarmID.lower(), "/", remoteFilePath]
                fileURL = "".join(arr)
                returnDataFrame.at[index, Const_FileURL] = quote(fileURL)
                uploadFiles.append((remoteFilePath, row[Field_Return_FilePath]))
            self.uploadOfMinIO(self.powerFarmID, uploadFiles)
            foundationDB = GetBusinessFoundationDbUtil()
            with foundationDB.session_scope() as session:
                # First call: record the analyzed state at 50% progress
                self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
                                   ErrorState.NotErr.value, None, None, timestamp, 50)
                self.analysisResultForTotal(session, returnDataFrame, timestamp)
                self.analysisResultForTurbine(session, returnDataFrame, timestamp)
                # Capture the completion time of the analysis (UTC+8)
                finish_time = datetime.now(timezone.utc) + timedelta(hours=8)
                # Second call: update the state to finished at 100% progress and record the finish time
                self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
                                   ErrorState.NotErr.value, None, None, timestamp, 100,
                                   analysis_finish_time=finish_time)
            self.removeLocalFiles(powerFarmID, dataBatchNum)
        except Exception as e:
            self.logger.error(e)
            raise
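
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). `conf`, `logger`,
# `dbUtil`, `minioUtil`, and `resultFrame` are assumed to be provided by the
# application bootstrap and the preceding analysis step:
#
#   processor = OutputProcessor(conf, logger, dbUtil, minioUtil)
#   processor.process(
#       conf.dataContract.dataFilter.powerFarmID,
#       conf.dataContract.dataFilter.dataBatchNum,
#       resultFrame,
#   )
# ---------------------------------------------------------------------------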