import os from datetime import datetime import numpy as np import pandas as pd import plotly.graph_objects as go from algorithmContract.confBusiness import * from algorithmContract.contract import Contract from behavior.analystWithGoodBadPoint import AnalystWithGoodBadPoint from plotly.subplots import make_subplots class PowerScatter2DAnalyst(AnalystWithGoodBadPoint): """ 风电机组功率曲线散点分析。 秒级scada数据运算太慢,建议使用分钟级scada数据 """ def typeAnalyst(self): return "power_scatter_2D" def selectColumns(self): return [Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower] def addPropertyToDataFrame(self,dataFrameOfTurbine : pd.DataFrame, currTurbineInfo : pd.Series, currTurbineModelInfo : pd.Series): dataFrameOfTurbine[Field_PowerFarmCode] = self.currPowerFarmInfo[Field_PowerFarmCode] dataFrameOfTurbine[Field_MillTypeCode] = currTurbineModelInfo[Field_MillTypeCode] def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes): dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns()) dataFrame = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self) turbineInfos = self.common.getTurbineInfos(conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo) if len(dataFrame) <= 0: print("After screening for blade pitch angle less than the configured value, plot power curve scatter points without data") return grouped = self.dataFrameContractOfTurbine.groupby( [Field_PowerFarmCode, Field_MillTypeCode]) for groupByKey, contractPowerCurveOfMillType in grouped: break return self.drawOfPowerCurveScatter(dataFrame, turbineInfos,outputAnalysisDir, conf, contractPowerCurveOfMillType) def drawOfPowerCurveScatter(self, dataFrame: pd.DataFrame, turbineModelInfo: pd.Series, outputAnalysisDir, conf: Contract, dataFrameGuaranteePowerCurve: pd.DataFrame): """ 绘制风速-功率分布图并保存为文件。 参数: dataFrameMerge (pd.DataFrame): 包含数据的DataFrame,需要包含设备名、风速和功率列。 csvPowerCurveFilePath (str): 功率曲线文件路径。 outputAnalysisDir (str): 分析输出目录。 confData (ConfBusiness): 配置 """ x_name = '风速' y_name = '功率' #机型切入风速 series cutInWsField = self.turbineModelInfo[Field_CutInWS] cut_in_ws = cutInWsField.min() - 1 if cutInWsField.notna().any() else 2 # if not dataFrame.empty and Field_CutInWS in dataFrame.columns and dataFrame[Field_CutInWS].notna().any(): # cut_in_ws = dataFrame[Field_CutInWS].min() - 1 # else: # cut_in_ws = 2 # 按设备名分组数据 grouped = dataFrame.groupby([Field_NameOfTurbine, Field_CodeOfTurbine]) result_rows = [] # 定义固定的颜色映射列表 fixed_colors = [ "#3E409C", "#476CB9", "#3586BF", "#4FA4B5", "#52A3AE", "#60C5A3", "#85D0AE", "#A8DCA2", "#CFEE9E", "#E4F39E", "#EEF9A7", "#FBFFBE", "#FDF1A9", "#FFE286", "#FFC475", "#FCB06C", "#F78F4F", "#F96F4A", "#E4574C", "#CA3756", "#AF254F" ] # 将 fixed_colors 转换为 Plotly 的 colorscale 格式 fixed_colorscale = [ [i / (len(fixed_colors) - 1), color] for i, color in enumerate(fixed_colors) ] fixed_colors_points = [ "#F96F4A", "#FFC475", "#FBFFBE", "#85D0AE", "#3586BF", "#3E409C" ] # 遍历每个设备的数据 for name, group in grouped: fig = make_subplots() # 提取月份 group['month'] = group['monthIntTime'].apply(lambda x: datetime.fromtimestamp(x).month) unique_months = group['month'].unique() # 计算时间跨度 time_span_months = len(unique_months) if time_span_months >= 6: # 绘制散点图(时间跨度大于等于6个月) scatter = go.Scatter(x=group[Field_WindSpeed], y=group[Field_ActiverPower], mode='markers', marker=dict( color=group['monthIntTime'], colorscale=fixed_colorscale, # 使用自定义的 colorscale size=3, opacity=0.7, colorbar=dict( tickvals=np.linspace( group['monthIntTime'].min(), group['monthIntTime'].max(), 6), ticktext=[datetime.fromtimestamp(ts).strftime( '%Y-%m') for ts in np.linspace(group['monthIntTime'].min(), group['monthIntTime'].max(), 6)], thickness=18, len=1, # 设置颜色条的长度,使其占据整个图的高度 outlinecolor='rgba(255,255,255,0)' ), showscale=True ), showlegend=False) # 不显示散点图的 legend,用 colorbar 代替 fig.add_trace(scatter) else: # 绘制散点图(时间跨度小于6个月) for i, month in enumerate(unique_months): month_data = group[group['month'] == month] # 使用固定的颜色列表 color = fixed_colors_points[i % len(fixed_colors_points)] scatter = go.Scatter(x=month_data[Field_WindSpeed], y=month_data[Field_ActiverPower], mode='markers', marker=dict( color=color, size=3, opacity=0.7 ), name=f'{datetime.fromtimestamp(month_data["monthIntTime"].iloc[0]).strftime("%Y-%m")}', showlegend=True) fig.add_trace(scatter) # 绘制合同功率曲线 line = go.Scatter(x=dataFrameGuaranteePowerCurve[Field_WindSpeed], y=dataFrameGuaranteePowerCurve[Field_ActiverPower], mode='lines+markers', marker=dict(color='gray', size=7), name='合同功率曲线') fig.add_trace(line, secondary_y=False) # 设置图形布局 fig.update_layout( title=f'机组: {name[0]}', xaxis=dict(title=x_name, range=[cut_in_ws, 25], tickmode='linear', tick0=0, dtick=1, tickangle=-45), yaxis=dict(title=y_name, dtick=self.axisStepActivePower, range=[self.axisLowerLimitActivePower, self.axisUpperLimitActivePower] ), legend=dict(yanchor="bottom", y=0, xanchor="right", x=1, font=dict( size=10), bgcolor='rgba(255,255,255,0)') ) # 确保从 Series 中提取的是具体的值 engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "") if isinstance(engineTypeCode, pd.Series): engineTypeCode = engineTypeCode.iloc[0] engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "") if isinstance(engineTypeName, pd.Series): engineTypeName = engineTypeName.iloc[0] # 定义要替换的空值类型 na_values = {pd.NA, float('nan')} # 构建最终的JSON对象 json_output = { "analysisTypeCode": "逐月有功功率散点2D分析", "engineCode": engineTypeCode, "engineTypeName": engineTypeName, "xaixs": "风速(m/s)", "yaixs": "有功功率(kw)", "data": [ {# 提取机组数据 "engineName": name[0], "engineCode": name[1], "title":f' 机组: {name[0]}', "xData": group[Field_WindSpeed].replace(na_values, None).tolist(), "xrange":[cut_in_ws, 25], "yData": group[Field_ActiverPower].replace(na_values, None).tolist(), "yrange":[self.axisLowerLimitActivePower,self.axisUpperLimitActivePower], "colorbar": group['monthIntTime'].tolist(), "colorbartitle": "年月", "mode":"markers" }, {# 提取合同功率曲线数据 "enginName": "合同功率曲线", "xData":dataFrameGuaranteePowerCurve[Field_WindSpeed].replace(na_values, None).tolist(), "yData":dataFrameGuaranteePowerCurve[Field_ActiverPower].replace(na_values, None).tolist(), "zData": [], "mode":"lines+markers" }] } # 保存图像 pngFileName = f"{name[0]}-scatter.png" pngFilePath = os.path.join(outputAnalysisDir, pngFileName) fig.write_image(pngFilePath, scale=3) # # 保存HTML # htmlFileName = f"{name[0]}-scatter.html" # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName) # fig.write_html(htmlFilePath) # 将JSON对象保存到文件 output_json_path = os.path.join(outputAnalysisDir, f"{name[0]}-scatter.json") with open(output_json_path, 'w', encoding='utf-8') as f: import json json.dump(json_output, f, ensure_ascii=False, indent=4) # 如果需要返回DataFrame,可以包含文件路径 result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: name[1], Field_Return_FilePath: output_json_path, Field_Return_IsSaveDatabase: True }) result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: name[1], Field_Return_FilePath: pngFilePath, Field_Return_IsSaveDatabase: False }) # result_rows.append({ # Field_Return_TypeAnalyst: self.typeAnalyst(), # Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, # Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, # Field_CodeOfTurbine: name[1], # Field_Return_FilePath: htmlFilePath, # Field_Return_IsSaveDatabase: True # }) result_df = pd.DataFrame(result_rows) return result_df