AI API 批量处理与优化技巧:大幅提升吞吐量降低成本
2026-04-23 · 约 11 分钟阅读
AI API 批量处理与优化技巧:大幅提升吞吐量降低成本
批量处理是 AI API 优化的利器——把多个请求合并成一个,既能提升吞吐量,又能降低成本。但批量处理也有坑:超时、部分失败、顺序问题……本文介绍 AI API 批量处理的最佳实践,帮你大幅提升效率。
为什么需要批量处理?
| 指标 | 单个请求 | 批量处理 | 提升 |
|---|---|---|---|
| 延迟(每个任务) | 1s | 2s | -50% |
| 吞吐量(任务/分钟) | 60 | 300 | +500% |
| 成本(每个任务) | $0.01 | $0.006 | -40% |
| 网络开销 | 高 | 低 | -80% |
---
批量处理的适用场景
#### ✅ 适合批量处理
| 场景 | 说明 |
|---|---|
| 文档批量翻译 | 一次翻译 100 篇文档 |
| 批量文本分类 | 一次分类 1000 条评论 |
| 批量摘要生成 | 一次生成 50 篇摘要 |
| 数据标注 | 一次标注 200 条数据 |
| 夜间批处理任务 | 非实时的后台任务 |
#### ❌ 不适合批量处理
| 场景 | 说明 |
|---|---|
| 实时对话 | 用户需要立即响应 |
| 大文件处理 | 单个文件就很大 |
| 强顺序依赖 | 后一个依赖前一个的结果 |
| 低延迟要求 | 需要毫秒级响应 |
---
批量处理策略
#### 策略 1:简单批次(Simple Batching)
```
多个独立请求 → 打包成一个请求 → 发送 → 解包结果
```
示例:
```python
from openai import OpenAI
client = OpenAI()
def batch_translate(texts, source_lang="en", target_lang="zh"):
"""简单批量翻译"""
# 把多个文本打包成一个请求
combined_text = "\n\n---NEXT-TEXT---\n\n".join(texts)
prompt = f"""
请将以下文本从 {source_lang} 翻译成 {target_lang}。
每个文本之间用 ---NEXT-TEXT--- 分隔。
请保持分隔符,按顺序输出翻译结果。
文本:
{combined_text}
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
temperature=0
)
# 解包结果
translated_text = response.choices[0].message.content
return translated_text.split("\n\n---NEXT-TEXT---\n\n")
# 使用
texts = [
"Hello world!",
"How are you?",
"Nice to meet you!"
]
translations = batch_translate(texts)
print(translations)
```
---
#### 策略 2:函数调用批量(Function Calling Batching)
使用函数调用更结构化地处理批量任务:
```python
def batch_classify(texts):
"""使用函数调用批量分类"""
functions = [
{
"name": "classify_texts",
"description": "对多个文本进行分类",
"parameters": {
"type": "object",
"properties": {
"classifications": {
"type": "array",
"items": {
"type": "object",
"properties": {
"index": {"type": "integer"},
"category": {"type": "string"},
"confidence": {"type": "number"}
},
"required": ["index", "category", "confidence"]
}
}
},
"required": ["classifications"]
}
}
]
combined_text = "\n".join([f"{i}: {text}" for i, text in enumerate(texts)])
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "user", "content": f"请对以下文本进行分类(积极/消极/中性):\n{combined_text}"}
],
functions=functions,
function_call={"name": "classify_texts"}
)
return response.choices[0].message.function_call
```
---
#### 策略 3:并行批量(Parallel Batching)
多个批次并行处理:
```python
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def process_single_batch(batch):
"""处理单个批次"""
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": format_batch(batch)}]
)
return parse_response(response)
async def parallel_batch_process(items, batch_size=10, max_concurrent=5):
"""并行批量处理"""
# 分批
batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
# 限制并发数
semaphore = asyncio.Semaphore(max_concurrent)
async def process_with_semaphore(batch):
async with semaphore:
return await process_single_batch(batch)
# 并行处理
tasks = [process_with_semaphore(batch) for batch in batches]
results = await asyncio.gather(*tasks)
# 合并结果
return [item for batch_result in results for item in batch_result]
# 使用
items = get_1000_items()
results = await parallel_batch_process(items, batch_size=10, max_concurrent=5)
```
---
批次大小优化
#### 如何选择合适的批次大小?
| 因素 | 小批次(<10) | 中批次(10-50) | 大批次(>50) |
|---|---|---|---|
| 成功率 | 高 | 中 | 低 |
| 延迟 | 低 | 中 | 高 |
| 成本 | 高 | 中 | 低 |
| 复杂度 | 低 | 中 | 高 |
#### 批次大小测试
```python
def find_optimal_batch_size(items, test_sizes=[5, 10, 20, 50]):
"""测试不同批次大小的性能"""
results = []
for batch_size in test_sizes:
start_time = time.time()
success_count = 0
try:
# 测试一个批次
batch = items[:batch_size]
result = process_batch(batch)
success_count = len(result)
except Exception as e:
print(f"批次大小 {batch_size} 失败: {e}")
continue
elapsed = time.time() - start_time
success_rate = success_count / batch_size
throughput = success_count / elapsed
results.append({
"batch_size": batch_size,
"elapsed": elapsed,
"success_rate": success_rate,
"throughput": throughput,
"cost_per_item": estimate_cost(batch_size)
})
# 选择最优解
best = max(results, key=lambda x: x["throughput"] * x["success_rate"])
return best["batch_size"], results
```
---
错误处理和重试
#### 1. 部分失败处理
```python
def process_batch_with_retry(batch, max_retries=3):
"""批量处理,失败时重试失败的项"""
pending = list(enumerate(batch))
results = [None] * len(batch)
for attempt in range(max_retries):
if not pending:
break
# 只处理待处理的项
indices, items = zip(*pending)
current_batch = items
try:
batch_results = process_batch(current_batch)
# 标记成功的项
new_pending = []
for i, idx in enumerate(indices):
if i < len(batch_results) and batch_results[i] is not None:
results[idx] = batch_results[i]
else:
new_pending.append((idx, batch[idx]))
pending = new_pending
except Exception as e:
print(f"批次处理失败 (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt == max_retries - 1:
# 最后一次尝试,标记失败
for idx, _ in pending:
results[idx] = {"error": str(e)}
return results
```
#### 2. 降级策略
当大批次失败时,降级为小批次:
```python
def process_batch_with_fallback(batch):
"""带降级的批量处理"""
batch_sizes = [50, 20, 10, 5, 1]
for batch_size in batch_sizes:
try:
if len(batch) <= batch_size:
return process_batch(batch)
else:
# 分割成更小的批次
sub_batches = [batch[i:i+batch_size] for i in range(0, len(batch), batch_size)]
results = []
for sub_batch in sub_batches:
results.extend(process_batch(sub_batch))
return results
except Exception as e:
print(f"批次大小 {batch_size} 失败,尝试更小的批次: {e}")
continue
raise Exception("所有批次大小都失败了")
```
---
成本优化
#### 1. Token 优化
```python
def optimize_batch_prompt(texts):
"""优化批量提示词,减少 token 消耗"""
# 不要重复的说明
system_prompt = "你是一个翻译助手。"
# 用简洁的分隔符
separator = "|"
# 编号文本
numbered_texts = [f"{i}:{text}" for i, text in enumerate(texts)]
combined = separator.join(numbered_texts)
user_prompt = f"翻译以下文本(格式:序号:原文|序号:译文):\n{combined}"
return system_prompt, user_prompt
```
#### 2. 选择合适的模型
| 任务 | 推荐模型 | 相对成本 |
|---|---|---|
| 简单分类/摘要 | GPT-4o Mini | 1x |
| 中等复杂度 | Claude 3 Haiku | 1.5x |
| 复杂推理 | GPT-4o | 10x |
---
监控和调优
#### 关键指标
| 指标 | 说明 |
|---|---|
| 批次成功率 | 成功批次 / 总批次 |
| 项成功率 | 成功项 / 总项 |
| 吞吐量 | 处理的项数 / 时间 |
| 平均延迟 | 每批次平均时间 |
| 成本 / 项 | 总成本 / 总项数 |
---
最佳实践
#### 1. 渐进式批次
- 从小批次开始(5-10)
- 逐步增加批次大小
- 监控成功率和延迟
- 找到最佳平衡点
#### 2. 幂等性设计
确保重复处理不会产生问题:
```python
def process_item_idempotent(item_id, item):
"""幂等处理:先检查是否已处理"""
if is_already_processed(item_id):
return get_cached_result(item_id)
result = process_item(item)
cache_result(item_id, result)
return result
```
#### 3. 检查点机制
长时间运行的批量任务需要保存进度:
```python
def save_checkpoint(checkpoint_file, processed_indices, results):
"""保存检查点"""
checkpoint = {
"processed_indices": processed_indices,
"results": results,
"timestamp": time.time()
}
with open(checkpoint_file, "w") as f:
json.dump(checkpoint, f)
def load_checkpoint(checkpoint_file):
"""加载检查点"""
if os.path.exists(checkpoint_file):
with open(checkpoint_file) as f:
return json.load(f)
return None
# 使用
checkpoint = load_checkpoint("batch_checkpoint.json")
if checkpoint:
processed = set(checkpoint["processed_indices"])
results = checkpoint["results"]
else:
processed = set()
results = []
for i, item in enumerate(items):
if i in processed:
continue
result = process_item(item)
results.append(result)
processed.add(i)
if i % 10 == 0:
save_checkpoint("batch_checkpoint.json", list(processed), results)
```
---
总结
批量处理是提升 AI API 效率的利器:
- ✅ 大幅提升吞吐量(5-10 倍)
- ✅ 显著降低成本(30-50%)
- ✅ 减少网络开销
- ✅ 适合非实时任务
- ✅ 需要合理的错误处理
建议:
1. 从小批次开始测试
2. 找到最优的批次大小
3. 实现完善的错误处理和重试
4. 添加检查点机制
5. 持续监控和调优
可在本站查看更多 AI API 中转平台,找到性价比更高的批量处理方案。