import os
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from behavior.analystWithGoodPoint import AnalystWithGoodPoint
from scipy.stats import binned_statistic_2d
from scipy.stats import skew, kurtosis


class YawErrorDensityAnalyst(AnalystWithGoodPoint):
    """
    Dynamic yaw strategy analysis for wind turbines.

    For every turbine, draws a scatter plot of yaw error against wind speed
    in which each point is coloured by the number of samples sharing its
    2-D histogram bin, and writes the plot as a PNG and an HTML file.

    NOTE(review): the ``Field_*`` names used below are not defined in this
    file; they presumably come from the wildcard import of
    ``algorithmContract.confBusiness`` -- confirm.
    """

    def typeAnalyst(self) -> str:
        """Return the identifier string of this analysis type."""
        return "yaw_error_density"

    def selectColumns(self) -> list:
        """Return the list of column constants passed to ``processTurbineData``."""
        return [Field_DeviceCode, Field_Time, Field_WindSpeed,
                Field_ActiverPower, Field_YawError]

    def calculateYawError(self, dataFrame: pd.DataFrame) -> pd.DataFrame:
        """
        Compute the per-sample point density of (yaw error, wind speed).

        Rows with missing turbine name, yaw error, active power or wind speed
        are dropped, then only rows with ``|yaw error| <= 30`` and
        ``0 <= wind speed <= 25`` are kept. The remaining samples are binned
        on a 60 x 50 grid and every sample is tagged with the sample count of
        the bin it falls into.

        :param dataFrame: input samples of one turbine.
        :return: DataFrame with columns ``x`` (yaw error), ``y`` (wind speed)
                 and ``density`` (count of samples in the sample's bin).
                 Empty when no row survives the filtering.
        """
        dataFrame = dataFrame.dropna(
            subset=[Field_NameOfTurbine, Field_YawError, Field_ActiverPower, Field_WindSpeed])

        in_range = (
            (dataFrame[Field_YawError].abs() <= 30)
            & (dataFrame[Field_WindSpeed] >= 0)
            & (dataFrame[Field_WindSpeed] <= 25)
        )
        filtered_dataFrame = dataFrame[in_range]

        x = filtered_dataFrame[Field_YawError]
        y = filtered_dataFrame[Field_WindSpeed]

        # Nothing to bin: the histogram call cannot derive a range from an
        # empty sample, so hand back an empty result instead of crashing.
        if x.empty:
            return pd.DataFrame({'x': x, 'y': y, 'density': np.zeros(0)})

        # Use binned_statistic_2d to count the samples in each 2-D bin.
        binSize_x = 60
        binSize_y = 50
        counts, x_edges, y_edges, _ = binned_statistic_2d(
            x, y, values=None, statistic='count', bins=[binSize_x, binSize_y])

        # Map every sample back to the bin it was counted in.
        # np.digitize places a value equal to the right-most edge (i.e. the
        # data maximum) one past the last bin, whereas binned_statistic_2d
        # counts it in the last bin. Clipping keeps both in agreement so the
        # extreme samples get their real density instead of 0.
        binX = np.clip(np.digitize(x, x_edges) - 1, 0, binSize_x - 1)
        binY = np.clip(np.digitize(y, y_edges) - 1, 0, binSize_y - 1)
        density = counts[binX, binY]

        return pd.DataFrame({
            'x': x,
            'y': y,
            'density': density
        })

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """
        Load the data of the given turbines and run the yaw error analysis.

        :param outputAnalysisDir: directory the plots are written to.
        :param conf: analysis contract/configuration.
        :param turbineCodes: turbines to analyse.
        :return: the DataFrame returned by ``yawErrorAnalysis``.
        """
        # processTurbineData / userDataFrame are not defined in this class
        # (presumably inherited from AnalystWithGoodPoint).
        dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns())
        dataFrameMerge = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self)
        return self.yawErrorAnalysis(dataFrameMerge, outputAnalysisDir, conf)

    def yawErrorAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, conf: Contract) -> pd.DataFrame:
        """
        Plot the yaw-error density scatter of every turbine and list the files.

        For each (turbine name, turbine code) group the density frame is
        computed, summary statistics of the yaw error are printed, and the
        scatter plot is saved as ``<name>.png`` and ``<name>.html`` inside
        ``outputAnalysisDir``. Turbines without any valid sample are skipped.

        :param dataFrameMerge: merged samples of all turbines.
        :param outputAnalysisDir: directory the plots are written to.
        :param conf: analysis contract; the power farm id and batch number
                     are read from ``conf.dataContract.dataFilter``.
        :return: DataFrame with one row per written file (PNG and HTML).
        :raises ValueError: if the power, yaw error or wind speed column is
                            missing from ``dataFrameMerge``.
        """
        # Check that the required columns are present.
        required_columns = {Field_ActiverPower, Field_YawError, Field_WindSpeed}
        if not required_columns.issubset(dataFrameMerge.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        result_rows = []
        grouped = dataFrameMerge.groupby(
            [Field_NameOfTurbine, Field_CodeOfTurbine])

        # Fixed list of colours used for the density colour map.
        fixed_colors = [
            "#3E409C", "#476CB9", "#3586BF", "#4FA4B5", "#52A3AE",
            "#60C5A3", "#85D0AE", "#A8DCA2", "#CFEE9E", "#E4F39E",
            "#EEF9A7", "#FBFFBE", "#FDF1A9", "#FFE286", "#FFC475",
            "#FCB06C", "#F78F4F", "#F96F4A", "#E4574C", "#CA3756", "#AF254F"
        ]

        # Convert fixed_colors into Plotly's colorscale format:
        # evenly spaced positions in [0, 1] paired with a colour.
        fixed_colorscale = [
            [i / (len(fixed_colors) - 1), color] for i, color in enumerate(fixed_colors)
        ]

        for name, group in grouped:
            df = self.calculateYawError(group)
            df.dropna(inplace=True)
            df = df[df["density"] > 0]

            # A turbine without valid samples cannot be summarised or
            # plotted; skip it rather than abort the whole batch on a
            # zero-size reduction.
            if df.empty:
                print(f'{name[0]}: no valid samples after filtering, skipped')
                continue

            # Summary statistics of the yaw error (x) for diagnostics.
            result = pd.DataFrame({
                'mean_X': [np.mean(df["x"])],
                'std_X': [np.std(df["x"])],
                'mediann_X': [np.median(df["x"])],
                'skewness_X': [skew(df["x"])],
                'kurtosis_X': [kurtosis(df["x"])],
                'max_X': [np.max(df["x"])],
                'min_X': [np.min(df["x"])]
            })
            print(f'{name[0]}指标:', result)

            # Scatter plot coloured by density.
            fig = go.Figure()
            fig.add_trace(go.Scattergl(
                x=df["x"],
                y=df["y"],
                mode='markers',
                marker=dict(
                    size=3,
                    opacity=0.7,
                    color=df["density"],
                    colorscale=fixed_colorscale,
                    showscale=True,
                )
            ))
            fig.update_layout(
                xaxis_title='对风角度',
                yaxis_title='风速',
                title=f'动态偏航误差分析-{name[0]}',
                xaxis=dict(range=[-20, 20]),  # limit the x axis to -20 .. 20
                yaxis=dict(range=[0, 25])
            )

            # Save to file.
            filePathOfImage = os.path.join(outputAnalysisDir, f"{name[0]}.png")
            fig.write_image(filePathOfImage, scale=3)
            filePathOfHtml = os.path.join(outputAnalysisDir, f"{name[0]}.html")
            fig.write_html(filePathOfHtml)

            # One result row per written file; the HTML row is flagged to be
            # saved to the database, the PNG row is not.
            for filePath, isSaveDatabase in ((filePathOfImage, False), (filePathOfHtml, True)):
                result_rows.append({
                    Field_Return_TypeAnalyst: self.typeAnalyst(),
                    Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
                    Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                    Field_CodeOfTurbine: name[1],
                    Field_Return_FilePath: filePath,
                    Field_Return_IsSaveDatabase: isSaveDatabase
                })

        result_df = pd.DataFrame(result_rows)
        return result_df