import os import pandas as pd import plotly.express as px import plotly.graph_objects as go from algorithmContract.confBusiness import * from algorithmContract.contract import Contract from behavior.analystWithGoodBadLimitPoint import AnalystWithGoodBadLimitPoint from plotly.subplots import make_subplots from utils.directoryUtil import DirectoryUtil as dir class Generator: def __init__(self) -> None: self.fieldTemperatorOfDEBearing = None self.fieldTemperatorOfNDEBearing = None TemperatureColumns = {Field_MainBearTemp: "主轴承温度", Field_GbMsBearTemp: "齿轮箱中速轴温度", Field_GbLsBearTemp: "齿轮箱低速轴温度", Field_GbHsBearTemp: "齿轮箱高速轴温度", Field_GeneratorDE: "发电机驱动端轴承温度", Field_GeneratorNDE: "发电机非驱动端轴承温度", Field_GenWiTemp1: "发电机绕组温度"} GeneratorTemperatureAnslysisColumns = [ Field_GeneratorDE, Field_GeneratorNDE, Field_NacTemp] class TemperatureLargeComponentsAnalyst(AnalystWithGoodBadLimitPoint): """ 风电机组大部件温升分析 """ def typeAnalyst(self): return "temperature_large_components" def getUseColumns(self, dataFrame: pd.DataFrame, temperatureColumns: list[dict]): # 获取非全为空的列名 non_empty_cols = self.getNoneEmptyFields(dataFrame, temperatureColumns) useCols = [] # useCols.append(Field_Time) useCols.append(Field_ActiverPower) if not self.common.isNone(Field_EnvTemp) and Field_EnvTemp in dataFrame.columns: useCols.append(Field_EnvTemp) if not self.common.isNone(Field_NacTemp) and Field_NacTemp in dataFrame.columns: useCols.append(Field_NacTemp) useCols.extend(non_empty_cols) return useCols def getNoneEmptyFields(self, dataFrame: pd.DataFrame, temperatureColumns: dict) -> list: # 使用set和列表推导式来获取在DataFrame中存在的字段 existing_fields = [ key for key in TemperatureColumns.keys() if key in dataFrame.columns ] # 检查指定列中非全为空的列 non_empty_columns = dataFrame[existing_fields].apply( lambda x: x.notnull().any(), axis=0) # 获取非全为空的列名 noneEmptyFields = non_empty_columns[non_empty_columns].index.tolist() return noneEmptyFields def dataReprocess(self, dataFrame: pd.DataFrame, non_empty_cols: list): # Initialize an empty df for aggregation agg_dict = {col: 'median' for col in non_empty_cols} # Group by 'power_floor' and aggregate grouped = dataFrame.groupby([Field_PowerFloor, Field_CodeOfTurbine, Field_NameOfTurbine]).agg(agg_dict).reset_index() # Sort by 'power_floor' grouped.sort_values( [Field_PowerFloor, Field_CodeOfTurbine, Field_NameOfTurbine], inplace=True) return grouped def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes): select=[Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower] select=select+[Field_EnvTemp,Field_NacTemp]+list(TemperatureColumns.keys()) dictionary = self.processTurbineData(turbineCodes, conf,select ) dataFrameOfTurbines = self.userDataFrame( dictionary, conf.dataContract.configAnalysis, self) turbrineInfos = self.common.getTurbineInfos( conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo) groupedOfTurbineModel = turbrineInfos.groupby(Field_MillTypeCode) returnDatas = [] for turbineModelCode, group in groupedOfTurbineModel: currTurbineCodes = group[Field_CodeOfTurbine].unique().tolist() currTurbineModeInfo = self.common.getTurbineModelByCode( turbineModelCode, self.turbineModelInfo) currDataFrameOfTurbines = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine].isin( currTurbineCodes)] # 将 currTurbineInfos 转换为字典 currTurbineInfos_dict = turbrineInfos.set_index(Field_CodeOfTurbine)[Field_NameOfTurbine].to_dict() # 使用 map 函数来填充 Field_NameOfTurbine 列 currDataFrameOfTurbines[Field_NameOfTurbine] = currDataFrameOfTurbines[Field_CodeOfTurbine].map(currTurbineInfos_dict).fillna("") # 获取非全为空的列名 non_empty_cols = self.getUseColumns(currDataFrameOfTurbines, TemperatureColumns) dataFrame = self.dataReprocess(currDataFrameOfTurbines, non_empty_cols) # non_empty_cols.remove(Field_Time) non_empty_cols.remove(Field_ActiverPower) non_empty_cols.remove(Field_EnvTemp) non_empty_cols.remove(Field_NacTemp) returnData= self.drawTemperatureGraph(dataFrame, outputAnalysisDir, conf, non_empty_cols,currTurbineModeInfo) returnDatas.append(returnData) returnResult = pd.concat(returnDatas, ignore_index=True) return returnResult def drawTemperatureGraph(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, conf: Contract, temperatureCols: list, turbineModelInfo: pd.Series): """ 大部件温度传感器分析 """ y_name = '温度(℃)' outputDir = os.path.join(outputAnalysisDir, "GeneratorTemperature") dir.create_directory(outputDir) # 按设备名分组数据 grouped = dataFrameMerge.groupby(Field_CodeOfTurbine) result_rows = [] # Create output directories if they don't exist for column in temperatureCols: if not column in dataFrameMerge.columns: continue columnZH = TemperatureColumns.get(column) outputPath = os.path.join(outputAnalysisDir, column) dir.create_directory(outputPath) fig = go.Figure() # 获取 Plotly Express 中的颜色序列 colors = px.colors.sequential.Turbo # 创建一个列表来存储各个风电机组的数据 turbine_data_list = [] # Add traces for each turbine for idx, (name, group) in enumerate(grouped): currTurbineInfo_group = self.common.getTurbineInfo( conf.dataContract.dataFilter.powerFarmID, name, self.turbineInfo) fig.add_trace(go.Scatter( x=group[Field_PowerFloor], y=group[column], mode='lines', name=currTurbineInfo_group[Field_NameOfTurbine], # 从 'Rainbow' 色组中循环选择颜色 line=dict(color=colors[idx % len(colors)]) )) # 提取数据 turbine_data_total = { "engineName": currTurbineInfo_group[Field_NameOfTurbine], "engineCode": name, "xData": group[Field_PowerFloor].tolist(), "yData": group[column].tolist(), "color": colors[idx % len(colors)] } turbine_data_list.append(turbine_data_total) # Update layout and axes fig.update_layout( title={'text': f'{columnZH}分布-{turbineModelInfo[Field_MachineTypeCode]}', 'x': 0.5}, xaxis_title='功率', yaxis_title=y_name, xaxis=dict( range=[ self.axisLowerLimitActivePower, self.axisUpperLimitActivePower ], dtick=self.axisStepActivePower ), yaxis=dict( range=[0, 100], dtick=20 ), # legend_title_text='Turbine', legend=dict( orientation="h", # Horizontal orientation xanchor="center", # Anchor the legend to the center x=0.5, # Position legend at the center of the x-axis y=-0.2, # Position legend below the x-axis # itemsizing='constant', # Keep the size of the legend entries constant # # Set the width of the legend items (in pixels) # itemwidth=50 ) ) # Save the plot as a PNG/HTML file # filePathOfImage = os.path.join(outputPath, f"{columnZH}-{turbineModelInfo[Field_MillTypeCode]}.png") # fig.write_image(filePathOfImage, scale=3) # filePathOfHtml = os.path.join(outputPath, f"{columnZH}-{turbineModelInfo[Field_MillTypeCode]}.html") #fig.write_html(filePathOfHtml) engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "") if isinstance(engineTypeCode, pd.Series): engineTypeCode = engineTypeCode.iloc[0] engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "") if isinstance(engineTypeName, pd.Series): engineTypeName = engineTypeName.iloc[0] # 构建最终的JSON对象 json_output = { "analysisTypeCode": "大部件温度传感器分析", "typecode": turbineModelInfo[Field_MillTypeCode], "engineCode": engineTypeCode, "engineTypeName": engineTypeName, "title": f'{columnZH}分布-{turbineModelInfo[Field_MachineTypeCode]}', "xaixs": "功率(kW)", "yaixs": y_name, "data": turbine_data_list } # 将JSON对象保存到文件 output_json_path = os.path.join(outputPath,f"{column}_{turbineModelInfo[Field_MillTypeCode]}.json") with open(output_json_path, 'w', encoding='utf-8') as f: import json json.dump(json_output, f, ensure_ascii=False, indent=4) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: Const_Output_Total, # Field_Return_FilePath: filePathOfImage, # Field_Return_IsSaveDatabase: False # }) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: Const_Output_Total, # Field_Return_FilePath: filePathOfHtml, # Field_Return_IsSaveDatabase: True # }) # 如果需要返回DataFrame,可以包含文件路径 result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: 'total', Field_MillTypeCode: f'{column}_{turbineModelInfo[Field_MillTypeCode]}', Field_Return_FilePath: output_json_path, Field_Return_IsSaveDatabase: True }) # Create individual plots with specific turbine highlighted # for name, group in grouped: for idx, (name, group) in enumerate(grouped): single_fig = go.Figure() currTurbineInfo_each = self.common.getTurbineInfo( conf.dataContract.dataFilter.powerFarmID, name, self.turbineInfo) # 创建一个列表来存储各个风电机组的数据 turbine_data_list_each = [] # Add all other turbines in grey first # for other_name, other_group in grouped: for idx, (other_name, other_group) in enumerate(grouped): if other_name != name: tempTurbineInfo = self.common.getTurbineInfo( conf.dataContract.dataFilter.powerFarmID, other_name, self.turbineInfo) single_fig.add_trace(go.Scatter( x=other_group[Field_PowerFloor], y=other_group[column], mode='lines', name=tempTurbineInfo[Field_NameOfTurbine], line=dict(color='lightgrey', width=1), showlegend=False )) # 提取数据 turbine_data_other_each = { "engineName": tempTurbineInfo[Field_NameOfTurbine], "engineCode": other_name, "xData": other_group[Field_PowerFloor].tolist(), "yData": other_group[column].tolist(), } turbine_data_list_each.append(turbine_data_other_each) # Add the turbine of interest in dark blue single_fig.add_trace(go.Scatter( x=group[Field_PowerFloor], y=group[column], mode='lines', # Make it slightly thicker for visibility line=dict(color='darkblue', width=2), showlegend=False # Disable legend for cleaner look )) turbine_data_curr = { "engineName": currTurbineInfo_each[Field_NameOfTurbine], "engineCode": currTurbineInfo_each[Field_CodeOfTurbine], "xData": group[Field_PowerFloor].tolist(), "yData": group[column].tolist(), } turbine_data_list_each.append(turbine_data_curr) # Update layout and axes for the individual plot single_fig.update_layout( # title={'text': f'Turbine: {name}', 'x': 0.5}, title={'text': f'{columnZH}分布: {currTurbineInfo_each[Field_NameOfTurbine]}', 'x': 0.5}, xaxis_title='功率', yaxis_title=y_name, xaxis=dict( range=[0, Field_RatedPower], dtick=200 ), yaxis=dict( range=[0, 100], dtick=20 ) ) engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "") if isinstance(engineTypeCode, pd.Series): engineTypeCode = engineTypeCode.iloc[0] engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "") if isinstance(engineTypeName, pd.Series): engineTypeName = engineTypeName.iloc[0] # 构建最终的JSON对象 json_output = { "analysisTypeCode": "大部件温度传感器分析", "typecode": turbineModelInfo[Field_MillTypeCode], "engineCode": engineTypeCode, "engineTypeName": engineTypeName, "title": f'{columnZH}分布: {currTurbineInfo_each[Field_NameOfTurbine]}', "xaixs": "功率(kW)", "yaixs": y_name, "data": turbine_data_list_each } # 将JSON对象保存到文件 output_json_path_each = os.path.join(outputPath, f"{currTurbineInfo_each[Field_NameOfTurbine]}.json") with open(output_json_path_each, 'w', encoding='utf-8') as f: import json json.dump(json_output, f, ensure_ascii=False, indent=4) # filePathOfImage = os.path.join( # outputPath, f"{currTurbineInfo_each[Field_NameOfTurbine]}.png") # single_fig.write_image(filePathOfImage, scale=3) # filePathOfHtml = os.path.join( # outputPath, f"{name}.html") # single_fig.write_html(filePathOfHtml) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: group[Field_CodeOfTurbine].iloc[0], # Field_Return_FilePath: filePathOfImage, # Field_Return_IsSaveDatabase: False # }) # 如果需要返回DataFrame,可以包含文件路径 result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: group[Field_CodeOfTurbine].iloc[0], Field_Return_FilePath: output_json_path_each, Field_Return_IsSaveDatabase: True }) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: group[Field_CodeOfTurbine].iloc[0], # Field_Return_FilePath: filePathOfHtml, # Field_Return_IsSaveDatabase: True # }) for idx, (name, group) in enumerate(grouped): # 绘制每台机组发电机的,驱动轴承温度、非驱动轴承温度、发电机轴承温度BIAS、发电机轴承温度和机舱温度BIAS 均与有功功率的折线图 if not Field_GeneratorDE in group.columns or not Field_GeneratorNDE in group.columns or group[Field_GeneratorDE].isna().all() or group[Field_GeneratorNDE].isna().all(): self.logger.warning(f"{name} 不具备发电机温度测点,或测点值全为空") continue refFieldTemperature = Field_EnvTemp if group[Field_NacTemp].isna( ).all() else Field_NacTemp result_rows1 = self.drawGeneratorTemperature( group, conf, Field_GeneratorDE, Field_GeneratorNDE, refFieldTemperature, Field_PowerFloor, name, outputDir) result_rows.extend(result_rows1) result_df = pd.DataFrame(result_rows) return result_df def drawGeneratorTemperature(self, dataFrame: pd.DataFrame, conf: Contract, yAxisDE, yAxisNDE, diffTemperature, xAxis, turbineCode, outputDir): tempTurbineInfo1 = self.common.getTurbineInfo( conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo) turbineName = tempTurbineInfo1[Field_NameOfTurbine] # 发电机驱动轴承温度 和 发电机非驱动轴承 温差 fieldBIAS_DE_NDE = 'BIAS_DE-NDE' fieldBIAS_DE = 'BIAS_DE' fieldBIAS_NDE = 'BIAS_NDE' # Prepare Data dataFrame[fieldBIAS_DE] = dataFrame[yAxisDE] - \ dataFrame[diffTemperature] dataFrame[fieldBIAS_NDE] = dataFrame[yAxisNDE] - \ dataFrame[diffTemperature] dataFrame[fieldBIAS_DE_NDE] = dataFrame[yAxisDE] - dataFrame[yAxisNDE] # Create a plot with dual y-axes fig = make_subplots(specs=[[{"secondary_y": True}]]) plot_data_list_each = [] # Plot DE Bearing Temperature fig.add_trace(go.Scatter(x=dataFrame[xAxis], y=dataFrame[yAxisDE], name='驱动端轴承温度', line=dict( color='blue')), secondary_y=False) plot_data_curr = { "Name": '驱动端轴承温度', "xData": dataFrame[xAxis].tolist(), "yData": dataFrame[yAxisDE].tolist(), "color": 'blue', } plot_data_list_each.append(plot_data_curr) # Plot NDE Bearing Temperature fig.add_trace(go.Scatter(x=dataFrame[xAxis], y=dataFrame[yAxisNDE], name='非驱动端轴承温度', line=dict( color='green')), secondary_y=False) plot_data_curr = { "Name": '非驱动端轴承温度', "xData": dataFrame[xAxis].tolist(), "yData": dataFrame[yAxisNDE].tolist(), "color": 'green', } plot_data_list_each.append(plot_data_curr) # Plot Temperature Differences fig.add_trace(go.Scatter(x=dataFrame[xAxis], y=dataFrame[fieldBIAS_DE], name='驱动端轴承温度与机舱温度偏差', line=dict( color='blue', dash='dot')), secondary_y=False) plot_data_curr = { "Name": '驱动端轴承温度与机舱温度偏差', "xData": dataFrame[xAxis].tolist(), "yData": dataFrame[fieldBIAS_DE].tolist(), "color": 'blue', } plot_data_list_each.append(plot_data_curr) fig.add_trace(go.Scatter(x=dataFrame[xAxis], y=dataFrame[fieldBIAS_NDE], name='非驱动端轴承温度与机舱温度偏差', line=dict( color='green', dash='dot')), secondary_y=False) plot_data_curr = { "Name": '非驱动端轴承温度与机舱温度偏差', "xData": dataFrame[xAxis].tolist(), "yData": dataFrame[fieldBIAS_NDE].tolist(), "color": 'green', } plot_data_list_each.append(plot_data_curr) fig.add_trace(go.Scatter(x=dataFrame[xAxis], y=dataFrame[fieldBIAS_DE_NDE], name='驱动端轴承与非驱动端轴承温度偏差', line=dict(color='black', dash='dash')), secondary_y=False) plot_data_curr = { "Name": '驱动端轴承与非驱动端轴承温度偏差', "xData": dataFrame[xAxis].tolist(), "yData": dataFrame[fieldBIAS_DE_NDE].tolist(), "color": 'black', } plot_data_list_each.append(plot_data_curr) # Plot Nacelle Temperature fig.add_trace(go.Scatter(x=dataFrame[xAxis], y=dataFrame[diffTemperature], name='机舱温度', line=dict(color='orange')), secondary_y=False) plot_data_curr = { "Name": '机舱温度', "xData": dataFrame[xAxis].tolist(), "yData": dataFrame[diffTemperature].tolist(), "color": 'orange', } plot_data_list_each.append(plot_data_curr) # Add horizontal reference lines fig.add_hline(y=5, line_dash="dot", line_color="#FFDB58") fig.add_hline(y=-5, line_dash="dot", line_color="#FFDB58") fig.add_hline(y=15, line_dash="dot", line_color="red") fig.add_hline(y=-15, line_dash="dot", line_color="red") # Update layout fig.update_layout( title={'text': f'发电机温度偏差: {turbineName}'}, xaxis_title="功率", yaxis_title='轴承温度 & 偏差', legend_title='温度 & 偏差', legend=dict(x=1.1, y=0.5, bgcolor='rgba(255, 255, 255, 0.5)'), margin=dict(r=200) # Adjust margin to fit legend ) fig.update_yaxes(range=[-20, 100], secondary_y=False) # 构建最终的JSON对象 json_output = { "analysisTypeCode": "发电机温度传感器分析", "turbineName": tempTurbineInfo1[Field_NameOfTurbine], "turbineCode": tempTurbineInfo1[Field_CodeOfTurbine], "title": f'发电机温度偏差: {turbineName}', "xaixs": "功率(kW)", "yaixs": '轴承温度 & 偏差(℃)', "data": plot_data_list_each } # 将JSON对象保存到文件 output_json_path_each = os.path.join(outputDir, f"{turbineName}.json") with open(output_json_path_each, 'w', encoding='utf-8') as f: import json json.dump(json_output, f, ensure_ascii=False, indent=4) # Save the plot as a PNG/HTML file # filePathOfImage = os.path.join(outputDir, f"{turbineName}.png") # fig.write_image(filePathOfImage, width=800, height=600, scale=3) # filePathOfHtml = os.path.join(outputDir, f"{turbineName}.html") # fig.write_html(filePathOfHtml) result_rows1 = [] # result_rows1.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: dataFrame[Field_CodeOfTurbine].iloc[0], # Field_Return_FilePath: filePathOfImage, # Field_Return_IsSaveDatabase: False # }) # 如果需要返回DataFrame,可以包含文件路径 result_rows1.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: dataFrame[Field_CodeOfTurbine].iloc[0], Field_Return_FilePath: output_json_path_each, Field_Return_IsSaveDatabase: True }) # result_rows1.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: dataFrame[Field_CodeOfTurbine].iloc[0], # Field_Return_FilePath: filePathOfHtml, # Field_Return_IsSaveDatabase: True # }) return result_rows1