import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib.cm as cm import matplotlib.ticker as ticker import plotly.express as px import math from behavior.analyst import Analyst from utils.directoryUtil import DirectoryUtil as dir from algorithmContract.confBusiness import * class MinPitchAnalyst(Analyst): """ 风电机组最小桨距角分析 """ def typeAnalyst(self): return "min_pitch" def turbinesAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness): self.drawTrendGraph(dataFrameMerge, outputAnalysisDir, confData) def drawTrendGraph(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness): """ Generates pitch angle distribution scatter plots for turbines in a wind farm using plotly. Parameters: - dataFrameMerge: pd.DataFrame, DataFrame containing turbine data. - outputAnalysisDir: str, path to save the output plots. - confData: ConfBusiness, configuration object containing field names. """ # 检查所需列是否存在 required_columns = {Field_YearMonthDay, confData.field_pitch_angle1} if not required_columns.issubset(dataFrameMerge.columns): raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}") pitchAngleRate = 'pitch_angle_rate' fieldPitchAngleBin = 'pitch_angle_bin' # Custom color scale: Blue (high pitch_angle_rate) to Light Grey (low pitch_angle_rate) custom_color_scale = [ (0.0, "rgb(240, 240, 240)"), # Light grey for the lowest values # (0.25, "rgb(240, 240, 240)"), # Light grey for the lowest values (0.5, "rgba(55.0, 135.0, 192.33333333333334, 1.0)"), # Medium blue-grey # (0.75, "rgba(55.0, 135.0, 192.33333333333334, 1.0)"), # Medium blue-grey # Dark blue for the highest values (1.0, "rgba(55.0, 135.0, 192.33333333333334, 1.0)") ] # Group data by turbine identifier grouped = dataFrameMerge.groupby(Field_NameOfTurbine) for name, group in grouped: # Convert the date column to datetime type group[Field_YearMonthDay] = pd.to_datetime( group[Field_YearMonthDay]) # Creating bins of 0.2 intervals for pitch angles bins = pd.interval_range(start=group[confData.field_pitch_angle1].min(), end=group[confData.field_pitch_angle1].max( ) + 0.2, freq=0.2, closed='right') group[fieldPitchAngleBin] = pd.cut( group[confData.field_pitch_angle1], bins=bins) group[fieldPitchAngleBin] = group[fieldPitchAngleBin].apply( lambda x: x.left) # 提取每个区间的左端点作为值 # Calculate the pitch angle rate within each day df = group.groupby([Field_YearMonthDay, confData.field_pitch_angle1] ).size().reset_index(name='count') # df = group.groupby([Field_YearMonthDay, fieldPitchAngleBin] # ).size().reset_index(name='count') total_counts = group.groupby( Field_YearMonthDay).size().reset_index(name='total_count') df = df.merge(total_counts, on=Field_YearMonthDay) df[pitchAngleRate] = df['count'] / df['total_count'] * 100 # df[pitchAngleRate] = (df['count'] / df['total_count']).apply(lambda x: x ** 0.5)*100 # Plotting using plotly fig = px.scatter(df, x=Field_YearMonthDay, y=confData.field_pitch_angle1, # 桨距角不分仓方式 # y=fieldPitchAngleBin, # 桨距角分仓方式 size='count', color=pitchAngleRate, # color_continuous_scale='Blues', color_continuous_scale=custom_color_scale ) # Set date format on x-axis fig.update_xaxes( title='Time', tickformat='%Y-%m-%d', tickangle=-45) fig.update_yaxes(title='Pitch Angle', dtick=confData.graphSets["pitchAngle"]["step"] if not self.common.isNone( confData.graphSets["pitchAngle"]["step"]) else 2, # 设置y轴刻度间隔为0.1 range=[confData.graphSets["pitchAngle"]["min"] if not self.common.isNone( confData.graphSets["pitchAngle"]["min"]) else -2, confData.graphSets["pitchAngle"]["max"] if not self.common.isNone(confData.graphSets["pitchAngle"]["max"]) else 28], # 设置y轴的范围从0到1 ) # Customizing legend fig.update_layout( title={ "text": f'Pitch Angle Distribution for {name}', "x": 0.5 }, coloraxis_colorbar=dict(title='Rate'), margin=dict(t=50, b=10), # t为顶部(top)间距,b为底部(bottom)间距 # plot_bgcolor='rgb(240, 240, 240)' ) # Set marker size if fixed size is needed # Fixed size for all points fig.update_traces(marker=dict(size=3, opacity=0.5)) # Save plot filePathOfImage = os.path.join(outputAnalysisDir, f"{name}.png") fig.write_image(filePathOfImage, scale=2) filePathOfHtml = os.path.join(outputAnalysisDir, f"{name}.html") fig.write_html(filePathOfHtml)