import os

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
# NOTE: the star import supplies every Field_* / Const_* constant used below.
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from behavior.analystWithGoodBadLimitPoint import AnalystWithGoodBadLimitPoint
from plotly.subplots import make_subplots
# NOTE(review): this alias shadows the builtin ``dir``; kept because other
# code may rely on the module-level name.
from utils.directoryUtil import DirectoryUtil as dir


class Generator:
    """Holder for the generator bearing temperature field names.

    Both attributes start as ``None``; nothing in this file assigns or reads
    them.
    """

    def __init__(self) -> None:
        self.fieldTemperatorOfDEBearing = None
        self.fieldTemperatorOfNDEBearing = None


# Temperature field constant -> Chinese display label used in chart titles
# and output file names.
TemperatureColumns = {
    Field_MainBearTemp: "主轴承温度",
    Field_GbMsBearTemp: "齿轮箱中速轴温度",
    Field_GbLsBearTemp: "齿轮箱低速轴温度",
    Field_GbHsBearTemp: "齿轮箱高速轴温度",
    Field_GeneratorDE: "发电机驱动端轴承温度",
    Field_GeneratorNDE: "发电机非驱动端轴承温度",
    Field_GenWiTemp1: "发电机绕组温度",
}

# Fields involved in the generator temperature analysis.
GeneratorTemperatureAnslysisColumns = [
    Field_GeneratorDE,
    Field_GeneratorNDE,
    Field_NacTemp,
]


class TemperatureLargeComponentsAnalyst(AnalystWithGoodBadLimitPoint):
    """Temperature-rise analysis of wind turbine large components."""

    def typeAnalyst(self):
        """Return the identifier of this analysis type."""
        return "temperature_large_components"

    def getUseColumns(self, dataFrame: pd.DataFrame, temperatureColumns: dict):
        """Return the columns to aggregate for ``dataFrame``.

        The result always starts with the active-power field, followed by the
        ambient and nacelle temperature fields when they are configured and
        present in ``dataFrame``, followed by every temperature field from
        ``temperatureColumns`` that has at least one non-null value.
        """
        non_empty_cols = self.getNoneEmptyFields(dataFrame, temperatureColumns)

        useCols = [Field_ActiverPower]
        for refField in (Field_EnvTemp, Field_NacTemp):
            if not self.common.isNone(refField) and refField in dataFrame.columns:
                useCols.append(refField)
        useCols.extend(non_empty_cols)
        return useCols

    def getNoneEmptyFields(self, dataFrame: pd.DataFrame, temperatureColumns: dict) -> list:
        """Return the fields of ``temperatureColumns`` that hold data.

        A field is kept when it is a column of ``dataFrame`` and that column
        contains at least one non-null value. ``temperatureColumns`` may be
        any iterable of field names (iterating a dict yields its keys); when
        it is ``None`` the module-level ``TemperatureColumns`` is used.
        """
        # The original ignored the argument and always read the module-level
        # dict; honour the argument and only fall back when nothing is given.
        candidates = TemperatureColumns if temperatureColumns is None else temperatureColumns
        return [
            field for field in candidates
            if field in dataFrame.columns and dataFrame[field].notna().any()
        ]

    def dataReprocess(self, dataFrame: pd.DataFrame, non_empty_cols: list):
        """Aggregate ``non_empty_cols`` by median per power bin and turbine.

        Rows are grouped by power floor, turbine code and turbine name, and
        the result is returned sorted by those same three keys.
        """
        groupKeys = [Field_PowerFloor, Field_CodeOfTurbine, Field_NameOfTurbine]
        agg_dict = {col: 'median' for col in non_empty_cols}
        grouped = dataFrame.groupby(groupKeys).agg(agg_dict).reset_index()
        return grouped.sort_values(groupKeys)

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """Run the analysis for every turbine model and collect result rows.

        Loads the data of ``turbineCodes``, splits the turbines by model
        code, aggregates each model's data and draws its charts below
        ``outputAnalysisDir``.

        Returns:
            A DataFrame with one row per generated file; empty when there is
            no turbine model to process.
        """
        select = [Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower]
        select = select + [Field_EnvTemp, Field_NacTemp] + list(TemperatureColumns.keys())

        dictionary = self.processTurbineData(turbineCodes, conf, select)
        dataFrameOfTurbines = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        turbrineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)

        # Turbine code -> turbine name; identical for every model, so it is
        # built once instead of once per loop iteration.
        turbineNameByCode = turbrineInfos.set_index(
            Field_CodeOfTurbine)[Field_NameOfTurbine].to_dict()

        # Helper columns that are aggregated but never plotted on their own.
        helperCols = {Field_ActiverPower, Field_EnvTemp, Field_NacTemp}

        returnDatas = []
        for turbineModelCode, group in turbrineInfos.groupby(Field_MillTypeCode):
            currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist()
            currTurbineModeInfo = self.common.getTurbineModelByCode(
                turbineModelCode, self.turbineModelInfo)

            # Copy the filtered slice so the column assignment below writes to
            # an independent frame rather than to a view of the full data.
            currDataFrameOfTurbines = dataFrameOfTurbines[
                dataFrameOfTurbines[Field_CodeOfTurbine].isin(currTurbineCodes)
            ].copy()
            currDataFrameOfTurbines[Field_NameOfTurbine] = (
                currDataFrameOfTurbines[Field_CodeOfTurbine]
                .map(turbineNameByCode)
                .fillna("")
            )

            useCols = self.getUseColumns(currDataFrameOfTurbines, TemperatureColumns)
            dataFrame = self.dataReprocess(currDataFrameOfTurbines, useCols)

            # getUseColumns adds the ambient / nacelle fields only when they
            # exist, so filter instead of list.remove(), which raised
            # ValueError whenever one of them was missing.
            temperatureCols = [col for col in useCols if col not in helperCols]

            returnDatas.append(self.drawTemperatureGraph(
                dataFrame, outputAnalysisDir, conf, temperatureCols, currTurbineModeInfo))

        if not returnDatas:
            # pd.concat raises ValueError on an empty list.
            return pd.DataFrame()
        return pd.concat(returnDatas, ignore_index=True)

    def _temperatureResultRows(self, conf: Contract, turbineCode, filePathOfImage, filePathOfHtml):
        """Build the two result records (PNG, then HTML) of one saved figure."""
        shared = {
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: turbineCode,
        }
        return [
            {**shared, Field_Return_FilePath: filePathOfImage,
             Field_Return_IsSaveDatabase: False},
            {**shared, Field_Return_FilePath: filePathOfHtml,
             Field_Return_IsSaveDatabase: True},
        ]

    def _saveFigure(self, fig, outputPath, fileStem, **imageOptions):
        """Write ``fig`` as ``<fileStem>.png`` and ``<fileStem>.html``; return both paths."""
        filePathOfImage = os.path.join(outputPath, f"{fileStem}.png")
        fig.write_image(filePathOfImage, **imageOptions)
        filePathOfHtml = os.path.join(outputPath, f"{fileStem}.html")
        fig.write_html(filePathOfHtml)
        return filePathOfImage, filePathOfHtml

    def _selectReferenceTemperature(self, group: pd.DataFrame):
        """Pick the reference temperature column for the bias curves.

        Prefers the nacelle temperature when that column exists and has data,
        otherwise falls back to the ambient temperature column when it
        exists. Returns ``None`` when neither column can be used.
        """
        if Field_NacTemp in group.columns and not group[Field_NacTemp].isna().all():
            return Field_NacTemp
        if Field_EnvTemp in group.columns:
            return Field_EnvTemp
        return None

    def drawTemperatureGraph(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, conf: Contract, temperatureCols: list, turbineModelInfo: pd.Series):
        """Draw the large-component temperature charts of one turbine model.

        For every column of ``temperatureCols`` present in ``dataFrameMerge``
        this writes, into ``<outputAnalysisDir>/<column>``:

        * one overview figure with a line per turbine, and
        * one figure per turbine with that turbine highlighted against the
          others drawn in grey.

        Afterwards one generator temperature-bias figure per turbine is
        written into ``<outputAnalysisDir>/GeneratorTemperature`` for the
        turbines that have both generator bearing temperature columns with
        data.

        Returns:
            A DataFrame with two rows (PNG and HTML path) per written figure.
        """
        y_name = '温度'
        outputDir = os.path.join(outputAnalysisDir, "GeneratorTemperature")
        dir.create_directory(outputDir)

        # Group by turbine name once; the groups are reused by every figure.
        turbineGroups = list(dataFrameMerge.groupby(Field_NameOfTurbine))
        # Line colours are taken cyclically from the Plotly "Turbo" sequence.
        colors = px.colors.sequential.Turbo
        powerAxis = dict(
            range=[
                self.axisLowerLimitActivePower,
                self.axisUpperLimitActivePower
            ],
            dtick=self.axisStepActivePower
        )

        result_rows = []
        for column in temperatureCols:
            if column not in dataFrameMerge.columns:
                continue

            columnZH = TemperatureColumns.get(column)
            outputPath = os.path.join(outputAnalysisDir, column)
            dir.create_directory(outputPath)

            # Overview figure: one coloured line per turbine.
            fig = go.Figure()
            for idx, (name, group) in enumerate(turbineGroups):
                fig.add_trace(go.Scatter(
                    x=group[Field_PowerFloor],
                    y=group[column],
                    mode='lines',
                    name=name,
                    line=dict(color=colors[idx % len(colors)])
                ))

            # NOTE(review): the title reads Field_MachineTypeCode while the
            # file name below reads Field_MillTypeCode — confirm both are
            # intended.
            fig.update_layout(
                title={'text': f'{columnZH}分布-{turbineModelInfo[Field_MachineTypeCode]}', 'x': 0.5},
                xaxis_title='功率',
                yaxis_title=y_name,
                xaxis=dict(powerAxis),
                yaxis=dict(
                    range=[0, 100],
                    dtick=20
                ),
                # Horizontal legend centred below the x-axis.
                legend=dict(
                    orientation="h",
                    xanchor="center",
                    x=0.5,
                    y=-0.2,
                )
            )

            filePathOfImage, filePathOfHtml = self._saveFigure(
                fig, outputPath,
                f"{columnZH}-{turbineModelInfo[Field_MillTypeCode]}", scale=3)
            result_rows.extend(self._temperatureResultRows(
                conf, Const_Output_Total, filePathOfImage, filePathOfHtml))

            # Per-turbine figures: the turbine of interest in dark blue on
            # top of all other turbines in light grey.
            for name, group in turbineGroups:
                single_fig = go.Figure()

                for other_name, other_group in turbineGroups:
                    if other_name != name:
                        single_fig.add_trace(go.Scatter(
                            x=other_group[Field_PowerFloor],
                            y=other_group[column],
                            mode='lines',
                            name=other_name,
                            line=dict(color='lightgrey', width=1),
                            showlegend=False
                        ))

                single_fig.add_trace(go.Scatter(
                    x=group[Field_PowerFloor],
                    y=group[column],
                    mode='lines',
                    name=name,
                    # Slightly thicker so it stands out from the grey lines.
                    line=dict(color='darkblue', width=2),
                    showlegend=False
                ))

                # The original passed the Field_RatedPower field-name constant
                # as the upper axis limit; use the same numeric limits as the
                # overview figure instead.
                single_fig.update_layout(
                    title={'text': f'{columnZH}分布: {name}', 'x': 0.5},
                    xaxis_title='功率',
                    yaxis_title=y_name,
                    xaxis=dict(powerAxis),
                    yaxis=dict(
                        range=[0, 100],
                        dtick=20
                    )
                )

                filePathOfImage, filePathOfHtml = self._saveFigure(
                    single_fig, outputPath, f"{name}", scale=3)
                result_rows.extend(self._temperatureResultRows(
                    conf, group[Field_CodeOfTurbine].iloc[0],
                    filePathOfImage, filePathOfHtml))

        # Per turbine: drive-end / non-drive-end bearing temperature, their
        # bias against the reference temperature and against each other, all
        # plotted over active power.
        for name, group in turbineGroups:
            hasGeneratorTemperature = (
                Field_GeneratorDE in group.columns
                and Field_GeneratorNDE in group.columns
                and not group[Field_GeneratorDE].isna().all()
                and not group[Field_GeneratorNDE].isna().all()
            )
            if not hasGeneratorTemperature:
                self.logger.warning(f"{name} 不具备发电机温度测点,或测点值全为空")
                continue

            refFieldTemperature = self._selectReferenceTemperature(group)
            if refFieldTemperature is None:
                # Neither reference column exists; the bias cannot be computed.
                self.logger.warning(f"{name} 不具备机舱温度和环境温度测点,无法绘制发电机温度偏差图")
                continue

            result_rows.extend(self.drawGeneratorTemperature(
                group, conf, Field_GeneratorDE, Field_GeneratorNDE,
                refFieldTemperature, Field_PowerFloor, name, outputDir))

        return pd.DataFrame(result_rows)

    def drawGeneratorTemperature(self, dataFrame: pd.DataFrame, conf: Contract, yAxisDE, yAxisNDE, diffTemperature, xAxis, turbineName, outputDir):
        """Draw the generator bearing temperature-bias figure of one turbine.

        Plots over ``xAxis``: the drive-end (``yAxisDE``) and non-drive-end
        (``yAxisNDE``) bearing temperatures, each one's difference to the
        reference column ``diffTemperature``, the difference between the two
        bearings, and the reference temperature itself. Horizontal guide
        lines are drawn at ±5 and ±15. The figure is saved as
        ``<turbineName>.png`` / ``.html`` in ``outputDir``.

        Returns:
            A list with two result records (PNG, then HTML).
        """
        fieldBIAS_DE_NDE = 'BIAS_DE-NDE'
        fieldBIAS_DE = 'BIAS_DE'
        fieldBIAS_NDE = 'BIAS_NDE'

        # Work on a copy: the caller passes a groupby slice, and the derived
        # columns must not leak into (or warn about) the caller's frame.
        dataFrame = dataFrame.copy()
        dataFrame[fieldBIAS_DE] = dataFrame[yAxisDE] - dataFrame[diffTemperature]
        dataFrame[fieldBIAS_NDE] = dataFrame[yAxisNDE] - dataFrame[diffTemperature]
        dataFrame[fieldBIAS_DE_NDE] = dataFrame[yAxisDE] - dataFrame[yAxisNDE]

        # Label the reference by what it really is; the original always said
        # "nacelle temperature" even when the ambient column was used.
        refLabel = '环境温度' if diffTemperature == Field_EnvTemp else '机舱温度'

        # (column, legend name, line style) of every curve, in drawing order.
        curves = [
            (yAxisDE, '驱动端轴承温度', dict(color='blue')),
            (yAxisNDE, '非驱动端轴承温度', dict(color='green')),
            (fieldBIAS_DE, f'驱动端轴承温度与{refLabel}偏差',
             dict(color='blue', dash='dot')),
            (fieldBIAS_NDE, f'非驱动端轴承温度与{refLabel}偏差',
             dict(color='green', dash='dot')),
            (fieldBIAS_DE_NDE, '驱动端轴承与非驱动端轴承温度偏差',
             dict(color='black', dash='dash')),
            (diffTemperature, refLabel, dict(color='orange')),
        ]

        # The figure is created with a secondary y-axis available, but every
        # curve is drawn on the primary one.
        fig = make_subplots(specs=[[{"secondary_y": True}]])
        for field, label, lineStyle in curves:
            fig.add_trace(
                go.Scatter(x=dataFrame[xAxis], y=dataFrame[field],
                           name=label, line=lineStyle),
                secondary_y=False)

        # Horizontal reference lines at ±5 and ±15.
        for level, lineColor in ((5, "#FFDB58"), (-5, "#FFDB58"), (15, "red"), (-15, "red")):
            fig.add_hline(y=level, line_dash="dot", line_color=lineColor)

        fig.update_layout(
            title={'text': f'发电机温度偏差: {turbineName}'},
            xaxis_title="功率",
            yaxis_title='轴承温度 & 偏差',
            legend_title='温度 & 偏差',
            legend=dict(x=1.1, y=0.5, bgcolor='rgba(255, 255, 255, 0.5)'),
            margin=dict(r=200)  # Leave room on the right for the legend.
        )
        fig.update_yaxes(range=[-20, 100], secondary_y=False)

        filePathOfImage, filePathOfHtml = self._saveFigure(
            fig, outputDir, f"{turbineName}", width=800, height=600, scale=3)

        return self._temperatureResultRows(
            conf, dataFrame[Field_CodeOfTurbine].iloc[0],
            filePathOfImage, filePathOfHtml)