import os
import json

import pandas as pd
import numpy as np
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator

from behavior.analystWithGoodPoint import AnalystWithGoodPoint
from utils.directoryUtil import DirectoryUtil as dir
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract


class CpTrendAnalyst(AnalystWithGoodPoint):
    """Time-series analysis of the wind-turbine power coefficient (Cp).

    For every turbine found in the input data, a box plot of Cp grouped by
    the ``Field_YearMonthDay`` column is rendered to a PNG file and the
    plotted data is exported to a JSON file.
    """

    def typeAnalyst(self):
        """Return the identifier of this analysis type."""
        return "cp_trend"

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """Load the turbine data, validate it and draw the Cp trend charts.

        Args:
            outputAnalysisDir: Directory that receives the PNG/JSON output.
            conf: Analysis contract; ``conf.dataContract`` is read here.
            turbineCodes: Turbine codes handed to ``processTurbineData``.

        Returns:
            The DataFrame produced by :meth:`drawCpTrend`.

        Raises:
            ValueError: If the prepared DataFrame lacks the ``Field_Cp`` or
                ``Field_YearMonthDay`` column.
        """
        # processTurbineData / userDataFrame are defined outside this file;
        # presumably they load the raw fields and derive Cp and the date
        # column -- verify against the base class.
        dictionary = self.processTurbineData(turbineCodes, conf, [
            Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower])
        dataFrameOfTurbines = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        # Fail early with a clear message when the derived columns are absent.
        required_columns = {Field_Cp, Field_YearMonthDay}
        if not required_columns.issubset(dataFrameOfTurbines.columns):
            raise ValueError(f"DataFrame缺少必要的列。需要的列有: {required_columns}")

        return self.drawCpTrend(dataFrameOfTurbines, outputAnalysisDir, conf)

    def drawCpTrend(self, dataFrameOfTurbines: pd.DataFrame, outputAnalysisDir, conf: Contract):
        """Render one Cp box plot per turbine and export image and JSON files.

        Args:
            dataFrameOfTurbines: Data containing ``Field_CodeOfTurbine``,
                ``Field_Cp`` and ``Field_YearMonthDay``.
            outputAnalysisDir: Directory in which ``<turbine name>.png`` and
                ``<turbine name>.json`` are written.
            conf: Analysis contract; ``conf.dataContract.dataFilter`` supplies
                the power-farm id and the data batch number.

        Returns:
            A DataFrame with two rows per turbine: one for the JSON file
            (flagged for database storage) and one for the PNG file (not
            flagged).
        """
        analystType = self.typeAnalyst()
        result_rows = []

        # One chart / JSON pair per turbine.
        for turbineCode, group in dataFrameOfTurbines.groupby(Field_CodeOfTurbine):
            dataFilter = conf.dataContract.dataFilter
            currTurbineInfo = self.common.getTurbineInfo(
                dataFilter.powerFarmID, turbineCode, self.turbineInfo)
            turbineName = currTurbineInfo[Field_NameOfTurbine]

            filtered_group = self._filterCpOutliers(group)

            # Median of Cp for every date value, drawn on top of the boxes.
            medians = filtered_group.groupby(
                filtered_group[Field_YearMonthDay])[Field_Cp].median()

            fig = self._buildCpTrendFigure(filtered_group, medians, turbineName)
            json_output = self._buildCpTrendJson(
                currTurbineInfo, turbineCode, turbineName, filtered_group, medians)

            # Save the image.
            filePathOfImage = os.path.join(outputAnalysisDir, f"{turbineName}.png")
            fig.write_image(filePathOfImage, scale=3)
            # NOTE: an HTML export (fig.write_html) used to be sketched here in
            # commented-out form; no HTML file is produced.

            # Save the JSON payload.
            # NOTE(review): the values of Field_YearMonthDay must be
            # JSON-serialisable (e.g. strings) for json.dump to succeed --
            # confirm against the upstream data preparation.
            output_json_path = os.path.join(outputAnalysisDir, f"{turbineName}.json")
            with open(output_json_path, 'w', encoding='utf-8') as f:
                json.dump(json_output, f, ensure_ascii=False, indent=4)

            # Fields shared by both result rows of this turbine.
            commonFields = {
                Field_Return_TypeAnalyst: analystType,
                Field_PowerFarmCode: dataFilter.powerFarmID,
                Field_Return_BatchCode: dataFilter.dataBatchNum,
                Field_CodeOfTurbine: turbineCode,
            }
            result_rows.append({
                **commonFields,
                Field_Return_FilePath: output_json_path,
                Field_Return_IsSaveDatabase: True,
            })
            result_rows.append({
                **commonFields,
                Field_Return_FilePath: filePathOfImage,
                Field_Return_IsSaveDatabase: False,
            })

        return pd.DataFrame(result_rows)

    @staticmethod
    def _filterCpOutliers(group: pd.DataFrame) -> pd.DataFrame:
        """Return the rows of *group* whose Cp lies within 1.5 * IQR of the quartiles."""
        q1 = group[Field_Cp].quantile(0.25)
        q3 = group[Field_Cp].quantile(0.75)
        iqr = q3 - q1

        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # Rows with a NaN Cp fail both comparisons and are dropped as well.
        withinBounds = (group[Field_Cp] >= lower_bound) & (group[Field_Cp] <= upper_bound)
        return group[withinBounds]

    @staticmethod
    def _firstIfSeries(value):
        """Return the first element when *value* is a pandas Series, else *value* itself."""
        if isinstance(value, pd.Series):
            return value.iloc[0]
        return value

    def _buildCpTrendFigure(self, filtered_group: pd.DataFrame, medians: pd.Series, turbineName):
        """Build the plotly figure: Cp box plot per date plus median markers."""
        fig = go.Figure()

        fig.add_trace(go.Box(
            x=filtered_group[Field_YearMonthDay],   # dates on the x-axis
            y=filtered_group[Field_Cp],             # power coefficient on the y-axis
            boxpoints=False,                        # individual/outlier points are not drawn
            # Marker style for points; has no visible effect while boxpoints=False.
            marker=dict(color='lightgoldenrodyellow', size=1),
            line=dict(color='lightgray', width=2),  # box and whisker lines
            fillcolor='rgba(200, 200, 200, 0.5)',   # semi-transparent box fill
            name='风能利用系数'                       # legend entry
        ))

        # One marker per box at the median value.
        fig.add_trace(go.Scatter(
            x=medians.index,
            y=medians.values,
            mode='markers',
            marker=dict(color='orange', size=3),
            name='风能利用系数-中位数'                 # legend entry of the median markers
        ))

        fig.update_layout(
            title={
                'text': f'机组: {turbineName}',
            },
            xaxis_title='时间',
            yaxis_title='风能利用系数',
            xaxis=dict(
                tickmode='auto',           # let plotly choose ticks for the date data
                tickformat='%Y-%m-%d',     # date format of the tick labels
                showgrid=True,
                gridcolor='lightgray',     # light-gray x-axis grid lines
                tickangle=-45,
                linecolor='black',         # x-axis line drawn in black
                ticklen=5,                 # tick length
            ),
            yaxis=dict(
                # Step and limits come from instance attributes that are not
                # defined in this file (expected to be set by the base class).
                dtick=self.axisStepCp,
                range=[self.axisLowerLimitCp, self.axisUpperLimitCp],
                showgrid=True,
                gridcolor='lightgray',     # light-gray y-axis grid lines
                linecolor='black',         # y-axis line drawn in black
                ticklen=5,                 # tick length
            ),
            paper_bgcolor='white',         # background around the plot area
            plot_bgcolor='white',          # background of the plot area
            margin=dict(t=50, b=10)        # top / bottom margins
        )
        return fig

    def _buildCpTrendJson(self, currTurbineInfo, turbineCode, turbineName,
                          filtered_group: pd.DataFrame, medians: pd.Series) -> dict:
        """Assemble the JSON payload describing the box plot of one turbine."""
        # The lookups may yield a pandas Series; reduce them to a single value.
        engineTypeCode = self._firstIfSeries(currTurbineInfo.get(Field_MillTypeCode, ""))
        engineTypeName = self._firstIfSeries(currTurbineInfo.get(Field_MachineTypeCode, ""))

        # Key spellings (including "xaixs"/"yaixs") are part of the output
        # format and are kept exactly as they are.
        return {
            "analysisTypeCode": "风能利用系数时序分析",
            "engineCode": engineTypeCode,
            "engineTypeName": engineTypeName,
            "xaixs": "时间",
            "yaixs": "风能利用系数",
            "data": [{
                "engineName": turbineName,
                "engineCode": turbineCode,
                "title": f'机组-{turbineName}',
                "xData": filtered_group[Field_YearMonthDay].tolist(),
                "yData": filtered_group[Field_Cp].tolist(),
                "color": 'lightgray',
                "width": 2,
                "type": "box_plot",
                "medians": {
                    "x": medians.index.tolist(),    # x values of the medians
                    "y": medians.values.tolist(),   # y values of the medians
                    "mode": 'markers',
                    "color": 'orange',
                    "size": 3
                }
            }]
        }