# HealthDataFetcher.py (4.3 KB)
  1. import traceback
  2. import pandas as pd
  3. from sqlalchemy import inspect
  4. from app.config import dataBase
  5. from app.database import get_engine
  6. from app.logger import logger
  7. class DataFetcher:
  8. def get_turbine_columns(self, windcode):
  9. """
  10. 获取指定风场数据表的所有列名
  11. :param windcode: 风场编号 (如 "WF001")
  12. :return: 列名列表 (如 ["timestamp", "temp1", "vibration1"])
  13. """
  14. table_name = f"{windcode}_minute"
  15. try:
  16. inspector = inspect(get_engine(dataBase.DATA_DB))
  17. columns = inspector.get_columns(table_name)
  18. return [col['name'] for col in columns]
  19. except Exception as e:
  20. logger.error(f"Error fetching columns for {table_name}: {str(e)}")
  21. return []
  22. """
  23. 获取风场下所有风机信息
  24. 根据风场编号在表'wind_engine_group'中查询所有的风机编号engine_code 以及对应的机型编号mill_type_code,风机名称engine_name
  25. """
  26. def get_turbines(self, windcode):
  27. query = f"""
  28. SELECT engine_code, mill_type_code,engine_name
  29. FROM wind_engine_group
  30. WHERE field_code = '{windcode}'
  31. """
  32. return pd.read_sql(query, get_engine(dataBase.PLATFORM_DB))
  33. """
  34. 获取机型驱动类型
  35. 根据机型编号在表'wind_engine_mill'中查询对应的驱动方式值
  36. """
  37. def get_mill_type(self, mill_type_code):
  38. query = f"""
  39. SELECT curved_motion_type
  40. FROM wind_engine_mill
  41. WHERE mill_type_code = '{mill_type_code}'
  42. """
  43. result = pd.read_sql(query, get_engine(dataBase.PLATFORM_DB))
  44. return result.iloc[0, 0] if not result.empty else None
  45. """
  46. 获取风机时序数据
  47. 根据风机编号在表'windcode_minute'中,筛选出timestamp在month范围里的所有数据条
  48. """
  49. def fetch_turbine_data(self, windcode, engine_code, month, features):
  50. """获取指定月份风机数据(安全参数化版本)
  51. Args:
  52. windcode: 风场编号 (如 "WF001")
  53. engine_code: 风机编号 (如 "WT001")
  54. month: 月份字符串 (格式 "YYYY-MM")
  55. features: 需要查询的字段列表
  56. Returns:
  57. pd.DataFrame: 包含查询结果的DataFrame
  58. """
  59. try:
  60. # 1. 转换并验证月份格式
  61. year_month_int = int(month.replace('-', ''))
  62. if not 100001 <= year_month_int <= 999912: # 基本格式验证
  63. raise ValueError("月份格式应为YYYY-MM")
  64. # 2. 验证特征列名安全性
  65. safe_features = []
  66. for feat in features:
  67. if isinstance(feat, str) and all(c.isalnum() or c == '_' for c in feat):
  68. safe_features.append(f'`{feat}`') # 用反引号包裹
  69. else:
  70. logger.info(f"警告:忽略非法特征名 '{feat}'")
  71. if not safe_features:
  72. logger.info("错误:无有效特征列")
  73. return pd.DataFrame()
  74. # 3. 构建参数化查询
  75. query = f"""
  76. SELECT `year_month`, {','.join(safe_features)}
  77. FROM `{windcode}_minute`
  78. WHERE `wind_turbine_number` = %s
  79. AND `year_month` = %s
  80. """
  81. logger.info(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year_month_int})")
  82. # 4. 执行参数化查询
  83. return pd.read_sql(query, get_engine(dataBase.DATA_DB),
  84. params=(engine_code, year_month_int))
  85. except ValueError as e:
  86. logger.error(f"输入参数错误: {traceback.print_exc()}")
  87. return pd.DataFrame()
  88. except Exception as e:
  89. logger.error(f"数据库查询失败: {traceback.print_exc()}")
  90. return pd.DataFrame()
  91. def get_turbine_columns(self, windcode):
  92. """获取指定风场数据表的所有列名"""
  93. table_name = f"{windcode}_minute"
  94. try:
  95. inspector = inspect(get_engine(dataBase.DATA_DB))
  96. columns = inspector.get_columns(table_name)
  97. return [col['name'] for col in columns]
  98. except Exception as e:
  99. logger.error(f"获取列名失败: {traceback.print_exc()}")
  100. return []