import os
import traceback
from datetime import datetime, timezone, timedelta
from logging import Logger
import concurrent.futures
import numpy as np
import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy.sql import text

from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
from utils.directoryUtil import DirectoryUtil as dir
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from algorithmContract.configAnalysis import ConfigAnalysis
from algorithmContract.const import *
from behavior.analyst import Analyst
from common.commonBusiness import CommonBusiness
from common.appConfig import GetDbUtil, GetBusinessDbUtil, GetBusinessFoundationDbUtil
from utils.rdbmsUtil.databaseUtil import DatabaseUtil
from behavior.dalAnalyst import DALAnalyst
from behavior.outputProcessor import OutputProcessor


def _formatException(exc: BaseException) -> str:
    """Render the full traceback of *exc* as a single string.

    Uses the positional three-argument form of traceback.format_exception:
    the keyword form (etype=..., value=..., tb=...) used previously was
    removed in Python 3.10 and raises TypeError there, which would mask
    the original error being reported.
    """
    return ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))


class DataProcessor:
    """Drives the power-generation performance analysis for one request.

    Acts as the subject in an observer pattern: Analyst instances are
    attached/detached, and every notify* call fans each analyst's work out
    to a thread pool and concatenates the per-analyst DataFrame results.
    """

    def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil],
                 minioUtil: ThreadSafeMinioClient, conf: Contract):
        """Store collaborators and prepare the (initially empty) analyst lists.

        :param logger: application logger.
        :param dbUtil: database utilities keyed by database name.
        :param minioUtil: thread-safe MinIO client for object storage output.
        :param conf: the analysis contract (filters, batch number, ...).
        """
        self.logger = logger
        self.dbUtil = dbUtil
        self.minioUtil = minioUtil
        self.conf = conf
        self.common = CommonBusiness()
        self._baseAnalysts = []
        self._analysts = []

        # Constructed for its side effects only; the original bound it to an
        # unused local. NOTE(review): presumably the DAL registers/loads
        # something on construction — confirm before removing the call.
        DALAnalyst(logger, dbUtil)

        # The annotations below only declare the expected attribute types;
        # the DataFrames themselves are populated elsewhere before use.
        # All renewable power farm records
        self.powerFarmInfo: pd.DataFrame
        # All wind turbine records
        self.turbineInfo: pd.DataFrame
        # Data-transfer / conversion records
        self.dataTransfer: pd.DataFrame
        # Turbine model records
        self.turbineModelInfo: pd.DataFrame
        # All meteorological (wind measurement) tower records
        self.weatherStationInfo: pd.DataFrame
        # Contract power curves per farm / turbine model
        self.dataFrameContractOfTurbine: pd.DataFrame

    def attach(self, analyst: Analyst):
        """Register *analyst* to receive notifications (idempotent)."""
        if analyst not in self._analysts:
            self._analysts.append(analyst)

    def detach(self, analyst: Analyst):
        """Unregister *analyst*; silently ignores analysts never attached."""
        try:
            self._analysts.remove(analyst)
        except ValueError:
            pass

    def userDataFrame(self, dictionary: dict, configs: list[ConfigAnalysis], analyst: Analyst) -> pd.DataFrame:
        """Return the SCADA DataFrame matching *analyst*'s configured granularity.

        Looks up the config whose className matches the analyst's class name
        (case-insensitive) and uses its ``scada`` value as the key into
        *dictionary*. Returns an empty DataFrame when no config matches, the
        key is absent, or the stored frame is empty (previously a missing
        config raised ``KeyError: None``).
        """
        analystName = type(analyst).__name__.lower()
        timeGranularity = next(
            (config.scada for config in configs
             if config.className.lower() == analystName),
            None)
        if timeGranularity is None or timeGranularity not in dictionary:
            return pd.DataFrame()
        frame = dictionary[timeGranularity]
        return pd.DataFrame() if frame.empty else frame

    def singleTurbineDataNotify(self, analyst, conf: Contract, turbineCode: str):
        """Run one analyst's single-turbine analysis and return its result."""
        outputAnalysisDir = analyst.getOutputAnalysisDir()
        outputFilePath = r"{}/{}{}".format(outputAnalysisDir, turbineCode, CSVSuffix)
        return analyst.analysisOfTurbine(
            outputAnalysisDir, outputFilePath, conf, turbineCode)

    def _collectFromAnalysts(self, task, maxWorkers: int = 4) -> pd.DataFrame:
        """Run ``task(analyst)`` for every attached analyst in a thread pool.

        Shared fan-out used by notifyOfTurbine / notifyOfTurbines (the two
        previously duplicated this loop). The first failed future propagates
        its exception; the context manager shuts the pool down.

        :param task: callable taking one Analyst and returning a DataFrame.
        :param maxWorkers: thread pool size.
        :return: concatenation of all analyst results (ignore_index=True).
        """
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
            futures = [executor.submit(task, analyst) for analyst in self._analysts]
            for future in concurrent.futures.as_completed(futures):
                # .result() re-raises any exception from the worker thread.
                results.append(future.result())
        return pd.concat(results, ignore_index=True)

    def notifyOfTurbine(self, conf: Contract, turbineCode: str) -> pd.DataFrame:
        """Run every attached analyst against a single turbine, in parallel."""
        return self._collectFromAnalysts(
            lambda analyst: self.singleTurbineDataNotify(analyst, conf, turbineCode))

    def multiTubineDataNotify(self, analyst, conf: Contract, turbineCodes):
        """Run one analyst's multi-turbine analysis and return its result."""
        outputAnalysisDir = analyst.getOutputAnalysisDir()
        return analyst.analysisOfTurbines(outputAnalysisDir, conf, turbineCodes)

    def notifyOfTurbines(self, conf: Contract, turbineCodes) -> pd.DataFrame:
        """Run every attached analyst against the whole turbine set, in parallel."""
        return self._collectFromAnalysts(
            lambda analyst: self.multiTubineDataNotify(analyst, conf, turbineCodes))

    def loadDataCSV(self, dataBatchNum: str, timeGranularity: str, turbineCode: str,
                    condition: str, csvFileDir: str = f"data/mima_dt/second") -> pd.DataFrame:
        """Load one turbine's SCADA CSV and coerce columns to fixed dtypes.

        :param dataBatchNum: batch identifier (currently unused here; kept
            for interface compatibility).
        :param timeGranularity: granularity label (currently unused here).
        :param turbineCode: turbine code; the CSV is ``<csvFileDir>/<code>.csv``.
        :param condition: filter condition (currently unused here).
        :param csvFileDir: directory containing the per-turbine CSV files.
        :return: typed DataFrame of the turbine's SCADA records.
        """
        csvFilePath = os.path.join(csvFileDir, f"{turbineCode}.csv")
        print("current csv file path:", csvFilePath)
        dataFrame = pd.read_csv(csvFilePath, header=0)
        # float32 keeps memory roughly half of float64 for these wide frames.
        dataFrame = dataFrame.astype({
            'wind_turbine_number': 'string',
            'wind_turbine_name': 'string',
            'time_stamp': 'datetime64[ns]',
            'active_power': 'float32',
            'rotor_speed': 'float32',
            'generator_speed': 'float32',
            'wind_velocity': 'float32',
            'pitch_angle_blade_1': 'float32',
            'pitch_angle_blade_2': 'float32',
            'pitch_angle_blade_3': 'float32',
            'cabin_position': 'float32',
            'true_wind_direction': 'float32',
            'yaw_error1': 'float32',
            'set_value_of_active_power': 'float32',
            'gearbox_oil_temperature': 'float32',
            'generatordrive_end_bearing_temperature': 'float32',
            'generatornon_drive_end_bearing_temperature': 'float32',
            'cabin_temperature': 'float32',
            'twisted_cable_angle': 'float32',
            'front_back_vibration_of_the_cabin': 'float32',
            'side_to_side_vibration_of_the_cabin': 'float32',
            'actual_torque': 'float32',
            'given_torque': 'float32',
            'clockwise_yaw_count': 'float32',
            'counterclockwise_yaw_count': 'float32',
            'unusable': 'float32',
            'power_curve_available': 'float32',
            'required_gearbox_speed': 'float32',
            'inverter_speed_master_control': 'float32',
            'outside_cabin_temperature': 'float32',
            'main_bearing_temperature': 'float32',
            'gearbox_high_speed_shaft_bearing_temperature': 'float32',
            'gearboxmedium_speed_shaftbearing_temperature': 'float32',
            'gearbox_low_speed_shaft_bearing_temperature': 'float32',
            'generator_winding1_temperature': 'float32',
            'generator_winding2_temperature': 'float32',
            'generator_winding3_temperature': 'float32',
            'wind_turbine_status': 'float32',
            'wind_turbine_status2': 'float32',
            'turbulence_intensity': 'float32',
            'year': 'int32',
            'month': 'int32',
            'day': 'int32'
        })
        return dataFrame

    def getTurbines(self, conf: Contract, turbineInfo: pd.DataFrame):
        """Return the turbine codes to analyze.

        The contract's explicit turbine filter wins when present and
        non-empty; otherwise all distinct codes from *turbineInfo* are used.
        """
        requested = conf.dataContract.dataFilter.turbines
        if not self.common.isNone(requested) and len(requested) > 0:
            return requested
        return turbineInfo[Field_CodeOfTurbine].unique()

    def executeAnalysis(self, conf: Contract):
        """Run the full analysis pipeline for the request described by *conf*.

        Loads ledger data (farm, model, turbine, met-tower), runs per-turbine
        and multi-turbine analyses via the attached analysts, then persists
        the combined output and updates the batch analysis state. If the same
        farm mixes several turbine models, analyses are grouped by model.
        On any failure the batch is marked as errored and the traceback is
        logged; the method still returns whatever was accumulated.

        :return: concatenated analyst output (empty DataFrame on failure).
        """
        outputProcessor = OutputProcessor(conf, self.logger, self.dbUtil, self.minioUtil)
        # Accumulates the merged results from all notifications.
        DataFrameOutput = pd.DataFrame()
        try:
            foundationDB: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
            with foundationDB.session_scope() as foundationSession:
                outputProcessor.analysisState(
                    foundationSession,
                    conf.dataContract.dataFilter.dataBatchNum,
                    AnalysisState.Analyzing.value,
                    analysisProgress=10)

            # Turbine selection: explicit contract filter or all known turbines.
            turbines = self.getTurbines(conf, self.turbineInfo)

            foundationDB = self.dbUtil[DATABASE_BusinessFoundationDb]
            with foundationDB.session_scope() as foundationSession:
                outputProcessor.analysisState(
                    foundationSession,
                    conf.dataContract.dataFilter.dataBatchNum,
                    AnalysisState.Analyzing.value,
                    analysisProgress=25)

            self.logger.info(
                f"批次:{conf.dataContract.dataFilter.dataBatchNum} 完成机组运行数据加载及再处理,执行请求发电性能分析...")

            # Per-turbine analyses, one turbine at a time.
            for turbine in turbines:
                self.logger.info(
                    f"Power Farm: {conf.dataContract.dataFilter.powerFarmID} Batch: {conf.dataContract.dataFilter.dataBatchNum} Turbine: {turbine} .")
                dataFrameOfReturn = self.notifyOfTurbine(conf, turbine)
                DataFrameOutput = pd.concat(
                    [DataFrameOutput, dataFrameOfReturn], ignore_index=True)

            # Whole-fleet analyses across all selected turbines.
            Outputs = self.notifyOfTurbines(conf, turbines)
            DataFrameOutput = pd.concat([Outputs, DataFrameOutput], ignore_index=True)

            self.logger.info(
                f"批次:{conf.dataContract.dataFilter.dataBatchNum} 执行完成请求发电性能分析,准备输出及更新状态")

            if DataFrameOutput.empty:
                # 103: analysis yielded no output at all.
                raise CustomError(103)
            else:
                outputProcessor.process(conf.dataContract.dataFilter.powerFarmID,
                                        conf.dataContract.dataFilter.dataBatchNum,
                                        DataFrameOutput)
                self.logger.info(
                    f"批次:{conf.dataContract.dataFilter.dataBatchNum} 完成请求发电性能分析、输出及更新状态")
        except (CustomError, Exception) as e:
            # Mark the batch as errored; -1 stands in for unclassified errors.
            try:
                dbUtil = GetBusinessFoundationDbUtil()
                ex = e if isinstance(e, CustomError) else CustomError(-1)
                with dbUtil.session_scope() as session:
                    outputProcessor.analysisState(
                        session,
                        conf.dataContract.dataFilter.dataBatchNum,
                        AnalysisState.Analyzed.value,
                        ErrorState.Err.value,
                        ex.code,
                        ex.message)
            except Exception as e1:
                # Even the error-state update failed; log that too.
                self.logger.error(f"捕获到异常: {_formatException(e1)}")
            finally:
                # Always log the original failure's full traceback.
                self.logger.error(f"捕获到异常: {_formatException(e)}")
        return DataFrameOutput