serviceOfDataAnalysis.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import sys
  2. import pandas as pd
  3. from logging import Logger
  4. from utils.jsonUtil import JsonUtil
  5. from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
  6. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  7. from algorithmContract.confBusiness import *
  8. from algorithmContract.contract import Contract,LoadAnalysisInput, Analysis
  9. from algorithmContract.const import *
  10. from behavior.baseAnalyst import BaseAnalyst
  11. from behavior.dalAnalyst import DALAnalyst
  12. from pathlib import Path
  13. # 获取项目根目录(WTOAAM文件夹)
  14. BASE_DIR = str(Path(__file__).resolve().parent.parent.parent.parent) # 向上四级到项目根目录
  15. if BASE_DIR not in sys.path:
  16. sys.path.append(BASE_DIR)
  17. from dataAnalysisBusiness.algorithm.dataProcessor import DataProcessor
  18. from common.commonBusiness import CommonBusiness
  19. from common.appConfig import AppConfig
  20. from sqlalchemy.orm import Session
  21. from sqlalchemy.sql import text
  22. def buildDynamicInstance(modulePath, className, conf: Contract,logger:Logger,dbUtil:dict[str,DatabaseUtil],minioUtil:ThreadSafeMinioClient,
  23. powerFarmInfo:pd.DataFrame,turbineInfo:pd.DataFrame,
  24. turbineModelInfo:pd.DataFrame,dataTransfer:pd.DataFrame,
  25. weatherStationInfo:pd.DataFrame,dataFrameContractOfTurbine:pd.DataFrame,
  26. ):
  27. # 动态导入模块
  28. module = importlib.import_module(modulePath)
  29. # 获取类
  30. cls = getattr(module, className)
  31. # 创建类实例
  32. instance:BaseAnalyst = cls(logger,dbUtil,minioUtil,conf,powerFarmInfo,turbineInfo,turbineModelInfo,dataTransfer,weatherStationInfo,dataFrameContractOfTurbine)
  33. instance.powerFarmInfo=powerFarmInfo
  34. instance.turbineInfo=turbineInfo
  35. instance.turbineModelInfo=turbineModelInfo
  36. # instance.dataTransfer=dataTransfer
  37. instance.weatherStationInfo=weatherStationInfo
  38. instance.dataFrameContractOfTurbine=dataFrameContractOfTurbine
  39. return instance
  40. def dynamic_instance_and_call(module_path, class_name, method_name, *args, **kwargs):
  41. # 动态导入模块
  42. module = importlib.import_module(module_path)
  43. # 获取类
  44. cls = getattr(module, class_name)
  45. # 创建类实例
  46. instance = cls()
  47. # 获取方法
  48. method = getattr(instance, method_name)
  49. # 调用方法
  50. method(*args, **kwargs)
  51. class ServiceOfDataAnalysis:
  52. def __init__(self, logger:Logger,dbUtil:dict,minioUtil:ThreadSafeMinioClient,conf:Contract):
  53. self.logger = logger
  54. self.dbUtil=dbUtil
  55. self.minioUtil=minioUtil
  56. self.conf=conf
  57. self.common = CommonBusiness()
  58. dal = DALAnalyst(logger, dbUtil)
  59. # 加载所有新能源场站信息
  60. self.powerFarmInfo:pd.DataFrame = dal.loadPowerFarmInfos(
  61. conf.dataContract.dataFilter.powerFarmID)
  62. if self.powerFarmInfo.empty:
  63. raise CustomError(104)
  64. # 加载所有风电机组信息
  65. self.turbineInfo:pd.DataFrame= dal.loadTurbineInfos(
  66. conf.dataContract.dataFilter.powerFarmID)
  67. if self.turbineInfo.empty:
  68. raise CustomError(105)
  69. # 加载数据转换信息
  70. # self.dataTransfer:pd.DataFrame=dal.loadDataTransfer(conf.dataContract.dataFilter.powerFarmID,conf.dataContract.dataFilter.dataBatchNum)
  71. # if self.dataTransfer.empty:
  72. # raise CustomError(106)
  73. self.dataTransfer:pd.DataFrame=None
  74. # 加载机型信息
  75. self.turbineModelInfo:pd.DataFrame= dal.loadTurbineModelInfos(
  76. self.turbineInfo[Field_MillTypeCode].unique().tolist())
  77. if self.turbineModelInfo.empty:
  78. raise CustomError(107)
  79. # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
  80. self.dataFrameContractOfTurbine:pd.DataFrame = dal.processContractData(
  81. self.common, conf.dataContract.dataFilter.powerFarmID, self.powerFarmInfo[Field_AirDensity].iloc[0], self.turbineModelInfo)
  82. if self.dataFrameContractOfTurbine.empty:
  83. raise CustomError(108)
  84. # 加载所有测风塔信息
  85. self.weatherStationInfo:pd.DataFrame = dal.loadWeatherStationInfos(
  86. conf.dataContract.dataFilter.powerFarmID)
  87. def loadJson(self, filePath):
  88. # 打开并读取JSON文件
  89. with open(filePath, 'r') as f:
  90. data = JsonUtil.read_json(filePath)
  91. return data
  92. def executeAnalysis(self):
  93. analysts = []
  94. for dynamicAnalyst in self.conf.dataContract.configAnalysis:
  95. package = dynamicAnalyst.package
  96. className = dynamicAnalyst.className
  97. methodName = dynamicAnalyst.methodName
  98. analyst = buildDynamicInstance(package, className, self.conf,self.logger,self.dbUtil,self.minioUtil,
  99. self.powerFarmInfo,self.turbineInfo,
  100. self.turbineModelInfo,self.dataTransfer,
  101. self.weatherStationInfo,self.dataFrameContractOfTurbine)
  102. analysts.append(analyst)
  103. process = DataProcessor(logger=self.logger,dbUtil=self.dbUtil,minioUtil=self.minioUtil,conf=self.conf)
  104. # 加载所有新能源场站信息
  105. process.powerFarmInfo = self.powerFarmInfo
  106. # 加载所有风电机组信息
  107. process.turbineInfo= self.turbineInfo
  108. # # 加载机型信息
  109. process.turbineModelInfo= self.turbineModelInfo
  110. # 加载所有测风塔信息
  111. process.weatherStationInfo = self.weatherStationInfo
  112. # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
  113. process.dataFrameContractOfTurbine = self.dataFrameContractOfTurbine
  114. # process.dataTransfer=self.dataTransfer
  115. for analyst in analysts:
  116. process.attach(analyst)
  117. process.executeAnalysis(self.conf)
  118. for analyst in analysts:
  119. process.detach(analyst)
  120. def analysis(self):
  121. self.logger.info("analysis executing....")
  122. self.executeAnalysis()
# NOTE: matplotlib plotting fails when the analysis is run from multiple threads.
# Creating a figure window with matplotlib in a multi-threaded program can raise
# "QWidget: Must construct a QApplication before a QWidget".
# This usually happens because the matplotlib backend relies on the Qt framework, and Qt
# requires a QApplication instance to exist before any QWidget object is created.
# In a multi-threaded program each thread should have its own event loop; however, the
# QApplication instance is normally global and should be created only once, in the main thread.
# Creating a QApplication or a QWidget from a non-main thread triggers this error.
# The thread-pool variant below is kept commented out for reference:
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
#     futures=[executor.submit(executeAnalysis, config) for config in configs]
#     for future in concurrent.futures.as_completed(futures):
#         try:
#             result = future.result()
#         except Exception as exc:
#             print(f'生成异常: {exc}')
#         else:
#             print(f'任务返回结果: {result}')
  136. # print(f'任务返回结果: {result}')