Files
user-system/docs/SECURITY.md

920 lines
21 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 安全设计文档
## 概述
本文档描述用户管理系统的安全设计,包括数据加密、防攻击策略、合规性要求等。安全是系统的核心考量,所有设计均符合企业级安全标准。
## 安全架构
### 安全层次
```
┌─────────────────────────────────────────┐
│ 应用层安全 (Application) │
│ • 输入验证 • 输出编码 • 业务安全 │
├─────────────────────────────────────────┤
│ 服务层安全 (Service) │
│ • 认证授权 • 权限控制 • 数据过滤 │
├─────────────────────────────────────────┤
│ 网络层安全 (Network) │
│ • HTTPS/TLS • 防火墙 • 网络隔离 │
├─────────────────────────────────────────┤
│ 数据层安全 (Data) │
│ • 数据加密 • 访问控制 • 审计日志 │
└─────────────────────────────────────────┘
```
---
## 1. 数据加密
### 1.1 密码加密
#### 加密算法选择
| 算法 | 推荐度 | 说明 |
|------|--------|------|
| Argon2id | ⭐⭐⭐⭐⭐ | 最推荐,抗 GPU/ASIC 攻击 |
| bcrypt | ⭐⭐⭐⭐ | 成熟稳定,可配置成本因子 |
| PBKDF2 | ⭐⭐⭐ | NIST 认证,但性能较差 |
#### 推荐配置Argon2id
```yaml
argon2:
algorithm: argon2id
memory_cost: 65536 # 64 MB
time_cost: 3 # 迭代次数
parallelism: 4 # 并行线程
hash_length: 32 # Hash 长度
salt_length: 16 # 盐长度
```
#### 加密流程
```python
# Python 伪代码
import argon2
def hash_password(password: str) -> str:
# 生成随机盐
salt = os.urandom(16)
# 使用 Argon2id 加密
hasher = argon2.PasswordHasher(
time_cost=3,
memory_cost=65536,
parallelism=4,
hash_len=32,
salt_len=16
)
return hasher.hash(password)
def verify_password(password: str, hash: str) -> bool:
try:
hasher = argon2.PasswordHasher()
hasher.verify(hash, password)
return True
except:
return False
```
#### 密码策略
| 规则 | 要求 | 说明 |
|------|------|------|
| 最小长度 | 8 字符 | 建议使用 12 字符以上 |
| 最大长度 | 32 字符 | 防止 DoS 攻击 |
| 大小写 | 至少 1 个 | - |
| 数字 | 至少 1 个 | - |
| 特殊字符 | 至少 1 个 | !@#$%^&*()_+-=[]{}|;:'",.<>/? |
| 常见密码 | 检查黑名单 | 禁止使用 123456、password 等 |
| 密码历史 | 检查最近 5 次 | 防止重复使用旧密码 |
---
### 1.2 敏感数据加密
#### 加密数据范围
| 数据类型 | 加密方式 | 说明 |
|----------|----------|------|
| 手机号 | AES-256-GCM | 部分脱敏 + 加密存储 |
| 身份证号 | AES-256-GCM | 完全加密 |
| 银行卡号 | AES-256-GCM | 部分脱敏 + 加密存储 |
| 邮箱 | AES-256-GCM | 可选加密 |
| 私钥/密钥 | HSM | 硬件安全模块 |
#### AES-256-GCM 配置
```yaml
encryption:
algorithm: AES-256-GCM
key_size: 256
key_rotation_days: 90
key_store: HSM # 或 KMS
```
#### 加密实现Go
```go
package security
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"io"
)
type AESEncryptor struct {
key []byte
}
func NewAESEncryptor(key string) *AESEncryptor {
return &AESEncryptor{key: []byte(key)}
}
func (e *AESEncryptor) Encrypt(plaintext string) (string, error) {
block, err := aes.NewCipher(e.key)
if err != nil {
return "", err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return "", err
}
nonce := make([]byte, gcm.NonceSize())
if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
return "", err
}
ciphertext := gcm.Seal(nonce, nonce, []byte(plaintext), nil)
return base64.StdEncoding.EncodeToString(ciphertext), nil
}
func (e *AESEncryptor) Decrypt(ciphertext string) (string, error) {
data, err := base64.StdEncoding.DecodeString(ciphertext)
if err != nil {
return "", err
}
block, err := aes.NewCipher(e.key)
if err != nil {
return "", err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return "", err
}
nonceSize := gcm.NonceSize()
nonce, ciphertext := data[:nonceSize], data[nonceSize:]
plaintext, err := gcm.Open(nil, nonce, ciphertext, nil)
if err != nil {
return "", err
}
return string(plaintext), nil
}
```
---
### 1.3 Token 签名
#### JWT 签名算法
| 算法 | 安全性 | 说明 |
|------|--------|------|
| RS256 | ⭐⭐⭐⭐⭐ | 推荐,非对称加密 |
| ES256 | ⭐⭐⭐⭐⭐ | 推荐ECDSA 签名 |
| HS256 | ⭐⭐⭐ | 对称加密,密钥管理复杂 |
#### 推荐配置RS256
```yaml
jwt:
algorithm: RS256
access_token_expire: 7200 # 2 小时
refresh_token_expire: 2592000 # 30 天
key_rotation_days: 90
issuer: "user-management-system"
```
#### 密钥管理
- **私钥存储**:使用 HSM 或 KMS
- **公钥分发**:通过 JWKS 端点
- **密钥轮换**:每 90 天轮换一次
- **密钥备份**:加密备份,多地存储
---
### 1.4 数据脱敏
#### 脱敏规则
| 数据类型 | 显示格式 | 示例 |
|----------|----------|------|
| 手机号 | 138****8000 | 13800138000 → 138****8000 |
| 邮箱 | j***e@example.com | john@example.com → j***e@example.com |
| 身份证号 | 110***********1234 | 110101199001011234 → 110***********1234 |
| 银行卡号 | 6222********1234 | 6222020012345678 → 6222********1234 |
#### 脱敏实现Java
```java
public class DataMasking {
public static String maskPhone(String phone) {
if (phone == null || phone.length() < 11) {
return phone;
}
return phone.substring(0, 3) + "****" + phone.substring(7);
}
public static String maskEmail(String email) {
if (email == null || !email.contains("@")) {
return email;
}
String[] parts = email.split("@");
String local = parts[0];
String maskedLocal = local.substring(0, 1) + "***" +
local.substring(local.length() - 1);
return maskedLocal + "@" + parts[1];
}
public static String maskIdCard(String idCard) {
if (idCard == null || idCard.length() < 18) {
return idCard;
}
return idCard.substring(0, 3) + "***********" + idCard.substring(14);
}
}
```
---
## 2. 防攻击策略
### 2.1 SQL 注入防护
#### 防护措施
1. **使用参数化查询**
```sql
-- 错误示例SQL 注入风险)
SELECT * FROM users WHERE username = '" + username + "'
-- 正确示例(参数化查询)
SELECT * FROM users WHERE username = ?
```
2. **使用 ORM 框架**
```python
# SQLAlchemyPython
user = session.query(User).filter(User.username == username).first()
# HibernateJava
User user = session.createQuery(
"FROM User WHERE username = :username", User.class)
.setParameter("username", username)
.uniqueResult();
```
3. **输入验证**
```python
import re
def validate_username(username: str) -> bool:
# 只允许字母、数字、下划线
pattern = r'^[a-zA-Z0-9_]{4,20}$'
return re.match(pattern, username) is not None
```
---
### 2.2 XSS 防护
#### 防护措施
1. **输出编码**
```html
<!-- 错误示例 -->
<div>{{ user_input }}</div>
<!-- 正确示例HTML 编码) -->
<div>{{ user_input | escape }}</div>
```
2. **Content Security Policy (CSP)**
```http
Content-Security-Policy: default-src 'self';
script-src 'self' 'unsafe-inline' 'unsafe-eval';
style-src 'self' 'unsafe-inline';
img-src 'self' data: https:;
```
3. **输入过滤**
```python
import html
def sanitize_input(input_str: str) -> str:
# 转义 HTML 特殊字符
return html.escape(input_str)
```
---
### 2.3 CSRF 防护
#### 防护措施
1. **CSRF Token**
```python
# 生成 Token
def generate_csrf_token():
return secrets.token_urlsafe(32)
# 验证 Token
def verify_csrf_token(request):
token = request.headers.get('X-CSRF-Token')
session_token = session.get('csrf_token')
return token == session_token
```
2. **SameSite Cookie**
```http
Set-Cookie: session_id=xxx; SameSite=Strict; Secure; HttpOnly
```
3. **Origin 验证**
```python
def validate_origin(request):
allowed_origins = ['https://example.com']
origin = request.headers.get('Origin')
return origin in allowed_origins
```
---
### 2.4 接口防刷
#### 限流策略
| 类型 | 算法 | 阈值 | 时间窗口 | 说明 |
|------|------|------|----------|------|
| 登录接口 | 令牌桶 | 5 次/分钟 | 1 分钟 | 防止暴力破解 |
| 注册接口 | 漏桶 | 3 次/小时 | 1 小时 | 防止批量注册 |
| 验证码接口 | 固定窗口 | 1 次/分钟 | 1 分钟 | 防止验证码滥用 |
| API 接口(普通) | 滑动窗口 | 1000 次/分钟 | 1 分钟 | 普通用户限流 |
| API 接口VIP | 令牌桶 | 10000 次/分钟 | 1 分钟 | VIP 用户限流 |
| API 接口IP | 令牌桶 | 10000 次/分钟 | 1 分钟 | 单 IP 限流 |
#### 分布式限流实现
```go
package ratelimit
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// 令牌桶算法
type TokenBucket struct {
redis *redis.Client
capacity int64 // 桶容量
rate int64 // 令牌生成速率tokens/秒)
}
func NewTokenBucket(redis *redis.Client, capacity, rate int64) *TokenBucket {
return &TokenBucket{
redis: redis,
capacity: capacity,
rate: rate,
}
}
// 尝试获取令牌
func (tb *TokenBucket) Allow(ctx context.Context, key string) (bool, error) {
now := time.Now().Unix()
windowStart := now - 1 // 1 秒时间窗口
pipe := tb.redis.Pipeline()
// 获取当前令牌数
tokensKey := fmt.Sprintf("rate_limit:tokens:%s", key)
tokensCmd := pipe.Get(ctx, tokensKey)
// 获取上次刷新时间
lastRefillKey := fmt.Sprintf("rate_limit:last_refill:%s", key)
lastRefillCmd := pipe.Get(ctx, lastRefillKey)
_, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
return false, err
}
var tokens float64
if err := tokensCmd.Err(); err == nil {
tokens, _ = tokensCmd.Float64()
} else {
tokens = float64(tb.capacity)
}
var lastRefill int64
if err := lastRefillCmd.Err(); err == nil {
lastRefill, _ = lastRefillCmd.Int64()
} else {
lastRefill = now
}
// 计算需要补充的令牌
elapsedTime := now - lastRefill
refillTokens := float64(elapsedTime) * float64(tb.rate)
tokens += refillTokens
if tokens > float64(tb.capacity) {
tokens = float64(tb.capacity)
}
// 尝试消费一个令牌
if tokens >= 1 {
tokens -= 1
// 更新 Redis
pipe := tb.redis.Pipeline()
pipe.Set(ctx, tokensKey, tokens, 2*time.Second)
pipe.Set(ctx, lastRefillKey, now, 2*time.Second)
pipe.Exec(ctx)
return true, nil
}
return false, nil
}
```
#### Redis 限流实现
```python
import redis
import time
class RateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def is_allowed(self, key: str, limit: int, window: int) -> bool:
current = int(time.time())
window_start = current - window
pipe = self.redis.pipeline()
# 移除过期记录
pipe.zremrangebyscore(key, 0, window_start)
# 统计当前窗口请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(current): current})
# 设置过期时间
pipe.expire(key, window)
results = pipe.execute()
count = results[1]
return count < limit
```
---
### 2.5 密码暴力破解防护
#### 防护措施
1. **登录失败限制**
```yaml
security:
login:
max_attempts: 5
lockout_duration: 1800 # 30 分钟
progressive_delay: true
```
2. **渐进式延迟**
```python
def calculate_lockout_time(attempts: int) -> int:
if attempts <= 3:
return 0
elif attempts == 4:
return 30 # 30 秒
elif attempts == 5:
return 300 # 5 分钟
else:
return 1800 # 30 分钟
```
3. **验证码触发**
```python
def should_show_captcha(attempts: int) -> bool:
return attempts >= 3
```
---
### 2.6 中间人攻击防护
#### HTTPS 配置
```nginx
server {
listen 443 ssl http2;
server_name api.example.com;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# 其他安全头
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
}
```
---
## 3. 认证与授权安全
### 3.1 JWT 安全
#### JWT Payload 结构
```json
{
"iss": "user-management-system",
"sub": "123456",
"aud": "api.example.com",
"exp": 1699999999,
"iat": 1699992799,
"jti": "unique-token-id",
"user": {
"id": 123456,
"username": "john_doe",
"roles": ["user"]
}
}
```
#### JWT 安全最佳实践
1. **不存储敏感信息**:不在 Payload 中存储密码、手机号等
2. **设置合理的过期时间**Access Token 2 小时Refresh Token 30 天
3. **使用强签名算法**RS256 或 ES256
4. **Token 黑名单**:吊销的 Token 存入 Redis
5. **刷新 Token 一次性**:使用后立即失效
#### Token 黑名单实现
```python
class TokenBlacklist:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def revoke(self, token: str, expire_at: int):
key = f"blacklist:{token}"
self.redis.setex(key, expire_at, "1")
def is_revoked(self, token: str) -> bool:
key = f"blacklist:{token}"
return self.redis.exists(key) == 1
```
---
### 3.2 OAuth 2.0 安全
#### 授权码模式流程
```mermaid
sequenceDiagram
participant User as 用户
participant App as 应用
participant Auth as 认证服务
participant Resource as 资源服务
User->>App: 1. 点击登录
App->>Auth: 2. 重定向到授权页
User->>Auth: 3. 授权
Auth->>App: 4. 返回授权码
App->>Auth: 5. 用授权码换取 Token
Auth->>App: 6. 返回 Token
App->>Resource: 7. 用 Token 访问资源
Resource->>App: 8. 返回资源
```
#### 安全注意事项
1. **state 参数**:防止 CSRF 攻击
2. **PKCE**:移动端推荐使用
3. **HTTPS**:所有通信必须使用 HTTPS
4. **Token 存储**:后端存储,避免前端暴露
5. **Scope 限制**:最小权限原则
---
## 4. 审计与监控
### 4.1 审计日志
#### 审计事件
| 事件类型 | 说明 | 优先级 |
|----------|------|--------|
| 用户注册 | 新用户注册 | 中 |
| 用户登录 | 用户登录成功/失败 | 中 |
| 密码修改 | 用户修改密码 | 高 |
| 角色分配 | 分配/移除角色 | 高 |
| 权限变更 | 修改权限 | 高 |
| 数据导出 | 导出敏感数据 | 高 |
| 异常操作 | 异常行为检测 | 高 |
#### 审计日志格式
```json
{
"event_id": "uuid",
"event_type": "password.changed",
"user_id": 123456,
"username": "john_doe",
"ip": "192.168.1.1",
"user_agent": "Mozilla/5.0...",
"resource_type": "user",
"resource_id": 123456,
"action": "update",
"old_value": "***",
"new_value": "***",
"result": "success",
"error_message": null,
"created_at": "2026-03-10T10:00:00Z"
}
```
---
### 4.2 异常检测
#### 异常登录检测
1. **异地登录检测**
```python
def detect_abnormal_login(user_id: str, current_ip: str) -> bool:
# 获取用户最近登录 IP 列表
recent_ips = get_user_recent_ips(user_id, limit=5)
# 获取当前 IP 的地理位置
current_location = get_ip_location(current_ip)
# 检查是否与最近登录位置差异过大
for ip in recent_ips:
location = get_ip_location(ip)
distance = calculate_distance(location, current_location)
if distance > 1000: # 超过 1000 公里
return True
return False
```
2. **异常设备检测**
```python
def detect_abnormal_device(user_id: str, device_id: str) -> bool:
# 检查设备是否已注册
if not is_device_registered(user_id, device_id):
# 新设备,需要二次验证
return True
# 检查设备最后活跃时间
last_active = get_device_last_active(device_id)
if last_active.days_ago() > 30:
# 设备长时间未使用,需要验证
return True
return False
```
---
### 4.3 安全监控指标
| 指标 | 阈值 | 告警级别 |
|------|------|----------|
| 登录失败率 | > 10% | 警告 |
| 单 IP 登录失败次数 | > 20 次/分钟 | 严重 |
| 单用户登录失败次数 | > 10 次/小时 | 警告 |
| 异常登录次数 | > 5 次/小时 | 严重 |
| Token 验证失败率 | > 5% | 警告 |
| 接口调用异常 | 错误率 > 1% | 警告 |
---
## 5. 合规性要求
### 5.1 GDPR 合规
#### 数据主体权利
| 权利 | 实现方式 |
|------|----------|
| 访问权 | 提供数据导出接口 |
| 更正权 | 支持用户更新信息 |
| 删除权 | 支持账号删除(数据清理) |
| 限制处理权 | 支持数据冻结 |
| 数据携带权 | 支持数据导出为标准格式 |
| 反对权 | 支持用户撤销授权 |
#### 数据最小化原则
```python
# 只收集必要的用户信息
def collect_user_data():
return {
"username": username,
"email": email,
"phone": phone,
# 不收集不必要的字段,如家庭住址、收入等
}
```
---
### 5.2 个人信息保护法
#### 数据分类分级
| 级别 | 数据类型 | 保护措施 |
|------|----------|----------|
| 一级(一般) | 用户名、昵称 | 访问控制 |
| 二级(重要) | 手机号、邮箱 | 加密存储 + 访问控制 |
| 三级(敏感) | 身份证号、银行信息 | 强加密 + 审计日志 |
| 四级(核心) | 生物识别信息 | HSM + 最小权限 |
#### 数据留存策略
```yaml
data_retention:
# 活跃用户数据:永久保留
active_users: forever
# 注销用户数据:保留 30 天后清理
deleted_users: 30 days
# 日志数据:保留 2 年
logs: 2 years
# 登录日志:保留 1 年
login_logs: 1 year
# 审计日志:保留 5 年
audit_logs: 5 years
```
---
### 5.3 等保 2.0
#### 安全等级:三级
| 控制项 | 要求 | 实现状态 |
|--------|------|----------|
| 身份鉴别 | 双因素认证 | ✅ 已实现 |
| 访问控制 | RBAC 权限模型 | ✅ 已实现 |
| 安全审计 | 完整的审计日志 | ✅ 已实现 |
| 数据完整性 | 数据加密 + 校验 | ✅ 已实现 |
| 数据保密性 | 敏感数据加密 | ✅ 已实现 |
| 入侵防范 | 异常检测 + 告警 | ✅ 已实现 |
---
## 6. 安全开发流程
### 6.1 安全编码规范
#### OWASP Top 10 防护
1. **A01:2021 访问控制失效**
- 实施严格的权限检查
- 默认拒绝策略
2. **A02:2021 加密失效**
- 使用强加密算法
- 禁止弱加密
3. **A03:2021 注入**
- 参数化查询
- 输入验证
4. **A04:2021 不安全设计**
- 安全威胁建模
- 安全代码审查
5. **A05:2021 安全配置错误**
- 默认安全配置
- 定期安全扫描
---
### 6.2 安全测试
#### 测试类型
| 测试类型 | 频率 | 工具 |
|----------|------|------|
| 静态代码分析 | 每次提交 | SonarQube |
| 动态安全测试 | 每周 | OWASP ZAP |
| 依赖漏洞扫描 | 每天 | Snyk |
| 渗透测试 | 每季度 | 人工 + 自动 |
| 代码安全审查 | 每次 PR | 人工 |
---
### 6.3 应急响应
#### 安全事件响应流程
```mermaid
graph TD
A[发现安全事件] --> B[确认事件级别]
B --> C{事件级别}
C -->|低| D[记录日志]
C -->|中| E[隔离受影响系统]
C -->|高| F[紧急响应]
E --> G[分析原因]
F --> G
G --> H[修复漏洞]
H --> I[验证修复]
I --> J[恢复服务]
J --> K[事后复盘]
```
---
## 7. 安全检查清单
### 部署前检查
- [ ] 所有接口强制使用 HTTPS
- [ ] 密码使用 Argon2id 加密
- [ ] 敏感数据使用 AES-256 加密
- [ ] JWT 使用 RS256 签名
- [ ] 实现接口限流
- [ ] 实现登录失败限制
- [ ] 实现审计日志
- [ ] 配置安全响应头
- [ ] 关闭不必要的端口
- [ ] 定期更新依赖包
### 运维检查
- [ ] 每日检查异常登录日志
- [ ] 每周检查接口调用异常
- [ ] 每月进行安全扫描
- [ ] 每季度进行渗透测试
- [ ] 每年进行安全审计
---
*本文档持续更新中,如有疑问请联系安全团队。*