# database.py
import glob
import logging
import os
import tempfile
import traceback
from functools import lru_cache
from typing import Dict, List, Optional

import pandas as pd
import paramiko
import pyarrow.parquet as pq
from sqlalchemy import create_engine, inspect


class DataFetcher:
    """Fetch wind-turbine ledger info and per-turbine parquet data.

    The parquet data lives on a remote server and is read over SSH/SFTP.
    The layout is ``<parquet_root>/<mill_type>/<farm_name>/<engine_code>.parquet``.
    Use it as a context manager (``with DataFetcher() as f: ...``) or call
    ``close_connection()`` to release the SSH session deterministically.
    """

    # Wind-farm code -> machine type and on-disk directory name.
    # Shared by every method that builds a remote path.
    WINDFARM_CONFIGS: Dict[str, Dict[str, str]] = {
        "7V2xSuma": {
            "mill_type": "GW140-2500",
            "farm_name": "新疆老君庙风电场_7V2xSuma",  # confirmed actual directory name
        },
        "qwx0AWl6": {
            "mill_type": "MY5.0se-155",
            "farm_name": "福建显美风电场_qwx0AWl6",  # guessed directory name - verify
        },
        "FmowTOzB": {
            "mill_type": "EN182-6250",
            "farm_name": "内蒙海勒斯风电场_FmowTOzB",  # guessed directory name - verify
        },
    }

    def __init__(self, parquet_root: str = "/home/wzl/jupyter-data/wzl/parquet_output_standard"):
        """Set up the DB engine and SSH settings.

        No SSH connection is opened here; the first remote call opens it lazily.

        Args:
            parquet_root: Root directory of the parquet tree on the remote server.
        """
        # Set the SSH handles first so __del__/close_connection are safe even if
        # anything below raises.
        self.ssh_client = None
        self.sftp_client = None

        # NOTE(security): credentials should not live in source control. They can be
        # overridden via environment variables; the hard-coded fallbacks are kept
        # only for backward compatibility and should be removed.
        # Database connection used for ledger information.
        self.show_engine = create_engine(
            os.environ.get(
                "ENERGY_SHOW_DB_URL",
                'mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show',
            )
        )
        # Root directory of the parquet files (path on the remote server).
        self.parquet_root = parquet_root
        # SSH connection settings.
        self.ssh_host = os.environ.get("ENERGY_SSH_HOST", "192.168.50.241")
        self.ssh_username = os.environ.get("ENERGY_SSH_USER", "root")
        self.ssh_password = os.environ.get("ENERGY_SSH_PASSWORD", "Envisi0n@Scada")
        self.ssh_port = int(os.environ.get("ENERGY_SSH_PORT", "22"))

    # ------------------------------------------------------------------ SSH

    def _is_ssh_alive(self) -> bool:
        """Return True if both the SSH session and SFTP channel exist and the transport is active."""
        if self.ssh_client is None or self.sftp_client is None:
            return False
        transport = self.ssh_client.get_transport()
        return transport is not None and transport.is_active()

    def _reset_ssh(self) -> None:
        """Close and forget any SSH/SFTP handles. Best-effort: errors from dead connections are ignored."""
        for attr in ("sftp_client", "ssh_client"):
            client = getattr(self, attr, None)
            if client is None:
                continue
            try:
                client.close()
            except Exception:
                pass  # deliberate: the connection may already be broken
            setattr(self, attr, None)

    def _ensure_ssh_connection(self):
        """Ensure a live SSH session and SFTP channel, (re)connecting if needed.

        Raises:
            Whatever ``SSHClient.connect`` / ``open_sftp`` raise. The
            half-initialised client is discarded first, so the next call retries
            from scratch instead of reusing a broken handle.
        """
        if self._is_ssh_alive():
            return

        # Either never connected or the transport dropped: discard stale handles.
        self._reset_ssh()

        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any host key (MITM risk). Consider
        # load_system_host_keys() + RejectPolicy once the server key is known.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(
                hostname=self.ssh_host,
                port=self.ssh_port,
                username=self.ssh_username,
                password=self.ssh_password,
                timeout=30,
            )
            # Open the SFTP channel.
            sftp = client.open_sftp()
        except Exception as e:
            client.close()
            print(f"SSH连接失败: {str(e)}")
            raise

        # Publish the handles only after both steps succeeded.
        self.ssh_client = client
        self.sftp_client = sftp
        print("SSH连接成功")

    # ------------------------------------------------------- remote helpers

    def _get_farm_dir(self, windcode: str) -> Optional[str]:
        """Return the remote directory for ``windcode``, or None (after printing) if it is not configured."""
        windfarm_config = self.WINDFARM_CONFIGS.get(windcode)
        if not windfarm_config:
            print(f"未找到风场 {windcode} 的配置")
            return None
        # Remote paths always use forward slashes.
        return "/".join([
            self.parquet_root,
            windfarm_config["mill_type"],
            windfarm_config["farm_name"],
        ])

    def _get_remote_file_list(self, remote_dir: str) -> List[str]:
        """List the ``.parquet`` file names in a remote directory, sorted by name.

        Sorting keeps "the first file" deterministic across calls. Returns an empty
        list (after printing the error) on any failure.
        """
        try:
            self._ensure_ssh_connection()
            # Make sure the path uses forward slashes.
            remote_dir = remote_dir.replace('\\', '/')
            files = self.sftp_client.listdir(remote_dir)
            return sorted(f for f in files if f.endswith('.parquet'))
        except Exception as e:
            print(f"获取远程文件列表失败 {remote_dir}: {str(e)}")
            return []

    def _remote_path_exists(self, remote_path: str) -> bool:
        """Return True if ``remote_path`` exists on the server.

        Also returns False when the SSH connection itself cannot be established.
        """
        try:
            self._ensure_ssh_connection()
            # Make sure the path uses forward slashes.
            remote_path = remote_path.replace('\\', '/')
            self.sftp_client.stat(remote_path)
            return True
        except (OSError, paramiko.SSHException):
            # OSError covers a missing file (FileNotFoundError) and socket-level
            # connection failures; SSHException covers auth/protocol failures.
            return False

    def _read_remote_parquet_direct(self, remote_path: str) -> Optional[pd.DataFrame]:
        """Download a remote parquet file to a local temp file and load it into a DataFrame.

        Returns None (after printing the error) on any failure. The temp file is
        always removed.
        """
        # Make sure the path uses forward slashes.
        remote_path = remote_path.replace('\\', '/')
        temp_path = None
        try:
            self._ensure_ssh_connection()
            # mkstemp + close: the local file must not be held open while SFTP
            # writes to it or while it is deleted (fails on Windows otherwise).
            fd, temp_path = tempfile.mkstemp(suffix='.parquet')
            os.close(fd)
            # Download into the temp file, then read it.
            self.sftp_client.get(remote_path, temp_path)
            return pd.read_parquet(temp_path)
        except Exception as e:
            print(f"读取远程文件失败 {remote_path}: {str(e)}")
            return None
        finally:
            # Clean up the temp file.
            if temp_path is not None and os.path.exists(temp_path):
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass  # best-effort cleanup; do not mask the result

    def _debug_remote_structure(self):
        """Print the remote root directory and each configured machine-type directory, for debugging."""
        try:
            print("\n=== 远程目录结构调试 ===")
            self._ensure_ssh_connection()
            # Root directory.
            root_files = self.sftp_client.listdir(self.parquet_root)
            print(f"根目录内容: {root_files}")
            # Every machine-type directory referenced by the farm configuration.
            mill_types = sorted({cfg["mill_type"] for cfg in self.WINDFARM_CONFIGS.values()})
            for mill_type in mill_types:
                mill_path = "/".join([self.parquet_root, mill_type])
                if self._remote_path_exists(mill_path):
                    mill_files = self.sftp_client.listdir(mill_path)
                    print(f"{mill_type}目录内容: {mill_files}")
                else:
                    print(f"{mill_type}目录不存在")
        except Exception as e:
            print(f"目录结构调试失败: {str(e)}")

    # ------------------------------------------------------------ public API

    def get_turbine_columns(self, windcode: str) -> List[str]:
        """Return the column names of a wind farm's turbine data.

        The names are read from the first (by name) remote parquet file in the
        farm's directory. Returns an empty list when the farm is unknown, the
        directory is missing or empty, or the file cannot be read.

        Note: this downloads and loads the whole first file just to get its columns.
        """
        try:
            remote_parquet_path = self._get_farm_dir(windcode)
            if remote_parquet_path is None:
                return []

            print(f"远程路径: {remote_parquet_path}")

            # Check that the remote directory exists.
            if not self._remote_path_exists(remote_parquet_path):
                print(f"远程路径不存在: {remote_parquet_path}")
                # List the root contents to help debugging.
                self._debug_remote_structure()
                return []

            parquet_files = self._get_remote_file_list(remote_parquet_path)
            if not parquet_files:
                print(f"在 {remote_parquet_path} 中未找到parquet文件")
                return []

            # Use the first file to read the column names.
            remote_file_path = "/".join([remote_parquet_path, parquet_files[0]])
            print(f"读取远程文件: {remote_file_path}")

            df = self._read_remote_parquet_direct(remote_file_path)
            if df is None:
                print("无法读取远程文件")
                return []

            columns = df.columns.tolist()
            print(f"从远程parquet文件获取到 {len(columns)} 个列")
            return columns
        except Exception as e:
            print(f"Error fetching columns for {windcode}: {str(e)}")
            return []

    def get_turbines(self, windcode):
        """Return a DataFrame of the turbines in a wind farm (hard-coded demo data).

        Columns are ``engine_code``, ``mill_type_code`` and ``engine_name``. An
        empty DataFrame is returned for unknown farms.
        """
        demo_turbines = {
            "7V2xSuma": [
                {"engine_code": "nuBwynYF", "mill_type_code": "GW140-2500", "engine_name": "nuBwynYF"},
                {"engine_code": "tWmqBYXV", "mill_type_code": "GW140-2500", "engine_name": "tWmqBYXV"},
            ],
            "qwx0AWl6": [
                {"engine_code": "0rQJxE72", "mill_type_code": "MY5.0se-155", "engine_name": "0rQJxE72"},
                {"engine_code": "7UTZjdEu", "mill_type_code": "MY5.0se-155", "engine_name": "7UTZjdEu"},
            ],
            "FmowTOzB": [
                {"engine_code": "Nj1RlyOI", "mill_type_code": "EN182-6250", "engine_name": "Nj1RlyOI"},
                {"engine_code": "dg8IwCwy", "mill_type_code": "EN182-6250", "engine_name": "dg8IwCwy"},
            ],
        }
        turbines_data = demo_turbines.get(windcode, [])
        if turbines_data:
            return pd.DataFrame(turbines_data)
        print(f"未找到风场 {windcode} 的风机信息")
        return pd.DataFrame()

    def get_mill_type(self, mill_type_code):
        """Map a machine-type code to its drive-train type id, or None if unknown.

        1 = direct drive, 2 = semi-direct drive, 3 = doubly-fed.
        """
        mill_type_mapping = {
            "GW140-2500": 1,   # direct drive
            "MY5.0se-155": 2,  # semi-direct drive
            "EN182-6250": 3,   # doubly-fed
        }
        return mill_type_mapping.get(mill_type_code)

    def fetch_turbine_data(self, windcode: str, engine_code: str, month: str,
                           features: List[str]) -> pd.DataFrame:
        """Load one turbine's data for a single calendar month from its remote parquet file.

        Args:
            windcode: Wind-farm code (a key of ``WINDFARM_CONFIGS``).
            engine_code: Turbine code; the file read is ``<engine_code>.parquet``.
            month: Month as ``"YYYY-MM"``. It is applied only if the file has a
                ``time_stamp`` column.
            features: Columns to return. Names missing from the file are skipped.

        Returns:
            A DataFrame with only the available requested columns. It is empty when
            the farm or file is unknown, the read fails, or no requested column exists.
        """
        try:
            farm_dir = self._get_farm_dir(windcode)
            if farm_dir is None:
                return pd.DataFrame()

            # Full remote file path (forward slashes).
            remote_file_path = "/".join([farm_dir, f"{engine_code}.parquet"])
            print(f"正在读取远程parquet文件: {remote_file_path}")

            if not self._remote_path_exists(remote_file_path):
                print(f"远程parquet文件不存在: {remote_file_path}")
                return pd.DataFrame()

            df = self._read_remote_parquet_direct(remote_file_path)
            if df is None:
                print(f"无法读取远程文件: {remote_file_path}")
                return pd.DataFrame()

            # Keep only rows within the requested month.
            if 'time_stamp' in df.columns:
                df['time_stamp'] = pd.to_datetime(df['time_stamp'])
                year, month_num = month.split('-')
                start_date = pd.Timestamp(year=int(year), month=int(month_num), day=1)
                # Half-open range [month start, next month start): keeps every
                # timestamp of the last day, not just its midnight sample.
                end_date = start_date + pd.DateOffset(months=1)
                mask = (df['time_stamp'] >= start_date) & (df['time_stamp'] < end_date)
                df = df.loc[mask]
                print(f"时间过滤后数据量: {len(df)} 行")

            # Select the requested feature columns that actually exist.
            available_features = [f for f in features if f in df.columns]
            if not available_features:
                print(f"无可用特征: {features}")
                return pd.DataFrame()

            # copy() so callers can modify the result without chained-assignment warnings.
            result = df[available_features].copy()
            print(f"从远程parquet文件加载数据: {len(result)} 行, {len(available_features)} 特征")
            return result
        except Exception as e:
            print(f"读取远程parquet文件失败: {str(e)}")
            traceback.print_exc()
            return pd.DataFrame()

    def fetch_all_turbines_data(self, windcode: str, month: str,
                                features: List[str]) -> Dict[str, pd.DataFrame]:
        """Load one month of data for every turbine of a wind farm.

        Returns:
            A mapping ``engine_code -> DataFrame``. Turbines with no data are
            omitted, and an empty dict is returned on failure.
        """
        try:
            turbines = self.get_turbines(windcode)
            if turbines.empty:
                return {}

            all_data = {}
            for engine_code in turbines['engine_code']:
                print(f"正在获取风机 {engine_code} 的数据...")
                data = self.fetch_turbine_data(windcode, engine_code, month, features)
                if not data.empty:
                    all_data[engine_code] = data
                else:
                    print(f"风机 {engine_code} 无有效数据")

            print(f"批量加载完成: {len(all_data)} 台风机数据")
            return all_data
        except Exception as e:
            print(f"批量查询失败: {str(e)}")
            return {}

    # ------------------------------------------------------------- lifecycle

    def close_connection(self):
        """Close the SSH/SFTP connection if one is open.

        Safe to call repeatedly. The message is printed only when something was
        actually closed.
        """
        had_connection = (
            getattr(self, "sftp_client", None) is not None
            or getattr(self, "ssh_client", None) is not None
        )
        self._reset_ssh()
        if had_connection:
            print("SSH连接已关闭")

    def __enter__(self) -> "DataFetcher":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close_connection()

    def __del__(self):
        """Finalizer: best-effort SSH cleanup.

        It must never raise, including during interpreter shutdown or after a
        failed ``__init__``.
        """
        try:
            self.close_connection()
        except Exception:
            pass