import os
import json
import math

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import seaborn as sns
from matplotlib.ticker import MultipleLocator
from plotly.subplots import make_subplots

from behavior.analystWithGoodPoint import AnalystWithGoodPoint
from utils.directoryUtil import DirectoryUtil as dir
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract


class TSRCpPowerScatterAnalyst(AnalystWithGoodPoint):
    """
    Scatter analysis of tip-speed ratio (TSR) vs. Cp vs. active power for the
    turbines of a wind farm.

    For each turbine, samples between the farm-wide cut-in and rated wind
    speeds are plotted as power (x) against TSR / Cp^(1/3) (y), and the raw
    scatter data is written to one JSON file per turbine.
    """

    def typeAnalyst(self):
        """Return the analysis-type identifier used to tag result rows."""
        return "tsr_cp_power_scatter"

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """
        Load and filter turbine data, then emit per-turbine scatter JSON files.

        Parameters:
        - outputAnalysisDir: directory the per-turbine JSON files are written to.
        - conf: analysis contract (farm ID, batch number, analysis config).
        - turbineCodes: codes of the turbines to analyze.

        Returns the result DataFrame produced by plot_tsrcp_distribution.
        """
        dictionary = self.processTurbineData(
            turbineCodes, conf,
            [Field_DeviceCode, Field_Time, Field_WindSpeed,
             Field_ActiverPower, Field_RotorSpeed, Field_GeneratorSpeed])
        turbineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
        dataFrameMerge = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        dataFrameMerge[Field_PowerFarmName] = self.currPowerFarmInfo.loc[Field_PowerFarmName]

        # Ensure the numeric columns are floats. Field_Cp and Field_TSR are not
        # loaded above, so they are presumably derived inside userDataFrame —
        # TODO confirm.
        for field in (Field_WindSpeed, Field_RotorSpeed, Field_Cp,
                      Field_ActiverPower, Field_GeneratorSpeed):
            dataFrameMerge[field] = dataFrameMerge[field].astype(float)

        # Keep only samples strictly between the largest cut-in wind speed and
        # the smallest rated wind speed across the farm's turbine models, so the
        # retained band is valid for every model at once.
        max_cutin = self.turbineModelInfo[Field_CutInWS].max()
        min_rated = self.turbineModelInfo[Field_RatedWindSpeed].min()
        dataFrameMerge = dataFrameMerge[
            (dataFrameMerge[Field_WindSpeed] > max_cutin)
            & (dataFrameMerge[Field_WindSpeed] < min_rated)]

        # Normalize TSR by Cp^(1/3). NOTE(review): rows with Cp <= 0 would
        # produce NaN/inf here — confirm upstream filtering guarantees Cp > 0.
        dataFrameMerge[Field_TSRModified] = dataFrameMerge[Field_TSR] / (
            dataFrameMerge[Field_Cp] ** (1 / 3))

        return self.plot_tsrcp_distribution(
            dataFrameMerge, turbineInfos, outputAnalysisDir, conf)

    def plot_tsrcp_distribution(self, dataFrameMerge: pd.DataFrame,
                                turbineModelInfo: pd.Series,
                                outputAnalysisDir, conf: Contract,
                                encoding=charset_unify):
        """
        Write one JSON scatter dataset (power vs. TSR/Cp^(1/3)) per turbine.

        Parameters:
        - dataFrameMerge: merged sample data; must contain Field_ActiverPower,
          Field_TSRModified, Field_NameOfTurbine and Field_CodeOfTurbine.
        - turbineModelInfo: turbine model metadata providing the engine type
          code/name fields.
        - outputAnalysisDir: str, directory the JSON files are written to.
        - conf: analysis contract (farm ID and batch number for result rows).
        - encoding: declared input encoding; currently unused — the JSON output
          is always written as UTF-8.

        Returns a DataFrame with one result row per written JSON file.
        """
        x_name = Field_ActiverPower
        y_name = Field_TSRModified
        result_rows = []

        # One scatter dataset per (turbine name, turbine code) group.
        for name, group in dataFrameMerge.groupby([Field_NameOfTurbine, Field_CodeOfTurbine]):
            # The plotly figure is configured but never exported (the image/HTML
            # export was disabled); only the JSON payload below is persisted.
            fig = px.scatter(group, x=x_name, y=y_name)
            fig.update_layout(
                title={"text": '机组: {}'.format(name[0])},
                xaxis=dict(title='功率', dtick=200, tickangle=-45, range=[0, 1800]),
                yaxis=dict(
                    # LaTeX form r"$\frac{TSR}{Cp^{1/3}}$" renders correctly
                    # only in .png output, hence the plain-text label.
                    title='叶尖速比/风能利用系数分析^(1/3)',
                    dtick=self.axisStepTSR,
                    range=[self.axisLowerLimitTSR, self.axisUpperLimitTSR],
                ),
            )
            fig.update_traces(marker=dict(size=3))
            fig.update_xaxes(tickangle=-45)

            # The metadata lookup may return a Series; reduce to a scalar value.
            engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "")
            if isinstance(engineTypeCode, pd.Series):
                engineTypeCode = engineTypeCode.iloc[0]
            engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "")
            if isinstance(engineTypeName, pd.Series):
                engineTypeName = engineTypeName.iloc[0]

            # Build the final JSON payload. The "xaixs"/"yaixs" key spellings
            # are kept as-is: downstream consumers may depend on them.
            json_output = {
                "analysisTypeCode": "机组叶尖速比-Cp-功率散点分析",
                "engineCode": engineTypeCode,
                "engineTypeName": engineTypeName,
                "xaixs": "功率(kW)",
                "yaixs": "叶尖速比/风能利用系数分析^(1/3)",
                "data": [{
                    "engineName": name[0],
                    "engineCode": name[1],
                    "title": f' 机组: {format(name[0])}',
                    "xData": group[x_name].tolist(),
                    "yData": group[y_name].tolist(),
                }],
            }

            output_json_path = os.path.join(outputAnalysisDir, f"{name[0]}.json")
            with open(output_json_path, 'w', encoding='utf-8') as f:
                json.dump(json_output, f, ensure_ascii=False, indent=4)

            result_rows.append({
                Field_Return_TypeAnalyst: self.typeAnalyst(),
                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                Field_CodeOfTurbine: name[1],
                Field_Return_FilePath: output_json_path,
                Field_Return_IsSaveDatabase: True,
            })

        return pd.DataFrame(result_rows)