123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- import os
- import pandas as pd
- import plotly.graph_objects as go
- from algorithmContract.confBusiness import *
- from algorithmContract.contract import Contract
- from behavior.analystWithGoodPoint import AnalystWithGoodPoint
- from plotly.subplots import make_subplotscd
- class CpAnalyst(AnalystWithGoodPoint):
- """
- 风电机组风能利用系数分析
- """
- def typeAnalyst(self):
- return "cp"
- def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
- dictionary = self.processTurbineData(turbineCodes, conf, [
- Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower])
- dataFrameOfTurbines = self.userDataFrame(
- dictionary, conf.dataContract.configAnalysis, self)
- # 检查所需列是否存在
- required_columns = {Field_WindSpeed,
- Field_Cp, Field_PowerFloor}
- if not required_columns.issubset(dataFrameOfTurbines.columns):
- raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")
- turbrineInfos = self.common.getTurbineInfos(
- conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
- groupedOfTurbineModel = turbrineInfos.groupby(Field_MillTypeCode)
- returnDatas = []
- for turbineModelCode, group in groupedOfTurbineModel:
- currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist()
- currTurbineModeInfo = self.common.getTurbineModelByCode(
- turbineModelCode, self.turbineModelInfo)
- dataFrameOfContractPowerCurve = self.dataFrameContractOfTurbine[
- self.dataFrameContractOfTurbine[Field_MillTypeCode] == turbineModelCode]
- currDataFrameOfTurbines = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine].isin(
- currTurbineCodes)]
- returnData = self.drawLineGraphForTurbine(
- currDataFrameOfTurbines, outputAnalysisDir, conf, currTurbineModeInfo,dataFrameOfContractPowerCurve)
- returnDatas.append(returnData)
- returnResult = pd.concat(returnDatas, ignore_index=True)
- return returnResult
- def drawLineGraphForTurbine(self, dataFrameOfTurbines: pd.DataFrame, outputAnalysisDir, conf: Contract, turbineModelInfo: pd.Series,dataFrameOfContractPowerCurve:pd.DataFrame):
- upLimitOfPower = self.turbineInfo[Field_RatedPower].max() * 0.9
- grouped = dataFrameOfTurbines.groupby([Field_CodeOfTurbine, Field_PowerFloor]).agg(
- cp=('cp', 'median'),
- cp_max=('cp', 'max'),
- cp_min=('cp', 'min'),
- ).reset_index()
- # Rename columns post aggregation for clarity
- grouped.columns = [Field_CodeOfTurbine,
- Field_PowerFloor, Field_CpMedian, 'cp_max', 'cp_min']
- # Sort by power_floor
- grouped = grouped.sort_values(
- by=[Field_CodeOfTurbine, Field_PowerFloor])
- # Create Subplots
- fig = make_subplots(specs=[[{"secondary_y": False}]])
- # 创建一个列表来存储各个风电机组的数据
- turbine_data_list = []
- # colors = px.colors.sequential.Turbo
- # Plotting the turbine lines
- for turbineCode in grouped[Field_CodeOfTurbine].unique():
- turbine_data = grouped[grouped[Field_CodeOfTurbine] == turbineCode]
- currTurbineInfo = self.common.getTurbineInfo(
- conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo)
- fig.add_trace(
- go.Scatter(x=turbine_data[Field_PowerFloor],
- y=turbine_data[Field_CpMedian],
- mode='lines',
- # line=dict(color=colors[idx % len(colors)]),
- name=currTurbineInfo[Field_NameOfTurbine])
- )
- # 提取数据
- turbine_data_total = {
- "engineName": currTurbineInfo[Field_NameOfTurbine],
- "engineCode": turbineCode,
- "xData": turbine_data[Field_PowerFloor].tolist(),
- "yData": turbine_data[Field_CpMedian].tolist(),
- }
- turbine_data_list.append(turbine_data_total)
- # Plotting the contract guarantee Cp curve
- fig.add_trace(
- go.Scatter(x=dataFrameOfContractPowerCurve[Field_PowerFloor],
- y=dataFrameOfContractPowerCurve[Field_Cp],
- # mode='lines',
- # line=dict(color='red', dash='dash'),
- mode='lines+markers',
- line=dict(color='red'),
- marker=dict(color='red', size=5),
- name='合同功率曲线'
- ),
- secondary_y=False,
- )
- # Update layout
- fig.update_layout(
- title={
- 'text': f'风能利用系数分布-{turbineModelInfo[Field_MachineTypeCode]}',
- 'x': 0.5, # 标题位置居中
- },
- xaxis_title='功率',
- yaxis_title='风能利用系数',
- # legend_title='Turbine',
- xaxis=dict(range=[0, upLimitOfPower], tickangle=-45),
- yaxis=dict(
- dtick=self.axisStepCp,
- range=[self.axisLowerLimitCp,
- self.axisUpperLimitCp]
- ),
- legend=dict(
- orientation="h", # Horizontal orientation
- xanchor="center", # Anchor the legend to the center
- x=0.5, # Position legend at the center of the x-axis
- y=-0.2, # Position legend below the x-axis
- # itemsizing='constant', # Keep the size of the legend entries constant
- # itemwidth=50
- )
- )
- engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "")
- if isinstance(engineTypeCode, pd.Series):
- engineTypeCode = engineTypeCode.iloc[0]
- engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "")
- if isinstance(engineTypeName, pd.Series):
- engineTypeName = engineTypeName.iloc[0]
- # 构建最终的JSON对象
- json_output = {
- "analysisTypeCode": "风电机组风能利用系数分析",
- "typecode": turbineModelInfo[Field_MillTypeCode],
- "engineCode": engineTypeCode,
- "engineTypeName": engineTypeName,
- "title": f'风能利用系数分布-{turbineModelInfo[Field_MachineTypeCode]}',
- "xaixs": "功率(kW)",
- "yaixs": "风能利用系数",
- "contract_Cp_curve_xData": dataFrameOfContractPowerCurve[Field_PowerFloor].tolist(),
- "contract_Cp_curve_yData": dataFrameOfContractPowerCurve[Field_Cp].tolist(),
- "data": turbine_data_list
- }
- # 将JSON对象保存到文件
- output_json_path = os.path.join(outputAnalysisDir, f"{turbineModelInfo[Field_MillTypeCode]}.json")
- with open(output_json_path, 'w', encoding='utf-8') as f:
- import json
- json.dump(json_output, f, ensure_ascii=False, indent=4)
- # 保存html
- # htmlFileName = f"{self.powerFarmInfo[Field_PowerFarmName].iloc[0]}-{turbineModelInfo[Field_MillTypeCode]}-Cp-Distribution.html"
- # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName)
- # fig.write_html(htmlFilePath)
- result_rows = []
- # result_rows.append({
- # Field_Return_TypeAnalyst: self.typeAnalyst(),
- # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
- # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
- # Field_CodeOfTurbine: Const_Output_Total,
- # Field_Return_FilePath: htmlFilePath,
- # Field_Return_IsSaveDatabase: True
- # })
- # 如果需要返回DataFrame,可以包含文件路径
- result_rows.append({
- Field_Return_TypeAnalyst: self.typeAnalyst(),
- Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
- Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
- Field_CodeOfTurbine: 'total',
- Field_MillTypeCode: turbineModelInfo[Field_MillTypeCode],
- Field_Return_FilePath: output_json_path,
- Field_Return_IsSaveDatabase: True
- })
- # Individual turbine graphs
- for turbineCode, group in grouped.groupby(Field_CodeOfTurbine):
- fig = go.Figure()
- currTurbineInfo = self.common.getTurbineInfo(
- conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo)
- # 创建一个列表来存储各个风电机组的数据
- turbine_data_list_each = []
- # Flag to add legend only once
- # add_legend = True
- # Plot other turbines data
- for other_name, other_group in grouped[grouped[Field_CodeOfTurbine] != turbineCode].groupby(Field_CodeOfTurbine):
- tempTurbineInfo = self.common.getTurbineInfo(
- conf.dataContract.dataFilter.powerFarmID, other_name, self.turbineInfo)
- fig.add_trace(
- go.Scatter(
- x=other_group[Field_PowerFloor],
- y=other_group[Field_CpMedian],
- mode='lines',
- # name='Other Turbines' if add_legend else '',
- line=dict(color='lightgray', width=1),
- showlegend=False
- )
- )
- # 提取数据
- turbine_data_other_each = {
- "engineName": tempTurbineInfo[Field_NameOfTurbine],
- "engineCode": other_name,
- "xData": other_group[Field_PowerFloor].tolist(),
- "yData": other_group[Field_CpMedian].tolist(),
- }
- turbine_data_list_each.append(turbine_data_other_each)
- add_legend = False # Only add legend item for the first other turbine
- # Add trace for the current turbine
- fig.add_trace(
- go.Scatter(x=group[Field_PowerFloor], y=group[Field_CpMedian],
- mode='lines', name=currTurbineInfo[Field_NameOfTurbine], line=dict(color='darkblue'))
- )
- turbine_data_curr = {
- "engineName": currTurbineInfo[Field_NameOfTurbine],
- "engineCode": currTurbineInfo[Field_CodeOfTurbine],
- "xData": group[Field_PowerFloor].tolist(),
- "yData": group[Field_CpMedian].tolist(),
- }
- turbine_data_list_each.append(turbine_data_curr)
- fig.add_trace(
- go.Scatter(x=dataFrameOfContractPowerCurve[Field_PowerFloor],
- y=dataFrameOfContractPowerCurve[Field_Cp],
- mode='lines+markers',
- name='合同功率曲线',
- marker=dict(color='red', size=5),
- line=dict(color='red'))
- )
- fig.update_layout(
- title={
- 'text': f'机组: {currTurbineInfo[Field_NameOfTurbine]}',
- # 'x': 0.5, # 标题位置居中
- },
- xaxis_title='功率',
- yaxis_title='风能利用系数',
- xaxis=dict(range=[0, upLimitOfPower], tickangle=-45),
- yaxis=dict(
- dtick=self.axisStepCp,
- range=[self.axisLowerLimitCp,
- self.axisUpperLimitCp] # axisStepCp
- ),
- legend=dict(x=1.05, y=0.5)
- )
- engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "")
- if isinstance(engineTypeCode, pd.Series):
- engineTypeCode = engineTypeCode.iloc[0]
- engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "")
- if isinstance(engineTypeName, pd.Series):
- engineTypeName = engineTypeName.iloc[0]
- # 构建最终的JSON对象
- json_output = {
- "analysisTypeCode": "风电机组风能利用系数分析",
- "typecode": turbineModelInfo[Field_MillTypeCode],
- "engineCode": engineTypeCode,
- "engineTypeName": engineTypeName,
- "title": f'机组: {currTurbineInfo[Field_NameOfTurbine]}',
- "xaixs": "功率(kW)",
- "yaixs": "风能利用系数",
- "contract_Cp_curve_xData": dataFrameOfContractPowerCurve[Field_PowerFloor].tolist(),
- "contract_Cp_curve_yData": dataFrameOfContractPowerCurve[Field_Cp].tolist(),
- "data": turbine_data_list_each
- }
- # 将JSON对象保存到文件
- output_json_path_each = os.path.join(outputAnalysisDir,
- f"{currTurbineInfo[Field_NameOfTurbine]}.json")
- with open(output_json_path_each, 'w', encoding='utf-8') as f:
- import json
- json.dump(json_output, f, ensure_ascii=False, indent=4)
- # 保存图像
- pngFileName = f"{currTurbineInfo[Field_NameOfTurbine]}.png"
- pngFilePath = os.path.join(outputAnalysisDir, pngFileName)
- fig.write_image(pngFilePath, scale=3)
- # 保存HTML
- # htmlFileName = f"{currTurbineInfo[Field_NameOfTurbine]}.html"
- # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName)
- # fig.write_html(htmlFilePath)
- result_rows.append({
- Field_Return_TypeAnalyst: self.typeAnalyst(),
- Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
- Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
- Field_CodeOfTurbine: turbineCode,
- Field_Return_FilePath: pngFilePath,
- Field_Return_IsSaveDatabase: False
- })
- # 如果需要返回DataFrame,可以包含文件路径
- result_rows.append({
- Field_Return_TypeAnalyst: self.typeAnalyst(),
- Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
- Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
- Field_CodeOfTurbine: turbineCode,
- Field_Return_FilePath: output_json_path_each,
- Field_Return_IsSaveDatabase: True
- })
- # result_rows.append({
- # Field_Return_TypeAnalyst: self.typeAnalyst(),
- # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
- # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
- # Field_CodeOfTurbine: turbineCode,
- # Field_Return_FilePath: htmlFilePath,
- # Field_Return_IsSaveDatabase: True
- # })
- result_df = pd.DataFrame(result_rows)
- return result_df
|