# 主程序 import os import sys import time import json from datetime import datetime from typing import List, Set, Dict, Any import logging # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) from config import AppConfig, DatabaseConfig, TableConfig from file_scanner import FileScanner, ParquetFileInfo from schema_reader import SchemaReader from database import DatabaseManager from data_loader import DataLoader from thread_pool import ThreadPoolManager # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('parquet_processor_three_key.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class ParquetProcessor: """Parquet文件处理器主类""" def __init__(self, config: AppConfig): self.config = config self.file_scanner = FileScanner(config.base_path) self.file_infos: List[ParquetFileInfo] = [] self.db_manager = DatabaseManager(config.db_config, config.table_config) # 初始化线程池管理器(带记录管理功能) self.thread_pool = ThreadPoolManager( max_workers=self.config.max_workers, record_file="record_processed.json" ) def run(self): """运行整个处理流程""" logger.info("=" * 60) logger.info("Parquet文件处理系统 (SQLAlchemy + 三字段唯一键UPSERT)") logger.info("=" * 60) try: # 步骤1: 扫描文件 logger.info("\n步骤1: 扫描Parquet文件...") all_file_infos = self.file_scanner.scan_files() if not all_file_infos: logger.warning("⚠️ 未找到任何parquet文件,程序退出") return logger.info(f"📁 扫描完成!共找到 {len(all_file_infos)} 个parquet文件") logger.info(f"📊 机型型号种类: {len(set(f.model_type for f in all_file_infos))}") logger.info(f"🌾 风场数量: {len(set(f.farm_id for f in all_file_infos))}") # 检查有多少文件未处理 unprocessed_count = self.thread_pool.get_unprocessed_count(all_file_infos) logger.info(f"📝 未处理文件数量: {unprocessed_count}") logger.info(f"✅ 已处理文件数量: {self.thread_pool.get_record_count()}") if unprocessed_count == 0: logger.info("🎉 所有文件都已处理过,无需处理新文件") return # 步骤2: 读取表头并识别时间字段 logger.info("\n步骤2: 读取表头信息并识别时间字段...") # 使用所有文件读取表头(包括已处理的和未处理的) # 这样即使之前已处理过部分文件,也能正确读取表结构 schema_reader = SchemaReader(all_file_infos, self.config.table_config.time_column_aliases) all_columns = schema_reader.read_all_headers() sql_columns = schema_reader.get_sql_columns() unique_keys = schema_reader.get_unique_key_columns(self.config.table_config.unique_keys) logger.info(f"🔑 确定三字段唯一键: {unique_keys}") if not schema_reader.identified_time_column: logger.error("❌ 未识别到时间字段,无法创建三字段唯一键!") logger.error("💡 请检查数据中是否包含时间字段,或配置正确的时间字段别名") return # 显示部分字段信息 if all_columns: logger.info(f"📋 字段数量: {len(all_columns)}") logger.info("📊 前20个字段:") for i, col in enumerate(sorted(list(all_columns))[:20]): logger.info(f" {i+1:2d}. {col}") # 步骤3: 创建数据库表(带三字段唯一键) logger.info("\n步骤3: 创建数据库表(带三字段唯一键)...") if not self.db_manager.check_table_exists(self.config.table_config.table_name): success = self.db_manager.create_table_with_unique_key( self.config.table_config.table_name, sql_columns, unique_keys ) if not success: logger.error("❌ 创建表失败,程序退出") return else: logger.info(f"✅ 表 {self.config.table_config.table_name} 已存在") # 检查表结构 stats = self.db_manager.get_table_stats(self.config.table_config.table_name) if stats: logger.info(f"📊 当前表统计: {stats}") # 检查重复键 duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name) if duplicates: logger.warning(f"⚠️ 发现重复的唯一键记录: {len(duplicates)} 组") for dup in duplicates[:5]: # 显示前5个重复 logger.warning(f" 🔄 重复: {dup}") # 检查表初始行数 initial_count = self.db_manager.get_table_row_count(self.config.table_config.table_name) logger.info(f"📊 表初始行数: {initial_count:,}") # 步骤4: 使用线程池加载数据 logger.info(f"\n步骤4: 使用线程池加载数据({self.config.max_workers}个线程,批量大小: {self.config.batch_size},模式: {'UPSERT' if self.config.upsert_enabled else 'INSERT'})...") data_loader = DataLoader( self.db_manager, self.config.table_config.table_name, self.config.batch_size, max_retries=3, upsert=self.config.upsert_enabled ) start_time = datetime.now() logger.info(f"⏰ 开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}") # 处理文件(线程池会自动过滤已处理的文件并记录成功处理的文件) results = self.thread_pool.process_with_data_loader(all_file_infos, data_loader) end_time = datetime.now() elapsed = end_time - start_time # 统计结果 success_results = [] failed_results = [] total_rows_processed = 0 total_rows_inserted = 0 total_rows_updated = 0 for file_info, result in results: if isinstance(result, tuple) and len(result) == 3: # 成功处理 success_results.append((file_info, result)) # 累加统计 total_rows, inserted_rows, updated_rows = result total_rows_processed += total_rows total_rows_inserted += inserted_rows total_rows_updated += updated_rows else: # 处理失败 failed_results.append((file_info, result)) # 打印最终统计 logger.info(f"\n{'='*60}") logger.info("🎉 数据处理完成!") logger.info(f"{'='*60}") logger.info(f"📁 总扫描文件数: {len(all_file_infos)}") logger.info(f"⏭️ 跳过文件: {len(all_file_infos) - len(results)} (已处理过)") logger.info(f"✅ 本次成功处理: {len(success_results)} 个文件") logger.info(f"❌ 本次失败文件: {len(failed_results)} 个文件") logger.info(f"📊 总处理行数: {total_rows_processed:,}") logger.info(f" 📥 新插入行数: {total_rows_inserted:,}") logger.info(f" 🔄 更新行数: {total_rows_updated:,}") logger.info(f"⏱️ 总耗时: {elapsed}") if len(success_results) > 0: avg_time_per_file = elapsed.total_seconds() / len(success_results) logger.info(f"📈 平均每个文件处理时间: {avg_time_per_file:.2f} 秒") # 检查最终行数和统计 final_count = self.db_manager.get_table_row_count(self.config.table_config.table_name) logger.info(f"📊 表初始行数: {initial_count:,}") logger.info(f"📊 表最终行数: {final_count:,}") logger.info(f"📈 实际新增行数: {final_count - initial_count:,}") # 获取最新统计 final_stats = self.db_manager.get_table_stats(self.config.table_config.table_name) if final_stats: logger.info("\n📈 表最终统计:") for key, value in final_stats.items(): logger.info(f" {key}: {value}") # 检查重复键 final_duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name) if final_duplicates: logger.warning(f"\n⚠️ 最终重复的唯一键记录: {len(final_duplicates)} 组") for dup in final_duplicates[:10]: # 显示前10个重复 logger.warning(f" 🔄 重复: {dup}") else: logger.info("\n✅ 无重复的唯一键记录") # 打印失败文件 if failed_results: logger.warning(f"\n⚠️ 失败文件列表 ({len(failed_results)} 个):") for file_info, error in failed_results: logger.warning(f" 📄 {os.path.basename(file_info.file_path)}") if isinstance(error, Exception): logger.warning(f" 错误类型: {type(error).__name__}") logger.warning(f" 错误信息: {str(error)[:200]}" + ("..." if len(str(error)) > 200 else "")) else: logger.warning(f" 错误: {error}") # 记录文件总数 logger.info(f"\n💾 总记录处理文件数: {self.thread_pool.get_record_count()}") except KeyboardInterrupt: logger.warning("\n⚠️ 用户中断程序执行") except Exception as e: logger.error(f"\n❌ 程序执行出错: {e}") import traceback logger.error(traceback.format_exc()) finally: # 清理 self.db_manager.close() logger.info("🏁 程序执行完成") def main(): """主函数""" # 创建配置 config = AppConfig( # base_path=r"F:\BaiduNetdiskDownload\标准化数据\stander_parquet", base_path=f"./data/标准化数据/stander_parquet", max_workers=10, batch_size=10000, upsert_enabled=True, # 启用UPSERT db_config=DatabaseConfig( host="192.168.50.234", port=4000, user="root", password="123456", database="wind_data" ), table_config=TableConfig( table_name="data_scada_turbine", # 三字段唯一键 unique_keys=["id_farm", "id_turbine", "data_time"], time_column_aliases=[ "data_time", "time", "timestamp", "datetime", "采集时间", "时间", "记录时间", "数据时间", "Time", "Timestamp", "DATA_TIME" ] ) ) # 检查路径是否存在 if not os.path.exists(config.base_path): logger.error(f"❌ 错误: 路径不存在: {config.base_path}") logger.error("💡 请修改config.py中的base_path为正确的路径") sys.exit(1) # 运行处理器 processor = ParquetProcessor(config) processor.run() if __name__ == "__main__": main()