"""Import wind-turbine power-curve data from a CSV file into MySQL."""

import logging
from typing import Any, Dict, List, Optional

import pandas as pd
import pymysql
from pymysql import Error

# Configure logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# CSV columns that must exist for a file to be importable.
REQUIRED_COLUMNS = ('风场id', '风场名称', '标准机型', '原始机型', '风速', '有功功率')

# CSV columns that are converted to numbers; the first two are required,
# the rest are optional and only converted when present in the file.
NUMERIC_COLUMNS = (
    '风速',
    '有功功率',
    '空气密度',
    '叶轮直径(m)',
    '额定功率(kW)',
    '扫风面积(㎡)',
    '额定风速(m/s)',
    '功率密度(W/㎡)',
)


# Database configuration (would normally be imported from config.py; defined
# inline here).
class DatabaseConfig:
    """Connection settings for the target MySQL database."""

    # NOTE(review): credentials are hard-coded in source. Consider moving them
    # to environment variables or an untracked config file.
    HOST = '192.168.50.234'
    PORT = 24001
    DATABASE = 'wind_data'
    USERNAME = 'root'
    PASSWORD = 'admin123456'


class WindTurbinePowerCurve:
    """Loads and cleans wind-turbine power-curve data from a CSV file."""

    def __init__(self, csv_file_path: str):
        """Initialise the processor.

        Args:
            csv_file_path: Path of the CSV file to read.
        """
        self.csv_file_path = csv_file_path
        self.dataframe = None
        self.connection = None

    def load_csv_data(self) -> pd.DataFrame:
        """Read the CSV file into ``self.dataframe`` and return it.

        Raises:
            Exception: Whatever ``pd.read_csv`` raises; the error is logged
                and re-raised unchanged.
        """
        try:
            # 'utf-8-sig' strips the BOM that may precede the header row.
            self.dataframe = pd.read_csv(self.csv_file_path, encoding='utf-8-sig')
            logger.info(f"成功加载CSV数据,共{len(self.dataframe)}行记录")
            return self.dataframe
        except Exception as e:
            logger.error(f"加载CSV文件失败: {e}")
            raise

    def clean_data(self) -> None:
        """Validate columns, coerce numeric columns and drop unusable rows.

        The DataFrame is modified in place. Rows whose wind speed or active
        power is missing or not numeric are removed; non-numeric values in
        optional numeric columns become NaN.

        Raises:
            ValueError: If no data has been loaded or a required column is
                missing.
        """
        if self.dataframe is None:
            raise ValueError("数据未加载,请先调用load_csv_data()")

        # Check required columns.
        for col in REQUIRED_COLUMNS:
            if col not in self.dataframe.columns:
                raise ValueError(f"缺少必需列: {col}")

        # Coerce BEFORE dropping rows: values that cannot be parsed turn into
        # NaN here and must be caught by the dropna below. Optional columns
        # are skipped when the file does not contain them.
        for col in NUMERIC_COLUMNS:
            if col in self.dataframe.columns:
                self.dataframe[col] = pd.to_numeric(
                    self.dataframe[col], errors='coerce'
                )

        # Drop rows without a usable wind speed / active power.
        original_count = len(self.dataframe)
        self.dataframe.dropna(subset=['风速', '有功功率'], inplace=True)
        cleaned_count = len(self.dataframe)

        if original_count > cleaned_count:
            logger.warning(f"删除了{original_count - cleaned_count}行包含缺失值的记录")

        logger.info(f"数据清理完成,剩余{len(self.dataframe)}行有效记录")


def _optional_float(value: Any) -> Optional[float]:
    """Return *value* as a float, or None when it is missing (None/NaN)."""
    return float(value) if pd.notna(value) else None


def _optional_text(value: Any) -> Optional[str]:
    """Return *value* as a stripped string, or None when it is missing."""
    return str(value).strip() if pd.notna(value) else None


def _build_record(row: Dict[str, Any]) -> tuple:
    """Map one CSV row (column name -> value) to the INSERT parameter tuple."""
    return (
        str(row.get('风场id', '')).strip(),
        str(row.get('风场名称', '')).strip(),
        str(row.get('标准机型', '')).strip(),
        str(row.get('原始机型', '')).strip(),
        float(row.get('风速', 0.0)),
        float(row.get('有功功率', 0.0)),
        # A missing description is stored as NULL rather than the text 'nan'.
        _optional_text(row.get('描述', '')),
        _optional_float(row.get('空气密度')),
        _optional_float(row.get('叶轮直径(m)')),
        _optional_float(row.get('额定功率(kW)')),
        _optional_float(row.get('扫风面积(㎡)')),
        _optional_float(row.get('额定风速(m/s)')),
        _optional_float(row.get('功率密度(W/㎡)')),
    )


class DatabaseManager:
    """Owns the MySQL connection and the power-curve table operations."""

    def __init__(self, config: DatabaseConfig):
        """Initialise the manager.

        Args:
            config: Database connection settings.
        """
        self.config = config
        self.connection = None

    def connect(self) -> pymysql.Connection:
        """Open the MySQL connection, store it on the instance and return it.

        Raises:
            pymysql.Error: If the connection cannot be established.
        """
        try:
            self.connection = pymysql.connect(
                host=self.config.HOST,
                port=self.config.PORT,
                user=self.config.USERNAME,
                password=self.config.PASSWORD,
                database=self.config.DATABASE,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
            logger.info("成功连接到数据库")
            return self.connection
        except Error as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def disconnect(self) -> None:
        """Close the connection if one is currently open."""
        if self.connection and self.connection.open:
            self.connection.close()
            logger.info("数据库连接已关闭")

    def create_table(self) -> None:
        """Create the power-curve table if it does not exist yet.

        Raises:
            pymysql.Error: If the statement fails; the transaction is rolled
                back before re-raising.
        """
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS info_curve_power_turbine (
            id INT AUTO_INCREMENT PRIMARY KEY,
            wind_farm_id VARCHAR(50) NOT NULL COMMENT '风场ID',
            wind_farm_name VARCHAR(100) NOT NULL COMMENT '风场名称',
            standard_model VARCHAR(50) NOT NULL COMMENT '标准机型',
            original_model VARCHAR(50) NOT NULL COMMENT '原始机型',
            wind_speed DECIMAL(6,2) NOT NULL COMMENT '风速(m/s)',
            active_power DECIMAL(10,2) NOT NULL COMMENT '有功功率(kW)',
            description TEXT COMMENT '描述',
            air_density DECIMAL(8,6) COMMENT '空气密度(kg/m³)',
            rotor_diameter DECIMAL(8,2) COMMENT '叶轮直径(m)',
            rated_power DECIMAL(10,2) COMMENT '额定功率(kW)',
            sweep_area DECIMAL(12,2) COMMENT '扫风面积(m²)',
            rated_wind_speed DECIMAL(6,2) COMMENT '额定风速(m/s)',
            power_density DECIMAL(15,12) COMMENT '功率密度(W/m²)',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
            INDEX idx_wind_farm (wind_farm_id),
            INDEX idx_model (standard_model),
            INDEX idx_wind_speed (wind_speed),
            INDEX idx_original_model (original_model)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风力发电机功率曲线数据表';
        """

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(create_table_sql)
            self.connection.commit()
            logger.info("表 'info_curve_power_turbine' 创建成功或已存在")
        except Error as e:
            logger.error(f"创建表失败: {e}")
            self.connection.rollback()
            raise

    def insert_data(self, data: pd.DataFrame) -> int:
        """Bulk-insert the rows of *data* into the power-curve table.

        Args:
            data: DataFrame holding the rows to insert, keyed by the CSV
                column names.

        Returns:
            The row count reported by the cursor, or 0 when *data* is empty.

        Raises:
            pymysql.Error: If the insert fails; the transaction is rolled
                back before re-raising.
        """
        if data.empty:
            logger.warning("没有数据需要插入")
            return 0

        # Parameterised INSERT statement.
        insert_sql = """
        INSERT INTO info_curve_power_turbine (
            wind_farm_id, wind_farm_name, standard_model, original_model,
            wind_speed, active_power, description, air_density,
            rotor_diameter, rated_power, sweep_area, rated_wind_speed, power_density
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        # One parameter tuple per DataFrame row.
        records = [_build_record(row) for row in data.to_dict(orient='records')]

        try:
            with self.connection.cursor() as cursor:
                # Bulk insert; read the row count while the cursor is open.
                cursor.executemany(insert_sql, records)
                inserted_count = cursor.rowcount
            self.connection.commit()
            logger.info(f"成功插入 {inserted_count} 条记录到数据库")
            return inserted_count
        except Error as e:
            logger.error(f"插入数据失败: {e}")
            self.connection.rollback()
            raise

    def count_records(self) -> int:
        """Return the number of rows in the table, or 0 if the query fails."""
        try:
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT COUNT(*) as count FROM info_curve_power_turbine")
                result = cursor.fetchone()
                return result['count'] if result else 0
        except Error as e:
            logger.error(f"统计记录失败: {e}")
            return 0


class WindDataImporter:
    """Runs the CSV -> MySQL import from end to end."""

    def __init__(self, csv_file_path: str):
        """Initialise the importer.

        Args:
            csv_file_path: Path of the CSV file to import.
        """
        self.csv_file_path = csv_file_path
        self.data_processor = WindTurbinePowerCurve(csv_file_path)
        self.db_config = DatabaseConfig()
        self.db_manager = None

    def import_data(self) -> bool:
        """Load, clean and insert the CSV data.

        Every failure is logged and reported through the return value; the
        database connection is closed on both the success and error paths.

        Returns:
            True if the import succeeded, False otherwise.
        """
        try:
            logger.info("开始导入风力发电机功率曲线数据...")

            # 1. Load and clean the CSV data.
            self.data_processor.load_csv_data()
            self.data_processor.clean_data()

            # 2. Connect to the database.
            self.db_manager = DatabaseManager(self.db_config)
            self.db_manager.connect()

            # 3. Create the table.
            self.db_manager.create_table()

            # 4. Insert the data.
            self.db_manager.insert_data(self.data_processor.dataframe)

            # 5. Verify by counting the stored rows.
            total_count = self.db_manager.count_records()
            logger.info(f"数据库中的总记录数: {total_count}")
        except Exception as e:
            logger.error(f"数据导入过程中出错: {e}")
            return False
        finally:
            # 6. Disconnect, whether or not the import succeeded.
            self._close_database()

        logger.info("数据导入完成!")
        return True

    def _close_database(self) -> None:
        """Best-effort disconnect; a failure is logged, never raised."""
        if self.db_manager is None:
            return
        try:
            self.db_manager.disconnect()
        except Exception as e:
            logger.warning(f"关闭数据库连接时出错: {e}")

    def get_summary(self) -> Dict[str, Any]:
        """Return summary statistics of the loaded data.

        Returns:
            An empty dict when no data is loaded; otherwise record and
            distinct-value counts plus the wind-speed and active-power
            ranges. The air-density range is included only when that
            optional column is present.
        """
        if self.data_processor.dataframe is None:
            return {}

        df = self.data_processor.dataframe
        summary = {
            '总记录数': len(df),
            '风场数量': df['风场id'].nunique(),
            '标准机型数量': df['标准机型'].nunique(),
            '原始机型数量': df['原始机型'].nunique(),
            '风速范围': f"{df['风速'].min():.1f} - {df['风速'].max():.1f} m/s",
            '有功功率范围': f"{df['有功功率'].min():.1f} - {df['有功功率'].max():.1f} kW",
        }
        if '空气密度' in df.columns:
            summary['空气密度范围'] = (
                f"{df['空气密度'].min():.3f} - {df['空气密度'].max():.3f} kg/m³"
            )

        return summary


def main():
    """Print a summary of the CSV file, then import it into the database."""
    # CSV file path (adjust to the actual location).
    csv_file_path = "./data/全部机型功率曲线_含标准类型_解析结果_详细.csv"

    # Create the importer.
    importer = WindDataImporter(csv_file_path)

    # Show the data summary; stop early if the file cannot be read.
    try:
        importer.data_processor.load_csv_data()
        importer.data_processor.clean_data()
        summary = importer.get_summary()

        print("数据摘要:")
        for key, value in summary.items():
            print(f"  {key}: {value}")
        print()

    except Exception as e:
        print(f"读取CSV文件时出错: {e}")
        return

    # Run the import (import_data reloads and re-cleans the file itself).
    success = importer.import_data()

    if success:
        print("数据导入成功!")
    else:
        print("数据导入失败!")


if __name__ == "__main__":
    main()