database.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. # database.py
  2. import pandas as pd
  3. from sqlalchemy import create_engine, inspect
  4. import traceback
  5. import logging
  6. from functools import lru_cache
  7. from typing import List, Dict, Optional
  8. import os
  9. import glob
  10. import pyarrow.parquet as pq
  11. import paramiko
  12. import tempfile
  13. class DataFetcher:
  14. def __init__(self, parquet_root: str = "/home/wzl/jupyter-data/wzl/parquet_output_standard"):
  15. # 数据库连接用于获取台账信息
  16. self.show_engine = create_engine('mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show')
  17. # 设置parquet文件根目录(远程服务器路径)
  18. self.parquet_root = parquet_root
  19. # SSH连接配置
  20. self.ssh_host = "192.168.50.241"
  21. self.ssh_username = "root"
  22. self.ssh_password = "Envisi0n@Scada"
  23. self.ssh_port = 22
  24. self.ssh_client = None
  25. self.sftp_client = None
  26. def _ensure_ssh_connection(self):
  27. """确保SSH连接可用"""
  28. if self.ssh_client is None:
  29. try:
  30. self.ssh_client = paramiko.SSHClient()
  31. self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  32. self.ssh_client.connect(
  33. hostname=self.ssh_host,
  34. port=self.ssh_port,
  35. username=self.ssh_username,
  36. password=self.ssh_password,
  37. timeout=30
  38. )
  39. # 创建SFTP客户端
  40. self.sftp_client = self.ssh_client.open_sftp()
  41. print("SSH连接成功")
  42. except Exception as e:
  43. print(f"SSH连接失败: {str(e)}")
  44. raise
  45. def _get_remote_file_list(self, remote_dir: str) -> List[str]:
  46. """获取远程目录下的文件列表"""
  47. try:
  48. self._ensure_ssh_connection()
  49. # 确保路径使用正斜杠
  50. remote_dir = remote_dir.replace('\\', '/')
  51. files = self.sftp_client.listdir(remote_dir)
  52. return [f for f in files if f.endswith('.parquet')]
  53. except Exception as e:
  54. print(f"获取远程文件列表失败 {remote_dir}: {str(e)}")
  55. return []
  56. def _remote_path_exists(self, remote_path: str) -> bool:
  57. """检查远程路径是否存在"""
  58. try:
  59. self._ensure_ssh_connection()
  60. # 确保路径使用正斜杠
  61. remote_path = remote_path.replace('\\', '/')
  62. self.sftp_client.stat(remote_path)
  63. return True
  64. except:
  65. return False
  66. def _read_remote_parquet_direct(self, remote_path: str) -> Optional[pd.DataFrame]:
  67. """直接读取远程parquet文件到DataFrame"""
  68. try:
  69. self._ensure_ssh_connection()
  70. # 确保路径使用正斜杠
  71. remote_path = remote_path.replace('\\', '/')
  72. # 使用临时文件
  73. with tempfile.NamedTemporaryFile(suffix='.parquet', delete=False) as temp_file:
  74. temp_path = temp_file.name
  75. try:
  76. # 下载到临时文件
  77. self.sftp_client.get(remote_path, temp_path)
  78. # 读取数据
  79. df = pd.read_parquet(temp_path)
  80. return df
  81. finally:
  82. # 清理临时文件
  83. if os.path.exists(temp_path):
  84. os.unlink(temp_path)
  85. except Exception as e:
  86. print(f"读取远程文件失败 {remote_path}: {str(e)}")
  87. return None
  88. def get_turbine_columns(self, windcode: str) -> List[str]:
  89. """
  90. 获取指定风场数据表的所有列名 - 从远程parquet文件读取
  91. """
  92. try:
  93. # 修正风场配置 - 使用您提供的实际路径
  94. windfarm_configs = {
  95. "7V2xSuma": {
  96. "mill_type": "GW140-2500",
  97. "farm_name": "新疆老君庙风电场_7V2xSuma" # 修正为实际目录名
  98. },
  99. "qwx0AWl6": {
  100. "mill_type": "MY5.0se-155",
  101. "farm_name": "福建显美风电场_qwx0AWl6" # 推测实际目录名
  102. },
  103. "FmowTOzB": {
  104. "mill_type": "EN182-6250",
  105. "farm_name": "内蒙海勒斯风电场_FmowTOzB" # 推测实际目录名
  106. }
  107. }
  108. windfarm_config = windfarm_configs.get(windcode)
  109. if not windfarm_config:
  110. print(f"未找到风场 {windcode} 的配置")
  111. return []
  112. # 构建远程parquet文件路径 - 使用正斜杠
  113. remote_parquet_path = "/".join([
  114. self.parquet_root,
  115. windfarm_config["mill_type"],
  116. windfarm_config["farm_name"]
  117. ])
  118. print(f"远程路径: {remote_parquet_path}")
  119. # 检查远程路径是否存在
  120. if not self._remote_path_exists(remote_parquet_path):
  121. print(f"远程路径不存在: {remote_parquet_path}")
  122. # 尝试列出根目录内容来调试
  123. self._debug_remote_structure()
  124. return []
  125. # 获取远程文件列表
  126. parquet_files = self._get_remote_file_list(remote_parquet_path)
  127. if not parquet_files:
  128. print(f"在 {remote_parquet_path} 中未找到parquet文件")
  129. return []
  130. # 使用第一个文件来读取列名
  131. first_file = parquet_files[0]
  132. remote_file_path = "/".join([remote_parquet_path, first_file])
  133. print(f"读取远程文件: {remote_file_path}")
  134. # 直接读取文件获取列名
  135. df = self._read_remote_parquet_direct(remote_file_path)
  136. if df is not None:
  137. columns = df.columns.tolist()
  138. print(f"从远程parquet文件获取到 {len(columns)} 个列")
  139. return columns
  140. else:
  141. print("无法读取远程文件")
  142. return []
  143. except Exception as e:
  144. print(f"Error fetching columns for {windcode}: {str(e)}")
  145. return []
  146. def _debug_remote_structure(self):
  147. """调试远程目录结构"""
  148. try:
  149. print("\n=== 远程目录结构调试 ===")
  150. self._ensure_ssh_connection()
  151. # 检查根目录
  152. root_files = self.sftp_client.listdir(self.parquet_root)
  153. print(f"根目录内容: {root_files}")
  154. # 检查GW140-2500目录
  155. gw_path = "/".join([self.parquet_root, "GW140-2500"])
  156. if self._remote_path_exists(gw_path):
  157. gw_files = self.sftp_client.listdir(gw_path)
  158. print(f"GW140-2500目录内容: {gw_files}")
  159. else:
  160. print("GW140-2500目录不存在")
  161. except Exception as e:
  162. print(f"目录结构调试失败: {str(e)}")
  163. # 其他方法保持不变...
  164. def get_turbines(self, windcode):
  165. """获取风场下所有风机信息"""
  166. demo_turbines = {
  167. "7V2xSuma": [
  168. {"engine_code": "nuBwynYF", "mill_type_code": "GW140-2500", "engine_name": "nuBwynYF"},
  169. {"engine_code": "tWmqBYXV", "mill_type_code": "GW140-2500", "engine_name": "tWmqBYXV"}
  170. ],
  171. "qwx0AWl6": [
  172. {"engine_code": "0rQJxE72", "mill_type_code": "MY5.0se-155", "engine_name": "0rQJxE72"},
  173. {"engine_code": "7UTZjdEu", "mill_type_code": "MY5.0se-155", "engine_name": "7UTZjdEu"}
  174. ],
  175. "FmowTOzB": [
  176. {"engine_code": "Nj1RlyOI", "mill_type_code": "EN182-6250", "engine_name": "Nj1RlyOI"},
  177. {"engine_code": "dg8IwCwy", "mill_type_code": "EN182-6250", "engine_name": "dg8IwCwy"}
  178. ]
  179. }
  180. turbines_data = demo_turbines.get(windcode, [])
  181. if turbines_data:
  182. return pd.DataFrame(turbines_data)
  183. else:
  184. print(f"未找到风场 {windcode} 的风机信息")
  185. return pd.DataFrame()
  186. def get_mill_type(self, mill_type_code):
  187. """获取机型驱动类型"""
  188. mill_type_mapping = {
  189. "GW140-2500": 1, # 直驱
  190. "MY5.0se-155": 2, # 半直驱
  191. "EN182-6250": 3, # 双馈
  192. }
  193. return mill_type_mapping.get(mill_type_code)
  194. def fetch_turbine_data(self, windcode: str, engine_code: str, month: str, features: List[str]) -> pd.DataFrame:
  195. """
  196. 获取指定月份风机数据 - 从远程parquet文件读取
  197. """
  198. try:
  199. # 修正风场配置
  200. windfarm_configs = {
  201. "7V2xSuma": {
  202. "mill_type": "GW140-2500",
  203. "farm_name": "新疆老君庙风电场_7V2xSuma" # 修正为实际目录名
  204. },
  205. "qwx0AWl6": {
  206. "mill_type": "MY5.0se-155",
  207. "farm_name": "福建显美风电场_qwx0AWl6"
  208. },
  209. "FmowTOzB": {
  210. "mill_type": "EN182-6250",
  211. "farm_name": "内蒙海勒斯风电场_FmowTOzB"
  212. }
  213. }
  214. windfarm_config = windfarm_configs.get(windcode)
  215. if not windfarm_config:
  216. print(f"未找到风场 {windcode} 的配置")
  217. return pd.DataFrame()
  218. # 构建完整远程文件路径 - 使用正斜杠
  219. remote_file_path = "/".join([
  220. self.parquet_root,
  221. windfarm_config["mill_type"],
  222. windfarm_config["farm_name"],
  223. f"{engine_code}.parquet"
  224. ])
  225. print(f"正在读取远程parquet文件: {remote_file_path}")
  226. # 检查远程文件是否存在
  227. if not self._remote_path_exists(remote_file_path):
  228. print(f"远程parquet文件不存在: {remote_file_path}")
  229. return pd.DataFrame()
  230. # 直接读取远程文件
  231. df = self._read_remote_parquet_direct(remote_file_path)
  232. if df is None:
  233. print(f"无法读取远程文件: {remote_file_path}")
  234. return pd.DataFrame()
  235. # 过滤指定月份的数据
  236. if 'time_stamp' in df.columns:
  237. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  238. year, month_num = month.split('-')
  239. start_date = pd.Timestamp(f"{year}-{month_num}-01")
  240. if month_num == '12':
  241. end_date = pd.Timestamp(f"{int(year)+1}-01-01") - pd.Timedelta(days=1)
  242. else:
  243. end_date = pd.Timestamp(f"{year}-{int(month_num)+1:02d}-01") - pd.Timedelta(days=1)
  244. mask = (df['time_stamp'] >= start_date) & (df['time_stamp'] <= end_date)
  245. df = df.loc[mask]
  246. print(f"时间过滤后数据量: {len(df)} 行")
  247. # 选择需要的特征列
  248. available_features = [f for f in features if f in df.columns]
  249. if not available_features:
  250. print(f"无可用特征: {features}")
  251. return pd.DataFrame()
  252. result = df[available_features]
  253. print(f"从远程parquet文件加载数据: {len(result)} 行, {len(available_features)} 特征")
  254. return result
  255. except Exception as e:
  256. print(f"读取远程parquet文件失败: {str(e)}")
  257. import traceback
  258. traceback.print_exc()
  259. return pd.DataFrame()
  260. def fetch_all_turbines_data(self, windcode: str, month: str, features: List[str]) -> Dict[str, pd.DataFrame]:
  261. """批量获取风场下所有风机数据 - 从远程parquet文件读取"""
  262. try:
  263. # 获取风场所有风机
  264. turbines = self.get_turbines(windcode)
  265. if turbines.empty:
  266. return {}
  267. all_data = {}
  268. for _, turbine in turbines.iterrows():
  269. engine_code = turbine['engine_code']
  270. print(f"正在获取风机 {engine_code} 的数据...")
  271. data = self.fetch_turbine_data(windcode, engine_code, month, features)
  272. if not data.empty:
  273. all_data[engine_code] = data
  274. else:
  275. print(f"风机 {engine_code} 无有效数据")
  276. print(f"批量加载完成: {len(all_data)} 台风机数据")
  277. return all_data
  278. except Exception as e:
  279. print(f"批量查询失败: {str(e)}")
  280. return {}
  281. def close_connection(self):
  282. """关闭SSH连接"""
  283. if self.sftp_client:
  284. self.sftp_client.close()
  285. self.sftp_client = None
  286. if self.ssh_client:
  287. self.ssh_client.close()
  288. self.ssh_client = None
  289. print("SSH连接已关闭")
  290. def __del__(self):
  291. """析构函数,关闭SSH连接"""
  292. self.close_connection()