import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
import traceback
import logging
from datetime import datetime as dt
import hashlib
import pymysql
from pymysql import Connection, cursors
from dbutils.pooled_db import PooledDB
import json
import time
import re
import threading
from threading import Thread, Event
import signal
import sys

from config import DatabaseConfig, TableConfig
from file_scanner import ParquetFileInfo

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class DatabaseManager:
    """Database manager built on a DBUtils connection pool.

    Each thread gets (at most) one pooled connection per manager, cached in
    thread-local storage. A daemon thread probes the database every
    ``_monitor_interval`` seconds, keeps health statistics, and tries to
    rebuild the pool after repeated failures.
    """

    # Upper bound (seconds) a thread waits for another thread's in-flight
    # reconnect attempt before giving up and reporting the current status.
    _RECONNECT_WAIT_TIMEOUT = 120

    def __init__(self, config: DatabaseConfig, table_config: TableConfig, pool_size: int = 6):
        """Create the pool, start the health monitor and install signal handlers.

        Args:
            config: Connection settings (host, port, user, password, database, charset).
            table_config: Table settings, stored for the business methods.
            pool_size: Requested maximum number of pooled connections.

        Raises:
            Exception: Whatever PooledDB / PyMySQL raise when the pool cannot be created.
        """
        self.config = config
        self.table_config = table_config
        self.pool_size = pool_size

        # Thread-local storage holding each thread's connection. It is per
        # instance: a class-level threading.local would let two managers used
        # from the same thread hand out each other's connections.
        self._thread_local = threading.local()

        # Serialises _auto_reconnect, which is invoked from the monitor thread,
        # from DatabaseMonitorDaemon and from caller threads (get_connection).
        self._reconnect_lock = threading.Lock()

        # Connection-monitoring state
        self._connection_monitor_thread: Optional[Thread] = None
        # NOTE: despite its name this event is the *stop* signal; set() makes the monitor loop exit.
        self._monitor_running = Event()
        self._last_connection_check = None
        self._connection_status = "UNKNOWN"  # UNKNOWN, HEALTHY, WARNING, ERROR, DISCONNECTED
        self._connection_error_count = 0
        self._max_error_count = 5  # consecutive failures before the status becomes DISCONNECTED
        self._monitor_interval = 1  # seconds between health checks
        self._monitor_stats: Dict[str, Any] = {
            'total_checks': 0,
            'successful_checks': 0,
            'failed_checks': 0,
            'total_reconnections': 0,
            'last_error': None,
            'last_success': None
        }

        # Connection pool
        self.pool: Optional[PooledDB] = None

        self._init_pool()
        self._start_connection_monitor()
        # Register signal handlers so resources are closed on shutdown
        self._setup_signal_handlers()

    @staticmethod
    def _close_quietly(resource) -> None:
        """Call ``resource.close()`` when resource is not None; log (never raise) failures."""
        if resource is None:
            return
        try:
            resource.close()
        except Exception as e:
            logger.debug(f"关闭资源时出错(已忽略): {e}")

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a backtick-quoted MySQL identifier, doubling embedded backticks."""
        return "`" + str(name).replace("`", "``") + "`"

    def _setup_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers that close all resources and exit.

        ``signal.signal`` only works in the main thread of the main interpreter
        (it raises ValueError elsewhere), so registration is skipped with a
        warning when the manager is created from another thread. Any handlers
        previously installed for these signals are replaced.
        """
        if threading.current_thread() is not threading.main_thread():
            logger.warning("当前线程不是主线程,跳过信号处理器注册")
            return

        def signal_handler(signum, frame):
            logger.info(f"接收到信号 {signum},正在关闭数据库连接池...")
            self.close_all()
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
            signal.signal(signal.SIGTERM, signal_handler)  # termination request
        except ValueError as e:
            # e.g. running inside a sub-interpreter
            logger.warning(f"无法注册信号处理器: {e}")

    def _init_pool(self):
        """Create ``self.pool`` and probe it once.

        The status becomes HEALTHY when the probe query succeeds and ERROR when
        the pool was created but the probe failed.

        Raises:
            Exception: Re-raised from pool construction after the status is set to ERROR.
        """
        try:
            logger.info(f"正在初始化数据库连接池到 {self.config.host}:{self.config.port}/{self.config.database}")
            self.pool = PooledDB(
                creator=pymysql,  # DB-API module used to open connections
                maxconnections=self.pool_size,  # max connections (DBUtils raises it to at least maxcached)
                mincached=2,  # idle connections opened at start-up
                maxcached=5,  # max idle connections kept in the pool
                maxshared=3,  # ignored by DBUtils when the driver's threadsafety < 2 (PyMySQL reports 1)
                blocking=True,  # wait instead of raising when maxconnections is reached
                maxusage=None,  # no limit on how often one connection is reused
                setsession=[],  # no session-setup SQL
                # When DBUtils pings the connection: 0 = never, 1 = when fetched from the pool,
                # 2 = when a cursor is created, 4 = when a query is executed, 7 = always.
                ping=1,
                host=self.config.host,
                port=self.config.port,
                user=self.config.user,
                password=self.config.password,
                database=self.config.database,
                charset=self.config.charset,
                cursorclass=cursors.DictCursor,
                autocommit=False,  # transactions are committed explicitly
                connect_timeout=30,
                read_timeout=600,
                write_timeout=600,
                client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS  # allow multi-statement SQL
            )
            logger.info(f"数据库连接池初始化成功")
            logger.info(f"连接池配置: maxconnections={self.pool_size}, mincached=2, maxcached=5")

            if self._test_pool_connection():
                self._connection_status = "HEALTHY"
                self._monitor_stats['last_success'] = dt.now()
            else:
                # Pool object exists but cannot serve queries; don't leave the status UNKNOWN.
                self._connection_status = "ERROR"
                self._monitor_stats['last_error'] = "连接池测试失败"
        except Exception as e:
            logger.error(f"数据库连接池初始化失败: {e}")
            self._connection_status = "ERROR"
            self._monitor_stats['last_error'] = str(e)
            raise

    def _test_pool_connection(self):
        """Borrow one connection, run a trivial query and return it to the pool.

        Returns:
            bool: True if the query succeeded, False otherwise (the error is logged).
        """
        conn = None
        cursor = None
        try:
            conn = self.pool.connection()
            cursor = conn.cursor()
            start_time = time.monotonic()
            cursor.execute("SELECT 1 as test, NOW() as server_time")
            result = cursor.fetchone()
            elapsed = time.monotonic() - start_time
            logger.info(f"连接池测试成功: 响应时间={elapsed:.3f}s, 服务器时间={result['server_time']}")
            return True
        except Exception as e:
            logger.error(f"连接池测试失败: {e}")
            return False
        finally:
            # Always hand the connection back, also when the query failed.
            self._close_quietly(cursor)
            self._close_quietly(conn)

    def _start_connection_monitor(self):
        """Start the daemon health-monitor thread unless one is already running."""
        if self._connection_monitor_thread is None or not self._connection_monitor_thread.is_alive():
            self._monitor_running.clear()
            self._connection_monitor_thread = Thread(
                target=self._connection_monitor_loop,
                name="DBConnectionMonitor",
                daemon=True  # does not keep the interpreter alive
            )
            self._connection_monitor_thread.start()
            logger.info("数据库连接监控线程已启动")

    def _stop_connection_monitor(self):
        """Signal the monitor thread to stop and wait up to 5 seconds for it."""
        if self._connection_monitor_thread and self._connection_monitor_thread.is_alive():
            self._monitor_running.set()
            self._connection_monitor_thread.join(timeout=5)
            logger.info("数据库连接监控线程已停止")

    def _connection_monitor_loop(self):
        """Run health checks every ``_monitor_interval`` seconds until stopped."""
        logger.info(f"连接监控线程开始运行,检查间隔: {self._monitor_interval}秒")
        last_log_time = time.monotonic()
        log_interval = 60  # emit a status summary about once a minute

        while not self._monitor_running.is_set():
            try:
                self._perform_connection_check()

                now = time.monotonic()
                if now - last_log_time >= log_interval:
                    self._log_connection_status()
                    last_log_time = now
            except Exception as e:
                logger.error(f"连接监控循环异常: {e}")
            # Event.wait returns as soon as a stop is requested, so shutdown
            # does not have to sit out a full interval.
            self._monitor_running.wait(self._monitor_interval)

        logger.info("连接监控循环结束")

    def _perform_connection_check(self):
        """Probe the pool once and update status, counters and statistics.

        After ``_max_error_count`` consecutive failures the status becomes
        DISCONNECTED. From the 3rd consecutive failure on, an automatic
        reconnect is attempted after the probe connection has been returned.
        """
        try:
            self._monitor_stats['total_checks'] += 1
            self._last_connection_check = dt.now()

            # Local copy: another thread may swap self.pool during a reconnect.
            pool = self.pool
            if pool is None:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = "连接池未初始化"
                logger.warning("连接池未初始化")
                return

            needs_reconnect = False
            conn = None
            cursor = None
            try:
                conn = pool.connection()
                cursor = conn.cursor()

                start_time = time.monotonic()
                cursor.execute("SELECT 1 as test, NOW() as server_time, "
                               "VERSION() as version, CONNECTION_ID() as connection_id")
                result = cursor.fetchone()
                elapsed = time.monotonic() - start_time

                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['successful_checks'] += 1
                self._monitor_stats['last_success'] = dt.now()

                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"连接检查成功: "
                                 f"响应时间={elapsed:.3f}s, "
                                 f"服务器时间={result['server_time']}, "
                                 f"MySQL版本={result['version']}, "
                                 f"连接ID={result['connection_id']}")
            except Exception as e:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = str(e)

                if self._connection_error_count >= self._max_error_count:
                    self._connection_status = "DISCONNECTED"
                    logger.error(f"数据库连接失败,已连续失败 {self._connection_error_count} 次: {e}")
                else:
                    logger.warning(f"数据库连接检查失败 (第{self._connection_error_count}次): {e}")

                needs_reconnect = self._connection_error_count >= 3
            finally:
                # Close quietly so a broken connection is not counted as a second failure.
                self._close_quietly(cursor)
                self._close_quietly(conn)

            # Reconnect only after the probe connection is back in the pool, so the
            # pool is not torn down while this thread still holds one of its connections.
            if needs_reconnect:
                self._auto_reconnect()
        except Exception as e:
            logger.error(f"执行连接检查时发生异常: {e}")
            self._monitor_stats['failed_checks'] += 1
            self._monitor_stats['last_error'] = str(e)

    def _auto_reconnect(self):
        """Try to restore connectivity; only one thread reconnects at a time.

        If another thread is already reconnecting, wait (bounded by
        ``_RECONNECT_WAIT_TIMEOUT``) for it to finish instead of racing it.

        Returns:
            bool: True if the database is reachable afterwards.
        """
        if self._reconnect_lock.acquire(blocking=False):
            try:
                return self._auto_reconnect_locked()
            finally:
                self._reconnect_lock.release()

        logger.info("另一个线程正在重连,等待其完成")
        if self._reconnect_lock.acquire(timeout=self._RECONNECT_WAIT_TIMEOUT):
            self._reconnect_lock.release()
        return self._connection_status == "HEALTHY"

    def _auto_reconnect_locked(self):
        """Reconnect logic; must be called with ``_reconnect_lock`` held.

        Steps: release the calling thread's connection, wait 2 s, re-probe the
        existing pool, and if that fails close it and build a new one.

        Returns:
            bool: True if a probe succeeded, False otherwise.
        """
        try:
            logger.warning(f"检测到连接问题,正在尝试自动重连 (错误计数: {self._connection_error_count})")

            # 1. Release the calling thread's connection.
            self.release_connection()

            # 2. Give the server / network a moment.
            time.sleep(2)

            # 3. The existing pool may already work again.
            if self.pool:
                if self._test_pool_connection():
                    self._connection_status = "HEALTHY"
                    self._connection_error_count = 0
                    self._monitor_stats['total_reconnections'] += 1
                    logger.info("自动重连成功")
                    return True
                logger.warning("连接池测试失败,尝试重新初始化")

            # 4. Rebuild the pool.
            old_pool = self.pool
            try:
                self.pool = None
                # A failure while closing the old pool must not prevent the rebuild.
                self._close_quietly(old_pool)
                self._init_pool()
                if self._connection_status != "HEALTHY":
                    # _init_pool created a pool but its probe query failed.
                    logger.error("连接池重新初始化后连接测试失败")
                    return False
                self._connection_error_count = 0
                self._monitor_stats['total_reconnections'] += 1
                logger.info("连接池重新初始化成功")
                return True
            except Exception as e:
                logger.error(f"连接池重新初始化失败: {e}")
                self._connection_status = "ERROR"
                return False

        except Exception as e:
            logger.error(f"自动重连失败: {e}")
            return False

    def _log_connection_status(self):
        """Log a multi-line status summary (INFO when healthy, WARNING otherwise)."""
        status_map = {
            "HEALTHY": "✅ 健康",
            "WARNING": "⚠️ 警告",
            "ERROR": "❌ 错误",
            "DISCONNECTED": "🔌 断开连接",
            "UNKNOWN": "❓ 未知"
        }

        status_text = status_map.get(self._connection_status, self._connection_status)

        stats_info = (
            f"连接状态统计:\n"
            f" 当前状态: {status_text}\n"
            f" 总检查次数: {self._monitor_stats['total_checks']}\n"
            f" 成功次数: {self._monitor_stats['successful_checks']}\n"
            f" 失败次数: {self._monitor_stats['failed_checks']}\n"
            f" 连续错误次数: {self._connection_error_count}\n"
            f" 总重连次数: {self._monitor_stats['total_reconnections']}\n"
            f" 最后成功: {self._monitor_stats['last_success']}\n"
            f" 最后错误: {self._monitor_stats['last_error']}"
        )

        if self._connection_status in ["HEALTHY", "WARNING"]:
            logger.info(stats_info)
        else:
            logger.warning(stats_info)

    def get_connection_status(self) -> Dict[str, Any]:
        """Return a snapshot of the status, counters, monitor statistics and target config."""
        return {
            'status': self._connection_status,
            'error_count': self._connection_error_count,
            'last_check': self._last_connection_check,
            'monitor_stats': self._monitor_stats.copy(),
            'config': {
                'host': self.config.host,
                'port': self.config.port,
                'database': self.config.database,
                'pool_size': self.pool_size
            }
        }

    def wait_for_connection(self, timeout: int = 30, check_interval: int = 1) -> bool:
        """Wait until the status becomes HEALTHY.

        While waiting, a reconnect is attempted whenever the status is
        DISCONNECTED; other states rely on the monitor thread to update them.

        Args:
            timeout: Total time to wait, in seconds.
            check_interval: Seconds between status checks; must be positive.

        Returns:
            bool: True if the connection became healthy within the timeout.

        Raises:
            ValueError: If check_interval is not positive.
        """
        if check_interval <= 0:
            raise ValueError(f"check_interval 必须为正数: {check_interval}")

        logger.info(f"等待数据库连接恢复,超时时间: {timeout}秒")

        # Monotonic clock: elapsed time is unaffected by system clock changes.
        start_time = time.monotonic()
        max_attempts = int(timeout / check_interval)
        attempts = 0

        while time.monotonic() - start_time < timeout:
            attempts += 1

            if self._connection_status == "HEALTHY":
                logger.info(f"数据库连接已恢复,等待时间: {time.monotonic() - start_time:.1f}秒")
                return True

            if self._connection_status == "DISCONNECTED":
                logger.info(f"尝试重连 (第{attempts}次)")
                if self._auto_reconnect():
                    return True

            logger.info(f"等待连接恢复... ({attempts}/{max_attempts})")
            time.sleep(check_interval)

        logger.error(f"等待数据库连接恢复超时 ({timeout}秒)")
        return False

    def get_connection(self) -> Connection:
        """Return the calling thread's pooled connection, borrowing one on first use.

        The connection stays bound to the thread until release_connection() is
        called. If the status is ERROR/DISCONNECTED, first waits up to 10 s for
        recovery. (The object returned is the DBUtils pooled wrapper around a
        PyMySQL connection.)

        Raises:
            ConnectionError: If the database stays unavailable or the pool is closed.
            Exception: Errors from the pool are re-raised after a reconnect attempt.
        """
        try:
            if self._connection_status in ["ERROR", "DISCONNECTED"]:
                logger.warning(f"获取连接时检测到连接状态为 {self._connection_status},尝试自动重连")
                if not self.wait_for_connection(timeout=10):
                    raise ConnectionError(f"数据库连接不可用,当前状态: {self._connection_status}")

            conn = getattr(self._thread_local, 'connection', None)
            if conn is None:
                pool = self.pool
                if pool is None:
                    # Explicit error instead of "'NoneType' object has no attribute 'connection'".
                    raise ConnectionError("数据库连接池未初始化或已关闭")
                conn = pool.connection()
                self._thread_local.connection = conn
                logger.debug(f"从连接池获取新连接,当前线程: {threading.current_thread().name}")

            return conn

        except Exception as e:
            logger.error(f"从连接池获取连接失败: {e}")
            self._connection_status = "ERROR"
            self._connection_error_count += 1
            self._monitor_stats['last_error'] = str(e)
            self._auto_reconnect()
            raise

    def release_connection(self):
        """Roll back and return the calling thread's connection to the pool (best effort).

        The connection is closed (i.e. handed back to the pool) even when the
        rollback fails; the thread-local slot is always cleared.
        """
        try:
            conn = getattr(self._thread_local, 'connection', None)
            if conn is None:
                return
            try:
                # Discard any uncommitted work. A failure here (e.g. dead socket)
                # must not prevent the connection from being handed back.
                try:
                    conn.rollback()
                except Exception as e:
                    logger.warning(f"回滚连接时出错: {e}")

                conn.close()  # returns the connection to the pool
                logger.debug(f"连接已归还到连接池,当前线程: {threading.current_thread().name}")
            except Exception as e:
                logger.warning(f"关闭连接时出错: {e}")
            finally:
                self._thread_local.connection = None
        except Exception as e:
            logger.error(f"释放连接时出错: {e}")

    def check_connection(self) -> bool:
        """Run ``SELECT 1`` on this thread's connection; release it if that fails."""
        try:
            conn = self.get_connection()
            cursor = None
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT 1")
                cursor.fetchone()
                return True
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.warning(f"连接检查失败: {e}")
            self.release_connection()
            return False

    def reconnect(self):
        """Drop this thread's connection and borrow a fresh one from the pool.

        Raises:
            Exception: Re-raised from get_connection().
        """
        try:
            logger.info("正在重新连接数据库...")
            self.release_connection()
            time.sleep(1)
            self.get_connection()
            logger.info("数据库重新连接成功")
        except Exception as e:
            logger.error(f"数据库重新连接失败: {e}")
            raise

    def close_all(self):
        """Stop the monitor thread, close the pool and log the final status."""
        logger.info("正在关闭所有数据库资源...")
        self._stop_connection_monitor()
        self.close_pool()
        self._log_connection_status()
        logger.info("所有数据库资源已关闭")

    def close_pool(self):
        """Release this thread's connection, close the pool and mark the status DISCONNECTED."""
        try:
            if self.pool:
                self.release_connection()
                self.pool.close()
                self.pool = None
                self._connection_status = "DISCONNECTED"
                logger.info("数据库连接池已关闭")
        except Exception as e:
            logger.error(f"关闭连接池时出错: {e}")

    # ---- Business methods below (kept unchanged) ----

    def create_table_with_unique_key(self, table_name: str, columns: List[str],
                                     unique_keys: List[str]) -> bool:
        """Drop and recreate *table_name* with a unique key over *unique_keys*.

        Args:
            table_name: Name of the table to (re)create. Any existing table is DROPPED.
            columns: Raw SQL column definitions inserted verbatim into CREATE TABLE
                (the original description says data columns are DOUBLE — the caller
                decides). Must include `id_farm`, `id_turbine` and `data_time`,
                which the secondary indexes reference.
            unique_keys: Column names forming the unique key ``uk_turbine_data``.

        Returns:
            bool: True on success, False on any failure (errors are logged).
            The connection is left bound to the calling thread.
        """
        if not self.wait_for_connection():
            logger.error("创建表失败:数据库连接不可用")
            return False

        conn = None
        cursor = None
        quoted_table = self._quote_identifier(table_name)
        try:
            conn = self.get_connection()
            cursor = conn.cursor()

            drop_sql = f"DROP TABLE IF EXISTS {quoted_table}"
            cursor.execute(drop_sql)
            logger.info(f"已删除旧表: {table_name}")

            columns_sql = ",\n    ".join(columns)
            unique_keys_str = ', '.join(self._quote_identifier(key) for key in unique_keys)

            create_sql = f"""
CREATE TABLE {quoted_table} (
    `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
    {columns_sql},
    `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    `data_hash` VARCHAR(64) COMMENT '数据哈希值,用于快速比较',
    UNIQUE KEY uk_turbine_data ({unique_keys_str}),
    INDEX idx_farm (`id_farm`),
    INDEX idx_turbine (`id_turbine`),
    INDEX idx_time (`data_time`),
    INDEX idx_farm_turbine (`id_farm`, `id_turbine`),
    INDEX idx_composite (`id_farm`, `id_turbine`, `data_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"""

            logger.debug(f"创建表的SQL语句:\n{create_sql}")
            cursor.execute(create_sql)
            conn.commit()

            logger.info(f"表 '{table_name}' 创建成功!")
            logger.info(f"三字段唯一键: {unique_keys}")
            return True

        except Exception as e:
            logger.error(f"创建表失败: {e}")
            logger.error(traceback.format_exc())
            # NOTE: MySQL DDL commits implicitly, so this cannot undo the DROP;
            # it only clears any open transaction. It must not raise out of here.
            if conn:
                try:
                    conn.rollback()
                except Exception as rollback_error:
                    logger.warning(f"回滚失败: {rollback_error}")
            return False
        finally:
            # The connection itself is deliberately not released; the caller controls that.
            self._close_quietly(cursor)

    # ... the remaining business methods are omitted from this file view, including:
    # create_data_scada_turbine_table
    # _clean_and_convert_simple
    # _prepare_upsert_sql
    # _convert_to_numeric
    # _convert_row_to_tuple
    # _escape_sql_value
    # _get_sql_with_values
    # _log_failed_row_details
    # batch_upsert_data_direct
    # upsert_parquet_data
    # check_table_exists
    # get_table_row_count
    # get_table_stats
    # check_duplicate_keys
    # with_connection decorator


# Standalone connection test, convenient for external callers.
def test_database_connection(config: DatabaseConfig, test_query: str = "SELECT 1") -> Dict[str, Any]:
    """Open a direct (non-pooled) connection, run *test_query*, and report the outcome.

    Args:
        config: Database configuration.
        test_query: Query executed as the connectivity test.

    Returns:
        Dict[str, Any]: Keys 'success', 'error', 'response_time', 'server_info',
        'test_result' and 'timestamp'. Errors are captured in 'error', never raised.
    """
    result = {
        'success': False,
        'error': None,
        'response_time': None,
        'server_info': None,
        'test_result': None,
        'timestamp': dt.now().isoformat()
    }

    conn = None
    cursor = None
    try:
        start_time = time.monotonic()

        conn = pymysql.connect(
            host=config.host,
            port=config.port,
            user=config.user,
            password=config.password,
            database=config.database,
            charset=config.charset,
            cursorclass=cursors.DictCursor,
            connect_timeout=10,
            read_timeout=10
        )

        cursor = conn.cursor()
        cursor.execute(test_query)
        query_result = cursor.fetchone()

        cursor.execute("SELECT VERSION() as version, DATABASE() as database_name, "
                       "USER() as user, NOW() as server_time")
        server_info = cursor.fetchone()

        response_time = time.monotonic() - start_time

        result.update({
            'success': True,
            'response_time': response_time,
            'server_info': server_info,
            'test_result': query_result
        })

        logger.info(f"数据库连接测试成功: {config.host}:{config.port}/{config.database}")
        logger.info(f"响应时间: {response_time:.3f}s")
        logger.info(f"MySQL版本: {server_info['version']}")
        logger.info(f"当前数据库: {server_info['database_name']}")

    except Exception as e:
        result['error'] = str(e)
        logger.error(f"数据库连接测试失败: {e}")
    finally:
        # Cleanup failures must not escape: the contract is to return the result dict.
        for resource in (cursor, conn):
            if resource is not None:
                try:
                    resource.close()
                except Exception as close_error:
                    logger.debug(f"关闭资源时出错(已忽略): {close_error}")

    return result


# Example monitoring daemon built on top of DatabaseManager.
class DatabaseMonitorDaemon:
    """Background thread that watches a DatabaseManager and triggers repairs.

    Every ``check_interval`` seconds it logs a warning if the status is not
    HEALTHY, and calls the manager's auto-reconnect once the consecutive
    error count reaches 3.
    """

    def __init__(self, db_manager: DatabaseManager, check_interval: int = 5):
        self.db_manager = db_manager
        self.check_interval = check_interval
        self.monitor_thread = None
        self.running = False
        # Stop signal for the current run; lets stop() interrupt the wait between checks.
        self._stop_event = Event()

    def start(self):
        """Start the monitoring thread if it is not already running."""
        if not self.running:
            self.running = True
            # A fresh event per run, so a previous loop that is still winding
            # down can never be "revived" by this start().
            self._stop_event = Event()
            self.monitor_thread = Thread(
                target=self._monitor_loop,
                args=(self._stop_event,),
                name="DBMonitorDaemon",
                daemon=True
            )
            self.monitor_thread.start()
            logger.info(f"数据库监控守护进程已启动,检查间隔: {self.check_interval}秒")

    def stop(self):
        """Stop the monitoring thread and wait up to 10 seconds for it to exit."""
        self.running = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=10)
        logger.info("数据库监控守护进程已停止")

    def _monitor_loop(self, stop_event: Optional[Event] = None):
        """Check the manager's status until *stop_event* is set or ``running`` is cleared."""
        if stop_event is None:
            stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            try:
                status = self.db_manager.get_connection_status()

                if status['status'] != "HEALTHY":
                    logger.warning(f"数据库连接状态异常: {status['status']}")

                    if status['error_count'] >= 3:
                        logger.info("检测到连续错误,尝试自动修复连接...")
                        self.db_manager._auto_reconnect()
            except Exception as e:
                logger.error(f"监控循环异常: {e}")
            # Returns immediately when stop() sets the event.
            stop_event.wait(self.check_interval)