dataProcessor.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. import os
  2. from datetime import datetime
  3. import concurrent.futures
  4. import numpy as np
  5. import pandas as pd
  6. from utils.directoryUtil import DirectoryUtil as dir
  7. from algorithmContract.confBusiness import *
  8. from behavior.baseAnalyst import BaseAnalyst
  9. from behavior.analyst import Analyst
  10. from common.commonBusiness import CommonBusiness
  11. from algorithm.dataMarker import DataMarker
  12. class DataProcessor:
  13. def __init__(self):
  14. self.common = CommonBusiness()
  15. self._baseAnalysts = []
  16. self._analysts = []
  17. def attachBaseAnalyst(self, analyst: BaseAnalyst):
  18. if analyst not in self._analysts:
  19. self._baseAnalysts.append(analyst)
  20. def detachBaseAnalyst(self, analyst: BaseAnalyst):
  21. try:
  22. self._baseAnalysts.remove(analyst)
  23. except ValueError:
  24. pass
  25. def attach(self, analyst: Analyst):
  26. if analyst not in self._analysts:
  27. self._analysts.append(analyst)
  28. def detach(self, analyst: Analyst):
  29. try:
  30. self._analysts.remove(analyst)
  31. except ValueError:
  32. pass
  33. def turbineNotify(self,
  34. dataFrameOfTurbine: pd.DataFrame,
  35. confData: ConfBusiness,
  36. turbineName):
  37. for analyst in self._analysts:
  38. outputAnalysisDir = analyst.getOutputAnalysisDir()
  39. outputFilePath = r"{}/{}{}".format(
  40. outputAnalysisDir, turbineName, CSVSuffix)
  41. analyst.analysisOfTurbine(
  42. dataFrameOfTurbine, outputAnalysisDir, outputFilePath, confData, turbineName)
  43. def turbinesNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
  44. for analyst in self._analysts:
  45. outputAnalysisDir = analyst.getOutputAnalysisDir()
  46. analyst.analysisOfTurbines(
  47. dataFrameOfTurbines, outputAnalysisDir, confData)
  48. def baseAnalystTurbineNotify(self,
  49. dataFrameOfTurbine: pd.DataFrame,
  50. confData: ConfBusiness,
  51. turbineName):
  52. for analyst in self._baseAnalysts:
  53. outputAnalysisDir = analyst.getOutputAnalysisDir()
  54. outputFilePath = r"{}/{}{}".format(
  55. outputAnalysisDir, turbineName, CSVSuffix)
  56. analyst.analysisOfTurbine(
  57. dataFrameOfTurbine, outputAnalysisDir, outputFilePath, confData, turbineName)
  58. def baseAnalystNotify(self, dataFrameOfTurbines: pd.DataFrame, confData: ConfBusiness):
  59. for analyst in self._baseAnalysts:
  60. outputAnalysisDir = analyst.getOutputAnalysisDir()
  61. analyst.analysisOfTurbines(
  62. dataFrameOfTurbines, outputAnalysisDir, confData)
  63. def calculateAngleIncluded(self, array1, array2):
  64. """
  65. 计算两个相同长度角度数组中两两对应角度值的偏差。
  66. 结果限制在-90°到+90°之间,并保留两位小数。
  67. 参数:
  68. array1 (list): 第一个角度数组
  69. array2 (list): 第二个角度数组
  70. 返回:
  71. list: 两两对应角度的偏差列表
  72. """
  73. deviations = []
  74. for angle1, angle2 in zip(array1, array2):
  75. # 计算原始偏差
  76. deviation = angle1 - angle2
  77. # 调整偏差,使其位于-180°到+180°范围内
  78. if deviation == 0.0:
  79. deviation = 0.0
  80. else:
  81. deviation = (deviation + 180) % 360 - 180
  82. # 将偏差限制在-90°到+90°范围内
  83. if deviation > 90:
  84. deviation -= 180
  85. elif deviation < -90:
  86. deviation += 180
  87. # 保留两位小数
  88. deviations.append(round(deviation, 2))
  89. return deviations
  90. def recalculationOfIncludedAngle(self, dataFrame: pd.DataFrame, fieldAngleIncluded, fieldWindDirect, fieldNacellePos):
  91. """
  92. 依据机舱位置(角度)、风向计算两者夹角
  93. """
  94. if not self.common.isNone(fieldAngleIncluded) and fieldAngleIncluded in dataFrame.columns:
  95. dataFrame[Field_AngleIncluded] = dataFrame[fieldAngleIncluded]
  96. if self.common.isNone(fieldAngleIncluded) and fieldAngleIncluded not in dataFrame.columns and fieldWindDirect in dataFrame.columns and fieldNacellePos in dataFrame.columns:
  97. dataFrame[Field_AngleIncluded] = self.calculateAngleIncluded(
  98. dataFrame[fieldNacellePos], dataFrame[fieldWindDirect])
  99. def recalculationOfGeneratorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
  100. """
  101. 风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
  102. """
  103. if fieldGeneratorSpeed in dataFrame.columns:
  104. dataFrame[Field_GeneratorSpeed] = dataFrame[fieldGeneratorSpeed]
  105. if fieldGeneratorSpeed not in dataFrame.columns and fieldRotorSpeed in dataFrame.columns:
  106. dataFrame[fieldGeneratorSpeed] = rotationalSpeedRatio * \
  107. dataFrame[fieldRotorSpeed]
  108. def recalculationOfRotorSpeed(self, dataFrame: pd.DataFrame, fieldRotorSpeed, fieldGeneratorSpeed, rotationalSpeedRatio):
  109. """
  110. 风电机组发电机转速再计算,公式:转速比=发电机转速/叶轮或主轴转速
  111. """
  112. if fieldRotorSpeed not in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
  113. dataFrame[fieldRotorSpeed] = dataFrame[fieldGeneratorSpeed] / \
  114. rotationalSpeedRatio
  115. if not self.common.isNone(fieldRotorSpeed) and fieldRotorSpeed in dataFrame.columns:
  116. dataFrame[Field_RotorSpeed] = dataFrame[fieldRotorSpeed]
  117. def recalculationOfRotorTorque(self, dataFrame: pd.DataFrame, fieldGeneratorTorque, fieldActivePower, fieldGeneratorSpeed):
  118. """
  119. 风电机组发电机转矩计算,P的单位换成KW转矩计算公式:
  120. P*1000= pi/30*T*n
  121. 30000/pi*P=T*n
  122. 30000/3.1415926*P=T*n
  123. 9549.297*p=T*n
  124. 其中:n为发电机转速,p为有功功率,T为转矩
  125. """
  126. if self.common.isNone(fieldGeneratorTorque) and fieldActivePower in dataFrame.columns and fieldGeneratorSpeed in dataFrame.columns:
  127. dataFrame[Field_GeneratorTorque] = 9549.297 * \
  128. dataFrame[fieldActivePower]/dataFrame[fieldGeneratorSpeed]
  129. if fieldGeneratorTorque in dataFrame.columns:
  130. dataFrame[Field_GeneratorTorque] = dataFrame[fieldGeneratorTorque]
  131. def recalculation(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
  132. """
  133. 再计算数据测点
  134. 参数:
  135. dataFrame 原始数据
  136. confData 配置数据
  137. """
  138. self.recalculationOfGeneratorSpeed(
  139. dataFrame, confData.field_rotor_speed, confData.field_gen_speed, confData.rotational_Speed_Ratio)
  140. self.recalculationOfRotorSpeed(
  141. dataFrame, confData.field_rotor_speed, confData.field_gen_speed, confData.rotational_Speed_Ratio)
  142. self.recalculationOfRotorTorque(
  143. dataFrame, confData.field_torque, confData.field_power, confData.field_gen_speed)
  144. self.recalculationOfIncludedAngle(
  145. dataFrame, confData.field_angle_included, confData.field_wind_dir, confData.field_nacelle_pos)
  146. self.common.calculateTSR(dataFrame, confData)
  147. self.common.calculateCp(dataFrame, confData)
  148. def filterWithDateTime(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
  149. dataFrame = dataFrame[(dataFrame[confData.field_turbine_time] >= confData.start_time) & (
  150. dataFrame[confData.field_turbine_time] < confData.end_time)]
  151. return dataFrame
  152. def processDateTime(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
  153. dataFrame["年月"] = pd.to_datetime(
  154. dataFrame[confData.field_turbine_time], format="%Y-%m")
  155. dataFrame['日期'] = pd.to_datetime(
  156. dataFrame[confData.field_turbine_time], format="%Y-%m")
  157. dataFrame['monthIntTime'] = dataFrame['日期'].apply(
  158. lambda x: x.timestamp())
  159. dataFrame[Field_YearMonth] = dataFrame[confData.field_turbine_time].dt.strftime(
  160. '%Y-%m')
  161. dataFrame[Field_YearMonthDay] = dataFrame[confData.field_turbine_time].dt.strftime(
  162. '%Y-%m-%d')
  163. if not self.common.isNone(confData.excludingMonths) and len(confData.excludingMonths) > 0:
  164. # 给定的日期列表
  165. date_strings = []
  166. for month in confData.excludingMonths:
  167. if not self.common.isNone(month):
  168. date_strings.append(month)
  169. if len(date_strings) > 0:
  170. mask = ~dataFrame[Field_YearMonth].isin(date_strings)
  171. # 使用掩码过滤DataFrame,删除指定日期的行
  172. dataFrame = dataFrame[mask]
  173. return dataFrame
  174. def setColumnDataType(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
  175. # 选择所有的数值型列
  176. dataFrame = dataFrame.convert_dtypes()
  177. numeric_cols = dataFrame.select_dtypes(
  178. include=['float64', 'float16']).columns
  179. # 将这些列转换为float32
  180. dataFrame[Field_NameOfTurbine] = dataFrame[Field_NameOfTurbine].astype(str)
  181. # 将这些列转换为float32
  182. dataFrame[numeric_cols] = dataFrame[numeric_cols].astype(
  183. self.common.getFloat32())
  184. if not self.common.isNone(confData.field_turbine_time) and confData.field_turbine_time in dataFrame.columns:
  185. # 首先尝试去除字符串前后的空白
  186. dataFrame[confData.field_turbine_time] = dataFrame[confData.field_turbine_time].str.strip()
  187. dataFrame[confData.field_turbine_time] = pd.to_datetime(
  188. dataFrame[confData.field_turbine_time], format='%Y-%m-%d %H:%M:%S', errors="coerce")
  189. dataFrame[confData.field_turbine_time] = dataFrame[confData.field_turbine_time].dt.strftime(
  190. '%Y-%m-%d %H:%M:%S')
  191. dataFrame[confData.field_turbine_time] = pd.to_datetime(
  192. dataFrame[confData.field_turbine_time])
  193. # 删除时间字段为空的行记录
  194. dataFrame.dropna(
  195. axis=0, subset=[confData.field_turbine_time], inplace=True)
  196. if confData.field_wind_speed in dataFrame.columns:
  197. dataFrame[confData.field_wind_speed] = dataFrame[confData.field_wind_speed].astype(
  198. self.common.getFloat32())
  199. if confData.field_wind_dir in dataFrame.columns:
  200. dataFrame[confData.field_wind_dir] = dataFrame[confData.field_wind_dir].astype(
  201. self.common.getFloat32())
  202. if confData.field_angle_included in dataFrame.columns:
  203. dataFrame[confData.field_angle_included] = dataFrame[confData.field_angle_included].astype(
  204. self.common.getFloat32())
  205. if confData.field_power in dataFrame.columns:
  206. dataFrame[confData.field_power] = dataFrame[confData.field_power].astype(
  207. self.common.getFloat32())
  208. if confData.field_wind_dir in dataFrame.columns:
  209. dataFrame[confData.field_wind_dir] = dataFrame[confData.field_wind_dir].astype(
  210. self.common.getFloat32())
  211. if confData.field_pitch_angle1 in dataFrame.columns:
  212. dataFrame[confData.field_pitch_angle1] = dataFrame[confData.field_pitch_angle1].astype(
  213. self.common.getFloat32())
  214. if confData.field_pitch_angle2 in dataFrame.columns:
  215. dataFrame[confData.field_pitch_angle2] = dataFrame[confData.field_pitch_angle2].astype(
  216. self.common.getFloat32())
  217. if confData.field_pitch_angle3 in dataFrame.columns:
  218. dataFrame[confData.field_pitch_angle3] = dataFrame[confData.field_pitch_angle3].astype(
  219. self.common.getFloat32())
  220. if confData.field_gen_speed in dataFrame.columns:
  221. dataFrame[confData.field_gen_speed] = dataFrame[confData.field_gen_speed].astype(
  222. self.common.getFloat32())
  223. if confData.field_rotor_speed in dataFrame.columns:
  224. dataFrame[confData.field_rotor_speed] = dataFrame[confData.field_rotor_speed].astype(
  225. self.common.getFloat32())
  226. if confData.field_Cabin_Vibrate_X in dataFrame.columns:
  227. dataFrame[confData.field_Cabin_Vibrate_X] = dataFrame[confData.field_Cabin_Vibrate_X].astype(
  228. self.common.getFloat32())
  229. if confData.field_Cabin_Vibrate_Y in dataFrame.columns:
  230. dataFrame[confData.field_Cabin_Vibrate_Y] = dataFrame[confData.field_Cabin_Vibrate_Y].astype(
  231. self.common.getFloat32())
  232. if confData.field_activePowerSet in dataFrame.columns:
  233. dataFrame[confData.field_activePowerSet] = dataFrame[confData.field_activePowerSet].astype(
  234. self.common.getFloat32())
  235. if confData.field_activePowerAvailable in dataFrame.columns:
  236. dataFrame[confData.field_activePowerAvailable] = dataFrame[confData.field_activePowerAvailable].astype(
  237. self.common.getFloat32())
  238. return dataFrame
  239. def loadData(self, csvFilePath, confData: ConfBusiness, turbineName):
  240. useColumns = self.common.getUseColumns(confData)
  241. # Load the CSV, skipping the specified initial rows
  242. dataFrame = pd.read_csv(csvFilePath, header=0, usecols=useColumns, skiprows=range(
  243. 1, confData.skip_row_number+1))
  244. if not self.common.isNone(confData.field_turbine_name) and confData.field_turbine_name in dataFrame.columns:
  245. dataFrame[Field_NameOfTurbine] = dataFrame[confData.field_turbine_name]
  246. else:
  247. dataFrame[Field_NameOfTurbine] = turbineName
  248. # 对除了“时间”列之外的所有列进行自下而上的填充(先反转后填充)
  249. # 注意:补植须要考虑业务合理性
  250. # dataFrame = dataFrame.fillna(method='ffill')
  251. # dataFrame = dataFrame.fillna(method='bfill')
  252. return dataFrame
  253. def execute(self, confData: ConfBusiness):
  254. outputDataAfterFilteringDir = r"{}/{}".format(
  255. confData.output_path, "DataAfterFiltering")
  256. dir.create_directory(outputDataAfterFilteringDir)
  257. labler = DataMarker() #类的实例化
  258. dataFrameMerge = pd.DataFrame()
  259. for rootDir, subDirs, files in dir.list_directory(confData.input_path):
  260. files = sorted(files)
  261. for file in files:
  262. if not file.endswith(CSVSuffix):
  263. continue
  264. csvFilePath = os.path.join(rootDir, file)
  265. print(f"current csv file path: {csvFilePath}")
  266. turbineName = confData.add_W_if_starts_with_digit(file.split(confData.csvFileNameSplitStringForTurbine)[
  267. confData.index_turbine])
  268. dataFrame = self.loadData(
  269. csvFilePath, confData, turbineName)
  270. turbineName = dataFrame[Field_NameOfTurbine].loc[0]
  271. if len(dataFrame) <= 0:
  272. print("dataFrameFilter not data.")
  273. continue
  274. dataFrame = self.setColumnDataType(
  275. dataFrame, confData)
  276. dataFrame = self.processDateTime(dataFrame, confData)
  277. dataFrame = self.filterWithDateTime(dataFrame, confData)
  278. # self.baseAnalystTurbineNotify(dataFrame,
  279. # confData,
  280. # turbineName)
  281. self.recalculation(dataFrame, confData)
  282. # dataFrame = labler.main(confData,dataFrame)
  283. dataFrameMerge = pd.concat(
  284. [dataFrameMerge, dataFrame], axis=0, sort=False)
  285. # dataFrame.to_csv(os.path.join(
  286. # outputDataAfterFilteringDir, "{}{}".format(turbineName,CSVSuffix)), index=False)
  287. dataFrame = self.turbineNotify(dataFrame,
  288. confData,
  289. turbineName)
  290. # self.baseAnalystNotify(dataFrameMergeFilter, confData)
  291. self.turbinesNotify(dataFrameMerge, confData)