import os
import json
import calendar
from datetime import datetime

import pandas as pd
import plotly.graph_objects as go

from behavior.analystNotFilter import AnalystNotFilter
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract


class DataIntegrityOfSecondAnalyst(AnalystNotFilter):
    """
    风电机组秒级数据完整度分析
    """

    def typeAnalyst(self):
        return "data_integrity_second"

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        select = [Field_DeviceCode, Field_Time, Field_ActiverPower, Field_WindSpeed,
                  Field_NacPos, Field_WindDirection, Field_RotorSpeed, Field_GeneratorSpeed,
                  Field_GeneratorTorque, Field_AngleIncluded, Field_EnvTemp, Field_NacTemp,
                  Field_PitchAngel1, Field_PitchAngel2, Field_PitchAngel3]

        dictionary = self.processTurbineData(turbineCodes, conf, select)
        dataFrameOfTurbines = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        turbineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)

        # 仅保留本次分析涉及的机组;copy() 避免 SettingWithCopyWarning
        currDataFrameOfTurbines = dataFrameOfTurbines[
            dataFrameOfTurbines[Field_CodeOfTurbine].isin(turbineCodes)].copy()

        # 将机组信息转换为 编码 -> 名称 的字典
        currTurbineInfosDict = turbineInfos.set_index(
            Field_CodeOfTurbine)[Field_NameOfTurbine].to_dict()

        # 使用 map 填充 Field_NameOfTurbine 列
        currDataFrameOfTurbines[Field_NameOfTurbine] = currDataFrameOfTurbines[
            Field_CodeOfTurbine].map(currTurbineInfosDict).fillna("")

        # 机组名称列只存在于过滤后的 currDataFrameOfTurbines 上,按月统计时传入该数据框
        groupedDataFrame = self.dataIntegrityByMonth(
            currDataFrameOfTurbines, conf, Field_NameOfTurbine)
        self.logger.info("groupedDataFrame : \n {}".format(groupedDataFrame.head()))

        return self.plotByAllMonth(groupedDataFrame, outputAnalysisDir,
                                   self.powerFarmInfo[Field_PowerFarmName].iloc[0],
                                   Field_NameOfTurbine, conf)

    def fullMonthIndex(self, start_time, end_time, turbine_name, new_frame):
        """为每台机组补齐起止时间范围内缺失的月份,缺失月份的数据完整度记为 0。"""
        months = (end_time.year - start_time.year) * 12 \
            + end_time.month - start_time.month
        month_range = ['%04d-%02d' % (int(start_time.year + mon // 12), int(mon % 12 + 1))
                       for mon in range(start_time.month - 1, start_time.month + months)]
        month_index = pd.DataFrame(month_range, columns=[Field_YearMonth])

        plot_res = pd.DataFrame()
        grouped = new_frame.groupby(turbine_name)
        for name, group in grouped:
            group = pd.merge(group, month_index, on=Field_YearMonth, how='outer')
            group['数据完整度%'] = group['数据完整度%'].fillna(0)
            group[turbine_name] = name
            group['year'] = group[Field_YearMonth].apply(
                lambda x: str(x).split('-')[0])
            group['month'] = group[Field_YearMonth].apply(
                lambda x: str(x).split('-')[1])
            plot_res = pd.concat([plot_res, group], axis=0, sort=False)

        return plot_res

    def get_time_space(self, df, time_str):
        """
        :return: 采样时间间隔(单位:秒),取相邻时间戳差值的众数
        """
        df1 = pd.DataFrame(df[time_str])
        df1['chazhi'] = df1[time_str].shift(-1) - df1[time_str]
        # 抽样约 1% 的数据估计间隔;至少抽 1 条,避免小数据集下抽样为空
        result = df1.sample(max(1, int(df1.shape[0] / 100)))[
            'chazhi'].value_counts().idxmax().seconds
        del df1
        return result

    def dataIntegrityByMonth(self, dataFrameMerge: pd.DataFrame, conf: Contract, Field_NameOfTurbine):
        # 按 年-月-机组名 统计记录条数
        grouped = dataFrameMerge.groupby(
            [dataFrameMerge.loc[:, Field_Time].dt.year.rename('year'),
             dataFrameMerge.loc[:, Field_Time].dt.month.rename('month'),
             dataFrameMerge.loc[:, Field_NameOfTurbine]]
        ).agg({'count'})[Field_Time].rename({'count': '长度'}, axis=1)
        new_frame = grouped.reset_index('month')

        # 旧实现:按传输配置读取时间粒度
        # timeGranularity = self.dataTransfer[self.dataTransfer[Field_TransferType] ==
        #     Const_TimeGranularity_Second][Field_TimeGranularity].iloc[0] if self.typeAnalyst(
        # ) == "data_integrity_second" else self.dataTransfer[self.dataTransfer[Field_TransferType] ==
        #     Const_TimeGranularity_Minute][Field_TimeGranularity].iloc[0]

        # 现从数据本身估计采样间隔(秒)
        timeGranularity = self.get_time_space(dataFrameMerge, Field_Time)
        self.logger.info(f"{self.typeAnalyst()} timeGranularity-->{timeGranularity}")

        new_frame = new_frame.reset_index()
        # 数据完整度% = 实际记录数 / 理论记录数(当月天数 * 24 * 3600 / 采样间隔)
        new_frame['数据完整度'] = (100 * new_frame['长度'] /
                              (new_frame.apply(lambda row: calendar.monthrange(
                                  row['year'], row['month'])[1] * 24 * 3600 / timeGranularity,
                                  axis=1))).round(decimals=0)
        new_frame = new_frame.rename(columns={'数据完整度': '数据完整度%'})
        new_frame['month'] = new_frame['month'].astype(str).apply(lambda x: x.zfill(2))
        new_frame[Field_YearMonth] = new_frame['year'].astype(str) + '-' + new_frame['month'].astype(str)

        # 起止时间优先取过滤条件,否则取数据本身的时间范围
        if not self.common.isNone(conf.dataContract.dataFilter.beginTime):
            beginTime = conf.dataContract.dataFilter.beginTime
        else:
            beginTime = dataFrameMerge[Field_Time].min().strftime('%Y-%m-%d %H:%M:%S')

        if not self.common.isNone(conf.dataContract.dataFilter.endTime):
            endTime = conf.dataContract.dataFilter.endTime
        else:
            endTime = dataFrameMerge[Field_Time].max().strftime('%Y-%m-%d %H:%M:%S')

        beginTime = datetime.strptime(beginTime, '%Y-%m-%d %H:%M:%S')
        endTime = datetime.strptime(endTime, '%Y-%m-%d %H:%M:%S')

        new_frame = self.fullMonthIndex(
            beginTime, endTime, Field_NameOfTurbine, new_frame)

        return new_frame

    def plotByAllMonth(self, groupedDataFrame, outputAnalysisDir, farmName, fieldTurbineName, conf: Contract):
        title = '数据完整度检测(%)'

        # 根据月份数与机组数的多少决定热图的行列方向
        if len(set(groupedDataFrame[Field_YearMonth])) > len(set(groupedDataFrame[fieldTurbineName])):
            result = groupedDataFrame.pivot(
                values="数据完整度%", index=fieldTurbineName, columns=Field_YearMonth)
            x_axis_title = "日期"  # 横轴:月份
            y_axis_title = "机组"  # 纵轴:机组名
        else:
            result = groupedDataFrame.pivot(
                values="数据完整度%", index=Field_YearMonth, columns=fieldTurbineName)
            x_axis_title = "机组"  # 横轴:机组名
            y_axis_title = "日期"  # 纵轴:月份

        x_labels = result.columns.tolist()
        y_labels = result.index.tolist()

        # 构建最终的JSON对象(键名 xaixs/yaixs 沿用原有拼写)
        json_output = {
            "analysisTypeCode": title,
            "engineCode": "",
            "engineTypeName": "",
            "xaixs": x_axis_title,
            "yaixs": y_axis_title,
            "data": [{
                "engineName": "",
                "engineCode": "",
                "title": '数据完整度%',
                "xData": x_labels,
                "yData": y_labels,
                "ZData": result.values.tolist(),
            }]
        }

        # 创建热图:红(低)/黄(中)/绿(高)三段配色,0-100 固定色标
        fig = go.Figure(data=go.Heatmap(
            z=result.values,
            x=x_labels,
            y=y_labels,
            colorscale=[
                [0.0, 'rgb(255, 102, 102)'],   # 柔和的红色
                [0.5, 'rgb(255, 102, 102)'],
                [0.5, 'rgb(255, 255, 153)'],   # 柔和的黄色
                [0.85, 'rgb(255, 255, 153)'],
                [0.85, 'rgb(153, 255, 153)'],  # 柔和的绿色
                [1.0, 'rgb(153, 255, 153)']
            ],
            zmin=0,          # 颜色范围最小值
            zmax=100,        # 颜色范围最大值
            showscale=True,  # 显示颜色条
            text=result.values,
            texttemplate="%{text}",  # 在单元格内显示数值
        ))

        # 更新图形布局
        fig.update_layout(
            title={'text': title, 'x': 0.5},
            xaxis=dict(tickmode='array', tickvals=x_labels, ticktext=x_labels,
                       tickangle=-45, title=x_axis_title),
            yaxis=dict(tickmode='array', tickvals=y_labels, ticktext=y_labels,
                       title=y_axis_title),
            autosize=True,
            margin=dict(l=50, r=50, b=100, t=100),  # 调整边距以确保标签完整显示
            plot_bgcolor='rgba(0,0,0,0)'  # 透明背景,便于显示单元格边框
        )
        # 单元格之间留 1px 间隙,形成网格效果
        fig.update_traces(
            xgap=1,
            ygap=1
        )

        result_rows = []

        # 将JSON对象保存到文件
        output_json_path = os.path.join(
            outputAnalysisDir, "Data_Integrity_Of_Second_Analyst.json")
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(json_output, f, ensure_ascii=False, indent=4)

        # 如需保存图片 / HTML,可启用以下代码
        # pngFileName = f'{farmName}数据完整度分析.png'
        # pngFilePath = os.path.join(outputAnalysisDir, pngFileName)
        # fig.write_image(pngFilePath, scale=3)

        # htmlFileName = f'{farmName}数据完整度分析.html'
        # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName)
        # fig.write_html(htmlFilePath)

        result_rows.append({
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: Const_Output_Total,
            Field_MillTypeCode: 'total',
            Field_Return_FilePath: output_json_path,
            Field_Return_IsSaveDatabase: True
        })

        result_df = pd.DataFrame(result_rows)
        return result_df
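
# ---------------------------------------------------------------------------
# 假设性的最小示例(仅作说明,默认不随流水线执行),演示上面 dataIntegrityByMonth
# 中完整度的计算思路:先用相邻时间戳差值的众数估计采样间隔,再用
# 实际记录数 / (当月天数 * 24 * 3600 / 采样间隔) 得到月度完整度。
# 示例中的列名 "time" 与构造的数据均为演示用假设,并非项目中的 Field_* 常量。
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # 构造某台机组一个月(2023-06)的 10 秒级数据,并人为缺失后 20% 的记录
    full_index = pd.date_range("2023-06-01", periods=30 * 24 * 360, freq="10s")
    demo = pd.DataFrame({"time": full_index[: int(len(full_index) * 0.8)]})

    # 用相邻时间差的众数估计采样间隔(与 get_time_space 的思路一致,此处未抽样)
    interval_seconds = demo["time"].diff().dropna().value_counts().idxmax().seconds

    # 理论记录数与实际记录数之比即为完整度
    year, month = 2023, 6
    expected = calendar.monthrange(year, month)[1] * 24 * 3600 / interval_seconds
    completeness = round(100 * len(demo) / expected)

    print(f"采样间隔={interval_seconds}s, 数据完整度={completeness}%")  # 预期约 80%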