import os import numpy as np import pandas as pd import plotly.graph_objects as go from algorithmContract.confBusiness import * from algorithmContract.contract import Contract from behavior.analystWithGoodPoint import AnalystWithGoodPoint from utils.jsonUtil import JsonUtil class PowerCurveAnalyst(AnalystWithGoodPoint): """ 风电机组功率曲线散点分析。 秒级scada数据运算太慢,建议使用分钟级scada数据 """ def typeAnalyst(self): return "power_curve" def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes): dictionary = self.processTurbineData(turbineCodes, conf, [ Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower]) dataFrameOfTurbines = self.userDataFrame( dictionary, conf.dataContract.configAnalysis, self) # 检查所需列是否存在 required_columns = {Field_WindSpeed, Field_ActiverPower} if not required_columns.issubset(dataFrameOfTurbines.columns): raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}") turbrineInfos = self.common.getTurbineInfos( conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo) groupedOfTurbineModel = turbrineInfos.groupby(Field_MillTypeCode) returnDatas = [] for turbineModelCode, group in groupedOfTurbineModel: currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist() currTurbineModeInfo = self.common.getTurbineModelByCode( turbineModelCode, self.turbineModelInfo) dataFrameOfContractPowerCurve = self.dataFrameContractOfTurbine[ self.dataFrameContractOfTurbine[Field_MillTypeCode] == turbineModelCode] currDataFrameOfTurbines = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine].isin( currTurbineCodes)] powerCurveDataOfTurbines = self.dataReprocess( currDataFrameOfTurbines, self.binsWindSpeed) # returnData = self.drawOfPowerCurve( # powerCurveDataOfTurbines, outputAnalysisDir, conf, dataFrameOfContractPowerCurve, currTurbineModeInfo) # returnDatas.append(returnData) returnJsonData= self.outputPowerCurveData(conf,outputAnalysisDir,currTurbineModeInfo,powerCurveDataOfTurbines,dataFrameOfContractPowerCurve) returnDatas.append(returnJsonData) returnResult = pd.concat(returnDatas, ignore_index=True) return returnResult def outputPowerCurveData(self, conf: Contract, outputAnalysisDir: str, turbineModelInfo: pd.Series, powerCurveDataOfTurbines: pd.DataFrame, dataFrameOfContractPowerCurve: pd.DataFrame) -> pd.DataFrame: turbineCodes = powerCurveDataOfTurbines[Field_CodeOfTurbine].unique() jsonDictionary = self.convert2Json(turbineModelInfo,turbineCodes=turbineCodes, dataFrameOfTurbines=powerCurveDataOfTurbines, dataFrameOfContract=dataFrameOfContractPowerCurve) jsonFileName = f"power_curve-{turbineModelInfo[Field_MillTypeCode]}.json" jsonFilePath = os.path.join(outputAnalysisDir, jsonFileName) JsonUtil.write_json(jsonDictionary, file_path=jsonFilePath) result_rows = [] result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: Const_Output_Total, Field_MillTypeCode:turbineModelInfo[Field_MillTypeCode], Field_Return_FilePath: jsonFilePath, Field_Return_IsSaveDatabase: True }) for turbineCode in turbineCodes: data:pd.DataFrame=powerCurveDataOfTurbines[powerCurveDataOfTurbines[Field_CodeOfTurbine]==turbineCode] jsonFileName2 = f"power_curve-{data[Field_NameOfTurbine].iloc[0]}.json" jsonFilePath2 = os.path.join(outputAnalysisDir, jsonFileName2) JsonUtil.write_json(jsonDictionary, file_path=jsonFilePath2) result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: turbineCode, Field_Return_FilePath: jsonFilePath2, Field_Return_IsSaveDatabase: True }) returnDatas = pd.DataFrame(result_rows) return returnDatas def convert2Json(self, turbineModelInfo: pd.Series,turbineCodes, dataFrameOfTurbines: pd.DataFrame, dataFrameOfContract: pd.DataFrame): result = { "analysisTypeCode":"功率曲线分析", "engineTypeCode": turbineModelInfo[Field_MillTypeCode] , "engineTypeName": turbineModelInfo[Field_MachineTypeCode] , "data": [] } # 定义要替换的空值类型 na_values = {pd.NA, float('nan')} # 从对象A提取数据 for turbineCode in turbineCodes: data:pd.DataFrame=dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine]==turbineCode] engine_data = { "enginName": data[Field_NameOfTurbine].iloc[0], "enginCode": turbineCode, "xData": data[Field_WindSpeed].replace(na_values, None).tolist(), "yData": data[Field_ActiverPower].replace(na_values, None).tolist(), "zData": [] } result["data"].append(engine_data) # 从对象B提取数据 contract_curve = { "enginName": "合同功率曲线", "xData": dataFrameOfContract[Field_WindSpeed].replace(na_values, None).tolist(), "yData": dataFrameOfContract[Field_ActiverPower].replace(na_values, None).tolist(), "zData": [] } result["data"].append(contract_curve) return result def buildPowerCurveData(self, group: pd.DataFrame, fieldWindSpeed: str, fieldActivePower: str, bins) -> pd.DataFrame: """ 计算设备的功率曲线。 """ powerCut = group.groupby(pd.cut(group[fieldWindSpeed], bins, labels=np.arange(0, 25.5, 0.5))).agg({ fieldActivePower: 'median', fieldWindSpeed: ['median', 'count'] }) wind_count = powerCut[fieldWindSpeed]['count'].tolist() line = powerCut[fieldActivePower]['median'].round(decimals=2).tolist() act_line = pd.DataFrame([powerCut.index, wind_count, line]).T act_line.columns = [Field_WindSpeed, 'EffectiveQuantity', Field_ActiverPower] return act_line def dataReprocess(self, dataFrameMerge: pd.DataFrame, binsWindSpeed) -> pd.DataFrame: # 初始化结果DataFrame dataFrames = [] # 按设备名分组数据 grouped = dataFrameMerge.groupby( [Field_NameOfTurbine, Field_CodeOfTurbine]) # 计算每个设备的功率曲线 for name, group in grouped: dataFramePowerCurveTurbine = self.buildPowerCurveData( group, Field_WindSpeed, Field_ActiverPower, binsWindSpeed) dataFramePowerCurveTurbine[Field_NameOfTurbine] = name[0] dataFramePowerCurveTurbine[Field_CodeOfTurbine] = name[1] dataFrames.append(dataFramePowerCurveTurbine) # 绘制全场功率曲线图 dataFrameReprocess: pd.DataFrame = pd.concat( dataFrames, ignore_index=True).reset_index(drop=True) return dataFrameReprocess def drawOfPowerCurve(self, powerCurveOfTurbines: pd.DataFrame, outputAnalysisDir, conf: Contract, dataFrameGuaranteePowerCurve: pd.DataFrame, turbineModelInfo: pd.Series): """ 生成功率曲线并保存为文件。 参数: frames (pd.DataFrame): 包含数据的DataFrame,需要包含设备名、风速和功率列。 outputAnalysisDir (str): 分析输出目录。 confData (ConfBusiness): 配置 """ # 绘制全场功率曲线图 # ress =self.dataReprocess(dataFrameMerge,self.binsWindSpeed) # all_res.reset_index(drop=True) df1 = self.plot_power_curve( powerCurveOfTurbines, outputAnalysisDir, dataFrameGuaranteePowerCurve, Field_NameOfTurbine, conf, turbineModelInfo) # 绘制每个设备的功率曲线图 grouped = powerCurveOfTurbines.groupby( [Field_NameOfTurbine, Field_CodeOfTurbine]) df2 = pd.DataFrame() # 新建一个空表格,与返回的单图功率曲线合并 for name, group in grouped: df_temp2 = self.plot_single_power_curve( powerCurveOfTurbines, group, dataFrameGuaranteePowerCurve, name, outputAnalysisDir, conf) df2 = pd.concat([df2, df_temp2], ignore_index=True) # 总图与单图的表格合并 df = pd.concat([df1, df2], ignore_index=True) return df def plot_power_curve(self, ress, output_path, dataFrameGuaranteePowerCurve: pd.DataFrame, Field_NameOfTurbine, conf: Contract, turbineModelInfo: pd.Series): """ 绘制全场功率曲线图。 """ # colors = px.colors.sequential.Turbo fig = go.Figure() for turbine_num in ress[Field_NameOfTurbine].unique(): turbine_data = ress[ress[Field_NameOfTurbine] == turbine_num] # 循环创建风速-功率折线 fig.add_trace(go.Scatter( x=turbine_data[Field_WindSpeed], y=turbine_data[Field_ActiverPower], mode='lines', # line=dict(color=colors[idx % len(colors)]), name=f'{turbine_num}' # 使用风电机组编号作为图例的名称 ) ) if not ress.empty and Field_CutInWS in ress.columns and ress[Field_CutInWS].notna().any(): cut_in_ws = ress[Field_CutInWS].min() - 1 else: cut_in_ws = 2 fig.add_trace(go.Scatter( x=dataFrameGuaranteePowerCurve[Field_WindSpeed], y=dataFrameGuaranteePowerCurve[Field_ActiverPower], # mode='lines', # line=dict(color='red', dash='dash'), mode='lines+markers', line=dict(color='red'), marker=dict(color='red', size=5), name='合同功率曲线', showlegend=True ) ) # 创建布局 fig.update_layout( title={ "text": f'功率曲线-{turbineModelInfo[Field_MachineTypeCode]}', 'x': 0.5 }, # legend_title='Turbine', xaxis=dict( title='风速', dtick=1, tickangle=-45, range=[cut_in_ws, 25] ), yaxis=dict( title='有功功率', dtick=self.axisStepActivePower, range=[self.axisLowerLimitActivePower, self.axisUpperLimitActivePower] ), legend=dict( orientation="h", # Horizontal orientation xanchor="center", # Anchor the legend to the center x=0.5, # Position legend at the center of the x-axis y=-0.2, # Position legend below the x-axis # itemsizing='constant', # Keep the size of the legend entries constant # itemwidth=50 ) ) # 保存HTML htmlFileName = '全场-{}-{}-功率曲线.html'.format(self.powerFarmInfo[Field_PowerFarmName].iloc[0],turbineModelInfo[Field_MillTypeCode]) htmlFilePath = os.path.join(output_path, htmlFileName) fig.write_html(htmlFilePath) result_rows = [] result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: Const_Output_Total, Field_Return_FilePath: htmlFilePath, Field_Return_IsSaveDatabase: False }) result_df = pd.DataFrame(result_rows) return result_df def plot_single_power_curve(self, ress, group, dataFrameGuaranteePowerCurve: pd.DataFrame, turbineName, outputAnalysisDir, conf: Contract): fig = go.Figure() for turbine_num in ress[Field_NameOfTurbine].unique(): turbine_data = ress[ress[Field_NameOfTurbine] == turbine_num] # 循环创建风速-功率折线 fig.add_trace(go.Scatter( x=turbine_data[Field_WindSpeed], y=turbine_data[Field_ActiverPower], mode='lines', line=dict(color='lightgrey'), name=f'{turbine_num}', showlegend=False ) ) if not ress.empty and Field_CutInWS in ress.columns and ress[Field_CutInWS].notna().any(): cut_in_ws = ress[Field_CutInWS].min() - 1 else: cut_in_ws = 2 fig.add_trace(go.Scatter( x=group[Field_WindSpeed], y=group[Field_ActiverPower], mode='lines', line=dict(color='darkblue'), name=Field_ActiverPower, showlegend=False ) ) fig.add_trace(go.Scatter( x=dataFrameGuaranteePowerCurve[Field_WindSpeed], y=dataFrameGuaranteePowerCurve[Field_ActiverPower], mode='lines+markers', line=dict(color='red'), marker=dict(color='red', size=5), name='合同功率曲线', showlegend=True ) ) # 创建布局 fig.update_layout( title={ "text": f'机组: {turbineName[0]}' }, legend=dict( orientation="h", # 或者 "v" 表示垂直 yanchor="bottom", # 图例垂直对齐方式 y=0, # 图例距离y轴下边界的距离(0到1之间) xanchor="right", # 图例水平对齐方式 x=1, # 图例距离x轴右边界的距离(0到1之间) bgcolor='rgba(255,255,255,0)' ), xaxis=dict( title='风速', dtick=1, tickangle=-45, range=[cut_in_ws, 25] ), yaxis=dict( title='有功功率', dtick=self.axisStepActivePower, range=[self.axisLowerLimitActivePower, self.axisUpperLimitActivePower] ) ) # 保存图像 pngFileName = f"{turbineName[0]}.png" pngFilePath = os.path.join(outputAnalysisDir, pngFileName) fig.write_image(pngFilePath, scale=3) # # 保存HTML # htmlFileName = f"{turbineName[0]}.html" # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName) # fig.write_html(htmlFilePath) result_rows = [] result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: turbineName[1], Field_Return_FilePath: pngFilePath, Field_Return_IsSaveDatabase: False }) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: turbineName[1], # Field_Return_FilePath: htmlFilePath, # Field_Return_IsSaveDatabase: False # }) result_df = pd.DataFrame(result_rows) return result_df