import importlib
import pandas as pd
from logging import Logger
from utils.jsonUtil import JsonUtil
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
from algorithmContract.const import *
from behavior.baseAnalyst import BaseAnalyst
from behavior.dalAnalyst import DALAnalyst
from algorithm.dataProcessor import DataProcessor
from common.commonBusiness import CommonBusiness
from common.appConfig import AppConfig
from sqlalchemy.orm import Session
from sqlalchemy.sql import text


def buildDynamicInstance(
    modulePath,
    className,
    conf: Contract,
    logger: Logger,
    dbUtil: dict[str, DatabaseUtil],
    minioUtil: ThreadSafeMinioClient,
    powerFarmInfo: pd.DataFrame,
    turbineInfo: pd.DataFrame,
    turbineModelInfo: pd.DataFrame,
    dataTransfer: pd.DataFrame,
    weatherStationInfo: pd.DataFrame,
    dataFrameContractOfTurbine: pd.DataFrame,
):
    """Import ``modulePath``, instantiate ``className`` and return the instance.

    The class is constructed positionally with
    ``(logger, dbUtil, minioUtil, conf, powerFarmInfo, turbineInfo,
    turbineModelInfo, dataTransfer, weatherStationInfo,
    dataFrameContractOfTurbine)``.  After construction the reference data
    frames (all except ``dataTransfer``) are also assigned onto the instance
    as attributes of the same name.

    Args:
        modulePath: Dotted module path passed to ``importlib.import_module``.
        className: Name of the class to look up in that module.
        conf: Contract forwarded to the constructor.
        logger: Logger forwarded to the constructor.
        dbUtil: Database utilities forwarded to the constructor.
        minioUtil: MinIO client forwarded to the constructor.
        powerFarmInfo: Power farm data frame.
        turbineInfo: Turbine data frame.
        turbineModelInfo: Turbine model data frame.
        dataTransfer: Data transfer data frame (only passed to the
            constructor; not re-assigned on the instance).
        weatherStationInfo: Weather station data frame.
        dataFrameContractOfTurbine: Contract power curve data frame.

    Returns:
        The newly created instance (annotated as ``BaseAnalyst``).

    Raises:
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no attribute ``className``.
    """
    # Dynamically import the module.
    module = importlib.import_module(modulePath)
    # Look up the class.
    cls = getattr(module, className)
    # Create the class instance.
    instance: BaseAnalyst = cls(
        logger,
        dbUtil,
        minioUtil,
        conf,
        powerFarmInfo,
        turbineInfo,
        turbineModelInfo,
        dataTransfer,
        weatherStationInfo,
        dataFrameContractOfTurbine,
    )
    instance.powerFarmInfo = powerFarmInfo
    instance.turbineInfo = turbineInfo
    instance.turbineModelInfo = turbineModelInfo
    # instance.dataTransfer = dataTransfer
    instance.weatherStationInfo = weatherStationInfo
    instance.dataFrameContractOfTurbine = dataFrameContractOfTurbine
    return instance


def dynamic_instance_and_call(module_path, class_name, method_name, *args, **kwargs):
    """Import a module, instantiate a class with no arguments and call a method.

    Args:
        module_path: Dotted module path passed to ``importlib.import_module``.
        class_name: Name of the class to look up in that module.
        method_name: Name of the method to look up on the new instance.
        *args: Positional arguments forwarded to the method.
        **kwargs: Keyword arguments forwarded to the method.

    Returns:
        Whatever the invoked method returns.

    Raises:
        ImportError: If the module cannot be imported.
        AttributeError: If the class or the method does not exist.
    """
    # Dynamically import the module.
    module = importlib.import_module(module_path)
    # Look up the class.
    cls = getattr(module, class_name)
    # Create the class instance.
    instance = cls()
    # Look up the method.
    method = getattr(instance, method_name)
    # Call the method and hand its result back to the caller.
    return method(*args, **kwargs)


class ServiceOfDataAnalysis:
    """Loads the reference data for a power farm and runs the configured analysts.

    The constructor loads the reference data frames through ``DALAnalyst``;
    ``analysis`` / ``executeAnalysis`` build one analyst per entry of
    ``conf.dataContract.configAnalysis`` and run them through a
    ``DataProcessor``.
    """

    def __init__(self, logger: Logger, dbUtil: dict, minioUtil: ThreadSafeMinioClient, conf: Contract):
        """Store the collaborators and load all reference data frames.

        Args:
            logger: Logger used by this service and passed on to collaborators.
            dbUtil: Database utilities passed on to ``DALAnalyst`` and analysts.
            minioUtil: MinIO client passed on to analysts and the processor.
            conf: Contract; ``conf.dataContract.dataFilter.powerFarmID`` selects
                the data to load.

        Raises:
            CustomError: With code 104, 105, 107 or 108 when, respectively, the
                power farm, turbine, turbine model or contract power curve data
                frame is empty.  (``CustomError`` is not imported by name here;
                presumably it comes from one of the star imports.)
        """
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.conf = conf
        self.common = CommonBusiness()

        dal = DALAnalyst(logger, dbUtil)
        powerFarmID = conf.dataContract.dataFilter.powerFarmID

        # Load the power farm information.
        self.powerFarmInfo: pd.DataFrame = dal.loadPowerFarmInfos(powerFarmID)
        if self.powerFarmInfo.empty:
            raise CustomError(104)

        # Load the wind turbine information.
        self.turbineInfo: pd.DataFrame = dal.loadTurbineInfos(powerFarmID)
        if self.turbineInfo.empty:
            raise CustomError(105)

        # Load the data transfer information (currently disabled).
        # self.dataTransfer: pd.DataFrame = dal.loadDataTransfer(
        #     conf.dataContract.dataFilter.powerFarmID,
        #     conf.dataContract.dataFilter.dataBatchNum)
        # if self.dataTransfer.empty:
        #     raise CustomError(106)
        # Placeholder: None is what gets forwarded to the analysts.
        self.dataTransfer: pd.DataFrame = None

        # Load the turbine model information for the model codes in use.
        self.turbineModelInfo: pd.DataFrame = dal.loadTurbineModelInfos(
            self.turbineInfo[Field_MillTypeCode].unique().tolist())
        if self.turbineModelInfo.empty:
            raise CustomError(107)

        # Load the contract power curves of the turbine models of the power
        # farm; the air density is taken from the first power farm row.
        self.dataFrameContractOfTurbine: pd.DataFrame = dal.processContractData(
            self.common,
            powerFarmID,
            self.powerFarmInfo[Field_AirDensity].iloc[0],
            self.turbineModelInfo)
        if self.dataFrameContractOfTurbine.empty:
            raise CustomError(108)

        # Load the weather station (met mast) information; an empty result is
        # not treated as an error here.
        self.weatherStationInfo: pd.DataFrame = dal.loadWeatherStationInfos(powerFarmID)

    def loadJson(self, filePath):
        """Read the JSON file at ``filePath`` via ``JsonUtil.read_json``.

        Args:
            filePath: Path of the JSON file.

        Returns:
            Whatever ``JsonUtil.read_json(filePath)`` returns.

        Raises:
            OSError: If the file cannot be opened for reading.
        """
        # NOTE(review): the opened handle is never used; JsonUtil.read_json is
        # given the path. The open() is kept so a missing/unreadable file still
        # raises exactly as before - confirm whether it can be dropped.
        with open(filePath, 'r'):
            data = JsonUtil.read_json(filePath)
        return data

    def executeAnalysis(self):
        """Build the configured analysts and run them through a ``DataProcessor``.

        One analyst is created per entry of
        ``self.conf.dataContract.configAnalysis``.  All analysts are attached
        to the processor, ``process.executeAnalysis(self.conf)`` is called, and
        the analysts are detached again - also when the execution raises.
        """
        analysts = [
            buildDynamicInstance(
                analystConf.package,
                analystConf.className,
                self.conf,
                self.logger,
                self.dbUtil,
                self.minioUtil,
                self.powerFarmInfo,
                self.turbineInfo,
                self.turbineModelInfo,
                self.dataTransfer,
                self.weatherStationInfo,
                self.dataFrameContractOfTurbine,
            )
            for analystConf in self.conf.dataContract.configAnalysis
        ]

        process = DataProcessor(
            logger=self.logger,
            dbUtil=self.dbUtil,
            minioUtil=self.minioUtil,
            conf=self.conf,
        )
        # Hand the already loaded reference data to the processor.
        process.powerFarmInfo = self.powerFarmInfo  # power farm information
        process.turbineInfo = self.turbineInfo  # wind turbine information
        process.turbineModelInfo = self.turbineModelInfo  # turbine model information
        process.weatherStationInfo = self.weatherStationInfo  # weather station information
        # Contract power curves of the turbine models of the power farm.
        process.dataFrameContractOfTurbine = self.dataFrameContractOfTurbine
        # process.dataTransfer = self.dataTransfer

        for analyst in analysts:
            process.attach(analyst)

        try:
            process.executeAnalysis(self.conf)
        finally:
            # Always detach, even if the analysis run raised.
            for analyst in analysts:
                process.detach(analyst)

    def analysis(self):
        """Log the start of the analysis and run ``executeAnalysis``."""
        self.logger.info("analysis executing....")
        self.executeAnalysis()

        # Plotting with matplotlib fails when multiple threads are used.
        # When using matplotlib and trying to create a GUI in a multi-threaded
        # environment, you may hit the error
        # "QWidget-: Must construct a QApplication before a QWidget".
        # This usually happens because the matplotlib backend depends on the
        # Qt framework, and Qt requires a QApplication instance to exist
        # before any QWidget object is created.
        # In a multi-threaded environment every thread should have its own
        # event loop. However, the QApplication instance is normally global and
        # should be created only once, in the main thread.
        # Creating a QApplication or a QWidget in a non-main thread triggers
        # this problem.

        # with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        #     futures = [executor.submit(executeAnalysis, config) for config in configs]
        #     for future in concurrent.futures.as_completed(futures):
        #         try:
        #             result = future.result()
        #         except Exception as exc:
        #             print(f'generated an exception: {exc}')
        #         else:
        #             print(f'task returned result: {result}')