# Main program: scan parquet files and bulk-load them into the database.
import json
import logging
import os
import sys
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List, Set

# Make the project root importable when this file is run as a script.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from config import AppConfig, DatabaseConfig, TableConfig
from data_loader import DataLoader
from database import DatabaseManager
from file_scanner import FileScanner, ParquetFileInfo
from schema_reader import SchemaReader
from thread_pool import ThreadPoolManager
# Logging setup: every record goes both to a UTF-8 log file and to the console.
_handlers = [
    logging.FileHandler('parquet_processor_three_key.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=_handlers,
)

logger = logging.getLogger(__name__)
class ParquetProcessor:
    """Top-level orchestrator for the parquet -> database pipeline.

    The pipeline has four stages, each implemented as a private helper and
    called in order from :meth:`run`:

      1. scan the base path for parquet files (``_scan_files``),
      2. read all headers and identify the time column (``_resolve_schema``),
      3. create or inspect the target table with its three-column unique key
         (``_ensure_table``),
      4. load the data through a thread pool and log statistics
         (``_load_and_report``).
    """

    def __init__(self, config: AppConfig):
        """Wire up the scanner, database manager and thread pool from *config*."""
        self.config = config
        self.file_scanner = FileScanner(config.base_path)
        # Kept for interface compatibility with existing callers.
        self.file_infos: List[ParquetFileInfo] = []
        self.db_manager = DatabaseManager(config.db_config, config.table_config)
        # Thread pool persists the set of already-processed files to JSON so
        # that a rerun skips finished work.
        self.thread_pool = ThreadPoolManager(
            max_workers=self.config.max_workers,
            record_file="record_processed.json",
        )

    def run(self):
        """Run the whole pipeline; the DB connection is always closed on exit."""
        logger.info("=" * 60)
        logger.info("Parquet文件处理系统 (SQLAlchemy + 三字段唯一键UPSERT)")
        logger.info("=" * 60)
        try:
            all_file_infos = self._scan_files()
            if not all_file_infos:
                return  # nothing found, or everything already processed

            schema = self._resolve_schema(all_file_infos)
            if schema is None:
                return  # no time column -> the unique key cannot be built
            sql_columns, unique_keys = schema

            if not self._ensure_table(sql_columns, unique_keys):
                return  # table creation failed

            # Row count before loading, so the report can show the real delta.
            initial_count = self.db_manager.get_table_row_count(self.config.table_config.table_name)
            logger.info(f"📊 表初始行数: {initial_count:,}")

            self._load_and_report(all_file_infos, initial_count)

        except KeyboardInterrupt:
            logger.warning("\n⚠️ 用户中断程序执行")
        except Exception as e:
            logger.error(f"\n❌ 程序执行出错: {e}")
            logger.error(traceback.format_exc())
        finally:
            self.db_manager.close()
            logger.info("🏁 程序执行完成")

    def _scan_files(self):
        """Step 1: scan for parquet files.

        Returns the full file list, or ``None`` when there is nothing to do
        (no files found, or every file was already processed).
        """
        logger.info("\n步骤1: 扫描Parquet文件...")
        all_file_infos = self.file_scanner.scan_files()
        if not all_file_infos:
            logger.warning("⚠️ 未找到任何parquet文件,程序退出")
            return None

        logger.info(f"📁 扫描完成!共找到 {len(all_file_infos)} 个parquet文件")
        logger.info(f"📊 机型型号种类: {len(set(f.model_type for f in all_file_infos))}")
        logger.info(f"🌾 风场数量: {len(set(f.farm_id for f in all_file_infos))}")

        unprocessed_count = self.thread_pool.get_unprocessed_count(all_file_infos)
        logger.info(f"📝 未处理文件数量: {unprocessed_count}")
        logger.info(f"✅ 已处理文件数量: {self.thread_pool.get_record_count()}")
        if unprocessed_count == 0:
            logger.info("🎉 所有文件都已处理过,无需处理新文件")
            return None
        return all_file_infos

    def _resolve_schema(self, all_file_infos):
        """Step 2: read every file's header and identify the time column.

        Returns ``(sql_columns, unique_keys)`` or ``None`` when no time column
        could be identified.
        """
        logger.info("\n步骤2: 读取表头信息并识别时间字段...")
        # Headers are read from ALL files (processed and unprocessed) so that
        # reruns still see the complete column set.
        schema_reader = SchemaReader(all_file_infos, self.config.table_config.time_column_aliases)
        all_columns = schema_reader.read_all_headers()
        sql_columns = schema_reader.get_sql_columns()

        unique_keys = schema_reader.get_unique_key_columns(self.config.table_config.unique_keys)
        logger.info(f"🔑 确定三字段唯一键: {unique_keys}")

        if not schema_reader.identified_time_column:
            logger.error("❌ 未识别到时间字段,无法创建三字段唯一键!")
            logger.error("💡 请检查数据中是否包含时间字段,或配置正确的时间字段别名")
            return None

        # Show a sample of the discovered schema for operator sanity checks.
        if all_columns:
            logger.info(f"📋 字段数量: {len(all_columns)}")
            logger.info("📊 前20个字段:")
            for i, col in enumerate(sorted(all_columns)[:20]):
                logger.info(f" {i+1:2d}. {col}")
        return sql_columns, unique_keys

    def _ensure_table(self, sql_columns, unique_keys):
        """Step 3: create the table with its unique key, or inspect an existing one.

        Returns ``False`` only when creating a missing table fails.
        """
        table_name = self.config.table_config.table_name
        logger.info("\n步骤3: 创建数据库表(带三字段唯一键)...")
        if not self.db_manager.check_table_exists(table_name):
            success = self.db_manager.create_table_with_unique_key(
                table_name,
                sql_columns,
                unique_keys
            )
            if not success:
                logger.error("❌ 创建表失败,程序退出")
                return False
        else:
            logger.info(f"✅ 表 {table_name} 已存在")
            stats = self.db_manager.get_table_stats(table_name)
            if stats:
                logger.info(f"📊 当前表统计: {stats}")

            # Pre-existing duplicate keys would make UPSERT results ambiguous;
            # surface them before loading.
            duplicates = self.db_manager.check_duplicate_keys(table_name)
            if duplicates:
                logger.warning(f"⚠️ 发现重复的唯一键记录: {len(duplicates)} 组")
                for dup in duplicates[:5]:  # show only the first 5 duplicates
                    logger.warning(f" 🔄 重复: {dup}")
        return True

    def _load_and_report(self, all_file_infos, initial_count):
        """Step 4: load files through the thread pool, then log final statistics."""
        logger.info(f"\n步骤4: 使用线程池加载数据({self.config.max_workers}个线程,批量大小: {self.config.batch_size},模式: {'UPSERT' if self.config.upsert_enabled else 'INSERT'})...")
        data_loader = DataLoader(
            self.db_manager,
            self.config.table_config.table_name,
            self.config.batch_size,
            max_retries=3,
            upsert=self.config.upsert_enabled
        )

        start_time = datetime.now()
        logger.info(f"⏰ 开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")

        # The pool filters out already-processed files and records new successes.
        results = self.thread_pool.process_with_data_loader(all_file_infos, data_loader)
        elapsed = datetime.now() - start_time

        success_results, failed_results, totals = self._split_results(results)
        total_rows_processed, total_rows_inserted, total_rows_updated = totals

        logger.info(f"\n{'='*60}")
        logger.info("🎉 数据处理完成!")
        logger.info(f"{'='*60}")
        logger.info(f"📁 总扫描文件数: {len(all_file_infos)}")
        logger.info(f"⏭️ 跳过文件: {len(all_file_infos) - len(results)} (已处理过)")
        logger.info(f"✅ 本次成功处理: {len(success_results)} 个文件")
        logger.info(f"❌ 本次失败文件: {len(failed_results)} 个文件")
        logger.info(f"📊 总处理行数: {total_rows_processed:,}")
        logger.info(f" 📥 新插入行数: {total_rows_inserted:,}")
        logger.info(f" 🔄 更新行数: {total_rows_updated:,}")
        logger.info(f"⏱️ 总耗时: {elapsed}")

        if len(success_results) > 0:
            avg_time_per_file = elapsed.total_seconds() / len(success_results)
            logger.info(f"📈 平均每个文件处理时间: {avg_time_per_file:.2f} 秒")

        self._report_table_state(initial_count)
        self._report_failures(failed_results)

        logger.info(f"\n💾 总记录处理文件数: {self.thread_pool.get_record_count()}")

    @staticmethod
    def _split_results(results):
        """Partition worker results into successes and failures and sum row counts.

        A success is a ``(total, inserted, updated)`` triple; any other result
        value (typically an exception) is treated as a failure.
        """
        success_results = []
        failed_results = []
        total_rows_processed = 0
        total_rows_inserted = 0
        total_rows_updated = 0
        for file_info, result in results:
            if isinstance(result, tuple) and len(result) == 3:
                success_results.append((file_info, result))
                total_rows, inserted_rows, updated_rows = result
                total_rows_processed += total_rows
                total_rows_inserted += inserted_rows
                total_rows_updated += updated_rows
            else:
                failed_results.append((file_info, result))
        return success_results, failed_results, (
            total_rows_processed, total_rows_inserted, total_rows_updated)

    def _report_table_state(self, initial_count):
        """Log the final row count, table statistics and any duplicate unique keys."""
        table_name = self.config.table_config.table_name
        final_count = self.db_manager.get_table_row_count(table_name)
        logger.info(f"📊 表初始行数: {initial_count:,}")
        logger.info(f"📊 表最终行数: {final_count:,}")
        logger.info(f"📈 实际新增行数: {final_count - initial_count:,}")

        final_stats = self.db_manager.get_table_stats(table_name)
        if final_stats:
            logger.info("\n📈 表最终统计:")
            for key, value in final_stats.items():
                logger.info(f" {key}: {value}")

        final_duplicates = self.db_manager.check_duplicate_keys(table_name)
        if final_duplicates:
            logger.warning(f"\n⚠️ 最终重复的唯一键记录: {len(final_duplicates)} 组")
            for dup in final_duplicates[:10]:  # show only the first 10 duplicates
                logger.warning(f" 🔄 重复: {dup}")
        else:
            logger.info("\n✅ 无重复的唯一键记录")

    @staticmethod
    def _report_failures(failed_results):
        """Log each failed file with its error type and a truncated message."""
        if not failed_results:
            return
        logger.warning(f"\n⚠️ 失败文件列表 ({len(failed_results)} 个):")
        for file_info, error in failed_results:
            logger.warning(f" 📄 {os.path.basename(file_info.file_path)}")
            if isinstance(error, Exception):
                logger.warning(f" 错误类型: {type(error).__name__}")
                logger.warning(f" 错误信息: {str(error)[:200]}" + ("..." if len(str(error)) > 200 else ""))
            else:
                logger.warning(f" 错误: {error}")
def main():
    """Build the application configuration, validate the data path, and run."""
    # NOTE(review): database credentials are hard-coded below; consider loading
    # them from environment variables or a secrets store before deployment.
    config = AppConfig(
        # base_path=r"F:\BaiduNetdiskDownload\标准化数据\stander_parquet",
        base_path="./data/stander_parquet",  # plain string (was an f-string with no placeholders)
        max_workers=10,
        batch_size=10000,
        upsert_enabled=True,  # enable UPSERT (insert-or-update on the unique key)
        db_config=DatabaseConfig(
            host="192.168.50.234",
            port=24001,
            user="root",
            password="admin123456",
            database="wind_data"
        ),
        table_config=TableConfig(
            table_name="data_scada_turbine",
            # three-column unique key
            unique_keys=["id_farm", "id_turbine", "data_time"],
            time_column_aliases=[
                "data_time", "time", "timestamp", "datetime", "采集时间",
                "时间", "记录时间", "数据时间", "Time", "Timestamp", "DATA_TIME"
            ]
        )
    )

    # Fail fast with a clear operator message when the data directory is missing.
    if not os.path.exists(config.base_path):
        logger.error(f"❌ 错误: 路径不存在: {config.base_path}")
        logger.error("💡 请修改config.py中的base_path为正确的路径")
        sys.exit(1)

    # Run the processor over the configured path.
    processor = ParquetProcessor(config)
    processor.run()


if __name__ == "__main__":
    main()
|