123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
import concurrent.futures
import os
import traceback
from datetime import datetime, timezone, timedelta
from logging import Logger

import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

from algorithmContract.confBusiness import *
from algorithmContract.configAnalysis import ConfigAnalysis
from algorithmContract.const import *
from algorithmContract.contract import Contract
from behavior.analyst import Analyst
from behavior.dalAnalyst import DALAnalyst
from behavior.outputProcessor import OutputProcessor
from common.appConfig import GetDbUtil, GetBusinessDbUtil, GetBusinessFoundationDbUtil
from common.commonBusiness import CommonBusiness
from utils.directoryUtil import DirectoryUtil as dir
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil
class DataProcessor:
    """Coordinates wind-farm generation-performance analysis.

    Acts as the subject of an observer pattern: `Analyst` instances are
    attached, then `executeAnalysis` fans analysis requests out to them
    (per turbine and for the whole fleet) on a thread pool, merges their
    DataFrame results, persists the merged output via `OutputProcessor`,
    and reports batch state/progress to the foundation database.
    """

    def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil],
                 minioUtil: ThreadSafeMinioClient, conf: Contract):
        """
        Args:
            logger: application logger shared by all analysis steps.
            dbUtil: mapping of database name -> DatabaseUtil.
            minioUtil: thread-safe MinIO client used for artifact output.
            conf: analysis contract (data filters, batch number, ...).
        """
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.conf = conf
        self.common = CommonBusiness()
        self._baseAnalysts = []
        self._analysts = []
        # NOTE(review): `dal` is never used afterwards in this class; kept
        # because the DALAnalyst constructor may have side effects — confirm.
        dal = DALAnalyst(logger, dbUtil)

        # The attributes below are declared (annotation only, no value
        # assigned here) and are expected to be populated elsewhere before
        # executeAnalysis() runs — TODO confirm against callers.
        # All power-farm (station) records.
        self.powerFarmInfo: pd.DataFrame
        # All wind-turbine records.
        self.turbineInfo: pd.DataFrame
        # Data conversion/transfer records.
        self.dataTransfer: pd.DataFrame
        # Turbine model records.
        self.turbineModelInfo: pd.DataFrame
        # Weather (met-mast) station records.
        self.weatherStationInfo: pd.DataFrame
        # Contract power curves per farm and turbine model.
        self.dataFrameContractOfTurbine: pd.DataFrame

    def attach(self, analyst: Analyst):
        """Register an analyst for notification; duplicates are ignored."""
        if analyst not in self._analysts:
            self._analysts.append(analyst)

    def detach(self, analyst: Analyst):
        """Unregister an analyst; silently ignores ones never attached."""
        try:
            self._analysts.remove(analyst)
        except ValueError:
            pass

    def userDataFrame(self, dictionary: dict, configs: list[ConfigAnalysis],
                      analyst: Analyst) -> pd.DataFrame:
        """Return the SCADA DataFrame matching the analyst's configured time
        granularity, or an empty DataFrame when that slot holds no rows.

        The granularity is looked up by matching each config's `className`
        against the analyst's class name (case-insensitive).
        """
        timeGranularity = next(
            (config.scada for config in configs
             if config.className.lower() == type(analyst).__name__.lower()),
            None)
        # NOTE(review): when no config matches, `dictionary[None]` raises
        # KeyError — presumably callers guarantee a match; verify.
        return pd.DataFrame() if dictionary[timeGranularity].empty else dictionary[timeGranularity]

    def singleTurbineDataNotify(self, analyst, conf: Contract, turbineCode: str):
        """Run one analyst's single-turbine analysis and return its result."""
        outputAnalysisDir = analyst.getOutputAnalysisDir()
        outputFilePath = r"{}/{}{}".format(outputAnalysisDir,
                                           turbineCode, CSVSuffix)
        return analyst.analysisOfTurbine(
            outputAnalysisDir, outputFilePath, conf, turbineCode)

    def notifyOfTurbine(self, conf: Contract, turbineCode: str) -> pd.DataFrame:
        """Fan out single-turbine analysis to every attached analyst on a
        thread pool and concatenate their DataFrames.

        Raises: re-raises the first analyst failure (with its original
        traceback) after logging it.
        """
        results = []
        maxWorkers = 4
        with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
            futures = [
                executor.submit(self.singleTurbineDataNotify, analyst, conf, turbineCode)
                for analyst in self._analysts
            ]
            for future in concurrent.futures.as_completed(futures):
                try:
                    results.append(future.result())
                except Exception:
                    # FIX: log the traceback (previously discarded) and use a
                    # bare `raise` to preserve the original traceback.
                    self.logger.error(
                        f"notifyOfTurbine failed for turbine {turbineCode}:\n{traceback.format_exc()}")
                    raise
        # FIX: pd.concat([]) raises ValueError when no analysts are attached.
        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()

    def multiTubineDataNotify(self, analyst, conf: Contract, turbineCodes):
        """Run one analyst's multi-turbine analysis; log and re-raise on error."""
        try:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            dataFram = analyst.analysisOfTurbines(outputAnalysisDir, conf, turbineCodes)
            self.logger.info(f"analysis type execute result : dataFrame = {dataFram}")
            return dataFram
        except Exception as exception:
            # FIX: Python 3 exceptions have no `.message` (the old code raised
            # AttributeError here, hiding the real error); log the exception
            # itself plus the traceback that was previously discarded.
            self.logger.error(
                f"analysis type execute error : {exception}\n{traceback.format_exc()}")
            raise

    def notifyOfTurbines(self, conf: Contract, turbineCodes) -> pd.DataFrame:
        """Fan out multi-turbine analysis to every attached analyst on a
        thread pool and concatenate their DataFrames.

        Raises: re-raises the first analyst failure after logging it.
        """
        results = []
        maxWorkers = 4
        with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
            futures = [
                executor.submit(self.multiTubineDataNotify, analyst, conf, turbineCodes)
                for analyst in self._analysts
            ]
            self.logger.info("Waiting for all futures to complete...")
            for future in concurrent.futures.as_completed(futures):
                self.logger.info(f"future is : {future}")
                try:
                    results.append(future.result())
                except Exception as exc:
                    # FIX: `exc.message` does not exist in Python 3; log the
                    # exception and the formerly-discarded traceback, then
                    # re-raise with a bare `raise` to keep the traceback.
                    self.logger.error(
                        f"future deal error. future : {future} , error is {exc}\n{traceback.format_exc()}")
                    raise
        # FIX: guard the no-analyst case — pd.concat([]) raises ValueError.
        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()

    def loadDataCSV(self, dataBatchNum: str, timeGranularity: str, turbineCode: str,
                    condition: str, csvFileDir: str = "data/mima_dt/second") -> pd.DataFrame:
        """Load one turbine's SCADA CSV and coerce columns to compact dtypes.

        Args:
            dataBatchNum: batch number (currently unused here — kept for
                interface compatibility with other loaders).
            timeGranularity: granularity label (currently unused here).
            turbineCode: turbine code; the file read is `<csvFileDir>/<code>.csv`.
            condition: filter condition (currently unused here).
            csvFileDir: directory holding per-turbine CSV files.

        Returns:
            The typed DataFrame.

        Raises:
            FileNotFoundError: when the CSV does not exist.
            KeyError: when an expected column is missing from the CSV.
        """
        csvFilePath = os.path.join(csvFileDir, f"{turbineCode}.csv")
        # FIX: use the logger instead of print() for consistency with the
        # rest of the class.
        self.logger.info(f"current csv file path: {csvFilePath}")
        dataFrame = pd.read_csv(csvFilePath, header=0)
        # float32/int32 downcasts keep the (typically large) SCADA frame small.
        dataFrame = dataFrame.astype({
            'wind_turbine_number': 'string',
            'wind_turbine_name': 'string',
            'time_stamp': 'datetime64[ns]',
            'active_power': 'float32',
            'rotor_speed': 'float32',
            'generator_speed': 'float32',
            'wind_velocity': 'float32',
            'pitch_angle_blade_1': 'float32',
            'pitch_angle_blade_2': 'float32',
            'pitch_angle_blade_3': 'float32',
            'cabin_position': 'float32',
            'true_wind_direction': 'float32',
            'yaw_error1': 'float32',
            'set_value_of_active_power': 'float32',
            'gearbox_oil_temperature': 'float32',
            'generatordrive_end_bearing_temperature': 'float32',
            'generatornon_drive_end_bearing_temperature': 'float32',
            'cabin_temperature': 'float32',
            'twisted_cable_angle': 'float32',
            'front_back_vibration_of_the_cabin': 'float32',
            'side_to_side_vibration_of_the_cabin': 'float32',
            'actual_torque': 'float32',
            'given_torque': 'float32',
            'clockwise_yaw_count': 'float32',
            'counterclockwise_yaw_count': 'float32',
            'unusable': 'float32',
            'power_curve_available': 'float32',
            'required_gearbox_speed': 'float32',
            'inverter_speed_master_control': 'float32',
            'outside_cabin_temperature': 'float32',
            'main_bearing_temperature': 'float32',
            'gearbox_high_speed_shaft_bearing_temperature': 'float32',
            'gearboxmedium_speed_shaftbearing_temperature': 'float32',
            'gearbox_low_speed_shaft_bearing_temperature': 'float32',
            'generator_winding1_temperature': 'float32',
            'generator_winding2_temperature': 'float32',
            'generator_winding3_temperature': 'float32',
            'wind_turbine_status': 'float32',
            'wind_turbine_status2': 'float32',
            'turbulence_intensity': 'float32',
            'year': 'int32',
            'month': 'int32',
            'day': 'int32'
        })
        return dataFrame

    def getTurbines(self, conf: Contract, turbineInfo: pd.DataFrame):
        """Return the turbine codes selected by the contract filter, or all
        unique codes from `turbineInfo` when the filter is empty/absent."""
        turbines = conf.dataContract.dataFilter.turbines
        if not self.common.isNone(turbines) and len(turbines) > 0:
            return turbines
        return turbineInfo[Field_CodeOfTurbine].unique()

    def executeAnalysis(self, conf: Contract):
        """Run the full analysis pipeline for one batch.

        Selects the turbines to analyze, notifies every attached analyst
        (per-turbine, then fleet-wide), merges all returned DataFrames,
        persists the result, and updates the batch analysis state. On any
        failure the error state is written back to the foundation database.

        Returns:
            The merged output DataFrame (empty when the run failed).
        """
        # If one farm hosts several turbine models, analysts are expected to
        # group their analysis by model.
        outputProcessor = OutputProcessor(
            conf, self.logger, self.dbUtil, self.minioUtil)
        # Accumulates the merged results of every analyst notification.
        DataFrameOutput = pd.DataFrame()
        try:
            foundationDB: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
            with foundationDB.session_scope() as foundationSession:
                outputProcessor.analysisState(
                    foundationSession, conf.dataContract.dataFilter.dataBatchNum,
                    AnalysisState.Analyzing.value, analysisProgress=10)

            # Turbine selection (contract filter, or every known turbine).
            turbines = self.getTurbines(conf, self.turbineInfo)

            with foundationDB.session_scope() as foundationSession:
                outputProcessor.analysisState(
                    foundationSession, conf.dataContract.dataFilter.dataBatchNum,
                    AnalysisState.Analyzing.value, analysisProgress=25)

            self.logger.info(
                f"批次:{conf.dataContract.dataFilter.dataBatchNum} 完成机组运行数据加载及再处理,执行请求发电性能分析...")
            for turbine in turbines:
                self.logger.info(
                    f"Power Farm: {conf.dataContract.dataFilter.powerFarmID} Batch: {conf.dataContract.dataFilter.dataBatchNum} Turbine: {turbine} .")
                dataFrameOfReturn = self.notifyOfTurbine(conf, turbine)
                DataFrameOutput = pd.concat(
                    [DataFrameOutput, dataFrameOfReturn], ignore_index=True)

            # Fleet-wide notification across all selected turbines.
            Outputs = self.notifyOfTurbines(conf, turbines)
            DataFrameOutput = pd.concat(
                [Outputs, DataFrameOutput], ignore_index=True)

            self.logger.info(
                f"批次:{conf.dataContract.dataFilter.dataBatchNum} 执行完成请求发电性能分析,准备输出及更新状态")
            if DataFrameOutput.empty:
                # No analyst produced any output for this batch.
                raise CustomError(103)
            outputProcessor.process(conf.dataContract.dataFilter.powerFarmID,
                                    conf.dataContract.dataFilter.dataBatchNum,
                                    DataFrameOutput)
            self.logger.info(
                f"批次:{conf.dataContract.dataFilter.dataBatchNum} 完成请求发电性能分析、输出及更新状态")
        except Exception as e:
            # FIX: generic exceptions have no `.message` in Python 3 (the old
            # handler itself raised AttributeError); log the exception and the
            # traceback that format_exc() previously produced but discarded.
            # (`except (CustomError, Exception)` was redundant — Exception
            # already covers CustomError.)
            self.logger.error(f" executeAnalysis error: {e}\n{traceback.format_exc()}")
            try:
                dbUtil = GetBusinessFoundationDbUtil()
                # Map unexpected exceptions to the generic error code -1.
                ex = e if isinstance(e, CustomError) else CustomError(-1)
                with dbUtil.session_scope() as session:
                    outputProcessor.analysisState(
                        session, conf.dataContract.dataFilter.dataBatchNum,
                        AnalysisState.Analyzed.value, ErrorState.Err.value,
                        ex.code, ex.message)
            except Exception as e1:
                # NOTE(review): single-argument format_exception requires
                # Python 3.10+ — consistent with the dict[str, ...] hints used
                # above, but confirm the deployment interpreter.
                error_message = ''.join(traceback.format_exception(e1))
                self.logger.error(f"捕获到异常: {error_message}")
        return DataFrameOutput
|