|
- import concurrent.futures
- import logging
- from abc import ABC, abstractmethod
- from logging import Logger
- from typing import Any, Tuple
- from urllib.parse import quote
- import numpy as np
- import pandas as pd
- import traceback
- from algorithmContract.confBusiness import *
- from algorithmContract.configAnalysis import ConfigAnalysis
- from algorithmContract.contract import Contract
- from behavior.dataMarker import DataMarker
- from common.appConfig import GetBusinessDbUtil
- from common.commonBusiness import CommonBusiness
- from numpy.typing import NDArray
- from sqlalchemy.sql import text
- from utils.directoryUtil import DirectoryUtil as dir
- from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
- from utils.rdbmsUtil.databaseUtil import DatabaseUtil
- class BaseAnalyst(ABC):
- def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient, conf: Contract,
- powerFarmInfo: pd.DataFrame, turbineInfo: pd.DataFrame, turbineModelInfo: pd.DataFrame, dataTransfer: pd.DataFrame,
- weatherStationInfo: pd.DataFrame, dataFrameContractOfTurbine: pd.DataFrame):
- self.logger = logger
- self.dbUtil = dbUtil
- self.minioUtil = minioUtil
- self.conf = conf
- # 定义风速区间
- self.binsWindSpeed: NDArray[np.floating[Any]] = np.arange(0, 26, 0.5)
- self.dataMarker = DataMarker()
- self.common = CommonBusiness()
- self.customFilterStatusOfTurbine = "valueTurbineStatus"
- self.customFilterPitchAngle = "valuePitchAngle"
- self.customFilterWindSpeed = "valueWindSpeed"
- self.customFilterActivePower = "valueActivePower"
- self.customFilterGeneratorSpeed = "valueGeneratorSpeed"
- # 加载所有新能源场站信息
- self.powerFarmInfo = powerFarmInfo
- self.currPowerFarmInfo = self.common.getPowerFarm(
- self.conf.dataContract.dataFilter.powerFarmID, self.powerFarmInfo)
- # 加载所有风电机组信息
- self.turbineInfo = turbineInfo
- # 加载数据转换信息
- # self.dataTransfer = dataTransfer
- # 加载机型信息
- self.turbineModelInfo = turbineModelInfo
- # 加载所有测风塔信息
- self.weatherStationInfo = weatherStationInfo
- # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
- self.dataFrameContractOfTurbine = dataFrameContractOfTurbine
- # 图表 轴系设置
- # 常值
- self.RatedPowerOfTurbine = self.turbineInfo[Field_RatedPower].iloc[0]
- # 图表 轴系设置
- # 轴系 发电机转速
- iGeneratorSpeed = "IgeneratorSpeed"
- dGeneratorSpeed = "DgeneratorSpeed"
- if self.turbineModelInfo[Field_MotionType] .iloc[0] == 2:
- # 直驱 发电机转速
- self.axisStepGeneratorSpeed = conf.dataContract.graphSets[dGeneratorSpeed].step if not self.common.isNone(
- conf.dataContract.graphSets[dGeneratorSpeed]) and not self.common.isNone(
- conf.dataContract.graphSets[dGeneratorSpeed].step) else 1
- self.axisLowerLimitGeneratorSpeed = conf.dataContract.graphSets[dGeneratorSpeed].min if not self.common.isNone(
- conf.dataContract.graphSets[dGeneratorSpeed].min) else 5
- self.axisUpperLimitGeneratorSpeed = conf.dataContract.graphSets[dGeneratorSpeed].max if not self.common.isNone(
- conf.dataContract.graphSets[dGeneratorSpeed].max) else 25
- else:
- # 非直驱 发电机转速
- self.axisStepGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].step if not self.common.isNone(
- conf.dataContract.graphSets[iGeneratorSpeed]) and not self.common.isNone(
- conf.dataContract.graphSets[iGeneratorSpeed].step) else 200
- self.axisLowerLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].min if not self.common.isNone(
- conf.dataContract.graphSets[iGeneratorSpeed].min) else 1000
- self.axisUpperLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].max if not self.common.isNone(
- conf.dataContract.graphSets[iGeneratorSpeed].max) else 2000
- # --------以下为原写法,未区分驱动方式直接选择了非直驱--------
- # # 直驱 发电机转速
- # self.axisStepGeneratorSpeedWithDirect = conf.dataContract.graphSets[dGeneratorSpeed].step if not self.common.isNone(
- # conf.dataContract.graphSets[dGeneratorSpeed]) and not self.common.isNone(
- # conf.dataContract.graphSets[dGeneratorSpeed].step) else 5
- # self.axisLowerLimitGeneratorSpeedWithDirect = conf.dataContract.graphSets[dGeneratorSpeed].min if not self.common.isNone(
- # conf.dataContract.graphSets[dGeneratorSpeed].min) else 0
- # self.axisUpperLimitGeneratorSpeedWithDirect = conf.dataContract.graphSets[dGeneratorSpeed].max if not self.common.isNone(
- # conf.dataContract.graphSets[dGeneratorSpeed].max) else 30
- # # 非直驱 发电机转速
- # self.axisStepGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].step if not self.common.isNone(
- # conf.dataContract.graphSets[iGeneratorSpeed]) and not self.common.isNone(
- # conf.dataContract.graphSets[iGeneratorSpeed].step) else 200
- # self.axisLowerLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].min if not self.common.isNone(
- # conf.dataContract.graphSets[iGeneratorSpeed].min) else 1000
- # self.axisUpperLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].max if not self.common.isNone(
- # conf.dataContract.graphSets[iGeneratorSpeed].max) else 2000
- # 轴系 发电机转矩
- iGgeneratorTorque = "IgeneratorTorque"
- dGgeneratorTorque = "DgeneratorTorque"
- # if turbineModelInfo[Field_MotionType].iloc[0]==2 :
- # # 直驱 发电机转矩
- # self.axisStepGeneratorTorque = conf.dataContract.graphSets[dGgeneratorTorque].step if not self.common.isNone(
- # conf.dataContract.graphSets[dGgeneratorTorque]) and not self.common.isNone(
- # conf.dataContract.graphSets[dGgeneratorTorque].step) else 10000
- # self.axisLowerLimitGeneratorTorque = conf.dataContract.graphSets[dGgeneratorTorque].min if not self.common.isNone(
- # conf.dataContract.graphSets[dGgeneratorTorque].min) else 0
- # self.axisUpperLimitGeneratorTorque = conf.dataContract.graphSets[dGgeneratorTorque].max if not self.common.isNone(
- # conf.dataContract.graphSets[dGgeneratorTorque].max) else 100000
- # else:
- # # 非直驱 发电机转矩
- # self.axisStepGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].step if not self.common.isNone(
- # conf.dataContract.graphSets[iGgeneratorTorque]) and not self.common.isNone(
- # conf.dataContract.graphSets[iGgeneratorTorque].step) else 2000
- # self.axisLowerLimitGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].min if not self.common.isNone(
- # conf.dataContract.graphSets[iGgeneratorTorque].min) else 0
- # self.axisUpperLimitGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].max if not self.common.isNone(
- # conf.dataContract.graphSets[iGgeneratorTorque].max) else 12000
- # --------以下为原写法,未区分驱动方式直接选择了非直驱--------
- # 直驱 发电机转矩
- self.axisStepGeneratorTorqueWithDirect = conf.dataContract.graphSets[dGgeneratorTorque].step if not self.common.isNone(
- conf.dataContract.graphSets[dGgeneratorTorque]) and not self.common.isNone(
- conf.dataContract.graphSets[dGgeneratorTorque].step) else 10000
- self.axisLowerLimitGeneratorTorqueWithDirect = conf.dataContract.graphSets[dGgeneratorTorque].min if not self.common.isNone(
- conf.dataContract.graphSets[dGgeneratorTorque].min) else 0
- self.axisUpperLimitGeneratorTorqueWithDirect = conf.dataContract.graphSets[dGgeneratorTorque].max if not self.common.isNone(
- conf.dataContract.graphSets[dGgeneratorTorque].max) else 100000
- # 非直驱 发电机转矩
- self.axisStepGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].step if not self.common.isNone(
- conf.dataContract.graphSets[iGgeneratorTorque]) and not self.common.isNone(
- conf.dataContract.graphSets[iGgeneratorTorque].step) else 2000
- self.axisLowerLimitGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].min if not self.common.isNone(
- conf.dataContract.graphSets[iGgeneratorTorque].min) else 0
- self.axisUpperLimitGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].max if not self.common.isNone(
- conf.dataContract.graphSets[iGgeneratorTorque].max) else 12000
- # 轴系 有功功率
- activePower = "activePower"
- self.axisStepActivePower = conf.dataContract.graphSets[activePower].step if not self.common.isNone(
- conf.dataContract.graphSets[activePower]) and not self.common.isNone(
- conf.dataContract.graphSets[activePower].step) else 250
- self.axisLowerLimitActivePower = conf.dataContract.graphSets[activePower].min if not self.common.isNone(
- conf.dataContract.graphSets[activePower].min) else 0
- self.axisUpperLimitActivePower = conf.dataContract.graphSets[activePower].max if not self.common.isNone(
- conf.dataContract.graphSets[activePower].max) else self.RatedPowerOfTurbine*1.2
- # 轴系 桨距角
- pitchAngle = "pitchAngle"
- self.axisStepPitchAngle = conf.dataContract.graphSets[pitchAngle].step if not self.common.isNone(
- conf.dataContract.graphSets[pitchAngle]) and not self.common.isNone(
- conf.dataContract.graphSets[pitchAngle].step) else 2
- self.axisLowerLimitPitchAngle = conf.dataContract.graphSets[pitchAngle].min if not self.common.isNone(
- conf.dataContract.graphSets[pitchAngle].min) else -2
- self.axisUpperLimitPitchAngle = conf.dataContract.graphSets[pitchAngle].max if not self.common.isNone(
- conf.dataContract.graphSets[pitchAngle].max) else 28
- # 轴系 风能利用系数
- cp = "cp"
- self.axisStepCp = conf.dataContract.graphSets[cp].step if not self.common.isNone(
- conf.dataContract.graphSets[cp]) and not self.common.isNone(
- conf.dataContract.graphSets[cp].step) else 0.1
- self.axisLowerLimitCp = conf.dataContract.graphSets[cp].min if not self.common.isNone(
- conf.dataContract.graphSets[cp].min) else 0
- self.axisUpperLimitCp = conf.dataContract.graphSets[cp].max if not self.common.isNone(
- conf.dataContract.graphSets[cp].max) else 1
- # 轴系 叶尖速比
- tsr = "tsr"
- self.axisStepTSR = conf.dataContract.graphSets[tsr].step if not self.common.isNone(
- conf.dataContract.graphSets[tsr]) and not self.common.isNone(
- conf.dataContract.graphSets[tsr].step) else 5
- self.axisLowerLimitTSR = conf.dataContract.graphSets[tsr].min if not self.common.isNone(
- conf.dataContract.graphSets[tsr].min) else 0
- self.axisUpperLimitTSR = conf.dataContract.graphSets[tsr].max if not self.common.isNone(
- conf.dataContract.graphSets[tsr].max) else 20
- @abstractmethod
- def typeAnalyst(self):
- pass
- def getOutputAnalysisDir(self):
- """
- 获取当前分析的输出目录
- """
- outputAnalysisDir = f"output/{self.conf.dataContract.dataFilter.powerFarmID}/{self.conf.dataContract.dataFilter.dataBatchNum}/{self.typeAnalyst()}/{self.conf.dataContract.autoOrManual}"
- if not dir.check_directory_exists(outputAnalysisDir):
- dir.create_directory(outputAnalysisDir)
- return outputAnalysisDir
- def filterCommon(self, dataFrame: pd.DataFrame, conf: Contract):
- # Filter rows where turbine state
- if Field_StatusOfTurbine in dataFrame.columns and self.customFilterStatusOfTurbine in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterStatusOfTurbine]):
- stateTurbine = conf.dataContract.dataFilter.customFilter[self.customFilterStatusOfTurbine]
- dataFrame = dataFrame[dataFrame[Field_StatusOfTurbine].isin(
- stateTurbine)]
- # Filter rows where wind speed
- if Field_WindSpeed in dataFrame.columns and self.customFilterWindSpeed in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed].min):
- windSpeedMin = conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed].min
- dataFrame = dataFrame[(
- dataFrame[Field_WindSpeed] >= windSpeedMin)]
- if Field_WindSpeed in dataFrame.columns and self.customFilterWindSpeed in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed].max):
- windSpeedMax = conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed].max
- dataFrame = dataFrame[(
- dataFrame[Field_WindSpeed] < windSpeedMax)]
- # Filter rows where pitch
- if Field_PitchAngel1 in dataFrame.columns and self.customFilterPitchAngle in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterPitchAngle]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterPitchAngle].min):
- anglePitchMin = conf.dataContract.dataFilter.customFilter[
- self.customFilterPitchAngle].min
- dataFrame = dataFrame[(
- dataFrame[Field_PitchAngel1] >= anglePitchMin)]
- if Field_PitchAngel1 in dataFrame.columns and self.customFilterPitchAngle in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterPitchAngle]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterPitchAngle].max):
- anglePitchMax = conf.dataContract.dataFilter.customFilter[
- self.customFilterPitchAngle].max
- dataFrame = dataFrame[(
- dataFrame[Field_PitchAngel1] < anglePitchMax)]
- # Filter rows where power
- if Field_ActiverPower in dataFrame.columns and self.customFilterActivePower in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterActivePower]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterActivePower].min):
- activePowerMin = conf.dataContract.dataFilter.customFilter[
- self.customFilterActivePower].min
- dataFrame = dataFrame[(
- dataFrame[Field_ActiverPower] >= activePowerMin)]
- if Field_ActiverPower in dataFrame.columns and self.customFilterActivePower in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterActivePower]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterActivePower].max):
- activePowerMax = conf.dataContract.dataFilter.customFilter[
- self.customFilterActivePower].max
- dataFrame = dataFrame[(
- dataFrame[Field_ActiverPower] < activePowerMax)]
- # Filter rows where generator speed
- if Field_GeneratorSpeed in dataFrame.columns and self.customFilterGeneratorSpeed in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterGeneratorSpeed]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterGeneratorSpeed].min):
- speedGeneratorMin = conf.dataContract.dataFilter.customFilter[
- self.customFilterGeneratorSpeed].min
- dataFrame = dataFrame[(
- dataFrame[Field_GeneratorSpeed] >= speedGeneratorMin)]
- if Field_GeneratorSpeed in dataFrame.columns and self.customFilterGeneratorSpeed in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterGeneratorSpeed]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterGeneratorSpeed].max):
- speedGeneratorMax = conf.dataContract.dataFilter.customFilter[
- self.customFilterGeneratorSpeed].max
- dataFrame = dataFrame[(
- dataFrame[Field_GeneratorSpeed] < speedGeneratorMax)]
- # if not self.common.isNone(confData.field_activePowerSet) and confData.field_activePowerSet in dataFrame.columns:
- # dataFrame = dataFrame[dataFrame[confData.field_activePowerSet]
- # == confData.rated_power]
- # if not self.common.isNone(confData.field_activePowerAvailable) and confData.field_activePowerAvailable in dataFrame.columns \
- # and self.node_activePowerAvailable in confData.filter and not self.common.isNone(confData.filter[self.node_activePowerAvailable]):
- # state = confData.filter[self.node_activePowerAvailable]
- # dataFrame = dataFrame[dataFrame[confData.field_activePowerAvailable].isin(
- # state)]
- return dataFrame
- def filterCustomForTurbine(self, dataFrame: pd.DataFrame, conf: Contract):
- return self.filterCommon(dataFrame, conf)
- def analysisOfTurbine(self,
- outputAnalysisDir,
- outputFilePath,
- conf: Contract,
- turbineCode):
- return self.turbineAnalysis(outputAnalysisDir,
- outputFilePath, conf, turbineCode)
- def turbineAnalysis(self,
- outputAnalysisDir,
- outputFilePath,
- conf: Contract,
- turbineCode):
- return pd.DataFrame()
- def filterCustomForTurbines(self, dataFrame: pd.DataFrame, conf: Contract):
- return self.filterCommon(dataFrame, conf)
- def analysisOfTurbines(self, outputAnalysisDir, conf: Contract, turbineCodes):
- # self.logger.info(
- # f"typeAnalyst: {self.typeAnalyst()} method: analysisOfTurbines , turbineCodes : {turbineCodes}")
- return self.turbinesAnalysis(outputAnalysisDir, conf, turbineCodes)
- def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
- return pd.DataFrame()
- def userDataFrame(self, dictionary: dict, configs: list[ConfigAnalysis], analyst) -> pd.DataFrame:
- timeGranularity = next(
- (config.scada for config in configs if config.className.lower() == type(analyst).__name__.lower()), None)
- return pd.DataFrame() if dictionary[timeGranularity].empty else dictionary[timeGranularity]
- def calculateAngleIncluded(self, array1, array2):
- """
- 计算两个相同长度角度数组中两两对应角度值的偏差。
- 结果限制在-90°到+90°之间,并保留两位小数。
- 参数:
- array1 (list): 第一个角度数组
- array2 (list): 第二个角度数组
- 返回:
- list: 两两对应角度的偏差列表
- """
- deviations = []
- for angle1, angle2 in zip(array1, array2):
- # 计算原始偏差
- deviation = angle1-angle2
- # 调整偏差,使其位于-180°到+180°范围内
- if deviation == 0.0:
- deviation = 0.0
- else:
- deviation = (deviation + 180) % 360 - 180
- # 将偏差限制在-90°到+90°范围内
- if deviation > 90:
- deviation -= 180
- elif deviation < -90:
- deviation += 180
- # 保留两位小数
- deviations.append(round(deviation, 2))
- return deviations
- def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
- """
- 依据机舱位置(角度)、风向计算两者夹角
- """
- if fieldNacellePos in dataFrame.columns and fieldWindDirect in dataFrame.columns and not dataFrame[fieldNacellePos].isna().all() and not dataFrame[fieldWindDirect].isna().all():
- # 计算 angle_included_list
- angle_included_list = self.calculateAngleIncluded(
- dataFrame[fieldWindDirect], dataFrame[fieldNacellePos])
- # 检查 angle_included_list 的长度是否与 dataFrame 一致
- if len(angle_included_list) != len(dataFrame):
- raise ValueError(
- "The length of the calculated angle_included_list does not match the length of the DataFrame")
- # 创建一个新的列来保存计算结果
- dataFrame['Calculated_AngleIncluded'] = angle_included_list
- # 使用新列填充原列的缺失值
- dataFrame[Field_AngleIncluded] = dataFrame[Field_AngleIncluded].fillna(
- dataFrame['Calculated_AngleIncluded'])
- # 删除临时列
- dataFrame.drop(columns=['Calculated_AngleIncluded'], inplace=True)
- def recalculationOfFieldPowerFloor(self, dataFrame: pd.DataFrame, fieldActivePower):
- """
- 功率计算
- """
- if fieldActivePower in dataFrame.columns:
- dataFrame[Field_PowerFloor] = dataFrame[fieldActivePower].apply(
- lambda x: int(
- x / 10) * 10 if pd.notnull(x) or pd.notna(x) else np.nan # 保留NaN值
- )
- def recalculationOfFieldWindSpeedFloor(self, dataFrame: pd.DataFrame, fieldWindSpeed):
- """
- 风速计算
- """
- if fieldWindSpeed in dataFrame.columns:
- dataFrame[Field_WindSpeedFloor] = dataFrame[fieldWindSpeed].apply(
- lambda x: int(x/1) +
- 0.5 if pd.notnull(x) or pd.notna(x) else np.nan
- )
- def recalculationOfGeneratorSpeed(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
- """
- 风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
- """
- if fieldGeneratorSpeed in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
- if turbineModelInfo[Field_MotionType] == 2:
- dataFrame[fieldGeneratorSpeed] = dataFrame[fieldGeneratorSpeed].fillna(
- dataFrame[fieldRotorSpeed])
- # dataFrame[fieldGeneratorSpeed] =dataFrame[fieldRotorSpeed]
- else:
- dataFrame[fieldGeneratorSpeed] = dataFrame[fieldGeneratorSpeed].fillna(
- dataFrame[fieldRotorSpeed]*rotationalSpeedRatio)
- # dataFrame[fieldGeneratorSpeed] = dataFrame[fieldRotorSpeed]*rotationalSpeedRatio
- def recalculationOfRotorSpeed(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
- """
- 风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
- """
- if fieldRotorSpeed in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
- if turbineModelInfo[Field_MotionType] == 2:
- dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
- dataFrame[fieldGeneratorSpeed])
- else:
- dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
- dataFrame[fieldGeneratorSpeed] / rotationalSpeedRatio)
- def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
- """
- 风电机组发电机转矩计算,P的单位换成KW转矩计算公式:
- P*1000= pi/30*T*n
- 30000/pi*P=T*n
- 30000/3.1415926*P=T*n
- 9549.297*p=T*n
- 其中:n为发电机转速,p为有功功率,T为转矩
- """
- if fieldGeneratorTorque in dataFrame.columns and fieldActivePower in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
- dataFrame[Field_GeneratorTorque] = dataFrame[Field_GeneratorTorque].fillna(
- 9549.297 * dataFrame[fieldActivePower]/dataFrame[fieldGeneratorSpeed])
- def recalculation(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame):
- """
- 再计算数据测点
- 参数:
- dataFrame 原始数据
- conf 配置数据
- """
- self.recalculationOfFieldPowerFloor(dataFrame, Field_ActiverPower)
- self.recalculationOfFieldWindSpeedFloor(dataFrame, Field_WindSpeed)
- self.recalculationOfGeneratorSpeed(
- turbineModelInfo, dataFrame, Field_RotorSpeed, Field_GeneratorSpeed, self.turbineModelInfo[Field_RSR].iloc[0])
- self.recalculationOfRotorSpeed(turbineModelInfo, dataFrame, Field_RotorSpeed,
- Field_GeneratorSpeed, self.turbineModelInfo[Field_RSR].iloc[0])
- self.recalculationOfRotorTorque(
- dataFrame, Field_GeneratorTorque, Field_ActiverPower, Field_GeneratorSpeed)
- self.recalculationOfIncludedAngle(
- dataFrame, Field_AngleIncluded, Field_WindDirection, Field_NacPos)
- self.common.calculateTSR(
- dataFrame, self.turbineModelInfo[Field_RotorDiameter].iloc[0])
- self.common.calculateCpOfSingleTurbine(
- dataFrame, self.currPowerFarmInfo[Field_AirDensity], turbineModelInfo[Field_RotorDiameter])
- def processDateTimeForAll(self, dataFrame: pd.DataFrame):
- dataFrame[Field_UnixYearMonth] = pd.to_datetime(
- dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
- dataFrame[Field_YearMonth] = dataFrame[Field_Time].dt.strftime('%Y-%m')
- dataFrame[Field_YearMonthDay] = dataFrame[Field_Time].dt.strftime(
- '%Y-%m-%d')
- return dataFrame
- def processDateTime(self, dataFrame: pd.DataFrame, fieldTime: str = None):
- if Field_Time not in dataFrame.columns:
- self.logger.info(
- "Field_Time is not in dataFrame columns, Do not handle time processing")
- return dataFrame
- if fieldTime is None:
- return self.processDateTimeForAll(dataFrame)
- timeSeries = pd.Series
- if fieldTime == Field_UnixYearMonth:
- timeSeries = pd.to_datetime(
- dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
- if fieldTime == Field_YearMonth:
- timeSeries = dataFrame[Field_Time].dt.strftime('%Y-%m')
- if fieldTime == Field_YearMonthDay:
- timeSeries = dataFrame[Field_Time].dt.strftime('%Y-%m-%d')
- dataFrame[fieldTime] = timeSeries
- return dataFrame
- def loadData(self, powerFarmID: str, timeGranularity: str, turbineCode: str, select: str, condition: str) -> pd.DataFrame:
- selectStr = ", ".join(f"{field}" for field in select)
- businessDB: DatabaseUtil = GetBusinessDbUtil()
- with businessDB.session_scope() as session:
- # query_text = f"""SELECT wind_turbine_number,time_stamp, active_power,wind_velocity,cabin_position,true_wind_direction, rotor_speed, generator_speed,actual_torque,yaw_error1,outside_cabin_temperature,cabin_temperature,main_bearing_temperature,gearboxmedium_speed_shaftbearing_temperature,gearbox_low_speed_shaft_bearing_temperature,gearbox_high_speed_shaft_bearing_temperature,generatordrive_end_bearing_temperature,generatornon_drive_end_bearing_temperature,generator_winding1_temperature,pitch_angle_blade_1,pitch_angle_blade_2,pitch_angle_blade_3,front_back_vibration_of_the_cabin,side_to_side_vibration_of_the_cabin
- # FROM `{dataBatchNum}_{timeGranularity}`
- # WHERE wind_turbine_number IN ({turbineCode}) AND {condition}"""
- query_text = f"""SELECT {selectStr}
- FROM `{powerFarmID}_{timeGranularity}`
- WHERE wind_turbine_number IN ({turbineCode}) AND {condition}"""
- query_result = session.execute(text(query_text)).fetchall()
- # select = [Field_DeviceCode, Field_Time, Field_ActiverPower, Field_WindSpeed, Field_NacPos, Field_WindDirection, Field_RotorSpeed, Field_GeneratorSpeed, Field_GeneratorTorque, Field_AngleIncluded, Field_EnvTemp, Field_NacTemp, Field_MainBearTemp, Field_GbMsBearTemp, Field_GbLsBearTemp,
- # Field_GbHsBearTemp, Field_GeneratorDE, Field_GeneratorNDE, Field_GenWiTemp1, Field_PitchAngel1, Field_PitchAngel2, Field_PitchAngel3, Field_NacFbVib, Field_NacLrVib]
- dataFrame = pd.DataFrame(query_result, columns=select)
- return dataFrame
- def dataProcess(self, powerFarmID: str, dataBatchNum: str, timeGranularity: str, turbineCode: str, select: str, customCondition) -> Tuple[pd.DataFrame, str, str]:
- self.logger.info(
- f"load data -> Time Granulary: {timeGranularity} Power Farm: {powerFarmID} Batch: {dataBatchNum} Turbine: {turbineCode} .")
- # Get current turbine info
- currTurbineInfo = self.common.getTurbineInfo(
- self.conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo)
- # Get turbine model info
- currTurbineModelInfo = self.common.getTurbineModelByTurbine(
- currTurbineInfo, self.turbineModelInfo)
- # select DB data
- dataFrameOfTurbine = self.loadData(
- powerFarmID, timeGranularity, f"'{turbineCode}'", select, customCondition)
- if dataFrameOfTurbine.empty:
- self.logger.warning(
- f"{turbineCode} Time Granulary: {timeGranularity} scada data is empty.")
- return pd.DataFrame(), dataBatchNum, turbineCode
- else:
- if timeGranularity in ['fault', 'warn']:
- return dataFrameOfTurbine, dataBatchNum, turbineCode
- else:
- # Add property to dataFrame
- self.addPropertyToDataFrame(
- dataFrameOfTurbine, currTurbineInfo, currTurbineModelInfo)
- # Add engine_name to dataFrame
- dataFrameOfTurbine[Field_NameOfTurbine] = currTurbineInfo.loc[Field_NameOfTurbine]
- # Additional data processing steps
- self.processDateTime(dataFrameOfTurbine)
- # Recalculation
- self.recalculation(currTurbineModelInfo, dataFrameOfTurbine)
- # Filter data
- dataFrameOfTurbine = self.filterCommon(
- dataFrameOfTurbine, self.conf)
- return dataFrameOfTurbine, dataBatchNum, turbineCode
- def selectTimeCondition(self, conf: Contract, conditions: list[str]):
- """
- 时间过滤条件组装
- 从json配置中获取时间过滤条件进行组装
- """
- # 时间过滤条件
- if conf.dataContract.dataFilter.beginTime:
- conditions.append(
- f"time_stamp >= '{conf.dataContract.dataFilter.beginTime}'")
- if conf.dataContract.dataFilter.endTime:
- conditions.append(
- f"time_stamp <= '{conf.dataContract.dataFilter.endTime}'")
- # 排除月份
- if conf.dataContract.dataFilter.excludingMonths:
- excluding_months = ", ".join(
- f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
- excluding_condition = f"DATE_FORMAT(time_stamp, '%Y-%m') NOT IN ({excluding_months})"
- conditions.append(excluding_condition)
- def selectLabCondition(self, conditions: list[str]):
- """
- lab 过滤条件组装
- 根据lab获取相应的数据
- 逻辑实现在相应的Behavior子类中,子类没有实现则无需过滤
- """
- return
- def selectAllCondition(self, conf: Contract):
- conditions = []
- # 时间过滤条件
- self.selectTimeCondition(conf, conditions)
- # lab过滤
- self.selectLabCondition(conditions)
- return " AND ".join(conditions) if conditions else "1=1"
- def selectFaultTimeCondition(self, conf: Contract, conditions: list[str]):
- """
- 时间过滤条件组装
- 从json配置中获取时间过滤条件进行组装
- """
- # 时间过滤条件
- if conf.dataContract.dataFilter.beginTime:
- conditions.append(
- f"begin_time >= '{conf.dataContract.dataFilter.beginTime}'")
- if conf.dataContract.dataFilter.endTime:
- conditions.append(
- f"end_time <= '{conf.dataContract.dataFilter.endTime}'")
- # 排除月份
- if conf.dataContract.dataFilter.excludingMonths:
- excluding_months = ", ".join(
- f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
- excluding_condition = f"DATE_FORMAT(time_stamp, '%Y-%m') NOT IN ({excluding_months})"
- conditions.append(excluding_condition)
- # 故障数据过滤条件
- def selectAllFaultCondition(self, conf: Contract):
- conditions = []
- # 时间过滤条件
- self.selectFaultTimeCondition(conf, conditions)
- # lab过滤
- self.selectLabCondition(conditions)
- return " AND ".join(conditions) if conditions else "1=1"
- def processTurbineData(self, turbines, conf: Contract, select: str):
- try:
- # add "where" condition
- select_conditions = self.selectAllCondition(conf)
- configAnalysisDF = pd.DataFrame(
- [config.to_dict() for config in conf.dataContract.configAnalysis])
- configAnalysisDF = configAnalysisDF[(configAnalysisDF["className"] == self.__class__.__name__)]
- scadaTimeGranularities = configAnalysisDF["scada"].unique()
- self.logger.info(
- f"typeAnalyst: {self.typeAnalyst()} method: processTurbineData , scadaTimeGranularities : {scadaTimeGranularities} current class : {self.__class__.__name__}" )
- dictionary = dict()
- for timeGranularity in scadaTimeGranularities:
- dataFrames = []
- dataFrameOfTurbines = pd.DataFrame()
- if timeGranularity in ['fault', 'warn']:
- select_conditions = self.selectAllFaultCondition(conf)
- maxWorkers = 5
- with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
- futures = [
- executor.submit(self.dataProcess, conf.dataContract.dataFilter.powerFarmID,
- conf.dataContract.dataFilter.dataBatchNum, timeGranularity, turbine, select, select_conditions)
- for turbine in turbines
- ]
- for future in concurrent.futures.as_completed(futures):
- try:
- dataFrameOfTurbine, dataBatchNum, turbineCode = future.result()
- dataFrames.append(dataFrameOfTurbine)
- self.logger.info(
- f"typeAnalyst: {self.typeAnalyst()} data frame append,dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} turbineCode: {turbineCode}")
- except Exception as exc:
- raise exc
- dataFrameOfTurbines = pd.concat(dataFrames, ignore_index=True)
- self.logger.info(
- f"typeAnalyst: {self.typeAnalyst()} data frame concat dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} finish")
- if dataFrameOfTurbines.empty:
- excption = CustomError(102)
- self.logger.warning(
- f"{excption.message} typeAnalyst: {self.typeAnalyst()} Power Farm: {conf.dataContract.dataFilter.powerFarmID} Batch : {conf.dataContract.dataFilter.dataBatchNum} Time Granularity : {timeGranularity}")
- raise excption
- else:
- self.logger.info(
- f"typeAnalyst: {self.typeAnalyst()} data frame concat dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} dataFrameOfTurbines : {dataFrameOfTurbines}")
- if Field_DeviceCode in dataFrameOfTurbines.columns and Field_CodeOfTurbine not in dataFrameOfTurbines.columns:
- dataFrameOfTurbines = dataFrameOfTurbines.rename(
- columns={Field_DeviceCode: Field_CodeOfTurbine})
- dictionary[timeGranularity] = dataFrameOfTurbines
- return dictionary
- except Exception as e:
- self.logger.error(
- f"Error processing turbine data:{traceback.format_exc()}")
- raise
- def addPropertyToDataFrame(self, dataFrameOfTurbine: pd.DataFrame, currTurbineInfo: pd.Series, currTurbineModelInfo: pd.Series):
- """
- 用来添加额外当前风机属性
- 在business中相应的分析员中有实现,没有实现就无额外参数添加
- """
- return
- def escape_special_characters(self, original_string: str):
- """
- ---废弃---
- 特殊字符url编码处理
- "/" 符号单独处理
- """
- retrun_string = quote(original_string)
- if "/" in retrun_string:
- retrun_string = retrun_string.replace("/", "%2F")
- return retrun_string
|