database.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031
  1. import pandas as pd
  2. import numpy as np
  3. from typing import List, Dict, Any, Optional, Tuple
  4. import traceback
  5. import logging
  6. from datetime import datetime as dt
  7. import hashlib
  8. import pymysql
  9. from pymysql import Connection, cursors
  10. from dbutils.pooled_db import PooledDB
  11. import json
  12. import time
  13. import re
  14. import threading
  15. from threading import Thread, Event
  16. import signal
  17. import sys
  18. import os
  19. from config import DatabaseConfig, TableConfig
  20. from file_scanner import ParquetFileInfo
  21. # 配置日志
# Configure module-wide logging: INFO level, timestamped single-line format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Database manager backed by a connection pool.

    Provides connectivity testing, a background health-monitor thread and
    automatic reconnection after repeated failures.
    """

    # Thread-local storage: one pooled connection cached per worker thread.
    _thread_local = threading.local()

    def __init__(self, config: DatabaseConfig, table_config: TableConfig, pool_size: int = 6):
        """Create the pool, start the health monitor and register signal handlers.

        Args:
            config: Database connection parameters (host, port, user, ...).
            table_config: Table-related configuration (stored for later use).
            pool_size: Maximum number of pooled connections.
        """
        self.config = config
        self.table_config = table_config
        self.pool_size = pool_size
        # --- connection health-monitoring state ---
        self._connection_monitor_thread = None
        self._monitor_running = Event()  # set() requests the monitor loop to stop
        self._last_connection_check = None
        self._connection_status = "UNKNOWN"  # UNKNOWN, HEALTHY, WARNING, ERROR, DISCONNECTED
        self._connection_error_count = 0
        self._max_error_count = 5  # consecutive failures before DISCONNECTED
        self._monitor_interval = 1  # monitor check interval (seconds)
        self._monitor_stats = {
            'total_checks': 0,
            'successful_checks': 0,
            'failed_checks': 0,
            'total_reconnections': 0,
            'last_error': None,
            'last_success': None
        }
        # Connection pool (created just below).
        self.pool: Optional[PooledDB] = None
        self._init_pool()
        # Start the background connection monitor thread.
        self._start_connection_monitor()
        # Register signal handlers for graceful shutdown.
        self._setup_signal_handlers()
  60. def _setup_signal_handlers(self):
  61. """设置信号处理,确保程序退出时能正确关闭资源"""
  62. def signal_handler(signum, frame):
  63. logger.info(f"接收到信号 {signum},正在关闭数据库连接池...")
  64. self.close()
  65. sys.exit(0)
  66. signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
  67. signal.signal(signal.SIGTERM, signal_handler) # 终止信号
    def _init_pool(self):
        """Create the PooledDB connection pool from ``self.config``.

        On success, probes the pool once and marks the manager HEALTHY.

        Raises:
            Exception: re-raised after logging when pool creation fails.
        """
        try:
            logger.info(f"正在初始化数据库连接池到 {self.config.host}:{self.config.port}/{self.config.database}")
            self.pool = PooledDB(
                creator=pymysql,
                maxconnections=self.pool_size,
                mincached=2,  # idle connections opened at startup
                maxcached=5,  # max idle connections kept in the pool
                maxshared=3,
                blocking=True,  # wait instead of failing when the pool is exhausted
                maxusage=None,  # unlimited reuse per connection
                setsession=[],
                ping=1,  # ping the server whenever a connection is requested
                host=self.config.host,
                port=self.config.port,
                user=self.config.user,
                password=self.config.password,
                database=self.config.database,
                charset=self.config.charset,
                cursorclass=cursors.DictCursor,  # NOTE: all fetches return dicts
                autocommit=False,
                connect_timeout=30,
                read_timeout=600,
                write_timeout=600,
                client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS
            )
            logger.info(f"数据库连接池初始化成功")
            # Probe the pool once; mark the manager HEALTHY on success.
            test_result = self._test_pool_connection()
            if test_result:
                self._connection_status = "HEALTHY"
                self._monitor_stats['last_success'] = dt.now()
        except Exception as e:
            logger.error(f"数据库连接池初始化失败: {e}")
            self._connection_status = "ERROR"
            self._monitor_stats['last_error'] = str(e)
            raise
  106. def _test_pool_connection(self):
  107. """测试连接池连接"""
  108. try:
  109. conn = self.pool.connection()
  110. cursor = conn.cursor()
  111. start_time = time.time()
  112. cursor.execute("SELECT 1 as test, NOW() as server_time")
  113. result = cursor.fetchone()
  114. elapsed = time.time() - start_time
  115. cursor.close()
  116. conn.close()
  117. logger.info(f"连接池测试成功: 响应时间={elapsed:.3f}s, 服务器时间={result['server_time']}")
  118. return True
  119. except Exception as e:
  120. logger.error(f"连接池测试失败: {e}")
  121. return False
  122. def _start_connection_monitor(self):
  123. """启动连接监控线程"""
  124. if self._connection_monitor_thread is None or not self._connection_monitor_thread.is_alive():
  125. self._monitor_running.clear()
  126. self._connection_monitor_thread = Thread(
  127. target=self._connection_monitor_loop,
  128. name="DBConnectionMonitor",
  129. daemon=True
  130. )
  131. self._connection_monitor_thread.start()
  132. logger.info("数据库连接监控线程已启动")
  133. def _stop_connection_monitor(self):
  134. """停止连接监控线程"""
  135. if self._connection_monitor_thread and self._connection_monitor_thread.is_alive():
  136. self._monitor_running.set()
  137. self._connection_monitor_thread.join(timeout=5)
  138. logger.info("数据库连接监控线程已停止")
  139. def _connection_monitor_loop(self):
  140. """连接监控循环"""
  141. logger.info(f"连接监控线程开始运行,检查间隔: {self._monitor_interval}秒")
  142. last_log_time = time.time()
  143. log_interval = 60
  144. while not self._monitor_running.is_set():
  145. try:
  146. self._perform_connection_check()
  147. current_time = time.time()
  148. if current_time - last_log_time >= log_interval:
  149. self._log_connection_status()
  150. last_log_time = current_time
  151. time.sleep(self._monitor_interval)
  152. except Exception as e:
  153. logger.error(f"连接监控循环异常: {e}")
  154. time.sleep(self._monitor_interval)
  155. logger.info("连接监控循环结束")
    def _perform_connection_check(self):
        """One monitor iteration: probe the DB and update status/statistics.

        Transitions to DISCONNECTED after ``_max_error_count`` consecutive
        failures, and triggers ``_auto_reconnect`` from the third consecutive
        failure onwards. Never raises.
        """
        try:
            self._monitor_stats['total_checks'] += 1
            self._last_connection_check = dt.now()
            if self.pool is None:
                # Pool never created (or already torn down): count as a failure.
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = "连接池未初始化"
                logger.warning("连接池未初始化")
                return
            conn = None
            cursor = None
            try:
                conn = self.pool.connection()
                cursor = conn.cursor()
                start_time = time.time()
                cursor.execute("SELECT 1 as test, NOW() as server_time, "
                               "VERSION() as version, CONNECTION_ID() as connection_id")
                result = cursor.fetchone()
                elapsed = time.time() - start_time
                # Probe succeeded: reset the consecutive-error counter.
                self._connection_status = "HEALTHY"
                self._connection_error_count = 0
                self._monitor_stats['successful_checks'] += 1
                self._monitor_stats['last_success'] = dt.now()
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(f"连接检查成功: "
                                 f"响应时间={elapsed:.3f}s, "
                                 f"服务器时间={result['server_time']}, "
                                 f"MySQL版本={result['version']}, "
                                 f"连接ID={result['connection_id']}")
            except Exception as e:
                self._connection_status = "ERROR"
                self._connection_error_count += 1
                self._monitor_stats['failed_checks'] += 1
                self._monitor_stats['last_error'] = str(e)
                if self._connection_error_count >= self._max_error_count:
                    self._connection_status = "DISCONNECTED"
                    logger.error(f"数据库连接失败,已连续失败 {self._connection_error_count} 次: {e}")
                else:
                    logger.warning(f"数据库连接检查失败 (第{self._connection_error_count}次): {e}")
                # Start trying to reconnect after three consecutive failures.
                if self._connection_error_count >= 3:
                    self._auto_reconnect()
            finally:
                if cursor:
                    cursor.close()
                if conn:
                    conn.close()
        except Exception as e:
            logger.error(f"执行连接检查时发生异常: {e}")
            self._monitor_stats['failed_checks'] += 1
            self._monitor_stats['last_error'] = str(e)
    def _auto_reconnect(self):
        """Try to recover connectivity: re-test the existing pool first, then
        rebuild the pool from scratch when the test fails.

        Returns:
            True when a working connection was re-established, False on
            failure. (NOTE: falls through returning None when ``self.pool``
            is absent — callers treat that as falsy.)
        """
        try:
            logger.warning(f"检测到连接问题,正在尝试自动重连 (错误计数: {self._connection_error_count})")
            # Drop this thread's cached connection before probing the pool.
            self.release_connection()
            time.sleep(2)
            if self.pool:
                test_result = self._test_pool_connection()
                if test_result:
                    self._connection_status = "HEALTHY"
                    self._connection_error_count = 0
                    self._monitor_stats['total_reconnections'] += 1
                    logger.info("自动重连成功")
                    return True
                else:
                    # Pool itself looks broken: close it and build a new one.
                    logger.warning("连接池测试失败,尝试重新初始化")
                    old_pool = self.pool
                    try:
                        self.pool = None
                        if old_pool:
                            old_pool.close()
                        self._init_pool()
                        self._connection_status = "HEALTHY"
                        self._connection_error_count = 0
                        self._monitor_stats['total_reconnections'] += 1
                        logger.info("连接池重新初始化成功")
                        return True
                    except Exception as e:
                        logger.error(f"连接池重新初始化失败: {e}")
                        self._connection_status = "ERROR"
                        return False
        except Exception as e:
            logger.error(f"自动重连失败: {e}")
            return False
  243. def _log_connection_status(self):
  244. """记录连接状态"""
  245. status_map = {
  246. "HEALTHY": "✅ 健康",
  247. "WARNING": "⚠️ 警告",
  248. "ERROR": "❌ 错误",
  249. "DISCONNECTED": "🔌 断开连接",
  250. "UNKNOWN": "❓ 未知"
  251. }
  252. status_text = status_map.get(self._connection_status, self._connection_status)
  253. stats_info = (
  254. f"连接状态统计:\n"
  255. f" 当前状态: {status_text}\n"
  256. f" 总检查次数: {self._monitor_stats['total_checks']}\n"
  257. f" 成功次数: {self._monitor_stats['successful_checks']}\n"
  258. f" 失败次数: {self._monitor_stats['failed_checks']}\n"
  259. f" 连续错误次数: {self._connection_error_count}\n"
  260. f" 总重连次数: {self._monitor_stats['total_reconnections']}\n"
  261. f" 最后成功: {self._monitor_stats['last_success']}\n"
  262. f" 最后错误: {self._monitor_stats['last_error']}"
  263. )
  264. if self._connection_status in ["HEALTHY", "WARNING"]:
  265. logger.info(stats_info)
  266. else:
  267. logger.warning(stats_info)
  268. def get_connection_status(self) -> Dict[str, Any]:
  269. """获取当前连接状态"""
  270. return {
  271. 'status': self._connection_status,
  272. 'error_count': self._connection_error_count,
  273. 'last_check': self._last_connection_check,
  274. 'monitor_stats': self._monitor_stats.copy(),
  275. 'config': {
  276. 'host': self.config.host,
  277. 'port': self.config.port,
  278. 'database': self.config.database,
  279. 'pool_size': self.pool_size
  280. }
  281. }
  282. def wait_for_connection(self, timeout: int = 30, check_interval: int = 1) -> bool:
  283. """
  284. 等待数据库连接恢复正常
  285. Args:
  286. timeout: 总等待时间(秒)
  287. check_interval: 检查间隔(秒)
  288. Returns:
  289. bool: 是否成功连接
  290. """
  291. logger.info(f"等待数据库连接恢复,超时时间: {timeout}秒")
  292. start_time = time.time()
  293. attempts = 0
  294. while time.time() - start_time < timeout:
  295. attempts += 1
  296. if self._connection_status == "HEALTHY":
  297. logger.info(f"数据库连接已恢复,等待时间: {time.time() - start_time:.1f}秒")
  298. return True
  299. if self._connection_status == "DISCONNECTED":
  300. logger.info(f"尝试重连 (第{attempts}次)")
  301. if self._auto_reconnect():
  302. return True
  303. logger.info(f"等待连接恢复... ({attempts}/{int(timeout/check_interval)})")
  304. time.sleep(check_interval)
  305. logger.error(f"等待数据库连接恢复超时 ({timeout}秒)")
  306. return False
    def get_connection(self) -> Connection:
        """Return this thread's pooled connection, checking one out if needed.

        When the monitor reports ERROR/DISCONNECTED, waits up to 10 seconds
        for recovery before giving up.

        Raises:
            ConnectionError: when the database stays unavailable.
            Exception: any pool error, re-raised after triggering a reconnect.
        """
        try:
            if self._connection_status in ["ERROR", "DISCONNECTED"]:
                logger.warning(f"获取连接时检测到连接状态为 {self._connection_status},尝试自动重连")
                if not self.wait_for_connection(timeout=10):
                    raise ConnectionError(f"数据库连接不可用,当前状态: {self._connection_status}")
            # One cached connection per thread (see ``_thread_local``).
            if not hasattr(self._thread_local, 'connection') or self._thread_local.connection is None:
                conn = self.pool.connection()
                self._thread_local.connection = conn
                logger.debug(f"从连接池获取新连接,当前线程: {threading.current_thread().name}")
            return self._thread_local.connection
        except Exception as e:
            logger.error(f"从连接池获取连接失败: {e}")
            self._connection_status = "ERROR"
            self._connection_error_count += 1
            self._monitor_stats['last_error'] = str(e)
            self._auto_reconnect()
            raise
    def release_connection(self):
        """Roll back any open transaction and return this thread's connection
        to the pool.

        Safe to call when no connection is held; never raises.
        """
        try:
            if hasattr(self._thread_local, 'connection') and self._thread_local.connection is not None:
                conn = self._thread_local.connection
                try:
                    # PooledDB wrappers keep the raw pymysql connection in
                    # ``_con``; roll back on the raw object when reachable.
                    if hasattr(conn, '_con') and isinstance(conn._con, Connection):
                        original_conn = conn._con
                        if not original_conn.get_autocommit():
                            original_conn.rollback()
                    else:
                        conn.rollback()
                    conn.close()  # for a pooled wrapper this returns it to the pool
                    logger.debug(f"连接已归还到连接池,当前线程: {threading.current_thread().name}")
                except Exception as e:
                    logger.warning(f"关闭连接时出错: {e}")
                finally:
                    # Always forget the thread-local reference, even on error.
                    self._thread_local.connection = None
        except Exception as e:
            logger.error(f"释放连接时出错: {e}")
  346. def check_connection(self) -> bool:
  347. """检查连接是否有效"""
  348. try:
  349. conn = self.get_connection()
  350. cursor = None
  351. try:
  352. cursor = conn.cursor()
  353. cursor.execute("SELECT 1")
  354. cursor.fetchone()
  355. return True
  356. finally:
  357. if cursor:
  358. cursor.close()
  359. except Exception as e:
  360. logger.warning(f"连接检查失败: {e}")
  361. self.release_connection()
  362. return False
  363. def reconnect(self):
  364. """重新连接数据库(连接池会自动处理)"""
  365. try:
  366. logger.info("正在重新连接数据库...")
  367. self.release_connection()
  368. time.sleep(1)
  369. self.get_connection()
  370. logger.info("数据库重新连接成功")
  371. except Exception as e:
  372. logger.error(f"数据库重新连接失败: {e}")
  373. raise
    def close(self):
        """Convenience alias for :meth:`close_pool`."""
        self.close_pool()
    def close_all(self):
        """Shut down everything: monitor thread first, then the pool, then
        emit one final status report."""
        logger.info("正在关闭所有数据库资源...")
        self._stop_connection_monitor()
        self.close_pool()
        self._log_connection_status()  # final statistics snapshot
        logger.info("所有数据库资源已关闭")
    def close_pool(self):
        """Close the whole connection pool (releasing this thread's cached
        connection first). Idempotent; never raises."""
        try:
            if self.pool:
                self.release_connection()
                self.pool.close()
                self.pool = None
                self._connection_status = "DISCONNECTED"
                logger.info("数据库连接池已关闭")
        except Exception as e:
            logger.error(f"关闭连接池时出错: {e}")
    # ============ Database operation helpers ============
  396. def check_table_exists(self, table_name: str) -> bool:
  397. """检查表是否存在"""
  398. conn = None
  399. cursor = None
  400. try:
  401. conn = self.get_connection()
  402. cursor = conn.cursor()
  403. try:
  404. cursor.execute("SHOW TABLES LIKE %s", (table_name,))
  405. result = cursor.fetchone()
  406. exists = result is not None
  407. logger.info(f"🔍 检查表 '{table_name}' 存在: {exists}")
  408. return exists
  409. finally:
  410. if cursor:
  411. cursor.close()
  412. except Exception as e:
  413. logger.error(f"❌ 检查表存在失败: {e}")
  414. return False
  415. finally:
  416. self.release_connection()
  417. def create_table_with_unique_key(self, table_name: str, columns: List[str],
  418. unique_keys: List[str]) -> bool:
  419. """根据列定义创建表,包含三字段唯一键,数据字段使用DOUBLE类型"""
  420. if not self.wait_for_connection():
  421. logger.error("创建表失败:数据库连接不可用")
  422. return False
  423. conn = None
  424. cursor = None
  425. try:
  426. conn = self.get_connection()
  427. cursor = conn.cursor()
  428. try:
  429. drop_sql = f"DROP TABLE IF EXISTS `{table_name}`"
  430. cursor.execute(drop_sql)
  431. logger.info(f"已删除旧表: {table_name}")
  432. columns_sql = ",\n ".join(columns)
  433. unique_keys_str = ', '.join([f'`{key}`' for key in unique_keys])
  434. create_sql = f"""
  435. CREATE TABLE `{table_name}` (
  436. `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
  437. {columns_sql},
  438. `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  439. `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  440. `data_hash` VARCHAR(64) COMMENT '数据哈希值,用于快速比较',
  441. UNIQUE KEY uk_turbine_data ({unique_keys_str}),
  442. INDEX idx_farm (`id_farm`),
  443. INDEX idx_turbine (`id_turbine`),
  444. INDEX idx_time (`data_time`),
  445. INDEX idx_farm_turbine (`id_farm`, `id_turbine`),
  446. INDEX idx_composite (`id_farm`, `id_turbine`, `data_time`)
  447. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  448. """
  449. logger.debug(f"创建表的SQL语句:\n{create_sql}")
  450. cursor.execute(create_sql)
  451. conn.commit()
  452. logger.info(f"表 '{table_name}' 创建成功!")
  453. logger.info(f"三字段唯一键: {unique_keys}")
  454. return True
  455. finally:
  456. if cursor:
  457. cursor.close()
  458. except Exception as e:
  459. logger.error(f"创建表失败: {e}")
  460. logger.error(traceback.format_exc())
  461. if conn:
  462. conn.rollback()
  463. return False
  464. finally:
  465. pass
    def get_table_row_count(self, table_name: str) -> int:
        """Return ``COUNT(*)`` for ``table_name``; 0 on any error."""
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            try:
                # DictCursor: the row comes back as {'count': n}.
                cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")
                result = cursor.fetchone()
                count = result['count'] if result else 0
                logger.info(f"📊 表 '{table_name}' 行数: {count:,}")
                return count
            finally:
                if cursor:
                    cursor.close()
        except Exception as e:
            logger.error(f"❌ 获取表行数失败: {e}")
            return 0
        finally:
            self.release_connection()
  487. def get_table_stats(self, table_name: str) -> Dict[str, Any]:
  488. """获取表统计信息"""
  489. conn = None
  490. cursor = None
  491. try:
  492. conn = self.get_connection()
  493. cursor = conn.cursor()
  494. try:
  495. cursor.execute(f"""
  496. SELECT
  497. COUNT(*) as total_rows,
  498. COUNT(DISTINCT id_farm) as farm_count,
  499. COUNT(DISTINCT id_turbine) as turbine_count,
  500. MIN(data_time) as first_data_time,
  501. MAX(data_time) as last_data_time,
  502. MIN(create_time) as first_create,
  503. MAX(update_time) as last_update
  504. FROM `{table_name}`
  505. """)
  506. result = cursor.fetchone()
  507. if result:
  508. stats = {
  509. 'total_rows': result[0],
  510. 'farm_count': result[1],
  511. 'turbine_count': result[2],
  512. 'first_data_time': result[3],
  513. 'last_data_time': result[4],
  514. 'first_create': result[5],
  515. 'last_update': result[6]
  516. }
  517. logger.info(f"📈 表 '{table_name}' 统计信息:")
  518. logger.info(f" 总行数: {stats['total_rows']:,}")
  519. logger.info(f" 风场数量: {stats['farm_count']}")
  520. logger.info(f" 风机数量: {stats['turbine_count']}")
  521. logger.info(f" 最早数据时间: {stats['first_data_time']}")
  522. logger.info(f" 最新数据时间: {stats['last_data_time']}")
  523. return stats
  524. else:
  525. return {}
  526. finally:
  527. if cursor:
  528. cursor.close()
  529. except Exception as e:
  530. logger.error(f"❌ 获取表统计失败: {e}")
  531. return {}
  532. finally:
  533. self.release_connection()
  534. def check_duplicate_keys(self, table_name: str) -> List[Dict]:
  535. """检查重复的唯一键记录"""
  536. conn = None
  537. cursor = None
  538. try:
  539. conn = self.get_connection()
  540. cursor = conn.cursor()
  541. try:
  542. cursor.execute(f"""
  543. SELECT
  544. id_farm,
  545. id_turbine,
  546. data_time,
  547. COUNT(*) as duplicate_count,
  548. MIN(id) as min_id,
  549. MAX(id) as max_id
  550. FROM `{table_name}`
  551. GROUP BY id_farm, id_turbine, data_time
  552. HAVING COUNT(*) > 1
  553. ORDER BY duplicate_count DESC
  554. LIMIT 10
  555. """)
  556. duplicates = []
  557. for row in cursor.fetchall():
  558. duplicate_info = {
  559. 'id_farm': row[0],
  560. 'id_turbine': row[1],
  561. 'data_time': row[2],
  562. 'duplicate_count': row[3],
  563. 'min_id': row[4],
  564. 'max_id': row[5]
  565. }
  566. duplicates.append(duplicate_info)
  567. if duplicates:
  568. logger.warning(f"⚠️ 发现重复的唯一键记录: {len(duplicates)} 组")
  569. for dup in duplicates[:3]:
  570. logger.warning(f" 重复: 风场={dup['id_farm']}, 风机={dup['id_turbine']}, "
  571. f"时间={dup['data_time']}, 重复次数={dup['duplicate_count']}")
  572. else:
  573. logger.info(f"✅ 无重复的唯一键记录")
  574. return duplicates
  575. finally:
  576. if cursor:
  577. cursor.close()
  578. except Exception as e:
  579. logger.error(f"❌ 检查重复键失败: {e}")
  580. return []
  581. finally:
  582. self.release_connection()
    def upsert_parquet_data(self, file_info: ParquetFileInfo, table_name: str,
                            batch_size: int = 100, max_retries: int = 3) -> Tuple[int, int, int]:
        """UPSERT one parquet file into ``table_name`` via the three-column
        unique key (id_farm, id_turbine, data_time).

        Batches are retried up to ``max_retries`` times; a batch that keeps
        failing falls back to row-by-row inserts (each row retried as well).

        Args:
            file_info: File metadata, including the detected time column name.
            table_name: Target table.
            batch_size: Rows per executemany batch.
            max_retries: Maximum retries per batch and per fallback row.

        Returns:
            (total rows, estimated inserted rows, estimated updated rows).
            NOTE: the inserted/updated split is a rough 50/50 estimate of the
            successful rows, not an exact figure from the server.

        Raises:
            Exception: any error outside the per-batch retry logic.
        """
        try:
            logger.info(f"📂 正在读取并处理文件: {file_info.file_path}")
            logger.info(f"⏰ 识别到的时间字段: {file_info.data_time_column}")
            # Read the parquet file.
            df = pd.read_parquet(file_info.file_path, engine='pyarrow')
            # Attach metadata columns taken from the file info.
            df['id_farm'] = file_info.farm_id
            df['name_farm'] = file_info.farm_name
            df['no_model_turbine'] = file_info.model_type
            df['id_turbine'] = file_info.turbine_id
            logger.info(f"📊 文件 {file_info.turbine_id}.parquet 读取完成,形状: {df.shape}")
            # Simplified cleaning / type normalisation (adds data_hash, etc.).
            cleaned_df = self._clean_and_convert_simple(df, file_info.data_time_column)
            # Ensure the unique-key fields exist (filled with NULL if missing).
            required_columns = ['id_farm', 'id_turbine', 'data_time']
            for col in required_columns:
                if col not in cleaned_df.columns:
                    logger.error(f"必需字段 '{col}' 不存在于数据中")
                    cleaned_df[col] = None
            columns = list(cleaned_df.columns)
            # Build the INSERT ... ON DUPLICATE KEY UPDATE statement once.
            upsert_sql = self._prepare_upsert_sql(table_name, columns)
            # Process in batches.
            total_rows = len(cleaned_df)
            total_batches = (total_rows + batch_size - 1) // batch_size
            total_affected_rows = 0
            total_failed_rows = 0
            logger.info(f"🚀 准备处理 {total_rows} 行数据,分为 {total_batches} 个批次")
            for i in range(0, total_rows, batch_size):
                batch_df = cleaned_df.iloc[i:i + batch_size]
                batch_num = i // batch_size + 1
                retry_count = 0
                batch_success = False
                while retry_count <= max_retries and not batch_success:
                    conn = None
                    cursor = None
                    try:
                        if not self.check_connection():
                            logger.warning(f"🔌 连接已断开,正在重新连接...")
                            self.reconnect()
                        conn = self.get_connection()
                        cursor = conn.cursor()
                        # Convert rows to parameter tuples.
                        batch_values = []
                        for _, row in batch_df.iterrows():
                            row_tuple = self._convert_row_to_tuple(row, columns)
                            batch_values.append(row_tuple)
                        # Bulk upsert the whole batch in one round trip.
                        affected = cursor.executemany(upsert_sql, batch_values)
                        total_affected_rows += affected
                        conn.commit()
                        batch_success = True
                        if batch_num % 10 == 0 or batch_num == total_batches:
                            logger.info(f"✅ 批次 {batch_num}/{total_batches}: 处理 {len(batch_df)} 行, "
                                        f"受影响 {affected} 行")
                    except (pymysql.Error, AttributeError) as e:
                        retry_count += 1
                        logger.error(f"❌ 批次 {batch_num} UPSERT失败,错误: {str(e)}")
                        if retry_count > max_retries:
                            logger.error(f"❌ 批次 {batch_num} UPSERT失败,已达到最大重试次数")
                            # Fall back to row-by-row inserts for this batch.
                            batch_affected = 0
                            batch_failed = 0
                            for idx, (_, row) in enumerate(batch_df.iterrows()):
                                row_retry_count = 0
                                row_success = False
                                while row_retry_count <= max_retries and not row_success:
                                    try:
                                        if not self.check_connection():
                                            self.reconnect()
                                        single_conn = self.get_connection()
                                        single_cursor = single_conn.cursor()
                                        row_tuple = self._convert_row_to_tuple(row, columns)
                                        single_cursor.execute(upsert_sql, row_tuple)
                                        batch_affected += single_cursor.rowcount
                                        single_conn.commit()
                                        row_success = True
                                    except Exception as single_e:
                                        row_retry_count += 1
                                        if row_retry_count > max_retries:
                                            batch_failed += 1
                                            break
                                        else:
                                            # Linear backoff before retrying this row.
                                            time.sleep(1 * row_retry_count)
                                    finally:
                                        if 'single_cursor' in locals() and single_cursor:
                                            single_cursor.close()
                            total_affected_rows += batch_affected
                            total_failed_rows += batch_failed
                            if batch_affected > 0:
                                logger.info(f"⚠️ 批次 {batch_num} 单条处理完成,成功 {batch_affected} 行, 失败 {batch_failed} 行")
                            break
                        else:
                            logger.warning(f"⚠️ 批次 {batch_num} UPSERT失败,第 {retry_count} 次重试")
                            # Linear backoff before retrying the whole batch.
                            time.sleep(2 * retry_count)
                    finally:
                        if cursor:
                            cursor.close()
                # Brief pause between successful batches to ease server load.
                if batch_success and batch_num < total_batches:
                    time.sleep(0.1)
            # Rough estimate only: the exact insert/update split is unknown.
            successful_rows = total_rows - total_failed_rows
            estimated_inserted = successful_rows // 2
            estimated_updated = successful_rows - estimated_inserted
            logger.info(f"🎉 文件 {os.path.basename(file_info.file_path)} UPSERT完成:")
            logger.info(f" 总处理行数: {total_rows}")
            logger.info(f" 总受影响行数: {total_affected_rows}")
            logger.info(f" 失败行数: {total_failed_rows}")
            return total_rows, estimated_inserted, estimated_updated
        except Exception as e:
            logger.error(f"❌ 处理文件 {file_info.file_path} 失败: {str(e)}")
            raise
  709. def _clean_and_convert_simple(self, df: pd.DataFrame, data_time_column: str = None) -> pd.DataFrame:
  710. """简化版数据清理"""
  711. try:
  712. cleaned_df = df.copy()
  713. # 确保必需字段存在
  714. required_fields = ['id_farm', 'name_farm', 'no_model_turbine', 'id_turbine']
  715. for field in required_fields:
  716. if field not in cleaned_df.columns:
  717. cleaned_df[field] = None
  718. # 处理时间字段
  719. if 'data_time' not in cleaned_df.columns:
  720. if data_time_column and data_time_column in cleaned_df.columns:
  721. cleaned_df['data_time'] = cleaned_df[data_time_column]
  722. else:
  723. for col in cleaned_df.columns:
  724. col_lower = col.lower()
  725. if any(keyword in col_lower for keyword in ['time', 'date', 'timestamp']):
  726. cleaned_df['data_time'] = cleaned_df[col]
  727. logger.info(f"使用字段 '{col}' 作为 data_time")
  728. break
  729. # 确保data_time是datetime类型
  730. if 'data_time' in cleaned_df.columns:
  731. try:
  732. cleaned_df['data_time'] = pd.to_datetime(cleaned_df['data_time'], errors='coerce')
  733. except:
  734. logger.warning("data_time字段转换失败,保持原样")
  735. # 处理NaN
  736. cleaned_df = cleaned_df.replace({np.nan: None, pd.NaT: None})
  737. # 计算数据哈希
  738. def simple_hash(row):
  739. try:
  740. data_fields = [col for col in cleaned_df.columns
  741. if col not in ['id_farm', 'name_farm', 'no_model_turbine',
  742. 'id_turbine', 'data_time', 'data_hash']]
  743. hash_str = ''
  744. for field in sorted(data_fields):
  745. val = row[field]
  746. if val is not None:
  747. if isinstance(val, (dt, pd.Timestamp)):
  748. hash_str += f"{field}:{val.isoformat()}|"
  749. else:
  750. hash_str += f"{field}:{str(val)}|"
  751. return hashlib.md5(hash_str.encode('utf-8')).hexdigest() if hash_str else None
  752. except:
  753. return None
  754. cleaned_df['data_hash'] = cleaned_df.apply(simple_hash, axis=1)
  755. logger.info(f"数据清理完成,原始形状: {df.shape}, 清理后形状: {cleaned_df.shape}")
  756. return cleaned_df
  757. except Exception as e:
  758. logger.error(f"数据清理失败: {e}")
  759. logger.error(traceback.format_exc())
  760. return df
  761. def _prepare_upsert_sql(self, table_name: str, columns: List[str]) -> str:
  762. """准备UPSERT SQL语句"""
  763. exclude_columns = ['id_farm', 'id_turbine', 'data_time', 'id',
  764. 'create_time', 'update_time', 'data_hash']
  765. update_columns = [col for col in columns if col not in exclude_columns]
  766. column_names = ', '.join([f'`{col}`' for col in columns])
  767. placeholders = ', '.join(['%s'] * len(columns))
  768. update_clauses = []
  769. for col in update_columns:
  770. update_clauses.append(f"`{col}` = VALUES(`{col}`)")
  771. update_clause = ', '.join(update_clauses)
  772. upsert_sql = f"""
  773. INSERT INTO `{table_name}` ({column_names})
  774. VALUES ({placeholders})
  775. ON DUPLICATE KEY UPDATE
  776. {update_clause}
  777. """
  778. logger.debug(f"UPSERT SQL生成完成,共 {len(columns)} 列")
  779. return upsert_sql
  780. def _convert_to_numeric(self, value):
  781. """将值转换为数值类型"""
  782. if pd.isna(value) or value is None:
  783. return None
  784. try:
  785. if isinstance(value, (int, float, np.integer, np.floating)):
  786. if isinstance(value, np.integer):
  787. return int(value)
  788. elif isinstance(value, np.floating):
  789. return float(value)
  790. return value
  791. if isinstance(value, (bool, np.bool_)):
  792. return 1 if bool(value) else 0
  793. if isinstance(value, str):
  794. cleaned = value.strip()
  795. if cleaned == '':
  796. return None
  797. cleaned = cleaned.replace(',', '').replace('%', '').replace(' ', '')
  798. try:
  799. return float(cleaned)
  800. except ValueError:
  801. try:
  802. return int(cleaned)
  803. except ValueError:
  804. return None
  805. try:
  806. str_val = str(value)
  807. cleaned = str_val.replace(',', '').replace('%', '').replace(' ', '')
  808. return float(cleaned)
  809. except:
  810. return None
  811. except Exception as e:
  812. return None
  813. def _convert_row_to_tuple(self, row: pd.Series, columns: List[str]) -> Tuple:
  814. """将单行数据转换为元组"""
  815. try:
  816. row_values = []
  817. keep_original_fields = [
  818. 'id', 'data_time', 'id_farm', 'id_turbine',
  819. 'name_farm', 'no_model_turbine', 'create_time',
  820. 'update_time', 'data_hash'
  821. ]
  822. for col in columns:
  823. value = row[col]
  824. if col == 'data_time':
  825. if pd.isna(value):
  826. row_values.append(None)
  827. elif isinstance(value, pd.Timestamp):
  828. row_values.append(value.to_pydatetime())
  829. elif isinstance(value, dt):
  830. row_values.append(value)
  831. else:
  832. try:
  833. row_values.append(pd.to_datetime(value).to_pydatetime())
  834. except:
  835. row_values.append(None)
  836. elif col in ['id_farm', 'id_turbine', 'name_farm', 'no_model_turbine', 'data_hash']:
  837. if pd.isna(value) or value is None:
  838. row_values.append(None)
  839. else:
  840. row_values.append(str(value))
  841. elif col in keep_original_fields:
  842. row_values.append(value)
  843. else:
  844. numeric_value = self._convert_to_numeric(value)
  845. row_values.append(numeric_value)
  846. return tuple(row_values)
  847. except Exception as e:
  848. logger.warning(f"转换行数据失败: {e}")
  849. return tuple([None] * len(columns))