import concurrent.futures
import logging
from abc import ABC, abstractmethod
from logging import Logger
from typing import Any, Tuple
from urllib.parse import quote
import numpy as np
import pandas as pd
import traceback
from algorithmContract.confBusiness import *
from algorithmContract.configAnalysis import ConfigAnalysis
from algorithmContract.contract import Contract
from behavior.dataMarker import DataMarker
from common.appConfig import GetBusinessDbUtil
from common.commonBusiness import CommonBusiness
from numpy.typing import NDArray
from sqlalchemy.sql import text
from utils.directoryUtil import DirectoryUtil as dir
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil


class BaseAnalyst(ABC):
    """Base class for analysts.

    Loads per-turbine data from the business database, recomputes derived
    fields, applies the configured custom filters and holds the chart axis
    settings. Subclasses implement ``typeAnalyst`` and may override
    ``turbineAnalysis`` / ``turbinesAnalysis`` and the ``selectLabCondition`` /
    ``addPropertyToDataFrame`` hooks.
    """

    def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient, conf: Contract, powerFarmInfo: pd.DataFrame, turbineInfo: pd.DataFrame, turbineModelInfo: pd.DataFrame, dataTransfer: pd.DataFrame, weatherStationInfo: pd.DataFrame, dataFrameContractOfTurbine: pd.DataFrame):
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.conf = conf
        # Wind speed bin edges: 0, 0.5, ..., 25.5
        self.binsWindSpeed: NDArray[np.floating[Any]] = np.arange(0, 26, 0.5)
        self.dataMarker = DataMarker()
        self.common = CommonBusiness()

        # Keys into conf.dataContract.dataFilter.customFilter
        self.customFilterStatusOfTurbine = "valueTurbineStatus"
        self.customFilterPitchAngle = "valuePitchAngle"
        self.customFilterWindSpeed = "valueWindSpeed"
        self.customFilterActivePower = "valueActivePower"
        self.customFilterGeneratorSpeed = "valueGeneratorSpeed"

        # Information on all power farms, and the farm currently analysed
        self.powerFarmInfo = powerFarmInfo
        self.currPowerFarmInfo = self.common.getPowerFarm(
            self.conf.dataContract.dataFilter.powerFarmID, self.powerFarmInfo)
        # Information on all wind turbines
        self.turbineInfo = turbineInfo
        # dataTransfer is accepted for interface compatibility but currently unused
        # Turbine model information
        self.turbineModelInfo = turbineModelInfo
        # Information on all weather stations (met masts)
        self.weatherStationInfo = weatherStationInfo
        # Contract power curves of all farms and their turbine models
        self.dataFrameContractOfTurbine = dataFrameContractOfTurbine

        # Constant: rated power of the first turbine in turbineInfo
        self.RatedPowerOfTurbine = self.turbineInfo[Field_RatedPower].iloc[0]

        # ---- Chart axis settings: (step, lower limit, upper limit) ----
        # Generator speed: direct-drive (motion type 2) and geared models use different graph sets
        iGeneratorSpeed = "IgeneratorSpeed"
        dGeneratorSpeed = "DgeneratorSpeed"
        if self.turbineModelInfo[Field_MotionType].iloc[0] == 2:
            (self.axisStepGeneratorSpeed,
             self.axisLowerLimitGeneratorSpeed,
             self.axisUpperLimitGeneratorSpeed) = self._axisSetting(conf, dGeneratorSpeed, 1, 5, 25)
        else:
            (self.axisStepGeneratorSpeed,
             self.axisLowerLimitGeneratorSpeed,
             self.axisUpperLimitGeneratorSpeed) = self._axisSetting(conf, iGeneratorSpeed, 200, 1000, 2000)

        # Generator torque: both variants are kept so callers can pick by drive type
        iGgeneratorTorque = "IgeneratorTorque"
        dGgeneratorTorque = "DgeneratorTorque"
        (self.axisStepGeneratorTorqueWithDirect,
         self.axisLowerLimitGeneratorTorqueWithDirect,
         self.axisUpperLimitGeneratorTorqueWithDirect) = self._axisSetting(conf, dGgeneratorTorque, 10000, 0, 100000)
        (self.axisStepGeneratorTorque,
         self.axisLowerLimitGeneratorTorque,
         self.axisUpperLimitGeneratorTorque) = self._axisSetting(conf, iGgeneratorTorque, 2000, 0, 12000)

        # Active power: default upper limit is 1.2 x rated power (computed only when needed)
        (self.axisStepActivePower,
         self.axisLowerLimitActivePower,
         upperActivePower) = self._axisSetting(conf, "activePower", 250, 0, None)
        self.axisUpperLimitActivePower = self.RatedPowerOfTurbine * 1.2 if upperActivePower is None else upperActivePower

        # Pitch angle
        (self.axisStepPitchAngle,
         self.axisLowerLimitPitchAngle,
         self.axisUpperLimitPitchAngle) = self._axisSetting(conf, "pitchAngle", 2, -2, 28)

        # Power coefficient (Cp)
        (self.axisStepCp,
         self.axisLowerLimitCp,
         self.axisUpperLimitCp) = self._axisSetting(conf, "cp", 0.1, 0, 1)

        # Tip speed ratio (TSR)
        (self.axisStepTSR,
         self.axisLowerLimitTSR,
         self.axisUpperLimitTSR) = self._axisSetting(conf, "tsr", 5, 0, 20)

    def _axisSetting(self, conf: Contract, graphKey: str, defaultStep, defaultMin, defaultMax):
        """Return (step, min, max) for one chart axis, falling back to defaults.

        A missing (None) graph set, or a None step/min/max inside it, is
        replaced by the corresponding default.
        """
        graphSet = conf.dataContract.graphSets[graphKey]
        if self.common.isNone(graphSet):
            return defaultStep, defaultMin, defaultMax
        step = defaultStep if self.common.isNone(graphSet.step) else graphSet.step
        lower = defaultMin if self.common.isNone(graphSet.min) else graphSet.min
        upper = defaultMax if self.common.isNone(graphSet.max) else graphSet.max
        return step, lower, upper

    @abstractmethod
    def typeAnalyst(self):
        pass

    def getOutputAnalysisDir(self):
        """
        Return the output directory of the current analysis, creating it if needed.
        """
        outputAnalysisDir = f"output/{self.conf.dataContract.dataFilter.powerFarmID}/{self.conf.dataContract.dataFilter.dataBatchNum}/{self.typeAnalyst()}/{self.conf.dataContract.autoOrManual}"
        if not dir.check_directory_exists(outputAnalysisDir):
            dir.create_directory(outputAnalysisDir)
        return outputAnalysisDir

    def _filterByRange(self, dataFrame: pd.DataFrame, customFilter: dict, field: str, filterKey: str) -> pd.DataFrame:
        """Keep rows with min <= dataFrame[field] < max for the configured bounds.

        Each bound is applied only if configured (not None). Nothing is filtered
        when the column is absent or the filter key is not configured.
        """
        if field not in dataFrame.columns or filterKey not in customFilter.keys():
            return dataFrame
        bounds = customFilter[filterKey]
        if self.common.isNone(bounds):
            return dataFrame
        if not self.common.isNone(bounds.min):
            dataFrame = dataFrame[dataFrame[field] >= bounds.min]
        if not self.common.isNone(bounds.max):
            dataFrame = dataFrame[dataFrame[field] < bounds.max]
        return dataFrame

    def filterCommon(self, dataFrame: pd.DataFrame, conf: Contract):
        """Apply the custom filters from conf.dataContract.dataFilter.customFilter.

        Turbine status is filtered with ``isin``. Wind speed, pitch angle
        (blade 1), active power and generator speed are filtered with a
        half-open range ``[min, max)``. Returns a filtered frame; the input is
        not modified.
        """
        customFilter = conf.dataContract.dataFilter.customFilter

        # Turbine status: keep only the configured states
        if Field_StatusOfTurbine in dataFrame.columns \
                and self.customFilterStatusOfTurbine in customFilter.keys() \
                and not self.common.isNone(customFilter[self.customFilterStatusOfTurbine]):
            stateTurbine = customFilter[self.customFilterStatusOfTurbine]
            dataFrame = dataFrame[dataFrame[Field_StatusOfTurbine].isin(stateTurbine)]

        rangeFilters = (
            (Field_WindSpeed, self.customFilterWindSpeed),
            (Field_PitchAngel1, self.customFilterPitchAngle),
            (Field_ActiverPower, self.customFilterActivePower),
            (Field_GeneratorSpeed, self.customFilterGeneratorSpeed),
        )
        for field, filterKey in rangeFilters:
            dataFrame = self._filterByRange(dataFrame, customFilter, field, filterKey)

        return dataFrame

    def filterCustomForTurbine(self, dataFrame: pd.DataFrame, conf: Contract):
        return self.filterCommon(dataFrame, conf)

    def analysisOfTurbine(self, outputAnalysisDir, outputFilePath, conf: Contract, turbineCode):
        return self.turbineAnalysis(outputAnalysisDir, outputFilePath, conf, turbineCode)

    def turbineAnalysis(self, outputAnalysisDir, outputFilePath, conf: Contract, turbineCode):
        return pd.DataFrame()

    def filterCustomForTurbines(self, dataFrame: pd.DataFrame, conf: Contract):
        return self.filterCommon(dataFrame, conf)

    def analysisOfTurbines(self, outputAnalysisDir, conf: Contract, turbineCodes):
        return self.turbinesAnalysis(outputAnalysisDir, conf, turbineCodes)

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        return pd.DataFrame()

    def userDataFrame(self, dictionary: dict, configs: list[ConfigAnalysis], analyst) -> pd.DataFrame:
        """Return the frame loaded for the time granularity configured for ``analyst``.

        The granularity is the ``scada`` value of the first config whose
        className matches the analyst's class name (case-insensitive). Returns
        an empty frame if no config matches, the granularity was not loaded,
        or its data is empty.
        """
        analystName = type(analyst).__name__.lower()
        timeGranularity = next(
            (config.scada for config in configs if config.className.lower() == analystName), None)
        dataFrame = dictionary.get(timeGranularity)
        if dataFrame is None:
            self.logger.warning(
                f"no data loaded for analyst {type(analyst).__name__} time granularity: {timeGranularity}")
            return pd.DataFrame()
        return pd.DataFrame() if dataFrame.empty else dataFrame

    def calculateAngleIncluded(self, array1, array2):
        """
        Compute the deviation between corresponding angles of two equal-length arrays.

        Each raw difference is wrapped to [-180, 180) and then folded into
        [-90, 90], so directions 180 degrees apart count as equal. Results are
        rounded to two decimals.

        Parameters:
            array1: first sequence of angles (degrees)
            array2: second sequence of angles (degrees)

        Returns:
            list: pairwise deviations
        """
        deviations = []
        for angle1, angle2 in zip(array1, array2):
            # Wrap the raw difference into [-180, 180)
            deviation = (angle1 - angle2 + 180) % 360 - 180
            # Fold into [-90, 90]
            if deviation > 90:
                deviation -= 180
            elif deviation < -90:
                deviation += 180
            deviations.append(round(deviation, 2))
        return deviations

    def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
        """
        Fill missing values of the included-angle column from nacelle position and wind direction.

        Works in place. Existing values of ``fieldAngleIncluded`` are kept; the
        column is created if it does not exist. Nothing happens unless both
        source columns exist and each has at least one non-NaN value.
        """
        if fieldNacellePos not in dataFrame.columns or fieldWindDirect not in dataFrame.columns:
            return
        if dataFrame[fieldNacellePos].isna().all() or dataFrame[fieldWindDirect].isna().all():
            return

        calculated = pd.Series(
            self.calculateAngleIncluded(dataFrame[fieldWindDirect], dataFrame[fieldNacellePos]),
            index=dataFrame.index)
        if fieldAngleIncluded in dataFrame.columns:
            dataFrame[fieldAngleIncluded] = dataFrame[fieldAngleIncluded].fillna(calculated)
        else:
            dataFrame[fieldAngleIncluded] = calculated

    def recalculationOfFieldPowerFloor(self, dataFrame: pd.DataFrame, fieldActivePower):
        """
        Derive Field_PowerFloor: active power bucketed to a multiple of 10 (NaN kept).

        NOTE(review): int() truncates toward zero, so negative power is
        bucketed upward (e.g. -15 -> -10), not floored; confirm intended.
        """
        if fieldActivePower in dataFrame.columns:
            dataFrame[Field_PowerFloor] = dataFrame[fieldActivePower].apply(
                lambda x: int(x / 10) * 10 if pd.notna(x) else np.nan)

    def recalculationOfFieldWindSpeedFloor(self, dataFrame: pd.DataFrame, fieldWindSpeed):
        """
        Derive Field_WindSpeedFloor: integer part of the wind speed plus 0.5 (NaN kept).
        """
        if fieldWindSpeed in dataFrame.columns:
            dataFrame[Field_WindSpeedFloor] = dataFrame[fieldWindSpeed].apply(
                lambda x: int(x) + 0.5 if pd.notna(x) else np.nan)

    def recalculationOfGeneratorSpeed(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
        """
        Fill missing generator speed from rotor speed (in place).

        Speed ratio = generator speed / rotor (main shaft) speed. For
        direct-drive models (motion type 2) the two speeds are taken as equal.
        """
        if fieldGeneratorSpeed in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
            if turbineModelInfo[Field_MotionType] == 2:
                dataFrame[fieldGeneratorSpeed] = dataFrame[fieldGeneratorSpeed].fillna(
                    dataFrame[fieldRotorSpeed])
            else:
                dataFrame[fieldGeneratorSpeed] = dataFrame[fieldGeneratorSpeed].fillna(
                    dataFrame[fieldRotorSpeed] * rotationalSpeedRatio)

    def recalculationOfRotorSpeed(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
        """
        Fill missing rotor speed from generator speed (in place).

        Speed ratio = generator speed / rotor (main shaft) speed. For
        direct-drive models (motion type 2) the two speeds are taken as equal.
        """
        if fieldRotorSpeed in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
            if turbineModelInfo[Field_MotionType] == 2:
                dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
                    dataFrame[fieldGeneratorSpeed])
            else:
                dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
                    dataFrame[fieldGeneratorSpeed] / rotationalSpeedRatio)

    def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
        """
        Fill missing generator torque from active power and generator speed (in place).

        With P in kW: P*1000 = pi/30 * T * n, so T = 30000/pi * P / n
        ~= 9549.297 * P / n, where n is generator speed, P is active power
        and T is torque. Rows where n is 0 would give +/-inf; those are left
        as NaN instead.
        """
        if fieldGeneratorTorque in dataFrame.columns and fieldActivePower in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
            calculated = (9549.297 * dataFrame[fieldActivePower] /
                          dataFrame[fieldGeneratorSpeed]).replace([np.inf, -np.inf], np.nan)
            dataFrame[fieldGeneratorTorque] = dataFrame[fieldGeneratorTorque].fillna(calculated)

    def recalculation(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame):
        """
        Recompute derived measurement fields of one turbine's data in place.

        Parameters:
            turbineModelInfo: model information of the turbine this data belongs to
            dataFrame: raw data, modified in place
        """
        self.recalculationOfFieldPowerFloor(dataFrame, Field_ActiverPower)
        self.recalculationOfFieldWindSpeedFloor(dataFrame, Field_WindSpeed)
        # Use the current turbine's model (not the first model in the farm) so
        # farms with several models get the right speed ratio and rotor diameter.
        rotationalSpeedRatio = turbineModelInfo[Field_RSR]
        self.recalculationOfGeneratorSpeed(
            turbineModelInfo, dataFrame, Field_RotorSpeed, Field_GeneratorSpeed, rotationalSpeedRatio)
        self.recalculationOfRotorSpeed(
            turbineModelInfo, dataFrame, Field_RotorSpeed, Field_GeneratorSpeed, rotationalSpeedRatio)
        self.recalculationOfRotorTorque(
            dataFrame, Field_GeneratorTorque, Field_ActiverPower, Field_GeneratorSpeed)
        self.recalculationOfIncludedAngle(
            dataFrame, Field_AngleIncluded, Field_WindDirection, Field_NacPos)
        self.common.calculateTSR(dataFrame, turbineModelInfo[Field_RotorDiameter])
        self.common.calculateCpOfSingleTurbine(
            dataFrame, self.currPowerFarmInfo[Field_AirDensity], turbineModelInfo[Field_RotorDiameter])

    def _timeFieldSeries(self, dataFrame: pd.DataFrame, fieldTime: str):
        """Return the derived time column named ``fieldTime``, or None if the name is unknown."""
        if fieldTime == Field_UnixYearMonth:
            # NOTE(review): Field_Time must already be datetime64 for the .dt
            # calls below; pandas then presumably ignores `format`, so this is
            # the per-row epoch seconds rather than the month start. Confirm.
            return pd.to_datetime(dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
        if fieldTime == Field_YearMonth:
            return dataFrame[Field_Time].dt.strftime('%Y-%m')
        if fieldTime == Field_YearMonthDay:
            return dataFrame[Field_Time].dt.strftime('%Y-%m-%d')
        return None

    def processDateTimeForAll(self, dataFrame: pd.DataFrame):
        """Add all derived time columns (unix year-month, year-month, year-month-day) in place."""
        for fieldTime in (Field_UnixYearMonth, Field_YearMonth, Field_YearMonthDay):
            dataFrame[fieldTime] = self._timeFieldSeries(dataFrame, fieldTime)
        return dataFrame

    def processDateTime(self, dataFrame: pd.DataFrame, fieldTime: str = None):
        """Add one derived time column (or all of them when ``fieldTime`` is None) in place."""
        if Field_Time not in dataFrame.columns:
            self.logger.info(
                "Field_Time is not in dataFrame columns, Do not handle time processing")
            return dataFrame

        if fieldTime is None:
            return self.processDateTimeForAll(dataFrame)

        timeSeries = self._timeFieldSeries(dataFrame, fieldTime)
        if timeSeries is None:
            self.logger.warning(
                f"unsupported time field: {fieldTime}, Do not handle time processing")
            return dataFrame
        dataFrame[fieldTime] = timeSeries
        return dataFrame

    def loadData(self, powerFarmID: str, timeGranularity: str, turbineCode: str, select: list[str], condition: str) -> pd.DataFrame:
        """Query the ``{powerFarmID}_{timeGranularity}`` table and return the rows as a DataFrame.

        ``turbineCode`` is inserted verbatim into ``IN (...)``; callers pass it
        already quoted. ``condition`` is a raw SQL boolean expression.
        NOTE(review): all parts are string-built SQL; they must come from
        trusted configuration, never from user input.
        """
        selectStr = ", ".join(str(field) for field in select)
        businessDB: DatabaseUtil = GetBusinessDbUtil()
        with businessDB.session_scope() as session:
            query_text = f"""SELECT {selectStr} FROM `{powerFarmID}_{timeGranularity}` WHERE wind_turbine_number IN ({turbineCode}) AND {condition}"""
            query_result = session.execute(text(query_text)).fetchall()
        return pd.DataFrame(query_result, columns=select)

    def dataProcess(self, powerFarmID: str, dataBatchNum: str, timeGranularity: str, turbineCode: str, select: list[str], customCondition) -> Tuple[pd.DataFrame, str, str]:
        """Load one turbine's data and, for non fault/warn granularities, enrich, recompute and filter it.

        Returns (dataFrame, dataBatchNum, turbineCode); dataFrame is empty when no rows were found.
        """
        self.logger.info(
            f"load data -> Time Granulary: {timeGranularity} Power Farm: {powerFarmID} Batch: {dataBatchNum} Turbine: {turbineCode} .")
        currTurbineInfo = self.common.getTurbineInfo(
            self.conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo)
        currTurbineModelInfo = self.common.getTurbineModelByTurbine(
            currTurbineInfo, self.turbineModelInfo)

        dataFrameOfTurbine = self.loadData(
            powerFarmID, timeGranularity, f"'{turbineCode}'", select, customCondition)

        if dataFrameOfTurbine.empty:
            self.logger.warning(
                f"{turbineCode} Time Granulary: {timeGranularity} scada data is empty.")
            return pd.DataFrame(), dataBatchNum, turbineCode

        # fault/warn records are returned as loaded, without enrichment or filtering
        if timeGranularity in ['fault', 'warn']:
            return dataFrameOfTurbine, dataBatchNum, turbineCode

        self.addPropertyToDataFrame(dataFrameOfTurbine, currTurbineInfo, currTurbineModelInfo)
        dataFrameOfTurbine[Field_NameOfTurbine] = currTurbineInfo.loc[Field_NameOfTurbine]
        self.processDateTime(dataFrameOfTurbine)
        self.recalculation(currTurbineModelInfo, dataFrameOfTurbine)
        dataFrameOfTurbine = self.filterCommon(dataFrameOfTurbine, self.conf)
        return dataFrameOfTurbine, dataBatchNum, turbineCode

    def _appendExcludingMonthsCondition(self, conf: Contract, conditions: list[str], timeColumn: str):
        """Append a ``NOT IN`` condition on the year-month of ``timeColumn`` for the configured excluded months."""
        if conf.dataContract.dataFilter.excludingMonths:
            excluding_months = ", ".join(
                f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
            conditions.append(
                f"DATE_FORMAT({timeColumn}, '%Y-%m') NOT IN ({excluding_months})")

    def selectTimeCondition(self, conf: Contract, conditions: list[str]):
        """
        Append time filter conditions (from the JSON config) for time-series tables.

        Filters on the ``time_stamp`` column: begin/end time and excluded months.
        """
        if conf.dataContract.dataFilter.beginTime:
            conditions.append(
                f"time_stamp >= '{conf.dataContract.dataFilter.beginTime}'")
        if conf.dataContract.dataFilter.endTime:
            conditions.append(
                f"time_stamp <= '{conf.dataContract.dataFilter.endTime}'")
        self._appendExcludingMonthsCondition(conf, conditions, "time_stamp")

    def selectLabCondition(self, conditions: list[str]):
        """
        Hook for appending lab filter conditions.

        Implemented in the corresponding Behavior subclasses; the default adds
        nothing (no lab filtering).
        """
        return

    def selectAllCondition(self, conf: Contract):
        """Build the full WHERE expression for time-series tables ("1=1" when nothing is configured)."""
        conditions = []
        self.selectTimeCondition(conf, conditions)
        self.selectLabCondition(conditions)
        return " AND ".join(conditions) if conditions else "1=1"

    def selectFaultTimeCondition(self, conf: Contract, conditions: list[str]):
        """
        Append time filter conditions (from the JSON config) for fault/warn tables.

        Uses ``begin_time`` / ``end_time``; month exclusion is applied to
        ``begin_time`` as well, since these tables are filtered on begin/end
        time rather than ``time_stamp``.
        """
        if conf.dataContract.dataFilter.beginTime:
            conditions.append(
                f"begin_time >= '{conf.dataContract.dataFilter.beginTime}'")
        if conf.dataContract.dataFilter.endTime:
            conditions.append(
                f"end_time <= '{conf.dataContract.dataFilter.endTime}'")
        self._appendExcludingMonthsCondition(conf, conditions, "begin_time")

    def selectAllFaultCondition(self, conf: Contract):
        """Build the full WHERE expression for fault/warn tables ("1=1" when nothing is configured)."""
        conditions = []
        self.selectFaultTimeCondition(conf, conditions)
        self.selectLabCondition(conditions)
        return " AND ".join(conditions) if conditions else "1=1"

    def _loadTurbines(self, turbines, conf: Contract, timeGranularity: str, select: list[str], conditions: str) -> list[pd.DataFrame]:
        """Run dataProcess for every turbine on a thread pool and collect the frames (order of completion)."""
        dataFrames = []
        maxWorkers = 5
        with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
            futures = [
                executor.submit(self.dataProcess,
                                conf.dataContract.dataFilter.powerFarmID,
                                conf.dataContract.dataFilter.dataBatchNum,
                                timeGranularity, turbine, select, conditions)
                for turbine in turbines
            ]
            for future in concurrent.futures.as_completed(futures):
                # result() re-raises any exception raised in the worker
                dataFrameOfTurbine, dataBatchNum, turbineCode = future.result()
                dataFrames.append(dataFrameOfTurbine)
                self.logger.info(
                    f"typeAnalyst: {self.typeAnalyst()} data frame append,dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} turbineCode: {turbineCode}")
        return dataFrames

    def processTurbineData(self, turbines, conf: Contract, select: list[str]):
        """Load data of all ``turbines`` for each time granularity configured for this analyst class.

        Returns a dict mapping time granularity -> concatenated DataFrame.
        Raises CustomError(102) when a granularity yields no data; any other
        error is logged with its traceback and re-raised.
        """
        try:
            scadaConditions = self.selectAllCondition(conf)

            configAnalysisDF = pd.DataFrame(
                [config.to_dict() for config in conf.dataContract.configAnalysis])
            configAnalysisDF = configAnalysisDF[configAnalysisDF["className"] == self.__class__.__name__]
            scadaTimeGranularities = configAnalysisDF["scada"].unique()
            self.logger.info(
                f"typeAnalyst: {self.typeAnalyst()} method: processTurbineData , scadaTimeGranularities : {scadaTimeGranularities} current class : {self.__class__.__name__}")

            dataBatchNum = conf.dataContract.dataFilter.dataBatchNum
            dictionary = dict()
            for timeGranularity in scadaTimeGranularities:
                # Pick the conditions per granularity: fault/warn tables use
                # begin/end time. Reassigning a shared variable would leak the
                # fault conditions into later SCADA granularities.
                if timeGranularity in ['fault', 'warn']:
                    selectConditions = self.selectAllFaultCondition(conf)
                else:
                    selectConditions = scadaConditions

                dataFrames = self._loadTurbines(
                    turbines, conf, timeGranularity, select, selectConditions)
                # pd.concat([]) raises, so an empty turbine list becomes an empty frame
                dataFrameOfTurbines = pd.concat(dataFrames, ignore_index=True) if dataFrames else pd.DataFrame()
                self.logger.info(
                    f"typeAnalyst: {self.typeAnalyst()} data frame concat dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} finish")

                if dataFrameOfTurbines.empty:
                    error = CustomError(102)
                    self.logger.warning(
                        f"{error.message} typeAnalyst: {self.typeAnalyst()} Power Farm: {conf.dataContract.dataFilter.powerFarmID} Batch : {conf.dataContract.dataFilter.dataBatchNum} Time Granularity : {timeGranularity}")
                    raise error

                self.logger.info(
                    f"typeAnalyst: {self.typeAnalyst()} data frame concat dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} dataFrameOfTurbines shape: {dataFrameOfTurbines.shape}")

                if Field_DeviceCode in dataFrameOfTurbines.columns and Field_CodeOfTurbine not in dataFrameOfTurbines.columns:
                    dataFrameOfTurbines = dataFrameOfTurbines.rename(
                        columns={Field_DeviceCode: Field_CodeOfTurbine})

                dictionary[timeGranularity] = dataFrameOfTurbines

            return dictionary
        except Exception:
            self.logger.error(
                f"Error processing turbine data:{traceback.format_exc()}")
            raise

    def addPropertyToDataFrame(self, dataFrameOfTurbine: pd.DataFrame, currTurbineInfo: pd.Series, currTurbineModelInfo: pd.Series):
        """
        Hook for adding extra properties of the current turbine to its data frame.

        Implemented by the corresponding analysts in business; the default adds nothing.
        """
        return

    def escape_special_characters(self, original_string: str):
        """
        ---Deprecated---
        URL-encode special characters, including "/" (encoded as "%2F").
        """
        return quote(original_string, safe="")