import os
import pandas as pd
from matplotlib.ticker import MultipleLocator
import numpy as np
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns
from behavior.analyst import Analyst
from utils.directoryUtil import DirectoryUtil as dir
# NOTE: the star import is deliberately kept last so that any name it exports
# keeps taking precedence, exactly as in the original import order. It is the
# source of ConfBusiness, Field_NameOfTurbine and CSVSuffix used below.
from algorithmContract.confBusiness import *


class Generator:
    """Plain container with two attributes, both initialised to ``None``.

    NOTE(review): judging by the attribute names these presumably hold the
    column names of the drive-end (DE) and non-drive-end (NDE) bearing
    temperatures -- confirm against the callers; nothing in this module
    reads or assigns them.
    """

    def __init__(self) -> None:
        self.fieldTemperatorOfDEBearing = None
        self.fieldTemperatorOfNDEBearing = None


class TemperatureLargeComponentsAnalyst(Analyst):
    """Temperature-rise analysis of wind turbine large components."""

    # Name of the derived column holding the active power binned in steps of 10.
    fieldPowerFloor = 'power_floor'

    def typeAnalyst(self):
        """Return the identifier string of this analyst type."""
        return "temperature_large_components"

    def getUseColumns(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Build the list of columns this analysis works on.

        The list contains, in order: the configured time field, the
        configured power field, the environment and nacelle temperature
        fields (each only when configured and present in ``dataFrame``), and
        finally every configured large-component temperature column that
        exists in ``dataFrame`` and is not entirely null.
        """
        temperature_cols = self.getLargeComponentTemperatureColumns(confData)
        non_empty_cols = self.getNoneEmptyFields(dataFrame, temperature_cols)

        useCols = [confData.field_turbine_time, confData.field_power]
        for optional_field in (confData.field_env_temp, confData.field_nacelle_temp):
            if not self.common.isNone(optional_field) and optional_field in dataFrame.columns:
                useCols.append(optional_field)
        useCols.extend(non_empty_cols)
        return useCols

    def getLargeComponentTemperatureColumns(self, confData: ConfBusiness):
        """Split the comma-separated temperature column setting into a list.

        Returns an empty list when the setting is not configured.
        """
        if self.common.isNone(confData.field_temperature_large_components):
            return []
        return confData.field_temperature_large_components.split(',')

    def getNoneEmptyFields(self, dataFrame, temperatureFields):
        """Return the fields that exist in ``dataFrame`` and hold at least one non-null value."""
        existing_fields = [
            field for field in temperatureFields if field in dataFrame.columns]
        # Column-wise "has any non-null value" flag, computed in one
        # vectorised pass instead of a per-column lambda.
        has_data = dataFrame[existing_fields].notnull().any(axis=0)
        return has_data[has_data].index.tolist()

    def dataReprocess(self, dataFrame: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness):
        """Aggregate the used columns per power bin and turbine.

        Rows with a null in any used column are dropped, the power is binned
        into steps of 10 (``astype(int)`` truncates toward zero), and the
        median of every used column is taken per (power bin, turbine).

        ``outputAnalysisDir`` is accepted for interface compatibility and is
        not used here.

        Returns the aggregated frame sorted by power bin and turbine name.
        """
        use_cols = self.getUseColumns(dataFrame, confData)
        cleaned = dataFrame.dropna(subset=use_cols)

        # ``assign`` returns a new frame, so no column is written onto the
        # filtered result of ``dropna`` (avoids SettingWithCopyWarning).
        power_floor = (cleaned[confData.field_power] / 10).astype(int) * 10
        cleaned = cleaned.assign(**{self.fieldPowerFloor: power_floor})

        # NOTE(review): the time field is part of use_cols and therefore also
        # aggregated with 'median' -- confirm its dtype supports that.
        agg_dict = {col: 'median' for col in use_cols}
        keys = [self.fieldPowerFloor, Field_NameOfTurbine]
        grouped = cleaned.groupby(keys).agg(agg_dict).reset_index()
        return grouped.sort_values(keys)

    def turbinesAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness):
        """Aggregate the merged turbine data and draw all temperature graphs."""
        dataFrame = self.dataReprocess(
            dataFrameMerge, outputAnalysisDir, confData)
        self.drawTemperatureGraph(dataFrame, outputAnalysisDir, confData)

    def _graphSetting(self, confData: ConfBusiness, group, key, default=None):
        """Read ``confData.graphSets[group][key]``, falling back to ``default``.

        The fallback is used when either the whole group or the single value
        is considered empty by ``self.common.isNone``.
        """
        settings = confData.graphSets[group]
        if self.common.isNone(settings):
            return default
        value = settings[key]
        return default if self.common.isNone(value) else value

    def _applyActivePowerAxis(self, ax, confData: ConfBusiness):
        """Configure the x axis (active power) ticks and limits of ``ax``.

        Defaults: tick step 250, lower limit 0, upper limit 1.2 x rated power.
        """
        step = self._graphSetting(confData, "activePower", "step", 250)
        lower = self._graphSetting(confData, "activePower", "min", 0)
        upper = self._graphSetting(confData, "activePower", "max")
        if upper is None:
            # Evaluated lazily: rated_power is only needed when no explicit
            # maximum is configured.
            upper = confData.rated_power * 1.2
        ax.xaxis.set_major_locator(MultipleLocator(step))
        ax.set_xlim(lower, upper)

    def _applyGeneratorTemperatureAxis(self, ax, confData: ConfBusiness):
        """Configure the y axis (generator temperature) ticks and limits of ``ax``.

        Defaults: tick step 10, limits -20 .. 100.
        """
        step = self._graphSetting(confData, "generatorTemperature", "step", 10)
        lower = self._graphSetting(confData, "generatorTemperature", "min", -20)
        upper = self._graphSetting(confData, "generatorTemperature", "max", 100)
        ax.yaxis.set_major_locator(ticker.MultipleLocator(step))
        ax.set_ylim(lower, upper)

    def drawTemperatureGraph(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness):
        """Draw the large-component temperature sensor plots.

        For every configured temperature column present in ``dataFrameMerge``
        this writes, into ``<outputAnalysisDir>/<column>/``:

        * ``<column>.png`` -- temperature vs. power bin, one line per turbine;
        * ``<turbine>.png`` -- the same plot with all turbines in light grey
          and the named turbine highlighted in dark blue.

        When a generator temperature configuration is available, one
        generator bearing plot per turbine is additionally written into
        ``<outputAnalysisDir>/GeneratorTemperature/``.
        """
        y_name = 'temperature'
        outputDir = os.path.join(outputAnalysisDir, "GeneratorTemperature")
        dir.create_directory(outputDir)

        # Goes through the shared getter so a missing setting yields an empty
        # list instead of failing on ``None.split``.
        columns = self.getLargeComponentTemperatureColumns(confData)

        # Group the data by turbine name.
        grouped = dataFrameMerge.groupby(Field_NameOfTurbine)

        # One light-grey entry per turbine for the background lines.
        greyPalette = ["lightgrey"] * \
            dataFrameMerge[Field_NameOfTurbine].nunique()

        # The generator plot of a turbine does not depend on the temperature
        # column, so it is rendered only once per turbine.
        generatorPlotted = set()

        for column in columns:
            if column not in dataFrameMerge.columns:
                continue

            sns.set_palette('deep')
            outputPath = os.path.join(outputAnalysisDir, column)
            dir.create_directory(outputPath)

            # Line plot of the current temperature sensor for all turbines.
            fig, ax = plt.subplots()
            sns.lineplot(x=self.fieldPowerFloor, y=column, data=dataFrameMerge,
                         hue=Field_NameOfTurbine, ax=ax)
            self._applyActivePowerAxis(ax, confData)
            ax.yaxis.set_major_locator(ticker.MultipleLocator(20))
            ax.set_ylim(0, 100)
            ax.set_xlabel(self.fieldPowerFloor)
            ax.set_ylabel(y_name)
            ax.set_title('Temperature-Distribute')

            # Sort the legend alphabetically. Handles and labels are sorted
            # together so every label stays attached to its own line.
            handles, labels = ax.get_legend_handles_labels()
            ordered = sorted(zip(labels, handles), key=lambda pair: pair[0])
            ax.legend([handle for _, handle in ordered],
                      [label for label, _ in ordered],
                      bbox_to_anchor=(1.02, 0.5), loc='center left',
                      ncol=2, borderaxespad=0.)

            fig.savefig(os.path.join(outputPath, "{}.png".format(column)),
                        bbox_inches='tight', dpi=120)
            plt.close(fig)

            for name, group in grouped:
                fig, ax = plt.subplots()
                # The palette is passed explicitly; the global seaborn
                # palette is left untouched.
                sns.lineplot(x=self.fieldPowerFloor, y=column, data=dataFrameMerge,
                             hue=Field_NameOfTurbine, palette=greyPalette,
                             legend=False, ax=ax)
                sns.lineplot(x=self.fieldPowerFloor, y=column, data=group,
                             color='darkblue', legend=False, ax=ax)
                ax.set_title('turbine_name={}'.format(name))
                ax.yaxis.set_major_locator(ticker.MultipleLocator(20))
                ax.set_ylim(0, 100)
                ax.set_ylabel(y_name)
                self._applyActivePowerAxis(ax, confData)
                ax.set_xlabel(self.fieldPowerFloor)
                fig.savefig(os.path.join(outputPath, "{}.png".format(name)),
                            bbox_inches='tight', dpi=120)
                plt.close(fig)

                # Per turbine: DE / NDE bearing temperature, their BIAS
                # against the reference temperature and the DE-NDE BIAS,
                # all plotted against active power.
                if name in generatorPlotted:
                    continue
                dictConf = self.getGeneratorTemperatureConf(confData)
                if not self.common.isNone(dictConf):
                    self.drawGeneratorTemperature(
                        group, confData, dictConf["yAxisDE"], dictConf["yAxisNDE"],
                        dictConf["diffTemperature"], self.fieldPowerFloor, name, outputDir)
                    generatorPlotted.add(name)

    def plot_temperature_distribution(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, confData: ConfBusiness, field_temperature_large_componts, encoding='utf-8'):
        """Draw temperature plots from the per-turbine CSV files on disk.

        Every file below ``outputAnalysisDir`` whose name ends with
        ``CSVSuffix`` is read; the part of the file name before the suffix is
        taken as the turbine name.

        Parameters:
        - dataFrameMerge: not used by this method (kept for interface
          compatibility).
        - outputAnalysisDir: directory that is scanned for CSV files and that
          receives the output folders.
        - confData: business configuration.
        - field_temperature_large_componts: comma-separated temperature
          column names.
        - encoding: encoding of the input CSV files. Defaults to 'utf-8'.
        """
        field_Name_Turbine = "turbine_name"
        x_name = 'power_floor'
        y_name = 'temperature'

        outputDir = os.path.join(outputAnalysisDir, "GeneratorTemperature")
        dir.create_directory(outputDir)

        sns.set_palette('deep')
        columns = field_temperature_large_componts.split(',')

        for column in columns:
            type_name = '{}'.format(column)
            output_path = os.path.join(outputAnalysisDir, type_name)
            os.makedirs(output_path, exist_ok=True)
            print("current column {}".format(column))

            # Per-file slices, concatenated once after the scan.
            frames = []
            for root, dir_names, file_names in dir.list_directory(outputAnalysisDir):
                for file_name in file_names:
                    if not file_name.endswith(CSVSuffix):
                        continue
                    csv_path = os.path.join(root, file_name)
                    print(csv_path)
                    frame = pd.read_csv(csv_path, encoding=encoding)
                    if column not in frame.columns:
                        continue

                    # Turbine name = file name without the CSV suffix.
                    turbineName = file_name.split(CSVSuffix)[0]
                    # Add the turbine name as a new column.
                    frame[field_Name_Turbine] = confData.add_W_if_starts_with_digit(
                        turbineName)

                    dictConf = self.getGeneratorTemperatureConf(confData)
                    if not self.common.isNone(dictConf):
                        # confData must be passed as the second argument;
                        # leaving it out shifts every following argument and
                        # raises a TypeError.
                        self.drawGeneratorTemperature(
                            frame, confData, dictConf["yAxisDE"], dictConf["yAxisNDE"],
                            dictConf["diffTemperature"], x_name, turbineName, outputDir)

                    frames.append(
                        frame.loc[:, [field_Name_Turbine, x_name, column]])

            if not frames:
                # No CSV file contains this column: there is nothing to plot.
                continue

            ress = pd.concat(frames, axis=0).reset_index()

            fig, ax2 = plt.subplots()
            sns.lineplot(x=x_name, y=column, data=ress,
                         hue=field_Name_Turbine, ax=ax2)
            ax2.set_xlabel(x_name)
            ax2.set_ylabel(y_name)
            ax2.set_title('Temperature-Distribute')
            ax2.legend(bbox_to_anchor=(1.02, 0.5), loc='center left',
                       ncol=2, borderaxespad=0.)
            fig.savefig(os.path.join(output_path, "{}.png".format(column)),
                        bbox_inches='tight', dpi=120)
            plt.close(fig)

            # One plot per turbine: all turbines in grey, this one highlighted.
            greyPalette = ["lightgrey"] * ress[field_Name_Turbine].nunique()
            for name, group in ress.groupby(field_Name_Turbine):
                fig, ax = plt.subplots()
                # Explicit palette: the global palette stays 'deep', so the
                # overview plot of the next column keeps its colours.
                sns.lineplot(x=x_name, y=column, data=ress, hue=field_Name_Turbine,
                             palette=greyPalette, legend=False, ax=ax)
                sns.lineplot(x=x_name, y=column, data=group,
                             color='darkblue', legend=False, ax=ax)
                ax.set_xlabel(x_name)
                ax.set_ylabel(y_name)
                ax.set_title('turbine_name={}'.format(name))
                fig.savefig(os.path.join(output_path, "{}.png".format(name)),
                            bbox_inches='tight', dpi=120)
                plt.close(fig)

    def getGeneratorTemperatureConf(self, confData: ConfBusiness):
        """Return ``confData.temperature_Generator`` when it is usable, else ``None``.

        The configuration counts as usable when it is set and both its
        "yAxisDE" and "yAxisNDE" entries are set.
        """
        generatorConf = confData.temperature_Generator
        if self.common.isNone(generatorConf):
            return None
        if self.common.isNone(generatorConf["yAxisDE"]):
            return None
        if self.common.isNone(generatorConf["yAxisNDE"]):
            return None
        return generatorConf

    def drawGeneratorTemperature(self, dataFrame: pd.DataFrame, confData: ConfBusiness, yAxisDE, yAxisNDE, diffTemperature, xAxis, turbineName, outputDir):
        """Plot the generator bearing temperatures of one turbine against power.

        Six lines are drawn: DE bearing temperature, DE minus reference, NDE
        bearing temperature, NDE minus reference, the reference temperature
        itself, and DE minus NDE. Two dotted red guide lines mark +5 and -5.
        The figure is saved as ``<outputDir>/<turbineName>.png`` and closed.

        Parameters:
        - dataFrame: data of a single turbine; it is only read, not modified.
        - confData: business configuration (axis settings are taken from
          ``graphSets``).
        - yAxisDE / yAxisNDE: column names of the DE / NDE bearing temperature.
        - diffTemperature: column name of the reference temperature (labelled
          'NacelleTemperature' in the plot).
        - xAxis: column name used for the x axis.
        - turbineName: used for the title and the output file name.
        - outputDir: directory that receives the PNG file.
        """
        fieldBIAS_DE_NDE = 'BIAS_DE-NDE'
        fieldBIAS_DE = 'BIAS_DE'
        fieldBIAS_NDE = 'BIAS_NDE'

        power = dataFrame[xAxis]
        temperatureDE = dataFrame[yAxisDE]
        temperatureNDE = dataFrame[yAxisNDE]
        temperatureRef = dataFrame[diffTemperature]

        # The differences are kept in local Series instead of being written
        # back as new columns: the caller's frame (often a groupby slice)
        # stays untouched.
        biasDE = temperatureDE - temperatureRef
        biasNDE = temperatureNDE - temperatureRef
        biasDE_NDE = temperatureDE - temperatureNDE

        fig, ax1 = plt.subplots()

        # The plotting order defines the legend order.
        ax1.plot(power, temperatureDE,
                 label='DEBearingTemperature', color='blue')
        ax1.plot(power, biasDE, label=fieldBIAS_DE,
                 color='blue', linestyle=':')
        ax1.plot(power, temperatureNDE,
                 label='NDEBearingTemperature', color='green')
        ax1.plot(power, biasNDE, label=fieldBIAS_NDE,
                 color='green', linestyle=':')
        ax1.plot(power, temperatureRef,
                 label='NacelleTemperature', color='orange')
        ax1.plot(power, biasDE_NDE, label=fieldBIAS_DE_NDE,
                 color='black', linestyle='--')

        # Horizontal guide lines at +5 and -5.
        ax1.axhline(y=5, ls=":", c="red")
        ax1.axhline(y=-5, ls=":", c="red")

        # Legend placed to the right of the axes.
        ax1.legend(title='Temperature & BIAS', bbox_to_anchor=(1.3, 0.5),
                   loc='center', borderaxespad=0.)

        ax1.set_xlabel("power")
        ax1.set_ylabel('Bearing Temperature & BIAS')
        self._applyActivePowerAxis(ax1, confData)
        self._applyGeneratorTemperatureAxis(ax1, confData)

        ax1.set_title(f'Generator Temperture BIAS={turbineName}')
        fig.savefig(os.path.join(outputDir, "{}.png".format(turbineName)),
                    bbox_inches='tight', dpi=120)
        # Close the figure; otherwise one figure per turbine stays open.
        plt.close(fig)