dalAnalyst.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. from logging import Logger
  2. import pandas as pd
  3. from common.commonBusiness import CommonBusiness
  4. from algorithmContract.const import *
  5. from algorithmContract.confBusiness import *
  6. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  7. from sqlalchemy.orm import Session
  8. from sqlalchemy.sql import text
  9. class DALAnalyst:
  10. def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil]) -> None:
  11. self.logger = logger
  12. self.dbUtil = dbUtil
  13. def loadPowerFarmInfos(self, powerFarmID: str):
  14. """
  15. 获取场站基础信息
  16. """
  17. dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
  18. with dbUtil.session_scope() as session:
  19. # 执行原生 SQL 查询
  20. result = session.execute(text(
  21. f"SELECT field_code,company_code,field_name,density,state,engine_number,rated_capacity_number,province_id,province_name,city_id,city_name,longitude,latitude,elevation_height,power_contract_url FROM wind_field where del_state=0 and field_code='{powerFarmID}'")).fetchall()
  22. # 获取查询结果的列名
  23. columns = ['field_code', 'company_code', 'field_name', 'density', 'state', 'engine_number', 'rated_capacity_number',
  24. 'province_id', 'province_name', 'city_id', 'city_name', 'longitude', 'latitude', 'elevation_height', 'power_contract_url']
  25. # 将查询结果转换为 DataFrame
  26. dataFrame = pd.DataFrame(result, columns=columns)
  27. return dataFrame
  28. def loadTurbineInfos(self, powerFarmID: str):
  29. """
  30. 获取风电机组基础信息
  31. """
  32. dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
  33. with dbUtil.session_scope() as session:
  34. # 执行原生 SQL 查询
  35. result = session.execute(text(
  36. f"SELECT field_code,engine_code,engine_name,mill_type_code,rated_capacity,elevation_height,hub_height,state,longitude,latitude,sightcing FROM wind_engine_group where del_state=0 and field_code='{powerFarmID}'")).fetchall()
  37. # 获取查询结果的列名
  38. columns = ['field_code', 'engine_code', 'engine_name', Field_MillTypeCode, 'rated_capacity', 'elevation_height', 'hub_height',
  39. 'state', 'longitude', 'latitude', 'sightcing']
  40. # 将查询结果转换为 DataFrame
  41. dataFrame = pd.DataFrame(result, columns=columns)
  42. return dataFrame
  43. def loadDataTransfer(self, powerFarmID: str, dataBatchNum: str):
  44. """
  45. 获取数据操作信息
  46. """
  47. dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
  48. with dbUtil.session_scope() as session:
  49. # 执行原生 SQL 查询
  50. result = session.execute(text(
  51. f"SELECT field_code, batch_code, engine_count, transfer_type, transfer_addr, time_granularity FROM data_transfer where field_code='{powerFarmID}' and batch_code='{dataBatchNum}' ")).fetchall()
  52. # 获取查询结果的列名
  53. columns = ['field_code', 'batch_code', 'engine_count',
  54. 'transfer_type', 'transfer_addr', 'time_granularity']
  55. # 将查询结果转换为 DataFrame
  56. dataFrame = pd.DataFrame(result, columns=columns)
  57. return dataFrame
  58. def loadTurbineModelInfos(self, turbineModels: list):
  59. """
  60. 获取型号基础信息
  61. """
  62. if len(turbineModels) <= 0:
  63. return pd.DataFrame()
  64. turbineModelStr = ", ".join(
  65. f"'{model}'" for model in turbineModels) # 使用%s作为占位符,稍后可以替换为实际值
  66. dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
  67. with dbUtil.session_scope() as session:
  68. # 执行原生 SQL 查询
  69. result = session.execute(text(f"SELECT mill_type_code,machine_type_code,manufacturer_name,manufacturer_code,brand,tower_height,vane_long,curved_motion_type,combination,power_criterion_url,rotor_diameter,rotational_speed_ratio,rated_wind_speed,rated_cut_in_windspeed,rated_cut_out_windspeed FROM wind_engine_mill where del_state=0 and state=1 and mill_type_code in ({turbineModelStr})"
  70. )).fetchall()
  71. # 获取查询结果的列名
  72. columns = [Field_MillTypeCode, Field_MachineTypeCode, Field_ManufacturerName, Field_ManufacturerCode, Field_Brand, Field_HubHeight, Field_VaneLong, Field_MotionType,
  73. Field_Combination, Field_PowerCriterionURL, Field_RotorDiameter, Field_RSR, Field_RatedWindSpeed, Field_CutInWS, Field_CutOutWS]
  74. # 将查询结果转换为 DataFrame
  75. dataFrame = pd.DataFrame(result, columns=columns)
  76. return dataFrame
  77. def loadWeatherStationInfos(self, powerFarmID: str):
  78. """
  79. 获取气象站(测风塔)基础信息
  80. """
  81. dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
  82. with dbUtil.session_scope() as session:
  83. # 执行原生 SQL 查询
  84. result = session.execute(text(
  85. f"select y.field_code ,x.anemometer_code ,x.anemometer_name ,x.longitude ,x.latitude from anemometer_tower as x inner join anemometer_tower_relation as y on x.anemometer_code =y.tower_code where x.del_state=0 and x.state=1 and y.field_code='{powerFarmID}'")).fetchall()
  86. # 获取查询结果的列名
  87. columns = ['field_code', 'anemometer_code',
  88. 'anemometer_name', 'longitude', 'latitude']
  89. # 将查询结果转换为 DataFrame
  90. dataFrame = pd.DataFrame(result, columns=columns)
  91. return dataFrame
  92. def processContractData(self, common: CommonBusiness, powerFarmID: str, airDensity: float, turbineModelInfo: pd.DataFrame):
  93. """
  94. 获取合同功率曲线数据
  95. """
  96. dataFrameMerge = pd.DataFrame()
  97. turbineModels = turbineModelInfo[Field_MillTypeCode]
  98. turbineModelStr = ", ".join(f"'{model}'" for model in turbineModels)
  99. dbUtil: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
  100. with dbUtil.session_scope() as session:
  101. # 执行原生 SQL 查询
  102. result = session.execute(text(
  103. f"SELECT field_code, mill_type_code,active_power,wind_speed as wind_velocity FROM power_word_relation_contract where field_code='{powerFarmID}' and mill_type_code in ({turbineModelStr})")).fetchall()
  104. # 获取查询结果的列名
  105. columns = [Field_PowerFarmCode, Field_MillTypeCode,
  106. Field_ActiverPower, Field_WindSpeed]
  107. # 将查询结果转换为 DataFrame
  108. contractPowerCurves = pd.DataFrame(result, columns=columns)
  109. grouped = contractPowerCurves.groupby(Field_MillTypeCode)
  110. for name, group in grouped:
  111. print("current turbine model :", name)
  112. model = turbineModelInfo[turbineModelInfo[Field_MillTypeCode] == name]
  113. if len(model) <= 0:
  114. continue
  115. dataFrame = common.calculateCp2(
  116. group, airDensity, model[Field_RotorDiameter].iloc[0], Field_WindSpeed, Field_ActiverPower)
  117. dataFrameMerge = pd.concat(
  118. [dataFrameMerge, dataFrame], axis=0, sort=False)
  119. return dataFrameMerge