| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- import os
- from datetime import datetime
- import concurrent.futures
- import numpy as np
- import pandas as pd
- from utils.directoryUtil import DirectoryUtil as dir
- from algorithmContract.confBusiness import *
- from behavior.baseAnalyst import BaseAnalyst
- from behavior.analyst import Analyst
- from common.commonBusiness import CommonBusiness
- from algorithm.dataMarker import DataMarker
class DataProcessor:
    """Load per-turbine CSV files, clean/enrich them and dispatch them to analysts.

    Two observer lists are kept: "base" analysts (registered with
    ``attachBaseAnalyst``) and regular analysts (registered with ``attach``).
    ``execute`` drives the pipeline: it notifies the regular analysts once per
    turbine file and once more with the merged data of all turbines.
    """

    def __init__(self):
        self.common = CommonBusiness()
        # Observers notified by baseAnalystTurbineNotify / baseAnalystNotify.
        self._baseAnalysts = []
        # Observers notified by turbineNotify / turbinesNotify.
        self._analysts = []

    def attachBaseAnalyst(self, analyst: "BaseAnalyst"):
        """Register a base analyst; attaching the same analyst twice is a no-op."""
        # Check the list that is appended to (the old code checked self._analysts,
        # which let the same base analyst be registered repeatedly).
        if analyst not in self._baseAnalysts:
            self._baseAnalysts.append(analyst)

    def detachBaseAnalyst(self, analyst: "BaseAnalyst"):
        """Unregister a base analyst; analysts that are not registered are ignored."""
        try:
            self._baseAnalysts.remove(analyst)
        except ValueError:
            pass  # Not registered: nothing to detach.

    def attach(self, analyst: "Analyst"):
        """Register an analyst; attaching the same analyst twice is a no-op."""
        if analyst not in self._analysts:
            self._analysts.append(analyst)

    def detach(self, analyst: "Analyst"):
        """Unregister an analyst; analysts that are not registered are ignored."""
        try:
            self._analysts.remove(analyst)
        except ValueError:
            pass  # Not registered: nothing to detach.

    @staticmethod
    def _notifyAnalystsOfTurbine(analysts, dataFrameOfTurbine, confData, turbineName):
        """Call ``analysisOfTurbine`` on each analyst with its per-turbine output file path."""
        for analyst in analysts:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            outputFilePath = r"{}/{}{}".format(
                outputAnalysisDir, turbineName, CSVSuffix)
            analyst.analysisOfTurbine(
                dataFrameOfTurbine, outputAnalysisDir, outputFilePath, confData, turbineName)

    @staticmethod
    def _notifyAnalystsOfTurbines(analysts, dataFrameOfTurbines, confData):
        """Call ``analysisOfTurbines`` on each analyst with the multi-turbine data."""
        for analyst in analysts:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            analyst.analysisOfTurbines(
                dataFrameOfTurbines, outputAnalysisDir, confData)

    def turbineNotify(self,
                      dataFrameOfTurbine: pd.DataFrame,
                      confData: "ConfBusiness",
                      turbineName):
        """Pass one turbine's data to every regular analyst.

        Each analyst receives ``<its output dir>/<turbineName><CSVSuffix>`` as
        the output file path.
        """
        self._notifyAnalystsOfTurbine(
            self._analysts, dataFrameOfTurbine, confData, turbineName)

    def turbinesNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: "ConfBusiness"):
        """Pass the merged data of all turbines to every regular analyst."""
        self._notifyAnalystsOfTurbines(
            self._analysts, dataFrameOfTurbines, confData)

    def baseAnalystTurbineNotify(self,
                                 dataFrameOfTurbine: pd.DataFrame,
                                 confData: "ConfBusiness",
                                 turbineName):
        """Pass one turbine's data to every base analyst (same contract as turbineNotify)."""
        self._notifyAnalystsOfTurbine(
            self._baseAnalysts, dataFrameOfTurbine, confData, turbineName)

    def baseAnalystNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: "ConfBusiness"):
        """Pass the merged data of all turbines to every base analyst."""
        self._notifyAnalystsOfTurbines(
            self._baseAnalysts, dataFrameOfTurbines, confData)

    def calculateAngleIncluded(self, array1, array2):
        """Return the pairwise deviation between two angle sequences, in [-90, 90] degrees.

        For each positional pair ``(a, b)`` the raw difference ``a - b`` is
        wrapped into [-180, 180), then shifted by 180 degrees when it lies
        outside +/-90, and finally rounded to two decimals. Pairs are matched
        positionally and the longer input is truncated (like ``zip``). Missing
        values (None, NaN, pd.NA) produce NaN instead of raising.

        Args:
            array1: first angle sequence in degrees (list, ndarray or Series).
            array2: second angle sequence in degrees.

        Returns:
            list of float: one deviation per pair.
        """
        # Convert to plain float64 arrays; nullable/NA values become NaN so the
        # arithmetic below never hits pd.NA's ambiguous truth value.
        first = pd.Series(array1).to_numpy(dtype="float64", na_value=np.nan)
        second = pd.Series(array2).to_numpy(dtype="float64", na_value=np.nan)
        count = min(first.size, second.size)  # zip() semantics: truncate
        deviation = first[:count] - second[:count]
        with np.errstate(invalid="ignore"):
            # Wrap into [-180, 180).
            deviation = np.mod(deviation + 180.0, 360.0) - 180.0
            # Fold into [-90, 90]; after the first shift no value is < -90,
            # so applying both in sequence matches an if/elif.
            deviation = np.where(deviation > 90.0, deviation - 180.0, deviation)
            deviation = np.where(deviation < -90.0, deviation + 180.0, deviation)
        # Python's round() keeps the exact rounding semantics (np.round differs
        # on some half-way cases); round(nan, 2) returns nan.
        return [round(value, 2) for value in deviation.tolist()]

    def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
        """Fill Field_AngleIncluded in place.

        The measured column ``fieldAngleIncluded`` is used when it is configured
        and present. Otherwise, when both the nacelle-position and wind-direction
        columns exist, the angle is computed from them via
        ``calculateAngleIncluded``. This also covers a configured-but-absent
        column, which previously produced no value at all.
        """
        hasMeasuredAngle = (not self.common.isNone(fieldAngleIncluded)
                            and fieldAngleIncluded in dataFrame.columns)
        if hasMeasuredAngle:
            dataFrame[Field_AngleIncluded] = dataFrame[fieldAngleIncluded]
        elif fieldWindDirect in dataFrame.columns and fieldNacellePos in dataFrame.columns:
            dataFrame[Field_AngleIncluded] = self.calculateAngleIncluded(
                dataFrame[fieldNacellePos], dataFrame[fieldWindDirect])

    def recalculationOfGeneratorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
        """Ensure a generator-speed column exists and mirror it into Field_GeneratorSpeed (in place).

        Speed ratio = generator speed / rotor (main-shaft) speed. When the
        generator-speed column is missing but rotor speed is present, it is
        derived as ``rotationalSpeedRatio * rotor speed``.
        """
        # Derive first, then copy. The old order copied before deriving, so a
        # derived generator speed never reached Field_GeneratorSpeed. This now
        # matches recalculationOfRotorSpeed.
        if fieldGeneratorSpeed not in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
            dataFrame[fieldGeneratorSpeed] = rotationalSpeedRatio * \
                dataFrame[fieldRotorSpeed]
        if fieldGeneratorSpeed in dataFrame.columns:
            dataFrame[Field_GeneratorSpeed] = dataFrame[fieldGeneratorSpeed]

    def recalculationOfRotorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
        """Ensure a rotor-speed column exists and mirror it into Field_RotorSpeed (in place).

        Speed ratio = generator speed / rotor (main-shaft) speed. When the
        rotor-speed column is missing but generator speed is present, it is
        derived as ``generator speed / rotationalSpeedRatio``.
        """
        if fieldRotorSpeed not in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
            dataFrame[fieldRotorSpeed] = dataFrame[fieldGeneratorSpeed] / \
                rotationalSpeedRatio
        if not self.common.isNone(fieldRotorSpeed) and fieldRotorSpeed in dataFrame.columns:
            dataFrame[Field_RotorSpeed] = dataFrame[fieldRotorSpeed]

    def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
        """Fill Field_GeneratorTorque in place.

        The measured torque column is used when present. Otherwise the torque is
        computed from active power P (kW) and generator speed n:

            P * 1000 = pi / 30 * T * n
            T = 30000 / pi * P / n ~= 9549.297 * P / n

        The fallback now also applies when a torque field is configured but
        absent from the data.
        """
        if fieldGeneratorTorque in dataFrame.columns:
            dataFrame[Field_GeneratorTorque] = dataFrame[fieldGeneratorTorque]
        elif fieldActivePower in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
            dataFrame[Field_GeneratorTorque] = 9549.297 * \
                dataFrame[fieldActivePower] / dataFrame[fieldGeneratorSpeed]

    def recalculation(self, dataFrame: pd.DataFrame, confData: "ConfBusiness"):
        """Recompute derived measurement columns in place.

        Args:
            dataFrame: raw turbine data (modified in place).
            confData: configuration providing field names and the speed ratio.
        """
        # Generator speed first: rotor speed and torque may depend on it.
        self.recalculationOfGeneratorSpeed(
            dataFrame, confData.field_rotor_speed, confData.field_gen_speed, confData.rotational_Speed_Ratio)
        self.recalculationOfRotorSpeed(
            dataFrame, confData.field_rotor_speed, confData.field_gen_speed, confData.rotational_Speed_Ratio)
        self.recalculationOfRotorTorque(
            dataFrame, confData.field_torque, confData.field_power, confData.field_gen_speed)
        self.recalculationOfIncludedAngle(
            dataFrame, confData.field_angle_included, confData.field_wind_dir, confData.field_nacelle_pos)
        self.common.calculateTSR(dataFrame, confData)
        self.common.calculateCp(dataFrame, confData)

    def filterWithDateTime(self, dataFrame: pd.DataFrame, confData: "ConfBusiness"):
        """Return the rows with ``start_time <= turbine time < end_time`` as an independent copy."""
        timeColumn = dataFrame[confData.field_turbine_time]
        inWindow = (timeColumn >= confData.start_time) & (
            timeColumn < confData.end_time)
        # Copy so the column assignments done later (recalculation) act on a
        # standalone frame, not a filtered view (avoids SettingWithCopyWarning).
        return dataFrame[inWindow].copy()

    def processDateTime(self, dataFrame: pd.DataFrame, confData: "ConfBusiness"):
        """Add calendar helper columns and drop rows of excluded months.

        Adds "年月", "日期", "monthIntTime" (POSIX seconds of "日期"),
        Field_YearMonth ("%Y-%m" strings) and Field_YearMonthDay ("%Y-%m-%d"
        strings) to ``dataFrame``. Rows whose Field_YearMonth is listed in
        ``confData.excludingMonths`` are then removed.

        Returns:
            The (possibly filtered) DataFrame.
        """
        timeColumn = dataFrame[confData.field_turbine_time]
        # NOTE(review): the time column is already datetime64 here (setColumnDataType
        # parses it), so pandas ignores `format` and these hold full timestamps,
        # not year-month values. Kept as-is for downstream compatibility; confirm intent.
        dataFrame["年月"] = pd.to_datetime(timeColumn, format="%Y-%m")
        dataFrame['日期'] = pd.to_datetime(timeColumn, format="%Y-%m")
        # Vectorised equivalent of Timestamp.timestamp() (naive = treated as UTC).
        dataFrame['monthIntTime'] = (
            dataFrame['日期'] - pd.Timestamp("1970-01-01")) / pd.Timedelta(seconds=1)
        dataFrame[Field_YearMonth] = timeColumn.dt.strftime('%Y-%m')
        dataFrame[Field_YearMonthDay] = timeColumn.dt.strftime('%Y-%m-%d')

        excludingMonths = confData.excludingMonths
        if not self.common.isNone(excludingMonths) and len(excludingMonths) > 0:
            # Ignore empty/None entries in the configured "YYYY-MM" list.
            excluded = [month for month in excludingMonths
                        if not self.common.isNone(month)]
            if excluded:
                dataFrame = dataFrame[~dataFrame[Field_YearMonth].isin(excluded)]
        return dataFrame

    def setColumnDataType(self, dataFrame: pd.DataFrame, confData: "ConfBusiness"):
        """Normalise column dtypes and parse the turbine time column.

        Steps:
            * infer the best (nullable) dtypes, then cast float64/float16 columns
              to ``self.common.getFloat32()``;
            * make the turbine-name column a string;
            * strip and parse the time column with format '%Y-%m-%d %H:%M:%S'
              and drop rows whose time cannot be parsed;
            * cast each configured measurement column that is present to float32.

        Returns:
            The converted DataFrame (a new object, since ``convert_dtypes`` copies).
        """
        float32 = self.common.getFloat32()
        dataFrame = dataFrame.convert_dtypes()
        # Select the wide float columns before touching the turbine-name column.
        numeric_cols = dataFrame.select_dtypes(
            include=['float64', 'float16']).columns

        dataFrame[Field_NameOfTurbine] = dataFrame[Field_NameOfTurbine].astype(str)
        dataFrame[numeric_cols] = dataFrame[numeric_cols].astype(float32)

        timeField = confData.field_turbine_time
        if not self.common.isNone(timeField) and timeField in dataFrame.columns:
            # Strip surrounding whitespace, then parse strictly; bad values -> NaT.
            dataFrame[timeField] = pd.to_datetime(
                dataFrame[timeField].str.strip(), format='%Y-%m-%d %H:%M:%S', errors="coerce")
            # Drop rows whose timestamp is missing or unparsable.
            dataFrame.dropna(axis=0, subset=[timeField], inplace=True)

        floatFields = (
            confData.field_wind_speed,
            confData.field_wind_dir,
            confData.field_angle_included,
            confData.field_power,
            confData.field_pitch_angle1,
            confData.field_pitch_angle2,
            confData.field_pitch_angle3,
            confData.field_gen_speed,
            confData.field_rotor_speed,
            confData.field_Cabin_Vibrate_X,
            confData.field_Cabin_Vibrate_Y,
            confData.field_activePowerSet,
            confData.field_activePowerAvailable,
        )
        for field in floatFields:
            if field in dataFrame.columns:
                dataFrame[field] = dataFrame[field].astype(float32)
        return dataFrame

    def loadData(self, csvFilePath, confData: "ConfBusiness", turbineName):
        """Read one turbine CSV file and attach the turbine-name column.

        Only the columns from ``self.common.getUseColumns(confData)`` are read.
        The ``confData.skip_row_number`` data rows directly after the header are
        skipped. Field_NameOfTurbine is copied from the configured turbine-name
        column when present, otherwise filled with ``turbineName``.
        """
        useColumns = self.common.getUseColumns(confData)
        # Keep the header (row 0) and skip the next skip_row_number rows.
        dataFrame = pd.read_csv(csvFilePath, header=0, usecols=useColumns, skiprows=range(
            1, confData.skip_row_number + 1))
        if not self.common.isNone(confData.field_turbine_name) and confData.field_turbine_name in dataFrame.columns:
            dataFrame[Field_NameOfTurbine] = dataFrame[confData.field_turbine_name]
        else:
            dataFrame[Field_NameOfTurbine] = turbineName
        # NOTE: missing values are deliberately not forward/back-filled here;
        # any imputation has to be justified by the business rules first.
        return dataFrame

    def execute(self, confData: "ConfBusiness"):
        """Run the pipeline over every CSV file below ``confData.input_path``.

        For each file (sorted by name within each directory): load it, skip it
        if empty, normalise dtypes, add date columns, filter to the configured
        time window, recompute derived columns and notify the regular analysts.
        Finally the regular analysts receive the concatenation of all turbines.
        """
        outputDataAfterFilteringDir = r"{}/{}".format(
            confData.output_path, "DataAfterFiltering")
        dir.create_directory(outputDataAfterFilteringDir)

        # Collected per-turbine frames; concatenated once at the end instead of
        # repeatedly (which was quadratic in the total row count).
        turbineFrames = []
        for rootDir, _subDirs, files in dir.list_directory(confData.input_path):
            for file in sorted(files):
                if not file.endswith(CSVSuffix):
                    continue
                csvFilePath = os.path.join(rootDir, file)
                print(f"current csv file path: {csvFilePath}")
                turbineName = confData.add_W_if_starts_with_digit(
                    file.split(confData.csvFileNameSplitStringForTurbine)[confData.index_turbine])
                dataFrame = self.loadData(csvFilePath, confData, turbineName)
                # Check emptiness before reading the first row: an empty frame has no row 0.
                if len(dataFrame) <= 0:
                    print("dataFrameFilter not data.")
                    continue
                turbineName = dataFrame[Field_NameOfTurbine].iloc[0]
                dataFrame = self.setColumnDataType(dataFrame, confData)
                dataFrame = self.processDateTime(dataFrame, confData)
                dataFrame = self.filterWithDateTime(dataFrame, confData)
                self.recalculation(dataFrame, confData)
                # Snapshot before notifying: analysts may modify the frame in
                # place, and the merged data must reflect the pre-notification
                # state (as the per-iteration pd.concat copy did before).
                turbineFrames.append(dataFrame.copy())
                self.turbineNotify(dataFrame, confData, turbineName)

        dataFrameMerge = (pd.concat(turbineFrames, axis=0, sort=False)
                          if turbineFrames else pd.DataFrame())
        self.turbinesNotify(dataFrameMerge, confData)
|