import os
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from behavior.analystNotFilter import AnalystNotFilter
from utils.directoryUtil import DirectoryUtil as dir
import matplotlib.pyplot as plt
from algorithmContract.confBusiness import *
from algorithmContract.contract import Contract
from utils.jsonUtil import JsonUtil

# Wind-speed ranges. The edges are used left-closed (``right=False``) so that
# the bins really are [0,3), [3,6), [6,9), [9,inf) as the labels say.
_SPEED_BIN_EDGES = [0, 3, 6, 9, np.inf]
_SPEED_LABELS = ['[0,3)', '[3,6)', '[6,9)', '>=9']

# One colour per wind-speed range (light to dark blue).
_SPEED_COLORS = {
    '[0,3)': 'rgba(247.0, 251.0, 255.0, 1.0)',
    '[3,6)': 'rgba(171.33333333333334, 207.66666666666666, 229.66666666666669, 1.0)',
    '[6,9)': 'rgba(55.0, 135.0, 192.33333333333334, 1.0)',
    '>=9': 'rgba(8.0, 48.0, 107.0, 1.0)',
}

# Sixteen 22.5-degree direction sectors, each labelled by its starting angle.
# The edge array carries the closing 360 edge so the last sector
# (337.5, 360] exists; without it that sector is silently dropped.
_SECTOR_STARTS = np.arange(0, 360, 22.5)
_SECTOR_EDGES = np.append(_SECTOR_STARTS, 360.0)

# Compass names for the sixteen sector starts, beginning at north (0 degrees).
_COMPASS_LABELS = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE',
                   'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW']


class WindDirectionFrequencyAnalyst(AnalystNotFilter):
    """Wind-rose analyst: per-turbine wind-direction frequency by wind-speed range.

    For every turbine the analyst writes a PNG wind rose and a JSON file
    holding the same frequencies, and returns one result row per written file.
    """

    def typeAnalyst(self):
        """Return the identifier of this analysis type."""
        return "wind_direction_frequency"

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """Run the wind-rose analysis for ``turbineCodes``.

        Returns:
            A DataFrame with the image result rows followed by the JSON
            result rows (index reset).
        """
        dictionary = self.processTurbineData(
            turbineCodes, conf,
            [Field_DeviceCode, Field_Time, Field_WindDirection, Field_WindSpeed, Field_ActiverPower])
        turbineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
        dataFrameMerge = self.userDataFrame(dictionary, conf.dataContract.configAnalysis, self)

        results = self.windRoseAnalysis(dataFrameMerge, outputAnalysisDir, conf)
        returnJsonData = self.outputJsonData(conf, outputAnalysisDir, turbineInfos, dataFrameMerge)

        returnDatas = pd.concat([results, returnJsonData], axis=0, ignore_index=True)
        return returnDatas

    def windRoseAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, conf: Contract):
        """Compute the frequencies and save one PNG wind rose per turbine.

        Side effect: ``self.frequency_data`` is rebuilt as
        ``{turbine_code: {'title': str, <speed label>: {sector_start: percent}}}``.

        Returns:
            A DataFrame with one row per saved image, or an empty DataFrame
            when the wind direction / wind speed columns are missing or
            entirely NaN.
        """
        # Reset before validating so that an early return can neither leave the
        # attribute undefined nor expose data from a previous run to
        # outputJsonData / convert2Json.
        self.frequency_data = {}

        required_columns = {Field_WindDirection, Field_WindSpeed}
        if (not required_columns.issubset(dataFrameMerge.columns)
                or dataFrameMerge[Field_WindDirection].isna().all()
                or dataFrameMerge[Field_WindSpeed].isna().all()):
            msg = f"DataFrame缺少必要的列及值。涉及的列有: {required_columns}"
            self.logger.warning(msg)
            return pd.DataFrame()

        result_rows = []
        grouped = dataFrameMerge.groupby([Field_NameOfTurbine, Field_CodeOfTurbine])
        for name, group in grouped:
            turbine_name, turbine_code = name[0], name[1]
            title = f"机组: {turbine_name}"

            frequencies = self._sectorFrequencies(group)

            turbine_data = {'title': title}
            for label, percentage in frequencies.items():
                turbine_data[label] = percentage.to_dict()
            self.frequency_data[turbine_code] = turbine_data

            fig = self._buildWindRoseFigure(title, frequencies)
            filePathOfImage = os.path.join(outputAnalysisDir, f"{turbine_name}.png")
            fig.write_image(filePathOfImage, scale=3)

            result_rows.append({
                Field_Return_TypeAnalyst: self.typeAnalyst(),
                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                Field_CodeOfTurbine: turbine_code,
                Field_Return_FilePath: filePathOfImage,
                Field_Return_IsSaveDatabase: False
            })

        result_df = pd.DataFrame(result_rows)
        return result_df

    def _sectorFrequencies(self, group: pd.DataFrame) -> dict:
        """Return ``{speed label: Series(percent, indexed by sector start)}``.

        Percentages are normalised within each wind-speed range, so every
        non-empty range sums to 100. A range with no samples yields 0.0 for
        every sector instead of NaN (0/0).
        """
        # Kept as separate Series instead of being written into ``group``:
        # assigning a column to a groupby slice triggers SettingWithCopy.
        # ``include_lowest`` keeps a direction of exactly 0 in the first sector.
        direction_sector = pd.cut(
            group[Field_WindDirection], bins=_SECTOR_EDGES,
            labels=_SECTOR_STARTS, include_lowest=True)
        # Loop-invariant, so computed once rather than once per speed label.
        speed_range = pd.cut(
            group[Field_WindSpeed], bins=_SPEED_BIN_EDGES,
            labels=_SPEED_LABELS, right=False)

        frequencies = {}
        for label in _SPEED_LABELS:
            counts = direction_sector[speed_range == label].value_counts().reindex(
                _SECTOR_STARTS, fill_value=0)
            total = counts.sum()
            if total > 0:
                frequencies[label] = (counts / total) * 100
            else:
                frequencies[label] = counts.astype(float)
        return frequencies

    def _buildWindRoseFigure(self, title: str, frequencies: dict):
        """Build the polar bar chart with one trace per wind-speed range."""
        fig = make_subplots(rows=1, cols=1, specs=[[{'type': 'polar'}]])

        for label, percentage in frequencies.items():
            fig.add_trace(go.Barpolar(
                r=percentage.values,
                theta=_SECTOR_STARTS,
                name=label,
                marker_color=_SPEED_COLORS[label],
                marker_showscale=False,      # no colour bar
                marker_line_color='white',   # separates neighbouring sectors
                marker_line_width=1
            ))

        fig.update_layout(
            title={"text": title},
            polar=dict(
                radialaxis=dict(visible=True),
                angularaxis=dict(
                    tickmode="array",
                    tickvals=_SECTOR_STARTS,
                    ticktext=_COMPASS_LABELS,
                    # Plotly's polar axis defaults to 0 degrees at east,
                    # counter-clockwise; a compass needs north on top and
                    # angles increasing clockwise.
                    rotation=90,
                    direction="clockwise"
                )
            ),
            legend_title="风速",
            margin=dict(t=50, b=10)  # top / bottom margins
        )
        return fig

    def _turbineName(self, dataFrameOfTurbines: pd.DataFrame, turbineCode):
        """Return the turbine name stored on the first row matching ``turbineCode``."""
        rows = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine] == turbineCode]
        enginName = rows[Field_NameOfTurbine].iloc[0]
        # A duplicated column label makes the selection a DataFrame, so the
        # first ``iloc`` yields a Series; unwrap it to a scalar.
        if isinstance(enginName, pd.Series):
            enginName = enginName.iloc[0]
        return enginName

    def outputJsonData(self, conf: Contract, outputAnalysisDir: str, turbineModelInfo: pd.Series, dataFrameMerge: pd.DataFrame) -> pd.DataFrame:
        """Write one JSON file per analysed turbine and return the result rows.

        Turbines without an entry in ``self.frequency_data`` are skipped.
        """
        # Defensive read: windRoseAnalysis may not have run on this instance.
        frequency_data = getattr(self, "frequency_data", {})

        result_rows = []
        for turbineCode in dataFrameMerge[Field_CodeOfTurbine].unique():
            if turbineCode not in frequency_data:
                continue

            enginName = self._turbineName(dataFrameMerge, turbineCode)
            jsonDictionary = self.convert2Json(turbineModelInfo, turbineCode, dataFrameMerge)
            jsonFileName = f"wind_direction_frequency{enginName}.json"
            jsonFilePath = os.path.join(outputAnalysisDir, jsonFileName)
            JsonUtil.write_json(jsonDictionary, file_path=jsonFilePath)

            result_rows.append({
                Field_Return_TypeAnalyst: self.typeAnalyst(),
                Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
                Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
                Field_CodeOfTurbine: turbineCode,
                Field_Return_FilePath: jsonFilePath,
                Field_Return_IsSaveDatabase: True
            })

        returnDatas = pd.DataFrame(result_rows)
        return returnDatas

    def convert2Json(self, turbineModelInfo: pd.Series, turbineCode, dataFrameOfTurbines: pd.DataFrame):
        """Build the JSON payload for one turbine.

        Returns:
            The payload dictionary, or ``{}`` when ``turbineCode`` has no
            entry in ``self.frequency_data``.
        """
        frequency_data = getattr(self, "frequency_data", {})
        if turbineCode not in frequency_data:
            return {}

        # ``get`` may hand back a Series (e.g. when the model info is a
        # DataFrame); reduce it to its first value.
        engineTypeCode = turbineModelInfo.get(Field_MillTypeCode, "")
        if isinstance(engineTypeCode, pd.Series):
            engineTypeCode = engineTypeCode.iloc[0]

        engineTypeName = turbineModelInfo.get(Field_MachineTypeCode, "")
        if isinstance(engineTypeName, pd.Series):
            engineTypeName = engineTypeName.iloc[0]

        turbine_data = frequency_data[turbineCode]
        wind_rose_rows = []
        for speed_label, direction_data in turbine_data.items():
            if speed_label == 'title':
                continue
            for direction, freq in direction_data.items():
                wind_rose_rows.append({
                    "windDirection": float(direction),
                    "windSpeedRange": speed_label,
                    "frequency": float(freq)
                })

        turbine_info = {
            "enginName": self._turbineName(dataFrameOfTurbines, turbineCode),
            "enginCode": turbineCode,
            "title": turbine_data['title'],
            "windRoseData": wind_rose_rows
        }

        result = {
            "analysisTypeCode": "风向玫瑰分析",
            "engineTypeCode": engineTypeCode,
            "engineTypeName": engineTypeName,
            "axes": {
                "radial": "频率百分比(%)",
                "angular": "风向",
                "levelname": "风速(m/s)"
            },
            "data": [turbine_info]
        }
        return result