From 3292e1dc38937c0f67d59b447b93bf40a0771c59 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 17 Apr 2026 18:09:06 +0800 Subject: [PATCH] feat(token-runtime): add postgres-backed runtime and audit stores --- platform-token-runtime/README.md | 12 +- .../cmd/platform-token-runtime/main.go | 16 +- .../cmd/platform-token-runtime/main_test.go | 24 ++ platform-token-runtime/go.mod | 11 + platform-token-runtime/go.sum | 28 +++ .../internal/app/bootstrap.go | 25 ++ .../internal/app/bootstrap_test.go | 68 ++++++ .../auth/service/postgres_audit_store.go | 195 ++++++++++++++++ .../auth/service/postgres_audit_store_test.go | 124 ++++++++++ .../auth/service/postgres_runtime_store.go | 221 ++++++++++++++++++ .../service/postgres_runtime_store_test.go | 147 ++++++++++++ sql/postgresql/token_runtime_schema_v1.sql | 4 + 12 files changed, 872 insertions(+), 3 deletions(-) create mode 100644 platform-token-runtime/go.sum create mode 100644 platform-token-runtime/internal/auth/service/postgres_audit_store.go create mode 100644 platform-token-runtime/internal/auth/service/postgres_audit_store_test.go create mode 100644 platform-token-runtime/internal/auth/service/postgres_runtime_store.go create mode 100644 platform-token-runtime/internal/auth/service/postgres_runtime_store_test.go diff --git a/platform-token-runtime/README.md b/platform-token-runtime/README.md index 2ce5df7f..274065f6 100644 --- a/platform-token-runtime/README.md +++ b/platform-token-runtime/README.md @@ -7,7 +7,7 @@ - 服务入口是 `cmd/platform-token-runtime/main.go`,装配逻辑收口在 `internal/app/bootstrap.go`。 - 当前可用接口包括 `issue`、`refresh`、`revoke`、`introspect`、`audit-events`。 - `TOKEN_RUNTIME_ENV=dev` 且未显式注入 store 时,bootstrap 会自动使用内存 runtime store 与内存 audit store。 -- `TOKEN_RUNTIME_ENV=staging` 或 `TOKEN_RUNTIME_ENV=prod` 时,必须显式注入 runtime store 与 audit store;当前仓库仍未提供持久化 store,因此这两种环境会快速失败,而不是伪装成可上线服务。 +- `TOKEN_RUNTIME_ENV=staging` 或 `TOKEN_RUNTIME_ENV=prod` 时,支持通过 `TOKEN_RUNTIME_DATABASE_URL` 自动装配 PostgreSQL runtime store 与 audit store;未提供 DSN 时仍会快速失败,而不是回退到内存实现。 - `audit-events` 当前始终保持可查询接口语义;默认内存 audit store 会返回真实事件,未提供查询能力的自定义 emitter 会返回空结果而不是 `501` 占位响应。 ## 设计边界 @@ -30,6 +30,14 @@ export TOKEN_RUNTIME_ADDR=":18081" export TOKEN_RUNTIME_ENV="dev" ``` +PostgreSQL 模式: + +```bash +export TOKEN_RUNTIME_ENV="prod" +export TOKEN_RUNTIME_DATABASE_URL="postgres://postgres:secret@127.0.0.1:5432/token_runtime?sslmode=disable" +go run ./cmd/platform-token-runtime +``` + ## 验证命令 模块级验证: @@ -52,3 +60,5 @@ bash scripts/ci/repo_integrity_check.sh - `internal/httpapi/token_api.go`:HTTP 接口与审计查询输出。 - `internal/auth/service/runtime_store.go`:内存 runtime store。 - `internal/auth/service/audit_store.go`:内存 audit store 与审计查询。 +- `internal/auth/service/postgres_runtime_store.go`:PostgreSQL runtime store。 +- `internal/auth/service/postgres_audit_store.go`:PostgreSQL audit store。 diff --git a/platform-token-runtime/cmd/platform-token-runtime/main.go b/platform-token-runtime/cmd/platform-token-runtime/main.go index 60e42971..595c7e60 100644 --- a/platform-token-runtime/cmd/platform-token-runtime/main.go +++ b/platform-token-runtime/cmd/platform-token-runtime/main.go @@ -14,11 +14,23 @@ import ( ) func main() { - srv, err := app.BuildServer(app.Config{ + cfg := app.Config{ Addr: envOrDefault("TOKEN_RUNTIME_ADDR", ":18081"), Env: strings.ToLower(envOrDefault("TOKEN_RUNTIME_ENV", "dev")), Now: time.Now, - }) + } + + if databaseURL := strings.TrimSpace(os.Getenv("TOKEN_RUNTIME_DATABASE_URL")); databaseURL != "" { + runtimeStore, auditStore, closeFn, err := app.BuildPostgresStores(context.Background(), databaseURL) + if err != nil { + log.Fatalf("platform-token-runtime postgres bootstrap failed: %v", err) + } + cfg.RuntimeStore = runtimeStore + cfg.AuditStore = auditStore + defer closeFn() + } + + srv, err := app.BuildServer(cfg) if err != nil { log.Fatalf("platform-token-runtime bootstrap failed: %v", err) } diff --git a/platform-token-runtime/cmd/platform-token-runtime/main_test.go b/platform-token-runtime/cmd/platform-token-runtime/main_test.go index b42aac2c..2b006177 100644 --- a/platform-token-runtime/cmd/platform-token-runtime/main_test.go +++ b/platform-token-runtime/cmd/platform-token-runtime/main_test.go @@ -32,6 +32,30 @@ func TestMain_ProdRejectsInMemoryRuntime(t *testing.T) { } } +func TestMain_ProdWithDatabaseURLDoesNotFailForMissingStore(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestMainHelperProcess") + cmd.Env = append(os.Environ(), + "GO_WANT_HELPER_PROCESS=1", + "TOKEN_RUNTIME_ENV=prod", + "TOKEN_RUNTIME_ADDR=127.0.0.1:0", + "TOKEN_RUNTIME_DATABASE_URL=postgres://postgres:secret@127.0.0.1:1/token_runtime?sslmode=disable", + ) + output, err := cmd.CombinedOutput() + + if ctx.Err() == context.DeadlineExceeded { + t.Fatalf("expected prod startup with database url to fail fast, but process timed out. output=%s", string(output)) + } + if err == nil { + t.Fatalf("expected prod startup to fail because database is unavailable. output=%s", string(output)) + } + if strings.Contains(string(output), "runtime store is required") { + t.Fatalf("expected startup to attempt postgres store wiring before missing-store check, got: %s", string(output)) + } +} + func TestMainHelperProcess(t *testing.T) { if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { return diff --git a/platform-token-runtime/go.mod b/platform-token-runtime/go.mod index 1a456727..c5983b99 100644 --- a/platform-token-runtime/go.mod +++ b/platform-token-runtime/go.mod @@ -1,3 +1,14 @@ module lijiaoqiao/platform-token-runtime go 1.22 + +require github.com/jackc/pgx/v5 v5.5.1 + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + golang.org/x/crypto v0.9.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/text v0.9.0 // indirect +) diff --git a/platform-token-runtime/go.sum b/platform-token-runtime/go.sum new file mode 100644 index 00000000..eb5fae0e --- /dev/null +++ b/platform-token-runtime/go.sum @@ -0,0 +1,28 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI= +github.com/jackc/pgx/v5 v5.5.1/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= +golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/platform-token-runtime/internal/app/bootstrap.go b/platform-token-runtime/internal/app/bootstrap.go index 7f773e43..7675fae4 100644 --- a/platform-token-runtime/internal/app/bootstrap.go +++ b/platform-token-runtime/internal/app/bootstrap.go @@ -1,11 +1,14 @@ package app import ( + "context" "fmt" "net/http" "strings" "time" + "github.com/jackc/pgx/v5/pgxpool" + "lijiaoqiao/platform-token-runtime/internal/auth/service" "lijiaoqiao/platform-token-runtime/internal/httpapi" ) @@ -18,6 +21,28 @@ type Config struct { Now func() time.Time } +var newPostgresStoreBundle = func(ctx context.Context, databaseURL string) (service.RuntimeStore, service.AuditStore, func(), error) { + if ctx == nil { + ctx = context.Background() + } + pool, err := pgxpool.New(ctx, strings.TrimSpace(databaseURL)) + if err != nil { + return nil, nil, nil, err + } + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, nil, nil, err + } + return service.NewPostgresRuntimeStore(pool), service.NewPostgresAuditStore(pool), pool.Close, nil +} + +func BuildPostgresStores(ctx context.Context, databaseURL string) (service.RuntimeStore, service.AuditStore, func(), error) { + if strings.TrimSpace(databaseURL) == "" { + return nil, nil, nil, fmt.Errorf("token runtime database url is required") + } + return newPostgresStoreBundle(ctx, databaseURL) +} + func BuildRuntime(cfg Config) (*service.InMemoryTokenRuntime, service.AuditStore, error) { now := cfg.Now if now == nil { diff --git a/platform-token-runtime/internal/app/bootstrap_test.go b/platform-token-runtime/internal/app/bootstrap_test.go index 14d480f6..bb4284d9 100644 --- a/platform-token-runtime/internal/app/bootstrap_test.go +++ b/platform-token-runtime/internal/app/bootstrap_test.go @@ -1,6 +1,7 @@ package app import ( + "context" "net/http" "net/http/httptest" "strings" @@ -76,3 +77,70 @@ func TestBuildServer_HealthEndpoint(t *testing.T) { t.Fatalf("unexpected body: %s", rec.Body.String()) } } + +type stubRuntimeStore struct{} + +func (stubRuntimeStore) Save(context.Context, service.TokenRecord, string, string) error { + return nil +} + +func (stubRuntimeStore) GetByTokenID(context.Context, string) (*service.TokenRecord, bool, error) { + return nil, false, nil +} + +func (stubRuntimeStore) GetByAccessToken(context.Context, string) (*service.TokenRecord, bool, error) { + return nil, false, nil +} + +func (stubRuntimeStore) LookupIdempotency(context.Context, string) (service.IdempotencyEntry, bool, error) { + return service.IdempotencyEntry{}, false, nil +} + +type stubAuditStore struct{} + +func (stubAuditStore) Emit(context.Context, service.AuditEvent) error { return nil } + +func (stubAuditStore) QueryEvents(context.Context, service.AuditEventFilter) ([]service.AuditEvent, error) { + return nil, nil +} + +func TestBuildPostgresStores_RequiresDatabaseURL(t *testing.T) { + _, _, closeFn, err := BuildPostgresStores(context.Background(), "") + if err == nil { + t.Fatal("expected empty database url error") + } + if closeFn != nil { + t.Fatal("expected nil close function on error") + } +} + +func TestBuildPostgresStores_UsesFactory(t *testing.T) { + oldFactory := newPostgresStoreBundle + defer func() { newPostgresStoreBundle = oldFactory }() + + closed := false + newPostgresStoreBundle = func(ctx context.Context, databaseURL string) (service.RuntimeStore, service.AuditStore, func(), error) { + if databaseURL != "postgres://token-runtime" { + t.Fatalf("unexpected database url: %s", databaseURL) + } + return stubRuntimeStore{}, stubAuditStore{}, func() { closed = true }, nil + } + + runtimeStore, auditStore, closeFn, err := BuildPostgresStores(context.Background(), "postgres://token-runtime") + if err != nil { + t.Fatalf("BuildPostgresStores returned error: %v", err) + } + if runtimeStore == nil { + t.Fatal("expected runtime store") + } + if auditStore == nil { + t.Fatal("expected audit store") + } + if closeFn == nil { + t.Fatal("expected close function") + } + closeFn() + if !closed { + t.Fatal("expected close function to run") + } +} diff --git a/platform-token-runtime/internal/auth/service/postgres_audit_store.go b/platform-token-runtime/internal/auth/service/postgres_audit_store.go new file mode 100644 index 00000000..9257dc65 --- /dev/null +++ b/platform-token-runtime/internal/auth/service/postgres_audit_store.go @@ -0,0 +1,195 @@ +package service + +import ( + "context" + "errors" + "strconv" + "strings" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type auditStoreRows interface { + Close() + Err() error + Next() bool + Scan(dest ...any) error +} + +type auditStoreDB interface { + Exec(ctx context.Context, query string, args ...any) error + Query(ctx context.Context, query string, args ...any) (auditStoreRows, error) +} + +type pgxAuditStoreDB struct { + pool *pgxpool.Pool +} + +func (db *pgxAuditStoreDB) Exec(ctx context.Context, query string, args ...any) error { + _, err := db.pool.Exec(ctx, query, args...) + return err +} + +func (db *pgxAuditStoreDB) Query(ctx context.Context, query string, args ...any) (auditStoreRows, error) { + return db.pool.Query(ctx, query, args...) +} + +type PostgresAuditStore struct { + db auditStoreDB +} + +func NewPostgresAuditStore(pool *pgxpool.Pool) *PostgresAuditStore { + return newPostgresAuditStoreWithDB(&pgxAuditStoreDB{pool: pool}) +} + +func newPostgresAuditStoreWithDB(db auditStoreDB) *PostgresAuditStore { + return &PostgresAuditStore{db: db} +} + +func (s *PostgresAuditStore) Emit(ctx context.Context, event AuditEvent) error { + if ctx == nil { + ctx = context.Background() + } + if s == nil || s.db == nil { + return errors.New("postgres audit store is not configured") + } + if event.EventID == "" { + eventID, err := generateEventID() + if err != nil { + return err + } + event.EventID = eventID + } + if event.CreatedAt.IsZero() { + event.CreatedAt = time.Now() + } + + const query = ` +INSERT INTO auth_token_audit_events ( + event_id, + event_name, + request_id, + token_id, + subject_id, + route, + result_code, + client_ip, + created_at +) VALUES ( + $1, + $2, + $3, + NULLIF($4, ''), + NULLIF($5, ''), + $6, + $7, + NULLIF($8, '')::inet, + $9 +) +` + + return s.db.Exec(ctx, query, + event.EventID, + event.EventName, + event.RequestID, + event.TokenID, + event.SubjectID, + event.Route, + event.ResultCode, + strings.TrimSpace(event.ClientIP), + event.CreatedAt, + ) +} + +func (s *PostgresAuditStore) QueryEvents(ctx context.Context, filter AuditEventFilter) ([]AuditEvent, error) { + if ctx == nil { + ctx = context.Background() + } + if s == nil || s.db == nil { + return nil, errors.New("postgres audit store is not configured") + } + + limit := filter.Limit + if limit <= 0 { + limit = 100 + } + if limit > 500 { + limit = 500 + } + + query := ` +SELECT + event_id, + event_name, + request_id, + COALESCE(token_id, ''), + COALESCE(subject_id, ''), + route, + result_code, + COALESCE(host(client_ip), ''), + created_at +FROM auth_token_audit_events +WHERE 1=1 +` + args := make([]any, 0, 6) + if filter.RequestID != "" { + args = append(args, strings.TrimSpace(filter.RequestID)) + query += " AND request_id = $" + strconvArg(len(args)) + } + if filter.TokenID != "" { + args = append(args, strings.TrimSpace(filter.TokenID)) + query += " AND token_id = $" + strconvArg(len(args)) + } + if filter.SubjectID != "" { + args = append(args, strings.TrimSpace(filter.SubjectID)) + query += " AND subject_id = $" + strconvArg(len(args)) + } + if filter.EventName != "" { + args = append(args, strings.TrimSpace(filter.EventName)) + query += " AND event_name = $" + strconvArg(len(args)) + } + if filter.ResultCode != "" { + args = append(args, strings.TrimSpace(filter.ResultCode)) + query += " AND result_code = $" + strconvArg(len(args)) + } + args = append(args, limit) + query += " ORDER BY created_at DESC LIMIT $" + strconvArg(len(args)) + + rows, err := s.db.Query(ctx, query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + events := make([]AuditEvent, 0, limit) + for rows.Next() { + var event AuditEvent + if err := rows.Scan( + &event.EventID, + &event.EventName, + &event.RequestID, + &event.TokenID, + &event.SubjectID, + &event.Route, + &event.ResultCode, + &event.ClientIP, + &event.CreatedAt, + ); err != nil { + return nil, err + } + events = append(events, event) + } + if err := rows.Err(); err != nil { + return nil, err + } + + for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 { + events[i], events[j] = events[j], events[i] + } + return events, nil +} + +func strconvArg(position int) string { + return strconv.Itoa(position) +} diff --git a/platform-token-runtime/internal/auth/service/postgres_audit_store_test.go b/platform-token-runtime/internal/auth/service/postgres_audit_store_test.go new file mode 100644 index 00000000..d304022f --- /dev/null +++ b/platform-token-runtime/internal/auth/service/postgres_audit_store_test.go @@ -0,0 +1,124 @@ +package service + +import ( + "context" + "testing" + "time" +) + +type fakeAuditRows struct { + rows [][]any + idx int + err error +} + +func (r *fakeAuditRows) Next() bool { + if r.idx >= len(r.rows) { + return false + } + r.idx++ + return true +} + +func (r *fakeAuditRows) Scan(dest ...any) error { + row := r.rows[r.idx-1] + for i := range dest { + switch d := dest[i].(type) { + case *string: + *d = row[i].(string) + case *time.Time: + *d = row[i].(time.Time) + default: + panic("unsupported audit scan destination") + } + } + return nil +} + +func (r *fakeAuditRows) Err() error { return r.err } +func (r *fakeAuditRows) Close() {} + +type fakeAuditDB struct { + lastExecSQL string + lastExecArgs []any + lastQuerySQL string + lastQueryArgs []any + rows auditStoreRows + execErr error + queryErr error +} + +func (db *fakeAuditDB) Exec(_ context.Context, query string, args ...any) error { + db.lastExecSQL = query + db.lastExecArgs = append([]any(nil), args...) + return db.execErr +} + +func (db *fakeAuditDB) Query(_ context.Context, query string, args ...any) (auditStoreRows, error) { + db.lastQuerySQL = query + db.lastQueryArgs = append([]any(nil), args...) + if db.queryErr != nil { + return nil, db.queryErr + } + return db.rows, nil +} + +func TestPostgresAuditStore_EmitPersistsAuditEvent(t *testing.T) { + db := &fakeAuditDB{} + store := newPostgresAuditStoreWithDB(db) + + err := store.Emit(context.Background(), AuditEvent{ + EventID: "evt-1", + EventName: EventTokenIssueSuccess, + RequestID: "req-1", + TokenID: "tok-1", + SubjectID: "2001", + Route: "/issue", + ResultCode: "OK", + ClientIP: "127.0.0.1", + CreatedAt: time.Date(2026, 4, 17, 9, 10, 0, 0, time.UTC), + }) + if err != nil { + t.Fatalf("emit audit event: %v", err) + } + if db.lastExecSQL == "" { + t.Fatal("expected emit query to be executed") + } + if got := db.lastExecArgs[0].(string); got != "evt-1" { + t.Fatalf("unexpected event id: got=%s want=evt-1", got) + } + if got := db.lastExecArgs[7].(string); got != "127.0.0.1" { + t.Fatalf("unexpected client ip: got=%s want=127.0.0.1", got) + } +} + +func TestPostgresAuditStore_QueryEventsReturnsAscendingOrder(t *testing.T) { + older := time.Date(2026, 4, 17, 9, 0, 0, 0, time.UTC) + newer := older.Add(2 * time.Minute) + db := &fakeAuditDB{ + rows: &fakeAuditRows{ + rows: [][]any{ + {"evt-2", EventTokenRevokeSuccess, "req-2", "tok-1", "2001", "/revoke", "OK", "127.0.0.1", newer}, + {"evt-1", EventTokenIssueSuccess, "req-1", "tok-1", "2001", "/issue", "OK", "127.0.0.1", older}, + }, + }, + } + store := newPostgresAuditStoreWithDB(db) + + events, err := store.QueryEvents(context.Background(), AuditEventFilter{ + TokenID: "tok-1", + Limit: 600, + }) + if err != nil { + t.Fatalf("query events: %v", err) + } + if len(events) != 2 { + t.Fatalf("unexpected event count: got=%d want=2", len(events)) + } + if events[0].EventID != "evt-1" { + t.Fatalf("expected ascending order, got first event %s", events[0].EventID) + } + if got := db.lastQueryArgs[len(db.lastQueryArgs)-1].(int); got != 500 { + t.Fatalf("unexpected limit: got=%d want=500", got) + } +} diff --git a/platform-token-runtime/internal/auth/service/postgres_runtime_store.go b/platform-token-runtime/internal/auth/service/postgres_runtime_store.go new file mode 100644 index 00000000..effa50c3 --- /dev/null +++ b/platform-token-runtime/internal/auth/service/postgres_runtime_store.go @@ -0,0 +1,221 @@ +package service + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "strings" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +type runtimeStoreRow interface { + Scan(dest ...any) error +} + +type runtimeStoreDB interface { + Exec(ctx context.Context, query string, args ...any) error + QueryRow(ctx context.Context, query string, args ...any) runtimeStoreRow +} + +type pgxRuntimeStoreDB struct { + pool *pgxpool.Pool +} + +func (db *pgxRuntimeStoreDB) Exec(ctx context.Context, query string, args ...any) error { + _, err := db.pool.Exec(ctx, query, args...) + return err +} + +func (db *pgxRuntimeStoreDB) QueryRow(ctx context.Context, query string, args ...any) runtimeStoreRow { + return db.pool.QueryRow(ctx, query, args...) +} + +type PostgresRuntimeStore struct { + db runtimeStoreDB +} + +func NewPostgresRuntimeStore(pool *pgxpool.Pool) *PostgresRuntimeStore { + return newPostgresRuntimeStoreWithDB(&pgxRuntimeStoreDB{pool: pool}) +} + +func newPostgresRuntimeStoreWithDB(db runtimeStoreDB) *PostgresRuntimeStore { + return &PostgresRuntimeStore{db: db} +} + +func (s *PostgresRuntimeStore) Save(ctx context.Context, record TokenRecord, idempotencyKey, requestHash string) error { + if ctx == nil { + ctx = context.Background() + } + if s == nil || s.db == nil { + return errors.New("postgres runtime store is not configured") + } + + scopeJSON, err := json.Marshal(record.Scope) + if err != nil { + return err + } + + tokenFingerprint := "" + if strings.TrimSpace(record.AccessToken) != "" { + tokenFingerprint = accessTokenFingerprint(record.AccessToken) + } + + var revokedAt any + if record.Status == TokenStatusRevoked { + revokedAt = time.Now().UTC() + } + + const query = ` +INSERT INTO auth_platform_tokens ( + token_id, + token_fingerprint, + hash_algo, + subject_id, + role_code, + scope_json, + status, + issued_at, + expires_at, + revoked_reason, + revoked_at, + issue_request_id, + issue_idempotency_key, + issue_request_hash +) VALUES ( + $1, + NULLIF($2, ''), + 'SHA-256', + $3, + $4, + $5, + $6, + $7, + $8, + NULLIF($9, ''), + $10, + NULLIF($11, ''), + NULLIF($12, ''), + NULLIF($13, '') +) +ON CONFLICT (token_id) DO UPDATE SET + token_fingerprint = COALESCE(NULLIF(EXCLUDED.token_fingerprint, ''), auth_platform_tokens.token_fingerprint), + subject_id = EXCLUDED.subject_id, + role_code = EXCLUDED.role_code, + scope_json = EXCLUDED.scope_json, + status = EXCLUDED.status, + issued_at = EXCLUDED.issued_at, + expires_at = EXCLUDED.expires_at, + revoked_reason = NULLIF(EXCLUDED.revoked_reason, ''), + revoked_at = CASE + WHEN EXCLUDED.status = 'revoked' THEN COALESCE(auth_platform_tokens.revoked_at, EXCLUDED.revoked_at, CURRENT_TIMESTAMP) + ELSE NULL + END, + issue_request_id = COALESCE(NULLIF(EXCLUDED.issue_request_id, ''), auth_platform_tokens.issue_request_id), + issue_idempotency_key = COALESCE(NULLIF(EXCLUDED.issue_idempotency_key, ''), auth_platform_tokens.issue_idempotency_key), + issue_request_hash = COALESCE(NULLIF(EXCLUDED.issue_request_hash, ''), auth_platform_tokens.issue_request_hash), + updated_at = CURRENT_TIMESTAMP +` + + return s.db.Exec(ctx, query, + record.TokenID, + tokenFingerprint, + record.SubjectID, + record.Role, + scopeJSON, + string(record.Status), + record.IssuedAt, + record.ExpiresAt, + strings.TrimSpace(record.RevokedReason), + revokedAt, + strings.TrimSpace(record.RequestID), + strings.TrimSpace(idempotencyKey), + strings.TrimSpace(requestHash), + ) +} + +func (s *PostgresRuntimeStore) GetByTokenID(ctx context.Context, tokenID string) (*TokenRecord, bool, error) { + if ctx == nil { + ctx = context.Background() + } + return s.querySingleRecord(ctx, ` +SELECT token_id, subject_id, role_code, scope_json, status, issued_at, expires_at, issue_request_id, COALESCE(revoked_reason, '') +FROM auth_platform_tokens +WHERE token_id = $1 +`, strings.TrimSpace(tokenID)) +} + +func (s *PostgresRuntimeStore) GetByAccessToken(ctx context.Context, accessToken string) (*TokenRecord, bool, error) { + if ctx == nil { + ctx = context.Background() + } + return s.querySingleRecord(ctx, ` +SELECT token_id, subject_id, role_code, scope_json, status, issued_at, expires_at, issue_request_id, COALESCE(revoked_reason, '') +FROM auth_platform_tokens +WHERE token_fingerprint = $1 +`, accessTokenFingerprint(accessToken)) +} + +func (s *PostgresRuntimeStore) LookupIdempotency(ctx context.Context, idempotencyKey string) (IdempotencyEntry, bool, error) { + if ctx == nil { + ctx = context.Background() + } + if s == nil || s.db == nil { + return IdempotencyEntry{}, false, errors.New("postgres runtime store is not configured") + } + + var entry IdempotencyEntry + err := s.db.QueryRow(ctx, ` +SELECT COALESCE(issue_request_hash, ''), token_id +FROM auth_platform_tokens +WHERE issue_idempotency_key = $1 +`, strings.TrimSpace(idempotencyKey)).Scan(&entry.RequestHash, &entry.TokenID) + if errors.Is(err, pgx.ErrNoRows) { + return IdempotencyEntry{}, false, nil + } + if err != nil { + return IdempotencyEntry{}, false, err + } + return entry, true, nil +} + +func (s *PostgresRuntimeStore) querySingleRecord(ctx context.Context, query string, arg string) (*TokenRecord, bool, error) { + if s == nil || s.db == nil { + return nil, false, errors.New("postgres runtime store is not configured") + } + + var scopeJSON []byte + var status string + record := TokenRecord{} + err := s.db.QueryRow(ctx, query, arg).Scan( + &record.TokenID, + &record.SubjectID, + &record.Role, + &scopeJSON, + &status, + &record.IssuedAt, + &record.ExpiresAt, + &record.RequestID, + &record.RevokedReason, + ) + if errors.Is(err, pgx.ErrNoRows) { + return nil, false, nil + } + if err != nil { + return nil, false, err + } + if err := json.Unmarshal(scopeJSON, &record.Scope); err != nil { + return nil, false, err + } + record.Status = TokenStatus(status) + return &record, true, nil +} + +func accessTokenFingerprint(accessToken string) string { + sum := sha256.Sum256([]byte(strings.TrimSpace(accessToken))) + return hex.EncodeToString(sum[:]) +} diff --git a/platform-token-runtime/internal/auth/service/postgres_runtime_store_test.go b/platform-token-runtime/internal/auth/service/postgres_runtime_store_test.go new file mode 100644 index 00000000..dbb43497 --- /dev/null +++ b/platform-token-runtime/internal/auth/service/postgres_runtime_store_test.go @@ -0,0 +1,147 @@ +package service + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/jackc/pgx/v5" +) + +type fakeRuntimeRow struct { + values []any + err error +} + +func (r fakeRuntimeRow) Scan(dest ...any) error { + if r.err != nil { + return r.err + } + for i := range dest { + switch d := dest[i].(type) { + case *string: + *d = r.values[i].(string) + case *[]byte: + *d = append((*d)[:0], r.values[i].([]byte)...) + case *time.Time: + *d = r.values[i].(time.Time) + default: + tpanic("unsupported scan destination") + } + } + return nil +} + +type fakeRuntimeDB struct { + lastExecSQL string + lastExecArgs []any + lastQuerySQL string + lastQueryArgs []any + queryRowFunc func(query string, args ...any) runtimeStoreRow + execErr error +} + +func (db *fakeRuntimeDB) Exec(_ context.Context, query string, args ...any) error { + db.lastExecSQL = query + db.lastExecArgs = append([]any(nil), args...) + return db.execErr +} + +func (db *fakeRuntimeDB) QueryRow(_ context.Context, query string, args ...any) runtimeStoreRow { + db.lastQuerySQL = query + db.lastQueryArgs = append([]any(nil), args...) + if db.queryRowFunc == nil { + return fakeRuntimeRow{err: pgx.ErrNoRows} + } + return db.queryRowFunc(query, args...) +} + +func TestPostgresRuntimeStore_SavePersistsTokenRecord(t *testing.T) { + now := time.Date(2026, 4, 17, 9, 0, 0, 0, time.UTC) + db := &fakeRuntimeDB{} + store := newPostgresRuntimeStoreWithDB(db) + + err := store.Save(context.Background(), TokenRecord{ + TokenID: "tok_123", + AccessToken: "ptk_secret", + SubjectID: "2001", + Role: "owner", + Scope: []string{"supply:*"}, + IssuedAt: now, + ExpiresAt: now.Add(15 * time.Minute), + Status: TokenStatusActive, + RequestID: "req-1", + RevokedReason: "", + }, "idem-1", "hash-1") + if err != nil { + t.Fatalf("save record: %v", err) + } + if db.lastExecSQL == "" { + t.Fatal("expected save query to be executed") + } + if got := db.lastExecArgs[1].(string); got == "" { + t.Fatal("expected token fingerprint to be persisted") + } + if got := db.lastExecArgs[10].(string); got != "req-1" { + t.Fatalf("unexpected request id: got=%s want=req-1", got) + } + if got := db.lastExecArgs[11].(string); got != "idem-1" { + t.Fatalf("unexpected idempotency key: got=%s want=idem-1", got) + } + if got := db.lastExecArgs[12].(string); got != "hash-1" { + t.Fatalf("unexpected request hash: got=%s want=hash-1", got) + } +} + +func TestPostgresRuntimeStore_LookupsUseFingerprintAndIdempotencyHash(t *testing.T) { + now := time.Date(2026, 4, 17, 9, 5, 0, 0, time.UTC) + db := &fakeRuntimeDB{ + queryRowFunc: func(query string, args ...any) runtimeStoreRow { + switch { + case strings.Contains(query, "token_fingerprint"): + return fakeRuntimeRow{values: []any{ + "tok_123", + "2001", + "owner", + []byte(`["supply:*"]`), + string(TokenStatusActive), + now, + now.Add(10 * time.Minute), + "req-1", + "", + }} + case strings.Contains(query, "issue_idempotency_key"): + return fakeRuntimeRow{values: []any{"hash-1", "tok_123"}} + default: + return fakeRuntimeRow{err: pgx.ErrNoRows} + } + }, + } + store := newPostgresRuntimeStoreWithDB(db) + + record, ok, err := store.GetByAccessToken(context.Background(), "ptk_secret") + if err != nil { + t.Fatalf("get by access token: %v", err) + } + if !ok { + t.Fatal("expected access token lookup to succeed") + } + if record.TokenID != "tok_123" { + t.Fatalf("unexpected token id: got=%s want=tok_123", record.TokenID) + } + entry, ok, err := store.LookupIdempotency(context.Background(), "idem-1") + if err != nil { + t.Fatalf("lookup idempotency: %v", err) + } + if !ok { + t.Fatal("expected idempotency lookup to succeed") + } + if entry.RequestHash != "hash-1" { + t.Fatalf("unexpected request hash: got=%s want=hash-1", entry.RequestHash) + } +} + +func tpanic(msg string) { + panic(msg) +} diff --git a/sql/postgresql/token_runtime_schema_v1.sql b/sql/postgresql/token_runtime_schema_v1.sql index f7cddd46..077bcea2 100644 --- a/sql/postgresql/token_runtime_schema_v1.sql +++ b/sql/postgresql/token_runtime_schema_v1.sql @@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS auth_platform_tokens ( revoked_reason VARCHAR(256), issue_request_id VARCHAR(128) NOT NULL, issue_idempotency_key VARCHAR(128), + issue_request_hash CHAR(64), last_seen_at TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP @@ -29,6 +30,9 @@ CREATE TABLE IF NOT EXISTS auth_platform_tokens ( CREATE UNIQUE INDEX IF NOT EXISTS uq_auth_platform_tokens_fingerprint ON auth_platform_tokens (token_fingerprint); +CREATE UNIQUE INDEX IF NOT EXISTS uq_auth_platform_tokens_issue_idempotency_key + ON auth_platform_tokens (issue_idempotency_key) + WHERE issue_idempotency_key IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_auth_platform_tokens_subject_status ON auth_platform_tokens (subject_id, status); CREATE INDEX IF NOT EXISTS idx_auth_platform_tokens_expires_at