import os

import pandas as pd

from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from behavior.analystNotFilter import AnalystNotFilter


class FaultAnalyst(AnalystNotFilter):
    """
    Wind turbine fault analysis.
    """

    def typeAnalyst(self):
        return "fault"

    def selectColumns(self):
        # These fields must match the column names of the database _fault table.
        return [Field_DeviceName, Field_FaultTime, Field_FaultDetail]

    # Force-override the data-source granularity so a misconfiguration cannot
    # route the query to the _minute table; always query the _fault table.
    def getTimeGranularitys(self, conf: Contract):
        return ["fault"]

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        dictionary = self.processTurbineData(turbineCodes, conf, self.selectColumns())
        dataFrameMerge = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self)
        # Guard against empty data.
        if dataFrameMerge.empty:
            print("Warning: No fault data found for the selected turbines.")
            return pd.DataFrame()
        return self.get_result(dataFrameMerge, outputAnalysisDir, conf)

    def get_result(self, dataFrame: pd.DataFrame, outputAnalysisDir: str, conf: Contract):
        # Second safeguard: return immediately if the data is empty.
        if dataFrame.empty:
            return pd.DataFrame()

        # ---------- Farm-level statistics: fault count and fault duration ----------
        if Field_FaultDetail in dataFrame.columns:
            # Count the occurrences of each fault type.
            fault_detail_count = dataFrame[Field_FaultDetail].value_counts().reset_index()
            fault_detail_count.columns = [Field_FaultDetail, 'count']
            # Sum the fault duration for each fault_detail.
            fault_time_sum = dataFrame.groupby(Field_FaultDetail)[Field_FaultTime].sum().reset_index()
            fault_time_sum.columns = [Field_FaultDetail, 'fault_time_sum']
            # Merge the two DataFrames.
            fault_summary = pd.merge(fault_detail_count, fault_time_sum, on=Field_FaultDetail, how='inner')
            fault_summary_sorted = fault_summary.sort_values(by='fault_time_sum', ascending=False)
        else:
            # Defensive fallback when the fault-detail column is missing.
            fault_summary_sorted = pd.DataFrame()

        # ---------- Per-turbine fault statistics ----------
        # Make sure a device-name column is available; Field_DeviceName is
        # occasionally missing, so fall back to the first column (e.g. DeviceCode).
        if Field_DeviceName not in dataFrame.columns:
            groupby_col = dataFrame.columns[0]
        else:
            groupby_col = Field_DeviceName

        grouped = dataFrame.groupby(groupby_col)
        results = []
        for name, group in grouped:
            turbine_fault_summary = pd.DataFrame({
                Field_DeviceName: [name],
                'count': [len(group)],
                'fault_time': [group[Field_FaultTime].sum()]
            })
            results.append(turbine_fault_summary)

        # Combine the per-turbine fault statistics.
        if results:
            turbine_fault_summary = pd.concat(results, ignore_index=True)
            turbine_fault_sorted = turbine_fault_summary.sort_values(by='fault_time', ascending=False)
            # Top ten entries by fault duration (currently unused).
            # draw_results = turbine_fault_sorted.head(10)
        else:
            turbine_fault_sorted = pd.DataFrame()

        # ---------- Save the results ----------
        result_rows = []
        if not turbine_fault_sorted.empty:
            filePathOfturbinefault = os.path.join(outputAnalysisDir, f"turbine_fault_result{CSVSuffix}")
            turbine_fault_sorted.to_csv(filePathOfturbinefault, index=False, encoding='utf-8-sig')
            result_rows.append({
                Field_Return_TypeAnalyst: self.typeAnalyst(),
                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                Field_CodeOfTurbine: "total",
                Field_MillTypeCode: "turbine_fault_result",
                Field_Return_FilePath: filePathOfturbinefault,
                Field_Return_IsSaveDatabase: True
            })
        if not fault_summary_sorted.empty:
            filePathOftotalfault = os.path.join(outputAnalysisDir, f"total_fault_result{CSVSuffix}")
            fault_summary_sorted.to_csv(filePathOftotalfault, index=False, encoding='utf-8-sig')
            result_rows.append({
                Field_Return_TypeAnalyst: self.typeAnalyst(),
                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                Field_CodeOfTurbine: "total",
                Field_MillTypeCode: "total_fault_result",
                Field_Return_FilePath: filePathOftotalfault,
                Field_Return_IsSaveDatabase: True
            })

        result_df = pd.DataFrame(result_rows)
        return result_df
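

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the analyst pipeline. Assumptions are
# labelled explicitly: AnalystNotFilter is assumed to be instantiable without
# constructor arguments, the Field_* and CSVSuffix constants from
# algorithmContract.confBusiness are used as-is, and the stub contract below
# only mimics the two attributes that get_result() actually reads
# (conf.dataContract.dataFilter.powerFarmID / dataBatchNum).
if __name__ == "__main__":
    import tempfile
    from types import SimpleNamespace

    # Stub contract exposing only the attributes accessed in get_result().
    stub_conf = SimpleNamespace(
        dataContract=SimpleNamespace(
            dataFilter=SimpleNamespace(powerFarmID="WF001", dataBatchNum="B001")
        )
    )

    # Synthetic fault records: two turbines, two fault types (illustrative data only).
    demo = pd.DataFrame({
        Field_DeviceName: ["WT01", "WT01", "WT02"],
        Field_FaultTime: [120.0, 30.0, 45.0],
        Field_FaultDetail: ["pitch fault", "converter fault", "pitch fault"],
    })

    with tempfile.TemporaryDirectory() as outDir:
        analyst = FaultAnalyst()  # assumption: no required constructor arguments
        print(analyst.get_result(demo, outDir, stub_conf))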