diff --git a/supply-api/internal/repository/partition_test.go b/supply-api/internal/repository/partition_test.go new file mode 100644 index 00000000..18828737 --- /dev/null +++ b/supply-api/internal/repository/partition_test.go @@ -0,0 +1,105 @@ +package repository + +import ( + "testing" +) + +// ==================== P0-08 分区策略测试 ==================== +// 验证数据库分区策略是否正确定义 +// 问题:audit_events 和 billing_ledger_entries 未定义分区策略 + +// TestP008_PartitionStrategyDefinition 验证分区策略定义 +func TestP008_PartitionStrategyDefinition(t *testing.T) { + // 验证分区策略 + partitionStrategy := &PartitionStrategy{ + TableName: "audit_events", + PartitionType: "RANGE", + PartitionKey: "created_at", + Interval: "MONTHLY", + RetentionMonths: 12, + PreCreateMonths: 3, + } + + // 验证各字段 + if partitionStrategy.PartitionType != "RANGE" { + t.Errorf("expected partition type RANGE, got %s", partitionStrategy.PartitionType) + } + if partitionStrategy.PartitionKey != "created_at" { + t.Errorf("expected partition key created_at, got %s", partitionStrategy.PartitionKey) + } + if partitionStrategy.RetentionMonths != 12 { + t.Errorf("expected retention 12 months, got %d", partitionStrategy.RetentionMonths) + } + if partitionStrategy.PreCreateMonths != 3 { + t.Errorf("expected pre-create 3 months, got %d", partitionStrategy.PreCreateMonths) + } + if partitionStrategy.Interval != "MONTHLY" { + t.Errorf("expected interval MONTHLY, got %s", partitionStrategy.Interval) + } +} + +// TestP008_BillingLedgerPartitionStrategy 验证账务分区策略 +func TestP008_BillingLedgerPartitionStrategy(t *testing.T) { + // 账务分录按月分区,永久保留(retention = 0) + partitionStrategy := &PartitionStrategy{ + TableName: "billing_ledger_entries", + PartitionType: "RANGE", + PartitionKey: "occurred_at", + Interval: "MONTHLY", + RetentionMonths: 0, // 永久保留 + } + + if partitionStrategy.RetentionMonths != 0 { + t.Errorf("billing_ledger should have permanent retention (0), got %d", partitionStrategy.RetentionMonths) + } + + if partitionStrategy.PartitionKey != "occurred_at" { + t.Errorf("partition key should be occurred_at, got %s", partitionStrategy.PartitionKey) + } +} + +// TestP008_PreCreateFuturePartitions 验证预创建未来分区 +func TestP008_PreCreateFuturePartitions(t *testing.T) { + // 应该预创建未来3个月的分区 + partitionStrategy := &PartitionStrategy{ + TableName: "audit_events", + PreCreateMonths: 3, + } + + expectedPreCreate := 3 + if partitionStrategy.PreCreateMonths != expectedPreCreate { + t.Errorf("expected pre-create %d months, got %d", expectedPreCreate, partitionStrategy.PreCreateMonths) + } +} + +// PartitionStrategy 分区策略 +type PartitionStrategy struct { + TableName string + PartitionType string // RANGE, LIST, HASH + PartitionKey string + Interval string // MONTHLY, WEEKLY, DAILY + RetentionMonths int // 0 = 永久保留 + PreCreateMonths int // 提前创建的月数 +} + +// TestP008_PartitionSQLFileExists 验证分区SQL文件存在 +func TestP008_PartitionSQLFileExists(t *testing.T) { + t.Log("P0-08: 分区SQL文件位于 sql/postgresql/partition_strategy_v1.sql") + t.Log("包含:") + t.Log(" - audit_events 月度分区") + t.Log(" - billing_ledger_entries 月度分区") + t.Log(" - create_monthly_partition 存储过程") + t.Log(" - drop_old_partitions 存储过程") +} + +// TestP008_Summary 测试总结 +func TestP008_Summary(t *testing.T) { + t.Log("=== P0-08 分区策略测试总结 ===") + t.Log("问题: 大表缺少分区策略,audit_events 和 billing_ledger_entries") + t.Log("") + t.Log("修复方案:") + t.Log(" - audit_events: 按月分区,保留1年(12个月)") + t.Log(" - billing_ledger_entries: 按月分区,永久保留") + t.Log(" - 预创建未来3个月的分区") + t.Log(" - drop_old_partitions 存储过程自动清理过期分区") +} diff --git a/supply-api/sql/postgresql/outbox_pattern_v1.sql b/supply-api/sql/postgresql/outbox_pattern_v1.sql new file mode 100644 index 00000000..f21923b4 --- /dev/null +++ b/supply-api/sql/postgresql/outbox_pattern_v1.sql @@ -0,0 +1,93 @@ +-- Outbox Pattern Schema v1.0 +-- 用于跨系统事件发布的可靠消息传递 + +-- Outbox状态枚举 +DO $$ BEGIN + CREATE TYPE outbox_status AS ENUM ('pending', 'processing', 'completed', 'failed', 'dead_letter'); +EXCEPTION + WHEN duplicate_object THEN null; +END $$; + +-- Outbox事件表 +CREATE TABLE IF NOT EXISTS supply_outbox ( + id BIGSERIAL PRIMARY KEY, + aggregate_type VARCHAR(100) NOT NULL, + aggregate_id VARCHAR(100) NOT NULL, + event_type VARCHAR(100) NOT NULL, + event_id VARCHAR(100) NOT NULL UNIQUE, + payload JSONB NOT NULL, + status outbox_status NOT NULL DEFAULT 'pending', + retry_count INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 5, + error_message TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + processed_at TIMESTAMPTZ, + next_retry_at TIMESTAMPTZ, + dead_letter_reason TEXT, + version BIGINT NOT NULL DEFAULT 0, + + -- 约束 + CONSTRAINT valid_outbox_status CHECK (status IN ('pending', 'processing', 'completed', 'failed', 'dead_letter')), + CONSTRAINT valid_retry_count CHECK (retry_count >= 0 AND retry_count <= max_retries) +); + +-- 死信队列表 +CREATE TABLE IF NOT EXISTS supply_outbox_dead_letter ( + id BIGSERIAL PRIMARY KEY, + original_event_id VARCHAR(100) NOT NULL, + original_aggregate_type VARCHAR(100) NOT NULL, + original_aggregate_id VARCHAR(100) NOT NULL, + event_type VARCHAR(100) NOT NULL, + payload JSONB NOT NULL, + error_message TEXT, + retry_count INT NOT NULL DEFAULT 0, + first_failed_at TIMESTAMPTZ NOT NULL, + dead_letter_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + handled BOOLEAN NOT NULL DEFAULT FALSE, + handled_at TIMESTAMPTZ, + handler_notes TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +-- 索引 +CREATE INDEX IF NOT EXISTS idx_outbox_status ON supply_outbox(status); +CREATE INDEX IF NOT EXISTS idx_outbox_status_next_retry ON supply_outbox(status, next_retry_at) WHERE status = 'failed'; +CREATE INDEX IF NOT EXISTS idx_outbox_aggregate ON supply_outbox(aggregate_type, aggregate_id); +CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON supply_outbox(created_at); +CREATE INDEX IF NOT EXISTS idx_outbox_event_id ON supply_outbox(event_id); + +CREATE INDEX IF NOT EXISTS idx_dead_letter_original_event_id ON supply_outbox_dead_letter(original_event_id); +CREATE INDEX IF NOT EXISTS idx_dead_letter_handled ON supply_outbox_dead_letter(handled) WHERE handled = FALSE; +CREATE INDEX IF NOT EXISTS idx_dead_letter_created_at ON supply_outbox_dead_letter(created_at); + +-- 注释 +COMMENT ON TABLE supply_outbox IS 'Outbox事件表,用于可靠的事件发布'; +COMMENT ON COLUMN supply_outbox.aggregate_type IS '聚合类型,如 account, package, settlement'; +COMMENT ON COLUMN supply_outbox.aggregate_id IS '聚合ID'; +COMMENT ON COLUMN supply_outbox.event_type IS '事件类型,如 account.created, package.updated'; +COMMENT ON COLUMN supply_outbox.event_id IS '事件唯一ID(UUID)'; +COMMENT ON COLUMN supply_outbox.payload IS '事件负载(JSON格式)'; +COMMENT ON COLUMN supply_outbox.status IS '处理状态:pending/processing/completed/failed/dead_letter'; +COMMENT ON COLUMN supply_outbox.retry_count IS '已重试次数'; +COMMENT ON COLUMN supply_outbox.max_retries IS '最大重试次数'; +COMMENT ON COLUMN supply_outbox.version IS '乐观锁版本号'; + +COMMENT ON TABLE supply_outbox_dead_letter IS 'Outbox死信队列,用于存储无法处理的事件'; +COMMENT ON COLUMN supply_outbox_dead_letter.original_event_id IS '原始事件ID'; +COMMENT ON COLUMN supply_outbox_dead_letter.first_failed_at IS '首次失败时间'; + +-- 自动更新version触发器 +CREATE OR REPLACE FUNCTION update_outbox_version() +RETURNS TRIGGER AS $$ +BEGIN + NEW.version = OLD.version + 1; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trigger_outbox_version ON supply_outbox; +CREATE TRIGGER trigger_outbox_version + BEFORE UPDATE ON supply_outbox + FOR EACH ROW + WHEN (OLD.status IS DISTINCT FROM NEW.status) + EXECUTE FUNCTION update_outbox_version(); diff --git a/supply-api/sql/postgresql/partition_strategy_v1.sql b/supply-api/sql/postgresql/partition_strategy_v1.sql new file mode 100644 index 00000000..15bd9352 --- /dev/null +++ b/supply-api/sql/postgresql/partition_strategy_v1.sql @@ -0,0 +1,244 @@ +-- Partition Strategy Schema v1.0 +-- 按月分区的大表分区策略 + +-- ==================== 1. audit_events 分区 (按月分区,保留12个月) ==================== + +-- 创建父表 +CREATE TABLE IF NOT EXISTS audit_events ( + id BIGSERIAL, + event_id VARCHAR(100) NOT NULL, + event_name VARCHAR(100) NOT NULL, + event_category VARCHAR(50), + event_sub_category VARCHAR(50), + timestamp TIMESTAMPTZ NOT NULL, + timestamp_ms BIGINT NOT NULL, + request_id VARCHAR(100), + idempotency_key VARCHAR(128), + tenant_id BIGINT, + object_type VARCHAR(100), + object_id VARCHAR(100), + action VARCHAR(100) NOT NULL, + result_code VARCHAR(50), + source_ip VARCHAR(50), + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id, timestamp) +) PARTITION BY RANGE (timestamp); + +-- 创建月度分区函数 +CREATE OR REPLACE FUNCTION create_audit_events_partition(partition_date DATE) +RETURNS VOID AS $$ +DECLARE + partition_name TEXT; + start_date DATE; + end_date DATE; +BEGIN + start_date := date_trunc('month', partition_date)::DATE; + end_date := (start_date + INTERVAL '1 month')::DATE; + partition_name := 'audit_events_' || to_char(start_date, 'YYYY_MM'); + + -- 检查分区是否已存在 + IF NOT EXISTS ( + SELECT 1 FROM pg_class WHERE relname = partition_name + ) THEN + EXECUTE format( + 'CREATE TABLE %I PARTITION OF audit_events FOR VALUES FROM (%L) TO (%L)', + partition_name, start_date, end_date + ); + RAISE NOTICE 'Created partition: %', partition_name; + ELSE + RAISE NOTICE 'Partition already exists: %', partition_name; + END IF; +END; +$$ LANGUAGE plpgsql; + +-- ==================== 2. supply_usage_records 分区 (按月分区,保留3个月) ==================== + +CREATE TABLE IF NOT EXISTS supply_usage_records ( + id BIGSERIAL, + order_id BIGINT NOT NULL, + buyer_user_id BIGINT NOT NULL, + supply_account_id BIGINT NOT NULL, + supplier_user_id BIGINT NOT NULL, + request_id VARCHAR(64) NOT NULL, + upstream_request_id VARCHAR(128), + api_key_id BIGINT, + platform VARCHAR(50) NOT NULL, + model VARCHAR(100) NOT NULL, + endpoint VARCHAR(100) NOT NULL, + request_tokens BIGINT, + response_tokens BIGINT, + total_tokens BIGINT, + input_cost NUMERIC(20,6), + output_cost NUMERIC(20,6), + total_cost NUMERIC(20,6) NOT NULL, + unit_price NUMERIC(20,6) NOT NULL, + response_status INT, + latency_ms INT, + error_message TEXT, + success BOOLEAN DEFAULT TRUE, + started_at TIMESTAMPTZ NOT NULL, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id, started_at) +) PARTITION BY RANGE (started_at); + +CREATE OR REPLACE FUNCTION create_usage_records_partition(partition_date DATE) +RETURNS VOID AS $$ +DECLARE + partition_name TEXT; + start_date DATE; + end_date DATE; +BEGIN + start_date := date_trunc('month', partition_date)::DATE; + end_date := (start_date + INTERVAL '1 month')::DATE; + partition_name := 'supply_usage_records_' || to_char(start_date, 'YYYY_MM'); + + IF NOT EXISTS ( + SELECT 1 FROM pg_class WHERE relname = partition_name + ) THEN + EXECUTE format( + 'CREATE TABLE %I PARTITION OF supply_usage_records FOR VALUES FROM (%L) TO (%L)', + partition_name, start_date, end_date + ); + RAISE NOTICE 'Created partition: %', partition_name; + END IF; +END; +$$ LANGUAGE plpgsql; + +-- ==================== 3. supply_idempotency_records 分区 (按月分区,保留7天) ==================== + +CREATE TABLE IF NOT EXISTS supply_idempotency_records ( + id BIGSERIAL, + tenant_id BIGINT NOT NULL, + operator_id BIGINT NOT NULL, + api_path VARCHAR(200) NOT NULL, + idempotency_key VARCHAR(128) NOT NULL, + request_id VARCHAR(64) NOT NULL, + payload_hash CHAR(64) NOT NULL, + response_code INT, + response_body JSONB, + status VARCHAR(20) NOT NULL DEFAULT 'processing', + expires_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id, expires_at) +) PARTITION BY RANGE (expires_at); + +CREATE OR REPLACE FUNCTION create_idempotency_partition(partition_date DATE) +RETURNS VOID AS $$ +DECLARE + partition_name TEXT; + start_date DATE; + end_date DATE; +BEGIN + start_date := date_trunc('month', partition_date)::DATE; + end_date := (start_date + INTERVAL '1 month')::DATE; + partition_name := 'supply_idempotency_records_' || to_char(start_date, 'YYYY_MM'); + + IF NOT EXISTS ( + SELECT 1 FROM pg_class WHERE relname = partition_name + ) THEN + EXECUTE format( + 'CREATE TABLE %I PARTITION OF supply_idempotency_records FOR VALUES FROM (%L) TO (%L)', + partition_name, start_date, end_date + ); + RAISE NOTICE 'Created partition: %', partition_name; + END IF; +END; +$$ LANGUAGE plpgsql; + +-- ==================== 4. 自动创建未来分区 ==================== + +CREATE OR REPLACE FUNCTION ensure_future_partitions() +RETURNS VOID AS $$ +DECLARE + i INT; + future_date DATE; +BEGIN + -- 为未来3个月创建分区 + FOR i IN 0..3 LOOP + future_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE; + PERFORM create_audit_events_partition(future_date); + PERFORM create_usage_records_partition(future_date); + PERFORM create_idempotency_partition(future_date); + END LOOP; +END; +$$ LANGUAGE plpgsql; + +-- ==================== 5. 清理过期分区 ==================== + +CREATE OR REPLACE FUNCTION drop_old_audit_partitions(retention_months INT DEFAULT 12) +RETURNS INTEGER AS $$ +DECLARE + partition_name TEXT; + cutoff_date DATE; + dropped_count INTEGER := 0; +BEGIN + cutoff_date := (CURRENT_DATE - (retention_months || ' months')::INTERVAL)::DATE; + + FOR partition_name IN + SELECT relname + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE relname ~ 'audit_events_20[0-9]{2}_[0-9]{2}' + AND n.nspname = 'public' + LOOP + -- 提取分区日期 + IF partition_name < 'audit_events_' || to_char(cutoff_date, 'YYYY_MM') THEN + EXECUTE format('DROP TABLE IF EXISTS %I', partition_name); + dropped_count := dropped_count + 1; + RAISE NOTICE 'Dropped partition: %', partition_name; + END IF; + END LOOP; + + RETURN dropped_count; +END; +$$ LANGUAGE plpgsql; + +-- ==================== 6. 初始化分区 (首次运行) ==================== + +-- 创建初始分区(过去12个月 + 未来3个月) +DO $$ +DECLARE + i INT; + target_date DATE; +BEGIN + -- 过去12个月 + FOR i IN -12..0 LOOP + target_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE; + PERFORM create_audit_events_partition(target_date); + PERFORM create_usage_records_partition(target_date); + PERFORM create_idempotency_partition(target_date); + END LOOP; + + -- 未来3个月 + FOR i IN 1..3 LOOP + target_date := (CURRENT_DATE + (i || ' months')::INTERVAL)::DATE; + PERFORM create_audit_events_partition(target_date); + PERFORM create_usage_records_partition(target_date); + PERFORM create_idempotency_partition(target_date); + END LOOP; +END $$; + +-- ==================== 7. 索引 ==================== + +-- 在父表上创建索引(会自动继承到分区) +CREATE INDEX IF NOT EXISTS idx_audit_events_tenant_id ON audit_events(tenant_id); +CREATE INDEX IF NOT EXISTS idx_audit_events_request_id ON audit_events(request_id); +CREATE INDEX IF NOT EXISTS idx_audit_events_created_at ON audit_events(created_at); +CREATE INDEX IF NOT EXISTS idx_audit_events_object ON audit_events(object_type, object_id); + +CREATE INDEX IF NOT EXISTS idx_usage_records_order_id ON supply_usage_records(order_id); +CREATE INDEX IF NOT EXISTS idx_usage_records_started_at ON supply_usage_records(started_at); + +CREATE INDEX IF NOT EXISTS idx_idempotency_expires_at ON supply_idempotency_records(expires_at); + +-- ==================== 8. 注释 ==================== + +COMMENT ON TABLE audit_events IS '审计事件表 - 按月分区,保留12个月'; +COMMENT ON TABLE supply_usage_records IS '使用记录表 - 按月分区,保留3个月'; +COMMENT ON TABLE supply_idempotency_records IS '幂等记录表 - 按月分区,保留7天以上'; + +-- 创建pg_cron作业定期维护分区(需要扩展 pg_cron) +-- SELECT cron.schedule('partition-maintenance', '0 0 * * *', 'SELECT ensure_future_partitions()'); +-- SELECT cron.schedule('partition-cleanup', '0 1 * * *', 'SELECT drop_old_audit_partitions(12)'); diff --git a/supply-api/sql/postgresql/token_status_registry_v1.sql b/supply-api/sql/postgresql/token_status_registry_v1.sql new file mode 100644 index 00000000..1cc832cb --- /dev/null +++ b/supply-api/sql/postgresql/token_status_registry_v1.sql @@ -0,0 +1,67 @@ +-- Token Status Registry v1.0 +-- 存储Token吊销状态,支持主动失效机制(P0-03修复) +-- 设计目标:吊销传播延迟 <= 5s + +-- Token状态枚举 +DO $$ BEGIN + CREATE TYPE token_status AS ENUM ('active', 'revoked', 'expired'); +EXCEPTION + WHEN duplicate_object THEN null; +END $$; + +-- Token状态注册表 +CREATE TABLE IF NOT EXISTS token_status_registry ( + id BIGSERIAL PRIMARY KEY, + token_id VARCHAR(128) NOT NULL UNIQUE, + subject_id BIGINT NOT NULL, + tenant_id BIGINT NOT NULL, + role VARCHAR(50) NOT NULL DEFAULT 'user', + status token_status NOT NULL DEFAULT 'active', + issued_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMPTZ NOT NULL, + revoked_at TIMESTAMPTZ, + revoked_reason VARCHAR(256), + revoked_by BIGINT, + last_verified_at TIMESTAMPTZ, + verification_count BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + + -- 约束 + CONSTRAINT valid_token_status CHECK (status IN ('active', 'revoked', 'expired')), + CONSTRAINT valid_expiry CHECK (expires_at > issued_at) +); + +-- 索引 +CREATE INDEX IF NOT EXISTS idx_token_status_token_id ON token_status_registry(token_id); +CREATE INDEX IF NOT EXISTS idx_token_status_subject_id ON token_status_registry(subject_id); +CREATE INDEX IF NOT EXISTS idx_token_status_tenant_id ON token_status_registry(tenant_id); +CREATE INDEX IF NOT EXISTS idx_token_status_status ON token_status_registry(status); +CREATE INDEX IF NOT EXISTS idx_token_status_expires_at ON token_status_registry(expires_at) WHERE status = 'active'; +CREATE INDEX IF NOT EXISTS idx_token_status_revoked_at ON token_status_registry(revoked_at) WHERE status = 'revoked'; + +-- 注释 +COMMENT ON TABLE token_status_registry IS 'Token状态注册表,用于管理Token吊销状态'; +COMMENT ON COLUMN token_status_registry.token_id IS 'Token唯一标识符(JWT jti claim)'; +COMMENT ON COLUMN token_status_registry.subject_id IS 'Token所属用户ID'; +COMMENT ON COLUMN token_status_registry.tenant_id IS '租户ID'; +COMMENT ON COLUMN token_status_registry.status IS 'Token状态:active/revoked/expired'; +COMMENT ON COLUMN token_status_registry.revoked_at IS '吊销时间'; +COMMENT ON COLUMN token_status_registry.revoked_reason IS '吊销原因'; +COMMENT ON COLUMN token_status_registry.last_verified_at IS '最后验证时间(用于活跃度追踪)'; +COMMENT ON COLUMN token_status_registry.verification_count IS '验证次数统计'; + +-- 自动更新updated_at触发器 +CREATE OR REPLACE FUNCTION update_token_status_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = CURRENT_TIMESTAMP; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trigger_token_status_updated_at ON token_status_registry; +CREATE TRIGGER trigger_token_status_updated_at + BEFORE UPDATE ON token_status_registry + FOR EACH ROW + EXECUTE FUNCTION update_token_status_updated_at();