37 KiB
37 KiB
测试方案设计
版本:v1.0 日期:2026-03-18 依据:testing skill 最佳实践
1. 测试策略概述
1.1 测试金字塔
╱╲
╱ ╲ E2E Tests (10%)
╱ ╲
╱──────╲
╱ ╲ Integration Tests (20%)
╱──────────╲
╱ ╲
╱────────────╲ Unit Tests (70%)
╱ ╲
╱────────────────╲
1.2 测试目标
| 指标 | 目标 | 说明 |
|---|---|---|
| 代码覆盖率 | >= 80% | 核心业务 |
| 单元测试通过率 | 100% | 必须通过 |
| 集成测试通过率 | 100% | 必须通过 |
| E2E测试通过率 | 95% | 允许5% flaky |
| 构建门禁 | 100% | CI必须通过 |
2. 单元测试
2.1 测试框架
# pytest.ini
[pytest]
testpaths = tests/unit
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--strict-markers
--tb=short
--cov=llm_gateway
--cov-report=term-missing
--cov-report=html
markers =
unit: Unit tests
integration: Integration tests
e2e: End-to-end tests
slow: Slow running tests
2.2 单元测试示例
# tests/unit/service/test_billing.py
import pytest
from decimal import Decimal
from unittest.mock import Mock, patch
from llm_gateway.service.billing import BillingService
from llm_gateway.service.repository import BillingRepository
from llm_gateway.service.balance import BalanceManager
class TestBillingService:
"""计费服务单元测试"""
@pytest.fixture
def billing_service(self):
"""Fixture: 计费服务实例"""
repo = Mock(spec=BillingRepository)
balance_mgr = Mock(spec=BalanceManager)
return BillingService(repo, balance_mgr)
def test_estimate_cost_gpt4(self, billing_service):
"""测试GPT-4成本估算"""
# Arrange
request = Mock()
request.Model = "gpt-4"
request.Messages.Tokens.return_value = 1000
request.Options.MaxTokens = 1000
# Act
cost = billing_service.EstimateCost(request)
# Assert
assert cost.Amount > 0
assert cost.Currency == "USD"
def test_estimate_cost_gpt35(self, billing_service):
"""测试GPT-3.5成本估算"""
# Arrange
request = Mock()
request.Model = "gpt-3.5-turbo"
request.Messages.Tokens.return_value = 1000
request.Options.MaxTokens = 1000
# Act
cost = billing_service.EstimateCost(request)
# Assert
# GPT-3.5应该比GPT-4便宜
gpt4_cost = billing_service.EstimateCost(self._create_request("gpt-4"))
assert cost.Amount < gpt4_cost.Amount
@pytest.mark.parametrize("model,expected_tokens", [
("gpt-4", 100),
("gpt-3.5-turbo", 50),
("claude-3-opus", 150),
])
def test_estimate_cost_models(self, billing_service, model, expected_tokens):
"""参数化测试:不同模型成本估算"""
request = self._create_request(model)
cost = billing_service.EstimateCost(request)
assert cost.Amount > 0
def _create_request(self, model):
"""创建测试请求"""
request = Mock()
request.Model = model
request.Messages.Tokens.return_value = 1000
request.Options.MaxTokens = 1000
return request
def test_insufficient_balance(self, billing_service):
"""测试余额不足场景"""
# Arrange
billing_service.balance_mgr.Reserve.return_value = None
billing_service.balance_mgr.ErrInsufficientBalance = Exception()
request = self._create_request("gpt-4")
# Act & Assert
with pytest.raises(Exception) as exc_info:
billing_service.ProcessRequest(request)
assert "insufficient" in str(exc_info.value).lower()
def test_process_request_success(self, billing_service):
"""测试成功处理请求"""
# Arrange
billing_service.balanceMgr.Reserve.return_value = Mock(Amount=Decimal("0.10"))
billing_service.balanceMgr.Charge.return_value = None
billing_service.repo.Create.return_value = None
request = self._create_request("gpt-4")
request.Response = Mock()
request.Response.Usage.PromptTokens = 500
request.Response.Usage.CompletionTokens = 500
request.ID = "req-123"
# Act
record = billing_service.ProcessRequest(request)
# Assert
assert record is not None
assert record.UserID == request.UserID
billing_service.balanceMgr.Reserve.assert_called_once()
billing_service.repo.Create.assert_called_once()
2.3 Router服务测试
# tests/unit/service/test_router.py
import pytest
from unittest.mock import Mock, AsyncMock
from llm_gateway.service.router import RouterService
from llm_gateway.internal.adapter import Registry, Provider
class TestRouterService:
"""路由服务单元测试"""
@pytest.fixture
def mock_provider(self):
"""Mock供应商"""
provider = Mock(spec=Provider)
provider.Name.return_value = "openai"
provider.HealthCheck.return_value = None
provider.Call.return_value = Mock(
id="resp-123",
choices=[Mock(delta=Mock(content="Hello"))],
usage=Mock(prompt_tokens=10, completion_tokens=5)
)
return provider
@pytest.fixture
def router_service(self, mock_provider):
"""Fixture: 路由服务实例"""
registry = Mock(spec=Registry)
registry.GetAvailableProviders.return_value = [mock_provider]
registry.Get.return_value = mock_provider
return RouterService(registry)
@pytest.mark.asyncio
async def test_route_success(self, router_service, mock_provider):
"""测试成功路由"""
# Arrange
request = Mock()
request.Model = "gpt-4"
request.UserID = 1
request.TenantID = 1
request.Messages = []
# Act
response = await router_service.Route(request)
# Assert
assert response is not None
mock_provider.Call.assert_called_once()
@pytest.mark.asyncio
async def test_route_no_provider(self, router_service):
"""测试无可用供应商"""
# Arrange
router_service.adapterRegistry.GetAvailableProviders.return_value = []
request = Mock()
request.Model = "gpt-4"
# Act & Assert
with pytest.raises(Exception) as exc_info:
await router_service.Route(request)
assert "no provider" in str(exc_info.value).lower()
@pytest.mark.asyncio
async def test_route_fallback_on_error(self, router_service, mock_provider):
"""测试失败时降级"""
# Arrange
mock_provider.Call.side_effect = [Exception("API Error"), Mock()]
request = Mock()
request.Model = "gpt-4"
# Act
response = await router_service.Route(request)
# Assert
assert response is not None
assert mock_provider.Call.call_count == 2 # 重试一次
3. 集成测试
3.1 测试夹具
# tests/conftest.py
import pytest
import asyncio
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from httpx import AsyncClient
from llm_gateway.main import app
from llm_gateway.database import Base
@pytest.fixture(scope="session")
def event_loop():
"""创建事件循环"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="function")
def db_engine():
"""测试数据库引擎"""
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
yield engine
Base.metadata.drop_all(engine)
@pytest.fixture(scope="function")
def db_session(db_engine):
"""测试数据库会话"""
Session = sessionmaker(bind=db_engine)
session = Session()
yield session
session.close()
@pytest.fixture(scope="function")
async def client():
"""测试客户端"""
async with AsyncClient(app=app, base_url="http://test") as ac:
yield ac
@pytest.fixture
def test_user(db_session):
"""创建测试用户"""
user = User(
email="test@example.com",
password_hash="hashed_password",
name="Test User"
)
db_session.add(user)
db_session.commit()
return user
3.2 API集成测试
# tests/integration/api/test_chat.py
import pytest
from httpx import AsyncClient
class TestChatAPI:
"""聊天API集成测试"""
@pytest.mark.asyncio
async def test_chat_completions_success(self, client: AsyncClient, test_user):
"""测试成功创建聊天完成"""
# Arrange
token = await self._get_token(client, test_user)
# Act
response = await client.post(
"/v1/chat/completions",
json={
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hello"}
]
},
headers={"Authorization": f"Bearer {token}"}
)
# Assert
assert response.status_code == 200
data = response.json()
assert "choices" in data
assert len(data["choices"]) > 0
@pytest.mark.asyncio
async def test_chat_completions_unauthorized(self, client: AsyncClient):
"""测试未授权访问"""
# Act
response = await client.post(
"/v1/chat/completions",
json={
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hello"}
]
}
)
# Assert
assert response.status_code == 401
@pytest.mark.asyncio
async def test_chat_completions_invalid_model(self, client: AsyncClient, test_user):
"""测试无效模型"""
# Arrange
token = await self._get_token(client, test_user)
# Act
response = await client.post(
"/v1/chat/completions",
json={
"model": "invalid-model",
"messages": [
{"role": "user", "content": "Hello"}
]
},
headers={"Authorization": f"Bearer {token}"}
)
# Assert
assert response.status_code == 400
assert "model" in response.json()["error"]["code"].lower()
async def _get_token(self, client, user):
"""获取测试令牌"""
response = await client.post(
"/v1/auth/token",
json={
"email": user.email,
"password": "test_password"
}
)
return response.json()["access_token"]
4. 契约测试
4.1 Provider契约测试
# tests/contract/test_provider_adapter.py
import pytest
from llm_gateway.internal.adapter import ProviderAdapter
from llm_gateway.service.adapter import OpenAIAdapter
class TestProviderContract:
"""供应商适配器契约测试"""
@pytest.fixture
def adapter(self):
"""适配器实例"""
return OpenAIAdapter(api_key="test-key")
@pytest.mark.asyncio
async def test_response_structure(self, adapter):
"""测试响应结构符合契约"""
# Act
response = await adapter.chat_completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello"}]
)
# Assert - 验证必需字段
assert hasattr(response, 'id')
assert hasattr(response, 'model')
assert hasattr(response, 'choices')
assert hasattr(response, 'usage')
assert response.usage.prompt_tokens >= 0
assert response.usage.completion_tokens >= 0
assert response.usage.total_tokens >= 0
@pytest.mark.asyncio
async def test_error_mapping(self, adapter):
"""测试错误码映射"""
# 测试各种错误情况
test_cases = [
(Exception("invalid_api_key"), "INVALID_KEY"),
(Exception("rate_limit_exceeded"), "RATE_LIMIT"),
(Exception("insufficient_quota"), "INSUFFICIENT_QUOTA"),
]
for original_error, expected_code in test_cases:
result = adapter.map_error(original_error)
assert result.code == expected_code
@pytest.mark.asyncio
async def test_streaming(self, adapter):
"""测试流式响应"""
# Act
response = await adapter.chat_completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Count to 5"}],
stream=True
)
# Assert
chunks = []
async for chunk in response.stream():
chunks.append(chunk)
if len(chunks) >= 5:
break
assert len(chunks) > 0
assert all(hasattr(c, 'delta') for c in chunks)
4.2 契约漂移检测
# .github/workflows/contract-test.yml
name: Contract Tests
on:
pull_request:
branches: [main]
push:
branches: [main]
jobs:
contract-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install pytest pytest-asyncio pact
- name: Run contract tests
run: |
pytest tests/contract/ -v --contract=true
- name: Publish contract
if: github.ref == 'refs/heads/main'
run: |
pact-broker publish \
pactDir=./pacts \
brokerUrl=${{ secrets.PACT_BROKER_URL }} \
brokerToken=${{ secrets.PACT_BROKER_TOKEN }}
5. E2E测试
5.1 Playwright E2E测试
# tests/e2e/test_user_journey.py
import pytest
from playwright.async_api import async_playwright
class TestUserJourney:
"""用户旅程E2E测试"""
@pytest.fixture
async def browser_context(self):
"""浏览器上下文"""
async with async_playwright() as p:
browser = await p.chromium.launch()
context = await browser.new_context()
yield context
await context.close()
await browser.close()
@pytest.mark.asyncio
async def test_complete_user_flow(self, browser_context):
"""测试完整用户流程"""
page = await browser_context.new_page()
# 1. 注册
await page.goto("https://app.lgateway.com/register")
await page.fill("[name=email]", "user@example.com")
await page.fill("[name=password]", "SecurePassword123!")
await page.click("button[type=submit]")
await page.wait_for_selector(".dashboard")
# 2. 创建API Key
await page.click("text=API Keys")
await page.click("text=Create Key")
await page.fill("[name=description]", "Test Key")
await page.click("button:has-text('Create')")
api_key = await page.text_content(".api-key")
# 3. 测试API调用
response = await self._call_api(api_key, {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}]
})
assert response.status == 200
# 4. 查看使用量
await page.click("text=Usage")
await page.wait_for_selector(".usage-chart")
# 5. 检查账单
await page.click("text=Billing")
await page.wait_for_selector(".balance")
async def _call_api(self, api_key, payload):
"""调用API"""
import httpx
async with httpx.AsyncClient() as client:
return await client.post(
"https://api.lgateway.com/v1/chat/completions",
json=payload,
headers={"Authorization": f"Bearer {api_key}"}
)
6. 性能测试
6.1 负载测试
# tests/performance/test_load.py
import pytest
import asyncio
import time
from locust import HttpUser, task, between
class LLMGatewayUser(HttpUser):
"""Locust负载测试用户"""
wait_time = between(0.5, 2)
def on_start(self):
"""初始化"""
response = self.client.post("/v1/auth/token", json={
"email": "test@example.com",
"password": "password"
})
self.token = response.json()["access_token"]
@task(10)
def chat_completion(self):
"""聊天完成请求"""
self.client.post(
"/v1/chat/completions",
json={
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}]
},
headers={"Authorization": f"Bearer {self.token}"}
)
@task(1)
def list_models(self):
"""列出模型"""
self.client.get(
"/v1/models",
headers={"Authorization": f"Bearer {self.token}"}
)
6.2 性能基准
# k6/performance.js
import http from 'k6/http';
import { check, sleep } from 'k6';
export const options = {
stages: [
{ duration: '2m', target: 100 }, // 2分钟内增加到100用户
{ duration: '5m', target: 100 }, // 保持100用户5分钟
{ duration: '2m', target: 200 }, // 增加到200用户
{ duration: '5m', target: 200 }, // 保持200用户5分钟
{ duration: '2m', target: 0 }, // 降到0
],
thresholds: {
http_req_duration: ['p(95)<500'], // P95 < 500ms
http_req_failed: ['rate<0.01'], // 失败率 < 1%
},
};
export default function () {
const payload = JSON.stringify({
model: 'gpt-3.5-turbo',
messages: [{ role: 'user', content: 'Hello' }]
});
const params = {
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${__ENV.API_KEY}`,
},
};
const res = http.post('https://api.lgateway.com/v1/chat/completions', payload, params);
check(res, { 'status was 200': (r) => r.status === 200 });
sleep(1);
}
7. 测试覆盖率目标
7.1 覆盖率矩阵
| 模块 | 目标覆盖率 | 关键测试 |
|---|---|---|
| Router Service | 90% | 路由选择、fallback |
| Billing Service | 85% | 计费、扣款、退款 |
| Auth Service | 80% | 认证、授权 |
| Adapter | 85% | 供应商调用、错误处理 |
| Middleware | 75% | 限流、日志 |
| API Handlers | 70% | 请求验证、响应格式化 |
8. CI/CD集成
8.1 GitHub Actions
# .github/workflows/test.yml
name: Test Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
unit-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- run: pip install -r requirements-test.txt
- run: pytest tests/unit/ -v --cov
integration-test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: test
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- run: pip install -r requirements-test.txt
- run: pytest tests/integration/ -v
contract-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: pytest tests/contract/ -v --contract
e2e-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
with:
node-version: '18'
- run: npm install
- run: npx playwright install
- run: npx playwright test
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: snyk/actions/python@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
9. 混沌工程测试
9.1 故障注入策略
# tests/chaos/test_fault_injection.py
import pytest
from chaos.engine import ChaosEngine
class TestChaosEngineering:
"""混沌工程测试 - 验证系统韧性"""
@pytest.fixture
def chaos(self):
"""混沌引擎"""
return ChaosEngine()
@pytest.mark.asyncio
async def test_provider_timeout_handling(self, chaos):
"""测试供应商超时处理"""
# 注入:供应商响应超时
await chaos.inject_latency(
target="provider:openai",
delay=30 # 30秒延迟
)
# 验证:系统触发降级
response = await router.route(request)
assert response.fallback_triggered
assert response.fallback_provider == "anthropic"
@pytest.mark.asyncio
async def test_circuit_breaker_open(self, chaos):
"""测试断路器打开"""
# 注入:连续失败
await chaos.inject_errors(
target="provider:azure",
count=10,
error_type="connection"
)
# 验证:断路器打开
cb_state = await chaos.get_circuit_state("azure")
assert cb_state == "OPEN"
@pytest.mark.asyncio
async def test_network_partition(self, chaos):
"""测试网络分区"""
# 注入:网络分区
await chaos.network_partition(
source="gateway",
target="billing",
drop_packets=0.5
)
# 验证:异步计费
billing = await router.route(request)
assert billing.async_processed
@pytest.mark.asyncio
async def test_database_failure(self, chaos):
"""测试数据库故障"""
# 注入:主库故障
await chaos.failover_database(
from_primary=True
)
# 验证:自动切换到从库
db_state = await get_database_state()
assert db_state.active == "replica"
assert db_state.data_consistent
9.2 韧性验证场景
| 场景 | 注入故障 | 预期行为 |
|---|---|---|
| 单Provider宕机 | kill provider进程 | 自动切换到备选Provider |
| Redis不可用 | 网络隔离 | 降级到本地限流 |
| 数据库故障 | 主库不可用 | 自动切换从库,写入延迟处理 |
| 流量突增 | 10倍QPS | 限流生效,无雪崩 |
| 依赖服务超时 | 注入超时 | 快速失败,不阻塞 |
10. 安全测试
10.1 OWASP Top 10 防护测试
# tests/security/test_owasp.py
import pytest
from security.scanner import VulnerabilityScanner
class TestSecurityVulnerabilities:
"""安全漏洞测试"""
def test_sql_injection_prevention(self):
"""测试SQL注入防护"""
# 恶意输入
malicious_inputs = [
"' OR '1'='1",
"'; DROP TABLE users;--",
"1' UNION SELECT * FROM passwords--"
]
for payload in malicious_inputs:
response = api.get(f"/users?name={payload}")
assert response.status_code == 400
assert "injection" not in response.text.lower()
def test_api_key_exposure(self):
"""测试API Key泄露检测"""
# 模拟响应包含敏感信息
response = api.get("/v1/models")
assert api_key not in response.text
assert not any(k in response.headers for k in ['X-API-Key', 'Authorization'])
def test_rate_limiting_bypass(self):
"""测试限流绕过防护"""
# 尝试绕过限流
for i in range(150):
response = api.post("/v1/chat/completions", data)
if i >= 100:
assert response.status_code == 429
def test_privilege_escalation(self):
"""测试权限提升防护"""
# 普通用户尝试访问管理员API
response = api_admin.delete("/admin/users/1")
assert response.status_code == 403
def test_cors_misconfiguration(self):
"""测试CORS配置"""
response = api.options("/api/v1/")
assert "Access-Control-Allow-Origin" in response.headers
# 验证不允许任意Origin
assert response.headers.get("Access-Control-Allow-Origin") != "*"
10.2 密钥轮换测试
# tests/security/test_key_rotation.py
class TestKeyRotation:
"""密钥轮换测试"""
def test_automatic_key_rotation(self):
"""测试自动密钥轮换"""
# 1. 触发轮换
rotation_service.trigger_rotation()
# 2. 验证新密钥生效
new_key = key_manager.get_active_key()
assert new_key.version > old_key.version
assert new_key.is_active
# 3. 验证旧密钥过期
assert not old_key.is_active
# 验证有过渡期
assert old_key.expires_at > now
def test_key_rotation_graceful(self):
"""测试轮换期间服务不中断"""
# 模拟轮换期间的请求
requests = [api_request() for _ in range(100)]
results = parallel_execute(requests)
# 验证所有请求成功(使用旧密钥或新密钥)
assert all(r.success for r in results)
10.3 日志脱敏测试
# tests/security/test_log_redaction.py
class TestLogRedaction:
"""日志脱敏测试"""
def test_sensitive_data_redaction(self):
"""测试敏感数据脱敏"""
# 记录包含敏感信息的日志
logger.info(f"User {user_id} payment: {credit_card}")
# 验证日志已脱敏
log_entry = get_latest_log()
assert credit_card not in log_entry.message
assert "****" in log_entry.message # 脱敏后格式
assert "4" in log_entry.message # 保留后4位
def test_pii_detection(self):
"""测试PII检测"""
pii_data = [
"13812345678", # 手机号
"user@example.com", # 邮箱
"610102199001011234", # 身份证
]
for pii in pii_data:
logger.info(f"User data: {pii}")
log = get_latest_log()
assert pii not in log.message
11. 可观测性测试
11.1 指标验证测试
# tests/observability/test_metrics.py
class TestMetricsEmission:
"""指标发射测试"""
def test_request_latency_histogram(self):
"""测试请求延迟直方图"""
# 发送请求
response = api.post("/v1/chat/completions", request_data)
# 验证指标
metrics = prometheus.get_metrics("http_request_duration_seconds")
assert metrics.labels["method"] == "POST"
assert metrics.labels["status"] == "200"
assert metrics.value > 0
def test_billing_amount_gauge(self):
"""测试计费金额仪表"""
# 执行计费
billing.charge(user_id, amount)
# 验证指标
metrics = prometheus.get_metrics("billing_charged_amount")
assert metrics.labels["currency"] == "USD"
assert metrics.value == amount
def test_provider_failure_counter(self):
"""测试供应商失败计数"""
# 触发失败
for _ in range(5):
try:
provider.call(request)
except Exception:
pass
# 验证计数器
counter = prometheus.get_metrics("provider_calls_total")
assert counter.labels["status"] == "error"
assert counter.value >= 5
11.2 链路追踪验证
# tests/observability/test_tracing.py
class TestDistributedTracing:
"""分布式追踪测试"""
def test_trace_context_propagation(self):
"""测试Trace上下文传播"""
# 发起请求
response = api.post("/v1/chat/completions", request)
# 验证TraceID
trace_id = response.headers["X-Trace-ID"]
spans = jaeger.get_spans(trace_id)
# 验证链路完整
assert len(spans) >= 4 # gateway -> router -> adapter -> provider
assert all(s.parent_id in [s.id for s in spans] for s in spans)
def test_span_attributes(self):
"""测试Span属性完整"""
spans = jaeger.get_spans(trace_id)
for span in spans:
assert span.name
assert span.service_name
assert span.start_time
assert span.duration > 0
# 验证关键属性
if span.name == "provider.call":
assert span.attributes["provider"]
assert span.attributes["model"]
11.3 告警触发验证
# tests/observability/test_alerts.py
class TestAlerting:
"""告警测试"""
def test_high_latency_alert(self):
"""测试高延迟告警"""
# 注入高延迟
for _ in range(10):
await provider.call(delay=5)
# 验证告警
alert = alert_manager.get_latest_alert()
assert alert.name == "HighLatencyP99"
assert alert.severity == "P1"
def test_low_balance_alert(self):
"""测试低余额告警"""
# 设置低余额
balance.set_balance(user_id, 10)
# 触发检查
await balance.check_threshold()
# 验证告警
alert = alert_manager.get_latest_alert()
assert alert.name == "LowBalance"
assert user_id in alert.targets
---
## 12. 测试数据管理
### 12.1 测试数据工厂
```python
# tests/fixtures/factories.py
import factory
from datetime import datetime
class UserFactory(factory.Factory):
"""用户测试数据工厂"""
class Meta:
model = dict
user_id = factory.Sequence(lambda n: 10000 + n)
email = factory.LazyAttribute(lambda o: f"user{o.user_id}@test.com")
name = factory.Faker("name")
tier = "growth"
balance = factory.Faker("pydecimal", left_digits=5, right_digits=2)
created_at = factory.LazyFunction(datetime.now)
class APIKeyFactory(factory.Factory):
"""API Key测试数据工厂"""
class Meta:
model = dict
key_id = factory.Sequence(lambda n: f"sk-test-{n:08d}")
user_id = factory.SubFactory(UserFactory)
name = "Test Key"
quota = 10000
rate_limit = 1000
is_active = True
created_at = factory.LazyFunction(datetime.now)
class ProviderFactory(factory.Factory):
"""Provider测试数据工厂"""
class Meta:
model = dict
provider_id = factory.Sequence(lambda n: n)
name = factory.Iterator(["openai", "anthropic", "azure", "google"])
api_base = "https://api.example.com"
latency_p99 = factory.Faker("pyint", min_value=50, max_value=500)
availability = factory.Faker("pyfloat", min_value=0.95, max_value=1.0)
cost_per_1k = factory.Faker("pyfloat", min_value=0.5, max_value=10.0)
12.2 测试数据隔离
# tests/conftest.py
import pytest
from tests.fixtures.database import TestDatabase
@pytest.fixture(scope="session")
def test_db():
"""测试数据库会话级fixture"""
db = TestDatabase()
db.init(schema="tests/fixtures/schema.sql")
yield db
db.cleanup()
@pytest.fixture
def clean_user(test_db):
"""每个测试前清理用户数据"""
test_db.execute("DELETE FROM users WHERE email LIKE '%@test.com'")
yield
test_db.execute("DELETE FROM users WHERE email LIKE '%@test.com'")
@pytest.fixture
def isolated_balance(test_db):
"""隔离的余额测试"""
# 每个测试使用独立账户
account_id = test_db.create_test_account()
test_db.set_balance(account_id, 10000)
yield account_id
test_db.cleanup_account(account_id)
12.3 测试数据版本管理
# tests/data/version.yaml
# 测试数据版本管理
version: "1.0"
datasets:
user_tier_free:
count: 100
balance_range: [0, 100]
tier: free
user_tier_growth:
count: 50
balance_range: [100, 10000]
tier: growth
user_tier_enterprise:
count: 10
balance_range: [10000, 100000]
tier: enterprise
provider_active:
- name: openai
models: [gpt-4, gpt-3.5-turbo]
status: active
- name: anthropic
models: [claude-3-opus, claude-3-sonnet]
status: active
13. 部署验证测试
13.1 环境一致性验证
# tests/deployment/test_environment.py
class TestEnvironmentConsistency:
"""环境一致性验证"""
def test_environment_variables(self):
"""验证环境变量配置"""
required_vars = [
"DATABASE_URL",
"REDIS_URL",
"KAFKA_BROKERS",
"LOG_LEVEL",
]
for var in required_vars:
assert os.environ.get(var), f"Missing env var: {var}"
def test_database_schema_version(self):
"""验证数据库schema版本"""
# 获取当前版本
current_version = db.get_schema_version()
# 获取期望版本
expected_version = get_code_schema_version()
assert current_version == expected_version, \
f"Schema mismatch: db={current_version}, code={expected_version}"
def test_dependencies_installed(self):
"""验证依赖包版本"""
import pkg_resources
requirements = open("requirements.txt").read()
for req in pkg_resources.parse_requirements(requirements):
try:
installed = pkg_resources.get_distribution(req.project_name)
assert str(installed.version) in str(req.specifier)
except Exception as e:
pytest.fail(f"Dependency issue: {req}, error: {e}")
13.2 健康检查验证
# tests/deployment/test_health.py
class TestHealthChecks:
"""健康检查验证"""
def test_gateway_health(self):
"""测试网关健康"""
response = requests.get("http://localhost:8080/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
assert "version" in data
def test_service_dependencies(self):
"""测试服务依赖"""
response = requests.get("http://localhost:8080/health/ready")
data = response.json()
# 验证所有依赖健康
assert data["dependencies"]["database"]["status"] == "up"
assert data["dependencies"]["redis"]["status"] == "up"
assert data["dependencies"]["kafka"]["status"] == "up"
def test_startup_probe(self):
"""测试启动探针"""
# 模拟服务启动
start_time = time.time()
while time.time() - start_time < 30:
try:
response = requests.get("http://localhost:8080/health")
if response.status_code == 200:
break
except Exception:
pass
time.sleep(1)
# 验证30秒内启动完成
assert time.time() - start_time < 30
13.3 配置验证
# tests/deployment/test_config.py
class TestConfigurationValidation:
"""配置验证测试"""
def test_secret_rotation_config(self):
"""验证密钥轮换配置"""
config = get_config()
assert config.rotation_enabled is True
assert config.rotation_interval_days == 90
assert config.grace_period_hours == 24
def test_rate_limit_config(self):
"""验证限流配置"""
config = get_config()
assert config.rate_limit.global_limit == 100000
assert config.rate_limit.tenant_limit == 10000
assert config.rate_limit.apikey_limit == 1000
def test_circuit_breaker_config(self):
"""验证断路器配置"""
config = get_config()
assert config.circuit_breaker.failure_threshold == 5
assert config.circuit_breaker.timeout_seconds == 60
assert config.circuit_breaker.half_open_max_calls == 3
13.4 金丝雀部署验证
# tests/deployment/test_canary.py
class TestCanaryDeployment:
"""金丝雀部署验证"""
def test_canary_routing(self):
"""测试金丝雀路由"""
# 发送流量到新版本
for i in range(100):
response = api.post("/v1/chat/completions", request)
# 验证10%流量到新版本
metrics = get_canary_metrics()
assert 0.05 < metrics.canary_percentage < 0.15
def test_canary_error_rate(self):
"""测试金丝雀错误率"""
errors = get_canary_errors()
assert errors.new_version_error_rate < 0.01
assert errors.new_version_error_rate < errors.old_version_error_rate * 2
def test_rollback_on_failure(self):
"""测试失败自动回滚"""
# 注入失败
inject_failure("canary", error_rate=0.5)
# 等待检测和回滚
time.sleep(60)
# 验证已回滚
version = get_current_version()
assert version == "stable"
### 9.1 与技术架构一致性
| 测试项 | 对应模块 | 验证点 |
|--------|----------|--------|
| Provider Adapter测试 | `technical_architecture.md` | 契约符合 |
| 路由策略测试 | `technical_architecture.md` | 选择算法 |
| 计费精度测试 | `business_solution_v1.md` | Decimal精度 |
| 限流测试 | `p1_optimization_solution_v1.md` | 多维度 |
| 风控测试 | `security_solution_v1.md` | 规则执行 |
---
**文档状态**:测试方案设计
**关联文档**:
- `technical_architecture_design_v1_2026-03-18.md`
- `architecture_solution_v1_2026-03-18.md`
- `security_solution_v1_2026-03-18.md`