| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- # database.py
- import pandas as pd
- from sqlalchemy import create_engine, inspect
- import traceback
- import logging
- from functools import lru_cache
- from typing import List, Dict, Optional
- import os
- import glob
- import pyarrow.parquet as pq
- import paramiko
- import tempfile
class DataFetcher:
    """Fetch wind-farm ledger info and turbine SCADA data stored as parquet on a remote host.

    Parquet files are read over SFTP from
    ``<parquet_root>/<mill_type>/<farm_name>/``; one turbine's data is read from
    ``<engine_code>.parquet`` in that directory. Turbine lists and drive-type
    codes are currently hard-coded demo tables. ``show_engine`` is created for
    ledger (台账) lookups but no method in this class uses it yet.

    The SSH/SFTP session is opened lazily on first remote access. Call
    ``close_connection()`` or use the instance as a context manager to release it.
    """

    # windcode -> remote directory components below ``parquet_root``.
    _WINDFARM_CONFIGS: Dict[str, Dict[str, str]] = {
        "7V2xSuma": {
            "mill_type": "GW140-2500",
            "farm_name": "新疆老君庙风电场_7V2xSuma",  # corrected to the actual directory name
        },
        "qwx0AWl6": {
            "mill_type": "MY5.0se-155",
            "farm_name": "福建显美风电场_qwx0AWl6",  # guessed directory name - TODO confirm
        },
        "FmowTOzB": {
            "mill_type": "EN182-6250",
            "farm_name": "内蒙海勒斯风电场_FmowTOzB",  # guessed directory name - TODO confirm
        },
    }

    # Demo turbine ledger: windcode -> list of turbine records.
    _DEMO_TURBINES: Dict[str, List[Dict[str, str]]] = {
        "7V2xSuma": [
            {"engine_code": "nuBwynYF", "mill_type_code": "GW140-2500", "engine_name": "nuBwynYF"},
            {"engine_code": "tWmqBYXV", "mill_type_code": "GW140-2500", "engine_name": "tWmqBYXV"},
        ],
        "qwx0AWl6": [
            {"engine_code": "0rQJxE72", "mill_type_code": "MY5.0se-155", "engine_name": "0rQJxE72"},
            {"engine_code": "7UTZjdEu", "mill_type_code": "MY5.0se-155", "engine_name": "7UTZjdEu"},
        ],
        "FmowTOzB": [
            {"engine_code": "Nj1RlyOI", "mill_type_code": "EN182-6250", "engine_name": "Nj1RlyOI"},
            {"engine_code": "dg8IwCwy", "mill_type_code": "EN182-6250", "engine_name": "dg8IwCwy"},
        ],
    }

    # Turbine model -> drive-train type code.
    _MILL_TYPE_DRIVE: Dict[str, int] = {
        "GW140-2500": 1,  # direct drive
        "MY5.0se-155": 2,  # semi-direct drive
        "EN182-6250": 3,  # doubly-fed
    }

    def __init__(self, parquet_root: str = "/home/wzl/jupyter-data/wzl/parquet_output_standard"):
        """Configure the ledger DB engine and (lazy) SSH settings.

        Args:
            parquet_root: Root of the parquet tree on the *remote* host.

        Environment overrides (all optional): ``ENERGY_SHOW_DB_URL``,
        ``SCADA_SSH_HOST``, ``SCADA_SSH_USER``, ``SCADA_SSH_PASSWORD``,
        ``SCADA_SSH_PORT``.
        """
        # Set these first so cleanup works even if engine creation below raises.
        self.ssh_client = None
        self.sftp_client = None

        # NOTE(security): credentials should not live in source control. The
        # literal fallbacks are kept only so existing deployments keep working;
        # set the environment variables and remove the fallbacks.
        self.show_engine = create_engine(
            os.environ.get(
                "ENERGY_SHOW_DB_URL",
                'mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show',
            )
        )
        self.parquet_root = parquet_root

        self.ssh_host = os.environ.get("SCADA_SSH_HOST", "192.168.50.241")
        self.ssh_username = os.environ.get("SCADA_SSH_USER", "root")
        self.ssh_password = os.environ.get("SCADA_SSH_PASSWORD", "Envisi0n@Scada")
        self.ssh_port = int(os.environ.get("SCADA_SSH_PORT", "22"))

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _to_remote_path(path: str) -> str:
        """Normalise *path* for SFTP, which expects forward slashes."""
        return path.replace('\\', '/')

    def _farm_dir(self, windcode: str) -> Optional[str]:
        """Return the remote directory holding *windcode*'s parquet files, or None if unknown."""
        config = self._WINDFARM_CONFIGS.get(windcode)
        if config is None:
            return None
        return "/".join([self.parquet_root, config["mill_type"], config["farm_name"]])

    @staticmethod
    def _month_bounds(month: str):
        """Return ``(start, end)`` Timestamps so that *month* ("YYYY-MM") is ``[start, end)``.

        Raises:
            ValueError: if *month* is not of the form "YYYY-MM" or is out of range.
        """
        year_str, month_str = month.split('-')
        start = pd.Timestamp(year=int(year_str), month=int(month_str), day=1)
        # Half-open upper bound: the first instant of the following month.
        return start, start + pd.DateOffset(months=1)

    @classmethod
    def _filter_month(cls, df: pd.DataFrame, month: str) -> pd.DataFrame:
        """Return rows of *df* whose ``time_stamp`` lies in *month* ("YYYY-MM").

        The returned frame has ``time_stamp`` converted with ``pd.to_datetime``.
        The input frame is not modified. The bounds are timezone-naive, so this
        assumes a naive ``time_stamp`` column (TODO confirm for all farms).
        """
        start, end = cls._month_bounds(month)
        stamped = df.assign(time_stamp=pd.to_datetime(df['time_stamp']))
        in_month = (stamped['time_stamp'] >= start) & (stamped['time_stamp'] < end)
        return stamped.loc[in_month]

    # -------------------------------------------------------------- SSH / SFTP

    def _ssh_is_active(self) -> bool:
        """Return True when an SFTP client exists and its SSH transport is still active."""
        ssh = getattr(self, "ssh_client", None)
        if ssh is None or getattr(self, "sftp_client", None) is None:
            return False
        transport = ssh.get_transport()
        return transport is not None and transport.is_active()

    def _release_ssh_clients(self) -> None:
        """Close and forget the SFTP and SSH clients (best effort, safe to repeat)."""
        sftp = getattr(self, "sftp_client", None)
        ssh = getattr(self, "ssh_client", None)
        self.sftp_client = None
        self.ssh_client = None
        # Close the SFTP channel before the SSH transport that carries it; keep
        # going if one fails so the other is still released.
        for client in (sftp, ssh):
            if client is None:
                continue
            try:
                client.close()
            except Exception as e:
                print(f"关闭SSH资源时出错: {str(e)}")

    def _ensure_ssh_connection(self):
        """Make sure a live SSH + SFTP session exists, (re)connecting if needed.

        Raises:
            Exception: whatever paramiko raises when connecting fails. The
                failure is printed first, and no half-open client is kept.
        """
        if self._ssh_is_active():
            return
        # Drop any stale/dead session before opening a new one.
        self._release_ssh_clients()

        client = None
        try:
            client = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy trusts unknown host keys (MITM risk);
            # consider load_system_host_keys() + RejectPolicy.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(
                hostname=self.ssh_host,
                port=self.ssh_port,
                username=self.ssh_username,
                password=self.ssh_password,
                timeout=30,
            )
            sftp = client.open_sftp()
        except Exception as e:
            if client is not None:
                client.close()
            print(f"SSH连接失败: {str(e)}")
            raise

        # Only publish the clients once both are fully usable.
        self.ssh_client = client
        self.sftp_client = sftp
        print("SSH连接成功")

    def _get_remote_file_list(self, remote_dir: str) -> List[str]:
        """Return the sorted ``.parquet`` file names in *remote_dir*, or ``[]`` on any error."""
        remote_dir = self._to_remote_path(remote_dir)
        try:
            self._ensure_ssh_connection()
            names = self.sftp_client.listdir(remote_dir)
        except Exception as e:
            print(f"获取远程文件列表失败 {remote_dir}: {str(e)}")
            return []
        # Sorted so callers that pick "the first file" get a deterministic result.
        return sorted(name for name in names if name.endswith('.parquet'))

    def _remote_path_exists(self, remote_path: str) -> bool:
        """Return True if *remote_path* exists on the remote host.

        Raises:
            Exception: if the SSH session cannot be established. A connection
                failure is not reported as "path missing".
        """
        self._ensure_ssh_connection()
        try:
            self.sftp_client.stat(self._to_remote_path(remote_path))
        except OSError:
            # paramiko reports missing / inaccessible paths as IOError (an OSError).
            return False
        return True

    def _read_remote_parquet_direct(self, remote_path: str) -> Optional[pd.DataFrame]:
        """Download *remote_path* to a temp file and load it as a DataFrame.

        Returns:
            The DataFrame, or None if connecting, downloading or parsing fails
            (the error is printed). The temp file is always removed.
        """
        remote_path = self._to_remote_path(remote_path)
        temp_path = None
        try:
            self._ensure_ssh_connection()
            # delete=False and close immediately: sftp.get must reopen the path by
            # name, which Windows does not allow while the handle is still open.
            with tempfile.NamedTemporaryFile(suffix='.parquet', delete=False) as temp_file:
                temp_path = temp_file.name
            self.sftp_client.get(remote_path, temp_path)
            return pd.read_parquet(temp_path)
        except Exception as e:
            print(f"读取远程文件失败 {remote_path}: {str(e)}")
            return None
        finally:
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)

    def _debug_remote_structure(self, mill_type: Optional[str] = None):
        """Print remote directory listings to help diagnose path mismatches.

        Args:
            mill_type: Only inspect this mill-type directory. When None, every
                mill type referenced in ``_WINDFARM_CONFIGS`` is inspected.
        """
        try:
            print("\n=== 远程目录结构调试 ===")
            self._ensure_ssh_connection()

            root_files = self.sftp_client.listdir(self.parquet_root)
            print(f"根目录内容: {root_files}")

            if mill_type is not None:
                mill_types = [mill_type]
            else:
                # dict.fromkeys de-duplicates while keeping configuration order.
                mill_types = list(dict.fromkeys(
                    cfg["mill_type"] for cfg in self._WINDFARM_CONFIGS.values()
                ))

            for name in mill_types:
                type_dir = "/".join([self.parquet_root, name])
                if self._remote_path_exists(type_dir):
                    print(f"{name}目录内容: {self.sftp_client.listdir(type_dir)}")
                else:
                    print(f"{name}目录不存在")
        except Exception as e:
            print(f"目录结构调试失败: {str(e)}")

    # --------------------------------------------------------------- public API

    def get_turbine_columns(self, windcode: str) -> List[str]:
        """Return the column names of a wind farm's parquet data.

        Columns come from the alphabetically first ``.parquet`` file in the farm
        directory. Returns ``[]`` on any failure: unknown windcode, missing
        directory, no files, or a read error. Diagnostics are printed.
        """
        try:
            remote_parquet_path = self._farm_dir(windcode)
            if remote_parquet_path is None:
                print(f"未找到风场 {windcode} 的配置")
                return []

            print(f"远程路径: {remote_parquet_path}")

            if not self._remote_path_exists(remote_parquet_path):
                print(f"远程路径不存在: {remote_parquet_path}")
                # List the relevant part of the remote tree to help spot naming mismatches.
                self._debug_remote_structure(self._WINDFARM_CONFIGS[windcode]["mill_type"])
                return []

            parquet_files = self._get_remote_file_list(remote_parquet_path)
            if not parquet_files:
                print(f"在 {remote_parquet_path} 中未找到parquet文件")
                return []

            remote_file_path = "/".join([remote_parquet_path, parquet_files[0]])
            print(f"读取远程文件: {remote_file_path}")

            # NOTE: this downloads the whole file only to read its column names.
            df = self._read_remote_parquet_direct(remote_file_path)
            if df is None:
                print("无法读取远程文件")
                return []

            columns = df.columns.tolist()
            print(f"从远程parquet文件获取到 {len(columns)} 个列")
            return columns

        except Exception as e:
            print(f"Error fetching columns for {windcode}: {str(e)}")
            return []

    def get_turbines(self, windcode):
        """Return the turbines of *windcode* from the hard-coded demo ledger.

        Returns:
            DataFrame with columns ``engine_code``, ``mill_type_code`` and
            ``engine_name``, or an empty DataFrame (with a printed message) if
            the wind farm is unknown.
        """
        turbines_data = self._DEMO_TURBINES.get(windcode, [])
        if not turbines_data:
            print(f"未找到风场 {windcode} 的风机信息")
            return pd.DataFrame()
        return pd.DataFrame(turbines_data)

    def get_mill_type(self, mill_type_code):
        """Return the drive-train type of a turbine model, or None if unknown.

        1 = direct drive, 2 = semi-direct drive, 3 = doubly-fed.
        """
        return self._MILL_TYPE_DRIVE.get(mill_type_code)

    def fetch_turbine_data(self, windcode: str, engine_code: str, month: str, features: List[str]) -> pd.DataFrame:
        """Load one turbine's data for one calendar month from its remote parquet file.

        Args:
            windcode: Wind-farm code (key of ``_WINDFARM_CONFIGS``).
            engine_code: Turbine code; ``<farm dir>/<engine_code>.parquet`` is read.
            month: Month as "YYYY-MM". The whole month is kept, including the
                entire last day. Only applied when the file has a ``time_stamp``
                column.
            features: Columns to return; names missing from the file are skipped.

        Returns:
            The selected columns for the month, or an empty DataFrame on any
            failure (unknown farm, missing/unreadable file, bad month, or no
            matching columns).
        """
        try:
            farm_dir = self._farm_dir(windcode)
            if farm_dir is None:
                print(f"未找到风场 {windcode} 的配置")
                return pd.DataFrame()

            remote_file_path = "/".join([farm_dir, f"{engine_code}.parquet"])
            print(f"正在读取远程parquet文件: {remote_file_path}")

            if not self._remote_path_exists(remote_file_path):
                print(f"远程parquet文件不存在: {remote_file_path}")
                return pd.DataFrame()

            df = self._read_remote_parquet_direct(remote_file_path)
            if df is None:
                print(f"无法读取远程文件: {remote_file_path}")
                return pd.DataFrame()

            if 'time_stamp' in df.columns:
                df = self._filter_month(df, month)
                print(f"时间过滤后数据量: {len(df)} 行")

            available_features = [f for f in features if f in df.columns]
            if not available_features:
                print(f"无可用特征: {features}")
                return pd.DataFrame()

            result = df[available_features]
            print(f"从远程parquet文件加载数据: {len(result)} 行, {len(available_features)} 特征")
            return result

        except Exception as e:
            print(f"读取远程parquet文件失败: {str(e)}")
            traceback.print_exc()
            return pd.DataFrame()

    def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
        """Fetch *month*'s data for every turbine of *windcode*.

        Returns:
            Mapping ``engine_code -> DataFrame`` containing only turbines with
            non-empty data; ``{}`` if the farm is unknown or an error occurs.
        """
        try:
            turbines = self.get_turbines(windcode)
            if turbines.empty:
                return {}

            all_data = {}
            for engine_code in turbines['engine_code']:
                print(f"正在获取风机 {engine_code} 的数据...")
                data = self.fetch_turbine_data(windcode, engine_code, month, features)
                if data.empty:
                    print(f"风机 {engine_code} 无有效数据")
                else:
                    all_data[engine_code] = data

            print(f"批量加载完成: {len(all_data)} 台风机数据")
            return all_data

        except Exception as e:
            print(f"批量查询失败: {str(e)}")
            return {}

    # ------------------------------------------------------------ lifecycle

    def close_connection(self):
        """Close the SFTP and SSH clients.

        Safe to call repeatedly or on a partially constructed instance.
        """
        self._release_ssh_clients()
        print("SSH连接已关闭")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close_connection()
        return False  # never suppress exceptions from the with-body

    def __del__(self):
        """Best-effort cleanup; errors must not escape a finalizer."""
        try:
            self.close_connection()
        except Exception:
            # Finalizers may run during interpreter shutdown, when module globals
            # can already be gone; there is nothing useful left to do.
            pass
|