123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import sys
- import pandas as pd
- from logging import Logger
- from utils.jsonUtil import JsonUtil
- from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
- from utils.rdbmsUtil.databaseUtil import DatabaseUtil
- from algorithmContract.confBusiness import *
- from algorithmContract.contract import Contract,LoadAnalysisInput, Analysis
- from algorithmContract.const import *
- from behavior.baseAnalyst import BaseAnalyst
- from behavior.dalAnalyst import DALAnalyst
- from pathlib import Path
- # 获取项目根目录(WTOAAM文件夹)
- BASE_DIR = str(Path(__file__).resolve().parent.parent.parent.parent) # 向上四级到项目根目录
- if BASE_DIR not in sys.path:
- sys.path.append(BASE_DIR)
- from dataAnalysisBusiness.algorithm.dataProcessor import DataProcessor
- from common.commonBusiness import CommonBusiness
- from common.appConfig import AppConfig
- from sqlalchemy.orm import Session
- from sqlalchemy.sql import text
- def buildDynamicInstance(modulePath, className, conf: Contract,logger:Logger,dbUtil:dict[str,DatabaseUtil],minioUtil:ThreadSafeMinioClient,
- powerFarmInfo:pd.DataFrame,turbineInfo:pd.DataFrame,
- turbineModelInfo:pd.DataFrame,dataTransfer:pd.DataFrame,
- weatherStationInfo:pd.DataFrame,dataFrameContractOfTurbine:pd.DataFrame,
- ):
- # 动态导入模块
- module = importlib.import_module(modulePath)
- # 获取类
- cls = getattr(module, className)
- # 创建类实例
- instance:BaseAnalyst = cls(logger,dbUtil,minioUtil,conf,powerFarmInfo,turbineInfo,turbineModelInfo,dataTransfer,weatherStationInfo,dataFrameContractOfTurbine)
- instance.powerFarmInfo=powerFarmInfo
- instance.turbineInfo=turbineInfo
- instance.turbineModelInfo=turbineModelInfo
- # instance.dataTransfer=dataTransfer
- instance.weatherStationInfo=weatherStationInfo
- instance.dataFrameContractOfTurbine=dataFrameContractOfTurbine
-
- return instance
- def dynamic_instance_and_call(module_path, class_name, method_name, *args, **kwargs):
- # 动态导入模块
- module = importlib.import_module(module_path)
- # 获取类
- cls = getattr(module, class_name)
- # 创建类实例
- instance = cls()
- # 获取方法
- method = getattr(instance, method_name)
- # 调用方法
- method(*args, **kwargs)
- class ServiceOfDataAnalysis:
- def __init__(self, logger:Logger,dbUtil:dict,minioUtil:ThreadSafeMinioClient,conf:Contract):
- self.logger = logger
- self.dbUtil=dbUtil
- self.minioUtil=minioUtil
- self.conf=conf
- self.common = CommonBusiness()
- dal = DALAnalyst(logger, dbUtil)
- # 加载所有新能源场站信息
- self.powerFarmInfo:pd.DataFrame = dal.loadPowerFarmInfos(
- conf.dataContract.dataFilter.powerFarmID)
-
- if self.powerFarmInfo.empty:
- raise CustomError(104)
- # 加载所有风电机组信息
- self.turbineInfo:pd.DataFrame= dal.loadTurbineInfos(
- conf.dataContract.dataFilter.powerFarmID)
-
- if self.turbineInfo.empty:
- raise CustomError(105)
- # 加载数据转换信息
- # self.dataTransfer:pd.DataFrame=dal.loadDataTransfer(conf.dataContract.dataFilter.powerFarmID,conf.dataContract.dataFilter.dataBatchNum)
- # if self.dataTransfer.empty:
- # raise CustomError(106)
- self.dataTransfer:pd.DataFrame=None
- # 加载机型信息
- self.turbineModelInfo:pd.DataFrame= dal.loadTurbineModelInfos(
- self.turbineInfo[Field_MillTypeCode].unique().tolist())
-
- if self.turbineModelInfo.empty:
- raise CustomError(107)
-
- # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
- self.dataFrameContractOfTurbine:pd.DataFrame = dal.processContractData(
- self.common, conf.dataContract.dataFilter.powerFarmID, self.powerFarmInfo[Field_AirDensity].iloc[0], self.turbineModelInfo)
-
- if self.dataFrameContractOfTurbine.empty:
- raise CustomError(108)
- # 加载所有测风塔信息
- self.weatherStationInfo:pd.DataFrame = dal.loadWeatherStationInfos(
- conf.dataContract.dataFilter.powerFarmID)
- def loadJson(self, filePath):
- # 打开并读取JSON文件
- with open(filePath, 'r') as f:
- data = JsonUtil.read_json(filePath)
- return data
- def executeAnalysis(self):
- analysts = []
- for dynamicAnalyst in self.conf.dataContract.configAnalysis:
- package = dynamicAnalyst.package
- className = dynamicAnalyst.className
- methodName = dynamicAnalyst.methodName
- analyst = buildDynamicInstance(package, className, self.conf,self.logger,self.dbUtil,self.minioUtil,
- self.powerFarmInfo,self.turbineInfo,
- self.turbineModelInfo,self.dataTransfer,
- self.weatherStationInfo,self.dataFrameContractOfTurbine)
- analysts.append(analyst)
- process = DataProcessor(logger=self.logger,dbUtil=self.dbUtil,minioUtil=self.minioUtil,conf=self.conf)
- # 加载所有新能源场站信息
- process.powerFarmInfo = self.powerFarmInfo
- # 加载所有风电机组信息
- process.turbineInfo= self.turbineInfo
- # # 加载机型信息
- process.turbineModelInfo= self.turbineModelInfo
- # 加载所有测风塔信息
- process.weatherStationInfo = self.weatherStationInfo
- # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
- process.dataFrameContractOfTurbine = self.dataFrameContractOfTurbine
- # process.dataTransfer=self.dataTransfer
- for analyst in analysts:
- process.attach(analyst)
- process.executeAnalysis(self.conf)
- for analyst in analysts:
- process.detach(analyst)
- def analysis(self):
- self.logger.info("analysis executing....")
- self.executeAnalysis()
- # 使用多线程时,matplotlib绘图报错
- # 在使用Python语言的matplotlib包时,如果尝试在多线程环境中创建图形界面,你可能会遇到“QWidget-: Must construct a QApplication before a QWidget”的错误。
- # 这个错误通常是因为matplotlib的图形后端(backend)依赖于Qt框架,而Qt框架要求在一个QApplication实例被创建之后再创建任何QWidget对象。
- # 在多线程环境中,每个线程都应该有一个它自己的事件循环。然而,QApplication实例通常是全局的,并且应该只在主线程中创建一次。
- # 如果你尝试在一个非主线程中创建QApplication或者QWidget,就会遇到这个问题。
- # with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
- # futures=[executor.submit(executeAnalysis, config) for config in configs]
- # for future in concurrent.futures.as_completed(futures):
- # try:
- # result = future.result()
- # except Exception as exc:
- # print(f'生成异常: {exc}')
- # else:
- # print(f'任务返回结果: {result}')
|