database.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import pandas as pd
  2. from sqlalchemy import create_engine, inspect
  3. import traceback
  4. import logging
  5. from functools import lru_cache
  6. from typing import List, Dict, Optional
  7. class DataFetcher:
  8. def __init__(self):
  9. self.show_engine = create_engine('mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show')
  10. self.data_engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
  11. # self.show_engine = create_engine('mysql+pymysql://admin:admin123456@106.120.102.238:16306/energy_show')
  12. # self.data_engine = create_engine('mysql+pymysql://root:admin123456@106.120.102.238:10336/energy_data_prod')
  13. @lru_cache(maxsize=10)
  14. def get_turbine_columns_cached(self, windcode: str):
  15. return self.get_turbine_columns(windcode)
  16. def get_turbine_columns(self, windcode):
  17. """
  18. 获取指定风场数据表的所有列名
  19. :param windcode: 风场编号 (如 "WF001")
  20. :return: 列名列表 (如 ["timestamp", "temp1", "vibration1"])
  21. """
  22. # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
  23. special_wind_farms = {
  24. #训练诺木洪风场时 删掉反引号
  25. "WOF093400005": f"`{windcode}-WOB000001_minute`" # 加上反引号
  26. # "WOF093400005": f"{windcode}-WOB000001_minute"
  27. }
  28. table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
  29. try:
  30. inspector = inspect(self.data_engine)
  31. print(f"正在查询表: {table_name}") # 调试用
  32. columns = inspector.get_columns(table_name)
  33. return [col['name'] for col in columns]
  34. except Exception as e:
  35. print(f"Error fetching columns for {table_name}: {str(e)}")
  36. return []
  37. """
  38. 获取风场下所有风机信息
  39. 根据风场编号在表'wind_engine_group'中查询所有的风机编号engine_code 以及对应的机型编号mill_type_code,风机名称engine_name
  40. """
  41. def get_turbines(self, windcode):
  42. query = f"""
  43. SELECT engine_code, mill_type_code,engine_name
  44. FROM wind_engine_group
  45. WHERE field_code = '{windcode}'
  46. """
  47. return pd.read_sql(query, self.show_engine)
  48. """
  49. 获取机型驱动类型
  50. 根据机型编号在表'wind_engine_mill'中查询对应的驱动方式值
  51. """
  52. def get_mill_type(self, mill_type_code):
  53. query = f"""
  54. SELECT curved_motion_type
  55. FROM wind_engine_mill
  56. WHERE mill_type_code = '{mill_type_code}'
  57. """
  58. result = pd.read_sql(query, self.show_engine)
  59. return result.iloc[0, 0] if not result.empty else None
  60. """
  61. 获取风机时序数据
  62. 根据风机编号在表'windcode_minute'中,筛选出timestamp在month范围里的所有数据条
  63. """
  64. def fetch_turbine_data(self, windcode, engine_code, month, features):
  65. """获取指定月份风机数据(安全参数化版本)
  66. Args:
  67. windcode: 风场编号 (如 "WF001")
  68. engine_code: 风机编号 (如 "WT001")
  69. month: 月份字符串 (格式 "YYYY-MM")
  70. features: 需要查询的字段列表
  71. Returns:
  72. pd.DataFrame: 包含查询结果的DataFrame
  73. """
  74. try:
  75. # 将month格式从yyyy-mm转换为单独的年份和月份
  76. year, month = month.split('-')
  77. # 2. 验证特征列名安全性
  78. safe_features = []
  79. for feat in features:
  80. if isinstance(feat, str) and all(c.isalnum() or c == '_' for c in feat):
  81. safe_features.append(f'`{feat}`') # 用反引号包裹
  82. else:
  83. print(f"警告:忽略非法特征名 '{feat}'")
  84. if not safe_features:
  85. print("错误:无有效特征列")
  86. return pd.DataFrame()
  87. # 3. 构建参数化查询
  88. # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
  89. special_wind_farms = {
  90. "WOF093400005": f"`{windcode}-WOB000001_minute`" # 加上反引号
  91. #"WOF093400005": f"{windcode}-WOB000001_minute"
  92. }
  93. table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
  94. query = f"""
  95. SELECT `year`,`mont`, {','.join(safe_features)}
  96. FROM {table_name}
  97. WHERE `wind_turbine_number` = %s
  98. AND `year` = %s
  99. AND `month` = %s
  100. """
  101. print(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year},{month})")
  102. # 4. 执行参数化查询
  103. return pd.read_sql(query, self.data_engine,
  104. params=(engine_code, year,month))
  105. except ValueError as e:
  106. print(f"输入参数错误: {str(e)}")
  107. return pd.DataFrame()
  108. except Exception as e:
  109. print(f"数据库查询失败: {str(e)}")
  110. import traceback
  111. traceback.print_exc()
  112. return pd.DataFrame()
  113. def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
  114. """批量获取风场下所有风机数据"""
  115. try:
  116. # 安全特征检查
  117. safe_features = [f'`{f}`' for f in features if isinstance(f, str) and all(c.isalnum() or c == '_' for c in f)]
  118. if not safe_features:
  119. return {}
  120. # 将month格式从yyyy-mm转换为单独的年份和月份
  121. year, month = month.split('-')
  122. # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
  123. special_wind_farms = {
  124. "WOF093400005": f"`{windcode}-WOB000001_minute`" # 加上反引号
  125. #"WOF093400005": f"{windcode}-WOB000001_minute"
  126. }
  127. table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
  128. # 单次查询获取所有风机数据
  129. query = f"""
  130. SELECT `wind_turbine_number`, {','.join(safe_features)}
  131. FROM {table_name}
  132. WHERE `year` = %s AND `month` = %s
  133. """
  134. # 执行查询
  135. all_data = pd.read_sql(query, self.data_engine, params=(year, month))
  136. # 按风机分组
  137. return {turbine: group.drop(columns=['wind_turbine_number'])
  138. for turbine, group in all_data.groupby('wind_turbine_number')}
  139. except Exception as e:
  140. print(f"批量查询失败: {str(e)}")
  141. return {}