Xmia committed 5 days ago
parent revision 5772586bee
1 file changed, 75 insertions(+), 42 deletions(-)
      dataAnalysisBusiness/algorithm/faultAnalyst.py
      dataAnalysisBusiness/algorithm/faultAnalyst.py

+ 75 - 42
dataAnalysisBusiness/algorithm/faultAnalyst.py

@@ -13,31 +13,57 @@ class FaultAnalyst(AnalystNotFilter):
 
     def typeAnalyst(self):
         return "fault"
+    
     def selectColumns(self):
-        return [Field_DeviceName,Field_FaultTime,Field_FaultDetail]
+        # 这里的字段必须与数据库 _fault 表中的列名对应
+        return [Field_DeviceName, Field_FaultTime, Field_FaultDetail]
 
+    # 强制重写获取数据源类型的方法,防止因配置错误去查 _minute 表,强制代码去查 _fault 表
+    def getTimeGranularitys(self, conf: Contract):
+        return ["fault"]
 
-    def turbinesAnalysis(self,outputAnalysisDir, conf: Contract, turbineCodes):
-        dictionary = self.processTurbineData(turbineCodes,conf,self.selectColumns())
-        dataFrameMerge = self.userDataFrame(dictionary,conf.dataContract.configAnalysis,self)
+    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
+        dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns())
+        dataFrameMerge = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self)
+        
+        # 增加空数据保护
+        if dataFrameMerge.empty:
+            print("Warning: No fault data found for the selected turbines.")
+            return pd.DataFrame()
+            
         return self.get_result(dataFrameMerge, outputAnalysisDir, conf)
 
     def get_result(self, dataFrame: pd.DataFrame, outputAnalysisDir: str, conf: Contract):
+        # 双重保险:如果数据为空直接返回
+        if dataFrame.empty:
+            return pd.DataFrame()
+
         #---------------整个风场维度统计故障时长与次数---------------------------
         # 统计各种类型故障出现的次数
-        fault_detail_count = dataFrame[Field_FaultDetail].value_counts().reset_index()
-        fault_detail_count.columns = [Field_FaultDetail, 'count']
+        if Field_FaultDetail in dataFrame.columns:
+            fault_detail_count = dataFrame[Field_FaultDetail].value_counts().reset_index()
+            fault_detail_count.columns = [Field_FaultDetail, 'count']
 
-        # 统计每个 fault_detail 的时长加和
-        fault_time_sum = dataFrame.groupby(Field_FaultDetail)[Field_FaultTime].sum().reset_index()
-        fault_time_sum.columns = [Field_FaultDetail, 'fault_time_sum']
+            # 统计每个 fault_detail 的时长加和
+            fault_time_sum = dataFrame.groupby(Field_FaultDetail)[Field_FaultTime].sum().reset_index()
+            fault_time_sum.columns = [Field_FaultDetail, 'fault_time_sum']
 
-        # 合并两个 DataFrame
-        fault_summary = pd.merge(fault_detail_count, fault_time_sum, on=Field_FaultDetail, how='inner')
-        fault_summary_sorted = fault_summary.sort_values(by='fault_time_sum', ascending=False)
+            # 合并两个 DataFrame
+            fault_summary = pd.merge(fault_detail_count, fault_time_sum, on=Field_FaultDetail, how='inner')
+            fault_summary_sorted = fault_summary.sort_values(by='fault_time_sum', ascending=False)
+        else:
+            # 防御性代码:如果缺列
+            fault_summary_sorted = pd.DataFrame()
 
         # -------------按风机分组统计故障情况------------------------------------------
-        grouped = dataFrame.groupby(Field_DeviceName)
+        # 确保有设备名称列
+        if Field_DeviceName not in dataFrame.columns:
+            # 有时 Field_DeviceName 没取到,尝试用 DeviceCode 或其他
+            groupby_col = dataFrame.columns[0]
+        else:
+            groupby_col = Field_DeviceName
+
+        grouped = dataFrame.groupby(groupby_col)
         results= []
 
         for name, group in grouped:
@@ -49,37 +75,44 @@ class FaultAnalyst(AnalystNotFilter):
             results.append(turbine_fault_summary)
 
         # 合并所有风机的故障统计结果
-        turbine_fault_summary = pd.concat(results, ignore_index=True)
-        turbine_fault_sorted = turbine_fault_summary.sort_values(by='fault_time', ascending=False)
-        # 故障类型前十名
-        draw_results=turbine_fault_sorted.head(10)
+        if results:
+            turbine_fault_summary = pd.concat(results, ignore_index=True)
+            turbine_fault_sorted = turbine_fault_summary.sort_values(by='fault_time', ascending=False)
+            # 故障类型前十名
+            # draw_results=turbine_fault_sorted.head(10) # 暂时没用到
+        else:
+            turbine_fault_sorted = pd.DataFrame()
+
         # 保存结果
         result_rows = []
 
-        filePathOfturbinefault = os.path.join(outputAnalysisDir, f"turbine_fault_result{CSVSuffix}")
-        turbine_fault_sorted.to_csv(filePathOfturbinefault, index=False,encoding='utf-8-sig')
-
-        result_rows.append({
-            Field_Return_TypeAnalyst: self.typeAnalyst(),
-            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
-            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
-            Field_CodeOfTurbine: "total",
-            Field_MillTypeCode:"turbine_fault_result",
-            Field_Return_FilePath: filePathOfturbinefault,
-            Field_Return_IsSaveDatabase: True
-        })
-
-        filePathOftotalfault = os.path.join(outputAnalysisDir, f"total_fault_result{CSVSuffix}")
-        fault_summary_sorted.to_csv(filePathOftotalfault, index=False,encoding='utf-8-sig')
-
-        result_rows.append({
-            Field_Return_TypeAnalyst: self.typeAnalyst(),
-            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
-            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
-            Field_CodeOfTurbine: "total",
-            Field_MillTypeCode:"total_fault_result",
-            Field_Return_FilePath: filePathOftotalfault,
-            Field_Return_IsSaveDatabase: True
-        })
+        if not turbine_fault_sorted.empty:
+            filePathOfturbinefault = os.path.join(outputAnalysisDir, f"turbine_fault_result{CSVSuffix}")
+            turbine_fault_sorted.to_csv(filePathOfturbinefault, index=False,encoding='utf-8-sig')
+
+            result_rows.append({
+                Field_Return_TypeAnalyst: self.typeAnalyst(),
+                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
+                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
+                Field_CodeOfTurbine: "total",
+                Field_MillTypeCode:"turbine_fault_result",
+                Field_Return_FilePath: filePathOfturbinefault,
+                Field_Return_IsSaveDatabase: True
+            })
+
+        if not fault_summary_sorted.empty:
+            filePathOftotalfault = os.path.join(outputAnalysisDir, f"total_fault_result{CSVSuffix}")
+            fault_summary_sorted.to_csv(filePathOftotalfault, index=False,encoding='utf-8-sig')
+
+            result_rows.append({
+                Field_Return_TypeAnalyst: self.typeAnalyst(),
+                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
+                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
+                Field_CodeOfTurbine: "total",
+                Field_MillTypeCode:"total_fault_result",
+                Field_Return_FilePath: filePathOftotalfault,
+                Field_Return_IsSaveDatabase: True
+            })
+            
         result_df = pd.DataFrame(result_rows)
         return result_df