# dataProcessor.py
  1. import os
  2. from datetime import datetime
  3. import concurrent.futures
  4. import pandas as pd
  5. from confBusiness import ConfBusiness, Field_NameOfTurbine, Field_GeneratorSpeed, Field_GeneratorTorque,Field_AngleIncluded
  6. from .utils.directoryUtil import DirectoryUtil as dir
  7. from .baseAnalyst import BaseAnalyst
  8. from .analyst import Analyst
  9. from .commonBusiness import CommonBusiness
  10. class DataProcessor:
  11. def __init__(self):
  12. self.common=CommonBusiness()
  13. self._baseAnalysts = []
  14. self._noCustomFilterAnalysts = []
  15. self._analysts = []
  16. self.node_filter_value_state_turbine = "filter_value_state_turbine"
  17. self.node_angle_pitch_min = "angle_pitch_min"
  18. self.node_angle_pitch_max = "angle_pitch_max"
  19. self.node_speed_wind_cut_in = "speed_wind_cut_in"
  20. self.node_speed_wind_cut_out = "speed_wind_cut_out"
  21. self.node_active_power_min = "active_power_min"
  22. self.node_active_power_max = "active_power_max"
  23. def attachBaseAnalyst(self, analyst: BaseAnalyst):
  24. if analyst not in self._analysts:
  25. self._baseAnalysts.append(analyst)
  26. def detachBaseAnalyst(self, analyst: BaseAnalyst):
  27. try:
  28. self._baseAnalysts.remove(analyst)
  29. except ValueError:
  30. pass
  31. def attach(self, analyst: Analyst):
  32. if analyst not in self._analysts:
  33. self._analysts.append(analyst)
  34. def detach(self, analyst: Analyst):
  35. try:
  36. self._analysts.remove(analyst)
  37. except ValueError:
  38. pass
  39. def turbineNotify(self,
  40. dataFrameOfTurbine: pd.DataFrame,
  41. confData: ConfBusiness,
  42. turbineName):
  43. for analyst in self._analysts:
  44. outputAnalysisDir = analyst.getOutputAnalysisDir()
  45. outputFilePath = r"{}/{}_{}.csv".format(
  46. outputAnalysisDir, turbineName, analyst.typeAnalyst())
  47. analyst.analysisOfTurbine(
  48. dataFrameOfTurbine, outputAnalysisDir, outputFilePath, confData, turbineName)
  49. def turbinesNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
  50. for analyst in self._analysts:
  51. outputAnalysisDir = analyst.getOutputAnalysisDir()
  52. analyst.analysisOfTurbines(
  53. dataFrameOfTurbines, outputAnalysisDir, confData)
  54. def baseAnalystTurbineNotify(self,
  55. dataFrameOfTurbine: pd.DataFrame,
  56. confData: ConfBusiness,
  57. turbineName):
  58. for analyst in self._baseAnalysts:
  59. outputAnalysisDir = analyst.getOutputAnalysisDir()
  60. outputFilePath = r"{}/{}_{}.csv".format(
  61. outputAnalysisDir, turbineName, analyst.typeAnalyst())
  62. analyst.analysisOfTurbine(
  63. dataFrameOfTurbine, outputAnalysisDir, outputFilePath, confData, turbineName)
  64. def baseAnalystNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
  65. for analyst in self._baseAnalysts:
  66. outputAnalysisDir = analyst.getOutputAnalysisDir()
  67. analyst.analysisOfTurbines(
  68. dataFrameOfTurbines, outputAnalysisDir, confData)
  69. def filterCustom(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
  70. if not self.common.isNone(confData.field_wind_speed) and self.node_speed_wind_cut_in in confData.filter \
  71. and not self.common.isNone(confData.filter[self.node_speed_wind_cut_in]) \
  72. and not self.common.isNone(confData.field_wind_speed) and self.node_speed_wind_cut_out in confData.filter \
  73. and not self.common.isNone(confData.filter[self.node_speed_wind_cut_out]):
  74. windSpeedCutIn = float(
  75. confData.filter[self.node_speed_wind_cut_in])
  76. windSpeedCutOut = float(
  77. confData.filter[self.node_speed_wind_cut_out])
  78. dataFrame = dataFrame[~((dataFrame[confData.field_wind_speed] > windSpeedCutOut) | (
  79. dataFrame[confData.field_wind_speed] < 0))]
  80. dataFrame = dataFrame[~((dataFrame[confData.field_wind_speed] < windSpeedCutIn) & (
  81. dataFrame[confData.field_power] < confData.rated_power*1.2))]
  82. dataFrame = dataFrame[(
  83. dataFrame[confData.field_power] <= confData.rated_power*1.2)]
  84. # Filter rows where turbine state
  85. if not self.common.isNone(confData.field_turbine_state) and self.node_filter_value_state_turbine in confData.filter and not self.common.isNone(confData.filter[self.node_filter_value_state_turbine]):
  86. stateTurbine = confData.filter[self.node_filter_value_state_turbine]
  87. dataFrame = dataFrame[dataFrame[confData.field_turbine_state].isin(
  88. stateTurbine)]
  89. # # Filter rows where pitch
  90. # if not self.common.isNone(confData.field_pitch_angle1) and self.node_angle_pitch_min in confData.filter and not self.common.isNone(confData.filter[self.node_angle_pitch_min]):
  91. # anglePitchMin = float(confData.filter[self.node_angle_pitch_min])
  92. # dataFrame = dataFrame[(
  93. # dataFrame[confData.field_pitch_angle1] >= anglePitchMin)]
  94. # if not self.common.isNone(confData.field_pitch_angle1) and self.node_angle_pitch_max in confData.filter and not self.common.isNone(confData.filter[self.node_angle_pitch_max]):
  95. # anglePitchMax = float(confData.filter[self.node_angle_pitch_max])
  96. # dataFrame = dataFrame[(
  97. # dataFrame[confData.field_pitch_angle1] <= anglePitchMax)]
  98. # # Filter rows where wind speed
  99. # if not self.common.isNone(confData.field_wind_speed) and self.node_speed_wind_cut_in in confData.filter and not self.common.isNone(confData.filter[self.node_speed_wind_cut_in]):
  100. # windSpeedCutIn = float(confData.filter[self.node_speed_wind_cut_in])
  101. # dataFrame = dataFrame[(
  102. # dataFrame[confData.field_wind_speed] >= windSpeedCutIn)]
  103. # if not self.common.isNone(confData.field_wind_speed) and self.node_speed_wind_cut_out in confData.filter and not self.common.isNone(confData.filter[self.node_speed_wind_cut_out]):
  104. # windSpeedCutOut = float(confData.filter[self.node_speed_wind_cut_out])
  105. # dataFrame = dataFrame[(
  106. # dataFrame[confData.field_wind_speed] < windSpeedCutOut)]
  107. # # Filter rows where power
  108. # if not self.common.isNone(confData.field_power) and self.node_active_power_min in confData.filter and not self.common.isNone(confData.filter[self.node_active_power_min]):
  109. # activePowerMin = float(confData.filter[self.node_active_power_min])
  110. # dataFrame = dataFrame[(
  111. # dataFrame[confData.field_power] >= activePowerMin)]
  112. # if not self.common.isNone(confData.field_power) and self.node_active_power_max in confData.filter and not self.common.isNone(confData.filter[self.node_active_power_max]):
  113. # activePowerMax = float(confData.filter[self.node_active_power_max])
  114. # dataFrame = dataFrame[(
  115. # dataFrame[confData.field_power] < activePowerMax)]
  116. return dataFrame
  117. def filterData(self, dataFrame: pd.DataFrame, confData: ConfBusiness, dataFilter, rotationalSpeedRatio):
  118. # dataFrame = dataFrame.dropna(axis=0,subset=[confData.field_power,confData.field_wind_speed,confData.field_pitch_angle1])
  119. dataFrame = dataFrame.dropna(
  120. axis=0, subset=self.getUseColumns(confData))
  121. if confData.field_wind_speed in dataFrame.columns:
  122. dataFrame[confData.field_wind_speed] = dataFrame[confData.field_wind_speed].astype('float32')
  123. if confData.field_wind_dir in dataFrame.columns:
  124. dataFrame[confData.field_wind_dir]=dataFrame[confData.field_wind_dir].astype('float32')
  125. if confData.field_angle_included in dataFrame.columns:
  126. dataFrame[confData.field_angle_included]=dataFrame[confData.field_angle_included].astype('float32')
  127. if confData.field_power in dataFrame.columns:
  128. dataFrame[confData.field_power]=dataFrame[confData.field_power].astype('float32')
  129. if confData.field_wind_dir in dataFrame.columns:
  130. dataFrame[confData.field_wind_dir]=dataFrame[confData.field_wind_dir].astype('float32')
  131. if confData.field_pitch_angle1 in dataFrame.columns:
  132. dataFrame[confData.field_pitch_angle1]=dataFrame[confData.field_pitch_angle1].astype('float32')
  133. if confData.field_gen_speed in dataFrame.columns:
  134. dataFrame[confData.field_gen_speed]=dataFrame[confData.field_gen_speed].astype('float32')
  135. if confData.field_rotor_speed in dataFrame.columns:
  136. dataFrame[confData.field_rotor_speed]=dataFrame[confData.field_rotor_speed].astype('float32')
  137. dataFrame = dataFrame[(dataFrame[confData.field_turbine_time] >= confData.start_time) & (
  138. dataFrame[confData.field_turbine_time] < confData.end_time)]
  139. if not self.common.isNone(confData.excludingMonths) and len(confData.excludingMonths) > 0:
  140. # 给定的日期列表
  141. date_strings = []
  142. for month in confData.excludingMonths:
  143. if not self.common.isNone(month):
  144. date_strings.append(month)
  145. if len(date_strings) > 0:
  146. mask = ~dataFrame["year-month"].isin(date_strings)
  147. # 使用掩码过滤DataFrame,删除指定日期的行
  148. dataFrame = dataFrame[mask]
  149. dataFrame["年月"] = pd.to_datetime(
  150. dataFrame[confData.field_turbine_time], format="%Y-%m")
  151. dataFrame['日期'] = pd.to_datetime(
  152. dataFrame[confData.field_turbine_time], format="%Y-%m")
  153. dataFrame['monthIntTime'] = dataFrame['日期'].apply(
  154. lambda x: x.timestamp())
  155. dataFrame["year-month"] = dataFrame[confData.field_turbine_time].dt.strftime(
  156. '%Y-%m')
  157. return dataFrame
  158. def calculateAngleIncluded(self, array1, array2):
  159. """
  160. 计算两个相同长度角度数组中两两对应角度值的偏差。
  161. 结果限制在-90°到+90°之间,并保留两位小数。
  162. 参数:
  163. array1 (list): 第一个角度数组
  164. array2 (list): 第二个角度数组
  165. 返回:
  166. list: 两两对应角度的偏差列表
  167. """
  168. deviations = []
  169. for angle1, angle2 in zip(array1, array2):
  170. # 计算原始偏差
  171. deviation = angle1 - angle2
  172. # 调整偏差,使其位于-180°到+180°范围内
  173. if deviation == 0.0:
  174. deviation = 0.0
  175. else:
  176. deviation = (deviation + 180) % 360 - 180
  177. # 将偏差限制在-90°到+90°范围内
  178. if deviation > 90:
  179. deviation -= 180
  180. elif deviation < -90:
  181. deviation += 180
  182. # 保留两位小数
  183. deviations.append(round(deviation, 2))
  184. return deviations
  185. def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
  186. """
  187. 依据机舱位置(角度)、风向计算两者夹角
  188. """
  189. if fieldAngleIncluded not in dataFrame.columns and fieldWindDirect in dataFrame.columns and fieldNacellePos in dataFrame.columns:
  190. dataFrame[Field_AngleIncluded] = self.calculateAngleIncluded()
  191. else:
  192. dataFrame[Field_AngleIncluded] = dataFrame[fieldAngleIncluded]
  193. def recalculationOfGeneratorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
  194. """
  195. 风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
  196. """
  197. if fieldGeneratorSpeed in dataFrame.columns:
  198. dataFrame[Field_GeneratorSpeed] = dataFrame[fieldGeneratorSpeed]
  199. if fieldGeneratorSpeed not in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
  200. dataFrame[fieldGeneratorSpeed] = rotationalSpeedRatio * \
  201. dataFrame[fieldRotorSpeed]
  202. def recalculationOfRotorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
  203. """
  204. 风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
  205. """
  206. if fieldRotorSpeed not in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
  207. dataFrame[fieldRotorSpeed] = dataFrame[fieldGeneratorSpeed] / \
  208. rotationalSpeedRatio
  209. def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
  210. """
  211. 风电机组发电机转矩计算,P的单位换成KW转矩计算公式:
  212. P*1000= pi/30*T*n
  213. 30000/pi*P=T*n
  214. 30000/3.1415926*P=T*n
  215. 9549.297*p=T*n
  216. 其中:n为发电机转速,p为有功功率,T为转矩
  217. """
  218. if fieldGeneratorTorque not in dataFrame.columns and fieldActivePower in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
  219. dataFrame[fieldGeneratorTorque] = 9549.297 * \
  220. dataFrame[fieldActivePower]/dataFrame[fieldGeneratorSpeed]
  221. dataFrame[Field_GeneratorTorque] = dataFrame[fieldGeneratorTorque]
  222. def getUseColumns(self, confData: ConfBusiness):
  223. useColumns = []
  224. if not self.common.isNone(confData.field_turbine_time):
  225. useColumns.append(confData.field_turbine_time)
  226. if not self.common.isNone(confData.field_turbine_name):
  227. useColumns.append(confData.field_turbine_name)
  228. if not self.common.isNone(confData.field_wind_speed):
  229. useColumns.append(confData.field_wind_speed)
  230. if not self.common.isNone(confData.field_power):
  231. useColumns.append(confData.field_power)
  232. if not self.common.isNone(confData.field_pitch_angle1):
  233. useColumns.append(confData.field_pitch_angle1)
  234. if not self.common.isNone(confData.field_rotor_speed):
  235. useColumns.append(confData.field_rotor_speed)
  236. if not self.common.isNone(confData.field_gen_speed):
  237. useColumns.append(confData.field_gen_speed)
  238. if not self.common.isNone(confData.field_torque):
  239. useColumns.append(confData.field_torque)
  240. if not self.common.isNone(confData.field_wind_dir):
  241. useColumns.append(confData.field_wind_dir)
  242. if not self.common.isNone(confData.field_angle_included):
  243. useColumns.append(confData.field_angle_included)
  244. if not self.common.isNone(confData.field_nacelle_pos):
  245. useColumns.append(confData.field_nacelle_pos)
  246. if not self.common.isNone(confData.field_env_temp):
  247. useColumns.append(confData.field_env_temp)
  248. if not self.common.isNone(confData.field_nacelle_temp):
  249. useColumns.append(confData.field_nacelle_temp)
  250. if not self.common.isNone(confData.field_turbine_state):
  251. useColumns.append(confData.field_turbine_state)
  252. if not self.common.isNone(confData.field_temperature_large_components):
  253. temperature_cols = confData.field_temperature_large_components.split(
  254. ',')
  255. for temperatureColumn in temperature_cols:
  256. useColumns.append(temperatureColumn)
  257. print(useColumns)
  258. return useColumns
  259. def loadData(self, csvFilePath, skip, confData: ConfBusiness, turbineName):
  260. # Load the CSV, skipping the specified initial rows
  261. dataFrame = pd.read_csv(csvFilePath, header=0,usecols=self.getUseColumns(
  262. confData), skiprows=range(1, skip+1))
  263. dataFrame[Field_NameOfTurbine] = confData.add_W_if_starts_with_digit(
  264. turbineName)
  265. # 选择所有的数值型列
  266. dataFrame = dataFrame.convert_dtypes()
  267. numeric_cols = dataFrame.select_dtypes(
  268. include=['float64', 'float16']).columns
  269. # 将这些列转换为float32
  270. dataFrame[numeric_cols] = dataFrame[numeric_cols].astype('float32')
  271. # 首先尝试去除字符串前后的空白
  272. dataFrame[confData.field_turbine_time] = dataFrame[confData.field_turbine_time].str.strip()
  273. dataFrame[confData.field_turbine_time] = pd.to_datetime(
  274. dataFrame[confData.field_turbine_time],format='%Y-%m-%d %H:%M:%S',errors="coerce")
  275. dataFrame[confData.field_turbine_time] = dataFrame[confData.field_turbine_time].dt.strftime(
  276. '%Y-%m-%d %H:%M:%S')
  277. dataFrame[confData.field_turbine_time] = pd.to_datetime(
  278. dataFrame[confData.field_turbine_time])
  279. # 对除了“时间”列之外的所有列进行自下而上的填充(先反转后填充)
  280. # 注意:补植须要考虑业务合理性
  281. # dataFrame = dataFrame.fillna(method='ffill')
  282. # dataFrame = dataFrame.fillna(method='bfill')
  283. return dataFrame
  284. def execute(self, confData: ConfBusiness):
  285. rotationalSpeedRatio = confData.rotational_Speed_Ratio # 转速比
  286. field_Rotor_Speed = confData.field_rotor_speed # 字段 '主轴转速'
  287. field_Generator_Speed = confData.field_gen_speed # 字段 '发电机转速'
  288. field_Torque = confData.field_torque # 字段 "实际扭矩"
  289. field_Active_Power = confData.field_power # 字段 '有功功率'
  290. dataFilter = confData.filter
  291. # r'E:/BaiduNetdiskDownload/min_scada_TangZhen'
  292. csvFileDir = confData.input_path
  293. csvFileNameSplitStringForTurbine = confData.csvFileNameSplitStringForTurbine # '.csv'
  294. indexTurbine = confData.index_turbine
  295. outputRootDir = confData.output_path # r'output'
  296. # Example usage:
  297. outputDataAfterFilteringDir = r"{}/{}".format(
  298. outputRootDir, "DataAfterFiltering")
  299. dir.create_directory(outputDataAfterFilteringDir)
  300. dataFrameMergeFilter = pd.DataFrame()
  301. for rootDir, subDirs, files in dir.list_directory(csvFileDir):
  302. files = sorted(files)
  303. for file in files:
  304. if not file.endswith(".csv"):
  305. continue
  306. csvFilePath = os.path.join(rootDir, file)
  307. print(csvFilePath)
  308. turbineName = confData.add_W_if_starts_with_digit(file.split(csvFileNameSplitStringForTurbine)[
  309. indexTurbine])
  310. dataFrameFilter = self.loadData(
  311. csvFilePath, confData.skip_row_number, confData, turbineName)
  312. dataFrameFilter = self.filterData(
  313. dataFrameFilter, confData, dataFilter, rotationalSpeedRatio)
  314. self.baseAnalystTurbineNotify(dataFrameFilter,
  315. confData,
  316. turbineName)
  317. dataFrameFilter = self.filterCustom(dataFrameFilter, confData)
  318. dataFrameFilter = self.filterForMerge(
  319. dataFrameFilter, confData)
  320. if len(dataFrameFilter) <= 0:
  321. print("dataFrameFilter not data.")
  322. continue
  323. self.recalculationOfGeneratorSpeed(
  324. dataFrameFilter, field_Rotor_Speed, field_Generator_Speed, rotationalSpeedRatio)
  325. self.recalculationOfRotorSpeed(
  326. dataFrameFilter, field_Rotor_Speed, field_Generator_Speed, rotationalSpeedRatio)
  327. self.recalculationOfRotorTorque(
  328. dataFrameFilter, field_Torque, field_Active_Power, field_Generator_Speed)
  329. self.recalculationOfIncludedAngle(
  330. dataFrameFilter, confData.field_angle_included, confData.field_wind_dir, confData.field_nacelle_pos)
  331. dataFrameMergeFilter = pd.concat(
  332. [dataFrameMergeFilter, dataFrameFilter], axis=0, sort=False)
  333. # dataFrameFilter.to_csv(os.path.join(
  334. # outputDataAfterFilteringDir, "{}.csv".format(turbineName)), index=False)
  335. dataFrameFilter = self.turbineNotify(dataFrameFilter,
  336. confData,
  337. turbineName)
  338. self.baseAnalystNotify(dataFrameMergeFilter, confData)
  339. self.turbinesNotify(dataFrameMergeFilter, confData)
  340. def filterForMerge(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
  341. if not self.common.isNone(confData.field_power) and self.node_active_power_max in confData.filter and not self.common.isNone(confData.filter[self.node_active_power_max]) \
  342. and not self.common.isNone(confData.field_pitch_angle1) and self.node_angle_pitch_min in confData.filter and not self.common.isNone(confData.filter[self.node_angle_pitch_min]):
  343. activePowerMax = float(confData.filter[self.node_active_power_max])
  344. anglePitchMin = float(confData.filter[self.node_angle_pitch_min])
  345. dataFrame = dataFrame[~((dataFrame[confData.field_power] < activePowerMax*0.9) & (
  346. dataFrame[confData.field_pitch_angle1] > anglePitchMin))]
  347. return dataFrame