# dataProcessor.py
  1. import os
  2. import traceback
  3. from datetime import datetime, timezone, timedelta
  4. from logging import Logger
  5. import concurrent.futures
  6. import numpy as np
  7. import pandas as pd
  8. from sqlalchemy.orm import Session
  9. from sqlalchemy import create_engine
  10. from sqlalchemy.sql import text
  11. from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
  12. from utils.directoryUtil import DirectoryUtil as dir
  13. from algorithmContract.confBusiness import *
  14. from algorithmContract.contract import Contract
  15. from algorithmContract.configAnalysis import ConfigAnalysis
  16. from algorithmContract.const import *
  17. from behavior.analyst import Analyst
  18. from common.commonBusiness import CommonBusiness
  19. from common.appConfig import GetDbUtil, GetBusinessDbUtil, GetBusinessFoundationDbUtil
  20. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  21. from behavior.dalAnalyst import DALAnalyst
  22. from behavior.outputProcessor import OutputProcessor
  23. class DataProcessor:
  24. def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient, conf: Contract):
  25. self.logger = logger
  26. self.dbUtil = dbUtil
  27. self.minioUtil = minioUtil
  28. self.conf = conf
  29. self.common = CommonBusiness()
  30. self._baseAnalysts = []
  31. self._analysts = []
  32. dal = DALAnalyst(logger, dbUtil)
  33. # 加载所有新能源场站信息
  34. self.powerFarmInfo: pd.DataFrame
  35. # 加载所有风电机组信息
  36. self.turbineInfo: pd.DataFrame
  37. # 加载数据转换信息
  38. self.dataTransfer: pd.DataFrame
  39. # 加载机型信息
  40. self.turbineModelInfo: pd.DataFrame
  41. # 加载所有测风塔信息
  42. self.weatherStationInfo: pd.DataFrame
  43. # 加载所有新能源场站,及所属风电机组机型的合同功率曲线
  44. self.dataFrameContractOfTurbine: pd.DataFrame
  45. def attach(self, analyst: Analyst):
  46. if analyst not in self._analysts:
  47. self._analysts.append(analyst)
  48. def detach(self, analyst: Analyst):
  49. try:
  50. self._analysts.remove(analyst)
  51. except ValueError:
  52. pass
  53. def userDataFrame(self, dictionary: dict, configs: list[ConfigAnalysis], analyst: Analyst) -> pd.DataFrame:
  54. timeGranularity = next(
  55. (config.scada for config in configs if config.className.lower() == type(analyst).__name__.lower()), None)
  56. return pd.DataFrame() if dictionary[timeGranularity].empty else dictionary[timeGranularity]
  57. def singleTurbineDataNotify(self,analyst,conf:Contract,turbineCode:str):
  58. outputAnalysisDir = analyst.getOutputAnalysisDir()
  59. outputFilePath = r"{}/{}{}".format(outputAnalysisDir,
  60. turbineCode, CSVSuffix)
  61. return analyst.analysisOfTurbine(
  62. outputAnalysisDir, outputFilePath, conf, turbineCode)
  63. def notifyOfTurbine(self, conf: Contract, turbineCode: str) -> pd.DataFrame:
  64. results =[]
  65. maxWorkers = 4
  66. with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
  67. futures = [
  68. executor.submit(self.singleTurbineDataNotify,analyst, conf,turbineCode)
  69. for analyst in self._analysts
  70. ]
  71. for future in concurrent.futures.as_completed(futures):
  72. try:
  73. result = future.result()
  74. results.append(result)
  75. except Exception as exc:
  76. raise exc
  77. returnResult = pd.concat(results, ignore_index=True)
  78. return returnResult
  79. def multiTubineDataNotify(self,analyst,conf:Contract,turbineCodes):
  80. try:
  81. outputAnalysisDir = analyst.getOutputAnalysisDir()
  82. dataFram = analyst.analysisOfTurbines(outputAnalysisDir, conf,turbineCodes)
  83. self.logger.info(f"analysis type execute result : dataFrame = {dataFram}")
  84. return dataFram
  85. except Exception as exception:
  86. self.logger.info(f"analysis type execute error : {exception.message}")
  87. traceback.format_exc()
  88. raise exception
  89. def notifyOfTurbines(self, conf: Contract,turbineCodes) -> pd.DataFrame:
  90. results =[]
  91. maxWorkers = 4
  92. with concurrent.futures.ThreadPoolExecutor(max_workers=maxWorkers) as executor:
  93. futures = [
  94. executor.submit(self.multiTubineDataNotify,analyst, conf,turbineCodes)
  95. for analyst in self._analysts
  96. ]
  97. self.logger.info("Waiting for all futures to complete...")
  98. for future in concurrent.futures.as_completed(futures):
  99. self.logger.info(f"future is : {future}")
  100. try:
  101. result = future.result()
  102. results.append(result)
  103. except Exception as exc:
  104. self.logger.info(f"future deal error. future : {future} , error is {exc.message}")
  105. traceback.format_exc()
  106. raise exc
  107. returnResult = pd.concat(results, ignore_index=True)
  108. return returnResult
  109. def loadDataCSV(self, dataBatchNum: str, timeGranularity: str, turbineCode: str, condition: str, csvFileDir: str = f"data/mima_dt/second") -> pd.DataFrame:
  110. csvFilePath = os.path.join(csvFileDir, f"{turbineCode}.csv")
  111. print("current csv file path:", csvFilePath)
  112. dataFrame = pd.read_csv(csvFilePath, header=0)
  113. dataFrame = dataFrame.astype({
  114. 'wind_turbine_number': 'string',
  115. 'wind_turbine_name': 'string',
  116. 'time_stamp': 'datetime64[ns]',
  117. 'active_power': 'float32',
  118. 'rotor_speed': 'float32',
  119. 'generator_speed': 'float32',
  120. 'wind_velocity': 'float32',
  121. 'pitch_angle_blade_1': 'float32',
  122. 'pitch_angle_blade_2': 'float32',
  123. 'pitch_angle_blade_3': 'float32',
  124. 'cabin_position': 'float32',
  125. 'true_wind_direction': 'float32',
  126. 'yaw_error1': 'float32',
  127. 'set_value_of_active_power': 'float32',
  128. 'gearbox_oil_temperature': 'float32',
  129. 'generatordrive_end_bearing_temperature': 'float32',
  130. 'generatornon_drive_end_bearing_temperature': 'float32',
  131. 'cabin_temperature': 'float32',
  132. 'twisted_cable_angle': 'float32',
  133. 'front_back_vibration_of_the_cabin': 'float32',
  134. 'side_to_side_vibration_of_the_cabin': 'float32',
  135. 'actual_torque': 'float32',
  136. 'given_torque': 'float32',
  137. 'clockwise_yaw_count': 'float32',
  138. 'counterclockwise_yaw_count': 'float32',
  139. 'unusable': 'float32',
  140. 'power_curve_available': 'float32',
  141. 'required_gearbox_speed': 'float32',
  142. 'inverter_speed_master_control': 'float32',
  143. 'outside_cabin_temperature': 'float32',
  144. 'main_bearing_temperature': 'float32',
  145. 'gearbox_high_speed_shaft_bearing_temperature': 'float32',
  146. 'gearboxmedium_speed_shaftbearing_temperature': 'float32',
  147. 'gearbox_low_speed_shaft_bearing_temperature': 'float32',
  148. 'generator_winding1_temperature': 'float32',
  149. 'generator_winding2_temperature': 'float32',
  150. 'generator_winding3_temperature': 'float32',
  151. 'wind_turbine_status': 'float32',
  152. 'wind_turbine_status2': 'float32',
  153. 'turbulence_intensity': 'float32',
  154. 'year': 'int32',
  155. 'month': 'int32',
  156. 'day': 'int32'
  157. })
  158. return dataFrame
  159. def getTurbines(self,conf:Contract,turbineInfo:pd.DataFrame):
  160. return conf.dataContract.dataFilter.turbines if not self.common.isNone(
  161. conf.dataContract.dataFilter.turbines) and len(conf.dataContract.dataFilter.turbines) > 0 else turbineInfo[Field_CodeOfTurbine].unique()
  162. def executeAnalysis(self, conf: Contract):
  163. # 根据输入参数 conf ,加载台账(电场、机型、机组、测风塔信息)、发电机组scada的分钟级与秒级、测风塔运行、事件等数据
  164. # 若同一电场,具有多种机型的机组,注意按机型分组分析
  165. outputProcessor = OutputProcessor(
  166. conf, self.logger, self.dbUtil, self.minioUtil)
  167. # Initialize an empty DataFrame to store merged results
  168. DataFrameOutput = pd.DataFrame()
  169. try:
  170. foundationDB: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
  171. with foundationDB.session_scope() as foundationSession:
  172. outputProcessor.analysisState(
  173. foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.Analyzing.value, analysisProgress=10)
  174. # 机组筛选
  175. turbines = self.getTurbines(conf,self.turbineInfo)
  176. # turbineSqlInStr = ", ".join(f"'{turbine}'" for turbine in turbines) # 使用%s作为占位符,稍后可以替换为实际值
  177. # dictionary= self.processTurbineData(turbines,conf)
  178. foundationDB: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
  179. with foundationDB.session_scope() as foundationSession:
  180. outputProcessor.analysisState(
  181. foundationSession, conf.dataContract.dataFilter.dataBatchNum, AnalysisState.Analyzing.value, analysisProgress=25)
  182. self.logger.info(
  183. f"批次:{conf.dataContract.dataFilter.dataBatchNum} 完成机组运行数据加载及再处理,执行请求发电性能分析...")
  184. for turbine in turbines:
  185. self.logger.info(
  186. f"Power Farm: {conf.dataContract.dataFilter.powerFarmID} Batch: {conf.dataContract.dataFilter.dataBatchNum} Turbine: {turbine} .")
  187. dataFrameOfReturn = self.notifyOfTurbine(
  188. conf, turbine)
  189. DataFrameOutput = pd.concat(
  190. [DataFrameOutput, dataFrameOfReturn], ignore_index=True)
  191. # Notify processing for all turbines
  192. Outputs = self.notifyOfTurbines( conf,turbines)
  193. DataFrameOutput = pd.concat(
  194. [Outputs, DataFrameOutput], ignore_index=True)
  195. self.logger.info(
  196. f"批次:{conf.dataContract.dataFilter.dataBatchNum} 执行完成请求发电性能分析,准备输出及更新状态")
  197. if DataFrameOutput.empty:
  198. raise CustomError(103)
  199. else:
  200. outputProcessor.process(conf.dataContract.dataFilter.powerFarmID,
  201. conf.dataContract.dataFilter.dataBatchNum, DataFrameOutput)
  202. self.logger.info(
  203. f"批次:{conf.dataContract.dataFilter.dataBatchNum} 完成请求发电性能分析、输出及更新状态")
  204. except (CustomError, Exception) as e:
  205. self.logger.info(f" executeAnalysis error: {e.message}")
  206. traceback.format_exc()
  207. try:
  208. dbUtil = GetBusinessFoundationDbUtil()
  209. ex = e if isinstance(e, CustomError) else CustomError(-1)
  210. code = ex.code
  211. message = ex.message
  212. with dbUtil.session_scope() as session:
  213. outputProcessor.analysisState(session, conf.dataContract.dataFilter.dataBatchNum,
  214. AnalysisState.Analyzed.value, ErrorState.Err.value, code, message)
  215. except Exception as e1:
  216. # 使用 traceback 模块获取完整的异常信息
  217. error_message = ''.join(traceback.format_exception(e1))
  218. # 记录异常信息
  219. self.logger.error(f"捕获到异常: {error_message}")
  220. return DataFrameOutput