import os
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import seaborn as sns
import matplotlib.pyplot as plt
from .analyst import Analyst
from .utils.directoryUtil import DirectoryUtil as dir
from confBusiness import ConfBusiness


class CpAnalyst(Analyst):
    """
    Wind turbine power coefficient (Cp) analysis.

    Per turbine, ``cp`` bins the records by power and writes a CSV with the
    mean wind speed, mean rotor speed and mean/max/min Cp of every bin.
    Per farm, the plotting methods read those CSV files back and draw the
    Cp-versus-power curves of all turbines.
    """

    def typeAnalyst(self):
        """Return the identifier of this analysis type (``"cp"``)."""
        return "cp"

    def turbineAnalysis(self, dataFrame, outputAnalysisDir, outputFilePath,
                        confData: ConfBusiness, turbineName):
        """
        Run the Cp analysis for one turbine.

        Column names, rotor diameter and air density are read from
        ``confData`` and forwarded to :meth:`cp`. ``outputAnalysisDir`` and
        ``turbineName`` are accepted but not used here.
        """
        self.cp(dataFrame, outputFilePath,
                confData.field_wind_speed,
                confData.field_rotor_speed,
                confData.field_power,
                confData.field_pitch_angle1,
                confData.rotor_diameter,
                confData.density_air)

    def cp(self, dataFrame, output_path, wind_speed_col, field_rotor_speed,
           power_col, pitch_col, rotor_diameter, air_density):
        """
        Compute Cp per record, aggregate it per 10-unit power bin, write a CSV.

        Parameters:
        - dataFrame: input records. It is modified in place: the columns
          'power', 'wind_speed', 'rotor_speed', 'power_floor' and 'cp' are
          added (or overwritten).
        - output_path: path of the CSV file to write.
        - wind_speed_col / field_rotor_speed / power_col: names of the source
          columns in ``dataFrame``.
        - pitch_col: accepted for interface compatibility; not used.
        - rotor_diameter / air_density: scalars, coerced with
          ``pd.to_numeric(..., errors='coerce')``.

        The formula is ``cp = power * 1000 / (0.5 * rho * A * v ** 3)`` with
        ``A = pi * D ** 2 / 4``. The factor 1000 presumably converts kW to W;
        confirm against the data source.
        """
        print('rotor_diameter={} air_density={}'.format(
            rotor_diameter, air_density))

        # Coerce first: a non-numeric cell becomes NaN instead of raising,
        # which is what errors='coerce' is meant to achieve.
        dataFrame['power'] = pd.to_numeric(
            dataFrame[power_col], errors='coerce')
        dataFrame['wind_speed'] = pd.to_numeric(
            dataFrame[wind_speed_col], errors='coerce')
        dataFrame['rotor_speed'] = pd.to_numeric(
            dataFrame[field_rotor_speed], errors='coerce')

        # True floor to the lower multiple of 10. An integer cast would
        # truncate toward zero (merging -10..0 into the 0 bin) and fail on
        # NaN. Rows with NaN power get a NaN key, which groupby drops.
        dataFrame['power_floor'] = np.floor(dataFrame['power'] / 10) * 10

        rotor_diameter = pd.to_numeric(rotor_diameter, errors='coerce')
        air_density = pd.to_numeric(air_density, errors='coerce')
        print('rotor_diameter={} air_density={}'.format(
            rotor_diameter, air_density))

        # Mask non-positive wind speeds so the division yields NaN, not
        # +/-inf; NaN is skipped by mean/max/min, inf would corrupt them.
        wind_speed = dataFrame['wind_speed'].where(
            dataFrame['wind_speed'] > 0)
        swept_area = np.pi * (rotor_diameter ** 2) / 4
        dataFrame['cp'] = dataFrame['power'] * 1000 / \
            (0.5 * air_density * swept_area * wind_speed ** 3)

        grouped = dataFrame.groupby('power_floor').agg(
            wind_speed=('wind_speed', 'mean'),
            rotor_speed=('rotor_speed', 'mean'),
            cp=('cp', 'mean'),
            cp_max=('cp', 'max'),
            cp_min=('cp', 'min'),
        ).reset_index()

        # Keep integer bin labels in the CSV (10, 20, ...), not 10.0, 20.0.
        grouped['power_floor'] = grouped['power_floor'].astype(int)
        grouped = grouped.sort_values('power_floor')

        grouped.to_csv(output_path, index=False)

    def turbinesAnalysis(self, dataFrameMerge, outputAnalysisDir,
                         confData: ConfBusiness):
        """
        Run the farm-level step: plot the Cp curves from the CSV files found
        in ``outputAnalysisDir``. ``dataFrameMerge`` is not used.
        """
        self.generate_cp_distribution(outputAnalysisDir, confData.farm_name)

    def _load_cp_frames(self, csv_dir, turbine_field, encoding,
                        x_name='power_floor', y_name='cp',
                        split_way='_cp.csv'):
        """
        Load every ``.csv`` file under ``csv_dir`` into one DataFrame with
        the columns ``[turbine_field, x_name, y_name]``.

        The turbine name is the file name up to ``split_way``. Files missing
        ``x_name`` or ``y_name`` are skipped with a printed notice. An empty
        DataFrame (with those three columns) is returned when nothing usable
        is found.

        NOTE(review): ``dir.list_directory`` is assumed to yield
        ``(root, dir_names, file_names)`` tuples like ``os.walk`` - the loop
        unpacks three values; confirm against DirectoryUtil.
        """
        columns = [turbine_field, x_name, y_name]
        frames = []
        for root, _dir_names, file_names in dir.list_directory(csv_dir):
            for file_name in file_names:
                # Plots are saved into the same directory, so anything that
                # is not a CSV (e.g. the PNG output of a previous run) must
                # be ignored.
                if not file_name.endswith(".csv"):
                    continue
                file_path = os.path.join(root, file_name)
                print(file_path)
                frame = pd.read_csv(file_path, encoding=encoding)
                missing = [c for c in (x_name, y_name)
                           if c not in frame.columns]
                if missing:
                    print('skip {}: missing columns {}'.format(
                        file_path, missing))
                    continue
                frame[turbine_field] = file_name.split(split_way)[0]
                frames.append(frame.loc[:, columns])

        if not frames:
            return pd.DataFrame(columns=columns)
        # One concat at the end instead of re-copying the result per file.
        return pd.concat(frames, axis=0).reset_index(drop=True)

    def generate_cp_distribution(self, csvFileDirOfCp, farm_name,
                                 encoding='utf-8'):
        """
        Generates Cp distribution plots for turbines in a wind farm.

        Writes ``<farm_name>-Cp-Distribution.png`` (all turbines, coloured by
        turbine) and one ``<turbine>.png`` per turbine (that turbine in dark
        blue over all turbines in light grey) into ``csvFileDirOfCp``.
        Nothing is written when no usable CSV file is found.

        Parameters:
        - csvFileDirOfCp: str, path to the directory containing input CSV
          files; also used as the output directory.
        - farm_name: str, name of the wind farm.
        - encoding: str, encoding of the input CSV files. Defaults to 'utf-8'.
        """
        output_path = csvFileDirOfCp
        field_Name_Turbine = "turbine_name"
        x_name = 'power_floor'
        y_name = 'cp'

        sns.set_palette('deep')
        ress = self._load_cp_frames(
            csvFileDirOfCp, field_Name_Turbine, encoding,
            x_name=x_name, y_name=y_name)
        if ress.empty:
            print('no Cp csv files found in {}'.format(csvFileDirOfCp))
            return

        fig, ax = plt.subplots(figsize=(16, 8))
        try:
            sns.lineplot(x=x_name, y=y_name, data=ress,
                         hue=field_Name_Turbine, ax=ax)
            ax.set_title('Cp-Distribution')
            ax.legend(title='turbine', bbox_to_anchor=(1.02, 0.5), ncol=2,
                      loc='center left', borderaxespad=0.)
            fig.savefig(os.path.join(
                output_path, "{}-Cp-Distribution.png".format(farm_name)),
                bbox_inches='tight', dpi=300)
        finally:
            # Always release the figure, even when saving fails.
            plt.close(fig)

        # The background palette is the same for every turbine: build once.
        grey_palette = sns.color_palette(
            ["lightgrey"] * len(ress[field_Name_Turbine].unique()))
        for name, group in ress.groupby(field_Name_Turbine):
            fig, ax = plt.subplots(figsize=(8, 8))
            try:
                sns.lineplot(x=x_name, y=y_name, data=ress,
                             hue=field_Name_Turbine, palette=grey_palette,
                             legend=False, ax=ax)
                sns.lineplot(x=x_name, y=y_name, data=group,
                             color='darkblue', legend=False, ax=ax)
                ax.set_title('turbine name={}'.format(name))
                fig.savefig(
                    os.path.join(output_path, "{}.png".format(name)),
                    bbox_inches='tight', dpi=120)
            finally:
                plt.close(fig)

    def plot_cp_distribution(self, csvFileDir, farm_name, encoding='gbk'):
        """
        Plotly variant of :meth:`generate_cp_distribution`.

        Writes ``<farm_name>Cp分布.png`` (all turbines) and one
        ``all-<turbine>.png`` per turbine (all turbines in light grey plus
        that turbine in dark blue) into ``csvFileDir``. Nothing is written
        when no usable CSV file is found.

        Parameters:
        - csvFileDir: str, directory holding the CSV files; also the output
          directory (created if missing).
        - farm_name: str, name of the wind farm.
        - encoding: str, encoding of the input CSV files. Defaults to 'gbk'.
        """
        field_Name_Turbine = "设备名"
        x_name = 'power_floor'
        y_name = 'cp'

        output_path = csvFileDir
        os.makedirs(output_path, exist_ok=True)
        print(csvFileDir)

        ress = self._load_cp_frames(
            csvFileDir, field_Name_Turbine, encoding,
            x_name=x_name, y_name=y_name)
        if ress.empty:
            print('no Cp csv files found in {}'.format(csvFileDir))
            return

        # Combined Cp distribution of all turbines.
        fig = make_subplots(rows=1, cols=1)
        for name, group in ress.groupby(field_Name_Turbine):
            fig.add_trace(go.Scatter(
                x=group[x_name], y=group[y_name], mode='lines', name=name))
        fig.update_layout(title_text='{} Cp分布'.format(farm_name),
                          xaxis_title=x_name, yaxis_title=y_name)
        fig.write_image(os.path.join(
            output_path, "{}Cp分布.png".format(farm_name)), scale=3)

        # Split once per turbine instead of re-filtering the whole frame for
        # every (figure, trace) pair.
        unique_turbines = ress[field_Name_Turbine].unique()
        groups = {turbine: ress[ress[field_Name_Turbine] == turbine]
                  for turbine in unique_turbines}

        for name in unique_turbines:
            individual_fig = make_subplots(rows=1, cols=1)

            # Background: every turbine in grey.
            for turbine in unique_turbines:
                group = groups[turbine]
                individual_fig.add_trace(go.Scatter(
                    x=group[x_name], y=group[y_name], mode='lines',
                    name=turbine, line=dict(color='lightgrey')))

            # Foreground: the current turbine in dark blue.
            group = groups[name]
            individual_fig.add_trace(go.Scatter(
                x=group[x_name], y=group[y_name], mode='lines',
                name=name, line=dict(color='darkblue')))

            individual_fig.update_layout(
                title_text='设备名={}'.format(name))
            individual_fig.write_image(os.path.join(
                output_path, "all-{}.png".format(name)), scale=2)