import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from geopy.distance import geodesic
from behavior.analyst import Analyst
from utils.directoryUtil import DirectoryUtil as dir
from algorithmContract.confBusiness import *
# NOTE: kept after the star import so these names always refer to the stdlib modules.
import calendar
import random


class DataIntegrityOfSecondAnalyst(Analyst):
    """Data-completeness analysis for second-level wind turbine data.

    For every turbine and calendar month the number of recorded timestamps is
    compared with the number of samples a full month would contain at the
    configured sampling period, and the resulting percentage is rendered as a
    heatmap.
    """

    def typeAnalyst(self):
        """Return the identifier string of this analysis type."""
        return "data_integrity_second"

    def filterCommon(self, dataFrame: pd.DataFrame, confData: ConfBusiness):
        """Return ``dataFrame`` unchanged; this analysis applies no filtering."""
        return dataFrame

    def turbinesAnalysis(self, dataFrameMerge, outputAnalysisDir, confData: ConfBusiness):
        """Compute the monthly completeness per turbine and save the heatmap.

        Args:
            dataFrameMerge: Merged data of all turbines; must contain the
                columns ``confData.field_turbine_time`` and
                ``Field_NameOfTurbine``.
            outputAnalysisDir: Directory the PNG is written to.
            confData: Business configuration (time field, sampling period,
                start/end time, farm name).
        """
        groupedDataFrame = self.dataIntegrityByMonth(
            dataFrameMerge, confData, Field_NameOfTurbine)
        print("groupedDataFrame : \n {}".format(groupedDataFrame.head()))
        self.plotByAllMonth(groupedDataFrame, outputAnalysisDir,
                            confData.farm_name, Field_NameOfTurbine)

    def generate_weighted_random(self):
        """Return a random integer in [79, 100], biased towards [91, 100].

        With probability 0.8 the value is drawn uniformly from 91..100;
        otherwise it is drawn uniformly from 79..100. Both bounds are
        inclusive because ``random.randint`` includes its upper bound.
        """
        if random.random() < 0.8:
            return random.randint(91, 100)
        return random.randint(79, 100)

    def fullMonthIndex(self, start_time, end_time, turbine_name, new_frame):
        """Pad every turbine's rows so each month of the period is present.

        Args:
            start_time: Object with ``year`` and ``month`` attributes marking
                the first month of the period.
            end_time: Object with ``year`` and ``month`` attributes marking
                the last month of the period (inclusive).
            turbine_name: Name of the turbine column in ``new_frame``.
            new_frame: Frame with at least ``turbine_name``,
                ``Field_YearMonth`` ('YYYY-MM' strings) and '数据完整度%'.

        Returns:
            The rows of all turbines concatenated, where months missing for a
            turbine are added with a completeness of 0. The 'year' and
            'month' columns are re-derived from ``Field_YearMonth`` as
            strings. An empty ``DataFrame`` is returned when ``new_frame``
            has no groups.
        """
        month_span = ((end_time.year - start_time.year) * 12
                      + end_time.month - start_time.month)
        first_offset = start_time.month - 1
        # Offsets are counted in months from January of the start year, so
        # ``offset // 12`` is the year increment and ``offset % 12 + 1`` the month.
        month_range = [
            '%04d-%02d' % (int(start_time.year + offset // 12), int(offset % 12 + 1))
            for offset in range(first_offset, first_offset + month_span + 1)
        ]
        month_index = pd.DataFrame(month_range, columns=[Field_YearMonth])

        padded_groups = []
        for name, group in new_frame.groupby(turbine_name):
            # The outer merge adds one row per month that this turbine lacks.
            group = pd.merge(group, month_index, on=Field_YearMonth, how='outer')
            group['数据完整度%'] = group['数据完整度%'].fillna(0)
            group[turbine_name] = name
            year_month_parts = group[Field_YearMonth].astype(str).str.split('-')
            group['year'] = year_month_parts.str[0]
            group['month'] = year_month_parts.str[1]
            padded_groups.append(group)

        if not padded_groups:
            return pd.DataFrame()
        # Concatenate once instead of growing a frame inside the loop.
        return pd.concat(padded_groups, axis=0, sort=False)

    def dataIntegrityByMonth(self, dataFrameMerge: pd.DataFrame, confData: ConfBusiness, fieldTurbineName):
        """Compute the percentage of expected samples present per turbine and month.

        Args:
            dataFrameMerge: Data of all turbines; the column
                ``confData.field_turbine_time`` must be datetime-like.
            confData: Supplies ``field_turbine_time``, ``time_period``
                (sampling period in seconds), ``start_time`` and ``end_time``.
            fieldTurbineName: Name of the turbine column.

        Returns:
            The frame produced by ``fullMonthIndex``: one row per turbine and
            month with the sample count ('长度') and the rounded completeness
            percentage ('数据完整度%').
        """
        time_field = confData.field_turbine_time
        timestamps = dataFrameMerge[time_field]
        # Rows without a timestamp cannot be assigned to a month; dropping them
        # also keeps the year/month keys integral (NaT would turn them into floats).
        has_time = timestamps.notna()
        timestamps = timestamps[has_time]
        turbines = dataFrameMerge.loc[has_time, fieldTurbineName]

        new_frame = (
            timestamps.groupby([
                timestamps.dt.year.rename('year'),
                timestamps.dt.month.rename('month'),
                turbines,
            ])
            .count()
            .rename('长度')
            .reset_index()
        )

        # Use the real month length so that February of a leap year counts 29 days.
        days_in_month = pd.Series(
            [calendar.monthrange(int(year), int(month))[1]
             for year, month in zip(new_frame['year'], new_frame['month'])],
            index=new_frame.index,
            dtype='float64',
        )
        expected_samples = days_in_month * 24 * 3600 / confData.time_period
        new_frame['数据完整度%'] = (
            100 * new_frame['长度'] / expected_samples).round(decimals=0)

        new_frame = new_frame[
            ['year', fieldTurbineName, 'month', '长度', '数据完整度%']].copy()
        new_frame['month'] = new_frame['month'].astype(str).str.zfill(2)
        new_frame[Field_YearMonth] = (
            new_frame['year'].astype(str) + '-' + new_frame['month'])

        return self.fullMonthIndex(
            confData.start_time, confData.end_time, fieldTurbineName, new_frame)

    def _pivotForHeatmap(self, groupedDataFrame, fieldTurbineName):
        """Pivot the completeness values so the longer dimension runs along x.

        Returns:
            A tuple ``(matrix, months_on_x)``. When there are more distinct
            months than turbines, turbines are the rows and months the
            columns (``months_on_x`` is True); otherwise months are the rows
            and turbines the columns.
        """
        month_count = len(set(groupedDataFrame.loc[:, Field_YearMonth]))
        turbine_count = len(set(groupedDataFrame.loc[:, fieldTurbineName]))
        months_on_x = month_count > turbine_count
        if months_on_x:
            index_field, column_field = fieldTurbineName, Field_YearMonth
        else:
            index_field, column_field = Field_YearMonth, fieldTurbineName
        matrix = pd.pivot(groupedDataFrame, index=index_field,
                          columns=column_field, values="数据完整度%")
        return matrix, months_on_x

    def plotByAllMonth(self, groupedDataFrame, outputAnalysisDir, farmName, fieldTurbineName):
        """Save the completeness heatmap as a PNG using seaborn/matplotlib.

        Args:
            groupedDataFrame: Output of ``dataIntegrityByMonth``.
            outputAnalysisDir: Directory the image is written to.
            farmName: Wind farm name, used as the file name prefix.
            fieldTurbineName: Name of the turbine column.
        """
        title = 'time integrity check(%)'
        result, months_on_x = self._pivotForHeatmap(groupedDataFrame, fieldTurbineName)

        fig, ax = plt.subplots(figsize=(18, 15), dpi=300)
        try:
            sns.heatmap(data=result, square=True, annot=True,
                        linewidths=0.3, cbar=False, fmt='g', ax=ax)
            # Widen the y limits by half a cell on each side.
            # NOTE(review): looks like the workaround for matplotlib releases
            # that cut the outer heatmap rows in half - confirm it is still needed.
            bottom, top = ax.get_ylim()
            ax.set_ylim(bottom + 0.5, top - 0.5)
            ax.set_title(title)
            plt.setp(ax.get_yticklabels(), rotation=0)
            # Month labels are vertical; turbine labels on x are slanted by 45 degrees.
            plt.setp(ax.get_xticklabels(), rotation=90 if months_on_x else 45)
            plt.savefig(outputAnalysisDir + r'/{}数据完整度分析.png'.format(farmName),
                        bbox_inches='tight')
        finally:
            # Always release the figure, even when plotting or saving fails.
            plt.close(fig)

    def draw(self, groupedDataFrame, outputAnalysisDir, farmName, fieldTurbineName):
        """Save the completeness heatmap as a PNG using plotly.

        Args:
            groupedDataFrame: Output of ``dataIntegrityByMonth``.
            outputAnalysisDir: Directory the image is written to.
            farmName: Wind farm name, used in the title and file name.
            fieldTurbineName: Name of the turbine column.
        """
        result, months_on_x = self._pivotForHeatmap(groupedDataFrame, fieldTurbineName)

        fig = make_subplots(rows=1, cols=1)
        fig.add_trace(
            go.Heatmap(
                z=result.values,
                x=result.columns,
                y=result.index,
                colorscale='Viridis',
                colorbar=dict(title='数据完整度%'),
                # Text shown inside each cell (the percentage value).
                text=[[f"{value}" for value in row] for row in result.values],
                texttemplate="%{text}",
            )
        )

        # Axis settings belong to the layout, not to the trace. The month
        # labels are 'YYYY-MM' strings, so the axis carrying them is forced to
        # be categorical with one tick per month.
        month_labels = [str(label) for label in
                        (result.columns if months_on_x else result.index)]
        month_axis = dict(type='category', tickmode='array', tickvals=month_labels)
        if months_on_x:
            fig.update_xaxes(**month_axis)
        else:
            fig.update_yaxes(**month_axis)

        fig.update_layout(
            title_text='{}-time integrity check(%)'.format(farmName),
            xaxis_nticks=36
        )
        fig.write_image(outputAnalysisDir + '/' + '{}数据完整度分析.png'.format(farmName))