123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- import os
- from datetime import datetime
- import concurrent.futures
- import pandas as pd
- from confBusiness import ConfBusiness, Field_NameOfTurbine, Field_GeneratorSpeed, Field_GeneratorTorque,Field_AngleIncluded
- from .utils.directoryUtil import DirectoryUtil as dir
- from .baseAnalyst import BaseAnalyst
- from .analyst import Analyst
- from .commonBusiness import CommonBusiness
class DataProcessor:
    """Load per-turbine CSV files, clean and filter them, and notify analysts.

    Two groups of observers can be registered:

    * base analysts (``attachBaseAnalyst``): called per turbine right after
      ``filterData`` (before ``filterCustom`` / ``filterForMerge``), and once
      at the end with the merged data of all turbines;
    * analysts (``attach``): called per turbine after all filters and the
      derived-column recalculation, and once at the end with the merged data.

    The merged frame given to both groups is built from the fully filtered,
    recalculated per-turbine frames.
    """

    def __init__(self):
        self.common = CommonBusiness()
        self._baseAnalysts = []
        # Not referenced by this class; kept in case external code uses it.
        self._noCustomFilterAnalysts = []
        self._analysts = []
        # Keys looked up in ``confData.filter``.
        self.node_filter_value_state_turbine = "filter_value_state_turbine"
        self.node_angle_pitch_min = "angle_pitch_min"
        self.node_angle_pitch_max = "angle_pitch_max"
        self.node_speed_wind_cut_in = "speed_wind_cut_in"
        self.node_speed_wind_cut_out = "speed_wind_cut_out"
        self.node_active_power_min = "active_power_min"
        self.node_active_power_max = "active_power_max"

    # ------------------------------------------------------------------
    # Observer registration
    # ------------------------------------------------------------------
    def attachBaseAnalyst(self, analyst: "BaseAnalyst"):
        """Register a base analyst; registering the same one twice is a no-op."""
        # Check the same list that is appended to, so duplicates are rejected.
        if analyst not in self._baseAnalysts:
            self._baseAnalysts.append(analyst)

    def detachBaseAnalyst(self, analyst: "BaseAnalyst"):
        """Unregister a base analyst; detaching an unknown analyst is a no-op."""
        try:
            self._baseAnalysts.remove(analyst)
        except ValueError:
            pass

    def attach(self, analyst: "Analyst"):
        """Register an analyst; registering the same one twice is a no-op."""
        if analyst not in self._analysts:
            self._analysts.append(analyst)

    def detach(self, analyst: "Analyst"):
        """Unregister an analyst; detaching an unknown analyst is a no-op."""
        try:
            self._analysts.remove(analyst)
        except ValueError:
            pass

    # ------------------------------------------------------------------
    # Notification
    # ------------------------------------------------------------------
    @staticmethod
    def _notifyTurbine(analysts, dataFrameOfTurbine, confData, turbineName):
        """Call ``analysisOfTurbine`` on each analyst with its per-turbine CSV path."""
        for analyst in analysts:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            outputFilePath = r"{}/{}_{}.csv".format(
                outputAnalysisDir, turbineName, analyst.typeAnalyst())
            analyst.analysisOfTurbine(
                dataFrameOfTurbine, outputAnalysisDir, outputFilePath, confData, turbineName)

    @staticmethod
    def _notifyTurbines(analysts, dataFrameOfTurbines, confData):
        """Call ``analysisOfTurbines`` on each analyst with the merged data."""
        for analyst in analysts:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            analyst.analysisOfTurbines(
                dataFrameOfTurbines, outputAnalysisDir, confData)

    def turbineNotify(self,
                      dataFrameOfTurbine: pd.DataFrame,
                      confData: "ConfBusiness",
                      turbineName):
        """Send one turbine's fully filtered data to every analyst."""
        self._notifyTurbine(self._analysts, dataFrameOfTurbine, confData, turbineName)

    def turbinesNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: "ConfBusiness"):
        """Send the merged data of all turbines to every analyst."""
        self._notifyTurbines(self._analysts, dataFrameOfTurbines, confData)

    def baseAnalystTurbineNotify(self,
                                 dataFrameOfTurbine: pd.DataFrame,
                                 confData: "ConfBusiness",
                                 turbineName):
        """Send one turbine's basically-cleaned data to every base analyst."""
        self._notifyTurbine(self._baseAnalysts, dataFrameOfTurbine, confData, turbineName)

    def baseAnalystNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: "ConfBusiness"):
        """Send the merged data of all turbines to every base analyst."""
        self._notifyTurbines(self._baseAnalysts, dataFrameOfTurbines, confData)

    # ------------------------------------------------------------------
    # Filtering
    # ------------------------------------------------------------------
    def filterCustom(self, dataFrame: pd.DataFrame, confData: "ConfBusiness"):
        """Apply the configurable wind-speed, power and turbine-state filters.

        When both wind-speed cut-in and cut-out are configured:
          * rows with wind speed above cut-out or below 0 are dropped;
          * if the power field is configured, rows with wind speed below
            cut-in AND power below ``1.2 * rated_power`` are dropped, and then
            only rows with power ``<= 1.2 * rated_power`` are kept.
        When a turbine-state field and allowed state value(s) are configured,
        only rows whose state is among those values are kept.

        Returns:
            The filtered frame (a new object when any filter applied).
        """
        isNone = self.common.isNone
        filterConf = confData.filter
        windField = confData.field_wind_speed
        powerField = confData.field_power
        cutInNode = self.node_speed_wind_cut_in
        cutOutNode = self.node_speed_wind_cut_out

        hasCutInOut = (not isNone(windField)
                       and cutInNode in filterConf
                       and not isNone(filterConf[cutInNode])
                       and cutOutNode in filterConf
                       and not isNone(filterConf[cutOutNode]))
        if hasCutInOut:
            windSpeedCutIn = float(filterConf[cutInNode])
            windSpeedCutOut = float(filterConf[cutOutNode])
            dataFrame = dataFrame[~((dataFrame[windField] > windSpeedCutOut)
                                    | (dataFrame[windField] < 0))]
            # The remaining two filters need the power column; skip them
            # rather than failing with a KeyError when it is not configured.
            if not isNone(powerField):
                powerLimit = confData.rated_power * 1.2
                dataFrame = dataFrame[~((dataFrame[windField] < windSpeedCutIn)
                                        & (dataFrame[powerField] < powerLimit))]
                dataFrame = dataFrame[dataFrame[powerField] <= powerLimit]

        # Keep only rows whose turbine state is one of the configured values.
        stateNode = self.node_filter_value_state_turbine
        if (not isNone(confData.field_turbine_state)
                and stateNode in filterConf
                and not isNone(filterConf[stateNode])):
            stateTurbine = filterConf[stateNode]
            # ``Series.isin`` only accepts list-likes; allow a single value too.
            if not pd.api.types.is_list_like(stateTurbine):
                stateTurbine = [stateTurbine]
            dataFrame = dataFrame[dataFrame[confData.field_turbine_state].isin(stateTurbine)]

        return dataFrame

    def filterData(self, dataFrame: pd.DataFrame, confData: "ConfBusiness", dataFilter, rotationalSpeedRatio):
        """Basic cleaning applied to one turbine's data before any analyst runs.

        Steps: drop rows with NaN in any used column, cast the configured
        measurement columns to float32, keep rows with
        ``start_time <= time < end_time``, drop rows whose ``YYYY-MM`` is in
        ``confData.excludingMonths``, then add the helper columns
        "年月", "日期", "monthIntTime" and "year-month".

        ``dataFilter`` and ``rotationalSpeedRatio`` are not used here; they are
        kept for signature compatibility.

        Returns:
            A new, independent DataFrame.
        """
        isNone = self.common.isNone
        timeField = confData.field_turbine_time

        dataFrame = dataFrame.dropna(axis=0, subset=self.getUseColumns(confData))

        float32Fields = (
            confData.field_wind_speed,
            confData.field_wind_dir,
            confData.field_angle_included,
            confData.field_power,
            confData.field_pitch_angle1,
            confData.field_gen_speed,
            confData.field_rotor_speed,
        )
        # dict.fromkeys de-duplicates while keeping order.
        for field in dict.fromkeys(float32Fields):
            if field in dataFrame.columns:
                dataFrame[field] = dataFrame[field].astype('float32')

        timeColumn = dataFrame[timeField]
        dataFrame = dataFrame[(timeColumn >= confData.start_time)
                              & (timeColumn < confData.end_time)]

        excludedMonths = []
        if not isNone(confData.excludingMonths):
            excludedMonths = [month for month in confData.excludingMonths
                              if not isNone(month)]
        if excludedMonths:
            # The "year-month" column is only created further below, so the
            # YYYY-MM key is derived directly from the time column here.
            yearMonth = dataFrame[timeField].dt.strftime('%Y-%m')
            dataFrame = dataFrame[~yearMonth.isin(excludedMonths)]

        # Detach from the sliced parent before adding columns
        # (avoids pandas' SettingWithCopyWarning).
        dataFrame = dataFrame.copy()

        # NOTE(review): loadData already converts the time column to datetime64,
        # and pd.to_datetime appears to ignore ``format`` for datetime input, so
        # "年月" and "日期" hold the full timestamp rather than a month value.
        # Kept unchanged because downstream analysts may rely on it — confirm.
        dataFrame["年月"] = pd.to_datetime(dataFrame[timeField], format="%Y-%m")
        dataFrame['日期'] = pd.to_datetime(dataFrame[timeField], format="%Y-%m")
        # POSIX seconds of each timestamp.
        dataFrame['monthIntTime'] = dataFrame['日期'].apply(lambda x: x.timestamp())
        dataFrame["year-month"] = dataFrame[timeField].dt.strftime('%Y-%m')
        return dataFrame

    # ------------------------------------------------------------------
    # Derived columns
    # ------------------------------------------------------------------
    def calculateAngleIncluded(self, array1, array2):
        """Return the pairwise deviation ``array1[i] - array2[i]`` folded into [-90, 90].

        The raw difference is first wrapped into [-180, 180); values outside
        [-90, 90] are then shifted by 180 degrees, so directions that differ by
        180 degrees count as aligned. Each result is rounded to two decimals.

        Parameters:
            array1: iterable of angles in degrees.
            array2: iterable of angles in degrees, paired element-wise with
                ``array1`` (``zip`` stops at the shorter of the two).

        Returns:
            list of rounded deviations, one per pair.
        """
        deviations = []
        for angle1, angle2 in zip(array1, array2):
            # Wrap into [-180, 180).
            deviation = (angle1 - angle2 + 180) % 360 - 180
            # Fold into [-90, 90].
            if deviation > 90:
                deviation -= 180
            elif deviation < -90:
                deviation += 180
            deviations.append(round(deviation, 2))
        return deviations

    def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
        """Fill ``Field_AngleIncluded`` in place.

        If the configured included-angle column is missing but wind direction
        and nacelle position are present, the angle is computed as the folded
        deviation (wind direction - nacelle position). Otherwise the configured
        column is copied when it exists; if neither is available nothing is
        written.
        """
        columns = dataFrame.columns
        if (fieldAngleIncluded not in columns
                and fieldWindDirect in columns
                and fieldNacellePos in columns):
            # NOTE(review): sign convention assumed to be wind minus nacelle — confirm.
            dataFrame[Field_AngleIncluded] = self.calculateAngleIncluded(
                dataFrame[fieldWindDirect], dataFrame[fieldNacellePos])
        elif fieldAngleIncluded in columns:
            dataFrame[Field_AngleIncluded] = dataFrame[fieldAngleIncluded]

    def recalculationOfGeneratorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
        """Ensure generator speed is available, in place.

        Speed ratio = generator speed / rotor (main shaft) speed. If the
        generator-speed column is missing but rotor speed is present, it is
        derived as ``rotationalSpeedRatio * rotor speed``. Whenever a
        generator-speed column exists afterwards, it is also copied to
        ``Field_GeneratorSpeed``.
        """
        if fieldGeneratorSpeed not in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
            dataFrame[fieldGeneratorSpeed] = rotationalSpeedRatio * dataFrame[fieldRotorSpeed]
        if fieldGeneratorSpeed in dataFrame.columns:
            dataFrame[Field_GeneratorSpeed] = dataFrame[fieldGeneratorSpeed]

    def recalculationOfRotorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
        """Derive rotor (main shaft) speed in place when it is missing.

        Speed ratio = generator speed / rotor speed, so rotor speed is
        ``generator speed / rotationalSpeedRatio``. A ratio of 0 would yield
        infinite values.
        """
        if fieldRotorSpeed not in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
            dataFrame[fieldRotorSpeed] = dataFrame[fieldGeneratorSpeed] / rotationalSpeedRatio

    def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
        """Ensure generator torque is available, in place.

        With active power P in kW:
            P * 1000 = pi / 30 * T * n   =>   T = 9549.297 * P / n
        where n is generator speed (presumably rpm, given the pi/30 factor) and
        T is torque. The torque column is derived only when it is missing and
        both power and speed are present. When a torque column exists
        afterwards, it is copied to ``Field_GeneratorTorque``; otherwise nothing
        is written. Zero speed yields infinite torque.
        """
        columns = dataFrame.columns
        if (fieldGeneratorTorque not in columns
                and fieldActivePower in columns
                and fieldGeneratorSpeed in columns):
            dataFrame[fieldGeneratorTorque] = (
                9549.297 * dataFrame[fieldActivePower] / dataFrame[fieldGeneratorSpeed])
        if fieldGeneratorTorque in dataFrame.columns:
            dataFrame[Field_GeneratorTorque] = dataFrame[fieldGeneratorTorque]

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def getUseColumns(self, confData: "ConfBusiness"):
        """Return the configured CSV column names that should be read.

        Every non-None single-column field is included in a fixed order,
        followed by the comma-separated ``field_temperature_large_components``
        entries (whitespace-stripped, empty entries skipped).
        """
        isNone = self.common.isNone
        singleFields = (
            confData.field_turbine_time,
            confData.field_turbine_name,
            confData.field_wind_speed,
            confData.field_power,
            confData.field_pitch_angle1,
            confData.field_rotor_speed,
            confData.field_gen_speed,
            confData.field_torque,
            confData.field_wind_dir,
            confData.field_angle_included,
            confData.field_nacelle_pos,
            confData.field_env_temp,
            confData.field_nacelle_temp,
            confData.field_turbine_state,
        )
        useColumns = [field for field in singleFields if not isNone(field)]

        temperatureColumns = confData.field_temperature_large_components
        if not isNone(temperatureColumns):
            for column in temperatureColumns.split(','):
                column = column.strip()
                # A stray/trailing comma would otherwise make usecols fail.
                if column:
                    useColumns.append(column)
        print(useColumns)
        return useColumns

    def loadData(self, csvFilePath, skip, confData: "ConfBusiness", turbineName):
        """Read one turbine CSV and normalise its dtypes and time column.

        Only the columns from ``getUseColumns`` are read; the header is line 0
        and file lines 1..``skip`` are skipped. A turbine-name column
        (``Field_NameOfTurbine``) is added; float64/float16 columns (after
        ``convert_dtypes``) are cast to float32. The time column is
        whitespace-stripped and parsed with ``%Y-%m-%d %H:%M:%S``; unparsable
        values become NaT.
        """
        dataFrame = pd.read_csv(csvFilePath, header=0,
                                usecols=self.getUseColumns(confData),
                                skiprows=range(1, skip + 1))
        dataFrame[Field_NameOfTurbine] = confData.add_W_if_starts_with_digit(turbineName)

        dataFrame = dataFrame.convert_dtypes()
        numeric_cols = dataFrame.select_dtypes(include=['float64', 'float16']).columns
        dataFrame[numeric_cols] = dataFrame[numeric_cols].astype('float32')

        timeField = confData.field_turbine_time
        dataFrame[timeField] = pd.to_datetime(
            dataFrame[timeField].str.strip(),
            format='%Y-%m-%d %H:%M:%S', errors="coerce")
        # NOTE: gap filling (ffill/bfill) is intentionally not applied; any
        # imputation must be justified by the business context.
        return dataFrame

    # ------------------------------------------------------------------
    # Pipeline
    # ------------------------------------------------------------------
    def execute(self, confData: "ConfBusiness"):
        """Run the full pipeline over every ``.csv`` file under ``confData.input_path``.

        Per file (in sorted order per directory): load, basic-clean, notify base
        analysts, apply custom and merge filters, skip if empty, recalculate
        derived columns, keep a snapshot for the merged frame, and notify
        analysts. Finally, the merged frame of all turbines is sent to base
        analysts and then to analysts.
        """
        rotationalSpeedRatio = confData.rotational_Speed_Ratio  # generator speed / rotor speed
        field_Rotor_Speed = confData.field_rotor_speed
        field_Generator_Speed = confData.field_gen_speed
        field_Torque = confData.field_torque
        field_Active_Power = confData.field_power
        dataFilter = confData.filter
        csvFileDir = confData.input_path
        csvFileNameSplitStringForTurbine = confData.csvFileNameSplitStringForTurbine
        indexTurbine = confData.index_turbine
        outputRootDir = confData.output_path

        # Created for the (currently disabled) per-turbine filtered-data export.
        outputDataAfterFilteringDir = r"{}/{}".format(outputRootDir, "DataAfterFiltering")
        dir.create_directory(outputDataAfterFilteringDir)

        # Collect per-turbine frames and concatenate once at the end instead of
        # re-concatenating the growing frame on every iteration.
        filteredFrames = []
        for rootDir, subDirs, files in dir.list_directory(csvFileDir):
            for file in sorted(files):
                if not file.endswith(".csv"):
                    continue
                csvFilePath = os.path.join(rootDir, file)
                print(csvFilePath)
                turbineName = confData.add_W_if_starts_with_digit(
                    file.split(csvFileNameSplitStringForTurbine)[indexTurbine])

                dataFrameFilter = self.loadData(
                    csvFilePath, confData.skip_row_number, confData, turbineName)
                dataFrameFilter = self.filterData(
                    dataFrameFilter, confData, dataFilter, rotationalSpeedRatio)
                self.baseAnalystTurbineNotify(dataFrameFilter, confData, turbineName)

                dataFrameFilter = self.filterCustom(dataFrameFilter, confData)
                dataFrameFilter = self.filterForMerge(dataFrameFilter, confData)
                if len(dataFrameFilter) <= 0:
                    print("dataFrameFilter not data.")
                    continue

                self.recalculationOfGeneratorSpeed(
                    dataFrameFilter, field_Rotor_Speed, field_Generator_Speed, rotationalSpeedRatio)
                self.recalculationOfRotorSpeed(
                    dataFrameFilter, field_Rotor_Speed, field_Generator_Speed, rotationalSpeedRatio)
                self.recalculationOfRotorTorque(
                    dataFrameFilter, field_Torque, field_Active_Power, field_Generator_Speed)
                self.recalculationOfIncludedAngle(
                    dataFrameFilter, confData.field_angle_included,
                    confData.field_wind_dir, confData.field_nacelle_pos)

                # Snapshot before analysts run, so in-place changes they make
                # do not leak into the merged frame.
                filteredFrames.append(dataFrameFilter.copy())
                self.turbineNotify(dataFrameFilter, confData, turbineName)

        if filteredFrames:
            dataFrameMergeFilter = pd.concat(filteredFrames, axis=0, sort=False)
        else:
            dataFrameMergeFilter = pd.DataFrame()
        self.baseAnalystNotify(dataFrameMergeFilter, confData)
        self.turbinesNotify(dataFrameMergeFilter, confData)

    def filterForMerge(self, dataFrame: pd.DataFrame, confData: "ConfBusiness"):
        """Drop rows with power below 90% of ``active_power_max`` while pitch exceeds ``angle_pitch_min``.

        Applied only when the power and pitch fields and both filter values are
        configured; otherwise the frame is returned unchanged.
        """
        isNone = self.common.isNone
        filterConf = confData.filter
        powerNode = self.node_active_power_max
        pitchNode = self.node_angle_pitch_min
        configured = (not isNone(confData.field_power)
                      and powerNode in filterConf
                      and not isNone(filterConf[powerNode])
                      and not isNone(confData.field_pitch_angle1)
                      and pitchNode in filterConf
                      and not isNone(filterConf[pitchNode]))
        if not configured:
            return dataFrame

        activePowerMax = float(filterConf[powerNode])
        anglePitchMin = float(filterConf[pitchNode])
        lowPowerHighPitch = ((dataFrame[confData.field_power] < activePowerMax * 0.9)
                             & (dataFrame[confData.field_pitch_angle1] > anglePitchMin))
        return dataFrame[~lowPowerHighPitch]
|