# Main program
import os
import sys
import json
import logging
from datetime import datetime
from typing import List, Set

# Add the project root directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from config import AppConfig, DatabaseConfig, TableConfig
from file_scanner import FileScanner, ParquetFileInfo
from schema_reader import SchemaReader
from database import DatabaseManager
from data_loader import DataLoader
from thread_pool import ThreadPoolManager

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('parquet_processor_three_key.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class ProcessedRecordManager:
    """Manages the record of files that have already been processed."""

    def __init__(self, record_file: str = "record_processed.json"):
        """
        Initialize the record manager.

        Args:
            record_file: Path to the record file.
        """
        self.record_file = record_file
        self.processed_files: Set[str] = set()
        self._load_records()

    def _load_records(self):
        """Load the record of processed files."""
        try:
            if os.path.exists(self.record_file):
                with open(self.record_file, 'r', encoding='utf-8') as f:
                    records = json.load(f)
                if isinstance(records, list):
                    self.processed_files = set(records)
                    logger.info(f"📁 Loaded {len(self.processed_files)} processed-file records")
            else:
                logger.info(f"📁 Record file does not exist; a new one will be created: {self.record_file}")
        except Exception as e:
            logger.warning(f"❌ Failed to load record file: {e}")
            self.processed_files = set()

    def is_processed(self, file_path: str) -> bool:
        """Check whether a file has already been processed."""
        return file_path in self.processed_files

    def add_record(self, file_path: str):
        """Record a file as processed."""
        self.processed_files.add(file_path)

    def save_records(self):
        """Save the records to the record file."""
        try:
            # Sort the records so the file is easy to read
            records_list = sorted(self.processed_files)

            with open(self.record_file, 'w', encoding='utf-8') as f:
                json.dump(records_list, f, ensure_ascii=False, indent=2)

            logger.info(f"💾 Saved {len(records_list)} processing records to {self.record_file}")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to save record file: {e}")
            return False

    def get_record_count(self) -> int:
        """Return the number of recorded files."""
        return len(self.processed_files)

    def clear_records(self):
        """Clear all records and delete the record file."""
        self.processed_files.clear()
        try:
            if os.path.exists(self.record_file):
                os.remove(self.record_file)
                logger.info(f"🗑️ Deleted record file: {self.record_file}")
        except Exception as e:
            logger.warning(f"Failed to delete record file: {e}")
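

# Illustrative usage of ProcessedRecordManager (a sketch only, not executed;
# the file path below is hypothetical):
#
#     manager = ProcessedRecordManager("record_processed.json")
#     path = r"F:\example\turbine_001.parquet"  # hypothetical
#     if not manager.is_processed(path):
#         ...  # process the file
#         manager.add_record(path)
#     manager.save_records()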


class ParquetProcessor:
    """Main Parquet file processor."""

    def __init__(self, config: AppConfig):
        self.config = config
        self.file_scanner = FileScanner(config.base_path)
        self.file_infos: List[ParquetFileInfo] = []
        self.db_manager = DatabaseManager(config.db_config, config.table_config)

        # Initialize the processed-file record manager
        self.record_manager = ProcessedRecordManager()

        # Run statistics
        self.stats = {
            'start_time': None,
            'end_time': None,
            'total_files': 0,
            'processed_files': 0,
            'skipped_files': 0,
            'failed_files': 0,
            'total_rows': 0,
            'inserted_rows': 0,
            'updated_rows': 0,
            'table_initial_count': 0,
            'table_final_count': 0
        }
    def filter_unprocessed_files(self, file_infos: List[ParquetFileInfo]) -> List[ParquetFileInfo]:
        """
        Filter out files that have already been processed.

        Args:
            file_infos: Information for all scanned files.

        Returns:
            List[ParquetFileInfo]: Information for the files not yet processed.
        """
        unprocessed_files = []
        skipped_count = 0

        for file_info in file_infos:
            if self.record_manager.is_processed(file_info.file_path):
                skipped_count += 1
                logger.debug(f"⏭️ Skipping already-processed file: {os.path.basename(file_info.file_path)}")
            else:
                unprocessed_files.append(file_info)

        if skipped_count > 0:
            logger.info(f"⏭️ Skipped {skipped_count} already-processed files")
        logger.info(f"📝 {len(unprocessed_files)} new files to process")

        return unprocessed_files

    def update_record_for_file(self, file_info: ParquetFileInfo, success: bool = True):
        """
        Update the processing record for a single file.

        Args:
            file_info: File information.
            success: Whether processing succeeded.
        """
        if success:
            self.record_manager.add_record(file_info.file_path)
            logger.info(f"✅ Recorded successfully processed file: {os.path.basename(file_info.file_path)}")
        else:
            logger.warning(f"❌ File failed to process; not recording: {os.path.basename(file_info.file_path)}")

    def save_processed_records(self):
        """Save the processing records to the record file."""
        try:
            if self.record_manager.save_records():
                logger.info(f"💾 Processing records saved; {self.record_manager.get_record_count()} files in total")
            else:
                logger.warning("⚠️ Failed to save processing records")
        except Exception as e:
            logger.error(f"❌ Error while saving processing records: {e}")
    def run(self):
        """Run the full processing pipeline."""
        logger.info("=" * 60)
        logger.info("Parquet file processing system (SQLAlchemy + three-column unique-key UPSERT)")
        logger.info("=" * 60)

        try:
            # Record the start time
            self.stats['start_time'] = datetime.now()

            # Step 1: scan for files
            logger.info("\nStep 1: Scanning for Parquet files...")
            all_file_infos = self.file_scanner.scan_files()

            # Summarize the scan results
            if all_file_infos:
                logger.info(f"📁 Scan complete! Found {len(all_file_infos)} parquet files")
                logger.info(f"📊 Turbine model types: {len(set(f.model_type for f in all_file_infos))}")
                logger.info(f"🌾 Wind farms: {len(set(f.farm_id for f in all_file_infos))}")
            else:
                logger.warning("⚠️ No parquet files found; exiting")
                return

            # Filter out files that have already been processed
            self.file_infos = self.filter_unprocessed_files(all_file_infos)
            self.stats['skipped_files'] = len(all_file_infos) - len(self.file_infos)
            self.stats['total_files'] = len(all_file_infos)

            if not self.file_infos:
                logger.info("🎉 All files have already been processed; nothing new to do")
                logger.info(f"📊 Total processed files on record: {self.record_manager.get_record_count()}")
                return

            # Step 2: read headers and identify the time column
            logger.info("\nStep 2: Reading header information and identifying the time column...")
            schema_reader = SchemaReader(self.file_infos, self.config.table_config.time_column_aliases)
            all_columns = schema_reader.read_all_headers()
            sql_columns = schema_reader.get_sql_columns()

            # Determine the unique key
            unique_keys = schema_reader.get_unique_key_columns(self.config.table_config.unique_keys)
            logger.info(f"🔑 Three-column unique key determined: {unique_keys}")

            if not schema_reader.identified_time_column:
                logger.error("❌ No time column identified; cannot create the three-column unique key!")
                logger.error("💡 Check that the data contains a time column, or configure the correct time-column aliases")
                return

            # Show a sample of the columns
            if all_columns:
                logger.info(f"📋 Column count: {len(all_columns)}")
                logger.info("📊 First 20 columns:")
                for i, col in enumerate(sorted(all_columns)[:20]):
                    logger.info(f"  {i + 1:2d}. {col}")

            # Step 3: create the database table (with the three-column unique key)
            logger.info("\nStep 3: Creating the database table (with the three-column unique key)...")
            if not self.db_manager.check_table_exists(self.config.table_config.table_name):
                success = self.db_manager.create_table_with_unique_key(
                    self.config.table_config.table_name,
                    sql_columns,
                    unique_keys
                )
                if not success:
                    logger.error("❌ Failed to create the table; exiting")
                    return
            else:
                logger.info(f"✅ Table {self.config.table_config.table_name} already exists")
                # Inspect the existing table
                stats = self.db_manager.get_table_stats(self.config.table_config.table_name)
                if stats:
                    logger.info(f"📊 Current table statistics: {stats}")

                # Check for duplicate unique keys
                duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name)
                if duplicates:
                    logger.warning(f"⚠️ Found duplicate unique-key records: {len(duplicates)} groups")
                    for dup in duplicates[:5]:  # show the first 5 duplicates
                        logger.warning(f"  🔄 Duplicate: {dup}")

            # Record the table's initial row count
            self.stats['table_initial_count'] = self.db_manager.get_table_row_count(
                self.config.table_config.table_name
            )
            logger.info(f"📊 Initial table row count: {self.stats['table_initial_count']:,}")
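
            # For reference, the three-column unique key on (id_farm, id_turbine,
            # data_time) is assumed to drive a MySQL/TiDB-style UPSERT roughly of
            # the form below; the actual statement is built inside data_loader.py
            # and may differ:
            #
            #     INSERT INTO <table> (id_farm, id_turbine, data_time, ...)
            #     VALUES (...)
            #     ON DUPLICATE KEY UPDATE <non-key column> = VALUES(<non-key column>), ...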

            # Step 4: load data using the thread pool
            logger.info(
                f"\nStep 4: Loading data with the thread pool "
                f"({self.config.max_workers} threads, batch size: {self.config.batch_size}, "
                f"mode: {'UPSERT' if self.config.upsert_enabled else 'INSERT'})..."
            )
            data_loader = DataLoader(
                self.db_manager,
                self.config.table_config.table_name,
                self.config.batch_size,
                max_retries=3,
                upsert=self.config.upsert_enabled
            )
            thread_pool = ThreadPoolManager(max_workers=self.config.max_workers)

            start_time = datetime.now()
            logger.info(f"⏰ Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")

            # Process the files
            results = thread_pool.process_with_data_loader(self.file_infos, data_loader)

            end_time = datetime.now()
            elapsed = end_time - start_time

            # Tally the results and update the records. A successful result is a
            # (total_rows, inserted_rows, updated_rows) tuple; anything else is
            # treated as a failure.
            success_results = []
            failed_results = []

            for file_info, result in results:
                if isinstance(result, tuple) and len(result) == 3:
                    # Processed successfully
                    success_results.append((file_info, result))
                    self.update_record_for_file(file_info, success=True)

                    # Accumulate statistics
                    total_rows, inserted_rows, updated_rows = result
                    self.stats['total_rows'] += total_rows
                    self.stats['inserted_rows'] += inserted_rows
                    self.stats['updated_rows'] += updated_rows
                    self.stats['processed_files'] += 1
                else:
                    # Processing failed
                    failed_results.append((file_info, result))
                    self.update_record_for_file(file_info, success=False)
                    self.stats['failed_files'] += 1

            # Save the processing records
            self.save_processed_records()

            # Record the final row count and end time
            self.stats['table_final_count'] = self.db_manager.get_table_row_count(
                self.config.table_config.table_name
            )
            self.stats['end_time'] = datetime.now()

            # Print the final summary
            self.print_summary(elapsed)

            # Print the failed files, if any
            if failed_results:
                self.print_failed_files(failed_results)

        except KeyboardInterrupt:
            logger.warning("\n⚠️ Execution interrupted by user")
            # Try to save the records processed so far
            self.save_processed_records()

        except Exception as e:
            logger.error(f"\n❌ Error during execution: {e}")
            import traceback
            logger.error(traceback.format_exc())
            # Try to save the records processed so far
            self.save_processed_records()

        finally:
            # Clean up
            self.db_manager.close()
            logger.info("🏁 Done")

    def print_summary(self, elapsed):
        """Print the processing summary."""
        logger.info(f"\n{'=' * 60}")
        logger.info("🎉 Data processing complete!")
        logger.info(f"{'=' * 60}")
        logger.info(f"📁 Total files: {self.stats['total_files']}")
        logger.info(f"⏭️ Skipped files: {self.stats['skipped_files']} (already processed)")
        logger.info(f"✅ Successfully processed: {self.stats['processed_files']} files")
        logger.info(f"❌ Failed: {self.stats['failed_files']} files")
        logger.info(f"📊 Total rows processed: {self.stats['total_rows']:,}")
        logger.info(f"  📥 Rows inserted: {self.stats['inserted_rows']:,}")
        logger.info(f"  🔄 Rows updated: {self.stats['updated_rows']:,}")
        logger.info(f"⏱️ Total elapsed time: {elapsed}")

        if self.stats['processed_files'] > 0:
            avg_time_per_file = elapsed.total_seconds() / self.stats['processed_files']
            logger.info(f"📈 Average time per file: {avg_time_per_file:.2f} seconds")

        logger.info(f"📊 Initial table row count: {self.stats['table_initial_count']:,}")
        logger.info(f"📊 Final table row count: {self.stats['table_final_count']:,}")
        logger.info(f"📈 Net rows added: {self.stats['table_final_count'] - self.stats['table_initial_count']:,}")

        # Fetch the latest table statistics
        final_stats = self.db_manager.get_table_stats(self.config.table_config.table_name)
        if final_stats:
            logger.info("\n📈 Final table statistics:")
            for key, value in final_stats.items():
                logger.info(f"  {key}: {value}")

        # Check for duplicate unique keys
        final_duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name)
        if final_duplicates:
            logger.warning(f"\n⚠️ Remaining duplicate unique-key records: {len(final_duplicates)} groups")
            for dup in final_duplicates[:10]:  # show the first 10 duplicates
                logger.warning(f"  🔄 Duplicate: {dup}")
        else:
            logger.info("\n✅ No duplicate unique-key records")

        # Record-file statistics
        logger.info(f"\n💾 Total files recorded as processed: {self.record_manager.get_record_count()}")

    def print_failed_files(self, failed_results):
        """Print the list of failed files."""
        logger.warning(f"\n⚠️ Failed files ({len(failed_results)}):")
        for file_info, error in failed_results:
            logger.warning(f"  📄 {os.path.basename(file_info.file_path)}")
            if isinstance(error, Exception):
                logger.warning(f"    Error type: {type(error).__name__}")
                logger.warning(f"    Error message: {str(error)[:200]}" + ("..." if len(str(error)) > 200 else ""))
            else:
                logger.warning(f"    Error: {error}")


def main():
    """Program entry point."""
    # Build the configuration
    config = AppConfig(
        base_path=r"F:\BaiduNetdiskDownload\标准化数据\stander_parquet",
        max_workers=20,
        batch_size=10000,
        upsert_enabled=True,  # enable UPSERT
        db_config=DatabaseConfig(
            # host="192.168.50.234",
            # port=4000,
            host="106.120.102.238",
            port=44000,
            user="root",
            password="123456",
            database="wind_data"
        ),
        table_config=TableConfig(
            table_name="data_scada_turbine",
            # Three-column unique key
            unique_keys=["id_farm", "id_turbine", "data_time"],
            time_column_aliases=[
                "data_time", "time", "timestamp", "datetime", "采集时间",
                "时间", "记录时间", "数据时间", "Time", "Timestamp", "DATA_TIME"
            ]
        )
    )

    # Check that the base path exists
    if not os.path.exists(config.base_path):
        logger.error(f"❌ Error: path does not exist: {config.base_path}")
        logger.error("💡 Edit base_path in config.py to point to the correct location")
        sys.exit(1)

    # Run the processor
    processor = ParquetProcessor(config)
    processor.run()


if __name__ == "__main__":
    main()
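
# Note: processed file paths accumulate in record_processed.json. To force a
# full reprocess, delete that file or call ProcessedRecordManager().clear_records()
# before running.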