# 主程序 import os import sys import time import json from datetime import datetime from typing import List, Set, Dict, Any import logging # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) from config import AppConfig, DatabaseConfig, TableConfig from file_scanner import FileScanner, ParquetFileInfo from schema_reader import SchemaReader from database import DatabaseManager from data_loader import DataLoader from thread_pool import ThreadPoolManager # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('parquet_processor_three_key.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class ProcessedRecordManager: """已处理文件记录管理器""" def __init__(self, record_file: str = "record_processed.json"): """ 初始化记录管理器 Args: record_file: 记录文件路径 """ self.record_file = record_file self.processed_files: Set[str] = set() self._load_records() def _load_records(self): """加载已处理文件记录""" try: if os.path.exists(self.record_file): with open(self.record_file, 'r', encoding='utf-8') as f: records = json.load(f) if isinstance(records, list): self.processed_files = set(records) logger.info(f"📁 已加载 {len(self.processed_files)} 个已处理文件记录") else: logger.info(f"📁 记录文件不存在,将创建新文件: {self.record_file}") except Exception as e: logger.warning(f"❌ 加载记录文件失败: {e}") self.processed_files = set() def is_processed(self, file_path: str) -> bool: """检查文件是否已处理过""" return file_path in self.processed_files def add_record(self, file_path: str): """添加处理记录""" if file_path not in self.processed_files: self.processed_files.add(file_path) def save_records(self): """保存记录到文件""" try: # 转换为列表并排序,便于阅读 records_list = sorted(list(self.processed_files)) with open(self.record_file, 'w', encoding='utf-8') as f: json.dump(records_list, f, ensure_ascii=False, indent=2) logger.info(f"💾 已保存 {len(records_list)} 个处理记录到 {self.record_file}") return True except Exception as e: logger.error(f"❌ 保存记录文件失败: {e}") return False def get_record_count(self) -> int: """获取记录数量""" return len(self.processed_files) def clear_records(self): """清空记录""" self.processed_files.clear() try: if os.path.exists(self.record_file): os.remove(self.record_file) logger.info(f"🗑️ 已删除记录文件: {self.record_file}") except Exception as e: logger.warning(f"删除记录文件失败: {e}") class ParquetProcessor: """Parquet文件处理器主类""" def __init__(self, config: AppConfig): self.config = config self.file_scanner = FileScanner(config.base_path) self.file_infos: List[ParquetFileInfo] = [] self.db_manager = DatabaseManager(config.db_config, config.table_config) # 初始化记录管理器 self.record_manager = ProcessedRecordManager() # 统计信息 self.stats = { 'start_time': None, 'end_time': None, 'total_files': 0, 'processed_files': 0, 'skipped_files': 0, 'failed_files': 0, 'total_rows': 0, 'inserted_rows': 0, 'updated_rows': 0, 'table_initial_count': 0, 'table_final_count': 0 } def filter_unprocessed_files(self, file_infos: List[ParquetFileInfo]) -> List[ParquetFileInfo]: """ 过滤掉已处理过的文件 Args: file_infos: 所有扫描到的文件信息 Returns: List[ParquetFileInfo]: 未处理过的文件信息 """ unprocessed_files = [] skipped_count = 0 for file_info in file_infos: if self.record_manager.is_processed(file_info.file_path): skipped_count += 1 logger.debug(f"⏭️ 跳过已处理文件: {os.path.basename(file_info.file_path)}") else: unprocessed_files.append(file_info) if skipped_count > 0: logger.info(f"⏭️ 跳过 {skipped_count} 个已处理过的文件") logger.info(f"📝 需要处理 {len(unprocessed_files)} 个新文件") return unprocessed_files def update_record_for_file(self, file_info: ParquetFileInfo, success: bool = True): """ 更新文件处理记录 Args: file_info: 文件信息 success: 是否处理成功 """ if success: self.record_manager.add_record(file_info.file_path) logger.info(f"✅ 已记录成功处理文件: {os.path.basename(file_info.file_path)}") else: logger.warning(f"❌ 文件处理失败,不记录: {os.path.basename(file_info.file_path)}") def save_processed_records(self): """保存处理记录到文件""" try: if self.record_manager.save_records(): logger.info(f"💾 处理记录已保存,共 {self.record_manager.get_record_count()} 个文件") else: logger.warning("⚠️ 处理记录保存失败") except Exception as e: logger.error(f"❌ 保存处理记录时出错: {e}") def run(self): """运行整个处理流程""" logger.info("=" * 60) logger.info("Parquet文件处理系统 (SQLAlchemy + 三字段唯一键UPSERT)") logger.info("=" * 60) try: # 记录开始时间 self.stats['start_time'] = datetime.now() # 步骤1: 扫描文件 logger.info("\n步骤1: 扫描Parquet文件...") all_file_infos = self.file_scanner.scan_files() # 打印扫描结果摘要 if all_file_infos: logger.info(f"📁 扫描完成!共找到 {len(all_file_infos)} 个parquet文件") logger.info(f"📊 机型型号种类: {len(set(f.model_type for f in all_file_infos))}") logger.info(f"🌾 风场数量: {len(set(f.farm_id for f in all_file_infos))}") else: logger.warning("⚠️ 未找到任何parquet文件,程序退出") return # 过滤已处理过的文件 self.file_infos = self.filter_unprocessed_files(all_file_infos) self.stats['skipped_files'] = len(all_file_infos) - len(self.file_infos) self.stats['total_files'] = len(all_file_infos) if not self.file_infos: logger.info("🎉 所有文件都已处理过,无需处理新文件") logger.info(f"📊 已处理文件总数: {self.record_manager.get_record_count()}") return # 步骤2: 读取表头并识别时间字段 logger.info("\n步骤2: 读取表头信息并识别时间字段...") schema_reader = SchemaReader(self.file_infos, self.config.table_config.time_column_aliases) all_columns = schema_reader.read_all_headers() sql_columns = schema_reader.get_sql_columns() # 确定唯一键 unique_keys = schema_reader.get_unique_key_columns(self.config.table_config.unique_keys) logger.info(f"🔑 确定三字段唯一键: {unique_keys}") if not schema_reader.identified_time_column: logger.error("❌ 未识别到时间字段,无法创建三字段唯一键!") logger.error("💡 请检查数据中是否包含时间字段,或配置正确的时间字段别名") return # 显示部分字段信息 if all_columns: logger.info(f"📋 字段数量: {len(all_columns)}") logger.info("📊 前20个字段:") for i, col in enumerate(sorted(list(all_columns))[:20]): logger.info(f" {i+1:2d}. {col}") # 步骤3: 创建数据库表(带三字段唯一键) logger.info("\n步骤3: 创建数据库表(带三字段唯一键)...") if not self.db_manager.check_table_exists(self.config.table_config.table_name): success = self.db_manager.create_table_with_unique_key( self.config.table_config.table_name, sql_columns, unique_keys ) if not success: logger.error("❌ 创建表失败,程序退出") return else: logger.info(f"✅ 表 {self.config.table_config.table_name} 已存在") # 检查表结构 stats = self.db_manager.get_table_stats(self.config.table_config.table_name) if stats: logger.info(f"📊 当前表统计: {stats}") # 检查重复键 duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name) if duplicates: logger.warning(f"⚠️ 发现重复的唯一键记录: {len(duplicates)} 组") for dup in duplicates[:5]: # 显示前5个重复 logger.warning(f" 🔄 重复: {dup}") # 检查表初始行数 self.stats['table_initial_count'] = self.db_manager.get_table_row_count( self.config.table_config.table_name ) logger.info(f"📊 表初始行数: {self.stats['table_initial_count']:,}") # 步骤4: 使用线程池加载数据 logger.info(f"\n步骤4: 使用线程池加载数据({self.config.max_workers}个线程,批量大小: {self.config.batch_size},模式: {'UPSERT' if self.config.upsert_enabled else 'INSERT'})...") data_loader = DataLoader( self.db_manager, self.config.table_config.table_name, self.config.batch_size, max_retries=3, upsert=self.config.upsert_enabled ) thread_pool = ThreadPoolManager(max_workers=self.config.max_workers) start_time = datetime.now() logger.info(f"⏰ 开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}") # 处理文件 results = thread_pool.process_with_data_loader(self.file_infos, data_loader) end_time = datetime.now() elapsed = end_time - start_time # 统计结果并更新记录 success_results = [] failed_results = [] for file_info, result in results: if isinstance(result, tuple) and len(result) == 3: # 成功处理 success_results.append((file_info, result)) self.update_record_for_file(file_info, success=True) # 累加统计 total_rows, inserted_rows, updated_rows = result self.stats['total_rows'] += total_rows self.stats['inserted_rows'] += inserted_rows self.stats['updated_rows'] += updated_rows self.stats['processed_files'] += 1 else: # 处理失败 failed_results.append((file_info, result)) self.update_record_for_file(file_info, success=False) self.stats['failed_files'] += 1 # 保存处理记录 self.save_processed_records() # 检查最终行数和统计 self.stats['table_final_count'] = self.db_manager.get_table_row_count( self.config.table_config.table_name ) self.stats['end_time'] = datetime.now() # 打印最终统计 self.print_summary(elapsed) # 打印失败文件 if failed_results: self.print_failed_files(failed_results) except KeyboardInterrupt: logger.warning("\n⚠️ 用户中断程序执行") # 尝试保存已处理的记录 self.save_processed_records() except Exception as e: logger.error(f"\n❌ 程序执行出错: {e}") import traceback logger.error(traceback.format_exc()) # 尝试保存已处理的记录 self.save_processed_records() finally: # 清理 self.db_manager.close() logger.info("🏁 程序执行完成") def print_summary(self, elapsed): """打印处理摘要""" logger.info(f"\n{'='*60}") logger.info("🎉 数据处理完成!") logger.info(f"{'='*60}") logger.info(f"📁 总文件数: {self.stats['total_files']}") logger.info(f"⏭️ 跳过文件: {self.stats['skipped_files']} (已处理过)") logger.info(f"✅ 成功处理: {self.stats['processed_files']} 个文件") logger.info(f"❌ 失败文件: {self.stats['failed_files']} 个文件") logger.info(f"📊 总处理行数: {self.stats['total_rows']:,}") logger.info(f" 📥 新插入行数: {self.stats['inserted_rows']:,}") logger.info(f" 🔄 更新行数: {self.stats['updated_rows']:,}") logger.info(f"⏱️ 总耗时: {elapsed}") if self.stats['processed_files'] > 0: avg_time_per_file = elapsed.total_seconds() / self.stats['processed_files'] logger.info(f"📈 平均每个文件处理时间: {avg_time_per_file:.2f} 秒") logger.info(f"📊 表初始行数: {self.stats['table_initial_count']:,}") logger.info(f"📊 表最终行数: {self.stats['table_final_count']:,}") logger.info(f"📈 实际新增行数: {self.stats['table_final_count'] - self.stats['table_initial_count']:,}") # 获取最新统计 final_stats = self.db_manager.get_table_stats(self.config.table_config.table_name) if final_stats: logger.info("\n📈 表最终统计:") for key, value in final_stats.items(): logger.info(f" {key}: {value}") # 检查重复键 final_duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name) if final_duplicates: logger.warning(f"\n⚠️ 最终重复的唯一键记录: {len(final_duplicates)} 组") for dup in final_duplicates[:10]: # 显示前10个重复 logger.warning(f" 🔄 重复: {dup}") else: logger.info("\n✅ 无重复的唯一键记录") # 记录文件统计 logger.info(f"\n💾 已记录处理文件总数: {self.record_manager.get_record_count()}") def print_failed_files(self, failed_results): """打印失败文件列表""" logger.warning(f"\n⚠️ 失败文件列表 ({len(failed_results)} 个):") for file_info, error in failed_results: logger.warning(f" 📄 {os.path.basename(file_info.file_path)}") if isinstance(error, Exception): logger.warning(f" 错误类型: {type(error).__name__}") logger.warning(f" 错误信息: {str(error)[:200]}" + ("..." if len(str(error)) > 200 else "")) else: logger.warning(f" 错误: {error}") def main(): """主函数""" # 创建配置 config = AppConfig( base_path=r"F:\BaiduNetdiskDownload\标准化数据\stander_parquet", max_workers=20, batch_size=10000, upsert_enabled=True, # 启用UPSERT db_config=DatabaseConfig( # host="192.168.50.234", # port=4000, host="106.120.102.238", port=44000, user="root", password="123456", database="wind_data" ), table_config=TableConfig( table_name="data_scada_turbine", # 三字段唯一键 unique_keys=["id_farm", "id_turbine", "data_time"], time_column_aliases=[ "data_time", "time", "timestamp", "datetime", "采集时间", "时间", "记录时间", "数据时间", "Time", "Timestamp", "DATA_TIME" ] ) ) # 检查路径是否存在 if not os.path.exists(config.base_path): logger.error(f"❌ 错误: 路径不存在: {config.base_path}") logger.error("💡 请修改config.py中的base_path为正确的路径") sys.exit(1) # 运行处理器 processor = ParquetProcessor(config) processor.run() if __name__ == "__main__": main()