import os

import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from behavior.analyst import Analyst
from utils.directoryUtil import DirectoryUtil as dir
# NOTE: the star import is kept at its original position relative to the other
# imports, because moving it could change which names it rebinds.
from algorithmContract.confBusiness import *

import plotly.offline as offline


def _graph_setting(is_none, graph_sets, group, key):
    """Return ``graph_sets[group][key]``, or ``None`` when it is not configured.

    Both the settings group and the individual value are checked with
    ``is_none`` so that every axis setting is guarded the same way.

    Args:
        is_none: Predicate that decides whether a value counts as "unset".
        graph_sets: Mapping of settings groups, indexed by ``group``.
        group: Name of the settings group, e.g. ``"pitchAngle"``.
        key: Name of the setting inside the group, e.g. ``"step"``.

    Returns:
        The configured value, or ``None`` if the group or the value is unset.
    """
    settings = graph_sets[group]
    if is_none(settings):
        return None
    value = settings[key]
    return None if is_none(value) else value


def _build_scene(is_none, confData):
    """Build the 3D ``scene`` dict (x: wind speed, y: pitch angle, z: power).

    Axis steps and ranges come from ``confData.graphSets``; any setting that is
    unset falls back to the hard-coded default used below.
    """
    graph_sets = confData.graphSets

    pitch_step = _graph_setting(is_none, graph_sets, "pitchAngle", "step")
    pitch_min = _graph_setting(is_none, graph_sets, "pitchAngle", "min")
    pitch_max = _graph_setting(is_none, graph_sets, "pitchAngle", "max")
    power_step = _graph_setting(is_none, graph_sets, "activePower", "step")
    power_min = _graph_setting(is_none, graph_sets, "activePower", "min")
    power_max = _graph_setting(is_none, graph_sets, "activePower", "max")
    if power_max is None:
        # Evaluated only when no maximum is configured, so a missing
        # rated_power does not matter when an explicit maximum exists.
        power_max = confData.rated_power * 1.2

    return dict(
        xaxis=dict(
            title='风速',
            dtick=2,  # tick spacing on the wind-speed axis
            range=[0, 26],  # fixed wind-speed axis range
        ),
        yaxis=dict(
            title='桨距角',
            # Tick spacing on the pitch-angle axis; defaults to 2.
            dtick=pitch_step if pitch_step is not None else 2,
            # Pitch-angle axis range; defaults to [-2, 28].
            range=[
                pitch_min if pitch_min is not None else -2,
                pitch_max if pitch_max is not None else 28,
            ],
        ),
        zaxis=dict(
            title='功率',
            # Tick spacing on the power axis; defaults to 250.
            dtick=power_step if power_step is not None else 250,
            # Power axis range; defaults to [0, 1.2 * rated power].
            range=[
                power_min if power_min is not None else 0,
                power_max,
            ],
        ),
    )


class PitchPowerWindSpeedAnalyst(Analyst):
    """Analyst that writes one 3D scatter chart per turbine as an HTML file.

    Each chart plots wind speed (x) against pitch angle (y) and power (z).
    """

    def typeAnalyst(self) -> str:
        """Return the identifier string of this analysis type."""
        return "pitch_power_windspeed"

    # def filterCommon(self,dataFrame:pd.DataFrame, confData:ConfBusiness):
    #     dataFrame=super().filterCommon(dataFrame,confData)
    #     dataFrame=dataFrame[(dataFrame[confData.field_power]>=1350) & (dataFrame[confData.field_power]<=1500)]
    #     return dataFrame

    def turbinesAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness) -> None:
        """Run the analysis on the merged data by delegating to ``drawGraph``."""
        self.drawGraph(dataFrameMerge, outputAnalysisDir, confData)

    def drawGraph(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness) -> None:
        """Draw a 3D scatter chart for every turbine and save it as HTML.

        The data is grouped by the ``Field_NameOfTurbine`` column and one file
        named ``<turbine name>.html`` is written per group.

        Args:
            dataFrameMerge: Data containing the pitch-angle, power and
                wind-speed columns named by ``confData`` plus the
                ``Field_NameOfTurbine`` grouping column.
            outputAnalysisDir: Directory the HTML files are written into.
            confData: Configuration supplying the column names
                (``field_pitch_angle1``, ``field_power``, ``field_wind_speed``),
                the axis settings (``graphSets``) and ``rated_power``.

        Raises:
            ValueError: If any of the three required columns is missing.
        """
        # Make sure the required columns are present.
        required_columns = {confData.field_pitch_angle1,
                            confData.field_power, confData.field_wind_speed}
        if not required_columns.issubset(dataFrameMerge.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        # self.common is not defined in this file; presumably provided by Analyst.
        is_none = self.common.isNone

        # One chart per turbine.
        grouped = dataFrameMerge.groupby(Field_NameOfTurbine)
        for name, group in grouped:
            layout = go.Layout(
                title={
                    "text": f'3D散点图: 风速 vs. 桨距角 vs. 功率 {name}',
                    "x": 0.5
                },
                scene=_build_scene(is_none, confData),
                # t is the top margin, b is the bottom margin.
                margin=dict(t=50, b=10)
            )

            # Build the 3D scatter trace for this turbine.
            fig = go.Figure(data=[go.Scatter3d(
                x=group[confData.field_wind_speed],
                y=group[confData.field_pitch_angle1],
                z=group[confData.field_power],
                mode='markers',  # markers only, i.e. a scatter plot
                marker=dict(
                    size=1.5,  # marker size
                    # Other marker attributes (colour, opacity, ...) can be set here:
                    # color='blue',
                    # opacity=0.8
                )
            )], layout=layout)

            # Save the chart as an HTML file.
            outputFileHtml = os.path.join(outputAnalysisDir, f"{name}.html")
            fig.write_html(outputFileHtml)
            # offline.plot(fig, filename=outputFileHtml, auto_open=False)

            # Save as a static image:
            # output_file = os.path.join(outputAnalysisDir, f"{name}.png")
            # fig.write_image(output_file)