123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
import logging
import os
import traceback
from functools import lru_cache
from typing import Dict, List, Optional

import pandas as pd
from sqlalchemy import create_engine, inspect
class DataFetcher:
    """Read wind-farm metadata and per-minute turbine data from two MySQL databases.

    ``show_engine`` serves the metadata tables ``wind_engine_group`` and
    ``wind_engine_mill``. ``data_engine`` serves the per-wind-farm minute
    tables, named ``<windcode>_minute`` unless overridden in
    ``_TABLE_NAME_OVERRIDES``.

    ``get_turbines`` and ``get_mill_type`` let database errors propagate.
    ``get_turbine_columns`` and the ``fetch_*`` methods are best-effort: they
    log the problem and return an empty result (``[]``, an empty DataFrame,
    or ``{}``).
    """

    _logger = logging.getLogger(__name__)

    # NOTE(security): these fallback URLs embed plaintext credentials. They are
    # kept only so that a bare ``DataFetcher()`` keeps working. Prefer passing
    # URLs to the constructor or setting the environment variables below.
    _DEFAULT_SHOW_DB_URL = 'mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show'
    _DEFAULT_DATA_DB_URL = 'mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod'
    _SHOW_DB_URL_ENV = 'ENERGY_SHOW_DB_URL'
    _DATA_DB_URL_ENV = 'ENERGY_DATA_DB_URL'

    # Wind farms whose minute table does not follow "<windcode>_minute".
    # Names are stored unquoted, because the schema inspector needs the bare
    # name. The SQL builders add backtick quoting themselves
    # (_quoted_table_name), so no manual toggling of backticks is required.
    _TABLE_NAME_OVERRIDES = {
        'WOF093400005': 'WOF093400005-WOB000001_minute',
    }

    def __init__(self, show_db_url: Optional[str] = None, data_db_url: Optional[str] = None):
        """Create the metadata and time-series engines.

        Args:
            show_db_url: SQLAlchemy URL of the metadata database. Falls back
                to ``$ENERGY_SHOW_DB_URL``, then to the built-in default.
            data_db_url: SQLAlchemy URL of the time-series database. Falls
                back to ``$ENERGY_DATA_DB_URL``, then to the built-in default.
        """
        self.show_engine = create_engine(
            show_db_url
            or os.environ.get(self._SHOW_DB_URL_ENV)
            or self._DEFAULT_SHOW_DB_URL
        )
        self.data_engine = create_engine(
            data_db_url
            or os.environ.get(self._DATA_DB_URL_ENV)
            or self._DEFAULT_DATA_DB_URL
        )
        # Per-instance memo used by get_turbine_columns_cached.
        self._column_cache: Dict[str, List[str]] = {}

    def get_turbine_columns_cached(self, windcode: str) -> List[str]:
        """Return ``get_turbine_columns(windcode)``, memoized per instance.

        The cache is a per-instance dict rather than ``functools.lru_cache``.
        On a method, ``lru_cache`` holds strong references to instances in its
        keys.

        Empty results (lookup failures) are not cached, so a transient
        database error is retried on the next call. A copy of the cached list
        is returned, so callers cannot mutate the cached entry.
        """
        # setdefault also covers instances created without running __init__.
        cache = self.__dict__.setdefault('_column_cache', {})
        columns = cache.get(windcode)
        if columns is None:
            columns = self.get_turbine_columns(windcode)
            if not columns:
                return columns
            cache[windcode] = columns
        return list(columns)

    def get_turbine_columns(self, windcode):
        """Return all column names of a wind farm's minute table.

        :param windcode: wind farm code (e.g. "WF001")
        :return: list of column names (e.g. ["timestamp", "temp1", "vibration1"]),
                 or [] if the table cannot be inspected
        """
        # Inspector.get_columns quotes identifiers itself, so it must receive
        # the bare table name. A backtick-wrapped name would not match the
        # real table.
        table_name = self._table_name(windcode)
        try:
            self._logger.debug("正在查询表: %s", table_name)
            columns = inspect(self.data_engine).get_columns(table_name)
            return [col['name'] for col in columns]
        except Exception as e:
            self._logger.error("Error fetching columns for %s: %s", table_name, e)
            return []

    def get_turbines(self, windcode):
        """Return all turbines of a wind farm.

        Queries ``wind_engine_group`` for rows whose ``field_code`` equals
        ``windcode``.

        Returns:
            pd.DataFrame with columns ``engine_code`` (turbine code),
            ``mill_type_code`` (turbine model code) and ``engine_name``
            (turbine name).
        """
        # Bound parameter instead of string interpolation: prevents SQL injection.
        query = """
            SELECT engine_code, mill_type_code, engine_name
            FROM wind_engine_group
            WHERE field_code = %s
        """
        return pd.read_sql(query, self.show_engine, params=(windcode,))

    def get_mill_type(self, mill_type_code):
        """Return the drive type of a turbine model.

        Reads ``curved_motion_type`` from ``wind_engine_mill`` for the given
        ``mill_type_code``.

        Returns:
            The first matching value, or None when no row matches.
        """
        # Bound parameter instead of string interpolation: prevents SQL injection.
        query = """
            SELECT curved_motion_type
            FROM wind_engine_mill
            WHERE mill_type_code = %s
        """
        result = pd.read_sql(query, self.show_engine, params=(mill_type_code,))
        return result.iloc[0, 0] if not result.empty else None

    def fetch_turbine_data(self, windcode, engine_code, month, features):
        """Fetch one turbine's minute data for one month (parameterized query).

        Rows are selected from the wind farm's minute table by
        ``wind_turbine_number``, ``year`` and ``month``.

        Args:
            windcode: wind farm code (e.g. "WF001")
            engine_code: turbine code (e.g. "WT001"), matched against
                ``wind_turbine_number``
            month: month string in "YYYY-MM" format
            features: column names to select. Names that are not non-empty
                alphanumeric/underscore strings are skipped with a warning.

        Returns:
            pd.DataFrame with ``year``, ``month`` and the requested feature
            columns, or an empty DataFrame on invalid input or query failure.
        """
        try:
            # "YYYY-MM" -> ("YYYY", "MM"); anything else raises ValueError.
            year, month_num = month.split('-')

            safe_features = self._safe_features(features)
            if not safe_features:
                self._logger.error("错误:无有效特征列")
                return pd.DataFrame()

            query = self._turbine_month_sql(windcode, safe_features)
            self._logger.debug(
                "执行安全查询:\n%s\n参数: (%s, %s,%s)", query, engine_code, year, month_num
            )
            return pd.read_sql(
                query, self.data_engine, params=(engine_code, year, month_num)
            )
        except ValueError as e:
            self._logger.error("输入参数错误: %s", e)
            return pd.DataFrame()
        except Exception as e:
            self._logger.exception("数据库查询失败: %s", e)
            return pd.DataFrame()

    def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
        """Fetch one month of minute data for every turbine of a wind farm.

        A single query is issued and the rows are split by
        ``wind_turbine_number``.

        Args:
            windcode: wind farm code (e.g. "WF001")
            month: month string in "YYYY-MM" format
            features: column names to select. Invalid names are skipped with a
                warning. ``wind_turbine_number`` is always selected as the
                grouping key, so it is not repeated as a feature.

        Returns:
            Dict mapping each turbine number to a DataFrame of its feature
            columns, or ``{}`` on invalid input or query failure.
        """
        try:
            # Selecting the key column twice would make groupby fail.
            safe_features = self._safe_features(features, exclude=('wind_turbine_number',))
            if not safe_features:
                return {}

            year, month_num = month.split('-')
            query = self._farm_month_sql(windcode, safe_features)
            all_data = pd.read_sql(query, self.data_engine, params=(year, month_num))
            return {
                turbine: group.drop(columns=['wind_turbine_number'])
                for turbine, group in all_data.groupby('wind_turbine_number')
            }
        except Exception as e:
            self._logger.exception("批量查询失败: %s", e)
            return {}

    # ----------------------------------------------------------------- helpers

    @classmethod
    def _table_name(cls, windcode) -> str:
        """Return the unquoted minute-table name for a wind farm."""
        return cls._TABLE_NAME_OVERRIDES.get(windcode, f'{windcode}_minute')

    @classmethod
    def _quoted_table_name(cls, windcode) -> str:
        """Return the minute-table name as a backtick-quoted MySQL identifier.

        Embedded backticks are doubled, so ``windcode`` cannot break out of
        the identifier and inject SQL.
        """
        return '`' + cls._table_name(windcode).replace('`', '``') + '`'

    @classmethod
    def _safe_features(cls, features, exclude=()) -> List[str]:
        """Backtick-quote valid feature names; warn about and drop the rest.

        A valid name is a non-empty string made only of alphanumerics and
        underscores, so it cannot escape the backtick quoting. Duplicates and
        names listed in ``exclude`` are dropped; first-seen order is kept.
        """
        if features is None:
            return []
        quoted: List[str] = []
        seen = set()
        for feat in features:
            is_valid = (
                isinstance(feat, str)
                and feat != ''
                and all(c.isalnum() or c == '_' for c in feat)
            )
            if not is_valid:
                cls._logger.warning("警告:忽略非法特征名 '%s'", feat)
                continue
            if feat in seen or feat in exclude:
                continue
            seen.add(feat)
            quoted.append(f'`{feat}`')
        return quoted

    @classmethod
    def _turbine_month_sql(cls, windcode, safe_features) -> str:
        """Build the SQL for one turbine and one month (3 positional params)."""
        return (
            f"SELECT `year`, `month`, {', '.join(safe_features)} "
            f"FROM {cls._quoted_table_name(windcode)} "
            "WHERE `wind_turbine_number` = %s AND `year` = %s AND `month` = %s"
        )

    @classmethod
    def _farm_month_sql(cls, windcode, safe_features) -> str:
        """Build the SQL for all turbines of a farm for one month (2 positional params)."""
        return (
            f"SELECT `wind_turbine_number`, {', '.join(safe_features)} "
            f"FROM {cls._quoted_table_name(windcode)} "
            "WHERE `year` = %s AND `month` = %s"
        )
|