import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
import traceback
import logging
from datetime import datetime as dt
import hashlib
import pymysql
from pymysql import Connection, cursors
from dbutils.pooled_db import PooledDB
import json
import time
import re
import threading
from threading import Thread, Event
import signal
import sys
import os
from config import DatabaseConfig, TableConfig
from file_scanner import ParquetFileInfo

# Logging configuration for the whole module.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


class DatabaseManager:
    """Database manager backed by a PooledDB connection pool.

    Provides connectivity health monitoring (background thread), automatic
    reconnection, table DDL helpers, and batched UPSERT of parquet data
    keyed on the (id_farm, id_turbine, data_time) unique key.
    """

    # Thread-local storage: one pooled connection per thread.
    _thread_local = threading.local()

    def __init__(self, config: DatabaseConfig, table_config: TableConfig, pool_size: int = 6):
        """Create the pool, start the health monitor and install signal handlers.

        Args:
            config: database connection settings (host/port/user/...).
            table_config: table-related settings (kept for callers; not read here).
            pool_size: maximum number of pooled connections.

        Raises:
            Exception: propagated from pool initialization on failure.
        """
        self.config = config
        self.table_config = table_config
        self.pool_size = pool_size

        # --- connection-health monitoring state ---
        self._connection_monitor_thread = None
        self._monitor_running = Event()  # set() => ask the monitor loop to stop
        self._last_connection_check = None
        # One of: UNKNOWN, HEALTHY, WARNING, ERROR, DISCONNECTED
        self._connection_status = "UNKNOWN"
        self._connection_error_count = 0
        self._max_error_count = 5        # consecutive failures before DISCONNECTED
        self._monitor_interval = 1       # seconds between health checks
        self._monitor_stats = {
            'total_checks': 0,
            'successful_checks': 0,
            'failed_checks': 0,
            'total_reconnections': 0,
            'last_error': None,
            'last_success': None
        }

        # Connection pool (created by _init_pool).
        self.pool: Optional[PooledDB] = None
        self._init_pool()
        self._start_connection_monitor()
        self._setup_signal_handlers()

    def _setup_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers so resources are released on exit."""
        def signal_handler(signum, frame):
            logger.info(f"接收到信号 {signum},正在关闭数据库连接池...")
            # close_all() also stops the monitor thread (close() alone did not).
            self.close_all()
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)   # Ctrl+C
        signal.signal(signal.SIGTERM, signal_handler)  # termination signal

    def _init_pool(self):
        """Initialize the database connection pool and smoke-test it.

        Raises:
            Exception: if PooledDB construction fails.
        """
        try:
            logger.info(f"正在初始化数据库连接池到 {self.config.host}:{self.config.port}/{self.config.database}")
            self.pool = PooledDB(
                creator=pymysql,
                maxconnections=self.pool_size,
                mincached=2,
                maxcached=5,
                maxshared=3,
                blocking=True,
                maxusage=None,
                setsession=[],
                ping=1,  # ping the server when a connection is fetched
                host=self.config.host,
                port=self.config.port,
                user=self.config.user,
                password=self.config.password,
                database=self.config.database,
                charset=self.config.charset,
                cursorclass=cursors.DictCursor,  # NOTE: all fetches return dicts
                autocommit=False,
                connect_timeout=30,
                read_timeout=600,
                write_timeout=600,
                client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS
            )
            logger.info(f"数据库连接池初始化成功")
            # Verify the pool actually works before declaring it healthy.
            test_result = self._test_pool_connection()
            if test_result:
                self._connection_status = "HEALTHY"
                self._monitor_stats['last_success'] = dt.now()
        except Exception as e:
            logger.error(f"数据库连接池初始化失败: {e}")
            self._connection_status = "ERROR"
            self._monitor_stats['last_error'] = str(e)
            raise

    def _test_pool_connection(self):
        """Run a trivial query through the pool; return True on success."""
        conn = None
        cursor = None
        try:
            conn = self.pool.connection()
            cursor = conn.cursor()
            start_time = time.time()
            cursor.execute("SELECT 1 as test, NOW() as server_time")
            result = cursor.fetchone()
            elapsed = time.time() - start_time
            logger.info(f"连接池测试成功: 响应时间={elapsed:.3f}s, 服务器时间={result['server_time']}")
            return True
        except Exception as e:
            logger.error(f"连接池测试失败: {e}")
            return False
        finally:
            # Always return resources to the pool, even on failure.
            if cursor:
                cursor.close()
            if conn:
                conn.close()

    def _start_connection_monitor(self):
        """Start the daemon thread that periodically checks connectivity."""
        if self._connection_monitor_thread is None or not self._connection_monitor_thread.is_alive():
            self._monitor_running.clear()
            self._connection_monitor_thread = Thread(
                target=self._connection_monitor_loop,
                name="DBConnectionMonitor",
                daemon=True
            )
            self._connection_monitor_thread.start()
            logger.info("数据库连接监控线程已启动")

    def _stop_connection_monitor(self):
        """Signal the monitor thread to stop and wait briefly for it."""
        if self._connection_monitor_thread and self._connection_monitor_thread.is_alive():
            self._monitor_running.set()
            self._connection_monitor_thread.join(timeout=5)
            logger.info("数据库连接监控线程已停止")

    def _connection_monitor_loop(self):
        """Monitor loop body: check the connection every _monitor_interval
        seconds and emit a status summary roughly once a minute."""
        logger.info(f"连接监控线程开始运行,检查间隔: {self._monitor_interval}秒")
        last_log_time = time.time()
        log_interval = 60  # seconds between periodic status logs
        while not self._monitor_running.is_set():
            try:
                self._perform_connection_check()
                current_time = time.time()
                if current_time - last_log_time >= log_interval:
                    self._log_connection_status()
                    last_log_time = current_time
                time.sleep(self._monitor_interval)
            except Exception as e:
                logger.error(f"连接监控循环异常: {e}")
                time.sleep(self._monitor_interval)
        logger.info("连接监控循环结束")

    def _perform_connection_check(self):
        """Execute one health check, updating status counters and stats.

        After 3 consecutive failures an automatic reconnect is attempted;
        after _max_error_count failures the status becomes DISCONNECTED.
        """
        try:
            self._monitor_stats['total_checks'] += 1
            self._last_connection_check = dt.now()
            if self.pool is None:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = "连接池未初始化"
                logger.warning("连接池未初始化")
                return
            conn = None
            cursor = None
            try:
                conn = self.pool.connection()
                cursor = conn.cursor()
                start_time = time.time()
                cursor.execute("SELECT 1 as test, NOW() as server_time, "
                               "VERSION() as version, CONNECTION_ID() as connection_id")
                result = cursor.fetchone()
                elapsed = time.time() - start_time
                # Success: reset the error streak.
                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['successful_checks'] += 1
                self._monitor_stats['last_success'] = dt.now()
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"连接检查成功: "
                                 f"响应时间={elapsed:.3f}s, "
                                 f"服务器时间={result['server_time']}, "
                                 f"MySQL版本={result['version']}, "
                                 f"连接ID={result['connection_id']}")
            except Exception as e:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = str(e)
                if self._connection_error_count >= self._max_error_count:
                    self._connection_status = "DISCONNECTED"
                    logger.error(f"数据库连接失败,已连续失败 {self._connection_error_count} 次: {e}")
                else:
                    logger.warning(f"数据库连接检查失败 (第{self._connection_error_count}次): {e}")
                if self._connection_error_count >= 3:
                    self._auto_reconnect()
            finally:
                if cursor:
                    cursor.close()
                if conn:
                    conn.close()
        except Exception as e:
            logger.error(f"执行连接检查时发生异常: {e}")
            self._monitor_stats['failed_checks'] += 1
            self._monitor_stats['last_error'] = str(e)

    def _auto_reconnect(self):
        """Attempt to recover the connection: first retest the existing pool,
        then fall back to rebuilding it. Returns True on success."""
        try:
            logger.warning(f"检测到连接问题,正在尝试自动重连 (错误计数: {self._connection_error_count})")
            self.release_connection()  # drop this thread's (possibly stale) connection
            time.sleep(2)
            if self.pool:
                test_result = self._test_pool_connection()
                if test_result:
                    self._connection_status = "HEALTHY"
                    self._connection_error_count = 0
                    self._monitor_stats['total_reconnections'] += 1
                    logger.info("自动重连成功")
                    return True
                else:
                    logger.warning("连接池测试失败,尝试重新初始化")
            # Rebuild the pool (also reached when self.pool is None).
            old_pool = self.pool
            try:
                self.pool = None
                if old_pool:
                    old_pool.close()
                self._init_pool()
                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['total_reconnections'] += 1
                logger.info("连接池重新初始化成功")
                return True
            except Exception as e:
                logger.error(f"连接池重新初始化失败: {e}")
                self._connection_status = "ERROR"
                return False
        except Exception as e:
            logger.error(f"自动重连失败: {e}")
            return False

    def _log_connection_status(self):
        """Log a human-readable summary of the monitor statistics."""
        status_map = {
            "HEALTHY": "✅ 健康",
            "WARNING": "⚠️ 警告",
            "ERROR": "❌ 错误",
            "DISCONNECTED": "🔌 断开连接",
            "UNKNOWN": "❓ 未知"
        }
        status_text = status_map.get(self._connection_status, self._connection_status)
        stats_info = (
            f"连接状态统计:\n"
            f" 当前状态: {status_text}\n"
            f" 总检查次数: {self._monitor_stats['total_checks']}\n"
            f" 成功次数: {self._monitor_stats['successful_checks']}\n"
            f" 失败次数: {self._monitor_stats['failed_checks']}\n"
            f" 连续错误次数: {self._connection_error_count}\n"
            f" 总重连次数: {self._monitor_stats['total_reconnections']}\n"
            f" 最后成功: {self._monitor_stats['last_success']}\n"
            f" 最后错误: {self._monitor_stats['last_error']}"
        )
        if self._connection_status in ["HEALTHY", "WARNING"]:
            logger.info(stats_info)
        else:
            logger.warning(stats_info)

    def get_connection_status(self) -> Dict[str, Any]:
        """Return a snapshot of the current connection status and stats."""
        return {
            'status': self._connection_status,
            'error_count': self._connection_error_count,
            'last_check': self._last_connection_check,
            'monitor_stats': self._monitor_stats.copy(),  # copy: callers must not mutate ours
            'config': {
                'host': self.config.host,
                'port': self.config.port,
                'database': self.config.database,
                'pool_size': self.pool_size
            }
        }

    def wait_for_connection(self, timeout: int = 30, check_interval: int = 1) -> bool:
        """Block until the database connection becomes healthy again.

        Args:
            timeout: total time to wait, in seconds.
            check_interval: polling interval, in seconds.

        Returns:
            bool: True if the connection recovered within the timeout.
        """
        logger.info(f"等待数据库连接恢复,超时时间: {timeout}秒")
        start_time = time.time()
        attempts = 0
        while time.time() - start_time < timeout:
            attempts += 1
            if self._connection_status == "HEALTHY":
                logger.info(f"数据库连接已恢复,等待时间: {time.time() - start_time:.1f}秒")
                return True
            if self._connection_status == "DISCONNECTED":
                logger.info(f"尝试重连 (第{attempts}次)")
                if self._auto_reconnect():
                    return True
            logger.info(f"等待连接恢复... ({attempts}/{int(timeout/check_interval)})")
            time.sleep(check_interval)
        logger.error(f"等待数据库连接恢复超时 ({timeout}秒)")
        return False

    def get_connection(self) -> Connection:
        """Return this thread's pooled connection, creating it on demand.

        If the monitor reports ERROR/DISCONNECTED, waits briefly for recovery
        before failing.

        Raises:
            ConnectionError: if the database is unavailable.
        """
        try:
            if self._connection_status in ["ERROR", "DISCONNECTED"]:
                logger.warning(f"获取连接时检测到连接状态为 {self._connection_status},尝试自动重连")
                if not self.wait_for_connection(timeout=10):
                    raise ConnectionError(f"数据库连接不可用,当前状态: {self._connection_status}")
            if not hasattr(self._thread_local, 'connection') or self._thread_local.connection is None:
                conn = self.pool.connection()
                self._thread_local.connection = conn
                logger.debug(f"从连接池获取新连接,当前线程: {threading.current_thread().name}")
            return self._thread_local.connection
        except Exception as e:
            logger.error(f"从连接池获取连接失败: {e}")
            self._connection_status = "ERROR"
            self._connection_error_count += 1
            self._monitor_stats['last_error'] = str(e)
            self._auto_reconnect()
            raise

    def release_connection(self):
        """Roll back any open transaction and return this thread's connection
        to the pool. Safe to call when no connection is held."""
        try:
            if hasattr(self._thread_local, 'connection') and self._thread_local.connection is not None:
                conn = self._thread_local.connection
                try:
                    # Pooled wrappers expose the raw connection as _con; roll
                    # back there when autocommit is off to avoid leaking a
                    # half-finished transaction back into the pool.
                    if hasattr(conn, '_con') and isinstance(conn._con, Connection):
                        original_conn = conn._con
                        if not original_conn.get_autocommit():
                            original_conn.rollback()
                    else:
                        conn.rollback()
                    conn.close()
                    logger.debug(f"连接已归还到连接池,当前线程: {threading.current_thread().name}")
                except Exception as e:
                    logger.warning(f"关闭连接时出错: {e}")
                finally:
                    self._thread_local.connection = None
        except Exception as e:
            logger.error(f"释放连接时出错: {e}")

    def check_connection(self) -> bool:
        """Return True if this thread's connection answers SELECT 1."""
        try:
            conn = self.get_connection()
            cursor = None
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT 1")
                cursor.fetchone()
                return True
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.warning(f"连接检查失败: {e}")
            self.release_connection()
            return False

    def reconnect(self):
        """Drop this thread's connection and fetch a fresh one from the pool.

        Raises:
            Exception: if a new connection cannot be obtained.
        """
        try:
            logger.info("正在重新连接数据库...")
            self.release_connection()
            time.sleep(1)
            self.get_connection()
            logger.info("数据库重新连接成功")
        except Exception as e:
            logger.error(f"数据库重新连接失败: {e}")
            raise

    def close(self):
        """Convenience wrapper: close the connection pool."""
        self.close_pool()

    def close_all(self):
        """Close everything: monitor thread, pool, and log a final summary."""
        logger.info("正在关闭所有数据库资源...")
        self._stop_connection_monitor()
        self.close_pool()
        self._log_connection_status()
        logger.info("所有数据库资源已关闭")

    def close_pool(self):
        """Close the whole connection pool (idempotent)."""
        try:
            if self.pool:
                self.release_connection()
                self.pool.close()
                self.pool = None
                self._connection_status = "DISCONNECTED"
                logger.info("数据库连接池已关闭")
        except Exception as e:
            logger.error(f"关闭连接池时出错: {e}")

    # ============ Database operations ============

    def check_table_exists(self, table_name: str) -> bool:
        """Return True if the named table exists in the current database."""
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            try:
                cursor.execute("SHOW TABLES LIKE %s", (table_name,))
                result = cursor.fetchone()
                exists = result is not None
                logger.info(f"🔍 检查表 '{table_name}' 存在: {exists}")
                return exists
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.error(f"❌ 检查表存在失败: {e}")
            return False
        finally:
            self.release_connection()

    def create_table_with_unique_key(self, table_name: str, columns: List[str], unique_keys: List[str]) -> bool:
        """Drop and recreate a table with the three-field unique key.

        Args:
            table_name: target table (any existing table is dropped first).
            columns: full SQL column definitions for the data columns.
            unique_keys: column names forming the unique key.

        Returns:
            bool: True on success, False otherwise.
        """
        if not self.wait_for_connection():
            logger.error("创建表失败:数据库连接不可用")
            return False
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            try:
                drop_sql = f"DROP TABLE IF EXISTS `{table_name}`"
                cursor.execute(drop_sql)
                logger.info(f"已删除旧表: {table_name}")
                columns_sql = ",\n ".join(columns)
                unique_keys_str = ', '.join([f'`{key}`' for key in unique_keys])
                create_sql = f"""
                CREATE TABLE `{table_name}` (
                    `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
                    {columns_sql},
                    `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    `data_hash` VARCHAR(64) COMMENT '数据哈希值,用于快速比较',
                    UNIQUE KEY uk_turbine_data ({unique_keys_str}),
                    INDEX idx_farm (`id_farm`),
                    INDEX idx_turbine (`id_turbine`),
                    INDEX idx_time (`data_time`),
                    INDEX idx_farm_turbine (`id_farm`, `id_turbine`),
                    INDEX idx_composite (`id_farm`, `id_turbine`, `data_time`)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
                """
                logger.debug(f"创建表的SQL语句:\n{create_sql}")
                cursor.execute(create_sql)
                conn.commit()
                logger.info(f"表 '{table_name}' 创建成功!")
                logger.info(f"三字段唯一键: {unique_keys}")
                return True
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.error(f"创建表失败: {e}")
            logger.error(traceback.format_exc())
            if conn:
                conn.rollback()
            return False
        finally:
            # Fix: the original `finally: pass` leaked this thread's connection.
            self.release_connection()

    def get_table_row_count(self, table_name: str) -> int:
        """Return the number of rows in the table (0 on error)."""
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            try:
                cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
                result = cursor.fetchone()
                count = result['count'] if result else 0
                logger.info(f"📊 表 '{table_name}' 行数: {count:,}")
                return count
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.error(f"❌ 获取表行数失败: {e}")
            return 0
        finally:
            self.release_connection()

    def get_table_stats(self, table_name: str) -> Dict[str, Any]:
        """Return summary statistics for the table ({} on error).

        NOTE: the pool uses DictCursor, so rows are dicts keyed by the SQL
        aliases (the original positional indexing raised KeyError).
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            try:
                cursor.execute(f"""
                    SELECT COUNT(*) as total_rows,
                           COUNT(DISTINCT id_farm) as farm_count,
                           COUNT(DISTINCT id_turbine) as turbine_count,
                           MIN(data_time) as first_data_time,
                           MAX(data_time) as last_data_time,
                           MIN(create_time) as first_create,
                           MAX(update_time) as last_update
                    FROM `{table_name}`
                """)
                result = cursor.fetchone()
                if result:
                    stats = {
                        'total_rows': result['total_rows'],
                        'farm_count': result['farm_count'],
                        'turbine_count': result['turbine_count'],
                        'first_data_time': result['first_data_time'],
                        'last_data_time': result['last_data_time'],
                        'first_create': result['first_create'],
                        'last_update': result['last_update']
                    }
                    logger.info(f"📈 表 '{table_name}' 统计信息:")
                    logger.info(f" 总行数: {stats['total_rows']:,}")
                    logger.info(f" 风场数量: {stats['farm_count']}")
                    logger.info(f" 风机数量: {stats['turbine_count']}")
                    logger.info(f" 最早数据时间: {stats['first_data_time']}")
                    logger.info(f" 最新数据时间: {stats['last_data_time']}")
                    return stats
                else:
                    return {}
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.error(f"❌ 获取表统计失败: {e}")
            return {}
        finally:
            self.release_connection()

    def check_duplicate_keys(self, table_name: str) -> List[Dict]:
        """Return up to 10 groups of rows sharing the same unique key.

        NOTE: rows come from a DictCursor, so they are addressed by alias
        (the original positional indexing raised KeyError).
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            try:
                cursor.execute(f"""
                    SELECT id_farm, id_turbine, data_time,
                           COUNT(*) as duplicate_count,
                           MIN(id) as min_id,
                           MAX(id) as max_id
                    FROM `{table_name}`
                    GROUP BY id_farm, id_turbine, data_time
                    HAVING COUNT(*) > 1
                    ORDER BY duplicate_count DESC
                    LIMIT 10
                """)
                duplicates = []
                for row in cursor.fetchall():
                    duplicate_info = {
                        'id_farm': row['id_farm'],
                        'id_turbine': row['id_turbine'],
                        'data_time': row['data_time'],
                        'duplicate_count': row['duplicate_count'],
                        'min_id': row['min_id'],
                        'max_id': row['max_id']
                    }
                    duplicates.append(duplicate_info)
                if duplicates:
                    logger.warning(f"⚠️ 发现重复的唯一键记录: {len(duplicates)} 组")
                    for dup in duplicates[:3]:
                        logger.warning(f" 重复: 风场={dup['id_farm']}, 风机={dup['id_turbine']}, "
                                       f"时间={dup['data_time']}, 重复次数={dup['duplicate_count']}")
                else:
                    logger.info(f"✅ 无重复的唯一键记录")
                return duplicates
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.error(f"❌ 检查重复键失败: {e}")
            return []
        finally:
            self.release_connection()

    def upsert_parquet_data(self, file_info: ParquetFileInfo, table_name: str,
                            batch_size: int = 100, max_retries: int = 3) -> Tuple[int, int, int]:
        """UPSERT one parquet file into the database using the three-field
        unique key, with batch retries and a row-by-row fallback.

        Args:
            file_info: file metadata (including the detected time column).
            table_name: target table name.
            batch_size: rows per executemany batch.
            max_retries: retries per batch (and per row in the fallback).

        Returns:
            (total rows, estimated inserted rows, estimated updated rows) —
            the insert/update split is an estimate; MySQL does not report it.

        Raises:
            Exception: re-raised after logging if the file cannot be processed.
        """
        try:
            logger.info(f"📂 正在读取并处理文件: {file_info.file_path}")
            logger.info(f"⏰ 识别到的时间字段: {file_info.data_time_column}")
            # Read the parquet file and attach metadata columns.
            df = pd.read_parquet(file_info.file_path, engine='pyarrow')
            df['id_farm'] = file_info.farm_id
            df['name_farm'] = file_info.farm_name
            df['no_model_turbine'] = file_info.model_type
            df['id_turbine'] = file_info.turbine_id
            logger.info(f"📊 文件 {file_info.turbine_id}.parquet 读取完成,形状: {df.shape}")

            cleaned_df = self._clean_and_convert_simple(df, file_info.data_time_column)

            # The unique-key columns must exist even if empty.
            required_columns = ['id_farm', 'id_turbine', 'data_time']
            for col in required_columns:
                if col not in cleaned_df.columns:
                    logger.error(f"必需字段 '{col}' 不存在于数据中")
                    cleaned_df[col] = None

            columns = list(cleaned_df.columns)
            upsert_sql = self._prepare_upsert_sql(table_name, columns)

            total_rows = len(cleaned_df)
            total_batches = (total_rows + batch_size - 1) // batch_size
            total_affected_rows = 0
            total_failed_rows = 0
            logger.info(f"🚀 准备处理 {total_rows} 行数据,分为 {total_batches} 个批次")

            for i in range(0, total_rows, batch_size):
                batch_df = cleaned_df.iloc[i:i + batch_size]
                batch_num = i // batch_size + 1
                retry_count = 0
                batch_success = False
                while retry_count <= max_retries and not batch_success:
                    conn = None
                    cursor = None
                    try:
                        if not self.check_connection():
                            logger.warning(f"🔌 连接已断开,正在重新连接...")
                            self.reconnect()
                        conn = self.get_connection()
                        cursor = conn.cursor()
                        batch_values = []
                        for _, row in batch_df.iterrows():
                            row_tuple = self._convert_row_to_tuple(row, columns)
                            batch_values.append(row_tuple)
                        affected = cursor.executemany(upsert_sql, batch_values)
                        total_affected_rows += affected
                        conn.commit()
                        batch_success = True
                        if batch_num % 10 == 0 or batch_num == total_batches:
                            logger.info(f"✅ 批次 {batch_num}/{total_batches}: 处理 {len(batch_df)} 行, "
                                        f"受影响 {affected} 行")
                    except (pymysql.Error, AttributeError) as e:
                        retry_count += 1
                        logger.error(f"❌ 批次 {batch_num} UPSERT失败,错误: {str(e)}")
                        # Fix: discard the partially-applied batch before retrying.
                        if conn:
                            try:
                                conn.rollback()
                            except Exception:
                                pass
                        if retry_count > max_retries:
                            logger.error(f"❌ 批次 {batch_num} UPSERT失败,已达到最大重试次数")
                            # Fall back to row-by-row inserts for this batch.
                            batch_affected = 0
                            batch_failed = 0
                            for idx, (_, row) in enumerate(batch_df.iterrows()):
                                row_retry_count = 0
                                row_success = False
                                while row_retry_count <= max_retries and not row_success:
                                    single_cursor = None
                                    try:
                                        if not self.check_connection():
                                            self.reconnect()
                                        single_conn = self.get_connection()
                                        single_cursor = single_conn.cursor()
                                        row_tuple = self._convert_row_to_tuple(row, columns)
                                        single_cursor.execute(upsert_sql, row_tuple)
                                        batch_affected += single_cursor.rowcount
                                        single_conn.commit()
                                        row_success = True
                                    except Exception as single_e:
                                        row_retry_count += 1
                                        if row_retry_count > max_retries:
                                            batch_failed += 1
                                            break
                                        else:
                                            time.sleep(1 * row_retry_count)
                                    finally:
                                        if single_cursor:
                                            single_cursor.close()
                            total_affected_rows += batch_affected
                            total_failed_rows += batch_failed
                            if batch_affected > 0:
                                logger.info(f"⚠️ 批次 {batch_num} 单条处理完成,成功 {batch_affected} 行, 失败 {batch_failed} 行")
                            break
                        else:
                            logger.warning(f"⚠️ 批次 {batch_num} UPSERT失败,第 {retry_count} 次重试")
                            time.sleep(2 * retry_count)
                    finally:
                        if cursor:
                            cursor.close()
                # Small pause between batches to reduce server pressure.
                if batch_success and batch_num < total_batches:
                    time.sleep(0.1)

            # Estimate the insert/update split (MySQL reports 1 per insert,
            # 2 per update in affected-rows; we only keep a rough guess).
            successful_rows = total_rows - total_failed_rows
            estimated_inserted = successful_rows // 2
            estimated_updated = successful_rows - estimated_inserted
            logger.info(f"🎉 文件 {os.path.basename(file_info.file_path)} UPSERT完成:")
            logger.info(f" 总处理行数: {total_rows}")
            logger.info(f" 总受影响行数: {total_affected_rows}")
            logger.info(f" 失败行数: {total_failed_rows}")
            return total_rows, estimated_inserted, estimated_updated
        except Exception as e:
            logger.error(f"❌ 处理文件 {file_info.file_path} 失败: {str(e)}")
            raise

    def _clean_and_convert_simple(self, df: pd.DataFrame, data_time_column: str = None) -> pd.DataFrame:
        """Simplified cleaning: ensure metadata columns exist, derive and
        normalize `data_time`, replace NaN/NaT with None, and compute a
        per-row MD5 `data_hash` over the data columns.

        Returns the cleaned copy, or the original df if cleaning fails.
        """
        try:
            cleaned_df = df.copy()
            required_fields = ['id_farm', 'name_farm', 'no_model_turbine', 'id_turbine']
            for field in required_fields:
                if field not in cleaned_df.columns:
                    cleaned_df[field] = None

            # Derive data_time: prefer the detected column, otherwise scan
            # for anything that looks like a time column.
            if 'data_time' not in cleaned_df.columns:
                if data_time_column and data_time_column in cleaned_df.columns:
                    cleaned_df['data_time'] = cleaned_df[data_time_column]
                else:
                    for col in cleaned_df.columns:
                        col_lower = col.lower()
                        if any(keyword in col_lower for keyword in ['time', 'date', 'timestamp']):
                            cleaned_df['data_time'] = cleaned_df[col]
                            logger.info(f"使用字段 '{col}' 作为 data_time")
                            break

            # Coerce data_time to datetime (invalid values become NaT -> None).
            if 'data_time' in cleaned_df.columns:
                try:
                    cleaned_df['data_time'] = pd.to_datetime(cleaned_df['data_time'], errors='coerce')
                except Exception:
                    logger.warning("data_time字段转换失败,保持原样")

            # Replace pandas missing values with SQL-friendly None.
            cleaned_df = cleaned_df.replace({np.nan: None, pd.NaT: None})

            def simple_hash(row):
                # MD5 over sorted data fields; None when there is no data.
                try:
                    data_fields = [col for col in cleaned_df.columns
                                   if col not in ['id_farm', 'name_farm', 'no_model_turbine',
                                                  'id_turbine', 'data_time', 'data_hash']]
                    hash_str = ''
                    for field in sorted(data_fields):
                        val = row[field]
                        if val is not None:
                            if isinstance(val, (dt, pd.Timestamp)):
                                hash_str += f"{field}:{val.isoformat()}|"
                            else:
                                hash_str += f"{field}:{str(val)}|"
                    return hashlib.md5(hash_str.encode('utf-8')).hexdigest() if hash_str else None
                except Exception:
                    return None

            cleaned_df['data_hash'] = cleaned_df.apply(simple_hash, axis=1)
            logger.info(f"数据清理完成,原始形状: {df.shape}, 清理后形状: {cleaned_df.shape}")
            return cleaned_df
        except Exception as e:
            logger.error(f"数据清理失败: {e}")
            logger.error(traceback.format_exc())
            return df

    def _prepare_upsert_sql(self, table_name: str, columns: List[str]) -> str:
        """Build the INSERT ... ON DUPLICATE KEY UPDATE statement.

        Key, id, audit and hash columns are excluded from the UPDATE clause.
        """
        exclude_columns = ['id_farm', 'id_turbine', 'data_time', 'id',
                           'create_time', 'update_time', 'data_hash']
        update_columns = [col for col in columns if col not in exclude_columns]
        column_names = ', '.join([f'`{col}`' for col in columns])
        placeholders = ', '.join(['%s'] * len(columns))
        update_clauses = []
        for col in update_columns:
            update_clauses.append(f"`{col}` = VALUES(`{col}`)")
        update_clause = ', '.join(update_clauses)
        upsert_sql = f"""
        INSERT INTO `{table_name}` ({column_names})
        VALUES ({placeholders})
        ON DUPLICATE KEY UPDATE {update_clause}
        """
        logger.debug(f"UPSERT SQL生成完成,共 {len(columns)} 列")
        return upsert_sql

    def _convert_to_numeric(self, value):
        """Best-effort conversion of a scalar to int/float; None if impossible.

        Handles numpy scalars, bools (-> 1/0), and strings with thousands
        separators or percent signs.
        """
        if pd.isna(value) or value is None:
            return None
        try:
            # bool is a subclass of int, so it must be tested BEFORE the
            # generic numeric branch (in the original this branch was dead).
            if isinstance(value, (bool, np.bool_)):
                return 1 if bool(value) else 0
            if isinstance(value, (int, float, np.integer, np.floating)):
                if isinstance(value, np.integer):
                    return int(value)
                elif isinstance(value, np.floating):
                    return float(value)
                return value
            if isinstance(value, str):
                cleaned = value.strip()
                if cleaned == '':
                    return None
                cleaned = cleaned.replace(',', '').replace('%', '').replace(' ', '')
                try:
                    return float(cleaned)
                except ValueError:
                    try:
                        return int(cleaned)
                    except ValueError:
                        return None
            # Last resort: stringify and retry as float.
            try:
                str_val = str(value)
                cleaned = str_val.replace(',', '').replace('%', '').replace(' ', '')
                return float(cleaned)
            except Exception:
                return None
        except Exception:
            return None

    def _convert_row_to_tuple(self, row: pd.Series, columns: List[str]) -> Tuple:
        """Convert one DataFrame row into a parameter tuple for the UPSERT.

        Time fields become datetime, key/metadata fields become str, other
        fields are coerced to numeric. On failure an all-None tuple is
        returned so the batch shape stays consistent.
        """
        try:
            row_values = []
            # Fields passed through without numeric coercion.
            keep_original_fields = [
                'id', 'data_time', 'id_farm', 'id_turbine',
                'name_farm', 'no_model_turbine',
                'create_time', 'update_time', 'data_hash'
            ]
            for col in columns:
                value = row[col]
                if col == 'data_time':
                    if pd.isna(value):
                        row_values.append(None)
                    elif isinstance(value, pd.Timestamp):
                        row_values.append(value.to_pydatetime())
                    elif isinstance(value, dt):
                        row_values.append(value)
                    else:
                        try:
                            row_values.append(pd.to_datetime(value).to_pydatetime())
                        except Exception:
                            row_values.append(None)
                elif col in ['id_farm', 'id_turbine', 'name_farm', 'no_model_turbine', 'data_hash']:
                    if pd.isna(value) or value is None:
                        row_values.append(None)
                    else:
                        row_values.append(str(value))
                elif col in keep_original_fields:
                    row_values.append(value)
                else:
                    numeric_value = self._convert_to_numeric(value)
                    row_values.append(numeric_value)
            return tuple(row_values)
        except Exception as e:
            logger.warning(f"转换行数据失败: {e}")
            return tuple([None] * len(columns))