# get_db_data.py

import json
import logging.handlers
import os.path
import sys
import traceback
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine


# Configure logging inside a function to avoid global side effects
def setup_logger():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    # Avoid adding handlers more than once
    if logger.handlers:
        return logger
    log_path = "/home/caiji/project/logs/"
    os.makedirs(log_path, exist_ok=True)
    # 1. TimedRotatingFileHandler for file output at all levels
    all_log_handler = logging.handlers.TimedRotatingFileHandler(
        os.path.join(log_path, "info.log"), 'D', backupCount=90
    )
    # 2. TimedRotatingFileHandler for ERROR-level file output
    error_log_handler = logging.handlers.TimedRotatingFileHandler(
        os.path.join(log_path, "error.log"), 'D', backupCount=90
    )
    error_log_handler.setLevel(logging.ERROR)
    # 3. StreamHandler for console output
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    # Define the log format
    fmt = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
        "%Y-%m-%d %H:%M:%S"
    )
    # Apply the format to each handler
    all_log_handler.setFormatter(fmt)
    error_log_handler.setFormatter(fmt)
    console_handler.setFormatter(fmt)
    # Register the handlers
    logger.addHandler(all_log_handler)
    logger.addHandler(error_log_handler)
    logger.addHandler(console_handler)
    return logger


logger = setup_logger()


class DatabaseConnector:
    """Database connection manager."""

    _engines: Dict[str, Engine] = {}

    @classmethod
    def get_engine(cls, host='172.21.6.37', port=3306, user='envision', pwd='envision', db='envision') -> Engine:
        """Get or create a cached database engine."""
        key = f"{host}:{port}:{user}:{db}"
        if key not in cls._engines:
            connection_string = f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}'
            cls._engines[key] = create_engine(
                connection_string,
                pool_size=10,        # larger connection pool
                max_overflow=20,
                pool_pre_ping=True,  # ping connections before checkout
                pool_recycle=600,    # recycle idle connections after 10 minutes
                echo=False           # keep False in production
            )
            logger.info(f"Created new engine for {key}")
        return cls._engines[key]


def generate_scada_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
    """Generate the SCADA data query structure."""
    try:
        # Read all relevant configuration files in one pass
        df_2404 = pd.read_excel("conf/2404_表数据.xlsx")
        df_2405 = pd.read_excel("conf/2405_表数据.xlsx")
        df_scada = pd.concat([df_2404, df_2405], ignore_index=True)
        # Group by farm code, turbine name and history table for efficiency
        grouped = df_scada.groupby(['场站标准化编号', '风机号', '历史采样表名'])
        sql_data = {}
        for (wind_farm_code, wind_turbine_name, table), group in grouped:
            # Build the field mapping "raw field as standardized name"
            columns = [f' {row["历史采样域名"]} as {row["标准化英文"]}'
                       for _, row in group.iterrows()]
            wind_turbine_name = str(wind_turbine_name)
            # Update the nested structure
            if wind_farm_code not in sql_data:
                sql_data[wind_farm_code] = {}
            if wind_turbine_name not in sql_data[wind_farm_code]:
                sql_data[wind_farm_code][wind_turbine_name] = {}
            sql_data[wind_farm_code][wind_turbine_name][table] = columns
        logger.info(f"Generated SCADA SQL for {len(sql_data)} wind farms")
        return sql_data
    except Exception as e:
        logger.error(f"Error generating SCADA SQL: {e}")
        raise
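
# Illustrative shape of the structure returned above (and cached in conf/scada.json);
# the keys below are placeholders, not real configuration values:
# {
#     "WOFxxxxxxxx": {                              # wind farm code
#         "1": {                                    # wind turbine name
#             "history_table": [" raw_field as standard_name", ...]
#         }
#     }
# }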


def generate_warn_fault_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
    """Generate the warning/fault data query structure."""
    try:
        df_2406 = pd.read_excel('conf/2406_表数据.xlsx')
        df_2406 = df_2406[df_2406['场站标准化编号'] != 'WOF35800080']
        # Group by farm code, turbine name and history table for efficiency
        grouped = df_2406.groupby(['场站标准化编号', '风机号', '历史采样表名'])
        warn_fault_data = {}
        for (wind_farm_code, wind_turbine_name, table), group in grouped:
            # Build the field list, aliasing every raw field to fault_code
            columns = [f'{row["历史采样域名"]} as fault_code'
                       for _, row in group.iterrows()]
            wind_turbine_name = str(wind_turbine_name)
            if wind_farm_code not in warn_fault_data:
                warn_fault_data[wind_farm_code] = {}
            if wind_turbine_name not in warn_fault_data[wind_farm_code]:
                warn_fault_data[wind_farm_code][wind_turbine_name] = {}
            warn_fault_data[wind_farm_code][wind_turbine_name][table] = columns
        logger.info(f"Generated warning/fault SQL for {len(warn_fault_data)} wind farms")
        return warn_fault_data
    except Exception as e:
        logger.error(f"Error generating warn/fault SQL: {e}")
        raise


def merge_dataframes(dataframes: List[pd.DataFrame]) -> pd.DataFrame:
    """Merge data frames on turbine name and timestamp."""
    # Drop empty frames (e.g. from failed queries) so the merge keys are always present
    dataframes = [df for df in dataframes if not df.empty]
    if not dataframes:
        return pd.DataFrame()
    # Merge sequentially, pairwise
    result = dataframes[0]
    for df in dataframes[1:]:
        result = pd.merge(
            result,
            df,
            on=['wind_turbine_name', 'time_stamp'],
            how='inner'  # choose the join type to match requirements
        )
    return result


def execute_query_batch(engine: Engine, queries: List[Tuple[str, Dict]]) -> List[pd.DataFrame]:
    """Execute a batch of queries on a single connection."""
    results = []
    with engine.connect() as conn:
        for sql, params in queries:
            try:
                df = pd.read_sql(
                    text(sql),
                    conn,
                    params=params
                )
                results.append(df)
            except Exception as e:
                logger.error(f"Query failed: {sql[:100]}... Error: {e}")
                # Append an empty DataFrame so one failed query does not abort the batch
                results.append(pd.DataFrame())
    return results


def process_wind_turbine_batch(
        engine: Engine,
        wind_farm_code: str,
        wind_turbine_name: str,
        points_data: Dict[str, List[str]],
        begin_time: datetime,
        end_time: datetime,
        query_type: str
) -> bool:
    """Query and save the data of a single wind turbine."""
    try:
        begin_time_str = begin_time.strftime('%Y-%m-%d %H:%M:%S')
        end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
        # Build all queries up front
        queries = []
        for table, cols in points_data.items():
            if not cols:  # skip tables without columns
                continue
            sql = f"""
                SELECT
                    :wind_turbine_name as wind_turbine_name,
                    occur_time as time_stamp,
                    {', '.join(cols)}
                FROM {table}
                WHERE occur_time >= :begin_time AND occur_time < :end_time
            """
            queries.append((sql, {
                'wind_turbine_name': wind_turbine_name,
                'begin_time': begin_time_str,
                'end_time': end_time_str
            }))
        # Execute the queries as one batch
        dfs = execute_query_batch(engine, queries)
        # Merge the per-table results
        merged_df = merge_dataframes(dfs)
        if merged_df.empty:
            logger.warning(f"No data found for {wind_farm_code}_{wind_turbine_name}")
            return False
        # Save the data
        save_path = os.path.join("/home/caiji/data", query_type)
        os.makedirs(save_path, exist_ok=True)
        file_name = f"{wind_farm_code}_{wind_turbine_name}_{begin_time.strftime('%Y%m%d%H%M%S')}.csv"
        file_path = os.path.join(save_path, file_name)
        # Write one CSV per turbine per run
        merged_df.to_csv(file_path, index=False)
        logger.info(f"Saved {len(merged_df)} rows to {file_path}")
        return True
    except Exception as e:
        logger.error(f"Error processing {wind_farm_code}_{wind_turbine_name}: {e}")
        logger.error(traceback.format_exc())
        return False
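
# For reference, each statement built above renders roughly as follows
# (table and column names are illustrative, not taken from the real configuration):
#     SELECT :wind_turbine_name as wind_turbine_name,
#            occur_time as time_stamp,
#            raw_field_1 as wind_speed, raw_field_2 as power
#     FROM history_table
#     WHERE occur_time >= :begin_time AND occur_time < :end_time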


def read_and_save_file_optimized(query_dict: Dict, query_type: str,
                                 begin_time: datetime, end_time: datetime) -> Tuple[int, int]:
    """Process every turbine in query_dict and return (success_count, failed_count)."""
    # Create the output directory
    output_dir = os.path.join("/home/caiji/data", query_type)
    os.makedirs(output_dir, exist_ok=True)
    # Get the database engine
    engine = DatabaseConnector.get_engine()
    success_count = 0
    failed_count = 0
    for wind_farm_code, wind_turbine_data in query_dict.items():
        for wind_turbine_name, points_data in wind_turbine_data.items():
            succeeded = process_wind_turbine_batch(engine, wind_farm_code, wind_turbine_name,
                                                   points_data, begin_time, end_time, query_type)
            if succeeded:
                success_count += 1
            else:
                failed_count += 1
    logger.info(f"Processing completed. Success: {success_count}, Failed: {failed_count}")
    return success_count, failed_count


def main(begin_time: datetime, end_time: datetime):
    """Main entry point."""
    try:
        # # Regenerate the query structures from the Excel files and cache them as JSON
        # scada_data = generate_scada_sql()
        # warn_fault_data = generate_warn_fault_sql()
        #
        # with open('conf/scada.json', 'w') as f:
        #     f.write(json.dumps(scada_data))
        #
        # with open('conf/warn_fault.json', 'w') as f:
        #     f.write(json.dumps(warn_fault_data))
        with open('conf/scada.json', 'r') as f:
            scada_data = json.load(f)
        with open('conf/warn_fault.json', 'r') as f:
            warn_fault_data = json.load(f)
        # Process SCADA data
        logger.info("Starting SCADA data processing")
        scada_results = read_and_save_file_optimized(
            scada_data,
            'scada',
            begin_time,
            end_time
        )
        logger.info(str(scada_results))
        logger.info("Finished SCADA data processing")
        # Process warning/fault data
        logger.info("Starting warn/fault data processing")
        warn_results = read_and_save_file_optimized(
            warn_fault_data,
            'warn_fault',
            begin_time,
            end_time
        )
        logger.info(str(warn_results))
        logger.info("Finished warn/fault data processing")
        logger.info("All processing completed")
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error(f"Main process failed: {e}")
        raise


def parse_time_args() -> Tuple[datetime, datetime]:
    """Parse the start/end time from the command line, defaulting to the previous day."""
    now = datetime.now()
    default_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
    default_end = default_start + timedelta(days=1)
    logger.info(sys.argv)
    # Parse command-line arguments
    if len(sys.argv) >= 3:
        try:
            # Expect time strings such as "2024-01-01 12:00:00"
            start_time = datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S")
            end_time = datetime.strptime(sys.argv[2], "%Y-%m-%d %H:%M:%S")
        except ValueError:
            logger.error("Invalid time format, expected YYYY-MM-DD HH:MM:SS")
            # Fall back to the defaults on parse failure
            start_time = default_start
            end_time = default_end
    else:
        # Not enough arguments: use the defaults
        start_time = default_start
        end_time = default_end
    return start_time, end_time
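
# Example invocation, assuming the conf/*.json files exist and the database is
# reachable from this host (the times below are placeholders):
#     python get_db_data.py "2024-01-01 00:00:00" "2024-01-02 00:00:00"
# With no arguments the script processes the previous calendar day.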


if __name__ == '__main__':
    start_time, end_time = parse_time_args()
    print(start_time)
    logger.info(f'{start_time}___{end_time}')
    main(start_time, end_time)