# database.py
import logging
import traceback

import pandas as pd
from sqlalchemy import create_engine, inspect
  5. class DataFetcher:
  6. def __init__(self):
  7. self.show_engine = create_engine('mysql+pymysql://admin:admin123456@192.168.50.233:3306/energy_show')
  8. self.data_engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/energy_data_prod')
  9. def get_turbine_columns(self, windcode):
  10. """
  11. 获取指定风场数据表的所有列名
  12. :param windcode: 风场编号 (如 "WF001")
  13. :return: 列名列表 (如 ["timestamp", "temp1", "vibration1"])
  14. """
  15. table_name = f"{windcode}_minute"
  16. try:
  17. inspector = inspect(self.data_engine)
  18. columns = inspector.get_columns(table_name)
  19. return [col['name'] for col in columns]
  20. except Exception as e:
  21. print(f"Error fetching columns for {table_name}: {str(e)}")
  22. return []
  23. """
  24. 获取风场下所有风机信息
  25. 根据风场编号在表'wind_engine_group'中查询所有的风机编号engine_code 以及对应的机型编号mill_type_code,风机名称engine_name
  26. """
  27. def get_turbines(self, windcode):
  28. query = f"""
  29. SELECT engine_code, mill_type_code,engine_name
  30. FROM wind_engine_group
  31. WHERE field_code = '{windcode}'
  32. """
  33. return pd.read_sql(query, self.show_engine)
  34. """
  35. 获取机型驱动类型
  36. 根据机型编号在表'wind_engine_mill'中查询对应的驱动方式值
  37. """
  38. def get_mill_type(self, mill_type_code):
  39. query = f"""
  40. SELECT curved_motion_type
  41. FROM wind_engine_mill
  42. WHERE mill_type_code = '{mill_type_code}'
  43. """
  44. result = pd.read_sql(query, self.show_engine)
  45. return result.iloc[0, 0] if not result.empty else None
  46. """
  47. 获取风机时序数据
  48. 根据风机编号在表'windcode_minute'中,筛选出timestamp在month范围里的所有数据条
  49. """
  50. # def fetch_turbine_data(self, windecode,engine_code, month, features):
  51. # table_name = f"{windecode}_minute"
  52. # month_start = f"{month}-01"
  53. # month_end = f"{month}-31" # 自动处理不同月份天数
  54. # query = f"""
  55. # SELECT time_stamp, {','.join(features)}
  56. # FROM {table_name}
  57. # WHERE wind_turbine_number ='{engine_code}'
  58. # AND time_stamp BETWEEN '{month_start} 00:00:00' AND '{month_end} 23:59:59'
  59. # """
  60. # print('sql语句')
  61. # print(query)
  62. # try:
  63. # return pd.read_sql(query, self.data_engine)
  64. # except:
  65. # print(traceback.print_exc())
  66. # return pd.DataFrame()
  67. # def fetch_turbine_data(self, windecode,engine_code, month, features):
  68. # year_month_int = int(month.replace('-',''))
  69. # table_name = f"{windecode}_minute"
  70. # query = f"""
  71. # SELECT year_month, {','.join(features)}
  72. # FROM {table_name}
  73. # WHERE wind_turbine_number ='{engine_code}'
  74. # AND year_month = :year_month_int
  75. # """
  76. # print('sql语句')
  77. # print(query)
  78. # try:
  79. # return pd.read_sql(query, self.data_engine)
  80. # except:
  81. # print(traceback.print_exc())
  82. # return pd.DataFrame()
  83. def fetch_turbine_data(self, windcode, engine_code, month, features):
  84. """获取指定月份风机数据(安全参数化版本)
  85. Args:
  86. windcode: 风场编号 (如 "WF001")
  87. engine_code: 风机编号 (如 "WT001")
  88. month: 月份字符串 (格式 "YYYY-MM")
  89. features: 需要查询的字段列表
  90. Returns:
  91. pd.DataFrame: 包含查询结果的DataFrame
  92. """
  93. try:
  94. # 1. 转换并验证月份格式
  95. year_month_int = int(month.replace('-', ''))
  96. if not 100001 <= year_month_int <= 999912: # 基本格式验证
  97. raise ValueError("月份格式应为YYYY-MM")
  98. # 2. 验证特征列名安全性
  99. safe_features = []
  100. for feat in features:
  101. if isinstance(feat, str) and all(c.isalnum() or c == '_' for c in feat):
  102. safe_features.append(f'`{feat}`') # 用反引号包裹
  103. else:
  104. print(f"警告:忽略非法特征名 '{feat}'")
  105. if not safe_features:
  106. print("错误:无有效特征列")
  107. return pd.DataFrame()
  108. # 3. 构建参数化查询
  109. query = f"""
  110. SELECT `year_month`, {','.join(safe_features)}
  111. FROM `{windcode}_minute`
  112. WHERE `wind_turbine_number` = %s
  113. AND `year_month` = %s
  114. """
  115. print(f"执行安全查询:\n{query}\n参数: ({engine_code}, {year_month_int})")
  116. # 4. 执行参数化查询
  117. return pd.read_sql(query, self.data_engine,
  118. params=(engine_code, year_month_int))
  119. except ValueError as e:
  120. print(f"输入参数错误: {str(e)}")
  121. return pd.DataFrame()
  122. except Exception as e:
  123. print(f"数据库查询失败: {str(e)}")
  124. import traceback
  125. traceback.print_exc()
  126. return pd.DataFrame()
  127. def get_turbine_columns(self, windcode):
  128. """获取指定风场数据表的所有列名"""
  129. table_name = f"{windcode}_minute"
  130. try:
  131. inspector = inspect(self.data_engine)
  132. columns = inspector.get_columns(table_name)
  133. return [col['name'] for col in columns]
  134. except Exception as e:
  135. print(f"获取列名失败: {str(e)}")
  136. return []