"""Service layer that loads wind-farm reference data and runs configured analysts.

The module exposes two small dynamic-loading helpers and the
``ServiceOfDataAnalysis`` class, which loads reference data through
``DALAnalyst`` once and hands it to every analyst named in the contract.
"""
import importlib
import sys
import pandas as pd
from logging import Logger
from utils.jsonUtil import JsonUtil
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract, LoadAnalysisInput, Analysis
from algorithmContract.const import *
from behavior.baseAnalyst import BaseAnalyst
from behavior.dalAnalyst import DALAnalyst
from pathlib import Path

# Resolve the project root directory (the WTOAAM folder): four levels up from
# this file.
BASE_DIR = str(Path(__file__).resolve().parent.parent.parent.parent)
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)

# These imports intentionally come after the sys.path adjustment above.
from dataAnalysisBusiness.algorithm.dataProcessor import DataProcessor
from common.commonBusiness import CommonBusiness
from common.appConfig import AppConfig
from sqlalchemy.orm import Session
from sqlalchemy.sql import text


def buildDynamicInstance(
    modulePath,
    className,
    conf: Contract,
    logger: Logger,
    dbUtil: dict[str, DatabaseUtil],
    minioUtil: ThreadSafeMinioClient,
    powerFarmInfo: pd.DataFrame,
    turbineInfo: pd.DataFrame,
    turbineModelInfo: pd.DataFrame,
    dataTransfer: pd.DataFrame,
    weatherStationInfo: pd.DataFrame,
    dataFrameContractOfTurbine: pd.DataFrame,
) -> BaseAnalyst:
    """Import ``modulePath``, instantiate ``className`` and attach reference data.

    Args:
        modulePath: Dotted path of the module that defines the analyst class.
        className: Name of the class to look up inside that module.
        conf: Contract passed to the analyst constructor.
        logger: Logger passed to the analyst constructor.
        dbUtil: Database utilities passed to the analyst constructor.
        minioUtil: MinIO client passed to the analyst constructor.
        powerFarmInfo: Power-farm data frame.
        turbineInfo: Turbine data frame.
        turbineModelInfo: Turbine-model data frame.
        dataTransfer: Data-transfer data frame; passed to the constructor only
            (the caller in this file passes ``None``).
        weatherStationInfo: Weather-station data frame.
        dataFrameContractOfTurbine: Contract power-curve data frame.

    Returns:
        The newly created analyst instance.

    Raises:
        ImportError: If ``modulePath`` cannot be imported.
        AttributeError: If the module has no attribute ``className``.
    """
    # Dynamically import the module and resolve the class by name.
    module = importlib.import_module(modulePath)
    analystClass = getattr(module, className)

    # Create the instance; constructor argument order is fixed by the analysts.
    instance: BaseAnalyst = analystClass(
        logger,
        dbUtil,
        minioUtil,
        conf,
        powerFarmInfo,
        turbineInfo,
        turbineModelInfo,
        dataTransfer,
        weatherStationInfo,
        dataFrameContractOfTurbine,
    )

    # Assign the shared data frames explicitly on the instance as well;
    # dataTransfer is deliberately not assigned here.
    instance.powerFarmInfo = powerFarmInfo
    instance.turbineInfo = turbineInfo
    instance.turbineModelInfo = turbineModelInfo
    instance.weatherStationInfo = weatherStationInfo
    instance.dataFrameContractOfTurbine = dataFrameContractOfTurbine
    return instance


def dynamic_instance_and_call(module_path, class_name, method_name, *args, **kwargs):
    """Import a module, build a no-argument instance and call one of its methods.

    Args:
        module_path: Dotted path of the module to import.
        class_name: Name of the class inside the module; it is instantiated
            with no arguments.
        method_name: Name of the method to invoke on the new instance.
        *args: Positional arguments forwarded to the method.
        **kwargs: Keyword arguments forwarded to the method.

    Returns:
        Whatever the invoked method returns.

    Raises:
        ImportError: If ``module_path`` cannot be imported.
        AttributeError: If the class or the method does not exist.
    """
    module = importlib.import_module(module_path)
    target_class = getattr(module, class_name)
    instance = target_class()
    method = getattr(instance, method_name)
    # Propagate the result so callers can use it; ignoring it stays valid.
    return method(*args, **kwargs)


class ServiceOfDataAnalysis:
    """Load reference data for a power farm and run the configured analysts."""

    def __init__(self, logger: Logger, dbUtil: dict, minioUtil: ThreadSafeMinioClient, conf: Contract):
        """Store collaborators and load all reference data via ``DALAnalyst``.

        Args:
            logger: Logger used by this service and passed on to analysts.
            dbUtil: Database utilities handed to ``DALAnalyst`` and analysts.
            minioUtil: MinIO client passed on to analysts.
            conf: Contract; ``conf.dataContract.dataFilter.powerFarmID``
                selects the data to load.

        Raises:
            CustomError: With code 104, 105, 107 or 108 when the power-farm,
                turbine, turbine-model or contract power-curve data frame is
                empty, respectively.
        """
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.conf = conf
        self.common = CommonBusiness()

        dal = DALAnalyst(logger, dbUtil)
        powerFarmID = conf.dataContract.dataFilter.powerFarmID

        # Load power-farm information.
        self.powerFarmInfo: pd.DataFrame = dal.loadPowerFarmInfos(powerFarmID)
        if self.powerFarmInfo.empty:
            raise CustomError(104)

        # Load wind-turbine information.
        self.turbineInfo: pd.DataFrame = dal.loadTurbineInfos(powerFarmID)
        if self.turbineInfo.empty:
            raise CustomError(105)

        # Data-transfer information is currently not loaded (the former
        # loadDataTransfer call and its CustomError(106) check are disabled),
        # so the attribute is kept as a None placeholder.
        self.dataTransfer: pd.DataFrame = None

        # Load turbine-model information for the distinct model codes in use.
        self.turbineModelInfo: pd.DataFrame = dal.loadTurbineModelInfos(
            self.turbineInfo[Field_MillTypeCode].unique().tolist())
        if self.turbineModelInfo.empty:
            raise CustomError(107)

        # Load the contract power curves for the farm's turbine models, using
        # the air density from the first power-farm row.
        self.dataFrameContractOfTurbine: pd.DataFrame = dal.processContractData(
            self.common,
            powerFarmID,
            self.powerFarmInfo[Field_AirDensity].iloc[0],
            self.turbineModelInfo)
        if self.dataFrameContractOfTurbine.empty:
            raise CustomError(108)

        # Load weather-station (met mast) information; no emptiness check is
        # applied to this frame.
        self.weatherStationInfo: pd.DataFrame = dal.loadWeatherStationInfos(powerFarmID)

    def loadJson(self, filePath):
        """Read the JSON file at ``filePath`` through ``JsonUtil.read_json``.

        Args:
            filePath: Path of the JSON file.

        Returns:
            The value returned by ``JsonUtil.read_json(filePath)``.
        """
        # NOTE(review): the handle `f` is never read; open() here only fails
        # early when the file cannot be opened. Confirm nothing relies on that
        # before removing it.
        with open(filePath, 'r') as f:
            data = JsonUtil.read_json(filePath)
        return data

    def executeAnalysis(self):
        """Build every configured analyst and run them through a ``DataProcessor``.

        One analyst is created per entry in
        ``self.conf.dataContract.configAnalysis``; all of them are attached to
        a single processor, the processor is executed once, and the analysts
        are detached again even when execution raises.
        """
        analysts = []
        for dynamicAnalyst in self.conf.dataContract.configAnalysis:
            package = dynamicAnalyst.package
            className = dynamicAnalyst.className
            # Read as in the original flow; the value is not used further here.
            methodName = dynamicAnalyst.methodName
            analyst = buildDynamicInstance(
                package, className, self.conf, self.logger, self.dbUtil, self.minioUtil,
                self.powerFarmInfo, self.turbineInfo,
                self.turbineModelInfo, self.dataTransfer,
                self.weatherStationInfo, self.dataFrameContractOfTurbine)
            analysts.append(analyst)

        process = DataProcessor(
            logger=self.logger, dbUtil=self.dbUtil, minioUtil=self.minioUtil, conf=self.conf)
        # Share the already-loaded reference data with the processor;
        # dataTransfer is deliberately not assigned.
        process.powerFarmInfo = self.powerFarmInfo
        process.turbineInfo = self.turbineInfo
        process.turbineModelInfo = self.turbineModelInfo
        process.weatherStationInfo = self.weatherStationInfo
        process.dataFrameContractOfTurbine = self.dataFrameContractOfTurbine

        for analyst in analysts:
            process.attach(analyst)

        try:
            process.executeAnalysis(self.conf)
        finally:
            # Always detach so analysts are released even if execution fails.
            for analyst in analysts:
                process.detach(analyst)

    def analysis(self):
        """Log the start of the run and execute all analyses sequentially."""
        self.logger.info("analysis executing....")
        self.executeAnalysis()

        # NOTE: multithreading is intentionally not used because matplotlib
        # plotting failed under it with "QWidget: Must construct a
        # QApplication before a QWidget". The matplotlib backend in use
        # depends on Qt, which requires a QApplication to exist before any
        # QWidget is created; the QApplication is normally global and created
        # once in the main thread, so creating widgets from a worker thread
        # triggers the error. The abandoned thread-pool variant is kept below
        # for reference:
        #
        # with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        #     futures = [executor.submit(executeAnalysis, config) for config in configs]
        #     for future in concurrent.futures.as_completed(futures):
        #         try:
        #             result = future.result()
        #         except Exception as exc:
        #             print(f'raised an exception: {exc}')
        #         else:
        #             print(f'task returned: {result}')