| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450 |
import logging
import os
from datetime import datetime
from typing import List, Dict, Any, Optional

import pymysql
# Logging setup: configure the root logger at import time (INFO and above,
# "time - logger name - level - message" lines) and create this module's logger.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# SQL statement definitions
# ---------------------------------------------------------------------------
# DDL for the target table info_model_turbine: one row per turbine model
# configuration plus the number of turbines of that configuration.
# - IF NOT EXISTS makes re-running the statement harmless when the table exists.
# - no_model carries the UNIQUE index idx_no_model; the upsert
#   (INSERT ... ON DUPLICATE KEY UPDATE) relies on it to detect existing rows.
# - The table collation is utf8mb4_unicode_ci (case-insensitive), so that
#   unique index treats keys differing only in letter case as equal.
CREATE_MODEL_TURBINE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_model_turbine (
    id INT AUTO_INCREMENT PRIMARY KEY,
    no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
    model VARCHAR(100) COMMENT '机型',
    manufacturer VARCHAR(100) COMMENT '制造商',
    rated_capacity INT COMMENT '额定容量(kW)',
    cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
    cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
    rotor_diameter INT COMMENT '叶轮直径(m)',
    hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
    turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY idx_no_model (no_model),
    INDEX idx_model (model),
    INDEX idx_manufacturer (manufacturer),
    INDEX idx_rated_capacity (rated_capacity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
"""
# Aggregates info_turbine into one row per distinct combination of the seven
# GROUP BY columns, with COUNT(*) reported as turbine_count. Rows whose model
# is NULL are excluded.
# no_model is built as "model-cut_in-cut_out-hub_height"; IFNULL turns NULL
# parts into empty strings, so the separators are always present.
# NOTE(review): no_model omits manufacturer, rated_capacity and rotor_diameter,
# which are all part of the GROUP BY, so two result rows can share the same
# no_model and therefore collide on info_model_turbine's unique key.
SELECT_MODEL_DATA_SQL = """
SELECT
    CONCAT(
        IFNULL(model, ''),
        '-',
        IFNULL(cut_in_wind_speed, ''),
        '-',
        IFNULL(cut_out_wind_speed, ''),
        '-',
        IFNULL(hub_height, '')
    ) AS no_model,
    model,
    manufacturer,
    rated_capacity,
    cut_in_wind_speed,
    cut_out_wind_speed,
    rotor_diameter,
    hub_height,
    COUNT(*) AS turbine_count
FROM info_turbine
WHERE model IS NOT NULL
GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed, cut_out_wind_speed, rotor_diameter, hub_height
ORDER BY model, manufacturer, rated_capacity;
"""
# Drops the target table together with all of its data; IF EXISTS makes the
# statement a no-op when the table is absent.
DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"
# Returns a single row whose table_exists column is non-zero when the table
# info_model_turbine exists in the schema bound to the %s placeholder (pass the
# database name). Only the schema is parameterised; the table name is fixed.
CHECK_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = 'info_model_turbine'
"""
class DatabaseConfig:
    """Connection settings for the MySQL-compatible wind-data server.

    Every constructor argument is stored unchanged as a public attribute of
    the same name; get_connection_config() turns them into keyword arguments
    for pymysql.connect().
    """

    def __init__(self, host='192.168.50.234', port=4000, user='root',
                 password='123456', database='wind_data', charset='utf8mb4'):
        # Server endpoint
        self.host, self.port = host, port
        # Login credentials
        self.user, self.password = user, password
        # Target schema and client character set
        self.database, self.charset = database, charset

    def get_connection_config(self) -> Dict[str, Any]:
        """Build the keyword arguments for ``pymysql.connect``.

        Returns:
            A dict with host, port, user, password, database and charset,
            plus ``cursorclass=pymysql.cursors.DictCursor`` so that fetched
            rows come back as dicts keyed by column name.
        """
        settings = dict(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
            charset=self.charset,
        )
        settings['cursorclass'] = pymysql.cursors.DictCursor
        return settings
class ModelTurbineManager:
    """Builds the info_model_turbine table from grouped info_turbine rows.

    Holds at most one pymysql connection at a time in ``self.connection``.
    The query/update helpers open it lazily, and close() releases it.
    """

    def __init__(self, db_config: DatabaseConfig):
        """
        Initialise the manager without connecting.

        Args:
            db_config: database connection settings
        """
        self.db_config = db_config
        # Opened by connect() (or lazily by the execute_* helpers); reset by close().
        self.connection = None

    def connect(self) -> pymysql.connections.Connection:
        """
        Open a new database connection and store it on ``self.connection``.

        A connection that is already held is closed first instead of being
        silently overwritten (which would leak it).

        Returns:
            The newly opened pymysql connection.

        Raises:
            Whatever ``pymysql.connect`` raises (logged, then re-raised).
        """
        if self.connection is not None:
            self.close()
        try:
            self.connection = pymysql.connect(**self.db_config.get_connection_config())
            logger.info(f"成功连接到数据库: {self.db_config.host}:{self.db_config.port}/{self.db_config.database}")
            return self.connection
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def close(self):
        """
        Close the held connection, if any, and forget it.

        ``self.connection`` is reset to None so later calls reconnect instead of
        reusing a closed handle, and calling close() twice is harmless.
        """
        conn, self.connection = self.connection, None
        if conn is None:
            return
        # pymysql raises "Already closed" on a repeated close(); only close a live socket.
        if conn.open:
            conn.close()
        logger.info("数据库连接已关闭")

    def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """
        Run a query and return every fetched row.

        Connects first if no connection is held.

        Args:
            sql: SQL text using pymysql ``%s`` placeholders
            params: values for the placeholders, or None

        Returns:
            All rows from ``fetchall()`` (dicts, given the DictCursor requested
            by DatabaseConfig).

        Raises:
            Any database error (logged, then re-raised).
        """
        if self.connection is None:
            self.connect()

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, params)
                result = cursor.fetchall()
                logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
                return result
        except Exception as e:
            logger.error(f"查询执行失败: {e}")
            raise

    def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
        """
        Run a data-modifying statement (INSERT/UPDATE/DELETE or DDL) and commit.

        Connects first if no connection is held. Rolls back on failure.

        Args:
            sql: SQL text using pymysql ``%s`` placeholders
            params: values for the placeholders, or None

        Returns:
            The affected-row count returned by ``cursor.execute``.

        Raises:
            Any database error (logged, then re-raised after rollback).
        """
        if self.connection is None:
            self.connect()

        try:
            with self.connection.cursor() as cursor:
                affected_rows = cursor.execute(sql, params)
                self.connection.commit()
                logger.debug(f"更新执行成功,影响 {affected_rows} 行")
                return affected_rows
        except Exception as e:
            self.connection.rollback()
            logger.error(f"更新执行失败: {e}")
            raise

    def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
        """
        Check whether ``table_name`` exists in the configured database.

        Args:
            table_name: name of the table to look for

        Returns:
            bool: True if the table exists; False if it does not or the check failed
        """
        # Schema and table are both bound as parameters so that table_name is
        # honoured (CHECK_TABLE_EXISTS_SQL hard-codes the table name).
        sql = (
            "SELECT COUNT(*) AS table_exists "
            "FROM information_schema.tables "
            "WHERE table_schema = %s AND table_name = %s"
        )
        try:
            result = self.execute_query(sql, (self.db_config.database, table_name))
            return bool(result) and result[0]['table_exists'] > 0
        except Exception as e:
            logger.error(f"检查表存在性失败: {e}")
            return False

    def create_model_turbine_table(self) -> bool:
        """
        Create info_model_turbine (no-op if it already exists).

        Returns:
            bool: True on success, False if the DDL failed (error is logged)
        """
        try:
            logger.info("开始创建风机机型信息表...")
            self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表创建成功")
            return True
        except Exception as e:
            logger.error(f"创建风机机型信息表失败: {e}")
            return False

    def drop_model_turbine_table(self) -> bool:
        """
        Drop info_model_turbine and all of its data (no-op if absent).

        Returns:
            bool: True on success, False if the DDL failed (error is logged)
        """
        try:
            logger.info("开始删除风机机型信息表...")
            self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
            logger.info("风机机型信息表删除成功")
            return True
        except Exception as e:
            logger.error(f"删除风机机型信息表失败: {e}")
            return False

    def get_model_data(self) -> List[Dict[str, Any]]:
        """
        Fetch the grouped model rows from info_turbine.

        Returns:
            One dict per group produced by SELECT_MODEL_DATA_SQL, or an empty
            list if the query failed (error is logged).
        """
        try:
            logger.info("开始从info_turbine表查询机型数据...")
            data = self.execute_query(SELECT_MODEL_DATA_SQL)
            logger.info(f"成功查询到 {len(data)} 条机型记录")
            return data
        except Exception as e:
            logger.error(f"查询机型数据失败: {e}")
            return []

    @staticmethod
    def _build_insert_rows(model_data: List[Dict[str, Any]]) -> List[tuple]:
        """Convert query rows to INSERT parameter tuples, one per distinct no_model.

        Rows repeating an earlier no_model are folded into the first one: their
        turbine_count is added to it and their other columns are discarded.
        NOTE: the column's case-insensitive collation can still treat keys that
        differ only in letter case as duplicates; those are not merged here.
        """
        merged: Dict[Any, list] = {}
        duplicate_rows = 0
        for item in model_data:
            key = item['no_model']
            count = item.get('turbine_count', 1)  # COUNT(*) from the extraction query
            if key in merged:
                # Without merging, the upsert would overwrite the count instead of summing it.
                merged[key][-1] += count
                duplicate_rows += 1
                continue
            merged[key] = [
                key,
                item['model'],
                item['manufacturer'],
                item['rated_capacity'],
                item['cut_in_wind_speed'],
                item['cut_out_wind_speed'],
                item['rotor_diameter'],
                item['hub_height'],
                count,
            ]
        if duplicate_rows:
            logger.warning(f"{duplicate_rows} 条记录的no_model与之前的记录重复, 已合并并累加turbine_count")
        return [tuple(row) for row in merged.values()]

    def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
        """
        Upsert model rows into info_model_turbine.

        Rows sharing a no_model (possible because no_model encodes fewer columns
        than the extraction query groups by) are merged first, summing their
        turbine_count, so the upsert no longer keeps only the last group's count.
        Connects first if no connection is held.

        Args:
            model_data: rows as returned by get_model_data()

        Returns:
            int: the affected-row count from ``cursor.executemany``. Under MySQL's
            ON DUPLICATE KEY UPDATE rules this counts 1 per inserted row and 2 per
            updated row, so it is not a plain number of records.

        Raises:
            Any database error (logged, then re-raised after rollback).
        """
        if not model_data:
            logger.warning("没有数据需要插入")
            return 0

        if self.connection is None:
            self.connect()

        insert_sql = """
        INSERT INTO info_model_turbine
        (no_model, model, manufacturer, rated_capacity, cut_in_wind_speed,
         cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        turbine_count = VALUES(turbine_count),
        updated_at = CURRENT_TIMESTAMP
        """

        try:
            logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")

            insert_data = self._build_insert_rows(model_data)

            # Batch upsert in one transaction
            with self.connection.cursor() as cursor:
                affected_rows = cursor.executemany(insert_sql, insert_data)
                self.connection.commit()

            logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
            return affected_rows

        except Exception as e:
            self.connection.rollback()
            logger.error(f"插入机型数据失败: {e}")
            raise

    def get_model_turbine_stats(self) -> Dict[str, Any]:
        """
        Compute summary statistics over info_model_turbine.

        Returns:
            A dict with total_models, manufacturer_count, model_count,
            min/max/avg_capacity and total_turbines, or {} on failure.
            The aggregate values are NULL (None) when no rows contribute.
        """
        try:
            stats_sql = """
            SELECT
                COUNT(*) as total_models,
                COUNT(DISTINCT manufacturer) as manufacturer_count,
                COUNT(DISTINCT model) as model_count,
                MIN(rated_capacity) as min_capacity,
                MAX(rated_capacity) as max_capacity,
                AVG(rated_capacity) as avg_capacity,
                SUM(turbine_count) as total_turbines
            FROM info_model_turbine
            """

            result = self.execute_query(stats_sql)
            return result[0] if result else {}

        except Exception as e:
            logger.error(f"获取统计信息失败: {e}")
            return {}

    def print_model_summary(self, model_data: List[Dict[str, Any]]):
        """
        Print per-manufacturer and per-rated-capacity counts of model rows to stdout.

        Args:
            model_data: rows as returned by get_model_data()
        """
        if not model_data:
            logger.info("没有机型数据")
            return

        print("\n" + "="*80)
        print("风机机型数据摘要")
        print("="*80)
        print(f"总机型数: {len(model_data)}")

        # Rows per manufacturer; a NULL/empty manufacturer is reported as '未知'
        # (the key is always present in query rows, so .get's default never applied).
        manufacturers: Dict[Any, int] = {}
        for item in model_data:
            manufacturer = item.get('manufacturer') or '未知'
            manufacturers[manufacturer] = manufacturers.get(manufacturer, 0) + 1

        print(f"制造商数: {len(manufacturers)}")
        print("\n制造商分布:")
        for manufacturer, count in sorted(manufacturers.items(), key=lambda x: x[1], reverse=True):
            print(f" {manufacturer}: {count} 种机型")

        # Rows per rated capacity (rows without a capacity are skipped). Keyed and
        # sorted by the numeric value itself: parsing the "<value>kW" label back
        # with int() crashed on non-integer values such as Decimal('2500.00').
        capacities: Dict[Any, int] = {}
        for item in model_data:
            capacity = item.get('rated_capacity', 0)
            if capacity:
                capacities[capacity] = capacities.get(capacity, 0) + 1

        print("\n额定容量分布:")
        for capacity, count in sorted(capacities.items()):
            print(f" {capacity}kW: {count} 种机型")

        print("="*80)

    def run_model_extraction_pipeline(self, recreate_table: bool = False) -> bool:
        """
        Run the whole extract-and-load flow once, closing the connection at the end.

        Steps: connect; prepare the target table; fetch grouped rows; print a
        summary; upsert the rows; print table statistics.

        Args:
            recreate_table: drop and recreate info_model_turbine first when True;
                otherwise create it only if missing and upsert into it

        Returns:
            bool: True if every step succeeded, False otherwise (errors are logged)
        """
        try:
            logger.info("开始执行风机机型数据提取流程...")

            # Step 1: connect to the database
            self.connect()

            # Step 2: make sure the target table exists. Failures here abort the
            # run instead of, e.g., silently appending when a requested drop failed.
            if recreate_table:
                table_ready = self.drop_model_turbine_table() and self.create_model_turbine_table()
            elif self.check_table_exists():
                logger.info("info_model_turbine表已存在,将追加数据")
                table_ready = True
            else:
                table_ready = self.create_model_turbine_table()
            if not table_ready:
                logger.error("info_model_turbine表准备失败,流程终止")
                return False

            # Step 3: fetch grouped model rows from info_turbine
            model_data = self.get_model_data()
            if not model_data:
                logger.error("未获取到机型数据,流程终止")
                return False

            # Step 4: print a summary
            self.print_model_summary(model_data)

            # Step 5: upsert into info_model_turbine (it logs the affected-row count)
            self.insert_model_data(model_data)

            # Step 6: fetch and print table statistics
            stats = self.get_model_turbine_stats()
            if stats:
                print("\n数据库统计信息:")
                print(f" 总机型数: {stats.get('total_models', 0)}")
                print(f" 制造商数: {stats.get('manufacturer_count', 0)}")
                print(f" 总风机数: {stats.get('total_turbines', 0)}")
                print(f" 额定容量范围: {stats.get('min_capacity', 0)}kW - {stats.get('max_capacity', 0)}kW")
                # AVG() is NULL when every rated_capacity is NULL; round(None) would
                # raise here and report an already-committed run as failed.
                avg_capacity = stats.get('avg_capacity') or 0
                print(f" 平均额定容量: {round(avg_capacity, 1)}kW")

            logger.info("风机机型数据提取流程执行完成!")
            return True

        except Exception as e:
            logger.exception(f"机型数据提取流程执行失败: {e}")
            return False
        finally:
            self.close()
def main():
    """
    Script entry point: run the turbine-model extraction pipeline once.

    Connection settings may be overridden through environment variables so
    that credentials need not be edited into the source:
    WIND_DB_HOST, WIND_DB_PORT, WIND_DB_USER, WIND_DB_PASSWORD, WIND_DB_NAME.
    Each falls back to the built-in default shown below.

    Returns:
        int: 0 if the pipeline succeeded, 1 otherwise (usable as an exit status).
    """
    env = os.environ
    db_config = DatabaseConfig(
        # Alternative endpoint on the private network: host="192.168.50.234", port=4000
        host=env.get('WIND_DB_HOST', "106.120.102.238"),
        port=int(env.get('WIND_DB_PORT', '44000')),
        user=env.get('WIND_DB_USER', 'root'),
        # TODO(review): drop this plaintext fallback once WIND_DB_PASSWORD is set everywhere.
        password=env.get('WIND_DB_PASSWORD', '123456'),
        database=env.get('WIND_DB_NAME', 'wind_data'),
        charset='utf8mb4',
    )

    manager = ModelTurbineManager(db_config)

    # recreate_table=True drops and recreates info_model_turbine first;
    # recreate_table=False keeps existing rows and upserts (ON DUPLICATE KEY UPDATE).
    success = manager.run_model_extraction_pipeline(recreate_table=False)

    if success:
        logger.info("风机机型数据提取成功完成!")
        return 0
    logger.error("风机机型数据提取失败!")
    return 1
if __name__ == "__main__":
    # Propagate main()'s status as the process exit code so schedulers and
    # shell scripts can detect a failed run (None still exits with 0).
    raise SystemExit(main())
|