import os from datetime import datetime import pandas as pd import numpy as np import pandas as pd import plotly.graph_objects as go from plotly.subplots import make_subplots from behavior.analyst import Analyst from utils.directoryUtil import DirectoryUtil as dir from algorithmContract.confBusiness import * import plotly.express.colors as px_colors class PowerScatterAnalyst(Analyst): """ 风电机组功率曲线散点分析。 秒级scada数据运算太慢,建议使用分钟级scada数据 """ def typeAnalyst(self): return "power_scatter" def turbinesAnalysis(self, dataFrameMerge, outputAnalysisDir, confData: ConfBusiness): if len(dataFrameMerge) <= 0: print("After screening for blade pitch angle less than the configured value, plot power curve scatter points without data") return dataFrameGuaranteePowerCurve = self.dataFrameContractOfTurbine self.drawOfPowerCurveScatter( dataFrameMerge, outputAnalysisDir, confData, dataFrameGuaranteePowerCurve) def contractGuaranteePowerCurveData(self, csvPowerCurveFilePath): dataFrameGuaranteePowerCurve = pd.read_csv( csvPowerCurveFilePath, encoding=charset_unify) return dataFrameGuaranteePowerCurve def drawOfPowerCurveScatter(self, dataFrame: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness, dataFrameGuaranteePowerCurve: pd.DataFrame): """ 绘制风速-功率分布图并保存为文件。 参数: dataFrameMerge (pd.DataFrame): 包含数据的DataFrame,需要包含设备名、风速和功率列。 csvPowerCurveFilePath (str): 功率曲线文件路径。 outputAnalysisDir (str): 分析输出目录。 confData (ConfBusiness): 配置 """ # 按设备名分组数据 colorsList = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf', '#aec7e8', '#ffbb78'] grouped = dataFrame.groupby(Field_NameOfTurbine) # 遍历每个设备的数据 for name, group in grouped: # 创建颜色映射,将每个年月映射到一个唯一的颜色 unique_months = group[Field_YearMonth].unique() colors = [ colorsList[i % 12] for i in range(len(unique_months))] color_map = dict(zip(unique_months, colors)) # 使用go.Scatter3d创建3D散点图 trace = go.Scatter3d( x=group[confData.field_wind_speed], y=group[Field_YearMonth], z=group[confData.field_power], mode='markers', marker=dict( color=[color_map[month] for month in group[Field_YearMonth]], size=1.5, line=dict( color='rgba(0, 0, 0, 0)', # 设置边框颜色为透明,以去掉白色边框 width=0 # 设置边框宽度为0,进一步确保没有边框 ), opacity=0.8 # 调整散点的透明度,增加透视效果 ) ) # 创建图形 fig = go.Figure(data=[trace]) # 更新图形的布局 fig.update_layout( title={ "text": f'Monthly power 3D scatter plot {name}', "x": 0.5 }, scene=dict( xaxis=dict(title='Wind Speed'), yaxis=dict( title='Time', tickmode='array', tickvals=unique_months, ticktext=unique_months, categoryorder='category ascending' ), zaxis=dict( title='Power', dtick=confData.graphSets["activePower"]["step"] if not self.common.isNone( confData.graphSets["activePower"]) and not self.common.isNone( confData.graphSets["activePower"]["step"]) else 250, range=[confData.graphSets["activePower"]["min"] if not self.common.isNone( confData.graphSets["activePower"]["min"]) else 0, confData.graphSets["activePower"]["max"] if not self.common.isNone(confData.graphSets["activePower"]["max"]) else confData.rated_power*1.2], ) ), scene_camera=dict( up=dict(x=0, y=0, z=1), # 保持相机向上方向不变 center=dict(x=0, y=0, z=0), # 保持相机中心位置不变 eye=dict(x=-1.8, y=-1.8, z=1.2) # 调整eye属性以实现水平旋转180° ), margin=dict(t=50, b=10) # t为顶部(top)间距,b为底部(bottom)间距 ) # 保存图像 outputFileHtml = os.path.join( outputAnalysisDir, "{}.html".format(name)) fig.write_html(outputFileHtml)