| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- import os
- import pandas as pd
- import numpy as np
- import pandas as pd
- import matplotlib.pyplot as plt
- import seaborn as sns
- import plotly.graph_objects as go
- from plotly.subplots import make_subplots
- from geopy.distance import geodesic
- from behavior.analyst import Analyst
- from utils.directoryUtil import DirectoryUtil as dir
- from algorithmContract.confBusiness import *
- import calendar
- import random
class DataIntegrityOfSecondAnalyst(Analyst):
    """Data-completeness analysis for second-level wind turbine data.

    For every turbine and calendar month, the number of recorded timestamps
    is compared with the number of samples a fully populated month would
    contain, and the resulting percentages are rendered as a heatmap.
    """

    def typeAnalyst(self):
        """Return the identifier string of this analysis type."""
        return "data_integrity_second"

    def filterCommon(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Return ``dataFrame`` unchanged; this analysis applies no filtering."""
        return dataFrame

    def turbinesAnalysis(self, dataFrameMerge, outputAnalysisDir, confData: ConfBusiness):
        """Compute monthly completeness per turbine and save the heatmap.

        Args:
            dataFrameMerge: Frame holding the timestamp column named by
                ``confData.field_turbine_time`` and the turbine-name column
                ``Field_NameOfTurbine``.
            outputAnalysisDir: Directory the PNG is written to.
            confData: Configuration; ``farm_name`` is used for the file name.
        """
        groupedDataFrame = self.dataIntegrityByMonth(
            dataFrameMerge, confData, Field_NameOfTurbine)
        print("groupedDataFrame : \n {}".format(groupedDataFrame.head()))
        self.plotByAllMonth(groupedDataFrame, outputAnalysisDir,
                            confData.farm_name, Field_NameOfTurbine)

    def generate_weighted_random(self):
        """Return a random integer weighted towards the range 91..100.

        With probability 0.8 the value is drawn from 91..100, otherwise from
        79..100. ``random.randint`` includes both endpoints, so 100 can be
        returned by either branch. No other method of this class calls it.
        """
        if random.random() < 0.8:
            return random.randint(91, 100)
        return random.randint(79, 100)

    def fullMonthIndex(self, start_time, end_time, turbine_name, new_frame):
        """Pad every turbine with all months between two dates.

        Args:
            start_time: Object exposing ``year`` and ``month``; first month.
            end_time: Object exposing ``year`` and ``month``; last month
                (inclusive).
            turbine_name: Name of the turbine column in ``new_frame``.
            new_frame: Frame with ``Field_YearMonth`` ('YYYY-MM' strings) and
                a '数据完整度%' column.

        Returns:
            The rows of ``new_frame`` plus one row per turbine for each month
            of the range it lacks, with completeness 0. The 'year' and
            'month' columns are rebuilt as strings from ``Field_YearMonth``.
            An empty frame is returned when ``new_frame`` has no groups.
        """
        span = (end_time.year - start_time.year) * 12 + end_time.month - start_time.month
        month_labels = []
        for offset in range(span + 1):
            # Count months from January of the start year so that the year
            # rolls over correctly when the range crosses a year boundary.
            years_over, month_zero = divmod(start_time.month - 1 + offset, 12)
            month_labels.append(
                '%04d-%02d' % (int(start_time.year + years_over), int(month_zero + 1)))
        month_index = pd.DataFrame(month_labels, columns=[Field_YearMonth])

        padded_groups = []
        for name, group in new_frame.groupby(turbine_name):
            # The outer merge keeps existing rows and adds the missing months.
            group = pd.merge(group, month_index, on=Field_YearMonth, how='outer')
            group['数据完整度%'] = group['数据完整度%'].fillna(0)
            group[turbine_name] = name
            year_month = group[Field_YearMonth].astype(str).str.split('-')
            group['year'] = year_month.str[0]
            group['month'] = year_month.str[1]
            padded_groups.append(group)

        if not padded_groups:
            return pd.DataFrame()
        # Concatenate once instead of growing a frame inside the loop.
        return pd.concat(padded_groups, axis=0, sort=False)

    def dataIntegrityByMonth(self, dataFrameMerge: pd.DataFrame, confData: ConfBusiness, fieldTurbineName):
        """Compute the completeness percentage per turbine and month.

        The expected row count of a month is
        ``days_in_month * 24 * 3600 / confData.time_period``, i.e.
        ``time_period`` is treated as the sampling interval in seconds.
        Days per month come from ``calendar.monthrange`` so that February of
        a leap year is counted with 29 days.

        Args:
            dataFrameMerge: Frame with a datetime column named
                ``confData.field_turbine_time`` and the turbine column.
            confData: Configuration providing ``field_turbine_time``,
                ``time_period``, ``start_time`` and ``end_time``.
            fieldTurbineName: Name of the turbine column.

        Returns:
            Frame with the turbine column, 'year', 'month', '长度' (row count),
            '数据完整度%' and ``Field_YearMonth``, padded by ``fullMonthIndex``.
        """
        timestamps = dataFrameMerge[confData.field_turbine_time]
        counts = timestamps.groupby([
            timestamps.dt.year.rename('year'),
            timestamps.dt.month.rename('month'),
            dataFrameMerge[fieldTurbineName],
        ]).count().rename('长度')
        new_frame = counts.reset_index().reindex(
            columns=['year', fieldTurbineName, 'month', '长度'])

        days_in_month = [
            calendar.monthrange(int(year), int(month))[1]
            for year, month in zip(new_frame['year'], new_frame['month'])
        ]
        expected_rows = (
            pd.Series(days_in_month, index=new_frame.index, dtype='float64')
            * 24 * 3600 / confData.time_period
        )
        new_frame['数据完整度%'] = (
            100 * new_frame['长度'] / expected_rows).round(decimals=0)

        new_frame['month'] = new_frame['month'].astype(str).str.zfill(2)
        new_frame[Field_YearMonth] = new_frame['year'].astype(str) + '-' + new_frame['month']

        return self.fullMonthIndex(
            confData.start_time, confData.end_time, fieldTurbineName, new_frame)

    def plotByAllMonth(self, groupedDataFrame, outputAnalysisDir, farmName, fieldTurbineName):
        """Save the completeness heatmap as '<farmName>数据完整度分析.png'.

        The dimension with more distinct values (months or turbines) is put
        on the x axis.

        Args:
            groupedDataFrame: Output of ``dataIntegrityByMonth``.
            outputAnalysisDir: Directory the PNG is written to.
            farmName: Wind farm name used in the file name.
            fieldTurbineName: Name of the turbine column.
        """
        title = 'time integrity check(%)'
        # True when there are fewer turbines than months.
        months_on_columns = (
            len(set(groupedDataFrame.loc[:, Field_YearMonth]))
            > len(set(groupedDataFrame.loc[:, fieldTurbineName])))
        if months_on_columns:
            index_field, column_field = fieldTurbineName, Field_YearMonth
        else:
            index_field, column_field = Field_YearMonth, fieldTurbineName
        result = pd.pivot(groupedDataFrame, index=index_field,
                          columns=column_field, values="数据完整度%")

        fig, ax = plt.subplots(figsize=(18, 15), dpi=300)
        try:
            sns.heatmap(data=result, square=True, annot=True,
                        linewidths=0.3, cbar=False, fmt='g', ax=ax)
            # NOTE(review): this half-cell widening looks like the workaround
            # for heatmap rows being cut off in matplotlib 3.1.1; confirm it
            # is still wanted with the matplotlib version in use.
            bottom, top = ax.get_ylim()
            ax.set_ylim(bottom + 0.5, top - 0.5)
            ax.set_title(title)
            plt.setp(ax.get_yticklabels(), rotation=0)
            # Month labels are drawn vertically, turbine labels slanted.
            plt.setp(ax.get_xticklabels(), rotation=90 if months_on_columns else 45)
            fig.savefig(
                os.path.join(outputAnalysisDir, '{}数据完整度分析.png'.format(farmName)),
                bbox_inches='tight')
        finally:
            # Release the figure even when drawing or saving fails.
            plt.close(fig)

    def draw(self, groupedDataFrame, outputAnalysisDir, farmName, fieldTurbineName):
        """Save the completeness heatmap with plotly.

        Same layout rule and file name as ``plotByAllMonth``. The month axis
        is forced to a categorical axis so the 'YYYY-MM' labels are shown
        as-is instead of being parsed as dates.

        Args:
            groupedDataFrame: Output of ``dataIntegrityByMonth``.
            outputAnalysisDir: Directory the PNG is written to.
            farmName: Wind farm name used in the title and file name.
            fieldTurbineName: Name of the turbine column.
        """
        months_on_columns = (
            len(set(groupedDataFrame[Field_YearMonth]))
            > len(set(groupedDataFrame[fieldTurbineName])))
        if months_on_columns:
            index_field, column_field = fieldTurbineName, Field_YearMonth
        else:
            index_field, column_field = Field_YearMonth, fieldTurbineName
        result = groupedDataFrame.pivot(
            index=index_field, columns=column_field, values="数据完整度%")

        fig = make_subplots(rows=1, cols=1)
        fig.add_trace(
            go.Heatmap(
                z=result.values,
                x=result.columns,
                y=result.index,
                colorscale='Viridis',
                colorbar=dict(title='数据完整度%'),
                # Text shown in each cell (the percentage).
                text=[[f"{value}" for value in row] for row in result.values],
                texttemplate="%{text}",
            )
        )
        fig.update_layout(
            title_text='{}-time integrity check(%)'.format(farmName),
            xaxis_nticks=36
        )
        # Axis options belong to the figure layout, not to the Heatmap trace.
        if months_on_columns:
            fig.update_xaxes(type='category')
        else:
            fig.update_yaxes(type='category')
        fig.write_image(
            os.path.join(outputAnalysisDir, '{}数据完整度分析.png'.format(farmName)))
|