| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031 |
- import pandas as pd
- import numpy as np
- from typing import List, Dict, Any, Optional, Tuple
- import traceback
- import logging
- from datetime import datetime as dt
- import hashlib
- import pymysql
- from pymysql import Connection, cursors
- from dbutils.pooled_db import PooledDB
- import json
- import time
- import re
- import threading
- from threading import Thread, Event
- import signal
- import sys
- import os
- from config import DatabaseConfig, TableConfig
- from file_scanner import ParquetFileInfo
# Configure module-level logging: INFO level, timestamped single-line format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Database manager: pools MySQL connections via PooledDB, monitors
    connectivity in a background thread, and reconnects automatically."""

    # Thread-local storage caching one pooled connection per thread.
    # NOTE(review): this is a *class* attribute, so it is shared across all
    # DatabaseManager instances — confirm only one instance exists per process.
    _thread_local = threading.local()
-
    def __init__(self, config: DatabaseConfig, table_config: TableConfig, pool_size: int = 6):
        """Create the connection pool, start the health monitor and install
        signal handlers for graceful shutdown.

        Args:
            config: database connection settings (host, port, user, ...).
            table_config: table-related configuration (stored, not used here).
            pool_size: maximum number of pooled connections.

        Raises:
            Exception: propagated from _init_pool when the DB is unreachable.
        """
        self.config = config
        self.table_config = table_config
        self.pool_size = pool_size

        # --- connection-health monitoring state ---
        self._connection_monitor_thread = None
        self._monitor_running = Event()  # set() requests the monitor loop to stop
        self._last_connection_check = None
        self._connection_status = "UNKNOWN"  # UNKNOWN, HEALTHY, WARNING, ERROR, DISCONNECTED
        self._connection_error_count = 0
        self._max_error_count = 5  # consecutive failures before status -> DISCONNECTED
        self._monitor_interval = 1  # seconds between health checks
        self._monitor_stats = {
            'total_checks': 0,
            'successful_checks': 0,
            'failed_checks': 0,
            'total_reconnections': 0,
            'last_error': None,
            'last_success': None
        }

        # Connection pool (created by _init_pool below).
        self.pool: Optional[PooledDB] = None

        # Initialize the connection pool; raises on failure.
        self._init_pool()

        # Start the background connectivity monitor thread.
        self._start_connection_monitor()

        # Install SIGINT/SIGTERM handlers for graceful shutdown.
        self._setup_signal_handlers()
-
- def _setup_signal_handlers(self):
- """设置信号处理,确保程序退出时能正确关闭资源"""
- def signal_handler(signum, frame):
- logger.info(f"接收到信号 {signum},正在关闭数据库连接池...")
- self.close()
- sys.exit(0)
-
- signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
- signal.signal(signal.SIGTERM, signal_handler) # 终止信号
-
    def _init_pool(self):
        """Create the PooledDB connection pool and smoke-test it.

        Raises:
            Exception: re-raised when the pool cannot be created.
        """
        try:
            logger.info(f"正在初始化数据库连接池到 {self.config.host}:{self.config.port}/{self.config.database}")

            self.pool = PooledDB(
                creator=pymysql,
                maxconnections=self.pool_size,
                mincached=2,        # idle connections opened at startup
                maxcached=5,        # max idle connections kept in the pool
                maxshared=3,
                blocking=True,      # block instead of raising when pool is exhausted
                maxusage=None,      # unlimited reuse of a single connection
                setsession=[],
                ping=1,             # ping the server whenever a connection is requested
                host=self.config.host,
                port=self.config.port,
                user=self.config.user,
                password=self.config.password,
                database=self.config.database,
                charset=self.config.charset,
                cursorclass=cursors.DictCursor,  # NOTE: all query rows are dicts
                autocommit=False,
                connect_timeout=30,
                read_timeout=600,
                write_timeout=600,
                client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS
            )

            logger.info(f"数据库连接池初始化成功")

            # Smoke-test the pool and record the initial health state.
            test_result = self._test_pool_connection()
            if test_result:
                self._connection_status = "HEALTHY"
                self._monitor_stats['last_success'] = dt.now()

        except Exception as e:
            logger.error(f"数据库连接池初始化失败: {e}")
            self._connection_status = "ERROR"
            self._monitor_stats['last_error'] = str(e)
            raise
-
- def _test_pool_connection(self):
- """测试连接池连接"""
- try:
- conn = self.pool.connection()
- cursor = conn.cursor()
- start_time = time.time()
- cursor.execute("SELECT 1 as test, NOW() as server_time")
- result = cursor.fetchone()
- elapsed = time.time() - start_time
-
- cursor.close()
- conn.close()
-
- logger.info(f"连接池测试成功: 响应时间={elapsed:.3f}s, 服务器时间={result['server_time']}")
- return True
-
- except Exception as e:
- logger.error(f"连接池测试失败: {e}")
- return False
-
- def _start_connection_monitor(self):
- """启动连接监控线程"""
- if self._connection_monitor_thread is None or not self._connection_monitor_thread.is_alive():
- self._monitor_running.clear()
- self._connection_monitor_thread = Thread(
- target=self._connection_monitor_loop,
- name="DBConnectionMonitor",
- daemon=True
- )
- self._connection_monitor_thread.start()
- logger.info("数据库连接监控线程已启动")
-
- def _stop_connection_monitor(self):
- """停止连接监控线程"""
- if self._connection_monitor_thread and self._connection_monitor_thread.is_alive():
- self._monitor_running.set()
- self._connection_monitor_thread.join(timeout=5)
- logger.info("数据库连接监控线程已停止")
-
- def _connection_monitor_loop(self):
- """连接监控循环"""
- logger.info(f"连接监控线程开始运行,检查间隔: {self._monitor_interval}秒")
-
- last_log_time = time.time()
- log_interval = 60
-
- while not self._monitor_running.is_set():
- try:
- self._perform_connection_check()
-
- current_time = time.time()
- if current_time - last_log_time >= log_interval:
- self._log_connection_status()
- last_log_time = current_time
-
- time.sleep(self._monitor_interval)
-
- except Exception as e:
- logger.error(f"连接监控循环异常: {e}")
- time.sleep(self._monitor_interval)
-
- logger.info("连接监控循环结束")
-
    def _perform_connection_check(self):
        """Run one health check: probe the DB, update status/counters/stats,
        and trigger auto-reconnect after 3 consecutive failures."""
        try:
            self._monitor_stats['total_checks'] += 1
            self._last_connection_check = dt.now()

            # A missing pool counts as a failed check.
            if self.pool is None:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = "连接池未初始化"
                logger.warning("连接池未初始化")
                return

            conn = None
            cursor = None
            try:
                conn = self.pool.connection()
                cursor = conn.cursor()

                start_time = time.time()
                cursor.execute("SELECT 1 as test, NOW() as server_time, "
                               "VERSION() as version, CONNECTION_ID() as connection_id")
                result = cursor.fetchone()
                elapsed = time.time() - start_time

                # Probe succeeded: mark healthy and reset the error streak.
                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['successful_checks'] += 1
                self._monitor_stats['last_success'] = dt.now()

                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"连接检查成功: "
                                 f"响应时间={elapsed:.3f}s, "
                                 f"服务器时间={result['server_time']}, "
                                 f"MySQL版本={result['version']}, "
                                 f"连接ID={result['connection_id']}")

            except Exception as e:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = str(e)

                # Escalate to DISCONNECTED after _max_error_count consecutive failures.
                if self._connection_error_count >= self._max_error_count:
                    self._connection_status = "DISCONNECTED"
                    logger.error(f"数据库连接失败,已连续失败 {self._connection_error_count} 次: {e}")
                else:
                    logger.warning(f"数据库连接检查失败 (第{self._connection_error_count}次): {e}")

                # Attempt an automatic reconnect after 3 consecutive failures.
                if self._connection_error_count >= 3:
                    self._auto_reconnect()

            finally:
                if cursor:
                    cursor.close()
                if conn:
                    conn.close()

        except Exception as e:
            logger.error(f"执行连接检查时发生异常: {e}")
            self._monitor_stats['failed_checks'] += 1
            self._monitor_stats['last_error'] = str(e)
-
    def _auto_reconnect(self):
        """Try to restore connectivity: first re-test the existing pool, then
        fall back to rebuilding the pool from scratch.

        Returns:
            bool: True when a working connection was re-established.
        """
        try:
            logger.warning(f"检测到连接问题,正在尝试自动重连 (错误计数: {self._connection_error_count})")

            # Drop this thread's cached connection before testing.
            self.release_connection()
            time.sleep(2)

            # First attempt: the pool may have recovered on its own.
            if self.pool:
                test_result = self._test_pool_connection()
                if test_result:
                    self._connection_status = "HEALTHY"
                    self._connection_error_count = 0
                    self._monitor_stats['total_reconnections'] += 1
                    logger.info("自动重连成功")
                    return True
                else:
                    logger.warning("连接池测试失败,尝试重新初始化")

            # Second attempt: tear down the old pool and build a new one.
            old_pool = self.pool
            try:
                self.pool = None
                if old_pool:
                    old_pool.close()

                self._init_pool()
                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['total_reconnections'] += 1
                logger.info("连接池重新初始化成功")
                return True

            except Exception as e:
                logger.error(f"连接池重新初始化失败: {e}")
                self._connection_status = "ERROR"
                return False

        except Exception as e:
            logger.error(f"自动重连失败: {e}")
            return False
-
- def _log_connection_status(self):
- """记录连接状态"""
- status_map = {
- "HEALTHY": "✅ 健康",
- "WARNING": "⚠️ 警告",
- "ERROR": "❌ 错误",
- "DISCONNECTED": "🔌 断开连接",
- "UNKNOWN": "❓ 未知"
- }
-
- status_text = status_map.get(self._connection_status, self._connection_status)
-
- stats_info = (
- f"连接状态统计:\n"
- f" 当前状态: {status_text}\n"
- f" 总检查次数: {self._monitor_stats['total_checks']}\n"
- f" 成功次数: {self._monitor_stats['successful_checks']}\n"
- f" 失败次数: {self._monitor_stats['failed_checks']}\n"
- f" 连续错误次数: {self._connection_error_count}\n"
- f" 总重连次数: {self._monitor_stats['total_reconnections']}\n"
- f" 最后成功: {self._monitor_stats['last_success']}\n"
- f" 最后错误: {self._monitor_stats['last_error']}"
- )
-
- if self._connection_status in ["HEALTHY", "WARNING"]:
- logger.info(stats_info)
- else:
- logger.warning(stats_info)
-
- def get_connection_status(self) -> Dict[str, Any]:
- """获取当前连接状态"""
- return {
- 'status': self._connection_status,
- 'error_count': self._connection_error_count,
- 'last_check': self._last_connection_check,
- 'monitor_stats': self._monitor_stats.copy(),
- 'config': {
- 'host': self.config.host,
- 'port': self.config.port,
- 'database': self.config.database,
- 'pool_size': self.pool_size
- }
- }
-
- def wait_for_connection(self, timeout: int = 30, check_interval: int = 1) -> bool:
- """
- 等待数据库连接恢复正常
-
- Args:
- timeout: 总等待时间(秒)
- check_interval: 检查间隔(秒)
-
- Returns:
- bool: 是否成功连接
- """
- logger.info(f"等待数据库连接恢复,超时时间: {timeout}秒")
-
- start_time = time.time()
- attempts = 0
-
- while time.time() - start_time < timeout:
- attempts += 1
-
- if self._connection_status == "HEALTHY":
- logger.info(f"数据库连接已恢复,等待时间: {time.time() - start_time:.1f}秒")
- return True
-
- if self._connection_status == "DISCONNECTED":
- logger.info(f"尝试重连 (第{attempts}次)")
- if self._auto_reconnect():
- return True
-
- logger.info(f"等待连接恢复... ({attempts}/{int(timeout/check_interval)})")
- time.sleep(check_interval)
-
- logger.error(f"等待数据库连接恢复超时 ({timeout}秒)")
- return False
-
    def get_connection(self) -> Connection:
        """Return this thread's pooled connection, fetching one if needed.

        If the monitor flagged the connection as ERROR/DISCONNECTED, waits up
        to 10s for recovery first.

        Raises:
            ConnectionError: when the database stays unavailable.
            Exception: re-raised from the pool after recording the failure.
        """
        try:
            if self._connection_status in ["ERROR", "DISCONNECTED"]:
                logger.warning(f"获取连接时检测到连接状态为 {self._connection_status},尝试自动重连")
                if not self.wait_for_connection(timeout=10):
                    raise ConnectionError(f"数据库连接不可用,当前状态: {self._connection_status}")

            # One connection per thread, cached in thread-local storage.
            if not hasattr(self._thread_local, 'connection') or self._thread_local.connection is None:
                conn = self.pool.connection()
                self._thread_local.connection = conn
                logger.debug(f"从连接池获取新连接,当前线程: {threading.current_thread().name}")

            return self._thread_local.connection

        except Exception as e:
            logger.error(f"从连接池获取连接失败: {e}")

            # Record the failure for the monitor, try a reconnect, then re-raise.
            self._connection_status = "ERROR"
            self._connection_error_count += 1
            self._monitor_stats['last_error'] = str(e)

            self._auto_reconnect()
            raise
-
    def release_connection(self):
        """Return the current thread's connection to the pool.

        Rolls back any open transaction first so a dirty connection is never
        reused, then closes the pooled wrapper (which returns it to the pool).
        """
        try:
            if hasattr(self._thread_local, 'connection') and self._thread_local.connection is not None:
                conn = self._thread_local.connection
                try:
                    # PooledDB wrappers keep the raw pymysql connection in
                    # `_con`; roll back on the raw object when we can reach it.
                    # NOTE(review): `_con` is a DBUtils internal — verify it
                    # still exists when upgrading DBUtils.
                    if hasattr(conn, '_con') and isinstance(conn._con, Connection):
                        original_conn = conn._con
                        if not original_conn.get_autocommit():
                            original_conn.rollback()
                    else:
                        conn.rollback()

                    conn.close()  # returns the connection to the pool
                    logger.debug(f"连接已归还到连接池,当前线程: {threading.current_thread().name}")
                except Exception as e:
                    logger.warning(f"关闭连接时出错: {e}")
                finally:
                    self._thread_local.connection = None
        except Exception as e:
            logger.error(f"释放连接时出错: {e}")
-
- def check_connection(self) -> bool:
- """检查连接是否有效"""
- try:
- conn = self.get_connection()
-
- cursor = None
- try:
- cursor = conn.cursor()
- cursor.execute("SELECT 1")
- cursor.fetchone()
- return True
- finally:
- if cursor:
- cursor.close()
- except Exception as e:
- logger.warning(f"连接检查失败: {e}")
- self.release_connection()
- return False
-
    def reconnect(self):
        """Drop the thread-local connection and fetch a fresh one from the pool.

        Raises:
            Exception: re-raised when a new connection cannot be obtained.
        """
        try:
            logger.info("正在重新连接数据库...")
            self.release_connection()
            time.sleep(1)  # brief pause before grabbing a new connection
            self.get_connection()
            logger.info("数据库重新连接成功")
        except Exception as e:
            logger.error(f"数据库重新连接失败: {e}")
            raise
-
    def close(self):
        """Convenience alias for close_pool() (used by the signal handlers)."""
        self.close_pool()
-
    def close_all(self):
        """Shut everything down: stop the monitor thread, close the pool,
        then log a final status summary."""
        logger.info("正在关闭所有数据库资源...")
        self._stop_connection_monitor()
        self.close_pool()
        self._log_connection_status()
        logger.info("所有数据库资源已关闭")
-
- def close_pool(self):
- """关闭整个连接池"""
- try:
- if self.pool:
- self.release_connection()
- self.pool.close()
- self.pool = None
- self._connection_status = "DISCONNECTED"
- logger.info("数据库连接池已关闭")
- except Exception as e:
- logger.error(f"关闭连接池时出错: {e}")
-
- # ============ 数据库操作相关方法 ============
-
- def check_table_exists(self, table_name: str) -> bool:
- """检查表是否存在"""
- conn = None
- cursor = None
-
- try:
- conn = self.get_connection()
- cursor = conn.cursor()
-
- try:
- cursor.execute("SHOW TABLES LIKE %s", (table_name,))
- result = cursor.fetchone()
- exists = result is not None
- logger.info(f"🔍 检查表 '{table_name}' 存在: {exists}")
- return exists
- finally:
- if cursor:
- cursor.close()
-
- except Exception as e:
- logger.error(f"❌ 检查表存在失败: {e}")
- return False
- finally:
- self.release_connection()
-
- def create_table_with_unique_key(self, table_name: str, columns: List[str],
- unique_keys: List[str]) -> bool:
- """根据列定义创建表,包含三字段唯一键,数据字段使用DOUBLE类型"""
- if not self.wait_for_connection():
- logger.error("创建表失败:数据库连接不可用")
- return False
-
- conn = None
- cursor = None
-
- try:
- conn = self.get_connection()
- cursor = conn.cursor()
-
- try:
- drop_sql = f"DROP TABLE IF EXISTS `{table_name}`"
- cursor.execute(drop_sql)
- logger.info(f"已删除旧表: {table_name}")
-
- columns_sql = ",\n ".join(columns)
- unique_keys_str = ', '.join([f'`{key}`' for key in unique_keys])
-
- create_sql = f"""
- CREATE TABLE `{table_name}` (
- `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
- {columns_sql},
- `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- `data_hash` VARCHAR(64) COMMENT '数据哈希值,用于快速比较',
- UNIQUE KEY uk_turbine_data ({unique_keys_str}),
- INDEX idx_farm (`id_farm`),
- INDEX idx_turbine (`id_turbine`),
- INDEX idx_time (`data_time`),
- INDEX idx_farm_turbine (`id_farm`, `id_turbine`),
- INDEX idx_composite (`id_farm`, `id_turbine`, `data_time`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
- """
-
- logger.debug(f"创建表的SQL语句:\n{create_sql}")
- cursor.execute(create_sql)
- conn.commit()
-
- logger.info(f"表 '{table_name}' 创建成功!")
- logger.info(f"三字段唯一键: {unique_keys}")
- return True
-
- finally:
- if cursor:
- cursor.close()
-
- except Exception as e:
- logger.error(f"创建表失败: {e}")
- logger.error(traceback.format_exc())
- if conn:
- conn.rollback()
- return False
- finally:
- pass
-
- def get_table_row_count(self, table_name: str) -> int:
- """获取表的行数"""
- conn = None
- cursor = None
-
- try:
- conn = self.get_connection()
- cursor = conn.cursor()
-
- try:
- cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
- result = cursor.fetchone()
- count = result['count'] if result else 0
- logger.info(f"📊 表 '{table_name}' 行数: {count:,}")
- return count
- finally:
- if cursor:
- cursor.close()
-
- except Exception as e:
- logger.error(f"❌ 获取表行数失败: {e}")
- return 0
- finally:
- self.release_connection()
-
- def get_table_stats(self, table_name: str) -> Dict[str, Any]:
- """获取表统计信息"""
- conn = None
- cursor = None
-
- try:
- conn = self.get_connection()
- cursor = conn.cursor()
-
- try:
- cursor.execute(f"""
- SELECT
- COUNT(*) as total_rows,
- COUNT(DISTINCT id_farm) as farm_count,
- COUNT(DISTINCT id_turbine) as turbine_count,
- MIN(data_time) as first_data_time,
- MAX(data_time) as last_data_time,
- MIN(create_time) as first_create,
- MAX(update_time) as last_update
- FROM `{table_name}`
- """)
-
- result = cursor.fetchone()
- if result:
- stats = {
- 'total_rows': result[0],
- 'farm_count': result[1],
- 'turbine_count': result[2],
- 'first_data_time': result[3],
- 'last_data_time': result[4],
- 'first_create': result[5],
- 'last_update': result[6]
- }
-
- logger.info(f"📈 表 '{table_name}' 统计信息:")
- logger.info(f" 总行数: {stats['total_rows']:,}")
- logger.info(f" 风场数量: {stats['farm_count']}")
- logger.info(f" 风机数量: {stats['turbine_count']}")
- logger.info(f" 最早数据时间: {stats['first_data_time']}")
- logger.info(f" 最新数据时间: {stats['last_data_time']}")
-
- return stats
- else:
- return {}
- finally:
- if cursor:
- cursor.close()
-
- except Exception as e:
- logger.error(f"❌ 获取表统计失败: {e}")
- return {}
- finally:
- self.release_connection()
-
- def check_duplicate_keys(self, table_name: str) -> List[Dict]:
- """检查重复的唯一键记录"""
- conn = None
- cursor = None
-
- try:
- conn = self.get_connection()
- cursor = conn.cursor()
-
- try:
- cursor.execute(f"""
- SELECT
- id_farm,
- id_turbine,
- data_time,
- COUNT(*) as duplicate_count,
- MIN(id) as min_id,
- MAX(id) as max_id
- FROM `{table_name}`
- GROUP BY id_farm, id_turbine, data_time
- HAVING COUNT(*) > 1
- ORDER BY duplicate_count DESC
- LIMIT 10
- """)
-
- duplicates = []
- for row in cursor.fetchall():
- duplicate_info = {
- 'id_farm': row[0],
- 'id_turbine': row[1],
- 'data_time': row[2],
- 'duplicate_count': row[3],
- 'min_id': row[4],
- 'max_id': row[5]
- }
- duplicates.append(duplicate_info)
-
- if duplicates:
- logger.warning(f"⚠️ 发现重复的唯一键记录: {len(duplicates)} 组")
- for dup in duplicates[:3]:
- logger.warning(f" 重复: 风场={dup['id_farm']}, 风机={dup['id_turbine']}, "
- f"时间={dup['data_time']}, 重复次数={dup['duplicate_count']}")
- else:
- logger.info(f"✅ 无重复的唯一键记录")
-
- return duplicates
- finally:
- if cursor:
- cursor.close()
-
- except Exception as e:
- logger.error(f"❌ 检查重复键失败: {e}")
- return []
- finally:
- self.release_connection()
-
    def upsert_parquet_data(self, file_info: ParquetFileInfo, table_name: str,
                            batch_size: int = 100, max_retries: int = 3) -> Tuple[int, int, int]:
        """
        UPSERT one parquet file into the database using the three-field
        unique key (id_farm, id_turbine, data_time).

        Batches are retried up to *max_retries* times; a batch that still
        fails is replayed row by row so one bad row cannot sink the batch.

        Args:
            file_info: file metadata (includes the detected time column).
            table_name: target table.
            batch_size: rows per executemany() batch.
            max_retries: retries per batch (and per row in the fallback).

        Returns:
            (total rows, estimated inserted, estimated updated) — the
            insert/update split is an estimate, not an exact count.

        Raises:
            Exception: re-raised after logging when the file cannot be processed.
        """
        try:
            logger.info(f"📂 正在读取并处理文件: {file_info.file_path}")
            logger.info(f"⏰ 识别到的时间字段: {file_info.data_time_column}")

            # Read the parquet file.
            df = pd.read_parquet(file_info.file_path, engine='pyarrow')

            # Attach file-level metadata as columns.
            df['id_farm'] = file_info.farm_id
            df['name_farm'] = file_info.farm_name
            df['no_model_turbine'] = file_info.model_type
            df['id_turbine'] = file_info.turbine_id

            logger.info(f"📊 文件 {file_info.turbine_id}.parquet 读取完成,形状: {df.shape}")

            # Normalize types, derive data_time and compute data_hash.
            cleaned_df = self._clean_and_convert_simple(df, file_info.data_time_column)

            # Make sure the unique-key columns exist.
            required_columns = ['id_farm', 'id_turbine', 'data_time']
            for col in required_columns:
                if col not in cleaned_df.columns:
                    logger.error(f"必需字段 '{col}' 不存在于数据中")
                    cleaned_df[col] = None

            # Column order fixed here drives both the SQL and value tuples.
            columns = list(cleaned_df.columns)

            # Build the INSERT ... ON DUPLICATE KEY UPDATE statement.
            upsert_sql = self._prepare_upsert_sql(table_name, columns)

            # Process in batches.
            total_rows = len(cleaned_df)
            total_batches = (total_rows + batch_size - 1) // batch_size
            total_affected_rows = 0
            total_failed_rows = 0

            logger.info(f"🚀 准备处理 {total_rows} 行数据,分为 {total_batches} 个批次")

            for i in range(0, total_rows, batch_size):
                batch_df = cleaned_df.iloc[i:i + batch_size]
                batch_num = i // batch_size + 1

                retry_count = 0
                batch_success = False

                while retry_count <= max_retries and not batch_success:
                    conn = None
                    cursor = None

                    try:
                        if not self.check_connection():
                            logger.warning(f"🔌 连接已断开,正在重新连接...")
                            self.reconnect()

                        conn = self.get_connection()
                        cursor = conn.cursor()

                        # Convert the batch into parameter tuples.
                        batch_values = []
                        for _, row in batch_df.iterrows():
                            row_tuple = self._convert_row_to_tuple(row, columns)
                            batch_values.append(row_tuple)

                        # Bulk upsert the whole batch in one round trip.
                        affected = cursor.executemany(upsert_sql, batch_values)
                        total_affected_rows += affected

                        conn.commit()
                        batch_success = True

                        if batch_num % 10 == 0 or batch_num == total_batches:
                            logger.info(f"✅ 批次 {batch_num}/{total_batches}: 处理 {len(batch_df)} 行, "
                                        f"受影响 {affected} 行")

                    except (pymysql.Error, AttributeError) as e:
                        retry_count += 1
                        logger.error(f"❌ 批次 {batch_num} UPSERT失败,错误: {str(e)}")

                        if retry_count > max_retries:
                            logger.error(f"❌ 批次 {batch_num} UPSERT失败,已达到最大重试次数")
                            # Fallback: replay the batch row by row so a single
                            # bad row cannot fail the whole batch.
                            batch_affected = 0
                            batch_failed = 0

                            for idx, (_, row) in enumerate(batch_df.iterrows()):
                                row_retry_count = 0
                                row_success = False

                                while row_retry_count <= max_retries and not row_success:
                                    try:
                                        if not self.check_connection():
                                            self.reconnect()

                                        single_conn = self.get_connection()
                                        single_cursor = single_conn.cursor()

                                        row_tuple = self._convert_row_to_tuple(row, columns)
                                        single_cursor.execute(upsert_sql, row_tuple)
                                        batch_affected += single_cursor.rowcount
                                        single_conn.commit()
                                        row_success = True

                                    except Exception as single_e:
                                        row_retry_count += 1
                                        if row_retry_count > max_retries:
                                            batch_failed += 1
                                            break
                                        else:
                                            # Linear backoff between row retries.
                                            time.sleep(1 * row_retry_count)

                                    finally:
                                        if 'single_cursor' in locals() and single_cursor:
                                            single_cursor.close()

                            total_affected_rows += batch_affected
                            total_failed_rows += batch_failed

                            if batch_affected > 0:
                                logger.info(f"⚠️ 批次 {batch_num} 单条处理完成,成功 {batch_affected} 行, 失败 {batch_failed} 行")

                            break
                        else:
                            # Linear backoff before retrying the whole batch.
                            logger.warning(f"⚠️ 批次 {batch_num} UPSERT失败,第 {retry_count} 次重试")
                            time.sleep(2 * retry_count)

                    finally:
                        if cursor:
                            cursor.close()

                # Short pause between successful batches to ease server load.
                if batch_success and batch_num < total_batches:
                    time.sleep(0.1)

            # Rough estimate only: executemany() cannot distinguish inserts
            # from updates, so split the successful rows 50/50.
            successful_rows = total_rows - total_failed_rows
            estimated_inserted = successful_rows // 2
            estimated_updated = successful_rows - estimated_inserted

            logger.info(f"🎉 文件 {os.path.basename(file_info.file_path)} UPSERT完成:")
            logger.info(f" 总处理行数: {total_rows}")
            logger.info(f" 总受影响行数: {total_affected_rows}")
            logger.info(f" 失败行数: {total_failed_rows}")

            return total_rows, estimated_inserted, estimated_updated

        except Exception as e:
            logger.error(f"❌ 处理文件 {file_info.file_path} 失败: {str(e)}")
            raise
-
- def _clean_and_convert_simple(self, df: pd.DataFrame, data_time_column: str = None) -> pd.DataFrame:
- """简化版数据清理"""
- try:
- cleaned_df = df.copy()
-
- # 确保必需字段存在
- required_fields = ['id_farm', 'name_farm', 'no_model_turbine', 'id_turbine']
- for field in required_fields:
- if field not in cleaned_df.columns:
- cleaned_df[field] = None
-
- # 处理时间字段
- if 'data_time' not in cleaned_df.columns:
- if data_time_column and data_time_column in cleaned_df.columns:
- cleaned_df['data_time'] = cleaned_df[data_time_column]
- else:
- for col in cleaned_df.columns:
- col_lower = col.lower()
- if any(keyword in col_lower for keyword in ['time', 'date', 'timestamp']):
- cleaned_df['data_time'] = cleaned_df[col]
- logger.info(f"使用字段 '{col}' 作为 data_time")
- break
-
- # 确保data_time是datetime类型
- if 'data_time' in cleaned_df.columns:
- try:
- cleaned_df['data_time'] = pd.to_datetime(cleaned_df['data_time'], errors='coerce')
- except:
- logger.warning("data_time字段转换失败,保持原样")
-
- # 处理NaN
- cleaned_df = cleaned_df.replace({np.nan: None, pd.NaT: None})
-
- # 计算数据哈希
- def simple_hash(row):
- try:
- data_fields = [col for col in cleaned_df.columns
- if col not in ['id_farm', 'name_farm', 'no_model_turbine',
- 'id_turbine', 'data_time', 'data_hash']]
-
- hash_str = ''
- for field in sorted(data_fields):
- val = row[field]
- if val is not None:
- if isinstance(val, (dt, pd.Timestamp)):
- hash_str += f"{field}:{val.isoformat()}|"
- else:
- hash_str += f"{field}:{str(val)}|"
-
- return hashlib.md5(hash_str.encode('utf-8')).hexdigest() if hash_str else None
- except:
- return None
-
- cleaned_df['data_hash'] = cleaned_df.apply(simple_hash, axis=1)
-
- logger.info(f"数据清理完成,原始形状: {df.shape}, 清理后形状: {cleaned_df.shape}")
- return cleaned_df
-
- except Exception as e:
- logger.error(f"数据清理失败: {e}")
- logger.error(traceback.format_exc())
- return df
-
- def _prepare_upsert_sql(self, table_name: str, columns: List[str]) -> str:
- """准备UPSERT SQL语句"""
- exclude_columns = ['id_farm', 'id_turbine', 'data_time', 'id',
- 'create_time', 'update_time', 'data_hash']
- update_columns = [col for col in columns if col not in exclude_columns]
-
- column_names = ', '.join([f'`{col}`' for col in columns])
- placeholders = ', '.join(['%s'] * len(columns))
-
- update_clauses = []
- for col in update_columns:
- update_clauses.append(f"`{col}` = VALUES(`{col}`)")
-
- update_clause = ', '.join(update_clauses)
-
- upsert_sql = f"""
- INSERT INTO `{table_name}` ({column_names})
- VALUES ({placeholders})
- ON DUPLICATE KEY UPDATE
- {update_clause}
- """
-
- logger.debug(f"UPSERT SQL生成完成,共 {len(columns)} 列")
- return upsert_sql
-
- def _convert_to_numeric(self, value):
- """将值转换为数值类型"""
- if pd.isna(value) or value is None:
- return None
-
- try:
- if isinstance(value, (int, float, np.integer, np.floating)):
- if isinstance(value, np.integer):
- return int(value)
- elif isinstance(value, np.floating):
- return float(value)
- return value
-
- if isinstance(value, (bool, np.bool_)):
- return 1 if bool(value) else 0
-
- if isinstance(value, str):
- cleaned = value.strip()
- if cleaned == '':
- return None
-
- cleaned = cleaned.replace(',', '').replace('%', '').replace(' ', '')
-
- try:
- return float(cleaned)
- except ValueError:
- try:
- return int(cleaned)
- except ValueError:
- return None
-
- try:
- str_val = str(value)
- cleaned = str_val.replace(',', '').replace('%', '').replace(' ', '')
- return float(cleaned)
- except:
- return None
-
- except Exception as e:
- return None
-
    def _convert_row_to_tuple(self, row: pd.Series, columns: List[str]) -> Tuple:
        """Convert one DataFrame row into a parameter tuple matching *columns*.

        data_time becomes a plain datetime, key/metadata columns become str,
        bookkeeping columns pass through unchanged, and everything else is
        coerced to a number via _convert_to_numeric. On any failure a tuple
        of Nones is returned so the batch insert keeps its arity.
        """
        try:
            row_values = []

            # Columns passed through as-is. Note only 'id', 'create_time' and
            # 'update_time' can actually reach this branch — the other names
            # are consumed by the branches above.
            keep_original_fields = [
                'id', 'data_time', 'id_farm', 'id_turbine',
                'name_farm', 'no_model_turbine', 'create_time',
                'update_time', 'data_hash'
            ]

            for col in columns:
                value = row[col]

                if col == 'data_time':
                    # Normalize to a plain python datetime for pymysql.
                    if pd.isna(value):
                        row_values.append(None)
                    elif isinstance(value, pd.Timestamp):
                        row_values.append(value.to_pydatetime())
                    elif isinstance(value, dt):
                        row_values.append(value)
                    else:
                        try:
                            row_values.append(pd.to_datetime(value).to_pydatetime())
                        except:
                            row_values.append(None)

                elif col in ['id_farm', 'id_turbine', 'name_farm', 'no_model_turbine', 'data_hash']:
                    # Key/metadata columns are stored as strings.
                    if pd.isna(value) or value is None:
                        row_values.append(None)
                    else:
                        row_values.append(str(value))

                elif col in keep_original_fields:
                    row_values.append(value)

                else:
                    # All remaining columns are numeric data columns.
                    numeric_value = self._convert_to_numeric(value)
                    row_values.append(numeric_value)

            return tuple(row_values)

        except Exception as e:
            logger.warning(f"转换行数据失败: {e}")
            return tuple([None] * len(columns))
|