import logging
import re
import traceback

import pandas as pd
from sqlalchemy import create_engine, inspect, text

logger = logging.getLogger(__name__)

# NOTE(review): these connection strings embed plaintext credentials and host
# addresses. They are kept only as backward-compatible defaults; prefer passing
# URLs from configuration / environment into DataFetcher(...).
DEFAULT_SHOW_DB_URL = 'mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show'
DEFAULT_DATA_DB_URL = 'mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod'

# Month accepted as "YYYY-MM" (documented form) or "YYYYMM" (which the previous
# int(month.replace('-', '')) parsing also accepted). ASCII digits only.
_MONTH_RE = re.compile(r"(\d{4})-?(\d{2})", re.ASCII)


class DataFetcher:
    """Query wind-farm metadata and per-turbine minute data from MySQL.

    ``show_engine`` serves the metadata tables (``wind_engine_group``,
    ``wind_engine_mill``); ``data_engine`` serves the per-farm
    ``<windcode>_minute`` time-series tables.
    """

    def __init__(self, show_db_url=DEFAULT_SHOW_DB_URL, data_db_url=DEFAULT_DATA_DB_URL):
        """Create the two SQLAlchemy engines.

        :param show_db_url: SQLAlchemy URL of the metadata database.
        :param data_db_url: SQLAlchemy URL of the time-series database.
        """
        self.show_engine = create_engine(show_db_url)
        self.data_engine = create_engine(data_db_url)

    @staticmethod
    def _is_safe_identifier(name):
        """Return True if *name* is a non-empty str made only of letters, digits or '_'.

        Such a name cannot contain a backtick, so it is safe to wrap in
        MySQL backtick quoting when it must be spliced into SQL text.
        """
        return (
            isinstance(name, str)
            and bool(name)  # empty string would otherwise pass all() and yield ``
            and all(c.isalnum() or c == '_' for c in name)
        )

    @staticmethod
    def _parse_month(month):
        """Convert "YYYY-MM" (or "YYYYMM") to the integer YYYYMM.

        :raises ValueError: if *month* is not a string of that shape or the
            month part is outside 01-12.
        """
        match = _MONTH_RE.fullmatch(month.strip()) if isinstance(month, str) else None
        if match is None or not 1 <= int(match.group(2)) <= 12:
            raise ValueError("月份格式应为YYYY-MM")
        return int(match.group(1)) * 100 + int(match.group(2))

    def get_turbine_columns(self, windcode):
        """Return all column names of the wind farm's minute table.

        :param windcode: wind-farm code (e.g. "WF001"); the table read is
            ``f"{windcode}_minute"``.
        :return: list of column names, or ``[]`` if the table cannot be
            inspected (the error is logged).
        """
        table_name = f"{windcode}_minute"
        try:
            # Inspector quotes the table name itself, so no manual validation is needed here.
            inspector = inspect(self.data_engine)
            columns = inspector.get_columns(table_name)
            return [col['name'] for col in columns]
        except Exception as e:
            logger.warning("获取列名失败: %s", e)
            return []

    def get_turbines(self, windcode):
        """Return all turbines of a wind farm.

        Queries ``wind_engine_group`` for rows whose ``field_code`` equals
        *windcode*.

        :return: DataFrame with columns ``engine_code``, ``mill_type_code``,
            ``engine_name``. Database errors propagate to the caller.
        """
        query = text(
            "SELECT engine_code, mill_type_code, engine_name "
            "FROM wind_engine_group "
            "WHERE field_code = :windcode"
        )
        # Bound parameter instead of string interpolation: prevents SQL injection.
        return pd.read_sql(query, self.show_engine, params={"windcode": windcode})

    def get_mill_type(self, mill_type_code):
        """Return the drive type (``curved_motion_type``) of a turbine model.

        :param mill_type_code: model code looked up in ``wind_engine_mill``.
        :return: the first row's ``curved_motion_type`` value, or ``None`` if
            no row matches. Database errors propagate to the caller.
        """
        query = text(
            "SELECT curved_motion_type "
            "FROM wind_engine_mill "
            "WHERE mill_type_code = :mill_type_code"
        )
        result = pd.read_sql(query, self.show_engine, params={"mill_type_code": mill_type_code})
        return result.iloc[0, 0] if not result.empty else None

    def fetch_turbine_data(self, windcode, engine_code, month, features):
        """Fetch one turbine's rows for one month (parameterized query).

        Args:
            windcode: wind-farm code (e.g. "WF001"); selects table ``<windcode>_minute``.
            engine_code: turbine code (e.g. "WT001"), matched against ``wind_turbine_number``.
            month: month string, "YYYY-MM" (``"YYYYMM"`` is also accepted).
            features: iterable of column names to select in addition to ``year_month``.

        Returns:
            pd.DataFrame: the query result, or an empty DataFrame when the
            input is invalid, no feature name is usable, or the query fails
            (all such cases are logged).
        """
        # 1. Validate inputs that end up in the SQL text or the bound parameters.
        try:
            year_month_int = self._parse_month(month)
            # The table name cannot be a bound parameter, so it must be validated.
            if not self._is_safe_identifier(windcode):
                raise ValueError(f"非法风场编号 '{windcode}'")
        except ValueError as e:
            logger.warning("输入参数错误: %s", e)
            return pd.DataFrame()

        # 2. Keep only column names that are safe to backtick-quote.
        safe_features = []
        for feat in features or []:
            if self._is_safe_identifier(feat):
                safe_features.append(f'`{feat}`')
            else:
                logger.warning("警告:忽略非法特征名 '%s'", feat)

        if not safe_features:
            logger.error("错误:无有效特征列")
            return pd.DataFrame()

        # 3. Identifiers are validated above; the values use bound parameters.
        query_sql = (
            f"SELECT `year_month`, {','.join(safe_features)} "
            f"FROM `{windcode}_minute` "
            "WHERE `wind_turbine_number` = :engine_code "
            "AND `year_month` = :year_month"
        )
        logger.debug("执行安全查询:\n%s\n参数: (%s, %s)", query_sql, engine_code, year_month_int)

        # 4. Run the query; on failure, log the traceback and return an empty frame.
        try:
            return pd.read_sql(
                text(query_sql),
                self.data_engine,
                params={"engine_code": engine_code, "year_month": year_month_int},
            )
        except Exception as e:
            logger.exception("数据库查询失败: %s", e)
            return pd.DataFrame()