import os import pandas as pd from datetime import datetime import numpy as np import plotly.graph_objects as go from plotly.subplots import make_subplots import plotly.express as px import plotly.io as pio import seaborn as sns import matplotlib.pyplot as plt import matplotlib.cm as cm from matplotlib.ticker import MultipleLocator from matplotlib.colors import Normalize from behavior.analyst import Analyst from utils.directoryUtil import DirectoryUtil as dir from algorithmContract.confBusiness import * from datetime import timedelta class GeneratorSpeedPowerAnalyst(Analyst): """ 风电机组发电机转速-有功功率分析 """ def typeAnalyst(self): return "speed_power" def turbinesAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness): # self.create_and_save_plots( # dataFrameMerge, outputAnalysisDir, confData) self.drawScatter2DMonthly( dataFrameMerge, outputAnalysisDir, confData) self.drawScatterGraph(dataFrameMerge, outputAnalysisDir, confData) self.drawScatterGraphForTurbines( dataFrameMerge, outputAnalysisDir, confData) """ def create_and_save_plots(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness): x_name = 'generator_speed' y_name = 'power' grouped = dataFrameMerge.groupby(Field_NameOfTurbine) for name, group in grouped: # 创建图形和坐标轴 fig, ax = plt.subplots() cmap = cm.get_cmap('rainbow') # 绘制散点图 scatter = ax.scatter(x=group[confData.field_gen_speed]*confData.value_gen_speed_multiple if not self.common.isNone(confData.value_gen_speed_multiple) else group[confData.field_gen_speed], y=group[confData.field_power], c=group['monthIntTime'], cmap=cmap, s=5) # 设置图形标题和坐标轴标签 ax.set_title(f'turbine_name={name}') # 设置x轴的刻度步长 # 假设您想要每100个单位一个刻度 # 创建每100个单位一个刻度的定位器 loc = MultipleLocator(confData.graphSets["generatorSpeed"]["step"] if not self.common.isNone( confData.graphSets["generatorSpeed"]) and not self.common.isNone( confData.graphSets["generatorSpeed"]["step"]) else 200) ax.xaxis.set_major_locator(loc) # 将定位器应用到x轴上 ax.set_xlim(confData.graphSets["generatorSpeed"]["min"] if not self.common.isNone( confData.graphSets["generatorSpeed"]["min"]) else 1000, confData.graphSets["generatorSpeed"]["max"] if not self.common.isNone(confData.graphSets["generatorSpeed"]["max"]) else 2000) # 创建每100个单位一个刻度的定位器 yloc = MultipleLocator(confData.graphSets["activePower"]["step"] if not self.common.isNone( confData.graphSets["activePower"]) and not self.common.isNone( confData.graphSets["activePower"]["step"]) else 250) ax.yaxis.set_major_locator(yloc) # 将定位器应用到y轴上 ax.set_ylim(confData.graphSets["activePower"]["min"] if not self.common.isNone( confData.graphSets["activePower"]["min"]) else 0, confData.graphSets["activePower"]["max"] if not self.common.isNone(confData.graphSets["activePower"]["max"]) else confData.rated_power*1.2) ax.set_xlabel(x_name) ax.set_ylabel(y_name) # 设置颜色条 unique_months = len(group['年月'].unique()) ticks = np.linspace(group['monthIntTime'].min( ), group['monthIntTime'].max(), min(unique_months, 6)) # 减少刻度数量 ticklabels = [datetime.fromtimestamp( tick).strftime('%Y-%m') for tick in ticks] norm = Normalize(group['monthIntTime'].min(), group['monthIntTime'].max()) sm = cm.ScalarMappable(norm=norm, cmap=cmap) # 添加颜色条 cbar = fig.colorbar(sm, ax=ax) cbar.set_ticks(ticks) cbar.set_ticklabels(ticklabels) # 旋转x轴刻度标签 plt.xticks(rotation=45) plt.tight_layout() plt.title(f'{Field_NameOfTurbine}={name}') # 保存图片到指定路径 output_file = os.path.join(outputAnalysisDir, f"{name}.png") plt.savefig(output_file, bbox_inches='tight', dpi=120) plt.close() """ def drawScatter2DMonthlyOfTurbine(self, dataFrame: pd.DataFrame, outputAnalysisDir: str, confData: ConfBusiness, turbineName: str): # 设置颜色条参数 dataFrame = dataFrame.sort_values(by=Field_YearMonth) # 绘制 Plotly 散点图 fig = px.scatter( dataFrame, x=dataFrame[confData.field_gen_speed], y=dataFrame[confData.field_power], color=Field_YearMonth, color_continuous_scale='Rainbow', # 颜色条样式 labels={confData.field_gen_speed: 'Generator Speed', Field_YearMonth: 'Time', confData.field_power: 'Power'}, ) # 设置固定散点大小 fig.update_traces(marker=dict(size=3)) # 如果需要颜色轴的刻度和标签 # 以下是以比例方式进行色彩的可视化处理 fig.update_layout( title={ "text": f'Monthly generator speed power scatter plot {turbineName}', "x": 0.5 }, xaxis=dict( title='Generator Speed', dtick=confData.graphSets["generatorSpeed"]["step"] if not self.common.isNone( confData.graphSets["generatorSpeed"]) and not self.common.isNone( confData.graphSets["generatorSpeed"]["step"]) else 200, range=[ confData.graphSets["generatorSpeed"]["min"] if not self.common.isNone( confData.graphSets["generatorSpeed"]["min"]) else 1000, confData.graphSets["generatorSpeed"]["max"] if not self.common.isNone(confData.graphSets["generatorSpeed"]["max"]) else 2000 ], tickangle=45 ), yaxis=dict( title='Power', dtick=confData.graphSets["activePower"]["step"] if not self.common.isNone( confData.graphSets["activePower"]) and not self.common.isNone( confData.graphSets["activePower"]["step"]) else 250, range=[confData.graphSets["activePower"]["min"] if not self.common.isNone( confData.graphSets["activePower"]["min"]) else 0, confData.graphSets["activePower"]["max"] if not self.common.isNone(confData.graphSets["activePower"]["max"]) else confData.rated_power*1.2], ), coloraxis=dict( colorbar=dict( title="Time", ticks="outside", len=1, # 设置颜色条的长度,使其占据整个图的高度 thickness=20, # 调整颜色条的宽度 orientation='v', # 设置颜色条为垂直方向 tickmode='array', # 确保刻度按顺序排列 tickvals=dataFrame[Field_YearMonth].unique( ).tolist(), # 确保刻度为唯一的年月 ticktext=dataFrame[Field_YearMonth].unique( ).tolist() # 以%Y-%m格式显示标签 ) ) ) # 保存图片 outputFilePathPNG = os.path.join( outputAnalysisDir, f"{turbineName}.png") pio.write_image(fig, outputFilePathPNG, format='png', width=800, height=600, scale=1) return fig def drawScatterGraphOfTurbine(self, dataFrame: pd.DataFrame, outputAnalysisDir: str, confData: ConfBusiness, turbineName: str): # 创建3D散点图 fig = px.scatter_3d(dataFrame, x=confData.field_gen_speed, y=Field_YearMonth, z=confData.field_power, color=Field_YearMonth, labels={confData.field_gen_speed: 'Generator Speed', Field_YearMonth: 'Time', confData.field_power: 'Power'}, ) # 设置固定散点大小 fig.update_traces(marker=dict(size=1.5)) # 更新图形的布局 fig.update_layout( title={ "text": f'Monthly generator speed power scatter plot {turbineName}', "x": 0.5 }, scene=dict( xaxis=dict( title='Generator Speed', dtick=confData.graphSets["generatorSpeed"]["step"] if not self.common.isNone( confData.graphSets["generatorSpeed"]["step"]) else 200, # 设置y轴刻度间隔为0.1 range=[confData.graphSets["generatorSpeed"]["min"] if not self.common.isNone( confData.graphSets["generatorSpeed"]["min"]) else 1000, confData.graphSets["generatorSpeed"]["max"] if not self.common.isNone(confData.graphSets["generatorSpeed"]["max"]) else 2000], # 设置y轴的范围从0到1 showgrid=True, # 显示网格线 ), yaxis=dict( title='Time', tickformat='%Y-%m', # 日期格式, showgrid=True, # 显示网格线 ), zaxis=dict( title='Power', dtick=confData.graphSets["activePower"]["step"] if not self.common.isNone( confData.graphSets["activePower"]) and not self.common.isNone( confData.graphSets["activePower"]["step"]) else 250, range=[confData.graphSets["activePower"]["min"] if not self.common.isNone( confData.graphSets["activePower"]["min"]) else 0, confData.graphSets["activePower"]["max"] if not self.common.isNone(confData.graphSets["activePower"]["max"]) else confData.rated_power*1.2], showgrid=True, # 显示网格线 ) ), scene_camera=dict( up=dict(x=0, y=0, z=1), # 保持相机向上方向不变 center=dict(x=0, y=0, z=0), # 保持相机中心位置不变 eye=dict(x=-1.8, y=-1.8, z=1.2) # 调整eye属性以实现水平旋转180° ), # 设置图例标题 legend_title_text='Time' ) # 保存图像 outputFileHtml = os.path.join( outputAnalysisDir, "{}.html".format(turbineName)) fig.write_html(outputFileHtml) def drawScatter2DMonthly(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness): grouped = dataFrameMerge.groupby(Field_NameOfTurbine) for name, group in grouped: self.drawScatter2DMonthlyOfTurbine( group, outputAnalysisDir, confData, name) def drawScatterGraph(self, dataFrame: pd.DataFrame, outputAnalysisDir: str, confData: ConfBusiness): """ 绘制风速-功率分布图并保存为文件。 参数: dataFrameMerge (pd.DataFrame): 包含数据的DataFrame,需要包含设备名、风速和功率列。 outputAnalysisDir (str): 分析输出目录。 confData (ConfBusiness): 配置 """ dataFrame = dataFrame[(dataFrame[confData.field_power] > 0)].sort_values( by=Field_YearMonth) grouped = dataFrame.groupby(Field_NameOfTurbine) # 遍历每个设备的数据 for name, group in grouped: if len(group[Field_YearMonth].unique()) > 1: self.drawScatterGraphOfTurbine( group, outputAnalysisDir, confData, name) else: fig=self.drawScatter2DMonthlyOfTurbine( group, outputAnalysisDir, confData, name) # 保存html outputFileHtml = os.path.join( outputAnalysisDir, "{}.html".format(name)) fig.write_html(outputFileHtml) def drawScatterGraphForTurbines(self, dataFrame: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness): """ 绘制风速-功率分布图并保存为文件。 参数: dataFrameMerge (pd.DataFrame): 包含数据的DataFrame,需要包含设备名、风速和功率列。 outputAnalysisDir (str): 分析输出目录。 confData (ConfBusiness): 配置 """ dataFrame = dataFrame[(dataFrame[confData.field_power] > 0)].sort_values( by=Field_NameOfTurbine) # 创建3D散点图 fig = px.scatter_3d(dataFrame, x=confData.field_gen_speed, y=Field_NameOfTurbine, z=confData.field_power, color=Field_NameOfTurbine, labels={confData.field_gen_speed: 'Generator Speed', Field_NameOfTurbine: 'Turbine', confData.field_power: 'Power'}, ) # 设置固定散点大小 fig.update_traces(marker=dict(size=1.5)) # 更新图形的布局 fig.update_layout( title={ "text": 'Turbine generator speed power 3D scatter plot', "x": 0.5 }, scene=dict( xaxis=dict( title='Generator Speed', dtick=confData.graphSets["generatorSpeed"]["step"] if not self.common.isNone( confData.graphSets["generatorSpeed"]["step"]) else 200, # 设置y轴刻度间隔为0.1 range=[confData.graphSets["generatorSpeed"]["min"] if not self.common.isNone( confData.graphSets["generatorSpeed"]["min"]) else 1000, confData.graphSets["generatorSpeed"]["max"] if not self.common.isNone(confData.graphSets["generatorSpeed"]["max"]) else 2000], # 设置y轴的范围从0到1 showgrid=True, # 显示网格线 ), yaxis=dict( title='Turbine', showgrid=True, # 显示网格线 ), zaxis=dict( title='Power', dtick=confData.graphSets["activePower"]["step"] if not self.common.isNone( confData.graphSets["activePower"]) and not self.common.isNone( confData.graphSets["activePower"]["step"]) else 250, range=[confData.graphSets["activePower"]["min"] if not self.common.isNone( confData.graphSets["activePower"]["min"]) else 0, confData.graphSets["activePower"]["max"] if not self.common.isNone(confData.graphSets["activePower"]["max"]) else confData.rated_power*1.2], showgrid=True, # 显示网格线 ) ), scene_camera=dict( up=dict(x=0, y=0, z=1), # 保持相机向上方向不变 center=dict(x=0, y=0, z=0), # 保持相机中心位置不变 eye=dict(x=-1.8, y=-1.8, z=1.2) # 调整eye属性以实现水平旋转180° ), # 设置图例标题 legend_title_text='Turbine' ) # 保存图像 outputFileHtml = os.path.join( outputAnalysisDir, "{}.html".format(self.typeAnalyst())) fig.write_html(outputFileHtml)