# info_model_turbine_v1.py
# Builds the info_model_turbine summary table from rows grouped out of info_turbine.
import logging
from collections import Counter
from datetime import datetime
from typing import List, Dict, Any, Optional

import pymysql
# Configure logging: INFO level, with timestamp, logger name and level in
# front of every message. This runs at import time.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by every class and function in this file.
logger = logging.getLogger(__name__)
# SQL statement definitions

# DDL for the summary table. `no_model` carries a UNIQUE KEY, so it is the
# column that decides whether an upsert inserts a new row or updates one.
CREATE_MODEL_TURBINE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS info_model_turbine (
    id INT AUTO_INCREMENT PRIMARY KEY,
    no_model VARCHAR(255) NOT NULL COMMENT '机型唯一标识',
    model VARCHAR(100) COMMENT '机型',
    manufacturer VARCHAR(100) COMMENT '制造商',
    rated_capacity INT COMMENT '额定容量(kW)',
    cut_in_wind_speed DECIMAL(5, 2) COMMENT '切入风速(m/s)',
    cut_out_wind_speed DECIMAL(5, 2) COMMENT '切出风速(m/s)',
    rotor_diameter INT COMMENT '叶轮直径(m)',
    hub_height DECIMAL(10, 2) COMMENT '轮毂高度(m)',
    turbine_count INT DEFAULT 0 COMMENT '该机型风机数量',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY idx_no_model (no_model),
    INDEX idx_model (model),
    INDEX idx_manufacturer (manufacturer),
    INDEX idx_rated_capacity (rated_capacity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='风机机型信息表';
"""

# Groups info_turbine rows into one record per distinct model configuration
# and counts the turbines in each group.
# NOTE(review): `no_model` is concatenated from model, cut-in speed, cut-out
# speed and hub height only, while the GROUP BY also splits on manufacturer,
# rated_capacity and rotor_diameter. Two groups that differ only in those
# three columns therefore produce the same `no_model` and collide on the
# table's unique key -- confirm whether the key or the grouping should change.
SELECT_MODEL_DATA_SQL = """
SELECT
    CONCAT(
        IFNULL(model, ''),
        '-',
        IFNULL(cut_in_wind_speed, ''),
        '-',
        IFNULL(cut_out_wind_speed, ''),
        '-',
        IFNULL(hub_height, '')
    ) AS no_model,
    model,
    manufacturer,
    rated_capacity,
    cut_in_wind_speed,
    cut_out_wind_speed,
    rotor_diameter,
    hub_height,
    COUNT(*) AS turbine_count
FROM info_turbine
WHERE model IS NOT NULL
GROUP BY model, manufacturer, rated_capacity, cut_in_wind_speed, cut_out_wind_speed, rotor_diameter, hub_height
ORDER BY model, manufacturer, rated_capacity;
"""

# Drops the summary table if it is present.
DROP_MODEL_TURBINE_TABLE_SQL = "DROP TABLE IF EXISTS info_model_turbine"

# Existence check against information_schema. The only placeholder is the
# schema name; the table name is fixed to info_model_turbine.
CHECK_TABLE_EXISTS_SQL = """
SELECT COUNT(*) as table_exists
FROM information_schema.tables
WHERE table_schema = %s AND table_name = 'info_model_turbine'
"""
  62. class DatabaseConfig:
  63. """数据库配置类"""
  64. def __init__(self, host='192.168.50.234', port=4000, user='root',
  65. password='123456', database='wind_data', charset='utf8mb4'):
  66. self.host = host
  67. self.port = port
  68. self.user = user
  69. self.password = password
  70. self.database = database
  71. self.charset = charset
  72. def get_connection_config(self) -> Dict[str, Any]:
  73. """获取数据库连接配置"""
  74. return {
  75. 'host': self.host,
  76. 'port': self.port,
  77. 'user': self.user,
  78. 'password': self.password,
  79. 'database': self.database,
  80. 'charset': self.charset,
  81. 'cursorclass': pymysql.cursors.DictCursor # 返回字典格式的结果
  82. }
  83. class ModelTurbineManager:
  84. """风机机型信息管理器"""
  85. def __init__(self, db_config: DatabaseConfig):
  86. """
  87. 初始化管理器
  88. Args:
  89. db_config: 数据库配置对象
  90. """
  91. self.db_config = db_config
  92. self.connection = None
  93. def connect(self) -> pymysql.connections.Connection:
  94. """创建数据库连接"""
  95. try:
  96. self.connection = pymysql.connect(**self.db_config.get_connection_config())
  97. logger.info(f"成功连接到数据库: {self.db_config.host}:{self.db_config.port}/{self.db_config.database}")
  98. return self.connection
  99. except Exception as e:
  100. logger.error(f"数据库连接失败: {e}")
  101. raise
  102. def close(self):
  103. """关闭数据库连接"""
  104. if self.connection:
  105. self.connection.close()
  106. logger.info("数据库连接已关闭")
  107. def execute_query(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
  108. """
  109. 执行查询语句
  110. Args:
  111. sql: SQL查询语句
  112. params: SQL参数
  113. Returns:
  114. 查询结果列表
  115. """
  116. if not self.connection:
  117. self.connect()
  118. try:
  119. with self.connection.cursor() as cursor:
  120. cursor.execute(sql, params)
  121. result = cursor.fetchall()
  122. logger.debug(f"查询执行成功,返回 {len(result)} 条记录")
  123. return result
  124. except Exception as e:
  125. logger.error(f"查询执行失败: {e}")
  126. raise
  127. def execute_update(self, sql: str, params: Optional[tuple] = None) -> int:
  128. """
  129. 执行更新语句(INSERT, UPDATE, DELETE)
  130. Args:
  131. sql: SQL更新语句
  132. params: SQL参数
  133. Returns:
  134. 影响的行数
  135. """
  136. if not self.connection:
  137. self.connect()
  138. try:
  139. with self.connection.cursor() as cursor:
  140. affected_rows = cursor.execute(sql, params)
  141. self.connection.commit()
  142. logger.debug(f"更新执行成功,影响 {affected_rows} 行")
  143. return affected_rows
  144. except Exception as e:
  145. self.connection.rollback()
  146. logger.error(f"更新执行失败: {e}")
  147. raise
  148. def check_table_exists(self, table_name: str = 'info_model_turbine') -> bool:
  149. """
  150. 检查表是否存在
  151. Args:
  152. table_name: 表名
  153. Returns:
  154. bool: 表是否存在
  155. """
  156. try:
  157. result = self.execute_query(CHECK_TABLE_EXISTS_SQL, (self.db_config.database,))
  158. return result[0]['table_exists'] > 0
  159. except Exception as e:
  160. logger.error(f"检查表存在性失败: {e}")
  161. return False
  162. def create_model_turbine_table(self) -> bool:
  163. """
  164. 创建风机机型信息表
  165. Returns:
  166. bool: 是否成功创建表
  167. """
  168. try:
  169. logger.info("开始创建风机机型信息表...")
  170. self.execute_update(CREATE_MODEL_TURBINE_TABLE_SQL)
  171. logger.info("风机机型信息表创建成功")
  172. return True
  173. except Exception as e:
  174. logger.error(f"创建风机机型信息表失败: {e}")
  175. return False
  176. def drop_model_turbine_table(self) -> bool:
  177. """
  178. 删除风机机型信息表
  179. Returns:
  180. bool: 是否成功删除表
  181. """
  182. try:
  183. logger.info("开始删除风机机型信息表...")
  184. self.execute_update(DROP_MODEL_TURBINE_TABLE_SQL)
  185. logger.info("风机机型信息表删除成功")
  186. return True
  187. except Exception as e:
  188. logger.error(f"删除风机机型信息表失败: {e}")
  189. return False
  190. def get_model_data(self) -> List[Dict[str, Any]]:
  191. """
  192. 从info_turbine表获取机型分组数据
  193. Returns:
  194. 机型数据列表
  195. """
  196. try:
  197. logger.info("开始从info_turbine表查询机型数据...")
  198. data = self.execute_query(SELECT_MODEL_DATA_SQL)
  199. logger.info(f"成功查询到 {len(data)} 条机型记录")
  200. return data
  201. except Exception as e:
  202. logger.error(f"查询机型数据失败: {e}")
  203. return []
  204. def insert_model_data(self, model_data: List[Dict[str, Any]]) -> int:
  205. """
  206. 将机型数据插入到info_model_turbine表
  207. Args:
  208. model_data: 机型数据列表
  209. Returns:
  210. int: 成功插入的记录数
  211. """
  212. if not model_data:
  213. logger.warning("没有数据需要插入")
  214. return 0
  215. insert_sql = """
  216. INSERT INTO info_model_turbine
  217. (no_model, model, manufacturer, rated_capacity, cut_in_wind_speed,
  218. cut_out_wind_speed, rotor_diameter, hub_height, turbine_count)
  219. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  220. ON DUPLICATE KEY UPDATE
  221. turbine_count = VALUES(turbine_count),
  222. updated_at = CURRENT_TIMESTAMP
  223. """
  224. try:
  225. logger.info(f"开始向info_model_turbine表插入 {len(model_data)} 条记录...")
  226. # 准备插入数据
  227. insert_data = []
  228. for item in model_data:
  229. insert_data.append((
  230. item['no_model'],
  231. item['model'],
  232. item['manufacturer'],
  233. item['rated_capacity'],
  234. item['cut_in_wind_speed'],
  235. item['cut_out_wind_speed'],
  236. item['rotor_diameter'],
  237. item['hub_height'],
  238. item.get('turbine_count', 1) # 使用查询中的count值
  239. ))
  240. # 批量插入数据
  241. with self.connection.cursor() as cursor:
  242. affected_rows = cursor.executemany(insert_sql, insert_data)
  243. self.connection.commit()
  244. logger.info(f"成功插入/更新 {affected_rows} 条记录到info_model_turbine表")
  245. return affected_rows
  246. except Exception as e:
  247. self.connection.rollback()
  248. logger.error(f"插入机型数据失败: {e}")
  249. raise
  250. def get_model_turbine_stats(self) -> Dict[str, Any]:
  251. """
  252. 获取info_model_turbine表的统计信息
  253. Returns:
  254. 统计信息字典
  255. """
  256. try:
  257. # 查询统计信息
  258. stats_sql = """
  259. SELECT
  260. COUNT(*) as total_models,
  261. COUNT(DISTINCT manufacturer) as manufacturer_count,
  262. COUNT(DISTINCT model) as model_count,
  263. MIN(rated_capacity) as min_capacity,
  264. MAX(rated_capacity) as max_capacity,
  265. AVG(rated_capacity) as avg_capacity,
  266. SUM(turbine_count) as total_turbines
  267. FROM info_model_turbine
  268. """
  269. result = self.execute_query(stats_sql)
  270. return result[0] if result else {}
  271. except Exception as e:
  272. logger.error(f"获取统计信息失败: {e}")
  273. return {}
  274. def print_model_summary(self, model_data: List[Dict[str, Any]]):
  275. """
  276. 打印机型数据摘要
  277. Args:
  278. model_data: 机型数据列表
  279. """
  280. if not model_data:
  281. logger.info("没有机型数据")
  282. return
  283. print("\n" + "="*80)
  284. print("风机机型数据摘要")
  285. print("="*80)
  286. print(f"总机型数: {len(model_data)}")
  287. # 按制造商统计
  288. manufacturers = {}
  289. for item in model_data:
  290. manufacturer = item.get('manufacturer', '未知')
  291. manufacturers[manufacturer] = manufacturers.get(manufacturer, 0) + 1
  292. print(f"制造商数: {len(manufacturers)}")
  293. print("\n制造商分布:")
  294. for manufacturer, count in sorted(manufacturers.items(), key=lambda x: x[1], reverse=True):
  295. print(f" {manufacturer}: {count} 种机型")
  296. # 按额定容量统计
  297. capacities = {}
  298. for item in model_data:
  299. capacity = item.get('rated_capacity', 0)
  300. if capacity:
  301. capacity_range = f"{capacity}kW"
  302. capacities[capacity_range] = capacities.get(capacity_range, 0) + 1
  303. print("\n额定容量分布:")
  304. for capacity, count in sorted(capacities.items(), key=lambda x: int(x[0].replace('kW', ''))):
  305. print(f" {capacity}: {count} 种机型")
  306. print("="*80)
  307. def run_model_extraction_pipeline(self, recreate_table: bool = False) -> bool:
  308. """
  309. 运行完整的机型数据提取和入库流程
  310. Args:
  311. recreate_table: 是否重新创建表
  312. Returns:
  313. bool: 整个流程是否成功
  314. """
  315. try:
  316. logger.info("开始执行风机机型数据提取流程...")
  317. # 步骤1: 连接数据库
  318. self.connect()
  319. # 步骤2: 检查或创建表
  320. if recreate_table:
  321. self.drop_model_turbine_table()
  322. self.create_model_turbine_table()
  323. else:
  324. if not self.check_table_exists():
  325. self.create_model_turbine_table()
  326. else:
  327. logger.info("info_model_turbine表已存在,将追加数据")
  328. # 步骤3: 从info_turbine表获取机型数据
  329. model_data = self.get_model_data()
  330. if not model_data:
  331. logger.error("未获取到机型数据,流程终止")
  332. return False
  333. # 步骤4: 打印摘要信息
  334. self.print_model_summary(model_data)
  335. # 步骤5: 插入数据到info_model_turbine表
  336. inserted_count = self.insert_model_data(model_data)
  337. # 步骤6: 获取并显示统计信息
  338. stats = self.get_model_turbine_stats()
  339. if stats:
  340. print("\n数据库统计信息:")
  341. print(f" 总机型数: {stats.get('total_models', 0)}")
  342. print(f" 制造商数: {stats.get('manufacturer_count', 0)}")
  343. print(f" 总风机数: {stats.get('total_turbines', 0)}")
  344. print(f" 额定容量范围: {stats.get('min_capacity', 0)}kW - {stats.get('max_capacity', 0)}kW")
  345. print(f" 平均额定容量: {round(stats.get('avg_capacity', 0), 1)}kW")
  346. logger.info("风机机型数据提取流程执行完成!")
  347. return True
  348. except Exception as e:
  349. logger.error(f"机型数据提取流程执行失败: {e}")
  350. return False
  351. finally:
  352. # 关闭连接
  353. self.close()
  354. def main():
  355. """主函数"""
  356. # 数据库配置
  357. db_config = DatabaseConfig(
  358. host='192.168.50.234',
  359. port=4000,
  360. user='root',
  361. password='123456',
  362. database='wind_data',
  363. charset='utf8mb4'
  364. )
  365. # 创建管理器实例
  366. manager = ModelTurbineManager(db_config)
  367. # 运行机型数据提取流程
  368. # 设置 recreate_table=True 会删除并重新创建表
  369. # 设置 recreate_table=False 会追加数据(使用ON DUPLICATE KEY UPDATE)
  370. success = manager.run_model_extraction_pipeline(
  371. recreate_table=False # 设置为True会重新创建表
  372. )
  373. if success:
  374. logger.info("风机机型数据提取成功完成!")
  375. else:
  376. logger.error("风机机型数据提取失败!")
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()