Best Practices for Batch Processing Performance Optimization

AI API Batching and Optimization Techniques: Boost Throughput, Cut Costs

2026-04-23 · ~11 min read


Batching is one of the most effective AI API optimizations: merging multiple requests into one both raises throughput and lowers cost. But batching has pitfalls of its own, including timeouts, partial failures, and ordering problems. This article covers best practices for batching AI API calls to help you get substantially more out of every request.

Why Batch Processing?

| Metric | Single request | Batched | Change |
| --- | --- | --- | --- |
| Latency (per task) | 1s | 2s | -50% |
| Throughput (tasks/min) | 60 | 300+ | 500% |
| Cost (per task) | $0.01 | $0.006 | -40% |
| Network overhead | - | - | -80% |
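One intuition behind these numbers: a batch shares a single set of instructions across many items, so the fixed prompt overhead is amortized. A rough back-of-envelope sketch (the token counts and price here are illustrative assumptions, not real pricing):

```python
def per_item_cost(n_items, instruction_tokens=150, tokens_per_item=50,
                  price_per_1k_tokens=0.00015):
    """Estimate input cost per item when one instruction block is shared
    across n_items packed into a single request (illustrative numbers)."""
    total_tokens = instruction_tokens + n_items * tokens_per_item
    return total_tokens * price_per_1k_tokens / 1000 / n_items

single = per_item_cost(1)    # instructions paid once per item
batched = per_item_cost(50)  # instructions amortized over 50 items
print(f"single: ${single:.6f}/item, batched: ${batched:.6f}/item")
```

The larger the shared instruction block relative to each item, the bigger the per-item savings.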

---

When Does Batching Make Sense?

#### ✅ Good candidates for batching

| Scenario | Description |
| --- | --- |
| Bulk document translation | Translate 100 documents in one request |
| Bulk text classification | Classify 1,000 comments in one request |
| Bulk summarization | Generate 50 summaries in one request |
| Data labeling | Label 200 records in one request |
| Nightly batch jobs | Non-real-time background tasks |

#### ❌ Poor candidates for batching

| Scenario | Description |
| --- | --- |
| Real-time chat | Users need an immediate response |
| Large single files | One file is already big on its own |
| Strict sequential dependencies | Each step depends on the previous result |
| Low-latency requirements | Millisecond-level responses needed |
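The two tables above condense into a quick rule of thumb. A minimal sketch (the parameter names are my own shorthand for the rows above):

```python
def should_batch(realtime: bool, item_is_large: bool,
                 sequential_dependency: bool, latency_budget_ms: int) -> bool:
    """Rule of thumb distilled from the tables above: batch only when
    no single disqualifier applies."""
    if realtime or item_is_large or sequential_dependency:
        return False
    # Millisecond-level latency requirements rule batching out
    return latency_budget_ms >= 1000

# Nightly classification job: fine to batch
print(should_batch(False, False, False, latency_budget_ms=60_000))  # True
# Live chat: do not batch
print(should_batch(True, False, False, latency_budget_ms=200))      # False
```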

---

Batching Strategies

#### Strategy 1: Simple Batching

```
multiple independent requests → pack into one request → send → unpack results
```

Example:

```python
from openai import OpenAI

client = OpenAI()

def batch_translate(texts, source_lang="en", target_lang="zh"):
    """Simple batch translation."""
    # Pack multiple texts into a single request
    combined_text = "\n\n---NEXT-TEXT---\n\n".join(texts)

    prompt = f"""Translate the following texts from {source_lang} to {target_lang}.
The texts are separated by ---NEXT-TEXT---.
Keep the separators and output the translations in the same order.

Texts:
{combined_text}
"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0
    )

    # Unpack the results
    translated_text = response.choices[0].message.content
    return translated_text.split("\n\n---NEXT-TEXT---\n\n")

# Usage
texts = [
    "Hello world!",
    "How are you?",
    "Nice to meet you!"
]
translations = batch_translate(texts)
print(translations)
```
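Splitting on the separator is fragile: the model may drop a separator or add extra whitespace, leaving fewer outputs than inputs. A defensive unpacking step (a hypothetical helper, assuming the same `---NEXT-TEXT---` convention as above):

```python
import re

def unpack_batch_output(raw: str, expected_count: int) -> list[str]:
    """Split model output on the separator, tolerating whitespace drift,
    and fail loudly if the count does not match the input count."""
    parts = [p.strip() for p in re.split(r"\s*---NEXT-TEXT---\s*", raw) if p.strip()]
    if len(parts) != expected_count:
        raise ValueError(
            f"expected {expected_count} results, got {len(parts)}; "
            "re-run this batch or fall back to per-item requests"
        )
    return parts

print(unpack_batch_output("你好,世界!\n---NEXT-TEXT---\n你好吗?", 2))
```

Failing loudly here is deliberate: silently misaligned results are worse than a retried batch.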

---

#### Strategy 2: Function-Calling Batching

Function calling gives batch tasks a more structured shape:

```python
import json

def batch_classify(texts):
    """Batch classification via function calling."""
    functions = [
        {
            "name": "classify_texts",
            "description": "Classify multiple texts",
            "parameters": {
                "type": "object",
                "properties": {
                    "classifications": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "index": {"type": "integer"},
                                "category": {"type": "string"},
                                "confidence": {"type": "number"}
                            },
                            "required": ["index", "category", "confidence"]
                        }
                    }
                },
                "required": ["classifications"]
            }
        }
    ]

    combined_text = "\n".join(f"{i}: {text}" for i, text in enumerate(texts))

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": f"Classify each of the following texts as positive/negative/neutral:\n{combined_text}"}
        ],
        functions=functions,
        function_call={"name": "classify_texts"}
    )

    # arguments is a JSON string; parse it into a Python structure
    arguments = response.choices[0].message.function_call.arguments
    return json.loads(arguments)["classifications"]
```
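The function call's `arguments` field is a JSON string, and the model may return entries out of order or skip an index. A sketch of mapping parsed classifications back onto input positions (it assumes the `classify_texts` schema above):

```python
import json

def align_classifications(arguments_json: str, n_texts: int) -> list:
    """Map the model's classification list back onto input positions,
    leaving None for any index the model skipped."""
    parsed = json.loads(arguments_json)
    results = [None] * n_texts
    for entry in parsed.get("classifications", []):
        idx = entry.get("index")
        if isinstance(idx, int) and 0 <= idx < n_texts:
            results[idx] = entry
    return results

raw = '{"classifications": [{"index": 1, "category": "negative", "confidence": 0.9}]}'
print(align_classifications(raw, 3))
```

Positions left as `None` are exactly the items to feed into the retry logic later in this article.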

---

#### Strategy 3: Parallel Batching

Process multiple batches in parallel:

```python
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def process_single_batch(batch):
    """Process one batch. format_batch/parse_response are your own
    prompt-building and output-parsing helpers."""
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": format_batch(batch)}]
    )
    return parse_response(response)

async def parallel_batch_process(items, batch_size=10, max_concurrent=5):
    """Process items in parallel batches."""
    # Split into batches
    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]

    # Cap concurrency
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(batch):
        async with semaphore:
            return await process_single_batch(batch)

    # Run in parallel
    tasks = [process_with_semaphore(batch) for batch in batches]
    results = await asyncio.gather(*tasks)

    # Merge results
    return [item for batch_result in results for item in batch_result]

# Usage
items = get_1000_items()
results = await parallel_batch_process(items, batch_size=10, max_concurrent=5)
```
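If your codebase is not async, the same fan-out pattern works on a thread pool; the stdlib's `concurrent.futures` handles the concurrency cap. A sketch with a stand-in `process_single_batch` (replace it with a real blocking API call):

```python
from concurrent.futures import ThreadPoolExecutor

def process_single_batch(batch):
    # Stand-in for a blocking API call
    return [item.upper() for item in batch]

def parallel_batch_process_sync(items, batch_size=10, max_workers=5):
    """Split items into batches and process them on a bounded thread pool,
    preserving input order in the merged result."""
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in submission order
        results = list(pool.map(process_single_batch, batches))
    return [item for batch_result in results for item in batch_result]

print(parallel_batch_process_sync(["a", "b", "c"], batch_size=2))  # ['A', 'B', 'C']
```

Threads are fine here because the work is I/O-bound; `pool.map` also keeps results in input order, so no index bookkeeping is needed.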

---

Batch Size Optimization

#### How do you choose a batch size?

| Factor | Small batch (<10) | Medium batch (10-50) | Large batch (>50) |
| --- | --- | --- | --- |
| Success rate | High | Medium | Lower |
| Latency per item | Higher | Medium | Lower |
| Cost per item | Higher | Medium | Lower |
| Complexity | Low | Medium | High |

#### Testing batch sizes

```python
import time

def find_optimal_batch_size(items, test_sizes=[5, 10, 20, 50]):
    """Benchmark different batch sizes. process_batch/estimate_cost are
    your own processing and pricing helpers."""
    results = []

    for batch_size in test_sizes:
        start_time = time.time()
        success_count = 0

        try:
            # Test one batch at this size
            batch = items[:batch_size]
            result = process_batch(batch)
            success_count = len(result)
        except Exception as e:
            print(f"Batch size {batch_size} failed: {e}")
            continue

        elapsed = time.time() - start_time
        success_rate = success_count / batch_size
        throughput = success_count / elapsed

        results.append({
            "batch_size": batch_size,
            "elapsed": elapsed,
            "success_rate": success_rate,
            "throughput": throughput,
            "cost_per_item": estimate_cost(batch_size)
        })

    # Pick the best trade-off
    best = max(results, key=lambda x: x["throughput"] * x["success_rate"])
    return best["batch_size"], results
```

---

Error Handling and Retries

#### 1. Handling partial failures

```python
def process_batch_with_retry(batch, max_retries=3):
    """Process a batch, retrying only the items that failed."""
    pending = list(enumerate(batch))
    results = [None] * len(batch)

    for attempt in range(max_retries):
        if not pending:
            break

        # Only process the still-pending items
        indices, items = zip(*pending)
        current_batch = items

        try:
            batch_results = process_batch(current_batch)

            # Mark the successful items
            new_pending = []
            for i, idx in enumerate(indices):
                if i < len(batch_results) and batch_results[i] is not None:
                    results[idx] = batch_results[i]
                else:
                    new_pending.append((idx, batch[idx]))

            pending = new_pending

        except Exception as e:
            print(f"Batch failed (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                # Final attempt: record the failure
                for idx, _ in pending:
                    results[idx] = {"error": str(e)}

    return results
```
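The retry loop above re-sends immediately, which can hammer a rate-limited API. A common refinement is exponential backoff with jitter between attempts; the base and cap values here are illustrative:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter: sleep a random amount between
    0 and min(cap, base * 2**attempt) before retrying."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# Inside the retry loop, before the next attempt:
#     time.sleep(backoff_delay(attempt))
for attempt in range(4):
    print(f"attempt {attempt}: sleep up to {min(30.0, 2.0 ** attempt):.0f}s")
```

Jitter matters when many workers retry at once: without it, they all come back at the same instant and fail together again.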

#### 2. Fallback strategy

When a large batch fails, fall back to smaller batches:

```python
def process_batch_with_fallback(batch):
    """Batch processing with size fallback."""
    batch_sizes = [50, 20, 10, 5, 1]

    for batch_size in batch_sizes:
        try:
            if len(batch) <= batch_size:
                return process_batch(batch)
            else:
                # Split into smaller sub-batches
                sub_batches = [batch[i:i+batch_size] for i in range(0, len(batch), batch_size)]
                results = []
                for sub_batch in sub_batches:
                    results.extend(process_batch(sub_batch))
                return results
        except Exception as e:
            print(f"Batch size {batch_size} failed, trying a smaller size: {e}")
            continue

    raise Exception("All batch sizes failed")
```

---

Cost Optimization

#### 1. Token optimization

```python
def optimize_batch_prompt(texts):
    """Trim the batch prompt to cut token usage."""
    # Don't repeat instructions for every item
    system_prompt = "You are a translation assistant."

    # Use a terse separator
    separator = "|"

    # Number the texts
    numbered_texts = [f"{i}:{text}" for i, text in enumerate(texts)]
    combined = separator.join(numbered_texts)

    user_prompt = f"Translate the following texts (format: index:source|index:translation):\n{combined}"

    return system_prompt, user_prompt
```
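A quick way to sanity-check the savings is to compare prompt lengths before and after compaction. A rough sketch using a characters-per-token approximation (about 4 characters per token for English text; accurate counts need a real tokenizer such as tiktoken):

```python
def approx_tokens(text: str, chars_per_token: float = 4.0) -> int:
    """Crude token estimate; good enough for comparing prompt variants."""
    return max(1, round(len(text) / chars_per_token))

# Per-item instructions repeated vs. one shared instruction with a terse separator
verbose = "\n".join(f"Please translate the following text into Chinese: {t}"
                    for t in ["Hello", "Goodbye", "Thanks"])
compact = "Translate into Chinese:\n" + "|".join(
    f"{i}:{t}" for i, t in enumerate(["Hello", "Goodbye", "Thanks"]))

print(approx_tokens(verbose), approx_tokens(compact))
```

Even at three items the compact form is noticeably shorter; at hundreds of items the gap becomes the dominant cost factor.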

#### 2. Choosing the right model

| Task | Recommended model | Relative cost |
| --- | --- | --- |
| Simple classification / summarization | GPT-4o Mini | 1x |
| Medium complexity | Claude 3 Haiku | 1.5x |
| Complex reasoning | GPT-4o | 10x |

---

Monitoring and Tuning

#### Key metrics

| Metric | Definition |
| --- | --- |
| Batch success rate | successful batches / total batches |
| Item success rate | successful items / total items |
| Throughput | items processed / elapsed time |
| Average latency | average time per batch |
| Cost per item | total cost / total items |
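These metrics are easy to track in-process. A minimal tracker sketch (the field names follow the table above; wiring it into your pipeline is up to you):

```python
from dataclasses import dataclass

@dataclass
class BatchMetrics:
    """Accumulates the key metrics from the table above."""
    total_batches: int = 0
    ok_batches: int = 0
    total_items: int = 0
    ok_items: int = 0
    total_seconds: float = 0.0
    total_cost: float = 0.0

    def record(self, n_items, n_ok, seconds, cost, batch_ok=True):
        self.total_batches += 1
        self.ok_batches += int(batch_ok)
        self.total_items += n_items
        self.ok_items += n_ok
        self.total_seconds += seconds
        self.total_cost += cost

    def summary(self):
        return {
            "batch_success_rate": self.ok_batches / max(1, self.total_batches),
            "item_success_rate": self.ok_items / max(1, self.total_items),
            "throughput_per_s": self.ok_items / max(1e-9, self.total_seconds),
            "avg_batch_latency_s": self.total_seconds / max(1, self.total_batches),
            "cost_per_item": self.total_cost / max(1, self.ok_items),
        }

m = BatchMetrics()
m.record(n_items=10, n_ok=9, seconds=2.0, cost=0.05)
print(m.summary())
```

Call `record()` once per batch and dump `summary()` periodically; the numbers feed directly into the batch-size tuning discussed earlier.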

---

Best Practices

#### 1. Progressive batch sizing

  • Start with small batches (5-10)
  • Increase the batch size gradually
  • Monitor success rate and latency
  • Find the sweet spot
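The ramp-up above can be automated: grow the batch size while the success rate stays high, shrink it when the rate dips. A minimal sketch (the thresholds and growth factor are illustrative assumptions):

```python
def next_batch_size(current: int, success_rate: float,
                    min_size: int = 5, max_size: int = 50) -> int:
    """Grow batch size on healthy success rates, back off otherwise."""
    if success_rate >= 0.95:
        return min(max_size, current * 2)   # ramp up
    if success_rate < 0.8:
        return max(min_size, current // 2)  # back off
    return current                          # hold steady

size = 5
for rate in [1.0, 1.0, 0.97, 0.7]:
    size = next_batch_size(size, rate)
    print(size)  # 10, 20, 40, 20
```

This is essentially additive-increase/multiplicative-decrease thinking applied to batch sizing: cheap to implement, and it converges on the sweet spot without manual retuning.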

#### 2. Idempotency

Make sure reprocessing an item causes no harm:

```python
def process_item_idempotent(item_id, item):
    """Idempotent processing: check whether the item was already handled."""
    if is_already_processed(item_id):
        return get_cached_result(item_id)

    result = process_item(item)
    cache_result(item_id, result)
    return result
```

#### 3. Checkpointing

Long-running batch jobs should persist their progress:

```python
import json
import os
import time

def save_checkpoint(checkpoint_file, processed_indices, results):
    """Save a checkpoint."""
    checkpoint = {
        "processed_indices": processed_indices,
        "results": results,
        "timestamp": time.time()
    }
    with open(checkpoint_file, "w") as f:
        json.dump(checkpoint, f)

def load_checkpoint(checkpoint_file):
    """Load a checkpoint if one exists."""
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            return json.load(f)
    return None

# Usage
checkpoint = load_checkpoint("batch_checkpoint.json")
if checkpoint:
    processed = set(checkpoint["processed_indices"])
    results = checkpoint["results"]
else:
    processed = set()
    results = []

for i, item in enumerate(items):
    if i in processed:
        continue

    result = process_item(item)
    results.append(result)
    processed.add(i)

    if i % 10 == 0:
        save_checkpoint("batch_checkpoint.json", list(processed), results)
```
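One refinement worth considering: if the process dies mid-write, the checkpoint file can be left truncated. Writing to a temp file and renaming makes the save atomic on POSIX filesystems (a drop-in variant of the `save_checkpoint` sketch above):

```python
import json
import os
import tempfile
import time

def save_checkpoint_atomic(checkpoint_file, processed_indices, results):
    """Write the checkpoint to a temp file, then atomically rename it into
    place so a crash never leaves a half-written checkpoint."""
    checkpoint = {
        "processed_indices": processed_indices,
        "results": results,
        "timestamp": time.time(),
    }
    directory = os.path.dirname(os.path.abspath(checkpoint_file))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(checkpoint, f)
        os.replace(tmp_path, checkpoint_file)  # atomic rename on POSIX
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
```

The temp file is created in the same directory as the target so the rename never crosses a filesystem boundary, which is what keeps it atomic.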

---

Summary

Batching is a powerful lever for AI API efficiency:

  • ✅ Much higher throughput (5-10x)
  • ✅ Significantly lower cost (30-50%)
  • ✅ Reduced network overhead
  • ✅ A good fit for non-real-time tasks
  • ✅ Requires solid error handling

Recommendations:

1. Start testing with small batches

2. Find the optimal batch size

3. Implement thorough error handling and retries

4. Add checkpointing

5. Keep monitoring and tuning

You can browse more AI API relay platforms on this site to find a more cost-effective batching setup.
