baseAnalyst.py 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. import concurrent.futures
  2. import logging
  3. from abc import ABC, abstractmethod
  4. from logging import Logger
  5. from typing import Any, Tuple
  6. from urllib.parse import quote
  7. import numpy as np
  8. import pandas as pd
  9. from algorithmContract.confBusiness import *
  10. from algorithmContract.configAnalysis import ConfigAnalysis
  11. from algorithmContract.contract import Contract
  12. from behavior.dataMarker import DataMarker
  13. from common.appConfig import GetBusinessDbUtil
  14. from common.commonBusiness import CommonBusiness
  15. from numpy.typing import NDArray
  16. from sqlalchemy.sql import text
  17. from utils.directoryUtil import DirectoryUtil as dir
  18. from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
  19. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  20. class BaseAnalyst(ABC):
  21. def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient, conf: Contract,
  22. powerFarmInfo: pd.DataFrame, turbineInfo: pd.DataFrame, turbineModelInfo: pd.DataFrame, dataTransfer: pd.DataFrame,
  23. weatherStationInfo: pd.DataFrame, dataFrameContractOfTurbine: pd.DataFrame):
  24. self.logger = logger
  25. self.dbUtil = dbUtil
  26. self.minioUtil = minioUtil
  27. self.conf = conf
  28. # 定义风速区间
  29. self.binsWindSpeed:NDArray[np.floating[Any]] = np.arange(0, 26, 0.5)
  30. self.dataMarker = DataMarker()
  31. self.common = CommonBusiness()
  32. self.customFilterStatusOfTurbine = "valueTurbineStatus"
  33. self.customFilterPitchAngle = "valuePitchAngle"
  34. self.customFilterWindSpeed = "valueWindSpeed"
  35. self.customFilterActivePower = "valueActivePower"
  36. self.customFilterGeneratorSpeed = "valueGeneratorSpeed"
  37. # 加载所有新能源场站信息
  38. self.powerFarmInfo = powerFarmInfo
  39. self.currPowerFarmInfo = self.common.getPowerFarm(
  40. self.conf.dataContract.dataFilter.powerFarmID, self.powerFarmInfo)
  41. # 加载所有风电机组信息
  42. self.turbineInfo = turbineInfo
  43. # 加载数据转换信息
  44. # self.dataTransfer = dataTransfer
  45. # 加载机型信息
  46. self.turbineModelInfo = turbineModelInfo
  47. # 加载所有测风塔信息
  48. self.weatherStationInfo = weatherStationInfo
  49. # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
  50. self.dataFrameContractOfTurbine = dataFrameContractOfTurbine
  51. # 图表 轴系设置
  52. # 常值
  53. self.RatedPowerOfTurbine = self.turbineInfo[Field_RatedPower].iloc[0]
  54. # 图表 轴系设置
  55. # 轴系 发电机转速
  56. iGeneratorSpeed = "IgeneratorSpeed"
  57. dGeneratorSpeed = "DgeneratorSpeed"
  58. if self.turbineModelInfo[Field_MotionType] .iloc[0]==2 :
  59. # 直驱 发电机转速
  60. self.axisStepGeneratorSpeed = conf.dataContract.graphSets[dGeneratorSpeed].step if not self.common.isNone(
  61. conf.dataContract.graphSets[dGeneratorSpeed]) and not self.common.isNone(
  62. conf.dataContract.graphSets[dGeneratorSpeed].step) else 1
  63. self.axisLowerLimitGeneratorSpeed = conf.dataContract.graphSets[dGeneratorSpeed].min if not self.common.isNone(
  64. conf.dataContract.graphSets[dGeneratorSpeed].min) else 5
  65. self.axisUpperLimitGeneratorSpeed = conf.dataContract.graphSets[dGeneratorSpeed].max if not self.common.isNone(
  66. conf.dataContract.graphSets[dGeneratorSpeed].max) else 25
  67. else:
  68. # 非直驱 发电机转速
  69. self.axisStepGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].step if not self.common.isNone(
  70. conf.dataContract.graphSets[iGeneratorSpeed]) and not self.common.isNone(
  71. conf.dataContract.graphSets[iGeneratorSpeed].step) else 200
  72. self.axisLowerLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].min if not self.common.isNone(
  73. conf.dataContract.graphSets[iGeneratorSpeed].min) else 1000
  74. self.axisUpperLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].max if not self.common.isNone(
  75. conf.dataContract.graphSets[iGeneratorSpeed].max) else 2000
  76. # --------以下为原写法,未区分驱动方式直接选择了非直驱--------
  77. # # 直驱 发电机转速
  78. # self.axisStepGeneratorSpeedWithDirect = conf.dataContract.graphSets[dGeneratorSpeed].step if not self.common.isNone(
  79. # conf.dataContract.graphSets[dGeneratorSpeed]) and not self.common.isNone(
  80. # conf.dataContract.graphSets[dGeneratorSpeed].step) else 5
  81. # self.axisLowerLimitGeneratorSpeedWithDirect = conf.dataContract.graphSets[dGeneratorSpeed].min if not self.common.isNone(
  82. # conf.dataContract.graphSets[dGeneratorSpeed].min) else 0
  83. # self.axisUpperLimitGeneratorSpeedWithDirect = conf.dataContract.graphSets[dGeneratorSpeed].max if not self.common.isNone(
  84. # conf.dataContract.graphSets[dGeneratorSpeed].max) else 30
  85. # # 非直驱 发电机转速
  86. # self.axisStepGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].step if not self.common.isNone(
  87. # conf.dataContract.graphSets[iGeneratorSpeed]) and not self.common.isNone(
  88. # conf.dataContract.graphSets[iGeneratorSpeed].step) else 200
  89. # self.axisLowerLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].min if not self.common.isNone(
  90. # conf.dataContract.graphSets[iGeneratorSpeed].min) else 1000
  91. # self.axisUpperLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].max if not self.common.isNone(
  92. # conf.dataContract.graphSets[iGeneratorSpeed].max) else 2000
  93. # 轴系 发电机转矩
  94. iGgeneratorTorque = "IgeneratorTorque"
  95. dGgeneratorTorque = "DgeneratorTorque"
  96. # if turbineModelInfo[Field_MotionType].iloc[0]==2 :
  97. # # 直驱 发电机转矩
  98. # self.axisStepGeneratorTorque = conf.dataContract.graphSets[dGgeneratorTorque].step if not self.common.isNone(
  99. # conf.dataContract.graphSets[dGgeneratorTorque]) and not self.common.isNone(
  100. # conf.dataContract.graphSets[dGgeneratorTorque].step) else 10000
  101. # self.axisLowerLimitGeneratorTorque = conf.dataContract.graphSets[dGgeneratorTorque].min if not self.common.isNone(
  102. # conf.dataContract.graphSets[dGgeneratorTorque].min) else 0
  103. # self.axisUpperLimitGeneratorTorque = conf.dataContract.graphSets[dGgeneratorTorque].max if not self.common.isNone(
  104. # conf.dataContract.graphSets[dGgeneratorTorque].max) else 100000
  105. # else:
  106. # # 非直驱 发电机转矩
  107. # self.axisStepGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].step if not self.common.isNone(
  108. # conf.dataContract.graphSets[iGgeneratorTorque]) and not self.common.isNone(
  109. # conf.dataContract.graphSets[iGgeneratorTorque].step) else 2000
  110. # self.axisLowerLimitGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].min if not self.common.isNone(
  111. # conf.dataContract.graphSets[iGgeneratorTorque].min) else 0
  112. # self.axisUpperLimitGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].max if not self.common.isNone(
  113. # conf.dataContract.graphSets[iGgeneratorTorque].max) else 12000
  114. # --------以下为原写法,未区分驱动方式直接选择了非直驱--------
  115. # 直驱 发电机转矩
  116. self.axisStepGeneratorTorqueWithDirect = conf.dataContract.graphSets[dGgeneratorTorque].step if not self.common.isNone(
  117. conf.dataContract.graphSets[dGgeneratorTorque]) and not self.common.isNone(
  118. conf.dataContract.graphSets[dGgeneratorTorque].step) else 10000
  119. self.axisLowerLimitGeneratorTorqueWithDirect = conf.dataContract.graphSets[dGgeneratorTorque].min if not self.common.isNone(
  120. conf.dataContract.graphSets[dGgeneratorTorque].min) else 0
  121. self.axisUpperLimitGeneratorTorqueWithDirect = conf.dataContract.graphSets[dGgeneratorTorque].max if not self.common.isNone(
  122. conf.dataContract.graphSets[dGgeneratorTorque].max) else 100000
  123. # 非直驱 发电机转矩
  124. self.axisStepGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].step if not self.common.isNone(
  125. conf.dataContract.graphSets[iGgeneratorTorque]) and not self.common.isNone(
  126. conf.dataContract.graphSets[iGgeneratorTorque].step) else 2000
  127. self.axisLowerLimitGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].min if not self.common.isNone(
  128. conf.dataContract.graphSets[iGgeneratorTorque].min) else 0
  129. self.axisUpperLimitGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].max if not self.common.isNone(
  130. conf.dataContract.graphSets[iGgeneratorTorque].max) else 12000
  131. # 轴系 有功功率
  132. activePower = "activePower"
  133. self.axisStepActivePower = conf.dataContract.graphSets[activePower].step if not self.common.isNone(
  134. conf.dataContract.graphSets[activePower]) and not self.common.isNone(
  135. conf.dataContract.graphSets[activePower].step) else 250
  136. self.axisLowerLimitActivePower = conf.dataContract.graphSets[activePower].min if not self.common.isNone(
  137. conf.dataContract.graphSets[activePower].min) else 0
  138. self.axisUpperLimitActivePower = conf.dataContract.graphSets[activePower].max if not self.common.isNone(
  139. conf.dataContract.graphSets[activePower].max) else self.RatedPowerOfTurbine*1.2
  140. # 轴系 桨距角
  141. pitchAngle = "pitchAngle"
  142. self.axisStepPitchAngle = conf.dataContract.graphSets[pitchAngle].step if not self.common.isNone(
  143. conf.dataContract.graphSets[pitchAngle]) and not self.common.isNone(
  144. conf.dataContract.graphSets[pitchAngle].step) else 2
  145. self.axisLowerLimitPitchAngle = conf.dataContract.graphSets[pitchAngle].min if not self.common.isNone(
  146. conf.dataContract.graphSets[pitchAngle].min) else -2
  147. self.axisUpperLimitPitchAngle = conf.dataContract.graphSets[pitchAngle].max if not self.common.isNone(
  148. conf.dataContract.graphSets[pitchAngle].max) else 28
  149. # 轴系 风能利用系数
  150. cp = "cp"
  151. self.axisStepCp = conf.dataContract.graphSets[cp].step if not self.common.isNone(
  152. conf.dataContract.graphSets[cp]) and not self.common.isNone(
  153. conf.dataContract.graphSets[cp].step) else 0.1
  154. self.axisLowerLimitCp = conf.dataContract.graphSets[cp].min if not self.common.isNone(
  155. conf.dataContract.graphSets[cp].min) else 0
  156. self.axisUpperLimitCp = conf.dataContract.graphSets[cp].max if not self.common.isNone(
  157. conf.dataContract.graphSets[cp].max) else 1
  158. # 轴系 叶尖速比
  159. tsr = "tsr"
  160. self.axisStepTSR = conf.dataContract.graphSets[tsr].step if not self.common.isNone(
  161. conf.dataContract.graphSets[tsr]) and not self.common.isNone(
  162. conf.dataContract.graphSets[tsr].step) else 5
  163. self.axisLowerLimitTSR = conf.dataContract.graphSets[tsr].min if not self.common.isNone(
  164. conf.dataContract.graphSets[tsr].min) else 0
  165. self.axisUpperLimitTSR = conf.dataContract.graphSets[tsr].max if not self.common.isNone(
  166. conf.dataContract.graphSets[tsr].max) else 20
  167. @ abstractmethod
  168. def typeAnalyst(self):
  169. pass
  170. def getOutputAnalysisDir(self):
  171. """
  172. 获取当前分析的输出目录
  173. """
  174. outputAnalysisDir = f"output/{self.conf.dataContract.dataFilter.powerFarmID}/{self.conf.dataContract.dataFilter.dataBatchNum}/{self.typeAnalyst()}/{self.conf.dataContract.autoOrManual}"
  175. if not dir.check_directory_exists(outputAnalysisDir):
  176. dir.create_directory(outputAnalysisDir)
  177. return outputAnalysisDir
  178. def filterCommon(self, dataFrame: pd.DataFrame, conf: Contract):
  179. # Filter rows where turbine state
  180. if Field_StatusOfTurbine in dataFrame.columns and self.customFilterStatusOfTurbine in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterStatusOfTurbine]):
  181. stateTurbine = conf.dataContract.dataFilter.customFilter[self.customFilterStatusOfTurbine]
  182. dataFrame = dataFrame[dataFrame[Field_StatusOfTurbine].isin(
  183. stateTurbine)]
  184. # Filter rows where wind speed
  185. if Field_WindSpeed in dataFrame.columns and self.customFilterWindSpeed in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed].min):
  186. windSpeedMin = conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed].min
  187. dataFrame = dataFrame[(
  188. dataFrame[Field_WindSpeed] >= windSpeedMin)]
  189. if Field_WindSpeed in dataFrame.columns and self.customFilterWindSpeed in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed].max):
  190. windSpeedMax = conf.dataContract.dataFilter.customFilter[self.customFilterWindSpeed].max
  191. dataFrame = dataFrame[(
  192. dataFrame[Field_WindSpeed] < windSpeedMax)]
  193. # Filter rows where pitch
  194. if Field_PitchAngel1 in dataFrame.columns and self.customFilterPitchAngle in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterPitchAngle]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterPitchAngle].min):
  195. anglePitchMin = conf.dataContract.dataFilter.customFilter[
  196. self.customFilterPitchAngle].min
  197. dataFrame = dataFrame[(
  198. dataFrame[Field_PitchAngel1] >= anglePitchMin)]
  199. if Field_PitchAngel1 in dataFrame.columns and self.customFilterPitchAngle in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterPitchAngle]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterPitchAngle].max):
  200. anglePitchMax = conf.dataContract.dataFilter.customFilter[
  201. self.customFilterPitchAngle].max
  202. dataFrame = dataFrame[(
  203. dataFrame[Field_PitchAngel1] < anglePitchMax)]
  204. # Filter rows where power
  205. if Field_ActiverPower in dataFrame.columns and self.customFilterActivePower in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterActivePower]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterActivePower].min):
  206. activePowerMin = conf.dataContract.dataFilter.customFilter[
  207. self.customFilterActivePower].min
  208. dataFrame = dataFrame[(
  209. dataFrame[Field_ActiverPower] >= activePowerMin)]
  210. if Field_ActiverPower in dataFrame.columns and self.customFilterActivePower in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterActivePower]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterActivePower].max):
  211. activePowerMax = conf.dataContract.dataFilter.customFilter[
  212. self.customFilterActivePower].max
  213. dataFrame = dataFrame[(
  214. dataFrame[Field_ActiverPower] < activePowerMax)]
  215. # Filter rows where generator speed
  216. if Field_GeneratorSpeed in dataFrame.columns and self.customFilterGeneratorSpeed in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterGeneratorSpeed]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterGeneratorSpeed].min):
  217. speedGeneratorMin = conf.dataContract.dataFilter.customFilter[
  218. self.customFilterGeneratorSpeed].min
  219. dataFrame = dataFrame[(
  220. dataFrame[Field_GeneratorSpeed] >= speedGeneratorMin)]
  221. if Field_GeneratorSpeed in dataFrame.columns and self.customFilterGeneratorSpeed in conf.dataContract.dataFilter.customFilter.keys() and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterGeneratorSpeed]) and not self.common.isNone(conf.dataContract.dataFilter.customFilter[self.customFilterGeneratorSpeed].max):
  222. speedGeneratorMax = conf.dataContract.dataFilter.customFilter[
  223. self.customFilterGeneratorSpeed].max
  224. dataFrame = dataFrame[(
  225. dataFrame[Field_GeneratorSpeed] < speedGeneratorMax)]
  226. # if not self.common.isNone(confData.field_activePowerSet) and confData.field_activePowerSet in dataFrame.columns:
  227. # dataFrame = dataFrame[dataFrame[confData.field_activePowerSet]
  228. # == confData.rated_power]
  229. # if not self.common.isNone(confData.field_activePowerAvailable) and confData.field_activePowerAvailable in dataFrame.columns \
  230. # and self.node_activePowerAvailable in confData.filter and not self.common.isNone(confData.filter[self.node_activePowerAvailable]):
  231. # state = confData.filter[self.node_activePowerAvailable]
  232. # dataFrame = dataFrame[dataFrame[confData.field_activePowerAvailable].isin(
  233. # state)]
  234. return dataFrame
  235. def filterCustomForTurbine(self, dataFrame: pd.DataFrame, conf: Contract):
  236. return self.filterCommon(dataFrame, conf)
  237. def analysisOfTurbine(self,
  238. outputAnalysisDir,
  239. outputFilePath,
  240. conf: Contract,
  241. turbineCode):
  242. return self.turbineAnalysis(outputAnalysisDir,
  243. outputFilePath, conf, turbineCode)
  244. def turbineAnalysis(self,
  245. outputAnalysisDir,
  246. outputFilePath,
  247. conf: Contract,
  248. turbineCode):
  249. return pd.DataFrame()
  250. def filterCustomForTurbines(self, dataFrame: pd.DataFrame, conf: Contract):
  251. return self.filterCommon(dataFrame, conf)
  252. def analysisOfTurbines(self, outputAnalysisDir, conf: Contract, turbineCodes):
  253. return self.turbinesAnalysis(outputAnalysisDir, conf, turbineCodes)
  254. def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
  255. return pd.DataFrame()
  256. def userDataFrame(self, dictionary: dict, configs: list[ConfigAnalysis], analyst) -> pd.DataFrame:
  257. timeGranularity = next(
  258. (config.scada for config in configs if config.className.lower() == type(analyst).__name__.lower()), None)
  259. return pd.DataFrame() if dictionary[timeGranularity].empty else dictionary[timeGranularity]
  260. def calculateAngleIncluded(self, array1, array2):
  261. """
  262. 计算两个相同长度角度数组中两两对应角度值的偏差。
  263. 结果限制在-90°到+90°之间,并保留两位小数。
  264. 参数:
  265. array1 (list): 第一个角度数组
  266. array2 (list): 第二个角度数组
  267. 返回:
  268. list: 两两对应角度的偏差列表
  269. """
  270. deviations = []
  271. for angle1, angle2 in zip(array1, array2):
  272. # 计算原始偏差
  273. deviation = angle1-angle2
  274. # 调整偏差,使其位于-180°到+180°范围内
  275. if deviation == 0.0:
  276. deviation = 0.0
  277. else:
  278. deviation = (deviation + 180) % 360 - 180
  279. # 将偏差限制在-90°到+90°范围内
  280. if deviation > 90:
  281. deviation -= 180
  282. elif deviation < -90:
  283. deviation += 180
  284. # 保留两位小数
  285. deviations.append(round(deviation, 2))
  286. return deviations
  287. def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
  288. """
  289. 依据机舱位置(角度)、风向计算两者夹角
  290. """
  291. if fieldNacellePos in dataFrame.columns and fieldWindDirect in dataFrame.columns and not dataFrame[fieldNacellePos].isna().all() and not dataFrame[fieldWindDirect].isna().all():
  292. # 计算 angle_included_list
  293. angle_included_list = self.calculateAngleIncluded(
  294. dataFrame[fieldWindDirect],dataFrame[fieldNacellePos] )
  295. # 检查 angle_included_list 的长度是否与 dataFrame 一致
  296. if len(angle_included_list) != len(dataFrame):
  297. raise ValueError(
  298. "The length of the calculated angle_included_list does not match the length of the DataFrame")
  299. # 创建一个新的列来保存计算结果
  300. dataFrame['Calculated_AngleIncluded'] = angle_included_list
  301. # 使用新列填充原列的缺失值
  302. dataFrame[Field_AngleIncluded] = dataFrame[Field_AngleIncluded].fillna(
  303. dataFrame['Calculated_AngleIncluded'])
  304. # 删除临时列
  305. dataFrame.drop(columns=['Calculated_AngleIncluded'], inplace=True)
  306. def recalculationOfFieldPowerFloor(self, dataFrame : pd.DataFrame, fieldActivePower):
  307. """
  308. 功率计算
  309. """
  310. if fieldActivePower in dataFrame.columns:
  311. dataFrame[Field_PowerFloor] = dataFrame[fieldActivePower].apply(
  312. lambda x: int(x / 10) * 10 if pd.notnull(x) or pd.notna(x) else np.nan # 保留NaN值
  313. )
  314. def recalculationOfFieldWindSpeedFloor(self, dataFrame : pd.DataFrame, fieldWindSpeed):
  315. """
  316. 风速计算
  317. """
  318. if fieldWindSpeed in dataFrame.columns:
  319. dataFrame[Field_WindSpeedFloor] = dataFrame[fieldWindSpeed].apply(
  320. lambda x: int(x/1)+0.5 if pd.notnull(x) or pd.notna(x) else np.nan
  321. )
  322. def recalculationOfGeneratorSpeed(self, turbineModelInfo: pd.Series,dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
  323. """
  324. 风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
  325. """
  326. if fieldGeneratorSpeed in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
  327. if turbineModelInfo[Field_MotionType]==2 :
  328. dataFrame[fieldGeneratorSpeed] =dataFrame[fieldGeneratorSpeed].fillna(dataFrame[fieldRotorSpeed])
  329. # dataFrame[fieldGeneratorSpeed] =dataFrame[fieldRotorSpeed]
  330. else:
  331. dataFrame[fieldGeneratorSpeed] = dataFrame[fieldGeneratorSpeed].fillna(
  332. dataFrame[fieldRotorSpeed]*rotationalSpeedRatio)
  333. # dataFrame[fieldGeneratorSpeed] = dataFrame[fieldRotorSpeed]*rotationalSpeedRatio
  334. def recalculationOfRotorSpeed(self,turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
  335. """
  336. 风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
  337. """
  338. if fieldRotorSpeed in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
  339. if turbineModelInfo[Field_MotionType]==2 :
  340. dataFrame[fieldRotorSpeed] =dataFrame[fieldRotorSpeed].fillna(dataFrame[fieldGeneratorSpeed])
  341. else:
  342. dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
  343. dataFrame[fieldGeneratorSpeed] / rotationalSpeedRatio)
  344. def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
  345. """
  346. 风电机组发电机转矩计算,P的单位换成KW转矩计算公式:
  347. P*1000= pi/30*T*n
  348. 30000/pi*P=T*n
  349. 30000/3.1415926*P=T*n
  350. 9549.297*p=T*n
  351. 其中:n为发电机转速,p为有功功率,T为转矩
  352. """
  353. if fieldGeneratorTorque in dataFrame.columns and fieldActivePower in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
  354. dataFrame[Field_GeneratorTorque] = dataFrame[Field_GeneratorTorque].fillna(
  355. 9549.297 * dataFrame[fieldActivePower]/dataFrame[fieldGeneratorSpeed])
  356. def recalculation(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame):
  357. """
  358. 再计算数据测点
  359. 参数:
  360. dataFrame 原始数据
  361. conf 配置数据
  362. """
  363. self.recalculationOfFieldPowerFloor(dataFrame, Field_ActiverPower)
  364. self.recalculationOfFieldWindSpeedFloor(dataFrame, Field_WindSpeed)
  365. self.recalculationOfGeneratorSpeed(turbineModelInfo,dataFrame, Field_RotorSpeed, Field_GeneratorSpeed,self.turbineModelInfo[Field_RSR].iloc[0])
  366. self.recalculationOfRotorSpeed(turbineModelInfo,dataFrame, Field_RotorSpeed, Field_GeneratorSpeed,self.turbineModelInfo[Field_RSR].iloc[0])
  367. self.recalculationOfRotorTorque(dataFrame, Field_GeneratorTorque, Field_ActiverPower, Field_GeneratorSpeed)
  368. self.recalculationOfIncludedAngle(dataFrame, Field_AngleIncluded, Field_WindDirection, Field_NacPos)
  369. self.common.calculateTSR(dataFrame, self.turbineModelInfo[Field_RotorDiameter].iloc[0])
  370. self.common.calculateCpOfSingleTurbine(dataFrame, self.currPowerFarmInfo[Field_AirDensity], turbineModelInfo[Field_RotorDiameter])
  371. def processDateTimeForAll(self, dataFrame: pd.DataFrame):
  372. dataFrame[Field_UnixYearMonth] = pd.to_datetime(dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
  373. dataFrame[Field_YearMonth] = dataFrame[Field_Time].dt.strftime('%Y-%m')
  374. dataFrame[Field_YearMonthDay] = dataFrame[Field_Time].dt.strftime('%Y-%m-%d')
  375. return dataFrame
  376. def processDateTime(self, dataFrame: pd.DataFrame, fieldTime : str = None):
  377. if Field_Time not in dataFrame.columns:
  378. self.logger.info("Field_Time is not in dataFrame columns, Do not handle time processing")
  379. return dataFrame
  380. if fieldTime is None:
  381. return self.processDateTimeForAll(dataFrame)
  382. timeSeries = pd.Series
  383. if fieldTime == Field_UnixYearMonth:
  384. timeSeries = pd.to_datetime(dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
  385. if fieldTime == Field_YearMonth:
  386. timeSeries = dataFrame[Field_Time].dt.strftime('%Y-%m')
  387. if fieldTime == Field_YearMonthDay:
  388. timeSeries = dataFrame[Field_Time].dt.strftime('%Y-%m-%d')
  389. dataFrame[fieldTime] = timeSeries
  390. return dataFrame
  391. def loadData(self, powerFarmID: str, timeGranularity: str, turbineCode: str, select: str, condition: str) -> pd.DataFrame:
  392. selectStr = ", ".join(f"{field}" for field in select)
  393. businessDB: DatabaseUtil = GetBusinessDbUtil()
  394. with businessDB.session_scope() as session:
  395. # query_text = f"""SELECT wind_turbine_number,time_stamp, active_power,wind_velocity,cabin_position,true_wind_direction, rotor_speed, generator_speed,actual_torque,yaw_error1,outside_cabin_temperature,cabin_temperature,main_bearing_temperature,gearboxmedium_speed_shaftbearing_temperature,gearbox_low_speed_shaft_bearing_temperature,gearbox_high_speed_shaft_bearing_temperature,generatordrive_end_bearing_temperature,generatornon_drive_end_bearing_temperature,generator_winding1_temperature,pitch_angle_blade_1,pitch_angle_blade_2,pitch_angle_blade_3,front_back_vibration_of_the_cabin,side_to_side_vibration_of_the_cabin
  396. # FROM `{dataBatchNum}_{timeGranularity}`
  397. # WHERE wind_turbine_number IN ({turbineCode}) AND {condition}"""
  398. query_text = f"""SELECT {selectStr}
  399. FROM `{powerFarmID}_{timeGranularity}`
  400. WHERE wind_turbine_number IN ({turbineCode}) AND {condition}"""
  401. query_result = session.execute(text(query_text)).fetchall()
  402. # select = [Field_DeviceCode, Field_Time, Field_ActiverPower, Field_WindSpeed, Field_NacPos, Field_WindDirection, Field_RotorSpeed, Field_GeneratorSpeed, Field_GeneratorTorque, Field_AngleIncluded, Field_EnvTemp, Field_NacTemp, Field_MainBearTemp, Field_GbMsBearTemp, Field_GbLsBearTemp,
  403. # Field_GbHsBearTemp, Field_GeneratorDE, Field_GeneratorNDE, Field_GenWiTemp1, Field_PitchAngel1, Field_PitchAngel2, Field_PitchAngel3, Field_NacFbVib, Field_NacLrVib]
  404. dataFrame = pd.DataFrame(query_result, columns=select)
  405. return dataFrame
  406. def dataProcess(self, powerFarmID: str, dataBatchNum: str, timeGranularity: str, turbineCode: str, select: str, customCondition) -> Tuple[pd.DataFrame, str, str]:
  407. self.logger.info(
  408. f"load data -> Time Granulary: {timeGranularity} Power Farm: {powerFarmID} Batch: {dataBatchNum} Turbine: {turbineCode} .")
  409. # Get current turbine info
  410. currTurbineInfo = self.common.getTurbineInfo(self.conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo)
  411. # Get turbine model info
  412. currTurbineModelInfo = self.common.getTurbineModelByTurbine(currTurbineInfo, self.turbineModelInfo)
  413. # select DB data
  414. dataFrameOfTurbine = self.loadData(powerFarmID, timeGranularity, f"'{turbineCode}'", select, customCondition)
  415. if dataFrameOfTurbine.empty:
  416. self.logger.warning(f"{turbineCode} Time Granulary: {timeGranularity} scada data is empty.")
  417. return pd.DataFrame(), dataBatchNum, turbineCode
  418. else:
  419. if timeGranularity in ['fault', 'warn']:
  420. return dataFrameOfTurbine, dataBatchNum, turbineCode
  421. else:
  422. # Add property to dataFrame
  423. self.addPropertyToDataFrame(dataFrameOfTurbine, currTurbineInfo, currTurbineModelInfo)
  424. # Add engine_name to dataFrame
  425. dataFrameOfTurbine[Field_NameOfTurbine] = currTurbineInfo.loc[Field_NameOfTurbine]
  426. # Additional data processing steps
  427. self.processDateTime(dataFrameOfTurbine)
  428. # Recalculation
  429. self.recalculation(currTurbineModelInfo, dataFrameOfTurbine)
  430. # Filter data
  431. dataFrameOfTurbine = self.filterCommon(dataFrameOfTurbine, self.conf)
  432. return dataFrameOfTurbine, dataBatchNum, turbineCode
  433. def selectTimeCondition(self,conf: Contract, conditions: list[str]):
  434. """
  435. 时间过滤条件组装
  436. 从json配置中获取时间过滤条件进行组装
  437. """
  438. # 时间过滤条件
  439. if conf.dataContract.dataFilter.beginTime:
  440. conditions.append(f"time_stamp >= '{conf.dataContract.dataFilter.beginTime}'")
  441. if conf.dataContract.dataFilter.endTime:
  442. conditions.append(f"time_stamp <= '{conf.dataContract.dataFilter.endTime}'")
  443. # 排除月份
  444. if conf.dataContract.dataFilter.excludingMonths:
  445. excluding_months = ", ".join(f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
  446. excluding_condition = f"DATE_FORMAT(time_stamp, '%Y-%m') NOT IN ({excluding_months})"
  447. conditions.append(excluding_condition)
  448. def selectLabCondition(self,conditions: list[str]):
  449. """
  450. lab 过滤条件组装
  451. 根据lab获取相应的数据
  452. 逻辑实现在相应的Behavior子类中,子类没有实现则无需过滤
  453. """
  454. return
  455. def selectAllCondition(self,conf: Contract):
  456. conditions = []
  457. # 时间过滤条件
  458. self.selectTimeCondition(conf,conditions)
  459. #lab过滤
  460. self.selectLabCondition(conditions)
  461. return " AND ".join(conditions) if conditions else "1=1"
  462. def selectFaultTimeCondition(self,conf: Contract, conditions: list[str]):
  463. """
  464. 时间过滤条件组装
  465. 从json配置中获取时间过滤条件进行组装
  466. """
  467. # 时间过滤条件
  468. if conf.dataContract.dataFilter.beginTime:
  469. conditions.append(f"begin_time >= '{conf.dataContract.dataFilter.beginTime}'")
  470. if conf.dataContract.dataFilter.endTime:
  471. conditions.append(f"end_time <= '{conf.dataContract.dataFilter.endTime}'")
  472. # 排除月份
  473. if conf.dataContract.dataFilter.excludingMonths:
  474. excluding_months = ", ".join(f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
  475. excluding_condition = f"DATE_FORMAT(time_stamp, '%Y-%m') NOT IN ({excluding_months})"
  476. conditions.append(excluding_condition)
  477. # 故障数据过滤条件
  478. def selectAllFaultCondition(self,conf: Contract):
  479. conditions = []
  480. # 时间过滤条件
  481. self.selectFaultTimeCondition(conf,conditions)
  482. #lab过滤
  483. self.selectLabCondition(conditions)
  484. return " AND ".join(conditions) if conditions else "1=1"
  485. def processTurbineData(self, turbines, conf: Contract, select: str):
  486. try:
  487. # add "where" condition
  488. select_conditions = self.selectAllCondition(conf)
  489. configAnalysisDF = pd.DataFrame(
  490. [config.to_dict() for config in conf.dataContract.configAnalysis])
  491. scadaTimeGranularities = configAnalysisDF["scada"].unique()
  492. dictionary = dict()
  493. for timeGranularity in scadaTimeGranularities:
  494. dataFrames = []
  495. dataFrameOfTurbines = pd.DataFrame()
  496. if timeGranularity in ['fault', 'warn']:
  497. select_conditions=self.selectAllFaultCondition(conf)
  498. maxWorkers = 5
  499. with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
  500. futures = [
  501. executor.submit(self.dataProcess, conf.dataContract.dataFilter.powerFarmID,
  502. conf.dataContract.dataFilter.dataBatchNum, timeGranularity, turbine, select, select_conditions)
  503. for turbine in turbines
  504. ]
  505. for future in concurrent.futures.as_completed(futures):
  506. try:
  507. dataFrameOfTurbine, dataBatchNum, turbineCode = future.result()
  508. dataFrames.append(dataFrameOfTurbine)
  509. self.logger.info(f"data frame append,dataBatchNum: {dataBatchNum} turbineCode: {turbineCode}")
  510. except Exception as exc:
  511. raise exc
  512. dataFrameOfTurbines = pd.concat(dataFrames, ignore_index=True)
  513. self.logger.info(f"data frame concat dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} finish")
  514. if dataFrameOfTurbines.empty:
  515. excption = CustomError(102)
  516. self.logger.warning(
  517. f"{excption.message} Power Farm: {conf.dataContract.dataFilter.powerFarmID} Batch : {conf.dataContract.dataFilter.dataBatchNum} Time Granularity : {timeGranularity}")
  518. raise excption
  519. else:
  520. if Field_DeviceCode in dataFrameOfTurbines.columns and Field_CodeOfTurbine not in dataFrameOfTurbines.columns:
  521. dataFrameOfTurbines = dataFrameOfTurbines.rename(columns={Field_DeviceCode: Field_CodeOfTurbine})
  522. dictionary[timeGranularity] = dataFrameOfTurbines
  523. return dictionary
  524. except Exception as e:
  525. self.logger.error(f"Error processing turbine data: {e}")
  526. raise
  527. def addPropertyToDataFrame(self,dataFrameOfTurbine : pd.DataFrame, currTurbineInfo : pd.Series, currTurbineModelInfo: pd.Series):
  528. """
  529. 用来添加额外当前风机属性
  530. 在business中相应的分析员中有实现,没有实现就无额外参数添加
  531. """
  532. return
  533. def escape_special_characters(self, original_string:str):
  534. """
  535. ---废弃---
  536. 特殊字符url编码处理
  537. "/" 符号单独处理
  538. """
  539. retrun_string = quote(original_string)
  540. if "/" in retrun_string:
  541. retrun_string = retrun_string.replace("/","%2F")
  542. return retrun_string