powerCurveAnalyst.py 19 KB


  1. import os
  2. import numpy as np
  3. import pandas as pd
  4. import plotly.graph_objects as go
  5. from algorithmContract.confBusiness import *
  6. from algorithmContract.contract import Contract
  7. from behavior.analystWithGoodPoint import AnalystWithGoodPoint
  8. from utils.jsonUtil import JsonUtil
  9. class PowerCurveAnalyst(AnalystWithGoodPoint):
  10. """
  11. 风电机组功率曲线散点分析。
  12. 秒级scada数据运算太慢,建议使用分钟级scada数据
  13. """
  def typeAnalyst(self):
      """Return the identifier string of this analysis type."""
      analysis_type = "power_curve"
      return analysis_type
  def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
      """
      Produce the power-curve JSON outputs for the given turbines, one turbine model at a time.

      Before the curve of a model is computed, samples recorded above the
      model's rated wind speed with an active power below 20 are dropped
      (the original author describes them as shutdown data).

      Args:
          outputAnalysisDir: Directory the JSON files are written to.
          conf: Analysis contract; its data filter supplies the farm id and batch number.
          turbineCodes: Codes of the turbines to analyse.

      Returns:
          pd.DataFrame: Concatenated result rows of every processed turbine
          model, or an empty DataFrame when no model produced any output.

      Raises:
          ValueError: If the loaded data lacks the wind-speed or active-power column.
      """
      # Fallback rated wind speed used when the model information carries none.
      default_rated_wind_speed = 11
      # Above rated wind speed, samples with less power than this are discarded.
      min_power_above_rated = 20

      dictionary = self.processTurbineData(turbineCodes, conf, [
          Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower])
      dataFrameOfTurbines = self.userDataFrame(
          dictionary, conf.dataContract.configAnalysis, self)

      # Fail early when the columns the curve is built from are missing.
      required_columns = {Field_WindSpeed, Field_ActiverPower}
      if not required_columns.issubset(dataFrameOfTurbines.columns):
          raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

      turbrineInfos = self.common.getTurbineInfos(
          conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
      groupedOfTurbineModel = turbrineInfos.groupby(Field_MillTypeCode)

      # Field_RatedWindSpeed may not be defined by confBusiness; look it up
      # defensively instead of referencing the name directly.
      rated_ws_field = globals().get('Field_RatedWindSpeed')

      returnDatas = []
      for turbineModelCode, group in groupedOfTurbineModel:
          currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist()
          currTurbineModeInfo = self.common.getTurbineModelByCode(
              turbineModelCode, self.turbineModelInfo)
          dataFrameOfContractPowerCurve = self.dataFrameContractOfTurbine[
              self.dataFrameContractOfTurbine[Field_MillTypeCode] == turbineModelCode]
          currDataFrameOfTurbines = dataFrameOfTurbines[
              dataFrameOfTurbines[Field_CodeOfTurbine].isin(currTurbineCodes)]

          # Resolve the rated wind speed. A missing (NaN/None) value falls back
          # to the default; otherwise the comparison below would match nothing
          # and the filter would silently be skipped.
          rated_ws = default_rated_wind_speed
          if rated_ws_field is not None and rated_ws_field in currTurbineModeInfo:
              candidate = currTurbineModeInfo[rated_ws_field]
              if not (pd.api.types.is_scalar(candidate) and pd.isna(candidate)):
                  rated_ws = candidate

          # Keep samples at or below rated wind speed, and samples above it
          # only when their power reaches the threshold.
          mask_bad_data = (
              (currDataFrameOfTurbines[Field_WindSpeed] > rated_ws)
              & (currDataFrameOfTurbines[Field_ActiverPower] < min_power_above_rated)
          )
          currDataFrameOfTurbines = currDataFrameOfTurbines[~mask_bad_data]

          # Nothing left for this model: skip it rather than failing downstream.
          if currDataFrameOfTurbines.empty:
              continue

          powerCurveDataOfTurbines = self.dataReprocess(
              currDataFrameOfTurbines, self.binsWindSpeed)

          # NOTE: the chart output (drawOfPowerCurve) is currently not invoked;
          # only the JSON output is produced.
          returnJsonData = self.outputPowerCurveData(
              conf, outputAnalysisDir, currTurbineModeInfo,
              powerCurveDataOfTurbines, dataFrameOfContractPowerCurve)
          returnDatas.append(returnJsonData)

      # pd.concat raises on an empty list, so handle "no output" explicitly.
      if not returnDatas:
          return pd.DataFrame()

      returnResult = pd.concat(returnDatas, ignore_index=True)
      return returnResult
  def outputPowerCurveData(self, conf: Contract, outputAnalysisDir: str, turbineModelInfo: pd.Series, powerCurveDataOfTurbines: pd.DataFrame, dataFrameOfContractPowerCurve: pd.DataFrame) -> pd.DataFrame:
      """
      Write the power-curve JSON files of one turbine model and describe them as result rows.

      One file is written for the model as a whole and one per turbine.

      Args:
          conf: Analysis contract; supplies the farm id and batch number of the rows.
          outputAnalysisDir: Directory the JSON files are written to.
          turbineModelInfo: Information of the turbine model being exported.
          powerCurveDataOfTurbines: Binned power-curve rows of the model's turbines.
          dataFrameOfContractPowerCurve: Contract power curve of the model.

      Returns:
          pd.DataFrame: One row for the model-level file followed by one row per turbine file.
      """
      codeColumn = powerCurveDataOfTurbines[Field_CodeOfTurbine]
      turbineCodes = codeColumn.unique()

      payload = self.convert2Json(
          turbineModelInfo,
          turbineCodes=turbineCodes,
          dataFrameOfTurbines=powerCurveDataOfTurbines,
          dataFrameOfContract=dataFrameOfContractPowerCurve,
      )

      def baseRow(codeOfTurbine):
          # Fields shared by every result row, in their output column order.
          return {
              Field_Return_TypeAnalyst: self.typeAnalyst(),
              Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
              Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
              Field_CodeOfTurbine: codeOfTurbine,
          }

      # Model-level file and its row.
      modelFilePath = os.path.join(
          outputAnalysisDir, f"power_curve-{turbineModelInfo[Field_MillTypeCode]}.json")
      JsonUtil.write_json(payload, file_path=modelFilePath)
      rows = [{
          **baseRow(Const_Output_Total),
          Field_MillTypeCode: turbineModelInfo[Field_MillTypeCode],
          Field_Return_FilePath: modelFilePath,
          Field_Return_IsSaveDatabase: True,
      }]

      # Per-turbine files, named after the turbine's display name.
      # NOTE(review): every turbine file receives the same model-wide payload
      # as the model-level file -- confirm this is what the consumer expects.
      for turbineCode in turbineCodes:
          turbineName = powerCurveDataOfTurbines.loc[
              codeColumn == turbineCode, Field_NameOfTurbine].iloc[0]
          turbineFilePath = os.path.join(
              outputAnalysisDir, f"power_curve-{turbineName}.json")
          JsonUtil.write_json(payload, file_path=turbineFilePath)
          rows.append({
              **baseRow(turbineCode),
              Field_Return_FilePath: turbineFilePath,
              Field_Return_IsSaveDatabase: True,
          })

      return pd.DataFrame(rows)
  def convert2Json(self, turbineModelInfo: pd.Series, turbineCodes, dataFrameOfTurbines: pd.DataFrame, dataFrameOfContract: pd.DataFrame):
      """
      Build the JSON-ready dictionary of the power curves of one turbine model.

      Args:
          turbineModelInfo: Information of the turbine model (type code and machine type code are read).
          turbineCodes: Codes of the turbines to include; each must occur in ``dataFrameOfTurbines``.
          dataFrameOfTurbines: Power-curve rows of the turbines (name, code, wind speed, active power).
          dataFrameOfContract: Contract power curve (wind speed, active power).

      Returns:
          dict: Header fields plus a ``"data"`` list holding one entry per
          turbine followed by one entry for the contract curve. Missing
          values in the x/y lists are emitted as ``None``.
      """
      def toJsonList(series: pd.Series) -> list:
          # Map missing values (NaN / pd.NA / None) to None explicitly.
          # Series.replace(..., None) is avoided on purpose: depending on the
          # pandas version an explicit ``value=None`` means either "replace
          # with None" or "pad-fill", so its result is not stable.
          return [None if pd.isna(value) else value for value in series.tolist()]

      result = {
          "analysisTypeCode": "功率曲线分析",
          "engineTypeCode": turbineModelInfo[Field_MillTypeCode],
          "engineTypeName": turbineModelInfo[Field_MachineTypeCode],
          "data": []
      }

      # One entry per turbine.
      for turbineCode in turbineCodes:
          data: pd.DataFrame = dataFrameOfTurbines[
              dataFrameOfTurbines[Field_CodeOfTurbine] == turbineCode]
          engine_data = {
              "enginName": data[Field_NameOfTurbine].iloc[0],
              "enginCode": turbineCode,
              "xData": toJsonList(data[Field_WindSpeed]),
              "yData": toJsonList(data[Field_ActiverPower]),
              "zData": []
          }
          result["data"].append(engine_data)

      # Contract power curve, appended after the turbines.
      contract_curve = {
          "enginName": "合同功率曲线",
          "xData": toJsonList(dataFrameOfContract[Field_WindSpeed]),
          "yData": toJsonList(dataFrameOfContract[Field_ActiverPower]),
          "zData": []
      }
      result["data"].append(contract_curve)

      return result
  118. # def buildPowerCurveData(self, group: pd.DataFrame, fieldWindSpeed: str, fieldActivePower: str, bins) -> pd.DataFrame:
  119. # """
  120. # 计算设备的功率曲线。
  121. # """
  122. # powerCut = group.groupby(pd.cut(group[fieldWindSpeed], bins, labels=np.arange(0, 25.5, 0.5))).agg({
  123. # fieldActivePower: 'median',
  124. # fieldWindSpeed: ['median', 'count']
  125. # })
  126. # wind_count = powerCut[fieldWindSpeed]['count'].tolist()
  127. # line = powerCut[fieldActivePower]['median'].round(decimals=2).tolist()
  128. # act_line = pd.DataFrame([powerCut.index, wind_count, line]).T
  129. # act_line.columns = [Field_WindSpeed,
  130. # 'EffectiveQuantity', Field_ActiverPower]
  131. # return act_line
  def buildPowerCurveData(self, group: pd.DataFrame, fieldWindSpeed: str, fieldActivePower: str, bins) -> pd.DataFrame:
      """
      Compute the binned power curve of one turbine.

      Wind speed is cut into ``bins`` and the median active power of each bin
      is taken. Bins without samples are then filled so the curve has no gaps.

      Args:
          group: Samples of a single turbine.
          fieldWindSpeed: Name of the wind-speed column in ``group``.
          fieldActivePower: Name of the active-power column in ``group``.
          bins: Bin edges for the wind speed. The bin labels are fixed to
              ``np.arange(0, 25.5, 0.5)`` (51 labels), so ``bins`` must
              define exactly 51 intervals.

      Returns:
          pd.DataFrame: One row per bin with the bin label (wind speed), the
          number of samples in the bin ('EffectiveQuantity') and the filled
          median active power rounded to two decimals.
      """
      # 1. Bin the wind speed with the fixed labels.
      windSpeedBins = pd.cut(
          group[fieldWindSpeed], bins, labels=np.arange(0, 25.5, 0.5))

      # observed=False is required: bins without samples must stay in the
      # result as NaN rows so they can be filled below. Relying on the
      # default is deprecated in recent pandas and will drop those rows once
      # the default changes.
      powerCut = group.groupby(windSpeedBins, observed=False).agg({
          fieldActivePower: 'median',
          fieldWindSpeed: ['median', 'count']
      })

      # 2. Sample count per bin and the raw medians (NaN for empty bins).
      wind_count = powerCut[fieldWindSpeed]['count'].tolist()
      power_series = powerCut[fieldActivePower]['median']

      # 3. Fill the gaps.
      # a) Linear interpolation closes gaps between observed bins, e.g.
      #    [1000, NaN, 1200] -> [1000, 1100, 1200]. Forward direction only,
      #    so empty bins at the low-wind start are not filled here.
      power_series = power_series.interpolate(
          method='linear', limit_direction='forward')
      # b) Forward fill: empty bins after the last observed bin keep the last
      #    observed power.
      power_series = power_series.ffill()
      # c) Empty bins before the first observed bin get 0.
      power_series = power_series.fillna(0)

      line = power_series.round(decimals=2).tolist()

      # 4. Assemble the result (bin label, sample count, power).
      act_line = pd.DataFrame([powerCut.index, wind_count, line]).T
      act_line.columns = [Field_WindSpeed,
                          'EffectiveQuantity', Field_ActiverPower]
      return act_line
  def dataReprocess(self, dataFrameMerge: pd.DataFrame, binsWindSpeed) -> pd.DataFrame:
      """
      Compute the binned power curve of every turbine contained in ``dataFrameMerge``.

      Args:
          dataFrameMerge: Samples of one or more turbines, including the
              turbine name and turbine code columns.
          binsWindSpeed: Wind-speed bin edges passed on to ``buildPowerCurveData``.

      Returns:
          pd.DataFrame: The per-turbine power curves stacked into one frame,
          each row tagged with turbine name and code. When the input holds no
          turbine, an empty frame with the same columns is returned.
      """
      dataFrames = []

      # One power curve per (turbine name, turbine code) pair.
      grouped = dataFrameMerge.groupby(
          [Field_NameOfTurbine, Field_CodeOfTurbine])
      for (turbineName, turbineCode), group in grouped:
          dataFramePowerCurveTurbine = self.buildPowerCurveData(
              group, Field_WindSpeed, Field_ActiverPower, binsWindSpeed)
          dataFramePowerCurveTurbine[Field_NameOfTurbine] = turbineName
          dataFramePowerCurveTurbine[Field_CodeOfTurbine] = turbineCode
          dataFrames.append(dataFramePowerCurveTurbine)

      # pd.concat raises on an empty list; return an empty, correctly shaped
      # frame instead so callers can handle "no data" uniformly.
      if not dataFrames:
          return pd.DataFrame(columns=[
              Field_WindSpeed, 'EffectiveQuantity', Field_ActiverPower,
              Field_NameOfTurbine, Field_CodeOfTurbine])

      # ignore_index=True already yields a fresh 0..n-1 index.
      dataFrameReprocess: pd.DataFrame = pd.concat(
          dataFrames, ignore_index=True)
      return dataFrameReprocess
  def drawOfPowerCurve(self, powerCurveOfTurbines: pd.DataFrame, outputAnalysisDir, conf: Contract, dataFrameGuaranteePowerCurve: pd.DataFrame, turbineModelInfo: pd.Series):
      """
      Draw the farm-wide chart and the per-turbine charts of one turbine model.

      Args:
          powerCurveOfTurbines: Binned power-curve rows of the turbines
              (turbine name, turbine code, wind speed, active power).
          outputAnalysisDir: Output directory passed on to the plotting methods.
          conf: Analysis contract passed on to the plotting methods.
          dataFrameGuaranteePowerCurve: Contract power curve drawn on every chart.
          turbineModelInfo: Information of the turbine model, used by the farm-wide chart.

      Returns:
          pd.DataFrame: Result rows of the farm-wide chart followed by the
          rows returned for each single-turbine chart.
      """
      # Farm-wide chart.
      df1 = self.plot_power_curve(
          powerCurveOfTurbines, outputAnalysisDir, dataFrameGuaranteePowerCurve,
          Field_NameOfTurbine, conf, turbineModelInfo)

      # One chart per turbine. The frames are collected and concatenated once
      # instead of re-concatenating inside the loop.
      grouped = powerCurveOfTurbines.groupby(
          [Field_NameOfTurbine, Field_CodeOfTurbine])
      singleChartFrames = [
          self.plot_single_power_curve(
              powerCurveOfTurbines, group, dataFrameGuaranteePowerCurve,
              name, outputAnalysisDir, conf)
          for name, group in grouped
      ]
      if singleChartFrames:
          df2 = pd.concat(singleChartFrames, ignore_index=True)
      else:
          df2 = pd.DataFrame()

      # Farm-wide rows first, then the per-turbine rows.
      df = pd.concat([df1, df2], ignore_index=True)
      return df
  def plot_power_curve(self, ress, output_path, dataFrameGuaranteePowerCurve: pd.DataFrame, Field_NameOfTurbine, conf: Contract, turbineModelInfo: pd.Series):
      """
      Draw the farm-wide power-curve chart of one turbine model and save it as HTML.

      Every turbine found in ``ress`` is drawn as one line; the contract
      curve is drawn in red with markers.

      Args:
          ress: Power-curve rows of all turbines.
          output_path: Directory the HTML file is written to.
          dataFrameGuaranteePowerCurve: Contract power curve.
          Field_NameOfTurbine: Name of the column in ``ress`` that identifies
              the turbine (this parameter shadows the module-level constant).
          conf: Analysis contract; supplies the farm id and batch number of the result row.
          turbineModelInfo: Information of the turbine model (title and file name).

      Returns:
          pd.DataFrame: A single row describing the written HTML file.
      """
      fig = go.Figure()

      # One line per turbine, labelled with the turbine identifier.
      turbineColumn = ress[Field_NameOfTurbine]
      for turbineLabel in turbineColumn.unique():
          curve = ress.loc[turbineColumn == turbineLabel]
          turbineTrace = go.Scatter(
              x=curve[Field_WindSpeed],
              y=curve[Field_ActiverPower],
              mode='lines',
              name=f'{turbineLabel}',
          )
          fig.add_trace(turbineTrace)

      # Lower bound of the x axis: one below the smallest cut-in wind speed
      # when that column is present and populated, otherwise 2.
      hasCutIn = (
          not ress.empty
          and Field_CutInWS in ress.columns
          and ress[Field_CutInWS].notna().any()
      )
      cut_in_ws = ress[Field_CutInWS].min() - 1 if hasCutIn else 2

      # Contract power curve.
      contractTrace = go.Scatter(
          x=dataFrameGuaranteePowerCurve[Field_WindSpeed],
          y=dataFrameGuaranteePowerCurve[Field_ActiverPower],
          mode='lines+markers',
          line=dict(color='red'),
          marker=dict(color='red', size=5),
          name='合同功率曲线',
          showlegend=True,
      )
      fig.add_trace(contractTrace)

      # Layout: centred title, fixed axis ranges, horizontal legend below the plot.
      titleSpec = {
          "text": f'功率曲线-{turbineModelInfo[Field_MachineTypeCode]}',
          'x': 0.5,
      }
      xAxisSpec = dict(
          title='风速',
          dtick=1,
          tickangle=-45,
          range=[cut_in_ws, 25],
      )
      yAxisSpec = dict(
          title='有功功率',
          dtick=self.axisStepActivePower,
          range=[self.axisLowerLimitActivePower,
                 self.axisUpperLimitActivePower],
      )
      legendSpec = dict(
          orientation="h",
          xanchor="center",
          x=0.5,
          y=-0.2,
      )
      fig.update_layout(
          title=titleSpec,
          xaxis=xAxisSpec,
          yaxis=yAxisSpec,
          legend=legendSpec,
      )

      # Save the chart as HTML.
      htmlFileName = '全场-{}-{}-功率曲线.html'.format(
          self.powerFarmInfo[Field_PowerFarmName].iloc[0],
          turbineModelInfo[Field_MillTypeCode])
      htmlFilePath = os.path.join(output_path, htmlFileName)
      fig.write_html(htmlFilePath)

      # Describe the written file.
      return pd.DataFrame([{
          Field_Return_TypeAnalyst: self.typeAnalyst(),
          Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
          Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
          Field_CodeOfTurbine: Const_Output_Total,
          Field_Return_FilePath: htmlFilePath,
          Field_Return_IsSaveDatabase: False,
      }])
  def plot_single_power_curve(self, ress, group, dataFrameGuaranteePowerCurve: pd.DataFrame, turbineName, outputAnalysisDir, conf: Contract):
      """
      Build the power-curve chart of a single turbine.

      All turbines are drawn in light grey as background, the selected
      turbine in dark blue and the contract curve in red.

      NOTE: the figure is currently neither written to disk nor returned. The
      PNG/HTML export and the matching result rows are disabled, so this
      method always returns an empty DataFrame.

      Args:
          ress: Power-curve rows of all turbines (background lines).
          group: Power-curve rows of the selected turbine.
          dataFrameGuaranteePowerCurve: Contract power curve.
          turbineName: Indexable identifier of the turbine; element 0 is used in the title.
          outputAnalysisDir: Output directory (unused while the export is disabled).
          conf: Analysis contract (unused while the export is disabled).

      Returns:
          pd.DataFrame: An empty DataFrame.
      """
      fig = go.Figure()

      # Background: every turbine as a light-grey line without legend entry.
      turbineColumn = ress[Field_NameOfTurbine]
      for turbineLabel in turbineColumn.unique():
          curve = ress.loc[turbineColumn == turbineLabel]
          backgroundTrace = go.Scatter(
              x=curve[Field_WindSpeed],
              y=curve[Field_ActiverPower],
              mode='lines',
              line=dict(color='lightgrey'),
              name=f'{turbineLabel}',
              showlegend=False,
          )
          fig.add_trace(backgroundTrace)

      # Lower bound of the x axis: one below the smallest cut-in wind speed
      # when that column is present and populated, otherwise 2.
      hasCutIn = (
          not ress.empty
          and Field_CutInWS in ress.columns
          and ress[Field_CutInWS].notna().any()
      )
      cut_in_ws = ress[Field_CutInWS].min() - 1 if hasCutIn else 2

      # Selected turbine, highlighted in dark blue.
      selectedTrace = go.Scatter(
          x=group[Field_WindSpeed],
          y=group[Field_ActiverPower],
          mode='lines',
          line=dict(color='darkblue'),
          name=Field_ActiverPower,
          showlegend=False,
      )
      fig.add_trace(selectedTrace)

      # Contract power curve.
      contractTrace = go.Scatter(
          x=dataFrameGuaranteePowerCurve[Field_WindSpeed],
          y=dataFrameGuaranteePowerCurve[Field_ActiverPower],
          mode='lines+markers',
          line=dict(color='red'),
          marker=dict(color='red', size=5),
          name='合同功率曲线',
          showlegend=True,
      )
      fig.add_trace(contractTrace)

      # Layout: legend anchored bottom-right inside the plot, fixed axis ranges.
      titleSpec = {
          "text": f'机组: {turbineName[0]}'
      }
      legendSpec = dict(
          orientation="h",
          yanchor="bottom",
          y=0,
          xanchor="right",
          x=1,
          bgcolor='rgba(255,255,255,0)',
      )
      xAxisSpec = dict(
          title='风速',
          dtick=1,
          tickangle=-45,
          range=[cut_in_ws, 25],
      )
      yAxisSpec = dict(
          title='有功功率',
          dtick=self.axisStepActivePower,
          range=[self.axisLowerLimitActivePower,
                 self.axisUpperLimitActivePower],
      )
      fig.update_layout(
          title=titleSpec,
          legend=legendSpec,
          xaxis=xAxisSpec,
          yaxis=yAxisSpec,
      )

      # Export disabled: no file is written, hence no result rows.
      result_rows = []
      return pd.DataFrame(result_rows)