123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- import os
- import pandas as pd
- import numpy as np
- import plotly.graph_objects as go
- from algorithmContract.confBusiness import *
- from algorithmContract.contract import Contract
- from behavior.analystWithGoodBadLimitPoint import AnalystWithGoodBadLimitPoint
- from scipy.stats import binned_statistic_2d
- from scipy.stats import skew, kurtosis
- from utils.jsonUtil import JsonUtil
- from scipy.stats import norm, gaussian_kde
class YawErrorDensityAnalyst(AnalystWithGoodBadLimitPoint):
    """
    Dynamic yaw strategy analysis for wind turbines.

    For every turbine the analyst produces a density-coloured scatter of
    yaw error (x) against wind speed (y), plus a kernel-density estimate of
    the yaw-error distribution, and writes them as a PNG and two JSON files.
    """

    # Palette used to colour scatter points by bin count, first entry for the
    # lowest density and last entry for the highest.
    _YAW_DENSITY_PALETTE = (
        "#3E409C",
        "#476CB9",
        "#3586BF",
        "#4FA4B5",
        "#52A3AE",
        "#60C5A3",
        "#85D0AE",
        "#A8DCA2",
        "#CFEE9E",
        "#E4F39E",
        "#EEF9A7",
        "#FBFFBE",
        "#FDF1A9",
        "#FFE286",
        "#FFC475",
        "#FCB06C",
        "#F78F4F",
        "#F96F4A",
        "#E4574C",
        "#CA3756",
        "#AF254F",
    )

    def typeAnalyst(self):
        """Return the identifier of this analysis type."""
        return "yaw_error_density"

    def selectColumns(self):
        """Return the list of data columns this analysis reads."""
        return [Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower, Field_YawError]

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """
        Load the data of the requested turbines and run the yaw error analysis.

        The loading/merging is delegated to ``processTurbineData``,
        ``userDataFrame`` and ``common.getTurbineInfos``, which are defined
        outside this class.

        Args:
            outputAnalysisDir: Directory the result files are written to.
            conf: Analysis contract (data filter and analysis configuration).
            turbineCodes: Codes of the turbines to analyse.

        Returns:
            The DataFrame produced by :meth:`yawErrorAnalysis`.
        """
        dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns())
        dataFrameMerge = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self)
        turbineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
        return self.yawErrorAnalysis(dataFrameMerge, turbineInfos, outputAnalysisDir, conf)

    def yawErrorAnalysis(self, dataFrameMerge: pd.DataFrame, turbineModelInfo: pd.Series, outputAnalysisDir, conf: Contract):
        """
        Produce per-turbine yaw error density plots and JSON payloads.

        For each (turbine name, turbine code) group three files are written
        to ``outputAnalysisDir``: ``<name>.png`` (scatter image),
        ``<name>.json`` (scatter data) and ``PDF-<name>.json`` (kernel
        density estimate of the yaw error evaluated on [-30, 30]).

        Turbines that have fewer than two distinct yaw error samples after
        filtering are skipped, because a kernel density estimate cannot be
        computed for them.

        Args:
            dataFrameMerge: Merged data of all turbines; must contain the
                active power, yaw error and wind speed columns as well as
                the turbine name/code columns used for grouping.
            turbineModelInfo: Turbine model information; the mill type code
                and machine type code are read from it via ``.get``.
            outputAnalysisDir: Directory the result files are written to.
            conf: Analysis contract; the power farm id and batch number are
                copied into every result row.

        Returns:
            A DataFrame with one row per written file.

        Raises:
            ValueError: If a required column is missing from ``dataFrameMerge``.
        """
        # Make sure the required columns are present.
        required_columns = {Field_ActiverPower, Field_YawError, Field_WindSpeed}
        if not required_columns.issubset(dataFrameMerge.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        # Convert the fixed palette into Plotly's colorscale format:
        # a list of [position in 0..1, colour] pairs.
        palette = self._YAW_DENSITY_PALETTE
        fixed_colorscale = [
            [i / (len(palette) - 1), color] for i, color in enumerate(palette)
        ]

        # The model information does not depend on the turbine, so resolve it
        # once. ``.get`` may hand back a Series; take its first value then.
        engineTypeCode = self._yawDensityScalar(turbineModelInfo.get(Field_MillTypeCode, ""))
        engineTypeName = self._yawDensityScalar(turbineModelInfo.get(Field_MachineTypeCode, ""))

        result_rows = []
        grouped = dataFrameMerge.groupby([Field_NameOfTurbine, Field_CodeOfTurbine])

        for (turbineName, turbineCode), group in grouped:
            df = self.calculateYawError(group).dropna()
            df = df[df["density"] > 0]

            # gaussian_kde needs at least two distinct samples; skip turbines
            # that cannot provide them instead of aborting the whole batch.
            if len(df) < 2 or np.min(df["x"]) == np.max(df["x"]):
                continue

            scatterTitle = f'动态偏航误差分析-{turbineName}'

            # Scatter plot coloured by density.
            fig = go.Figure()
            fig.add_trace(go.Scattergl(
                x=df["x"],
                y=df["y"],
                mode='markers',
                marker=dict(
                    size=3,
                    opacity=0.7,
                    color=df["density"],
                    colorscale=fixed_colorscale,
                    showscale=True,
                )
            ))
            fig.update_layout(
                xaxis_title='对风角度',
                yaxis_title='风速',
                title=scatterTitle,
                xaxis=dict(range=[-20, 20]),  # displayed x range is limited to -20..20
                yaxis=dict(range=[0, 25])
            )

            # JSON payload of the scatter data.
            json_output = {
                "analysisTypeCode": "动态偏航误差",
                "engineCode": engineTypeCode,
                "engineTypeName": engineTypeName,
                "xaixs": "对风角度(°)",
                "yaixs": "风速(m/s)",
                "data": [{
                    "engineName": turbineName,
                    "engineCode": turbineCode,
                    "title": scatterTitle,
                    "xData": df["x"].tolist(),
                    "yData": df["y"].tolist(),
                    "colorbar": df["density"].tolist(),
                    "colorbartitle": "密度"
                }]
            }

            # Estimate the probability density function of the yaw error.
            kde = gaussian_kde(df["x"])
            x = np.linspace(-30, 30, 1000)
            pdf_data = kde(x)

            # JSON payload of the probability density function.
            json_output2 = {
                "analysisTypeCode": "动态偏航误差",
                "engineCode": engineTypeCode,
                "engineTypeName": engineTypeName,
                "xaixs": "对风角度(°)",
                "yaixs": "概率密度函数",
                "data": [{
                    "engineName": turbineName,
                    "engineCode": turbineCode,
                    "title": f'概率密度函数-{turbineName}',
                    "xData": x.tolist(),
                    "yData": pdf_data.tolist(),
                    "xrange": [-30, 30]
                }]
            }

            # Write the image and both JSON files.
            filePathOfImage = os.path.join(outputAnalysisDir, f"{turbineName}.png")
            fig.write_image(filePathOfImage, scale=3)

            output_json_path = os.path.join(outputAnalysisDir, f"{turbineName}.json")
            self._yawDensityWriteJson(output_json_path, json_output)

            output_json_path2 = os.path.join(outputAnalysisDir, f"PDF-{turbineName}.json")
            self._yawDensityWriteJson(output_json_path2, json_output2)

            # One result row per file; only the JSON files are flagged for
            # saving to the database.
            result_rows.append(self._yawDensityResultRow(conf, turbineCode, output_json_path, True))
            result_rows.append(self._yawDensityResultRow(conf, turbineCode, output_json_path2, True))
            result_rows.append(self._yawDensityResultRow(conf, turbineCode, filePathOfImage, False))

        return pd.DataFrame(result_rows)

    @staticmethod
    def _yawDensityScalar(value):
        """Return the first element when ``value`` is a Series, else ``value`` itself."""
        if isinstance(value, pd.Series):
            return value.iloc[0]
        return value

    @staticmethod
    def _yawDensityWriteJson(filePath, payload):
        """Serialise ``payload`` to ``filePath`` as UTF-8 JSON, keeping non-ASCII text readable."""
        import json  # function-scope import: json is not imported at file level

        with open(filePath, 'w', encoding='utf-8') as f:
            json.dump(payload, f, ensure_ascii=False, indent=4)

    def _yawDensityResultRow(self, conf, turbineCode, filePath, saveToDatabase):
        """Build the result row describing one written file."""
        return {
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: turbineCode,
            Field_Return_FilePath: filePath,
            Field_Return_IsSaveDatabase: saveToDatabase
        }

    def calculateYawError(self, dataFrame: pd.DataFrame):
        """
        Compute the 2-D point density of yaw error versus wind speed.

        Rows with missing turbine name, yaw error, active power or wind speed
        are dropped. Only samples with ``|yaw error| <= 30`` and wind speed
        in ``[0, 25]`` are kept. The kept samples are binned on a 60 x 50
        grid and every sample is assigned the count of the bin it falls in.

        Args:
            dataFrame: Data of a single turbine.

        Returns:
            A DataFrame with columns ``x`` (yaw error), ``y`` (wind speed)
            and ``density`` (number of samples in the same bin). It is empty
            when no sample survives the filtering.
        """
        dataFrame = dataFrame.dropna(
            subset=[Field_NameOfTurbine, Field_YawError, Field_ActiverPower, Field_WindSpeed])
        inRange = (
            (dataFrame[Field_YawError].abs() <= 30)
            & (dataFrame[Field_WindSpeed] >= 0)
            & (dataFrame[Field_WindSpeed] <= 25)
        )
        filtered_dataFrame = dataFrame[inRange]
        x = filtered_dataFrame[Field_YawError]
        y = filtered_dataFrame[Field_WindSpeed]

        # Binning an empty sample raises; hand back an empty result instead.
        if x.empty:
            return pd.DataFrame({'x': x, 'y': y, 'density': np.zeros(0)})

        # Count the samples per bin.
        binSize_x = 60
        binSize_y = 50
        counts, x_edges, y_edges, _ = binned_statistic_2d(
            x, y, values=None, statistic='count', bins=[binSize_x, binSize_y])

        # Map every sample back to its bin. Samples lying exactly on the
        # right-most edge are counted in the last bin by binned_statistic_2d,
        # while np.digitize places them one past it; clip so they pick up the
        # count of the bin they were counted in.
        binX = np.clip(np.digitize(x, x_edges) - 1, 0, binSize_x - 1)
        binY = np.clip(np.digitize(y, y_edges) - 1, 0, binSize_y - 1)
        density = counts[binX, binY]

        return pd.DataFrame({
            'x': x,
            'y': y,
            'density': density
        })
|