123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import traceback
- import pandas as pd
- from sqlalchemy import inspect
- from functools import lru_cache
- from typing import List, Dict
- from app.config import dataBase
- from app.database import get_engine
- from app.logger import logger
class DataFetcher:
    """Read wind-farm metadata and per-turbine minute data from the databases.

    Turbine and mill-type lookups run against ``dataBase.PLATFORM_DB``; column
    inspection and time-series queries run against ``dataBase.DATA_DB`` and
    target the table named ``<windcode>_minute``.
    """

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` wrapped in backticks, or ``None`` if it is unsafe.

        Only non-empty strings made solely of alphanumeric characters and
        underscores are accepted. Identifiers cannot be bound as query
        parameters, so this whitelist is what keeps table and column names
        from carrying SQL.
        """
        if (
            isinstance(name, str)
            and name  # all() is vacuously true for "", which would yield ``
            and all(ch.isalnum() or ch == '_' for ch in name)
        ):
            return f'`{name}`'
        return None

    @staticmethod
    def _safe_feature_columns(features) -> List[str]:
        """Return the backtick-quoted feature names that pass validation.

        Rejected names are logged and skipped rather than aborting the query.
        """
        quoted_columns = []
        for feat in features:
            quoted = DataFetcher._quote_identifier(feat)
            if quoted is None:
                logger.warning(f"警告:忽略非法特征名 '{feat}'")
            else:
                quoted_columns.append(quoted)
        return quoted_columns

    @staticmethod
    def _split_month(month):
        """Split a ``"YYYY-MM"`` string into ``(year, month)`` strings.

        Raises:
            ValueError: if ``month`` is not a string, does not have exactly two
                ``-`` separated parts, either part is not a decimal number, or
                the month is outside 1..12.
        """
        if not isinstance(month, str):
            raise ValueError(f"month must be a 'YYYY-MM' string, got {month!r}")
        # Tuple unpacking raises ValueError when the part count is not two.
        year_part, month_part = (part.strip() for part in month.split('-'))
        for part in (year_part, month_part):
            if not (part.isascii() and part.isdecimal()):
                raise ValueError(f"month must look like 'YYYY-MM', got {month!r}")
        if not 1 <= int(month_part) <= 12:
            raise ValueError(f"month out of range in {month!r}")
        return year_part, month_part

    @staticmethod
    def _month_query(table, columns, per_turbine):
        """Build the SELECT used by the time-series fetchers.

        ``table`` and ``columns`` must already be quoted identifiers. Values
        are left as ``%s`` placeholders, ordered turbine (only when
        ``per_turbine`` is true), year, month.
        """
        conditions = ["`year` = %s", "`month` = %s"]
        if per_turbine:
            conditions.insert(0, "`wind_turbine_number` = %s")
        return (
            f"SELECT {', '.join(columns)}\n"
            f"FROM {table}\n"
            f"WHERE {' AND '.join(conditions)}"
        )

    @staticmethod
    def _inspect_columns(windcode) -> List[str]:
        """Return the column names of ``<windcode>_minute``; errors propagate."""
        inspector = inspect(get_engine(dataBase.DATA_DB))
        return [col['name'] for col in inspector.get_columns(f"{windcode}_minute")]

    @staticmethod
    @lru_cache(maxsize=10)
    def _inspect_columns_cached(windcode):
        """Cached column lookup keyed on ``windcode`` only.

        Being a static method, the cache does not hold on to ``DataFetcher``
        instances and is shared by all of them. Failures and empty results
        raise, so they are never stored and the next call retries.
        """
        columns = tuple(DataFetcher._inspect_columns(windcode))
        if not columns:
            raise LookupError(f"no columns found for {windcode}_minute")
        return columns

    def get_turbine_columns_cached(self, windcode: str) -> List[str]:
        """Return the column names of ``<windcode>_minute``, using a cache.

        Args:
            windcode: Wind farm code (e.g. "WF001").

        Returns:
            A fresh list of column names, or ``[]`` if the lookup failed.
            Failed lookups are logged and not cached.
        """
        try:
            # Copy so callers cannot mutate the cached value.
            return list(self._inspect_columns_cached(windcode))
        except Exception as e:
            logger.error(f"Error fetching columns for {windcode}_minute: {str(e)}")
            return []

    def get_turbine_columns(self, windcode):
        """Return all column names of the wind farm's data table.

        Args:
            windcode: Wind farm code (e.g. "WF001").

        Returns:
            List of column names (e.g. ["timestamp", "temp1", "vibration1"]),
            or ``[]`` if the lookup raised; the error is logged.
        """
        table_name = f"{windcode}_minute"
        try:
            return self._inspect_columns(windcode)
        except Exception as e:
            logger.error(f"Error fetching columns for {table_name}: {str(e)}")
            return []

    def get_turbines(self, windcode):
        """Return all turbines that belong to a wind farm.

        Queries ``wind_engine_group`` for rows whose ``field_code`` equals
        ``windcode``.

        Args:
            windcode: Wind farm code.

        Returns:
            pd.DataFrame with columns ``engine_code``, ``mill_type_code`` and
            ``engine_name``. Database errors propagate to the caller.
        """
        query = """
            SELECT engine_code, mill_type_code, engine_name
            FROM wind_engine_group
            WHERE field_code = %s
        """
        # Bind the value instead of formatting it into the SQL text.
        return pd.read_sql(
            query, get_engine(dataBase.PLATFORM_DB), params=(windcode,)
        )

    def get_mill_type(self, mill_type_code):
        """Return the drive type of a turbine model.

        Queries ``wind_engine_mill`` for the ``curved_motion_type`` of the row
        whose ``mill_type_code`` matches.

        Args:
            mill_type_code: Turbine model code.

        Returns:
            The first matching ``curved_motion_type`` value, or ``None`` when
            no row matches. Database errors propagate to the caller.
        """
        query = """
            SELECT curved_motion_type
            FROM wind_engine_mill
            WHERE mill_type_code = %s
        """
        result = pd.read_sql(
            query, get_engine(dataBase.PLATFORM_DB), params=(mill_type_code,)
        )
        return result.iloc[0, 0] if not result.empty else None

    def fetch_turbine_data(self, windcode, engine_code, month, features):
        """Fetch one turbine's rows of ``<windcode>_minute`` for one month.

        Args:
            windcode: Wind farm code (e.g. "WF001").
            engine_code: Turbine code (e.g. "WT001").
            month: Month string in "YYYY-MM" format.
            features: Column names to select; invalid names are skipped.

        Returns:
            pd.DataFrame with ``year``, ``month`` and the valid feature
            columns. An empty DataFrame is returned when no feature is valid,
            the input is malformed, or the query fails; the cause is logged.
        """
        try:
            year, month_number = self._split_month(month)

            safe_features = self._safe_feature_columns(features)
            if not safe_features:
                logger.error("错误:无有效特征列")
                return pd.DataFrame()

            # The table name is derived from caller input, so validate it too.
            table_name = self._quote_identifier(f"{windcode}_minute")
            if table_name is None:
                raise ValueError(f"invalid windcode: {windcode!r}")

            query = self._month_query(
                table_name, ["`year`", "`month`", *safe_features], True
            )
            logger.info(
                f"执行安全查询:\n{query}\n参数: ({engine_code}, {year},{month_number})"
            )
            return pd.read_sql(
                query,
                get_engine(dataBase.DATA_DB),
                params=(engine_code, year, month_number),
            )
        except ValueError:
            # format_exc() returns the traceback text; print_exc() returns None.
            logger.error(f"输入参数错误: {traceback.format_exc()}")
            return pd.DataFrame()
        except Exception:
            logger.error(f"数据库查询失败: {traceback.format_exc()}")
            return pd.DataFrame()

    def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
        """Fetch one month of data for every turbine of a wind farm in one query.

        Args:
            windcode: Wind farm code (e.g. "WF001").
            month: Month string in "YYYY-MM" format.
            features: Column names to select; invalid names are skipped.

        Returns:
            Dict mapping each ``wind_turbine_number`` value to a DataFrame of
            its rows with that column removed. ``{}`` is returned when no
            feature is valid or anything fails; failures are logged.
        """
        try:
            safe_features = self._safe_feature_columns(features)
            if not safe_features:
                return {}

            year, month_number = self._split_month(month)

            table_name = self._quote_identifier(f"{windcode}_minute")
            if table_name is None:
                raise ValueError(f"invalid windcode: {windcode!r}")

            query = self._month_query(
                table_name, ["`wind_turbine_number`", *safe_features], False
            )
            all_data = pd.read_sql(
                query, get_engine(dataBase.DATA_DB), params=(year, month_number)
            )

            # Split the combined result into one frame per turbine.
            return {
                turbine: group.drop(columns=['wind_turbine_number'])
                for turbine, group in all_data.groupby('wind_turbine_number')
            }
        except Exception:
            logger.error(f"批量查询失败: {traceback.format_exc()}")
            return {}
|