Explorar o código

Merge branch 'WTOAAM-deploy'

chenhongyan1989 hai 2 semanas
pai
achega
a731daa7b8
Modificáronse 1 ficheiros con 99 adicións e 66 borrados
  1. 99 66
      dataAnalysisBehavior/behavior/baseAnalyst.py

+ 99 - 66
dataAnalysisBehavior/behavior/baseAnalyst.py

@@ -30,7 +30,7 @@ class BaseAnalyst(ABC):
         self.minioUtil = minioUtil
         self.conf = conf
         # 定义风速区间
-        self.binsWindSpeed:NDArray[np.floating[Any]] = np.arange(0, 26, 0.5)
+        self.binsWindSpeed: NDArray[np.floating[Any]] = np.arange(0, 26, 0.5)
 
         self.dataMarker = DataMarker()
         self.common = CommonBusiness()
@@ -69,7 +69,7 @@ class BaseAnalyst(ABC):
         # 轴系 发电机转速
         iGeneratorSpeed = "IgeneratorSpeed"
         dGeneratorSpeed = "DgeneratorSpeed"
-        if self.turbineModelInfo[Field_MotionType] .iloc[0]==2 :
+        if self.turbineModelInfo[Field_MotionType] .iloc[0] == 2:
             # 直驱 发电机转速
             self.axisStepGeneratorSpeed = conf.dataContract.graphSets[dGeneratorSpeed].step if not self.common.isNone(
                 conf.dataContract.graphSets[dGeneratorSpeed]) and not self.common.isNone(
@@ -79,7 +79,7 @@ class BaseAnalyst(ABC):
             self.axisUpperLimitGeneratorSpeed = conf.dataContract.graphSets[dGeneratorSpeed].max if not self.common.isNone(
                 conf.dataContract.graphSets[dGeneratorSpeed].max) else 25
         else:
-        # 非直驱 发电机转速
+            # 非直驱 发电机转速
             self.axisStepGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].step if not self.common.isNone(
                 conf.dataContract.graphSets[iGeneratorSpeed]) and not self.common.isNone(
                 conf.dataContract.graphSets[iGeneratorSpeed].step) else 200
@@ -105,8 +105,6 @@ class BaseAnalyst(ABC):
         # self.axisUpperLimitGeneratorSpeed = conf.dataContract.graphSets[iGeneratorSpeed].max if not self.common.isNone(
         #     conf.dataContract.graphSets[iGeneratorSpeed].max) else 2000
 
-
-
         # 轴系 发电机转矩
         iGgeneratorTorque = "IgeneratorTorque"
         dGgeneratorTorque = "DgeneratorTorque"
@@ -137,7 +135,7 @@ class BaseAnalyst(ABC):
             conf.dataContract.graphSets[dGgeneratorTorque].min) else 0
         self.axisUpperLimitGeneratorTorqueWithDirect = conf.dataContract.graphSets[dGgeneratorTorque].max if not self.common.isNone(
             conf.dataContract.graphSets[dGgeneratorTorque].max) else 100000
-                # 非直驱 发电机转矩
+        # 非直驱 发电机转矩
         self.axisStepGeneratorTorque = conf.dataContract.graphSets[iGgeneratorTorque].step if not self.common.isNone(
             conf.dataContract.graphSets[iGgeneratorTorque]) and not self.common.isNone(
             conf.dataContract.graphSets[iGgeneratorTorque].step) else 2000
@@ -186,7 +184,7 @@ class BaseAnalyst(ABC):
         self.axisUpperLimitTSR = conf.dataContract.graphSets[tsr].max if not self.common.isNone(
             conf.dataContract.graphSets[tsr].max) else 20
 
-    @ abstractmethod
+    @abstractmethod
     def typeAnalyst(self):
         pass
 
@@ -292,6 +290,8 @@ class BaseAnalyst(ABC):
         return self.filterCommon(dataFrame, conf)
 
     def analysisOfTurbines(self,  outputAnalysisDir, conf: Contract, turbineCodes):
+        self.logger.info(
+            f"typeAnalyst: {self.typeAnalyst()}  method: analysisOfTurbines , turbineCodes : {turbineCodes}")
         return self.turbinesAnalysis(outputAnalysisDir, conf, turbineCodes)
 
     def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
@@ -317,7 +317,7 @@ class BaseAnalyst(ABC):
         deviations = []
         for angle1, angle2 in zip(array1, array2):
             # 计算原始偏差
-            deviation =  angle1-angle2
+            deviation = angle1-angle2
 
             # 调整偏差,使其位于-180°到+180°范围内
             if deviation == 0.0:
@@ -343,7 +343,7 @@ class BaseAnalyst(ABC):
         if fieldNacellePos in dataFrame.columns and fieldWindDirect in dataFrame.columns and not dataFrame[fieldNacellePos].isna().all() and not dataFrame[fieldWindDirect].isna().all():
             # 计算 angle_included_list
             angle_included_list = self.calculateAngleIncluded(
-               dataFrame[fieldWindDirect],dataFrame[fieldNacellePos] )
+                dataFrame[fieldWindDirect], dataFrame[fieldNacellePos])
 
             # 检查 angle_included_list 的长度是否与 dataFrame 一致
             if len(angle_included_list) != len(dataFrame):
@@ -360,47 +360,50 @@ class BaseAnalyst(ABC):
             # 删除临时列
             dataFrame.drop(columns=['Calculated_AngleIncluded'], inplace=True)
 
-    def recalculationOfFieldPowerFloor(self, dataFrame : pd.DataFrame, fieldActivePower):
+    def recalculationOfFieldPowerFloor(self, dataFrame: pd.DataFrame, fieldActivePower):
         """
         功率计算
         """
         if fieldActivePower in dataFrame.columns:
             dataFrame[Field_PowerFloor] = dataFrame[fieldActivePower].apply(
-                lambda x: int(x / 10) * 10 if pd.notnull(x) or pd.notna(x) else np.nan  # 保留NaN值
+                lambda x: int(
+                    x / 10) * 10 if pd.notnull(x) or pd.notna(x) else np.nan  # 保留NaN值
             )
 
-    def recalculationOfFieldWindSpeedFloor(self, dataFrame : pd.DataFrame, fieldWindSpeed):
+    def recalculationOfFieldWindSpeedFloor(self, dataFrame: pd.DataFrame, fieldWindSpeed):
         """
         风速计算
         """
         if fieldWindSpeed in dataFrame.columns:
             dataFrame[Field_WindSpeedFloor] = dataFrame[fieldWindSpeed].apply(
-                lambda x: int(x/1)+0.5 if pd.notnull(x) or pd.notna(x) else np.nan
+                lambda x: int(x/1) +
+                0.5 if pd.notnull(x) or pd.notna(x) else np.nan
             )
 
-    def recalculationOfGeneratorSpeed(self, turbineModelInfo: pd.Series,dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
+    def recalculationOfGeneratorSpeed(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
         """
         风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
         """
 
         if fieldGeneratorSpeed in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
 
-            if turbineModelInfo[Field_MotionType]==2 :
-                dataFrame[fieldGeneratorSpeed] =dataFrame[fieldGeneratorSpeed].fillna(dataFrame[fieldRotorSpeed])
+            if turbineModelInfo[Field_MotionType] == 2:
+                dataFrame[fieldGeneratorSpeed] = dataFrame[fieldGeneratorSpeed].fillna(
+                    dataFrame[fieldRotorSpeed])
                 # dataFrame[fieldGeneratorSpeed] =dataFrame[fieldRotorSpeed]
             else:
                 dataFrame[fieldGeneratorSpeed] = dataFrame[fieldGeneratorSpeed].fillna(
                     dataFrame[fieldRotorSpeed]*rotationalSpeedRatio)
                 # dataFrame[fieldGeneratorSpeed] = dataFrame[fieldRotorSpeed]*rotationalSpeedRatio
 
-
-    def recalculationOfRotorSpeed(self,turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
+    def recalculationOfRotorSpeed(self, turbineModelInfo: pd.Series, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio: float):
         """
         风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
         """
         if fieldRotorSpeed in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
-            if turbineModelInfo[Field_MotionType]==2 :
-                dataFrame[fieldRotorSpeed] =dataFrame[fieldRotorSpeed].fillna(dataFrame[fieldGeneratorSpeed])
+            if turbineModelInfo[Field_MotionType] == 2:
+                dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
+                    dataFrame[fieldGeneratorSpeed])
             else:
                 dataFrame[fieldRotorSpeed] = dataFrame[fieldRotorSpeed].fillna(
                     dataFrame[fieldGeneratorSpeed] / rotationalSpeedRatio)
@@ -429,31 +432,40 @@ class BaseAnalyst(ABC):
 
         self.recalculationOfFieldWindSpeedFloor(dataFrame, Field_WindSpeed)
 
-        self.recalculationOfGeneratorSpeed(turbineModelInfo,dataFrame, Field_RotorSpeed, Field_GeneratorSpeed,self.turbineModelInfo[Field_RSR].iloc[0])
+        self.recalculationOfGeneratorSpeed(
+            turbineModelInfo, dataFrame, Field_RotorSpeed, Field_GeneratorSpeed, self.turbineModelInfo[Field_RSR].iloc[0])
 
-        self.recalculationOfRotorSpeed(turbineModelInfo,dataFrame, Field_RotorSpeed, Field_GeneratorSpeed,self.turbineModelInfo[Field_RSR].iloc[0])
+        self.recalculationOfRotorSpeed(turbineModelInfo, dataFrame, Field_RotorSpeed,
+                                       Field_GeneratorSpeed, self.turbineModelInfo[Field_RSR].iloc[0])
 
-        self.recalculationOfRotorTorque(dataFrame, Field_GeneratorTorque, Field_ActiverPower, Field_GeneratorSpeed)
+        self.recalculationOfRotorTorque(
+            dataFrame, Field_GeneratorTorque, Field_ActiverPower, Field_GeneratorSpeed)
 
-        self.recalculationOfIncludedAngle(dataFrame, Field_AngleIncluded, Field_WindDirection, Field_NacPos)
+        self.recalculationOfIncludedAngle(
+            dataFrame, Field_AngleIncluded, Field_WindDirection, Field_NacPos)
 
-        self.common.calculateTSR(dataFrame, self.turbineModelInfo[Field_RotorDiameter].iloc[0])
+        self.common.calculateTSR(
+            dataFrame, self.turbineModelInfo[Field_RotorDiameter].iloc[0])
 
-        self.common.calculateCpOfSingleTurbine(dataFrame, self.currPowerFarmInfo[Field_AirDensity], turbineModelInfo[Field_RotorDiameter])
+        self.common.calculateCpOfSingleTurbine(
+            dataFrame, self.currPowerFarmInfo[Field_AirDensity], turbineModelInfo[Field_RotorDiameter])
 
     def processDateTimeForAll(self, dataFrame: pd.DataFrame):
-        dataFrame[Field_UnixYearMonth] = pd.to_datetime(dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
+        dataFrame[Field_UnixYearMonth] = pd.to_datetime(
+            dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
 
         dataFrame[Field_YearMonth] = dataFrame[Field_Time].dt.strftime('%Y-%m')
 
-        dataFrame[Field_YearMonthDay] = dataFrame[Field_Time].dt.strftime('%Y-%m-%d')
+        dataFrame[Field_YearMonthDay] = dataFrame[Field_Time].dt.strftime(
+            '%Y-%m-%d')
 
         return dataFrame
 
-    def processDateTime(self, dataFrame: pd.DataFrame, fieldTime : str = None):
+    def processDateTime(self, dataFrame: pd.DataFrame, fieldTime: str = None):
 
         if Field_Time not in dataFrame.columns:
-            self.logger.info("Field_Time is not in dataFrame columns, Do not handle time processing")
+            self.logger.info(
+                "Field_Time is not in dataFrame columns, Do not handle time processing")
             return dataFrame
 
         if fieldTime is None:
@@ -461,7 +473,8 @@ class BaseAnalyst(ABC):
 
         timeSeries = pd.Series
         if fieldTime == Field_UnixYearMonth:
-            timeSeries = pd.to_datetime(dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
+            timeSeries = pd.to_datetime(
+                dataFrame[Field_Time], format="%Y-%m").apply(lambda x: x.timestamp())
 
         if fieldTime == Field_YearMonth:
             timeSeries = dataFrame[Field_Time].dt.strftime('%Y-%m')
@@ -496,21 +509,26 @@ class BaseAnalyst(ABC):
         self.logger.info(
             f"load data -> Time Granulary: {timeGranularity} Power Farm: {powerFarmID} Batch: {dataBatchNum}  Turbine: {turbineCode} .")
         # Get current turbine info
-        currTurbineInfo = self.common.getTurbineInfo(self.conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo)
+        currTurbineInfo = self.common.getTurbineInfo(
+            self.conf.dataContract.dataFilter.powerFarmID, turbineCode, self.turbineInfo)
         # Get turbine model info
-        currTurbineModelInfo = self.common.getTurbineModelByTurbine(currTurbineInfo, self.turbineModelInfo)
+        currTurbineModelInfo = self.common.getTurbineModelByTurbine(
+            currTurbineInfo, self.turbineModelInfo)
         # select DB data
-        dataFrameOfTurbine = self.loadData(powerFarmID, timeGranularity, f"'{turbineCode}'", select, customCondition)
+        dataFrameOfTurbine = self.loadData(
+            powerFarmID, timeGranularity, f"'{turbineCode}'", select, customCondition)
 
         if dataFrameOfTurbine.empty:
-            self.logger.warning(f"{turbineCode} Time Granulary: {timeGranularity} scada data is empty.")
+            self.logger.warning(
+                f"{turbineCode} Time Granulary: {timeGranularity} scada data is empty.")
             return pd.DataFrame(), dataBatchNum, turbineCode
         else:
-            if  timeGranularity in ['fault', 'warn']:
+            if timeGranularity in ['fault', 'warn']:
                 return dataFrameOfTurbine, dataBatchNum, turbineCode
             else:
                 # Add property to dataFrame
-                self.addPropertyToDataFrame(dataFrameOfTurbine, currTurbineInfo, currTurbineModelInfo)
+                self.addPropertyToDataFrame(
+                    dataFrameOfTurbine, currTurbineInfo, currTurbineModelInfo)
                 # Add engine_name to dataFrame
                 dataFrameOfTurbine[Field_NameOfTurbine] = currTurbineInfo.loc[Field_NameOfTurbine]
                 # Additional data processing steps
@@ -518,29 +536,33 @@ class BaseAnalyst(ABC):
                 # Recalculation
                 self.recalculation(currTurbineModelInfo, dataFrameOfTurbine)
                 # Filter data
-                dataFrameOfTurbine = self.filterCommon(dataFrameOfTurbine, self.conf)
+                dataFrameOfTurbine = self.filterCommon(
+                    dataFrameOfTurbine, self.conf)
 
                 return dataFrameOfTurbine, dataBatchNum, turbineCode
 
-    def selectTimeCondition(self,conf: Contract, conditions: list[str]):
+    def selectTimeCondition(self, conf: Contract, conditions: list[str]):
         """
         时间过滤条件组装
         从json配置中获取时间过滤条件进行组装
         """
         # 时间过滤条件
         if conf.dataContract.dataFilter.beginTime:
-            conditions.append(f"time_stamp >= '{conf.dataContract.dataFilter.beginTime}'")
+            conditions.append(
+                f"time_stamp >= '{conf.dataContract.dataFilter.beginTime}'")
 
         if conf.dataContract.dataFilter.endTime:
-            conditions.append(f"time_stamp <= '{conf.dataContract.dataFilter.endTime}'")
+            conditions.append(
+                f"time_stamp <= '{conf.dataContract.dataFilter.endTime}'")
 
         # 排除月份
         if conf.dataContract.dataFilter.excludingMonths:
-            excluding_months = ", ".join(f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
+            excluding_months = ", ".join(
+                f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
             excluding_condition = f"DATE_FORMAT(time_stamp, '%Y-%m') NOT IN ({excluding_months})"
             conditions.append(excluding_condition)
 
-    def selectLabCondition(self,conditions: list[str]):
+    def selectLabCondition(self, conditions: list[str]):
         """
         lab 过滤条件组装
         根据lab获取相应的数据
@@ -548,37 +570,41 @@ class BaseAnalyst(ABC):
         """
         return
 
-    def selectAllCondition(self,conf: Contract):
+    def selectAllCondition(self, conf: Contract):
         conditions = []
         # 时间过滤条件
-        self.selectTimeCondition(conf,conditions)
-        #lab过滤
+        self.selectTimeCondition(conf, conditions)
+        # lab过滤
         self.selectLabCondition(conditions)
         return " AND ".join(conditions) if conditions else "1=1"
 
-    def selectFaultTimeCondition(self,conf: Contract, conditions: list[str]):
+    def selectFaultTimeCondition(self, conf: Contract, conditions: list[str]):
         """
         时间过滤条件组装
         从json配置中获取时间过滤条件进行组装
         """
         # 时间过滤条件
         if conf.dataContract.dataFilter.beginTime:
-            conditions.append(f"begin_time >= '{conf.dataContract.dataFilter.beginTime}'")
+            conditions.append(
+                f"begin_time >= '{conf.dataContract.dataFilter.beginTime}'")
 
         if conf.dataContract.dataFilter.endTime:
-            conditions.append(f"end_time <= '{conf.dataContract.dataFilter.endTime}'")
+            conditions.append(
+                f"end_time <= '{conf.dataContract.dataFilter.endTime}'")
 
         # 排除月份
         if conf.dataContract.dataFilter.excludingMonths:
-            excluding_months = ", ".join(f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
+            excluding_months = ", ".join(
+                f"'{month}'" for month in conf.dataContract.dataFilter.excludingMonths)
             excluding_condition = f"DATE_FORMAT(time_stamp, '%Y-%m') NOT IN ({excluding_months})"
             conditions.append(excluding_condition)
     # 故障数据过滤条件
-    def selectAllFaultCondition(self,conf: Contract):
+
+    def selectAllFaultCondition(self, conf: Contract):
         conditions = []
         # 时间过滤条件
-        self.selectFaultTimeCondition(conf,conditions)
-        #lab过滤
+        self.selectFaultTimeCondition(conf, conditions)
+        # lab过滤
         self.selectLabCondition(conditions)
         return " AND ".join(conditions) if conditions else "1=1"
 
@@ -589,14 +615,17 @@ class BaseAnalyst(ABC):
 
             configAnalysisDF = pd.DataFrame(
                 [config.to_dict() for config in conf.dataContract.configAnalysis])
-            scadaTimeGranularities = configAnalysisDF["scada"].unique()
+            configAnalysisDF = configAnalysisDF[(configAnalysisDF["className"] == self.__class__.__name__)]
 
+            scadaTimeGranularities = configAnalysisDF["scada"].unique()
+            self.logger.info(
+                f"typeAnalyst: {self.typeAnalyst()}  method: processTurbineData , scadaTimeGranularities : {scadaTimeGranularities}  current class : {self.__class__.__name__}" )
             dictionary = dict()
             for timeGranularity in scadaTimeGranularities:
                 dataFrames = []
                 dataFrameOfTurbines = pd.DataFrame()
-                if  timeGranularity in ['fault', 'warn']:
-                    select_conditions=self.selectAllFaultCondition(conf)
+                if timeGranularity in ['fault', 'warn']:
+                    select_conditions = self.selectAllFaultCondition(conf)
                 maxWorkers = 5
                 with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
                     futures = [
@@ -610,44 +639,48 @@ class BaseAnalyst(ABC):
                             dataFrameOfTurbine, dataBatchNum, turbineCode = future.result()
 
                             dataFrames.append(dataFrameOfTurbine)
-                            self.logger.info(f"data frame append,dataBatchNum: {dataBatchNum}  turbineCode: {turbineCode}")
+                            self.logger.info(
+                                f"data frame append,dataBatchNum: {dataBatchNum}  turbineCode: {turbineCode}")
                         except Exception as exc:
                             raise exc
 
                 dataFrameOfTurbines = pd.concat(dataFrames, ignore_index=True)
 
-                self.logger.info(f"data frame concat dataBatchNum: {dataBatchNum}  timeGranularity: {timeGranularity} finish")
+                self.logger.info(
+                    f"data frame concat dataBatchNum: {dataBatchNum}  timeGranularity: {timeGranularity} finish")
                 if dataFrameOfTurbines.empty:
                     excption = CustomError(102)
                     self.logger.warning(
                         f"{excption.message} Power Farm: {conf.dataContract.dataFilter.powerFarmID} Batch : {conf.dataContract.dataFilter.dataBatchNum} Time Granularity : {timeGranularity}")
                     raise excption
                 else:
-                    self.logger.info(f"data frame concat dataBatchNum: {dataBatchNum}  timeGranularity: {timeGranularity} dataFrameOfTurbines : {dataFrameOfTurbines}")
+                    self.logger.info(
+                        f"data frame concat dataBatchNum: {dataBatchNum}  timeGranularity: {timeGranularity} dataFrameOfTurbines : {dataFrameOfTurbines}")
                     if Field_DeviceCode in dataFrameOfTurbines.columns and Field_CodeOfTurbine not in dataFrameOfTurbines.columns:
-                        dataFrameOfTurbines = dataFrameOfTurbines.rename(columns={Field_DeviceCode: Field_CodeOfTurbine})
+                        dataFrameOfTurbines = dataFrameOfTurbines.rename(
+                            columns={Field_DeviceCode: Field_CodeOfTurbine})
 
                     dictionary[timeGranularity] = dataFrameOfTurbines
             return dictionary
         except Exception as e:
-            self.logger.error(f"Error processing turbine data:{traceback.format_exc()}")
+            self.logger.error(
+                f"Error processing turbine data:{traceback.format_exc()}")
             raise
 
-    def addPropertyToDataFrame(self,dataFrameOfTurbine : pd.DataFrame, currTurbineInfo : pd.Series, currTurbineModelInfo: pd.Series):
+    def addPropertyToDataFrame(self, dataFrameOfTurbine: pd.DataFrame, currTurbineInfo: pd.Series, currTurbineModelInfo: pd.Series):
         """
         用来添加额外当前风机属性
         在business中相应的分析员中有实现,没有实现就无额外参数添加
         """
         return
 
-    def escape_special_characters(self, original_string:str):
+    def escape_special_characters(self, original_string: str):
         """
             ---废弃---
             特殊字符url编码处理
             "/" 符号单独处理
         """
-        retrun_string =  quote(original_string)
+        retrun_string = quote(original_string)
         if "/" in retrun_string:
-            retrun_string =  retrun_string.replace("/","%2F")
+            retrun_string = retrun_string.replace("/", "%2F")
         return retrun_string
-