import os
from datetime import datetime

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from behavior.analystWithGoodBadPoint import AnalystWithGoodBadPoint
from plotly.subplots import make_subplots

# Colours used to build the continuous colorscale for the time-coloured
# scatter (used when a turbine has six or more distinct months of data).
_TIME_COLORSCALE_COLORS = [
    "#3E409C", "#476CB9", "#3586BF", "#4FA4B5", "#52A3AE", "#60C5A3",
    "#85D0AE", "#A8DCA2", "#CFEE9E", "#E4F39E", "#EEF9A7", "#FBFFBE",
    "#FDF1A9", "#FFE286", "#FFC475", "#FCB06C", "#F78F4F", "#F96F4A",
    "#E4574C", "#CA3756", "#AF254F",
]

# Discrete colours, one per month, used when a turbine has fewer than six
# distinct months of data. Colours are reused cyclically.
_MONTH_POINT_COLORS = [
    "#F96F4A", "#FFC475", "#FBFFBE", "#85D0AE", "#3586BF", "#3E409C",
]

# Name of the column holding the per-row timestamp that is fed to
# datetime.fromtimestamp().
# NOTE(review): presumably epoch seconds added upstream (it is not part of
# selectColumns()) - confirm against processTurbineData/userDataFrame.
_TIME_COLUMN = 'monthIntTime'


class PowerScatter2DAnalyst(AnalystWithGoodBadPoint):
    """
    Wind-turbine power-curve scatter analysis (wind speed vs. active power).

    Second-level SCADA data is too slow to process; minute-level SCADA data
    is recommended.
    """

    def typeAnalyst(self):
        """Return the identifier string of this analysis type."""
        return "power_scatter_2D"

    def selectColumns(self):
        """Return the column names this analysis passes to processTurbineData."""
        return [Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower]

    def addPropertyToDataFrame(self, dataFrameOfTurbine: pd.DataFrame, currTurbineInfo: pd.Series, currTurbineModelInfo: pd.Series):
        """
        Add the power-farm code and mill-type code columns to a turbine frame.

        The frame is modified in place; nothing is returned.

        Args:
            dataFrameOfTurbine: Data of one turbine; receives the new columns.
            currTurbineInfo: Not used by this implementation.
            currTurbineModelInfo: Supplies the mill-type code.
        """
        dataFrameOfTurbine[Field_PowerFarmCode] = self.currPowerFarmInfo[Field_PowerFarmCode]
        dataFrameOfTurbine[Field_MillTypeCode] = currTurbineModelInfo[Field_MillTypeCode]

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """
        Load the turbine data and draw one power-curve scatter per turbine.

        Args:
            outputAnalysisDir: Directory the PNG/HTML files are written to.
            conf: Analysis contract (data filter and analysis configuration).
            turbineCodes: Turbine codes forwarded to processTurbineData.

        Returns:
            The DataFrame produced by drawOfPowerCurveScatter, or None when
            there is no data to plot.

        Raises:
            ValueError: If the contract power-curve frame contains no
                (power farm, mill type) group.
        """
        dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns())
        dataFrame = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self)

        if len(dataFrame) <= 0:
            print("After screening for blade pitch angle less than the configured value, plot power curve scatter points without data")
            return None

        grouped = self.dataFrameContractOfTurbine.groupby(
            [Field_PowerFarmCode, Field_MillTypeCode])

        # Only the first (power farm, mill type) group is used as the contract
        # power curve. An empty grouping is reported explicitly instead of
        # surfacing later as an unbound local variable.
        firstGroup = next(iter(grouped), None)
        if firstGroup is None:
            raise ValueError(
                "No contract power curve available: dataFrameContractOfTurbine "
                "has no (power farm, mill type) group")
        _, contractPowerCurveOfMillType = firstGroup

        return self.drawOfPowerCurveScatter(dataFrame, outputAnalysisDir, conf, contractPowerCurveOfMillType)

    def drawOfPowerCurveScatter(self, dataFrame: pd.DataFrame, outputAnalysisDir, conf: Contract, dataFrameGuaranteePowerCurve: pd.DataFrame):
        """
        Draw the wind speed / active power scatter of every turbine and save
        each figure as a PNG and an HTML file.

        Args:
            dataFrame: Turbine data; must contain the turbine name and code
                columns, wind speed, active power and the 'monthIntTime'
                timestamp column.
            outputAnalysisDir: Directory the output files are written to.
            conf: Analysis contract; its data filter supplies the power-farm
                id and batch number stored in the result rows.
            dataFrameGuaranteePowerCurve: Contract power curve (wind speed and
                active power columns), drawn as a gray line on every figure.

        Returns:
            pd.DataFrame with two rows per turbine: one for the PNG file (not
            flagged for database saving) and one for the HTML file (flagged
            for database saving).
        """
        x_name = '风速'
        y_name = '功率'

        # The x axis starts 1 below the smallest cut-in wind speed of the
        # turbine models, or at 2 when no cut-in wind speed is available.
        cutInWsField = self.turbineModelInfo[Field_CutInWS]
        cut_in_ws = cutInWsField.min() - 1 if cutInWsField.notna().any() else 2

        # Plotly expects a colorscale as [position in 0..1, colour] pairs.
        lastColorIndex = len(_TIME_COLORSCALE_COLORS) - 1
        colorscale = [
            [i / lastColorIndex, color]
            for i, color in enumerate(_TIME_COLORSCALE_COLORS)
        ]

        result_rows = []
        grouped = dataFrame.groupby([Field_NameOfTurbine, Field_CodeOfTurbine])

        # name is the (turbine name, turbine code) key of the group.
        for name, group in grouped:
            fig = make_subplots()

            for trace in self._scatterTraces(group, colorscale):
                fig.add_trace(trace)

            # Contract power curve.
            line = go.Scatter(x=dataFrameGuaranteePowerCurve[Field_WindSpeed],
                              y=dataFrameGuaranteePowerCurve[Field_ActiverPower],
                              mode='lines+markers',
                              marker=dict(color='gray', size=7),
                              name='合同功率曲线')
            fig.add_trace(line, secondary_y=False)

            fig.update_layout(
                title=f'机组: {name[0]}',
                xaxis=dict(title=x_name,
                           range=[cut_in_ws, 25],
                           tickmode='linear', tick0=0, dtick=1,
                           tickangle=-45),
                yaxis=dict(title=y_name,
                           dtick=self.axisStepActivePower,
                           range=[self.axisLowerLimitActivePower,
                                  self.axisUpperLimitActivePower]),
                legend=dict(yanchor="bottom", y=0, xanchor="right", x=1,
                            font=dict(size=10),
                            bgcolor='rgba(255,255,255,0)')
            )

            # Static image.
            pngFilePath = os.path.join(outputAnalysisDir, f"{name[0]}-scatter.png")
            fig.write_image(pngFilePath, scale=3)

            # Interactive HTML.
            htmlFilePath = os.path.join(outputAnalysisDir, f"{name[0]}-scatter.html")
            fig.write_html(htmlFilePath)

            result_rows.append(self._resultRow(conf, name[1], pngFilePath, False))
            result_rows.append(self._resultRow(conf, name[1], htmlFilePath, True))

        return pd.DataFrame(result_rows)

    def _scatterTraces(self, group: pd.DataFrame, colorscale):
        """Return the scatter traces of one turbine, coloured by time."""
        timestamps = group[_TIME_COLUMN]
        # Kept as a standalone Series instead of a new column so the sub-frame
        # handed out by groupby is not written to.
        # NOTE(review): only the month of the year is extracted, so the same
        # month of two different years falls into one bucket - confirm that
        # this is intended for data spanning more than twelve months.
        months = timestamps.apply(lambda ts: datetime.fromtimestamp(ts).month)
        unique_months = months.unique()

        if len(unique_months) >= 6:
            return [self._timeColoredTrace(group, timestamps, colorscale)]

        return [
            self._singleMonthTrace(
                group[months == month],
                _MONTH_POINT_COLORS[i % len(_MONTH_POINT_COLORS)])
            for i, month in enumerate(unique_months)
        ]

    def _timeColoredTrace(self, group: pd.DataFrame, timestamps: pd.Series, colorscale):
        """Return one scatter trace coloured continuously by timestamp, with a colorbar."""
        # Six evenly spaced colorbar ticks, labelled as year-month.
        tickPositions = np.linspace(timestamps.min(), timestamps.max(), 6)
        tickLabels = [datetime.fromtimestamp(ts).strftime('%Y-%m')
                      for ts in tickPositions]

        return go.Scatter(
            x=group[Field_WindSpeed],
            y=group[Field_ActiverPower],
            mode='markers',
            marker=dict(
                color=timestamps,
                colorscale=colorscale,
                size=3,
                opacity=0.7,
                colorbar=dict(
                    tickvals=tickPositions,
                    ticktext=tickLabels,
                    thickness=18,
                    len=1,  # colorbar spans the full height of the figure
                    outlinecolor='rgba(255,255,255,0)'
                ),
                showscale=True
            ),
            # The colorbar replaces the legend entry for this trace.
            showlegend=False)

    def _singleMonthTrace(self, monthData: pd.DataFrame, color):
        """Return the scatter trace of one month, drawn in a single fixed colour."""
        # The legend label is the year-month of the first row of the bucket.
        label = datetime.fromtimestamp(
            monthData[_TIME_COLUMN].iloc[0]).strftime("%Y-%m")

        return go.Scatter(
            x=monthData[Field_WindSpeed],
            y=monthData[Field_ActiverPower],
            mode='markers',
            marker=dict(color=color, size=3, opacity=0.7),
            name=label,
            showlegend=True)

    def _resultRow(self, conf: Contract, turbineCode, filePath, isSaveDatabase):
        """Return the result record describing one output file of one turbine."""
        dataFilter = conf.dataContract.dataFilter
        return {
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: dataFilter.powerFarmID,
            Field_Return_BatchCode: dataFilter.dataBatchNum,
            Field_CodeOfTurbine: turbineCode,
            Field_Return_FilePath: filePath,
            Field_Return_IsSaveDatabase: isSaveDatabase
        }