# impellerDiameter.py
"""Parse rotor diameter and rated power from wind-turbine model names.

The script reads a power-curve table, extracts the rotor diameter (m) and
rated power (kW) from the model-name column, derives the swept area and the
rated wind speed per (model, description) group, and writes the result.
"""
import math
import re
from pathlib import Path

import pandas as pd

# ---------- Configuration: adjust to your actual files ----------
# Input file; .csv, .xlsx and .xls are supported.
INPUT_FILE = "./data/全部机型功率曲线_含标准类型.csv"
# Output file.
OUTPUT_FILE = "./output/全部机型功率曲线_含标准类型_解析结果.csv"
# Name of the column holding the turbine model string.
MODEL_COLUMN_NAME = "标准机型"
# ----------------------------------------------------------------

# Any run of digits with an optional fractional part.
_NUMBER_RE = re.compile(r'\d+\.?\d*')
# A number immediately followed (optionally after whitespace) by "MW".
_MW_RE = re.compile(r'(\d+\.?\d*)\s*MW')
# Power ratings (kW) preferred when several integer candidates remain.
_COMMON_POWERS_KW = (1500, 2000, 2500, 3000, 5000, 6000, 10000, 12000)


def _is_near_integer(value):
    """Return True when *value* is within 0.1 of a whole number."""
    return abs(value - round(value)) < 0.1


def _percent(part, total):
    """Return ``part / total`` as a percentage, or 0.0 when *total* is 0."""
    return part / total * 100 if total else 0.0


def extract_diameter_and_power(model_str):
    """Extract rotor diameter (m) and rated power (kW) from a model string.

    The function is heuristic: it collects every number in the string and
    classifies each as a diameter and/or power candidate by value range.

    Args:
        model_str: Turbine model name, e.g. ``"GW121/2500"``.

    Returns:
        A dict ``{'diameter': ..., 'power_kw': ...}``. A value is ``None``
        when it cannot be determined; both are ``None`` when the input is
        not a string or contains fewer than two numbers.
    """
    if not isinstance(model_str, str):
        return {'diameter': None, 'power_kw': None}

    s = model_str.strip().upper()

    # ----- Step 1: collect every number in the string -----
    # float() cannot fail on a token matched by _NUMBER_RE.
    all_numbers = [float(token) for token in _NUMBER_RE.findall(s)]
    if len(all_numbers) < 2:
        # With fewer than two numbers diameter and power cannot be told apart.
        return {'diameter': None, 'power_kw': None}

    # ----- Step 2: classify each number by its value range -----
    diameter_candidates = []
    power_candidates = []
    for num in all_numbers:
        # Diameter: near-integer in 20-400, or any value in 50-300.
        if 20 <= num <= 400 and (_is_near_integer(num) or 50 <= num <= 300):
            diameter_candidates.append(num)

        # Power: values in 0.5-20 are read as megawatts (integers included,
        # e.g. the 11 in "MySE11-203"); values in 100-30000 as kilowatts.
        if 0.5 <= num <= 20:
            power_candidates.append(num * 1000)
        elif 100 <= num <= 30000:
            power_candidates.append(num)

    # ----- Step 3: numbers explicitly labelled with "MW" -----
    explicit_mw_powers = [float(token) * 1000 for token in _MW_RE.findall(s)]
    for kw_value in explicit_mw_powers:
        if kw_value not in power_candidates:
            power_candidates.append(kw_value)

    # ----- Step 4: decide -----
    result = {'diameter': None, 'power_kw': None}

    # 4.1 Diameter: prefer the first candidate in the typical 50-200 range,
    # otherwise the smallest candidate.
    if diameter_candidates:
        typical = [d for d in diameter_candidates if 50 <= d <= 200]
        result['diameter'] = typical[0] if typical else min(diameter_candidates)

    # 4.2 Power.
    if power_candidates:
        if explicit_mw_powers:
            # An explicit unit label is the strongest evidence available.
            result['power_kw'] = explicit_mw_powers[0]
        else:
            # Next best: a candidate that equals 1000 x some number in the
            # string, i.e. one obtained by MW -> kW conversion.
            scaled = {num * 1000 for num in all_numbers}
            converted = [p for p in power_candidates if p in scaled]
            if converted:
                result['power_kw'] = converted[0]
            else:
                # Drop the value already chosen as the diameter, then prefer
                # a common rating, falling back to the largest candidate.
                remaining = [p for p in power_candidates
                             if p != result['diameter']]
                if remaining:
                    whole = {int(p) for p in remaining if _is_near_integer(p)}
                    for common in _COMMON_POWERS_KW:
                        if common in whole:
                            result['power_kw'] = common
                            break
                    if result['power_kw'] is None:
                        result['power_kw'] = max(remaining)

    # ----- Step 5: positional fallback when either value is missing -----
    # Reinterpret the string as "<diameter> <power>": the first number is the
    # diameter and the second the power, whenever they fall in range.
    if result['diameter'] is None or result['power_kw'] is None:
        first, second = all_numbers[0], all_numbers[1]
        if 20 <= first <= 300:
            result['diameter'] = first
        if 0.5 <= second <= 20:
            result['power_kw'] = second * 1000
        elif 100 <= second <= 30000:
            result['power_kw'] = second

    return result


def calculate_swept_area(diameter):
    """Return the rotor swept area ``pi * (diameter / 2) ** 2``.

    Args:
        diameter: Rotor diameter; ``None``/NaN yield ``None``.

    Returns:
        The area rounded to two decimals, or ``None`` when *diameter* is
        missing or not numeric.
    """
    if diameter is None or pd.isna(diameter):
        return None
    try:
        return round(math.pi * (diameter / 2.0) ** 2, 2)
    except (TypeError, ValueError):
        return None


def calculate_rated_wind_speed(group):
    """Return the rated wind speed for one group of power-curve rows.

    The rated wind speed is the smallest wind speed whose active power
    reaches the rated power. When no row reaches it, the wind speed of the
    row whose active power is closest to the rated power is returned
    instead (provided the group has some positive active power).

    Args:
        group: DataFrame with columns ``额定功率(kW)``, ``有功功率`` and ``风速``.
            The rated power is taken from the first row. The frame is not
            modified.

    Returns:
        The wind speed, or ``None`` when it cannot be determined.
    """
    if group.empty:
        return None

    rated_power = group['额定功率(kW)'].iloc[0]
    if pd.isna(rated_power):
        return None

    active_power = group['有功功率']
    qualified_speeds = group.loc[active_power >= rated_power, '风速']
    if not qualified_speeds.empty:
        return qualified_speeds.min()

    if active_power.max() > 0:
        # Work on a temporary Series with a positional index so the caller's
        # frame is left untouched and duplicate index labels are harmless.
        power_diff = (active_power - rated_power).abs().reset_index(drop=True)
        return group['风速'].iloc[power_diff.idxmin()]
    return None


def calculate_rated_wind_speed_for_groups(df):
    """Add a ``额定风速(m/s)`` column computed per (标准机型, 描述) group.

    Args:
        df: Power-curve table. Modified in place.

    Returns:
        The same DataFrame. It is returned unchanged (after printing an
        error) when a required column is missing.
    """
    if '标准机型' not in df.columns or '描述' not in df.columns:
        print("错误:数据框中缺少'标准机型'或'描述'列")
        return df

    missing = [col for col in ('额定功率(kW)', '有功功率', '风速')
               if col not in df.columns]
    if missing:
        print(f"错误:数据框中缺少计算额定风速所需的列:{missing}")
        return df

    print("正在按'标准机型'和'描述'分组计算额定风速...")

    rated_by_group = {
        key: calculate_rated_wind_speed(group)
        for key, group in df.groupby(['标准机型', '描述'])
    }

    # Rows whose key is absent (e.g. NaN keys dropped by groupby) get None.
    df['额定风速(m/s)'] = [
        rated_by_group.get(key) for key in zip(df['标准机型'], df['描述'])
    ]

    total_groups = len(rated_by_group)
    successful_groups = sum(1 for v in rated_by_group.values() if v is not None)
    print(f"额定风速计算完成。")
    print(f"分组数量:{total_groups}")
    print(f"成功计算额定风速的分组数:{successful_groups} ({_percent(successful_groups, total_groups):.1f}%)")

    return df


def main(input_file=INPUT_FILE, output_file=OUTPUT_FILE,
         model_column_name=MODEL_COLUMN_NAME):
    """Run the full parsing pipeline and save the result.

    Args:
        input_file: Path of the input table (.csv, .xlsx or .xls).
        output_file: Path of the output table. Anything not ending in
            .csv/.xlsx/.xls is written as Excel with ".xlsx" appended.
        model_column_name: Column that holds the turbine model string.

    Returns:
        The processed DataFrame, or ``None`` when the input format is
        unsupported or the model column is missing.
    """
    # Read the input; 'utf-8-sig' accepts UTF-8 with or without a BOM.
    if input_file.endswith('.csv'):
        df = pd.read_csv(input_file, encoding='utf-8-sig')  # try 'gbk' if decoding fails
    elif input_file.endswith(('.xlsx', '.xls')):
        df = pd.read_excel(input_file)
    else:
        print("错误:不支持的文件格式。请使用 .csv, .xlsx 或 .xls 文件。")
        return None

    if model_column_name not in df.columns:
        print(f"错误:数据框中找不到名为 '{model_column_name}' 的列。")
        print(f"可用的列有:{list(df.columns)}")
        return None

    print("正在解析叶轮直径和额定功率...")
    parsed = [extract_diameter_and_power(model) for model in df[model_column_name]]
    # Force float dtype so an all-missing column is NaN rather than
    # object/None, which would break the arithmetic below.
    df["叶轮直径(m)"] = pd.Series(
        [item['diameter'] for item in parsed], index=df.index, dtype='float64')
    df["额定功率(kW)"] = pd.Series(
        [item['power_kw'] for item in parsed], index=df.index, dtype='float64')

    print("正在计算扫风面积...")
    df["扫风面积(㎡)"] = pd.to_numeric(
        df["叶轮直径(m)"].apply(calculate_swept_area), errors='coerce')

    # Extraction success statistics.
    dia_success = df["叶轮直径(m)"].notna().sum()
    power_success = df["额定功率(kW)"].notna().sum()
    swept_area_success = df["扫风面积(㎡)"].notna().sum()
    total_count = len(df)

    print(f"解析完成。")
    print(f"叶轮直径:成功提取 {dia_success}/{total_count} 条记录 ({_percent(dia_success, total_count):.1f}%)")
    print(f"额定功率:成功提取 {power_success}/{total_count} 条记录 ({_percent(power_success, total_count):.1f}%)")
    print(f"扫风面积:成功计算 {swept_area_success}/{total_count} 条记录 ({_percent(swept_area_success, total_count):.1f}%)")

    # NOTE: this counts powers that are exact multiples of 1000 kW, which is
    # only an approximation of "converted from MW".
    if not df["额定功率(kW)"].empty:
        mw_count = (df["额定功率(kW)"] % 1000 == 0).sum()
        print(f"其中 {mw_count} 条记录的功率由MW单位转换得到")

    df = calculate_rated_wind_speed_for_groups(df)

    # Save, creating the output directory if necessary.
    if output_file.endswith('.csv'):
        Path(output_file).parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(output_file, index=False, encoding='utf-8-sig')
    else:
        if not output_file.endswith(('.xlsx', '.xls')):
            output_file = output_file + '.xlsx'
        Path(output_file).parent.mkdir(parents=True, exist_ok=True)
        df.to_excel(output_file, index=False)

    print(f"结果已保存至:{output_file}")

    # Preview.
    print("\n前10条记录预览:")
    preview_cols = [model_column_name, "叶轮直径(m)", "额定功率(kW)", "扫风面积(㎡)", "额定风速(m/s)"]
    preview_cols = [col for col in preview_cols if col in df.columns]
    print(df[preview_cols].head(10))

    # Summary statistics.
    print("\n解析结果统计:")
    if dia_success > 0:
        print(f"叶轮直径范围:{df['叶轮直径(m)'].min():.1f} - {df['叶轮直径(m)'].max():.1f} 米")
    if power_success > 0:
        print(f"额定功率范围:{df['额定功率(kW)'].min():.0f} - {df['额定功率(kW)'].max():.0f} kW")
    if swept_area_success > 0:
        print(f"扫风面积范围:{df['扫风面积(㎡)'].min():.0f} - {df['扫风面积(㎡)'].max():.0f} ㎡")

    has_rated_wind = '额定风速(m/s)' in df.columns
    if has_rated_wind:
        rated_wind_success = df['额定风速(m/s)'].notna().sum()
        if rated_wind_success > 0:
            print(f"额定风速范围:{df['额定风速(m/s)'].min():.1f} - {df['额定风速(m/s)'].max():.1f} m/s")
            print(f"额定风速平均值:{df['额定风速(m/s)'].mean():.1f} m/s")
            print(f"额定风速中位数:{df['额定风速(m/s)'].median():.1f} m/s")

    # Most frequent power ratings.
    if not df["额定功率(kW)"].empty:
        common_powers = df["额定功率(kW)"].dropna().astype(int).value_counts().head(5)
        print("\n最常见的5个额定功率等级(kW):")
        for power, count in common_powers.items():
            print(f" {power} kW: {count} 台")

    # Sample of complete records. The rated-wind-speed column is absent when
    # the grouping step returned early, so only show the sample if it exists.
    if has_rated_wind:
        print("\n扫风面积与功率关系示例(前5条有效记录):")
        valid_records = df[["叶轮直径(m)", "扫风面积(㎡)", "额定功率(kW)", "额定风速(m/s)"]].dropna().head(5)
        for _, row in valid_records.iterrows():
            print(f" 直径{row['叶轮直径(m)']:.1f}m → 面积{row['扫风面积(㎡)']:.0f}㎡ → 功率{row['额定功率(kW)']:.0f}kW → 额定风速{row['额定风速(m/s)']:.1f}m/s")

    return df


def calculate_additional_stats(df):
    """Print optional extra statistics and add a power-density column.

    When the diameter, swept-area and rated-power columns are present and at
    least one row has all three, a ``功率密度(W/㎡)`` column is added to *df* in
    place. Rows with a non-positive swept area get NaN rather than inf.

    Args:
        df: Result table produced by :func:`main`.
    """
    density_cols = ["叶轮直径(m)", "扫风面积(㎡)", "额定功率(kW)"]
    if all(col in df.columns for col in density_cols):
        df_valid = df.dropna(subset=density_cols)
        if len(df_valid) > 0:
            area = pd.to_numeric(df["扫风面积(㎡)"], errors='coerce')
            power = pd.to_numeric(df["额定功率(kW)"], errors='coerce')
            density = power * 1000 / area.where(area > 0)
            valid_density = density.loc[df_valid.index]

            print("\n功率密度分析(W/㎡):")
            print(f"平均功率密度:{valid_density.mean():.2f} W/㎡")
            print(f"功率密度范围:{valid_density.min():.2f} - {valid_density.max():.2f} W/㎡")

            df["功率密度(W/㎡)"] = density

    # Relationship between rated wind speed and rated power.
    if '额定风速(m/s)' in df.columns and '额定功率(kW)' in df.columns:
        df_valid_wind = df.dropna(subset=["额定风速(m/s)", "额定功率(kW)"])
        if len(df_valid_wind) > 0:
            print("\n额定风速与功率关系分析:")
            power_bins = pd.cut(df_valid_wind['额定功率(kW)'],
                                bins=[0, 1000, 2000, 3000, 5000, 10000, 30000])
            print("按功率区间分组的平均额定风速:")
            # observed=True skips empty power intervals.
            for power_range, group in df_valid_wind.groupby(power_bins, observed=True):
                if len(group) > 0:
                    avg_wind_speed = group['额定风速(m/s)'].mean()
                    print(f" {power_range}: {avg_wind_speed:.1f} m/s (样本数: {len(group)})")


def analyze_rated_wind_speed_by_model(df):
    """Print rated-wind-speed statistics for the ten most sampled models.

    Does nothing when the ``标准机型`` or ``额定风速(m/s)`` column is missing.

    Args:
        df: Result table produced by :func:`main`.
    """
    if '标准机型' not in df.columns or '额定风速(m/s)' not in df.columns:
        return

    df_valid = df.dropna(subset=['标准机型', '额定风速(m/s)'])
    if len(df_valid) == 0:
        print("\n没有可用的额定风速数据进行机型分析")
        return

    print("\n按机型分析额定风速:")
    model_stats = df_valid.groupby('标准机型')['额定风速(m/s)'].agg([
        ('平均额定风速', 'mean'),
        ('最小额定风速', 'min'),
        ('最大额定风速', 'max'),
        ('标准差', 'std'),
        ('样本数', 'count'),
    ]).round(2)
    model_stats = model_stats.sort_values('样本数', ascending=False)

    print("各机型额定风速统计(按样本数排序):")
    for model, stats in model_stats.head(10).iterrows():
        print(f" 机型: {model}")
        # Row iteration upcasts the count to float; print it as an integer.
        print(f" 样本数: {int(stats['样本数'])}, 平均额定风速: {stats['平均额定风速']} m/s")
        print(f" 范围: {stats['最小额定风速']} - {stats['最大额定风速']} m/s, 标准差: {stats['标准差']} m/s")
        print()


if __name__ == "__main__":
    df_result = main()

    # Optional: extra statistics on the frame main() just produced.
    print("\n" + "=" * 60)
    print("运行额外统计功能...")
    print("=" * 60)

    if df_result is not None:
        # Top-level boundary: report any failure instead of crashing after
        # the main result has already been saved.
        try:
            calculate_additional_stats(df_result)
            analyze_rated_wind_speed_by_model(df_result)

            extra_output_file = "./output/全部机型功率曲线_含标准类型_解析结果_详细.csv"
            df_result.to_csv(extra_output_file, index=False, encoding='utf-8-sig')
            print(f"\n详细统计结果已保存至:{extra_output_file}")
        except Exception as e:
            print(f"运行额外统计功能时出错: {e}")