import concurrent.futures
import os
import traceback
from logging import Logger

import pandas as pd

from algorithmContract.confBusiness import *
from algorithmContract.configAnalysis import ConfigAnalysis
from algorithmContract.const import *
from algorithmContract.contract import Contract
from behavior.analyst import Analyst
from behavior.dalAnalyst import DALAnalyst
from behavior.outputProcessor import OutputProcessor
from common.appConfig import GetBusinessFoundationDbUtil
from common.commonBusiness import CommonBusiness
from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.rdbmsUtil.databaseUtil import DatabaseUtil


class DataProcessor:
    """Drives wind-farm power-generation analysis for one contract request.

    Acts as the subject of an observer pattern: ``Analyst`` instances are
    registered via :meth:`attach` and are fanned out over a thread pool,
    first per-turbine (:meth:`notifyOfTurbine`) and then across all turbines
    at once (:meth:`notifyOfTurbines`).  Results are merged into a single
    DataFrame and handed to :class:`OutputProcessor`.
    """

    # Worker-thread count shared by both fan-out paths.
    _MAX_WORKERS = 4

    def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil],
                 minioUtil: ThreadSafeMinioClient, conf: Contract):
        """Store collaborators; no data is loaded here.

        :param logger:    shared application logger
        :param dbUtil:    database utilities keyed by database name
        :param minioUtil: thread-safe MinIO client for result upload
        :param conf:      the analysis contract driving this run
        """
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.conf = conf
        self.common = CommonBusiness()
        self._baseAnalysts: list[Analyst] = []
        self._analysts: list[Analyst] = []

        # NOTE(review): the DAL is constructed but none of its loaders are
        # invoked here, and the annotated attributes below are bare
        # declarations — they are never assigned in this class, so reading
        # them before some other component populates them raises
        # AttributeError.  TODO confirm where the loading happens.
        dal = DALAnalyst(logger, dbUtil)

        # All renewable power-farm master data.
        self.powerFarmInfo: pd.DataFrame
        # All wind-turbine master data.
        self.turbineInfo: pd.DataFrame
        # Data-transfer (conversion) metadata.
        self.dataTransfer: pd.DataFrame
        # Turbine-model master data.
        self.turbineModelInfo: pd.DataFrame
        # Meteorological-mast (weather station) master data.
        self.weatherStationInfo: pd.DataFrame
        # Contract power curves per farm / turbine model.
        self.dataFrameContractOfTurbine: pd.DataFrame

    def attach(self, analyst: Analyst):
        """Register an analyst observer; duplicates are ignored."""
        if analyst not in self._analysts:
            self._analysts.append(analyst)

    def detach(self, analyst: Analyst):
        """Unregister an analyst observer; absent analysts are ignored."""
        try:
            self._analysts.remove(analyst)
        except ValueError:
            pass

    def userDataFrame(self, dictionary: dict, configs: list[ConfigAnalysis],
                      analyst: Analyst) -> pd.DataFrame:
        """Return the DataFrame whose SCADA time granularity matches *analyst*.

        The granularity is looked up from *configs* by case-insensitive
        class-name match.  Returns an empty DataFrame when no config matches,
        the granularity is absent from *dictionary*, or the stored frame is
        empty.
        """
        analystName = type(analyst).__name__.lower()
        timeGranularity = next(
            (config.scada for config in configs
             if config.className.lower() == analystName),
            None)
        # Fix: the original indexed dictionary[None] (KeyError) when no
        # config matched; degrade to an empty frame instead.
        if timeGranularity is None or timeGranularity not in dictionary:
            return pd.DataFrame()
        frame = dictionary[timeGranularity]
        return pd.DataFrame() if frame.empty else frame

    def singleTurbineDataNotify(self, analyst, conf: Contract,
                                turbineCode: str):
        """Run one analyst against a single turbine; log and re-raise errors.

        (Error logging added for consistency with multiTubineDataNotify.)
        """
        try:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            outputFilePath = r"{}/{}{}".format(
                outputAnalysisDir, turbineCode, CSVSuffix)
            return analyst.analysisOfTurbine(
                outputAnalysisDir, outputFilePath, conf, turbineCode)
        except Exception:
            self.logger.error(
                f"single turbine analysis error : {traceback.format_exc()}")
            raise

    def notifyOfTurbine(self, conf: Contract,
                        turbineCode: str) -> pd.DataFrame:
        """Fan all analysts out over one turbine and concatenate results.

        The first analyst failure propagates to the caller (remaining
        futures are cancelled/awaited by the executor's context manager).
        """
        results = []
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=self._MAX_WORKERS) as executor:
            futures = [
                executor.submit(self.singleTurbineDataNotify,
                                analyst, conf, turbineCode)
                for analyst in self._analysts
            ]
            for future in concurrent.futures.as_completed(futures):
                # Fix: `raise exc` from inside except lost the natural
                # traceback chain; letting result() raise preserves it.
                results.append(future.result())
        # Fix: pd.concat([]) raises ValueError when no analysts are attached.
        if not results:
            return pd.DataFrame()
        return pd.concat(results, ignore_index=True)

    def multiTubineDataNotify(self, analyst, conf: Contract, turbineCodes):
        """Run one analyst's multi-turbine analysis; log and re-raise errors."""
        try:
            outputAnalysisDir = analyst.getOutputAnalysisDir()
            dataFram = analyst.analysisOfTurbines(
                outputAnalysisDir, conf, turbineCodes)
            self.logger.info(
                f"analysis type execute result : dataFrame = {dataFram}")
            return dataFram
        except Exception:
            # Fix: errors were logged at INFO; use ERROR and a bare raise.
            self.logger.error(
                f"analysis type execute error : {traceback.format_exc()}")
            raise

    def notifyOfTurbines(self, conf: Contract, turbineCodes) -> pd.DataFrame:
        """Fan all analysts out over the whole turbine set and merge results."""
        results = []
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=self._MAX_WORKERS) as executor:
            futures = [
                executor.submit(self.multiTubineDataNotify,
                                analyst, conf, turbineCodes)
                for analyst in self._analysts
            ]
            self.logger.info("Waiting for all futures to complete...")
            for future in concurrent.futures.as_completed(futures):
                self.logger.info(f"future is : {future}")
                try:
                    results.append(future.result())
                except Exception:
                    # Fix: log at ERROR level and preserve traceback chain.
                    self.logger.error(
                        f"future deal error. future : {future} , "
                        f"error is {traceback.format_exc()}")
                    raise
        # Fix: pd.concat([]) raises ValueError when no analysts are attached.
        if not results:
            return pd.DataFrame()
        return pd.concat(results, ignore_index=True)

    def loadDataCSV(self, dataBatchNum: str, timeGranularity: str,
                    turbineCode: str, condition: str,
                    csvFileDir: str = "data/mima_dt/second") -> pd.DataFrame:
        """Load one turbine's SCADA CSV and coerce columns to fixed dtypes.

        :param dataBatchNum:    batch identifier (unused here; kept for API)
        :param timeGranularity: granularity tag (unused here; kept for API)
        :param turbineCode:     turbine code; the CSV is ``<turbineCode>.csv``
        :param condition:       filter condition (unused here; kept for API)
        :param csvFileDir:      directory containing the per-turbine CSVs
        :raises FileNotFoundError: when the CSV does not exist
        """
        csvFilePath = os.path.join(csvFileDir, f"{turbineCode}.csv")
        # Fix: route the path trace through the logger instead of print().
        self.logger.info(f"current csv file path: {csvFilePath}")
        dataFrame = pd.read_csv(csvFilePath, header=0)
        dataFrame = dataFrame.astype({
            'wind_turbine_number': 'string',
            'wind_turbine_name': 'string',
            'time_stamp': 'datetime64[ns]',
            'active_power': 'float32',
            'rotor_speed': 'float32',
            'generator_speed': 'float32',
            'wind_velocity': 'float32',
            'pitch_angle_blade_1': 'float32',
            'pitch_angle_blade_2': 'float32',
            'pitch_angle_blade_3': 'float32',
            'cabin_position': 'float32',
            'true_wind_direction': 'float32',
            'yaw_error1': 'float32',
            'set_value_of_active_power': 'float32',
            'gearbox_oil_temperature': 'float32',
            'generatordrive_end_bearing_temperature': 'float32',
            'generatornon_drive_end_bearing_temperature': 'float32',
            'cabin_temperature': 'float32',
            'twisted_cable_angle': 'float32',
            'front_back_vibration_of_the_cabin': 'float32',
            'side_to_side_vibration_of_the_cabin': 'float32',
            'actual_torque': 'float32',
            'given_torque': 'float32',
            'clockwise_yaw_count': 'float32',
            'counterclockwise_yaw_count': 'float32',
            'unusable': 'float32',
            'power_curve_available': 'float32',
            'required_gearbox_speed': 'float32',
            'inverter_speed_master_control': 'float32',
            'outside_cabin_temperature': 'float32',
            'main_bearing_temperature': 'float32',
            'gearbox_high_speed_shaft_bearing_temperature': 'float32',
            'gearboxmedium_speed_shaftbearing_temperature': 'float32',
            'gearbox_low_speed_shaft_bearing_temperature': 'float32',
            'generator_winding1_temperature': 'float32',
            'generator_winding2_temperature': 'float32',
            'generator_winding3_temperature': 'float32',
            'wind_turbine_status': 'float32',
            'wind_turbine_status2': 'float32',
            'turbulence_intensity': 'float32',
            'year': 'int32',
            'month': 'int32',
            'day': 'int32'
        })
        return dataFrame

    def getTurbines(self, conf: Contract, turbineInfo: pd.DataFrame):
        """Return the turbine codes to analyse.

        Uses the explicit list from the contract's data filter when present
        and non-empty, otherwise every unique turbine code in *turbineInfo*.
        """
        requested = conf.dataContract.dataFilter.turbines
        if not self.common.isNone(requested) and len(requested) > 0:
            return requested
        return turbineInfo[Field_CodeOfTurbine].unique()

    def executeAnalysis(self, conf: Contract):
        """Execute the full analysis pipeline for one contract request.

        Loads ledger/scada context per *conf*, runs all attached analysts
        per-turbine and then across all turbines, and hands the merged
        result to the output processor.  Progress and terminal state are
        written to the business-foundation database.  Any failure is
        recorded as an analysis error state; the (possibly empty) merged
        DataFrame is always returned.
        """
        # If one farm mixes several turbine models, analysis is expected to
        # be grouped per model by the analysts themselves.
        outputProcessor = OutputProcessor(
            conf, self.logger, self.dbUtil, self.minioUtil)

        # Accumulates every analyst's output across both fan-out phases.
        DataFrameOutput = pd.DataFrame()

        try:
            batchNum = conf.dataContract.dataFilter.dataBatchNum
            foundationDB: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]

            with foundationDB.session_scope() as foundationSession:
                outputProcessor.analysisState(
                    foundationSession, batchNum,
                    AnalysisState.Analyzing.value, analysisProgress=10)

            # Turbine selection.
            turbines = self.getTurbines(conf, self.turbineInfo)

            with foundationDB.session_scope() as foundationSession:
                outputProcessor.analysisState(
                    foundationSession, batchNum,
                    AnalysisState.Analyzing.value, analysisProgress=25)

            self.logger.info(
                f"批次:{conf.dataContract.dataFilter.dataBatchNum} 完成机组运行数据加载及再处理,执行请求发电性能分析...")

            # Phase 1: per-turbine analyses.
            for turbine in turbines:
                self.logger.info(
                    f"Power Farm: {conf.dataContract.dataFilter.powerFarmID} Batch: {conf.dataContract.dataFilter.dataBatchNum} Turbine: {turbine} .")
                dataFrameOfReturn = self.notifyOfTurbine(conf, turbine)
                DataFrameOutput = pd.concat(
                    [DataFrameOutput, dataFrameOfReturn], ignore_index=True)

            # Phase 2: whole-fleet analyses.
            Outputs = self.notifyOfTurbines(conf, turbines)
            DataFrameOutput = pd.concat(
                [Outputs, DataFrameOutput], ignore_index=True)

            self.logger.info(
                f"批次:{conf.dataContract.dataFilter.dataBatchNum} 执行完成请求发电性能分析,准备输出及更新状态")

            if DataFrameOutput.empty:
                # No analyst produced anything: treated as error code 103.
                raise CustomError(103)

            outputProcessor.process(
                conf.dataContract.dataFilter.powerFarmID,
                conf.dataContract.dataFilter.dataBatchNum,
                DataFrameOutput)
            self.logger.info(
                f"批次:{conf.dataContract.dataFilter.dataBatchNum} 完成请求发电性能分析、输出及更新状态")
        except Exception as e:
            # Fix: `(CustomError, Exception)` was redundant — Exception
            # already covers CustomError.  Log at ERROR level.
            self.logger.error(
                f" executeAnalysis error: {traceback.format_exc()}")
            try:
                dbUtil = GetBusinessFoundationDbUtil()
                # Non-CustomError failures are mapped to the generic code -1.
                ex = e if isinstance(e, CustomError) else CustomError(-1)
                with dbUtil.session_scope() as session:
                    outputProcessor.analysisState(
                        session, conf.dataContract.dataFilter.dataBatchNum,
                        AnalysisState.Analyzed.value, ErrorState.Err.value,
                        ex.code, ex.message)
            except Exception:
                # Best-effort state update: never let the error reporter
                # itself crash the pipeline.
                self.logger.error(f"捕获到异常: {traceback.format_exc()}")

        return DataFrameOutput