# dataIntegrityOfSecondAnalyst.py

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from geopy.distance import geodesic
from behavior.analystNotFilter import AnalystNotFilter
from utils.directoryUtil import DirectoryUtil as dir
# NOTE: wildcard import. The imports below deliberately stay after it so that
# nothing exported by confBusiness can shadow them.
from algorithmContract.confBusiness import *
import calendar
import json
import random
from datetime import datetime
from algorithmContract.contract import Contract
  16. class DataIntegrityOfSecondAnalyst(AnalystNotFilter):
  17. """
  18. 风电机组秒级数据完整度分析
  19. """
  20. def typeAnalyst(self):
  21. return "data_integrity_second"
  22. def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
  23. select = [Field_DeviceCode, Field_Time, Field_ActiverPower, Field_WindSpeed, Field_NacPos, Field_WindDirection, Field_RotorSpeed, Field_GeneratorSpeed, Field_GeneratorTorque, Field_AngleIncluded, Field_EnvTemp, Field_NacTemp, Field_PitchAngel1, Field_PitchAngel2, Field_PitchAngel3]
  24. dictionary = self.processTurbineData(turbineCodes, conf, select)
  25. dataFrameOfTurbines = self.userDataFrame(
  26. dictionary, conf.dataContract.configAnalysis, self)
  27. turbrineInfos = self.common.getTurbineInfos(
  28. conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
  29. currDataFrameOfTurbines = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine].isin(
  30. turbineCodes)]
  31. # 将 currTurbineInfos 转换为字典
  32. currTurbineInfosDict = turbrineInfos.set_index(
  33. Field_CodeOfTurbine)[Field_NameOfTurbine].to_dict()
  34. # 使用 map 函数来填充 Field_NameOfTurbine 列
  35. currDataFrameOfTurbines[Field_NameOfTurbine] = currDataFrameOfTurbines[Field_CodeOfTurbine].map(
  36. currTurbineInfosDict).fillna("")
  37. groupedDataFrame = self.dataIntegrityByMonth(
  38. dataFrameOfTurbines, conf, Field_NameOfTurbine)
  39. print("groupedDataFrame : \n {}".format(groupedDataFrame.head()))
  40. return self.plotByAllMonth(groupedDataFrame, outputAnalysisDir, self.powerFarmInfo[Field_PowerFarmName].iloc[0], Field_NameOfTurbine, conf)
  41. def fullMonthIndex(self, start_time, end_time, turbine_name, new_frame):
  42. months = (end_time.year - start_time.year) * \
  43. 12 + end_time.month - start_time.month
  44. month_range = ['%04d-%02d' % (int(start_time.year + mon//12), int(mon % 12+1))
  45. for mon in range(start_time.month-1, start_time.month+months)]
  46. month_index = pd.DataFrame(month_range, columns=[Field_YearMonth])
  47. plot_res = pd.DataFrame()
  48. grouped = new_frame.groupby(turbine_name)
  49. for name, group in grouped:
  50. group = pd.merge(group, month_index,
  51. on=Field_YearMonth, how='outer')
  52. group['数据完整度%'] = group['数据完整度%'].fillna(0)
  53. group[turbine_name] = name
  54. group['year'] = group[Field_YearMonth].apply(
  55. lambda x: str(x).split('-')[0])
  56. group['month'] = group[Field_YearMonth].apply(
  57. lambda x: str(x).split('-')[1])
  58. plot_res = pd.concat([plot_res, group], axis=0, sort=False)
  59. return plot_res
  60. def get_time_space(self,df, time_str):
  61. """
  62. :return: 查询时间间隔(单位:秒)
  63. """
  64. df1 = pd.DataFrame(df[time_str])
  65. df1['chazhi'] = df1[time_str].shift(-1) - df1[time_str]
  66. result = df1.sample(int(df1.shape[0] / 100))['chazhi'].value_counts().idxmax().seconds
  67. del df1
  68. return result
  69. def dataIntegrityByMonth(self, dataFrameMerge: pd.DataFrame, conf: Contract, Field_NameOfTurbine):
  70. grouped = dataFrameMerge.groupby([dataFrameMerge.loc[:, Field_Time].dt.year.rename('year'),
  71. dataFrameMerge.loc[:, Field_Time].dt.month.rename(
  72. 'month'),
  73. dataFrameMerge.loc[:, Field_NameOfTurbine]]).agg({'count'})[Field_Time].rename({'count': '长度'}, axis=1)
  74. new_frame = grouped.reset_index('month')
  75. # timeGranularity = self.dataTransfer[self.dataTransfer[Field_TransferType] == Const_TimeGranularity_Second][Field_TimeGranularity].iloc[0] if self.typeAnalyst(
  76. # ) == "data_integrity_second" else self.dataTransfer[self.dataTransfer[Field_TransferType] == Const_TimeGranularity_Minute][Field_TimeGranularity].iloc[0]
  77. timeGranularity=self.get_time_space(dataFrameMerge,Field_Time)
  78. self.logger.info(f"{self.typeAnalyst()} timeGranularity-->{timeGranularity}")
  79. new_frame = new_frame.reset_index()
  80. new_frame['数据完整度'] = (100 * new_frame['长度'] / (new_frame.apply(lambda row: calendar.monthrange(
  81. row['year'], row['month'])[1] * 24 * 3600 / timeGranularity, axis=1))).round(decimals=0)
  82. new_frame = new_frame.rename(columns={'数据完整度': '数据完整度%'})
  83. new_frame['month'] = new_frame['month'].astype(
  84. str).apply(lambda x: x.zfill(2))
  85. new_frame[Field_YearMonth] = new_frame['year'].astype(
  86. str) + '-' + new_frame['month'].astype(str)
  87. beginTime = None
  88. if not self.common.isNone(conf.dataContract.dataFilter.beginTime):
  89. beginTime = conf.dataContract.dataFilter.beginTime
  90. else:
  91. beginTime = dataFrameMerge[Field_Time].min().strftime(
  92. '%Y-%m-%d %H:%M:%S')
  93. endTime = None
  94. if not self.common.isNone(conf.dataContract.dataFilter.endTime):
  95. endTime = conf.dataContract.dataFilter.endTime
  96. else:
  97. endTime = dataFrameMerge[Field_Time] .max().strftime(
  98. '%Y-%m-%d %H:%M:%S')
  99. beginTime = datetime.strptime(beginTime, '%Y-%m-%d %H:%M:%S')
  100. endTime = datetime.strptime(endTime, '%Y-%m-%d %H:%M:%S')
  101. new_frame = self.fullMonthIndex(
  102. beginTime, endTime, Field_NameOfTurbine, new_frame)
  103. return new_frame
  104. def plotByAllMonth(self, groupedDataFrame, outputAnalysisDir, farmName, fieldTurbineName, conf: Contract):
  105. title = '数据完整度检测(%)'
  106. # 根据场景决定索引和列的方向
  107. if len(set(groupedDataFrame[Field_YearMonth])) > len(set(groupedDataFrame[fieldTurbineName])):
  108. result = groupedDataFrame.pivot(
  109. values="数据完整度%", index=fieldTurbineName, columns=Field_YearMonth)
  110. x_labels = result.columns.tolist() # 月份
  111. y_labels = result.index.tolist() # 风机名
  112. x_axis_title = "日期"
  113. y_axis_title = "机组"
  114. # 构建最终的JSON对象
  115. json_output = {
  116. "analysisTypeCode": "数据完整度检测(%)",
  117. "engineCode": "",
  118. "engineTypeName": "",
  119. "xaixs": "日期",
  120. "yaixs": "机组",
  121. "data": [{
  122. "engineName": "",
  123. "engineCode": "",
  124. "title": f' 数据完整度%',
  125. "xData": x_labels,
  126. "yData": y_labels,
  127. "ZData": result.values.tolist(),
  128. }]
  129. }
  130. else:
  131. result = groupedDataFrame.pivot(
  132. values="数据完整度%", index=Field_YearMonth, columns=fieldTurbineName)
  133. x_labels = result.columns.tolist() # 风机名
  134. y_labels = result.index.tolist() # 月份
  135. x_axis_title = "机组"
  136. y_axis_title = "日期"
  137. # 构建最终的JSON对象
  138. json_output = {
  139. "analysisTypeCode": "数据完整度检测(%)",
  140. "engineCode": "",
  141. "engineTypeName": "",
  142. "xaixs": "机组",
  143. "yaixs": "日期",
  144. "data": [{
  145. "engineName": "",
  146. "engineCode": "",
  147. "title": f' 数据完整度%',
  148. "xData": x_labels,
  149. "yData": y_labels,
  150. "ZData": result.values.tolist(),
  151. }]
  152. }
  153. # # 创建热图
  154. # fig = go.Figure(data=go.Heatmap(
  155. # z=result.values,
  156. # x=x_labels,
  157. # y=y_labels,
  158. # colorscale='Viridis',
  159. # # colorbar=dict(title='数据完整度%'),
  160. # showscale=False, # 显示颜色条
  161. # text=result.values,
  162. # texttemplate="%{text}", # Format the text display inside cells
  163. # # hoverinfo='text'
  164. # ))
  165. # 创建热图
  166. fig = go.Figure(data=go.Heatmap(
  167. z=result.values,
  168. x=x_labels,
  169. y=y_labels,
  170. colorscale=[
  171. [0.0, 'rgb(255, 102, 102)'], # 柔和的红色
  172. [0.5, 'rgb(255, 102, 102)'],
  173. [0.5, 'rgb(255, 255, 153)'], # 柔和的黄色
  174. [0.85, 'rgb(255, 255, 153)'],
  175. [0.85, 'rgb(153, 255, 153)'], # 柔和的绿色
  176. [1.0, 'rgb(153, 255, 153)']
  177. ],
  178. zmin=0, # 设置颜色范围的最小值
  179. zmax=100, # 设置颜色范围的最大值
  180. showscale=True, # 显示颜色条
  181. text=result.values,
  182. texttemplate="%{text}", # Format the text display inside cells
  183. ))
  184. # 更新图形布局
  185. fig.update_layout(
  186. title={'text': title, 'x': 0.5},
  187. # xaxis_nticks=len(x_labels),
  188. xaxis=dict(tickmode='array', tickvals=x_labels,
  189. ticktext=x_labels, tickangle=-45, title=x_axis_title),
  190. yaxis=dict(tickmode='array', tickvals=y_labels,
  191. ticktext=y_labels, title=y_axis_title),
  192. # xaxis=dict(tickmode='array', tickvals=list(range(len(x_labels))), ticktext=x_labels, tickangle=-45, title=x_axis_title),
  193. # yaxis=dict(tickmode='array', tickvals=list(range(len(y_labels))), ticktext=y_labels, title=y_axis_title),
  194. autosize=True,
  195. # width=len(x_labels) * 80, # Adjust width and height as needed
  196. # height=len(y_labels) * 80,
  197. margin=dict(l=50, r=50, b=100, t=100), # 调整边距以确保标签完整显示
  198. # Transparent background to show cell borders
  199. plot_bgcolor='rgba(0,0,0,0)'
  200. )
  201. fig.update_traces(
  202. xgap=1,
  203. ygap=1
  204. )
  205. result_rows = []
  206. # 将JSON对象保存到文件
  207. output_json_path = os.path.join(outputAnalysisDir, f"Data_Integrity_Of_Second_Analyst.json")
  208. with open(output_json_path, 'w', encoding='utf-8') as f:
  209. import json
  210. json.dump(json_output, f, ensure_ascii=False, indent=4)
  211. # 保存图像
  212. pngFileName = f'{farmName}数据完整度分析.png'
  213. pngFilePath = os.path.join(outputAnalysisDir, pngFileName)
  214. fig.write_image(pngFilePath, scale=3)
  215. # 保存HTML
  216. # htmlFileName = f'{farmName}数据完整度分析.html'
  217. # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName)
  218. # fig.write_html(htmlFilePath)
  219. result_rows.append({
  220. Field_Return_TypeAnalyst: self.typeAnalyst(),
  221. Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
  222. Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
  223. Field_CodeOfTurbine: Const_Output_Total,
  224. Field_Return_FilePath: pngFilePath,
  225. Field_Return_IsSaveDatabase: False
  226. })
  227. result_rows.append({
  228. Field_Return_TypeAnalyst: self.typeAnalyst(),
  229. Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
  230. Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
  231. Field_CodeOfTurbine: Const_Output_Total,
  232. Field_MillTypeCode: 'total',
  233. Field_Return_FilePath: output_json_path,
  234. Field_Return_IsSaveDatabase: True
  235. })
  236. result_df = pd.DataFrame(result_rows)
  237. return result_df