import os
from datetime import datetime
import concurrent.futures

import pandas as pd

from confBusiness import (ConfBusiness, Field_NameOfTurbine, Field_GeneratorSpeed,
                          Field_GeneratorTorque, Field_AngleIncluded)
from .utils.directoryUtil import DirectoryUtil as dir
from .baseAnalyst import BaseAnalyst
from .analyst import Analyst
from .commonBusiness import CommonBusiness


class DataProcessor:
    """Load per-turbine CSV files, filter them, derive missing columns and
    dispatch the results to registered analysts.

    Two observer lists are kept:

    * base analysts (``attachBaseAnalyst``) are notified per turbine right
      after :meth:`filterData` (i.e. before :meth:`filterCustom` and
      :meth:`filterForMerge`), and once more at the end of :meth:`execute`
      with the merged data of all turbines;
    * analysts (``attach``) are notified per turbine after all filters and
      the derived-column recalculation, and once with the merged data.
    """

    def __init__(self):
        self.common = CommonBusiness()
        self._baseAnalysts = []
        # NOTE(review): not used in this file; kept in case callers access it.
        self._noCustomFilterAnalysts = []
        self._analysts = []
        # Keys looked up in ``confData.filter``.
        self.node_filter_value_state_turbine = "filter_value_state_turbine"
        self.node_angle_pitch_min = "angle_pitch_min"
        self.node_angle_pitch_max = "angle_pitch_max"
        self.node_speed_wind_cut_in = "speed_wind_cut_in"
        self.node_speed_wind_cut_out = "speed_wind_cut_out"
        self.node_active_power_min = "active_power_min"
        self.node_active_power_max = "active_power_max"

    # ------------------------------------------------------------------
    # Observer registration
    # ------------------------------------------------------------------

    def attachBaseAnalyst(self, analyst: BaseAnalyst):
        """Register ``analyst`` as a base analyst unless it is already registered."""
        # The duplicate check previously looked in ``self._analysts`` while
        # appending to ``self._baseAnalysts``, so a base analyst could be
        # registered (and notified) more than once.
        if analyst not in self._baseAnalysts:
            self._baseAnalysts.append(analyst)

    def detachBaseAnalyst(self, analyst: BaseAnalyst):
        """Unregister a base analyst; analysts that are not registered are ignored."""
        try:
            self._baseAnalysts.remove(analyst)
        except ValueError:
            pass

    def attach(self, analyst: Analyst):
        """Register ``analyst`` unless it is already registered."""
        if analyst not in self._analysts:
            self._analysts.append(analyst)

    def detach(self, analyst: Analyst):
        """Unregister an analyst; analysts that are not registered are ignored."""
        try:
            self._analysts.remove(analyst)
        except ValueError:
            pass

    # ------------------------------------------------------------------
    # Notification
    # ------------------------------------------------------------------

    @staticmethod
    def _outputFilePath(analyst, outputAnalysisDir, turbineName):
        """Per-turbine output path: ``<outputAnalysisDir>/<turbineName>_<typeAnalyst>.csv``."""
        return r"{}/{}_{}.csv".format(
            outputAnalysisDir, turbineName, analyst.typeAnalyst())

    def _notifyTurbine(self, analysts, dataFrameOfTurbine, confData, turbineName):
        """Call ``analysisOfTurbine`` of each analyst in ``analysts`` for one turbine."""
        for analyst in analysts:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            outputFilePath = self._outputFilePath(
                analyst, outputAnalysisDir, turbineName)
            analyst.analysisOfTurbine(
                dataFrameOfTurbine, outputAnalysisDir, outputFilePath, confData, turbineName)

    @staticmethod
    def _notifyTurbines(analysts, dataFrameOfTurbines, confData):
        """Call ``analysisOfTurbines`` of each analyst in ``analysts`` with merged data."""
        for analyst in analysts:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            analyst.analysisOfTurbines(
                dataFrameOfTurbines, outputAnalysisDir, confData)

    def turbineNotify(self, dataFrameOfTurbine: pd.DataFrame, confData: ConfBusiness, turbineName):
        """Run every registered analyst on one turbine's data."""
        self._notifyTurbine(self._analysts, dataFrameOfTurbine, confData, turbineName)

    def turbinesNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
        """Run every registered analyst on the merged data of all turbines."""
        self._notifyTurbines(self._analysts, dataFrameOfTurbines, confData)

    def baseAnalystTurbineNotify(self, dataFrameOfTurbine: pd.DataFrame, confData: ConfBusiness, turbineName):
        """Run every registered base analyst on one turbine's data."""
        self._notifyTurbine(self._baseAnalysts, dataFrameOfTurbine, confData, turbineName)

    def baseAnalystNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
        """Run every registered base analyst on the merged data of all turbines."""
        self._notifyTurbines(self._baseAnalysts, dataFrameOfTurbines, confData)

    # ------------------------------------------------------------------
    # Filtering
    # ------------------------------------------------------------------

    def _filterValue(self, confData: ConfBusiness, node):
        """Return ``confData.filter[node]``.

        Returns None when the filter section is None, the key is missing, or
        ``self.common.isNone`` considers the value empty.
        """
        filterConf = confData.filter
        if filterConf is None or node not in filterConf:
            return None
        value = filterConf[node]
        return None if self.common.isNone(value) else value

    def filterCustom(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Apply the configurable wind-speed/power and turbine-state filters.

        Wind speed / power: applied only when the wind-speed field and both the
        ``speed_wind_cut_in`` and ``speed_wind_cut_out`` filter values are set.
        With ``powerCap = confData.rated_power * 1.2``, it:

        1. drops rows whose wind speed is above cut-out or below 0;
        2. drops rows below cut-in whose power is below ``powerCap``;
        3. drops rows whose power exceeds ``powerCap``.

        Turbine state: applied only when the state field and
        ``filter_value_state_turbine`` are set. It keeps only rows whose state
        is one of the configured values; a single scalar value is accepted too.

        Returns the filtered frame.
        """
        windSpeedCutIn = self._filterValue(confData, self.node_speed_wind_cut_in)
        windSpeedCutOut = self._filterValue(confData, self.node_speed_wind_cut_out)
        if not self.common.isNone(confData.field_wind_speed) \
                and windSpeedCutIn is not None and windSpeedCutOut is not None:
            windSpeedCutIn = float(windSpeedCutIn)
            windSpeedCutOut = float(windSpeedCutOut)
            fieldWindSpeed = confData.field_wind_speed
            fieldPower = confData.field_power
            powerCap = confData.rated_power * 1.2

            dataFrame = dataFrame[~((dataFrame[fieldWindSpeed] > windSpeedCutOut) | (
                dataFrame[fieldWindSpeed] < 0))]
            dataFrame = dataFrame[~((dataFrame[fieldWindSpeed] < windSpeedCutIn) & (
                dataFrame[fieldPower] < powerCap))]
            dataFrame = dataFrame[dataFrame[fieldPower] <= powerCap]

        # Keep only rows in the configured turbine state(s).
        stateTurbine = self._filterValue(confData, self.node_filter_value_state_turbine)
        if not self.common.isNone(confData.field_turbine_state) and stateTurbine is not None:
            # Series.isin requires a list-like; wrap a single configured value.
            if not pd.api.types.is_list_like(stateTurbine):
                stateTurbine = [stateTurbine]
            dataFrame = dataFrame[dataFrame[confData.field_turbine_state].isin(
                stateTurbine)]

        return dataFrame

    def filterData(self, dataFrame: pd.DataFrame, confData: ConfBusiness, dataFilter, rotationalSpeedRatio):
        """Perform basic cleaning of one turbine's data.

        Steps:

        * drop rows with missing values in any column from :meth:`getUseColumns`;
        * cast the configured measurement columns to float32;
        * keep rows whose time lies in ``[confData.start_time, confData.end_time)``;
        * drop rows whose ``'%Y-%m'`` month is listed in ``confData.excludingMonths``;
        * add the helper columns ``年月``, ``日期``, ``monthIntTime`` and ``year-month``.

        ``dataFilter`` and ``rotationalSpeedRatio`` are unused; they are kept
        for interface compatibility.
        """
        dataFrame = dataFrame.dropna(
            axis=0, subset=self.getUseColumns(confData))

        float32Fields = (
            confData.field_wind_speed,
            confData.field_wind_dir,
            confData.field_angle_included,
            confData.field_power,
            confData.field_pitch_angle1,
            confData.field_gen_speed,
            confData.field_rotor_speed,
        )
        for field in float32Fields:
            if field in dataFrame.columns:
                dataFrame[field] = dataFrame[field].astype('float32')

        timeField = confData.field_turbine_time
        turbineTime = dataFrame[timeField]
        # The month key is computed up front because the month exclusion below
        # needs it. Previously the "year-month" column was only created at the
        # end, so a non-empty excludingMonths raised KeyError.
        yearMonth = turbineTime.dt.strftime('%Y-%m')
        keep = (turbineTime >= confData.start_time) & (
            turbineTime < confData.end_time)

        if not self.common.isNone(confData.excludingMonths) and len(confData.excludingMonths) > 0:
            # Months (as '%Y-%m' strings) whose rows must be removed.
            excludedMonths = [month for month in confData.excludingMonths
                              if not self.common.isNone(month)]
            if excludedMonths:
                keep = keep & ~yearMonth.isin(excludedMonths)

        # copy() so the column assignments below act on an independent frame
        # rather than on a slice of the input.
        dataFrame = dataFrame[keep].copy()

        # NOTE(review): the time column is already datetime64 (see loadData),
        # so pd.to_datetime ignores `format` here. Both columns are therefore
        # plain copies of the timestamp, not month-truncated values. They are
        # kept unchanged because analysts may depend on them.
        dataFrame["年月"] = pd.to_datetime(
            dataFrame[timeField], format="%Y-%m")
        dataFrame['日期'] = pd.to_datetime(
            dataFrame[timeField], format="%Y-%m")
        dataFrame['monthIntTime'] = dataFrame['日期'].apply(
            lambda x: x.timestamp())
        dataFrame["year-month"] = yearMonth[keep]
        return dataFrame

    def filterForMerge(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Drop rows where power < 0.9 * ``active_power_max`` while pitch angle 1
        > ``angle_pitch_min``.

        Applied only when the power and pitch-angle fields and both filter
        values are configured. Returns the (possibly) filtered frame.
        """
        activePowerMax = self._filterValue(confData, self.node_active_power_max)
        anglePitchMin = self._filterValue(confData, self.node_angle_pitch_min)
        if not self.common.isNone(confData.field_power) and activePowerMax is not None \
                and not self.common.isNone(confData.field_pitch_angle1) and anglePitchMin is not None:
            activePowerMax = float(activePowerMax)
            anglePitchMin = float(anglePitchMin)
            dataFrame = dataFrame[~((dataFrame[confData.field_power] < activePowerMax * 0.9) & (
                dataFrame[confData.field_pitch_angle1] > anglePitchMin))]
        return dataFrame

    # ------------------------------------------------------------------
    # Derived columns
    # ------------------------------------------------------------------

    def calculateAngleIncluded(self, array1, array2):
        """Compute the deviation between pairwise-corresponding angles.

        The raw difference ``angle1 - angle2`` is wrapped into [-180, 180) and
        then folded into [-90, 90], so a 180° difference maps to 0. Each result
        is rounded to two decimals.

        Args:
            array1: first sequence of angles in degrees.
            array2: second sequence of angles in degrees. ``zip`` truncates to
                the shorter of the two.

        Returns:
            list: the pairwise deviations.
        """
        deviations = []
        for angle1, angle2 in zip(array1, array2):
            # Wrap the raw difference into [-180, 180).
            deviation = (angle1 - angle2 + 180) % 360 - 180
            # Fold into [-90, 90].
            if deviation > 90:
                deviation -= 180
            elif deviation < -90:
                deviation += 180
            deviations.append(round(deviation, 2))
        return deviations

    def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
        """Fill ``Field_AngleIncluded`` in place.

        Uses the configured included-angle column when present. Otherwise it is
        computed from wind direction and nacelle position (both angles). If
        neither source is available, the frame is left unchanged.
        """
        if fieldAngleIncluded in dataFrame.columns:
            dataFrame[Field_AngleIncluded] = dataFrame[fieldAngleIncluded]
        elif fieldWindDirect in dataFrame.columns and fieldNacellePos in dataFrame.columns:
            # Previously calculateAngleIncluded() was called without arguments
            # (always TypeError). The sign convention is wind direction minus
            # nacelle position. NOTE(review): confirm the expected sign.
            dataFrame[Field_AngleIncluded] = self.calculateAngleIncluded(
                dataFrame[fieldWindDirect], dataFrame[fieldNacellePos])

    def recalculationOfGeneratorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
        """Ensure generator speed is available, in place.

        Uses the relation ratio = generator speed / rotor (main shaft) speed.
        If the generator-speed column is missing but rotor speed is present and
        a ratio is given, the column is derived. In either case it is mirrored
        into ``Field_GeneratorSpeed``.
        """
        if fieldGeneratorSpeed in dataFrame.columns:
            dataFrame[Field_GeneratorSpeed] = dataFrame[fieldGeneratorSpeed]
        elif fieldRotorSpeed in dataFrame.columns and rotationalSpeedRatio is not None:
            dataFrame[fieldGeneratorSpeed] = rotationalSpeedRatio * \
                dataFrame[fieldRotorSpeed]
            # Also mirror the derived values, matching the branch above.
            dataFrame[Field_GeneratorSpeed] = dataFrame[fieldGeneratorSpeed]

    def recalculationOfRotorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
        """Derive rotor (main shaft) speed in place when it is missing.

        Uses the relation ratio = generator speed / rotor speed. The column is
        derived only when rotor speed is absent, generator speed is present,
        and the ratio is non-zero and not None (avoiding division by 0/None).
        """
        if fieldRotorSpeed not in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns \
                and rotationalSpeedRatio:
            dataFrame[fieldRotorSpeed] = dataFrame[fieldGeneratorSpeed] / \
                rotationalSpeedRatio

    def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
        """Ensure generator torque is available, in place.

        With P in kW: P*1000 = pi/30 * T * n, so T * n = 30000/pi * P ≈ 9549.297 * P.
        Here n is generator speed (pi/30 converts rpm to rad/s), P is active
        power and T is torque.

        Torque is derived only when missing and both power and generator speed
        are present. Rows with zero generator speed get NaN instead of ±inf.
        The result is mirrored into ``Field_GeneratorTorque`` when a torque
        column exists.
        """
        if fieldGeneratorTorque not in dataFrame.columns and fieldActivePower in dataFrame.columns \
                and fieldGeneratorSpeed in dataFrame.columns:
            generatorSpeed = dataFrame[fieldGeneratorSpeed]
            dataFrame[fieldGeneratorTorque] = 9549.297 * \
                dataFrame[fieldActivePower] / generatorSpeed.where(generatorSpeed != 0)
        # Guarded: previously this raised KeyError when no torque column existed
        # and it could not be derived.
        if fieldGeneratorTorque in dataFrame.columns:
            dataFrame[Field_GeneratorTorque] = dataFrame[fieldGeneratorTorque]

    # ------------------------------------------------------------------
    # Loading and pipeline
    # ------------------------------------------------------------------

    def getUseColumns(self, confData: ConfBusiness):
        """Return the configured CSV column names to load.

        Unset fields (per ``self.common.isNone``) are skipped. The columns are
        listed in the order of the ``field_*`` attributes below, followed by the
        comma-separated ``field_temperature_large_components`` entries. The list
        is printed before it is returned.
        """
        configuredFields = [
            confData.field_turbine_time,
            confData.field_turbine_name,
            confData.field_wind_speed,
            confData.field_power,
            confData.field_pitch_angle1,
            confData.field_rotor_speed,
            confData.field_gen_speed,
            confData.field_torque,
            confData.field_wind_dir,
            confData.field_angle_included,
            confData.field_nacelle_pos,
            confData.field_env_temp,
            confData.field_nacelle_temp,
            confData.field_turbine_state,
        ]
        useColumns = [field for field in configuredFields
                      if not self.common.isNone(field)]
        if not self.common.isNone(confData.field_temperature_large_components):
            for temperatureColumn in confData.field_temperature_large_components.split(','):
                # Skip blank entries (e.g. from a trailing comma). An empty name
                # would make read_csv(usecols=...) and dropna(subset=...) fail.
                if temperatureColumn.strip():
                    useColumns.append(temperatureColumn)
        print(useColumns)
        return useColumns

    def loadData(self, csvFilePath, skip, confData: ConfBusiness, turbineName):
        """Read one turbine's CSV file.

        Reads only the columns from :meth:`getUseColumns` and skips the first
        ``skip`` rows after the header. It then adds the turbine-name column,
        converts float64/float16 columns to float32, and parses the time column
        with format ``'%Y-%m-%d %H:%M:%S'``. Unparsable values become NaT.
        """
        dataFrame = pd.read_csv(csvFilePath, header=0, usecols=self.getUseColumns(
            confData), skiprows=range(1, skip + 1))
        dataFrame[Field_NameOfTurbine] = confData.add_W_if_starts_with_digit(
            turbineName)

        # Shrink floating-point columns to float32.
        dataFrame = dataFrame.convert_dtypes()
        floatCols = dataFrame.select_dtypes(
            include=['float64', 'float16']).columns
        dataFrame[floatCols] = dataFrame[floatCols].astype('float32')

        timeField = confData.field_turbine_time
        # Strip surrounding whitespace before parsing. The former
        # strftime -> to_datetime round trip after this step produced identical
        # values and has been removed.
        dataFrame[timeField] = pd.to_datetime(
            dataFrame[timeField].str.strip(), format='%Y-%m-%d %H:%M:%S', errors="coerce")

        # Missing values are deliberately not forward/backward filled here; any
        # imputation must be justified by the business context.
        return dataFrame

    def execute(self, confData: ConfBusiness):
        """Run the full pipeline over every ``.csv`` file under ``confData.input_path``.

        For each file (in sorted order per directory):

        1. load the data;
        2. apply :meth:`filterData` and notify the base analysts;
        3. apply :meth:`filterCustom` and :meth:`filterForMerge`;
        4. derive generator speed, rotor speed, torque and the included angle;
        5. notify the analysts.

        Files with no rows left after filtering are skipped. Finally, base
        analysts and then analysts are notified with the concatenation of all
        turbines' data (an empty DataFrame if nothing was collected).
        """
        rotationalSpeedRatio = confData.rotational_Speed_Ratio  # generator speed / rotor speed
        fieldRotorSpeed = confData.field_rotor_speed
        fieldGeneratorSpeed = confData.field_gen_speed
        fieldTorque = confData.field_torque
        fieldActivePower = confData.field_power
        dataFilter = confData.filter

        csvFileDir = confData.input_path
        csvFileNameSplitStringForTurbine = confData.csvFileNameSplitStringForTurbine
        indexTurbine = confData.index_turbine
        outputRootDir = confData.output_path

        # NOTE(review): nothing in this method writes to this directory any more
        # (the per-turbine to_csv export was disabled). It is still created so
        # the output layout stays unchanged.
        outputDataAfterFilteringDir = r"{}/{}".format(
            outputRootDir, "DataAfterFiltering")
        dir.create_directory(outputDataAfterFilteringDir)

        # Collect per-turbine frames and concatenate once, instead of
        # re-concatenating the growing frame on every file (quadratic copying).
        filteredFrames = []
        for rootDir, subDirs, files in dir.list_directory(csvFileDir):
            for file in sorted(files):
                if not file.endswith(".csv"):
                    continue
                csvFilePath = os.path.join(rootDir, file)
                print(csvFilePath)
                turbineName = confData.add_W_if_starts_with_digit(
                    file.split(csvFileNameSplitStringForTurbine)[indexTurbine])

                dataFrameFilter = self.loadData(
                    csvFilePath, confData.skip_row_number, confData, turbineName)
                dataFrameFilter = self.filterData(
                    dataFrameFilter, confData, dataFilter, rotationalSpeedRatio)
                self.baseAnalystTurbineNotify(dataFrameFilter, confData, turbineName)

                dataFrameFilter = self.filterCustom(dataFrameFilter, confData)
                dataFrameFilter = self.filterForMerge(dataFrameFilter, confData)
                if len(dataFrameFilter) <= 0:
                    print("dataFrameFilter not data.")
                    continue

                self.recalculationOfGeneratorSpeed(
                    dataFrameFilter, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio)
                self.recalculationOfRotorSpeed(
                    dataFrameFilter, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio)
                self.recalculationOfRotorTorque(
                    dataFrameFilter, fieldTorque, fieldActivePower, fieldGeneratorSpeed)
                self.recalculationOfIncludedAngle(
                    dataFrameFilter, confData.field_angle_included, confData.field_wind_dir,
                    confData.field_nacelle_pos)

                # Snapshot before the analysts run, as the previous per-iteration
                # concat did, so in-place changes made by analysts do not leak
                # into the merged data.
                filteredFrames.append(dataFrameFilter.copy())
                # turbineNotify returns None; its result is intentionally not kept.
                self.turbineNotify(dataFrameFilter, confData, turbineName)

        if filteredFrames:
            dataFrameMergeFilter = pd.concat(filteredFrames, axis=0, sort=False)
        else:
            dataFrameMergeFilter = pd.DataFrame()

        self.baseAnalystNotify(dataFrameMergeFilter, confData)
        self.turbinesNotify(dataFrameMergeFilter, confData)