import json
import logging
import logging.handlers
import os
import traceback
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
# Logger configuration wrapped in a function to avoid global side effects
def setup_logger():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    # Avoid adding duplicate handlers on repeated calls
    if logger.handlers:
        return logger

    log_path = "/home/caiji/project/logs/"
    os.makedirs(log_path, exist_ok=True)

    # 1. Timed rotating file handler for all log levels
    all_log_handler = logging.handlers.TimedRotatingFileHandler(
        os.path.join(log_path, "info.log"), 'D', backupCount=90
    )

    # 2. Timed rotating file handler for ERROR level and above
    error_log_handler = logging.handlers.TimedRotatingFileHandler(
        os.path.join(log_path, "error.log"), 'D', backupCount=90
    )
    error_log_handler.setLevel(logging.ERROR)

    # 3. Stream handler for console output
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # Log record format
    fmt = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
        "%Y-%m-%d %H:%M:%S"
    )

    # Apply the format to every handler
    all_log_handler.setFormatter(fmt)
    error_log_handler.setFormatter(fmt)
    console_handler.setFormatter(fmt)

    # Register the handlers
    logger.addHandler(all_log_handler)
    logger.addHandler(error_log_handler)
    logger.addHandler(console_handler)

    return logger


logger = setup_logger()
class DatabaseConnector:
    """Database connection manager."""

    _engines: Dict[str, Engine] = {}

    @classmethod
    def get_engine(cls, host='172.21.6.37', port=3306, user='envision', pwd='envision', db='envision') -> Engine:
        """Get or create a database engine."""
        key = f"{host}:{port}:{user}:{db}"
        if key not in cls._engines:
            connection_string = f'mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}'
            cls._engines[key] = create_engine(
                connection_string,
                pool_size=10,        # larger connection pool
                max_overflow=20,
                pool_pre_ping=True,  # ping the connection before use
                pool_recycle=600,    # recycle connections after 10 minutes
                echo=False           # keep False in production
            )
            logger.info(f"Created new engine for {key}")
        return cls._engines[key]
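
# A minimal usage sketch (illustrative, not part of the pipeline): engines are
# cached per host/port/user/db key, so repeated calls with the same arguments
# return the same pooled Engine instance.
#
#     engine_a = DatabaseConnector.get_engine()
#     engine_b = DatabaseConnector.get_engine()
#     assert engine_a is engine_b  # same cache key -> same Engine
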
def generate_scada_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
    """Build the SCADA query structure."""
    try:
        # Read all related configuration files at once
        df_2404 = pd.read_excel("conf/2404_表数据.xlsx")
        df_2405 = pd.read_excel("conf/2405_表数据.xlsx")
        df_scada = pd.concat([df_2404, df_2405], ignore_index=True)

        # Group to build the mapping efficiently
        grouped = df_scada.groupby(['场站标准化编号', '风机号', '历史采样表名'])

        sql_data = {}
        for (wind_farm_code, wind_turbine_name, table), group in grouped:
            # Build the column mapping (raw field name -> standardized English alias)
            columns = [f' {row["历史采样域名"]} as {row["标准化英文"]}'
                       for _, row in group.iterrows()]
            wind_turbine_name = str(wind_turbine_name)

            # Update the nested structure: farm -> turbine -> table -> columns
            if wind_farm_code not in sql_data:
                sql_data[wind_farm_code] = {}
            if wind_turbine_name not in sql_data[wind_farm_code]:
                sql_data[wind_farm_code][wind_turbine_name] = {}
            sql_data[wind_farm_code][wind_turbine_name][table] = columns

        logger.info(f"Generated SCADA SQL for {len(sql_data)} wind farms")
        return sql_data
    except Exception as e:
        logger.error(f"Error generating SCADA SQL: {e}")
        raise
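
# For reference, the nested structure returned by generate_scada_sql() looks
# roughly like the sketch below; the farm code, turbine number, table and
# field names are hypothetical placeholders, not values from the Excel files.
#
#     {
#         "WF0001": {
#             "1": {
#                 "history_table_a": [" raw_field_x as wind_speed",
#                                     " raw_field_y as active_power"],
#             },
#         },
#     }
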
def generate_warn_fault_sql() -> Dict[str, Dict[str, Dict[str, List[str]]]]:
    """Build the warning/fault query structure."""
    try:
        df_2406 = pd.read_excel('conf/2406_表数据.xlsx')
        df_2406 = df_2406[df_2406['场站标准化编号'] != 'WOF35800080']

        # Group to build the mapping efficiently
        grouped = df_2406.groupby(['场站标准化编号', '风机号', '历史采样表名'])

        warn_fault_data = {}
        for (wind_farm_code, wind_turbine_name, table), group in grouped:
            # Build the column list
            columns = [f'{row["历史采样域名"]} as fault_code'
                       for _, row in group.iterrows()]
            wind_turbine_name = str(wind_turbine_name)

            if wind_farm_code not in warn_fault_data:
                warn_fault_data[wind_farm_code] = {}
            if wind_turbine_name not in warn_fault_data[wind_farm_code]:
                warn_fault_data[wind_farm_code][wind_turbine_name] = {}
            warn_fault_data[wind_farm_code][wind_turbine_name][table] = columns

        logger.info(f"Generated warning/fault SQL for {len(warn_fault_data)} wind farms")
        return warn_fault_data
    except Exception as e:
        logger.error(f"Error generating warn/fault SQL: {e}")
        raise
def merge_dataframes(dataframes: List[pd.DataFrame]) -> pd.DataFrame:
    """Merge a list of dataframes on turbine name and timestamp."""
    if not dataframes:
        return pd.DataFrame()

    # Merge pairwise, accumulating into the first frame
    result = dataframes[0]
    for df in dataframes[1:]:
        result = pd.merge(
            result,
            df,
            on=['wind_turbine_name', 'time_stamp'],
            how='inner'  # adjust the join type to the business requirement
        )
    return result
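
# Illustrative sketch of the merge behaviour: frames are joined pairwise on
# ('wind_turbine_name', 'time_stamp') with an inner join, so only rows whose
# key appears in every frame survive. The column names here are hypothetical.
#
#     df_a = pd.DataFrame({'wind_turbine_name': ['1'],
#                          'time_stamp': ['2024-01-01 00:00:00'],
#                          'wind_speed': [7.2]})
#     df_b = pd.DataFrame({'wind_turbine_name': ['1'],
#                          'time_stamp': ['2024-01-01 00:00:00'],
#                          'active_power': [1500.0]})
#     merged = merge_dataframes([df_a, df_b])  # one row, four columns
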
def execute_query_batch(engine: Engine, queries: List[Tuple[str, Dict]]) -> List[pd.DataFrame]:
    """Execute a batch of parameterized queries on one connection."""
    results = []
    with engine.connect() as conn:
        for sql, params in queries:
            try:
                df = pd.read_sql(
                    text(sql),
                    conn,
                    params=params
                )
                results.append(df)
            except Exception as e:
                logger.error(f"Query failed: {sql[:100]}... Error: {e}")
                # Append an empty DataFrame so the batch keeps going
                results.append(pd.DataFrame())
    return results
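
# Sketch of the expected `queries` argument: a list of (sql, params) pairs
# where the SQL uses named bind parameters (the table and column below are
# hypothetical).
#
#     queries = [(
#         "SELECT occur_time FROM some_table WHERE occur_time >= :begin_time",
#         {"begin_time": "2024-01-01 00:00:00"},
#     )]
#     dfs = execute_query_batch(engine, queries)
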
def process_wind_turbine_batch(
        engine: Engine,
        wind_farm_code: str,
        wind_turbine_name: str,
        points_data: Dict[str, List[str]],
        begin_time: datetime,
        end_time: datetime,
        query_type: str
) -> bool:
    """Query and save the data for a single wind turbine."""
    try:
        begin_time_str = begin_time.strftime('%Y-%m-%d %H:%M:%S')
        end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')

        # Build all queries up front
        queries = []
        for table, cols in points_data.items():
            if not cols:  # skip tables with no mapped columns
                continue
            sql = f"""
            SELECT
                :wind_turbine_name as wind_turbine_name,
                occur_time as time_stamp,
                {', '.join(cols)}
            FROM {table}
            WHERE occur_time >= :begin_time AND occur_time < :end_time
            """
            queries.append((sql, {
                'wind_turbine_name': wind_turbine_name,
                'begin_time': begin_time_str,
                'end_time': end_time_str
            }))

        # Execute the queries as a batch
        dfs = execute_query_batch(engine, queries)

        # Merge the per-table results
        merged_df = merge_dataframes(dfs)
        if merged_df.empty:
            logger.warning(f"No data found for {wind_farm_code}_{wind_turbine_name}")
            return False

        # Save the merged data
        save_path = os.path.join("/home/caiji/data", query_type)
        os.makedirs(save_path, exist_ok=True)
        file_name = f"{wind_farm_code}_{wind_turbine_name}_{begin_time.strftime('%Y%m%d%H%M%S')}.csv"
        file_path = os.path.join(save_path, file_name)

        # Write as CSV (parquet would be faster and smaller if needed later)
        merged_df.to_csv(file_path, index=False)

        logger.info(f"Saved {len(merged_df)} rows to {file_path}")
        return True
    except Exception as e:
        logger.error(f"Error processing {wind_farm_code}_{wind_turbine_name}: {e}")
        logger.error(traceback.format_exc())
        return False
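
# Output naming sketch: one CSV per turbine per run under
# /home/caiji/data/<query_type>/, e.g. (farm code and turbine number illustrative)
#     /home/caiji/data/scada/WF0001_1_20240101000000.csv
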
def read_and_save_file_optimized(query_dict: Dict, query_type: str,
                                 begin_time: datetime, end_time: datetime) -> Tuple[int, int]:
    """Run the per-turbine extraction for every farm/turbine in query_dict."""
    # Create the output directory
    output_dir = os.path.join("/home/caiji/data", query_type)
    os.makedirs(output_dir, exist_ok=True)

    # Get the shared database engine
    engine = DatabaseConnector.get_engine()

    success_count = 0
    failed_count = 0
    for wind_farm_code, wind_turbine_data in query_dict.items():
        for wind_turbine_name, points_data in wind_turbine_data.items():
            succeeded = process_wind_turbine_batch(engine, wind_farm_code, wind_turbine_name,
                                                   points_data, begin_time, end_time, query_type)
            if succeeded:
                success_count += 1
            else:
                failed_count += 1

    logger.info(f"Processing completed. Success: {success_count}, Failed: {failed_count}")
    return success_count, failed_count
def main(begin_time: datetime, end_time: datetime):
    """Entry point for one extraction run."""
    try:
        # # Regenerate the query structures from the Excel configuration
        # scada_data = generate_scada_sql()
        # warn_fault_data = generate_warn_fault_sql()
        #
        # with open('conf/scada.json', 'w') as f:
        #     f.write(json.dumps(scada_data))
        #
        # with open('conf/warn_fault.json', 'w') as f:
        #     f.write(json.dumps(warn_fault_data))

        with open('conf/scada.json', 'r') as f:
            scada_data = json.load(f)
        with open('conf/warn_fault.json', 'r') as f:
            warn_fault_data = json.load(f)

        # Process SCADA data
        logger.info("Starting SCADA data processing")
        scada_results = read_and_save_file_optimized(
            scada_data,
            'scada',
            begin_time,
            end_time
        )
        logger.info(str(scada_results))
        logger.info("Finished SCADA data processing")

        # Process warning/fault data
        logger.info("Starting warn/fault data processing")
        warn_results = read_and_save_file_optimized(
            warn_fault_data,
            'warn_fault',
            begin_time,
            end_time
        )
        logger.info(str(warn_results))
        logger.info("Finished warn/fault data processing")

        logger.info("All processing completed")
    except Exception as e:
        logger.error(f"Main process failed: {e}", exc_info=True)
        raise
def parse_time_args() -> Tuple[datetime, datetime]:
    """Parse start/end times from the command line, falling back to the previous day."""
    import sys

    now = datetime.now()
    default_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
    default_end = default_start + timedelta(days=1)
    logger.info(sys.argv)

    # Parse the command-line arguments
    if len(sys.argv) >= 3:
        try:
            # Expect time strings such as "2024-01-01 12:00:00"
            start_time = datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S")
            end_time = datetime.strptime(sys.argv[2], "%Y-%m-%d %H:%M:%S")
        except ValueError:
            logger.error("Invalid time format; expected YYYY-MM-DD HH:MM:SS")
            # Fall back to the defaults if parsing fails
            start_time = default_start
            end_time = default_end
    else:
        # Not enough arguments: use the defaults
        start_time = default_start
        end_time = default_end

    return start_time, end_time
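
# Invocation sketch, assuming the script is run directly (the file name is a
# placeholder):
#
#     python collect.py "2024-01-01 00:00:00" "2024-01-02 00:00:00"
#
# With no arguments (or a malformed timestamp) the previous full day is used.
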
if __name__ == '__main__':
    start_time, end_time = parse_time_args()
    print(start_time)
    logger.info(f'{start_time}___{end_time}')
    main(start_time, end_time)