fix(supply-api): realign audit event persistence contract

This commit is contained in:
Your Name
2026-04-20 11:50:20 +08:00
parent 1c088e2dd4
commit 319d9e1989
6 changed files with 438 additions and 19 deletions

View File

@@ -24,14 +24,19 @@ var _ AuditStore = (*PostgresAuditStore)(nil)
// Emit 发送审计事件
func (s *PostgresAuditStore) Emit(ctx context.Context, event Event) error {
timestamp := event.CreatedAt
if timestamp.IsZero() {
timestamp = time.Now()
}
// 转换 audit.Event -> model.AuditEvent
modelEvent := &model.AuditEvent{
EventID: event.EventID,
EventName: event.Action,
EventCategory: "",
EventSubCategory: "",
Timestamp: event.CreatedAt,
TimestampMs: event.CreatedAt.UnixMilli(),
Timestamp: timestamp,
TimestampMs: timestamp.UnixMilli(),
RequestID: event.RequestID,
IdempotencyKey: "",
TenantID: event.TenantID,
@@ -39,7 +44,14 @@ func (s *PostgresAuditStore) Emit(ctx context.Context, event Event) error {
ObjectID: event.ObjectID,
Action: event.Action,
ResultCode: event.ResultCode,
Success: event.ResultCode == "OK",
BeforeState: event.BeforeState,
AfterState: event.AfterState,
SourceIP: event.SourceIP,
SecurityFlags: *model.NewSecurityFlags(),
ComplianceTags: []string{},
Version: 1,
CreatedAt: timestamp,
}
return s.repo.Emit(ctx, modelEvent)
}

View File

@@ -0,0 +1,113 @@
//go:build integration
// +build integration
package audit
import (
"context"
"os"
"testing"
"time"
"github.com/jackc/pgx/v5/pgxpool"
auditrepo "lijiaoqiao/supply-api/internal/audit/repository"
)
func getAuditStoreTestDB(t *testing.T) *pgxpool.Pool {
t.Helper()
host := os.Getenv("SUPPLY_API_DB_HOST")
if host == "" {
host = "/var/run/postgresql"
}
port := os.Getenv("SUPPLY_API_DB_PORT")
if port == "" {
port = "5432"
}
user := os.Getenv("SUPPLY_API_DB_USER")
if user == "" {
user = "long"
}
password := os.Getenv("SUPPLY_API_DB_PASSWORD")
dbName := os.Getenv("SUPPLY_API_DB_NAME")
if dbName == "" {
dbName = "supply_test"
}
var dsn string
if host[0] == '/' {
dsn = "postgres://" + user + ":" + password + "@/" + dbName + "?host=" + host + "&sslmode=disable"
} else {
dsn = "postgres://" + user + ":" + password + "@" + host + ":" + port + "/" + dbName + "?sslmode=disable"
}
pool, err := pgxpool.New(context.Background(), dsn)
if err != nil {
t.Skipf("跳过集成测试:无法连接数据库: %v", err)
return nil
}
if err := pool.Ping(context.Background()); err != nil {
pool.Close()
t.Skipf("跳过集成测试:无法 ping 数据库: %v", err)
return nil
}
t.Cleanup(func() {
pool.Close()
})
return pool
}
func TestPostgresAuditStore_QueryWithTotalAndGetByID_Integration(t *testing.T) {
if testing.Short() {
t.Skip("跳过集成测试short mode")
}
pool := getAuditStoreTestDB(t)
if pool == nil {
return
}
store := NewPostgresAuditStore(auditrepo.NewPostgresAuditRepository(pool))
now := time.Now().UTC().Truncate(time.Millisecond)
if err := store.Emit(context.Background(), Event{
TenantID: 3001,
ObjectType: "supply_settlement",
ObjectID: 88,
Action: "cancel",
BeforeState: map[string]any{"status": "pending"},
AfterState: map[string]any{"status": "cancelled"},
RequestID: "req-audit-store-int",
ResultCode: "OK",
SourceIP: "127.0.0.1",
CreatedAt: now,
}); err != nil {
t.Fatalf("emit audit store event failed: %v", err)
}
events, total, err := store.QueryWithTotal(context.Background(), EventFilter{
TenantID: 3001,
Action: "cancel",
Limit: 10,
})
if err != nil {
t.Fatalf("query with total failed: %v", err)
}
if total == 0 || len(events) == 0 {
t.Fatalf("expected queried audit events, total=%d len=%d", total, len(events))
}
found, err := store.GetByID(context.Background(), events[0].EventID)
if err != nil {
t.Fatalf("get audit event by id failed: %v", err)
}
if found.Action != "cancel" {
t.Fatalf("expected action %q, got %q", "cancel", found.Action)
}
if found.BeforeState["status"] != "pending" {
t.Fatalf("expected before status pending, got %v", found.BeforeState["status"])
}
if found.AfterState["status"] != "cancelled" {
t.Fatalf("expected after status cancelled, got %v", found.AfterState["status"])
}
}

View File

@@ -87,6 +87,11 @@ func (r *PostgresAuditRepository) Emit(ctx context.Context, event *model.AuditEv
return fmt.Errorf("failed to marshal security flags: %w", err)
}
complianceTags := event.ComplianceTags
if complianceTags == nil {
complianceTags = []string{}
}
// 序列化状态变更
var beforeStateJSON, afterStateJSON []byte
if event.BeforeState != nil {
@@ -144,7 +149,7 @@ func (r *PostgresAuditRepository) Emit(ctx context.Context, event *model.AuditEv
event.ResultCode, event.ResultMessage, event.Success,
beforeStateJSON, afterStateJSON,
securityFlagsJSON, event.RiskScore,
event.ComplianceTags, event.InvariantRule,
complianceTags, event.InvariantRule,
extensionsJSON,
1, time.Now(),
)

View File

@@ -0,0 +1,160 @@
//go:build integration
// +build integration
package repository
import (
"context"
"os"
"testing"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"lijiaoqiao/supply-api/internal/audit/model"
)
func getAuditRepositoryTestDB(t *testing.T) *pgxpool.Pool {
t.Helper()
host := os.Getenv("SUPPLY_API_DB_HOST")
if host == "" {
host = "/var/run/postgresql"
}
port := os.Getenv("SUPPLY_API_DB_PORT")
if port == "" {
port = "5432"
}
user := os.Getenv("SUPPLY_API_DB_USER")
if user == "" {
user = "long"
}
password := os.Getenv("SUPPLY_API_DB_PASSWORD")
dbName := os.Getenv("SUPPLY_API_DB_NAME")
if dbName == "" {
dbName = "supply_test"
}
var dsn string
if host[0] == '/' {
dsn = "postgres://" + user + ":" + password + "@/" + dbName + "?host=" + host + "&sslmode=disable"
} else {
dsn = "postgres://" + user + ":" + password + "@" + host + ":" + port + "/" + dbName + "?sslmode=disable"
}
pool, err := pgxpool.New(context.Background(), dsn)
if err != nil {
t.Skipf("跳过集成测试:无法连接数据库: %v", err)
return nil
}
if err := pool.Ping(context.Background()); err != nil {
pool.Close()
t.Skipf("跳过集成测试:无法 ping 数据库: %v", err)
return nil
}
t.Cleanup(func() {
pool.Close()
})
return pool
}
func TestPostgresAuditRepository_EmitQueryAndGetByEventID_Integration(t *testing.T) {
if testing.Short() {
t.Skip("跳过集成测试short mode")
}
pool := getAuditRepositoryTestDB(t)
if pool == nil {
return
}
repo := NewPostgresAuditRepository(pool)
now := time.Now().UTC().Truncate(time.Millisecond)
event := &model.AuditEvent{
EventName: "AUD-REPO-INTEGRATION",
EventCategory: model.CategorySECURITY,
EventSubCategory: model.SubCategoryCredIngress,
Timestamp: now,
RequestID: "req-audit-repo-int",
TraceID: "trace-audit-repo-int",
SpanID: "span-audit-repo-int",
IdempotencyKey: "idem-audit-repo-int",
OperatorID: 42,
OperatorType: model.OperatorTypeUser,
OperatorRole: "supplier_admin",
TenantID: 1001,
TenantType: model.TenantTypeSupplier,
ObjectType: "supply_account",
ObjectID: 2002,
Action: "activate",
ActionDetail: "activate account audit trail",
CredentialType: model.CredentialTypePlatformToken,
CredentialID: "ptok-001",
CredentialFingerprint: "fp-001",
SourceType: "api",
SourceIP: "127.0.0.1",
SourceRegion: "cn-hz",
UserAgent: "audit-repo-integration",
TargetType: "http",
TargetEndpoint: "/api/v1/audit/events",
TargetDirect: false,
ResultCode: "OK",
ResultMessage: "emitted",
Success: true,
BeforeState: map[string]any{"status": "pending"},
AfterState: map[string]any{"status": "active"},
SecurityFlags: model.SecurityFlags{
HasCredential: true,
CredentialExposed: false,
Desensitized: true,
Scanned: true,
ScanPassed: true,
ViolationTypes: []string{},
},
RiskScore: 12,
ComplianceTags: []string{"SOC2", "GDPR"},
InvariantRule: "AUD-001",
Extensions: map[string]any{"source": "integration"},
Version: 1,
CreatedAt: now,
}
if err := repo.Emit(context.Background(), event); err != nil {
t.Fatalf("emit audit event failed: %v", err)
}
if event.EventID == "" {
t.Fatal("expected event id after emit")
}
stored, err := repo.GetByEventID(context.Background(), event.EventID)
if err != nil {
t.Fatalf("get by event id failed: %v", err)
}
if stored.TraceID != event.TraceID {
t.Fatalf("expected trace id %q, got %q", event.TraceID, stored.TraceID)
}
if stored.SpanID != event.SpanID {
t.Fatalf("expected span id %q, got %q", event.SpanID, stored.SpanID)
}
if stored.BeforeState["status"] != event.BeforeState["status"] {
t.Fatalf("expected before status %v, got %v", event.BeforeState["status"], stored.BeforeState["status"])
}
if stored.AfterState["status"] != event.AfterState["status"] {
t.Fatalf("expected after status %v, got %v", event.AfterState["status"], stored.AfterState["status"])
}
if len(stored.ComplianceTags) != len(event.ComplianceTags) {
t.Fatalf("expected compliance tags %v, got %v", event.ComplianceTags, stored.ComplianceTags)
}
events, total, err := repo.Query(context.Background(), &EventFilter{
TenantID: 1001,
EventName: event.EventName,
Limit: 10,
})
if err != nil {
t.Fatalf("query audit events failed: %v", err)
}
if total == 0 || len(events) == 0 {
t.Fatalf("expected queried events, total=%d len=%d", total, len(events))
}
}

View File

@@ -0,0 +1,99 @@
-- Forward-only migration: align supply-api audit_events with the repository contract.
-- This migration is intended for databases that were initialized from the older
-- compact audit_events schema in supply-api/sql/postgresql/partition_strategy_v1.sql.
ALTER TABLE IF EXISTS audit_events
ADD COLUMN IF NOT EXISTS trace_id VARCHAR(64) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS span_id VARCHAR(64) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS operator_id BIGINT NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS operator_type VARCHAR(32) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS operator_role VARCHAR(64) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS tenant_type VARCHAR(32) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS action_detail TEXT NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS credential_type VARCHAR(64) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS credential_id VARCHAR(255) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS credential_fingerprint VARCHAR(255) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS source_type VARCHAR(32) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS source_region VARCHAR(100) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS user_agent TEXT NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS target_type VARCHAR(32) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS target_endpoint TEXT NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS target_direct BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS result_message TEXT NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS success BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS before_state JSONB,
ADD COLUMN IF NOT EXISTS after_state JSONB,
ADD COLUMN IF NOT EXISTS security_flags JSONB NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS risk_score INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS compliance_tags TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],
ADD COLUMN IF NOT EXISTS invariant_rule VARCHAR(255) NOT NULL DEFAULT '',
ADD COLUMN IF NOT EXISTS extensions JSONB,
ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 1;
UPDATE audit_events
SET
event_category = COALESCE(event_category, ''),
event_sub_category = COALESCE(event_sub_category, ''),
timestamp_ms = COALESCE(timestamp_ms, EXTRACT(EPOCH FROM timestamp) * 1000),
request_id = COALESCE(request_id, ''),
idempotency_key = COALESCE(idempotency_key, ''),
tenant_id = COALESCE(tenant_id, 0),
object_type = COALESCE(object_type, ''),
action = COALESCE(action, ''),
result_code = COALESCE(result_code, ''),
source_ip = COALESCE(source_ip, '');
ALTER TABLE IF EXISTS audit_events
ALTER COLUMN event_category SET DEFAULT '',
ALTER COLUMN event_category SET NOT NULL,
ALTER COLUMN event_sub_category SET DEFAULT '',
ALTER COLUMN event_sub_category SET NOT NULL,
ALTER COLUMN timestamp_ms SET DEFAULT 0,
ALTER COLUMN timestamp_ms SET NOT NULL,
ALTER COLUMN request_id SET DEFAULT '',
ALTER COLUMN request_id SET NOT NULL,
ALTER COLUMN idempotency_key SET DEFAULT '',
ALTER COLUMN idempotency_key SET NOT NULL,
ALTER COLUMN tenant_id SET DEFAULT 0,
ALTER COLUMN tenant_id SET NOT NULL,
ALTER COLUMN object_type SET DEFAULT '',
ALTER COLUMN object_type SET NOT NULL,
ALTER COLUMN result_code SET DEFAULT '',
ALTER COLUMN result_code SET NOT NULL,
ALTER COLUMN source_ip SET DEFAULT '',
ALTER COLUMN source_ip SET NOT NULL;
DO $$
DECLARE
current_type TEXT;
BEGIN
SELECT data_type
INTO current_type
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'audit_events'
AND column_name = 'object_id';
IF current_type IS NULL THEN
ALTER TABLE audit_events
ADD COLUMN object_id BIGINT NOT NULL DEFAULT 0;
ELSIF current_type <> 'bigint' THEN
UPDATE audit_events
SET object_id = '0'
WHERE object_id IS NULL
OR object_id = ''
OR object_id !~ '^[0-9]+$';
ALTER TABLE audit_events
ALTER COLUMN object_id TYPE BIGINT USING object_id::BIGINT;
END IF;
ALTER TABLE audit_events
ALTER COLUMN object_id SET DEFAULT 0,
ALTER COLUMN object_id SET NOT NULL;
END $$;
CREATE INDEX IF NOT EXISTS idx_audit_events_event_id ON audit_events(event_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_event_name ON audit_events(event_name);
CREATE INDEX IF NOT EXISTS idx_audit_events_trace_id ON audit_events(trace_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_idempotency_key ON audit_events(idempotency_key);

View File

@@ -8,18 +8,44 @@ CREATE TABLE IF NOT EXISTS audit_events (
id BIGSERIAL,
event_id VARCHAR(100) NOT NULL,
event_name VARCHAR(100) NOT NULL,
event_category VARCHAR(50),
event_sub_category VARCHAR(50),
event_category VARCHAR(50) NOT NULL DEFAULT '',
event_sub_category VARCHAR(50) NOT NULL DEFAULT '',
timestamp TIMESTAMPTZ NOT NULL,
timestamp_ms BIGINT NOT NULL,
request_id VARCHAR(100),
idempotency_key VARCHAR(128),
tenant_id BIGINT,
object_type VARCHAR(100),
object_id VARCHAR(100),
timestamp_ms BIGINT NOT NULL DEFAULT 0,
request_id VARCHAR(100) NOT NULL DEFAULT '',
trace_id VARCHAR(64) NOT NULL DEFAULT '',
span_id VARCHAR(64) NOT NULL DEFAULT '',
idempotency_key VARCHAR(128) NOT NULL DEFAULT '',
operator_id BIGINT NOT NULL DEFAULT 0,
operator_type VARCHAR(32) NOT NULL DEFAULT '',
operator_role VARCHAR(64) NOT NULL DEFAULT '',
tenant_id BIGINT NOT NULL DEFAULT 0,
tenant_type VARCHAR(32) NOT NULL DEFAULT '',
object_type VARCHAR(100) NOT NULL DEFAULT '',
object_id BIGINT NOT NULL DEFAULT 0,
action VARCHAR(100) NOT NULL,
result_code VARCHAR(50),
source_ip VARCHAR(50),
action_detail TEXT NOT NULL DEFAULT '',
credential_type VARCHAR(64) NOT NULL DEFAULT '',
credential_id VARCHAR(255) NOT NULL DEFAULT '',
credential_fingerprint VARCHAR(255) NOT NULL DEFAULT '',
source_type VARCHAR(32) NOT NULL DEFAULT '',
source_ip VARCHAR(50) NOT NULL DEFAULT '',
source_region VARCHAR(100) NOT NULL DEFAULT '',
user_agent TEXT NOT NULL DEFAULT '',
target_type VARCHAR(32) NOT NULL DEFAULT '',
target_endpoint TEXT NOT NULL DEFAULT '',
target_direct BOOLEAN NOT NULL DEFAULT FALSE,
result_code VARCHAR(50) NOT NULL DEFAULT '',
result_message TEXT NOT NULL DEFAULT '',
success BOOLEAN NOT NULL DEFAULT FALSE,
before_state JSONB,
after_state JSONB,
security_flags JSONB NOT NULL DEFAULT '{}'::jsonb,
risk_score INTEGER NOT NULL DEFAULT 0,
compliance_tags TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],
invariant_rule VARCHAR(255) NOT NULL DEFAULT '',
extensions JSONB,
version INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);
@@ -211,7 +237,11 @@ END $$;
-- 在父表上创建索引(会自动继承到分区)
CREATE INDEX IF NOT EXISTS idx_audit_events_tenant_id ON audit_events(tenant_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_event_id ON audit_events(event_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_event_name ON audit_events(event_name);
CREATE INDEX IF NOT EXISTS idx_audit_events_request_id ON audit_events(request_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_trace_id ON audit_events(trace_id);
CREATE INDEX IF NOT EXISTS idx_audit_events_idempotency_key ON audit_events(idempotency_key);
CREATE INDEX IF NOT EXISTS idx_audit_events_created_at ON audit_events(created_at);
CREATE INDEX IF NOT EXISTS idx_audit_events_object ON audit_events(object_type, object_id);