import json
import os

import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from behavior.analystWithGoodPoint import AnalystWithGoodPoint


def _scalarOf(value):
    """Return the first element if *value* is a pandas Series, else *value* itself."""
    if isinstance(value, pd.Series):
        return value.iloc[0]
    return value


def _writeJson(payload, filePath):
    """Serialize *payload* to *filePath* as UTF-8 JSON, keeping non-ASCII text readable."""
    with open(filePath, 'w', encoding='utf-8') as f:
        json.dump(payload, f, ensure_ascii=False, indent=4)


def _contractCurveTrace(dataFrameOfContractPowerCurve):
    """Build the red line+marker trace used for the contract Cp curve."""
    return go.Scatter(
        x=dataFrameOfContractPowerCurve[Field_PowerFloor],
        y=dataFrameOfContractPowerCurve[Field_Cp],
        mode='lines+markers',
        line=dict(color='red'),
        marker=dict(color='red', size=5),
        name='合同功率曲线',
    )


class CpAnalyst(AnalystWithGoodPoint):
    """Power-coefficient (Cp) analysis for wind turbines.

    For every turbine model the analysis writes one summary JSON file, and
    for every turbine one JSON file plus one PNG chart, each showing the
    median Cp per power bin next to the contract Cp curve.
    """

    def typeAnalyst(self):
        """Return the identifier of this analysis type."""
        return "cp"

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """Run the Cp analysis for the given turbines, one turbine model at a time.

        Args:
            outputAnalysisDir: Directory that receives the JSON and PNG files.
            conf: Analysis contract; the power-farm id, batch number and
                analysis configuration are read from ``conf.dataContract``.
            turbineCodes: Codes of the turbines to analyse.

        Returns:
            A DataFrame with one row per generated file, concatenated over
            all turbine models.

        Raises:
            ValueError: If the prepared data lacks the wind-speed, Cp or
                power-floor column. ``pd.concat`` also raises ValueError
                when no turbine model was found.
        """
        dictionary = self.processTurbineData(
            turbineCodes, conf,
            [Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower])
        dataFrameOfTurbines = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        # Fail early when a required column is missing.
        required_columns = {Field_WindSpeed, Field_Cp, Field_PowerFloor}
        if not required_columns.issubset(dataFrameOfTurbines.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        turbineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)

        returnDatas = []
        for turbineModelCode, group in turbineInfos.groupby(Field_MillTypeCode):
            currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist()
            currTurbineModeInfo = self.common.getTurbineModelByCode(
                turbineModelCode, self.turbineModelInfo)
            # Restrict the contract curve and the measurements to this model.
            contractRows = self.dataFrameContractOfTurbine[Field_MillTypeCode] == turbineModelCode
            dataFrameOfContractPowerCurve = self.dataFrameContractOfTurbine[contractRows]
            currDataFrameOfTurbines = dataFrameOfTurbines[
                dataFrameOfTurbines[Field_CodeOfTurbine].isin(currTurbineCodes)]
            returnDatas.append(self.drawLineGraphForTurbine(
                currDataFrameOfTurbines, outputAnalysisDir, conf,
                currTurbineModeInfo, dataFrameOfContractPowerCurve))

        return pd.concat(returnDatas, ignore_index=True)

    def drawLineGraphForTurbine(self, dataFrameOfTurbines: pd.DataFrame, outputAnalysisDir, conf: Contract, turbineModelInfo: pd.Series, dataFrameOfContractPowerCurve: pd.DataFrame):
        """Write the Cp curves of one turbine model as JSON files and PNG charts.

        Args:
            dataFrameOfTurbines: Measurements of the turbines of one model;
                must contain the turbine-code, power-floor and Cp columns.
            outputAnalysisDir: Directory that receives the output files.
            conf: Analysis contract (power-farm id and batch number are read).
            turbineModelInfo: Description of the turbine model.
            dataFrameOfContractPowerCurve: Contract curve of the model with
                power-floor and Cp columns.

        Returns:
            A DataFrame with one row per written file: the model-level JSON
            (turbine code ``'total'``), then a PNG row and a JSON row for
            each turbine.
        """
        powerFarmID = conf.dataContract.dataFilter.powerFarmID
        upLimitOfPower = self.turbineInfo[Field_RatedPower].max() * 0.9

        # Median/max/min Cp per turbine and power bin. The source column is
        # Field_Cp, the same column turbinesAnalysis validates.
        grouped = dataFrameOfTurbines.groupby([Field_CodeOfTurbine, Field_PowerFloor]).agg(
            cp=(Field_Cp, 'median'),
            cp_max=(Field_Cp, 'max'),
            cp_min=(Field_Cp, 'min'),
        ).reset_index()
        grouped.columns = [Field_CodeOfTurbine, Field_PowerFloor,
                           Field_CpMedian, 'cp_max', 'cp_min']
        grouped = grouped.sort_values(by=[Field_CodeOfTurbine, Field_PowerFloor])

        # Values shared by every chart and JSON file of this model.
        xAxis = dict(range=[0, upLimitOfPower], tickangle=-45)
        yAxis = dict(dtick=self.axisStepCp,
                     range=[self.axisLowerLimitCp, self.axisUpperLimitCp])
        millTypeCode = turbineModelInfo[Field_MillTypeCode]
        engineTypeCode = _scalarOf(turbineModelInfo.get(Field_MillTypeCode, ""))
        engineTypeName = _scalarOf(turbineModelInfo.get(Field_MachineTypeCode, ""))
        contractX = dataFrameOfContractPowerCurve[Field_PowerFloor].tolist()
        contractY = dataFrameOfContractPowerCurve[Field_Cp].tolist()

        def buildPayload(title, data):
            # The key spellings ("xaixs", "yaixs") are part of the output format.
            return {
                "analysisTypeCode": "风电机组风能利用系数分析",
                "typecode": millTypeCode,
                "engineCode": engineTypeCode,
                "engineTypeName": engineTypeName,
                "title": title,
                "xaixs": "功率(kW)",
                "yaixs": "风能利用系数",
                "contract_Cp_curve_xData": contractX,
                "contract_Cp_curve_yData": contractY,
                "data": data,
            }

        def resultRow(turbineCode, filePath, isSaveDatabase, **extra):
            # `extra` is placed before the file path so the column order of
            # the returned DataFrame stays stable.
            return {
                Field_Return_TypeAnalyst: self.typeAnalyst(),
                Field_PowerFarmCode: powerFarmID,
                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                Field_CodeOfTurbine: turbineCode,
                **extra,
                Field_Return_FilePath: filePath,
                Field_Return_IsSaveDatabase: isSaveDatabase,
            }

        # Look up each turbine once; the rows, info and JSON entry are reused
        # by the model-level output and by every per-turbine output.
        perTurbine = []
        for turbineCode, turbineRows in grouped.groupby(Field_CodeOfTurbine):
            turbineInfo = self.common.getTurbineInfo(
                powerFarmID, turbineCode, self.turbineInfo)
            entry = {
                "engineName": turbineInfo[Field_NameOfTurbine],
                "engineCode": turbineCode,
                "xData": turbineRows[Field_PowerFloor].tolist(),
                "yData": turbineRows[Field_CpMedian].tolist(),
            }
            perTurbine.append((turbineCode, turbineInfo, turbineRows, entry))

        # ---- Model-level output: all turbines plus the contract curve ----
        modelTitle = f'风能利用系数分布-{turbineModelInfo[Field_MachineTypeCode]}'
        # NOTE: this figure is assembled but not exported; only the JSON file
        # below is written for the model-level view.
        fig = make_subplots(specs=[[{"secondary_y": False}]])
        for _, turbineInfo, turbineRows, _ in perTurbine:
            fig.add_trace(
                go.Scatter(x=turbineRows[Field_PowerFloor],
                           y=turbineRows[Field_CpMedian],
                           mode='lines',
                           name=turbineInfo[Field_NameOfTurbine])
            )
        fig.add_trace(_contractCurveTrace(dataFrameOfContractPowerCurve),
                      secondary_y=False)
        fig.update_layout(
            title={'text': modelTitle, 'x': 0.5},  # centred title
            xaxis_title='功率',
            yaxis_title='风能利用系数',
            xaxis=dict(xAxis),
            yaxis=dict(yAxis),
            # Horizontal legend centred below the x-axis.
            legend=dict(orientation="h", xanchor="center", x=0.5, y=-0.2),
        )

        output_json_path = os.path.join(outputAnalysisDir, f"{millTypeCode}.json")
        _writeJson(buildPayload(modelTitle, [entry for _, _, _, entry in perTurbine]),
                   output_json_path)

        result_rows = [resultRow('total', output_json_path, True,
                                 **{Field_MillTypeCode: millTypeCode})]

        # ---- Per-turbine output: one turbine highlighted among the others ----
        for index, (turbineCode, currTurbineInfo, turbineRows, entry) in enumerate(perTurbine):
            turbineName = currTurbineInfo[Field_NameOfTurbine]
            fig = go.Figure()
            turbine_data_list_each = []

            # The other turbines form a light-gray background without legend entries.
            for otherIndex, (_, _, otherRows, otherEntry) in enumerate(perTurbine):
                if otherIndex == index:
                    continue
                fig.add_trace(
                    go.Scatter(x=otherRows[Field_PowerFloor],
                               y=otherRows[Field_CpMedian],
                               mode='lines',
                               line=dict(color='lightgray', width=1),
                               showlegend=False)
                )
                turbine_data_list_each.append(otherEntry)

            # The current turbine is drawn last and is the last data entry.
            fig.add_trace(
                go.Scatter(x=turbineRows[Field_PowerFloor],
                           y=turbineRows[Field_CpMedian],
                           mode='lines',
                           name=turbineName,
                           line=dict(color='darkblue'))
            )
            turbine_data_list_each.append(
                {**entry, "engineCode": currTurbineInfo[Field_CodeOfTurbine]})

            fig.add_trace(_contractCurveTrace(dataFrameOfContractPowerCurve))

            turbineTitle = f'机组: {turbineName}'
            fig.update_layout(
                title={'text': turbineTitle},
                xaxis_title='功率',
                yaxis_title='风能利用系数',
                xaxis=dict(xAxis),
                yaxis=dict(yAxis),
                legend=dict(x=1.05, y=0.5),
            )

            output_json_path_each = os.path.join(outputAnalysisDir, f"{turbineName}.json")
            _writeJson(buildPayload(turbineTitle, turbine_data_list_each),
                       output_json_path_each)

            pngFilePath = os.path.join(outputAnalysisDir, f"{turbineName}.png")
            fig.write_image(pngFilePath, scale=3)

            # The PNG row has the save-to-database flag off; the JSON row has it on.
            result_rows.append(resultRow(turbineCode, pngFilePath, False))
            result_rows.append(resultRow(turbineCode, output_json_path_each, True))

        return pd.DataFrame(result_rows)