123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- import os
- from datetime import datetime
- import pandas as pd
- from algorithmContract.confBusiness import *
- from algorithmContract.contract import Contract
- from behavior.analystWithGoodBadPoint import AnalystWithGoodBadPoint
- class PowerScatter2DAnalyst(AnalystWithGoodBadPoint):
- """
- 风电机组功率曲线散点分析。
- 秒级scada数据运算太慢,建议使用分钟级scada数据
- """
- def typeAnalyst(self):
- return "power_scatter_2D"
- def selectColumns(self):
- return [Field_DeviceCode, Field_Time, Field_WindSpeed, Field_ActiverPower]
- def addPropertyToDataFrame(self,dataFrameOfTurbine : pd.DataFrame, currTurbineInfo : pd.Series, currTurbineModelInfo : pd.Series):
- dataFrameOfTurbine[Field_PowerFarmCode] = self.currPowerFarmInfo[Field_PowerFarmCode]
- dataFrameOfTurbine[Field_MillTypeCode] = currTurbineModelInfo[Field_MillTypeCode]
- def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
- dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns())
- dataFrame = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self)
- turbineInfos = self.common.getTurbineInfos(conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
- if len(dataFrame) <= 0:
- self.logger.info("After screening for blade pitch angle less than the configured value, plot power curve scatter points without data")
- return
- return self.drawOfPowerCurveScatter(dataFrame, turbineInfos,outputAnalysisDir, conf, self.dataFrameContractOfTurbine)
- def drawOfPowerCurveScatter(self, dataFrame: pd.DataFrame, turbineModelInfo: pd.Series, outputAnalysisDir, conf: Contract, dataFrameGuaranteePowerCurve: pd.DataFrame):
- """
- 绘制风速-功率分布图并保存为文件。
- 参数:
- dataFrameMerge (pd.DataFrame): 包含数据的DataFrame,需要包含设备名、风速和功率列。
- csvPowerCurveFilePath (str): 功率曲线文件路径。
- outputAnalysisDir (str): 分析输出目录。
- confData (ConfBusiness): 配置
- """
- #机型切入风速 series
- cutInWsField = self.turbineModelInfo[Field_CutInWS]
- cut_in_ws = cutInWsField.min() - 1 if cutInWsField.notna().any() else 2
- # if not dataFrame.empty and Field_CutInWS in dataFrame.columns and dataFrame[Field_CutInWS].notna().any():
- # cut_in_ws = dataFrame[Field_CutInWS].min() - 1
- # else:
- # cut_in_ws = 2
- # 按设备名分组数据
- grouped = dataFrame.groupby([Field_NameOfTurbine, Field_CodeOfTurbine])
- result_rows = []
- # 遍历每个设备的数据
- for name, group in grouped:
- #获取当前风机信息dataFrame
- currentEngineDataFrame = turbineModelInfo[turbineModelInfo[Field_CodeOfTurbine]==name[1]]
- #获取当前机型
- millTypeCode = currentEngineDataFrame.get(Field_MillTypeCode, "").iloc[0]
- #当前机型合同功率曲线dataFrame
- currentMillTypePowerDataFrame = dataFrameGuaranteePowerCurve[dataFrameGuaranteePowerCurve[Field_MillTypeCode] == millTypeCode]
- # 获取机型的名字(machine_type_code)
- engineTypeName = self.common.getTurbineModelByCode(millTypeCode, self.turbineModelInfo)[Field_MachineTypeCode]
- # 使用 apply() 对每个元素调用 datetime.fromtimestamp
- group['monthIntTime'] = group['monthIntTime'].apply(lambda x: datetime.fromtimestamp(x).strftime('%Y-%m'))
- # 定义要替换的空值类型
- na_values = {pd.NA, float('nan')}
- # 构建最终的JSON对象
- json_output = {
- "analysisTypeCode": "逐月有功功率散点2D分析",
- "engineCode": millTypeCode,
- "engineTypeName": engineTypeName,
- "xaixs": "风速(m/s)",
- "yaixs": "有功功率(kW)",
- "data": [
- {# 提取机组数据
- "engineName": name[0],
- "engineCode": name[1],
- "title":f' 逐月有功功率散点2D分析-机组: {name[0]}',
- "xData": group[Field_WindSpeed].replace(na_values, None).tolist(),
- "xrange":[cut_in_ws, 25],
- "yData": group[Field_ActiverPower].replace(na_values, None).tolist(),
- "yrange":[self.axisLowerLimitActivePower,self.axisUpperLimitActivePower],
- "colorbar": group['monthIntTime'].tolist(),
- "colorbartitle": "年月",
- "mode":"markers"
- },
- {# 提取合同功率曲线数据
- "enginName": "合同功率曲线",
- "xData":currentMillTypePowerDataFrame[Field_WindSpeed].replace(na_values, None).tolist(),
- "yData":currentMillTypePowerDataFrame[Field_ActiverPower].replace(na_values, None).tolist(),
- "zData": [],
- "mode":"lines+markers"
- }]
- }
- # 将JSON对象保存到文件
- output_json_path = os.path.join(outputAnalysisDir, f"{name[0]}-scatter.json")
- with open(output_json_path, 'w', encoding='utf-8') as f:
- import json
- json.dump(json_output, f, ensure_ascii=False, indent=4)
- # 如果需要返回DataFrame,可以包含文件路径
- result_rows.append({
- Field_Return_TypeAnalyst: self.typeAnalyst(),
- Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
- Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
- Field_CodeOfTurbine: name[1],
- Field_Return_FilePath: output_json_path,
- Field_Return_IsSaveDatabase: True
- })
- result_df = pd.DataFrame(result_rows)
- return result_df
|