outputProcessor.py

import os
import shutil
from datetime import datetime, timedelta, timezone
from logging import Logger
from urllib.parse import quote

import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

from common.commonBusiness import CommonBusiness
from common.appConfig import GetBusinessFoundationDbUtil
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from algorithmContract.const import *
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil

# Column key for the generated MinIO file URL in the result DataFrame.
Const_FileURL = "url"


class OutputProcessor:
    def __init__(self, conf: Contract, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient) -> None:
        self.conf = conf
        # 1 = automatically triggered analysis, 0 = manually triggered.
        self.autoOrManual = 1 if self.conf.dataContract.autoOrManual == 'automatic' else 0
        self.powerFarmID = self.conf.dataContract.dataFilter.powerFarmID
        self.dataBatchNum = self.conf.dataContract.dataFilter.dataBatchNum
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.common = CommonBusiness()

    def uploadOfMinIO(self, bucketName: str, uploadFiles):
        """
        Upload files to MinIO, creating the bucket (and its access policy) first if needed.
        """
        bucketName = bucketName.lower()
        if not self.minioUtil.bucket_exists(bucketName):
            self.minioUtil.create_bucket(bucketName)
            self.minioUtil.set_bucket_policy(bucketName)
        return self.minioUtil.upload_files(bucketName, uploadFiles)
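
    # uploadFiles is a list of (remote_path, local_path) tuples, matching what
    # process() builds below; ThreadSafeMinioClient.upload_files is assumed to
    # accept that shape. A hedged usage sketch with made-up paths:
    #
    #     processor.uploadOfMinIO("FARM01",
    #                             [("B001/chart.png", "output/FARM01/B001/chart.png")])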

    def removeLocalFiles(self, powerFarmID: str, dataBatchNum: str):
        # Delete the local output directory for this batch; ignore_errors keeps
        # cleanup from failing if the directory is already gone.
        directory = f"output/{powerFarmID}/{dataBatchNum}"
        shutil.rmtree(directory, ignore_errors=True)

    def analysisState(self, session: Session, batchNO: str, analysisState: int, errorState: int = ErrorState.NotErr.value, errorCode: str = None, errorInfo: str = None, timestamp: datetime = None, analysisProgress: float = 0):
        """
        Write the processing state to the main table analysis_result.
        Farm-wide charts (one chart covering multiple turbines) go to the child
        table analysis_general_file, and per-turbine charts (one chart per
        turbine) to the child table analysis_diagram_relation; see
        analysisResultForTotal and analysisResultForTurbine below.
        """
        # A datetime.now() default in the signature would be evaluated only once,
        # at import time, so the UTC+8 timestamp is computed per call instead.
        if timestamp is None:
            timestamp = datetime.now(timezone.utc) + timedelta(hours=8)
        # update_time is part of the INSERT column list so the ON DUPLICATE KEY
        # clause can reuse it via VALUES(update_time).
        sql = text("INSERT INTO analysis_result(batch_code, analysis_state, err_state, err_code, err_info, create_time, update_time, analysis_progress) \
                    VALUES(:batch_code, :analysis_state, :err_state, :err_code, :err_info, :create_time, :update_time, :analysis_progress) \
                    ON DUPLICATE KEY \
                    UPDATE \
                    analysis_state=VALUES(analysis_state), err_state=VALUES(err_state), err_code=VALUES(err_code), \
                    err_info=VALUES(err_info), update_time=VALUES(update_time), analysis_progress=VALUES(analysis_progress);")
        params = {
            "batch_code": None if self.common.isNone(batchNO) else batchNO,
            "analysis_state": None if self.common.isNone(analysisState) else analysisState,
            "err_state": None if self.common.isNone(errorState) else errorState,
            "err_code": None if self.common.isNone(errorCode) else errorCode,
            "err_info": None if self.common.isNone(errorInfo) else errorInfo,
            "create_time": timestamp,
            "update_time": timestamp,
            "analysis_progress": analysisProgress
        }
        session.execute(sql, params)
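
    # A hedged usage sketch (mirrors the call in process() below): mark a batch
    # as fully analyzed with no error at 100% progress. The batch code is made up.
    #
    #     with foundationDB.session_scope() as session:
    #         processor.analysisState(session, "B20240101",
    #                                 AnalysisState.Analyzed.value,
    #                                 ErrorState.NotErr.value,
    #                                 analysisProgress=100)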

    def analysisResultForTurbine(self, session: Session, returnDataFrame: pd.DataFrame, timestamp: datetime):
        # Per-turbine charts: every persisted row except the farm-wide 'total' row.
        dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] != 'total') &
                                    (returnDataFrame[Field_Return_IsSaveDatabase])]
        for _, row in dataFrame.iterrows():
            sql = text("""
                INSERT INTO analysis_diagram_relation
                (batch_code, field_engine_code, analysis_type_code, file_addr, auto_analysis, create_time)
                VALUES (:batch_code, :field_engine_code, :analysis_type_code, :file_addr, :auto_analysis, :create_time)
                ON DUPLICATE KEY UPDATE
                field_engine_code=VALUES(field_engine_code),
                analysis_type_code=VALUES(analysis_type_code),
                file_addr=VALUES(file_addr),
                auto_analysis=VALUES(auto_analysis);
                """)
            params = {
                "batch_code": row[Field_Return_BatchCode],
                "field_engine_code": row[Field_CodeOfTurbine],
                "analysis_type_code": row[Field_Return_TypeAnalyst],
                "file_addr": row[Const_FileURL],
                "auto_analysis": self.autoOrManual,
                "create_time": timestamp
            }
            session.execute(sql, params)

    def analysisResultForTotal(self, session: Session, returnDataFrame: pd.DataFrame, timestamp: datetime):
        # Farm-wide charts: only the 'total' rows flagged for persistence.
        dataFrame = returnDataFrame[(returnDataFrame[Field_CodeOfTurbine] == 'total') &
                                    (returnDataFrame[Field_Return_IsSaveDatabase])]
        for _, row in dataFrame.iterrows():
            sql = text("""
                INSERT INTO analysis_general_file
                (batch_code, analysis_type_code, file_addr, auto_analysis, create_time)
                VALUES (:batch_code, :analysis_type_code, :file_addr, :auto_analysis, :create_time)
                ON DUPLICATE KEY UPDATE
                analysis_type_code=VALUES(analysis_type_code),
                file_addr=VALUES(file_addr),
                auto_analysis=VALUES(auto_analysis);
                """)
            params = {
                "batch_code": row[Field_Return_BatchCode],
                "analysis_type_code": row[Field_Return_TypeAnalyst],
                "file_addr": row[Const_FileURL],
                "auto_analysis": self.autoOrManual,
                "create_time": timestamp
            }
            session.execute(sql, params)
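
    # Columns expected on returnDataFrame, as consumed by the two methods above
    # and filled in by process() below:
    #   Field_CodeOfTurbine          turbine code, or the literal 'total' for farm-wide charts
    #   Field_Return_BatchCode       analysis batch code
    #   Field_Return_TypeAnalyst     analysis type code
    #   Field_Return_FilePath        local path of the generated chart file
    #   Field_Return_IsSaveDatabase  whether the row should be persisted (defaulted to True)
    #   Const_FileURL ("url")        MinIO object URL, filled in by process()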

    def process(self, powerFarmID: str, dataBatchNum: str, returnDataFrame: pd.DataFrame, timestamp: datetime = None):
        # Compute the UTC+8 timestamp per call rather than once at import time.
        if timestamp is None:
            timestamp = datetime.now(timezone.utc) + timedelta(hours=8)
        try:
            uploadFiles = []
            if not returnDataFrame.empty:
                returnDataFrame[Const_FileURL] = None
                if Field_Return_IsSaveDatabase in returnDataFrame.columns:
                    # Plain assignment instead of fillna(inplace=True) avoids
                    # pandas chained-assignment warnings.
                    returnDataFrame[Field_Return_IsSaveDatabase] = \
                        returnDataFrame[Field_Return_IsSaveDatabase].fillna(True)
                else:
                    returnDataFrame[Field_Return_IsSaveDatabase] = True

            # Map each local chart file to its object path inside the farm bucket.
            for index, row in returnDataFrame.iterrows():
                directory, fileName = os.path.split(row[Field_Return_FilePath])
                basePath = f"output/{powerFarmID}"
                subPath = os.path.relpath(directory, basePath)
                remoteFilePath = os.path.join(subPath, fileName).replace("\\", "/")
                # arr=["http://",self.minioUtil.client_pool.get_ip_address(),"/",powerFarmID.lower(),"/",remoteFilePath]
                fileURL = "".join([powerFarmID.lower(), "/", remoteFilePath])
                returnDataFrame.at[index, Const_FileURL] = quote(fileURL)
                uploadFiles.append((remoteFilePath, row[Field_Return_FilePath]))

            self.uploadOfMinIO(self.powerFarmID, uploadFiles)

            foundationDB = GetBusinessFoundationDbUtil()
            with foundationDB.session_scope() as session:
                self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
                                   ErrorState.NotErr.value, None, None, timestamp, 100)
                self.analysisResultForTotal(session, returnDataFrame, timestamp)
                self.analysisResultForTurbine(session, returnDataFrame, timestamp)

            self.removeLocalFiles(powerFarmID, dataBatchNum)
        except Exception as e:
            self.logger.error(e)
            # Bare raise preserves the original traceback.
            raise
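

# A minimal, self-contained sketch of the remote-path and URL scheme that
# process() applies to each chart file. The farm ID, batch number, and file
# name are made up for illustration; the assumed local layout is
# output/<powerFarmID>/<dataBatchNum>/.../<file>.
if __name__ == "__main__":
    local_path = "output/FARM01/B20240101/turbine_07/power_curve.png"
    directory, file_name = os.path.split(local_path)
    sub_path = os.path.relpath(directory, "output/FARM01")
    remote_path = os.path.join(sub_path, file_name).replace("\\", "/")
    print(remote_path)                     # -> B20240101/turbine_07/power_curve.png
    print(quote("farm01/" + remote_path))  # -> farm01/B20240101/turbine_07/power_curve.png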