# main.py - main program
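"""Entry point for the parquet ingestion pipeline.

Summary of the steps performed by ParquetProcessor.run() below:
  1. scan base_path for parquet files,
  2. read headers and identify the time column,
  3. create the target table with a three-column unique key,
  4. load the data through a thread pool, using UPSERT when enabled.
record_processed.json keeps track of files that have already been loaded,
so re-running the script skips them.
"""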
import os
import sys
import time
import json
from datetime import datetime
from typing import List, Set, Dict, Any
import logging

# Add the project root directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from config import AppConfig, DatabaseConfig, TableConfig
from file_scanner import FileScanner, ParquetFileInfo
from schema_reader import SchemaReader
from database import DatabaseManager
from data_loader import DataLoader
from thread_pool import ThreadPoolManager

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('parquet_processor_three_key.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class ProcessedRecordManager:
    """Manager for the record of already-processed files"""

    def __init__(self, record_file: str = "record_processed.json"):
        """
        Initialize the record manager

        Args:
            record_file: path of the record file
        """
        self.record_file = record_file
        self.processed_files: Set[str] = set()
        self._load_records()

    def _load_records(self):
        """Load the record of already-processed files"""
        try:
            if os.path.exists(self.record_file):
                with open(self.record_file, 'r', encoding='utf-8') as f:
                    records = json.load(f)
                if isinstance(records, list):
                    self.processed_files = set(records)
                logger.info(f"📁 Loaded {len(self.processed_files)} processed-file records")
            else:
                logger.info(f"📁 Record file does not exist, a new one will be created: {self.record_file}")
        except Exception as e:
            logger.warning(f"❌ Failed to load record file: {e}")
            self.processed_files = set()

    def is_processed(self, file_path: str) -> bool:
        """Check whether a file has already been processed"""
        return file_path in self.processed_files

    def add_record(self, file_path: str):
        """Add a processing record"""
        if file_path not in self.processed_files:
            self.processed_files.add(file_path)

    def save_records(self):
        """Save the records to the record file"""
        try:
            # Convert to a sorted list for readability
            records_list = sorted(list(self.processed_files))
            with open(self.record_file, 'w', encoding='utf-8') as f:
                json.dump(records_list, f, ensure_ascii=False, indent=2)
            logger.info(f"💾 Saved {len(records_list)} processing records to {self.record_file}")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to save record file: {e}")
            return False

    def get_record_count(self) -> int:
        """Return the number of records"""
        return len(self.processed_files)

    def clear_records(self):
        """Clear all records"""
        self.processed_files.clear()
        try:
            if os.path.exists(self.record_file):
                os.remove(self.record_file)
                logger.info(f"🗑️ Deleted record file: {self.record_file}")
        except Exception as e:
            logger.warning(f"Failed to delete record file: {e}")
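
# Illustrative only: save_records() writes record_processed.json as a sorted JSON
# array of file paths; the paths below are hypothetical examples.
# [
#   "F:\\example\\farm_a\\turbine_01.parquet",
#   "F:\\example\\farm_a\\turbine_02.parquet"
# ]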


class ParquetProcessor:
    """Main Parquet file processor"""

    def __init__(self, config: AppConfig):
        self.config = config
        self.file_scanner = FileScanner(config.base_path)
        self.file_infos: List[ParquetFileInfo] = []
        self.db_manager = DatabaseManager(config.db_config, config.table_config)
        # Initialize the processed-file record manager
        self.record_manager = ProcessedRecordManager()
        # Statistics
        self.stats = {
            'start_time': None,
            'end_time': None,
            'total_files': 0,
            'processed_files': 0,
            'skipped_files': 0,
            'failed_files': 0,
            'total_rows': 0,
            'inserted_rows': 0,
            'updated_rows': 0,
            'table_initial_count': 0,
            'table_final_count': 0
        }
    def filter_unprocessed_files(self, file_infos: List[ParquetFileInfo]) -> List[ParquetFileInfo]:
        """
        Filter out files that have already been processed

        Args:
            file_infos: information for all scanned files

        Returns:
            List[ParquetFileInfo]: information for the files not yet processed
        """
        unprocessed_files = []
        skipped_count = 0
        for file_info in file_infos:
            if self.record_manager.is_processed(file_info.file_path):
                skipped_count += 1
                logger.debug(f"⏭️ Skipping already-processed file: {os.path.basename(file_info.file_path)}")
            else:
                unprocessed_files.append(file_info)
        if skipped_count > 0:
            logger.info(f"⏭️ Skipped {skipped_count} already-processed files")
        logger.info(f"📝 {len(unprocessed_files)} new files to process")
        return unprocessed_files

    def update_record_for_file(self, file_info: ParquetFileInfo, success: bool = True):
        """
        Update the processing record for a file

        Args:
            file_info: file information
            success: whether processing succeeded
        """
        if success:
            self.record_manager.add_record(file_info.file_path)
            logger.info(f"✅ Recorded successfully processed file: {os.path.basename(file_info.file_path)}")
        else:
            logger.warning(f"❌ File processing failed, not recorded: {os.path.basename(file_info.file_path)}")

    def save_processed_records(self):
        """Save the processing records to the record file"""
        try:
            if self.record_manager.save_records():
                logger.info(f"💾 Processing records saved, {self.record_manager.get_record_count()} files in total")
            else:
                logger.warning("⚠️ Failed to save processing records")
        except Exception as e:
            logger.error(f"❌ Error while saving processing records: {e}")
    def run(self):
        """Run the full processing pipeline"""
        logger.info("=" * 60)
        logger.info("Parquet file processing system (SQLAlchemy + three-key UPSERT)")
        logger.info("=" * 60)
        try:
            # Record the start time
            self.stats['start_time'] = datetime.now()

            # Step 1: scan for files
            logger.info("\nStep 1: scanning for Parquet files...")
            all_file_infos = self.file_scanner.scan_files()

            # Print a summary of the scan results
            if all_file_infos:
                logger.info(f"📁 Scan complete! Found {len(all_file_infos)} parquet files")
                logger.info(f"📊 Distinct turbine model types: {len(set(f.model_type for f in all_file_infos))}")
                logger.info(f"🌾 Number of wind farms: {len(set(f.farm_id for f in all_file_infos))}")
            else:
                logger.warning("⚠️ No parquet files found, exiting")
                return

            # Filter out files that have already been processed
            self.file_infos = self.filter_unprocessed_files(all_file_infos)
            self.stats['skipped_files'] = len(all_file_infos) - len(self.file_infos)
            self.stats['total_files'] = len(all_file_infos)
            if not self.file_infos:
                logger.info("🎉 All files have already been processed; no new files to process")
                logger.info(f"📊 Total processed files on record: {self.record_manager.get_record_count()}")
                return
            # Step 2: read headers and identify the time column
            logger.info("\nStep 2: reading header information and identifying the time column...")
            schema_reader = SchemaReader(self.file_infos, self.config.table_config.time_column_aliases)
            all_columns = schema_reader.read_all_headers()
            sql_columns = schema_reader.get_sql_columns()

            # Determine the unique key
            unique_keys = schema_reader.get_unique_key_columns(self.config.table_config.unique_keys)
            logger.info(f"🔑 Three-column unique key: {unique_keys}")
            if not schema_reader.identified_time_column:
                logger.error("❌ No time column identified; the three-column unique key cannot be created!")
                logger.error("💡 Check that the data contains a time column, or configure the correct time-column aliases")
                return

            # Show some of the column information
            if all_columns:
                logger.info(f"📋 Number of columns: {len(all_columns)}")
                logger.info("📊 First 20 columns:")
                for i, col in enumerate(sorted(list(all_columns))[:20]):
                    logger.info(f" {i+1:2d}. {col}")

            # Step 3: create the database table (with the three-column unique key)
            logger.info("\nStep 3: creating the database table (with three-column unique key)...")
            if not self.db_manager.check_table_exists(self.config.table_config.table_name):
                success = self.db_manager.create_table_with_unique_key(
                    self.config.table_config.table_name,
                    sql_columns,
                    unique_keys
                )
                if not success:
                    logger.error("❌ Failed to create the table, exiting")
                    return
            else:
                logger.info(f"✅ Table {self.config.table_config.table_name} already exists")
                # Check the table structure
                stats = self.db_manager.get_table_stats(self.config.table_config.table_name)
                if stats:
                    logger.info(f"📊 Current table statistics: {stats}")
                # Check for duplicate keys
                duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name)
                if duplicates:
                    logger.warning(f"⚠️ Found duplicate unique-key records: {len(duplicates)} groups")
                    for dup in duplicates[:5]:  # show the first 5 duplicates
                        logger.warning(f" 🔄 Duplicate: {dup}")

            # Check the table's initial row count
            self.stats['table_initial_count'] = self.db_manager.get_table_row_count(
                self.config.table_config.table_name
            )
            logger.info(f"📊 Initial table row count: {self.stats['table_initial_count']:,}")
            # Step 4: load the data with a thread pool
            logger.info(f"\nStep 4: loading data with a thread pool ({self.config.max_workers} threads, batch size: {self.config.batch_size}, mode: {'UPSERT' if self.config.upsert_enabled else 'INSERT'})...")
            data_loader = DataLoader(
                self.db_manager,
                self.config.table_config.table_name,
                self.config.batch_size,
                max_retries=3,
                upsert=self.config.upsert_enabled
            )
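            # Illustrative only: DataLoader's UPSERT statement is built elsewhere (data_loader.py /
            # database.py, not shown here). Against a MySQL-compatible backend, a three-key upsert
            # would take roughly this shape:
            #   INSERT INTO data_scada_turbine (id_farm, id_turbine, data_time, ...)
            #   VALUES (...)
            #   ON DUPLICATE KEY UPDATE <non-key column> = VALUES(<non-key column>), ...;
            # Rows whose (id_farm, id_turbine, data_time) already exist are updated rather than
            # duplicated, which is why inserted_rows and updated_rows are tallied separately below.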
            thread_pool = ThreadPoolManager(max_workers=self.config.max_workers)
            start_time = datetime.now()
            logger.info(f"⏰ Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")

            # Process the files
            results = thread_pool.process_with_data_loader(self.file_infos, data_loader)
            end_time = datetime.now()
            elapsed = end_time - start_time

            # Tally the results and update the records
            success_results = []
            failed_results = []
            for file_info, result in results:
                if isinstance(result, tuple) and len(result) == 3:
                    # Processed successfully
                    success_results.append((file_info, result))
                    self.update_record_for_file(file_info, success=True)
                    # Accumulate statistics
                    total_rows, inserted_rows, updated_rows = result
                    self.stats['total_rows'] += total_rows
                    self.stats['inserted_rows'] += inserted_rows
                    self.stats['updated_rows'] += updated_rows
                    self.stats['processed_files'] += 1
                else:
                    # Processing failed
                    failed_results.append((file_info, result))
                    self.update_record_for_file(file_info, success=False)
                    self.stats['failed_files'] += 1

            # Save the processing records
            self.save_processed_records()

            # Check the final row count and statistics
            self.stats['table_final_count'] = self.db_manager.get_table_row_count(
                self.config.table_config.table_name
            )
            self.stats['end_time'] = datetime.now()

            # Print the final summary
            self.print_summary(elapsed)

            # Print failed files
            if failed_results:
                self.print_failed_files(failed_results)
        except KeyboardInterrupt:
            logger.warning("\n⚠️ Execution interrupted by user")
            # Try to save the records processed so far
            self.save_processed_records()
        except Exception as e:
            logger.error(f"\n❌ Error during execution: {e}")
            import traceback
            logger.error(traceback.format_exc())
            # Try to save the records processed so far
            self.save_processed_records()
        finally:
            # Clean up
            self.db_manager.close()
            logger.info("🏁 Program finished")
    def print_summary(self, elapsed):
        """Print the processing summary"""
        logger.info(f"\n{'='*60}")
        logger.info("🎉 Data processing complete!")
        logger.info(f"{'='*60}")
        logger.info(f"📁 Total files: {self.stats['total_files']}")
        logger.info(f"⏭️ Skipped files: {self.stats['skipped_files']} (already processed)")
        logger.info(f"✅ Successfully processed: {self.stats['processed_files']} files")
        logger.info(f"❌ Failed: {self.stats['failed_files']} files")
        logger.info(f"📊 Total rows processed: {self.stats['total_rows']:,}")
        logger.info(f" 📥 Newly inserted rows: {self.stats['inserted_rows']:,}")
        logger.info(f" 🔄 Updated rows: {self.stats['updated_rows']:,}")
        logger.info(f"⏱️ Total elapsed time: {elapsed}")
        if self.stats['processed_files'] > 0:
            avg_time_per_file = elapsed.total_seconds() / self.stats['processed_files']
            logger.info(f"📈 Average time per file: {avg_time_per_file:.2f} seconds")
        logger.info(f"📊 Initial table row count: {self.stats['table_initial_count']:,}")
        logger.info(f"📊 Final table row count: {self.stats['table_final_count']:,}")
        logger.info(f"📈 Net rows added: {self.stats['table_final_count'] - self.stats['table_initial_count']:,}")

        # Fetch the latest table statistics
        final_stats = self.db_manager.get_table_stats(self.config.table_config.table_name)
        if final_stats:
            logger.info("\n📈 Final table statistics:")
            for key, value in final_stats.items():
                logger.info(f" {key}: {value}")

        # Check for duplicate keys
        final_duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name)
        if final_duplicates:
            logger.warning(f"\n⚠️ Remaining duplicate unique-key records: {len(final_duplicates)} groups")
            for dup in final_duplicates[:10]:  # show the first 10 duplicates
                logger.warning(f" 🔄 Duplicate: {dup}")
        else:
            logger.info("\n✅ No duplicate unique-key records")

        # Record-file statistics
        logger.info(f"\n💾 Total files recorded as processed: {self.record_manager.get_record_count()}")
    def print_failed_files(self, failed_results):
        """Print the list of failed files"""
        logger.warning(f"\n⚠️ Failed files ({len(failed_results)}):")
        for file_info, error in failed_results:
            logger.warning(f" 📄 {os.path.basename(file_info.file_path)}")
            if isinstance(error, Exception):
                logger.warning(f" Error type: {type(error).__name__}")
                logger.warning(f" Error message: {str(error)[:200]}" + ("..." if len(str(error)) > 200 else ""))
            else:
                logger.warning(f" Error: {error}")


def main():
    """Main function"""
    # Create the configuration
    config = AppConfig(
        base_path=r"F:\BaiduNetdiskDownload\标准化数据\stander_parquet",
        max_workers=20,
        batch_size=10000,
        upsert_enabled=True,  # enable UPSERT
        db_config=DatabaseConfig(
            # host="192.168.50.234",
            # port=4000,
            host="106.120.102.238",
            port=44000,
            user="root",
            password="123456",
            database="wind_data"
        ),
        table_config=TableConfig(
            table_name="data_scada_turbine",
            # Three-column unique key
            unique_keys=["id_farm", "id_turbine", "data_time"],
            time_column_aliases=[
                "data_time", "time", "timestamp", "datetime", "采集时间",
                "时间", "记录时间", "数据时间", "Time", "Timestamp", "DATA_TIME"
            ]
        )
    )

    # Check that the path exists
    if not os.path.exists(config.base_path):
        logger.error(f"❌ Error: path does not exist: {config.base_path}")
        logger.error("💡 Set base_path in config.py to the correct path")
        sys.exit(1)

    # Run the processor
    processor = ParquetProcessor(config)
    processor.run()


if __name__ == "__main__":
    main()