dataProcessor.py 11 KB


  1. import concurrent.futures
  2. import os
  3. import traceback
  4. from logging import Logger
  5. import pandas as pd
  6. from algorithmContract.confBusiness import *
  7. from algorithmContract.configAnalysis import ConfigAnalysis
  8. from algorithmContract.const import *
  9. from algorithmContract.contract import Contract
  10. from behavior.analyst import Analyst
  11. from behavior.dalAnalyst import DALAnalyst
  12. from behavior.outputProcessor import OutputProcessor
  13. from common.appConfig import GetBusinessFoundationDbUtil
  14. from common.commonBusiness import CommonBusiness
  15. from utils.minioUtil.threadSafeMinioClient import ThreadSafeMinioClient
  16. from utils.rdbmsUtil.databaseUtil import DatabaseUtil
  class DataProcessor:
      """Fan analysis requests out to the attached analysts and merge their output.

      Analysts are registered with ``attach`` and removed with ``detach``.
      ``executeAnalysis`` invokes every attached analyst once per turbine and
      once for the whole turbine set, merges the returned frames and passes the
      merged frame to an ``OutputProcessor``.
      """

      # Upper bound on the number of analysts executed concurrently.
      _MAX_WORKERS = 4

      # Column -> dtype mapping applied by ``loadDataCSV`` after reading a file.
      _CSV_COLUMN_DTYPES = {
          'wind_turbine_number': 'string',
          'wind_turbine_name': 'string',
          'time_stamp': 'datetime64[ns]',
          'active_power': 'float32',
          'rotor_speed': 'float32',
          'generator_speed': 'float32',
          'wind_velocity': 'float32',
          'pitch_angle_blade_1': 'float32',
          'pitch_angle_blade_2': 'float32',
          'pitch_angle_blade_3': 'float32',
          'cabin_position': 'float32',
          'true_wind_direction': 'float32',
          'yaw_error1': 'float32',
          'set_value_of_active_power': 'float32',
          'gearbox_oil_temperature': 'float32',
          'generatordrive_end_bearing_temperature': 'float32',
          'generatornon_drive_end_bearing_temperature': 'float32',
          'cabin_temperature': 'float32',
          'twisted_cable_angle': 'float32',
          'front_back_vibration_of_the_cabin': 'float32',
          'side_to_side_vibration_of_the_cabin': 'float32',
          'actual_torque': 'float32',
          'given_torque': 'float32',
          'clockwise_yaw_count': 'float32',
          'counterclockwise_yaw_count': 'float32',
          'unusable': 'float32',
          'power_curve_available': 'float32',
          'required_gearbox_speed': 'float32',
          'inverter_speed_master_control': 'float32',
          'outside_cabin_temperature': 'float32',
          'main_bearing_temperature': 'float32',
          'gearbox_high_speed_shaft_bearing_temperature': 'float32',
          'gearboxmedium_speed_shaftbearing_temperature': 'float32',
          'gearbox_low_speed_shaft_bearing_temperature': 'float32',
          'generator_winding1_temperature': 'float32',
          'generator_winding2_temperature': 'float32',
          'generator_winding3_temperature': 'float32',
          'wind_turbine_status': 'float32',
          'wind_turbine_status2': 'float32',
          'turbulence_intensity': 'float32',
          'year': 'int32',
          'month': 'int32',
          'day': 'int32',
      }

      def __init__(self, logger: Logger, dbUtil: dict[str, DatabaseUtil], minioUtil: ThreadSafeMinioClient, conf: Contract):
          """Store the collaborators and start with no analysts attached.

          Args:
              logger: Logger used for all progress and error messages.
              dbUtil: Database utilities keyed by database name.
              minioUtil: MinIO client handed on to the output processor.
              conf: Contract this processor was created for.
          """
          self.logger = logger
          self.dbUtil = dbUtil
          self.minioUtil = minioUtil
          self.conf = conf
          self.common = CommonBusiness()
          self._baseAnalysts = []
          self._analysts = []
          # NOTE(review): this DAL object is created but never used in this
          # class; the call is kept in case its constructor has side effects.
          dal = DALAnalyst(logger, dbUtil)
          # The attributes below are declared but NOT assigned here; they must
          # be populated before ``executeAnalysis`` reads ``self.turbineInfo``.
          # Information on all renewable-energy power farms.
          self.powerFarmInfo: pd.DataFrame
          # Information on all wind turbines.
          self.turbineInfo: pd.DataFrame
          # Data-transfer (conversion) information.
          self.dataTransfer: pd.DataFrame
          # Turbine model information.
          self.turbineModelInfo: pd.DataFrame
          # Information on all met masts / weather stations.
          self.weatherStationInfo: pd.DataFrame
          # Contract power curves of the turbine models of all power farms.
          self.dataFrameContractOfTurbine: pd.DataFrame

      def attach(self, analyst: Analyst):
          """Register ``analyst`` unless it is already registered."""
          if analyst not in self._analysts:
              self._analysts.append(analyst)

      def detach(self, analyst: Analyst):
          """Unregister ``analyst``; unknown analysts are ignored."""
          try:
              self._analysts.remove(analyst)
          except ValueError:
              # Detaching an analyst that was never attached is a no-op.
              pass

      def userDataFrame(self, dictionary: dict, configs: list[ConfigAnalysis], analyst: Analyst) -> pd.DataFrame:
          """Return the frame in ``dictionary`` configured for ``analyst``.

          The first config whose ``className`` equals the analyst's class name
          (case-insensitive) supplies the key (its ``scada`` attribute).

          Returns:
              The frame stored under that key, or an empty ``DataFrame`` when no
              config matches, the key is absent, or the stored frame is empty.
          """
          analystName = type(analyst).__name__.lower()
          timeGranularity = next(
              (config.scada for config in configs if config.className.lower() == analystName), None)
          # ``get`` avoids a KeyError when no config matched (key is None) or
          # the granularity was never loaded.
          dataFrame = dictionary.get(timeGranularity)
          if dataFrame is None or dataFrame.empty:
              return pd.DataFrame()
          return dataFrame

      def singleTurbineDataNotify(self, analyst, conf: Contract, turbineCode: str):
          """Run ``analyst.analysisOfTurbine`` for one turbine and return its result."""
          outputAnalysisDir = analyst.getOutputAnalysisDir()
          outputFilePath = f"{outputAnalysisDir}/{turbineCode}{CSVSuffix}"
          return analyst.analysisOfTurbine(
              outputAnalysisDir, outputFilePath, conf, turbineCode)

      def _dispatchToAnalysts(self, task, conf: Contract, target, logProgress: bool = False) -> pd.DataFrame:
          """Run ``task(analyst, conf, target)`` for every attached analyst and merge the results.

          Tasks run on a thread pool. The first failure observed is re-raised
          after the pool has shut down. Results are merged in the order the
          analysts were attached, and ``None`` results are skipped.

          Returns:
              The concatenated frame, or an empty ``DataFrame`` when there is
              nothing to concatenate.
          """
          with concurrent.futures.ThreadPoolExecutor(max_workers=self._MAX_WORKERS) as executor:
              futures = [
                  executor.submit(task, analyst, conf, target)
                  for analyst in self._analysts
              ]
              if logProgress:
                  self.logger.info("Waiting for all futures to complete...")
              for future in concurrent.futures.as_completed(futures):
                  if logProgress:
                      self.logger.info(f"future is : {future}")
                  try:
                      future.result()
                  except Exception:
                      if logProgress:
                          self.logger.error(f"future deal error. future : {future} , error is {traceback.format_exc()}")
                      raise
              # Every future has finished; collect in submission order so the
              # merged rows do not depend on thread scheduling.
              frames = [
                  frame
                  for frame in (future.result() for future in futures)
                  if frame is not None
              ]
          # pd.concat raises ValueError on an empty list, so guard it.
          if not frames:
              return pd.DataFrame()
          return pd.concat(frames, ignore_index=True)

      def notifyOfTurbine(self, conf: Contract, turbineCode: str) -> pd.DataFrame:
          """Run every attached analyst on a single turbine and merge the results.

          Returns:
              The concatenated results, or an empty ``DataFrame`` when no
              analyst is attached or none returned data.

          Raises:
              Exception: whatever an analyst raised.
          """
          return self._dispatchToAnalysts(self.singleTurbineDataNotify, conf, turbineCode)

      def multiTubineDataNotify(self, analyst, conf: Contract, turbineCodes):
          """Run ``analyst.analysisOfTurbines`` for the turbine set, logging result or failure."""
          try:
              outputAnalysisDir = analyst.getOutputAnalysisDir()
              dataFram = analyst.analysisOfTurbines(outputAnalysisDir, conf, turbineCodes)
              self.logger.info(f"analysis type execute result : dataFrame = {dataFram}")
              return dataFram
          except Exception:
              self.logger.error(f"analysis type execute error : {traceback.format_exc()}")
              raise

      def notifyOfTurbines(self, conf: Contract, turbineCodes) -> pd.DataFrame:
          """Run every attached analyst on the whole turbine set and merge the results.

          Returns:
              The concatenated results, or an empty ``DataFrame`` when no
              analyst is attached or none returned data.

          Raises:
              Exception: whatever an analyst raised.
          """
          return self._dispatchToAnalysts(
              self.multiTubineDataNotify, conf, turbineCodes, logProgress=True)

      def loadDataCSV(self, dataBatchNum: str, timeGranularity: str, turbineCode: str, condition: str, csvFileDir: str = "data/mima_dt/second") -> pd.DataFrame:
          """Read ``<csvFileDir>/<turbineCode>.csv`` and cast its columns to fixed dtypes.

          ``dataBatchNum``, ``timeGranularity`` and ``condition`` are accepted
          for interface compatibility but are not used.

          Returns:
              The frame with every column in ``_CSV_COLUMN_DTYPES`` cast to its
              configured dtype.
          """
          csvFilePath = os.path.join(csvFileDir, f"{turbineCode}.csv")
          self.logger.info(f"current csv file path: {csvFilePath}")
          dataFrame = pd.read_csv(csvFilePath, header=0)
          return dataFrame.astype(self._CSV_COLUMN_DTYPES)

      def getTurbines(self, conf: Contract, turbineInfo: pd.DataFrame):
          """Return the turbines selected by the contract filter, or all known turbine codes.

          When the filter lists no turbines, the unique values of the
          ``Field_CodeOfTurbine`` column of ``turbineInfo`` are returned.
          """
          selected = conf.dataContract.dataFilter.turbines
          if not self.common.isNone(selected) and len(selected) > 0:
              return selected
          return turbineInfo[Field_CodeOfTurbine].unique()

      def executeAnalysis(self, conf: Contract):
          """Run the full analysis for ``conf`` and publish the merged result.

          Steps: report progress 10 and 25, resolve the turbine list, run the
          per-turbine analysis for each turbine, run the multi-turbine analysis
          once, merge everything and hand it to ``OutputProcessor.process``.
          An empty merged result is treated as ``CustomError(103)``.

          Failures inside the run are not re-raised: they are logged and the
          batch is marked as analysed-with-error (code -1 for non-``CustomError``
          exceptions).

          Returns:
              The merged frame; after a failure, whatever had been merged up to
              that point (possibly empty).
          """
          # If one farm has turbines of several models, note that analysis is
          # expected to be grouped by model.
          outputProcessor = OutputProcessor(
              conf, self.logger, self.dbUtil, self.minioUtil)
          # Accumulates the merged results; returned even when the run fails.
          DataFrameOutput = pd.DataFrame()
          try:
              dataFilter = conf.dataContract.dataFilter
              batchNum = dataFilter.dataBatchNum
              foundationDB: DatabaseUtil = self.dbUtil[DATABASE_BusinessFoundationDb]
              with foundationDB.session_scope() as foundationSession:
                  outputProcessor.analysisState(
                      foundationSession, batchNum, AnalysisState.Analyzing.value, analysisProgress=10)
              # Turbine selection.
              turbines = self.getTurbines(conf, self.turbineInfo)
              with foundationDB.session_scope() as foundationSession:
                  outputProcessor.analysisState(
                      foundationSession, batchNum, AnalysisState.Analyzing.value, analysisProgress=25)
              self.logger.info(
                  f"批次:{batchNum} 完成机组运行数据加载及再处理,执行请求发电性能分析...")
              for turbine in turbines:
                  self.logger.info(
                      f"Power Farm: {dataFilter.powerFarmID} Batch: {batchNum} Turbine: {turbine} .")
                  dataFrameOfReturn = self.notifyOfTurbine(conf, turbine)
                  DataFrameOutput = pd.concat(
                      [DataFrameOutput, dataFrameOfReturn], ignore_index=True)
              # Multi-turbine analysis; its rows go in front of the per-turbine rows.
              Outputs = self.notifyOfTurbines(conf, turbines)
              DataFrameOutput = pd.concat(
                  [Outputs, DataFrameOutput], ignore_index=True)
              self.logger.info(
                  f"批次:{batchNum} 执行完成请求发电性能分析,准备输出及更新状态")
              if DataFrameOutput.empty:
                  raise CustomError(103)
              outputProcessor.process(
                  dataFilter.powerFarmID, batchNum, DataFrameOutput)
              self.logger.info(
                  f"批次:{batchNum} 完成请求发电性能分析、输出及更新状态")
          except Exception as e:
              self.logger.error(f" executeAnalysis error: {traceback.format_exc()}")
              try:
                  # A separately obtained DB utility is used to record the failure.
                  dbUtil = GetBusinessFoundationDbUtil()
                  ex = e if isinstance(e, CustomError) else CustomError(-1)
                  with dbUtil.session_scope() as session:
                      outputProcessor.analysisState(
                          session, conf.dataContract.dataFilter.dataBatchNum,
                          AnalysisState.Analyzed.value, ErrorState.Err.value, ex.code, ex.message)
              except Exception:
                  # Recording the failure is best effort: log and carry on.
                  self.logger.error(f"捕获到异常: {traceback.format_exc()}")
          return DataFrameOutput