import os import pandas as pd import plotly.graph_objects as go from algorithmContract.confBusiness import * from algorithmContract.contract import Contract from behavior.analystWithGoodPoint import AnalystWithGoodPoint class CpWindSpeedAnalyst(AnalystWithGoodPoint): """ 风电机组风能利用系数分析 """ def typeAnalyst(self): return "cp_windspeed" def dataReprocess(self, dataFrameTurbines: pd.DataFrame) -> pd.DataFrame: dataFrame = dataFrameTurbines.groupby([Field_CodeOfTurbine, Field_WindSpeedFloor]).agg( cp=(Field_Cp, 'median'), cp_max=(Field_Cp, 'max'), cp_min=(Field_Cp, 'min'), ).reset_index() dataFrame.columns = [Field_CodeOfTurbine, Field_WindSpeedFloor, Field_Cp, Field_CpMax, Field_CpMin] dataFrame = dataFrame.sort_values( by=[Field_CodeOfTurbine, Field_WindSpeedFloor], ascending=[True, True]) return dataFrame def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes): dictionary = self.processTurbineData(turbineCodes, conf, [ Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower]) dataFrameOfTurbines = self.userDataFrame( dictionary, conf.dataContract.configAnalysis, self) # 检查所需列是否存在 required_columns = {Field_WindSpeedFloor, Field_Cp} if not required_columns.issubset(dataFrameOfTurbines.columns): raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}") turbrineInfos = self.common.getTurbineInfos( conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo) groupedOfTurbineModel = turbrineInfos.groupby(Field_MillTypeCode) returnDatas = [] for turbineModelCode, group in groupedOfTurbineModel: currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist() currTurbineModeInfo = self.common.getTurbineModelByCode( turbineModelCode, self.turbineModelInfo) currDataFrameOfTurbines = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine].isin( currTurbineCodes)] dataFrame = self.dataReprocess(currDataFrameOfTurbines) returnData = self.buildChart( dataFrame, outputAnalysisDir, conf, currTurbineModeInfo) returnDatas.append(returnData) returnResult = pd.concat(returnDatas, ignore_index=True) return returnResult def buildChart(self, dataFrameOfTurbines: pd.DataFrame, outputAnalysisDir, conf: Contract, turbineModelInfo: pd.Series): # Create the main Cp distribution plot using Plotly fig = go.Figure() # colors = px.colors.sequential.Turbo # 创建一个列表来存储各个风电机组的数据 turbine_data_list = [] for turbineCode in dataFrameOfTurbines[Field_CodeOfTurbine].unique(): group = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine] == turbineCode] currTurbineInfo = self.common.getTurbineInfo( conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo) fig.add_trace(go.Scatter(x=group[Field_WindSpeedFloor], y=group[Field_Cp], mode='lines', # line=dict(color=colors[idx % len(colors)]), name=currTurbineInfo[Field_NameOfTurbine])) # 提取数据 turbine_data_total = { "engineName": currTurbineInfo[Field_NameOfTurbine], "engineCode": turbineCode, "xData": group[Field_WindSpeedFloor].tolist(), "yData": group[Field_Cp].tolist(), } turbine_data_list.append(turbine_data_total) fig.update_layout(title={'text': f'风能利用系数分布-{turbineModelInfo[Field_MachineTypeCode]}', 'x': 0.5}, xaxis_title='风速', yaxis_title='风能利用系数', legend=dict( orientation="h", # Horizontal orientation xanchor="center", # Anchor the legend to the center x=0.5, # Position legend at the center of the x-axis y=-0.2, # Position legend below the x-axis # itemsizing='constant', # Keep the size of the legend entries constant # itemwidth=50 ), xaxis=dict(range=[0, 26], tickmode='linear', dtick=1, tickangle=-45), yaxis=dict( dtick=self.axisStepCp, range=[self.axisLowerLimitCp, self.axisUpperLimitCp] ) ) engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "") if isinstance(engineTypeCode, pd.Series): engineTypeCode = engineTypeCode.iloc[0] engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "") if isinstance(engineTypeName, pd.Series): engineTypeName = engineTypeName.iloc[0] # 构建最终的JSON对象 json_output = { "analysisTypeCode": "风电机组风能利用系数分析", "typecode": turbineModelInfo[Field_MillTypeCode], "engineCode": engineTypeCode, "engineTypeName": engineTypeName, "title": f'风能利用系数分布-{turbineModelInfo[Field_MachineTypeCode]}', "xaixs": "风速", "yaixs": "风能利用系数", "data": turbine_data_list } output_json_path = os.path.join(outputAnalysisDir, f"{turbineModelInfo[Field_MillTypeCode]}.json") with open(output_json_path, 'w', encoding='utf-8') as f: import json json.dump(json_output, f, ensure_ascii=False, indent=4) # 保存HTML # htmlFileName = f"{self.powerFarmInfo[Field_PowerFarmName].iloc[0]}-{turbineModelInfo[Field_MillTypeCode]}-Cp-Distribution.html" # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName) # fig.write_html(htmlFilePath) result_rows = [] # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: Const_Output_Total, # Field_Return_FilePath: htmlFilePath, # Field_Return_IsSaveDatabase: True # }) # 如果需要返回DataFrame,可以包含文件路径 result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: 'total', Field_MillTypeCode: turbineModelInfo[Field_MillTypeCode], Field_Return_FilePath: output_json_path, Field_Return_IsSaveDatabase: True }) # Generate individual turbine plots for turbineCode, group in dataFrameOfTurbines.groupby(Field_CodeOfTurbine): currTurbineInfo = self.common.getTurbineInfo( conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo) # 创建一个列表来存储各个风电机组的数据 turbine_data_list_each = [] fig = go.Figure() for turbineCode in dataFrameOfTurbines[Field_CodeOfTurbine].unique(): tempDataFrame = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine] == turbineCode] tempTurbineInfo = self.common.getTurbineInfo( conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo) fig.add_trace(go.Scatter(x=tempDataFrame[Field_WindSpeedFloor], y=tempDataFrame[Field_Cp], mode='lines', line=dict(color='lightgray', width=1), showlegend=False)) # 提取数据 turbine_data_other_each = { "engineName": tempTurbineInfo[Field_NameOfTurbine], "engineCode": turbineCode, "xData": tempDataFrame[Field_WindSpeedFloor].tolist(), "yData": tempDataFrame[Field_Cp].tolist(), } turbine_data_list_each.append(turbine_data_other_each) fig.add_trace(go.Scatter(x=group[Field_WindSpeedFloor], y=group[Field_Cp], mode='lines', line=dict( color='darkblue'), showlegend=False)) fig.update_layout(title=f'风机: {currTurbineInfo[Field_NameOfTurbine]}', xaxis_title='风速', yaxis_title='风能利用系数', xaxis=dict( range=[0, 26], tickmode='linear', dtick=1, tickangle=-45), yaxis=dict( dtick=self.axisStepCp, range=[self.axisLowerLimitCp, self.axisUpperLimitCp] ) ) engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "") if isinstance(engineTypeCode, pd.Series): engineTypeCode = engineTypeCode.iloc[0] engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "") if isinstance(engineTypeName, pd.Series): engineTypeName = engineTypeName.iloc[0] # 构建最终的JSON对象 json_output = { "analysisTypeCode": "风电机组风能利用系数分析", "typecode": turbineModelInfo[Field_MillTypeCode], "engineCode": engineTypeCode, "engineTypeName": engineTypeName, "title": f'风机: {currTurbineInfo[Field_NameOfTurbine]}', "xaixs": "风速", "yaixs": "风能利用系数", "data": turbine_data_list_each } # 将JSON对象保存到文件 output_json_path_each = os.path.join(outputAnalysisDir, f"{currTurbineInfo[Field_NameOfTurbine]}.json") with open(output_json_path_each, 'w', encoding='utf-8') as f: import json json.dump(json_output, f, ensure_ascii=False, indent=4) # 保存图像 pngFileName = f"{currTurbineInfo[Field_NameOfTurbine]}.png" pngFilePath = os.path.join(outputAnalysisDir, pngFileName) fig.write_image(pngFilePath, scale=3) # 保存HTML # htmlFileName = f"{currTurbineInfo[Field_NameOfTurbine]}.html" # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName) # fig.write_html(htmlFilePath) result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: currTurbineInfo[Field_CodeOfTurbine], Field_Return_FilePath: pngFilePath, Field_Return_IsSaveDatabase: False }) # 如果需要返回DataFrame,可以包含文件路径 result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: turbineCode, Field_Return_FilePath: output_json_path_each, Field_Return_IsSaveDatabase: True }) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: currTurbineInfo[Field_CodeOfTurbine], # Field_Return_FilePath: htmlFilePath, # Field_Return_IsSaveDatabase: True # }) result_df = pd.DataFrame(result_rows) return result_df