123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- import os
- import pandas as pd
- import numpy as np
- import plotly.graph_objects as go
- from algorithmContract.confBusiness import *
- from algorithmContract.contract import Contract
- from behavior.analystWithGoodPoint import AnalystWithGoodPoint
- from plotly.subplots import make_subplots
- from scipy.optimize import curve_fit
class YawErrorAnalyst(AnalystWithGoodPoint):
    """
    Static yaw error analysis for wind turbines.

    For every turbine the samples are binned by the integer floor of the
    yaw-error angle. Per bin, the share of samples with positive power and
    the share of samples above the bin's median power are computed; their
    quotient (the "slop" column) is smoothed with a rolling median, fitted
    with a 4th-degree polynomial, and the angle at which the fitted curve
    peaks is reported as the turbine's yaw error.
    """

    fieldWindDirFloor = 'wind_dir_floor'
    fieldPower = 'power'
    fieldPowerMean = 'mean_power'
    fieldPowerMax = 'max_power'
    fieldPowerMin = 'min_power'
    fieldPowerMedian = 'median_power'
    fieldPowerGT0p = 'power_gt_0'
    fieldPowerGT80p = 'power_gt_80p'
    fieldPowerRatio0p = 'ratio_0'
    fieldPowerRatio80p = 'ratio_80p'
    fieldSlop = 'slop'

    # Rows whose absolute included angle reaches this value are discarded
    # by filterCommon.
    _angleLimit = 15
    # Width of the rolling-median window used to smooth the slope series.
    # Must be odd so that trimming window // 2 points from each end of the
    # x series yields the same length as the smoothed y series.
    _smoothWindow = 7
    # Number of coefficients fitted by poly_func (a, b, c, d, e).
    _polyParamCount = 5

    def typeAnalyst(self):
        """Return the identifier of this analysis type."""
        return "yaw_error"

    def selectColumns(self):
        """Return the list of column names this analysis selects."""
        return [Field_DeviceCode, Field_Time, Field_WindSpeed,
                Field_ActiverPower, Field_AngleIncluded]

    def filterCommon(self, dataFrame: pd.DataFrame, conf: Contract):
        """
        Drop rows whose absolute included angle is >= 15.

        Rows where the angle is NaN are kept, because the comparison is
        False for NaN and the mask is then negated.

        Args:
            dataFrame: Input samples containing the Field_AngleIncluded column.
            conf: Analysis contract (not used by this filter).

        Returns:
            The filtered DataFrame.
        """
        outOfRange = dataFrame[Field_AngleIncluded].abs() >= self._angleLimit
        return dataFrame[~outOfRange]

    def calculateYawError(self, dataFrame: pd.DataFrame, fieldAngleInclude, fieldActivePower):
        """
        Aggregate power statistics per integer angle bin.

        Args:
            dataFrame: Samples of one turbine.
            fieldAngleInclude: Name of the angle column to bin on.
            fieldActivePower: Name of the active power column.

        Returns:
            A DataFrame sorted by the angle bin with one row per bin, holding
            the turbine name, mean/max/min/median power, the two sample
            counts, their ratios to the respective totals and the quotient
            of the two ratios ("slop"). Bins with a zero positive-power
            ratio are removed.
        """
        cleaned = dataFrame.dropna(
            subset=[Field_NameOfTurbine, fieldAngleInclude, fieldActivePower])

        # assign() builds a new frame, so the caller's data is never written
        # to and pandas has no chained-assignment ambiguity to warn about.
        cleaned = cleaned.assign(**{
            self.fieldWindDirFloor: np.floor(cleaned[fieldAngleInclude]).astype(int),
            self.fieldPower: cleaned[fieldActivePower].astype(float),
        })

        # NOTE(review): the column is named "power_gt_80p", but the count is
        # of samples above the bin's median power, not above an 80 %
        # threshold -- confirm which one is intended.
        grouped = cleaned.groupby(self.fieldWindDirFloor).agg({
            Field_NameOfTurbine: ['min'],
            self.fieldPower: [
                'mean', 'max', 'min', 'median',
                lambda x: (x > 0).sum(),
                lambda x: (x > x.median()).sum(),
            ],
        }).reset_index()

        # Flatten the two-level column index produced by the aggregation.
        grouped.columns = [
            self.fieldWindDirFloor, Field_NameOfTurbine,
            self.fieldPowerMean, self.fieldPowerMax,
            self.fieldPowerMin, self.fieldPowerMedian,
            self.fieldPowerGT0p, self.fieldPowerGT80p,
        ]

        # Share of each bin in the overall counts.
        grouped[self.fieldPowerRatio0p] = (
            grouped[self.fieldPowerGT0p] / grouped[self.fieldPowerGT0p].sum())
        grouped[self.fieldPowerRatio80p] = (
            grouped[self.fieldPowerGT80p] / grouped[self.fieldPowerGT80p].sum())

        # Keep only bins with a positive base ratio (this also avoids a
        # division by zero below); copy so the new column is written to an
        # independent frame.
        grouped = grouped[grouped[self.fieldPowerRatio0p] > 0].copy()
        grouped[self.fieldSlop] = (
            grouped[self.fieldPowerRatio80p] / grouped[self.fieldPowerRatio0p])

        return grouped.sort_values(self.fieldWindDirFloor)

    def poly_func(self, x, a, b, c, d, e):
        """Evaluate the 4th-degree polynomial a*x^4 + b*x^3 + c*x^2 + d*x + e."""
        return a * x**4 + b * x ** 3 + c * x ** 2 + d * x + e

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """
        Load the data of the given turbines and run the yaw error analysis.

        Args:
            outputAnalysisDir: Directory the result files are written to.
            conf: Analysis contract.
            turbineCodes: Turbine codes passed to processTurbineData.

        Returns:
            The result DataFrame produced by yawErrorAnalysis.
        """
        # processTurbineData / userDataFrame are not defined in this class;
        # presumably inherited from AnalystWithGoodPoint.
        dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns())
        dataFrameMerge = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)
        return self.yawErrorAnalysis(dataFrameMerge, outputAnalysisDir, conf)

    def _buildResultRow(self, conf: Contract, turbineCode, filePath, isSaveDatabase):
        """Build one row of the result table describing a generated file."""
        return {
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: turbineCode,
            Field_Return_FilePath: filePath,
            Field_Return_IsSaveDatabase: isSaveDatabase
        }

    def _buildFigure(self, df, xdata, fitted_ydata, peakAngle, peakValue, turbineName):
        """Build the two-row figure (slope + fit on top, ratios below)."""
        fig = make_subplots(rows=2, cols=1)

        # First subplot: raw slope points, fitted curve and its maximum.
        fig.add_trace(
            go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldSlop],
                       mode='markers', name="Energy Gain", marker=dict(size=5)),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(x=xdata, y=fitted_ydata, mode='lines',
                       name="Fit", line=dict(color='red')),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(x=[peakAngle], y=[peakValue],
                       mode='markers', name="Max Pos:" + str(peakAngle),
                       marker=dict(size=20)),
            row=1, col=1
        )

        # Second subplot: the two ratio series the slope is derived from.
        fig.add_trace(
            go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldPowerRatio0p],
                       mode='markers', name="Base Energy", marker=dict(size=5)),
            row=2, col=1
        )
        fig.add_trace(
            go.Scatter(x=df[self.fieldWindDirFloor], y=df[self.fieldPowerRatio80p],
                       mode='markers', name="Slope Energy", marker=dict(size=5)),
            row=2, col=1
        )

        fig.update_layout(
            title={
                "text": f"Yaw Error Analysis: {turbineName}",
                "x": 0.5
            },
            showlegend=True,
            margin=dict(t=50, b=10)  # t is the top margin, b the bottom margin
        )
        return fig

    def yawErrorAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, conf: Contract):
        """
        Estimate the yaw error of every turbine and write plots plus a CSV.

        For each (turbine name, turbine code) group a PNG and an HTML plot
        named after the turbine are written to outputAnalysisDir. A summary
        CSV "yaw_error_result<CSVSuffix>" with one row per analysed turbine
        is written at the end.

        Args:
            dataFrameMerge: Samples of all turbines; must contain the
                Field_ActiverPower and Field_YawError columns.
            outputAnalysisDir: Directory the output files are written to.
            conf: Analysis contract supplying the farm id and batch number.

        Returns:
            A DataFrame with one row per generated file.

        Raises:
            ValueError: If a required column is missing.
        """
        # Check that the required columns exist.
        required_columns = {Field_ActiverPower, Field_YawError}
        if not required_columns.issubset(dataFrameMerge.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        window = self._smoothWindow
        trim = window // 2

        results = []
        result_rows = []

        turbineGroups = dataFrameMerge.groupby(
            [Field_NameOfTurbine, Field_CodeOfTurbine])
        for (turbineName, turbineCode), group in turbineGroups:
            df = self.calculateYawError(
                group, Field_YawError, Field_ActiverPower).dropna()

            # Only bins that contain above-median samples take part in the fit.
            df_ = df[df[self.fieldPowerRatio80p] > 0.000]

            # Smooth the slope with a rolling median and drop the leading
            # NaN values of the incomplete windows. Trimming `trim` points
            # from both ends of x pairs every smoothed value with the angle
            # at the centre of its window. curve_fit pairs the two series
            # by position, so explicit positional slicing is used.
            ydata = df_[self.fieldSlop].rolling(window).median().iloc[window - 1:]
            xdata = df_[self.fieldWindDirFloor].iloc[trim:-trim]

            # curve_fit needs at least as many points as free parameters;
            # with fewer it raises, which would abort the whole batch.
            if len(xdata) < self._polyParamCount:
                continue

            # popt holds the optimised coefficients a, b, c, d, e.
            popt, _ = curve_fit(self.poly_func, xdata, ydata)
            fitted_ydata = self.poly_func(xdata, *popt)

            # Angle bin at which the fitted curve reaches its maximum.
            max_pos = fitted_ydata.idxmax()
            peakAngle = xdata.loc[max_pos]
            peakValue = fitted_ydata.loc[max_pos]

            fig = self._buildFigure(
                df, xdata, fitted_ydata, peakAngle, peakValue, turbineName)

            # Save to file
            filePathOfImage = os.path.join(outputAnalysisDir, f"{turbineName}.png")
            fig.write_image(filePathOfImage, scale=3)
            filePathOfHtml = os.path.join(outputAnalysisDir, f"{turbineName}.html")
            fig.write_html(filePathOfHtml)

            print(turbineName, "\t", peakAngle)
            results.append([turbineName, peakAngle])

            result_rows.append(
                self._buildResultRow(conf, turbineCode, filePathOfImage, False))
            result_rows.append(
                self._buildResultRow(conf, turbineCode, filePathOfHtml, False))

        # Summary table: one row per analysed turbine.
        dataFrameResult = pd.DataFrame(
            results, columns=[Field_NameOfTurbine, Field_YawError])
        filePathOfYawError = os.path.join(
            outputAnalysisDir, f"yaw_error_result{CSVSuffix}")
        dataFrameResult.to_csv(filePathOfYawError, index=False)

        result_rows.append(
            self._buildResultRow(conf, "total", filePathOfYawError, True))

        return pd.DataFrame(result_rows)
|