| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- import os
- import numpy as np
- from plotly.subplots import make_subplots
- import plotly.graph_objects as go
- from scipy.optimize import curve_fit
- import matplotlib.pyplot as plt
- import pandas as pd
- from behavior.analyst import Analyst
- from utils.directoryUtil import DirectoryUtil as dir
- from algorithmContract.confBusiness import *
class YawErrorAnalyst(Analyst):
    """Static yaw error analysis for wind turbines.

    For every turbine the included-angle samples are floored into integer
    bins. For each bin two shares are computed: the bin's share of all
    samples with positive power, and its share of all samples whose power
    exceeds the bin's own median power. The ratio of the second share to
    the first (the "slop" column) is smoothed with a rolling median and
    fitted with a 4th-degree polynomial; the bin at which the fitted curve
    peaks is reported as the turbine's yaw error.
    """

    # Names of the intermediate columns produced by calculateYawError.
    fieldWindDirFloor = 'wind_dir_floor'
    fieldPower = 'power'
    fieldPowerMean = 'mean_power'
    fieldPowerMax = 'max_power'
    fieldPowerMin = 'min_power'
    fieldPowerMedian = 'median_power'
    fieldPowerGT0p = 'power_gt_0'
    # NOTE: despite the "80p" in the name, this column counts the samples
    # whose power is above the bin's median power.
    fieldPowerGT80p = 'power_gt_80p'
    fieldPowerRatio0p = 'ratio_0'
    fieldPowerRatio80p = 'ratio_80p'
    fieldSlop = 'slop'

    # Width of the rolling-median window applied to the slope series.
    # Must be odd so the window has a well-defined centre.
    _smoothWindow = 7
    # Number of coefficients of poly_func; curve_fit needs at least this
    # many data points.
    _polyCoeffCount = 5

    def typeAnalyst(self):
        """Return the identifier string of this analysis type."""
        return "yaw_error"

    def filterCommon(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Apply the base-class filter, then drop rows with |included angle| >= 90.

        Rows whose included angle is NaN are kept by this step (the
        comparison is False for NaN, and the mask is negated).
        """
        dataFrame = super().filterCommon(dataFrame, confData)
        dataFrame = dataFrame[~((dataFrame[Field_AngleIncluded].abs() >= 90))]
        return dataFrame

    def calculateYawError(self, dataFrame: pd.DataFrame, fieldAngleInclude, fieldActivePower):
        """Aggregate power statistics per floored included-angle bin.

        Args:
            dataFrame: Samples to aggregate. The caller's frame is not
                modified.
            fieldAngleInclude: Name of the included-angle column.
            fieldActivePower: Name of the active-power column.

        Returns:
            A new DataFrame sorted by ``fieldWindDirFloor`` with one row per
            angle bin holding the turbine name, mean/max/min/median power,
            the two sample counts, their shares of the respective totals
            and the ``fieldSlop`` ratio. Bins whose positive-power share is
            not greater than zero are omitted.
        """
        # Work on an explicit copy so the column assignments below are
        # unambiguous and never write into the caller's data.
        dataFrame = dataFrame.dropna(
            subset=[Field_NameOfTurbine, fieldAngleInclude, fieldActivePower]
        ).copy()

        # Bin the angle to its integer floor and normalise power to float.
        dataFrame[self.fieldWindDirFloor] = np.floor(
            dataFrame[fieldAngleInclude]).astype(int)
        dataFrame[self.fieldPower] = dataFrame[fieldActivePower].astype(float)

        # Per-bin statistics; the two lambdas count samples with positive
        # power and samples above the bin's median power respectively.
        grouped = dataFrame.groupby(self.fieldWindDirFloor).agg({
            Field_NameOfTurbine: ['min'],
            self.fieldPower: ['mean', 'max', 'min', 'median', lambda x: (
                x > 0).sum(), lambda x: (x > x.median()).sum()]
        }).reset_index()

        # Flatten the MultiIndex columns produced by agg into plain names.
        grouped.columns = [
            self.fieldWindDirFloor, Field_NameOfTurbine,
            self.fieldPowerMean, self.fieldPowerMax,
            self.fieldPowerMin, self.fieldPowerMedian,
            self.fieldPowerGT0p, self.fieldPowerGT80p,
        ]

        # Each bin's share of the overall counts.
        power_gt_0_sum = grouped[self.fieldPowerGT0p].sum()
        power_gt_80p_sum = grouped[self.fieldPowerGT80p].sum()
        grouped[self.fieldPowerRatio0p] = (
            grouped[self.fieldPowerGT0p] / power_gt_0_sum)
        grouped[self.fieldPowerRatio80p] = (
            grouped[self.fieldPowerGT80p] / power_gt_80p_sum)

        # Keep only bins with a positive base share (avoids dividing by
        # zero below); copy so the new column is added to an owned frame.
        grouped = grouped[grouped[self.fieldPowerRatio0p] > 0].copy()
        grouped[self.fieldSlop] = (
            grouped[self.fieldPowerRatio80p] / grouped[self.fieldPowerRatio0p])

        return grouped.sort_values(self.fieldWindDirFloor)

    def poly_func(self, x, a, b, c, d, e):
        """Evaluate the 4th-degree polynomial a*x^4 + b*x^3 + c*x^2 + d*x + e."""
        return a * x**4 + b * x ** 3 + c * x ** 2 + d * x + e

    def turbinesAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness):
        """Run the yaw error analysis on the merged data of all turbines."""
        self.yawErrorAnalysis(dataFrameMerge, outputAnalysisDir, confData)

    def yawErrorAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness):
        """Estimate the yaw error of every turbine and write plots and a CSV.

        For each turbine a PNG named ``<turbine>.png`` is written to
        ``outputAnalysisDir``; afterwards a CSV with one row per analysed
        turbine (turbine name, yaw error) is written to the same directory.
        Turbines that yield fewer smoothed points than the polynomial has
        coefficients are skipped and do not appear in the CSV.

        Args:
            dataFrameMerge: Samples of all turbines.
            outputAnalysisDir: Directory receiving the PNG and CSV files.
            confData: Configuration providing ``field_power`` and
                ``field_angle_included`` column names.

        Raises:
            ValueError: If the power or included-angle column is missing.
        """
        # Check that the required columns exist.
        required_columns = {confData.field_power, confData.field_angle_included}
        if not required_columns.issubset(dataFrameMerge.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        window = self._smoothWindow
        half = window // 2

        results = []
        for name, group in dataFrameMerge.groupby(Field_NameOfTurbine):
            df = self.calculateYawError(
                group, confData.field_angle_included, confData.field_power)
            df = df.dropna()

            # Only bins with a positive above-median share take part in the fit.
            df_ = df[df[self.fieldPowerRatio80p] > 0.000]
            xdata = df_[self.fieldWindDirFloor]
            ydata = df_[self.fieldSlop]

            # Smooth with a trailing rolling median and drop its leading
            # NaNs. The value at position i summarises positions
            # i-window+1 .. i, so x is trimmed by half a window on each
            # side to pair every median with the centre of its window.
            ydata = ydata.rolling(window).median().iloc[window - 1:]
            xdata = xdata.iloc[half:-half]

            # curve_fit cannot determine more coefficients than it has
            # data points; this also covers the empty case.
            if len(xdata) < self._polyCoeffCount:
                continue

            # Fit the polynomial; only the optimised coefficients are used.
            popt, _ = curve_fit(
                self.poly_func,
                xdata.to_numpy(dtype=float),
                ydata.to_numpy(dtype=float),
            )

            # Evaluate the fit on the bins; the result keeps xdata's index
            # labels, so idxmax yields a label valid for xdata and df.
            fitted_ydata = self.poly_func(xdata, *popt)
            max_pos = fitted_ydata.idxmax()
            yawError = xdata.loc[max_pos]

            self._plotYawError(
                name, df, xdata, fitted_ydata, max_pos, outputAnalysisDir)

            print(name, "\t", yawError)
            results.append([name, yawError])

        # Collect the per-turbine results and write them to CSV.
        columns = [Field_NameOfTurbine, Field_YawError]
        dataFrameResult = pd.DataFrame(results, columns=columns)
        filePathOfYawError = os.path.join(
            outputAnalysisDir, f"yaw_error_result{CSVSuffix}")
        dataFrameResult.to_csv(filePathOfYawError, index=False)

    def _plotYawError(self, name, df, xdata, fitted_ydata, max_pos, outputAnalysisDir):
        """Write the two-row diagnostic figure of one turbine as ``<name>.png``."""
        fig = make_subplots(rows=2, cols=1)

        # Row 1: raw slope per bin, the fitted curve and its maximum.
        fig.add_trace(
            go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldSlop],
                       mode='markers', name="energy gain", marker=dict(size=5)),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(x=xdata, y=fitted_ydata, mode='lines', name="fit",
                       line=dict(color='red')),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(x=[xdata.loc[max_pos]], y=[fitted_ydata.loc[max_pos]],
                       mode='markers', name="max pos:" + str(xdata.loc[max_pos]),
                       marker=dict(size=20)),
            row=1, col=1
        )

        # Row 2: the two per-bin shares the slope is derived from.
        fig.add_trace(
            go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldPowerRatio0p],
                       mode='markers', name="base energy", marker=dict(size=5)),
            row=2, col=1
        )
        fig.add_trace(
            go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldPowerRatio80p],
                       mode='markers', name="slope energy", marker=dict(size=5)),
            row=2, col=1
        )

        fig.update_layout(
            title={
                "text": f"Yaw Error Analysis {name}",
                "x": 0.5
            },
            showlegend=True,
            margin=dict(t=50, b=10)  # t is the top margin, b the bottom margin
        )

        fig.write_image(os.path.join(outputAnalysisDir, f"{name}.png"))
|