123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- import os
- import pandas as pd
- import numpy as np
- import plotly.graph_objects as go
- from plotly.subplots import make_subplots
- from behavior.analystNotFilter import AnalystNotFilter
- from utils.directoryUtil import DirectoryUtil as dir
- import matplotlib.pyplot as plt
- from algorithmContract.confBusiness import *
- from algorithmContract.contract import Contract
- from utils.jsonUtil import JsonUtil
class WindDirectionFrequencyAnalyst(AnalystNotFilter):
    """Wind-rose (wind-direction frequency) analysis, one chart per turbine.

    For every (turbine name, turbine code) pair found in the merged data
    frame this analyst:

    * bins wind direction into 16 sectors of 22.5 degrees and wind speed
      into four classes,
    * renders a stacked polar bar chart to ``<turbine name>.png``,
    * keeps the per-sector percentages in ``self.frequency_data`` and writes
      them to ``wind_direction_frequency<turbine name>.json``.

    ``self.frequency_data`` maps turbine code -> ``{'title': str,
    <speed label>: {sector start angle: percentage}}`` and is rebuilt on
    every call to :meth:`windRoseAnalysis`.
    """

    # Wind-speed class edges; bins are left-closed so they match the labels.
    _SPEED_BIN_EDGES = [0, 3, 6, 9, np.inf]
    _SPEED_LABELS = ['[0,3)', '[3,6)', '[6,9)', '>=9']
    # Bar colour per wind-speed class (light -> dark blue).
    _SPEED_COLORS = {
        '[0,3)': 'rgba(247.0, 251.0, 255.0, 1.0)',
        '[3,6)': 'rgba(171.33333333333334, 207.66666666666666, 229.66666666666669, 1.0)',
        '[6,9)': 'rgba(55.0, 135.0, 192.33333333333334, 1.0)',
        '>=9': 'rgba(8.0, 48.0, 107.0, 1.0)',
    }
    # Angular width of one direction sector, in degrees (360 / 16).
    _SECTOR_WIDTH = 22.5
    # Tick labels for the 16 sector start angles, with 0 degrees labelled N.
    _COMPASS_LABELS = [
        'N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE',
        'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW',
    ]

    def typeAnalyst(self):
        """Return the analysis-type identifier written into every result row."""
        return "wind_direction_frequency"

    def turbinesAnalysis(self, outputAnalysisDir, conf: Contract, turbineCodes):
        """Run the wind-rose analysis and return the list of produced files.

        Args:
            outputAnalysisDir: Directory the PNG and JSON files are written to.
            conf: Analysis contract; supplies the wind-farm id, batch number
                and the analysis configuration.
            turbineCodes: Codes of the turbines to analyse.

        Returns:
            A DataFrame with one row per generated file: the PNG rows from
            :meth:`windRoseAnalysis` followed by the JSON rows from
            :meth:`outputJsonData`.
        """
        dictionary = self.processTurbineData(
            turbineCodes,
            conf,
            [Field_DeviceCode, Field_Time, Field_WindDirection,
             Field_WindSpeed, Field_ActiverPower],
        )
        turbineInfos = self.common.getTurbineInfos(
            conf.dataContract.dataFilter.powerFarmID, turbineCodes, self.turbineInfo)
        dataFrameMerge = self.userDataFrame(
            dictionary, conf.dataContract.configAnalysis, self)

        # windRoseAnalysis must run first: it fills self.frequency_data,
        # which outputJsonData reads.
        imageRows = self.windRoseAnalysis(dataFrameMerge, outputAnalysisDir, conf)
        jsonRows = self.outputJsonData(conf, outputAnalysisDir, turbineInfos, dataFrameMerge)

        return pd.concat([imageRows, jsonRows], axis=0, ignore_index=True)

    def windRoseAnalysis(self, dataFrameMerge: pd.DataFrame, outputAnalysisDir, conf: Contract):
        """Render one wind-rose PNG per turbine and cache the frequencies.

        Args:
            dataFrameMerge: Merged turbine data; must contain the wind
                direction, wind speed, turbine name and turbine code columns.
            outputAnalysisDir: Directory the PNG files are written to.
            conf: Analysis contract (wind-farm id and batch number are copied
                into the result rows).

        Returns:
            A DataFrame with one row per PNG written, or an empty DataFrame
            when the direction/speed columns are missing or entirely NaN.
        """
        # Reset before validating so an early return never leaves the
        # attribute missing or holding a previous run's data.
        self.frequency_data = {}

        required_columns = {Field_WindDirection, Field_WindSpeed}
        if (not required_columns.issubset(dataFrameMerge.columns)
                or dataFrameMerge[Field_WindDirection].isna().all()
                or dataFrameMerge[Field_WindSpeed].isna().all()):
            msg = f"DataFrame缺少必要的列及值。涉及的列有: {required_columns}"
            self.logger.warning(msg)
            return pd.DataFrame()

        # 16 sector start angles (0, 22.5, ..., 337.5) and the 17 edges that
        # delimit them, so the last sector [337.5, 360) is a real bin.
        sector_starts = np.arange(0, 360, self._SECTOR_WIDTH)
        sector_edges = np.append(sector_starts, 360.0)

        result_rows = []
        grouped = dataFrameMerge.groupby([Field_NameOfTurbine, Field_CodeOfTurbine])
        for (turbine_name, turbine_code), group in grouped:
            title = f"机组: {turbine_name}"
            frequencies = self._sectorFrequencies(group, sector_starts, sector_edges)

            # 'title' is stored first; convert2Json skips it when iterating.
            turbine_entry = {'title': title}
            for label, percentage in frequencies.items():
                turbine_entry[label] = percentage.to_dict()
            self.frequency_data[turbine_code] = turbine_entry

            fig = self._buildWindRoseFigure(title, frequencies, sector_starts)
            filePathOfImage = os.path.join(outputAnalysisDir, f"{turbine_name}.png")
            fig.write_image(filePathOfImage, scale=3)

            result_rows.append(
                self._resultRow(conf, turbine_code, filePathOfImage, False))

        return pd.DataFrame(result_rows)

    def _sectorFrequencies(self, group: pd.DataFrame, sector_starts, sector_edges):
        """Return ``{speed label: Series(sector start -> percentage)}`` for one turbine.

        Percentages are normalised within each wind-speed class, so each
        non-empty class sums to 100. A class with no samples yields 0.0 for
        every sector instead of NaN.
        """
        # NOTE(review): normalising per speed class makes the stacked bars sum
        # to more than 100 % overall - confirm this is the intended metric.
        # Both cuts are left-closed ([a, b)): this matches the '[0,3)' style
        # labels and keeps readings of exactly 0 in the first bin.
        speed_class = pd.cut(
            group[Field_WindSpeed],
            bins=self._SPEED_BIN_EDGES,
            labels=self._SPEED_LABELS,
            right=False,
        )
        # Sectors are labelled by their start angle. Readings outside
        # [0, 360) fall into no bin and are ignored.
        direction_sector = pd.cut(
            group[Field_WindDirection],
            bins=sector_edges,
            labels=sector_starts,
            right=False,
        )

        frequencies = {}
        for label in self._SPEED_LABELS:
            counts = (
                direction_sector[speed_class == label]
                .value_counts()
                .reindex(sector_starts, fill_value=0)
            )
            total = counts.sum()
            # Guard the empty class: 0 / 0 would put NaN into chart and JSON.
            frequencies[label] = counts / total * 100 if total else counts.astype(float)
        return frequencies

    def _buildWindRoseFigure(self, title, frequencies, sector_starts):
        """Build the polar bar figure: one Barpolar trace per wind-speed class."""
        fig = make_subplots(rows=1, cols=1, specs=[[{'type': 'polar'}]])
        for label, percentage in frequencies.items():
            fig.add_trace(go.Barpolar(
                r=percentage.values,
                theta=percentage.index,
                name=label,
                marker_color=self._SPEED_COLORS[label],
                marker_showscale=False,       # no colour bar
                marker_line_color='white',    # thin separator between bars
                marker_line_width=1,
            ))
        fig.update_layout(
            title={"text": title},
            polar=dict(
                radialaxis=dict(visible=True),
                angularaxis=dict(
                    tickmode="array",
                    tickvals=sector_starts,
                    # Compass names for a layout where 0 degrees is north.
                    ticktext=self._COMPASS_LABELS,
                ),
            ),
            legend_title="风速",
            margin=dict(t=50, b=10),  # top / bottom margins
        )
        return fig

    def _resultRow(self, conf: Contract, turbineCode, filePath, isSaveDatabase):
        """Build one result record describing a generated file."""
        return {
            Field_Return_TypeAnalyst: self.typeAnalyst(),
            Field_PowerFarmCode: conf.dataContract.dataFilter.powerFarmID,
            Field_Return_BatchCode: conf.dataContract.dataFilter.dataBatchNum,
            Field_CodeOfTurbine: turbineCode,
            Field_Return_FilePath: filePath,
            Field_Return_IsSaveDatabase: isSaveDatabase,
        }

    @staticmethod
    def _firstValue(value):
        """Return ``value``, or its first element when it is a Series."""
        if isinstance(value, pd.Series):
            return value.iloc[0]
        return value

    def _turbineName(self, dataFrameOfTurbines: pd.DataFrame, turbineCode):
        """Return the turbine name found in the first row matching ``turbineCode``."""
        matching = dataFrameOfTurbines[dataFrameOfTurbines[Field_CodeOfTurbine] == turbineCode]
        # .iloc[0] can still be a Series if the name column is duplicated.
        return self._firstValue(matching[Field_NameOfTurbine].iloc[0])

    def outputJsonData(self, conf: Contract, outputAnalysisDir: str, turbineModelInfo: pd.Series, dataFrameMerge: pd.DataFrame) -> pd.DataFrame:
        """Write one wind-rose JSON file per analysed turbine.

        Only turbines that have an entry in ``self.frequency_data`` (i.e. were
        processed by :meth:`windRoseAnalysis`) produce a file.

        Args:
            conf: Analysis contract (wind-farm id and batch number are copied
                into the result rows).
            outputAnalysisDir: Directory the JSON files are written to.
            turbineModelInfo: Turbine model information forwarded to
                :meth:`convert2Json`.
            dataFrameMerge: Merged turbine data; supplies the turbine codes
                and names.

        Returns:
            A DataFrame with one row per JSON file written (possibly empty).
        """
        frequency_data = getattr(self, "frequency_data", {})

        result_rows = []
        for turbineCode in dataFrameMerge[Field_CodeOfTurbine].unique():
            if turbineCode not in frequency_data:
                continue
            enginName = self._turbineName(dataFrameMerge, turbineCode)
            jsonDictionary = self.convert2Json(turbineModelInfo, turbineCode, dataFrameMerge)
            jsonFilePath = os.path.join(
                outputAnalysisDir, f"wind_direction_frequency{enginName}.json")
            JsonUtil.write_json(jsonDictionary, file_path=jsonFilePath)
            result_rows.append(self._resultRow(conf, turbineCode, jsonFilePath, True))

        return pd.DataFrame(result_rows)

    def convert2Json(self, turbineModelInfo: pd.Series, turbineCode, dataFrameOfTurbines: pd.DataFrame):
        """Build the JSON-ready dictionary for one turbine's wind rose.

        Args:
            turbineModelInfo: Object with a ``.get`` accessor holding the
                turbine model fields; Series-valued results are reduced to
                their first element.
            turbineCode: Code of the turbine to export.
            dataFrameOfTurbines: Merged turbine data used to look up the
                turbine name.

        Returns:
            The export dictionary, or ``{}`` when no frequencies are cached
            for ``turbineCode``.
        """
        frequency_data = getattr(self, "frequency_data", {})
        if turbineCode not in frequency_data:
            return {}

        engineTypeCode = self._firstValue(turbineModelInfo.get(Field_MillTypeCode, ""))
        engineTypeName = self._firstValue(turbineModelInfo.get(Field_MachineTypeCode, ""))
        turbine_entry = frequency_data[turbineCode]

        # Flatten {speed label: {direction: freq}} into a list of records;
        # the 'title' key is metadata, not a speed class.
        windRoseData = [
            {
                "windDirection": float(direction),
                "windSpeedRange": speed_label,
                "frequency": float(freq),
            }
            for speed_label, direction_data in turbine_entry.items()
            if speed_label != 'title'
            for direction, freq in direction_data.items()
        ]

        turbine_info = {
            "enginName": self._turbineName(dataFrameOfTurbines, turbineCode),
            "enginCode": turbineCode,
            "title": turbine_entry['title'],
            "windRoseData": windRoseData,
        }
        return {
            "analysisTypeCode": "风向玫瑰分析",
            "engineTypeCode": engineTypeCode,
            "engineTypeName": engineTypeName,
            "axes": {
                "radial": "频率百分比(%)",
                "angular": "风向",
                "levelname": "风速(m/s)",
            },
            "data": [turbine_info],
        }
|