import traceback

import pandas as pd
from sqlalchemy import inspect

from app.config import dataBase
from app.database import get_engine
from app.logger import logger


def _is_safe_identifier(name):
    """Return True if *name* is a non-empty str of alphanumerics/underscores.

    Used to vet values that must be spliced into SQL as identifiers (table and
    column names), which cannot be passed as bound parameters.
    """
    return (
        isinstance(name, str)
        and bool(name)  # all() over an empty string would otherwise be True
        and all(ch.isalnum() or ch == "_" for ch in name)
    )


class DataFetcher:
    """Read wind-farm metadata and per-turbine minute data from the databases."""

    def get_turbine_columns(self, windcode):
        """Return all column names of a wind farm's minute-data table.

        The table name is built as ``f"{windcode}_minute"`` and inspected on
        the ``dataBase.DATA_DB`` engine.

        :param windcode: wind farm code (e.g. "WF001")
        :return: list of column names (e.g. ["timestamp", "temp1",
            "vibration1"]); an empty list if inspection fails for any reason
        """
        table_name = f"{windcode}_minute"
        try:
            inspector = inspect(get_engine(dataBase.DATA_DB))
            return [col["name"] for col in inspector.get_columns(table_name)]
        except Exception:
            # format_exc() returns the traceback text; print_exc() returns None.
            logger.error(f"获取列名失败: {traceback.format_exc()}")
            return []

    def get_turbines(self, windcode):
        """Return every turbine registered under a wind farm.

        Queries ``wind_engine_group`` on the ``dataBase.PLATFORM_DB`` engine
        for rows whose ``field_code`` equals *windcode*.

        :param windcode: wind farm code
        :return: DataFrame with columns ``engine_code``, ``mill_type_code``
            and ``engine_name``
        """
        # Bound parameter instead of string interpolation (SQL injection).
        # NOTE(review): assumes the platform DB driver accepts the same "%s"
        # paramstyle already used by fetch_turbine_data — confirm.
        query = """
            SELECT engine_code, mill_type_code, engine_name
            FROM wind_engine_group
            WHERE field_code = %s
        """
        return pd.read_sql(
            query, get_engine(dataBase.PLATFORM_DB), params=(windcode,)
        )

    def get_mill_type(self, mill_type_code):
        """Return the drive-type value of a turbine model.

        Queries ``wind_engine_mill`` on the ``dataBase.PLATFORM_DB`` engine
        for the ``curved_motion_type`` of *mill_type_code*.

        :param mill_type_code: turbine model code
        :return: the first ``curved_motion_type`` value found, or ``None``
            when no row matches
        """
        # Bound parameter instead of string interpolation (SQL injection).
        # NOTE(review): assumes the platform DB driver accepts the same "%s"
        # paramstyle already used by fetch_turbine_data — confirm.
        query = """
            SELECT curved_motion_type
            FROM wind_engine_mill
            WHERE mill_type_code = %s
        """
        result = pd.read_sql(
            query, get_engine(dataBase.PLATFORM_DB), params=(mill_type_code,)
        )
        return None if result.empty else result.iloc[0, 0]

    def fetch_turbine_data(self, windcode, engine_code, month, features):
        """Fetch one turbine's rows for one month from ``{windcode}_minute``.

        Rows are selected where ``wind_turbine_number`` equals *engine_code*
        and ``year_month`` equals *month* converted to the integer YYYYMM.

        Args:
            windcode: wind farm code (e.g. "WF001"); must consist only of
                alphanumerics/underscores because it becomes part of the
                table name.
            engine_code: turbine code (e.g. "WT001"), passed as a bound
                parameter.
            month: month string in "YYYY-MM" format.
            features: column names to select; names that are not plain
                alphanumeric/underscore strings are skipped with a warning.
                ``None`` is treated as an empty list.

        Returns:
            pd.DataFrame: the ``year_month`` column plus the valid feature
            columns. An empty DataFrame is returned when the input is invalid,
            no valid feature remains, or the query fails.
        """
        try:
            # 1. Convert and validate the month (six digits, month part 01-12).
            year_month_int = int(month.replace("-", ""))
            if not (
                100001 <= year_month_int <= 999912
                and 1 <= year_month_int % 100 <= 12
            ):
                raise ValueError("月份格式应为YYYY-MM")

            # The table name cannot be a bound parameter, so it must be vetted
            # before being interpolated between backticks.
            if not _is_safe_identifier(windcode):
                raise ValueError(f"非法风场编号 '{windcode}'")

            # 2. Keep only feature names that are safe to use as identifiers.
            safe_features = []
            for feat in features or []:
                if _is_safe_identifier(feat):
                    safe_features.append(f"`{feat}`")  # backtick-quote
                else:
                    logger.warning(f"警告:忽略非法特征名 '{feat}'")

            if not safe_features:
                logger.error("错误:无有效特征列")
                return pd.DataFrame()

            # 3. Build the query; values go through bound parameters.
            query = f"""
                SELECT `year_month`, {','.join(safe_features)}
                FROM `{windcode}_minute`
                WHERE `wind_turbine_number` = %s
                AND `year_month` = %s
            """
            logger.info(
                f"执行安全查询:\n{query}\n参数: ({engine_code}, {year_month_int})"
            )

            # 4. Execute with bound parameters.
            return pd.read_sql(
                query,
                get_engine(dataBase.DATA_DB),
                params=(engine_code, year_month_int),
            )
        except ValueError:
            # format_exc() returns the traceback text; print_exc() returns None.
            logger.error(f"输入参数错误: {traceback.format_exc()}")
            return pd.DataFrame()
        except Exception:
            logger.error(f"数据库查询失败: {traceback.format_exc()}")
            return pd.DataFrame()