import os import pandas as pd import plotly.graph_objects as go from algorithmContract.confBusiness import * from algorithmContract.contract import Contract from behavior.analystWithGoodPoint import AnalystWithGoodPoint class TSRTrendAnalyst(AnalystWithGoodPoint): """ 风电机组叶尖速比时序分析 """ def typeAnalyst(self): return "tsr_trend" def selectColumns(self): return [Field_DeviceCode,Field_Time,Field_WindSpeed,Field_ActiverPower,Field_RotorSpeed,Field_GeneratorSpeed] def processDateTime(self, dataFrame: pd.DataFrame, fieldTime:str = None): super().processDateTime(dataFrame,Field_YearMonthDay) def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes): dictionary = self.processTurbineData(turbineCodes,conf,self.selectColumns()) dataFrameMerge = self.userDataFrame(dictionary,conf.dataContract.configAnalysis,self) turbineInfos = self.common.getTurbineInfos(conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo) return self.drawTSRTrend(dataFrameMerge,turbineInfos, outputAnalysisDir, conf) def drawTSRTrend(self,dataFrameMerge:pd.DataFrame, turbineModelInfo: pd.Series,outputAnalysisDir, conf: Contract): # 检查所需列是否存在 required_columns = {Field_TSR, Field_YearMonthDay} if not required_columns.issubset(dataFrameMerge.columns): raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}") # 按设备名分组数据 grouped = dataFrameMerge.groupby([Field_NameOfTurbine, Field_CodeOfTurbine]) result_rows = [] for name, group in grouped: # 计算四分位数和IQR Q1 = group[Field_TSR].quantile(0.25) Q3 = group[Field_TSR].quantile(0.75) IQR = Q3 - Q1 # 定义离群值的范围 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR # 筛选掉离群值 filtered_group = group[(group[Field_TSR] >= lower_bound) & (group[Field_TSR] <= upper_bound)] # 创建箱线图 fig = go.Figure() fig.add_trace(go.Box( x=filtered_group[Field_YearMonthDay], # 设置x轴数据为日期 y=filtered_group[Field_TSR], # 设置y轴数据为风能利用系数 # boxpoints='outliers', # 显示异常值(偏离值),不显示数据的所有点(只显示异常值) boxpoints=False, # 不显示偏离值 marker=dict(color='lightgoldenrodyellow', size=1), # 设置偏离值的颜色和大小 line=dict(color='lightgray', width=2), # 设置箱线和须线的颜色为灰色,粗细为2 fillcolor='rgba(200, 200, 200, 0.5)', # 设置箱体的填充颜色和透明度 name='叶尖速比' # 图例名称 )) # 对于每个箱线图的中位数,绘制一个蓝色点 medians = filtered_group.groupby(filtered_group[Field_YearMonthDay])[Field_TSR].median() fig.add_trace(go.Scatter( x=medians.index, y=medians.values, mode='markers', marker=dict(color='orange', size=3), name='叶尖速比中位数' # 中位数标记的图例名称 )) # 设置图表的标题和轴标签 fig.update_layout( title={ 'text': f'机组: {name[0]}', #'x':0.5, }, xaxis_title='时间', yaxis_title='叶尖速比', xaxis=dict( tickmode='auto', # 自动设置x轴刻度,以适应日期数据 tickformat='%Y-%m-%d', # 设置x轴时间格式 showgrid=True, # 显示网格线 gridcolor='lightgray', # setting y-axis gridline color to black tickangle=-45, linecolor='black', # 设置y轴坐标系线颜色为黑色 ticklen=5, # 设置刻度线的长度 ), yaxis=dict( range=[self.axisLowerLimitTSR, self.axisUpperLimitTSR], dtick=self.axisStepTSR, showgrid=True, # 显示网格线 gridcolor='lightgray', # setting y-axis gridline color to black linecolor='black', # 设置y轴坐标系线颜色为黑色 ticklen=5, # 设置刻度线的长度 ), paper_bgcolor='white', # 设置纸张背景颜色为白色 plot_bgcolor='white', # 设置图表背景颜色为白色 margin=dict(t=50, b=10) # t为顶部(top)间距,b为底部(bottom)间距 ) # 确保从 Series 中提取的是具体的值 engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "") if isinstance(engineTypeCode, pd.Series): engineTypeCode = engineTypeCode.iloc[0] engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "") if isinstance(engineTypeName, pd.Series): engineTypeName = engineTypeName.iloc[0] # 构建最终的JSON对象 json_output = { "analysisTypeCode": "叶尖速比时序分析", "engineCode": engineTypeCode, "engineTypeName": engineTypeName, "xaixs": "时间", "yaixs": "叶尖速比", "data": [{ "engineName": name[0], "engineCode": name[1], "title":f'机组-{name[0]}', "xData": filtered_group[Field_YearMonthDay].tolist(), "yData": filtered_group[Field_TSR].tolist(), "color":'lightgray', "width":2, "type":"box_plot", "medians": { "x": medians.index.tolist(), # 中位数的 x 轴数据 "y": medians.values.tolist(), # 中位数的 y 轴数据 "mode":'markers', "color":'orange', "size":3 } }] } # 保存图像 # filePathOfImage = os.path.join(outputAnalysisDir, f"{name[0]}.png") # fig.write_image(filePathOfImage, scale=3) # filePathOfHtml = os.path.join(outputAnalysisDir, f"{name[0]}.html") # fig.write_html(filePathOfHtml) # 将JSON对象保存到文件 output_json_path = os.path.join(outputAnalysisDir, f"{name[0]}.json") with open(output_json_path, 'w', encoding='utf-8') as f: import json json.dump(json_output, f, ensure_ascii=False, indent=4) # 如果需要返回DataFrame,可以包含文件路径 result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: name[1], Field_Return_FilePath: output_json_path, Field_Return_IsSaveDatabase: True }) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: name[1], # Field_Return_FilePath: filePathOfImage, # Field_Return_IsSaveDatabase: False # }) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: name[1], # Field_Return_FilePath: filePathOfHtml, # Field_Return_IsSaveDatabase: True # }) result_df = pd.DataFrame(result_rows) return result_df