import traceback
import pandas as pd
from sqlalchemy import inspect
from functools import lru_cache
from typing import List, Dict
from app.config import dataBase
from app.database import get_engine
from app.logger import logger

# Wind farms whose minute-level table does not follow the default
# "<windcode>_minute" naming convention. Names are stored RAW (unquoted):
# SQLAlchemy's Inspector quotes identifiers itself, and the hand-written SQL
# below quotes them through _quote_identifier(), so one mapping serves both
# (previously a backticked name broke Inspector.get_columns for this farm).
_SPECIAL_MINUTE_TABLES = {
    "WOF093400005": "WOF093400005-WOB000001_minute",
}


def _minute_table_name(windcode):
    """Return the raw (unquoted) name of the minute-level data table of *windcode*."""
    return _SPECIAL_MINUTE_TABLES.get(windcode, f"{windcode}_minute")


def _quote_identifier(name):
    """Wrap *name* in MySQL backticks, doubling any embedded backtick.

    Identifiers cannot be bound as query parameters, so table/column names are
    spliced into the SQL text; quoting handles names containing '-' and
    escaping backticks keeps a caller-supplied name from breaking out of the
    identifier.
    """
    return "`" + str(name).replace("`", "``") + "`"


def _valid_feature_names(features):
    """Return the feature names that may safely be used as SQL identifiers.

    A name is kept only if it is a non-empty ``str`` made solely of
    alphanumeric characters and underscores (so it can never contain a
    backtick). Rejected names are logged. Duplicates are dropped while the
    original order is preserved. ``None`` is treated as "no features".
    """
    accepted = []
    for feat in (features if features is not None else ()):
        if isinstance(feat, str) and feat and all(c.isalnum() or c == '_' for c in feat):
            accepted.append(feat)
        else:
            logger.info(f"警告:忽略非法特征名 '{feat}'")
    return list(dict.fromkeys(accepted))


def _load_column_names(windcode):
    """Read the column names of *windcode*'s minute table from the database schema.

    Returns a tuple so that the cached variant below hands out an immutable
    value. Database errors propagate to the caller.
    """
    inspector = inspect(get_engine(dataBase.DATA_DB))
    columns = inspector.get_columns(_minute_table_name(windcode))
    return tuple(col['name'] for col in columns)


# Module-level cache (at most 10 wind farms) shared by all DataFetcher
# instances. Because _load_column_names raises on failure instead of
# returning [], a transient database error is never cached.
_cached_column_names = lru_cache(maxsize=10)(_load_column_names)


class DataFetcher:
    """Read wind-farm metadata and minute-level turbine data from the databases."""

    def get_turbine_columns_cached(self, windcode: str) -> List[str]:
        """Cached variant of ``get_turbine_columns``.

        Successful lookups are cached per wind farm code in a module-level
        cache shared by all instances; failed lookups are not cached.
        Returns a fresh list on every call, so callers may mutate it.
        """
        return self._column_names_or_empty(_cached_column_names, windcode)

    def get_turbine_columns(self, windcode) -> List[str]:
        """Return all column names of a wind farm's minute-level data table.

        :param windcode: wind farm code (e.g. "WF001")
        :return: list of column names (e.g. ["timestamp", "temp1", "vibration1"]);
                 an empty list if the table cannot be inspected (the error is logged)
        """
        return self._column_names_or_empty(_load_column_names, windcode)

    @staticmethod
    def _column_names_or_empty(loader, windcode):
        """Call ``loader(windcode)`` and return a new list; log and return [] on error."""
        try:
            return list(loader(windcode))
        except Exception as e:
            logger.error(f"Error fetching columns for {_minute_table_name(windcode)}: {str(e)}")
            return []

    def get_turbines(self, windcode):
        """Return all turbines of a wind farm.

        Queries table 'wind_engine_group' for rows whose ``field_code`` equals
        *windcode* and returns each turbine's code (engine_code), model code
        (mill_type_code) and name (engine_name).

        :param windcode: wind farm code
        :return: pd.DataFrame with columns engine_code, mill_type_code, engine_name
        """
        query = """
            SELECT engine_code, mill_type_code, engine_name
            FROM wind_engine_group
            WHERE field_code = %s
        """
        # Bound parameter instead of string interpolation (prevents SQL injection).
        return pd.read_sql(query, get_engine(dataBase.PLATFORM_DB), params=(windcode,))

    def get_mill_type(self, mill_type_code):
        """Return the drive type of a turbine model.

        Looks up ``curved_motion_type`` in table 'wind_engine_mill' for the
        given model code.

        :param mill_type_code: turbine model code
        :return: the value from the first matching row, or None if no row matches
        """
        query = """
            SELECT curved_motion_type
            FROM wind_engine_mill
            WHERE mill_type_code = %s
        """
        result = pd.read_sql(query, get_engine(dataBase.PLATFORM_DB), params=(mill_type_code,))
        return result.iloc[0, 0] if not result.empty else None

    @staticmethod
    def _split_month(month):
        """Split a "YYYY-MM" string into ``(year, month)`` string parts.

        Raises ValueError when the string does not contain exactly one '-',
        and AttributeError when *month* is not a string.
        """
        year, month_part = month.split('-')
        return year, month_part

    def fetch_turbine_data(self, windcode, engine_code, month, features):
        """Fetch one turbine's minute-level data for one month (parameterized query).

        Reads the wind farm's minute table and keeps the rows whose
        ``wind_turbine_number``, ``year`` and ``month`` match.

        Args:
            windcode: wind farm code (e.g. "WF001")
            engine_code: turbine code (e.g. "WT001")
            month: month string in "YYYY-MM" format
            features: names of the columns to return in addition to ``year``
                and ``month``; invalid names are skipped and logged

        Returns:
            pd.DataFrame with the query result. An empty DataFrame is returned
            for a malformed *month*, when no valid feature remains, or on a
            database error (errors are logged with their traceback).
        """
        # 1. Split "YYYY-MM" into separate year and month values.
        try:
            year, month_num = self._split_month(month)
        except (AttributeError, ValueError):
            # traceback.format_exc() returns the text; print_exc() returned None.
            logger.error(f"输入参数错误: {traceback.format_exc()}")
            return pd.DataFrame()

        try:
            # 2. Keep only feature names that are safe to use as identifiers.
            feature_names = _valid_feature_names(features)
            if not feature_names:
                logger.info("错误:无有效特征列")
                return pd.DataFrame()

            # 3. Build the query: identifiers are validated and quoted, values are
            #    bound. year/month are always selected, so skip them among features
            #    to avoid duplicate result columns.
            base_columns = ['year', 'month']
            columns = base_columns + [f for f in feature_names if f not in base_columns]
            query = f"""
                SELECT {', '.join(_quote_identifier(c) for c in columns)}
                FROM {_quote_identifier(_minute_table_name(windcode))}
                WHERE `wind_turbine_number` = %s AND `year` = %s AND `month` = %s
            """
            logger.info(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year},{month_num})")

            # 4. Run the parameterized query.
            return pd.read_sql(query, get_engine(dataBase.DATA_DB),
                               params=(engine_code, year, month_num))
        except Exception:
            logger.error(f"数据库查询失败: {traceback.format_exc()}")
            return pd.DataFrame()

    def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
        """Fetch one month of minute-level data for every turbine of a wind farm.

        A single query reads all turbines; the result is then split per turbine.

        Args:
            windcode: wind farm code
            month: month string in "YYYY-MM" format
            features: columns to return; invalid names are skipped and logged

        Returns:
            dict mapping each ``wind_turbine_number`` value to a DataFrame of the
            requested feature columns (the turbine column itself is dropped).
            An empty dict is returned when no valid feature remains, for a
            malformed *month*, or on a database error (errors are logged).
        """
        key = 'wind_turbine_number'
        try:
            feature_names = _valid_feature_names(features)
            if not feature_names:
                return {}

            year, month_num = self._split_month(month)

            # Selecting the grouping column twice would make groupby() fail.
            columns = [key] + [f for f in feature_names if f != key]
            query = f"""
                SELECT {', '.join(_quote_identifier(c) for c in columns)}
                FROM {_quote_identifier(_minute_table_name(windcode))}
                WHERE `year` = %s AND `month` = %s
            """
            all_data = pd.read_sql(query, get_engine(dataBase.DATA_DB), params=(year, month_num))

            # Split the combined result into one DataFrame per turbine.
            return {turbine: group.drop(columns=[key])
                    for turbine, group in all_data.groupby(key)}
        except Exception:
            logger.error(f"批量查询失败: {traceback.format_exc()}")
            return {}