
A Guide to AI API Rate Limits and Quota Management: Avoiding 429s and Running Production Systems Reliably

2026-04-17 · ~7 min read

When working with AI APIs, the most painful failure mode is the sudden `429 Too Many Requests` error: everything runs fine until it abruptly stops. This article takes a deep look at rate limits and quota management for AI APIs, from fundamentals to practice, to help you build a stable production system.

Why Do Rate Limits Exist?

| Reason | Description |
| --- | --- |
| Protect infrastructure | Prevent overload and keep the service stable |
| Prevent abuse | Stop malicious API hammering |
| Cost control | Encourage reasonable usage, lowering costs for both sides |
| Fairness | Ensure every user can access the service |

Common 429 errors:

```
OpenAI: Rate limit reached for gpt-4o in organization org_xxx
Anthropic: Rate limit exceeded: Too many requests
Azure: Requests to the OpenAI API have been throttled
```
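
In the official Python SDKs these surface as typed exceptions rather than bare strings. A minimal sketch for OpenAI's SDK (the `retry-after` header is commonly present on 429 responses, though providers vary):

```python
import openai
from openai import OpenAI

client = OpenAI()

try:
    client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "hello"}],
    )
except openai.RateLimitError as e:
    # status_code is 429; many providers also say how long to wait
    retry_after = e.response.headers.get("retry-after")
    print(f"429 from the API, retry-after={retry_after}")
```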

---

Rate Limit Rules by Platform (2026)

#### OpenAI

| Model | RPM (requests/min) | TPM (tokens/min) | RPD (requests/day) |
| --- | --- | --- | --- |
| GPT-4o | 10,000 | 2,000,000 | - |
| GPT-4o Mini | 20,000 | 4,000,000 | - |
| GPT-3.5 Turbo | 30,000 | 6,000,000 | - |

Notes:

  • RPM (Requests Per Minute): the maximum number of requests allowed per minute
  • TPM (Tokens Per Minute): the maximum number of tokens that can be processed per minute
  • Whichever limit is hit first is the one that triggers a 429
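
Since TPM often binds before RPM, it helps to estimate a request's token cost before sending it. A rough sketch with tiktoken (assuming a recent version that ships the `o200k_base` encoding used by the GPT-4o family; the 500-token reply budget is an arbitrary assumption):

```python
import tiktoken

# o200k_base is the encoding used by the GPT-4o model family
enc = tiktoken.get_encoding("o200k_base")

def estimate_tokens(messages, reply_budget=500):
    """Rough per-request estimate: prompt tokens plus an assumed reply budget."""
    prompt_tokens = sum(len(enc.encode(m["content"])) for m in messages)
    return prompt_tokens + reply_budget

messages = [{"role": "user", "content": "Summarize this document..."}]
print(estimate_tokens(messages))  # use this to pace requests against your TPM limit
```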

#### Anthropic Claude

| Model | RPM | TPM |
| --- | --- | --- |
| Claude 3.7 Sonnet | 5,000 | 1,000,000 |
| Claude 3.5 Sonnet | 10,000 | 2,000,000 |
| Claude 3 Haiku | 20,000 | 4,000,000 |

#### Chinese Models (Qwen / Doubao / Zhipu)

| Model | RPM | TPM | Notes |
| --- | --- | --- | --- |
| Qwen Plus | 10,000 | 2,000,000 | Flexible quota scaling |
| Doubao Pro | 8,000 | 1,600,000 | Generous free tier |
| Zhipu GLM-4 | 15,000 | 3,000,000 | Relatively lenient limits |

---

In Practice: Best Practices for Handling Rate Limits

#### 1. Retry with Exponential Backoff

When you hit a 429, don't retry immediately. Wait before retrying, and grow the wait time exponentially with each attempt:

```python
import random
import time

import openai
from openai import OpenAI

client = OpenAI()

def call_ai_api_with_retry(messages, max_retries=5):
    retry_count = 0
    while retry_count < max_retries:
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            return response
        except openai.RateLimitError:
            # Exponential backoff plus jitter; other errors propagate as-is
            wait_time = (2 ** retry_count) + random.uniform(0, 1)
            print(f"Rate limited, retrying in {wait_time:.1f}s...")
            time.sleep(wait_time)
            retry_count += 1
    raise Exception("Retries exhausted")

# Usage
response = call_ai_api_with_retry([{"role": "user", "content": "Hello"}])
```

Backoff schedule:

  • 1st failure: wait 1-2 seconds
  • 2nd failure: wait 2-3 seconds
  • 3rd failure: wait 4-5 seconds
  • 4th failure: wait 8-9 seconds
  • 5th failure: give up
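
Rather than hand-rolling the loop above, a retry library can express the same policy declaratively. A sketch using tenacity (assuming it is installed; `wait_random_exponential` gives jittered exponential backoff with a cap):

```python
import openai
from openai import OpenAI
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential

client = OpenAI()

@retry(
    retry=retry_if_exception_type(openai.RateLimitError),  # only retry 429s
    wait=wait_random_exponential(min=1, max=16),           # jittered exponential backoff
    stop=stop_after_attempt(5),                            # then give up
)
def call_ai_api(messages):
    return client.chat.completions.create(model="gpt-4o-mini", messages=messages)
```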

#### 2. Request Queue

Put requests into a queue and consume them at a steady pace that stays within the rate limits:

```python
import time
from collections import deque
from threading import Lock

from openai import OpenAI

client = OpenAI()

class RateLimiter:

    def __init__(self, rpm=10000, tpm=2000000):
        self.rpm = rpm
        self.tpm = tpm
        self.request_history = deque()  # timestamps of recent requests
        self.token_history = deque()    # (timestamp, tokens) of recent requests
        self.lock = Lock()

    def _cleanup_old_entries(self):
        """Drop records older than one minute. Caller must hold the lock."""
        cutoff = time.time() - 60
        while self.request_history and self.request_history[0] < cutoff:
            self.request_history.popleft()
        while self.token_history and self.token_history[0][0] < cutoff:
            self.token_history.popleft()

    def acquire(self, tokens=100):
        """Block until a request of the given token size fits within RPM/TPM."""
        while True:
            with self.lock:
                self._cleanup_old_entries()
                current_rpm = len(self.request_history)
                current_tpm = sum(t for _, t in self.token_history)
                if current_rpm < self.rpm and current_tpm + tokens < self.tpm:
                    # Record the request while still holding the lock,
                    # so check-and-record is atomic across threads
                    now = time.time()
                    self.request_history.append(now)
                    self.token_history.append((now, tokens))
                    return
            # Over the limit: wait briefly, then re-check
            time.sleep(0.1)

# Usage example
limiter = RateLimiter(rpm=10000, tpm=2000000)

def process_task(task):
    # Block until we are allowed to send
    limiter.acquire(tokens=500)
    # Then call the API
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": task}]
    )
    return response
```
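
The `Lock` matters because the limiter is meant to be shared across worker threads. A minimal sketch driving `process_task` from a thread pool (`tasks` is a hypothetical workload):

```python
from concurrent.futures import ThreadPoolExecutor

tasks = ["question 1", "question 2", "question 3"]  # hypothetical workload

# All workers share one limiter, so aggregate throughput stays within RPM/TPM
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(process_task, tasks))
```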

#### 3. Rotating Multiple API Keys

If one key gets rate-limited, switch to another:

```python
import openai
from openai import OpenAI

class MultiKeyClient:

    def __init__(self, api_keys):
        self.clients = [OpenAI(api_key=key) for key in api_keys]
        self.current_index = 0
        self.failed_counts = [0] * len(api_keys)  # per-key rate-limit tally

    def get_client(self):
        # Simple round-robin rotation
        index = self.current_index
        self.current_index = (self.current_index + 1) % len(self.clients)
        return index, self.clients[index]

    def call_with_failover(self, messages):
        for _ in range(len(self.clients)):
            index, client = self.get_client()
            try:
                return client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages
                )
            except openai.RateLimitError:
                # Rate limited: note it and move on; other errors propagate
                self.failed_counts[index] += 1
                print("Key rate limited, trying the next one...")
                continue
        raise Exception("All keys are rate limited")

# Usage
api_keys = ["sk-xxx1", "sk-xxx2", "sk-xxx3"]
multi_client = MultiKeyClient(api_keys)
response = multi_client.call_with_failover([{"role": "user", "content": "Hello"}])
```

#### 4. Batching

Combine multiple requests into one to cut the request count:

```python
questions = ["What is RPM?", "What is TPM?", "What does 429 mean?"]  # example inputs

# ❌ Bad: one request per question
for question in questions:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}]
    )

# ✅ Good: combine them into a single request
batch_prompt = "Answer the following questions in order, separating each answer with ---:\n\n"
for i, question in enumerate(questions):
    batch_prompt += f"{i+1}. {question}\n\n"

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": batch_prompt}]
)

# Split the combined output back into individual answers
answers = response.choices[0].message.content.split("---")
```

Effect: cuts the request count by 70-90%!
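
When latency doesn't matter, OpenAI also offers a dedicated Batch API that processes a JSONL file of requests asynchronously under separate quotas. A minimal sketch (assuming a prepared batch.jsonl where each line is one chat-completions request, per the Batch API docs):

```python
from openai import OpenAI

client = OpenAI()

# Upload a JSONL file where each line is one chat.completions request
batch_file = client.files.create(file=open("batch.jsonl", "rb"), purpose="batch")

# Submit the batch; results come back within the completion window
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(batch.id, batch.status)
```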

---

Monitoring and Alerting

#### Key Metrics to Monitor

| Metric | Description | Alert threshold |
| --- | --- | --- |
| 429 error rate | Share of requests rejected by rate limits | > 5% |
| Request success rate | Successful requests / total requests | < 95% |
| Average latency | API response time | > 5 s |
| RPM/TPM usage | Current usage / limit | > 80% |
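
Rather than only counting locally, you can read live quota usage from response headers. A sketch using the OpenAI Python SDK's raw-response mode (header names follow OpenAI's docs; other providers may use different ones):

```python
from openai import OpenAI

client = OpenAI()

# with_raw_response exposes the HTTP response alongside the parsed result
raw = client.chat.completions.with_raw_response.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "ping"}],
)
print("Remaining requests:", raw.headers.get("x-ratelimit-remaining-requests"))
print("Remaining tokens:  ", raw.headers.get("x-ratelimit-remaining-tokens"))
response = raw.parse()  # the usual ChatCompletion object
```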

#### A Simple Monitoring Script

```python
import time
from collections import deque

from openai import OpenAI

client = OpenAI()

class APIMonitor:

    def __init__(self):
        self.requests = deque(maxlen=1000)  # last 1,000 requests
        self.errors = deque(maxlen=100)     # last 100 errors

    def record_request(self, success, latency, tokens=0):
        self.requests.append({
            "time": time.time(),
            "success": success,
            "latency": latency,
            "tokens": tokens
        })
        if not success:
            self.errors.append({"time": time.time()})

    def get_stats(self):
        if not self.requests:
            return {}

        now = time.time()
        last_minute = [r for r in self.requests if r["time"] > now - 60]

        total = len(self.requests)
        successful = sum(1 for r in self.requests if r["success"])

        return {
            "total_requests": total,
            "success_rate": successful / total,
            "avg_latency": sum(r["latency"] for r in self.requests) / total,
            "rpm_usage": len(last_minute),
            "recent_errors": len(self.errors)
        }

    def check_alerts(self):
        stats = self.get_stats()
        alerts = []

        if stats.get("success_rate", 1) < 0.95:
            alerts.append("⚠️ Success rate below 95%")

        if stats.get("avg_latency", 0) > 5:
            alerts.append("⚠️ Average latency above 5 seconds")

        if stats.get("rpm_usage", 0) > 8000:  # assuming a 10,000 RPM limit
            alerts.append("⚠️ RPM usage above 80%")

        return alerts

# Usage
monitor = APIMonitor()

# Record every request as it completes
start = time.time()
try:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}]
    )
    monitor.record_request(success=True, latency=time.time() - start)
except Exception:
    monitor.record_request(success=False, latency=time.time() - start)

# Check for alerts
alerts = monitor.check_alerts()
if alerts:
    print("Alerts:", alerts)
    # Send notifications (Slack/email/Feishu); see the webhook sketch below
```
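
For the notification step, a Slack incoming webhook is a low-effort option. A minimal sketch (the webhook URL is a hypothetical placeholder for your own):

```python
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # hypothetical placeholder

def send_alerts(alerts):
    # Slack incoming webhooks accept a simple JSON payload
    requests.post(SLACK_WEBHOOK_URL, json={"text": "\n".join(alerts)}, timeout=5)

if alerts:
    send_alerts(alerts)
```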

---

Cost Optimization Strategies

| Strategy | Effect | Implementation difficulty |
| --- | --- | --- |
| Model tiering | 50-70% cost reduction | - |
| Caching common requests | 30-80% cost reduction | - |
| Batching | 40-90% cost reduction | - |
| Capping max_tokens | 10-30% cost reduction | - |
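
Of these strategies, caching is often the quickest win for repetitive traffic. A minimal exact-match sketch, reusing the `client` from the earlier examples (an in-memory dict; a production setup would more likely use Redis with a TTL):

```python
import hashlib

_cache = {}  # prompt hash -> response text

def cached_completion(prompt, model="gpt-4o-mini"):
    key = hashlib.sha256(f"{model}:{prompt}".encode()).hexdigest()
    if key in _cache:
        return _cache[key]  # cache hit: no API call, no quota used
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    _cache[key] = response.choices[0].message.content
    return _cache[key]
```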

Model tiering example:

```python
def choose_model(task_type):
    if task_type == "complex_reasoning":
        return "gpt-4o"
    elif task_type == "simple_chat":
        return "gpt-4o-mini"
    elif task_type == "classification":
        return "gpt-3.5-turbo"
    else:
        return "gpt-4o-mini"  # default to the cheap one
```
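
Capping `max_tokens` is the easiest lever to pull: it bounds the completion length, and therefore the output-token spend, per request:

```python
response = client.chat.completions.create(
    model=choose_model("classification"),
    messages=[{"role": "user", "content": "Classify the sentiment: 'great product!'"}],
    max_tokens=50,  # hard cap on output tokens for this request
)
```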

---

Summary

Rate limits are a fact of life for any production system built on AI APIs:

  • ✅ Understand each platform's rate limit rules
  • ✅ Implement retries with exponential backoff
  • ✅ Use a queue to pace request throughput
  • ✅ Rotate multiple keys for higher availability
  • ✅ Monitor and alert to catch problems early
  • ✅ Batch and cache to cut costs

Recommendations:

1. Start with simple retries, then build up from there

2. Monitoring is key: find problems before your users do

3. Don't depend on a single API Key; keep spares

4. Tier models by task to keep costs under control

Browse more AI API relay platforms on this site to find options with looser rate limits and better value for money.
