- import os
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- import seaborn as sns
- import plotly.graph_objects as go
- from plotly.subplots import make_subplots
- from geopy.distance import geodesic
- from behavior.analystNotFilter import AnalystNotFilter
- from utils.directoryUtil import DirectoryUtil as dir
- from algorithmContract.confBusiness import *
- import calendar
- import json
- import random
- from datetime import datetime
- from algorithmContract.contract import Contract
- class DataIntegrityOfSecondAnalyst(AnalystNotFilter):
- """
- 风电机组秒级数据完整度分析
- """
- def typeAnalyst(self):
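- """Return the analyst type code used to identify this analysis."""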
- return "data_integrity_second"
- def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
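- """
- Run the data-integrity analysis for the given turbine codes: load the
- selected channels, attach turbine display names, aggregate record counts
- by month, and write the heatmap/JSON outputs to outputAnalysisDir.
- Returns a DataFrame describing the generated output files.
- """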
- select = [Field_DeviceCode, Field_Time, Field_ActiverPower, Field_WindSpeed, Field_NacPos, Field_WindDirection, Field_RotorSpeed, Field_GeneratorSpeed, Field_GeneratorTorque, Field_AngleIncluded, Field_EnvTemp, Field_NacTemp, Field_PitchAngel1, Field_PitchAngel2, Field_PitchAngel3]
- dictionary = self.processTurbineData(turbineCodes, conf, select)
- dataFrameOfTurbines = self.userDataFrame(
- dictionary, conf.dataContract.configAnalysis, self)
- turbineInfos = self.common.getTurbineInfos(
- conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
- # .copy() avoids SettingWithCopyWarning when the name column is filled below
- currDataFrameOfTurbines = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine].isin(
- turbineCodes)].copy()
- # Build a turbine code -> display name lookup from the turbine metadata
- currTurbineInfosDict = turbineInfos.set_index(
- Field_CodeOfTurbine)[Field_NameOfTurbine].to_dict()
- # Map turbine codes to display names; unknown codes fall back to an empty string
- currDataFrameOfTurbines[Field_NameOfTurbine] = currDataFrameOfTurbines[Field_CodeOfTurbine].map(
- currTurbineInfosDict).fillna("")
- groupedDataFrame = self.dataIntegrityByMonth(
- dataFrameOfTurbines, conf, Field_NameOfTurbine)
- print("groupedDataFrame : \n {}".format(groupedDataFrame.head()))
- return self.plotByAllMonth(groupedDataFrame, outputAnalysisDir, self.powerFarmInfo[Field_PowerFarmName].iloc[0], Field_NameOfTurbine, conf)
- def fullMonthIndex(self, start_time, end_time, turbine_name, new_frame):
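- """
- Re-index each turbine's monthly integrity series onto the full
- [start_time, end_time] month range, filling missing months with 0%.
- """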
- months = (end_time.year - start_time.year) * \
- 12 + end_time.month - start_time.month
- month_range = [f"{start_time.year + mon // 12:04d}-{mon % 12 + 1:02d}"
- for mon in range(start_time.month - 1, start_time.month + months)]
- month_index = pd.DataFrame(month_range, columns=[Field_YearMonth])
- plot_res = pd.DataFrame()
- grouped = new_frame.groupby(turbine_name)
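- # Outer-merge each turbine's rows onto the full month index so months without data appear as 0%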
- for name, group in grouped:
- group = pd.merge(group, month_index,
- on=Field_YearMonth, how='outer')
- group['数据完整度%'] = group['数据完整度%'].fillna(0)
- group[turbine_name] = name
- group['year'] = group[Field_YearMonth].apply(
- lambda x: str(x).split('-')[0])
- group['month'] = group[Field_YearMonth].apply(
- lambda x: str(x).split('-')[1])
- plot_res = pd.concat([plot_res, group], axis=0, sort=False)
- return plot_res
-
- def get_time_space(self, df, time_str):
- """
- Estimate the sampling interval (in seconds) of the time column as the most
- frequent difference between consecutive timestamps in a ~1% random sample.
- """
- df1 = pd.DataFrame(df[time_str])
- df1['chazhi'] = df1[time_str].shift(-1) - df1[time_str]
- # Drop the trailing NaT from the shift and sample at least one row so small frames do not crash
- diffs = df1['chazhi'].dropna()
- sample_size = max(1, int(diffs.shape[0] / 100))
- result = diffs.sample(sample_size).value_counts().idxmax().seconds
- del df1
- return result
- def dataIntegrityByMonth(self, dataFrameMerge: pd.DataFrame, conf: Contract, Field_NameOfTurbine):
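- """
- Compute the monthly data-integrity percentage per turbine: the number of
- records in each (year, month, turbine) group divided by the number of
- samples expected for that month at the detected time granularity.
- """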
- grouped = dataFrameMerge.groupby([
- dataFrameMerge.loc[:, Field_Time].dt.year.rename('year'),
- dataFrameMerge.loc[:, Field_Time].dt.month.rename('month'),
- dataFrameMerge.loc[:, Field_NameOfTurbine]
- ]).agg(['count'])[Field_Time].rename({'count': '长度'}, axis=1)
- new_frame = grouped.reset_index('month')
- timeGranularity = self.get_time_space(dataFrameMerge, Field_Time)
- self.logger.info(f"{self.typeAnalyst()} timeGranularity-->{timeGranularity}")
- new_frame = new_frame.reset_index()
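- # Expected samples per month = days_in_month * 24 * 3600 / timeGranularity;
- # integrity% = 100 * actual record count / expected samples, rounded to a whole percent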
- new_frame['数据完整度%'] = (100 * new_frame['长度'] / new_frame.apply(
- lambda row: calendar.monthrange(row['year'], row['month'])[1] * 24 * 3600 / timeGranularity,
- axis=1)).round(decimals=0)
- new_frame['month'] = new_frame['month'].astype(str).str.zfill(2)
- new_frame[Field_YearMonth] = new_frame['year'].astype(str) + '-' + new_frame['month']
- beginTime = None
- if not self.common.isNone(conf.dataContract.dataFilter.beginTime):
- beginTime = conf.dataContract.dataFilter.beginTime
- else:
- beginTime = dataFrameMerge[Field_Time].min().strftime(
- '%Y-%m-%d %H:%M:%S')
- endTime = None
- if not self.common.isNone(conf.dataContract.dataFilter.endTime):
- endTime = conf.dataContract.dataFilter.endTime
- else:
- endTime = dataFrameMerge[Field_Time].max().strftime(
- '%Y-%m-%d %H:%M:%S')
- beginTime = datetime.strptime(beginTime, '%Y-%m-%d %H:%M:%S')
- endTime = datetime.strptime(endTime, '%Y-%m-%d %H:%M:%S')
- new_frame = self.fullMonthIndex(
- beginTime, endTime, Field_NameOfTurbine, new_frame)
- return new_frame
- def plotByAllMonth(self, groupedDataFrame, outputAnalysisDir, farmName, fieldTurbineName, conf: Contract):
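- """
- Pivot the monthly integrity table into a turbine-by-month matrix, render it
- as a Plotly heatmap (PNG), and dump the same matrix as a JSON payload.
- Returns a DataFrame of output-file records for downstream persistence.
- """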
- title = '数据完整度检测(%)'
- # Choose the pivot orientation: the longer of the two dimensions (months vs. turbines) goes on the x-axis
- if len(set(groupedDataFrame[Field_YearMonth])) > len(set(groupedDataFrame[fieldTurbineName])):
- result = groupedDataFrame.pivot(
- values="数据完整度%", index=fieldTurbineName, columns=Field_YearMonth)
- x_labels = result.columns.tolist() # year-month labels
- y_labels = result.index.tolist() # turbine names
- x_axis_title = "日期"
- y_axis_title = "机组"
- # Assemble the JSON payload for this orientation
- json_output = {
- "analysisTypeCode": "数据完整度检测(%)",
- "engineCode": "",
- "engineTypeName": "",
- "xaixs": "日期",
- "yaixs": "机组",
- "data": [{
- "engineName": "",
- "engineCode": "",
- "title": f' 数据完整度%',
- "xData": x_labels,
- "yData": y_labels,
- "ZData": result.values.tolist(),
- }]
- }
- else:
- result = groupedDataFrame.pivot(
- values="数据完整度%", index=Field_YearMonth, columns=fieldTurbineName)
- x_labels = result.columns.tolist() # turbine names
- y_labels = result.index.tolist() # year-month labels
- x_axis_title = "机组"
- y_axis_title = "日期"
- # Assemble the JSON payload for this orientation
- json_output = {
- "analysisTypeCode": "数据完整度检测(%)",
- "engineCode": "",
- "engineTypeName": "",
- "xaixs": "机组",
- "yaixs": "日期",
- "data": [{
- "engineName": "",
- "engineCode": "",
- "title": f' 数据完整度%',
- "xData": x_labels,
- "yData": y_labels,
- "ZData": result.values.tolist(),
- }]
- }
- # Build the heatmap
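- # Discrete colour bands: below 50% red, 50-85% yellow, 85% and above green (zmin/zmax fix the 0-100 range)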
- fig = go.Figure(data=go.Heatmap(
- z=result.values,
- x=x_labels,
- y=y_labels,
- colorscale=[
- [0.0, 'rgb(255, 102, 102)'], # muted red
- [0.5, 'rgb(255, 102, 102)'],
- [0.5, 'rgb(255, 255, 153)'], # muted yellow
- [0.85, 'rgb(255, 255, 153)'],
- [0.85, 'rgb(153, 255, 153)'], # muted green
- [1.0, 'rgb(153, 255, 153)']
- ],
- zmin=0, # lower bound of the colour range
- zmax=100, # upper bound of the colour range
- showscale=True, # show the colour bar
- text=result.values,
- texttemplate="%{text}", # display the value inside each cell
- ))
- # Update the figure layout
- fig.update_layout(
- title={'text': title, 'x': 0.5},
- xaxis=dict(tickmode='array', tickvals=x_labels,
- ticktext=x_labels, tickangle=-45, title=x_axis_title),
- yaxis=dict(tickmode='array', tickvals=y_labels,
- ticktext=y_labels, title=y_axis_title),
- autosize=True,
- margin=dict(l=50, r=50, b=100, t=100), # extra margins so tick labels are not clipped
- # Transparent background to show cell borders
- plot_bgcolor='rgba(0,0,0,0)'
- )
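- # 1 px gaps between cells let the transparent background show through as grid lines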
- fig.update_traces(
- xgap=1,
- ygap=1
- )
- result_rows = []
- # Write the JSON payload to the analysis output directory
- output_json_path = os.path.join(outputAnalysisDir, "Data_Integrity_Of_Second_Analyst.json")
- with open(output_json_path, 'w', encoding='utf-8') as f:
- json.dump(json_output, f, ensure_ascii=False, indent=4)
- # Save the heatmap as a PNG (scale=3 for higher resolution)
- pngFileName = f'{farmName}数据完整度分析.png'
- pngFilePath = os.path.join(outputAnalysisDir, pngFileName)
- fig.write_image(pngFilePath, scale=3)
- # Optionally export an interactive HTML version (currently disabled)
- # htmlFileName = f'{farmName}数据完整度分析.html'
- # htmlFilePath = os.path.join(outputAnalysisDir, htmlFileName)
- # fig.write_html(htmlFilePath)
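- # Two result records are returned: the PNG is a file artifact only, while the
- # JSON payload is flagged for persistence to the database (Field_Return_IsSaveDatabase=True)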
- result_rows.append({
- Field_Return_TypeAnalyst: self.typeAnalyst(),
- Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
- Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
- Field_CodeOfTurbine: Const_Output_Total,
- Field_Return_FilePath: pngFilePath,
- Field_Return_IsSaveDatabase: False
- })
- result_rows.append({
- Field_Return_TypeAnalyst: self.typeAnalyst(),
- Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
- Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
- Field_CodeOfTurbine: Const_Output_Total,
- Field_MillTypeCode: 'total',
- Field_Return_FilePath: output_json_path,
- Field_Return_IsSaveDatabase: True
- })
- result_df = pd.DataFrame(result_rows)
- return result_df
|