serviceOfDataAnalysis.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import importlib
  2. import pandas as pd
  3. from logging import Logger
  4. from utils.jsonUtil import JsonUtil
  5. from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
  6. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  7. from algorithmContract.confBusiness import *
  8. from algorithmContract.contract import Contract,LoadAnalysisInput, Analysis
  9. from algorithmContract.const import *
  10. from behavior.baseAnalyst import BaseAnalyst
  11. from behavior.dalAnalyst import DALAnalyst
  12. from algorithm.dataProcessor import DataProcessor
  13. from common.commonBusiness import CommonBusiness
  14. from common.appConfig import AppConfig
  15. from sqlalchemy.orm import Session
  16. from sqlalchemy.sql import text
  17. def buildDynamicInstance(modulePath, className, conf: Contract,logger:Logger,dbUtil:dict[str,DatabaseUtil],minioUtil:ThreadSafeMinioClient,
  18. powerFarmInfo:pd.DataFrame,turbineInfo:pd.DataFrame,
  19. turbineModelInfo:pd.DataFrame,dataTransfer:pd.DataFrame,
  20. weatherStationInfo:pd.DataFrame,dataFrameContractOfTurbine:pd.DataFrame,
  21. ):
  22. # 动态导入模块
  23. module = importlib.import_module(modulePath)
  24. # 获取类
  25. cls = getattr(module, className)
  26. # 创建类实例
  27. instance:BaseAnalyst = cls(logger,dbUtil,minioUtil,conf,powerFarmInfo,turbineInfo,turbineModelInfo,dataTransfer,weatherStationInfo,dataFrameContractOfTurbine)
  28. instance.powerFarmInfo=powerFarmInfo
  29. instance.turbineInfo=turbineInfo
  30. instance.turbineModelInfo=turbineModelInfo
  31. # instance.dataTransfer=dataTransfer
  32. instance.weatherStationInfo=weatherStationInfo
  33. instance.dataFrameContractOfTurbine=dataFrameContractOfTurbine
  34. return instance
  35. def dynamic_instance_and_call(module_path, class_name, method_name, *args, **kwargs):
  36. # 动态导入模块
  37. module = importlib.import_module(module_path)
  38. # 获取类
  39. cls = getattr(module, class_name)
  40. # 创建类实例
  41. instance = cls()
  42. # 获取方法
  43. method = getattr(instance, method_name)
  44. # 调用方法
  45. method(*args, **kwargs)
  46. class ServiceOfDataAnalysis:
  47. def __init__(self, logger:Logger,dbUtil:dict,minioUtil:ThreadSafeMinioClient,conf:Contract):
  48. self.logger = logger
  49. self.dbUtil=dbUtil
  50. self.minioUtil=minioUtil
  51. self.conf=conf
  52. self.common = CommonBusiness()
  53. dal = DALAnalyst(logger, dbUtil)
  54. # 加载所有新能源场站信息
  55. self.powerFarmInfo:pd.DataFrame = dal.loadPowerFarmInfos(
  56. conf.dataContract.dataFilter.powerFarmID)
  57. if self.powerFarmInfo.empty:
  58. raise CustomError(104)
  59. # 加载所有风电机组信息
  60. self.turbineInfo:pd.DataFrame= dal.loadTurbineInfos(
  61. conf.dataContract.dataFilter.powerFarmID)
  62. if self.turbineInfo.empty:
  63. raise CustomError(105)
  64. # 加载数据转换信息
  65. # self.dataTransfer:pd.DataFrame=dal.loadDataTransfer(conf.dataContract.dataFilter.powerFarmID,conf.dataContract.dataFilter.dataBatchNum)
  66. # if self.dataTransfer.empty:
  67. # raise CustomError(106)
  68. self.dataTransfer:pd.DataFrame=None
  69. # 加载机型信息
  70. self.turbineModelInfo:pd.DataFrame= dal.loadTurbineModelInfos(
  71. self.turbineInfo[Field_MillTypeCode].unique().tolist())
  72. if self.turbineModelInfo.empty:
  73. raise CustomError(107)
  74. # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
  75. self.dataFrameContractOfTurbine:pd.DataFrame = dal.processContractData(
  76. self.common, conf.dataContract.dataFilter.powerFarmID, self.powerFarmInfo[Field_AirDensity].iloc[0], self.turbineModelInfo)
  77. if self.dataFrameContractOfTurbine.empty:
  78. raise CustomError(108)
  79. # 加载所有测风塔信息
  80. self.weatherStationInfo:pd.DataFrame = dal.loadWeatherStationInfos(
  81. conf.dataContract.dataFilter.powerFarmID)
  82. def loadJson(self, filePath):
  83. # 打开并读取JSON文件
  84. with open(filePath, 'r') as f:
  85. data = JsonUtil.read_json(filePath)
  86. return data
  87. def executeAnalysis(self):
  88. analysts = []
  89. for dynamicAnalyst in self.conf.dataContract.configAnalysis:
  90. package = dynamicAnalyst.package
  91. className = dynamicAnalyst.className
  92. methodName = dynamicAnalyst.methodName
  93. analyst = buildDynamicInstance(package, className, self.conf,self.logger,self.dbUtil,self.minioUtil,
  94. self.powerFarmInfo,self.turbineInfo,
  95. self.turbineModelInfo,self.dataTransfer,
  96. self.weatherStationInfo,self.dataFrameContractOfTurbine)
  97. analysts.append(analyst)
  98. process = DataProcessor(logger=self.logger,dbUtil=self.dbUtil,minioUtil=self.minioUtil,conf=self.conf)
  99. # 加载所有新能源场站信息
  100. process.powerFarmInfo = self.powerFarmInfo
  101. # 加载所有风电机组信息
  102. process.turbineInfo= self.turbineInfo
  103. # # 加载机型信息
  104. process.turbineModelInfo= self.turbineModelInfo
  105. # 加载所有测风塔信息
  106. process.weatherStationInfo = self.weatherStationInfo
  107. # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
  108. process.dataFrameContractOfTurbine = self.dataFrameContractOfTurbine
  109. # process.dataTransfer=self.dataTransfer
  110. for analyst in analysts:
  111. process.attach(analyst)
  112. process.executeAnalysis(self.conf)
  113. for analyst in analysts:
  114. process.detach(analyst)
  115. def analysis(self):
  116. self.logger.info("analysis executing....")
  117. self.executeAnalysis()
  118. # 使用多线程时,matplotlib绘图报错
  119. # 在使用Python语言的matplotlib包时,如果尝试在多线程环境中创建图形界面,你可能会遇到“QWidget-: Must construct a QApplication before a QWidget”的错误。
  120. # 这个错误通常是因为matplotlib的图形后端(backend)依赖于Qt框架,而Qt框架要求在一个QApplication实例被创建之后再创建任何QWidget对象。
  121. # 在多线程环境中,每个线程都应该有一个它自己的事件循环。然而,QApplication实例通常是全局的,并且应该只在主线程中创建一次。
  122. # 如果你尝试在一个非主线程中创建QApplication或者QWidget,就会遇到这个问题。
  123. # with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
  124. # futures=[executor.submit(executeAnalysis, config) for config in configs]
  125. # for future in concurrent.futures.as_completed(futures):
  126. # try:
  127. # result = future.result()
  128. # except Exception as exc:
  129. # print(f'生成异常: {exc}')
  130. # else:
  131. # print(f'任务返回结果: {result}')