| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722 |
- import pandas as pd
- import numpy as np
- from typing import List, Dict, Any, Optional, Tuple
- import traceback
- import logging
- from datetime import datetime as dt
- import hashlib
- import pymysql
- from pymysql import Connection, cursors
- from dbutils.pooled_db import PooledDB
- import json
- import time
- import re
- import threading
- from threading import Thread, Event
- import signal
- import sys
- from config import DatabaseConfig, TableConfig
- from file_scanner import ParquetFileInfo
# Configure logging for the whole process at import time: INFO level and
# timestamped "time - logger - level - message" records.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every class and function in this file.
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Manage a pooled MySQL connection with health monitoring and auto-reconnect.

    On construction a ``PooledDB`` pool is created, a daemon thread starts
    probing the pool every ``_monitor_interval`` seconds, and SIGINT/SIGTERM
    handlers are installed so the pool is closed on exit.  Each thread obtains
    its own pooled connection through :meth:`get_connection` and hands it back
    with :meth:`release_connection`.

    ``_connection_status`` is one of ``UNKNOWN``, ``HEALTHY``, ``WARNING``,
    ``ERROR`` or ``DISCONNECTED``.
    """

    # Thread-local storage holding the connection checked out by each thread.
    # NOTE: this is a class attribute, so it is shared by all instances.
    _thread_local = threading.local()

    def __init__(self, config: DatabaseConfig, table_config: TableConfig, pool_size: int = 6):
        """Create the pool, start the monitor thread and install signal handlers.

        Args:
            config: Connection settings (host, port, user, password,
                database, charset are read from it).
            table_config: Table settings; only stored on the instance here.
            pool_size: Maximum number of connections the pool may open.

        Raises:
            Exception: Whatever the pool constructor raises when the initial
                pool cannot be created.
        """
        self.config = config
        self.table_config = table_config
        self.pool_size = pool_size

        # Connection-health monitoring state.
        self._connection_monitor_thread = None
        # Despite its name this event is the *stop* flag: the monitor loop
        # keeps running while it is clear and exits once it is set.
        self._monitor_running = Event()
        self._last_connection_check = None
        self._connection_status = "UNKNOWN"  # UNKNOWN, HEALTHY, WARNING, ERROR, DISCONNECTED
        self._connection_error_count = 0
        self._max_error_count = 5  # consecutive failures before DISCONNECTED
        self._monitor_interval = 1  # seconds between health checks
        self._monitor_stats = {
            'total_checks': 0,
            'successful_checks': 0,
            'failed_checks': 0,
            'total_reconnections': 0,
            'last_error': None,
            'last_success': None
        }

        # Serialises pool replacement: the monitor thread, caller threads and
        # external watchdogs may all request a reconnect at the same time.
        self._reconnect_lock = threading.RLock()

        # The connection pool (None until _init_pool succeeds or after close_pool).
        self.pool: Optional[PooledDB] = None

        self._init_pool()
        self._start_connection_monitor()
        # Install handlers for a graceful shutdown.
        self._setup_signal_handlers()

    @staticmethod
    def _close_quietly(cursor, conn):
        """Close *cursor* and then *conn*, logging instead of raising on failure."""
        for resource in (cursor, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                logger.debug(f"关闭资源时出错: {e}")

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* wrapped in backticks with embedded backticks doubled."""
        return "`" + str(name).replace("`", "``") + "`"

    def _setup_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers that close all resources and exit.

        ``signal.signal`` may only be called from the main thread; when the
        manager is built on another thread the registration is skipped
        instead of failing construction with ``ValueError``.
        """
        if threading.current_thread() is not threading.main_thread():
            logger.warning("当前不在主线程,跳过信号处理器注册")
            return

        def signal_handler(signum, frame):
            logger.info(f"接收到信号 {signum},正在关闭数据库连接池...")
            self.close_all()
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)   # Ctrl+C
        signal.signal(signal.SIGTERM, signal_handler)  # termination request

    def _init_pool(self):
        """Create a new pool, store it in ``self.pool`` and probe it once.

        ``self.pool`` is only overwritten when the pool object was created
        successfully, so a failed attempt leaves the previous pool in place.

        Returns:
            bool: True when the probe query on the new pool succeeded.

        Raises:
            Exception: Re-raised from the pool constructor after the status
                has been set to ``ERROR``.
        """
        try:
            logger.info(f"正在初始化数据库连接池到 {self.config.host}:{self.config.port}/{self.config.database}")

            self.pool = PooledDB(
                creator=pymysql,                 # DB-API driver module
                maxconnections=self.pool_size,   # upper bound on open connections
                mincached=2,                     # idle connections opened up front
                maxcached=5,                     # idle connections kept in the pool
                maxshared=3,                     # maximum shared connections
                blocking=True,                   # wait instead of failing when exhausted
                maxusage=None,                   # no reuse limit per connection
                setsession=[],                   # SQL run on every new session
                # ping flag per DBUtils: 0 = never, 1 = whenever fetched from
                # the pool, 2 = when a cursor is created, 4 = when a query is
                # executed, 7 = always.
                ping=1,
                host=self.config.host,
                port=self.config.port,
                user=self.config.user,
                password=self.config.password,
                database=self.config.database,
                charset=self.config.charset,
                cursorclass=cursors.DictCursor,
                autocommit=False,                # transactions are controlled manually
                connect_timeout=30,
                read_timeout=600,
                write_timeout=600,
                client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS  # allow multi-statements
            )

            logger.info("数据库连接池初始化成功")
            logger.info(f"连接池配置: maxconnections={self.pool_size}, mincached=2, maxcached=5")

            # Probe the fresh pool once.
            test_result = self._test_pool_connection()
            if test_result:
                self._connection_status = "HEALTHY"
                self._monitor_stats['last_success'] = dt.now()
            return test_result

        except Exception as e:
            logger.error(f"数据库连接池初始化失败: {e}")
            self._connection_status = "ERROR"
            self._monitor_stats['last_error'] = str(e)
            raise

    def _test_pool_connection(self):
        """Run a trivial query on a pooled connection.

        The cursor and connection are always handed back, also when the
        query fails, so a failed probe cannot leak a pool slot.

        Returns:
            bool: True when the query succeeded, False otherwise.
        """
        conn = None
        cursor = None
        try:
            conn = self.pool.connection()
            cursor = conn.cursor()
            start_time = time.time()
            cursor.execute("SELECT 1 as test, NOW() as server_time")
            result = cursor.fetchone()
            elapsed = time.time() - start_time

            logger.info(f"连接池测试成功: 响应时间={elapsed:.3f}s, 服务器时间={result['server_time']}")
            return True

        except Exception as e:
            logger.error(f"连接池测试失败: {e}")
            return False

        finally:
            # Closing a pooled connection returns it to the pool.
            self._close_quietly(cursor, conn)

    def _start_connection_monitor(self):
        """Start the daemon monitor thread unless one is already alive."""
        if self._connection_monitor_thread is None or not self._connection_monitor_thread.is_alive():
            self._monitor_running.clear()
            self._connection_monitor_thread = Thread(
                target=self._connection_monitor_loop,
                name="DBConnectionMonitor",
                daemon=True  # does not keep the interpreter alive
            )
            self._connection_monitor_thread.start()
            logger.info("数据库连接监控线程已启动")

    def _stop_connection_monitor(self):
        """Ask the monitor thread to exit and wait up to five seconds for it."""
        thread = self._connection_monitor_thread
        if thread and thread.is_alive():
            self._monitor_running.set()  # wakes the loop and makes it exit
            # A thread cannot join itself; skip the join in that case.
            if thread is not threading.current_thread():
                thread.join(timeout=5)
            logger.info("数据库连接监控线程已停止")

    def _connection_monitor_loop(self):
        """Body of the monitor thread: check, periodically log, then wait."""
        logger.info(f"连接监控线程开始运行,检查间隔: {self._monitor_interval}秒")

        last_log_time = time.time()
        log_interval = 60  # write a status summary once a minute

        while not self._monitor_running.is_set():
            try:
                self._perform_connection_check()

                current_time = time.time()
                if current_time - last_log_time >= log_interval:
                    self._log_connection_status()
                    last_log_time = current_time

            except Exception as e:
                logger.error(f"连接监控循环异常: {e}")

            # Event.wait returns early once a stop is requested, so shutdown
            # does not have to sit out a full sleep interval.
            self._monitor_running.wait(self._monitor_interval)

        logger.info("连接监控循环结束")

    def _perform_connection_check(self):
        """Run one health check and update status, counters and statistics.

        After three consecutive failures an automatic reconnect is attempted,
        but only after the failed cursor/connection has been handed back so
        the reconnect probe does not compete with it for a pool slot.
        """
        try:
            self._monitor_stats['total_checks'] += 1
            self._last_connection_check = dt.now()

            if self.pool is None:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = "连接池未初始化"
                logger.warning("连接池未初始化")
                return

            conn = None
            cursor = None
            needs_reconnect = False
            try:
                conn = self.pool.connection()
                cursor = conn.cursor()

                start_time = time.time()
                cursor.execute("SELECT 1 as test, NOW() as server_time, "
                               "VERSION() as version, CONNECTION_ID() as connection_id")
                result = cursor.fetchone()
                elapsed = time.time() - start_time

                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['successful_checks'] += 1
                self._monitor_stats['last_success'] = dt.now()

                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"连接检查成功: "
                                 f"响应时间={elapsed:.3f}s, "
                                 f"服务器时间={result['server_time']}, "
                                 f"MySQL版本={result['version']}, "
                                 f"连接ID={result['connection_id']}")

            except Exception as e:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = str(e)

                # Escalate to DISCONNECTED once the failure threshold is hit.
                if self._connection_error_count >= self._max_error_count:
                    self._connection_status = "DISCONNECTED"
                    logger.error(f"数据库连接失败,已连续失败 {self._connection_error_count} 次: {e}")
                else:
                    logger.warning(f"数据库连接检查失败 (第{self._connection_error_count}次): {e}")

                needs_reconnect = self._connection_error_count >= 3

            finally:
                # Return the connection to the pool before any reconnect.
                self._close_quietly(cursor, conn)

            if needs_reconnect:
                self._auto_reconnect()

        except Exception as e:
            logger.error(f"执行连接检查时发生异常: {e}")
            self._monitor_stats['failed_checks'] += 1
            self._monitor_stats['last_error'] = str(e)

    def _auto_reconnect(self):
        """Try to restore connectivity, rebuilding the pool if necessary.

        Steps: release the calling thread's connection, pause two seconds,
        probe the current pool, and if that fails build a new pool.  The old
        pool is only closed after the new one exists, so ``self.pool`` is
        never ``None`` while other threads may be using it.  Pool replacement
        is serialised with ``_reconnect_lock``.

        Returns:
            bool: True when the (old or new) pool answered the probe query.
        """
        try:
            logger.warning(f"检测到连接问题,正在尝试自动重连 (错误计数: {self._connection_error_count})")

            # Release this thread's connection *before* waiting for the lock
            # so a waiting thread never sits on a pool slot.
            self.release_connection()

            time.sleep(2)

            with self._reconnect_lock:
                # Another thread may already have fixed the pool while this
                # one was waiting; the probe covers that case as well.
                if self.pool is not None:
                    if self._test_pool_connection():
                        self._connection_status = "HEALTHY"
                        self._connection_error_count = 0
                        self._monitor_stats['total_reconnections'] += 1
                        logger.info("自动重连成功")
                        return True
                    logger.warning("连接池测试失败,尝试重新初始化")

                old_pool = self.pool
                try:
                    pool_ok = self._init_pool()
                except Exception as e:
                    logger.error(f"连接池重新初始化失败: {e}")
                    self._connection_status = "ERROR"
                    return False

                # The new pool is in place; dispose of the old one.
                if old_pool is not None and old_pool is not self.pool:
                    try:
                        old_pool.close()
                    except Exception as e:
                        logger.warning(f"关闭旧连接池时出错: {e}")

                if not pool_ok:
                    # The pool object exists but its probe query failed.
                    logger.error("连接池重新初始化失败: 连接池测试未通过")
                    self._connection_status = "ERROR"
                    return False

                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['total_reconnections'] += 1
                logger.info("连接池重新初始化成功")
                return True

        except Exception as e:
            logger.error(f"自动重连失败: {e}")
            return False

    def _log_connection_status(self):
        """Log a multi-line summary of the status and monitoring counters.

        Logged at INFO for HEALTHY/WARNING and at WARNING for any other status.
        """
        status_map = {
            "HEALTHY": "✅ 健康",
            "WARNING": "⚠️ 警告",
            "ERROR": "❌ 错误",
            "DISCONNECTED": "🔌 断开连接",
            "UNKNOWN": "❓ 未知"
        }

        status_text = status_map.get(self._connection_status, self._connection_status)

        stats_info = (
            f"连接状态统计:\n"
            f" 当前状态: {status_text}\n"
            f" 总检查次数: {self._monitor_stats['total_checks']}\n"
            f" 成功次数: {self._monitor_stats['successful_checks']}\n"
            f" 失败次数: {self._monitor_stats['failed_checks']}\n"
            f" 连续错误次数: {self._connection_error_count}\n"
            f" 总重连次数: {self._monitor_stats['total_reconnections']}\n"
            f" 最后成功: {self._monitor_stats['last_success']}\n"
            f" 最后错误: {self._monitor_stats['last_error']}"
        )

        if self._connection_status in ["HEALTHY", "WARNING"]:
            logger.info(stats_info)
        else:
            logger.warning(stats_info)

    def get_connection_status(self) -> Dict[str, Any]:
        """Return a snapshot of status, error count, last check time, stats and config.

        ``monitor_stats`` is a shallow copy, so callers may modify it freely.
        """
        return {
            'status': self._connection_status,
            'error_count': self._connection_error_count,
            'last_check': self._last_connection_check,
            'monitor_stats': self._monitor_stats.copy(),
            'config': {
                'host': self.config.host,
                'port': self.config.port,
                'database': self.config.database,
                'pool_size': self.pool_size
            }
        }

    def wait_for_connection(self, timeout: int = 30, check_interval: int = 1) -> bool:
        """Block until the connection status is HEALTHY or *timeout* elapses.

        The status is always checked at least once, also for ``timeout <= 0``.
        While the status is DISCONNECTED an immediate reconnect is attempted
        on every round.

        Args:
            timeout: Total time to wait, in seconds.
            check_interval: Pause between status checks, in seconds.

        Returns:
            bool: True if the connection became healthy in time.
        """
        logger.info(f"等待数据库连接恢复,超时时间: {timeout}秒")

        # Monotonic clock: immune to wall-clock adjustments during the wait.
        start_time = time.monotonic()
        # Only used for the progress message; guard against a zero interval.
        max_attempts = int(timeout / check_interval) if check_interval > 0 else 0
        attempts = 0

        while True:
            attempts += 1

            if self._connection_status == "HEALTHY":
                logger.info(f"数据库连接已恢复,等待时间: {time.monotonic() - start_time:.1f}秒")
                return True

            if time.monotonic() - start_time >= timeout:
                break

            if self._connection_status == "DISCONNECTED":
                logger.info(f"尝试重连 (第{attempts}次)")
                if self._auto_reconnect():
                    return True

            logger.info(f"等待连接恢复... ({attempts}/{max_attempts})")
            time.sleep(max(check_interval, 0))

        logger.error(f"等待数据库连接恢复超时 ({timeout}秒)")
        return False

    def get_connection(self) -> Connection:
        """Return the calling thread's pooled connection, fetching one if needed.

        If the status is ERROR or DISCONNECTED the call first waits up to ten
        seconds for recovery.  The connection is cached per thread until
        :meth:`release_connection` is called.

        Returns:
            The pooled connection object bound to the current thread.

        Raises:
            ConnectionError: The connection did not recover in time or no
                pool exists.
            Exception: Any error raised by the pool; a reconnect is attempted
                before the error is re-raised.
        """
        try:
            if self._connection_status in ["ERROR", "DISCONNECTED"]:
                logger.warning(f"获取连接时检测到连接状态为 {self._connection_status},尝试自动重连")
                if not self.wait_for_connection(timeout=10):
                    raise ConnectionError(f"数据库连接不可用,当前状态: {self._connection_status}")

            conn = getattr(self._thread_local, 'connection', None)
            if conn is None:
                pool = self.pool
                if pool is None:
                    # Explicit error instead of AttributeError on None.
                    raise ConnectionError("数据库连接池未初始化")
                conn = pool.connection()
                self._thread_local.connection = conn
                logger.debug(f"从连接池获取新连接,当前线程: {threading.current_thread().name}")

            return conn

        except Exception as e:
            logger.error(f"从连接池获取连接失败: {e}")

            self._connection_status = "ERROR"
            self._connection_error_count += 1
            self._monitor_stats['last_error'] = str(e)

            # Best-effort recovery for the next caller; the error still propagates.
            self._auto_reconnect()
            raise

    def release_connection(self):
        """Roll back and return the calling thread's connection to the pool.

        Never raises.  A failing rollback is logged and does not prevent the
        connection from being closed, otherwise the pool slot would be lost.
        """
        try:
            if hasattr(self._thread_local, 'connection') and self._thread_local.connection is not None:
                conn = self._thread_local.connection
                try:
                    try:
                        # The pooled wrapper exposes the wrapped connection as
                        # ``_con``; when that is a raw pymysql connection, only
                        # roll back if autocommit is off.
                        if hasattr(conn, '_con') and isinstance(conn._con, Connection):
                            original_conn = conn._con
                            if not original_conn.get_autocommit():
                                original_conn.rollback()
                        else:
                            # Fallback: roll back without inspecting autocommit.
                            conn.rollback()
                    except Exception as e:
                        logger.warning(f"回滚连接时出错: {e}")

                    # Closing a pooled connection returns it to the pool.
                    conn.close()
                    logger.debug(f"连接已归还到连接池,当前线程: {threading.current_thread().name}")
                except Exception as e:
                    logger.warning(f"关闭连接时出错: {e}")
                finally:
                    self._thread_local.connection = None
        except Exception as e:
            logger.error(f"释放连接时出错: {e}")

    def check_connection(self) -> bool:
        """Return True if ``SELECT 1`` succeeds on the calling thread's connection.

        On failure the thread's connection is released and False is returned.
        """
        try:
            conn = self.get_connection()

            cursor = None
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT 1")
                cursor.fetchone()
                return True
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.warning(f"连接检查失败: {e}")
            # Drop the unusable connection.
            self.release_connection()
            return False

    def reconnect(self):
        """Release the calling thread's connection and fetch a fresh one.

        Raises:
            Exception: Re-raised when a new connection cannot be obtained.
        """
        try:
            logger.info("正在重新连接数据库...")
            self.release_connection()
            time.sleep(1)
            self.get_connection()
            logger.info("数据库重新连接成功")
        except Exception as e:
            logger.error(f"数据库重新连接失败: {e}")
            raise

    def close_all(self):
        """Stop the monitor thread, close the pool and log the final status."""
        logger.info("正在关闭所有数据库资源...")

        self._stop_connection_monitor()
        self.close_pool()

        self._log_connection_status()
        logger.info("所有数据库资源已关闭")

    def close_pool(self):
        """Release the calling thread's connection and close the pool.

        Afterwards ``self.pool`` is None and the status is DISCONNECTED.
        Errors are logged, not raised.
        """
        try:
            if self.pool:
                self.release_connection()

                self.pool.close()
                self.pool = None
                self._connection_status = "DISCONNECTED"
                logger.info("数据库连接池已关闭")
        except Exception as e:
            logger.error(f"关闭连接池时出错: {e}")

    # The business methods below are stated to be unchanged from the earlier version.
    def create_table_with_unique_key(self, table_name: str, columns: List[str],
                                     unique_keys: List[str]) -> bool:
        """Drop and recreate *table_name* with a composite unique key.

        WARNING: an existing table of the same name is dropped together with
        its data.  The generated DDL adds ``id``, ``create_time``,
        ``update_time`` and ``data_hash`` columns plus indexes on
        ``id_farm``, ``id_turbine`` and ``data_time``; *columns* must
        therefore define those three columns.

        Arguments are validated before the DROP so an obviously doomed CREATE
        cannot destroy the existing table.  The connection is left checked
        out; the caller decides when to release it.

        Args:
            table_name: Name of the table to (re)create.
            columns: Complete SQL column definitions, one per entry.
            unique_keys: Column names forming the unique key.

        Returns:
            bool: True on success, False on any failure (which is logged).
        """
        if not columns or not unique_keys:
            logger.error("创建表失败: 列定义或唯一键为空")
            return False

        if not self.wait_for_connection():
            logger.error("创建表失败:数据库连接不可用")
            return False

        conn = None

        try:
            conn = self.get_connection()
            cursor = conn.cursor()

            try:
                # Backtick-escape identifiers so unusual names cannot break
                # out of the quoting.
                quoted_table = self._quote_identifier(table_name)

                # Remove any existing table first.
                drop_sql = f"DROP TABLE IF EXISTS {quoted_table}"
                cursor.execute(drop_sql)
                logger.info(f"已删除旧表: {table_name}")

                columns_sql = ",\n    ".join(columns)
                unique_keys_str = ', '.join(self._quote_identifier(key) for key in unique_keys)

                create_sql = f"""
CREATE TABLE {quoted_table} (
    `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
    {columns_sql},
    `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    `data_hash` VARCHAR(64) COMMENT '数据哈希值,用于快速比较',
    UNIQUE KEY uk_turbine_data ({unique_keys_str}),
    INDEX idx_farm (`id_farm`),
    INDEX idx_turbine (`id_turbine`),
    INDEX idx_time (`data_time`),
    INDEX idx_farm_turbine (`id_farm`, `id_turbine`),
    INDEX idx_composite (`id_farm`, `id_turbine`, `data_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"""

                logger.debug(f"创建表的SQL语句:\n{create_sql}")

                cursor.execute(create_sql)
                conn.commit()

                logger.info(f"表 '{table_name}' 创建成功!")
                logger.info(f"三字段唯一键: {unique_keys}")

                return True

            finally:
                try:
                    cursor.close()
                except Exception as e:
                    logger.debug(f"关闭游标时出错: {e}")

        except Exception as e:
            logger.error(f"创建表失败: {e}")
            logger.error(traceback.format_exc())
            if conn:
                # A failing rollback must not turn the False result into an exception.
                try:
                    conn.rollback()
                except Exception as rollback_error:
                    logger.warning(f"回滚失败: {rollback_error}")
            return False

    # ... the remaining business methods are stated to be unchanged and are
    # not shown in this excerpt, including:
    #   create_data_scada_turbine_table
    #   _clean_and_convert_simple
    #   _prepare_upsert_sql
    #   _convert_to_numeric
    #   _convert_row_to_tuple
    #   _escape_sql_value
    #   _get_sql_with_values
    #   _log_failed_row_details
    #   batch_upsert_data_direct
    #   upsert_parquet_data
    #   check_table_exists
    #   get_table_row_count
    #   get_table_stats
    #   check_duplicate_keys
    #   with_connection decorator
- # 添加一个独立的连接测试函数,方便外部调用
def test_database_connection(config: DatabaseConfig, test_query: str = "SELECT 1") -> Dict[str, Any]:
    """Open a one-off connection, run *test_query* and report the outcome.

    Connection and query errors are captured in the returned dict rather
    than raised.

    Args:
        config: Database settings (host, port, user, password, database,
            charset are read from it).
        test_query: SQL statement executed as the probe.

    Returns:
        Dict[str, Any]: ``success``, ``error``, ``response_time`` (seconds,
        measured from before connecting), ``server_info`` and ``timestamp``;
        on success also ``test_result`` with the first row of *test_query*.
    """
    outcome = {
        'success': False,
        'error': None,
        'response_time': None,
        'server_info': None,
        'timestamp': dt.now().isoformat()
    }

    connection = None
    db_cursor = None

    try:
        # The timer deliberately includes the time needed to connect.
        started_at = time.time()

        connection = pymysql.connect(
            host=config.host,
            port=config.port,
            user=config.user,
            password=config.password,
            database=config.database,
            charset=config.charset,
            cursorclass=cursors.DictCursor,
            connect_timeout=10,
            read_timeout=10
        )
        db_cursor = connection.cursor()

        # Probe query supplied by the caller.
        db_cursor.execute(test_query)
        first_row = db_cursor.fetchone()

        # Basic facts about the server and the session.
        db_cursor.execute(
            "SELECT VERSION() as version, DATABASE() as database_name, USER() as user, NOW() as server_time"
        )
        info_row = db_cursor.fetchone()

        took = time.time() - started_at

        outcome['success'] = True
        outcome['response_time'] = took
        outcome['server_info'] = info_row
        outcome['test_result'] = first_row

        logger.info(f"数据库连接测试成功: {config.host}:{config.port}/{config.database}")
        logger.info(f"响应时间: {took:.3f}s")
        logger.info(f"MySQL版本: {info_row['version']}")
        logger.info(f"当前数据库: {info_row['database_name']}")

    except Exception as exc:
        outcome['error'] = str(exc)
        logger.error(f"数据库连接测试失败: {exc}")

    finally:
        if db_cursor:
            db_cursor.close()
        if connection:
            connection.close()

    return outcome
- # 添加一个监控守护进程示例
class DatabaseMonitorDaemon:
    """Background watchdog that polls a DatabaseManager and triggers reconnects.

    Every ``check_interval`` seconds the manager's status snapshot is read;
    a non-healthy status is logged and, once the error count reaches three,
    the manager's reconnect routine is invoked.
    """

    def __init__(self, db_manager: DatabaseManager, check_interval: int = 5):
        """Store the manager and polling interval; no thread is started yet.

        Args:
            db_manager: The manager whose connection status is watched.
            check_interval: Seconds between two status polls.
        """
        self.db_manager = db_manager
        self.check_interval = check_interval
        self.monitor_thread = None
        self.running = False
        # Stop signal of the current monitor thread.  A fresh event is made
        # for every start() so a thread from an earlier run can never be
        # revived by a later start().
        self._stop_event = Event()

    def start(self):
        """Start the monitor thread; does nothing when already running."""
        if self.running:
            return

        stop_event = Event()
        self._stop_event = stop_event
        self.running = True
        self.monitor_thread = Thread(
            target=self._monitor_loop,
            args=(stop_event,),
            daemon=True,
        )
        self.monitor_thread.start()
        logger.info(f"数据库监控守护进程已启动,检查间隔: {self.check_interval}秒")

    def stop(self):
        """Signal the monitor thread to exit and wait up to ten seconds for it.

        Setting the event wakes the loop out of its wait immediately, so the
        join does not depend on the length of ``check_interval``.
        """
        self.running = False
        self._stop_event.set()
        thread = self.monitor_thread
        if thread:
            # A thread cannot join itself.
            if thread is not threading.current_thread():
                thread.join(timeout=10)
            logger.info("数据库监控守护进程已停止")

    def _monitor_loop(self, stop_event=None):
        """Poll the manager until stopped.

        Args:
            stop_event: Event ending this particular run; defaults to the
                daemon's current stop event when called without arguments.
        """
        if stop_event is None:
            stop_event = self._stop_event

        while self.running and not stop_event.is_set():
            try:
                status = self.db_manager.get_connection_status()

                if status['status'] != "HEALTHY":
                    logger.warning(f"数据库连接状态异常: {status['status']}")

                    # Enough consecutive errors: try to repair the connection.
                    if status['error_count'] >= 3:
                        logger.info("检测到连续错误,尝试自动修复连接...")
                        self.db_manager._auto_reconnect()

            except Exception as e:
                logger.error(f"监控循环异常: {e}")

            # Interruptible pause before the next poll.
            stop_event.wait(self.check_interval)
|