|
|
@@ -12,31 +12,77 @@ import json
|
|
|
import time
|
|
|
import re
|
|
|
import threading
|
|
|
+from threading import Thread, Event
|
|
|
+import signal
|
|
|
+import sys
|
|
|
|
|
|
from config import DatabaseConfig, TableConfig
|
|
|
from file_scanner import ParquetFileInfo
|
|
|
|
|
|
# 配置日志
|
|
|
-logging.basicConfig(level=logging.INFO)
|
|
|
+logging.basicConfig(
|
|
|
+ level=logging.INFO,
|
|
|
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
|
+ datefmt='%Y-%m-%d %H:%M:%S'
|
|
|
+)
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class DatabaseManager:
|
|
|
- """数据库管理器,使用连接池管理数据库连接"""
|
|
|
+ """数据库管理器,使用连接池管理数据库连接,包含连通性测试和自动重连"""
|
|
|
|
|
|
# 线程局部存储,用于保存每个线程的数据库连接
|
|
|
_thread_local = threading.local()
|
|
|
|
|
|
- def __init__(self, config: DatabaseConfig, table_config: TableConfig, pool_size: int = 10):
|
|
|
def __init__(self, config: DatabaseConfig, table_config: TableConfig, pool_size: int = 6):
    """Create the manager, build the pool and start background monitoring.

    Args:
        config: Connection settings for the target database.
        table_config: Table-level settings, stored for later use.
        pool_size: Upper bound on pooled connections (``maxconnections``).

    Side effects: initialises the connection pool, starts the connection
    monitor thread and registers SIGINT/SIGTERM handlers.
    """
    self.config, self.table_config, self.pool_size = config, table_config, pool_size

    # --- connection-health monitoring state ---
    self._connection_monitor_thread = None
    # NOTE: despite its name this Event is the *stop* flag; the monitor
    # loop runs while it is clear and exits once it is set.
    self._monitor_running = Event()
    self._last_connection_check = None
    # One of: UNKNOWN, HEALTHY, WARNING, ERROR, DISCONNECTED
    self._connection_status = "UNKNOWN"
    self._connection_error_count = 0
    self._max_error_count = 5  # consecutive failures before DISCONNECTED
    self._monitor_interval = 1  # seconds between health checks
    self._monitor_stats = dict(
        total_checks=0,
        successful_checks=0,
        failed_checks=0,
        total_reconnections=0,
        last_error=None,
        last_success=None,
    )

    # --- connection pool ---
    self.pool: Optional[PooledDB] = None
    self._init_pool()

    # --- background services ---
    self._start_connection_monitor()
    # Make sure resources are released when the process is interrupted.
    self._setup_signal_handlers()
|
|
|
+
|
|
|
def _setup_signal_handlers(self):
    """Install SIGINT/SIGTERM handlers that close DB resources, then exit.

    ``signal.signal`` may only be called from the main thread and raises
    ``ValueError`` anywhere else.  This method is invoked from
    ``__init__``, so without a guard a manager created in a worker thread
    would fail to construct.  In that case registration is skipped with a
    warning and the caller remains responsible for calling ``close_all``.
    """
    if threading.current_thread() is not threading.main_thread():
        logger.warning("当前不在主线程,跳过信号处理器注册")
        return

    def _handle_shutdown(signum, frame):
        # Release the pool and stop the monitor before leaving the process.
        logger.info(f"接收到信号 {signum},正在关闭数据库连接池...")
        self.close_all()
        sys.exit(0)

    # SIGINT is Ctrl+C, SIGTERM is the regular termination request.
    for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(shutdown_signal, _handle_shutdown)
|
|
|
+
|
|
|
def _init_pool(self):
|
|
|
"""初始化数据库连接池"""
|
|
|
try:
|
|
|
+ logger.info(f"正在初始化数据库连接池到 {self.config.host}:{self.config.port}/{self.config.database}")
|
|
|
+
|
|
|
self.pool = PooledDB(
|
|
|
creator=pymysql, # 使用的数据库驱动
|
|
|
maxconnections=self.pool_size, # 连接池允许的最大连接数
|
|
|
@@ -61,14 +107,19 @@ class DatabaseManager:
|
|
|
client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS # 支持多语句
|
|
|
)
|
|
|
|
|
|
- logger.info(f"数据库连接池初始化成功: {self.config.host}:{self.config.port}/{self.config.database}")
|
|
|
+ logger.info(f"数据库连接池初始化成功")
|
|
|
logger.info(f"连接池配置: maxconnections={self.pool_size}, mincached=2, maxcached=5")
|
|
|
|
|
|
# 测试连接池
|
|
|
- self._test_pool_connection()
|
|
|
+ test_result = self._test_pool_connection()
|
|
|
+ if test_result:
|
|
|
+ self._connection_status = "HEALTHY"
|
|
|
+ self._monitor_stats['last_success'] = dt.now()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"数据库连接池初始化失败: {e}")
|
|
|
+ self._connection_status = "ERROR"
|
|
|
+ self._monitor_stats['last_error'] = str(e)
|
|
|
raise
|
|
|
|
|
|
def _test_pool_connection(self):
|
|
|
@@ -76,18 +127,274 @@ class DatabaseManager:
|
|
|
try:
|
|
|
conn = self.pool.connection()
|
|
|
cursor = conn.cursor()
|
|
|
- cursor.execute("SELECT 1 as test")
|
|
|
+ start_time = time.time()
|
|
|
+ cursor.execute("SELECT 1 as test, NOW() as server_time")
|
|
|
result = cursor.fetchone()
|
|
|
+ elapsed = time.time() - start_time
|
|
|
+
|
|
|
cursor.close()
|
|
|
conn.close() # 归还连接
|
|
|
- logger.info(f"连接池测试成功: {result}")
|
|
|
+
|
|
|
+ logger.info(f"连接池测试成功: 响应时间={elapsed:.3f}s, 服务器时间={result['server_time']}")
|
|
|
+ return True
|
|
|
+
|
|
|
except Exception as e:
|
|
|
logger.error(f"连接池测试失败: {e}")
|
|
|
- raise
|
|
|
+ return False
|
|
|
+
|
|
|
def _start_connection_monitor(self):
    """Start the background health-check thread unless one is already alive."""
    current = self._connection_monitor_thread
    if current is not None and current.is_alive():
        return

    # Clearing the event arms the loop: it keeps running until the event is set.
    self._monitor_running.clear()
    monitor = Thread(
        target=self._connection_monitor_loop,
        name="DBConnectionMonitor",
        daemon=True,  # daemon thread: does not keep the process alive on exit
    )
    self._connection_monitor_thread = monitor
    monitor.start()
    logger.info("数据库连接监控线程已启动")
|
|
|
+
|
|
|
def _stop_connection_monitor(self):
    """Ask the monitor thread to exit and wait up to five seconds for it."""
    monitor = self._connection_monitor_thread
    if not monitor or not monitor.is_alive():
        return

    # Setting the event is the stop request observed by the monitor loop.
    self._monitor_running.set()
    monitor.join(timeout=5)
    logger.info("数据库连接监控线程已停止")
|
|
|
+
|
|
|
def _connection_monitor_loop(self):
    """Run periodic health checks until the stop event is set.

    Each iteration performs one connection check and, at most once per
    minute, logs a statistics summary.  The pause between iterations uses
    ``Event.wait`` rather than ``time.sleep`` so that a stop request wakes
    the thread immediately instead of after a full interval.  Exceptions
    raised by a check are logged and the loop keeps going.
    """
    logger.info(f"连接监控线程开始运行,检查间隔: {self._monitor_interval}秒")

    # Despite its name, ``_monitor_running`` being set means "stop".
    stop_requested = self._monitor_running
    log_interval = 60  # seconds between status summaries
    # Monotonic clock: the summary cadence is unaffected by wall-clock changes.
    last_log_time = time.monotonic()

    while not stop_requested.is_set():
        try:
            self._perform_connection_check()

            now = time.monotonic()
            if now - last_log_time >= log_interval:
                self._log_connection_status()
                last_log_time = now
        except Exception as e:
            # Keep monitoring; the next iteration retries after the pause.
            logger.error(f"连接监控循环异常: {e}")

        # Interruptible pause: returns early as soon as a stop is requested.
        stop_requested.wait(self._monitor_interval)

    logger.info("连接监控循环结束")
|
|
|
+
|
|
|
def _perform_connection_check(self):
    """Run one health probe against the pool and update status/statistics.

    On success the status becomes ``HEALTHY`` and the consecutive-error
    counter is reset.  On failure the counter is incremented, the status
    becomes ``ERROR`` (or ``DISCONNECTED`` once ``_max_error_count`` is
    reached) and, from the third consecutive failure on, ``_auto_reconnect``
    is invoked.

    Differences from the naive version:
      * a missing pool (``self.pool is None``) also triggers the reconnect
        path, otherwise a failed re-initialisation could never recover;
      * the probe's cursor/connection are released *before* reconnecting,
        so the reconnect never runs while a pooled connection is held;
      * a failure while closing the cursor no longer prevents the
        connection from being returned to the pool.
    """
    reconnect_threshold = 3  # consecutive failures before trying to reconnect

    try:
        self._monitor_stats['total_checks'] += 1
        self._last_connection_check = dt.now()

        # No pool at all: record the failure and try to rebuild it.
        if self.pool is None:
            self._connection_status = "ERROR"
            self._connection_error_count += 1
            self._monitor_stats['failed_checks'] += 1
            self._monitor_stats['last_error'] = "连接池未初始化"
            logger.warning("连接池未初始化")
            if self._connection_error_count >= reconnect_threshold:
                self._auto_reconnect()
            return

        conn = None
        cursor = None
        needs_reconnect = False
        try:
            conn = self.pool.connection()
            cursor = conn.cursor()

            # Lightweight probe query; timed for the debug log below.
            start_time = time.time()
            cursor.execute("SELECT 1 as test, NOW() as server_time, "
                           "VERSION() as version, CONNECTION_ID() as connection_id")
            result = cursor.fetchone()
            elapsed = time.time() - start_time

            self._connection_status = "HEALTHY"
            self._connection_error_count = 0
            self._monitor_stats['successful_checks'] += 1
            self._monitor_stats['last_success'] = dt.now()

            # NOTE(review): key access assumes the pool hands out dict-style
            # cursors — confirm against the pool configuration.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(f"连接检查成功: "
                             f"响应时间={elapsed:.3f}s, "
                             f"服务器时间={result['server_time']}, "
                             f"MySQL版本={result['version']}, "
                             f"连接ID={result['connection_id']}")

        except Exception as e:
            self._connection_status = "ERROR"
            self._connection_error_count += 1
            self._monitor_stats['failed_checks'] += 1
            self._monitor_stats['last_error'] = str(e)

            if self._connection_error_count >= self._max_error_count:
                self._connection_status = "DISCONNECTED"
                logger.error(f"数据库连接失败,已连续失败 {self._connection_error_count} 次: {e}")
            else:
                logger.warning(f"数据库连接检查失败 (第{self._connection_error_count}次): {e}")

            # Defer the reconnect until the probe resources are released.
            needs_reconnect = self._connection_error_count >= reconnect_threshold

        finally:
            # Close independently so one failing close cannot leak the other.
            for resource in (cursor, conn):
                if resource is None:
                    continue
                try:
                    resource.close()  # for ``conn`` this returns it to the pool
                except Exception as close_error:
                    logger.debug(f"释放连接检查资源时出错: {close_error}")

        if needs_reconnect:
            self._auto_reconnect()

    except Exception as e:
        logger.error(f"执行连接检查时发生异常: {e}")
        self._monitor_stats['failed_checks'] += 1
        self._monitor_stats['last_error'] = str(e)
|
|
|
+
|
|
|
def _auto_reconnect(self):
    """Try to restore database connectivity.

    Steps: release the calling thread's connection, pause briefly, probe
    the existing pool, and if that fails rebuild the pool from scratch.

    Returns:
        bool: True when a probe query succeeded afterwards, else False.

    Differences from the naive version:
      * an error while closing the old pool is logged and ignored instead
        of aborting the rebuild (which used to leave ``self.pool`` as None);
      * success after a rebuild is reported only when a probe actually
        passes, because ``_test_pool_connection`` reports failure through
        its return value rather than by raising;
      * the pause uses the monitor's stop event, so a shutdown request is
        not held up by it.
    """
    try:
        logger.warning(f"检测到连接问题,正在尝试自动重连 (错误计数: {self._connection_error_count})")

        # 1. Drop the connection cached for the current thread.
        self.release_connection()

        # 2. Short back-off; returns immediately if a stop was requested.
        self._monitor_running.wait(2)

        # 3. The existing pool may already have recovered on its own.
        if self.pool:
            if self._test_pool_connection():
                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['total_reconnections'] += 1
                logger.info("自动重连成功")
                return True
            logger.warning("连接池测试失败,尝试重新初始化")

        # 4. Rebuild the pool.
        old_pool = self.pool
        try:
            self.pool = None
            if old_pool:
                try:
                    old_pool.close()
                except Exception as close_error:
                    # A broken pool may fail to close; that must not stop
                    # us from creating its replacement.
                    logger.warning(f"关闭旧连接池时出错(已忽略): {close_error}")

            self._init_pool()

            # Verify explicitly rather than assuming the new pool works.
            if not self._test_pool_connection():
                logger.error("连接池重新初始化后连通性测试失败")
                self._connection_status = "ERROR"
                return False

            self._connection_status = "HEALTHY"
            self._connection_error_count = 0
            self._monitor_stats['total_reconnections'] += 1
            logger.info("连接池重新初始化成功")
            return True

        except Exception as e:
            logger.error(f"连接池重新初始化失败: {e}")
            self._connection_status = "ERROR"
            return False

    except Exception as e:
        logger.error(f"自动重连失败: {e}")
        return False
|
|
|
+
|
|
|
def _log_connection_status(self):
    """Log a multi-line summary of the current status and monitor counters.

    The summary is logged at INFO level while the status is HEALTHY or
    WARNING and at WARNING level for every other status.
    """
    labels = {
        "HEALTHY": "✅ 健康",
        "WARNING": "⚠️ 警告",
        "ERROR": "❌ 错误",
        "DISCONNECTED": "🔌 断开连接",
        "UNKNOWN": "❓ 未知"
    }
    current = self._connection_status
    stats = self._monitor_stats

    # Unknown status codes fall back to the raw value.
    report_lines = [
        "连接状态统计:",
        f" 当前状态: {labels.get(current, current)}",
        f" 总检查次数: {stats['total_checks']}",
        f" 成功次数: {stats['successful_checks']}",
        f" 失败次数: {stats['failed_checks']}",
        f" 连续错误次数: {self._connection_error_count}",
        f" 总重连次数: {stats['total_reconnections']}",
        f" 最后成功: {stats['last_success']}",
        f" 最后错误: {stats['last_error']}",
    ]
    report = "\n".join(report_lines)

    emit = logger.info if current in ("HEALTHY", "WARNING") else logger.warning
    emit(report)
|
|
|
+
|
|
|
def get_connection_status(self) -> Dict[str, Any]:
    """Return a snapshot of the connection state.

    Returns:
        A dict with the current status string, the consecutive error
        count, the time of the last check, a shallow copy of the monitor
        statistics, and the host/port/database/pool-size settings.
    """
    snapshot: Dict[str, Any] = {}
    snapshot['status'] = self._connection_status
    snapshot['error_count'] = self._connection_error_count
    snapshot['last_check'] = self._last_connection_check
    # Copy so callers cannot mutate the live counters.
    snapshot['monitor_stats'] = dict(self._monitor_stats)

    settings = self.config
    snapshot['config'] = {
        'host': settings.host,
        'port': settings.port,
        'database': settings.database,
        'pool_size': self.pool_size,
    }
    return snapshot
|
|
|
+
|
|
|
def wait_for_connection(self, timeout: int = 30, check_interval: int = 1) -> bool:
    """Block until the connection is healthy or the timeout elapses.

    Args:
        timeout: Maximum total time to wait, in seconds.
        check_interval: Pause between status polls, in seconds.  A value
            of zero or less is replaced by 0.1 s.

    Returns:
        bool: True once the status is ``HEALTHY`` (or a reconnect attempt
        succeeds), False when the timeout is reached first.

    Differences from the naive version:
      * a connection that is already healthy is reported as such even
        with ``timeout=0``;
      * ``check_interval=0`` no longer raises ``ZeroDivisionError``;
      * a reconnect is also attempted when the pool is missing, not only
        when the status is ``DISCONNECTED``;
      * the final pause is clamped so the call does not overshoot
        ``timeout``, and elapsed time uses a monotonic clock.
    """
    logger.info(f"等待数据库连接恢复,超时时间: {timeout}秒")

    poll_interval = check_interval if check_interval > 0 else 0.1
    start_time = time.monotonic()
    attempts = 0

    while True:
        if self._connection_status == "HEALTHY":
            logger.info(f"数据库连接已恢复,等待时间: {time.monotonic() - start_time:.1f}秒")
            return True

        if time.monotonic() - start_time >= timeout:
            break
        attempts += 1

        # Nothing will recover by itself when the pool is gone or the
        # monitor has declared the link dead: reconnect right away.
        if self._connection_status == "DISCONNECTED" or self.pool is None:
            logger.info(f"尝试重连 (第{attempts}次)")
            if self._auto_reconnect():
                return True

        logger.info(f"等待连接恢复... ({attempts}/{int(timeout/poll_interval)})")
        remaining = timeout - (time.monotonic() - start_time)
        time.sleep(max(0.0, min(poll_interval, remaining)))

    logger.error(f"等待数据库连接恢复超时 ({timeout}秒)")
    return False
|
|
|
|
|
|
def get_connection(self) -> Connection:
|
|
|
"""从连接池获取数据库连接"""
|
|
|
try:
|
|
|
+ # 检查当前连接状态
|
|
|
+ if self._connection_status in ["ERROR", "DISCONNECTED"]:
|
|
|
+ logger.warning(f"获取连接时检测到连接状态为 {self._connection_status},尝试自动重连")
|
|
|
+ if not self.wait_for_connection(timeout=10):
|
|
|
+ raise ConnectionError(f"数据库连接不可用,当前状态: {self._connection_status}")
|
|
|
+
|
|
|
# 检查线程局部存储中是否有连接
|
|
|
if not hasattr(self._thread_local, 'connection') or self._thread_local.connection is None:
|
|
|
# 从连接池获取新连接
|
|
|
@@ -99,6 +406,14 @@ class DatabaseManager:
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"从连接池获取连接失败: {e}")
|
|
|
+
|
|
|
+ # 更新连接状态
|
|
|
+ self._connection_status = "ERROR"
|
|
|
+ self._connection_error_count += 1
|
|
|
+ self._monitor_stats['last_error'] = str(e)
|
|
|
+
|
|
|
+ # 尝试自动重连
|
|
|
+ self._auto_reconnect()
|
|
|
raise
|
|
|
|
|
|
def release_connection(self):
|
|
|
@@ -107,8 +422,14 @@ class DatabaseManager:
|
|
|
if hasattr(self._thread_local, 'connection') and self._thread_local.connection is not None:
|
|
|
conn = self._thread_local.connection
|
|
|
try:
|
|
|
- # 确保没有未提交的事务
|
|
|
- if not conn.get_autocommit():
|
|
|
+ # 修复:SteadyDBConnection 需要访问 _con 属性获取原始连接
|
|
|
+ # 检查原始连接的 autocommit 状态
|
|
|
+ if hasattr(conn, '_con') and isinstance(conn._con, Connection):
|
|
|
+ original_conn = conn._con
|
|
|
+ if not original_conn.get_autocommit():
|
|
|
+ original_conn.rollback()
|
|
|
+ else:
|
|
|
+ # 备选方案:直接尝试回滚,不检查 autocommit
|
|
|
conn.rollback()
|
|
|
|
|
|
# 关闭连接(实际上是归还到连接池)
|
|
|
@@ -121,20 +442,6 @@ class DatabaseManager:
|
|
|
except Exception as e:
|
|
|
logger.error(f"释放连接时出错: {e}")
|
|
|
|
|
|
- def with_connection(self):
|
|
|
- """连接上下文管理器装饰器"""
|
|
|
- def decorator(func):
|
|
|
- def wrapper(*args, **kwargs):
|
|
|
- # 确保有数据库连接
|
|
|
- self.get_connection()
|
|
|
- try:
|
|
|
- return func(*args, **kwargs)
|
|
|
- finally:
|
|
|
- # 不自动释放连接,让调用者控制
|
|
|
- pass
|
|
|
- return wrapper
|
|
|
- return decorator
|
|
|
-
|
|
|
def check_connection(self) -> bool:
|
|
|
"""检查连接是否有效"""
|
|
|
try:
|
|
|
@@ -170,6 +477,20 @@ class DatabaseManager:
|
|
|
logger.error(f"数据库重新连接失败: {e}")
|
|
|
raise
|
|
|
|
|
|
def close_all(self):
    """Shut everything down: monitor thread first, then the pool.

    A final status summary is logged after both have been closed.
    """
    logger.info("正在关闭所有数据库资源...")

    # Order matters: stop the monitor, close the pool, then log the final state.
    shutdown_steps = (
        self._stop_connection_monitor,
        self.close_pool,
        self._log_connection_status,
    )
    for step in shutdown_steps:
        step()

    logger.info("所有数据库资源已关闭")
|
|
|
+
|
|
|
def close_pool(self):
|
|
|
"""关闭整个连接池"""
|
|
|
try:
|
|
|
@@ -180,13 +501,20 @@ class DatabaseManager:
|
|
|
# 关闭连接池
|
|
|
self.pool.close()
|
|
|
self.pool = None
|
|
|
+ self._connection_status = "DISCONNECTED"
|
|
|
logger.info("数据库连接池已关闭")
|
|
|
except Exception as e:
|
|
|
logger.error(f"关闭连接池时出错: {e}")
|
|
|
|
|
|
+ # 以下为原有的业务方法,保持不变
|
|
|
def create_table_with_unique_key(self, table_name: str, columns: List[str],
|
|
|
unique_keys: List[str]) -> bool:
|
|
|
"""根据列定义创建表,包含三字段唯一键,数据字段使用DOUBLE类型"""
|
|
|
+ # 检查连接状态
|
|
|
+ if not self.wait_for_connection():
|
|
|
+ logger.error("创建表失败:数据库连接不可用")
|
|
|
+ return False
|
|
|
+
|
|
|
conn = None
|
|
|
cursor = None
|
|
|
|
|
|
@@ -250,992 +578,145 @@ class DatabaseManager:
|
|
|
# 不释放连接,让调用者控制
|
|
|
pass
|
|
|
|
|
|
- def create_data_scada_turbine_table(self) -> bool:
|
|
|
- """创建data_scada_turbine表,所有数据字段使用DOUBLE类型"""
|
|
|
- conn = None
|
|
|
- cursor = None
|
|
|
+ # ... 其余的业务方法保持不变,包括:
|
|
|
+ # create_data_scada_turbine_table
|
|
|
+ # _clean_and_convert_simple
|
|
|
+ # _prepare_upsert_sql
|
|
|
+ # _convert_to_numeric
|
|
|
+ # _convert_row_to_tuple
|
|
|
+ # _escape_sql_value
|
|
|
+ # _get_sql_with_values
|
|
|
+ # _log_failed_row_details
|
|
|
+ # batch_upsert_data_direct
|
|
|
+ # upsert_parquet_data
|
|
|
+ # check_table_exists
|
|
|
+ # get_table_row_count
|
|
|
+ # get_table_stats
|
|
|
+ # check_duplicate_keys
|
|
|
+ # with_connection 装饰器
|
|
|
+
|
|
|
+# 添加一个独立的连接测试函数,方便外部调用
|
|
|
+def test_database_connection(config: DatabaseConfig, test_query: str = "SELECT 1") -> Dict[str, Any]:
|
|
|
+ """
|
|
|
+ 测试数据库连接
|
|
|
+
|
|
|
+ Args:
|
|
|
+ config: 数据库配置
|
|
|
+ test_query: 测试查询语句
|
|
|
|
|
|
- try:
|
|
|
- conn = self.get_connection()
|
|
|
- cursor = conn.cursor()
|
|
|
-
|
|
|
- try:
|
|
|
- # 删除已存在的表
|
|
|
- drop_sql = f"DROP TABLE IF EXISTS `data_scada_turbine`"
|
|
|
- cursor.execute(drop_sql)
|
|
|
- logger.info("已删除旧表: data_scada_turbine")
|
|
|
-
|
|
|
- # 创建新表,所有数据字段使用DOUBLE类型
|
|
|
- create_sql = """
|
|
|
- CREATE TABLE `data_scada_turbine` (
|
|
|
- `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
|
|
|
- `data_time` DATETIME NOT NULL COMMENT '数据时间',
|
|
|
-
|
|
|
- -- 所有数据字段使用DOUBLE类型
|
|
|
- `conv_spd` DOUBLE COMMENT '变流器转速',
|
|
|
- `grid_ia` DOUBLE COMMENT '电网A相电流',
|
|
|
- `grid_ib` DOUBLE COMMENT '电网B相电流',
|
|
|
- `grid_ic` DOUBLE COMMENT '电网C相电流',
|
|
|
- `grid_freq` DOUBLE COMMENT '电网频率',
|
|
|
- `grid_ua` DOUBLE COMMENT '电网A相电压',
|
|
|
- `grid_ub` DOUBLE COMMENT '电网B相电压',
|
|
|
- `grid_uc` DOUBLE COMMENT '电网C相电压',
|
|
|
- `p_active_set_fbk` DOUBLE COMMENT '有功功率设定反馈',
|
|
|
- `p_active` DOUBLE COMMENT '有功功率',
|
|
|
- `p_reactive` DOUBLE COMMENT '无功功率',
|
|
|
- `gen_spd` DOUBLE COMMENT '发电机转速',
|
|
|
- `gen_de_temp` DOUBLE COMMENT '发电机驱动端温度',
|
|
|
- `gen_nde_temp` DOUBLE COMMENT '发电机非驱动端温度',
|
|
|
- `stator_wind_temp_1` DOUBLE COMMENT '定子绕组温度1',
|
|
|
- `stator_wind_temp_2` DOUBLE COMMENT '定子绕组温度2',
|
|
|
- `stator_wind_temp_3` DOUBLE COMMENT '定子绕组温度3',
|
|
|
- `stator_wind_temp_4` DOUBLE COMMENT '定子绕组温度4',
|
|
|
- `stator_wind_temp_5` DOUBLE COMMENT '定子绕组温度5',
|
|
|
- `stator_wind_temp_6` DOUBLE COMMENT '定子绕组温度6',
|
|
|
- `pitch_ang_set_3` DOUBLE COMMENT '3号桨叶角度设定',
|
|
|
- `tower_fb_vib_acc` DOUBLE COMMENT '塔筒前后振动加速度',
|
|
|
- `tower_lr_vib_acc` DOUBLE COMMENT '塔筒左右振动加速度',
|
|
|
- `nacelle_in_temp` DOUBLE COMMENT '机舱内部温度',
|
|
|
- `nacelle_cab_temp` DOUBLE COMMENT '机舱柜内温度',
|
|
|
- `nacelle_out_temp` DOUBLE COMMENT '机舱外部温度',
|
|
|
- `theory_p_active` DOUBLE COMMENT '理论有功功率',
|
|
|
- `wind_dir` DOUBLE COMMENT '风向',
|
|
|
- `wind_spd` DOUBLE COMMENT '风速',
|
|
|
- `yaw_to_wind_ang` DOUBLE COMMENT '偏航对风角度',
|
|
|
- `pitch_ang_act_1` DOUBLE COMMENT '1号桨叶实际角度',
|
|
|
- `pitch_ang_act_2` DOUBLE COMMENT '2号桨叶实际角度',
|
|
|
- `pitch_ang_act_3` DOUBLE COMMENT '3号桨叶实际角度',
|
|
|
- `pitch_motor_cur_1` DOUBLE COMMENT '1号桨叶电机电流',
|
|
|
- `pitch_motor_cur_2` DOUBLE COMMENT '2号桨叶电机电流',
|
|
|
- `pitch_motor_cur_3` DOUBLE COMMENT '3号桨叶电机电流',
|
|
|
- `pitch_cab_temp_1` DOUBLE COMMENT '1号桨叶柜内温度',
|
|
|
- `pitch_cab_temp_2` DOUBLE COMMENT '2号桨叶柜内温度',
|
|
|
- `pitch_cab_temp_3` DOUBLE COMMENT '3号桨叶柜内温度',
|
|
|
- `tower_base_cab_temp` DOUBLE COMMENT '塔基柜内温度',
|
|
|
- `gearbox_spd_1` DOUBLE COMMENT '齿轮箱转速1',
|
|
|
- `gb_out_oil_prs` DOUBLE COMMENT '齿轮箱出油压力',
|
|
|
- `low_spd_shaft_temp` DOUBLE COMMENT '低速轴温度',
|
|
|
- `low_spd_shaft_de_temp` DOUBLE COMMENT '低速轴驱动端温度',
|
|
|
- `high_spd_shaft_de_temp` DOUBLE COMMENT '高速轴驱动端温度',
|
|
|
- `high_spd_shaft_temp` DOUBLE COMMENT '高速轴温度',
|
|
|
- `hyd_oil_temp` DOUBLE COMMENT '液压油温度',
|
|
|
- `hyd_station_prs` DOUBLE COMMENT '液压站压力',
|
|
|
- `gb_in_oil_prs` DOUBLE COMMENT '齿轮箱进油压力',
|
|
|
- `rotor_spd` DOUBLE COMMENT '转子转速',
|
|
|
- `gearbox_oil_temp` DOUBLE COMMENT '齿轮箱油温',
|
|
|
- `main_brg_temp_1` DOUBLE COMMENT '主轴承温度1',
|
|
|
- `tower_lr_vib` DOUBLE COMMENT '塔筒左右振动',
|
|
|
- `tower_fb_vib` DOUBLE COMMENT '塔筒前后振动',
|
|
|
- `nacelle_pos` DOUBLE COMMENT '机舱位置',
|
|
|
- `twist_ang` DOUBLE COMMENT '扭缆角度',
|
|
|
-
|
|
|
- -- 元数据字段
|
|
|
- `id_farm` VARCHAR(50) NOT NULL COMMENT '风场ID',
|
|
|
- `name_farm` VARCHAR(100) COMMENT '风场名称',
|
|
|
- `no_model_turbine` VARCHAR(50) COMMENT '风机型号',
|
|
|
- `id_turbine` VARCHAR(50) NOT NULL COMMENT '风机ID',
|
|
|
-
|
|
|
- -- 系统字段
|
|
|
- `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
|
- `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
|
- `data_hash` VARCHAR(64) COMMENT '数据哈希值,用于快速比较',
|
|
|
-
|
|
|
- -- 唯一键约束
|
|
|
- UNIQUE KEY uk_turbine_data (`id_farm`, `id_turbine`, `data_time`),
|
|
|
-
|
|
|
- -- 索引
|
|
|
- INDEX idx_farm (`id_farm`),
|
|
|
- INDEX idx_turbine (`id_turbine`),
|
|
|
- INDEX idx_time (`data_time`),
|
|
|
- INDEX idx_farm_turbine (`id_farm`, `id_turbine`),
|
|
|
- INDEX idx_composite (`id_farm`, `id_turbine`, `data_time`)
|
|
|
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机SCADA数据表'
|
|
|
- """
|
|
|
-
|
|
|
- logger.debug(f"创建data_scada_turbine表的SQL语句")
|
|
|
-
|
|
|
- # 执行创建表
|
|
|
- cursor.execute(create_sql)
|
|
|
- conn.commit()
|
|
|
-
|
|
|
- logger.info("表 'data_scada_turbine' 创建成功!")
|
|
|
- logger.info("所有数据字段使用DOUBLE类型,三字段唯一键: id_farm, id_turbine, data_time")
|
|
|
-
|
|
|
- return True
|
|
|
-
|
|
|
- finally:
|
|
|
- if cursor:
|
|
|
- try:
|
|
|
- cursor.close()
|
|
|
- except:
|
|
|
- pass
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"创建data_scada_turbine表失败: {e}")
|
|
|
- logger.error(traceback.format_exc())
|
|
|
- if conn:
|
|
|
- conn.rollback()
|
|
|
- return False
|
|
|
- finally:
|
|
|
- # 不释放连接,让调用者控制
|
|
|
- pass
|
|
|
+ Returns:
|
|
|
+ Dict[str, Any]: 测试结果
|
|
|
+ """
|
|
|
+ result = {
|
|
|
+ 'success': False,
|
|
|
+ 'error': None,
|
|
|
+ 'response_time': None,
|
|
|
+ 'server_info': None,
|
|
|
+ 'timestamp': dt.now().isoformat()
|
|
|
+ }
|
|
|
|
|
|
- def _clean_and_convert_simple(self, df: pd.DataFrame, data_time_column: str = None) -> pd.DataFrame:
|
|
|
- """简化版数据清理,避免复杂类型转换"""
|
|
|
- try:
|
|
|
- cleaned_df = df.copy()
|
|
|
-
|
|
|
- # 1. 确保必需字段存在
|
|
|
- required_fields = ['id_farm', 'name_farm', 'no_model_turbine', 'id_turbine']
|
|
|
- for field in required_fields:
|
|
|
- if field not in cleaned_df.columns:
|
|
|
- cleaned_df[field] = None
|
|
|
-
|
|
|
- # 2. 处理时间字段 - 简化处理
|
|
|
- if 'data_time' not in cleaned_df.columns:
|
|
|
- if data_time_column and data_time_column in cleaned_df.columns:
|
|
|
- cleaned_df['data_time'] = cleaned_df[data_time_column]
|
|
|
- else:
|
|
|
- # 查找时间字段
|
|
|
- for col in cleaned_df.columns:
|
|
|
- col_lower = col.lower()
|
|
|
- if any(keyword in col_lower for keyword in ['time', 'date', 'timestamp']):
|
|
|
- cleaned_df['data_time'] = cleaned_df[col]
|
|
|
- logger.info(f"使用字段 '{col}' 作为 data_time")
|
|
|
- break
|
|
|
-
|
|
|
- # 3. 确保data_time是datetime类型
|
|
|
- if 'data_time' in cleaned_df.columns:
|
|
|
- try:
|
|
|
- cleaned_df['data_time'] = pd.to_datetime(cleaned_df['data_time'], errors='coerce')
|
|
|
- except:
|
|
|
- logger.warning("data_time字段转换失败,保持原样")
|
|
|
-
|
|
|
- # 4. 简单处理NaN
|
|
|
- cleaned_df = cleaned_df.replace({np.nan: None, pd.NaT: None})
|
|
|
-
|
|
|
- # 5. 计算数据哈希(简化版)
|
|
|
- def simple_hash(row):
|
|
|
- try:
|
|
|
- # 只使用数据字段,排除元数据字段
|
|
|
- data_fields = [col for col in cleaned_df.columns
|
|
|
- if col not in ['id_farm', 'name_farm', 'no_model_turbine',
|
|
|
- 'id_turbine', 'data_time', 'data_hash']]
|
|
|
-
|
|
|
- hash_str = ''
|
|
|
- for field in sorted(data_fields):
|
|
|
- val = row[field]
|
|
|
- if val is not None:
|
|
|
- if isinstance(val, (dt, pd.Timestamp)):
|
|
|
- hash_str += f"{field}:{val.isoformat()}|"
|
|
|
- else:
|
|
|
- hash_str += f"{field}:{str(val)}|"
|
|
|
-
|
|
|
- return hashlib.md5(hash_str.encode('utf-8')).hexdigest() if hash_str else None
|
|
|
- except:
|
|
|
- return None
|
|
|
-
|
|
|
- cleaned_df['data_hash'] = cleaned_df.apply(simple_hash, axis=1)
|
|
|
-
|
|
|
- logger.info(f"数据清理完成,原始形状: {df.shape}, 清理后形状: {cleaned_df.shape}")
|
|
|
- return cleaned_df
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"数据清理失败: {e}")
|
|
|
- logger.error(traceback.format_exc())
|
|
|
- return df
|
|
|
+ conn = None
|
|
|
+ cursor = None
|
|
|
|
|
|
- def _prepare_upsert_sql(self, table_name: str, columns: List[str]) -> str:
|
|
|
- """准备UPSERT SQL语句"""
|
|
|
- # 排除不需要更新的字段
|
|
|
- exclude_columns = ['id_farm', 'id_turbine', 'data_time', 'id',
|
|
|
- 'create_time', 'update_time', 'data_hash']
|
|
|
- update_columns = [col for col in columns if col not in exclude_columns]
|
|
|
+ try:
|
|
|
+ # 记录开始时间
|
|
|
+ start_time = time.time()
|
|
|
|
|
|
- # 构建列名和占位符
|
|
|
- column_names = ', '.join([f'`{col}`' for col in columns])
|
|
|
- placeholders = ', '.join(['%s'] * len(columns))
|
|
|
+ # 建立连接
|
|
|
+ conn = pymysql.connect(
|
|
|
+ host=config.host,
|
|
|
+ port=config.port,
|
|
|
+ user=config.user,
|
|
|
+ password=config.password,
|
|
|
+ database=config.database,
|
|
|
+ charset=config.charset,
|
|
|
+ cursorclass=cursors.DictCursor,
|
|
|
+ connect_timeout=10,
|
|
|
+ read_timeout=10
|
|
|
+ )
|
|
|
|
|
|
- # 构建UPDATE部分
|
|
|
- update_clauses = []
|
|
|
- for col in update_columns:
|
|
|
- update_clauses.append(f"`{col}` = VALUES(`{col}`)")
|
|
|
+ # 执行测试查询
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute(test_query)
|
|
|
+ query_result = cursor.fetchone()
|
|
|
|
|
|
- update_clause = ', '.join(update_clauses)
|
|
|
+ # 获取服务器信息
|
|
|
+ cursor.execute("SELECT VERSION() as version, DATABASE() as database_name, "
|
|
|
+ "USER() as user, NOW() as server_time")
|
|
|
+ server_info = cursor.fetchone()
|
|
|
|
|
|
- # 构建完整UPSERT SQL
|
|
|
- upsert_sql = f"""
|
|
|
- INSERT INTO `{table_name}` ({column_names})
|
|
|
- VALUES ({placeholders})
|
|
|
- ON DUPLICATE KEY UPDATE
|
|
|
- {update_clause}
|
|
|
- """
|
|
|
+ # 计算响应时间
|
|
|
+ response_time = time.time() - start_time
|
|
|
|
|
|
- logger.debug(f"UPSERT SQL生成完成,共 {len(columns)} 列")
|
|
|
- return upsert_sql
|
|
|
-
|
|
|
- def _convert_to_numeric(self, value):
|
|
|
- """
|
|
|
- 将值转换为数值类型(整数或浮点数)
|
|
|
- 如果无法转换或值为空,返回None
|
|
|
- """
|
|
|
- if pd.isna(value) or value is None:
|
|
|
- return None
|
|
|
+ result.update({
|
|
|
+ 'success': True,
|
|
|
+ 'response_time': response_time,
|
|
|
+ 'server_info': server_info,
|
|
|
+ 'test_result': query_result
|
|
|
+ })
|
|
|
|
|
|
- try:
|
|
|
- # 如果已经是数值类型,直接返回
|
|
|
- if isinstance(value, (int, float, np.integer, np.floating)):
|
|
|
- # 将numpy类型转换为Python标准类型
|
|
|
- if isinstance(value, np.integer):
|
|
|
- return int(value)
|
|
|
- elif isinstance(value, np.floating):
|
|
|
- return float(value)
|
|
|
- return value
|
|
|
-
|
|
|
- # 如果是布尔值,转换为0或1
|
|
|
- if isinstance(value, (bool, np.bool_)):
|
|
|
- return 1 if bool(value) else 0
|
|
|
-
|
|
|
- # 如果是字符串,尝试转换为数值
|
|
|
- if isinstance(value, str):
|
|
|
- # 去除空格
|
|
|
- cleaned = value.strip()
|
|
|
-
|
|
|
- # 如果为空字符串,返回None
|
|
|
- if cleaned == '':
|
|
|
- return None
|
|
|
-
|
|
|
- # 尝试直接转换为浮点数
|
|
|
- try:
|
|
|
- # 处理常见的数值格式
|
|
|
- cleaned = cleaned.replace(',', '') # 移除千位分隔符
|
|
|
- cleaned = cleaned.replace('%', '') # 移除百分号
|
|
|
- cleaned = cleaned.replace(' ', '') # 移除空格
|
|
|
-
|
|
|
- # 尝试转换为浮点数
|
|
|
- return float(cleaned)
|
|
|
- except ValueError:
|
|
|
- # 如果浮点数转换失败,尝试整数
|
|
|
- try:
|
|
|
- return int(cleaned)
|
|
|
- except ValueError:
|
|
|
- return None
|
|
|
-
|
|
|
- # 如果是其他类型,先转换为字符串再尝试
|
|
|
- try:
|
|
|
- str_val = str(value)
|
|
|
- cleaned = str_val.replace(',', '').replace('%', '').replace(' ', '')
|
|
|
- return float(cleaned)
|
|
|
- except:
|
|
|
- return None
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- # 记录转换失败,但返回None而不是抛出异常
|
|
|
- return None
|
|
|
-
|
|
|
- def _convert_row_to_tuple(self, row: pd.Series, columns: List[str]) -> Tuple:
|
|
|
- """
|
|
|
- 将单行数据转换为元组,处理特殊类型
|
|
|
- 除了指定字段外,其他字段都转换为数值类型(包括空值)
|
|
|
- """
|
|
|
- try:
|
|
|
- row_values = []
|
|
|
-
|
|
|
- # 指定保持原类型的字段
|
|
|
- keep_original_fields = [
|
|
|
- 'id', 'data_time', 'id_farm', 'id_turbine',
|
|
|
- 'name_farm', 'no_model_turbine', 'create_time',
|
|
|
- 'update_time', 'data_hash'
|
|
|
- ]
|
|
|
-
|
|
|
- for col in columns:
|
|
|
- value = row[col]
|
|
|
-
|
|
|
- # 处理特殊类型
|
|
|
- if col == 'data_time':
|
|
|
- # 处理时间字段
|
|
|
- if pd.isna(value):
|
|
|
- row_values.append(None)
|
|
|
- elif isinstance(value, pd.Timestamp):
|
|
|
- # 转换为Python datetime
|
|
|
- row_values.append(value.to_pydatetime())
|
|
|
- elif isinstance(value, dt):
|
|
|
- row_values.append(value)
|
|
|
- else:
|
|
|
- # 尝试转换为datetime
|
|
|
- try:
|
|
|
- row_values.append(pd.to_datetime(value).to_pydatetime())
|
|
|
- except:
|
|
|
- row_values.append(None)
|
|
|
-
|
|
|
- elif col in ['id_farm', 'id_turbine', 'name_farm', 'no_model_turbine', 'data_hash']:
|
|
|
- # 这些字段保持字符串类型
|
|
|
- if pd.isna(value) or value is None:
|
|
|
- row_values.append(None)
|
|
|
- else:
|
|
|
- row_values.append(str(value))
|
|
|
-
|
|
|
- elif col in keep_original_fields:
|
|
|
- # 其他保持原样的字段(id, create_time, update_time)
|
|
|
- row_values.append(value)
|
|
|
-
|
|
|
- else:
|
|
|
- # 其他字段都尝试转换为数值类型
|
|
|
- numeric_value = self._convert_to_numeric(value)
|
|
|
- row_values.append(numeric_value)
|
|
|
-
|
|
|
- return tuple(row_values)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"转换行数据失败: {e}")
|
|
|
- # 返回None元组
|
|
|
- return tuple([None] * len(columns))
|
|
|
-
|
|
|
- def _escape_sql_value(self, val):
|
|
|
- """转义SQL值,用于生成调试SQL"""
|
|
|
- if val is None:
|
|
|
- return "NULL"
|
|
|
- elif isinstance(val, (dt, pd.Timestamp)):
|
|
|
- # MySQL/TiDB日期时间格式
|
|
|
- return f"'{val.strftime('%Y-%m-%d %H:%M:%S')}'"
|
|
|
- elif isinstance(val, (int, float, bool)):
|
|
|
- return str(val)
|
|
|
- elif isinstance(val, str):
|
|
|
- # 转义字符串中的单引号
|
|
|
- escaped = val.replace("'", "''")
|
|
|
- return f"'{escaped}'"
|
|
|
- else:
|
|
|
- # 其他类型转换为字符串并转义
|
|
|
- str_val = str(val)
|
|
|
- escaped = str_val.replace("'", "''")
|
|
|
- return f"'{escaped}'"
|
|
|
-
|
|
|
- def _get_sql_with_values(self, sql_template: str, values: Tuple, columns: List[str]) -> str:
|
|
|
- """生成包含实际值的SQL语句,用于调试(修复版本)"""
|
|
|
- try:
|
|
|
- # 将SQL模板拆分为INSERT部分和UPDATE部分
|
|
|
- # 找到第一个VALUES后面的位置
|
|
|
- values_index = sql_template.find("VALUES (")
|
|
|
- if values_index == -1:
|
|
|
- return f"{sql_template} -- 无法找到VALUES部分"
|
|
|
-
|
|
|
- # 找到INSERT部分的结束(ON DUPLICATE KEY UPDATE之前)
|
|
|
- on_duplicate_index = sql_template.find("ON DUPLICATE KEY UPDATE")
|
|
|
- if on_duplicate_index == -1:
|
|
|
- return f"{sql_template} -- 无法找到ON DUPLICATE KEY UPDATE部分"
|
|
|
-
|
|
|
- # 获取INSERT部分
|
|
|
- insert_part = sql_template[:on_duplicate_index]
|
|
|
-
|
|
|
- # 在INSERT部分中替换占位符
|
|
|
- # 构建实际值的SQL字符串
|
|
|
- values_sql = ', '.join([self._escape_sql_value(v) for v in values])
|
|
|
-
|
|
|
- # 替换INSERT部分的占位符
|
|
|
- insert_placeholder = "VALUES (" + ", ".join(["%s"] * len(values)) + ")"
|
|
|
- insert_with_values = insert_part.replace(insert_placeholder, f"VALUES ({values_sql})")
|
|
|
-
|
|
|
- # 组合完整的SQL
|
|
|
- debug_sql = insert_with_values + sql_template[on_duplicate_index:]
|
|
|
-
|
|
|
- return debug_sql
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"生成调试SQL失败: {e}")
|
|
|
- return f"{sql_template} -- 生成调试SQL时出错: {e}"
|
|
|
-
|
|
|
- def _log_failed_row_details(self, row_index: int, row_tuple: Tuple, columns: List[str],
|
|
|
- sql_template: str, error: Exception):
|
|
|
- """记录失败行的详细信息"""
|
|
|
- try:
|
|
|
- # 1. 生成包含实际值的SQL语句
|
|
|
- debug_sql = self._get_sql_with_values(sql_template, row_tuple, columns)
|
|
|
-
|
|
|
- # 2. 创建详细的错误信息
|
|
|
- error_details = {
|
|
|
- 'row_index': row_index,
|
|
|
- 'error_message': str(error),
|
|
|
- 'error_type': type(error).__name__,
|
|
|
- 'sql_template': sql_template,
|
|
|
- 'debug_sql': debug_sql,
|
|
|
- 'columns_count': len(columns),
|
|
|
- 'values_count': len(row_tuple),
|
|
|
- 'timestamp': dt.now().isoformat()
|
|
|
- }
|
|
|
-
|
|
|
- # 3. 添加所有列和值的对应关系
|
|
|
- column_value_pairs = {}
|
|
|
- for i, (col, val) in enumerate(zip(columns, row_tuple)):
|
|
|
- column_value_pairs[col] = {
|
|
|
- 'value': str(val) if val is not None else 'NULL',
|
|
|
- 'python_type': type(val).__name__ if val is not None else 'NoneType',
|
|
|
- 'index': i,
|
|
|
- 'escaped_sql_value': self._escape_sql_value(val)
|
|
|
- }
|
|
|
-
|
|
|
- error_details['column_values'] = column_value_pairs
|
|
|
-
|
|
|
- # 4. 记录数据类型统计
|
|
|
- type_stats = {}
|
|
|
- for val in row_tuple:
|
|
|
- type_name = type(val).__name__ if val is not None else 'NoneType'
|
|
|
- type_stats[type_name] = type_stats.get(type_name, 0) + 1
|
|
|
- error_details['type_statistics'] = type_stats
|
|
|
-
|
|
|
- # 5. 记录NULL值统计
|
|
|
- null_count = sum(1 for v in row_tuple if v is None)
|
|
|
- error_details['null_values_count'] = null_count
|
|
|
-
|
|
|
- # 6. 记录异常堆栈
|
|
|
- error_details['traceback'] = traceback.format_exc()
|
|
|
-
|
|
|
- # 7. 如果是MySQL错误,记录错误码
|
|
|
- if isinstance(error, pymysql.Error):
|
|
|
- error_details['mysql_error_code'] = error.args[0] if len(error.args) > 0 else 'Unknown'
|
|
|
- error_details['mysql_error_message'] = error.args[1] if len(error.args) > 1 else 'Unknown'
|
|
|
-
|
|
|
- # 8. 格式化输出到日志
|
|
|
- error_str = "\n" + "="*100 + "\n"
|
|
|
- error_str += f"🚨 单条UPSERT失败详细报告(行 {row_index})\n"
|
|
|
- error_str += "="*100 + "\n"
|
|
|
- error_str += f"📅 时间: {error_details['timestamp']}\n"
|
|
|
- error_str += f"🔧 错误类型: {error_details['error_type']}\n"
|
|
|
- error_str += f"📝 错误信息: {error_details['error_message']}\n"
|
|
|
-
|
|
|
- if 'mysql_error_code' in error_details:
|
|
|
- error_str += f"🔢 MySQL错误码: {error_details['mysql_error_code']}\n"
|
|
|
- error_str += f"📋 MySQL错误消息: {error_details['mysql_error_message']}\n"
|
|
|
-
|
|
|
- error_str += f"📊 列数/值数: {error_details['columns_count']}/{error_details['values_count']}\n"
|
|
|
- error_str += f"🔘 NULL值数量: {error_details['null_values_count']}\n"
|
|
|
- error_str += f"📈 数据类型统计: {error_details['type_statistics']}\n\n"
|
|
|
-
|
|
|
- error_str += "📋 所有列数据详情:\n"
|
|
|
- error_str += "-"*100 + "\n"
|
|
|
- for col, info in error_details['column_values'].items():
|
|
|
- error_str += f" 列[{info['index']:2d}] `{col}`:\n"
|
|
|
- error_str += f" 值: {info['value']}\n"
|
|
|
- error_str += f" Python类型: {info['python_type']}\n"
|
|
|
- error_str += f" SQL转义值: {info['escaped_sql_value']}\n"
|
|
|
- error_str += "\n"
|
|
|
-
|
|
|
- error_str += "📄 SQL模板:\n"
|
|
|
- error_str += "-"*100 + "\n"
|
|
|
- # 简化SQL模板显示
|
|
|
- simple_sql = sql_template[:500] + ("..." if len(sql_template) > 500 else "")
|
|
|
- error_str += simple_sql + "\n\n"
|
|
|
-
|
|
|
- error_str += "🔍 调试SQL(含实际值,可直接执行):\n"
|
|
|
- error_str += "-"*100 + "\n"
|
|
|
- # 如果SQL太长,截断显示
|
|
|
- if len(debug_sql) > 2000:
|
|
|
- error_str += debug_sql[:1000] + "\n... [SQL太长,中间省略] ...\n" + debug_sql[-1000:] + "\n"
|
|
|
- else:
|
|
|
- error_str += debug_sql + "\n"
|
|
|
- error_str += "\n"
|
|
|
-
|
|
|
- error_str += "🔧 异常堆栈:\n"
|
|
|
- error_str += "-"*100 + "\n"
|
|
|
- error_str += error_details['traceback']
|
|
|
- error_str += "\n" + "="*100
|
|
|
-
|
|
|
- logger.error(error_str)
|
|
|
-
|
|
|
- # 9. 将错误详情写入文件
|
|
|
- try:
|
|
|
- with open('failed_rows_log.json', 'a', encoding='utf-8') as f:
|
|
|
- json.dump(error_details, f, ensure_ascii=False, default=str, indent=2)
|
|
|
- f.write('\n,\n')
|
|
|
- logger.info(f"📁 失败行详情已保存到文件: failed_rows_log.json")
|
|
|
- except Exception as file_error:
|
|
|
- logger.error(f"保存失败行详情到文件时出错: {file_error}")
|
|
|
-
|
|
|
- except Exception as log_error:
|
|
|
- logger.error(f"记录失败行详情时出错: {log_error}")
|
|
|
- logger.error(f"原始错误: {error}")
|
|
|
- logger.error(f"行索引: {row_index}")
|
|
|
- logger.error(f"SQL模板: {sql_template}")
|
|
|
- logger.error(f"异常堆栈: {traceback.format_exc()}")
|
|
|
-
|
|
|
- def batch_upsert_data_direct(self, table_name: str, df: pd.DataFrame,
|
|
|
- data_time_column: str = None,
|
|
|
- batch_size: int = 100, max_retries: int = 3) -> Tuple[int, int, int]:
|
|
|
- """
|
|
|
- 直接使用pymysql批量UPSERT数据,使用连接池
|
|
|
+ logger.info(f"数据库连接测试成功: {config.host}:{config.port}/{config.database}")
|
|
|
+ logger.info(f"响应时间: {response_time:.3f}s")
|
|
|
+ logger.info(f"MySQL版本: {server_info['version']}")
|
|
|
+ logger.info(f"当前数据库: {server_info['database_name']}")
|
|
|
|
|
|
- Args:
|
|
|
- table_name: 表名
|
|
|
- df: 要插入的DataFrame
|
|
|
- data_time_column: 实际数据中的时间字段名
|
|
|
- batch_size: 每批插入的行数(减小批处理大小,避免连接超时)
|
|
|
- max_retries: 最大重试次数
|
|
|
-
|
|
|
- Returns:
|
|
|
- (总行数, 插入行数, 更新行数)
|
|
|
- """
|
|
|
- if df.empty:
|
|
|
- logger.warning("DataFrame为空,跳过插入")
|
|
|
- return 0, 0, 0
|
|
|
-
|
|
|
- total_rows = len(df)
|
|
|
- total_affected_rows = 0
|
|
|
- total_failed_rows = 0
|
|
|
+ except Exception as e:
|
|
|
+ result['error'] = str(e)
|
|
|
+ logger.error(f"数据库连接测试失败: {e}")
|
|
|
|
|
|
- try:
|
|
|
- # 1. 简化数据清理
|
|
|
- logger.info("开始简化数据清理...")
|
|
|
- cleaned_df = self._clean_and_convert_simple(df, data_time_column)
|
|
|
-
|
|
|
- # 2. 确保必需字段存在
|
|
|
- required_columns = ['id_farm', 'id_turbine', 'data_time']
|
|
|
- for col in required_columns:
|
|
|
- if col not in cleaned_df.columns:
|
|
|
- logger.error(f"必需字段 '{col}' 不存在于数据中")
|
|
|
- cleaned_df[col] = None # 设为None而不是抛出异常
|
|
|
-
|
|
|
- # 3. 获取列名
|
|
|
- columns = list(cleaned_df.columns)
|
|
|
- logger.info(f"📊 数据列: {len(columns)} 列")
|
|
|
- logger.info(f"📋 列名: {', '.join(columns[:10])}" + ("..." if len(columns) > 10 else ""))
|
|
|
-
|
|
|
- # 4. 准备UPSERT SQL
|
|
|
- upsert_sql = self._prepare_upsert_sql(table_name, columns)
|
|
|
- logger.info(f"📄 UPSERT SQL准备完成,共 {len(columns)} 列")
|
|
|
-
|
|
|
- # 5. 分批处理
|
|
|
- total_batches = (total_rows + batch_size - 1) // batch_size
|
|
|
- logger.info(f"🚀 准备处理 {total_rows} 行数据,分为 {total_batches} 个批次,批次大小: {batch_size}")
|
|
|
-
|
|
|
- for i in range(0, total_rows, batch_size):
|
|
|
- batch_df = cleaned_df.iloc[i:i + batch_size]
|
|
|
- batch_num = i // batch_size + 1
|
|
|
-
|
|
|
- retry_count = 0
|
|
|
- batch_success = False
|
|
|
-
|
|
|
- while retry_count <= max_retries and not batch_success:
|
|
|
- conn = None
|
|
|
- cursor = None
|
|
|
-
|
|
|
- try:
|
|
|
- # 每次重试前检查连接并重新连接
|
|
|
- if not self.check_connection():
|
|
|
- logger.warning(f"🔌 连接已断开,正在重新连接...")
|
|
|
- self.reconnect()
|
|
|
-
|
|
|
- conn = self.get_connection()
|
|
|
- cursor = conn.cursor()
|
|
|
-
|
|
|
- # 转换为元组列表
|
|
|
- batch_values = []
|
|
|
- for _, row in batch_df.iterrows():
|
|
|
- row_tuple = self._convert_row_to_tuple(row, columns)
|
|
|
- batch_values.append(row_tuple)
|
|
|
-
|
|
|
- if batch_num == 1 and batch_values:
|
|
|
- logger.info(f"📋 第一批数据示例(第一行前5个值): {batch_values[0][:5]}")
|
|
|
- logger.info(f"📊 第一批数据类型: {[type(v).__name__ for v in batch_values[0][:5]]}")
|
|
|
-
|
|
|
- # 执行批量插入 - 关键:使用executemany
|
|
|
- affected = cursor.executemany(upsert_sql, batch_values)
|
|
|
- total_affected_rows += affected
|
|
|
-
|
|
|
- # 提交当前批次
|
|
|
- conn.commit()
|
|
|
-
|
|
|
- if batch_num % 10 == 0 or batch_num == total_batches:
|
|
|
- logger.info(f"✅ 批次 {batch_num}/{total_batches}: 处理 {len(batch_df)} 行, "
|
|
|
- f"受影响 {affected} 行, 已提交到数据库")
|
|
|
-
|
|
|
- batch_success = True
|
|
|
-
|
|
|
- except (pymysql.Error, AttributeError) as e:
|
|
|
- retry_count += 1
|
|
|
-
|
|
|
- # 记录错误信息
|
|
|
- logger.error(f"❌ 批次 {batch_num} UPSERT失败,错误: {str(e)}")
|
|
|
- logger.error(f"🔧 错误类型: {type(e).__name__}")
|
|
|
-
|
|
|
- # 如果是连接相关错误,立即重新连接
|
|
|
- if isinstance(e, (pymysql.Error, AttributeError)) and any(keyword in str(e).lower() for keyword in ['socket', 'connection', 'timeout', 'none']):
|
|
|
- logger.warning(f"🔌 检测到连接错误,尝试重新连接...")
|
|
|
- try:
|
|
|
- if conn:
|
|
|
- conn.rollback()
|
|
|
- self.reconnect()
|
|
|
- except:
|
|
|
- logger.error("重新连接失败")
|
|
|
-
|
|
|
- if retry_count > max_retries:
|
|
|
- logger.error(f"❌ 批次 {batch_num} UPSERT失败,已达到最大重试次数")
|
|
|
-
|
|
|
- # 记录批次失败信息
|
|
|
- batch_failed_info = {
|
|
|
- 'batch_num': batch_num,
|
|
|
- 'batch_size': len(batch_df),
|
|
|
- 'total_retries': retry_count,
|
|
|
- 'error': str(e),
|
|
|
- 'error_type': type(e).__name__
|
|
|
- }
|
|
|
-
|
|
|
- if isinstance(e, pymysql.Error) and hasattr(e, 'args') and len(e.args) > 0:
|
|
|
- batch_failed_info['mysql_error_code'] = e.args[0]
|
|
|
- if len(e.args) > 1:
|
|
|
- batch_failed_info['mysql_error_message'] = e.args[1]
|
|
|
-
|
|
|
- logger.error(f"📋 批次失败详情: {batch_failed_info}")
|
|
|
-
|
|
|
- # 尝试单条插入
|
|
|
- batch_affected = 0
|
|
|
- batch_failed_rows = 0
|
|
|
-
|
|
|
- for idx, (_, row) in enumerate(batch_df.iterrows()):
|
|
|
- row_retry_count = 0
|
|
|
- row_success = False
|
|
|
-
|
|
|
- while row_retry_count <= max_retries and not row_success:
|
|
|
- single_conn = None
|
|
|
- single_cursor = None
|
|
|
-
|
|
|
- try:
|
|
|
- # 每次单条插入前检查连接
|
|
|
- if not self.check_connection():
|
|
|
- logger.warning(f"🔌 单条插入前连接已断开,正在重新连接...")
|
|
|
- self.reconnect()
|
|
|
-
|
|
|
- single_conn = self.get_connection()
|
|
|
- single_cursor = single_conn.cursor()
|
|
|
-
|
|
|
- row_tuple = self._convert_row_to_tuple(row, columns)
|
|
|
- single_cursor.execute(upsert_sql, row_tuple)
|
|
|
- batch_affected += single_cursor.rowcount
|
|
|
- single_conn.commit()
|
|
|
- row_success = True
|
|
|
-
|
|
|
- except Exception as single_e:
|
|
|
- row_retry_count += 1
|
|
|
- if row_retry_count > max_retries:
|
|
|
- batch_failed_rows += 1
|
|
|
- total_failed_rows += 1
|
|
|
- # 使用改进的错误日志记录
|
|
|
- try:
|
|
|
- row_tuple = self._convert_row_to_tuple(row, columns)
|
|
|
- self._log_failed_row_details(
|
|
|
- row_index=idx + (batch_num-1)*batch_size,
|
|
|
- row_tuple=row_tuple,
|
|
|
- columns=columns,
|
|
|
- sql_template=upsert_sql,
|
|
|
- error=single_e
|
|
|
- )
|
|
|
- except:
|
|
|
- logger.error(f"无法记录失败行详情: {single_e}")
|
|
|
- break
|
|
|
- else:
|
|
|
- wait_time = 1 * row_retry_count
|
|
|
- logger.info(f"⏳ 等待 {wait_time} 秒后重试...")
|
|
|
- time.sleep(wait_time)
|
|
|
- finally:
|
|
|
- if single_cursor:
|
|
|
- single_cursor.close()
|
|
|
- # 单条插入不释放连接,复用当前线程的连接
|
|
|
-
|
|
|
- total_affected_rows += batch_affected
|
|
|
-
|
|
|
- if batch_affected > 0:
|
|
|
- logger.info(f"⚠️ 批次 {batch_num} 单条处理完成,成功 {batch_affected} 行, 失败 {batch_failed_rows} 行")
|
|
|
-
|
|
|
- if batch_failed_rows > 0:
|
|
|
- logger.warning(f"⚠️ 批次 {batch_num} 有 {batch_failed_rows} 行数据插入失败")
|
|
|
- logger.warning(f"📁 失败的数据已记录到日志中,请查看详细错误信息")
|
|
|
-
|
|
|
- # 跳出循环,处理下一个批次
|
|
|
- break
|
|
|
- else:
|
|
|
- logger.warning(f"⚠️ 批次 {batch_num} UPSERT失败,第 {retry_count} 次重试")
|
|
|
-
|
|
|
- # 记录更多错误信息
|
|
|
- if isinstance(e, pymysql.Error) and hasattr(e, 'args') and len(e.args) > 0:
|
|
|
- error_code = e.args[0]
|
|
|
- error_msg = e.args[1] if len(e.args) > 1 else ''
|
|
|
- logger.warning(f"🔢 MySQL错误码: {error_code}")
|
|
|
- logger.warning(f"📋 MySQL错误信息: {error_msg}")
|
|
|
-
|
|
|
- # 如果是特定错误,提供解决方案提示
|
|
|
- if isinstance(e, pymysql.Error) and hasattr(e, 'args') and len(e.args) > 0:
|
|
|
- error_code = e.args[0]
|
|
|
- if error_code == 1062:
|
|
|
- logger.warning(f"💡 提示: 错误码1062表示唯一键冲突,可能是重复数据")
|
|
|
- elif error_code == 1366:
|
|
|
- logger.warning(f"💡 提示: 错误码1366表示数据类型不匹配,请检查数据格式")
|
|
|
- elif error_code == 2006: # MySQL server has gone away
|
|
|
- logger.warning(f"💡 提示: 错误码2006表示MySQL服务器连接已断开")
|
|
|
- elif error_code == 2013: # Lost connection to MySQL server during query
|
|
|
- logger.warning(f"💡 提示: 错误码2013表示查询期间失去连接")
|
|
|
-
|
|
|
- # 等待一段时间后重试
|
|
|
- wait_time = 2 * retry_count
|
|
|
- logger.info(f"⏳ 等待 {wait_time} 秒后重试...")
|
|
|
- time.sleep(wait_time)
|
|
|
-
|
|
|
- finally:
|
|
|
- if cursor:
|
|
|
- cursor.close()
|
|
|
- # 不释放连接,让当前线程保持连接复用
|
|
|
-
|
|
|
- # 批次处理完成后,暂停一小段时间,避免数据库压力过大
|
|
|
- if batch_success and batch_num < total_batches:
|
|
|
- time.sleep(0.1) # 100毫秒
|
|
|
-
|
|
|
- # 估算插入和更新行数
|
|
|
- successful_rows = total_rows - total_failed_rows
|
|
|
- estimated_inserted = successful_rows // 2
|
|
|
- estimated_updated = successful_rows - estimated_inserted
|
|
|
-
|
|
|
- logger.info(f"🎉 UPSERT完成统计:")
|
|
|
- logger.info(f" 总处理行数: {total_rows}")
|
|
|
- logger.info(f" 总受影响行数: {total_affected_rows}")
|
|
|
- logger.info(f" 失败行数: {total_failed_rows}")
|
|
|
- logger.info(f" 成功行数: {successful_rows}")
|
|
|
- logger.info(f" 估计插入行数: {estimated_inserted}")
|
|
|
- logger.info(f" 估计更新行数: {estimated_updated}")
|
|
|
-
|
|
|
- if total_failed_rows > 0:
|
|
|
- logger.warning(f"⚠️ 警告: 有 {total_failed_rows} 行数据插入失败")
|
|
|
- logger.warning(f"📁 请查看上面的详细错误日志,或检查文件 'failed_rows_log.json'")
|
|
|
- logger.warning(f"💡 提示: 失败的数据可以手动修复后重新导入")
|
|
|
-
|
|
|
- return successful_rows, estimated_inserted, estimated_updated
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ 批量UPSERT失败")
|
|
|
- logger.error(f"🔧 错误类型: {type(e).__name__}")
|
|
|
- logger.error(f"📝 错误信息: {str(e)}")
|
|
|
- logger.error(f"🔧 错误详情: {traceback.format_exc()}")
|
|
|
- raise
|
|
|
- finally:
|
|
|
- # 批量处理完成后释放连接
|
|
|
- self.release_connection()
|
|
|
+ finally:
|
|
|
+ if cursor:
|
|
|
+ cursor.close()
|
|
|
+ if conn:
|
|
|
+ conn.close()
|
|
|
|
|
|
- def upsert_parquet_data(self, file_info: ParquetFileInfo, table_name: str,
|
|
|
- batch_size: int = 100, max_retries: int = 3) -> Tuple[int, int, int]:
|
|
|
- """
|
|
|
- UPSERT单个parquet文件数据到数据库,使用三字段唯一键
|
|
|
- 每个批次(batch_size)独立提交到数据库
|
|
|
-
|
|
|
- Args:
|
|
|
- file_info: 文件信息(包含识别到的时间字段名)
|
|
|
- table_name: 表名
|
|
|
- batch_size: 批处理大小(每个事务的行数,减小以避免连接超时)
|
|
|
- max_retries: 最大重试次数
|
|
|
-
|
|
|
- Returns:
|
|
|
- (总行数, 插入行数, 更新行数)
|
|
|
- """
|
|
|
- try:
|
|
|
- logger.info(f"📂 正在读取并处理文件: {file_info.file_path}")
|
|
|
- logger.info(f"⏰ 识别到的时间字段: {file_info.data_time_column}")
|
|
|
-
|
|
|
- # 1. 读取parquet文件
|
|
|
- df = pd.read_parquet(file_info.file_path, engine='pyarrow')
|
|
|
-
|
|
|
- # 2. 添加元数据字段
|
|
|
- df['id_farm'] = file_info.farm_id
|
|
|
- df['name_farm'] = file_info.farm_name
|
|
|
- df['no_model_turbine'] = file_info.model_type
|
|
|
- df['id_turbine'] = file_info.turbine_id
|
|
|
-
|
|
|
- logger.info(f"📊 文件 {file_info.turbine_id}.parquet 读取完成,形状: {df.shape}")
|
|
|
- logger.info(f"📋 数据列: {list(df.columns)[:10]}" + ("..." if len(df.columns) > 10 else ""))
|
|
|
-
|
|
|
- # 3. 批量UPSERT到数据库,按batch_size独立提交
|
|
|
- total_rows, inserted_rows, updated_rows = self.batch_upsert_data_direct(
|
|
|
- table_name=table_name,
|
|
|
- df=df,
|
|
|
- data_time_column=file_info.data_time_column,
|
|
|
- batch_size=batch_size,
|
|
|
- max_retries=max_retries
|
|
|
- )
|
|
|
-
|
|
|
- return total_rows, inserted_rows, updated_rows
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ 处理文件 {file_info.file_path} 失败")
|
|
|
- logger.error(f"🔧 错误类型: {type(e).__name__}")
|
|
|
- logger.error(f"📝 错误信息: {str(e)}")
|
|
|
- logger.error(f"🔧 错误详情: {traceback.format_exc()}")
|
|
|
- raise
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+# 添加一个监控守护进程示例
|
|
|
+class DatabaseMonitorDaemon:
|
|
|
+ """数据库监控守护进程"""
|
|
|
|
|
|
- def check_table_exists(self, table_name: str) -> bool:
|
|
|
- """检查表是否存在"""
|
|
|
- conn = None
|
|
|
- cursor = None
|
|
|
+ def __init__(self, db_manager: DatabaseManager, check_interval: int = 5):
|
|
|
+ self.db_manager = db_manager
|
|
|
+ self.check_interval = check_interval
|
|
|
+ self.monitor_thread = None
|
|
|
+ self.running = False
|
|
|
|
|
|
- try:
|
|
|
- conn = self.get_connection()
|
|
|
- cursor = conn.cursor()
|
|
|
-
|
|
|
- try:
|
|
|
- cursor.execute("SHOW TABLES LIKE %s", (table_name,))
|
|
|
- result = cursor.fetchone()
|
|
|
- exists = result is not None
|
|
|
- logger.info(f"🔍 检查表 '{table_name}' 存在: {exists}")
|
|
|
- return exists
|
|
|
- finally:
|
|
|
- if cursor:
|
|
|
- try:
|
|
|
- cursor.close()
|
|
|
- except:
|
|
|
- pass
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ 检查表存在失败: {e}")
|
|
|
- return False
|
|
|
- finally:
|
|
|
- # 查询完成后释放连接
|
|
|
- self.release_connection()
|
|
|
+ def start(self):
|
|
|
+ """启动监控"""
|
|
|
+ if not self.running:
|
|
|
+ self.running = True
|
|
|
+ self.monitor_thread = Thread(target=self._monitor_loop, daemon=True)
|
|
|
+ self.monitor_thread.start()
|
|
|
+ logger.info(f"数据库监控守护进程已启动,检查间隔: {self.check_interval}秒")
|
|
|
|
|
|
- def get_table_row_count(self, table_name: str) -> int:
|
|
|
- """获取表的行数"""
|
|
|
- conn = None
|
|
|
- cursor = None
|
|
|
-
|
|
|
- try:
|
|
|
- conn = self.get_connection()
|
|
|
- cursor = conn.cursor()
|
|
|
-
|
|
|
- try:
|
|
|
- cursor.execute(f"SELECT COUNT(*) FROM `{table_name}`")
|
|
|
- result = cursor.fetchone()
|
|
|
- count = result[0] if result else 0
|
|
|
- logger.info(f"📊 表 '{table_name}' 行数: {count:,}")
|
|
|
- return count
|
|
|
- finally:
|
|
|
- if cursor:
|
|
|
- try:
|
|
|
- cursor.close()
|
|
|
- except:
|
|
|
- pass
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ 获取表行数失败: {e}")
|
|
|
- return 0
|
|
|
- finally:
|
|
|
- # 查询完成后释放连接
|
|
|
- self.release_connection()
|
|
|
+ def stop(self):
|
|
|
+ """停止监控"""
|
|
|
+ self.running = False
|
|
|
+ if self.monitor_thread:
|
|
|
+ self.monitor_thread.join(timeout=10)
|
|
|
+ logger.info("数据库监控守护进程已停止")
|
|
|
|
|
|
- def get_table_stats(self, table_name: str) -> Dict[str, Any]:
|
|
|
- """获取表统计信息"""
|
|
|
- conn = None
|
|
|
- cursor = None
|
|
|
-
|
|
|
- try:
|
|
|
- conn = self.get_connection()
|
|
|
- cursor = conn.cursor()
|
|
|
-
|
|
|
+ def _monitor_loop(self):
|
|
|
+ """监控循环"""
|
|
|
+ while self.running:
|
|
|
try:
|
|
|
- cursor.execute(f"""
|
|
|
- SELECT
|
|
|
- COUNT(*) as total_rows,
|
|
|
- COUNT(DISTINCT id_farm) as farm_count,
|
|
|
- COUNT(DISTINCT id_turbine) as turbine_count,
|
|
|
- MIN(data_time) as first_data_time,
|
|
|
- MAX(data_time) as last_data_time,
|
|
|
- MIN(create_time) as first_create,
|
|
|
- MAX(update_time) as last_update
|
|
|
- FROM `{table_name}`
|
|
|
- """)
|
|
|
+ # 获取当前状态
|
|
|
+ status = self.db_manager.get_connection_status()
|
|
|
|
|
|
- result = cursor.fetchone()
|
|
|
- if result:
|
|
|
- stats = {
|
|
|
- 'total_rows': result[0],
|
|
|
- 'farm_count': result[1],
|
|
|
- 'turbine_count': result[2],
|
|
|
- 'first_data_time': result[3],
|
|
|
- 'last_data_time': result[4],
|
|
|
- 'first_create': result[5],
|
|
|
- 'last_update': result[6]
|
|
|
- }
|
|
|
+ # 记录状态变化
|
|
|
+ if status['status'] != "HEALTHY":
|
|
|
+ logger.warning(f"数据库连接状态异常: {status['status']}")
|
|
|
|
|
|
- logger.info(f"📈 表 '{table_name}' 统计信息:")
|
|
|
- logger.info(f" 总行数: {stats['total_rows']:,}")
|
|
|
- logger.info(f" 风场数量: {stats['farm_count']}")
|
|
|
- logger.info(f" 风机数量: {stats['turbine_count']}")
|
|
|
- logger.info(f" 最早数据时间: {stats['first_data_time']}")
|
|
|
- logger.info(f" 最新数据时间: {stats['last_data_time']}")
|
|
|
-
|
|
|
- return stats
|
|
|
- else:
|
|
|
- return {}
|
|
|
- finally:
|
|
|
- if cursor:
|
|
|
- try:
|
|
|
- cursor.close()
|
|
|
- except:
|
|
|
- pass
|
|
|
+ # 如果连续错误超过阈值,尝试自动修复
|
|
|
+ if status['error_count'] >= 3:
|
|
|
+ logger.info("检测到连续错误,尝试自动修复连接...")
|
|
|
+ self.db_manager._auto_reconnect()
|
|
|
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ 获取表统计失败: {e}")
|
|
|
- return {}
|
|
|
- finally:
|
|
|
- # 查询完成后释放连接
|
|
|
- self.release_connection()
|
|
|
-
|
|
|
- def check_duplicate_keys(self, table_name: str) -> List[Dict]:
|
|
|
- """检查重复的唯一键记录"""
|
|
|
- conn = None
|
|
|
- cursor = None
|
|
|
-
|
|
|
- try:
|
|
|
- conn = self.get_connection()
|
|
|
- cursor = conn.cursor()
|
|
|
-
|
|
|
- try:
|
|
|
- cursor.execute(f"""
|
|
|
- SELECT
|
|
|
- id_farm,
|
|
|
- id_turbine,
|
|
|
- data_time,
|
|
|
- COUNT(*) as duplicate_count,
|
|
|
- MIN(id) as min_id,
|
|
|
- MAX(id) as max_id
|
|
|
- FROM `{table_name}`
|
|
|
- GROUP BY id_farm, id_turbine, data_time
|
|
|
- HAVING COUNT(*) > 1
|
|
|
- ORDER BY duplicate_count DESC
|
|
|
- LIMIT 10
|
|
|
- """)
|
|
|
+ # 等待下一次检查
|
|
|
+ time.sleep(self.check_interval)
|
|
|
|
|
|
- duplicates = []
|
|
|
- for row in cursor.fetchall():
|
|
|
- duplicate_info = {
|
|
|
- 'id_farm': row[0],
|
|
|
- 'id_turbine': row[1],
|
|
|
- 'data_time': row[2],
|
|
|
- 'duplicate_count': row[3],
|
|
|
- 'min_id': row[4],
|
|
|
- 'max_id': row[5]
|
|
|
- }
|
|
|
- duplicates.append(duplicate_info)
|
|
|
-
|
|
|
- if duplicates:
|
|
|
- logger.warning(f"⚠️ 发现重复的唯一键记录: {len(duplicates)} 组")
|
|
|
- for dup in duplicates[:3]: # 只显示前3个
|
|
|
- logger.warning(f" 重复: 风场={dup['id_farm']}, 风机={dup['id_turbine']}, "
|
|
|
- f"时间={dup['data_time']}, 重复次数={dup['duplicate_count']}")
|
|
|
- else:
|
|
|
- logger.info(f"✅ 无重复的唯一键记录")
|
|
|
-
|
|
|
- return duplicates
|
|
|
- finally:
|
|
|
- if cursor:
|
|
|
- try:
|
|
|
- cursor.close()
|
|
|
- except:
|
|
|
- pass
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ 检查重复键失败: {e}")
|
|
|
- return []
|
|
|
- finally:
|
|
|
- # 查询完成后释放连接
|
|
|
- self.release_connection()
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"监控循环异常: {e}")
|
|
|
+ time.sleep(self.check_interval)
|