main.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. # 主程序
  2. import os
  3. import sys
  4. import time
  5. import json
  6. from datetime import datetime
  7. from typing import List, Set, Dict, Any
  8. import logging
  9. # 添加项目根目录到Python路径
  10. sys.path.append(os.path.dirname(os.path.abspath(__file__)))
  11. from config import AppConfig, DatabaseConfig, TableConfig
  12. from file_scanner import FileScanner, ParquetFileInfo
  13. from schema_reader import SchemaReader
  14. from database import DatabaseManager
  15. from data_loader import DataLoader
  16. from thread_pool import ThreadPoolManager
  17. # 配置日志
  18. logging.basicConfig(
  19. level=logging.INFO,
  20. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  21. handlers=[
  22. logging.FileHandler('parquet_processor_three_key.log', encoding='utf-8'),
  23. logging.StreamHandler()
  24. ]
  25. )
  26. logger = logging.getLogger(__name__)
  27. class ParquetProcessor:
  28. """Parquet文件处理器主类"""
  29. def __init__(self, config: AppConfig):
  30. self.config = config
  31. self.file_scanner = FileScanner(config.base_path)
  32. self.file_infos: List[ParquetFileInfo] = []
  33. self.db_manager = DatabaseManager(config.db_config, config.table_config)
  34. # 初始化线程池管理器(带记录管理功能)
  35. self.thread_pool = ThreadPoolManager(
  36. max_workers=self.config.max_workers,
  37. record_file="record_processed.json"
  38. )
  39. def run(self):
  40. """运行整个处理流程"""
  41. logger.info("=" * 60)
  42. logger.info("Parquet文件处理系统 (SQLAlchemy + 三字段唯一键UPSERT)")
  43. logger.info("=" * 60)
  44. try:
  45. # 步骤1: 扫描文件
  46. logger.info("\n步骤1: 扫描Parquet文件...")
  47. all_file_infos = self.file_scanner.scan_files()
  48. if not all_file_infos:
  49. logger.warning("⚠️ 未找到任何parquet文件,程序退出")
  50. return
  51. logger.info(f"📁 扫描完成!共找到 {len(all_file_infos)} 个parquet文件")
  52. logger.info(f"📊 机型型号种类: {len(set(f.model_type for f in all_file_infos))}")
  53. logger.info(f"🌾 风场数量: {len(set(f.farm_id for f in all_file_infos))}")
  54. # 检查有多少文件未处理
  55. unprocessed_count = self.thread_pool.get_unprocessed_count(all_file_infos)
  56. logger.info(f"📝 未处理文件数量: {unprocessed_count}")
  57. logger.info(f"✅ 已处理文件数量: {self.thread_pool.get_record_count()}")
  58. if unprocessed_count == 0:
  59. logger.info("🎉 所有文件都已处理过,无需处理新文件")
  60. return
  61. # 步骤2: 读取表头并识别时间字段
  62. logger.info("\n步骤2: 读取表头信息并识别时间字段...")
  63. # 使用所有文件读取表头(包括已处理的和未处理的)
  64. # 这样即使之前已处理过部分文件,也能正确读取表结构
  65. schema_reader = SchemaReader(all_file_infos, self.config.table_config.time_column_aliases)
  66. all_columns = schema_reader.read_all_headers()
  67. sql_columns = schema_reader.get_sql_columns()
  68. unique_keys = schema_reader.get_unique_key_columns(self.config.table_config.unique_keys)
  69. logger.info(f"🔑 确定三字段唯一键: {unique_keys}")
  70. if not schema_reader.identified_time_column:
  71. logger.error("❌ 未识别到时间字段,无法创建三字段唯一键!")
  72. logger.error("💡 请检查数据中是否包含时间字段,或配置正确的时间字段别名")
  73. return
  74. # 显示部分字段信息
  75. if all_columns:
  76. logger.info(f"📋 字段数量: {len(all_columns)}")
  77. logger.info("📊 前20个字段:")
  78. for i, col in enumerate(sorted(list(all_columns))[:20]):
  79. logger.info(f" {i+1:2d}. {col}")
  80. # 步骤3: 创建数据库表(带三字段唯一键)
  81. logger.info("\n步骤3: 创建数据库表(带三字段唯一键)...")
  82. if not self.db_manager.check_table_exists(self.config.table_config.table_name):
  83. success = self.db_manager.create_table_with_unique_key(
  84. self.config.table_config.table_name,
  85. sql_columns,
  86. unique_keys
  87. )
  88. if not success:
  89. logger.error("❌ 创建表失败,程序退出")
  90. return
  91. else:
  92. logger.info(f"✅ 表 {self.config.table_config.table_name} 已存在")
  93. # 检查表结构
  94. stats = self.db_manager.get_table_stats(self.config.table_config.table_name)
  95. if stats:
  96. logger.info(f"📊 当前表统计: {stats}")
  97. # 检查重复键
  98. duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name)
  99. if duplicates:
  100. logger.warning(f"⚠️ 发现重复的唯一键记录: {len(duplicates)} 组")
  101. for dup in duplicates[:5]: # 显示前5个重复
  102. logger.warning(f" 🔄 重复: {dup}")
  103. # 检查表初始行数
  104. initial_count = self.db_manager.get_table_row_count(self.config.table_config.table_name)
  105. logger.info(f"📊 表初始行数: {initial_count:,}")
  106. # 步骤4: 使用线程池加载数据
  107. logger.info(f"\n步骤4: 使用线程池加载数据({self.config.max_workers}个线程,批量大小: {self.config.batch_size},模式: {'UPSERT' if self.config.upsert_enabled else 'INSERT'})...")
  108. data_loader = DataLoader(
  109. self.db_manager,
  110. self.config.table_config.table_name,
  111. self.config.batch_size,
  112. max_retries=3,
  113. upsert=self.config.upsert_enabled
  114. )
  115. start_time = datetime.now()
  116. logger.info(f"⏰ 开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
  117. # 处理文件(线程池会自动过滤已处理的文件并记录成功处理的文件)
  118. results = self.thread_pool.process_with_data_loader(all_file_infos, data_loader)
  119. end_time = datetime.now()
  120. elapsed = end_time - start_time
  121. # 统计结果
  122. success_results = []
  123. failed_results = []
  124. total_rows_processed = 0
  125. total_rows_inserted = 0
  126. total_rows_updated = 0
  127. for file_info, result in results:
  128. if isinstance(result, tuple) and len(result) == 3:
  129. # 成功处理
  130. success_results.append((file_info, result))
  131. # 累加统计
  132. total_rows, inserted_rows, updated_rows = result
  133. total_rows_processed += total_rows
  134. total_rows_inserted += inserted_rows
  135. total_rows_updated += updated_rows
  136. else:
  137. # 处理失败
  138. failed_results.append((file_info, result))
  139. # 打印最终统计
  140. logger.info(f"\n{'='*60}")
  141. logger.info("🎉 数据处理完成!")
  142. logger.info(f"{'='*60}")
  143. logger.info(f"📁 总扫描文件数: {len(all_file_infos)}")
  144. logger.info(f"⏭️ 跳过文件: {len(all_file_infos) - len(results)} (已处理过)")
  145. logger.info(f"✅ 本次成功处理: {len(success_results)} 个文件")
  146. logger.info(f"❌ 本次失败文件: {len(failed_results)} 个文件")
  147. logger.info(f"📊 总处理行数: {total_rows_processed:,}")
  148. logger.info(f" 📥 新插入行数: {total_rows_inserted:,}")
  149. logger.info(f" 🔄 更新行数: {total_rows_updated:,}")
  150. logger.info(f"⏱️ 总耗时: {elapsed}")
  151. if len(success_results) > 0:
  152. avg_time_per_file = elapsed.total_seconds() / len(success_results)
  153. logger.info(f"📈 平均每个文件处理时间: {avg_time_per_file:.2f} 秒")
  154. # 检查最终行数和统计
  155. final_count = self.db_manager.get_table_row_count(self.config.table_config.table_name)
  156. logger.info(f"📊 表初始行数: {initial_count:,}")
  157. logger.info(f"📊 表最终行数: {final_count:,}")
  158. logger.info(f"📈 实际新增行数: {final_count - initial_count:,}")
  159. # 获取最新统计
  160. final_stats = self.db_manager.get_table_stats(self.config.table_config.table_name)
  161. if final_stats:
  162. logger.info("\n📈 表最终统计:")
  163. for key, value in final_stats.items():
  164. logger.info(f" {key}: {value}")
  165. # 检查重复键
  166. final_duplicates = self.db_manager.check_duplicate_keys(self.config.table_config.table_name)
  167. if final_duplicates:
  168. logger.warning(f"\n⚠️ 最终重复的唯一键记录: {len(final_duplicates)} 组")
  169. for dup in final_duplicates[:10]: # 显示前10个重复
  170. logger.warning(f" 🔄 重复: {dup}")
  171. else:
  172. logger.info("\n✅ 无重复的唯一键记录")
  173. # 打印失败文件
  174. if failed_results:
  175. logger.warning(f"\n⚠️ 失败文件列表 ({len(failed_results)} 个):")
  176. for file_info, error in failed_results:
  177. logger.warning(f" 📄 {os.path.basename(file_info.file_path)}")
  178. if isinstance(error, Exception):
  179. logger.warning(f" 错误类型: {type(error).__name__}")
  180. logger.warning(f" 错误信息: {str(error)[:200]}" + ("..." if len(str(error)) > 200 else ""))
  181. else:
  182. logger.warning(f" 错误: {error}")
  183. # 记录文件总数
  184. logger.info(f"\n💾 总记录处理文件数: {self.thread_pool.get_record_count()}")
  185. except KeyboardInterrupt:
  186. logger.warning("\n⚠️ 用户中断程序执行")
  187. except Exception as e:
  188. logger.error(f"\n❌ 程序执行出错: {e}")
  189. import traceback
  190. logger.error(traceback.format_exc())
  191. finally:
  192. # 清理
  193. self.db_manager.close()
  194. logger.info("🏁 程序执行完成")
  195. def main():
  196. """主函数"""
  197. # 创建配置
  198. config = AppConfig(
  199. # base_path=r"F:\BaiduNetdiskDownload\标准化数据\stander_parquet",
  200. base_path=f"./data/标准化数据/stander_parquet",
  201. max_workers=10,
  202. batch_size=10000,
  203. upsert_enabled=True, # 启用UPSERT
  204. db_config=DatabaseConfig(
  205. host="192.168.50.234",
  206. port=4000,
  207. user="root",
  208. password="123456",
  209. database="wind_data"
  210. ),
  211. table_config=TableConfig(
  212. table_name="data_scada_turbine",
  213. # 三字段唯一键
  214. unique_keys=["id_farm", "id_turbine", "data_time"],
  215. time_column_aliases=[
  216. "data_time", "time", "timestamp", "datetime", "采集时间",
  217. "时间", "记录时间", "数据时间", "Time", "Timestamp", "DATA_TIME"
  218. ]
  219. )
  220. )
  221. # 检查路径是否存在
  222. if not os.path.exists(config.base_path):
  223. logger.error(f"❌ 错误: 路径不存在: {config.base_path}")
  224. logger.error("💡 请修改config.py中的base_path为正确的路径")
  225. sys.exit(1)
  226. # 运行处理器
  227. processor = ParquetProcessor(config)
  228. processor.run()
  229. if __name__ == "__main__":
  230. main()