import os
from datetime import datetime
import concurrent.futures
import numpy as np
import pandas as pd
from utils.directoryUtil import DirectoryUtil as dir
from algorithmContract.confBusiness import *
from behavior.baseAnalyst import BaseAnalyst
from behavior.analyst import Analyst
from common.commonBusiness import CommonBusiness
from algorithm.dataMarker import DataMarker


class DataProcessor:
    """Load per-turbine CSV files, clean and derive measurement columns, and
    dispatch the resulting frames to registered analysts.

    Two observer lists are kept:
      * ``_baseAnalysts`` -- notified through ``baseAnalystTurbineNotify`` /
        ``baseAnalystNotify``;
      * ``_analysts`` -- notified through ``turbineNotify`` /
        ``turbinesNotify`` (the latter two are what ``execute`` calls).
    """

    def __init__(self):
        self.common = CommonBusiness()
        self._baseAnalysts = []
        self._analysts = []

    # ------------------------------------------------------------------
    # Observer registration
    # ------------------------------------------------------------------
    def attachBaseAnalyst(self, analyst: BaseAnalyst):
        """Register a base analyst; attaching the same analyst twice is a no-op."""
        # The duplicate check must look at the list we append to; it used to
        # inspect ``_analysts`` and therefore never prevented duplicates here.
        if analyst not in self._baseAnalysts:
            self._baseAnalysts.append(analyst)

    def detachBaseAnalyst(self, analyst: BaseAnalyst):
        """Unregister a base analyst; detaching an unknown analyst is a no-op."""
        try:
            self._baseAnalysts.remove(analyst)
        except ValueError:
            pass  # deliberate best-effort: analyst was not registered

    def attach(self, analyst: Analyst):
        """Register an analyst; attaching the same analyst twice is a no-op."""
        if analyst not in self._analysts:
            self._analysts.append(analyst)

    def detach(self, analyst: Analyst):
        """Unregister an analyst; detaching an unknown analyst is a no-op."""
        try:
            self._analysts.remove(analyst)
        except ValueError:
            pass  # deliberate best-effort: analyst was not registered

    # ------------------------------------------------------------------
    # Notification
    # ------------------------------------------------------------------
    @staticmethod
    def _notifyEachTurbine(analysts, dataFrameOfTurbine: pd.DataFrame, confData: ConfBusiness, turbineName):
        """Call ``analysisOfTurbine`` on every analyst in ``analysts``.

        Each analyst receives its own output directory and a per-turbine
        output path of the form ``<outputDir>/<turbineName><CSVSuffix>``.
        """
        for analyst in analysts:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            outputFilePath = r"{}/{}{}".format(outputAnalysisDir, turbineName, CSVSuffix)
            analyst.analysisOfTurbine(
                dataFrameOfTurbine, outputAnalysisDir, outputFilePath, confData, turbineName)

    @staticmethod
    def _notifyAllTurbines(analysts, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
        """Call ``analysisOfTurbines`` on every analyst in ``analysts``."""
        for analyst in analysts:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            analyst.analysisOfTurbines(dataFrameOfTurbines, outputAnalysisDir, confData)

    def turbineNotify(self, dataFrameOfTurbine: pd.DataFrame, confData: ConfBusiness, turbineName):
        """Send one turbine's data to every attached analyst."""
        self._notifyEachTurbine(self._analysts, dataFrameOfTurbine, confData, turbineName)

    def turbinesNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
        """Send the merged data of all turbines to every attached analyst."""
        self._notifyAllTurbines(self._analysts, dataFrameOfTurbines, confData)

    def baseAnalystTurbineNotify(self, dataFrameOfTurbine: pd.DataFrame, confData: ConfBusiness, turbineName):
        """Send one turbine's data to every attached base analyst."""
        self._notifyEachTurbine(self._baseAnalysts, dataFrameOfTurbine, confData, turbineName)

    def baseAnalystNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
        """Send the merged data of all turbines to every attached base analyst."""
        self._notifyAllTurbines(self._baseAnalysts, dataFrameOfTurbines, confData)

    # ------------------------------------------------------------------
    # Derived measurement points
    # ------------------------------------------------------------------
    def calculateAngleIncluded(self, array1, array2):
        """Compute the element-wise deviation between two equal-length angle sequences.

        Each difference ``a1 - a2`` is first wrapped into [-180, 180) and then
        folded into [-90, 90] by adding/subtracting 180; results are rounded
        to two decimals.

        Parameters:
            array1: first sequence of angles (degrees)
            array2: second sequence of angles (degrees)

        Returns:
            list: the per-pair deviations, in input order (``zip`` semantics:
            the shorter input determines the length).
        """
        deviations = []
        for angle1, angle2 in zip(array1, array2):
            # Wrap into [-180, 180). A zero difference already maps to 0.0
            # here, so no special case is needed.
            deviation = (angle1 - angle2 + 180) % 360 - 180
            # Fold into [-90, 90].
            if deviation > 90:
                deviation -= 180
            elif deviation < -90:
                deviation += 180
            deviations.append(round(deviation, 2))
        return deviations

    def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
        """Populate ``Field_AngleIncluded`` in place.

        Uses the configured angle column when it is set and present in the
        data; otherwise derives it from the nacelle position and wind
        direction columns (when both exist) via ``calculateAngleIncluded``.
        """
        if not self.common.isNone(fieldAngleIncluded) and fieldAngleIncluded in dataFrame.columns:
            dataFrame[Field_AngleIncluded] = dataFrame[fieldAngleIncluded]
        elif fieldWindDirect in dataFrame.columns and fieldNacellePos in dataFrame.columns:
            # Fallback also applies when the angle field is configured but the
            # CSV does not contain it (previously only when unconfigured).
            dataFrame[Field_AngleIncluded] = self.calculateAngleIncluded(
                dataFrame[fieldNacellePos], dataFrame[fieldWindDirect])

    def recalculationOfGeneratorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
        """Populate the generator-speed columns in place.

        Formula: speed ratio = generator speed / rotor (main shaft) speed.
        If the generator-speed column is missing but rotor speed exists, it is
        derived as ``rotationalSpeedRatio * rotorSpeed``; the (possibly
        derived) column is then copied to ``Field_GeneratorSpeed``.
        """
        # Derive first, then copy -- same order as recalculationOfRotorSpeed,
        # so a derived generator speed also reaches Field_GeneratorSpeed.
        if fieldGeneratorSpeed not in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
            dataFrame[fieldGeneratorSpeed] = rotationalSpeedRatio * dataFrame[fieldRotorSpeed]
        if fieldGeneratorSpeed in dataFrame.columns:
            dataFrame[Field_GeneratorSpeed] = dataFrame[fieldGeneratorSpeed]

    def recalculationOfRotorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
        """Populate the rotor-speed columns in place.

        Formula: speed ratio = generator speed / rotor (main shaft) speed.
        If the rotor-speed column is missing but generator speed exists, it is
        derived as ``generatorSpeed / rotationalSpeedRatio``; the (possibly
        derived) column is then copied to ``Field_RotorSpeed``.
        """
        if fieldRotorSpeed not in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
            dataFrame[fieldRotorSpeed] = dataFrame[fieldGeneratorSpeed] / rotationalSpeedRatio
        if not self.common.isNone(fieldRotorSpeed) and fieldRotorSpeed in dataFrame.columns:
            dataFrame[Field_RotorSpeed] = dataFrame[fieldRotorSpeed]

    def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
        """Populate ``Field_GeneratorTorque`` in place.

        With P in kW:  P*1000 = pi/30 * T * n  =>  T = 9549.297 * P / n,
        where n is generator speed, P is active power and T is torque.

        Uses the configured torque column when it is set and present in the
        data; otherwise derives torque from active power and generator speed
        (when both columns exist).
        """
        if not self.common.isNone(fieldGeneratorTorque) and fieldGeneratorTorque in dataFrame.columns:
            dataFrame[Field_GeneratorTorque] = dataFrame[fieldGeneratorTorque]
        elif fieldActivePower in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
            # An explicit branch (instead of compute-then-copy) guarantees the
            # derived torque is never overwritten by an unrelated column.
            dataFrame[Field_GeneratorTorque] = 9549.297 * \
                dataFrame[fieldActivePower] / dataFrame[fieldGeneratorSpeed]

    def recalculation(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Recompute derived measurement points in place.

        Parameters:
            dataFrame: raw turbine data (modified in place)
            confData: configuration data
        """
        self.recalculationOfGeneratorSpeed(
            dataFrame, confData.field_rotor_speed, confData.field_gen_speed, confData.rotational_Speed_Ratio)
        self.recalculationOfRotorSpeed(
            dataFrame, confData.field_rotor_speed, confData.field_gen_speed, confData.rotational_Speed_Ratio)
        self.recalculationOfRotorTorque(
            dataFrame, confData.field_torque, confData.field_power, confData.field_gen_speed)
        self.recalculationOfIncludedAngle(
            dataFrame, confData.field_angle_included, confData.field_wind_dir, confData.field_nacelle_pos)
        self.common.calculateTSR(dataFrame, confData)
        self.common.calculateCp(dataFrame, confData)

    # ------------------------------------------------------------------
    # Cleaning / typing
    # ------------------------------------------------------------------
    def filterWithDateTime(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Keep rows with ``start_time <= time < end_time``.

        Returns an independent copy: callers (``recalculation``) add columns
        in place, which on a boolean-indexed slice triggers chained-assignment
        warnings.
        """
        timeColumn = dataFrame[confData.field_turbine_time]
        mask = (timeColumn >= confData.start_time) & (timeColumn < confData.end_time)
        return dataFrame[mask].copy()

    def processDateTime(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Add derived date/time columns and drop rows in excluded months.

        Expects ``confData.field_turbine_time`` to already be a datetime column
        (``setColumnDataType`` does that); the ``.dt`` accessors require it.
        """
        timeField = confData.field_turbine_time
        # NOTE(review): when the column is already datetime, pd.to_datetime
        # returns it as-is, so "年月"/"日期" hold the full timestamp rather than
        # a truncated year-month -- confirm this is what consumers expect.
        dataFrame["年月"] = pd.to_datetime(dataFrame[timeField], format="%Y-%m")
        dataFrame['日期'] = pd.to_datetime(dataFrame[timeField], format="%Y-%m")
        dataFrame['monthIntTime'] = dataFrame['日期'].apply(lambda x: x.timestamp())
        dataFrame[Field_YearMonth] = dataFrame[timeField].dt.strftime('%Y-%m')
        dataFrame[Field_YearMonthDay] = dataFrame[timeField].dt.strftime('%Y-%m-%d')

        if not self.common.isNone(confData.excludingMonths) and len(confData.excludingMonths) > 0:
            # Months (as 'YYYY-MM' strings) whose rows should be removed.
            excludedMonths = [month for month in confData.excludingMonths if not self.common.isNone(month)]
            if excludedMonths:
                dataFrame = dataFrame[~dataFrame[Field_YearMonth].isin(excludedMonths)]
        return dataFrame

    def setColumnDataType(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Normalise column dtypes.

        * turbine-name column -> ``str``;
        * float64/float16 columns -> ``self.common.getFloat32()``;
        * the time column -> datetime (format ``%Y-%m-%d %H:%M:%S``,
          unparseable values coerced to NaT and those rows dropped);
        * the configured measurement fields -> ``getFloat32()``.
        """
        dataFrame = dataFrame.convert_dtypes()
        float32 = self.common.getFloat32()

        # Select numeric float columns before the turbine-name cast.
        numeric_cols = dataFrame.select_dtypes(include=['float64', 'float16']).columns
        dataFrame[Field_NameOfTurbine] = dataFrame[Field_NameOfTurbine].astype(str)
        dataFrame[numeric_cols] = dataFrame[numeric_cols].astype(float32)

        timeField = confData.field_turbine_time
        if not self.common.isNone(timeField) and timeField in dataFrame.columns:
            # Strip surrounding whitespace first, then parse. A single parse is
            # enough: re-formatting to the same pattern and re-parsing (as was
            # done before) is an identity for second-precision timestamps.
            dataFrame[timeField] = pd.to_datetime(
                dataFrame[timeField].str.strip(), format='%Y-%m-%d %H:%M:%S', errors="coerce")
            # Drop rows whose timestamp could not be parsed.
            dataFrame.dropna(axis=0, subset=[timeField], inplace=True)

        floatFields = (
            confData.field_wind_speed,
            confData.field_wind_dir,
            confData.field_angle_included,
            confData.field_power,
            confData.field_pitch_angle1,
            confData.field_pitch_angle2,
            confData.field_pitch_angle3,
            confData.field_gen_speed,
            confData.field_rotor_speed,
            confData.field_Cabin_Vibrate_X,
            confData.field_Cabin_Vibrate_Y,
            confData.field_activePowerSet,
            confData.field_activePowerAvailable,
        )
        for field in floatFields:
            if field in dataFrame.columns:
                dataFrame[field] = dataFrame[field].astype(float32)

        return dataFrame

    def loadData(self, csvFilePath, confData: ConfBusiness, turbineName):
        """Read one turbine CSV and attach the turbine-name column.

        Only the columns from ``self.common.getUseColumns(confData)`` are read,
        and ``confData.skip_row_number`` data rows after the header are skipped.
        The turbine name comes from ``confData.field_turbine_name`` when that
        column exists, otherwise from ``turbineName``.
        """
        useColumns = self.common.getUseColumns(confData)
        # Keep the header (row 0) and skip the configured number of rows after it.
        dataFrame = pd.read_csv(csvFilePath, header=0, usecols=useColumns,
                                skiprows=range(1, confData.skip_row_number + 1))

        if not self.common.isNone(confData.field_turbine_name) and confData.field_turbine_name in dataFrame.columns:
            dataFrame[Field_NameOfTurbine] = dataFrame[confData.field_turbine_name]
        else:
            dataFrame[Field_NameOfTurbine] = turbineName

        # Gap filling is intentionally disabled: any imputation must be
        # justified by the business rules before being switched on.
        # dataFrame = dataFrame.fillna(method='ffill')
        # dataFrame = dataFrame.fillna(method='bfill')
        return dataFrame

    # ------------------------------------------------------------------
    # Pipeline
    # ------------------------------------------------------------------
    def execute(self, confData: ConfBusiness):
        """Process every CSV under ``confData.input_path`` and notify analysts.

        Each file is loaded, typed, date-processed, time-filtered and
        recalculated; per-turbine analysts are notified for each file, and
        finally all turbines' frames are merged and passed to
        ``turbinesNotify``.
        """
        outputDataAfterFilteringDir = r"{}/{}".format(confData.output_path, "DataAfterFiltering")
        dir.create_directory(outputDataAfterFilteringDir)

        labler = DataMarker()  # only used by the (disabled) marking step below
        # Collected once and concatenated at the end: concatenating inside the
        # loop re-copies all previous data for every file (quadratic).
        turbineFrames = []

        for rootDir, _subDirs, files in dir.list_directory(confData.input_path):
            for file in sorted(files):
                if not file.endswith(CSVSuffix):
                    continue

                csvFilePath = os.path.join(rootDir, file)
                print(f"current csv file path: {csvFilePath}")

                turbineName = confData.add_W_if_starts_with_digit(
                    file.split(confData.csvFileNameSplitStringForTurbine)[confData.index_turbine])
                dataFrame = self.loadData(csvFilePath, confData, turbineName)

                # Check emptiness BEFORE reading the first row: an empty CSV
                # previously raised KeyError on ``.loc[0]`` instead of being skipped.
                if dataFrame.empty:
                    print("dataFrameFilter not data.")
                    continue
                turbineName = dataFrame[Field_NameOfTurbine].iloc[0]

                dataFrame = self.setColumnDataType(dataFrame, confData)
                dataFrame = self.processDateTime(dataFrame, confData)
                dataFrame = self.filterWithDateTime(dataFrame, confData)
                # self.baseAnalystTurbineNotify(dataFrame, confData, turbineName)
                self.recalculation(dataFrame, confData)
                # dataFrame = labler.main(confData, dataFrame)

                # Snapshot before notifying, matching the copy the former
                # in-loop pd.concat took: analysts may modify dataFrame in place.
                turbineFrames.append(dataFrame.copy())

                # dataFrame.to_csv(os.path.join(
                #     outputDataAfterFilteringDir, "{}{}".format(turbineName, CSVSuffix)), index=False)
                # turbineNotify returns nothing; do not rebind dataFrame to its result.
                self.turbineNotify(dataFrame, confData, turbineName)

        dataFrameMerge = (pd.concat(turbineFrames, axis=0, sort=False)
                          if turbineFrames else pd.DataFrame())
        # self.baseAnalystNotify(dataFrameMergeFilter, confData)
        self.turbinesNotify(dataFrameMerge, confData)