import os import pandas as pd import numpy as np import plotly.graph_objects as go from algorithmContract.confBusiness import * from algorithmContract.contract import Contract from behavior.analystWithGoodPoint import AnalystWithGoodPoint from plotly.subplots import make_subplots from scipy.optimize import curve_fit class YawErrorAnalyst(AnalystWithGoodPoint): """ 风电机组静态偏航误差分析 """ fieldWindDirFloor = 'wind_dir_floor' fieldPower = 'power' fieldPowerMean = 'mean_power' fieldPowerMax = 'max_power' fieldPowerMin = 'min_power' fieldPowerMedian = 'median_power' fieldPowerGT0p = 'power_gt_0' fieldPowerGT80p = 'power_gt_80p' fieldPowerRatio0p = 'ratio_0' fieldPowerRatio80p = 'ratio_80p' fieldSlop = 'slop' def typeAnalyst(self): return "yaw_error" def selectColumns(self): return [Field_DeviceCode,Field_Time,Field_WindSpeed,Field_ActiverPower,Field_AngleIncluded] def filterCommon(self, dataFrame: pd.DataFrame, conf: Contract): dataFrame = dataFrame[~((dataFrame[Field_AngleIncluded].abs() >= 15))] return dataFrame def calculateYawError(self, dataFrame: pd.DataFrame, fieldAngleInclude, fieldActivePower): dataFrame = dataFrame.dropna( subset=[Field_NameOfTurbine, fieldAngleInclude, fieldActivePower]) # Calculate floor values and other transformations dataFrame[self.fieldWindDirFloor] = np.floor( dataFrame[fieldAngleInclude]).astype(int) dataFrame[self.fieldPower] = dataFrame[fieldActivePower].astype(float) # Calculate aggregated metrics for power grouped = dataFrame.groupby(self.fieldWindDirFloor).agg({ Field_NameOfTurbine: ['min'], self.fieldPower: ['mean', 'max', 'min', 'median', lambda x: ( x > 0).sum(), lambda x: (x > x.median()).sum()] }).reset_index() # Rename columns for clarity grouped.columns = [self.fieldWindDirFloor, Field_NameOfTurbine, self.fieldPowerMean, self.fieldPowerMax, self.fieldPowerMin, self.fieldPowerMedian, self.fieldPowerGT0p, self.fieldPowerGT80p] # Calculate total sums for conditions power_gt_0_sum = grouped[self.fieldPowerGT0p].sum() power_gt_80p_sum = grouped[self.fieldPowerGT80p].sum() # Calculate ratios grouped[self.fieldPowerRatio0p] = grouped[self.fieldPowerGT0p] / \ power_gt_0_sum grouped[self.fieldPowerRatio80p] = grouped[self.fieldPowerGT80p] / \ power_gt_80p_sum # Filter out zero ratios and calculate slope grouped = grouped[grouped[self.fieldPowerRatio0p] > 0] grouped[self.fieldSlop] = grouped[self.fieldPowerRatio80p] / \ grouped[self.fieldPowerRatio0p] # Sort by wind direction floor grouped.sort_values(self.fieldWindDirFloor, inplace=True) # # Write to CSV # grouped.to_csv(output_path, index=False) return grouped def poly_func(self, x, a, b, c, d, e): return a * x**4 + b * x ** 3 + c * x ** 2 + d * x + e def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes): dictionary = self.processTurbineData(turbineCodes,conf,self.selectColumns()) dataFrameMerge = self.userDataFrame(dictionary,conf.dataContract.configAnalysis,self) return self.yawErrorAnalysis(dataFrameMerge, outputAnalysisDir, conf) def yawErrorAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, conf: Contract): # 检查所需列是否存在 required_columns = {Field_ActiverPower, Field_YawError} if not required_columns.issubset(dataFrameMerge.columns): raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}") results = [] result_rows = [] grouped = dataFrameMerge.groupby( [Field_NameOfTurbine, Field_CodeOfTurbine]) for name, group in grouped: df = self.calculateYawError( group, Field_YawError, Field_ActiverPower) df.dropna(inplace=True) # drop extreme value, the 3 largest and 3 smallest df_ = df[df[self.fieldPowerRatio80p] > 0.000] xdata = df_[self.fieldWindDirFloor] ydata = df_[self.fieldSlop] # make ydata smooth ydata = ydata.rolling(7).median()[6:] xdata = xdata[3:-3] if len(xdata) <= 0 and len(ydata) <= 0: continue # Curve fitting popt, pcov = curve_fit(self.poly_func, xdata, ydata) # popt contains the optimized parameters a, b, and c # Generate fitted y-dataFrame using the optimized parameters fitted_ydata = self.poly_func(xdata, *popt) # get the max value of fitted_ydata and its index max_pos = fitted_ydata.idxmax() # Create a subplot with two rows fig = make_subplots(rows=2, cols=1) # First subplot fig.add_trace( go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldSlop], mode='markers', name="Energy Gain", marker=dict(size=5)), row=1, col=1 ) fig.add_trace( go.Scatter(x=xdata, y=fitted_ydata, mode='lines', name="Fit", line=dict(color='red')), row=1, col=1 ) fig.add_trace( go.Scatter(x=[df[self.fieldWindDirFloor][max_pos]], y=[fitted_ydata[max_pos]], mode='markers', name="Max Pos:"+str(df[self.fieldWindDirFloor][max_pos]), marker=dict(size=20)), row=1, col=1 ) # Second subplot fig.add_trace( go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldPowerRatio0p], mode='markers', name="Base Energy", marker=dict(size=5)), row=2, col=1 ) fig.add_trace( go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldPowerRatio80p], mode='markers', name="Slope Energy", marker=dict(size=5)), row=2, col=1 ) # Update layout fig.update_layout( title={ "text": f"Yaw Error Analysis: {name[0]}", "x": 0.5 }, showlegend=True, margin=dict(t=50, b=10) # t为顶部(top)间距,b为底部(bottom)间距 ) # Save to file filePathOfImage = os.path.join(outputAnalysisDir, f"{name[0]}.png") fig.write_image(filePathOfImage, scale=3) filePathOfHtml = os.path.join(outputAnalysisDir, f"{name[0]}.html") fig.write_html(filePathOfHtml) # calc squear error of fitted_ydata and ydata print(name[0], "\t", df[self.fieldWindDirFloor][max_pos]) resultOfTurbine = [name[0], df[self.fieldWindDirFloor][max_pos]] results.append(resultOfTurbine) result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: name[1], Field_Return_FilePath: filePathOfImage, Field_Return_IsSaveDatabase: False }) result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: name[1], Field_Return_FilePath: filePathOfHtml, Field_Return_IsSaveDatabase: False }) # 初始化一个空的DataFrame,指定列名 columns = [Field_NameOfTurbine, Field_YawError] dataFrameResult = pd.DataFrame(results, columns=columns) filePathOfYawError = os.path.join( outputAnalysisDir, f"yaw_error_result{CSVSuffix}") dataFrameResult.to_csv(filePathOfYawError, index=False) result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: "total", Field_Return_FilePath: filePathOfYawError, Field_Return_IsSaveDatabase: True }) result_df = pd.DataFrame(result_rows) return result_df