Parcourir la source

增加符合故障分析的基类分析语句、增加分析完成时间

wenjia Li il y a 6 mois
Parent
commit
8503d9cc5f

+ 44 - 13
dataAnalysisBehavior/behavior/baseAnalyst.py

@@ -504,18 +504,22 @@ class BaseAnalyst(ABC):
         if dataFrameOfTurbine.empty:
             self.logger.warning(f"{turbineCode} Time Granulary: {timeGranularity} scada data is empty.")
             return pd.DataFrame(), dataBatchNum, turbineCode
-        # Add property to dataFrame
-        self.addPropertyToDataFrame(dataFrameOfTurbine, currTurbineInfo, currTurbineModelInfo)
-        # Add engine_name to dataFrame
-        dataFrameOfTurbine[Field_NameOfTurbine] = currTurbineInfo.loc[Field_NameOfTurbine]
-        # Additional data processing steps
-        self.processDateTime(dataFrameOfTurbine)
-        # Recalculation
-        self.recalculation(currTurbineModelInfo, dataFrameOfTurbine)
-        # Filter data
-        dataFrameOfTurbine = self.filterCommon(dataFrameOfTurbine, self.conf)
-
-        return dataFrameOfTurbine, dataBatchNum, turbineCode
+        else:
+            if  timeGranularity in ['fault', 'warn']:
+                return dataFrameOfTurbine, dataBatchNum, turbineCode
+            else:
+                # Add property to dataFrame
+                self.addPropertyToDataFrame(dataFrameOfTurbine, currTurbineInfo, currTurbineModelInfo)
+                # Add engine_name to dataFrame
+                dataFrameOfTurbine[Field_NameOfTurbine] = currTurbineInfo.loc[Field_NameOfTurbine]
+                # Additional data processing steps
+                self.processDateTime(dataFrameOfTurbine)
+                # Recalculation
+                self.recalculation(currTurbineModelInfo, dataFrameOfTurbine)
+                # Filter data
+                dataFrameOfTurbine = self.filterCommon(dataFrameOfTurbine, self.conf)
+
+                return dataFrameOfTurbine, dataBatchNum, turbineCode
 
     def selectTimeCondition(self,conf: Contract, conditions: list[str]):
         """
@@ -550,6 +554,32 @@ class BaseAnalyst(ABC):
         #lab过滤
         self.selectLabCondition(conditions)
         return " AND ".join(conditions) if conditions else "1=1"
+    
def selectFaultTimeCondition(self, conf: Contract, conditions: list[str]):
    """
    Assemble time-based filter clauses for fault/warn data.

    Reads the analysis time window and the excluded-months list from the
    JSON-backed contract configuration and appends the corresponding SQL
    fragments to *conditions* in place.  Returns None.

    NOTE(review): the month-exclusion clause filters on ``time_stamp``
    while the window clauses use ``begin_time``/``end_time`` — confirm
    the fault/warn tables actually expose a ``time_stamp`` column.
    NOTE(review): values are interpolated into SQL via f-strings; they
    come from internal configuration, but parameterized queries would be
    safer if that ever changes.
    """
    dataFilter = conf.dataContract.dataFilter

    # Lower bound of the analysis window.
    if dataFilter.beginTime:
        conditions.append(f"begin_time >= '{dataFilter.beginTime}'")

    # Upper bound of the analysis window.
    if dataFilter.endTime:
        conditions.append(f"end_time <= '{dataFilter.endTime}'")

    # Drop whole months listed in the configuration.
    if dataFilter.excludingMonths:
        quoted = ", ".join(f"'{m}'" for m in dataFilter.excludingMonths)
        conditions.append(f"DATE_FORMAT(time_stamp, '%Y-%m') NOT IN ({quoted})")
# Combined filter condition for fault data.
def selectAllFaultCondition(self, conf: Contract):
    """
    Build the full SQL WHERE clause for fault/warn data queries.

    Combines the fault time-window filter and the lab filter; falls back
    to the always-true clause "1=1" when no filter applies, mirroring
    ``selectAllCondition``.
    """
    conditions: list[str] = []
    # Time-window / excluded-month filters.
    self.selectFaultTimeCondition(conf, conditions)
    # Lab-based filter.
    self.selectLabCondition(conditions)
    if not conditions:
        return "1=1"
    return " AND ".join(conditions)
 
     def processTurbineData(self, turbines, conf: Contract, select: str):
         try:
@@ -564,7 +594,8 @@ class BaseAnalyst(ABC):
             for timeGranularity in scadaTimeGranularities:
                 dataFrames = []
                 dataFrameOfTurbines = pd.DataFrame()
-
+                if  timeGranularity in ['fault', 'warn']:
+                    select_conditions=self.selectAllFaultCondition(conf)
                 maxWorkers = 5
                 with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
                     futures = [

+ 20 - 7
dataAnalysisBehavior/behavior/outputProcessor.py

@@ -46,18 +46,23 @@ class OutputProcessor:
         directory = f"output/{powerFarmID}/{dataBatchNum}"
         shutil.rmtree(directory)
 
-    def analysisState(self, session: Session, batchNO: str, analysisState: int, errorState: int = ErrorState.NotErr.value, errorCode: str = None, errorInfo: str = None, timestamp: datetime = datetime.now(timezone.utc)+timedelta(hours=8), analysisProgress: float = 0):
+    def analysisState(self, session: Session, batchNO: str, analysisState: int, errorState: int = ErrorState.NotErr.value, errorCode: str = None, errorInfo: str = None, timestamp: datetime = datetime.now(timezone.utc)+timedelta(hours=8), analysisProgress: float = 0,analysis_finish_time: datetime = None):
         """
         写处理状态 至主表 analysis_result
         写总图(多机组一张图表)上传文件 至子表 analysis_general_file
         写单机组图(单机组一张图表)上传文件 至子表 analysis_diagram_relation
         """
-        sql = text(f"INSERT INTO analysis_result(batch_code, analysis_state, err_state, err_code, err_info,  create_time,analysis_progress) \
-                  VALUES(:batch_code, :analysis_state, :err_state,:err_code, :err_info, :create_time,:analysis_progress) \
+        sql = text(f"INSERT INTO analysis_result(batch_code, analysis_state, err_state, err_code, err_info, create_time, analysis_progress, analysis_finish_time, update_time) \
+                  VALUES(:batch_code, :analysis_state, :err_state, :err_code, :err_info, :create_time, :analysis_progress, :analysis_finish_time, :update_time) \
                   ON DUPLICATE KEY \
                   UPDATE \
-                  analysis_state=VALUES(analysis_state),err_state=VALUES(err_state),err_code=VALUES(err_code), \
-                    err_info=VALUES(err_info),update_time=VALUES(update_time),analysis_progress=VALUES(analysis_progress);")
+                  analysis_state=VALUES(analysis_state), \
+                  err_state=VALUES(err_state), \
+                  err_code=VALUES(err_code), \
+                  err_info=VALUES(err_info), \
+                  update_time=VALUES(update_time), \
+                  analysis_progress=VALUES(analysis_progress), \
+                  analysis_finish_time=VALUES(analysis_finish_time);")
 
         params = {
             "batch_code": None if self.common.isNone(batchNO) else batchNO,
@@ -67,7 +72,8 @@ class OutputProcessor:
             "err_info": None if self.common.isNone(errorInfo) else errorInfo,
             "create_time": timestamp,
             "update_time": timestamp,
-            "analysis_progress": analysisProgress
+            "analysis_progress": analysisProgress,
+            "analysis_finish_time":analysis_finish_time if analysis_finish_time is not None else None
         }
 
         session.execute(sql, params)
@@ -154,12 +160,19 @@ class OutputProcessor:
             foundationDB = GetBusinessFoundationDbUtil()
 
             with foundationDB.session_scope() as session:
+                # 第一次调用,设置进度为50%
                 self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
-                                   ErrorState.NotErr.value, None, None, timestamp, 100)
+                                   ErrorState.NotErr.value, None, None, timestamp, 50)
                 self.analysisResultForTotal(
                     session, returnDataFrame, timestamp)
                 self.analysisResultForTurbine(
                     session, returnDataFrame, timestamp)
+                  # 获取分析完成的时间
+            finish_time = datetime.now(timezone.utc) + timedelta(hours=8)
+            
+            # 第二次调用:更新分析状态为已完成,进度为100%,并记录完成时间
+            self.analysisState(session, self.dataBatchNum, AnalysisState.Analyzed.value,
+                               ErrorState.NotErr.value, None, None, timestamp, 100, analysis_finish_time=finish_time)
 
             self.removeLocalFiles(powerFarmID, dataBatchNum)
         except Exception as e: