info_model_turbine_v2.py 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100
  1. import pymysql
  2. import threading
  3. from typing import List, Dict, Any, Optional, Tuple
  4. import logging
  5. from datetime import datetime
  6. from collections import Counter
  7. import statistics
  8. import math
  9. # 配置日志
  10. logging.basicConfig(
  11. level=logging.INFO,
  12. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  13. )
  14. logger = logging.getLogger(__name__)
  15. class ConnectionPool:
  16. """MySQL数据库连接池"""
  17. def __init__(self, host, port, user, password, database, charset='utf8mb4',
  18. max_connections=2, mem_quota=4 << 30):
  19. self.host = host
  20. self.port = port
  21. self.user = user
  22. self.password = password
  23. self.database = database
  24. self.charset = charset
  25. self.max_connections = max_connections
  26. self.mem_quota = mem_quota
  27. self._lock = threading.Lock()
  28. self._connections = []
  29. self._in_use = set()
  30. def _create_connection(self):
  31. """创建新连接并设置内存配额"""
  32. try:
  33. conn = pymysql.connect(
  34. host=self.host,
  35. port=self.port,
  36. user=self.user,
  37. password=self.password,
  38. database=self.database,
  39. charset=self.charset,
  40. cursorclass=pymysql.cursors.DictCursor
  41. )
  42. # 设置会话内存配额
  43. with conn.cursor() as cursor:
  44. cursor.execute(f"SET SESSION tidb_mem_quota_query = {self.mem_quota}")
  45. conn.commit()
  46. logger.debug(f"创建新数据库连接,设置内存配额为 {self.mem_quota}")
  47. return conn
  48. except Exception as e:
  49. logger.error(f"创建数据库连接失败: {e}")
  50. raise
  51. def get_connection(self):
  52. """从连接池获取连接"""
  53. with self._lock:
  54. # 如果有空闲连接,返回一个
  55. for conn in self._connections:
  56. if conn not in self._in_use:
  57. self._in_use.add(conn)
  58. logger.debug(f"从连接池获取现有连接")
  59. return conn
  60. # 如果没有空闲连接但可以创建新连接
  61. if len(self._connections) < self.max_connections:
  62. conn = self._create_connection()
  63. self._connections.append(conn)
  64. self._in_use.add(conn)
  65. logger.debug(f"创建新连接,当前连接数: {len(self._connections)}")
  66. return conn
  67. # 连接池已满,等待
  68. logger.warning("连接池已满,等待可用连接...")
  69. # 这里简单实现为等待并重试
  70. import time
  71. time.sleep(1)
  72. return self.get_connection() # 递归重试
  73. def release_connection(self, conn):
  74. """释放连接回连接池"""
  75. with self._lock:
  76. if conn in self._in_use:
  77. self._in_use.remove(conn)
  78. logger.debug(f"释放连接回连接池")
  79. def close_all(self):
  80. """关闭所有连接"""
  81. with self._lock:
  82. for conn in self._connections:
  83. try:
  84. conn.close()
  85. except:
  86. pass
  87. self._connections.clear()
  88. self._in_use.clear()
  89. logger.info("已关闭所有数据库连接")
  90. class DatabaseConfig:
  91. """数据库配置类"""
  92. def __init__(self, host='192.168.50.234', port=4000, user='root',
  93. password='123456', database='wind_data', charset='utf8mb4',
  94. max_connections=2, mem_quota=4 << 30):
  95. self.host = host
  96. self.port = port
  97. self.user = user
  98. self.password = password
  99. self.database = database
  100. self.charset = charset
  101. self.max_connections = max_connections
  102. self.mem_quota = mem_quota
  103. class SCADADataProcessor:
  104. """SCADA数据处理类,用于计算额定转速和传动比"""
  105. @staticmethod
  106. def calculate_mode(values: List[float], decimal_places: int = 1) -> float:
  107. """
  108. 计算众数(对连续数据使用四舍五入后统计)
  109. Args:
  110. values: 数值列表
  111. decimal_places: 保留的小数位数
  112. Returns:
  113. 众数值
  114. """
  115. if not values:
  116. return 0.0
  117. # 对值进行四舍五入处理,减少噪声影响
  118. rounded_values = [round(v, decimal_places) for v in values]
  119. # 使用Counter统计频率
  120. counter = Counter(rounded_values)
  121. # 找到众数
  122. if counter:
  123. mode_value, count = counter.most_common(1)[0]
  124. logger.debug(f"众数统计: 值={mode_value}, 频次={count}, 总数据点={len(values)}")
  125. return mode_value
  126. return 0.0
  127. @staticmethod
  128. def calculate_median(values: List[float]) -> float:
  129. """
  130. 计算中位数
  131. Args:
  132. values: 数值列表
  133. Returns:
  134. 中位数值
  135. """
  136. if not values:
  137. return 0.0
  138. try:
  139. return float(statistics.median(values))
  140. except:
  141. # 如果统计模块出错,使用排序方法
  142. sorted_values = sorted(values)
  143. n = len(sorted_values)
  144. if n % 2 == 1:
  145. return sorted_values[n // 2]
  146. else:
  147. return (sorted_values[n // 2 - 1] + sorted_values[n // 2]) / 2
  148. @staticmethod
  149. def calculate_rated_speeds_and_ratio(data: List[Dict[str, Any]]) -> Tuple[float, float, float]:
  150. """
  151. 计算额定叶轮转速、额定发电机转速和传动比
  152. Args:
  153. data: SCADA数据列表,包含转子转速和发电机转速
  154. Returns:
  155. Tuple[float, float, float]: (rated_rotor_spd, rated_gen_spd, transmission_ratio)
  156. """
  157. if not data:
  158. return 0.0, 0.0, 0.0
  159. # 提取数据
  160. rotor_speeds = []
  161. gen_speeds = []
  162. for record in data:
  163. rotor_spd = record.get('rotor_spd')
  164. gen_spd = record.get('gen_spd')
  165. if rotor_spd is not None and gen_spd is not None:
  166. try:
  167. rotor_speeds.append(float(rotor_spd))
  168. gen_speeds.append(float(gen_spd))
  169. except (ValueError, TypeError):
  170. continue
  171. if not rotor_speeds or not gen_speeds:
  172. logger.warning(f"数据不足: 转子转速点={len(rotor_speeds)}, 发电机转速点={len(gen_speeds)}")
  173. return 0.0, 0.0, 0.0
  174. # 计算额定转速(使用众数方法)
  175. rated_rotor_spd = SCADADataProcessor.calculate_mode(rotor_speeds, decimal_places=1)
  176. rated_gen_spd = SCADADataProcessor.calculate_mode(gen_speeds, decimal_places=1)
  177. # 如果众数方法效果不好,尝试使用中位数
  178. if rated_rotor_spd <= 0 or rated_gen_spd <= 0:
  179. rated_rotor_spd = SCADADataProcessor.calculate_median(rotor_speeds)
  180. rated_gen_spd = SCADADataProcessor.calculate_median(gen_speeds)
  181. # 计算传动比
  182. if rated_rotor_spd > 0:
  183. transmission_ratio = rated_gen_spd / rated_rotor_spd
  184. else:
  185. transmission_ratio = 0.0
  186. # 验证数据合理性
  187. if rated_rotor_spd > 30: # 叶轮转速通常小于30 rpm
  188. logger.warning(f"叶轮转速异常高: {rated_rotor_spd} rpm,使用中位数重新计算")
  189. rated_rotor_spd = SCADADataProcessor.calculate_median(rotor_speeds)
  190. if rated_rotor_spd > 0:
  191. transmission_ratio = rated_gen_spd / rated_rotor_spd
  192. if transmission_ratio > 200: # 传动比通常小于200
  193. logger.warning(f"传动比异常高: {transmission_ratio}")
  194. transmission_ratio = 0.0
  195. logger.debug(f"计算结果: 转子转速={rated_rotor_spd:.2f} rpm, "
  196. f"发电机转速={rated_gen_spd:.2f} rpm, "
  197. f"传动比={transmission_ratio:.2f}")
  198. return rated_rotor_spd, rated_gen_spd, transmission_ratio
  199. @staticmethod
  200. def detect_turbine_type(transmission_ratio: float, rated_gen_spd: float) -> str:
  201. """
  202. 根据传动比和发电机额定转速判断风机类型
  203. Returns:
  204. str: 风机类型 (直驱/双馈/半直驱/未知)
  205. """
  206. if transmission_ratio <= 1.2:
  207. return "直驱"
  208. elif 1.2 < transmission_ratio <= 30:
  209. return "半直驱"
  210. elif 30 < transmission_ratio <= 120:
  211. return "双馈"
  212. else:
  213. # 根据发电机转速进一步判断
  214. if rated_gen_spd < 50: # 直驱发电机转速很低
  215. return "直驱"
  216. elif 1000 <= rated_gen_spd <= 2000: # 双馈通常在同步转速附近
  217. return "双馈"
  218. elif rated_gen_spd > 2000: # 半直驱转速较高
  219. return "半直驱"
  220. else:
  221. return "未知"
  222. # SQL语句定义
  223. CREATE_MODEL_TURBINE_TABLE_SQL = """
  224. CREATE TABLE IF NOT EXISTS info_model_turbine (
  225. id INT AUTO_INCREMENT PRIMARY KEY,
  226. no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
  227. model VARCHAR(100) COMMENT '机型',
  228. manufacturer VARCHAR(100) COMMENT '制造商',
  229. rated_capacity INT COMMENT '额定容量(kW)',
  230. cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
  231. cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
  232. rotor_diameter INT COMMENT '叶轮直径(m)',
  233. hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
  234. turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
  235. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  236. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  237. UNIQUE KEY idx_no_model (no_model),
  238. INDEX idx_model (model),
  239. INDEX idx_manufacturer (manufacturer),
  240. INDEX idx_rated_capacity (rated_capacity)
  241. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
  242. """
  243. ALTER_MODEL_TURBINE_TABLE_SQL = """
  244. ALTER TABLE info_model_turbine
  245. ADD COLUMN IF NOT EXISTS rated_rotor_spd DECIMAL(10, 3) COMMENT '额定叶轮转速(rpm)',
  246. ADD COLUMN IF NOT EXISTS rated_gen_spd DECIMAL(10, 3) COMMENT '额定发电机转速(rpm)',
  247. ADD COLUMN IF NOT EXISTS transmission_ratio DECIMAL(10, 4) COMMENT '传动比',
  248. ADD COLUMN IF NOT EXISTS turbine_type VARCHAR(20) COMMENT '风机类型(直驱/双馈/半直驱)',
  249. ADD COLUMN IF NOT EXISTS calculation_time TIMESTAMP COMMENT '参数计算时间',
  250. ADD COLUMN IF NOT EXISTS data_points INT DEFAULT 0 COMMENT '用于计算的数据点数';
  251. """
  252. SELECT_MODEL_DATA_SQL = """
  253. SELECT
  254. CONCAT(
  255. IFNULL(model, ''),
  256. '-',
  257. IFNULL(cut_in_wind_speed, ''),
  258. '-',
  259. IFNULL(cut_out_wind_speed, ''),
  260. '-',
  261. IFNULL(hub_height, '')
  262. ) AS no_model,
  263. model,
  264. manufacturer,
  265. rated_capacity,
  266. cut_in_wind_speed,
  267. cut_out_wind_speed,
  268. rotor_diameter,
  269. hub_height,
  270. COUNT(*) AS turbine_count
  271. FROM info_turbine
  272. WHERE model IS NOT NULL
  273. GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed, cut_out_wind_speed, rotor_diameter, hub_height
  274. ORDER BY model, manufacturer, rated_capacity;
  275. """
  276. SELECT_NO_MODEL_LIST_SQL = """
  277. SELECT DISTINCT no_model FROM info_model_turbine ORDER BY no_model;
  278. """
  279. SELECT_SCADA_FOR_NO_MODEL_SQL = """
  280. SELECT
  281. it.wind_farm_id,
  282. it.turbine_id,
  283. it.model,
  284. icpt.rated_wind_speed,
  285. imt.no_model,
  286. imt.rated_capacity,
  287. imt.cut_in_wind_speed,
  288. imt.cut_out_wind_speed,
  289. imt.rotor_diameter,
  290. imt.hub_height,
  291. dst.data_time,
  292. dst.wind_spd,
  293. dst.rotor_spd,
  294. dst.gen_spd,
  295. dst.p_active
  296. FROM info_turbine it,
  297. info_curve_power_turbine icpt,
  298. info_model_turbine imt,
  299. data_scada_turbine dst
  300. WHERE 1=1
  301. AND it.wind_farm_id = dst.id_farm
  302. AND it.turbine_id = dst.id_turbine
  303. AND it.model = dst.no_model_turbine
  304. AND it.model = imt.model
  305. AND it.cut_in_wind_speed = imt.cut_in_wind_speed
  306. AND it.cut_out_wind_speed = imt.cut_out_wind_speed
  307. AND it.rotor_diameter = imt.rotor_diameter
  308. AND it.hub_height = imt.hub_height
  309. AND it.wind_farm_id = icpt.wind_farm_id
  310. AND it.model = icpt.standard_model
  311. AND imt.no_model = %s
  312. AND dst.wind_spd >= icpt.rated_wind_speed
  313. AND dst.p_active >= imt.rated_capacity * 0.95
  314. AND dst.p_active <= imt.rated_capacity * 1.05
  315. AND dst.rotor_spd IS NOT NULL
  316. AND dst.gen_spd IS NOT NULL
  317. AND dst.rotor_spd > 0
  318. AND dst.gen_spd > 0
  319. ORDER BY dst.data_time;
  320. """
  321. DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"
  322. CHECK_TABLE_EXISTS_SQL = """
  323. SELECT COUNT(*) as table_exists
  324. FROM information_schema.tables
  325. WHERE table_schema = %s AND table_name = 'info_model_turbine'
  326. """
  327. CHECK_TABLE_COLUMNS_SQL = """
  328. SELECT COLUMN_NAME
  329. FROM INFORMATION_SCHEMA.COLUMNS
  330. WHERE TABLE_SCHEMA = %s AND TABLE_NAME = 'info_model_turbine'
  331. """
  332. UPDATE_MODEL_PARAMETERS_SQL = """
  333. UPDATE info_model_turbine
  334. SET rated_rotor_spd = %s,
  335. rated_gen_spd = %s,
  336. transmission_ratio = %s,
  337. turbine_type = %s,
  338. calculation_time = %s,
  339. data_points = %s,
  340. updated_at = CURRENT_TIMESTAMP
  341. WHERE no_model = %s
  342. """
  343. class ModelTurbineManager:
  344. """风机机型信息管理器"""
  345. def __init__(self, db_config: DatabaseConfig):
  346. """
  347. 初始化管理器
  348. Args:
  349. db_config: 数据库配置对象
  350. """
  351. self.db_config = db_config
  352. self.connection_pool = None
  353. self._initialize_connection_pool()
  354. def _initialize_connection_pool(self):
  355. """初始化数据库连接池"""
  356. self.connection_pool = ConnectionPool(
  357. host=self.db_config.host,
  358. port=self.db_config.port,
  359. user=self.db_config.user,
  360. password=self.db_config.password,
  361. database=self.db_config.database,
  362. charset=self.db_config.charset,
  363. max_connections=self.db_config.max_connections,
  364. mem_quota=self.db_config.mem_quota
  365. )
  366. logger.info(f"数据库连接池初始化完成,最大连接数: {self.db_config.max_connections}")
  367. def get_connection(self):
  368. """从连接池获取连接"""
  369. return self.connection_pool.get_connection()
  370. def release_connection(self, conn):
  371. """释放连接回连接池"""
  372. self.connection_pool.release_connection(conn)
  373. def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
  374. """
  375. 执行查询语句
  376. Args:
  377. sql: SQL查询语句
  378. params: SQL参数
  379. Returns:
  380. 查询结果列表
  381. """
  382. conn = self.get_connection()
  383. if not conn:
  384. raise Exception("无法从连接池获取数据库连接")
  385. try:
  386. with conn.cursor() as cursor:
  387. cursor.execute(sql, params)
  388. result = cursor.fetchall()
  389. logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
  390. return result
  391. except Exception as e:
  392. logger.error(f"查询执行失败: {e}")
  393. logger.error(f"SQL: {sql}")
  394. if params:
  395. logger.error(f"参数: {params}")
  396. raise
  397. finally:
  398. self.release_connection(conn)
  399. def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
  400. """
  401. 执行更新语句(INSERT, UPDATE, DELETE)
  402. Args:
  403. sql: SQL更新语句
  404. params: SQL参数
  405. Returns:
  406. 影响的行数
  407. """
  408. conn = self.get_connection()
  409. if not conn:
  410. raise Exception("无法从连接池获取数据库连接")
  411. try:
  412. with conn.cursor() as cursor:
  413. affected_rows = cursor.execute(sql, params)
  414. conn.commit()
  415. logger.debug(f"更新执行成功,影响 {affected_rows} 行")
  416. return affected_rows
  417. except Exception as e:
  418. conn.rollback()
  419. logger.error(f"更新执行失败: {e}")
  420. logger.error(f"SQL: {sql}")
  421. if params:
  422. logger.error(f"参数: {params}")
  423. raise
  424. finally:
  425. self.release_connection(conn)
  426. def execute_batch_update(self, sql: str, params_list: List[tuple]) -> int:
  427. """
  428. 批量执行更新语句
  429. Args:
  430. sql: SQL更新语句
  431. params_list: SQL参数列表
  432. Returns:
  433. 影响的行数
  434. """
  435. conn = self.get_connection()
  436. if not conn:
  437. raise Exception("无法从连接池获取数据库连接")
  438. try:
  439. with conn.cursor() as cursor:
  440. affected_rows = cursor.executemany(sql, params_list)
  441. conn.commit()
  442. logger.debug(f"批量更新执行成功,影响 {affected_rows} 行")
  443. return affected_rows
  444. except Exception as e:
  445. conn.rollback()
  446. logger.error(f"批量更新执行失败: {e}")
  447. logger.error(f"SQL: {sql}")
  448. raise
  449. finally:
  450. self.release_connection(conn)
  451. def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
  452. """
  453. 检查表是否存在
  454. Args:
  455. table_name: 表名
  456. Returns:
  457. bool: 表是否存在
  458. """
  459. try:
  460. result = self.execute_query(CHECK_TABLE_EXISTS_SQL, (self.db_config.database,))
  461. return result[0]['table_exists'] > 0
  462. except Exception as e:
  463. logger.error(f"检查表存在性失败: {e}")
  464. return False
  465. def check_table_columns(self) -> List[str]:
  466. """
  467. 检查表的列
  468. Returns:
  469. List[str]: 列名列表
  470. """
  471. try:
  472. result = self.execute_query(CHECK_TABLE_COLUMNS_SQL, (self.db_config.database,))
  473. return [row['COLUMN_NAME'] for row in result]
  474. except Exception as e:
  475. logger.error(f"检查表列失败: {e}")
  476. return []
  477. def add_missing_columns(self):
  478. """添加缺失的列"""
  479. try:
  480. logger.info("检查并添加缺失的列...")
  481. self.execute_update(ALTER_MODEL_TURBINE_TABLE_SQL)
  482. logger.info("表结构更新完成")
  483. except Exception as e:
  484. logger.error(f"更新表结构失败: {e}")
  485. def create_model_turbine_table(self) -> bool:
  486. """
  487. 创建风机机型信息表
  488. Returns:
  489. bool: 是否成功创建表
  490. """
  491. try:
  492. logger.info("开始创建风机机型信息表...")
  493. self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
  494. logger.info("风机机型信息表创建成功")
  495. return True
  496. except Exception as e:
  497. logger.error(f"创建风机机型信息表失败: {e}")
  498. return False
  499. def drop_model_turbine_table(self) -> bool:
  500. """
  501. 删除风机机型信息表
  502. Returns:
  503. bool: 是否成功删除表
  504. """
  505. try:
  506. logger.info("开始删除风机机型信息表...")
  507. self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
  508. logger.info("风机机型信息表删除成功")
  509. return True
  510. except Exception as e:
  511. logger.error(f"删除风机机型信息表失败: {e}")
  512. return False
  513. def get_model_data(self) -> List[Dict[str, Any]]:
  514. """
  515. 从info_turbine表获取机型分组数据
  516. Returns:
  517. 机型数据列表
  518. """
  519. try:
  520. logger.info("开始从info_turbine表查询机型数据...")
  521. data = self.execute_query(SELECT_MODEL_DATA_SQL)
  522. logger.info(f"成功查询到 {len(data)} 条机型记录")
  523. return data
  524. except Exception as e:
  525. logger.error(f"查询机型数据失败: {e}")
  526. return []
  527. def get_no_model_list(self) -> List[str]:
  528. """
  529. 获取所有no_model列表
  530. Returns:
  531. no_model列表
  532. """
  533. try:
  534. logger.info("开始查询所有机型标识...")
  535. result = self.execute_query(SELECT_NO_MODEL_LIST_SQL)
  536. no_models = [row['no_model'] for row in result]
  537. logger.info(f"成功获取 {len(no_models)} 个机型标识")
  538. return no_models
  539. except Exception as e:
  540. logger.error(f"查询机型标识列表失败: {e}")
  541. return []
  542. def get_scada_data_for_no_model(self, no_model: str) -> List[Dict[str, Any]]:
  543. """
  544. 获取指定机型的SCADA数据用于参数计算
  545. Args:
  546. no_model: 机型标识
  547. Returns:
  548. SCADA数据列表
  549. """
  550. try:
  551. logger.debug(f"开始查询机型 {no_model} 的SCADA数据...")
  552. data = self.execute_query(SELECT_SCADA_FOR_NO_MODEL_SQL, (no_model,))
  553. logger.debug(f"机型 {no_model} 查询到 {len(data)} 条SCADA记录")
  554. return data
  555. except Exception as e:
  556. logger.error(f"查询机型 {no_model} 的SCADA数据失败: {e}")
  557. return []
  558. def calculate_and_update_parameters_for_model(self, no_model: str) -> Dict[str, Any]:
  559. """
  560. 计算并更新指定机型的额定参数
  561. Args:
  562. no_model: 机型标识
  563. Returns:
  564. 计算结果的字典
  565. """
  566. try:
  567. # 获取该机型的SCADA数据
  568. scada_data = self.get_scada_data_for_no_model(no_model)
  569. if not scada_data:
  570. logger.warning(f"机型 {no_model}: 没有可用于计算的SCADA数据")
  571. return {
  572. "no_model": no_model,
  573. "success": False,
  574. "reason": "无SCADA数据",
  575. "data_points": 0
  576. }
  577. # 计算额定参数
  578. rated_rotor_spd, rated_gen_spd, transmission_ratio = \
  579. SCADADataProcessor.calculate_rated_speeds_and_ratio(scada_data)
  580. if rated_rotor_spd <= 0 or transmission_ratio <= 0:
  581. logger.warning(f"机型 {no_model}: 计算出的参数无效")
  582. return {
  583. "no_model": no_model,
  584. "success": False,
  585. "reason": "参数无效",
  586. "rated_rotor_spd": rated_rotor_spd,
  587. "rated_gen_spd": rated_gen_spd,
  588. "transmission_ratio": transmission_ratio,
  589. "data_points": len(scada_data)
  590. }
  591. # 判断风机类型
  592. turbine_type = SCADADataProcessor.detect_turbine_type(transmission_ratio, rated_gen_spd)
  593. # 更新数据库
  594. update_result = self.execute_update(
  595. UPDATE_MODEL_PARAMETERS_SQL,
  596. (
  597. rated_rotor_spd,
  598. rated_gen_spd,
  599. transmission_ratio,
  600. turbine_type,
  601. datetime.now(),
  602. len(scada_data),
  603. no_model
  604. )
  605. )
  606. if update_result > 0:
  607. logger.info(f"机型 {no_model}: 成功更新参数 - "
  608. f"叶轮转速={rated_rotor_spd:.2f} rpm, "
  609. f"发电机转速={rated_gen_spd:.2f} rpm, "
  610. f"传动比={transmission_ratio:.2f}, "
  611. f"类型={turbine_type}, "
  612. f"数据点={len(scada_data)}")
  613. return {
  614. "no_model": no_model,
  615. "success": True,
  616. "rated_rotor_spd": rated_rotor_spd,
  617. "rated_gen_spd": rated_gen_spd,
  618. "transmission_ratio": transmission_ratio,
  619. "turbine_type": turbine_type,
  620. "data_points": len(scada_data)
  621. }
  622. else:
  623. logger.warning(f"机型 {no_model}: 数据库更新失败")
  624. return {
  625. "no_model": no_model,
  626. "success": False,
  627. "reason": "数据库更新失败",
  628. "rated_rotor_spd": rated_rotor_spd,
  629. "rated_gen_spd": rated_gen_spd,
  630. "transmission_ratio": transmission_ratio,
  631. "turbine_type": turbine_type,
  632. "data_points": len(scada_data)
  633. }
  634. except Exception as e:
  635. logger.error(f"计算机型 {no_model} 参数时出错: {e}")
  636. return {
  637. "no_model": no_model,
  638. "success": False,
  639. "reason": str(e),
  640. "data_points": 0
  641. }
  642. def calculate_and_update_all_parameters(self) -> Dict[str, Any]:
  643. """
  644. 计算并更新所有机型的额定参数
  645. Returns:
  646. 计算统计信息
  647. """
  648. try:
  649. logger.info("开始计算并更新所有机型的额定参数...")
  650. # 获取所有机型标识
  651. no_model_list = self.get_no_model_list()
  652. if not no_model_list:
  653. logger.warning("没有找到任何机型标识")
  654. return {
  655. "total_models": 0,
  656. "success_count": 0,
  657. "failed_count": 0,
  658. "results": []
  659. }
  660. logger.info(f"共发现 {len(no_model_list)} 个机型需要计算")
  661. results = []
  662. success_count = 0
  663. failed_count = 0
  664. # 遍历每个机型进行计算
  665. for i, no_model in enumerate(no_model_list, 1):
  666. logger.info(f"处理机型 {i}/{len(no_model_list)}: {no_model}")
  667. result = self.calculate_and_update_parameters_for_model(no_model)
  668. results.append(result)
  669. if result.get("success"):
  670. success_count += 1
  671. else:
  672. failed_count += 1
  673. # 汇总统计
  674. stats = {
  675. "total_models": len(no_model_list),
  676. "success_count": success_count,
  677. "failed_count": failed_count,
  678. "success_rate": success_count / len(no_model_list) * 100 if no_model_list else 0,
  679. "results": results
  680. }
  681. logger.info(f"参数计算完成: 成功={success_count}, 失败={failed_count}, "
  682. f"成功率={stats['success_rate']:.1f}%")
  683. return stats
  684. except Exception as e:
  685. logger.error(f"计算并更新所有参数失败: {e}")
  686. raise
  687. def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
  688. """
  689. 将机型数据插入到info_model_turbine表
  690. Args:
  691. model_data: 机型数据列表
  692. Returns:
  693. int: 成功插入的记录数
  694. """
  695. if not model_data:
  696. logger.warning("没有数据需要插入")
  697. return 0
  698. insert_sql = """
  699. INSERT INTO info_model_turbine
  700. (no_model, model, manufacturer, rated_capacity, cut_in_wind_speed,
  701. cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
  702. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  703. ON DUPLICATE KEY UPDATE
  704. turbine_count = VALUES(turbine_count),
  705. updated_at = CURRENT_TIMESTAMP
  706. """
  707. try:
  708. logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")
  709. # 准备插入数据
  710. insert_data = []
  711. for item in model_data:
  712. insert_data.append((
  713. item['no_model'],
  714. item['model'],
  715. item['manufacturer'],
  716. item['rated_capacity'],
  717. item['cut_in_wind_speed'],
  718. item['cut_out_wind_speed'],
  719. item['rotor_diameter'],
  720. item['hub_height'],
  721. item.get('turbine_count', 1) # 使用查询中的count值
  722. ))
  723. # 批量插入数据
  724. affected_rows = self.execute_batch_update(insert_sql, insert_data)
  725. logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
  726. return affected_rows
  727. except Exception as e:
  728. logger.error(f"插入机型数据失败: {e}")
  729. raise
  730. def get_model_turbine_stats(self) -> Dict[str, Any]:
  731. """
  732. 获取info_model_turbine表的统计信息
  733. Returns:
  734. 统计信息字典
  735. """
  736. try:
  737. # 查询统计信息
  738. stats_sql = """
  739. SELECT
  740. COUNT(*) as total_models,
  741. COUNT(DISTINCT manufacturer) as manufacturer_count,
  742. COUNT(DISTINCT model) as model_count,
  743. MIN(rated_capacity) as min_capacity,
  744. MAX(rated_capacity) as max_capacity,
  745. AVG(rated_capacity) as avg_capacity,
  746. SUM(turbine_count) as total_turbines,
  747. COUNT(CASE WHEN rated_rotor_spd IS NOT NULL AND rated_rotor_spd > 0 THEN 1 END) as calculated_models,
  748. COUNT(DISTINCT turbine_type) as turbine_type_count,
  749. SUM(data_points) as total_data_points
  750. FROM info_model_turbine
  751. """
  752. result = self.execute_query(stats_sql)
  753. return result[0] if result else {}
  754. except Exception as e:
  755. logger.error(f"获取统计信息失败: {e}")
  756. return {}
  757. def get_turbine_type_distribution(self) -> List[Dict[str, Any]]:
  758. """
  759. 获取风机类型分布
  760. Returns:
  761. 类型分布列表
  762. """
  763. try:
  764. type_sql = """
  765. SELECT
  766. IFNULL(turbine_type, '未知') as turbine_type,
  767. COUNT(*) as model_count,
  768. SUM(turbine_count) as turbine_count,
  769. AVG(transmission_ratio) as avg_ratio,
  770. AVG(rated_rotor_spd) as avg_rotor_spd,
  771. AVG(rated_gen_spd) as avg_gen_spd
  772. FROM info_model_turbine
  773. WHERE turbine_type IS NOT NULL
  774. GROUP BY turbine_type
  775. ORDER BY model_count DESC
  776. """
  777. return self.execute_query(type_sql)
  778. except Exception as e:
  779. logger.error(f"获取风机类型分布失败: {e}")
  780. return []
  781. def print_model_summary(self, model_data: List[Dict[str, Any]]):
  782. """
  783. 打印机型数据摘要
  784. Args:
  785. model_data: 机型数据列表
  786. """
  787. if not model_data:
  788. logger.info("没有机型数据")
  789. return
  790. print("\n" + "="*80)
  791. print("风机机型数据摘要")
  792. print("="*80)
  793. print(f"总机型数: {len(model_data)}")
  794. # 按制造商统计
  795. manufacturers = {}
  796. for item in model_data:
  797. manufacturer = item.get('manufacturer', '未知')
  798. manufacturers[manufacturer] = manufacturers.get(manufacturer, 0) + 1
  799. print(f"制造商数: {len(manufacturers)}")
  800. print("\n制造商分布:")
  801. for manufacturer, count in sorted(manufacturers.items(), key=lambda x: x[1], reverse=True):
  802. print(f" {manufacturer}: {count} 种机型")
  803. # 按额定容量统计
  804. capacities = {}
  805. for item in model_data:
  806. capacity = item.get('rated_capacity', 0)
  807. if capacity:
  808. capacity_range = f"{capacity}kW"
  809. capacities[capacity_range] = capacities.get(capacity_range, 0) + 1
  810. print("\n额定容量分布:")
  811. for capacity, count in sorted(capacities.items(), key=lambda x: int(x[0].replace('kW', ''))):
  812. print(f" {capacity}: {count} 种机型")
  813. print("="*80)
  814. def print_calculation_summary(self, stats: Dict[str, Any]):
  815. """打印参数计算摘要"""
  816. print("\n" + "="*80)
  817. print("参数计算统计")
  818. print("="*80)
  819. print(f"总机型数: {stats.get('total_models', 0)}")
  820. print(f"成功计算的机型数: {stats.get('success_count', 0)}")
  821. print(f"失败的机型数: {stats.get('failed_count', 0)}")
  822. print(f"成功率: {stats.get('success_rate', 0):.1f}%")
  823. # 显示成功计算的部分结果
  824. success_results = [r for r in stats.get('results', []) if r.get('success')]
  825. if success_results:
  826. print(f"\n成功计算的机型示例 (前5个):")
  827. for i, result in enumerate(success_results[:5]):
  828. print(f" {i+1}. {result['no_model']}:")
  829. print(f" 叶轮转速: {result['rated_rotor_spd']:.2f} rpm")
  830. print(f" 发电机转速: {result['rated_gen_spd']:.2f} rpm")
  831. print(f" 传动比: {result['transmission_ratio']:.2f}")
  832. print(f" 类型: {result['turbine_type']}")
  833. print(f" 数据点数: {result['data_points']}")
  834. # 显示失败的部分原因
  835. failed_results = [r for r in stats.get('results', []) if not r.get('success')]
  836. if failed_results:
  837. print(f"\n失败的机型示例 (前5个):")
  838. for i, result in enumerate(failed_results[:5]):
  839. print(f" {i+1}. {result['no_model']}: {result.get('reason', '未知原因')}")
  840. print("="*80)
  841. def run_model_extraction_pipeline(self, recreate_table: bool = False, calculate_params: bool = True) -> bool:
  842. """
  843. 运行完整的机型数据提取和参数计算流程
  844. Args:
  845. recreate_table: 是否重新创建表
  846. calculate_params: 是否计算额定参数
  847. Returns:
  848. bool: 整个流程是否成功
  849. """
  850. try:
  851. logger.info("开始执行风机机型数据提取和参数计算流程...")
  852. # 步骤1: 检查或创建表
  853. if recreate_table:
  854. self.drop_model_turbine_table()
  855. self.create_model_turbine_table()
  856. else:
  857. if not self.check_table_exists():
  858. self.create_model_turbine_table()
  859. else:
  860. logger.info("info_model_turbine表已存在,将追加数据")
  861. # 检查并添加缺失的列
  862. self.add_missing_columns()
  863. # 步骤2: 从info_turbine表获取机型数据
  864. model_data = self.get_model_data()
  865. if not model_data:
  866. logger.error("未获取到机型数据,流程终止")
  867. return False
  868. # 步骤3: 打印摘要信息
  869. self.print_model_summary(model_data)
  870. # 步骤4: 插入数据到info_model_turbine表
  871. inserted_count = self.insert_model_data(model_data)
  872. # 步骤5: 计算额定参数
  873. if calculate_params:
  874. calculation_stats = self.calculate_and_update_all_parameters()
  875. self.print_calculation_summary(calculation_stats)
  876. # 步骤6: 获取并显示最终统计信息
  877. stats = self.get_model_turbine_stats()
  878. if stats:
  879. print("\n数据库统计信息:")
  880. print(f" 总机型数: {stats.get('total_models', 0)}")
  881. print(f" 制造商数: {stats.get('manufacturer_count', 0)}")
  882. print(f" 总风机数: {stats.get('total_turbines', 0)}")
  883. print(f" 额定容量范围: {stats.get('min_capacity', 0)}kW - {stats.get('max_capacity', 0)}kW")
  884. print(f" 平均额定容量: {round(stats.get('avg_capacity', 0), 1)}kW")
  885. print(f" 已计算参数的机型数: {stats.get('calculated_models', 0)}")
  886. print(f" 使用的总数据点数: {stats.get('total_data_points', 0)}")
  887. # 显示风机类型分布
  888. type_dist = self.get_turbine_type_distribution()
  889. if type_dist:
  890. print("\n风机类型分布:")
  891. for item in type_dist:
  892. print(f" {item['turbine_type']}: {item['model_count']} 种机型, "
  893. f"{item['turbine_count']} 台风机")
  894. if item['avg_ratio']:
  895. print(f" 平均传动比: {item['avg_ratio']:.2f}, "
  896. f"平均叶轮转速: {item['avg_rotor_spd']:.2f} rpm, "
  897. f"平均发电机转速: {item['avg_gen_spd']:.2f} rpm")
  898. logger.info("风机机型数据提取和参数计算流程执行完成!")
  899. return True
  900. except Exception as e:
  901. logger.error(f"流程执行失败: {e}")
  902. import traceback
  903. traceback.print_exc()
  904. return False
  905. finally:
  906. # 关闭连接池
  907. if self.connection_pool:
  908. self.connection_pool.close_all()
  909. def main():
  910. """主函数"""
  911. # 数据库配置
  912. db_config = DatabaseConfig(
  913. # host="106.120.102.238",
  914. # port=44000,
  915. host="192.168.50.234",
  916. port=4000,
  917. user='root',
  918. password='123456',
  919. database='wind_data',
  920. charset='utf8mb4',
  921. max_connections=2,
  922. mem_quota=4 << 30 # 4GB
  923. )
  924. # 创建管理器实例
  925. manager = ModelTurbineManager(db_config)
  926. # 运行机型数据提取和参数计算流程
  927. # recreate_table=True 会删除并重新创建表
  928. # calculate_params=True 会计算额定参数
  929. success = manager.run_model_extraction_pipeline(
  930. recreate_table=False,
  931. calculate_params=True
  932. )
  933. if success:
  934. logger.info("风机机型数据提取和参数计算成功完成!")
  935. else:
  936. logger.error("风机机型数据提取和参数计算失败!")
  937. if __name__ == "__main__":
  938. main()