from logging import Logger import pandas as pd from common.commonBusiness import CommonBusiness from algorithmContract.const import * from algorithmContract.confBusiness import * from utils.rdbmsUtil.databaseUtil import DatabaseUtil from sqlalchemy.orm import Session from sqlalchemy.sql import text class DALAnalyst: def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil]) -> None: self.logger = logger self.dbUtil = dbUtil def loadPowerFarmInfos(self, powerFarmID: str): """ 获取场站基础信息 """ dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb] with dbUtil.session_scope() as session: # 执行原生 SQL 查询 result = session.execute(text( f"SELECT field_code,company_code,field_name,density,state,engine_number,rated_capacity_number,province_id,province_name,city_id,city_name,longitude,latitude,elevation_height,power_contract_url FROM wind_field where del_state=0 and field_code='{powerFarmID}'")).fetchall() # 获取查询结果的列名 columns = ['field_code', 'company_code', 'field_name', 'density', 'state', 'engine_number', 'rated_capacity_number', 'province_id', 'province_name', 'city_id', 'city_name', 'longitude', 'latitude', 'elevation_height', 'power_contract_url'] # 将查询结果转换为 DataFrame dataFrame = pd.DataFrame(result, columns=columns) return dataFrame def loadTurbineInfos(self, powerFarmID: str): """ 获取风电机组基础信息 """ dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb] with dbUtil.session_scope() as session: # 执行原生 SQL 查询 result = session.execute(text( f"SELECT field_code,engine_code,engine_name,mill_type_code,rated_capacity,elevation_height,hub_height,state,longitude,latitude,sightcing FROM wind_engine_group where del_state=0 and field_code='{powerFarmID}'")).fetchall() # 获取查询结果的列名 columns = ['field_code', 'engine_code', 'engine_name', Field_MillTypeCode, 'rated_capacity', 'elevation_height', 'hub_height', 'state', 'longitude', 'latitude', 'sightcing'] # 将查询结果转换为 DataFrame dataFrame = pd.DataFrame(result, columns=columns) return dataFrame def loadDataTransfer(self, powerFarmID: str, dataBatchNum: str): """ 获取数据操作信息 """ dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb] with dbUtil.session_scope() as session: # 执行原生 SQL 查询 result = session.execute(text( f"SELECT field_code, batch_code, engine_count, transfer_type, transfer_addr, time_granularity FROM data_transfer where field_code='{powerFarmID}' and batch_code='{dataBatchNum}' ")).fetchall() # 获取查询结果的列名 columns = ['field_code', 'batch_code', 'engine_count', 'transfer_type', 'transfer_addr', 'time_granularity'] # 将查询结果转换为 DataFrame dataFrame = pd.DataFrame(result, columns=columns) return dataFrame def loadTurbineModelInfos(self, turbineModels: list): """ 获取型号基础信息 """ if len(turbineModels) <= 0: return pd.DataFrame() turbineModelStr = ", ".join( f"'{model}'" for model in turbineModels) # 使用%s作为占位符,稍后可以替换为实际值 dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb] with dbUtil.session_scope() as session: # 执行原生 SQL 查询 result = session.execute(text(f"SELECT mill_type_code,machine_type_code,manufacturer_name,manufacturer_code,brand,tower_height,vane_long,curved_motion_type,combination,power_criterion_url,rotor_diameter,rotational_speed_ratio,rated_wind_speed,rated_cut_in_windspeed,rated_cut_out_windspeed FROM wind_engine_mill where del_state=0 and state=1 and mill_type_code in ({turbineModelStr})" )).fetchall() # 获取查询结果的列名 columns = [Field_MillTypeCode, Field_MachineTypeCode, Field_ManufacturerName, Field_ManufacturerCode, Field_Brand, Field_HubHeight, Field_VaneLong, Field_MotionType, Field_Combination, Field_PowerCriterionURL, Field_RotorDiameter, Field_RSR, Field_RatedWindSpeed, Field_CutInWS, Field_CutOutWS] # 将查询结果转换为 DataFrame dataFrame = pd.DataFrame(result, columns=columns) return dataFrame def loadWeatherStationInfos(self, powerFarmID: str): """ 获取气象站(测风塔)基础信息 """ dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb] with dbUtil.session_scope() as session: # 执行原生 SQL 查询 result = session.execute(text( f"select y.field_code ,x.anemometer_code ,x.anemometer_name ,x.longitude ,x.latitude from anemometer_tower as x inner join anemometer_tower_relation as y on x.anemometer_code =y.tower_code where x.del_state=0 and x.state=1 and y.field_code='{powerFarmID}'")).fetchall() # 获取查询结果的列名 columns = ['field_code', 'anemometer_code', 'anemometer_name', 'longitude', 'latitude'] # 将查询结果转换为 DataFrame dataFrame = pd.DataFrame(result, columns=columns) return dataFrame def processContractData(self, common: CommonBusiness, powerFarmID: str, airDensity: float, turbineModelInfo: pd.DataFrame): """ 获取合同功率曲线数据 """ dataFrameMerge = pd.DataFrame() turbineModels = turbineModelInfo[Field_MillTypeCode] turbineModelStr = ", ".join(f"'{model}'" for model in turbineModels) dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb] with dbUtil.session_scope() as session: # 执行原生 SQL 查询 result = session.execute(text( f"SELECT field_code, mill_type_code,active_power,wind_speed as wind_velocity FROM power_word_relation_contract where field_code='{powerFarmID}' and mill_type_code in ({turbineModelStr})")).fetchall() # 获取查询结果的列名 columns = [Field_PowerFarmCode, Field_MillTypeCode, Field_ActiverPower, Field_WindSpeed] # 将查询结果转换为 DataFrame contractPowerCurves = pd.DataFrame(result, columns=columns) grouped = contractPowerCurves.groupby(Field_MillTypeCode) for name, group in grouped: print("current turbine model :", name) model = turbineModelInfo[turbineModelInfo[Field_MillTypeCode] == name] if len(model) <= 0: continue dataFrame = common.calculateCp2( group, airDensity, model[Field_RotorDiameter].iloc[0], Field_WindSpeed, Field_ActiverPower) dataFrameMerge = pd.concat( [dataFrameMerge, dataFrame], axis=0, sort=False) return dataFrameMerge