123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- import os
- import numpy as np
- import pandas as pd
- import plotly.graph_objects as go
- from algorithmContract.confBusiness import *
- from algorithmContract.contract import Contract
- from behavior.analystWithGoodPoint import AnalystWithGoodPoint
- from utils.jsonUtil import JsonUtil
class PowerCurveAnalyst(AnalystWithGoodPoint):
    """
    Scatter-based power curve analysis for wind turbines.

    Processing second-level SCADA data is too slow; minute-level SCADA
    data is recommended.
    """
def typeAnalyst(self):
    """Return the identifier string of this analysis type."""
    analysis_type = "power_curve"
    return analysis_type
def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
    """
    Build binned power curves for every turbine model and export them as JSON.

    The turbine data is loaded through ``processTurbineData`` and
    ``userDataFrame``, split by turbine model, binned by wind speed with
    ``dataReprocess`` and written out with ``outputPowerCurveData``.

    Args:
        outputAnalysisDir: Directory that receives the JSON files.
        conf: Analysis contract; ``conf.dataContract`` supplies the
            analysis configuration and the data filter.
        turbineCodes: Codes of the turbines to analyse.

    Returns:
        pd.DataFrame: The result rows of all models concatenated. An empty
        DataFrame is returned when no turbine model is found.

    Raises:
        ValueError: If the wind speed or active power column is missing.
    """
    dictionary = self.processTurbineData(turbineCodes, conf, [
        Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower])
    dataFrameOfTurbines = self.userDataFrame(
        dictionary, conf.dataContract.configAnalysis, self)

    # Make sure the columns needed for the power curve are present.
    required_columns = {Field_WindSpeed, Field_ActiverPower}
    if not required_columns.issubset(dataFrameOfTurbines.columns):
        raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

    turbrineInfos = self.common.getTurbineInfos(
        conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
    groupedOfTurbineModel = turbrineInfos.groupby(Field_MillTypeCode)

    returnDatas = []
    for turbineModelCode, group in groupedOfTurbineModel:
        currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist()
        currTurbineModeInfo = self.common.getTurbineModelByCode(
            turbineModelCode, self.turbineModelInfo)

        # Contract (guaranteed) power curve rows of this turbine model.
        dataFrameOfContractPowerCurve = self.dataFrameContractOfTurbine[
            self.dataFrameContractOfTurbine[Field_MillTypeCode] == turbineModelCode]

        # Measured data restricted to the turbines of this model.
        currDataFrameOfTurbines = dataFrameOfTurbines[
            dataFrameOfTurbines[Field_CodeOfTurbine].isin(currTurbineCodes)]

        powerCurveDataOfTurbines = self.dataReprocess(
            currDataFrameOfTurbines, self.binsWindSpeed)

        # NOTE: only the JSON export is produced here; drawOfPowerCurve is
        # not invoked by this method.
        returnJsonData = self.outputPowerCurveData(
            conf, outputAnalysisDir, currTurbineModeInfo,
            powerCurveDataOfTurbines, dataFrameOfContractPowerCurve)
        returnDatas.append(returnJsonData)

    # pd.concat raises "No objects to concatenate" on an empty list.
    if not returnDatas:
        return pd.DataFrame()

    return pd.concat(returnDatas, ignore_index=True)
def outputPowerCurveData(self, conf: Contract, outputAnalysisDir: str, turbineModelInfo: pd.Series, powerCurveDataOfTurbines: pd.DataFrame, dataFrameOfContractPowerCurve: pd.DataFrame) -> pd.DataFrame:
    """
    Write the power curve JSON files of one turbine model and describe them.

    One file named after the turbine model is written, followed by one file
    per turbine named after the turbine. Every file receives the same
    payload built by ``convert2Json`` (all turbines of the model plus the
    contract curve).

    Args:
        conf: Analysis contract providing the power farm id and batch number.
        outputAnalysisDir: Directory that receives the JSON files.
        turbineModelInfo: Row describing the turbine model.
        powerCurveDataOfTurbines: Binned power curve rows of the turbines.
        dataFrameOfContractPowerCurve: Contract power curve of the model.

    Returns:
        pd.DataFrame: One row for the model-level file followed by one row
        per turbine file.
    """
    codes = powerCurveDataOfTurbines[Field_CodeOfTurbine].unique()
    payload = self.convert2Json(
        turbineModelInfo,
        turbineCodes=codes,
        dataFrameOfTurbines=powerCurveDataOfTurbines,
        dataFrameOfContract=dataFrameOfContractPowerCurve,
    )

    # Model-level file.
    model_path = os.path.join(
        outputAnalysisDir,
        f"power_curve-{turbineModelInfo[Field_MillTypeCode]}.json",
    )
    JsonUtil.write_json(payload, file_path=model_path)

    rows = [{
        Field_Return_TypeAnalyst: self.typeAnalyst(),
        Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
        Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
        Field_CodeOfTurbine: Const_Output_Total,
        Field_MillTypeCode: turbineModelInfo[Field_MillTypeCode],
        Field_Return_FilePath: model_path,
        Field_Return_IsSaveDatabase: True,
    }]

    # Per-turbine files: the file name uses the turbine name found on the
    # first row of that turbine.
    for code in codes:
        rows_of_turbine: pd.DataFrame = powerCurveDataOfTurbines[
            powerCurveDataOfTurbines[Field_CodeOfTurbine] == code
        ]
        turbine_label = rows_of_turbine[Field_NameOfTurbine].iloc[0]
        turbine_path = os.path.join(
            outputAnalysisDir, f"power_curve-{turbine_label}.json"
        )
        JsonUtil.write_json(payload, file_path=turbine_path)
        rows.append({
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: code,
            Field_Return_FilePath: turbine_path,
            Field_Return_IsSaveDatabase: True,
        })

    return pd.DataFrame(rows)
def convert2Json(self, turbineModelInfo: pd.Series,turbineCodes, dataFrameOfTurbines: pd.DataFrame, dataFrameOfContract: pd.DataFrame):
    """
    Build the JSON-serialisable payload of a power curve analysis.

    Args:
        turbineModelInfo: Row describing the turbine model; its model code
            and machine type code are copied into the payload header.
        turbineCodes: Turbine codes to include, in output order.
        dataFrameOfTurbines: Binned power curve rows of the turbines.
        dataFrameOfContract: Contract power curve rows.

    Returns:
        dict: Header fields plus a ``"data"`` list holding one entry per
        turbine followed by one entry for the contract power curve.
        Missing values are emitted as ``None``.
    """

    def to_json_list(series: pd.Series) -> list:
        # Map NaN / pd.NA / None to None explicitly. Passing a set as
        # ``to_replace`` to Series.replace is not a documented input type
        # and may silently match nothing, leaving NaN in the payload.
        return [None if pd.isna(value) else value for value in series.tolist()]

    result = {
        "analysisTypeCode":"功率曲线分析",
        "engineTypeCode": turbineModelInfo[Field_MillTypeCode],
        "engineTypeName": turbineModelInfo[Field_MachineTypeCode],
        "data": []
    }

    # Measured curve of every turbine.
    for turbineCode in turbineCodes:
        data: pd.DataFrame = dataFrameOfTurbines[
            dataFrameOfTurbines[Field_CodeOfTurbine] == turbineCode]
        engine_data = {
            "enginName": data[Field_NameOfTurbine].iloc[0],
            "enginCode": turbineCode,
            "xData": to_json_list(data[Field_WindSpeed]),
            "yData": to_json_list(data[Field_ActiverPower]),
            "zData": []
        }
        result["data"].append(engine_data)

    # Contract power curve, appended after the turbines.
    contract_curve = {
        "enginName": "合同功率曲线",
        "xData": to_json_list(dataFrameOfContract[Field_WindSpeed]),
        "yData": to_json_list(dataFrameOfContract[Field_ActiverPower]),
        "zData": []
    }
    result["data"].append(contract_curve)

    return result
def buildPowerCurveData(self, group: pd.DataFrame, fieldWindSpeed: str, fieldActivePower: str, bins) -> pd.DataFrame:
    """
    Compute the binned power curve of one turbine.

    Rows are assigned to wind speed bins; for each bin the median active
    power (rounded to two decimals) and the number of wind speed samples
    are computed.

    Args:
        group: Rows of a single turbine.
        fieldWindSpeed: Name of the wind speed column in ``group``.
        fieldActivePower: Name of the active power column in ``group``.
        bins: Bin specification passed to ``pd.cut``. The bin labels are
            fixed to 0, 0.5, ..., 25 (51 labels), so ``bins`` must define
            exactly 51 intervals.

    Returns:
        pd.DataFrame: One row per bin with the bin label, the sample count
        (``'EffectiveQuantity'``) and the median active power.
    """
    speed_bins = pd.cut(
        group[fieldWindSpeed], bins, labels=np.arange(0, 25.5, 0.5))

    # observed=False keeps empty bins in the result. It is passed explicitly
    # because relying on the implicit default for categorical groupers is
    # deprecated in pandas.
    powerCut = group.groupby(speed_bins, observed=False).agg({
        fieldActivePower: ['median'],
        fieldWindSpeed: ['count']
    })

    wind_count = powerCut[fieldWindSpeed]['count'].tolist()
    line = powerCut[fieldActivePower]['median'].round(decimals=2).tolist()

    # Rows (label, count, power) are transposed into columns.
    act_line = pd.DataFrame([powerCut.index, wind_count, line]).T
    act_line.columns = [Field_WindSpeed,
                        'EffectiveQuantity', Field_ActiverPower]
    return act_line
def dataReprocess(self, dataFrameMerge: pd.DataFrame, binsWindSpeed) -> pd.DataFrame:
    """
    Compute the binned power curve of every turbine in ``dataFrameMerge``.

    Args:
        dataFrameMerge: Measured rows containing the turbine name, turbine
            code, wind speed and active power columns.
        binsWindSpeed: Wind speed bins forwarded to ``buildPowerCurveData``.

    Returns:
        pd.DataFrame: The per-turbine power curves stacked into one frame,
        each row tagged with the turbine name and code. When the input
        contains no turbine, an empty frame with the same columns is
        returned.
    """
    dataFrames = []

    # One power curve per (turbine name, turbine code) pair.
    grouped = dataFrameMerge.groupby(
        [Field_NameOfTurbine, Field_CodeOfTurbine])
    for (turbineName, turbineCode), group in grouped:
        dataFramePowerCurveTurbine = self.buildPowerCurveData(
            group, Field_WindSpeed, Field_ActiverPower, binsWindSpeed)
        dataFramePowerCurveTurbine[Field_NameOfTurbine] = turbineName
        dataFramePowerCurveTurbine[Field_CodeOfTurbine] = turbineCode
        dataFrames.append(dataFramePowerCurveTurbine)

    # pd.concat raises "No objects to concatenate" on an empty list.
    if not dataFrames:
        return pd.DataFrame(columns=[
            Field_WindSpeed, 'EffectiveQuantity', Field_ActiverPower,
            Field_NameOfTurbine, Field_CodeOfTurbine])

    # ignore_index=True already yields a fresh 0..n-1 index.
    dataFrameReprocess: pd.DataFrame = pd.concat(
        dataFrames, ignore_index=True)
    return dataFrameReprocess
def drawOfPowerCurve(self, powerCurveOfTurbines: pd.DataFrame, outputAnalysisDir, conf: Contract, dataFrameGuaranteePowerCurve: pd.DataFrame, turbineModelInfo: pd.Series):
    """
    Draw the farm-wide power curve plot and one plot per turbine.

    Args:
        powerCurveOfTurbines: Binned power curve rows of all turbines,
            including the turbine name and turbine code columns.
        outputAnalysisDir: Directory that receives the plot files.
        conf: Analysis contract forwarded to the plotting methods.
        dataFrameGuaranteePowerCurve: Contract power curve to overlay.
        turbineModelInfo: Row describing the turbine model.

    Returns:
        pd.DataFrame: Result rows of the farm-wide plot followed by the
        rows of every single-turbine plot.
    """
    # Farm-wide plot first, then one plot per turbine.
    resultFrames = [self.plot_power_curve(
        powerCurveOfTurbines, outputAnalysisDir, dataFrameGuaranteePowerCurve,
        Field_NameOfTurbine, conf, turbineModelInfo)]

    grouped = powerCurveOfTurbines.groupby(
        [Field_NameOfTurbine, Field_CodeOfTurbine])
    for name, group in grouped:
        resultFrames.append(self.plot_single_power_curve(
            powerCurveOfTurbines, group, dataFrameGuaranteePowerCurve,
            name, outputAnalysisDir, conf))

    # Concatenate once: growing a frame inside the loop copies all previous
    # rows on every iteration and starts from an empty frame.
    return pd.concat(resultFrames, ignore_index=True)
def plot_power_curve(self, ress, output_path, dataFrameGuaranteePowerCurve: pd.DataFrame, Field_NameOfTurbine, conf: Contract, turbineModelInfo: pd.Series):
    """
    Draw the farm-wide power curve plot and save it as an HTML file.

    Every turbine is drawn as one line and the contract power curve is
    overlaid in red with markers.

    Args:
        ress: Binned power curve rows of all turbines.
        output_path: Directory that receives the HTML file.
        dataFrameGuaranteePowerCurve: Contract power curve to overlay.
        Field_NameOfTurbine: Name of the column in ``ress`` that identifies
            a turbine. Inside this method the parameter shadows the
            module-level constant of the same name.
        conf: Analysis contract providing the power farm id and batch number.
        turbineModelInfo: Row describing the turbine model.

    Returns:
        pd.DataFrame: A single row describing the written HTML file.
    """
    # Lower bound of the x axis: one below the smallest cut-in wind speed
    # when that column carries data, otherwise 2.
    cut_in_ws = 2
    if not ress.empty and Field_CutInWS in ress.columns:
        cut_in_values = ress[Field_CutInWS]
        if cut_in_values.notna().any():
            cut_in_ws = cut_in_values.min() - 1

    fig = go.Figure()

    # One wind speed / power line per turbine; the turbine identifier is
    # used as the legend entry.
    turbine_column = ress[Field_NameOfTurbine]
    for turbine_num in turbine_column.unique():
        turbine_data = ress[turbine_column == turbine_num]
        turbine_trace = go.Scatter(
            x=turbine_data[Field_WindSpeed],
            y=turbine_data[Field_ActiverPower],
            mode='lines',
            name=f'{turbine_num}'
        )
        fig.add_trace(turbine_trace)

    contract_trace = go.Scatter(
        x=dataFrameGuaranteePowerCurve[Field_WindSpeed],
        y=dataFrameGuaranteePowerCurve[Field_ActiverPower],
        mode='lines+markers',
        line=dict(color='red'),
        marker=dict(color='red', size=5),
        name='合同功率曲线',
        showlegend=True
    )
    fig.add_trace(contract_trace)

    # Layout pieces.
    title_settings = {
        "text": f'功率曲线-{turbineModelInfo[Field_MachineTypeCode]}',
        'x': 0.5
    }
    x_axis = dict(
        title='风速',
        dtick=1,
        tickangle=-45,
        range=[cut_in_ws, 25]
    )
    y_axis = dict(
        title='有功功率',
        dtick=self.axisStepActivePower,
        range=[self.axisLowerLimitActivePower,
               self.axisUpperLimitActivePower]
    )
    # Horizontal legend centred below the x axis.
    legend_settings = dict(
        orientation="h",
        xanchor="center",
        x=0.5,
        y=-0.2
    )
    fig.update_layout(
        title=title_settings,
        xaxis=x_axis,
        yaxis=y_axis,
        legend=legend_settings
    )

    # Save as HTML.
    farm_name = self.powerFarmInfo[Field_PowerFarmName].iloc[0]
    htmlFileName = '全场-{}-{}-功率曲线.html'.format(
        farm_name, turbineModelInfo[Field_MillTypeCode])
    htmlFilePath = os.path.join(output_path, htmlFileName)
    fig.write_html(htmlFilePath)

    summary_row = {
        Field_Return_TypeAnalyst: self.typeAnalyst(),
        Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
        Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
        Field_CodeOfTurbine: Const_Output_Total,
        Field_Return_FilePath: htmlFilePath,
        Field_Return_IsSaveDatabase: False
    }
    return pd.DataFrame([summary_row])
def plot_single_power_curve(self, ress, group, dataFrameGuaranteePowerCurve: pd.DataFrame, turbineName, outputAnalysisDir, conf: Contract):
    """
    Draw the power curve plot of a single turbine and save it as a PNG file.

    All turbines are drawn in light grey as background, the selected
    turbine in dark blue and the contract power curve in red with markers.

    Args:
        ress: Binned power curve rows of all turbines.
        group: Binned power curve rows of the turbine to highlight.
        dataFrameGuaranteePowerCurve: Contract power curve to overlay.
        turbineName: Sequence whose first item is the turbine name (used in
            the title and file name) and whose second item is the turbine
            code (used in the result row).
        outputAnalysisDir: Directory that receives the PNG file.
        conf: Analysis contract providing the power farm id and batch number.

    Returns:
        pd.DataFrame: A single row describing the written PNG file.
    """
    # Lower bound of the x axis: one below the smallest cut-in wind speed
    # when that column carries data, otherwise 2.
    cut_in_ws = 2
    if not ress.empty and Field_CutInWS in ress.columns:
        cut_in_values = ress[Field_CutInWS]
        if cut_in_values.notna().any():
            cut_in_ws = cut_in_values.min() - 1

    fig = go.Figure()

    # Background: every turbine as a light grey line without legend entry.
    turbine_column = ress[Field_NameOfTurbine]
    for turbine_num in turbine_column.unique():
        turbine_data = ress[turbine_column == turbine_num]
        background_trace = go.Scatter(
            x=turbine_data[Field_WindSpeed],
            y=turbine_data[Field_ActiverPower],
            mode='lines',
            line=dict(color='lightgrey'),
            name=f'{turbine_num}',
            showlegend=False
        )
        fig.add_trace(background_trace)

    # Highlighted turbine.
    highlighted_trace = go.Scatter(
        x=group[Field_WindSpeed],
        y=group[Field_ActiverPower],
        mode='lines',
        line=dict(color='darkblue'),
        name=Field_ActiverPower,
        showlegend=False
    )
    fig.add_trace(highlighted_trace)

    # Contract power curve.
    contract_trace = go.Scatter(
        x=dataFrameGuaranteePowerCurve[Field_WindSpeed],
        y=dataFrameGuaranteePowerCurve[Field_ActiverPower],
        mode='lines+markers',
        line=dict(color='red'),
        marker=dict(color='red', size=5),
        name='合同功率曲线',
        showlegend=True
    )
    fig.add_trace(contract_trace)

    # Layout pieces.
    title_settings = {
        "text": f'机组: {turbineName[0]}'
    }
    # Horizontal legend anchored at the bottom-right corner with a
    # transparent background.
    legend_settings = dict(
        orientation="h",
        yanchor="bottom",
        y=0,
        xanchor="right",
        x=1,
        bgcolor='rgba(255,255,255,0)'
    )
    x_axis = dict(
        title='风速',
        dtick=1,
        tickangle=-45,
        range=[cut_in_ws, 25]
    )
    y_axis = dict(
        title='有功功率',
        dtick=self.axisStepActivePower,
        range=[self.axisLowerLimitActivePower,
               self.axisUpperLimitActivePower]
    )
    fig.update_layout(
        title=title_settings,
        legend=legend_settings,
        xaxis=x_axis,
        yaxis=y_axis
    )

    # Save as an image; no HTML file is written for single turbines.
    pngFileName = f"{turbineName[0]}.png"
    pngFilePath = os.path.join(outputAnalysisDir, pngFileName)
    fig.write_image(pngFilePath, scale=3)

    summary_row = {
        Field_Return_TypeAnalyst: self.typeAnalyst(),
        Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
        Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
        Field_CodeOfTurbine: turbineName[1],
        Field_Return_FilePath: pngFilePath,
        Field_Return_IsSaveDatabase: False
    }
    return pd.DataFrame([summary_row])
|