Skip to content

最佳实践(Production Best Practices)

生产环境下的 API 使用最佳实践,确保稳定、高效、安全。


📋 目录


架构设计

1. 高可用架构

核心原则:

  • ✅ 实现故障转移机制
  • ✅ 使用多个 API Key 分流
  • ✅ 设置合理的超时时间
  • ✅ 实现请求队列和限流

架构图:

┌─────────────────────────────────────────┐
│              应用层                      │
├─────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐    │
│  │ 请求队列     │  │ 限流器       │    │
│  └──────────────┘  └──────────────┘    │
├─────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐    │
│  │ 重试机制     │  │ 熔断器       │    │
│  └──────────────┘  └──────────────┘    │
├─────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐    │
│  │ API Key 1    │  │ API Key 2    │    │
│  └──────────────┘  └──────────────┘    │
├─────────────────────────────────────────┤
│  ┌──────────────────────────────────┐  │
│  │        Next API Gateway          │  │
│  └──────────────────────────────────┘  │
└─────────────────────────────────────────┘

2. 负载均衡

实现方式:

python
import random
from openai import OpenAI

# Pool of API keys to spread requests over. get_client() picks one at random
# per call (random selection, not strict round-robin).
api_keys = ["sk-key1", "sk-key2", "sk-key3"]

def get_client():
    """Build an OpenAI client using one key chosen at random from ``api_keys``."""
    chosen_key = random.choice(api_keys)
    client_options = {
        "api_key": chosen_key,
        "base_url": "https://api.nextapi.pro/v1",
    }
    return OpenAI(**client_options)

# Usage example: build a client (get_client picks the key at random) and send
# a single chat request with it.
client = get_client()
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "你好!"}]
)

性能优化

1. 选择合适的模型

性能对比:

| 模型 | 响应速度 | 适用场景 |
| --- | --- | --- |
| gpt-4o-mini | 快(~1秒) | 简单对话、FAQ |
| gpt-4o | 中(~2-3秒) | 复杂推理、分析 |
| claude-sonnet-4-6 | 中(~2-3秒) | 代码生成、技术问答 |
| deepseek-v3.1 | 快(~1秒) | 中文任务、简单对话 |

推荐策略:

python
def select_model(task_type):
    """Return the model name recommended for ``task_type``.

    Known task types are ``simple_chat``, ``complex_reasoning``,
    ``code_generation`` and ``chinese_task``; any other value falls back to
    ``"gpt-4o-mini"``.
    """
    fallback_model = "gpt-4o-mini"
    recommendations = dict(
        simple_chat="gpt-4o-mini",
        complex_reasoning="gpt-4o",
        code_generation="claude-sonnet-4-6",
        chinese_task="deepseek-v3.1",
    )
    try:
        return recommendations[task_type]
    except KeyError:
        return fallback_model

# Usage example: choose the model for a simple-chat task, then send one
# request with it.
model = select_model("simple_chat")
response = client.chat.completions.create(
    model=model,
    messages=[{"role": "user", "content": "你好!"}]
)

2. 使用流式输出

优点:

  • ✅ 减少用户等待时间
  • ✅ 可以提前终止
  • ✅ 更好的用户体验

实现方式:

python
# Streaming output: print pieces of the completion as they arrive instead of
# waiting for the whole response.
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "写一首诗"}],
    stream=True  # enable streaming
)

received_chars = 0  # total characters printed so far
for chunk in stream:
    # A chunk may carry no choices at all; indexing [0] on it would raise
    # IndexError, so skip it.
    if not chunk.choices:
        continue

    piece = chunk.choices[0].delta.content
    if not piece:
        continue

    print(piece, end="")
    received_chars += len(piece)

    # Early termination: stop once the accumulated output (not just the
    # current chunk) exceeds 100 characters.
    if received_chars > 100:
        break

3. 控制 token 使用

方法:

python
# Cap the length of the completion with max_tokens
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "写一首诗"}],
    max_tokens=100,  # upper bound on the output length
    temperature=0.7  # controls randomness
)

# Keep the prompt itself small (this list only illustrates a compact prompt;
# it is not passed to the call above)
messages = [
    {"role": "system", "content": "简洁回答"},  # terse system prompt
    {"role": "user", "content": "你好"}  # short user input
]

错误处理

1. 重试机制

指数退避策略:

python
import time
from openai import OpenAI, RateLimitError, APIError

# Module-level client used by chat_with_retry below. The api_key string is a
# placeholder to be replaced with a real key.
client = OpenAI(
    api_key="sk-你的密钥",
    base_url="https://api.nextapi.pro/v1"
)

def chat_with_retry(messages, max_retries=3):
    """Send ``messages`` to gpt-4o-mini, retrying transient failures.

    Retries use exponential backoff (1, 2, 4, ... seconds between attempts).

    Args:
        messages: list of chat message dicts passed through to the API.
        max_retries: total number of attempts. Values below 1 are treated as
            1 so the request is always tried at least once.

    Returns:
        The response object returned by ``client.chat.completions.create``.

    Raises:
        RateLimitError: if the last attempt is still rate limited.
        APIError: immediately for non-retryable API errors (a status code
            below 500 other than 408/409), or after the last attempt for
            retryable ones.
        Exception: any other error is printed and re-raised without retrying.
    """
    total_attempts = max(1, max_retries)

    for attempt in range(total_attempts):
        is_last_attempt = attempt == total_attempts - 1
        try:
            return client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
        except RateLimitError:
            # HTTP 429: too many requests. Must be handled before APIError,
            # which would otherwise catch it too.
            if is_last_attempt:
                raise
            wait_time = 2 ** attempt  # exponential backoff: 1, 2, 4 seconds
            print(f"触发限流,等待 {wait_time} 秒后重试...")
            time.sleep(wait_time)
        except APIError as e:
            # APIError also covers client-side errors such as 400/401/404.
            # Repeating those cannot succeed, so only retry server errors
            # (>= 500), 408/409, and errors without a status code
            # (presumably connection/timeout failures -- verify against the
            # installed SDK version).
            status_code = getattr(e, "status_code", None)
            retryable = (
                status_code is None
                or status_code >= 500
                or status_code in (408, 409)
            )
            if is_last_attempt or not retryable:
                raise
            wait_time = 2 ** attempt
            print(f"服务器错误,等待 {wait_time} 秒后重试...")
            time.sleep(wait_time)
        except Exception as e:
            # Anything else is unexpected: report it and give up immediately.
            print(f"未知错误: {e}")
            raise

# Usage example: send one message through the retrying helper.
response = chat_with_retry([{"role": "user", "content": "你好!"}])

2. 熔断器模式

实现方式:

python
import time
from datetime import datetime

class CircuitBreaker:
    """Circuit breaker that stops calling a function after repeated failures.

    States:
        closed:    calls pass through; failures since the last success are
                   counted.
        open:      calls are rejected until ``recovery_timeout`` seconds have
                   passed since the last failure.
        half-open: the next call is let through as a probe; success closes
                   the breaker, failure opens it again.

    Args:
        failure_threshold: number of failures (since the last success) at
            which the breaker opens.
        recovery_timeout: seconds to wait after the last failure before a
            probe call is allowed.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` under breaker protection.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: if the breaker is open and the recovery timeout has
                not elapsed yet (``func`` is not called).
            Exception: anything raised by ``func`` is counted as a failure
                and re-raised unchanged.
        """
        if self.state == "open" and self.last_failure_time:
            # total_seconds() is the full elapsed time. timedelta.seconds is
            # only the seconds component and wraps around every 24 hours,
            # which could keep rejecting calls long after the timeout.
            elapsed = (datetime.now() - self.last_failure_time).total_seconds()
            if elapsed < self.recovery_timeout:
                raise Exception("熔断器开启,拒绝请求")
            # Timeout elapsed: allow this call through as a probe.
            self.state = "half-open"

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            # Threshold reached: open the breaker.
            if self.failure_count >= self.failure_threshold:
                self.state = "open"

            raise

        # Success resets the failure counter and closes the breaker.
        self.failure_count = 0
        self.state = "closed"
        return result

# Usage example: breaker configured with a failure threshold of 3 and a
# 30-second recovery timeout.
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def safe_chat(messages):
    """Send ``messages`` to gpt-4o-mini through the module-level ``breaker``."""
    request_options = {"model": "gpt-4o-mini", "messages": messages}
    return breaker.call(client.chat.completions.create, **request_options)

3. 优雅降级

实现方式:

python
def chat_with_fallback(messages):
    """Answer ``messages`` with graceful degradation.

    Tries gpt-4o first, then gpt-4o-mini; if both fail, returns a canned
    "service unavailable" reply.

    NOTE(review): the canned reply is a plain dict, whereas the two API paths
    return whatever ``client.chat.completions.create`` returns, so callers
    need to handle both shapes.
    """
    # Preferred path: the primary model.
    try:
        return client.chat.completions.create(model="gpt-4o", messages=messages)
    except Exception as primary_error:
        print(f"主模型失败: {primary_error}")

    # First fallback: the backup model.
    try:
        return client.chat.completions.create(
            model="gpt-4o-mini", messages=messages
        )
    except Exception as backup_error:
        print(f"备用模型也失败: {backup_error}")

    # Last resort: a default response.
    default_message = {"content": "服务暂时不可用,请稍后再试"}
    return {"choices": [{"message": default_message}]}

成本控制

1. Token 监控

实现方式:

python
import json
from datetime import datetime

class TokenTracker:
    """In-memory recorder of per-response token usage."""

    def __init__(self):
        # One dict per tracked response, in the order track() was called.
        self.usage_data = []

    def track(self, response):
        """Append the token counts and model name of ``response`` to the log."""
        entry = {"timestamp": datetime.now().isoformat()}
        counts = response.usage
        entry["prompt_tokens"] = counts.prompt_tokens
        entry["completion_tokens"] = counts.completion_tokens
        entry["total_tokens"] = counts.total_tokens
        entry["model"] = response.model
        self.usage_data.append(entry)

    def get_daily_usage(self):
        """Summarise the entries recorded today.

        Returns a dict with ``date`` (ISO string), ``total_requests``,
        ``total_tokens`` and ``avg_tokens`` (0 when nothing was recorded
        today).
        """
        today = datetime.now().date()
        todays_totals = [
            entry["total_tokens"]
            for entry in self.usage_data
            if datetime.fromisoformat(entry["timestamp"]).date() == today
        ]
        request_count = len(todays_totals)
        token_sum = sum(todays_totals)

        if request_count:
            average = token_sum / request_count
        else:
            average = 0

        return {
            "date": today.isoformat(),
            "total_requests": request_count,
            "total_tokens": token_sum,
            "avg_tokens": average,
        }

# Usage example: record the usage of one response, then print today's totals.
tracker = TokenTracker()

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "你好!"}]
)

tracker.track(response)
print(tracker.get_daily_usage())

2. 成本预警

实现方式:

python
class CostAlert:
    """Daily spend alert based on an estimated blended per-token price.

    Args:
        daily_limit: spending ceiling per day, in US dollars (default $10).
        cost_per_token: blended price of a single token in US dollars. The
            default assumes gpt-4o-mini at $0.15/1M input and $0.60/1M output
            tokens, averaged to $0.375 per 1M tokens, which is $0.000375 per
            1K tokens and 3.75e-7 dollars per token.
    """

    def __init__(self, daily_limit=10.0, cost_per_token=0.000375 / 1000):
        self.daily_limit = daily_limit
        self.cost_per_token = cost_per_token

    def check_cost(self, tracker):
        """Return True (and print a warning) if today's estimate exceeds the limit.

        Args:
            tracker: object whose ``get_daily_usage()`` returns a dict with a
                ``total_tokens`` entry.

        Returns:
            True when the estimated cost is strictly above ``daily_limit``,
            otherwise False.
        """
        daily_usage = tracker.get_daily_usage()

        estimated_cost = daily_usage["total_tokens"] * self.cost_per_token

        if estimated_cost > self.daily_limit:
            print(f"⚠️ 成本预警:已使用 ${estimated_cost:.2f}, 超过每日限制 ${self.daily_limit}")
            return True

        return False

# Usage example: alert once today's estimated spend exceeds $5.
alert = CostAlert(daily_limit=5.0)
if alert.check_cost(tracker):
    # Over budget: stop serving or switch to a cheaper model
    print("切换到最便宜的模型 deepseek-v3.1")

3. 模型选择优化

成本对比:

python
def cost_comparison(input_tokens=1000, output_tokens=500):
    """Print and return the estimated cost of one request for each model.

    Args:
        input_tokens: number of prompt tokens in the request (default 1000).
        output_tokens: number of completion tokens in the request
            (default 500).

    Returns:
        Dict mapping model name to the estimated cost in US dollars, in the
        same order as the lines printed.
    """
    # Prices in US dollars per 1M tokens.
    models = {
        "gpt-4o": {"input": 5.00, "output": 15.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4-6": {"input": 2.40, "output": 12.00},
        "deepseek-v3.1": {"input": 0.14, "output": 0.28}
    }

    costs = {}
    for model, prices in models.items():
        input_cost = (input_tokens / 1_000_000) * prices["input"]
        output_cost = (output_tokens / 1_000_000) * prices["output"]
        total_cost = input_cost + output_cost
        costs[model] = total_cost

        print(f"{model}: ${total_cost:.6f}")

    return costs

cost_comparison()

安全实践

1. API Key 管理

安全建议:

  • ❌ 不要在代码中硬编码 API Key
  • ✅ 使用环境变量
  • ✅ 定期更换 API Key
  • ✅ 使用密钥管理服务

实现方式:

python
import os
from dotenv import load_dotenv

# Load configuration from environment variables (load_dotenv comes from
# python-dotenv)
load_dotenv()

client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),  # read from the environment, not hard-coded
    base_url="https://api.nextapi.pro/v1"
)

# 或者从密钥管理服务获取
def get_api_key():
    """Return the API key, or ``None`` when it is not configured.

    Stands in for a secrets-manager lookup (AWS Secrets Manager, Azure Key
    Vault, etc.); this simplified version reads the ``OPENAI_API_KEY``
    environment variable.
    """
    return os.environ.get("OPENAI_API_KEY")

2. 输入验证

实现方式:

python
def validate_input(text):
    """Validate user input and return it unchanged when acceptable.

    Checks run in this order: length (at most 10000 characters), blocked
    keywords, then emptiness (blank or whitespace-only text).

    Raises:
        ValueError: if any check fails; the message names the failed check.
    """
    max_length = 10000
    if len(text) > max_length:
        raise ValueError("输入内容过长")

    # Report the first blocked keyword found, in list order.
    blocked_words = ("暴力", "色情", "违法")
    hit = next((word for word in blocked_words if word in text), None)
    if hit is not None:
        raise ValueError(f"输入包含敏感内容: {hit}")

    # Reject empty or whitespace-only input.
    if not text.strip():
        raise ValueError("输入内容为空")

    return text

# Usage example: validate first, and only call the API with input that passed.
try:
    validated_input = validate_input("你好!")
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": validated_input}]
    )
except ValueError as e:
    print(f"输入验证失败: {e}")

3. 输出过滤

实现方式:

python
def filter_output(content):
    """Mask sensitive-looking data in model output.

    Replaces 16-digit card numbers (groups of four separated by ``-``, a
    space, or nothing), 11-digit phone numbers and e-mail addresses with
    the placeholder ``[已过滤]``.

    Args:
        content: text to filter.

    Returns:
        The filtered text.
    """
    import re

    sensitive_patterns = [
        # Card number. Digit lookarounds are used instead of \b because \b
        # never matches between a CJK character and a digit (both count as
        # word characters), so numbers embedded in Chinese text were missed.
        r'(?<!\d)\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}(?!\d)',
        # Phone number: exactly 11 digits, not part of a longer digit run.
        r'(?<!\d)\d{11}(?!\d)',
        # E-mail address.
        r'\b[\w\.-]+@[\w\.-]+\.\w+\b',
    ]

    for pattern in sensitive_patterns:
        content = re.sub(pattern, '[已过滤]', content)

    return content

# Usage example: filter the model's reply before printing it.
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "你好!"}]
)

filtered_content = filter_output(response.choices[0].message.content)
print(filtered_content)

监控和日志

1. 请求日志

实现方式:

python
import logging
from datetime import datetime

# Configure logging: INFO level, written to the file api_requests.log
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='api_requests.log'
)

# Logger used by log_request and log_error below
logger = logging.getLogger("api_client")

def log_request(response, latency):
    """Log a successful request at INFO level.

    The record holds the current timestamp, the response's model name and
    token counts, ``latency`` under ``latency_ms``, and status ``success``.
    """
    entry = {
        "timestamp": datetime.now().isoformat(),
        "model": response.model,
    }
    usage = response.usage
    entry["prompt_tokens"] = usage.prompt_tokens
    entry["completion_tokens"] = usage.completion_tokens
    entry["total_tokens"] = usage.total_tokens
    entry["latency_ms"] = latency
    entry["status"] = "success"
    logger.info(entry)

def log_error(error):
    """Log a failed request at ERROR level.

    The record holds the current timestamp, the exception's class name and
    message, and status ``failed``.
    """
    entry = dict(
        timestamp=datetime.now().isoformat(),
        error_type=type(error).__name__,
        error_message=str(error),
        status="failed",
    )
    logger.error(entry)

# Usage example: time one request and log either its metrics or its error.
start_time = time.time()
try:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "你好!"}]
    )
    # Elapsed wall-clock time, converted from seconds to milliseconds
    latency = (time.time() - start_time) * 1000
    log_request(response, latency)
except Exception as e:
    log_error(e)

2. 性能监控

实现方式:

python
class PerformanceMonitor:
    """Collects request counts and latency statistics.

    Percentiles (p50/p95/p99) stay at 0 until more than 10 latencies have
    been recorded; after that they are recomputed on every ``record`` call.
    """

    def __init__(self):
        metric_names = (
            "total_requests",
            "successful_requests",
            "failed_requests",
            "total_latency",
            "avg_latency",
            "p50_latency",
            "p95_latency",
            "p99_latency",
        )
        self.metrics = dict.fromkeys(metric_names, 0)
        self.latencies = []

    def record(self, latency, success=True):
        """Record one request's ``latency`` and whether it succeeded."""
        stats = self.metrics
        stats["total_requests"] += 1
        outcome_key = "successful_requests" if success else "failed_requests"
        stats[outcome_key] += 1

        self.latencies.append(latency)
        stats["total_latency"] += latency

        # Running mean over every recorded request.
        stats["avg_latency"] = stats["total_latency"] / stats["total_requests"]

        sample_count = len(self.latencies)
        if sample_count <= 10:
            return

        # Percentiles are read from the sorted samples by index.
        ordered = sorted(self.latencies)
        stats["p50_latency"] = ordered[sample_count // 2]
        for key, fraction in (("p95_latency", 0.95), ("p99_latency", 0.99)):
            stats[key] = ordered[int(sample_count * fraction)]

    def get_metrics(self):
        """Return the metrics dict (the live object, not a copy)."""
        return self.metrics

# Usage example: issue 100 requests and record each one's latency and outcome.
monitor = PerformanceMonitor()

for i in range(100):
    start_time = time.time()
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"测试 {i}"}]
        )
        # Elapsed time in milliseconds
        latency = (time.time() - start_time) * 1000
        monitor.record(latency, success=True)
    except Exception as e:
        # Failed requests are recorded too, with the time spent until failure
        latency = (time.time() - start_time) * 1000
        monitor.record(latency, success=False)

print(monitor.get_metrics())

测试和验证

1. 单元测试

实现方式:

python
import unittest
from unittest.mock import Mock, patch

class TestAPIClient(unittest.TestCase):
    """Unit tests for the API client helpers (no network access)."""

    def test_chat_completion(self):
        """A chat completion returns the content supplied by the mocked client."""
        # Fake response object exposing choices[0].message.content.
        mock_response = Mock()
        mock_response.choices = [Mock()]
        mock_response.choices[0].message.content = "测试响应"

        mock_client = Mock()
        mock_client.chat.completions.create.return_value = mock_response
        mock_openai = Mock(return_value=mock_client)

        # Patch the name this module actually looks up. Patching
        # 'openai.OpenAI' has no effect on a name already bound here by
        # `from openai import OpenAI`.
        with patch.dict(globals(), {"OpenAI": mock_openai}):
            client = OpenAI(api_key="test", base_url="https://api.nextapi.pro/v1")
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": "测试"}]
            )

        mock_openai.assert_called_once_with(
            api_key="test", base_url="https://api.nextapi.pro/v1"
        )
        self.assertEqual(response.choices[0].message.content, "测试响应")

    def test_error_handling(self):
        """chat_with_retry retries on rate limiting, then re-raises."""
        # RateLimitError needs `response` and `body` keyword arguments; a
        # Mock stands in for the HTTP response (assumes the openai v1
        # exception signature -- verify against the installed SDK).
        rate_limit_error = RateLimitError(
            "限流",
            response=Mock(status_code=429, headers={}),
            body=None,
        )
        mock_client = Mock()
        mock_client.chat.completions.create.side_effect = rate_limit_error

        # Swap in the mocked module-level client that chat_with_retry uses,
        # and stub time.sleep so the backoff does not slow the test down.
        with patch.dict(globals(), {"client": mock_client}), \
                patch("time.sleep") as mock_sleep:
            with self.assertRaises(RateLimitError):
                chat_with_retry([{"role": "user", "content": "测试"}])

        # Default max_retries=3: three attempts with two backoff sleeps.
        self.assertEqual(mock_client.chat.completions.create.call_count, 3)
        self.assertEqual(mock_sleep.call_count, 2)

if __name__ == '__main__':
    unittest.main()

2. 压力测试

实现方式:

python
import concurrent.futures
import time

def stress_test(num_requests=100, num_threads=10):
    """Fire ``num_requests`` chat requests from ``num_threads`` worker threads.

    Prints the success and failure counts and, if any request succeeded, the
    average latency of the successful requests in milliseconds.
    """
    def make_request():
        # One timed request; returns a result dict instead of raising.
        started = time.time()
        try:
            client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": "压力测试"}]
            )
        except Exception as exc:
            elapsed_ms = (time.time() - started) * 1000
            return {"success": False, "latency": elapsed_ms, "error": str(exc)}
        elapsed_ms = (time.time() - started) * 1000
        return {"success": True, "latency": elapsed_ms}

    # Submit every request to the pool and collect results as they finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as pool:
        pending = [pool.submit(make_request) for _ in range(num_requests)]
        outcomes = [
            done.result() for done in concurrent.futures.as_completed(pending)
        ]

    # Split the outcomes and report.
    ok_latencies = [item["latency"] for item in outcomes if item["success"]]
    failure_count = len(outcomes) - len(ok_latencies)

    print(f"成功: {len(ok_latencies)}, 失败: {failure_count}")
    if ok_latencies:
        mean_latency = sum(ok_latencies) / len(ok_latencies)
        print(f"平均延迟: {mean_latency:.2f}ms")

# Usage example: 50 requests spread over 5 worker threads.
stress_test(num_requests=50, num_threads=5)

下一步