最佳实践(Production Best Practices)
生产环境下的 API 使用最佳实践,确保稳定、高效、安全。
📋 目录
架构设计
1. 高可用架构
核心原则:
- ✅ 实现故障转移机制
- ✅ 使用多个 API Key 分流
- ✅ 设置合理的超时时间
- ✅ 实现请求队列和限流
架构图:
┌─────────────────────────────────────────┐
│ 应用层 │
├─────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 请求队列 │ │ 限流器 │ │
│ └──────────────┘ └──────────────┘ │
├─────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 重试机制 │ │ 熔断器 │ │
│ └──────────────┘ └──────────────┘ │
├─────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ │
│ │ API Key 1 │ │ API Key 2 │ │
│ └──────────────┘ └──────────────┘ │
├─────────────────────────────────────────┤
│ ┌──────────────────────────────────┐ │
│ │ Next API Gateway │ │
│ └──────────────────────────────────┘ │
└─────────────────────────────────────────┘
2. 负载均衡
实现方式:
python
import random
from openai import OpenAI
# 多个 API Key(随机选择,实现分流)
api_keys = [
"sk-key1",
"sk-key2",
"sk-key3"
]
def get_client():
    """Build an API client authenticated with one randomly chosen key.

    A key is drawn with ``random.choice`` from the module-level ``api_keys``
    list on every call, so each call may return a client bound to a
    different key.

    Returns:
        An ``OpenAI`` client pointed at the gateway base URL.
    """
    chosen_key = random.choice(api_keys)
    client_options = {
        "api_key": chosen_key,
        "base_url": "https://api.nextapi.pro/v1",
    }
    return OpenAI(**client_options)
# 使用随机选出的客户端
client = get_client()
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "你好!"}]
)
性能优化
1. 选择合适的模型
性能对比:
| 模型 | 响应速度 | 适用场景 |
|---|---|---|
| gpt-4o-mini | 快(~1秒) | 简单对话、FAQ |
| gpt-4o | 中(~2-3秒) | 复杂推理、分析 |
| claude-sonnet-4-6 | 中(~2-3秒) | 代码生成、技术问答 |
| deepseek-v3.1 | 快(~1秒) | 中文任务、简单对话 |
推荐策略:
python
def select_model(task_type):
    """Return the model name recommended for ``task_type``.

    Args:
        task_type: One of ``"simple_chat"``, ``"complex_reasoning"``,
            ``"code_generation"`` or ``"chinese_task"``.

    Returns:
        The mapped model name; any unrecognised task type falls back to
        ``"gpt-4o-mini"``.
    """
    default_model = "gpt-4o-mini"
    recommended = dict(
        simple_chat=default_model,          # simple conversation
        complex_reasoning="gpt-4o",         # complex reasoning
        code_generation="claude-sonnet-4-6",  # code generation
        chinese_task="deepseek-v3.1",       # Chinese-language tasks
    )
    try:
        return recommended[task_type]
    except KeyError:
        return default_model
# 使用示例
model = select_model("simple_chat")
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": "你好!"}]
)
2. 使用流式输出
优点:
- ✅ 减少用户等待时间
- ✅ 可以提前终止
- ✅ 更好的用户体验
实现方式:
python
# 流式输出
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "写一首诗"}],
stream=True # 开启流式输出
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
# 可以根据条件提前终止
if len(chunk.choices[0].delta.content) > 100:
break
3. 控制 token 使用
方法:
python
# 使用 max_tokens 限制输出
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "写一首诗"}],
max_tokens=100, # 限制输出长度
temperature=0.7 # 控制随机性
)
# 简化输入
messages = [
{"role": "system", "content": "简洁回答"}, # 简洁的系统提示
{"role": "user", "content": "你好"} # 简短的输入
]
错误处理
1. 重试机制
指数退避策略:
python
import time
from openai import OpenAI, RateLimitError, APIError
client = OpenAI(
api_key="sk-你的密钥",
base_url="https://api.nextapi.pro/v1"
)
def chat_with_retry(messages, max_retries=3):
    """Send a chat completion request, retrying transient failures with backoff.

    Rate-limit errors and server-side/connection errors are retried after an
    exponentially growing pause (1, 2, 4, ... seconds). Errors that carry an
    HTTP status below 500 (other than rate limiting) are raised immediately,
    because repeating the same request cannot make it succeed.

    Args:
        messages: Chat messages forwarded unchanged to the API.
        max_retries: Total number of attempts (not additional retries).
            Must be at least 1.

    Returns:
        The response returned by ``client.chat.completions.create``.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        RateLimitError: If the final attempt is still rate limited.
        APIError: Immediately for non-retryable API errors, or after the
            final attempt for retryable ones.
        Exception: Any other error is printed and re-raised without retrying.
    """
    if max_retries < 1:
        # Previously the loop simply never ran and None was returned silently.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            return client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
            )
        except RateLimitError:
            # HTTP 429: too many requests.
            if is_last_attempt:
                raise
            reason = "触发限流"
        except APIError as e:
            # NOTE(review): assumes status-bearing errors expose ``status_code``
            # (as openai>=1.0 APIStatusError does); errors without one, such as
            # connection failures, are treated as transient.
            status_code = getattr(e, "status_code", None)
            is_client_error = status_code is not None and status_code < 500
            if is_last_attempt or is_client_error:
                raise
            reason = "服务器错误"
        except Exception as e:
            # Anything else is unexpected: report it and give up.
            print(f"未知错误: {e}")
            raise

        # Exponential backoff: 1, 2, 4, ... seconds.
        wait_time = 2 ** attempt
        print(f"{reason},等待 {wait_time} 秒后重试...")
        time.sleep(wait_time)
# 使用示例
response = chat_with_retry([{"role": "user", "content": "你好!"}])
2. 熔断器模式
实现方式:
python
import time
from datetime import datetime
class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit breaker is open."""


class CircuitBreaker:
    """Circuit breaker that stops calling a function after repeated failures.

    States (stored as strings in ``state``):
        ``"closed"``: calls pass through; failures are counted.
        ``"open"``: calls are rejected until ``recovery_timeout`` seconds
            have passed since the last failure.
        ``"half-open"``: the timeout has elapsed and one trial call is let
            through; success closes the breaker, failure re-opens it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        """Create a closed breaker.

        Args:
            failure_threshold: Number of consecutive failures that opens the
                breaker.
            recovery_timeout: Seconds to wait after the last failure before a
                trial call is allowed.
        """
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` under circuit-breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: If the breaker is open and the recovery
                timeout has not elapsed yet (``func`` is not called).
            Exception: Any exception raised by ``func`` is counted as a
                failure and re-raised unchanged.
        """
        if self.state == "open":
            if self.last_failure_time:
                # total_seconds() is required here: ``timedelta.seconds`` only
                # holds the sub-day remainder, so an outage older than a day
                # would look "recent" and keep the breaker open.
                elapsed = (datetime.now() - self.last_failure_time).total_seconds()
                if elapsed < self.recovery_timeout:
                    raise CircuitBreakerOpenError("熔断器开启,拒绝请求")
                self.state = "half-open"

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            # Threshold reached (or a half-open trial failed): open the breaker.
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

        # A success resets the failure streak and closes the breaker.
        self.failure_count = 0
        self.state = "closed"
        return result
# 使用示例
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
def safe_chat(messages):
return breaker.call(
client.chat.completions.create,
model="gpt-4o-mini",
messages=messages
)
3. 优雅降级
实现方式:
python
def chat_with_fallback(messages):
"""带降级的对话函数"""
try:
# 优先使用主模型
return client.chat.completions.create(
model="gpt-4o",
messages=messages
)
except Exception as e:
print(f"主模型失败: {e}")
# 降级到备用模型
try:
return client.chat.completions.create(
model="gpt-4o-mini", # 更便宜、更稳定的模型
messages=messages
)
except Exception as e2:
print(f"备用模型也失败: {e2}")
# 最终降级:返回缓存或默认响应
return {
"choices": [{
"message": {
"content": "服务暂时不可用,请稍后再试"
}
}]
}
成本控制
1. Token 监控
实现方式:
python
import json
from datetime import datetime
class TokenTracker:
    """In-memory recorder of per-request token usage."""

    def __init__(self):
        # One dict per tracked response, in the order they were recorded.
        self.usage_data = []

    def track(self, response):
        """Record the token usage reported on ``response``.

        Responses without usage information (``response.usage`` missing or
        ``None``) are skipped instead of raising.

        Args:
            response: Object exposing ``usage`` (with ``prompt_tokens``,
                ``completion_tokens``, ``total_tokens``) and ``model``.
        """
        usage = getattr(response, "usage", None)
        if usage is None:
            return
        self.usage_data.append({
            "timestamp": datetime.now().isoformat(),
            "prompt_tokens": usage.prompt_tokens,
            "completion_tokens": usage.completion_tokens,
            "total_tokens": usage.total_tokens,
            "model": response.model,
        })

    def get_daily_usage(self):
        """Summarise the usage recorded today (local date).

        Returns:
            A dict with ``date`` (ISO string), ``total_requests``,
            ``total_tokens`` and ``avg_tokens`` (0 when nothing was recorded
            today).
        """
        today = datetime.now().date()
        todays_entries = [
            entry for entry in self.usage_data
            if datetime.fromisoformat(entry["timestamp"]).date() == today
        ]
        total_tokens = sum(entry["total_tokens"] for entry in todays_entries)
        request_count = len(todays_entries)
        return {
            "date": today.isoformat(),
            "total_requests": request_count,
            "total_tokens": total_tokens,
            "avg_tokens": total_tokens / request_count if request_count else 0,
        }
# 使用示例
tracker = TokenTracker()
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "你好!"}]
)
tracker.track(response)
print(tracker.get_daily_usage())
2. 成本预警
实现方式:
python
class CostAlert:
    """Warns when the estimated spend for today exceeds a daily budget."""

    # Blended price assuming gpt-4o-mini at $0.15 per 1M input tokens and
    # $0.60 per 1M output tokens: the mean is $0.375 per 1M tokens, i.e.
    # $0.000375 per 1K tokens, hence the division by 1000 to get per-token.
    DEFAULT_COST_PER_TOKEN = 0.000375 / 1000

    def __init__(self, daily_limit=10.0, cost_per_token=None):
        """Configure the budget.

        Args:
            daily_limit: Daily budget in dollars (default $10).
            cost_per_token: Estimated dollars per token; defaults to
                ``DEFAULT_COST_PER_TOKEN`` when omitted.
        """
        self.daily_limit = daily_limit
        if cost_per_token is None:
            cost_per_token = self.DEFAULT_COST_PER_TOKEN
        self.cost_per_token = cost_per_token

    def check_cost(self, tracker):
        """Estimate today's cost and report whether it exceeds the budget.

        Args:
            tracker: Object whose ``get_daily_usage()`` returns a mapping
                containing ``"total_tokens"``.

        Returns:
            ``True`` (after printing a warning) when the estimate is strictly
            above ``daily_limit``, otherwise ``False``.
        """
        daily_usage = tracker.get_daily_usage()
        estimated_cost = daily_usage["total_tokens"] * self.cost_per_token
        if estimated_cost > self.daily_limit:
            print(f"⚠️ 成本预警:已使用 ${estimated_cost:.2f}, 超过每日限制 ${self.daily_limit}")
            return True
        return False
# 使用示例
alert = CostAlert(daily_limit=5.0)
if alert.check_cost(tracker):
# 停止服务或切换到更便宜的模型
print("切换到最便宜的模型 deepseek-v3.1")
3. 模型选择优化
成本对比:
python
def cost_comparison(input_tokens=1000, output_tokens=500):
    """Print and return the estimated cost of one request for each model.

    Args:
        input_tokens: Number of prompt tokens in the request (default 1000).
        output_tokens: Number of completion tokens in the request
            (default 500).

    Returns:
        A dict mapping model name to the estimated cost in dollars, in the
        same order the lines are printed.
    """
    # Prices in dollars per 1M tokens (they are divided by 1_000_000 below).
    price_table = {
        "gpt-4o": {"input": 5.00, "output": 15.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4-6": {"input": 2.40, "output": 12.00},
        "deepseek-v3.1": {"input": 0.14, "output": 0.28},
    }

    costs = {}
    for model, prices in price_table.items():
        input_cost = (input_tokens / 1_000_000) * prices["input"]
        output_cost = (output_tokens / 1_000_000) * prices["output"]
        total_cost = input_cost + output_cost
        costs[model] = total_cost
        print(f"{model}: ${total_cost:.6f}")
    return costs
cost_comparison()
安全实践
1. API Key 管理
安全建议:
- ❌ 不要在代码中硬编码 API Key
- ✅ 使用环境变量
- ✅ 定期更换 API Key
- ✅ 使用密钥管理服务
实现方式:
python
import os
from dotenv import load_dotenv
# 从环境变量加载
load_dotenv()
client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"), # 环境变量
base_url="https://api.nextapi.pro/v1"
)
# 或者从密钥管理服务获取
def get_api_key():
"""从密钥管理服务获取 API Key"""
# AWS Secrets Manager, Azure Key Vault, etc.
# 这里简化为环境变量
return os.getenv("OPENAI_API_KEY")
2. 输入验证
实现方式:
python
def validate_input(text, max_length=10000, sensitive_words=("暴力", "色情", "违法")):
    """Validate user input before it is sent to the model.

    Checks run in this order: length, sensitive words, emptiness.

    Args:
        text: The input string to validate.
        max_length: Maximum allowed number of characters (default 10000).
        sensitive_words: Iterable of substrings that are not allowed to
            appear in ``text``.

    Returns:
        ``text`` unchanged when every check passes.

    Raises:
        ValueError: If ``text`` is longer than ``max_length``, contains one of
            ``sensitive_words``, or is empty / whitespace only.
    """
    if len(text) > max_length:
        raise ValueError("输入内容过长")

    for word in sensitive_words:
        if word in text:
            raise ValueError(f"输入包含敏感内容: {word}")

    # Reject input that is empty once surrounding whitespace is removed.
    if not text.strip():
        raise ValueError("输入内容为空")

    return text
# 使用示例
try:
validated_input = validate_input("你好!")
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": validated_input}]
)
except ValueError as e:
print(f"输入验证失败: {e}")
3. 输出过滤
实现方式:
python
def filter_output(content):
    """Mask sensitive-looking data in model output.

    Every match of the patterns below is replaced with ``[已过滤]``:
    dash-separated 16-digit card numbers, 11-digit phone numbers, and e-mail
    addresses.

    Args:
        content: The text to filter.

    Returns:
        The filtered text.
    """
    import re

    # Digit lookarounds are used instead of ``\b`` for the numeric patterns:
    # CJK characters count as word characters, so ``\b`` never matches between
    # Chinese text and a digit and numbers such as "电话13812345678" would
    # slip through unfiltered.
    sensitive_patterns = [
        r'(?<!\d)\d{4}-\d{4}-\d{4}-\d{4}(?!\d)',  # credit card number
        r'(?<!\d)\d{11}(?!\d)',                   # phone number
        r'\b[\w\.-]+@[\w\.-]+\.\w+\b',            # e-mail address
    ]
    for pattern in sensitive_patterns:
        content = re.sub(pattern, '[已过滤]', content)
    return content
# 使用示例
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "你好!"}]
)
filtered_content = filter_output(response.choices[0].message.content)
print(filtered_content)
监控和日志
1. 请求日志
实现方式:
python
import logging
from datetime import datetime
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='api_requests.log'
)
logger = logging.getLogger("api_client")
def log_request(response, latency):
    """Log one successful API call at INFO level.

    Args:
        response: Object exposing ``model`` and ``usage`` (with
            ``prompt_tokens``, ``completion_tokens``, ``total_tokens``).
        latency: Request latency, stored under ``"latency_ms"``.
    """
    record = {"timestamp": datetime.now().isoformat()}
    record["model"] = response.model
    for field in ("prompt_tokens", "completion_tokens", "total_tokens"):
        record[field] = getattr(response.usage, field)
    record["latency_ms"] = latency
    record["status"] = "success"
    logger.info(record)
def log_error(error):
    """Log one failed API call at ERROR level.

    Args:
        error: The exception (or other object) describing the failure; its
            type name and ``str()`` form are recorded.
    """
    record = dict(
        timestamp=datetime.now().isoformat(),
        error_type=type(error).__name__,
        error_message=str(error),
        status="failed",
    )
    logger.error(record)
# 使用示例
start_time = time.time()
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "你好!"}]
)
latency = (time.time() - start_time) * 1000
log_request(response, latency)
except Exception as e:
log_error(e)
2. 性能监控
实现方式:
python
class PerformanceMonitor:
    """Accumulates request counts and latency statistics."""

    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_latency": 0,
            "avg_latency": 0,
            "p50_latency": 0,
            "p95_latency": 0,
            "p99_latency": 0,
        }
        # Latencies in the order they were recorded.
        self.latencies = []
        # The same values kept in ascending order, so percentiles can be read
        # by index without re-sorting the whole history on every record().
        self._sorted_latencies = []

    def record(self, latency, success=True):
        """Record one request and refresh the derived metrics.

        Percentiles are only updated once more than 10 samples exist; until
        then they keep their initial value of 0.

        Args:
            latency: Latency of the request (same unit for every call).
            success: Whether the request succeeded.
        """
        from bisect import insort

        self.metrics["total_requests"] += 1
        outcome_key = "successful_requests" if success else "failed_requests"
        self.metrics[outcome_key] += 1

        self.latencies.append(latency)
        insort(self._sorted_latencies, latency)
        self.metrics["total_latency"] += latency
        self.metrics["avg_latency"] = (
            self.metrics["total_latency"] / self.metrics["total_requests"]
        )

        ordered = self._sorted_latencies
        sample_count = len(ordered)
        if sample_count > 10:
            self.metrics["p50_latency"] = ordered[sample_count // 2]
            self.metrics["p95_latency"] = ordered[int(sample_count * 0.95)]
            self.metrics["p99_latency"] = ordered[int(sample_count * 0.99)]

    def get_metrics(self):
        """Return the metrics dict (the live internal object, not a copy)."""
        return self.metrics
# 使用示例
monitor = PerformanceMonitor()
for i in range(100):
start_time = time.time()
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": f"测试 {i}"}]
)
latency = (time.time() - start_time) * 1000
monitor.record(latency, success=True)
except Exception as e:
latency = (time.time() - start_time) * 1000
monitor.record(latency, success=False)
print(monitor.get_metrics())
测试和验证
1. 单元测试
实现方式:
python
import unittest
from unittest.mock import Mock, patch
class TestAPIClient(unittest.TestCase):
    """Unit tests for the API client helpers; no real requests are sent."""

    # Patch the name as it is looked up in *this* module. Patching
    # 'openai.OpenAI' does not affect a reference that was already bound here
    # via ``from openai import OpenAI``, so the real client would be used.
    @patch(f"{__name__}.OpenAI")
    def test_chat_completion(self, mock_openai):
        """A completion from the mocked client is returned unchanged."""
        # Fake response exposing choices[0].message.content.
        mock_response = Mock()
        mock_response.choices = [Mock()]
        mock_response.choices[0].message.content = "测试响应"
        mock_client = Mock()
        mock_client.chat.completions.create.return_value = mock_response
        mock_openai.return_value = mock_client

        client = OpenAI(api_key="test", base_url="https://api.nextapi.pro/v1")
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "测试"}]
        )

        self.assertEqual(response.choices[0].message.content, "测试响应")
        mock_openai.assert_called_once_with(
            api_key="test", base_url="https://api.nextapi.pro/v1"
        )

    def test_error_handling(self):
        """chat_with_retry retries a rate-limited call, then re-raises."""
        # NOTE(review): openai>=1.0 status errors require ``response`` and
        # ``body`` keyword arguments; confirm against the installed version.
        rate_limited = RateLimitError(
            "限流", response=Mock(status_code=429), body=None
        )
        mock_client = Mock()
        mock_client.chat.completions.create.side_effect = rate_limited

        # Swap in the mock for the module-level ``client`` used by
        # chat_with_retry, and stub out sleep so the backoff does not slow
        # the test down.
        with patch(f"{__name__}.client", mock_client), \
                patch("time.sleep") as mock_sleep:
            with self.assertRaises(RateLimitError):
                chat_with_retry([{"role": "user", "content": "测试"}])

        # Default max_retries=3: three attempts with a pause between each.
        self.assertEqual(mock_client.chat.completions.create.call_count, 3)
        self.assertEqual(mock_sleep.call_count, 2)
if __name__ == '__main__':
unittest.main()
2. 压力测试
实现方式:
python
import concurrent.futures
import time
def stress_test(num_requests=100, num_threads=10, request_fn=None):
    """Fire concurrent requests and report success counts and latency.

    Args:
        num_requests: Total number of requests to issue.
        num_threads: Size of the thread pool.
        request_fn: Zero-argument callable performing one request. When
            omitted, a chat completion is sent through the module-level
            ``client``. Any exception it raises counts as a failed request.

    Returns:
        A dict with ``successful`` and ``failed`` counts and ``avg_latency``
        (mean latency of successful requests in milliseconds, or ``None``
        when none succeeded).
    """
    def default_request():
        return client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "压力测试"}]
        )

    send_request = default_request if request_fn is None else request_fn

    def make_request():
        # perf_counter is monotonic, so the measured interval is not affected
        # by system clock adjustments.
        start_time = time.perf_counter()
        try:
            send_request()
        except Exception as e:
            latency = (time.perf_counter() - start_time) * 1000
            return {"success": False, "latency": latency, "error": str(e)}
        latency = (time.perf_counter() - start_time) * 1000
        return {"success": True, "latency": latency}

    # Issue the requests concurrently and collect results as they finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [executor.submit(make_request) for _ in range(num_requests)]
        results = [f.result() for f in concurrent.futures.as_completed(futures)]

    successful = [r for r in results if r["success"]]
    failed = [r for r in results if not r["success"]]
    print(f"成功: {len(successful)}, 失败: {len(failed)}")

    avg_latency = None
    if successful:
        avg_latency = sum(r["latency"] for r in successful) / len(successful)
        print(f"平均延迟: {avg_latency:.2f}ms")

    return {
        "successful": len(successful),
        "failed": len(failed),
        "avg_latency": avg_latency,
    }
# 使用示例
stress_test(num_requests=50, num_threads=5)