import json
import logging
import logging.handlers
import os
import sys
import traceback
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine


# Logger configuration wrapped in a function to avoid global side effects.
def setup_logger():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    # Avoid adding handlers twice if the module is re-imported.
    if logger.handlers:
        return logger

    log_path = "/home/caiji/project/logs/"
    os.makedirs(log_path, exist_ok=True)

    # 1. TimedRotatingFileHandler for all levels (daily rotation, 90 backups).
    all_log_handler = logging.handlers.TimedRotatingFileHandler(
        os.path.join(log_path, "info.log"), 'D', backupCount=90
    )

    # 2. TimedRotatingFileHandler restricted to ERROR and above.
    error_log_handler = logging.handlers.TimedRotatingFileHandler(
        os.path.join(log_path, "error.log"), 'D', backupCount=90
    )
    error_log_handler.setLevel(logging.ERROR)

    # 3. StreamHandler for console output.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # Shared log format.
    fmt = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
        "%Y-%m-%d %H:%M:%S"
    )
    all_log_handler.setFormatter(fmt)
    error_log_handler.setFormatter(fmt)
    console_handler.setFormatter(fmt)

    logger.addHandler(all_log_handler)
    logger.addHandler(error_log_handler)
    logger.addHandler(console_handler)
    return logger


logger = setup_logger()


class DatabaseConnector:
    """Database connection manager."""

    _engines: Dict[str, Engine] = {}

    @classmethod
    def get_engine(cls, host='172.21.6.37', port=3306, user='envision',
                   pwd='envision', db='envision') -> Engine:
        """Get or create a database engine."""
        key = f"{host}:{port}:{user}:{db}"
        if key not in cls._engines:
            connection_string = f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}'
            cls._engines[key] = create_engine(
                connection_string,
                pool_size=10,        # larger connection pool
                max_overflow=20,
                pool_pre_ping=True,  # ping the connection before use
                pool_recycle=600,    # recycle connections after 10 minutes
                echo=False           # keep False in production
            )
            logger.info(f"Created new engine for {key}")
        return cls._engines[key]


def generate_scada_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
    """Build the nested SCADA query structure from the conf spreadsheets."""
    try:
        # Read all relevant files in one pass.
        df_2404 = pd.read_excel("conf/2404_表数据.xlsx")
        df_2405 = pd.read_excel("conf/2405_表数据.xlsx")
        df_scada = pd.concat([df_2404, df_2405], ignore_index=True)

        # groupby keeps this a single pass over the sheet.
        grouped = df_scada.groupby(['场站标准化编号', '风机号', '历史采样表名'])

        sql_data = {}
        for (wind_farm_code, wind_turbine_name, table), group in grouped:
            # Map raw point names to standardized English column aliases.
            columns = [f' {row["历史采样域名"]} as {row["标准化英文"]}'
                       for _, row in group.iterrows()]
            wind_turbine_name = str(wind_turbine_name)

            # Build the nested farm -> turbine -> table mapping.
            sql_data.setdefault(wind_farm_code, {}) \
                    .setdefault(wind_turbine_name, {})[table] = columns

        logger.info(f"Generated SCADA SQL for {len(sql_data)} wind farms")
        return sql_data
    except Exception as e:
        logger.error(f"Error generating SCADA SQL: {e}")
        raise
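
# Illustrative only: a hypothetical sketch of the nested structure returned by
# generate_scada_sql() (generate_warn_fault_sql() below returns the same shape,
# but with 'fault_code' aliases). The farm code, turbine number, table, and
# point names here are made-up placeholders, not values from the real conf
# sheets.
#
# {
#     "WOF00000001": {               # 场站标准化编号 (standardized farm code)
#         "1": {                     # 风机号 (turbine number, as a string)
#             "hist_table_a": [      # 历史采样表名 (history sampling table)
#                 " raw_point_1 as wind_speed",
#                 " raw_point_2 as active_power",
#             ],
#         },
#     },
# }
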
def generate_warn_fault_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
    """Build the nested warning/fault query structure from the conf spreadsheet."""
    try:
        df_2406 = pd.read_excel('conf/2406_表数据.xlsx')
        df_2406 = df_2406[df_2406['场站标准化编号'] != 'WOF35800080']

        # groupby keeps this a single pass over the sheet.
        grouped = df_2406.groupby(['场站标准化编号', '风机号', '历史采样表名'])

        warn_fault_data = {}
        for (wind_farm_code, wind_turbine_name, table), group in grouped:
            # Every raw point is aliased to the same fault_code column.
            columns = [f'{row["历史采样域名"]} as fault_code'
                       for _, row in group.iterrows()]
            wind_turbine_name = str(wind_turbine_name)

            warn_fault_data.setdefault(wind_farm_code, {}) \
                           .setdefault(wind_turbine_name, {})[table] = columns

        logger.info(f"Generated warning/fault SQL for {len(warn_fault_data)} wind farms")
        return warn_fault_data
    except Exception as e:
        logger.error(f"Error generating warn/fault SQL: {e}")
        raise


def merge_dataframes(dataframes: List[pd.DataFrame]) -> pd.DataFrame:
    """Merge the per-table frames on turbine name and timestamp."""
    if not dataframes:
        return pd.DataFrame()

    # Fold the frames together pairwise.
    result = dataframes[0]
    for df in dataframes[1:]:
        result = pd.merge(
            result, df,
            on=['wind_turbine_name', 'time_stamp'],
            how='inner'  # choose the join type to match your requirements
        )
    return result


def execute_query_batch(engine: Engine, queries: List[Tuple[str, Dict]]) -> List[pd.DataFrame]:
    """Execute a batch of parameterized queries on one connection."""
    results = []
    with engine.connect() as conn:
        for sql, params in queries:
            try:
                df = pd.read_sql(text(sql), conn, params=params)
                results.append(df)
            except Exception as e:
                logger.error(f"Query failed: {sql[:100]}... Error: {e}")
                # Append an empty DataFrame so one failure does not abort the batch.
                results.append(pd.DataFrame())
    return results


def process_wind_turbine_batch(
        engine: Engine,
        wind_farm_code: str,
        wind_turbine_name: str,
        points_data: Dict[str, List[str]],
        begin_time: datetime,
        end_time: datetime,
        query_type: str
) -> bool:
    """Query and save the data for a single wind turbine."""
    try:
        begin_time_str = begin_time.strftime('%Y-%m-%d %H:%M:%S')
        end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')

        # Build all queries for this turbine up front.
        queries = []
        for table, cols in points_data.items():
            if not cols:  # skip tables with no mapped columns
                continue
            sql = f"""
                SELECT :wind_turbine_name as wind_turbine_name,
                       occur_time as time_stamp,
                       {', '.join(cols)}
                FROM {table}
                WHERE occur_time >= :begin_time AND occur_time < :end_time
            """
            queries.append((sql, {
                'wind_turbine_name': wind_turbine_name,
                'begin_time': begin_time_str,
                'end_time': end_time_str
            }))

        # Execute the batch and merge the per-table results.
        dfs = execute_query_batch(engine, queries)
        merged_df = merge_dataframes(dfs)

        if merged_df.empty:
            logger.warning(f"No data found for {wind_farm_code}_{wind_turbine_name}")
            return False

        # Save the merged data as CSV.
        save_path = os.path.join("/home/caiji/data", query_type)
        os.makedirs(save_path, exist_ok=True)

        file_name = f"{wind_farm_code}_{wind_turbine_name}_{begin_time.strftime('%Y%m%d%H%M%S')}.csv"
        file_path = os.path.join(save_path, file_name)
        merged_df.to_csv(file_path, index=False)

        logger.info(f"Saved {len(merged_df)} rows to {file_path}")
        return True
    except Exception as e:
        logger.error(f"Error processing {wind_farm_code}_{wind_turbine_name}: {e}")
        logger.error(traceback.format_exc())
        return False


def read_and_save_file_optimized(query_dict: Dict, query_type: str,
                                 begin_time: datetime, end_time: datetime) -> Tuple[int, int]:
    """Run every per-turbine query in query_dict; return (success, failed) counts."""
    # Create the output directory.
    output_dir = os.path.join("/home/caiji/data", query_type)
    os.makedirs(output_dir, exist_ok=True)

    # Get the database engine.
    engine = DatabaseConnector.get_engine()

    success_count = 0
    failed_count = 0
    for wind_farm_code, wind_turbine_data in query_dict.items():
        for wind_turbine_name, points_data in wind_turbine_data.items():
            ok = process_wind_turbine_batch(engine, wind_farm_code, wind_turbine_name,
                                            points_data, begin_time, end_time, query_type)
            if ok:
                success_count += 1
            else:
                failed_count += 1

    logger.info(f"Processing completed. Success: {success_count}, Failed: {failed_count}")
    return success_count, failed_count
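
# Illustrative only: given a hypothetical table "hist_table_a" and the column
# list [" raw_point_1 as wind_speed", " raw_point_2 as active_power"], the
# statement built inside process_wind_turbine_batch() comes out roughly as:
#
#   SELECT :wind_turbine_name as wind_turbine_name,
#          occur_time as time_stamp,
#           raw_point_1 as wind_speed,  raw_point_2 as active_power
#   FROM hist_table_a
#   WHERE occur_time >= :begin_time AND occur_time < :end_time
#
# :wind_turbine_name, :begin_time and :end_time are bound parameters filled in
# by SQLAlchemy; the table and point names above are made up for illustration.
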
def main(begin_time: datetime, end_time: datetime):
    """Entry point: load the query structures and process both data sets."""
    try:
        # # Regenerate the query structures from the conf spreadsheets:
        # scada_data = generate_scada_sql()
        # warn_fault_data = generate_warn_fault_sql()
        #
        # with open('conf/scada.json', 'w') as f:
        #     f.write(json.dumps(scada_data))
        #
        # with open('conf/warn_fault.json', 'w') as f:
        #     f.write(json.dumps(warn_fault_data))

        with open('conf/scada.json', 'r') as f:
            scada_data = json.load(f)
        with open('conf/warn_fault.json', 'r') as f:
            warn_fault_data = json.load(f)

        # Process SCADA data.
        logger.info("Starting SCADA data processing")
        scada_results = read_and_save_file_optimized(
            scada_data, 'scada', begin_time, end_time
        )
        logger.info(str(scada_results))
        logger.info("Finished SCADA data processing")

        # Process warning/fault data.
        logger.info("Starting warn/fault data processing")
        warn_results = read_and_save_file_optimized(
            warn_fault_data, 'warn_fault', begin_time, end_time
        )
        logger.info(str(warn_results))
        logger.info("Finished warn/fault data processing")

        logger.info("All processing completed")
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error(f"Main process failed: {e}")
        raise


def parse_time_args() -> Tuple[datetime, datetime]:
    """Parse [start, end) from sys.argv, defaulting to yesterday's full day."""
    now = datetime.now()
    default_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
    default_end = default_start + timedelta(days=1)

    logger.info(sys.argv)

    # Parse the command-line arguments.
    if len(sys.argv) >= 3:
        try:
            # Expect time strings such as "2024-01-01 12:00:00".
            start_time = datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S")
            end_time = datetime.strptime(sys.argv[2], "%Y-%m-%d %H:%M:%S")
        except ValueError:
            logger.error("Invalid time format, expected: YYYY-MM-DD HH:MM:SS")
            # Fall back to the defaults if parsing fails.
            start_time = default_start
            end_time = default_end
    else:
        # Not enough arguments: use the defaults.
        start_time = default_start
        end_time = default_end
    return start_time, end_time


if __name__ == '__main__':
    start_time, end_time = parse_time_args()
    logger.info(f'{start_time}___{end_time}')
    main(start_time, end_time)
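
# Example invocations (the script filename "collector.py" is hypothetical):
#
#   python collector.py "2024-01-01 00:00:00" "2024-01-02 00:00:00"
#   python collector.py     # defaults to yesterday 00:00 through today 00:00
#
# Output lands under /home/caiji/data/scada/ and /home/caiji/data/warn_fault/
# as one CSV per turbine per run.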