Agent系统在生产环境中面临着各种不确定性:网络波动、服务超时、资源耗尽、逻辑错误、外部依赖故障等。一个健壮的Agent系统不仅需要正确处理这些异常,还需要具备快速恢复的能力。错误处理不是事后补救,而是系统设计的核心组成部分。
本文将深入探讨Agent系统的错误处理与容错机制,从异常分类、重试策略、熔断降级、隔离机制、自愈能力、监控告警到故障演练,构建完整的可靠性保障体系。
异常分类与处理策略
Agent系统中可能发生的异常可以分为多个层次,每个层次需要不同的处理策略。
异常层次分类
基础设施层异常。 包括网络超时、连接失败、DNS解析错误、证书过期等。这类异常通常是暂时的,适合采用重试策略。但需要注意,盲目的重试可能加剧问题,比如对已经过载的服务造成更大压力。
服务层异常。 包括API限流、服务不可用、数据格式错误、认证失败等。这类异常需要区分是暂时性的还是持续性的。对于暂时性异常可以重试,对于持续性异常则需要触发降级或熔断。
业务逻辑层异常。 包括输入验证失败、状态不一致、业务规则冲突等。这类异常通常不应该重试,因为重试很可能得到同样的结果。正确的处理方式是返回清晰的错误信息,让上层决定如何处理。
Agent自身异常。 包括上下文溢出、推理失败、输出格式错误、工具调用失败等。这类异常是Agent系统特有的,需要专门的恢复策略。
统一异常处理框架
import logging
import time
import traceback
import uuid
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional, Dict, Any
class ErrorSeverity(Enum):
    """Severity level attached to a classified error.

    Members carry the integers 1-5 in declaration order, which are the
    same values ``auto()`` would assign.
    """

    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4
    WARNING = 5
class ErrorCategory(Enum):
    """String-valued category used to tag a classified error."""

    NETWORK = "network"
    TIMEOUT = "timeout"
    AUTH = "authentication"
    VALIDATION = "validation"
    RESOURCE = "resource"
    LOGIC = "logic"
    EXTERNAL = "external_dependency"
    UNKNOWN = "unknown"
@dataclass
class AgentError:
    """Structured record describing a single classified failure."""

    # Identifier used to refer to this particular error occurrence.
    error_id: str
    # Category and severity assigned to the failure.
    category: ErrorCategory
    severity: ErrorSeverity
    # Human-readable description of the failure.
    message: str
    # The exception object the record was built from, if any.
    original_exception: Optional[Exception]
    # Free-form data supplied by the caller.
    context: Dict[str, Any]
    # Creation time as a float timestamp.
    timestamp: float
    # Formatted traceback text, if available.
    stack_trace: Optional[str]
    # Whether the failure is flagged as recoverable.
    recoverable: bool
    # Retry counter; starts at zero.
    retry_count: int = 0
class ErrorClassifier:
    """Turn raw exceptions into structured ``AgentError`` records."""

    # Ordered (exception types, category, severity, recoverable) rules.
    # The first rule whose types match the exception wins.
    _RULES = (
        (TimeoutError, ErrorCategory.TIMEOUT, ErrorSeverity.HIGH, True),
        (ConnectionError, ErrorCategory.NETWORK, ErrorSeverity.HIGH, True),
        (PermissionError, ErrorCategory.AUTH, ErrorSeverity.CRITICAL, False),
        (ValueError, ErrorCategory.VALIDATION, ErrorSeverity.MEDIUM, False),
        (MemoryError, ErrorCategory.RESOURCE, ErrorSeverity.CRITICAL, True),
    )
    # Classification used when no rule matches.
    _DEFAULT = (ErrorCategory.UNKNOWN, ErrorSeverity.HIGH, True)

    def classify(self, exception: Exception,
                 context: Optional[Dict[str, Any]] = None) -> AgentError:
        """Build an ``AgentError`` describing ``exception``.

        Args:
            exception: The exception to classify.
            context: Optional caller-supplied data stored on the record;
                an empty dict is used when omitted.

        Returns:
            A new ``AgentError`` with a fresh ``ERR-XXXXXXXX`` id, the
            matched category/severity/recoverable flag, the current time
            and the exception's own formatted traceback.
        """
        for exc_types, category, severity, recoverable in self._RULES:
            if isinstance(exception, exc_types):
                break
        else:
            category, severity, recoverable = self._DEFAULT

        # Format the traceback carried by the exception itself.
        # traceback.format_exc() only describes the exception currently
        # being handled, so it yields "NoneType: None" (or an unrelated
        # trace) when classify() runs outside the matching except block.
        stack_trace = "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )

        return AgentError(
            error_id=f"ERR-{uuid.uuid4().hex[:8].upper()}",
            category=category,
            severity=severity,
            message=str(exception),
            original_exception=exception,
            context=context or {},
            timestamp=time.time(),
            stack_trace=stack_trace,
            recoverable=recoverable,
        )
class ErrorHandler:
    """Classify exceptions and keep every resulting record in memory."""

    def __init__(self):
        self.classifier = ErrorClassifier()
        # Every record produced by handle(), in call order.
        self.error_log = []

    def handle(self, exception: Exception, context: Dict = None) -> AgentError:
        """Classify ``exception``, append the record to ``error_log`` and return it."""
        record = self.classifier.classify(exception, context)
        self.error_log.append(record)
        return record
这个异常处理框架实现了统一的异常分类和处理机制。每个异常都被赋予了明确的类别、严重级别和恢复建议,使得后续的处理逻辑可以基于这些信息做出正确决策。
重试策略与退避算法
重试是处理暂时性故障的基本手段。但不当的重试策略可能导致问题恶化,因此需要仔细设计。
智能重试机制
import random
from typing import Optional, Callable, Any
class RetryPolicy:
    """Retry configuration: attempt budget, backoff shape and retryable errors."""

    def __init__(self, max_retries=3, base_delay=1.0, max_delay=60.0,
                 backoff_strategy='exponential', retryable_exceptions=(Exception,)):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_strategy = backoff_strategy
        self.retryable_exceptions = retryable_exceptions
        # Filled in by whoever executes operations under this policy.
        self.attempt_history = []

    def calculate_delay(self, attempt: int) -> float:
        """Return the wait before the next try, capped at ``max_delay``.

        ``'linear'`` scales ``base_delay`` by ``attempt``; ``'exponential'``
        and ``'jitter'`` scale it by ``2 ** attempt``, with ``'jitter'``
        adding up to 10% random extra. ``'fixed'`` and any unrecognised
        strategy use ``base_delay`` unchanged.
        """
        strategy = self.backoff_strategy
        if strategy == 'linear':
            raw = self.base_delay * attempt
        elif strategy in ('exponential', 'jitter'):
            raw = self.base_delay * (2 ** attempt)
            if strategy == 'jitter':
                raw = raw + random.uniform(0, raw * 0.1)
        else:
            raw = self.base_delay
        return min(raw, self.max_delay)

    def should_retry(self, exception: Exception, attempt: int) -> bool:
        """Tell whether ``exception`` raised on try ``attempt`` warrants a retry.

        A retry is allowed only while ``attempt`` is below ``max_retries``,
        the exception is an instance of ``retryable_exceptions``, and the
        exception does not carry a falsy ``recoverable`` attribute.
        """
        return (
            attempt < self.max_retries
            and isinstance(exception, self.retryable_exceptions)
            and bool(getattr(exception, 'recoverable', True))
        )
class RetryExecutor:
    """Run an operation under a ``RetryPolicy``, sleeping between tries."""

    def __init__(self, policy: RetryPolicy):
        self.policy = policy

    def execute(self, operation: Callable, *args, **kwargs) -> Any:
        """Call ``operation(*args, **kwargs)``, retrying as the policy allows.

        The operation is always tried at least once, even when the policy's
        ``max_retries`` is zero or negative. Each failed try that will be
        retried, and a success that follows at least one failure, is
        appended to ``policy.attempt_history``.

        Returns:
            Whatever ``operation`` returns on its first successful try.

        Raises:
            Exception: The operation's own exception, re-raised as soon as
                ``policy.should_retry`` declines a retry or tries run out.
        """
        # Clamp so a negative max_retries cannot skip the loop entirely,
        # which would otherwise end in ``raise None``.
        total_attempts = max(self.policy.max_retries, 0) + 1
        last_exception = None

        for attempt in range(total_attempts):
            try:
                result = operation(*args, **kwargs)
            except Exception as e:
                last_exception = e
                if not self.policy.should_retry(e, attempt):
                    raise
                delay = self.policy.calculate_delay(attempt)
                self.policy.attempt_history.append({
                    'attempt': attempt, 'success': False,
                    'exception': str(e), 'delay': delay, 'timestamp': time.time()
                })
                # No point sleeping after the final try.
                if attempt < total_attempts - 1:
                    time.sleep(delay)
            else:
                # Bookkeeping stays outside the try so a history error is
                # never mistaken for an operation failure.
                if attempt > 0:
                    self.policy.attempt_history.append({
                        'attempt': attempt, 'success': True, 'timestamp': time.time()
                    })
                return result

        # Only reachable when a custom policy keeps answering "retry" on the
        # last try; the loop ran at least once, so last_exception is set.
        raise last_exception
这个重试机制实现了多种退避策略,包括固定间隔、线性增长、指数退避和抖动退避。针对Agent的特殊需求,可以配置不同的重试条件。
熔断器与降级机制
当某个依赖服务持续故障时,重试只会加剧问题。熔断器模式通过暂时停止对故障服务的调用,给服务恢复留出时间。
熔断器实现
from enum import Enum
from threading import Lock
class CircuitState(Enum):
    """The three states a circuit breaker can be in."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"
class CircuitBreaker:
    """Circuit breaker guarding calls to an unreliable operation.

    CLOSED lets calls through and counts consecutive failures; reaching
    ``failure_threshold`` opens the circuit. OPEN rejects calls until
    ``recovery_timeout`` seconds have passed since the last failure, then
    moves to HALF_OPEN. HALF_OPEN admits at most ``half_open_max_calls``
    trial calls: a failure re-opens the circuit, enough successes close it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60.0,
                 half_open_max_calls=3, success_threshold=2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0
        self.lock = Lock()

    def call(self, operation: Callable, fallback: Optional[Callable] = None,
             *args, **kwargs):
        """Run ``operation(*args, **kwargs)`` through the breaker.

        Note that ``fallback`` is the second positional parameter, so extra
        positional arguments for ``operation`` must come after it.

        Returns:
            The operation's result, or ``fallback(*args, **kwargs)`` when
            the call is rejected or fails and a fallback was given.

        Raises:
            Exception: "Circuit breaker is OPEN" or "Half-open call limit
                reached" when the call is rejected and there is no
                fallback; otherwise the operation's own exception.
        """
        with self.lock:
            rejection = self._admit()

        # The fallback runs outside the lock so a slow fallback cannot
        # block every other caller of this breaker.
        if rejection is not None:
            if fallback:
                return fallback(*args, **kwargs)
            raise Exception(rejection)

        try:
            result = operation(*args, **kwargs)
        except Exception:
            self._on_failure()
            if fallback:
                return fallback(*args, **kwargs)
            raise
        self._on_success()
        return result

    def _admit(self) -> Optional[str]:
        """Decide whether a call may proceed; the caller must hold ``lock``.

        Returns ``None`` when the call is admitted, otherwise the
        rejection message.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                return "Circuit breaker is OPEN"
            self.state = CircuitState.HALF_OPEN
            self.half_open_calls = 0
            self.success_count = 0
        # Deliberately not ``elif``: the call that triggers the transition
        # to HALF_OPEN is itself a trial call and must count toward
        # half_open_max_calls.
        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                return "Half-open call limit reached"
            self.half_open_calls += 1
        return None

    def _on_success(self):
        """Record a successful call and close the circuit when warranted."""
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                # Never demand more successes than trial calls are allowed,
                # otherwise the breaker would stay HALF_OPEN forever.
                required = min(self.success_threshold, self.half_open_max_calls)
                if self.success_count >= required:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
                    self.half_open_calls = 0
            else:
                self.failure_count = 0

    def _on_failure(self):
        """Record a failed call and open the circuit when warranted."""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                # Any failed trial call re-opens the circuit immediately.
                self.state = CircuitState.OPEN
            elif self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Return True once ``recovery_timeout`` has passed since the last failure."""
        if self.last_failure_time is None:
            return True
        return (time.time() - self.last_failure_time) >= self.recovery_timeout
这个熔断器实现了完整的状态转换逻辑。从关闭到打开,再到半开,最后回到关闭,每个状态都有明确的触发条件和行为定义。
降级策略
from typing import Any
class DegradationStrategy:
    """Base class for fallback providers; subclasses override ``fallback``."""

    def fallback(self, *args, **kwargs) -> Any:
        """Produce the degraded result; the base version is not implemented."""
        raise NotImplementedError()
class StaticFallback(DegradationStrategy):
    """Fallback that always hands back one fixed, preconfigured value."""

    def __init__(self, fallback_result: Any):
        self.fallback_result = fallback_result

    def fallback(self, *args, **kwargs) -> Any:
        """Return the stored value; all arguments are ignored."""
        stored = self.fallback_result
        return stored
class CachedFallback(DegradationStrategy):
    """Fallback that looks the degraded result up in a cache provider."""

    def __init__(self, cache_provider):
        # Only ``cache_provider.get(key)`` is ever called on this object.
        self.cache = cache_provider

    def fallback(self, cache_key: str, *args, **kwargs) -> Any:
        """Return ``cache.get(cache_key)``; the remaining arguments are ignored."""
        cached = self.cache.get(cache_key)
        return cached
class DegradationManager:
    """Run operations and substitute a registered fallback when they fail."""

    def __init__(self):
        # Maps operation name to its DegradationStrategy.
        self.strategies = {}
        # Used for operations that have no strategy registered.
        self.default_strategy = StaticFallback({'status': 'degraded', 'result': None})

    def register_strategy(self, operation_name: str, strategy: DegradationStrategy):
        """Register (or replace) the fallback strategy for ``operation_name``."""
        self.strategies[operation_name] = strategy

    def execute_with_degradation(self, operation_name: str, operation: Callable,
                                 *args, **kwargs) -> Any:
        """Run ``operation(*args, **kwargs)``, degrading on any ``Exception``.

        On failure the exception is logged with its traceback and the
        strategy registered for ``operation_name`` (or the default) is
        called with the same arguments.

        Returns:
            The operation's result, or the strategy's fallback result.

        Raises:
            Exception: Whatever the fallback itself raises.
        """
        try:
            return operation(*args, **kwargs)
        except Exception:
            # Record the failure before degrading; without this the
            # original error disappears without a trace.
            logging.getLogger(__name__).warning(
                "Operation %r failed; returning degraded result",
                operation_name, exc_info=True,
            )
            strategy = self.strategies.get(operation_name, self.default_strategy)
            return strategy.fallback(*args, **kwargs)
降级策略提供了多种备选方案。从返回静态结果到使用缓存,不同的场景可以选择不同的降级方式。
隔离机制与资源保护
隔离机制防止故障在系统中扩散。当一个Agent或组件出现问题时,隔离机制确保问题不会影响到其他部分。
舱壁模式实现
from concurrent.futures import ThreadPoolExecutor
import threading
class Bulkhead:
    """Bounded execution compartment backed by its own thread pool.

    At most ``max_queue`` submitted tasks (running or waiting for a worker)
    are admitted at once, and ``max_concurrent`` of them run in parallel.
    """

    def __init__(self, name: str, max_concurrent: int = 10,
                 max_queue: int = 100, timeout: float = 30.0):
        self.name = name
        self.max_concurrent = max_concurrent
        self.max_queue = max_queue
        self.timeout = timeout
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent)
        self.queue_size = threading.Semaphore(max_queue)
        # Number of admitted tasks that have not finished yet.
        self.active_count = 0
        self.lock = threading.Lock()

    def execute(self, operation: Callable, *args, **kwargs):
        """Run ``operation(*args, **kwargs)`` inside the bulkhead.

        Waits up to ``timeout`` seconds for a free slot and then up to
        ``timeout`` seconds for the result.

        Returns:
            The operation's return value.

        Raises:
            Exception: "Bulkhead <name> queue is full" when no slot frees up
                in time; the executor's timeout error when the result is
                late; otherwise whatever ``operation`` raises.
        """
        if not self.queue_size.acquire(timeout=self.timeout):
            raise Exception(f"Bulkhead {self.name} queue is full")
        with self.lock:
            self.active_count += 1

        def guarded():
            # The slot is released by the worker itself, so a task that
            # outlives the caller's timeout keeps its slot until it ends.
            try:
                return operation(*args, **kwargs)
            finally:
                self._release_slot()

        try:
            future = self.executor.submit(guarded)
        except BaseException:
            self._release_slot()
            raise

        try:
            return future.result(timeout=self.timeout)
        finally:
            # cancel() only succeeds when the task never started running;
            # in that case ``guarded`` will not run, so free the slot here.
            if future.cancel():
                self._release_slot()

    def shutdown(self, wait: bool = True) -> None:
        """Shut the underlying thread pool down."""
        self.executor.shutdown(wait=wait)

    def _release_slot(self) -> None:
        """Give back one slot and decrement the active-task counter."""
        with self.lock:
            self.active_count -= 1
        self.queue_size.release()
舱壁模式通过限制资源使用来防止故障扩散。每个Agent或功能模块都有自己的资源池,不会因为其他模块的问题而耗尽资源。
自愈能力与自动恢复
自愈能力是指系统在检测到故障后,能够自动采取措施恢复服务,而不需要人工干预。
自愈机制设计
from typing import List, Callable
import threading
class SelfHealingManager:
    """Periodically run health checks and trigger recovery actions.

    ``health_status`` maps each component name to one of ``'unknown'``,
    ``'healthy'``, ``'unhealthy'``, ``'error'`` or ``'recovering'``.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, check_interval: float = 30.0):
        self.check_interval = check_interval
        self.health_checks = {}
        self.recovery_actions = {}
        self.health_status = {}
        self.running = False
        self.monitor_thread = None
        # Wakes the monitor loop immediately when a stop is requested.
        self._stop_event = threading.Event()

    def register_health_check(self, component_name: str,
                              check_func: Callable,
                              recovery_actions: List[Callable] = None):
        """Register a health check and its ordered recovery actions.

        ``check_func`` takes no arguments and returns a truthy value when
        the component is healthy. Each recovery action takes no arguments
        and returns a truthy value when it succeeded.
        """
        self.health_checks[component_name] = check_func
        self.recovery_actions[component_name] = recovery_actions or []
        self.health_status[component_name] = 'unknown'

    def start_monitoring(self):
        """Start the background monitor thread; a no-op if already running."""
        thread = self.monitor_thread
        if thread is not None and thread.is_alive():
            if self.running:
                return
            # A previous loop was asked to stop but has not exited yet.
            self.stop_monitoring()
        self.running = True
        self._stop_event.clear()
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop_monitoring(self, timeout: float = None):
        """Ask the monitor thread to stop and wait up to ``timeout`` for it."""
        self.running = False
        self._stop_event.set()
        thread = self.monitor_thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout)

    def _monitor_loop(self):
        """Run all checks every ``check_interval`` seconds until stopped."""
        while self.running:
            self._run_checks()
            # Event.wait doubles as an interruptible sleep.
            if self._stop_event.wait(self.check_interval):
                break

    def _run_checks(self):
        """Run every registered check once, recovering unhealthy components."""
        # Iterate over a snapshot so a concurrent registration cannot raise
        # "dictionary changed size during iteration".
        for component_name, check_func in list(self.health_checks.items()):
            try:
                is_healthy = check_func()
            except Exception:
                self._logger.warning(
                    "Health check for %r raised", component_name, exc_info=True
                )
                self.health_status[component_name] = 'error'
                self._attempt_recovery(component_name)
                continue
            if is_healthy:
                self.health_status[component_name] = 'healthy'
            else:
                self.health_status[component_name] = 'unhealthy'
                self._attempt_recovery(component_name)

    def _attempt_recovery(self, component_name: str):
        """Try recovery actions in order, stopping at the first that succeeds.

        A succeeding action marks the component ``'recovering'``. Actions
        that raise are logged and skipped.
        """
        for action in self.recovery_actions.get(component_name, []):
            try:
                succeeded = action()
            except Exception:
                self._logger.warning(
                    "Recovery action for %r raised", component_name, exc_info=True
                )
                continue
            if succeeded:
                self.health_status[component_name] = 'recovering'
                return
自愈管理器实现了完整的自动恢复机制。它定期执行健康检查,当发现问题时自动尝试恢复操作。
监控告警与可观测性
可观测性是故障处理的基础。只有了解系统的运行状态,才能及时发现和处理问题。
监控指标体系
from collections import defaultdict
import statistics
class MetricsCollector:
    """In-memory store for counters and histogram samples.

    A series is keyed by metric name, plus ``#`` and the tag dict when
    tags are given.
    """

    def __init__(self):
        self.counters = defaultdict(int)
        self.gauges = {}
        self.histograms = defaultdict(list)
        self.timers = defaultdict(list)

    @staticmethod
    def _metric_key(name: str, tags: Optional[Dict] = None) -> str:
        """Build the storage key for ``name`` with optional ``tags``.

        Tags are ordered by key first so the same tag set always maps to
        the same series, whatever order the caller built the dict in.
        """
        if not tags:
            return name
        ordered = dict(sorted(tags.items(), key=lambda item: str(item[0])))
        return f"{name}#{ordered}"

    def increment_counter(self, name: str, value: int = 1, tags: Dict = None):
        """Add ``value`` (default 1) to the counter for ``name``/``tags``."""
        self.counters[self._metric_key(name, tags)] += value

    def record_histogram(self, name: str, value: float, tags: Dict = None):
        """Append one sample to the histogram for ``name``/``tags``."""
        self.histograms[self._metric_key(name, tags)].append(value)

    def get_histogram_stats(self, name: str, tags: Dict = None) -> Dict:
        """Summarise the histogram for ``name``/``tags``.

        Returns:
            A dict with ``count``, ``min``, ``max``, ``mean`` and
            ``median``, or an empty dict when no samples were recorded.
        """
        # .get() avoids creating an empty series as a lookup side effect.
        values = self.histograms.get(self._metric_key(name, tags), [])
        if not values:
            return {}
        return {
            'count': len(values),
            'min': min(values),
            'max': max(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
        }
class AlertManager:
    """Evaluate threshold rules against counters and notify handlers."""

    # Comparison operators a rule may use. A rule whose operator is not
    # listed here never triggers.
    _COMPARATORS = {
        '>': lambda value, threshold: value > threshold,
        '<': lambda value, threshold: value < threshold,
        '>=': lambda value, threshold: value >= threshold,
        '<=': lambda value, threshold: value <= threshold,
        '==': lambda value, threshold: value == threshold,
        '!=': lambda value, threshold: value != threshold,
    }

    def __init__(self):
        self.rules = []
        # Callables invoked with the alert dict for every triggered rule.
        self.handlers = []

    def add_handler(self, handler: Callable) -> None:
        """Register a callable that receives each triggered alert dict."""
        self.handlers.append(handler)

    def add_rule(self, name: str, metric: str, operator: str,
                 threshold: float, severity: str):
        """Add a rule comparing counter ``metric`` against ``threshold``.

        ``operator`` is one of ``>``, ``<``, ``>=``, ``<=``, ``==``, ``!=``.
        """
        self.rules.append({
            'name': name, 'metric': metric, 'operator': operator,
            'threshold': threshold, 'severity': severity
        })

    def check_metrics(self, metrics: MetricsCollector):
        """Evaluate every rule against ``metrics.counters``.

        A counter that does not exist is treated as 0. Each triggered rule
        produces an alert dict that is passed to every handler in order.

        Returns:
            The list of alert dicts triggered by this call.
        """
        fired = []
        for rule in self.rules:
            compare = self._COMPARATORS.get(rule['operator'])
            if compare is None:
                continue
            # .get() keeps the lookup from creating a counter entry.
            value = metrics.counters.get(rule['metric'], 0)
            if not compare(value, rule['threshold']):
                continue
            alert = {
                'rule_name': rule['name'],
                'severity': rule['severity'],
                'message': f"{rule['metric']} {rule['operator']} {rule['threshold']}",
                'timestamp': time.time()
            }
            fired.append(alert)
            for handler in self.handlers:
                handler(alert)
        return fired
监控告警系统实现了完整的指标收集和告警机制。它支持计数器、直方图和定时器等多种指标类型,并提供了灵活的告警规则配置。
故障演练与混沌工程
故障演练是验证系统可靠性的重要手段。通过主动注入故障,可以检验系统的容错能力和恢复机制。
混沌工程实现
import random
class ChaosExperiment:
    """Base class for a named fault-injection experiment with an on/off flag."""

    def __init__(self, name: str, description: str):
        self.name, self.description = name, description
        self.active = False

    def start(self):
        """Mark the experiment as active."""
        self.active = True

    def stop(self):
        """Mark the experiment as inactive."""
        self.active = False
class LatencyInjection(ChaosExperiment):
    """Experiment that delays every ``target_agent.execute`` call.

    While active, ``target_agent.execute`` is replaced by a wrapper that
    sleeps for a random duration drawn from ``latency_range`` (seconds)
    before delegating to the original callable.
    """

    def __init__(self, target_agent, latency_range=(0.1, 2.0)):
        super().__init__('latency_injection', 'Inject random latency')
        self.target_agent = target_agent
        self.latency_range = latency_range
        self.original_execute = None

    def start(self):
        """Install the delaying wrapper; a no-op when already active."""
        if self.active:
            # Starting twice would save the wrapper as the "original",
            # making it impossible to restore the real method.
            return
        super().start()
        original = self.target_agent.execute
        self.original_execute = original

        def delayed_execute(*args, **kwargs):
            delay = random.uniform(*self.latency_range)
            time.sleep(delay)
            # Call the captured original so the wrapper keeps working even
            # if it is invoked after the experiment state has changed.
            return original(*args, **kwargs)

        self.target_agent.execute = delayed_execute

    def stop(self):
        """Restore the original ``execute``; a no-op when not active."""
        was_active = self.active
        super().stop()
        if was_active and self.original_execute is not None:
            self.target_agent.execute = self.original_execute
class ErrorInjection(ChaosExperiment):
    """Experiment that makes ``target_agent.execute`` fail at random.

    While active, ``target_agent.execute`` is replaced by a wrapper that
    raises with probability ``error_rate`` and otherwise delegates to the
    original callable.
    """

    def __init__(self, target_agent, error_rate=0.1):
        super().__init__('error_injection', 'Inject random errors')
        self.target_agent = target_agent
        self.error_rate = error_rate
        self.original_execute = None

    def start(self):
        """Install the failing wrapper; a no-op when already active."""
        if self.active:
            # Starting twice would save the wrapper as the "original",
            # making it impossible to restore the real method.
            return
        super().start()
        original = self.target_agent.execute
        self.original_execute = original

        def error_injected_execute(*args, **kwargs):
            if random.random() < self.error_rate:
                raise Exception("Injected error for chaos testing")
            # Call the captured original so the wrapper keeps working even
            # if it is invoked after the experiment state has changed.
            return original(*args, **kwargs)

        self.target_agent.execute = error_injected_execute

    def stop(self):
        """Restore the original ``execute``; a no-op when not active."""
        was_active = self.active
        super().stop()
        if was_active and self.original_execute is not None:
            self.target_agent.execute = self.original_execute
混沌工程框架实现了系统化的故障演练能力。通过延迟注入、错误注入等实验,可以验证系统的容错能力。
实际案例分析:智能客服系统
让我们通过一个智能客服系统的案例来展示容错机制的实际应用。
系统架构
这个系统包含多个Agent:意图理解Agent、知识检索Agent、对话生成Agent和工单创建Agent。
容错设计
熔断保护。 当知识库查询服务连续失败5次时,熔断器打开,直接返回“请稍后再试”的提示。
舱壁隔离。 对话生成和知识检索使用不同的线程池,避免相互影响。
降级策略。 当情感分析服务不可用时,跳过情感分析,直接生成标准回复。
自愈机制。 系统每30秒检查一次各Agent状态,发现异常自动重启。
典型故障场景
场景一:知识库超时。 用户提问后,知识检索Agent查询超时。重试机制等待2秒后重试,仍然超时。连续失败次数达到熔断阈值(5次)后,熔断器打开,返回缓存的答案或提示用户稍后查询。
场景二:对话生成Agent异常。 对话生成过程中发生上下文溢出。异常被捕获,系统保存当前对话状态,重启Agent后从保存点恢复。
场景三:外部API限流。 调用外部翻译API时触发限流。系统使用本地备用翻译服务,虽然质量略低但能保证服务可用。
总结与最佳实践
构建可靠的Agent系统需要从多个层面进行容错设计:
分层异常处理。 不同层次的异常需要不同的处理策略。基础设施层适合重试,业务逻辑层需要返回明确错误。
智能重试机制。 使用指数退避和抖动避免雪崩效应,设置最大重试次数防止无限循环。
熔断器保护。 当服务持续失败时快速失败,保护系统资源,并在恢复后重新尝试。
多级降级。 从使用缓存到返回简化结果,确保即使最坏情况下也能提供基本服务。
资源隔离。 使用舱壁模式隔离不同功能的资源,防止故障扩散。
自愈能力。 自动检测和恢复,减少人工干预的需求。
全面监控。 建立完整的监控体系,覆盖系统指标、业务指标和健康状态。
定期演练。 通过混沌工程定期验证系统的容错能力,在真实故障场景下测试恢复机制。
可靠性不是一次性设计出来的,而是通过持续的优化和验证建立起来的。每一次故障都是改进的机会,每一次演练都是能力的验证。建立完善的错误处理和容错机制,才能让Agent系统在生产环境中稳定可靠地运行。