import os

import numpy as np
import pandas as pd
import plotly.graph_objects as go

from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from behavior.analystWithGoodPoint import AnalystWithGoodPoint
from utils.jsonUtil import JsonUtil


class PowerCurveAnalyst(AnalystWithGoodPoint):
    """
    Wind-turbine power-curve (scatter) analysis.

    Processing second-level SCADA data is slow; minute-level SCADA data is
    recommended.
    """

    # Rated wind speed (m/s) used when the turbine-model info does not provide a
    # usable value.
    defaultRatedWindSpeed = 11
    # Above the rated wind speed, samples whose active power is below this value
    # are treated as stopped-turbine data and dropped before the curve is built.
    # Same unit as the active-power column.
    lowPowerThresholdAboveRated = 20

    def typeAnalyst(self):
        return "power_curve"

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """
        Build the power-curve outputs for every turbine model of the requested turbines.

        For each turbine model, the turbines' data is cleaned (high-wind / low-power
        samples removed), binned into a per-turbine power curve by ``dataReprocess``
        and written as JSON together with the contract power curve.

        :param outputAnalysisDir: directory the JSON files are written to.
        :param conf: analysis contract (farm id, batch number, analysis config).
        :param turbineCodes: codes of the turbines to analyse.
        :return: DataFrame with one result row per written file.
        :raises ValueError: if the wind-speed/active-power columns are missing, or if
            no turbine model has any data left to analyse.
        """
        dictionary = self.processTurbineData(turbineCodes, conf, [
            Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower])
        dataFrameOfTurbines = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        # The curve cannot be computed without these two columns.
        required_columns = {Field_WindSpeed, Field_ActiverPower}
        if not required_columns.issubset(dataFrameOfTurbines.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        turbineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)

        returnDatas = []
        for turbineModelCode, group in turbineInfos.groupby(Field_MillTypeCode):
            currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist()
            currTurbineModeInfo = self.common.getTurbineModelByCode(
                turbineModelCode, self.turbineModelInfo)
            dataFrameOfContractPowerCurve = self.dataFrameContractOfTurbine[
                self.dataFrameContractOfTurbine[Field_MillTypeCode] == turbineModelCode]
            currDataFrameOfTurbines = dataFrameOfTurbines[
                dataFrameOfTurbines[Field_CodeOfTurbine].isin(currTurbineCodes)]

            # Clean only right before the curve computation: drop samples above the
            # rated wind speed whose power is too low (turbine stopped).
            rated_ws = self._ratedWindSpeedOf(currTurbineModeInfo)
            mask_bad_data = (
                (currDataFrameOfTurbines[Field_WindSpeed] > rated_ws)
                & (currDataFrameOfTurbines[Field_ActiverPower] < self.lowPowerThresholdAboveRated)
            )
            # .copy() avoids SettingWithCopyWarning on later column assignments.
            currDataFrameOfTurbines = currDataFrameOfTurbines.loc[~mask_bad_data].copy()

            if currDataFrameOfTurbines.empty:
                # No data for this model: skip it instead of failing the whole run
                # inside dataReprocess's pd.concat.
                continue

            powerCurveDataOfTurbines = self.dataReprocess(
                currDataFrameOfTurbines, self.binsWindSpeed)

            # HTML plotting (self.drawOfPowerCurve) is disabled; only JSON is produced.
            returnJsonData = self.outputPowerCurveData(
                conf, outputAnalysisDir, currTurbineModeInfo,
                powerCurveDataOfTurbines, dataFrameOfContractPowerCurve)
            returnDatas.append(returnJsonData)

        if not returnDatas:
            raise ValueError(
                f"No power-curve data available for turbines {list(turbineCodes)} after filtering.")
        return pd.concat(returnDatas, ignore_index=True)

    def _ratedWindSpeedOf(self, turbineModelInfo) -> float:
        """Return the model's rated wind speed, or ``defaultRatedWindSpeed`` when unusable."""
        # Field_RatedWindSpeed may not be defined in confBusiness, hence the globals() probe.
        if 'Field_RatedWindSpeed' in globals() and Field_RatedWindSpeed in turbineModelInfo:
            value = turbineModelInfo[Field_RatedWindSpeed]
            # A NaN value would make every "> rated" comparison False and silently
            # disable the filter; None/garbage would raise. Fall back instead.
            if pd.api.types.is_scalar(value) and pd.notna(value):
                try:
                    return float(value)
                except (TypeError, ValueError):
                    pass
        return self.defaultRatedWindSpeed

    def outputPowerCurveData(self, conf: Contract, outputAnalysisDir: str, turbineModelInfo: pd.Series,
                             powerCurveDataOfTurbines: pd.DataFrame,
                             dataFrameOfContractPowerCurve: pd.DataFrame) -> pd.DataFrame:
        """
        Write the power-curve JSON files of one turbine model.

        One file is named after the model code; one more file per turbine is named
        after the turbine name. Every file receives the same model-level payload
        (all turbines of the model plus the contract curve).

        :return: DataFrame of result rows: a model-level row (turbine code
            ``Const_Output_Total``) followed by one row per turbine.
        """
        turbineCodes = powerCurveDataOfTurbines[Field_CodeOfTurbine].unique()
        jsonDictionary = self.convert2Json(turbineModelInfo, turbineCodes=turbineCodes,
                                           dataFrameOfTurbines=powerCurveDataOfTurbines,
                                           dataFrameOfContract=dataFrameOfContractPowerCurve)

        jsonFileName = f"power_curve-{turbineModelInfo[Field_MillTypeCode]}.json"
        jsonFilePath = os.path.join(outputAnalysisDir, jsonFileName)
        JsonUtil.write_json(jsonDictionary, file_path=jsonFilePath)

        result_rows = [{
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: Const_Output_Total,
            Field_MillTypeCode: turbineModelInfo[Field_MillTypeCode],
            Field_Return_FilePath: jsonFilePath,
            Field_Return_IsSaveDatabase: True
        }]

        for turbineCode in turbineCodes:
            data = powerCurveDataOfTurbines[powerCurveDataOfTurbines[Field_CodeOfTurbine] == turbineCode]
            jsonFileName2 = f"power_curve-{data[Field_NameOfTurbine].iloc[0]}.json"
            jsonFilePath2 = os.path.join(outputAnalysisDir, jsonFileName2)
            # The per-turbine file deliberately gets the whole model payload.
            JsonUtil.write_json(jsonDictionary, file_path=jsonFilePath2)

            result_rows.append({
                Field_Return_TypeAnalyst: self.typeAnalyst(),
                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                Field_CodeOfTurbine: turbineCode,
                Field_Return_FilePath: jsonFilePath2,
                Field_Return_IsSaveDatabase: True
            })

        return pd.DataFrame(result_rows)

    @staticmethod
    def _toJsonList(series: pd.Series) -> list:
        """Return ``series`` as a list with every missing value (NaN/NA) mapped to None."""
        # Series.replace(..., None) means "forward-fill" on pandas < 1.4, so missing
        # values are mapped explicitly instead.
        return [None if pd.isna(value) else value for value in series.tolist()]

    def convert2Json(self, turbineModelInfo: pd.Series, turbineCodes, dataFrameOfTurbines: pd.DataFrame,
                     dataFrameOfContract: pd.DataFrame):
        """
        Build the JSON payload of one turbine model.

        :param turbineModelInfo: model record (model code / machine type).
        :param turbineCodes: turbines to include, in output order.
        :param dataFrameOfTurbines: binned curves of the turbines.
        :param dataFrameOfContract: contract power curve of the model.
        :return: dict with one ``data`` entry per turbine followed by the contract curve.
        """
        result = {
            "analysisTypeCode": "功率曲线分析",
            "engineTypeCode": turbineModelInfo[Field_MillTypeCode],
            "engineTypeName": turbineModelInfo[Field_MachineTypeCode],
            "data": []
        }

        # Per-turbine curves.
        for turbineCode in turbineCodes:
            data = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine] == turbineCode]
            result["data"].append({
                "enginName": data[Field_NameOfTurbine].iloc[0],
                "enginCode": turbineCode,
                "xData": self._toJsonList(data[Field_WindSpeed]),
                "yData": self._toJsonList(data[Field_ActiverPower]),
                "zData": []
            })

        # Contract power curve.
        result["data"].append({
            "enginName": "合同功率曲线",
            "xData": self._toJsonList(dataFrameOfContract[Field_WindSpeed]),
            "yData": self._toJsonList(dataFrameOfContract[Field_ActiverPower]),
            "zData": []
        })
        return result

    def buildPowerCurveData(self, group: pd.DataFrame, fieldWindSpeed: str, fieldActivePower: str,
                            bins) -> pd.DataFrame:
        """
        Compute the binned power curve of one turbine.

        Samples are binned by wind speed; each bin gets the median active power and
        the sample count. Empty bins are then filled:
        - interior gaps by linear interpolation;
        - trailing gaps by the last observed value;
        - leading gaps by 0.

        :param group: samples of a single turbine.
        :param fieldWindSpeed: wind-speed column name.
        :param fieldActivePower: active-power column name.
        :param bins: bin edges for ``pd.cut``. With the fixed 51 labels below, this
            must contain exactly 52 edges.
        :return: DataFrame with columns [Field_WindSpeed, 'EffectiveQuantity',
            Field_ActiverPower], one row per bin label.
        """
        windBins = pd.cut(group[fieldWindSpeed], bins, labels=np.arange(0, 25.5, 0.5))
        # observed=False keeps a row for every bin label even without samples.
        # pandas is moving the default to observed=True, which would drop empty bins
        # and break the fixed-length curve and the gap filling below.
        powerCut = group.groupby(windBins, observed=False).agg({
            fieldActivePower: 'median',
            fieldWindSpeed: ['median', 'count']
        })

        wind_count = powerCut[fieldWindSpeed]['count'].tolist()
        # Per-bin median power; NaN for empty bins.
        power_series = powerCut[fieldActivePower]['median']

        # Interior gaps, e.g. [1000, NaN, 1200] -> [1000, 1100, 1200]. With
        # limit_direction='forward', leading NaNs (no low-wind data) stay NaN.
        power_series = power_series.interpolate(method='linear', limit_direction='forward')
        # High-wind gaps keep the last observed (≈ rated) power.
        power_series = power_series.ffill()
        # Remaining leading gaps: no wind, so power 0.
        power_series = power_series.fillna(0)

        line = power_series.round(decimals=2).tolist()

        act_line = pd.DataFrame([powerCut.index, wind_count, line]).T
        act_line.columns = [Field_WindSpeed, 'EffectiveQuantity', Field_ActiverPower]
        return act_line

    def dataReprocess(self, dataFrameMerge: pd.DataFrame, binsWindSpeed) -> pd.DataFrame:
        """
        Compute the binned power curve of every turbine in ``dataFrameMerge``.

        :return: concatenated curves with turbine name/code columns added; an empty
            frame with the same columns when there is no (name, code) group.
        """
        dataFrames = []
        for (turbineName, turbineCode), group in dataFrameMerge.groupby(
                [Field_NameOfTurbine, Field_CodeOfTurbine]):
            curve = self.buildPowerCurveData(
                group, Field_WindSpeed, Field_ActiverPower, binsWindSpeed)
            curve[Field_NameOfTurbine] = turbineName
            curve[Field_CodeOfTurbine] = turbineCode
            dataFrames.append(curve)

        if not dataFrames:
            # pd.concat([]) would raise; return the expected (empty) shape instead.
            return pd.DataFrame(columns=[Field_WindSpeed, 'EffectiveQuantity', Field_ActiverPower,
                                         Field_NameOfTurbine, Field_CodeOfTurbine])
        return pd.concat(dataFrames, ignore_index=True)

    def drawOfPowerCurve(self, powerCurveOfTurbines: pd.DataFrame, outputAnalysisDir, conf: Contract,
                         dataFrameGuaranteePowerCurve: pd.DataFrame, turbineModelInfo: pd.Series):
        """
        Plot the farm-wide power-curve chart and one chart per turbine.

        Not called from turbinesAnalysis at the moment (JSON output is used instead).

        :param powerCurveOfTurbines: binned curves as returned by ``dataReprocess``.
        :param outputAnalysisDir: output directory.
        :param conf: analysis contract.
        :param dataFrameGuaranteePowerCurve: contract (guaranteed) power curve.
        :param turbineModelInfo: turbine-model record.
        :return: result rows of the farm-wide chart followed by those of the single charts.
        """
        dfTotal = self.plot_power_curve(
            powerCurveOfTurbines, outputAnalysisDir, dataFrameGuaranteePowerCurve,
            Field_NameOfTurbine, conf, turbineModelInfo)

        singleResults = [
            self.plot_single_power_curve(
                powerCurveOfTurbines, group, dataFrameGuaranteePowerCurve,
                name, outputAnalysisDir, conf)
            for name, group in powerCurveOfTurbines.groupby([Field_NameOfTurbine, Field_CodeOfTurbine])
        ]
        return pd.concat([dfTotal, *singleResults], ignore_index=True)

    @staticmethod
    def _xAxisLowerBound(ress: pd.DataFrame):
        """Lower x-axis bound: one below the smallest cut-in wind speed, else 2."""
        if not ress.empty and Field_CutInWS in ress.columns and ress[Field_CutInWS].notna().any():
            return ress[Field_CutInWS].min() - 1
        return 2

    def plot_power_curve(self, ress, output_path, dataFrameGuaranteePowerCurve: pd.DataFrame,
                         Field_NameOfTurbine, conf: Contract, turbineModelInfo: pd.Series):
        """
        Plot the farm-wide power-curve chart of one turbine model and save it as HTML.

        Note: the ``Field_NameOfTurbine`` parameter shadows the module-level constant
        of the same name; callers pass that constant.

        :return: one result row describing the HTML file (not saved to the database).
        """
        fig = go.Figure()
        # One line per turbine, labelled with the turbine name.
        for turbine_num in ress[Field_NameOfTurbine].unique():
            turbine_data = ress[ress[Field_NameOfTurbine] == turbine_num]
            fig.add_trace(go.Scatter(
                x=turbine_data[Field_WindSpeed],
                y=turbine_data[Field_ActiverPower],
                mode='lines',
                name=f'{turbine_num}'
            ))

        cut_in_ws = self._xAxisLowerBound(ress)

        fig.add_trace(go.Scatter(
            x=dataFrameGuaranteePowerCurve[Field_WindSpeed],
            y=dataFrameGuaranteePowerCurve[Field_ActiverPower],
            mode='lines+markers',
            line=dict(color='red'),
            marker=dict(color='red', size=5),
            name='合同功率曲线',
            showlegend=True
        ))

        fig.update_layout(
            title={
                "text": f'功率曲线-{turbineModelInfo[Field_MachineTypeCode]}',
                'x': 0.5
            },
            xaxis=dict(
                title='风速',
                dtick=1,
                tickangle=-45,
                range=[cut_in_ws, 25]
            ),
            yaxis=dict(
                title='有功功率',
                dtick=self.axisStepActivePower,
                range=[self.axisLowerLimitActivePower, self.axisUpperLimitActivePower]
            ),
            legend=dict(
                orientation="h",  # horizontal legend, centred below the x-axis
                xanchor="center",
                x=0.5,
                y=-0.2,
            )
        )

        htmlFileName = '全场-{}-{}-功率曲线.html'.format(
            self.powerFarmInfo[Field_PowerFarmName].iloc[0], turbineModelInfo[Field_MillTypeCode])
        htmlFilePath = os.path.join(output_path, htmlFileName)
        fig.write_html(htmlFilePath)

        return pd.DataFrame([{
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: Const_Output_Total,
            Field_Return_FilePath: htmlFilePath,
            Field_Return_IsSaveDatabase: False
        }])

    def plot_single_power_curve(self, ress, group, dataFrameGuaranteePowerCurve: pd.DataFrame,
                                turbineName, outputAnalysisDir, conf: Contract):
        """
        Build the chart of one turbine: every turbine in grey, this turbine in dark
        blue, and the contract curve in red.

        :param turbineName: (name, code) tuple of the turbine.
        :return: an empty DataFrame, because exporting the chart is disabled.
        """
        fig = go.Figure()
        # Background: all turbines in light grey, without legend entries.
        for turbine_num in ress[Field_NameOfTurbine].unique():
            turbine_data = ress[ress[Field_NameOfTurbine] == turbine_num]
            fig.add_trace(go.Scatter(
                x=turbine_data[Field_WindSpeed],
                y=turbine_data[Field_ActiverPower],
                mode='lines',
                line=dict(color='lightgrey'),
                name=f'{turbine_num}',
                showlegend=False
            ))

        cut_in_ws = self._xAxisLowerBound(ress)

        fig.add_trace(go.Scatter(
            x=group[Field_WindSpeed],
            y=group[Field_ActiverPower],
            mode='lines',
            line=dict(color='darkblue'),
            name=Field_ActiverPower,
            showlegend=False
        ))

        fig.add_trace(go.Scatter(
            x=dataFrameGuaranteePowerCurve[Field_WindSpeed],
            y=dataFrameGuaranteePowerCurve[Field_ActiverPower],
            mode='lines+markers',
            line=dict(color='red'),
            marker=dict(color='red', size=5),
            name='合同功率曲线',
            showlegend=True
        ))

        fig.update_layout(
            title={
                "text": f'机组: {turbineName[0]}'
            },
            legend=dict(
                orientation="h",
                yanchor="bottom",  # legend anchored to the bottom-right corner of the plot
                y=0,
                xanchor="right",
                x=1,
                bgcolor='rgba(255,255,255,0)'
            ),
            xaxis=dict(
                title='风速',
                dtick=1,
                tickangle=-45,
                range=[cut_in_ws, 25]
            ),
            yaxis=dict(
                title='有功功率',
                dtick=self.axisStepActivePower,
                range=[self.axisLowerLimitActivePower, self.axisUpperLimitActivePower]
            )
        )

        # NOTE: saving this figure (PNG via fig.write_image, HTML via fig.write_html
        # into outputAnalysisDir) and reporting those files is disabled, so no
        # result rows are produced.
        return pd.DataFrame([])