baseAnalyst.py 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. import concurrent.futures
  2. import logging
  3. from abc import ABC, abstractmethod
  4. from logging import Logger
  5. from typing import Any, Tuple
  6. from urllib.parse import quote
  7. import numpy as np
  8. import pandas as pd
  9. import traceback
  10. from algorithmContract.confBusiness import *
  11. from algorithmContract.configAnalysis import ConfigAnalysis
  12. from algorithmContract.contract import Contract
  13. from behavior.dataMarker import DataMarker
  14. from common.appConfig import GetBusinessDbUtil
  15. from common.commonBusiness import CommonBusiness
  16. from numpy.typing import NDArray
  17. from sqlalchemy.sql import text
  18. from utils.directoryUtil import DirectoryUtil as dir
  19. from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
  20. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  21. class BaseAnalyst(ABC):
  def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient, conf: Contract,
               powerFarmInfo: pd.DataFrame, turbineInfo: pd.DataFrame, turbineModelInfo: pd.DataFrame, dataTransfer: pd.DataFrame,
               weatherStationInfo: pd.DataFrame, dataFrameContractOfTurbine: pd.DataFrame):
      """
      Store shared resources and metadata, and resolve the chart axis settings.

      Parameters:
          logger: logger used by the analyst.
          dbUtil: database utilities keyed by name.
          minioUtil: MinIO client.
          conf: analysis contract (data filter, graph settings, ...).
          powerFarmInfo: information on all power farms; the entry of
              ``conf.dataContract.dataFilter.powerFarmID`` is stored as ``currPowerFarmInfo``.
          turbineInfo: wind turbine information.
          turbineModelInfo: turbine model information.
          dataTransfer: accepted for interface compatibility but not stored.
          weatherStationInfo: information on all met masts.
          dataFrameContractOfTurbine: contract power curves of the farms' turbine models.

      Every axis setting (step/min/max) comes from ``conf.dataContract.graphSets[key]`` and
      falls back to a built-in default when either the graph-set entry or the individual
      value is missing (as judged by ``CommonBusiness.isNone``).
      """
      self.logger = logger
      self.dbUtil = dbUtil
      self.minioUtil = minioUtil
      self.conf = conf
      # Wind speed bins: 0, 0.5, ..., 25.5
      self.binsWindSpeed: NDArray[np.floating[Any]] = np.arange(0, 26, 0.5)
      self.dataMarker = DataMarker()
      self.common = CommonBusiness()
      # Keys of conf.dataContract.dataFilter.customFilter consumed by filterCommon
      self.customFilterStatusOfTurbine = "valueTurbineStatus"
      self.customFilterPitchAngle = "valuePitchAngle"
      self.customFilterWindSpeed = "valueWindSpeed"
      self.customFilterActivePower = "valueActivePower"
      self.customFilterGeneratorSpeed = "valueGeneratorSpeed"
      # All power farms, plus the one this analysis runs for
      self.powerFarmInfo = powerFarmInfo
      self.currPowerFarmInfo = self.common.getPowerFarm(
          self.conf.dataContract.dataFilter.powerFarmID, self.powerFarmInfo)
      self.turbineInfo = turbineInfo
      self.turbineModelInfo = turbineModelInfo
      self.weatherStationInfo = weatherStationInfo
      self.dataFrameContractOfTurbine = dataFrameContractOfTurbine

      # NOTE(review): rated power and drive type are read from the FIRST row of
      # turbineInfo / turbineModelInfo, so all turbines share one axis range.
      self.RatedPowerOfTurbine = self.turbineInfo[Field_RatedPower].iloc[0]

      # Axis: generator speed. Motion type 2 (direct drive) uses "DgeneratorSpeed",
      # every other drive type uses "IgeneratorSpeed".
      if self.turbineModelInfo[Field_MotionType].iloc[0] == 2:
          self.axisStepGeneratorSpeed = self._graphSetting("DgeneratorSpeed", "step", 1)
          self.axisLowerLimitGeneratorSpeed = self._graphSetting("DgeneratorSpeed", "min", 5)
          self.axisUpperLimitGeneratorSpeed = self._graphSetting("DgeneratorSpeed", "max", 25)
      else:
          self.axisStepGeneratorSpeed = self._graphSetting("IgeneratorSpeed", "step", 200)
          self.axisLowerLimitGeneratorSpeed = self._graphSetting("IgeneratorSpeed", "min", 1000)
          self.axisUpperLimitGeneratorSpeed = self._graphSetting("IgeneratorSpeed", "max", 2000)

      # Axis: generator torque. Both the direct-drive ("...WithDirect") and the
      # non-direct-drive settings are stored.
      self.axisStepGeneratorTorqueWithDirect = self._graphSetting("DgeneratorTorque", "step", 10000)
      self.axisLowerLimitGeneratorTorqueWithDirect = self._graphSetting("DgeneratorTorque", "min", 0)
      self.axisUpperLimitGeneratorTorqueWithDirect = self._graphSetting("DgeneratorTorque", "max", 100000)
      self.axisStepGeneratorTorque = self._graphSetting("IgeneratorTorque", "step", 2000)
      self.axisLowerLimitGeneratorTorque = self._graphSetting("IgeneratorTorque", "min", 0)
      self.axisUpperLimitGeneratorTorque = self._graphSetting("IgeneratorTorque", "max", 12000)

      # Axis: active power. The upper default (1.2 x rated power) is only computed when
      # no maximum is configured, exactly as before.
      self.axisStepActivePower = self._graphSetting("activePower", "step", 250)
      self.axisLowerLimitActivePower = self._graphSetting("activePower", "min", 0)
      upperActivePower = self._graphSetting("activePower", "max", None)
      self.axisUpperLimitActivePower = (self.RatedPowerOfTurbine * 1.2
                                        if upperActivePower is None else upperActivePower)

      # Axis: pitch angle
      self.axisStepPitchAngle = self._graphSetting("pitchAngle", "step", 2)
      self.axisLowerLimitPitchAngle = self._graphSetting("pitchAngle", "min", -2)
      self.axisUpperLimitPitchAngle = self._graphSetting("pitchAngle", "max", 28)

      # Axis: power coefficient (Cp)
      self.axisStepCp = self._graphSetting("cp", "step", 0.1)
      self.axisLowerLimitCp = self._graphSetting("cp", "min", 0)
      self.axisUpperLimitCp = self._graphSetting("cp", "max", 1)

      # Axis: tip-speed ratio (TSR)
      self.axisStepTSR = self._graphSetting("tsr", "step", 5)
      self.axisLowerLimitTSR = self._graphSetting("tsr", "min", 0)
      self.axisUpperLimitTSR = self._graphSetting("tsr", "max", 20)

  def _graphSetting(self, key: str, attribute: str, default):
      """
      Return ``conf.dataContract.graphSets[key].<attribute>`` or ``default``.

      ``default`` is returned when the graph-set entry itself, or the requested attribute,
      is considered missing by ``self.common.isNone``. (Previously only ``step`` guarded a
      missing entry; ``min``/``max`` raised ``AttributeError`` in that case.)
      """
      graphSet = self.conf.dataContract.graphSets[key]
      if self.common.isNone(graphSet):
          return default
      value = getattr(graphSet, attribute)
      return default if self.common.isNone(value) else value
  @abstractmethod
  def typeAnalyst(self):
      """
      Return the analysis-type identifier of the concrete analyst.

      Must be implemented by subclasses; getOutputAnalysisDir uses the value as a
      path segment. The base body does nothing and returns None.
      """
  def getOutputAnalysisDir(self):
      """
      Return the output directory of the current analysis, creating it when absent.

      Layout: output/<powerFarmID>/<dataBatchNum>/<typeAnalyst()>/<autoOrManual>
      """
      dataFilter = self.conf.dataContract.dataFilter
      outputAnalysisDir = (
          f"output/{dataFilter.powerFarmID}/{dataFilter.dataBatchNum}"
          f"/{self.typeAnalyst()}/{self.conf.dataContract.autoOrManual}"
      )
      directoryMissing = not dir.check_directory_exists(outputAnalysisDir)
      if directoryMissing:
          dir.create_directory(outputAnalysisDir)
      return outputAnalysisDir
  def filterCommon(self, dataFrame: pd.DataFrame, conf: Contract):
      """
      Apply the filters configured in ``conf.dataContract.dataFilter.customFilter``.

      - Turbine status: keep rows whose status is in the configured collection.
      - Wind speed, pitch angle (blade 1), active power, generator speed: keep rows with
        ``min <= value < max`` for whichever bound is configured.

      A filter is skipped when its column is absent from ``dataFrame`` or its entry is
      missing/None. Returns the filtered DataFrame (a new object when any filter applied).
      """
      customFilter = conf.dataContract.dataFilter.customFilter

      def isConfigured(filterKey):
          return filterKey in customFilter.keys() and not self.common.isNone(customFilter[filterKey])

      if Field_StatusOfTurbine in dataFrame.columns and isConfigured(self.customFilterStatusOfTurbine):
          allowedStates = customFilter[self.customFilterStatusOfTurbine]
          dataFrame = dataFrame[dataFrame[Field_StatusOfTurbine].isin(allowedStates)]

      # (column, customFilter key) pairs, applied in this order; lower bound inclusive, upper exclusive.
      rangeFilters = (
          (Field_WindSpeed, self.customFilterWindSpeed),
          (Field_PitchAngel1, self.customFilterPitchAngle),
          (Field_ActiverPower, self.customFilterActivePower),
          (Field_GeneratorSpeed, self.customFilterGeneratorSpeed),
      )
      for columnName, filterKey in rangeFilters:
          if columnName not in dataFrame.columns or not isConfigured(filterKey):
              continue
          bounds = customFilter[filterKey]
          if not self.common.isNone(bounds.min):
              dataFrame = dataFrame[dataFrame[columnName] >= bounds.min]
          if not self.common.isNone(bounds.max):
              dataFrame = dataFrame[dataFrame[columnName] < bounds.max]
      return dataFrame
  def filterCustomForTurbine(self, dataFrame: pd.DataFrame, conf: Contract):
      """Per-turbine filter hook; the base version simply applies filterCommon."""
      filtered = self.filterCommon(dataFrame, conf)
      return filtered
  def analysisOfTurbine(self, outputAnalysisDir, outputFilePath, conf: Contract, turbineCode):
      """Single-turbine analysis entry point; forwards all arguments to turbineAnalysis."""
      result = self.turbineAnalysis(outputAnalysisDir, outputFilePath, conf, turbineCode)
      return result
  def turbineAnalysis(self, outputAnalysisDir, outputFilePath, conf: Contract, turbineCode):
      """
      Default single-turbine analysis: produces no result.

      Returns an empty DataFrame; presumably overridden by concrete analysts.
      """
      emptyResult = pd.DataFrame()
      return emptyResult
  def filterCustomForTurbines(self, dataFrame: pd.DataFrame, conf: Contract):
      """Multi-turbine filter hook; the base version simply applies filterCommon."""
      filtered = self.filterCommon(dataFrame, conf)
      return filtered
  def analysisOfTurbines(self, outputAnalysisDir, conf: Contract, turbineCodes):
      """Multi-turbine analysis entry point; forwards all arguments to turbinesAnalysis."""
      result = self.turbinesAnalysis(outputAnalysisDir, conf, turbineCodes)
      return result
  def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
      """
      Default multi-turbine analysis: produces no result.

      Returns an empty DataFrame; presumably overridden by concrete analysts.
      """
      emptyResult = pd.DataFrame()
      return emptyResult
  def userDataFrame(self, dictionary: dict, configs: "list[ConfigAnalysis]", analyst) -> pd.DataFrame:
      """
      Return the SCADA DataFrame for the time granularity configured for ``analyst``.

      The first config whose ``className`` equals (case-insensitively) the class name of
      ``analyst`` supplies the key (its ``scada`` value) looked up in ``dictionary``.

      Returns:
          The stored DataFrame, or a new empty DataFrame when no config matches, the key
          is absent, the stored value is None, or the stored DataFrame is empty.
      """
      analystName = type(analyst).__name__.lower()
      timeGranularity = next(
          (config.scada for config in configs if config.className.lower() == analystName), None)
      # .get avoids a KeyError when no config matched or the granularity was never loaded.
      dataFrame = dictionary.get(timeGranularity)
      if dataFrame is None or dataFrame.empty:
          return pd.DataFrame()
      return dataFrame
  def calculateAngleIncluded(self, array1, array2):
      """
      Compute the deviation between corresponding angles (degrees) of two sequences.

      Each difference ``angle1 - angle2`` is wrapped into [-180, 180) and then folded
      into [-90, 90] by adding/subtracting 180, so opposite directions count as
      aligned (deviation 0). Results are rounded to two decimals.

      Parameters:
          array1: first sequence of angles.
          array2: second sequence of angles (same length as array1).

      Returns:
          list: one deviation per pair; as with zip, surplus elements of the longer
          input are ignored.
      """
      deviations = []
      for angle1, angle2 in zip(array1, array2):
          # Wrap into [-180, 180). A zero difference maps to 0 here, so it needs no special case.
          deviation = (angle1 - angle2 + 180) % 360 - 180
          # Fold into [-90, 90]
          if deviation > 90:
              deviation -= 180
          elif deviation < -90:
              deviation += 180
          deviations.append(round(deviation, 2))
      return deviations
  def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
      """
      Fill the included angle between wind direction and nacelle position (in place).

      Values are computed with ``self.calculateAngleIncluded(windDirection, nacellePosition)``.
      Existing values of ``fieldAngleIncluded`` are kept and only missing ones are filled;
      if that column does not exist yet it is created from the computed values.
      Nothing happens when either source column is absent or entirely NaN.

      Raises:
          ValueError: if calculateAngleIncluded returns a list of the wrong length.
      """
      columns = dataFrame.columns
      if fieldNacellePos not in columns or fieldWindDirect not in columns:
          return
      if dataFrame[fieldNacellePos].isna().all() or dataFrame[fieldWindDirect].isna().all():
          return
      angle_included_list = self.calculateAngleIncluded(
          dataFrame[fieldWindDirect], dataFrame[fieldNacellePos])
      if len(angle_included_list) != len(dataFrame):
          raise ValueError(
              "The length of the calculated angle_included_list does not match the length of the DataFrame")
      # Align with the DataFrame index so fillna matches rows, without a temporary column.
      calculated = pd.Series(angle_included_list, index=dataFrame.index)
      if fieldAngleIncluded in columns:
          dataFrame[fieldAngleIncluded] = dataFrame[fieldAngleIncluded].fillna(calculated)
      else:
          dataFrame[fieldAngleIncluded] = calculated
  def recalculationOfFieldPowerFloor(self, dataFrame: pd.DataFrame, fieldActivePower):
      """
      Derive the power bin column ``Field_PowerFloor`` from ``fieldActivePower`` (in place).

      Each value becomes ``int(x / 10) * 10``, i.e. it is truncated toward zero to a
      multiple of 10 (so -15 becomes -10); NaN stays NaN. Nothing happens when the
      source column is missing.
      """
      if fieldActivePower not in dataFrame.columns:
          return
      # pd.notna and pd.notnull are aliases, so a single check suffices.
      dataFrame[Field_PowerFloor] = dataFrame[fieldActivePower].apply(
          lambda x: int(x / 10) * 10 if pd.notna(x) else np.nan)
  def recalculationOfFieldWindSpeedFloor(self, dataFrame: pd.DataFrame, fieldWindSpeed):
      """
      Derive the wind speed bin column ``Field_WindSpeedFloor`` from ``fieldWindSpeed`` (in place).

      Each value becomes ``int(x) + 0.5`` (truncated toward zero, then shifted by 0.5,
      e.g. 3.7 -> 3.5); NaN stays NaN. Nothing happens when the source column is missing.
      """
      if fieldWindSpeed not in dataFrame.columns:
          return
      # pd.notna and pd.notnull are aliases; dividing by 1 before int() was a no-op.
      dataFrame[Field_WindSpeedFloor] = dataFrame[fieldWindSpeed].apply(
          lambda x: int(x) + 0.5 if pd.notna(x) else np.nan)
  def recalculationOfGeneratorSpeed(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
      """
      Fill missing generator speeds from the rotor (main shaft) speed, in place.

      Speed ratio: rotationalSpeedRatio = generator speed / rotor speed.
      For motion type 2 (direct drive) the rotor speed is copied as-is; otherwise it is
      multiplied by ``rotationalSpeedRatio``. Existing generator speeds are never
      overwritten, and nothing happens unless both columns exist.
      """
      columns = dataFrame.columns
      if fieldGeneratorSpeed not in columns or fieldRotorSpeed not in columns:
          return
      rotorSpeed = dataFrame[fieldRotorSpeed]
      isDirectDrive = turbineModelInfo[Field_MotionType] == 2
      replacement = rotorSpeed if isDirectDrive else rotorSpeed * rotationalSpeedRatio
      dataFrame[fieldGeneratorSpeed] = dataFrame[fieldGeneratorSpeed].fillna(replacement)
  def recalculationOfRotorSpeed(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
      """
      Fill missing rotor (main shaft) speeds from the generator speed, in place.

      Speed ratio: rotationalSpeedRatio = generator speed / rotor speed.
      For motion type 2 (direct drive) the generator speed is copied as-is; otherwise it
      is divided by ``rotationalSpeedRatio``. A ratio of exactly 0 cannot be inverted, so
      in that case rotor speeds are left untouched instead of being filled with inf.
      Existing rotor speeds are never overwritten; nothing happens unless both columns exist.
      """
      if fieldRotorSpeed not in dataFrame.columns or fieldGeneratorSpeed not in dataFrame.columns:
          return
      if turbineModelInfo[Field_MotionType] == 2:
          dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
              dataFrame[fieldGeneratorSpeed])
          return
      ratioIsZero = pd.notna(rotationalSpeedRatio) and rotationalSpeedRatio == 0
      if ratioIsZero:
          return
      dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
          dataFrame[fieldGeneratorSpeed] / rotationalSpeedRatio)
  def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
      """
      Fill missing generator torque values from active power and generator speed, in place.

      With P in kW, n the generator speed and T the torque:
          P * 1000 = pi / 30 * T * n
          T = 30000 / pi * P / n ~= 9549.297 * P / n

      Only missing values of ``fieldGeneratorTorque`` are filled. Rows with a generator
      speed of 0 get no estimate (they stay NaN) rather than +/-inf. Nothing happens
      unless all three columns exist.
      """
      required = (fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed)
      if not all(field in dataFrame.columns for field in required):
          return
      generatorSpeed = dataFrame[fieldGeneratorSpeed]
      # A stopped generator would otherwise yield an infinite torque estimate.
      generatorSpeed = generatorSpeed.where(generatorSpeed != 0)
      estimatedTorque = 9549.297 * dataFrame[fieldActivePower] / generatorSpeed
      dataFrame[fieldGeneratorTorque] = dataFrame[fieldGeneratorTorque].fillna(estimatedTorque)
  def recalculation(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame):
      """
      Recompute derived measurement points of one turbine's data, in place.

      Parameters:
          turbineModelInfo: model record of the turbine the data belongs to, indexed by
              field name (motion type, speed ratio, rotor diameter, ...).
          dataFrame: raw data of that turbine; modified in place.

      Steps: power and wind-speed bins, generator/rotor speed gap filling, generator
      torque, included angle, tip-speed ratio and Cp.
      """
      # Use this turbine's own model instead of the first row of self.turbineModelInfo, so
      # farms with several turbine models get the correct speed ratio and rotor diameter.
      # The first-row speed ratio remains a fallback when the record lacks that field.
      rotationalSpeedRatio = turbineModelInfo.get(Field_RSR, self.turbineModelInfo[Field_RSR].iloc[0])
      rotorDiameter = turbineModelInfo[Field_RotorDiameter]

      self.recalculationOfFieldPowerFloor(dataFrame, Field_ActiverPower)
      self.recalculationOfFieldWindSpeedFloor(dataFrame, Field_WindSpeed)
      self.recalculationOfGeneratorSpeed(
          turbineModelInfo, dataFrame, Field_RotorSpeed, Field_GeneratorSpeed, rotationalSpeedRatio)
      self.recalculationOfRotorSpeed(
          turbineModelInfo, dataFrame, Field_RotorSpeed, Field_GeneratorSpeed, rotationalSpeedRatio)
      self.recalculationOfRotorTorque(
          dataFrame, Field_GeneratorTorque, Field_ActiverPower, Field_GeneratorSpeed)
      self.recalculationOfIncludedAngle(
          dataFrame, Field_AngleIncluded, Field_WindDirection, Field_NacPos)
      self.common.calculateTSR(dataFrame, rotorDiameter)
      self.common.calculateCpOfSingleTurbine(
          dataFrame, self.currPowerFarmInfo[Field_AirDensity], rotorDiameter)
  def processDateTimeForAll(self, dataFrame: pd.DataFrame):
      """
      Add all derived time columns to ``dataFrame`` (in place) and return it.

      - Field_UnixYearMonth: ``Timestamp.timestamp()`` of each Field_Time value
      - Field_YearMonth: 'YYYY-MM' string
      - Field_YearMonthDay: 'YYYY-MM-DD' string
      """
      timeColumn = dataFrame[Field_Time]
      # NOTE(review): ``format`` only affects string parsing. Field_Time is used with ``.dt``
      # below, so it is presumably already datetime; the result is then the full timestamp,
      # not the start of the month — confirm this is what Field_UnixYearMonth consumers expect.
      unixTimes = pd.to_datetime(timeColumn, format="%Y-%m").apply(lambda stamp: stamp.timestamp())
      dataFrame[Field_UnixYearMonth] = unixTimes
      dataFrame[Field_YearMonth] = timeColumn.dt.strftime('%Y-%m')
      dataFrame[Field_YearMonthDay] = timeColumn.dt.strftime('%Y-%m-%d')
      return dataFrame
  def processDateTime(self, dataFrame: pd.DataFrame, fieldTime: str = None):
      """
      Add one (or all) derived time columns to ``dataFrame`` in place and return it.

      Parameters:
          dataFrame: data containing Field_Time; returned unchanged if it does not.
          fieldTime: one of Field_UnixYearMonth, Field_YearMonth, Field_YearMonthDay, or
              None to add all of them. Any other value is logged and ignored (previously
              it wrote the ``pd.Series`` class object into a new column).
      """
      if Field_Time not in dataFrame.columns:
          self.logger.info(
              "Field_Time is not in dataFrame columns, Do not handle time processing")
          return dataFrame
      if fieldTime is None:
          return self.processDateTimeForAll(dataFrame)
      if fieldTime == Field_UnixYearMonth:
          timeSeries = pd.to_datetime(
              dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
      elif fieldTime == Field_YearMonth:
          timeSeries = dataFrame[Field_Time].dt.strftime('%Y-%m')
      elif fieldTime == Field_YearMonthDay:
          timeSeries = dataFrame[Field_Time].dt.strftime('%Y-%m-%d')
      else:
          self.logger.warning(
              f"Unsupported fieldTime '{fieldTime}', Do not handle time processing")
          return dataFrame
      dataFrame[fieldTime] = timeSeries
      return dataFrame
  def loadData(self, powerFarmID: str, timeGranularity: str, turbineCode: str, select: list[str], condition: str) -> pd.DataFrame:
      """
      Query rows of one or more turbines from the business database.

      The table is ``{powerFarmID}_{timeGranularity}``.

      Parameters:
          powerFarmID: first part of the table name.
          timeGranularity: second part of the table name.
          turbineCode: SQL fragment placed inside ``IN (...)``; callers pass already
              quoted values, e.g. ``"'WT01'"``.
          select: column names to select; they also become the DataFrame columns. A
              single column name given as a plain string is accepted too.
          condition: raw SQL boolean expression appended with ``AND``.

      Returns:
          pd.DataFrame with one column per selected field (empty if no rows matched).
      """
      # A bare string would otherwise be joined character by character.
      columns = [select] if isinstance(select, str) else list(select)
      selectStr = ", ".join(str(field) for field in columns)
      # NOTE(review): the query is built by string interpolation; table name, turbineCode
      # and condition are not escaped. Only pass trusted, internally generated values.
      query_text = (
          f"SELECT {selectStr} "
          f"FROM `{powerFarmID}_{timeGranularity}` "
          f"WHERE wind_turbine_number IN ({turbineCode}) AND {condition}"
      )
      businessDB: DatabaseUtil = GetBusinessDbUtil()
      with businessDB.session_scope() as session:
          query_result = session.execute(text(query_text)).fetchall()
          dataFrame = pd.DataFrame(query_result, columns=columns)
      return dataFrame
  def dataProcess(self, powerFarmID: str, dataBatchNum: str, timeGranularity: str, turbineCode: str, select: str, customCondition) -> Tuple[pd.DataFrame, str, str]:
      """
      Load and prepare the data of one turbine.

      For 'fault'/'warn' granularities the raw query result is returned. Otherwise the data
      is enriched with turbine properties and name, gets derived time columns, is
      recalculated (recalculation) and filtered (filterCommon).

      Returns:
          (dataFrame, dataBatchNum, turbineCode); dataFrame is empty when nothing was loaded.
      """
      self.logger.info(
          f"load data -> Time Granulary: {timeGranularity} Power Farm: {powerFarmID} Batch: {dataBatchNum} Turbine: {turbineCode} .")
      turbineRecord = self.common.getTurbineInfo(
          self.conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo)
      modelRecord = self.common.getTurbineModelByTurbine(turbineRecord, self.turbineModelInfo)
      # NOTE(review): turbineCode is wrapped in single quotes without escaping.
      quotedTurbine = f"'{turbineCode}'"
      turbineData = self.loadData(powerFarmID, timeGranularity, quotedTurbine, select, customCondition)

      if turbineData.empty:
          self.logger.warning(
              f"{turbineCode} Time Granulary: {timeGranularity} scada data is empty.")
          return pd.DataFrame(), dataBatchNum, turbineCode
      if timeGranularity in ['fault', 'warn']:
          return turbineData, dataBatchNum, turbineCode

      self.addPropertyToDataFrame(turbineData, turbineRecord, modelRecord)
      turbineData[Field_NameOfTurbine] = turbineRecord.loc[Field_NameOfTurbine]
      self.processDateTime(turbineData)
      self.recalculation(modelRecord, turbineData)
      turbineData = self.filterCommon(turbineData, self.conf)
      return turbineData, dataBatchNum, turbineCode
  def selectTimeCondition(self, conf: Contract, conditions: list[str]):
      """
      Append SCADA time filters from the JSON config to ``conditions`` (in place).

      Adds, when configured: ``time_stamp >= beginTime``, ``time_stamp <= endTime`` and
      an exclusion of the listed 'YYYY-MM' months.
      """
      dataFilter = conf.dataContract.dataFilter
      if dataFilter.beginTime:
          conditions.append(f"time_stamp >= '{dataFilter.beginTime}'")
      if dataFilter.endTime:
          conditions.append(f"time_stamp <= '{dataFilter.endTime}'")
      if dataFilter.excludingMonths:
          quotedMonths = ", ".join(f"'{month}'" for month in dataFilter.excludingMonths)
          conditions.append(f"DATE_FORMAT(time_stamp, '%Y-%m') NOT IN ({quotedMonths})")
  def selectLabCondition(self, conditions: list[str]):
      """
      Hook for label ("lab") based filter conditions.

      The base implementation adds nothing and returns None; Behavior subclasses may
      append their own SQL conditions to ``conditions``.
      """
      return None
  def selectAllCondition(self, conf: Contract):
      """
      Build the SQL WHERE expression for SCADA queries.

      Joins the time and label conditions with AND; returns "1=1" when there are none.
      """
      collected: list[str] = []
      self.selectTimeCondition(conf, collected)
      self.selectLabCondition(collected)
      if not collected:
          return "1=1"
      return " AND ".join(collected)
  def selectFaultTimeCondition(self, conf: "Contract", conditions: list[str]):
      """
      Append fault/warning time filters from the JSON config to ``conditions`` (in place).

      Adds, when configured: ``begin_time >= beginTime``, ``end_time <= endTime`` and an
      exclusion of the listed 'YYYY-MM' months. The month exclusion is applied to
      ``begin_time`` so it uses the same time columns as the range filters (it previously
      referenced ``time_stamp``).
      NOTE(review): confirm fault/warn tables have no ``time_stamp`` column of their own.
      """
      dataFilter = conf.dataContract.dataFilter
      if dataFilter.beginTime:
          conditions.append(f"begin_time >= '{dataFilter.beginTime}'")
      if dataFilter.endTime:
          conditions.append(f"end_time <= '{dataFilter.endTime}'")
      if dataFilter.excludingMonths:
          quotedMonths = ", ".join(f"'{month}'" for month in dataFilter.excludingMonths)
          conditions.append(f"DATE_FORMAT(begin_time, '%Y-%m') NOT IN ({quotedMonths})")
  def selectAllFaultCondition(self, conf: Contract):
      """
      Build the WHERE-clause body for fault/warn data queries.

      Time clauses come from ``selectFaultTimeCondition`` and lab clauses from
      ``selectLabCondition``. The clauses are joined with " AND ". When none
      apply, "1=1" is returned so the WHERE clause stays valid.
      """
      parts = []
      self.selectFaultTimeCondition(conf, parts)
      self.selectLabCondition(parts)
      if len(parts) > 0:
          return " AND ".join(parts)
      return "1=1"
  def processTurbineData(self, turbines, conf: Contract, select: str):
      """
      Load every turbine's data for each time granularity configured for this analyst.

      The granularities are the distinct ``scada`` values (in order of first
      appearance) of the ``conf.dataContract.configAnalysis`` entries whose
      ``className`` equals this analyst's class name. For each granularity,
      ``self.dataProcess`` runs once per turbine on a pool of 5 worker threads.
      The per-turbine frames are then concatenated with ``ignore_index=True``.

      Args:
          turbines: iterable of turbines. Each item is passed unchanged to
              ``self.dataProcess``. It is iterated once per granularity, so it
              presumably must be re-iterable (e.g. a list) — confirm with callers.
          conf: analysis contract providing ``dataContract.dataFilter`` and
              ``dataContract.configAnalysis``.
          select: passed through unchanged to ``self.dataProcess``.

      Returns:
          dict mapping each granularity to its concatenated DataFrame. A frame
          that has ``Field_DeviceCode`` but not ``Field_CodeOfTurbine`` has that
          column renamed to ``Field_CodeOfTurbine``.

      Raises:
          CustomError: code 102 when a granularity yields no rows. This includes
              the case where ``turbines`` is empty.
          Any other exception (e.g. from ``self.dataProcess``) is logged with its
          traceback and re-raised unchanged.
      """
      maxWorkers = 5
      try:
          dataFilter = conf.dataContract.dataFilter
          # WHERE-clause body for regular granularities; 'fault'/'warn' get
          # their own condition inside the loop.
          scadaConditions = self.selectAllCondition(conf)
          configAnalysisDF = pd.DataFrame(
              [config.to_dict() for config in conf.dataContract.configAnalysis])
          configAnalysisDF = configAnalysisDF[configAnalysisDF["className"] == self.__class__.__name__]
          scadaTimeGranularities = configAnalysisDF["scada"].unique()
          self.logger.info(
              f"typeAnalyst: {self.typeAnalyst()} method: processTurbineData , scadaTimeGranularities : {scadaTimeGranularities} current class : {self.__class__.__name__}")
          dictionary = dict()
          for timeGranularity in scadaTimeGranularities:
              # Choose the condition afresh on every iteration. Reassigning one
              # shared variable leaked the fault/warn begin_time/end_time filter
              # into every granularity processed after a 'fault'/'warn' entry.
              if timeGranularity in ['fault', 'warn']:
                  select_conditions = self.selectAllFaultCondition(conf)
              else:
                  select_conditions = scadaConditions
              dataFrames = []
              # Value used by the log lines below when no turbine result overrides it.
              dataBatchNum = dataFilter.dataBatchNum
              with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
                  futures = [
                      executor.submit(self.dataProcess, dataFilter.powerFarmID,
                                      dataFilter.dataBatchNum, timeGranularity, turbine, select, select_conditions)
                      for turbine in turbines
                  ]
                  # Collect results in submission order. The concatenated rows then
                  # follow the order of `turbines` rather than thread completion
                  # order. future.result() re-raises any worker exception.
                  for future in futures:
                      dataFrameOfTurbine, dataBatchNum, turbineCode = future.result()
                      dataFrames.append(dataFrameOfTurbine)
                      self.logger.info(
                          f"typeAnalyst: {self.typeAnalyst()} data frame append,dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} turbineCode: {turbineCode}")
              # pd.concat([]) raises ValueError. An empty frame instead routes the
              # "no turbines" case to the CustomError(102) branch below.
              dataFrameOfTurbines = pd.concat(dataFrames, ignore_index=True) if dataFrames else pd.DataFrame()
              self.logger.info(
                  f"typeAnalyst: {self.typeAnalyst()} data frame concat dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} finish")
              if dataFrameOfTurbines.empty:
                  error = CustomError(102)
                  self.logger.warning(
                      f"{error.message} typeAnalyst: {self.typeAnalyst()} Power Farm: {dataFilter.powerFarmID} Batch : {dataFilter.dataBatchNum} Time Granularity : {timeGranularity}")
                  raise error
              self.logger.info(
                  f"typeAnalyst: {self.typeAnalyst()} data frame concat dataBatchNum: {dataBatchNum} timeGranularity: {timeGranularity} dataFrameOfTurbines : {dataFrameOfTurbines}")
              if Field_DeviceCode in dataFrameOfTurbines.columns and Field_CodeOfTurbine not in dataFrameOfTurbines.columns:
                  dataFrameOfTurbines = dataFrameOfTurbines.rename(
                      columns={Field_DeviceCode: Field_CodeOfTurbine})
              dictionary[timeGranularity] = dataFrameOfTurbines
          return dictionary
      except Exception:
          self.logger.error(
              f"Error processing turbine data:{traceback.format_exc()}")
          raise
  def addPropertyToDataFrame(self, dataFrameOfTurbine: pd.DataFrame, currTurbineInfo: pd.Series, currTurbineModelInfo: pd.Series):
      """
      Hook for attaching extra attributes of the current turbine to its DataFrame.

      The base implementation is a no-op: ``dataFrameOfTurbine`` is left as-is
      and None is returned. Analysts in the business layer implement it when
      they need additional turbine properties. The visible caller discards the
      return value, so overrides are expected to modify ``dataFrameOfTurbine``
      in place.
      """
      return None
  def escape_special_characters(self, original_string: str):
      """
      ---Deprecated---
      Percent-encode a string for use in a URL, including "/".

      By default ``quote`` leaves "/" unescaped. Passing ``safe=""`` makes it
      encode "/" as "%2F" as well, along with every other character outside
      the unreserved set.
      """
      return quote(original_string, safe="")
  578. if "/" in retrun_string:
  579. retrun_string = retrun_string.replace("/", "%2F")
  580. return retrun_string