重试机制熔断可靠性

AI API 重试与熔断机制最佳实践:构建弹性可靠的系统

2026-04-23 · 约 11 分钟阅读

AI API 重试与熔断机制最佳实践:构建弹性可靠的系统

AI API 天生不稳定:网络超时、服务端错误、限速限制……任何一个环节出问题都会导致应用失败。本文介绍 AI API 的重试与熔断机制最佳实践,帮你构建弹性可靠的系统。

为什么需要重试和熔断?

问题后果解决方案
网络抖动偶发失败重试
服务端临时错误5xx 响应重试
持续失败浪费资源熔断
级联故障整个系统崩溃熔断

---

重试机制设计

#### 1. 哪些错误应该重试?

错误类型是否重试说明
网络超时✅ 是临时问题
5xx 服务端错误✅ 是服务端临时故障
429 限速错误✅ 是等待后重试
401/403 认证错误❌ 否重试也没用
400 请求错误❌ 否参数有问题

代码示例(Python):

```python

def should_retry(error):

"""判断是否应该重试"""
if isinstance(error, (ConnectionError, TimeoutError)):
    return True

if hasattr(error, 'status_code'):
    status_code = error.status_code
    if status_code == 429:
        return True
    if 500 <= status_code < 600:
        return True

return False

```

#### 2. 重试策略

指数退避(Exponential Backoff):

```

第1次重试:等待 1s

第2次重试:等待 2s

第3次重试:等待 4s

第4次重试:等待 8s

第5次重试:等待 16s

...

```

带抖动的指数退避(Exponential Backoff with Jitter):

避免多个客户端同时重试导致的「惊群效应」:

```python

import random

import time

def exponential_backoff_with_jitter(

attempt, 
base_delay=1, 
max_delay=60, 
jitter_factor=0.5

):

"""
带抖动的指数退避
"""
# 指数退避
delay = min(base_delay * (2 ** attempt), max_delay)

# 添加抖动
jitter = random.uniform(-jitter_factor * delay, jitter_factor * delay)
delay = max(0, delay + jitter)

return delay

# 使用

for attempt in range(5):

try:
    response = client.chat.completions.create(...)
    break
except Exception as e:
    if should_retry(e) and attempt < 4:
        delay = exponential_backoff_with_jitter(attempt)
        time.sleep(delay)
    else:
        raise

```

#### 3. 429 限速错误的特殊处理

429 错误通常包含 `Retry-After` 响应头,告诉我们多久后可以重试:

```python

def get_retry_after(error):

"""从 429 响应中获取重试时间"""
if hasattr(error, 'response') and error.response:
    retry_after = error.response.headers.get('Retry-After')
    if retry_after:
        return int(retry_after)

# 如果没有 Retry-After,使用默认值
return 10

```

#### 4. 重试次数限制

场景推荐重试次数
实时交互(聊天)2-3 次
后台任务3-5 次
批量处理5-10 次

---

熔断机制设计

#### 什么是熔断器?

熔断器有三种状态:

```

Closed(关闭)→ 正常放行请求

↓(失败率超过阈值)

Open(打开)→ 直接拒绝请求

↓(等待一段时间后)

Half-Open(半打开)→ 尝试少量请求

↓(成功)→ 回到 Closed
↓(失败)→ 回到 Open

```

#### 熔断器参数

参数推荐值说明
失败率阈值50%超过这个比例就熔断
窗口大小60s统计最近多长时间的数据
最小请求数10请求太少不熔断
熔断时间30sOpen 状态持续多久

#### Python 实现熔断器

```python

import time

from collections import deque

from enum import Enum

class CircuitState(Enum):

CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"

class CircuitBreaker:

def __init__(
    self,
    failure_threshold=0.5,
    window_size=60,
    min_requests=10,
    open_timeout=30,
    half_open_max_calls=3
):
    self.failure_threshold = failure_threshold
    self.window_size = window_size
    self.min_requests = min_requests
    self.open_timeout = open_timeout
    self.half_open_max_calls = half_open_max_calls
    
    self.state = CircuitState.CLOSED
    self.failure_times = deque()
    self.success_times = deque()
    self.open_at = None
    self.half_open_calls = 0

def _cleanup_old_entries(self):
    """清理窗口外的记录"""
    now = time.time()
    cutoff = now - self.window_size
    
    while self.failure_times and self.failure_times[0] < cutoff:
        self.failure_times.popleft()
    
    while self.success_times and self.success_times[0] < cutoff:
        self.success_times.popleft()

def _get_failure_rate(self):
    """计算失败率"""
    self._cleanup_old_entries()
    total = len(self.failure_times) + len(self.success_times)
    if total < self.min_requests:
        return 0.0
    return len(self.failure_times) / total

def _can_attempt_call(self):
    """判断是否可以尝试调用"""
    if self.state == CircuitState.CLOSED:
        return True
    
    if self.state == CircuitState.OPEN:
        now = time.time()
        if now - self.open_at >= self.open_timeout:
            self.state = CircuitState.HALF_OPEN
            self.half_open_calls = 0
            return True
        return False
    
    if self.state == CircuitState.HALF_OPEN:
        return self.half_open_calls < self.half_open_max_calls
    
    return False

def _on_success(self):
    """成功回调"""
    now = time.time()
    
    if self.state == CircuitState.HALF_OPEN:
        self.half_open_calls += 1
        if self.half_open_calls >= self.half_open_max_calls:
            self.state = CircuitState.CLOSED
            self.failure_times.clear()
            self.success_times.clear()
    else:
        self.success_times.append(now)

def _on_failure(self):
    """失败回调"""
    now = time.time()
    
    if self.state == CircuitState.HALF_OPEN:
        self.state = CircuitState.OPEN
        self.open_at = now
    else:
        self.failure_times.append(now)
        failure_rate = self._get_failure_rate()
        if failure_rate >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.open_at = now

def execute(self, func, fallback=None):
    """执行函数"""
    if not self._can_attempt_call():
        if fallback is not None:
            return fallback()
        raise Exception("Circuit breaker is OPEN")
    
    try:
        result = func()
        self._on_success()
        return result
    except Exception as e:
        self._on_failure()
        raise

# 使用

breaker = CircuitBreaker(

failure_threshold=0.5,
window_size=60,
min_requests=10,
open_timeout=30

)

def call_ai_api():

return client.chat.completions.create(...)

try:

response = breaker.execute(call_ai_api)

except Exception as e:

print(f"请求失败: {e}")

```

---

完整的弹性客户端

#### 结合重试 + 熔断 + 退避

```python

import time

import random

from functools import wraps

def resilient_api_call(

max_retries=3,
base_delay=1,
max_delay=30,
circuit_breaker=None

):

"""
弹性 API 调用装饰器
"""
def decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        last_exception = None
        
        for attempt in range(max_retries + 1):
            try:
                # 检查熔断器
                if circuit_breaker:
                    return circuit_breaker.execute(
                        lambda: func(*args, **kwargs)
                    )
                else:
                    return func(*args, **kwargs)
                
            except Exception as e:
                last_exception = e
                
                # 判断是否应该重试
                if attempt >= max_retries:
                    break
                
                if not should_retry(e):
                    break
                
                # 计算延迟
                delay = exponential_backoff_with_jitter(
                    attempt,
                    base_delay=base_delay,
                    max_delay=max_delay
                )
                
                # 429 特殊处理
                if hasattr(e, 'status_code') and e.status_code == 429:
                    retry_after = get_retry_after(e)
                    delay = max(delay, retry_after)
                
                time.sleep(delay)
        
        raise last_exception
    
    return wrapper
return decorator

# 使用

breaker = CircuitBreaker()

@resilient_api_call(

max_retries=3,
circuit_breaker=breaker

)

def call_ai_with_resilience(model, messages):

return client.chat.completions.create(
    model=model,
    messages=messages
)

try:

response = call_ai_with_resilience("gpt-4o-mini", [...])

except Exception as e:

print(f"最终失败: {e}")

```

---

多供应商容错

当一个供应商失败时,自动切换到另一个:

```python

class MultiProviderClient:

def __init__(self, providers):
    self.providers = providers  # [{"name": "openai", "client": ..., "priority": 1}]
    self.circuit_breakers = {
        p["name"]: CircuitBreaker() for p in providers
    }

def call(self, *args, **kwargs):
    # 按优先级排序
    sorted_providers = sorted(
        self.providers,
        key=lambda p: p["priority"]
    )
    
    last_exception = None
    
    for provider in sorted_providers:
        name = provider["name"]
        client = provider["client"]
        breaker = self.circuit_breakers[name]
        
        if not breaker._can_attempt_call():
            continue
        
        try:
            return breaker.execute(
                lambda: client.chat.completions.create(*args, **kwargs)
            )
        except Exception as e:
            last_exception = e
            continue
    
    raise last_exception or Exception("All providers failed")

```

---

最佳实践

#### 1. 重试 + 熔断配合使用

机制解决的问题
重试临时故障
熔断持续故障

#### 2. 监控重试和熔断

关键指标:

  • 重试次数/比例
  • 熔断状态变化
  • 失败率趋势
  • 多供应商切换次数

#### 3. 合理设置参数

  • 重试次数不要太多(2-5 次)
  • 熔断阈值不要太敏感(50% 比较合理)
  • 窗口大小适中(60s-5min)

#### 4. 提供降级方案

当所有重试都失败时:

  • 返回缓存的结果
  • 返回默认值
  • 提示用户稍后重试

---

总结

AI API 的重试与熔断机制是构建弹性系统的关键:

  • ✅ 只对可重试的错误进行重试
  • ✅ 使用带抖动的指数退避
  • ✅ 熔断器防止级联故障
  • ✅ 多供应商容错提高可用性
  • ✅ 监控指标持续优化

建议:

1. 先从重试开始,逐步添加熔断

2. 使用成熟的库(tenacity、pybreaker)

3. 持续监控,根据数据调整参数

4. 考虑多供应商架构,避免单点故障

可在本站查看更多 AI API 中转平台,找到更稳定可靠的 AI API 服务。

找到最适合你的 AI API 中转站

收录 77+ 服务商,按价格、模型、标签一键筛选

查看所有中转站 →