| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- # impellerDiameter.py
- import pandas as pd
- import re
- import math
- def extract_diameter_and_power(model_str):
- """
- 从风机机型字符串中提取叶轮直径(单位:米)和额定功率(单位:kW)。
- 返回一个字典:{'diameter': 直径, 'power_kw': 功率}
- 如果无法确定则对应值为None。
- """
- if not isinstance(model_str, str):
- return {'diameter': None, 'power_kw': None}
-
- s = model_str.strip().upper()
-
- # ----- 第一步:找出所有可能的数字 -----
- all_numbers = []
- matches = re.findall(r'\d+\.?\d*', s)
- for num_str in matches:
- try:
- num = float(num_str)
- all_numbers.append(num)
- except ValueError:
- continue
-
- if len(all_numbers) < 2:
- # 如果没有至少两个数字,无法区分直径和功率
- return {'diameter': None, 'power_kw': None}
-
- # ----- 第二步:根据特征区分直径和功率 -----
- diameter_candidates = []
- power_candidates = []
-
- for num in all_numbers:
- # 直径的特征:通常为2-3位整数,范围在50-300米之间
- if 20 <= num <= 400 and num > 10: # 放宽下限到20,确保包含小直径机型
- # 直径通常接近整数,且数值相对较小
- if abs(num - round(num)) < 0.1: # 接近整数
- diameter_candidates.append(num)
- elif 50 <= num <= 300: # 在典型直径范围内的小数也可能是直径
- diameter_candidates.append(num)
-
- # 功率的特征:
- # 1. 兆瓦级的小数 (如1.5, 2.0, 3.6, 6.7)
- # 2. 百位或千位整数 (如1500, 2000, 3000, 5000)
- # 3. 万位整数 (如10000, 12000)
-
- # 判断是否为兆瓦级功率(常见的小数功率)
- if 0.5 <= num <= 20 and '.' in str(num):
- power_candidates.append(num * 1000) # 转换为kW
-
- # 判断是否为千瓦级功率
- elif num >= 100: # 功率通常至少100kW以上
- # 典型功率值范围
- if 100 <= num <= 30000:
- power_candidates.append(num)
-
- # ----- 第三步:特殊处理MW单位标识 -----
- # 如果字符串中包含"MW"标识,可以更准确地提取功率
- if 'MW' in s:
- # 寻找靠近"MW"的数字
- mw_pattern = r'(\d+\.?\d*)\s*MW'
- mw_matches = re.findall(mw_pattern, s)
- for mw_str in mw_matches:
- try:
- mw_value = float(mw_str)
- # 转换为kW
- kw_value = mw_value * 1000
- if kw_value not in power_candidates:
- power_candidates.append(kw_value)
- except ValueError:
- pass
-
- # ----- 第四步:决策逻辑 -----
- result = {'diameter': None, 'power_kw': None}
-
- # 1. 直径决策
- if diameter_candidates:
- # 优先选择在典型直径范围(50-200)内的整数
- typical_diameters = [d for d in diameter_candidates if 50 <= d <= 200]
- if typical_diameters:
- # 选择第一个(通常字符串中先出现的是直径)
- result['diameter'] = typical_diameters[0]
- else:
- # 如果不在典型范围,选择最小的(假设直径通常比功率数值小)
- result['diameter'] = min(diameter_candidates)
-
- # 2. 功率决策
- if power_candidates:
- # 优先选择通过MW标识找到的功率
- mw_based_power = [p for p in power_candidates if p in [num * 1000 for num in all_numbers if '.' in str(num)]]
- if mw_based_power:
- result['power_kw'] = mw_based_power[0]
- else:
- # 否则选择最大的(假设功率数值通常比直径大)
- # 但需要排除明显是直径的值
- filtered_power = [p for p in power_candidates if p != result['diameter']]
- if filtered_power:
- # 对于功率,如果是整数,优先选择常见的功率等级
- common_powers = [1500, 2000, 2500, 3000, 5000, 6000, 10000, 12000]
- for cp in common_powers:
- if cp in [int(p) for p in filtered_power if abs(p - round(p)) < 0.1]:
- result['power_kw'] = cp
- break
- if result['power_kw'] is None:
- result['power_kw'] = max(filtered_power)
-
- # ----- 第五步:如果决策失败,尝试基于位置的简单逻辑 -----
- if result['diameter'] is None or result['power_kw'] is None:
- if len(all_numbers) >= 2:
- # 假设第一个数字是直径,第二个是功率(常见格式:直径-功率)
- if 20 <= all_numbers[0] <= 300:
- result['diameter'] = all_numbers[0]
-
- # 判断第二个数字是否为功率
- if len(all_numbers) > 1:
- second_num = all_numbers[1]
- # 如果是小数,很可能是兆瓦级功率
- if '.' in str(second_num) and 0.5 <= second_num <= 20:
- result['power_kw'] = second_num * 1000
- elif second_num >= 100 and second_num <= 30000:
- result['power_kw'] = second_num
-
- return result
- def calculate_swept_area(diameter):
- """
- 计算扫风面积
- 公式:扫风面积 = π × (叶轮直径/2)²
- 单位:平方米(㎡)
- """
- if diameter is None or pd.isna(diameter):
- return None
-
- try:
- # 使用高精度的π值
- radius = diameter / 2.0
- swept_area = math.pi * (radius ** 2)
- return round(swept_area, 2) # 保留两位小数
- except (TypeError, ValueError):
- return None
- def calculate_rated_wind_speed(group):
- """
- 计算额定风速
- 定义:有功功率 >= 额定功率 的最小风速
- """
- if group.empty:
- return None
-
- # 获取该分组的额定功率(假设同一分组内额定功率相同)
- rated_power = group['额定功率(kW)'].iloc[0]
-
- # 如果额定功率为空,无法计算
- if pd.isna(rated_power):
- return None
-
- # 找到有功功率 >= 额定功率的数据行
- qualified_data = group[group['有功功率'] >= rated_power]
-
- # 如果没有满足条件的行,尝试寻找最接近额定功率的数据
- if qualified_data.empty:
- # 找到有功功率最接近额定功率的行(向上取)
- if group['有功功率'].max() > 0:
- # 计算与额定功率的绝对差值
- group['power_diff'] = abs(group['有功功率'] - rated_power)
- # 找到差值最小的行
- closest_row = group.loc[group['power_diff'].idxmin()]
- return closest_row['风速']
- return None
-
- # 找到最小风速
- rated_wind_speed = qualified_data['风速'].min()
- return rated_wind_speed
- def calculate_rated_wind_speed_for_groups(df):
- """
- 按标准机型和描述分组计算额定风速
- """
- if '标准机型' not in df.columns or '描述' not in df.columns:
- print("错误:数据框中缺少'标准机型'或'描述'列")
- return df
-
- print("正在按'标准机型'和'描述'分组计算额定风速...")
-
- # 按标准机型和描述分组
- groups = df.groupby(['标准机型', '描述'])
-
- # 创建一个字典来存储每个分组的额定风速
- rated_wind_speed_dict = {}
-
- # 计算每个分组的额定风速
- for (turbine_model, description), group in groups:
- rated_wind_speed = calculate_rated_wind_speed(group)
- rated_wind_speed_dict[(turbine_model, description)] = rated_wind_speed
-
- # 将额定风速添加到DataFrame中
- rated_wind_speeds = []
- for idx, row in df.iterrows():
- key = (row['标准机型'], row['描述'])
- rated_wind_speed = rated_wind_speed_dict.get(key, None)
- rated_wind_speeds.append(rated_wind_speed)
-
- df['额定风速(m/s)'] = rated_wind_speeds
-
- # 统计计算成功率
- total_groups = len(groups)
- successful_groups = sum(1 for v in rated_wind_speed_dict.values() if v is not None)
-
- print(f"额定风速计算完成。")
- print(f"分组数量:{total_groups}")
- print(f"成功计算额定风速的分组数:{successful_groups} ({successful_groups/total_groups*100:.1f}%)")
-
- return df
- def main():
- # ---------- 配置区:请根据您的实际文件修改 ----------
- input_file = f"./data/全部机型功率曲线_含标准类型.csv" # 输入文件名,支持 .csv, .xlsx, .xls
- output_file = f"./output/全部机型功率曲线_含标准类型_解析结果.csv" # 输出文件名
- model_column_name = "标准机型" # 包含机型信息的列名
- # -------------------------------------------------
-
- # 读取文件
- if input_file.endswith('.csv'):
- df = pd.read_csv(input_file, encoding='utf-8') # 如果编码不对,可尝试 'gbk'
- elif input_file.endswith(('.xlsx', '.xls')):
- df = pd.read_excel(input_file)
- else:
- print("错误:不支持的文件格式。请使用 .csv, .xlsx 或 .xls 文件。")
- return
-
- # 检查"机型"列是否存在
- if model_column_name not in df.columns:
- print(f"错误:数据框中找不到名为 '{model_column_name}' 的列。")
- print(f"可用的列有:{list(df.columns)}")
- return
-
- # 应用提取函数
- print("正在解析叶轮直径和额定功率...")
-
- # 创建临时列表存储结果
- diameters = []
- powers = []
-
- for model in df[model_column_name]:
- result = extract_diameter_and_power(model)
- diameters.append(result['diameter'])
- powers.append(result['power_kw'])
-
- # 添加到DataFrame
- df["叶轮直径(m)"] = diameters
- df["额定功率(kW)"] = powers
-
- # 计算扫风面积
- print("正在计算扫风面积...")
- df["扫风面积(㎡)"] = df["叶轮直径(m)"].apply(calculate_swept_area)
-
- # 统计提取成功率
- dia_success = df["叶轮直径(m)"].notna().sum()
- power_success = df["额定功率(kW)"].notna().sum()
- swept_area_success = df["扫风面积(㎡)"].notna().sum()
- total_count = len(df)
-
- print(f"解析完成。")
- print(f"叶轮直径:成功提取 {dia_success}/{total_count} 条记录 ({dia_success/total_count*100:.1f}%)")
- print(f"额定功率:成功提取 {power_success}/{total_count} 条记录 ({power_success/total_count*100:.1f}%)")
- print(f"扫风面积:成功计算 {swept_area_success}/{total_count} 条记录 ({swept_area_success/total_count*100:.1f}%)")
-
- # 显示一些功率单位的转换情况
- if not df["额定功率(kW)"].empty:
- mw_count = (df["额定功率(kW)"] % 1000 == 0).sum()
- print(f"其中 {mw_count} 条记录的功率由MW单位转换得到")
-
- # 按标准机型和描述分组计算额定风速
- df = calculate_rated_wind_speed_for_groups(df)
-
- # 保存到新文件
- if output_file.endswith('.csv'):
- df.to_csv(output_file, index=False, encoding='utf-8-sig')
- else:
- if not output_file.endswith(('.xlsx', '.xls')):
- output_file = output_file + '.xlsx'
- df.to_excel(output_file, index=False)
-
- print(f"结果已保存至:{output_file}")
-
- # 预览结果
- print("\n前10条记录预览:")
- preview_cols = [model_column_name, "叶轮直径(m)", "额定功率(kW)", "扫风面积(㎡)", "额定风速(m/s)"]
- preview_cols = [col for col in preview_cols if col in df.columns]
- print(df[preview_cols].head(10))
-
- # 显示一些统计信息
- print("\n解析结果统计:")
- if dia_success > 0:
- print(f"叶轮直径范围:{df['叶轮直径(m)'].min():.1f} - {df['叶轮直径(m)'].max():.1f} 米")
-
- if power_success > 0:
- print(f"额定功率范围:{df['额定功率(kW)'].min():.0f} - {df['额定功率(kW)'].max():.0f} kW")
-
- if swept_area_success > 0:
- print(f"扫风面积范围:{df['扫风面积(㎡)'].min():.0f} - {df['扫风面积(㎡)'].max():.0f} ㎡")
-
- # 显示额定风速统计
- if '额定风速(m/s)' in df.columns:
- rated_wind_success = df['额定风速(m/s)'].notna().sum()
- if rated_wind_success > 0:
- print(f"额定风速范围:{df['额定风速(m/s)'].min():.1f} - {df['额定风速(m/s)'].max():.1f} m/s")
- print(f"额定风速平均值:{df['额定风速(m/s)'].mean():.1f} m/s")
- print(f"额定风速中位数:{df['额定风速(m/s)'].median():.1f} m/s")
-
- # 显示最常见的功率等级
- if not df["额定功率(kW)"].empty:
- common_powers = df["额定功率(kW)"].dropna().astype(int).value_counts().head(5)
- print("\n最常见的5个额定功率等级(kW):")
- for power, count in common_powers.items():
- print(f" {power} kW: {count} 台")
-
- # 显示扫风面积与功率的关系示例
- print("\n扫风面积与功率关系示例(前5条有效记录):")
- valid_records = df[["叶轮直径(m)", "扫风面积(㎡)", "额定功率(kW)", "额定风速(m/s)"]].dropna().head(5)
- for idx, row in valid_records.iterrows():
- print(f" 直径{row['叶轮直径(m)']:.1f}m → 面积{row['扫风面积(㎡)']:.0f}㎡ → 功率{row['额定功率(kW)']:.0f}kW → 额定风速{row['额定风速(m/s)']:.1f}m/s")
- def calculate_additional_stats(df):
- """
- 计算额外的统计指标(可选功能)
- """
- if "叶轮直径(m)" in df.columns and "扫风面积(㎡)" in df.columns:
- # 计算单位扫风面积的功率密度
- df_valid = df.dropna(subset=["叶轮直径(m)", "扫风面积(㎡)", "额定功率(kW)"])
-
- if len(df_valid) > 0:
- print("\n功率密度分析(W/㎡):")
- df_valid["功率密度(W/㎡)"] = (df_valid["额定功率(kW)"] * 1000) / df_valid["扫风面积(㎡)"]
-
- print(f"平均功率密度:{df_valid['功率密度(W/㎡)'].mean():.2f} W/㎡")
- print(f"功率密度范围:{df_valid['功率密度(W/㎡)'].min():.2f} - {df_valid['功率密度(W/㎡)'].max():.2f} W/㎡")
-
- # 添加功率密度列到原始DataFrame
- df["功率密度(W/㎡)"] = (df["额定功率(kW)"] * 1000) / df["扫风面积(㎡)"]
-
- # 计算额定风速与功率的关系
- if '额定风速(m/s)' in df.columns and '额定功率(kW)' in df.columns:
- df_valid_wind = df.dropna(subset=["额定风速(m/s)", "额定功率(kW)"])
-
- if len(df_valid_wind) > 0:
- print("\n额定风速与功率关系分析:")
- # 按额定功率分组计算平均额定风速
- power_groups = df_valid_wind.groupby(pd.cut(df_valid_wind['额定功率(kW)'],
- bins=[0, 1000, 2000, 3000, 5000, 10000, 30000]))
-
- print("按功率区间分组的平均额定风速:")
- for power_range, group in power_groups:
- if len(group) > 0:
- avg_wind_speed = group['额定风速(m/s)'].mean()
- print(f" {power_range}: {avg_wind_speed:.1f} m/s (样本数: {len(group)})")
- def analyze_rated_wind_speed_by_model(df):
- """
- 按机型分析额定风速
- """
- if '标准机型' not in df.columns or '额定风速(m/s)' not in df.columns:
- return
-
- # 过滤出有额定风速的数据
- df_valid = df.dropna(subset=['标准机型', '额定风速(m/s)'])
-
- if len(df_valid) == 0:
- print("\n没有可用的额定风速数据进行机型分析")
- return
-
- print("\n按机型分析额定风速:")
-
- # 按机型分组计算统计信息
- model_stats = df_valid.groupby('标准机型')['额定风速(m/s)'].agg([
- ('平均额定风速', 'mean'),
- ('最小额定风速', 'min'),
- ('最大额定风速', 'max'),
- ('标准差', 'std'),
- ('样本数', 'count')
- ]).round(2)
-
- # 按样本数排序
- model_stats = model_stats.sort_values('样本数', ascending=False)
-
- print("各机型额定风速统计(按样本数排序):")
- for model, stats in model_stats.head(10).iterrows(): # 显示前10个机型
- print(f" 机型: {model}")
- print(f" 样本数: {stats['样本数']}, 平均额定风速: {stats['平均额定风速']} m/s")
- print(f" 范围: {stats['最小额定风速']} - {stats['最大额定风速']} m/s, 标准差: {stats['标准差']} m/s")
- print()
- if __name__ == "__main__":
- main()
-
- # 可选:运行额外统计功能
- print("\n" + "="*60)
- print("运行额外统计功能...")
- print("="*60)
-
- # 重新加载数据以进行额外分析
- try:
- df_result = pd.read_csv(f"./output/全部机型功率曲线_含标准类型_解析结果.csv", encoding='utf-8-sig')
-
- # 计算额外统计
- calculate_additional_stats(df_result)
-
- # 按机型分析额定风速
- analyze_rated_wind_speed_by_model(df_result)
-
- # 可选:保存带有额外统计的结果
- extra_output_file = f"./output/全部机型功率曲线_含标准类型_解析结果_详细.csv"
- df_result.to_csv(extra_output_file, index=False, encoding='utf-8-sig')
- print(f"\n详细统计结果已保存至:{extra_output_file}")
-
- except Exception as e:
- print(f"运行额外统计功能时出错: {e}")
|