123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- import pandas as pd
- from sqlalchemy import create_engine, inspect
- import traceback
- import logging
class DataFetcher:
    """Read wind-farm metadata and minute-level turbine data from MySQL.

    Two SQLAlchemy engines are kept: ``show_engine`` for the metadata tables
    (``wind_engine_group``, ``wind_engine_mill``) and ``data_engine`` for the
    per-farm ``{windcode}_minute`` time-series tables.
    """

    # NOTE(review): these defaults embed database credentials in source
    # control. They are kept only so existing ``DataFetcher()`` callers keep
    # working; prefer passing URLs loaded from configuration/environment.
    DEFAULT_SHOW_DB_URL = 'mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show'
    DEFAULT_DATA_DB_URL = 'mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod'

    def __init__(self, show_db_url=DEFAULT_SHOW_DB_URL, data_db_url=DEFAULT_DATA_DB_URL):
        """Create the two database engines.

        Args:
            show_db_url: SQLAlchemy URL of the metadata ("show") database.
            data_db_url: SQLAlchemy URL of the time-series database.
        """
        self.show_engine = create_engine(show_db_url)
        self.data_engine = create_engine(data_db_url)

    def get_turbine_columns(self, windcode):
        """Return all column names of a wind farm's minute-data table.

        Args:
            windcode: wind-farm code (e.g. "WF001"); the table inspected is
                ``{windcode}_minute`` on the data database.

        Returns:
            list[str]: column names (e.g. ["timestamp", "temp1", ...]), or an
            empty list if the table cannot be inspected (the error is printed).
        """
        table_name = f"{windcode}_minute"
        try:
            inspector = inspect(self.data_engine)
            columns = inspector.get_columns(table_name)
            return [col['name'] for col in columns]
        except Exception as e:
            # Best effort: callers treat [] as "no columns available".
            print(f"获取列名失败: {str(e)}")
            return []

    def get_turbines(self, windcode):
        """Return every turbine registered under a wind farm.

        Queries ``wind_engine_group`` on the show database by ``field_code``.

        Args:
            windcode: wind-farm code (e.g. "WF001").

        Returns:
            pd.DataFrame with columns ``engine_code``, ``mill_type_code`` and
            ``engine_name``.
        """
        # Bound parameter instead of string interpolation: windcode is
        # caller-supplied and must not be spliced into the SQL text.
        query = """
        SELECT engine_code, mill_type_code, engine_name
        FROM wind_engine_group
        WHERE field_code = %s
        """
        return pd.read_sql(query, self.show_engine, params=(windcode,))

    def get_mill_type(self, mill_type_code):
        """Return the drive type (``curved_motion_type``) of a turbine model.

        Args:
            mill_type_code: turbine model code, looked up in ``wind_engine_mill``.

        Returns:
            The first ``curved_motion_type`` value found, or None if the model
            code has no row.
        """
        query = """
        SELECT curved_motion_type
        FROM wind_engine_mill
        WHERE mill_type_code = %s
        """
        result = pd.read_sql(query, self.show_engine, params=(mill_type_code,))
        return result.iloc[0, 0] if not result.empty else None

    def fetch_turbine_data(self, windcode, engine_code, month, features):
        """Fetch one turbine's minute-level rows for a single month.

        Runtime values (engine code, month) are sent as bound parameters;
        identifiers that must be interpolated (table and column names) are
        validated or quoted first.

        Args:
            windcode: wind-farm code (e.g. "WF001"); the table read is
                ``{windcode}_minute``.
            engine_code: turbine code (e.g. "WT001"), matched against
                ``wind_turbine_number``.
            month: month string "YYYY-MM" (compact "YYYYMM" is also accepted).
            features: column names to select besides ``year_month``; a single
                string is treated as one column. Names that are not non-empty
                runs of letters, digits and '_' are skipped with a warning.

        Returns:
            pd.DataFrame with the query result, or an empty DataFrame when the
            input is invalid or the query fails (the error is printed).
        """
        # 1. Validate inputs. Kept separate from the DB call so database
        #    errors are not reported as bad input.
        try:
            year_month_int = self._parse_year_month(month)
            table = self._quote_identifier(f"{windcode}_minute")

            if features is None:
                features = ()
            elif isinstance(features, str):
                # A bare string would otherwise be iterated char by char.
                features = (features,)

            safe_features = []
            for feat in features:
                if self._is_safe_identifier(feat):
                    safe_features.append(f'`{feat}`')
                else:
                    print(f"警告:忽略非法特征名 '{feat}'")
        except (ValueError, TypeError) as e:
            print(f"输入参数错误: {str(e)}")
            return pd.DataFrame()

        if not safe_features:
            print("错误:无有效特征列")
            return pd.DataFrame()

        # 2. Build the parameterized query (%s placeholders are the PyMySQL
        #    paramstyle used by the rest of this class).
        query = f"""
        SELECT `year_month`, {','.join(safe_features)}
        FROM {table}
        WHERE `wind_turbine_number` = %s
          AND `year_month` = %s
        """

        print(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year_month_int})")

        # 3. Execute.
        try:
            return pd.read_sql(query, self.data_engine,
                               params=(engine_code, year_month_int))
        except Exception as e:
            print(f"数据库查询失败: {str(e)}")
            traceback.print_exc()
            return pd.DataFrame()

    @staticmethod
    def _is_safe_identifier(name):
        """Return True if *name* is a non-empty str of letters, digits or '_'."""
        # all() over an empty string is True, so emptiness is checked
        # explicitly; otherwise '' would yield the invalid identifier ``.
        return (
            isinstance(name, str)
            and bool(name)
            and all(c.isalnum() or c == '_' for c in name)
        )

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier.

        Embedded backticks are doubled (MySQL's escape for the quote character
        inside a quoted identifier), so the name cannot close the quotes early.

        Raises:
            ValueError: if *name* is not a non-empty string or contains NUL.
        """
        if not isinstance(name, str) or not name or '\x00' in name:
            raise ValueError(f"非法标识符: {name!r}")
        return '`' + name.replace('`', '``') + '`'

    @staticmethod
    def _parse_year_month(month):
        """Convert "YYYY-MM" (or "YYYYMM") into the integer YYYYMM.

        Raises:
            ValueError: if *month* is not a string of that form, the year is
                below 1000, or the month is outside 01-12.
        """
        import re

        text = month.strip() if isinstance(month, str) else ''
        # [0-9] rather than \d so non-ASCII digits are rejected.
        match = re.fullmatch(r'([1-9][0-9]{3})-?([0-9]{2})', text)
        if match is None or not 1 <= int(match.group(2)) <= 12:
            raise ValueError("月份格式应为YYYY-MM")
        return int(match.group(1)) * 100 + int(match.group(2))
|