# cache_service.py import json import hashlib import logging import redis from typing import Optional, Dict, Any class CacheService: def __init__(self, host: str = 'localhost', port: int = 6379, db: int = 0, password: str = None, ttl: int = 2592000): """ 初始化Redis缓存服务 :param ttl: 默认缓存时间(秒),默认24小时 """ self.client = redis.Redis( host=host, port=port, db=db, password=password, decode_responses=True, socket_connect_timeout=5, socket_timeout=5 ) self.ttl = ttl self.logger = logging.getLogger(__name__) def _generate_cache_key(self, request_body: Dict[str, Any]) -> str: """ 根据请求体生成唯一的缓存键 """ request_str = json.dumps(request_body, sort_keys=True) return f"health:cache:{hashlib.sha256(request_str.encode()).hexdigest()}" def get_cached_response(self, request_body: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ 获取缓存结果 :return: 如果存在则返回缓存结果,否则返回None """ cache_key = self._generate_cache_key(request_body) try: cached = self.client.get(cache_key) if cached: self.logger.info(f"缓存命中: {cache_key}") return json.loads(cached) return None except Exception as e: self.logger.error(f"获取缓存失败: {str(e)}") return None def set_cached_response(self, request_body: Dict[str, Any], response_data: Dict[str, Any], ttl: int = None) -> bool: """ 设置缓存结果 """ cache_key = self._generate_cache_key(request_body) try: expire_time = ttl if ttl is not None else self.ttl self.client.setex( cache_key, expire_time, json.dumps(response_data) ) self.logger.info(f"缓存已设置: {cache_key} (TTL: {expire_time}s)") return True except Exception as e: self.logger.error(f"设置缓存失败: {str(e)}") return False def clear_cache(self, request_body: Dict[str, Any]) -> bool: """清除指定请求的缓存""" cache_key = self._generate_cache_key(request_body) try: self.client.delete(cache_key) self.logger.info(f"缓存已清除: {cache_key}") return True except Exception as e: self.logger.error(f"清除缓存失败: {str(e)}") return False def ping(self) -> bool: """检查Redis连接是否正常""" try: return self.client.ping() except Exception as e: self.logger.error(f"Redis连接测试失败: {str(e)}") return False