HealthDataFetcher.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import traceback
  2. import pandas as pd
  3. from sqlalchemy import inspect
  4. from functools import lru_cache
  5. from typing import List, Dict
  6. from app.config import dataBase
  7. from app.database import get_engine
  8. from app.logger import logger
  9. class DataFetcher:
  10. @lru_cache(maxsize=10)
  11. def get_turbine_columns_cached(self, windcode: str):
  12. return self.get_turbine_columns(windcode)
  13. def get_turbine_columns(self, windcode):
  14. """
  15. 获取指定风场数据表的所有列名
  16. :param windcode: 风场编号 (如 "WF001")
  17. :return: 列名列表 (如 ["timestamp", "temp1", "vibration1"])
  18. """
  19. # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
  20. special_wind_farms = {
  21. #训练诺木洪风场时 删掉反引号
  22. "WOF093400005": f"`{windcode}-WOB000001_minute`" # 加上反引号
  23. # "WOF093400005": f"{windcode}-WOB000001_minute"
  24. }
  25. table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
  26. try:
  27. inspector = inspect(get_engine(dataBase.DATA_DB))
  28. columns = inspector.get_columns(table_name)
  29. return [col['name'] for col in columns]
  30. except Exception as e:
  31. logger.error(f"Error fetching columns for {table_name}: {str(e)}")
  32. return []
  33. """
  34. 获取风场下所有风机信息
  35. 根据风场编号在表'wind_engine_group'中查询所有的风机编号engine_code 以及对应的机型编号mill_type_code,风机名称engine_name
  36. """
  37. def get_turbines(self, windcode):
  38. query = f"""
  39. SELECT engine_code, mill_type_code,engine_name
  40. FROM wind_engine_group
  41. WHERE field_code = '{windcode}'
  42. """
  43. return pd.read_sql(query, get_engine(dataBase.PLATFORM_DB))
  44. """
  45. 获取机型驱动类型
  46. 根据机型编号在表'wind_engine_mill'中查询对应的驱动方式值
  47. """
  48. def get_mill_type(self, mill_type_code):
  49. query = f"""
  50. SELECT curved_motion_type
  51. FROM wind_engine_mill
  52. WHERE mill_type_code = '{mill_type_code}'
  53. """
  54. result = pd.read_sql(query, get_engine(dataBase.PLATFORM_DB))
  55. return result.iloc[0, 0] if not result.empty else None
  56. """
  57. 获取风机时序数据
  58. 根据风机编号在表'windcode_minute'中,筛选出timestamp在month范围里的所有数据条
  59. """
  60. def fetch_turbine_data(self, windcode, engine_code, month, features):
  61. """获取指定月份风机数据(安全参数化版本)
  62. Args:
  63. windcode: 风场编号 (如 "WF001")
  64. engine_code: 风机编号 (如 "WT001")
  65. month: 月份字符串 (格式 "YYYY-MM")
  66. features: 需要查询的字段列表
  67. Returns:
  68. pd.DataFrame: 包含查询结果的DataFrame
  69. """
  70. try:
  71. # 将month格式从yyyy-mm转换为单独的年份和月份
  72. year, month = month.split('-')
  73. # 2. 验证特征列名安全性
  74. safe_features = []
  75. for feat in features:
  76. if isinstance(feat, str) and all(c.isalnum() or c == '_' for c in feat):
  77. safe_features.append(f'`{feat}`') # 用反引号包裹
  78. else:
  79. logger.info(f"警告:忽略非法特征名 '{feat}'")
  80. if not safe_features:
  81. logger.info("错误:无有效特征列")
  82. return pd.DataFrame()
  83. # 3. 构建参数化查询
  84. # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
  85. special_wind_farms = {
  86. "WOF093400005": f"`{windcode}-WOB000001_minute`" # 加上反引号
  87. #"WOF093400005": f"{windcode}-WOB000001_minute"
  88. }
  89. table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
  90. query = f"""
  91. SELECT `year`,`mont`, {','.join(safe_features)}
  92. FROM {table_name}
  93. WHERE `wind_turbine_number` = %s
  94. AND `year` = %s
  95. AND `month` = %s
  96. """
  97. logger.info(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year},{month})")
  98. # 4. 执行参数化查询
  99. return pd.read_sql(query, get_engine(dataBase.DATA_DB),
  100. params=(engine_code,year,month))
  101. except ValueError as e:
  102. logger.error(f"输入参数错误: {traceback.print_exc()}")
  103. return pd.DataFrame()
  104. except Exception as e:
  105. logger.error(f"数据库查询失败: {traceback.print_exc()}")
  106. traceback.print_exc()
  107. return pd.DataFrame()
  108. def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
  109. """批量获取风场下所有风机数据"""
  110. try:
  111. # 安全特征检查
  112. safe_features = [f'`{f}`' for f in features if isinstance(f, str) and all(c.isalnum() or c == '_' for c in f)]
  113. if not safe_features:
  114. return {}
  115. # 将month格式从yyyy-mm转换为单独的年份和月份
  116. year, month = month.split('-')
  117. # 根据风场编号获取表名,特殊风场用反引号,其他风场不加反引号
  118. special_wind_farms = {
  119. "WOF093400005": f"`{windcode}-WOB000001_minute`" # 加上反引号
  120. #"WOF093400005": f"{windcode}-WOB000001_minute"
  121. }
  122. table_name = special_wind_farms.get(windcode, f"{windcode}_minute")
  123. # 单次查询获取所有风机数据
  124. query = f"""
  125. SELECT `wind_turbine_number`, {','.join(safe_features)}
  126. FROM {table_name}
  127. WHERE `year` = %s AND `month` = %s
  128. """
  129. # 执行查询
  130. all_data = pd.read_sql(query, get_engine(dataBase.DATA_DB), params=(year, month))
  131. # 按风机分组
  132. return {turbine: group.drop(columns=['wind_turbine_number'])
  133. for turbine, group in all_data.groupby('wind_turbine_number')}
  134. except Exception as e:
  135. print(f"批量查询失败: {str(e)}")
  136. return {}