| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
import logging
import os
from typing import Any, Dict, List

import pandas as pd
import pymysql
from pymysql import Error
# Logging setup: request INFO-level output on the root logger and create the
# module-level logger used by every class below.
logging.basicConfig(
    level=logging.INFO,
)
logger = logging.getLogger(name=__name__)
# Database configuration. The values are defined inline here (the original
# note says they are meant to come from config.py); each one can be overridden
# through an environment variable so credentials need not live in source.
class DatabaseConfig:
    """Connection settings for the MySQL database used by the importer.

    Every attribute keeps its previous hard-coded value as the default and may
    be overridden with the matching ``WIND_DB_*`` environment variable. The
    environment is read once, when this class body executes (import time).

    Raises:
        ValueError: at import time if ``WIND_DB_PORT`` is set to a value that
            cannot be parsed as an integer.
    """

    HOST = os.environ.get('WIND_DB_HOST', '192.168.50.234')
    # Kept as int because it is passed straight to pymysql.connect(port=...).
    PORT = int(os.environ.get('WIND_DB_PORT', '24001'))
    DATABASE = os.environ.get('WIND_DB_DATABASE', 'wind_data')
    USERNAME = os.environ.get('WIND_DB_USERNAME', 'root')
    # NOTE(review): the fallback password remains only for backward
    # compatibility; prefer supplying WIND_DB_PASSWORD and rotating this value.
    PASSWORD = os.environ.get('WIND_DB_PASSWORD', 'admin123456')
class WindTurbinePowerCurve:
    """Load and clean wind-turbine power-curve records from a CSV file.

    Typical use is ``load_csv_data()`` followed by ``clean_data()``; the
    resulting frame is then available as ``self.dataframe``.
    """

    # Columns that must be present in the CSV header.
    REQUIRED_COLUMNS = (
        '风场id', '风场名称', '标准机型', '原始机型', '风速', '有功功率',
    )
    # Columns converted to a numeric dtype when present; cells that cannot be
    # parsed become NaN.
    NUMERIC_COLUMNS = (
        '风速', '有功功率', '空气密度', '叶轮直径(m)', '额定功率(kW)', '额定风速(m/s)',
    )
    # Rows without a usable value in any of these columns are discarded.
    MANDATORY_VALUE_COLUMNS = ['风速', '有功功率']

    def __init__(self, csv_file_path: str):
        """Store the CSV location; no file access happens here.

        Args:
            csv_file_path: Path of the CSV file to read.
        """
        self.csv_file_path = csv_file_path
        self.dataframe = None
        self.connection = None

    def load_csv_data(self) -> pd.DataFrame:
        """Read the CSV file into ``self.dataframe`` and return it.

        Returns:
            The loaded DataFrame (the same object stored on the instance).

        Raises:
            Exception: whatever ``pandas.read_csv`` raises is logged and
                re-raised unchanged.
        """
        try:
            # 'utf-8-sig' strips a leading BOM so the first column name is clean.
            self.dataframe = pd.read_csv(self.csv_file_path, encoding='utf-8-sig')
            logger.info(f"成功加载CSV数据,共{len(self.dataframe)}行记录")
            return self.dataframe
        except Exception as e:
            logger.error(f"加载CSV文件失败: {e}")
            raise

    def clean_data(self) -> None:
        """Validate the loaded frame and normalise it in place.

        Steps: verify the required columns exist, coerce the numeric columns
        that are present, then drop rows whose wind speed or active power is
        missing or could not be parsed as a number.

        Raises:
            ValueError: if no data has been loaded, or a required column is
                absent (the first missing column is named in the message).
        """
        if self.dataframe is None:
            raise ValueError("数据未加载,请先调用load_csv_data()")

        for col in self.REQUIRED_COLUMNS:
            if col not in self.dataframe.columns:
                raise ValueError(f"缺少必需列: {col}")

        # Coerce BEFORE dropping rows: a non-numeric cell only turns into NaN
        # during coercion, so dropping first would let such rows survive.
        # Optional columns are skipped when the CSV does not provide them.
        for col in self.NUMERIC_COLUMNS:
            if col in self.dataframe.columns:
                self.dataframe[col] = pd.to_numeric(self.dataframe[col], errors='coerce')

        original_count = len(self.dataframe)
        self.dataframe.dropna(subset=self.MANDATORY_VALUE_COLUMNS, inplace=True)
        cleaned_count = len(self.dataframe)

        if original_count > cleaned_count:
            logger.warning(f"删除了{original_count - cleaned_count}行包含缺失值的记录")

        logger.info(f"数据清理完成,剩余{len(self.dataframe)}行有效记录")
class DatabaseManager:
    """Thin wrapper around a PyMySQL connection for the power-curve table.

    Call ``connect()`` before ``create_table()``, ``insert_data()`` (with
    non-empty data) or ``count_records()``; those methods raise
    ``RuntimeError`` when no connection has been opened.
    """

    def __init__(self, config: "DatabaseConfig"):
        """Remember the configuration; the connection is opened by connect().

        Args:
            config: Object exposing HOST, PORT, USERNAME, PASSWORD and
                DATABASE attributes.
        """
        self.config = config
        self.connection = None

    def connect(self) -> "pymysql.Connection":
        """Open the MySQL connection and return it.

        The connection uses utf8mb4 and a dict cursor, so fetched rows are
        dictionaries keyed by column name.

        Raises:
            pymysql.Error: if the connection attempt fails (logged first).
        """
        try:
            self.connection = pymysql.connect(
                host=self.config.HOST,
                port=self.config.PORT,
                user=self.config.USERNAME,
                password=self.config.PASSWORD,
                database=self.config.DATABASE,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
            logger.info("成功连接到数据库")
            return self.connection
        except Error as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def disconnect(self) -> None:
        """Close the connection if one exists and is still open."""
        if self.connection and self.connection.open:
            self.connection.close()
            logger.info("数据库连接已关闭")

    def _require_connection(self) -> None:
        """Raise RuntimeError when connect() has not produced a connection."""
        if self.connection is None:
            raise RuntimeError("数据库未连接,请先调用connect()")

    def create_table(self) -> None:
        """Create the ``info_curve_power_turbine`` table if it does not exist.

        Raises:
            RuntimeError: if no connection has been opened.
            pymysql.Error: if the DDL statement fails (after a rollback).
        """
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS info_curve_power_turbine (
            id INT AUTO_INCREMENT PRIMARY KEY,
            wind_farm_id VARCHAR(50) NOT NULL COMMENT '风场ID',
            wind_farm_name VARCHAR(100) NOT NULL COMMENT '风场名称',
            standard_model VARCHAR(50) NOT NULL COMMENT '标准机型',
            original_model VARCHAR(50) NOT NULL COMMENT '原始机型',
            wind_speed DECIMAL(6,2) NOT NULL COMMENT '风速(m/s)',
            active_power DECIMAL(10,2) NOT NULL COMMENT '有功功率(kW)',
            description TEXT COMMENT '描述',
            air_density DECIMAL(8,6) COMMENT '空气密度(kg/m³)',
            rotor_diameter DECIMAL(8,2) COMMENT '叶轮直径(m)',
            rated_power DECIMAL(10,2) COMMENT '额定功率(kW)',
            sweep_area DECIMAL(12,2) COMMENT '扫风面积(m²)',
            rated_wind_speed DECIMAL(6,2) COMMENT '额定风速(m/s)',
            power_density DECIMAL(15,12) COMMENT '功率密度(W/m²)',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',

            INDEX idx_wind_farm (wind_farm_id),
            INDEX idx_model (standard_model),
            INDEX idx_wind_speed (wind_speed),
            INDEX idx_original_model (original_model)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风力发电机功率曲线数据表';
        """

        self._require_connection()
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(create_table_sql)
            self.connection.commit()
            logger.info("表 'info_curve_power_turbine' 创建成功或已存在")
        except Error as e:
            logger.error(f"创建表失败: {e}")
            self.connection.rollback()
            raise

    @staticmethod
    def _clean_text(value: Any, default: Any = '') -> Any:
        """Return ``value`` as a stripped string, or ``default`` if it is None/NaN.

        Plain ``str()`` would turn a missing cell into the literal 'nan'.
        """
        if pd.isna(value):
            return default
        return str(value).strip()

    @staticmethod
    def _clean_number(value: Any) -> Any:
        """Return ``value`` as a float, or None (SQL NULL) if it is None/NaN."""
        if pd.isna(value):
            return None
        return float(value)

    @classmethod
    def _build_records(cls, data: pd.DataFrame) -> List[tuple]:
        """Convert each DataFrame row into the tuple expected by the INSERT.

        Tuple order matches the column list of the INSERT statement in
        insert_data(). Columns absent from ``data`` fall back to '' (text),
        0.0 (wind speed / active power) or None (optional numerics).
        """
        records = []
        # Plain dict records avoid the per-row Series construction (and dtype
        # upcasting) that DataFrame.iterrows() performs.
        for row in data.to_dict(orient='records'):
            records.append((
                cls._clean_text(row.get('风场id', '')),
                cls._clean_text(row.get('风场名称', '')),
                cls._clean_text(row.get('标准机型', '')),
                cls._clean_text(row.get('原始机型', '')),
                float(row.get('风速', 0.0)),
                float(row.get('有功功率', 0.0)),
                # The description column is nullable, so a missing value is
                # stored as NULL.
                cls._clean_text(row.get('描述', ''), default=None),
                cls._clean_number(row.get('空气密度')),
                cls._clean_number(row.get('叶轮直径(m)')),
                cls._clean_number(row.get('额定功率(kW)')),
                cls._clean_number(row.get('扫风面积(㎡)')),
                cls._clean_number(row.get('额定风速(m/s)')),
                cls._clean_number(row.get('功率密度(W/㎡)')),
            ))
        return records

    def insert_data(self, data: pd.DataFrame) -> int:
        """Bulk-insert the rows of ``data`` into ``info_curve_power_turbine``.

        Args:
            data: DataFrame whose columns use the CSV's Chinese header names.

        Returns:
            The row count reported by the cursor, or 0 when ``data`` is empty
            (in which case the database is not touched).

        Raises:
            RuntimeError: if ``data`` is non-empty and no connection is open.
            pymysql.Error: if the insert or commit fails (after a rollback).
        """
        if data.empty:
            logger.warning("没有数据需要插入")
            return 0

        self._require_connection()

        insert_sql = """
        INSERT INTO info_curve_power_turbine (
            wind_farm_id, wind_farm_name, standard_model, original_model,
            wind_speed, active_power, description, air_density,
            rotor_diameter, rated_power, sweep_area, rated_wind_speed,
            power_density
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        records = self._build_records(data)

        try:
            with self.connection.cursor() as cursor:
                cursor.executemany(insert_sql, records)
                inserted_count = cursor.rowcount
            self.connection.commit()
        except Error as e:
            logger.error(f"插入数据失败: {e}")
            self.connection.rollback()
            raise

        logger.info(f"成功插入 {inserted_count} 条记录到数据库")
        return inserted_count

    def count_records(self) -> int:
        """Return the number of rows in ``info_curve_power_turbine``.

        Database errors are logged and reported as 0 rather than raised.

        Raises:
            RuntimeError: if no connection has been opened.
        """
        self._require_connection()
        try:
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT COUNT(*) as count FROM info_curve_power_turbine")
                result = cursor.fetchone()
                return result['count'] if result else 0
        except Error as e:
            logger.error(f"统计记录失败: {e}")
            return 0
class WindDataImporter:
    """Orchestrates loading the power-curve CSV and writing it to MySQL."""

    def __init__(self, csv_file_path: str):
        """Create the CSV processor and database configuration.

        Args:
            csv_file_path: Path of the CSV file to import.
        """
        self.csv_file_path = csv_file_path
        self.data_processor = WindTurbinePowerCurve(csv_file_path)
        self.db_config = DatabaseConfig()
        self.db_manager = None

    def import_data(self) -> bool:
        """Run the full import: load, clean, connect, create table, insert.

        Any exception is logged and converted into a ``False`` return value;
        the database connection is closed on both the success and the failure
        path.

        Returns:
            True if every step completed, False otherwise.
        """
        try:
            logger.info("开始导入风力发电机功率曲线数据...")

            # 1. Load and clean the CSV data.
            self.data_processor.load_csv_data()
            self.data_processor.clean_data()

            # 2. Connect to the database.
            self.db_manager = DatabaseManager(self.db_config)
            self.db_manager.connect()

            # 3. Make sure the target table exists.
            self.db_manager.create_table()

            # 4. Insert the rows (insert_data logs the inserted count itself).
            self.db_manager.insert_data(self.data_processor.dataframe)

            # 5. Report the table's total row count.
            total_count = self.db_manager.count_records()
            logger.info(f"数据库中的总记录数: {total_count}")

            # 6. Close the connection.
            self.db_manager.disconnect()

            logger.info("数据导入完成!")
            return True

        except Exception as e:
            logger.error(f"数据导入过程中出错: {e}")
            self._disconnect_quietly()
            return False

    def _disconnect_quietly(self) -> None:
        """Best-effort disconnect used on the failure path; never raises."""
        if not self.db_manager:
            return
        try:
            self.db_manager.disconnect()
        except Exception as close_error:
            # Only ordinary errors are suppressed (and logged);
            # KeyboardInterrupt/SystemExit still propagate.
            logger.warning(f"关闭数据库连接时出错: {close_error}")

    def get_summary(self) -> Dict[str, Any]:
        """Summarise the currently loaded DataFrame.

        Returns:
            An empty dict when no data is loaded; otherwise record and
            distinct-value counts plus min/max ranges as formatted strings.
            The air-density range is included only when that optional column
            is present.
        """
        if self.data_processor.dataframe is None:
            return {}

        df = self.data_processor.dataframe
        summary = {
            '总记录数': len(df),
            '风场数量': df['风场id'].nunique(),
            '标准机型数量': df['标准机型'].nunique(),
            '原始机型数量': df['原始机型'].nunique(),
            '风速范围': f"{df['风速'].min():.1f} - {df['风速'].max():.1f} m/s",
            '有功功率范围': f"{df['有功功率'].min():.1f} - {df['有功功率'].max():.1f} kW",
        }
        # Air density is not a required column, so guard against its absence.
        if '空气密度' in df.columns:
            summary['空气密度范围'] = (
                f"{df['空气密度'].min():.3f} - {df['空气密度'].max():.3f} kg/m³"
            )

        return summary
def main(csv_file_path: str = "./data/全部机型功率曲线_含标准类型_解析结果_详细.csv") -> None:
    """Print a summary of the CSV, then import it into the database.

    Args:
        csv_file_path: Path of the CSV file to import. Defaults to the
            location previously hard-coded in this function.
    """
    importer = WindDataImporter(csv_file_path)

    # Build the summary first so that an unreadable or invalid CSV is reported
    # before any database work starts. Only the CSV steps sit inside the try,
    # so a failure while printing is not misreported as a CSV read error.
    try:
        importer.data_processor.load_csv_data()
        importer.data_processor.clean_data()
        summary = importer.get_summary()
    except Exception as e:
        print(f"读取CSV文件时出错: {e}")
        return

    print("数据摘要:")
    for key, value in summary.items():
        print(f" {key}: {value}")
    print()

    # Run the import; import_data() reports failure through its return value.
    success = importer.import_data()

    if success:
        print("数据导入成功!")
    else:
        print("数据导入失败!")


if __name__ == "__main__":
    main()
|