zhouyang.xie преди 2 месеца
родител
ревизия
bed86a8029

BIN
dataStorage_datang/__pycache__/database.cpython-311.pyc


BIN
dataStorage_datang/__pycache__/thread_pool.cpython-311.pyc


+ 513 - 204
dataStorage_datang/database.py

@@ -15,6 +15,7 @@ import threading
 from threading import Thread, Event
 import signal
 import sys
+import os
 
 from config import DatabaseConfig, TableConfig
 from file_scanner import ParquetFileInfo
@@ -67,12 +68,12 @@ class DatabaseManager:
         
         # 注册信号处理,优雅关闭
         self._setup_signal_handlers()
-        
+    
     def _setup_signal_handlers(self):
         """设置信号处理,确保程序退出时能正确关闭资源"""
         def signal_handler(signum, frame):
             logger.info(f"接收到信号 {signum},正在关闭数据库连接池...")
-            self.close_all()
+            self.close()
             sys.exit(0)
         
         signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
@@ -84,15 +85,15 @@ class DatabaseManager:
             logger.info(f"正在初始化数据库连接池到 {self.config.host}:{self.config.port}/{self.config.database}")
             
             self.pool = PooledDB(
-                creator=pymysql,  # 使用的数据库驱动
-                maxconnections=self.pool_size,  # 连接池允许的最大连接数
-                mincached=2,  # 初始化时创建的连接数
-                maxcached=5,  # 连接池中空闲连接的最大数量
-                maxshared=3,  # 最大共享连接数
-                blocking=True,  # 连接数达到最大时是否阻塞等待
-                maxusage=None,  # 连接的最大使用次数,None表示不限制
-                setsession=[],  # 连接时执行的SQL语句列表
-                ping=1,  # 使用连接前是否ping检查连接可用性 (0=从不, 1=每次, 2=每2次请求, 4=每4次请求, 7=每次请求)
+                creator=pymysql,
+                maxconnections=self.pool_size,
+                mincached=2,
+                maxcached=5,
+                maxshared=3,
+                blocking=True,
+                maxusage=None,
+                setsession=[],
+                ping=1,
                 host=self.config.host,
                 port=self.config.port,
                 user=self.config.user,
@@ -100,15 +101,14 @@ class DatabaseManager:
                 database=self.config.database,
                 charset=self.config.charset,
                 cursorclass=cursors.DictCursor,
-                autocommit=False,  # 手动控制事务
-                connect_timeout=30,  # 连接超时时间
-                read_timeout=600,    # 读取超时时间
-                write_timeout=600,   # 写入超时时间
-                client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS  # 支持多语句
+                autocommit=False,
+                connect_timeout=30,
+                read_timeout=600,
+                write_timeout=600,
+                client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS
             )
             
             logger.info(f"数据库连接池初始化成功")
-            logger.info(f"连接池配置: maxconnections={self.pool_size}, mincached=2, maxcached=5")
             
             # 测试连接池
             test_result = self._test_pool_connection()
@@ -133,7 +133,7 @@ class DatabaseManager:
             elapsed = time.time() - start_time
             
             cursor.close()
-            conn.close()  # 归还连接
+            conn.close()
             
             logger.info(f"连接池测试成功: 响应时间={elapsed:.3f}s, 服务器时间={result['server_time']}")
             return True
@@ -149,7 +149,7 @@ class DatabaseManager:
             self._connection_monitor_thread = Thread(
                 target=self._connection_monitor_loop,
                 name="DBConnectionMonitor",
-                daemon=True  # 设置为守护线程,主程序退出时自动结束
+                daemon=True
             )
             self._connection_monitor_thread.start()
             logger.info("数据库连接监控线程已启动")
@@ -157,7 +157,7 @@ class DatabaseManager:
     def _stop_connection_monitor(self):
         """停止连接监控线程"""
         if self._connection_monitor_thread and self._connection_monitor_thread.is_alive():
-            self._monitor_running.set()  # 设置事件,通知线程退出
+            self._monitor_running.set()
             self._connection_monitor_thread.join(timeout=5)
             logger.info("数据库连接监控线程已停止")
     
@@ -166,25 +166,22 @@ class DatabaseManager:
         logger.info(f"连接监控线程开始运行,检查间隔: {self._monitor_interval}秒")
         
         last_log_time = time.time()
-        log_interval = 60  # 每分钟记录一次状态
+        log_interval = 60
         
         while not self._monitor_running.is_set():
             try:
-                # 执行连接检查
                 self._perform_connection_check()
                 
-                # 定期记录状态
                 current_time = time.time()
                 if current_time - last_log_time >= log_interval:
                     self._log_connection_status()
                     last_log_time = current_time
                 
-                # 等待下一次检查
                 time.sleep(self._monitor_interval)
                 
             except Exception as e:
                 logger.error(f"连接监控循环异常: {e}")
-                time.sleep(self._monitor_interval)  # 异常后继续尝试
+                time.sleep(self._monitor_interval)
         
         logger.info("连接监控循环结束")
     
@@ -194,7 +191,6 @@ class DatabaseManager:
             self._monitor_stats['total_checks'] += 1
             self._last_connection_check = dt.now()
             
-            # 检查连接池是否初始化
             if self.pool is None:
                 self._connection_status = "ERROR"
                 self._connection_error_count += 1
@@ -203,27 +199,23 @@ class DatabaseManager:
                 logger.warning("连接池未初始化")
                 return
             
-            # 尝试获取连接并执行简单查询
             conn = None
             cursor = None
             try:
                 conn = self.pool.connection()
                 cursor = conn.cursor()
                 
-                # 执行一个简单的查询测试连接
                 start_time = time.time()
                 cursor.execute("SELECT 1 as test, NOW() as server_time, "
                              "VERSION() as version, CONNECTION_ID() as connection_id")
                 result = cursor.fetchone()
                 elapsed = time.time() - start_time
                 
-                # 更新状态
                 self._connection_status = "HEALTHY"
                 self._connection_error_count = 0
                 self._monitor_stats['successful_checks'] += 1
                 self._monitor_stats['last_success'] = dt.now()
                 
-                # 记录详细连接信息(调试级别)
                 if logger.isEnabledFor(logging.DEBUG):
                     logger.debug(f"连接检查成功: "
                                f"响应时间={elapsed:.3f}s, "
@@ -237,14 +229,12 @@ class DatabaseManager:
                 self._monitor_stats['failed_checks'] += 1
                 self._monitor_stats['last_error'] = str(e)
                 
-                # 根据错误计数判断连接状态
                 if self._connection_error_count >= self._max_error_count:
                     self._connection_status = "DISCONNECTED"
                     logger.error(f"数据库连接失败,已连续失败 {self._connection_error_count} 次: {e}")
                 else:
                     logger.warning(f"数据库连接检查失败 (第{self._connection_error_count}次): {e}")
                 
-                # 尝试自动重连
                 if self._connection_error_count >= 3:
                     self._auto_reconnect()
                 
@@ -252,7 +242,7 @@ class DatabaseManager:
                 if cursor:
                     cursor.close()
                 if conn:
-                    conn.close()  # 归还连接到连接池
+                    conn.close()
         
         except Exception as e:
             logger.error(f"执行连接检查时发生异常: {e}")
@@ -264,13 +254,9 @@ class DatabaseManager:
         try:
             logger.warning(f"检测到连接问题,正在尝试自动重连 (错误计数: {self._connection_error_count})")
             
-            # 1. 先释放当前线程的连接
             self.release_connection()
-            
-            # 2. 等待一小段时间
             time.sleep(2)
             
-            # 3. 测试当前连接池
             if self.pool:
                 test_result = self._test_pool_connection()
                 if test_result:
@@ -282,7 +268,6 @@ class DatabaseManager:
                 else:
                     logger.warning("连接池测试失败,尝试重新初始化")
             
-            # 4. 重新初始化连接池
             old_pool = self.pool
             try:
                 self.pool = None
@@ -368,18 +353,15 @@ class DatabaseManager:
         while time.time() - start_time < timeout:
             attempts += 1
             
-            # 检查连接状态
             if self._connection_status == "HEALTHY":
                 logger.info(f"数据库连接已恢复,等待时间: {time.time() - start_time:.1f}秒")
                 return True
             
-            # 如果连接断开,尝试立即重连
             if self._connection_status == "DISCONNECTED":
                 logger.info(f"尝试重连 (第{attempts}次)")
                 if self._auto_reconnect():
                     return True
             
-            # 等待下一次检查
             logger.info(f"等待连接恢复... ({attempts}/{int(timeout/check_interval)})")
             time.sleep(check_interval)
         
@@ -389,15 +371,12 @@ class DatabaseManager:
     def get_connection(self) -> Connection:
         """从连接池获取数据库连接"""
         try:
-            # 检查当前连接状态
             if self._connection_status in ["ERROR", "DISCONNECTED"]:
                 logger.warning(f"获取连接时检测到连接状态为 {self._connection_status},尝试自动重连")
                 if not self.wait_for_connection(timeout=10):
                     raise ConnectionError(f"数据库连接不可用,当前状态: {self._connection_status}")
             
-            # 检查线程局部存储中是否有连接
             if not hasattr(self._thread_local, 'connection') or self._thread_local.connection is None:
-                # 从连接池获取新连接
                 conn = self.pool.connection()
                 self._thread_local.connection = conn
                 logger.debug(f"从连接池获取新连接,当前线程: {threading.current_thread().name}")
@@ -407,12 +386,10 @@ class DatabaseManager:
         except Exception as e:
             logger.error(f"从连接池获取连接失败: {e}")
             
-            # 更新连接状态
             self._connection_status = "ERROR"
             self._connection_error_count += 1
             self._monitor_stats['last_error'] = str(e)
             
-            # 尝试自动重连
             self._auto_reconnect()
             raise
     
@@ -422,17 +399,13 @@ class DatabaseManager:
             if hasattr(self._thread_local, 'connection') and self._thread_local.connection is not None:
                 conn = self._thread_local.connection
                 try:
-                    # 修复:SteadyDBConnection 需要访问 _con 属性获取原始连接
-                    # 检查原始连接的 autocommit 状态
                     if hasattr(conn, '_con') and isinstance(conn._con, Connection):
                         original_conn = conn._con
                         if not original_conn.get_autocommit():
                             original_conn.rollback()
                     else:
-                        # 备选方案:直接尝试回滚,不检查 autocommit
                         conn.rollback()
                     
-                    # 关闭连接(实际上是归还到连接池)
                     conn.close()
                     logger.debug(f"连接已归还到连接池,当前线程: {threading.current_thread().name}")
                 except Exception as e:
@@ -447,7 +420,6 @@ class DatabaseManager:
         try:
             conn = self.get_connection()
             
-            # 执行一个简单的查询来检查连接
             cursor = None
             try:
                 cursor = conn.cursor()
@@ -459,7 +431,6 @@ class DatabaseManager:
                     cursor.close()
         except Exception as e:
             logger.warning(f"连接检查失败: {e}")
-            # 释放无效连接
             self.release_connection()
             return False
     
@@ -467,27 +438,23 @@ class DatabaseManager:
         """重新连接数据库(连接池会自动处理)"""
         try:
             logger.info("正在重新连接数据库...")
-            # 释放当前连接,下次获取时会自动创建新连接
             self.release_connection()
-            time.sleep(1)  # 等待1秒
-            # 获取新连接
+            time.sleep(1)
             self.get_connection()
             logger.info("数据库重新连接成功")
         except Exception as e:
             logger.error(f"数据库重新连接失败: {e}")
             raise
     
+    def close(self):
+        """关闭数据库连接池的简便方法"""
+        self.close_pool()
+    
    def close_all(self):
        """Release every database resource: the monitor thread first, then the pool.

        Call order matters: the health-check thread is stopped before the pool
        it probes is torn down, so a check cannot race a half-closed pool.
        """
        logger.info("正在关闭所有数据库资源...")
        self._stop_connection_monitor()
        self.close_pool()
        # Emit one final connection-status snapshot for post-mortem logs.
        self._log_connection_status()
        logger.info("所有数据库资源已关闭")
     
@@ -495,10 +462,7 @@ class DatabaseManager:
         """关闭整个连接池"""
         try:
             if self.pool:
-                # 释放当前线程的连接
                 self.release_connection()
-                
-                # 关闭连接池
                 self.pool.close()
                 self.pool = None
                 self._connection_status = "DISCONNECTED"
@@ -506,11 +470,36 @@ class DatabaseManager:
         except Exception as e:
             logger.error(f"关闭连接池时出错: {e}")
     
-    # 以下为原有的业务方法,保持不变
+    # ============ 数据库操作相关方法 ============
+    
+    def check_table_exists(self, table_name: str) -> bool:
+        """检查表是否存在"""
+        conn = None
+        cursor = None
+        
+        try:
+            conn = self.get_connection()
+            cursor = conn.cursor()
+            
+            try:
+                cursor.execute("SHOW TABLES LIKE %s", (table_name,))
+                result = cursor.fetchone()
+                exists = result is not None
+                logger.info(f"🔍 检查表 '{table_name}' 存在: {exists}")
+                return exists
+            finally:
+                if cursor:
+                    cursor.close()
+                
+        except Exception as e:
+            logger.error(f"❌ 检查表存在失败: {e}")
+            return False
+        finally:
+            self.release_connection()
+    
     def create_table_with_unique_key(self, table_name: str, columns: List[str], 
                                    unique_keys: List[str]) -> bool:
         """根据列定义创建表,包含三字段唯一键,数据字段使用DOUBLE类型"""
-        # 检查连接状态
         if not self.wait_for_connection():
             logger.error("创建表失败:数据库连接不可用")
             return False
@@ -523,15 +512,11 @@ class DatabaseManager:
             cursor = conn.cursor()
             
             try:
-                # 删除已存在的表
                 drop_sql = f"DROP TABLE IF EXISTS `{table_name}`"
                 cursor.execute(drop_sql)
                 logger.info(f"已删除旧表: {table_name}")
                 
-                # 构建创建表的SQL
                 columns_sql = ",\n    ".join(columns)
-                
-                # 添加三字段唯一键约束
                 unique_keys_str = ', '.join([f'`{key}`' for key in unique_keys])
                 
                 create_sql = f"""
@@ -551,22 +536,16 @@ class DatabaseManager:
                 """
                 
                 logger.debug(f"创建表的SQL语句:\n{create_sql}")
-                
-                # 执行创建表
                 cursor.execute(create_sql)
                 conn.commit()
                 
                 logger.info(f"表 '{table_name}' 创建成功!")
                 logger.info(f"三字段唯一键: {unique_keys}")
-                
                 return True
                 
             finally:
                 if cursor:
-                    try:
-                        cursor.close()
-                    except:
-                        pass
+                    cursor.close()
                 
         except Exception as e:
             logger.error(f"创建表失败: {e}")
@@ -575,148 +554,478 @@ class DatabaseManager:
                 conn.rollback()
             return False
         finally:
-            # 不释放连接,让调用者控制
             pass
     
-    # ... 其余的业务方法保持不变,包括:
-    # create_data_scada_turbine_table
-    # _clean_and_convert_simple
-    # _prepare_upsert_sql
-    # _convert_to_numeric
-    # _convert_row_to_tuple
-    # _escape_sql_value
-    # _get_sql_with_values
-    # _log_failed_row_details
-    # batch_upsert_data_direct
-    # upsert_parquet_data
-    # check_table_exists
-    # get_table_row_count
-    # get_table_stats
-    # check_duplicate_keys
-    # with_connection 装饰器
-
-# 添加一个独立的连接测试函数,方便外部调用
-def test_database_connection(config: DatabaseConfig, test_query: str = "SELECT 1") -> Dict[str, Any]:
-    """
-    测试数据库连接
-    
-    Args:
-        config: 数据库配置
-        test_query: 测试查询语句
+    def get_table_row_count(self, table_name: str) -> int:
+        """获取表的行数"""
+        conn = None
+        cursor = None
         
-    Returns:
-        Dict[str, Any]: 测试结果
-    """
-    result = {
-        'success': False,
-        'error': None,
-        'response_time': None,
-        'server_info': None,
-        'timestamp': dt.now().isoformat()
-    }
-    
-    conn = None
-    cursor = None
+        try:
+            conn = self.get_connection()
+            cursor = conn.cursor()
+            
+            try:
+                cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
+                result = cursor.fetchone()
+                count = result['count'] if result else 0
+                logger.info(f"📊 表 '{table_name}' 行数: {count:,}")
+                return count
+            finally:
+                if cursor:
+                    cursor.close()
+                
+        except Exception as e:
+            logger.error(f"❌ 获取表行数失败: {e}")
+            return 0
+        finally:
+            self.release_connection()
     
-    try:
-        # 记录开始时间
-        start_time = time.time()
+    def get_table_stats(self, table_name: str) -> Dict[str, Any]:
+        """获取表统计信息"""
+        conn = None
+        cursor = None
         
-        # 建立连接
-        conn = pymysql.connect(
-            host=config.host,
-            port=config.port,
-            user=config.user,
-            password=config.password,
-            database=config.database,
-            charset=config.charset,
-            cursorclass=cursors.DictCursor,
-            connect_timeout=10,
-            read_timeout=10
-        )
+        try:
+            conn = self.get_connection()
+            cursor = conn.cursor()
+            
+            try:
+                cursor.execute(f"""
+                    SELECT 
+                        COUNT(*) as total_rows,
+                        COUNT(DISTINCT id_farm) as farm_count,
+                        COUNT(DISTINCT id_turbine) as turbine_count,
+                        MIN(data_time) as first_data_time,
+                        MAX(data_time) as last_data_time,
+                        MIN(create_time) as first_create,
+                        MAX(update_time) as last_update
+                    FROM `{table_name}`
+                """)
+                
+                result = cursor.fetchone()
+                if result:
+                    stats = {
+                        'total_rows': result[0],
+                        'farm_count': result[1],
+                        'turbine_count': result[2],
+                        'first_data_time': result[3],
+                        'last_data_time': result[4],
+                        'first_create': result[5],
+                        'last_update': result[6]
+                    }
+                    
+                    logger.info(f"📈 表 '{table_name}' 统计信息:")
+                    logger.info(f"   总行数: {stats['total_rows']:,}")
+                    logger.info(f"   风场数量: {stats['farm_count']}")
+                    logger.info(f"   风机数量: {stats['turbine_count']}")
+                    logger.info(f"   最早数据时间: {stats['first_data_time']}")
+                    logger.info(f"   最新数据时间: {stats['last_data_time']}")
+                    
+                    return stats
+                else:
+                    return {}
+            finally:
+                if cursor:
+                    cursor.close()
+                
+        except Exception as e:
+            logger.error(f"❌ 获取表统计失败: {e}")
+            return {}
+        finally:
+            self.release_connection()
+    
+    def check_duplicate_keys(self, table_name: str) -> List[Dict]:
+        """检查重复的唯一键记录"""
+        conn = None
+        cursor = None
         
-        # 执行测试查询
-        cursor = conn.cursor()
-        cursor.execute(test_query)
-        query_result = cursor.fetchone()
+        try:
+            conn = self.get_connection()
+            cursor = conn.cursor()
+            
+            try:
+                cursor.execute(f"""
+                    SELECT 
+                        id_farm, 
+                        id_turbine, 
+                        data_time,
+                        COUNT(*) as duplicate_count,
+                        MIN(id) as min_id,
+                        MAX(id) as max_id
+                    FROM `{table_name}`
+                    GROUP BY id_farm, id_turbine, data_time
+                    HAVING COUNT(*) > 1
+                    ORDER BY duplicate_count DESC
+                    LIMIT 10
+                """)
+                
+                duplicates = []
+                for row in cursor.fetchall():
+                    duplicate_info = {
+                        'id_farm': row[0],
+                        'id_turbine': row[1],
+                        'data_time': row[2],
+                        'duplicate_count': row[3],
+                        'min_id': row[4],
+                        'max_id': row[5]
+                    }
+                    duplicates.append(duplicate_info)
+                
+                if duplicates:
+                    logger.warning(f"⚠️  发现重复的唯一键记录: {len(duplicates)} 组")
+                    for dup in duplicates[:3]:
+                        logger.warning(f"   重复: 风场={dup['id_farm']}, 风机={dup['id_turbine']}, "
+                                     f"时间={dup['data_time']}, 重复次数={dup['duplicate_count']}")
+                else:
+                    logger.info(f"✅ 无重复的唯一键记录")
+                
+                return duplicates
+            finally:
+                if cursor:
+                    cursor.close()
+                
+        except Exception as e:
+            logger.error(f"❌ 检查重复键失败: {e}")
+            return []
+        finally:
+            self.release_connection()
+    
    def upsert_parquet_data(self, file_info: ParquetFileInfo, table_name: str, 
                           batch_size: int = 100, max_retries: int = 3) -> Tuple[int, int, int]:
        """
        UPSERT one parquet file into the database using the three-column
        unique key (id_farm, id_turbine, data_time).

        Args:
            file_info: File metadata (includes the detected time column name)
            table_name: Target table name
            batch_size: Rows per executemany() batch
            max_retries: Maximum retries per batch and per fallback row

        Returns:
            (total rows, estimated inserted rows, estimated updated rows).
            NOTE(review): the insert/update split is a rough 50/50 estimate,
            not an exact count -- see the end of the method.
        """
        try:
            logger.info(f"📂 正在读取并处理文件: {file_info.file_path}")
            logger.info(f"⏰ 识别到的时间字段: {file_info.data_time_column}")
            
            # Read the parquet file.
            df = pd.read_parquet(file_info.file_path, engine='pyarrow')
            
            # Attach farm/turbine metadata columns to every row.
            df['id_farm'] = file_info.farm_id
            df['name_farm'] = file_info.farm_name
            df['no_model_turbine'] = file_info.model_type
            df['id_turbine'] = file_info.turbine_id
            
            logger.info(f"📊 文件 {file_info.turbine_id}.parquet 读取完成,形状: {df.shape}")
            
            # Lightweight cleanup (time column normalisation, NaN -> None, hash).
            cleaned_df = self._clean_and_convert_simple(df, file_info.data_time_column)
            
            # Guarantee the unique-key columns exist (filled with None if missing).
            required_columns = ['id_farm', 'id_turbine', 'data_time']
            for col in required_columns:
                if col not in cleaned_df.columns:
                    logger.error(f"必需字段 '{col}' 不存在于数据中")
                    cleaned_df[col] = None
            
            # Column order used for both the SQL and the value tuples.
            columns = list(cleaned_df.columns)
            
            # Build the INSERT ... ON DUPLICATE KEY UPDATE statement.
            upsert_sql = self._prepare_upsert_sql(table_name, columns)
            
            # Batch bookkeeping.
            total_rows = len(cleaned_df)
            total_batches = (total_rows + batch_size - 1) // batch_size
            total_affected_rows = 0
            total_failed_rows = 0
            
            logger.info(f"🚀 准备处理 {total_rows} 行数据,分为 {total_batches} 个批次")
            
            for i in range(0, total_rows, batch_size):
                batch_df = cleaned_df.iloc[i:i + batch_size]
                batch_num = i // batch_size + 1
                
                retry_count = 0
                batch_success = False
                
                # Retry the whole batch up to max_retries times.
                while retry_count <= max_retries and not batch_success:
                    conn = None
                    cursor = None
                    
                    try:
                        if not self.check_connection():
                            logger.warning(f"🔌 连接已断开,正在重新连接...")
                            self.reconnect()
                        
                        conn = self.get_connection()
                        cursor = conn.cursor()
                        
                        # Convert the batch rows to parameter tuples.
                        batch_values = []
                        for _, row in batch_df.iterrows():
                            row_tuple = self._convert_row_to_tuple(row, columns)
                            batch_values.append(row_tuple)
                        
                        # Batched upsert; executemany returns the affected-row count.
                        affected = cursor.executemany(upsert_sql, batch_values)
                        total_affected_rows += affected
                        
                        conn.commit()
                        batch_success = True
                        
                        # Log progress only every 10 batches (and the last one).
                        if batch_num % 10 == 0 or batch_num == total_batches:
                            logger.info(f"✅ 批次 {batch_num}/{total_batches}: 处理 {len(batch_df)} 行, "
                                      f"受影响 {affected} 行")
                        
                    except (pymysql.Error, AttributeError) as e:
                        retry_count += 1
                        logger.error(f"❌ 批次 {batch_num} UPSERT失败,错误: {str(e)}")
                        
                        if retry_count > max_retries:
                            logger.error(f"❌ 批次 {batch_num} UPSERT失败,已达到最大重试次数")
                            # Batch exhausted its retries: fall back to
                            # row-by-row upserts so one bad row cannot sink
                            # the whole batch.
                            batch_affected = 0
                            batch_failed = 0
                            
                            for idx, (_, row) in enumerate(batch_df.iterrows()):
                                row_retry_count = 0
                                row_success = False
                                
                                while row_retry_count <= max_retries and not row_success:
                                    try:
                                        if not self.check_connection():
                                            self.reconnect()
                                        
                                        single_conn = self.get_connection()
                                        single_cursor = single_conn.cursor()
                                        
                                        row_tuple = self._convert_row_to_tuple(row, columns)
                                        single_cursor.execute(upsert_sql, row_tuple)
                                        batch_affected += single_cursor.rowcount
                                        single_conn.commit()
                                        row_success = True
                                        
                                    except Exception as single_e:
                                        row_retry_count += 1
                                        if row_retry_count > max_retries:
                                            batch_failed += 1
                                            break
                                        else:
                                            # Linear backoff between row retries.
                                            time.sleep(1 * row_retry_count)
                                    
                                    finally:
                                        # NOTE(review): 'in locals()' guard is
                                        # fragile -- a stale cursor from a
                                        # previous iteration satisfies it.
                                        if 'single_cursor' in locals() and single_cursor:
                                            single_cursor.close()
                            
                            total_affected_rows += batch_affected
                            total_failed_rows += batch_failed
                            
                            if batch_affected > 0:
                                logger.info(f"⚠️  批次 {batch_num} 单条处理完成,成功 {batch_affected} 行, 失败 {batch_failed} 行")
                            
                            # Leave the batch retry loop after the fallback pass.
                            break
                        else:
                            logger.warning(f"⚠️  批次 {batch_num} UPSERT失败,第 {retry_count} 次重试")
                            # Linear backoff between batch retries.
                            time.sleep(2 * retry_count)
                    
                    finally:
                        if cursor:
                            cursor.close()
                
                # Brief pause between successful batches to ease DB load.
                if batch_success and batch_num < total_batches:
                    time.sleep(0.1)
            
            # Rough 50/50 split between inserts and updates -- the server does
            # not report which rows were new vs replaced here.
            successful_rows = total_rows - total_failed_rows
            estimated_inserted = successful_rows // 2
            estimated_updated = successful_rows - estimated_inserted
            
            logger.info(f"🎉 文件 {os.path.basename(file_info.file_path)} UPSERT完成:")
            logger.info(f"  总处理行数: {total_rows}")
            logger.info(f"  总受影响行数: {total_affected_rows}")
            logger.info(f"  失败行数: {total_failed_rows}")
            
            return total_rows, estimated_inserted, estimated_updated
                
        except Exception as e:
            logger.error(f"❌ 处理文件 {file_info.file_path} 失败: {str(e)}")
            raise
+    
+    def _clean_and_convert_simple(self, df: pd.DataFrame, data_time_column: str = None) -> pd.DataFrame:
+        """简化版数据清理"""
+        try:
+            cleaned_df = df.copy()
+            
+            # 确保必需字段存在
+            required_fields = ['id_farm', 'name_farm', 'no_model_turbine', 'id_turbine']
+            for field in required_fields:
+                if field not in cleaned_df.columns:
+                    cleaned_df[field] = None
+            
+            # 处理时间字段
+            if 'data_time' not in cleaned_df.columns:
+                if data_time_column and data_time_column in cleaned_df.columns:
+                    cleaned_df['data_time'] = cleaned_df[data_time_column]
+                else:
+                    for col in cleaned_df.columns:
+                        col_lower = col.lower()
+                        if any(keyword in col_lower for keyword in ['time', 'date', 'timestamp']):
+                            cleaned_df['data_time'] = cleaned_df[col]
+                            logger.info(f"使用字段 '{col}' 作为 data_time")
+                            break
+            
+            # 确保data_time是datetime类型
+            if 'data_time' in cleaned_df.columns:
+                try:
+                    cleaned_df['data_time'] = pd.to_datetime(cleaned_df['data_time'], errors='coerce')
+                except:
+                    logger.warning("data_time字段转换失败,保持原样")
+            
+            # 处理NaN
+            cleaned_df = cleaned_df.replace({np.nan: None, pd.NaT: None})
+            
+            # 计算数据哈希
+            def simple_hash(row):
+                try:
+                    data_fields = [col for col in cleaned_df.columns 
+                                  if col not in ['id_farm', 'name_farm', 'no_model_turbine', 
+                                               'id_turbine', 'data_time', 'data_hash']]
+                    
+                    hash_str = ''
+                    for field in sorted(data_fields):
+                        val = row[field]
+                        if val is not None:
+                            if isinstance(val, (dt, pd.Timestamp)):
+                                hash_str += f"{field}:{val.isoformat()}|"
+                            else:
+                                hash_str += f"{field}:{str(val)}|"
+                    
+                    return hashlib.md5(hash_str.encode('utf-8')).hexdigest() if hash_str else None
+                except:
+                    return None
+            
+            cleaned_df['data_hash'] = cleaned_df.apply(simple_hash, axis=1)
+            
+            logger.info(f"数据清理完成,原始形状: {df.shape}, 清理后形状: {cleaned_df.shape}")
+            return cleaned_df
+            
+        except Exception as e:
+            logger.error(f"数据清理失败: {e}")
+            logger.error(traceback.format_exc())
+            return df
+    
+    def _prepare_upsert_sql(self, table_name: str, columns: List[str]) -> str:
+        """准备UPSERT SQL语句"""
+        exclude_columns = ['id_farm', 'id_turbine', 'data_time', 'id', 
+                          'create_time', 'update_time', 'data_hash']
+        update_columns = [col for col in columns if col not in exclude_columns]
         
-        # 计算响应时间
-        response_time = time.time() - start_time
+        column_names = ', '.join([f'`{col}`' for col in columns])
+        placeholders = ', '.join(['%s'] * len(columns))
         
-        result.update({
-            'success': True,
-            'response_time': response_time,
-            'server_info': server_info,
-            'test_result': query_result
-        })
+        update_clauses = []
+        for col in update_columns:
+            update_clauses.append(f"`{col}` = VALUES(`{col}`)")
         
-        logger.info(f"数据库连接测试成功: {config.host}:{config.port}/{config.database}")
-        logger.info(f"响应时间: {response_time:.3f}s")
-        logger.info(f"MySQL版本: {server_info['version']}")
-        logger.info(f"当前数据库: {server_info['database_name']}")
+        update_clause = ', '.join(update_clauses)
         
-    except Exception as e:
-        result['error'] = str(e)
-        logger.error(f"数据库连接测试失败: {e}")
+        upsert_sql = f"""
+        INSERT INTO `{table_name}` ({column_names}) 
+        VALUES ({placeholders}) 
+        ON DUPLICATE KEY UPDATE 
+        {update_clause}
+        """
         
-    finally:
-        if cursor:
-            cursor.close()
-        if conn:
-            conn.close()
+        logger.debug(f"UPSERT SQL生成完成,共 {len(columns)} 列")
+        return upsert_sql
     
-    return result
-
-
-# 添加一个监控守护进程示例
-class DatabaseMonitorDaemon:
-    """数据库监控守护进程"""
-    
-    def __init__(self, db_manager: DatabaseManager, check_interval: int = 5):
-        self.db_manager = db_manager
-        self.check_interval = check_interval
-        self.monitor_thread = None
-        self.running = False
+    def _convert_to_numeric(self, value):
+        """将值转换为数值类型"""
+        if pd.isna(value) or value is None:
+            return None
         
-    def start(self):
-        """启动监控"""
-        if not self.running:
-            self.running = True
-            self.monitor_thread = Thread(target=self._monitor_loop, daemon=True)
-            self.monitor_thread.start()
-            logger.info(f"数据库监控守护进程已启动,检查间隔: {self.check_interval}秒")
-    
-    def stop(self):
-        """停止监控"""
-        self.running = False
-        if self.monitor_thread:
-            self.monitor_thread.join(timeout=10)
-            logger.info("数据库监控守护进程已停止")
-    
-    def _monitor_loop(self):
-        """监控循环"""
-        while self.running:
+        try:
+            if isinstance(value, (int, float, np.integer, np.floating)):
+                if isinstance(value, np.integer):
+                    return int(value)
+                elif isinstance(value, np.floating):
+                    return float(value)
+                return value
+            
+            if isinstance(value, (bool, np.bool_)):
+                return 1 if bool(value) else 0
+            
+            if isinstance(value, str):
+                cleaned = value.strip()
+                if cleaned == '':
+                    return None
+                
+                cleaned = cleaned.replace(',', '').replace('%', '').replace(' ', '')
+                
+                try:
+                    return float(cleaned)
+                except ValueError:
+                    try:
+                        return int(cleaned)
+                    except ValueError:
+                        return None
+            
             try:
-                # 获取当前状态
-                status = self.db_manager.get_connection_status()
+                str_val = str(value)
+                cleaned = str_val.replace(',', '').replace('%', '').replace(' ', '')
+                return float(cleaned)
+            except:
+                return None
                 
-                # 记录状态变化
-                if status['status'] != "HEALTHY":
-                    logger.warning(f"数据库连接状态异常: {status['status']}")
-                    
-                    # 如果连续错误超过阈值,尝试自动修复
-                    if status['error_count'] >= 3:
-                        logger.info("检测到连续错误,尝试自动修复连接...")
-                        self.db_manager._auto_reconnect()
+        except Exception as e:
+            return None
+    
+    def _convert_row_to_tuple(self, row: pd.Series, columns: List[str]) -> Tuple:
+        """将单行数据转换为元组"""
+        try:
+            row_values = []
+            
+            keep_original_fields = [
+                'id', 'data_time', 'id_farm', 'id_turbine', 
+                'name_farm', 'no_model_turbine', 'create_time', 
+                'update_time', 'data_hash'
+            ]
+            
+            for col in columns:
+                value = row[col]
                 
-                # 等待下一次检查
-                time.sleep(self.check_interval)
+                if col == 'data_time':
+                    if pd.isna(value):
+                        row_values.append(None)
+                    elif isinstance(value, pd.Timestamp):
+                        row_values.append(value.to_pydatetime())
+                    elif isinstance(value, dt):
+                        row_values.append(value)
+                    else:
+                        try:
+                            row_values.append(pd.to_datetime(value).to_pydatetime())
+                        except:
+                            row_values.append(None)
                 
-            except Exception as e:
-                logger.error(f"监控循环异常: {e}")
-                time.sleep(self.check_interval)
+                elif col in ['id_farm', 'id_turbine', 'name_farm', 'no_model_turbine', 'data_hash']:
+                    if pd.isna(value) or value is None:
+                        row_values.append(None)
+                    else:
+                        row_values.append(str(value))
+                
+                elif col in keep_original_fields:
+                    row_values.append(value)
+                
+                else:
+                    numeric_value = self._convert_to_numeric(value)
+                    row_values.append(numeric_value)
+            
+            return tuple(row_values)
+            
+        except Exception as e:
+            logger.warning(f"转换行数据失败: {e}")
+            return tuple([None] * len(columns))

+ 4 - 2
dataStorage_datang/info_model_turbine_v1.py

@@ -420,8 +420,10 @@ def main():
     """主函数"""
     # 数据库配置
     db_config = DatabaseConfig(
-        host='192.168.50.234',
-        port=4000,
+        # host="192.168.50.234",
+        # port=4000,
+        host="106.120.102.238",
+        port=44000,
         user='root',
         password='123456',
         database='wind_data',

+ 80 - 239
dataStorage_datang/main.py

@@ -30,74 +30,6 @@ logging.basicConfig(
 logger = logging.getLogger(__name__)
 
 
-class ProcessedRecordManager:
-    """已处理文件记录管理器"""
-    
-    def __init__(self, record_file: str = "record_processed.json"):
-        """
-        初始化记录管理器
-        
-        Args:
-            record_file: 记录文件路径
-        """
-        self.record_file = record_file
-        self.processed_files: Set[str] = set()
-        self._load_records()
-    
-    def _load_records(self):
-        """加载已处理文件记录"""
-        try:
-            if os.path.exists(self.record_file):
-                with open(self.record_file, 'r', encoding='utf-8') as f:
-                    records = json.load(f)
-                    if isinstance(records, list):
-                        self.processed_files = set(records)
-                    logger.info(f"📁 已加载 {len(self.processed_files)} 个已处理文件记录")
-            else:
-                logger.info(f"📁 记录文件不存在,将创建新文件: {self.record_file}")
-        except Exception as e:
-            logger.warning(f"❌ 加载记录文件失败: {e}")
-            self.processed_files = set()
-    
-    def is_processed(self, file_path: str) -> bool:
-        """检查文件是否已处理过"""
-        return file_path in self.processed_files
-    
-    def add_record(self, file_path: str):
-        """添加处理记录"""
-        if file_path not in self.processed_files:
-            self.processed_files.add(file_path)
-    
-    def save_records(self):
-        """保存记录到文件"""
-        try:
-            # 转换为列表并排序,便于阅读
-            records_list = sorted(list(self.processed_files))
-            
-            with open(self.record_file, 'w', encoding='utf-8') as f:
-                json.dump(records_list, f, ensure_ascii=False, indent=2)
-            
-            logger.info(f"💾 已保存 {len(records_list)} 个处理记录到 {self.record_file}")
-            return True
-        except Exception as e:
-            logger.error(f"❌ 保存记录文件失败: {e}")
-            return False
-    
-    def get_record_count(self) -> int:
-        """获取记录数量"""
-        return len(self.processed_files)
-    
-    def clear_records(self):
-        """清空记录"""
-        self.processed_files.clear()
-        try:
-            if os.path.exists(self.record_file):
-                os.remove(self.record_file)
-                logger.info(f"🗑️  已删除记录文件: {self.record_file}")
-        except Exception as e:
-            logger.warning(f"删除记录文件失败: {e}")
-
-
 class ParquetProcessor:
     """Parquet文件处理器主类"""
     
@@ -107,74 +39,12 @@ class ParquetProcessor:
         self.file_infos: List[ParquetFileInfo] = []
         self.db_manager = DatabaseManager(config.db_config, config.table_config)
         
-        # 初始化记录管理器
-        self.record_manager = ProcessedRecordManager()
-        
-        # 统计信息
-        self.stats = {
-            'start_time': None,
-            'end_time': None,
-            'total_files': 0,
-            'processed_files': 0,
-            'skipped_files': 0,
-            'failed_files': 0,
-            'total_rows': 0,
-            'inserted_rows': 0,
-            'updated_rows': 0,
-            'table_initial_count': 0,
-            'table_final_count': 0
-        }
-    
-    def filter_unprocessed_files(self, file_infos: List[ParquetFileInfo]) -> List[ParquetFileInfo]:
-        """
-        过滤掉已处理过的文件
-        
-        Args:
-            file_infos: 所有扫描到的文件信息
-            
-        Returns:
-            List[ParquetFileInfo]: 未处理过的文件信息
-        """
-        unprocessed_files = []
-        skipped_count = 0
-        
-        for file_info in file_infos:
-            if self.record_manager.is_processed(file_info.file_path):
-                skipped_count += 1
-                logger.debug(f"⏭️  跳过已处理文件: {os.path.basename(file_info.file_path)}")
-            else:
-                unprocessed_files.append(file_info)
-        
-        if skipped_count > 0:
-            logger.info(f"⏭️  跳过 {skipped_count} 个已处理过的文件")
-            logger.info(f"📝 需要处理 {len(unprocessed_files)} 个新文件")
-        
-        return unprocessed_files
-    
-    def update_record_for_file(self, file_info: ParquetFileInfo, success: bool = True):
-        """
-        更新文件处理记录
+        # 初始化线程池管理器(带记录管理功能)
+        self.thread_pool = ThreadPoolManager(
+            max_workers=self.config.max_workers,
+            record_file="record_processed.json"
+        )
         
-        Args:
-            file_info: 文件信息
-            success: 是否处理成功
-        """
-        if success:
-            self.record_manager.add_record(file_info.file_path)
-            logger.info(f"✅ 已记录成功处理文件: {os.path.basename(file_info.file_path)}")
-        else:
-            logger.warning(f"❌ 文件处理失败,不记录: {os.path.basename(file_info.file_path)}")
-    
-    def save_processed_records(self):
-        """保存处理记录到文件"""
-        try:
-            if self.record_manager.save_records():
-                logger.info(f"💾 处理记录已保存,共 {self.record_manager.get_record_count()} 个文件")
-            else:
-                logger.warning("⚠️  处理记录保存失败")
-        except Exception as e:
-            logger.error(f"❌ 保存处理记录时出错: {e}")
-    
     def run(self):
         """运行整个处理流程"""
         logger.info("=" * 60)
@@ -182,39 +52,36 @@ class ParquetProcessor:
         logger.info("=" * 60)
         
         try:
-            # 记录开始时间
-            self.stats['start_time'] = datetime.now()
-            
             # 步骤1: 扫描文件
             logger.info("\n步骤1: 扫描Parquet文件...")
             all_file_infos = self.file_scanner.scan_files()
             
-            # 打印扫描结果摘要
-            if all_file_infos:
-                logger.info(f"📁 扫描完成!共找到 {len(all_file_infos)} 个parquet文件")
-                logger.info(f"📊 机型型号种类: {len(set(f.model_type for f in all_file_infos))}")
-                logger.info(f"🌾 风场数量: {len(set(f.farm_id for f in all_file_infos))}")
-            else:
+            if not all_file_infos:
                 logger.warning("⚠️  未找到任何parquet文件,程序退出")
                 return
             
-            # 过滤已处理过的文件
-            self.file_infos = self.filter_unprocessed_files(all_file_infos)
-            self.stats['skipped_files'] = len(all_file_infos) - len(self.file_infos)
-            self.stats['total_files'] = len(all_file_infos)
+            logger.info(f"📁 扫描完成!共找到 {len(all_file_infos)} 个parquet文件")
+            logger.info(f"📊 机型型号种类: {len(set(f.model_type for f in all_file_infos))}")
+            logger.info(f"🌾 风场数量: {len(set(f.farm_id for f in all_file_infos))}")
             
-            if not self.file_infos:
+            # 检查有多少文件未处理
+            unprocessed_count = self.thread_pool.get_unprocessed_count(all_file_infos)
+            logger.info(f"📝 未处理文件数量: {unprocessed_count}")
+            logger.info(f"✅ 已处理文件数量: {self.thread_pool.get_record_count()}")
+            
+            if unprocessed_count == 0:
                 logger.info("🎉 所有文件都已处理过,无需处理新文件")
-                logger.info(f"📊 已处理文件总数: {self.record_manager.get_record_count()}")
                 return
             
             # 步骤2: 读取表头并识别时间字段
             logger.info("\n步骤2: 读取表头信息并识别时间字段...")
-            schema_reader = SchemaReader(self.file_infos, self.config.table_config.time_column_aliases)
+            
+            # 使用所有文件读取表头(包括已处理的和未处理的)
+            # 这样即使之前已处理过部分文件,也能正确读取表结构
+            schema_reader = SchemaReader(all_file_infos, self.config.table_config.time_column_aliases)
             all_columns = schema_reader.read_all_headers()
             sql_columns = schema_reader.get_sql_columns()
             
-            # 确定唯一键
             unique_keys = schema_reader.get_unique_key_columns(self.config.table_config.unique_keys)
             logger.info(f"🔑 确定三字段唯一键: {unique_keys}")
             
@@ -256,10 +123,8 @@ class ParquetProcessor:
                         logger.warning(f"  🔄 重复: {dup}")
             
             # 检查表初始行数
-            self.stats['table_initial_count'] = self.db_manager.get_table_row_count(
-                self.config.table_config.table_name
-            )
-            logger.info(f"📊 表初始行数: {self.stats['table_initial_count']:,}")
+            initial_count = self.db_manager.get_table_row_count(self.config.table_config.table_name)
+            logger.info(f"📊 表初始行数: {initial_count:,}")
             
             # 步骤4: 使用线程池加载数据
             logger.info(f"\n步骤4: 使用线程池加载数据({self.config.max_workers}个线程,批量大小: {self.config.batch_size},模式: {'UPSERT' if self.config.upsert_enabled else 'INSERT'})...")
@@ -270,124 +135,102 @@ class ParquetProcessor:
                 max_retries=3,
                 upsert=self.config.upsert_enabled
             )
-            thread_pool = ThreadPoolManager(max_workers=self.config.max_workers)
             
             start_time = datetime.now()
             logger.info(f"⏰ 开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
             
-            # 处理文件
-            results = thread_pool.process_with_data_loader(self.file_infos, data_loader)
+            # 处理文件(线程池会自动过滤已处理的文件并记录成功处理的文件)
+            results = self.thread_pool.process_with_data_loader(all_file_infos, data_loader)
             
             end_time = datetime.now()
             elapsed = end_time - start_time
             
-            # 统计结果并更新记录
+            # 统计结果
             success_results = []
             failed_results = []
             
+            total_rows_processed = 0
+            total_rows_inserted = 0
+            total_rows_updated = 0
+            
             for file_info, result in results:
                 if isinstance(result, tuple) and len(result) == 3:
                     # 成功处理
                     success_results.append((file_info, result))
-                    self.update_record_for_file(file_info, success=True)
                     
                     # 累加统计
                     total_rows, inserted_rows, updated_rows = result
-                    self.stats['total_rows'] += total_rows
-                    self.stats['inserted_rows'] += inserted_rows
-                    self.stats['updated_rows'] += updated_rows
-                    self.stats['processed_files'] += 1
+                    total_rows_processed += total_rows
+                    total_rows_inserted += inserted_rows
+                    total_rows_updated += updated_rows
                     
                 else:
                     # 处理失败
                     failed_results.append((file_info, result))
-                    self.update_record_for_file(file_info, success=False)
-                    self.stats['failed_files'] += 1
             
-            # 保存处理记录
-            self.save_processed_records()
+            # 打印最终统计
+            logger.info(f"\n{'='*60}")
+            logger.info("🎉 数据处理完成!")
+            logger.info(f"{'='*60}")
+            logger.info(f"📁 总扫描文件数: {len(all_file_infos)}")
+            logger.info(f"⏭️  跳过文件: {len(all_file_infos) - len(results)} (已处理过)")
+            logger.info(f"✅ 本次成功处理: {len(success_results)} 个文件")
+            logger.info(f"❌ 本次失败文件: {len(failed_results)} 个文件")
+            logger.info(f"📊 总处理行数: {total_rows_processed:,}")
+            logger.info(f"  📥 新插入行数: {total_rows_inserted:,}")
+            logger.info(f"  🔄 更新行数: {total_rows_updated:,}")
+            logger.info(f"⏱️  总耗时: {elapsed}")
+            
+            if len(success_results) > 0:
+                avg_time_per_file = elapsed.total_seconds() / len(success_results)
+                logger.info(f"📈 平均每个文件处理时间: {avg_time_per_file:.2f} 秒")
             
             # 检查最终行数和统计
-            self.stats['table_final_count'] = self.db_manager.get_table_row_count(
-                self.config.table_config.table_name
-            )
-            self.stats['end_time'] = datetime.now()
+            final_count = self.db_manager.get_table_row_count(self.config.table_config.table_name)
+            logger.info(f"📊 表初始行数: {initial_count:,}")
+            logger.info(f"📊 表最终行数: {final_count:,}")
+            logger.info(f"📈 实际新增行数: {final_count - initial_count:,}")
             
-            # 打印最终统计
-            self.print_summary(elapsed)
+            # 获取最新统计
+            final_stats = self.db_manager.get_table_stats(self.config.table_config.table_name)
+            if final_stats:
+                logger.info("\n📈 表最终统计:")
+                for key, value in final_stats.items():
+                    logger.info(f"  {key}: {value}")
+            
+            # 检查重复键
+            final_duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name)
+            if final_duplicates:
+                logger.warning(f"\n⚠️  最终重复的唯一键记录: {len(final_duplicates)} 组")
+                for dup in final_duplicates[:10]:  # 显示前10个重复
+                    logger.warning(f"  🔄 重复: {dup}")
+            else:
+                logger.info("\n✅ 无重复的唯一键记录")
             
             # 打印失败文件
             if failed_results:
-                self.print_failed_files(failed_results)
+                logger.warning(f"\n⚠️  失败文件列表 ({len(failed_results)} 个):")
+                for file_info, error in failed_results:
+                    logger.warning(f"  📄 {os.path.basename(file_info.file_path)}")
+                    if isinstance(error, Exception):
+                        logger.warning(f"    错误类型: {type(error).__name__}")
+                        logger.warning(f"    错误信息: {str(error)[:200]}" + ("..." if len(str(error)) > 200 else ""))
+                    else:
+                        logger.warning(f"    错误: {error}")
+            
+            # 记录文件总数
+            logger.info(f"\n💾 总记录处理文件数: {self.thread_pool.get_record_count()}")
             
         except KeyboardInterrupt:
             logger.warning("\n⚠️  用户中断程序执行")
-            # 尝试保存已处理的记录
-            self.save_processed_records()
-            
         except Exception as e:
             logger.error(f"\n❌ 程序执行出错: {e}")
             import traceback
             logger.error(traceback.format_exc())
-            # 尝试保存已处理的记录
-            self.save_processed_records()
-            
         finally:
             # 清理
             self.db_manager.close()
             logger.info("🏁 程序执行完成")
-    
-    def print_summary(self, elapsed):
-        """打印处理摘要"""
-        logger.info(f"\n{'='*60}")
-        logger.info("🎉 数据处理完成!")
-        logger.info(f"{'='*60}")
-        logger.info(f"📁 总文件数: {self.stats['total_files']}")
-        logger.info(f"⏭️  跳过文件: {self.stats['skipped_files']} (已处理过)")
-        logger.info(f"✅ 成功处理: {self.stats['processed_files']} 个文件")
-        logger.info(f"❌ 失败文件: {self.stats['failed_files']} 个文件")
-        logger.info(f"📊 总处理行数: {self.stats['total_rows']:,}")
-        logger.info(f"  📥 新插入行数: {self.stats['inserted_rows']:,}")
-        logger.info(f"  🔄 更新行数: {self.stats['updated_rows']:,}")
-        logger.info(f"⏱️  总耗时: {elapsed}")
-        
-        if self.stats['processed_files'] > 0:
-            avg_time_per_file = elapsed.total_seconds() / self.stats['processed_files']
-            logger.info(f"📈 平均每个文件处理时间: {avg_time_per_file:.2f} 秒")
-        
-        logger.info(f"📊 表初始行数: {self.stats['table_initial_count']:,}")
-        logger.info(f"📊 表最终行数: {self.stats['table_final_count']:,}")
-        logger.info(f"📈 实际新增行数: {self.stats['table_final_count'] - self.stats['table_initial_count']:,}")
-        
-        # 获取最新统计
-        final_stats = self.db_manager.get_table_stats(self.config.table_config.table_name)
-        if final_stats:
-            logger.info("\n📈 表最终统计:")
-            for key, value in final_stats.items():
-                logger.info(f"  {key}: {value}")
-        
-        # 检查重复键
-        final_duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name)
-        if final_duplicates:
-            logger.warning(f"\n⚠️  最终重复的唯一键记录: {len(final_duplicates)} 组")
-            for dup in final_duplicates[:10]:  # 显示前10个重复
-                logger.warning(f"  🔄 重复: {dup}")
-        else:
-            logger.info("\n✅ 无重复的唯一键记录")
-        
-        # 记录文件统计
-        logger.info(f"\n💾 已记录处理文件总数: {self.record_manager.get_record_count()}")
-    
-    def print_failed_files(self, failed_results):
-        """打印失败文件列表"""
-        logger.warning(f"\n⚠️  失败文件列表 ({len(failed_results)} 个):")
-        for file_info, error in failed_results:
-            logger.warning(f"  📄 {os.path.basename(file_info.file_path)}")
-            if isinstance(error, Exception):
-                logger.warning(f"    错误类型: {type(error).__name__}")
-                logger.warning(f"    错误信息: {str(error)[:200]}" + ("..." if len(str(error)) > 200 else ""))
-            else:
-                logger.warning(f"    错误: {error}")
 
 
 def main():
@@ -395,14 +238,12 @@ def main():
     # 创建配置
     config = AppConfig(
         base_path=r"F:\BaiduNetdiskDownload\标准化数据\stander_parquet",
-        max_workers=20,
+        max_workers=2,
         batch_size=10000,
         upsert_enabled=True,  # 启用UPSERT
         db_config=DatabaseConfig(
-            # host="192.168.50.234",
-            # port=4000,
-            host="106.120.102.238",
-            port=44000,
+            host="192.168.50.234",
+            port=4000,
             user="root",
             password="123456",
             database="wind_data"

+ 153 - 15
dataStorage_datang/thread_pool.py

@@ -1,24 +1,128 @@
-# 线程池类
-
-
 import concurrent.futures
-from typing import List, Callable, Any, Tuple, Optional
+from typing import List, Callable, Any, Tuple, Optional, Set
 import logging
+import os
+import json
+import threading
+from datetime import datetime
 
 from file_scanner import ParquetFileInfo
 
 logger = logging.getLogger(__name__)
 
+
+class ProcessedRecordManager:
+    """已处理文件记录管理器(线程安全)"""
+    
+    def __init__(self, record_file: str = "record_processed.json"):
+        """
+        初始化记录管理器
+        
+        Args:
+            record_file: 记录文件路径
+        """
+        self.record_file = record_file
+        self.processed_files: Set[str] = set()
+        self._lock = threading.Lock()  # 线程锁,确保线程安全
+        self._load_records()
+    
+    def _load_records(self):
+        """加载已处理文件记录"""
+        try:
+            with self._lock:
+                if os.path.exists(self.record_file):
+                    with open(self.record_file, 'r', encoding='utf-8') as f:
+                        records = json.load(f)
+                        if isinstance(records, list):
+                            self.processed_files = set(records)
+                    logger.info(f"📁 已加载 {len(self.processed_files)} 个已处理文件记录")
+                else:
+                    logger.info(f"📁 记录文件不存在,将创建新文件: {self.record_file}")
+        except Exception as e:
+            logger.warning(f"❌ 加载记录文件失败: {e}")
+            self.processed_files = set()
+    
+    def is_processed(self, file_path: str) -> bool:
+        """检查文件是否已处理过"""
+        with self._lock:
+            return file_path in self.processed_files
+    
+    def add_record(self, file_path: str, metadata: dict = None):
+        """添加处理记录并立即保存(线程安全)"""
+        with self._lock:
+            if file_path not in self.processed_files:
+                self.processed_files.add(file_path)
+                self._save_records_internal(file_path, metadata)
+    
+    def _save_records_internal(self, file_path: str = None, metadata: dict = None):
+        """内部方法:保存记录到文件(线程安全)"""
+        try:
+            # 转换为列表并排序,便于阅读
+            records_list = sorted(list(self.processed_files))
+            
+            # 准备要保存的数据
+            save_data = records_list
+            
+            with open(self.record_file, 'w', encoding='utf-8') as f:
+                json.dump(save_data, f, ensure_ascii=False, indent=2)
+            
+            logger.info(f"💾 已保存 {len(records_list)} 个处理记录到 {self.record_file}")
+            return True
+        except Exception as e:
+            logger.error(f"❌ 保存记录文件失败: {e}")
+            return False
+    
+    def get_record_count(self) -> int:
+        """获取记录数量"""
+        with self._lock:
+            return len(self.processed_files)
+    
+    def clear_records(self):
+        """清空记录"""
+        with self._lock:
+            self.processed_files.clear()
+            try:
+                if os.path.exists(self.record_file):
+                    os.remove(self.record_file)
+                    logger.info(f"🗑️  已删除记录文件: {self.record_file}")
+            except Exception as e:
+                logger.warning(f"删除记录文件失败: {e}")
+
+
 class ThreadPoolManager:
-    """线程池管理器"""
+    """线程池管理器,集成记录管理功能"""
     
-    def __init__(self, max_workers: int = 20):
+    def __init__(self, max_workers: int = 20, record_file: str = "record_processed.json"):
         self.max_workers = max_workers
         self.executor = None
+        self.record_manager = ProcessedRecordManager(record_file)
         
     def process_files(self, file_infos: List[ParquetFileInfo], 
                      process_func: Callable[[ParquetFileInfo], Any]) -> List[Tuple[ParquetFileInfo, Any]]:
-        """使用线程池处理文件"""
+        """
+        使用线程池处理文件,自动记录成功处理的文件
+        
+        Args:
+            file_infos: 文件信息列表
+            process_func: 处理函数
+            
+        Returns:
+            处理结果列表
+        """
+        # 先过滤掉已处理过的文件
+        unprocessed_files = []
+        for file_info in file_infos:
+            if self.record_manager.is_processed(file_info.file_path):
+                logger.debug(f"⏭️  跳过已处理文件: {os.path.basename(file_info.file_path)}")
+            else:
+                unprocessed_files.append(file_info)
+        
+        if not unprocessed_files:
+            logger.info("🎉 所有文件都已处理过,无需处理新文件")
+            return []
+        
+        logger.info(f"📝 需要处理 {len(unprocessed_files)} 个新文件(跳过了 {len(file_infos)-len(unprocessed_files)} 个已处理文件)")
+        
         results = []
         
         with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
@@ -27,12 +131,12 @@ class ThreadPoolManager:
             # 提交所有任务
             future_to_file = {
                 executor.submit(process_func, file_info): file_info
-                for file_info in file_infos
+                for file_info in unprocessed_files
             }
             
             # 处理完成的任务
             completed = 0
-            total = len(file_infos)
+            total = len(unprocessed_files)
             
             for future in concurrent.futures.as_completed(future_to_file):
                 file_info = future_to_file[future]
@@ -40,22 +144,56 @@ class ThreadPoolManager:
                 
                 try:
                     result = future.result()
-                    results.append((file_info, result))
                     
                     if isinstance(result, tuple) and len(result) == 3:
+                        # 成功处理,记录文件
                         total_rows, inserted_rows, updated_rows = result
-                        logger.info(f"进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet "
+                        results.append((file_info, result))
+                        
+                        # ✅ 立即记录成功处理的文件
+                        self.record_manager.add_record(file_info.file_path, {
+                            'total_rows': total_rows,
+                            'inserted_rows': inserted_rows,
+                            'updated_rows': updated_rows,
+                            'processed_time': datetime.now().isoformat()
+                        })
+                        
+                        logger.info(f"✅ 进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet "
                                   f"处理完成, 总行: {total_rows}, 插入: {inserted_rows}, 更新: {updated_rows}")
                     else:
-                        logger.info(f"进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet 处理完成")
+                        # 处理失败,不记录
+                        results.append((file_info, result))
+                        logger.error(f"❌ 进度: {completed}/{total} - 文件 {file_info.turbine_id}.parquet 处理失败")
                         
                 except Exception as e:
-                    logger.error(f"处理文件 {file_info.file_path} 时出错: {e}")
+                    # 处理异常,不记录
+                    logger.error(f"❌ 处理文件 {file_info.file_path} 时出错: {e}")
                     results.append((file_info, e))
         
         return results
     
     def process_with_data_loader(self, file_infos: List[ParquetFileInfo], 
                                data_loader: Any) -> List[Tuple[ParquetFileInfo, Any]]:
-        """使用DataLoader处理文件"""
-        return self.process_files(file_infos, data_loader.load_file)
+        """
+        使用DataLoader处理文件,自动记录成功处理的文件
+        
+        Args:
+            file_infos: 文件信息列表
+            data_loader: DataLoader实例
+            
+        Returns:
+            处理结果列表
+        """
+        return self.process_files(file_infos, data_loader.load_file)
+    
+    def get_record_count(self) -> int:
+        """获取已处理文件数量"""
+        return self.record_manager.get_record_count()
+    
+    def get_unprocessed_count(self, all_files: List[ParquetFileInfo]) -> int:
+        """获取未处理文件数量"""
+        unprocessed = 0
+        for file_info in all_files:
+            if not self.record_manager.is_processed(file_info.file_path):
+                unprocessed += 1
+        return unprocessed