import os from datetime import datetime import pandas as pd from algorithmContract.confBusiness import * from algorithmContract.contract import Contract from behavior.analystWithGoodBadPoint import AnalystWithGoodBadPoint class PowerScatter2DAnalyst(AnalystWithGoodBadPoint): """ 风电机组功率曲线散点分析。 秒级scada数据运算太慢,建议使用分钟级scada数据 """ def typeAnalyst(self): return "power_scatter_2D" def selectColumns(self): return [Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower] def addPropertyToDataFrame(self,dataFrameOfTurbine : pd.DataFrame, currTurbineInfo : pd.Series, currTurbineModelInfo : pd.Series): dataFrameOfTurbine[Field_PowerFarmCode] = self.currPowerFarmInfo[Field_PowerFarmCode] dataFrameOfTurbine[Field_MillTypeCode] = currTurbineModelInfo[Field_MillTypeCode] def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes): dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns()) dataFrame = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self) turbineInfos = self.common.getTurbineInfos(conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo) if len(dataFrame) <= 0: self.logger.info("After screening for blade pitch angle less than the configured value, plot power curve scatter points without data") return return self.drawOfPowerCurveScatter(dataFrame, turbineInfos,outputAnalysisDir, conf, self.dataFrameContractOfTurbine) def drawOfPowerCurveScatter(self, dataFrame: pd.DataFrame, turbineModelInfo: pd.Series, outputAnalysisDir, conf: Contract, dataFrameGuaranteePowerCurve: pd.DataFrame): """ 绘制风速-功率分布图并保存为文件。 参数: dataFrameMerge (pd.DataFrame): 包含数据的DataFrame,需要包含设备名、风速和功率列。 csvPowerCurveFilePath (str): 功率曲线文件路径。 outputAnalysisDir (str): 分析输出目录。 confData (ConfBusiness): 配置 """ #机型切入风速 series cutInWsField = self.turbineModelInfo[Field_CutInWS] cut_in_ws = cutInWsField.min() - 1 if cutInWsField.notna().any() else 2 # if not dataFrame.empty and Field_CutInWS in dataFrame.columns and dataFrame[Field_CutInWS].notna().any(): # cut_in_ws = dataFrame[Field_CutInWS].min() - 1 # else: # cut_in_ws = 2 # 按设备名分组数据 grouped = dataFrame.groupby([Field_NameOfTurbine, Field_CodeOfTurbine]) result_rows = [] # 遍历每个设备的数据 for name, group in grouped: #获取当前风机信息dataFrame currentEngineDataFrame = turbineModelInfo[turbineModelInfo[Field_CodeOfTurbine]==name[1]] #获取当前机型 millTypeCode = currentEngineDataFrame.get(Field_MillTypeCode, "").iloc[0] #当前机型合同功率曲线dataFrame currentMillTypePowerDataFrame = dataFrameGuaranteePowerCurve[dataFrameGuaranteePowerCurve[Field_MillTypeCode] == millTypeCode] # 获取机型的名字(machine_type_code) engineTypeName = self.common.getTurbineModelByCode(millTypeCode, self.turbineModelInfo)[Field_MachineTypeCode] # 使用 apply() 对每个元素调用 datetime.fromtimestamp group['monthIntTime'] = group['monthIntTime'].apply(lambda x: datetime.fromtimestamp(x).strftime('%Y-%m')) # 定义要替换的空值类型 na_values = {pd.NA, float('nan')} # 构建最终的JSON对象 json_output = { "analysisTypeCode": "逐月有功功率散点2D分析", "engineCode": millTypeCode, "engineTypeName": engineTypeName, "xaixs": "风速(m/s)", "yaixs": "有功功率(kW)", "data": [ {# 提取机组数据 "engineName": name[0], "engineCode": name[1], "title":f' 逐月有功功率散点2D分析-机组: {name[0]}', "xData": group[Field_WindSpeed].replace(na_values, None).tolist(), "xrange":[cut_in_ws, 25], "yData": group[Field_ActiverPower].replace(na_values, None).tolist(), "yrange":[self.axisLowerLimitActivePower,self.axisUpperLimitActivePower], "colorbar": group['monthIntTime'].tolist(), "colorbartitle": "年月", "mode":"markers" }, {# 提取合同功率曲线数据 "enginName": "合同功率曲线", "xData":currentMillTypePowerDataFrame[Field_WindSpeed].replace(na_values, None).tolist(), "yData":currentMillTypePowerDataFrame[Field_ActiverPower].replace(na_values, None).tolist(), "zData": [], "mode":"lines+markers" }] } # 将JSON对象保存到文件 output_json_path = os.path.join(outputAnalysisDir, f"{name[0]}-scatter.json") with open(output_json_path, 'w', encoding='utf-8') as f: import json json.dump(json_output, f, ensure_ascii=False, indent=4) # 如果需要返回DataFrame,可以包含文件路径 result_rows.append({ Field_Return_TypeAnalyst: self.typeAnalyst(), Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID, Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum, Field_CodeOfTurbine: name[1], Field_Return_FilePath: output_json_path, Field_Return_IsSaveDatabase: True }) result_df = pd.DataFrame(result_rows) return result_df