HealthDataFetcher.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import traceback
  2. import pandas as pd
  3. from sqlalchemy import inspect
  4. from functools import lru_cache
  5. from typing import List, Dict
  6. from app.config import dataBase
  7. from app.database import get_engine
  8. from app.logger import logger
  9. class DataFetcher:
  10. @lru_cache(maxsize=10)
  11. def get_turbine_columns_cached(self, windcode: str):
  12. return self.get_turbine_columns(windcode)
  13. def get_turbine_columns(self, windcode):
  14. """
  15. 获取指定风场数据表的所有列名
  16. :param windcode: 风场编号 (如 "WF001")
  17. :return: 列名列表 (如 ["timestamp", "temp1", "vibration1"])
  18. """
  19. table_name = f"{windcode}_minute"
  20. try:
  21. inspector = inspect(get_engine(dataBase.DATA_DB))
  22. columns = inspector.get_columns(table_name)
  23. return [col['name'] for col in columns]
  24. except Exception as e:
  25. logger.error(f"Error fetching columns for {table_name}: {str(e)}")
  26. return []
  27. """
  28. 获取风场下所有风机信息
  29. 根据风场编号在表'wind_engine_group'中查询所有的风机编号engine_code 以及对应的机型编号mill_type_code,风机名称engine_name
  30. """
  31. def get_turbines(self, windcode):
  32. query = f"""
  33. SELECT engine_code, mill_type_code,engine_name
  34. FROM wind_engine_group
  35. WHERE field_code = '{windcode}'
  36. """
  37. return pd.read_sql(query, get_engine(dataBase.PLATFORM_DB))
  38. """
  39. 获取机型驱动类型
  40. 根据机型编号在表'wind_engine_mill'中查询对应的驱动方式值
  41. """
  42. def get_mill_type(self, mill_type_code):
  43. query = f"""
  44. SELECT curved_motion_type
  45. FROM wind_engine_mill
  46. WHERE mill_type_code = '{mill_type_code}'
  47. """
  48. result = pd.read_sql(query, get_engine(dataBase.PLATFORM_DB))
  49. return result.iloc[0, 0] if not result.empty else None
  50. """
  51. 获取风机时序数据
  52. 根据风机编号在表'windcode_minute'中,筛选出timestamp在month范围里的所有数据条
  53. """
  54. def fetch_turbine_data(self, windcode, engine_code, month, features):
  55. """获取指定月份风机数据(安全参数化版本)
  56. Args:
  57. windcode: 风场编号 (如 "WF001")
  58. engine_code: 风机编号 (如 "WT001")
  59. month: 月份字符串 (格式 "YYYY-MM")
  60. features: 需要查询的字段列表
  61. Returns:
  62. pd.DataFrame: 包含查询结果的DataFrame
  63. """
  64. try:
  65. # 将month格式从yyyy-mm转换为单独的年份和月份
  66. year, month = month.split('-')
  67. # 2. 验证特征列名安全性
  68. safe_features = []
  69. for feat in features:
  70. if isinstance(feat, str) and all(c.isalnum() or c == '_' for c in feat):
  71. safe_features.append(f'`{feat}`') # 用反引号包裹
  72. else:
  73. logger.info(f"警告:忽略非法特征名 '{feat}'")
  74. if not safe_features:
  75. logger.info("错误:无有效特征列")
  76. return pd.DataFrame()
  77. # 3. 构建参数化查询
  78. table_name = f"{windcode}_minute"
  79. query = f"""
  80. SELECT `year`,`mont`, {','.join(safe_features)}
  81. FROM {table_name}
  82. WHERE `wind_turbine_number` = %s
  83. AND `year` = %s
  84. AND `month` = %s
  85. """
  86. logger.info(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year},{month})")
  87. # 4. 执行参数化查询
  88. return pd.read_sql(query, get_engine(dataBase.DATA_DB),
  89. params=(engine_code,year,month))
  90. except ValueError as e:
  91. logger.error(f"输入参数错误: {traceback.print_exc()}")
  92. return pd.DataFrame()
  93. except Exception as e:
  94. logger.error(f"数据库查询失败: {traceback.print_exc()}")
  95. traceback.print_exc()
  96. return pd.DataFrame()
  97. def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
  98. """批量获取风场下所有风机数据"""
  99. try:
  100. # 安全特征检查
  101. safe_features = [f'`{f}`' for f in features if isinstance(f, str) and all(c.isalnum() or c == '_' for c in f)]
  102. if not safe_features:
  103. return {}
  104. # 将month格式从yyyy-mm转换为单独的年份和月份
  105. year, month = month.split('-')
  106. table_name = f"{windcode}_minute"
  107. # 单次查询获取所有风机数据
  108. query = f"""
  109. SELECT `wind_turbine_number`, {','.join(safe_features)}
  110. FROM {table_name}
  111. WHERE `year` = %s AND `month` = %s
  112. """
  113. # 执行查询
  114. all_data = pd.read_sql(query, get_engine(dataBase.DATA_DB), params=(year, month))
  115. # 按风机分组
  116. return {turbine: group.drop(columns=['wind_turbine_number'])
  117. for turbine, group in all_data.groupby('wind_turbine_number')}
  118. except Exception as e:
  119. print(f"批量查询失败: {str(e)}")
  120. return {}