import json
import os

import pandas as pd
import math
import numpy as np
from plotly.subplots import make_subplots
import plotly.express as px
import plotly.graph_objects as go
import seaborn as sns
from matplotlib.ticker import MultipleLocator

from behavior.analystWithGoodPoint import AnalystWithGoodPoint
from utils.directoryUtil import DirectoryUtil as dir
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract


class TSRAnalyst(AnalystWithGoodPoint):
    """
    Tip-speed-ratio (TSR) analysis for wind turbines.

    Aggregates per-turbine TSR statistics into 10 kW active-power bins and
    emits, per turbine model, one farm-wide JSON dataset plus one JSON
    dataset per turbine for downstream plotting.
    """

    def typeAnalyst(self):
        # Identifier used by the framework to route results of this analysis.
        return "tsr"

    @staticmethod
    def _scalar(value):
        """Collapse a possibly Series-valued lookup to its first element."""
        if isinstance(value, pd.Series):
            return value.iloc[0]
        return value

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """
        Run the TSR analysis for the given turbines, grouped by turbine model.

        Parameters:
        - outputAnalysisDir: directory where result JSON files are written.
        - conf: analysis contract carrying the data filter and configuration.
        - turbineCodes: codes of the turbines to analyse.

        Returns:
        - DataFrame describing the generated output files (one row per file).

        Raises:
        - ValueError: when the prepared frame lacks a required column.
        """
        dictionary = self.processTurbineData(
            turbineCodes, conf,
            [Field_DeviceCode, Field_Time, Field_WindSpeed,
             Field_ActiverPower, Field_RotorSpeed, Field_GeneratorSpeed])
        dataFrameOfTurbines = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        # Fail fast when the prepared frame is missing any required column.
        # NOTE(review): Field_PowerFloor is required here although it is also
        # recomputed below — presumably userDataFrame provides it (and
        # Field_TSR, which the aggregation relies on); confirm upstream.
        required_columns = {Field_WindSpeed, Field_RotorSpeed,
                            Field_PowerFloor, Field_GeneratorSpeed}
        if not required_columns.issubset(dataFrameOfTurbines.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        turbrineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes,
            self.turbineInfo)
        groupedOfTurbineModel = turbrineInfos.groupby(Field_MillTypeCode)

        returnDatas = []
        for turbineModelCode, group in groupedOfTurbineModel:
            currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist()
            currTurbineModeInfo = self.common.getTurbineModelByCode(
                turbineModelCode, self.turbineModelInfo)
            currDataFrameOfTurbines = dataFrameOfTurbines[
                dataFrameOfTurbines[Field_CodeOfTurbine].isin(currTurbineCodes)]

            # Work on a copy so the shared frame is not mutated across models.
            dataFrameMerge = currDataFrameOfTurbines.copy()
            dataFrameMerge[Field_PowerFarmName] = self.currPowerFarmInfo.loc[
                Field_PowerFarmName]

            # Bucket active power into 10 kW bins ("power floor").
            dataFrameMerge[Field_PowerFloor] = (
                dataFrameMerge[Field_ActiverPower] / 10).astype(int) * 10

            # Ensure the necessary columns are of float type.
            dataFrameMerge[Field_WindSpeed] = dataFrameMerge[Field_WindSpeed].astype(float)
            dataFrameMerge[Field_RotorSpeed] = dataFrameMerge[Field_RotorSpeed].astype(float)
            dataFrameMerge[Field_GeneratorSpeed] = dataFrameMerge[Field_GeneratorSpeed].astype(float)

            # Per power bin and turbine: mean wind speed, median shaft speeds,
            # and mean/max/min TSR.
            grouped = dataFrameMerge.groupby(
                [Field_PowerFloor, Field_CodeOfTurbine, Field_NameOfTurbine]).agg({
                    Field_WindSpeed: 'mean',
                    Field_RotorSpeed: 'median',
                    Field_GeneratorSpeed: 'median',
                    Field_TSR: ['mean', 'max', 'min'],
                    Field_PowerFarmName: 'max',
                }).reset_index()
            # Flatten the MultiIndex created by the multi-stat TSR aggregation.
            grouped.columns = [Field_PowerFloor, Field_CodeOfTurbine,
                               Field_NameOfTurbine, Field_WindSpeed,
                               Field_RotorSpeed, Field_GeneratorSpeed,
                               Field_TSR, Field_TSRMax, Field_TSRMin,
                               Field_PowerFarmName]
            grouped = grouped.sort_values(
                by=[Field_CodeOfTurbine, Field_PowerFloor])

            returnDatas.append(self.plot_tsr_distribution(
                grouped, outputAnalysisDir, conf, currTurbineModeInfo))

        return pd.concat(returnDatas, ignore_index=True)

    def plot_tsr_distribution(self, dataFrameMerge: pd.DataFrame,
                              outputAnalysisDir, conf: Contract,
                              turbineModelInfo: pd.Series):
        """
        Generate TSR-vs-power distribution datasets for one turbine model.

        Writes one JSON file for the whole model (all turbines overlaid) and
        one JSON file per turbine, each containing the plotted x/y series.
        Plotly figures are assembled for the same data but are currently not
        written to disk (image/HTML export is disabled).

        Parameters:
        - dataFrameMerge: aggregated frame, one row per power bin per turbine.
        - outputAnalysisDir: directory where the JSON files are written.
        - conf: analysis contract carrying farm ID and batch number.
        - turbineModelInfo: metadata row of the current turbine model.

        Returns:
        - DataFrame with one row per written file (path and routing metadata).
        """
        x_name = Field_PowerFloor
        y_name = Field_TSR

        # Model-level metadata is loop-invariant; collapse possible
        # Series-valued lookups to scalars once, up front.
        engineTypeCode = self._scalar(turbineModelInfo.get(Field_MillTypeCode, ""))
        engineTypeName = self._scalar(turbineModelInfo.get(Field_MachineTypeCode, ""))

        # ---- farm-wide figure: one line per turbine ---------------------
        turbine_data_list = []
        fig = go.Figure()
        for turbine in dataFrameMerge[Field_NameOfTurbine].unique():
            turbine_data = dataFrameMerge[
                dataFrameMerge[Field_NameOfTurbine] == turbine]
            fig.add_trace(go.Scatter(x=turbine_data[x_name],
                                     y=turbine_data[y_name],
                                     mode='lines',
                                     name=turbine))
            turbine_data_list.append({
                "engineName": turbine,
                "engineCode": turbine_data[Field_CodeOfTurbine].iloc[0],
                "xData": turbine_data[x_name].tolist(),
                "yData": turbine_data[y_name].tolist(),
            })

        fig.update_layout(
            title={
                "text": f'叶尖速比分布-{turbineModelInfo[Field_MachineTypeCode]}',
                'x': 0.5
            },
            xaxis=dict(
                title='最小功率',
                dtick=200,
                tickangle=-45,
                range=[0, 1800]),
            yaxis=dict(
                title='叶尖速比',
                dtick=self.axisStepTSR,
                range=[self.axisLowerLimitTSR, self.axisUpperLimitTSR]
            ),
            legend=dict(
                orientation="h",   # horizontal legend centered below the plot
                xanchor="center",
                x=0.5,
                y=-0.2,
            )
        )
        fig.update_xaxes(tickangle=-45)

        # NOTE(review): JSON keys "xaixs"/"yaixs" are misspelled but kept as-is
        # because downstream consumers read these exact keys.
        json_output = {
            "analysisTypeCode": "风电机组叶尖速比分析",
            "typecode": turbineModelInfo[Field_MillTypeCode],
            "engineCode": engineTypeCode,
            "engineTypeName": engineTypeName,
            "title": f'叶尖速比分布-{turbineModelInfo[Field_MachineTypeCode]}',
            "xaixs": "最小功率(kW)",
            "yaixs": "叶尖速比",
            "data": turbine_data_list
        }

        result_rows = []
        output_json_path = os.path.join(
            outputAnalysisDir, f"{turbineModelInfo[Field_MillTypeCode]}.json")
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(json_output, f, ensure_ascii=False, indent=4)
        result_rows.append({
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: 'total',
            Field_MillTypeCode: turbineModelInfo[Field_MillTypeCode],
            Field_Return_FilePath: output_json_path,
            Field_Return_IsSaveDatabase: True
        })

        # ---- one figure/JSON per turbine --------------------------------
        for name, group in dataFrameMerge.groupby(
                [Field_NameOfTurbine, Field_CodeOfTurbine]):
            fig = go.Figure()
            turbine_data_list_each = []
            # Grey background lines for every turbine give context for the
            # highlighted one.
            for turbine in dataFrameMerge[Field_NameOfTurbine].unique():
                turbine_data = dataFrameMerge[
                    dataFrameMerge[Field_NameOfTurbine] == turbine]
                fig.add_trace(go.Scatter(x=turbine_data[x_name],
                                         y=turbine_data[y_name],
                                         mode='lines',
                                         line=dict(color='lightgrey'),
                                         showlegend=False))
                turbine_data_list_each.append({
                    "engineName": turbine,
                    "engineCode": turbine_data[Field_CodeOfTurbine].iloc[0],
                    "xData": turbine_data[x_name].tolist(),
                    "yData": turbine_data[y_name].tolist(),
                })
            # Current turbine drawn last, on top, in dark blue.
            fig.add_trace(go.Scatter(x=group[x_name], y=group[y_name],
                                     mode='lines',
                                     line=dict(color='darkblue'),
                                     showlegend=False))

            fig.update_layout(
                title={"text": '机组: {}'.format(name[0])},
                xaxis=dict(
                    title='功率',
                    dtick=200,
                    tickangle=-45,
                    range=[0, 1800]),
                yaxis=dict(
                    title='叶尖速比',
                    dtick=self.axisStepTSR,
                    range=[self.axisLowerLimitTSR, self.axisUpperLimitTSR]
                )
            )
            fig.update_xaxes(tickangle=-45)

            json_output = {
                "analysisTypeCode": "风电机组叶尖速比分析",
                "typecode": turbineModelInfo[Field_MillTypeCode],
                "engineCode": engineTypeCode,
                "engineTypeName": engineTypeName,
                "title": f'机组:{format(name[0])}',
                "xaixs": "功率(kW)",
                "yaixs": "叶尖速比",
                "data": turbine_data_list_each
            }

            output_json_path_each = os.path.join(
                outputAnalysisDir, f"{name[0]}.json")
            with open(output_json_path_each, 'w', encoding='utf-8') as f:
                json.dump(json_output, f, ensure_ascii=False, indent=4)
            result_rows.append({
                Field_Return_TypeAnalyst: self.typeAnalyst(),
                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                Field_CodeOfTurbine: name[1],
                Field_Return_FilePath: output_json_path_each,
                Field_Return_IsSaveDatabase: True
            })

        return pd.DataFrame(result_rows)