import os
from datetime import datetime

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from behavior.analystWithGoodBadPoint import AnalystWithGoodBadPoint

# Fixed palette turned into a continuous Plotly colorscale for the 2D scatter
# (marker colour follows Field_UnixYearMonth).
_PITCH_POWER_TIME_COLORS = (
    "#3E409C", "#476CB9", "#3586BF", "#4FA4B5", "#52A3AE", "#60C5A3",
    "#85D0AE", "#A8DCA2", "#CFEE9E", "#E4F39E", "#EEF9A7", "#FBFFBE",
    "#FDF1A9", "#FFE286", "#FFC475", "#FCB06C", "#F78F4F", "#F96F4A",
    "#E4574C", "#CA3756", "#AF254F",
)

# Discrete colour sequence for the 3D scatter (one colour per year-month value,
# cycled by plotly.express when there are more categories than colours).
_PITCH_POWER_MONTH_COLORS = (
    "#3E409C", "#3586BF", "#52A3AE", "#85D0AE", "#A8DCA2", "#FBFFBE",
    "#FDF1A9", "#FFE286", "#FCB06C", "#F96F4A", "#E4574C", "#AF254F",
)

# Number of evenly spaced ticks requested on the time colour bar.
_COLORBAR_TICK_COUNT = 6


class PitchPowerAnalyst(AnalystWithGoodBadPoint):
    """Wind-turbine pitch-angle versus active-power analysis.

    For every turbine the analyst writes a 2D scatter (PNG + HTML) of pitch
    angle against active power coloured by time, and, when the turbine has
    more than one distinct year-month value, a 3D scatter (HTML) of pitch
    angle / year-month / active power.
    """

    def typeAnalyst(self):
        """Return the identifier of this analysis type."""
        return "pitch_power"

    def selectColumns(self):
        """Return the field constants handed to ``processTurbineData``."""
        return [
            Field_DeviceCode,
            Field_Time,
            Field_WindSpeed,
            Field_ActiverPower,
            Field_PitchAngel1,
        ]

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """Run both plots for the given turbines and return their result rows.

        Args:
            outputAnalysisDir: Directory the image / HTML files are written to.
            conf: Analysis contract; its ``dataContract`` is read here and by
                the plotting methods.
            turbineCodes: Turbine codes forwarded to ``processTurbineData``.

        Returns:
            A DataFrame concatenating the rows returned by
            ``plot_power_pitch_angle`` and ``drawScatterGraph``.
        """
        dictionary = self.processTurbineData(
            turbineCodes, conf, self.selectColumns())
        dataFrame = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        result_df1 = self.plot_power_pitch_angle(
            dataFrame, outputAnalysisDir, conf)
        result_df2 = self.drawScatterGraph(dataFrame, outputAnalysisDir, conf)
        return pd.concat([result_df1, result_df2], ignore_index=True)

    def _positivePowerSorted(self, dataFrame: pd.DataFrame) -> pd.DataFrame:
        """Keep rows with active power > 0, sorted by ``Field_YearMonth``."""
        positive = dataFrame[dataFrame[Field_ActiverPower] > 0]
        return positive.sort_values(by=Field_YearMonth)

    def _resultRow(self, conf: Contract, turbineCode, filePath: str,
                   isSaveDatabase: bool) -> dict:
        """Build one result record describing a file produced for a turbine."""
        dataFilter = conf.dataContract.dataFilter
        return {
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: dataFilter.powerFarmID,
            Field_Return_BatchCode: dataFilter.dataBatchNum,
            Field_CodeOfTurbine: turbineCode,
            Field_Return_FilePath: filePath,
            Field_Return_IsSaveDatabase: isSaveDatabase,
        }

    def plot_power_pitch_angle(self, dataFrame: pd.DataFrame, outputAnalysisDir: str, conf: Contract):
        """Write a 2D power / pitch-angle scatter per turbine (PNG and HTML).

        Rows with non-positive active power are dropped first. Marker colour
        follows ``Field_UnixYearMonth``; the colour bar is labelled with
        ``%Y-%m`` strings derived from evenly spaced values between the
        turbine's minimum and maximum ``Field_UnixYearMonth``.

        Args:
            dataFrame: Input data; the columns read here are the turbine name
                and code, active power, pitch angle 1, ``Field_YearMonth`` and
                ``Field_UnixYearMonth``.
            outputAnalysisDir: Directory the files are written to.
            conf: Analysis contract (``dataContract.dataFilter`` is read for
                the result rows).

        Returns:
            A DataFrame with two rows per turbine: the PNG (not saved to the
            database) and the HTML (saved to the database).
        """
        dataFrameMerge = self._positivePowerSorted(dataFrame)
        grouped = dataFrameMerge.groupby(
            [Field_NameOfTurbine, Field_CodeOfTurbine])

        # Convert the fixed palette into Plotly's [[position, colour], ...] form.
        lastIndex = len(_PITCH_POWER_TIME_COLORS) - 1
        fixed_colorscale = [
            [index / lastIndex, color]
            for index, color in enumerate(_PITCH_POWER_TIME_COLORS)
        ]

        # Loop-invariant: distinct year-month values over all turbines.
        yearMonthLabels = dataFrameMerge[Field_YearMonth].unique().tolist()

        result_rows1 = []
        for (turbineName, turbineCode), group in grouped:
            unixMonths = group[Field_UnixYearMonth]
            # Evenly spaced ticks, computed once; np.unique collapses the
            # repeated values that arise when the turbine spans a single month.
            tickValues = np.unique(np.linspace(
                unixMonths.min(), unixMonths.max(), _COLORBAR_TICK_COUNT))
            # NOTE(review): fromtimestamp() uses the local timezone; assumes
            # Field_UnixYearMonth holds Unix seconds - confirm against producer.
            tickLabels = [
                datetime.fromtimestamp(ts).strftime('%Y-%m')
                for ts in tickValues
            ]

            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=group[Field_ActiverPower],
                y=group[Field_PitchAngel1],
                mode='markers',
                marker=dict(
                    color=unixMonths,
                    colorscale=fixed_colorscale,
                    size=3,
                    opacity=0.7,
                    colorbar=dict(
                        tickvals=tickValues,
                        ticktext=tickLabels,
                        thickness=18,
                        len=1,  # colour bar spans the full plot height
                        outlinecolor='rgba(255,255,255,0)'
                    ),
                    showscale=True
                ),
                showlegend=False
            ))

            fig.update_layout(
                title=f'机组: {turbineName}',
                xaxis=dict(
                    title='功率',
                    range=[self.axisLowerLimitActivePower,
                           self.axisUpperLimitActivePower],
                    dtick=self.axisStepActivePower,
                    tickangle=-45  # rotate x tick labels by 45 degrees
                ),
                yaxis=dict(
                    title='桨距角',
                    range=[self.axisLowerLimitPitchAngle,
                           self.axisUpperLimitPitchAngle],
                    dtick=self.axisStepPitchAngle
                ),
                # NOTE(review): the trace above configures its own marker
                # colorbar and does not reference a shared coloraxis, so this
                # section appears to have no visible effect - confirm before
                # relying on it or removing it.
                coloraxis=dict(
                    colorbar=dict(
                        title="时间",
                        ticks="outside",
                        len=1,  # colour bar spans the full plot height
                        thickness=20,  # colour bar width
                        orientation='v',  # vertical colour bar
                        tickmode='array',  # explicit tick positions
                        tickvals=yearMonthLabels,
                        ticktext=yearMonthLabels
                    )
                )
            )

            # Save the static image and the interactive page.
            filePathOfImage = os.path.join(
                outputAnalysisDir, f"{turbineName}.png")
            fig.write_image(filePathOfImage, width=800, height=600, scale=3)
            filePathOfHtml = os.path.join(
                outputAnalysisDir, f"{turbineName}.html")
            fig.write_html(filePathOfHtml)

            result_rows1.append(
                self._resultRow(conf, turbineCode, filePathOfImage, False))
            result_rows1.append(
                self._resultRow(conf, turbineCode, filePathOfHtml, True))

        return pd.DataFrame(result_rows1)

    def drawScatterGraph(self, dataFrame: pd.DataFrame, outputAnalysisDir: str, conf: Contract):
        """Write a 3D pitch-angle / year-month / power scatter per turbine.

        Rows with non-positive active power are dropped first. Turbines whose
        remaining data contains only one distinct ``Field_YearMonth`` value
        are skipped and produce no file and no result row.

        Args:
            dataFrame: Input data; the columns read here are the turbine name
                and code, active power, pitch angle 1 and ``Field_YearMonth``.
            outputAnalysisDir: Directory the ``<name>_3D.html`` files go to.
            conf: Analysis contract (``dataContract.dataFilter`` is read for
                the result rows).

        Returns:
            A DataFrame with one row per written HTML file (flagged to be
            saved to the database); empty when every turbine was skipped.
        """
        positive = self._positivePowerSorted(dataFrame)
        grouped = positive.groupby([Field_NameOfTurbine, Field_CodeOfTurbine])
        colorsList = list(_PITCH_POWER_MONTH_COLORS)

        result_rows2 = []
        for (turbineName, turbineCode), group in grouped:
            # A single month gives nothing to compare along the time axis.
            if len(group[Field_YearMonth].unique()) <= 1:
                continue

            fig = px.scatter_3d(
                group,
                x=Field_PitchAngel1,
                y=Field_YearMonth,
                z=Field_ActiverPower,
                color=Field_YearMonth,
                color_discrete_sequence=colorsList,
                labels={
                    Field_PitchAngel1: '桨距角',
                    Field_YearMonth: '时间',
                    Field_ActiverPower: '功率'
                },
            )

            # Fixed marker size for every trace.
            fig.update_traces(marker=dict(size=1.5))

            fig.update_layout(
                title={
                    "text": f'月度桨距角功率3D散点图: {turbineName}',
                    "x": 0.5
                },
                scene=dict(
                    xaxis=dict(
                        title='桨距角',
                        dtick=self.axisStepPitchAngle,
                        range=[self.axisLowerLimitPitchAngle,
                               self.axisUpperLimitPitchAngle],
                    ),
                    yaxis=dict(
                        title='时间',
                        tickformat='%Y-%m',  # date format
                        dtick='M1',  # one tick per month
                        showgrid=True,
                    ),
                    zaxis=dict(
                        title='功率',
                        dtick=self.axisStepActivePower,
                        range=[self.axisLowerLimitActivePower,
                               self.axisUpperLimitActivePower],
                        showgrid=True,
                    )
                ),
                scene_camera=dict(
                    up=dict(x=0, y=0, z=1),  # keep the camera's up direction
                    center=dict(x=0, y=0, z=0),  # keep the camera centre
                    # eye position chosen to rotate the view horizontally
                    eye=dict(x=-1.8, y=-1.8, z=1.2)
                ),
                legend=dict(
                    orientation="h",
                    itemsizing="constant",  # constant-size legend markers
                    itemwidth=80  # legend item width in pixels
                )
            )

            filePathOfHtml = os.path.join(
                outputAnalysisDir, f"{turbineName}_3D.html")
            fig.write_html(filePathOfHtml)

            result_rows2.append(
                self._resultRow(conf, turbineCode, filePathOfHtml, True))

        return pd.DataFrame(result_rows2)