feat(token-runtime): add postgres-backed runtime and audit stores
This commit is contained in:
@@ -7,7 +7,7 @@
|
||||
- 服务入口是 `cmd/platform-token-runtime/main.go`,装配逻辑收口在 `internal/app/bootstrap.go`。
|
||||
- 当前可用接口包括 `issue`、`refresh`、`revoke`、`introspect`、`audit-events`。
|
||||
- `TOKEN_RUNTIME_ENV=dev` 且未显式注入 store 时,bootstrap 会自动使用内存 runtime store 与内存 audit store。
|
||||
- `TOKEN_RUNTIME_ENV=staging` 或 `TOKEN_RUNTIME_ENV=prod` 时,必须显式注入 runtime store 与 audit store;当前仓库仍未提供持久化 store,因此这两种环境会快速失败,而不是伪装成可上线服务。
|
||||
- `TOKEN_RUNTIME_ENV=staging` 或 `TOKEN_RUNTIME_ENV=prod` 时,支持通过 `TOKEN_RUNTIME_DATABASE_URL` 自动装配 PostgreSQL runtime store 与 audit store;未提供 DSN 时仍会快速失败,而不是回退到内存实现。
|
||||
- `audit-events` 当前始终保持可查询接口语义;默认内存 audit store 会返回真实事件,未提供查询能力的自定义 emitter 会返回空结果而不是 `501` 占位响应。
|
||||
|
||||
## 设计边界
|
||||
@@ -30,6 +30,14 @@ export TOKEN_RUNTIME_ADDR=":18081"
|
||||
export TOKEN_RUNTIME_ENV="dev"
|
||||
```
|
||||
|
||||
PostgreSQL 模式:
|
||||
|
||||
```bash
|
||||
export TOKEN_RUNTIME_ENV="prod"
|
||||
export TOKEN_RUNTIME_DATABASE_URL="postgres://postgres:secret@127.0.0.1:5432/token_runtime?sslmode=disable"
|
||||
go run ./cmd/platform-token-runtime
|
||||
```
|
||||
|
||||
## 验证命令
|
||||
|
||||
模块级验证:
|
||||
@@ -52,3 +60,5 @@ bash scripts/ci/repo_integrity_check.sh
|
||||
- `internal/httpapi/token_api.go`:HTTP 接口与审计查询输出。
|
||||
- `internal/auth/service/runtime_store.go`:内存 runtime store。
|
||||
- `internal/auth/service/audit_store.go`:内存 audit store 与审计查询。
|
||||
- `internal/auth/service/postgres_runtime_store.go`:PostgreSQL runtime store。
|
||||
- `internal/auth/service/postgres_audit_store.go`:PostgreSQL audit store。
|
||||
|
||||
@@ -14,11 +14,23 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
srv, err := app.BuildServer(app.Config{
|
||||
cfg := app.Config{
|
||||
Addr: envOrDefault("TOKEN_RUNTIME_ADDR", ":18081"),
|
||||
Env: strings.ToLower(envOrDefault("TOKEN_RUNTIME_ENV", "dev")),
|
||||
Now: time.Now,
|
||||
})
|
||||
}
|
||||
|
||||
if databaseURL := strings.TrimSpace(os.Getenv("TOKEN_RUNTIME_DATABASE_URL")); databaseURL != "" {
|
||||
runtimeStore, auditStore, closeFn, err := app.BuildPostgresStores(context.Background(), databaseURL)
|
||||
if err != nil {
|
||||
log.Fatalf("platform-token-runtime postgres bootstrap failed: %v", err)
|
||||
}
|
||||
cfg.RuntimeStore = runtimeStore
|
||||
cfg.AuditStore = auditStore
|
||||
defer closeFn()
|
||||
}
|
||||
|
||||
srv, err := app.BuildServer(cfg)
|
||||
if err != nil {
|
||||
log.Fatalf("platform-token-runtime bootstrap failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -32,6 +32,30 @@ func TestMain_ProdRejectsInMemoryRuntime(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMain_ProdWithDatabaseURLDoesNotFailForMissingStore(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestMainHelperProcess")
|
||||
cmd.Env = append(os.Environ(),
|
||||
"GO_WANT_HELPER_PROCESS=1",
|
||||
"TOKEN_RUNTIME_ENV=prod",
|
||||
"TOKEN_RUNTIME_ADDR=127.0.0.1:0",
|
||||
"TOKEN_RUNTIME_DATABASE_URL=postgres://postgres:secret@127.0.0.1:1/token_runtime?sslmode=disable",
|
||||
)
|
||||
output, err := cmd.CombinedOutput()
|
||||
|
||||
if ctx.Err() == context.DeadlineExceeded {
|
||||
t.Fatalf("expected prod startup with database url to fail fast, but process timed out. output=%s", string(output))
|
||||
}
|
||||
if err == nil {
|
||||
t.Fatalf("expected prod startup to fail because database is unavailable. output=%s", string(output))
|
||||
}
|
||||
if strings.Contains(string(output), "runtime store is required") {
|
||||
t.Fatalf("expected startup to attempt postgres store wiring before missing-store check, got: %s", string(output))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMainHelperProcess(t *testing.T) {
|
||||
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
|
||||
return
|
||||
|
||||
@@ -1,3 +1,14 @@
|
||||
module lijiaoqiao/platform-token-runtime
|
||||
|
||||
go 1.22
|
||||
|
||||
require github.com/jackc/pgx/v5 v5.5.1
|
||||
|
||||
require (
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
||||
golang.org/x/crypto v0.9.0 // indirect
|
||||
golang.org/x/sync v0.1.0 // indirect
|
||||
golang.org/x/text v0.9.0 // indirect
|
||||
)
|
||||
|
||||
28
platform-token-runtime/go.sum
Normal file
28
platform-token-runtime/go.sum
Normal file
@@ -0,0 +1,28 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
|
||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
|
||||
github.com/jackc/pgx/v5 v5.5.1/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
|
||||
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
|
||||
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
|
||||
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
|
||||
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
|
||||
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
@@ -1,11 +1,14 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"lijiaoqiao/platform-token-runtime/internal/auth/service"
|
||||
"lijiaoqiao/platform-token-runtime/internal/httpapi"
|
||||
)
|
||||
@@ -18,6 +21,28 @@ type Config struct {
|
||||
Now func() time.Time
|
||||
}
|
||||
|
||||
var newPostgresStoreBundle = func(ctx context.Context, databaseURL string) (service.RuntimeStore, service.AuditStore, func(), error) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
pool, err := pgxpool.New(ctx, strings.TrimSpace(databaseURL))
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
return service.NewPostgresRuntimeStore(pool), service.NewPostgresAuditStore(pool), pool.Close, nil
|
||||
}
|
||||
|
||||
func BuildPostgresStores(ctx context.Context, databaseURL string) (service.RuntimeStore, service.AuditStore, func(), error) {
|
||||
if strings.TrimSpace(databaseURL) == "" {
|
||||
return nil, nil, nil, fmt.Errorf("token runtime database url is required")
|
||||
}
|
||||
return newPostgresStoreBundle(ctx, databaseURL)
|
||||
}
|
||||
|
||||
func BuildRuntime(cfg Config) (*service.InMemoryTokenRuntime, service.AuditStore, error) {
|
||||
now := cfg.Now
|
||||
if now == nil {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
@@ -76,3 +77,70 @@ func TestBuildServer_HealthEndpoint(t *testing.T) {
|
||||
t.Fatalf("unexpected body: %s", rec.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
type stubRuntimeStore struct{}
|
||||
|
||||
func (stubRuntimeStore) Save(context.Context, service.TokenRecord, string, string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (stubRuntimeStore) GetByTokenID(context.Context, string) (*service.TokenRecord, bool, error) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func (stubRuntimeStore) GetByAccessToken(context.Context, string) (*service.TokenRecord, bool, error) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func (stubRuntimeStore) LookupIdempotency(context.Context, string) (service.IdempotencyEntry, bool, error) {
|
||||
return service.IdempotencyEntry{}, false, nil
|
||||
}
|
||||
|
||||
type stubAuditStore struct{}
|
||||
|
||||
func (stubAuditStore) Emit(context.Context, service.AuditEvent) error { return nil }
|
||||
|
||||
func (stubAuditStore) QueryEvents(context.Context, service.AuditEventFilter) ([]service.AuditEvent, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestBuildPostgresStores_RequiresDatabaseURL(t *testing.T) {
|
||||
_, _, closeFn, err := BuildPostgresStores(context.Background(), "")
|
||||
if err == nil {
|
||||
t.Fatal("expected empty database url error")
|
||||
}
|
||||
if closeFn != nil {
|
||||
t.Fatal("expected nil close function on error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildPostgresStores_UsesFactory(t *testing.T) {
|
||||
oldFactory := newPostgresStoreBundle
|
||||
defer func() { newPostgresStoreBundle = oldFactory }()
|
||||
|
||||
closed := false
|
||||
newPostgresStoreBundle = func(ctx context.Context, databaseURL string) (service.RuntimeStore, service.AuditStore, func(), error) {
|
||||
if databaseURL != "postgres://token-runtime" {
|
||||
t.Fatalf("unexpected database url: %s", databaseURL)
|
||||
}
|
||||
return stubRuntimeStore{}, stubAuditStore{}, func() { closed = true }, nil
|
||||
}
|
||||
|
||||
runtimeStore, auditStore, closeFn, err := BuildPostgresStores(context.Background(), "postgres://token-runtime")
|
||||
if err != nil {
|
||||
t.Fatalf("BuildPostgresStores returned error: %v", err)
|
||||
}
|
||||
if runtimeStore == nil {
|
||||
t.Fatal("expected runtime store")
|
||||
}
|
||||
if auditStore == nil {
|
||||
t.Fatal("expected audit store")
|
||||
}
|
||||
if closeFn == nil {
|
||||
t.Fatal("expected close function")
|
||||
}
|
||||
closeFn()
|
||||
if !closed {
|
||||
t.Fatal("expected close function to run")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,195 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
type auditStoreRows interface {
|
||||
Close()
|
||||
Err() error
|
||||
Next() bool
|
||||
Scan(dest ...any) error
|
||||
}
|
||||
|
||||
type auditStoreDB interface {
|
||||
Exec(ctx context.Context, query string, args ...any) error
|
||||
Query(ctx context.Context, query string, args ...any) (auditStoreRows, error)
|
||||
}
|
||||
|
||||
type pgxAuditStoreDB struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
func (db *pgxAuditStoreDB) Exec(ctx context.Context, query string, args ...any) error {
|
||||
_, err := db.pool.Exec(ctx, query, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *pgxAuditStoreDB) Query(ctx context.Context, query string, args ...any) (auditStoreRows, error) {
|
||||
return db.pool.Query(ctx, query, args...)
|
||||
}
|
||||
|
||||
type PostgresAuditStore struct {
|
||||
db auditStoreDB
|
||||
}
|
||||
|
||||
func NewPostgresAuditStore(pool *pgxpool.Pool) *PostgresAuditStore {
|
||||
return newPostgresAuditStoreWithDB(&pgxAuditStoreDB{pool: pool})
|
||||
}
|
||||
|
||||
func newPostgresAuditStoreWithDB(db auditStoreDB) *PostgresAuditStore {
|
||||
return &PostgresAuditStore{db: db}
|
||||
}
|
||||
|
||||
func (s *PostgresAuditStore) Emit(ctx context.Context, event AuditEvent) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
if s == nil || s.db == nil {
|
||||
return errors.New("postgres audit store is not configured")
|
||||
}
|
||||
if event.EventID == "" {
|
||||
eventID, err := generateEventID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
event.EventID = eventID
|
||||
}
|
||||
if event.CreatedAt.IsZero() {
|
||||
event.CreatedAt = time.Now()
|
||||
}
|
||||
|
||||
const query = `
|
||||
INSERT INTO auth_token_audit_events (
|
||||
event_id,
|
||||
event_name,
|
||||
request_id,
|
||||
token_id,
|
||||
subject_id,
|
||||
route,
|
||||
result_code,
|
||||
client_ip,
|
||||
created_at
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
NULLIF($4, ''),
|
||||
NULLIF($5, ''),
|
||||
$6,
|
||||
$7,
|
||||
NULLIF($8, '')::inet,
|
||||
$9
|
||||
)
|
||||
`
|
||||
|
||||
return s.db.Exec(ctx, query,
|
||||
event.EventID,
|
||||
event.EventName,
|
||||
event.RequestID,
|
||||
event.TokenID,
|
||||
event.SubjectID,
|
||||
event.Route,
|
||||
event.ResultCode,
|
||||
strings.TrimSpace(event.ClientIP),
|
||||
event.CreatedAt,
|
||||
)
|
||||
}
|
||||
|
||||
func (s *PostgresAuditStore) QueryEvents(ctx context.Context, filter AuditEventFilter) ([]AuditEvent, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
if s == nil || s.db == nil {
|
||||
return nil, errors.New("postgres audit store is not configured")
|
||||
}
|
||||
|
||||
limit := filter.Limit
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
if limit > 500 {
|
||||
limit = 500
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT
|
||||
event_id,
|
||||
event_name,
|
||||
request_id,
|
||||
COALESCE(token_id, ''),
|
||||
COALESCE(subject_id, ''),
|
||||
route,
|
||||
result_code,
|
||||
COALESCE(host(client_ip), ''),
|
||||
created_at
|
||||
FROM auth_token_audit_events
|
||||
WHERE 1=1
|
||||
`
|
||||
args := make([]any, 0, 6)
|
||||
if filter.RequestID != "" {
|
||||
args = append(args, strings.TrimSpace(filter.RequestID))
|
||||
query += " AND request_id = $" + strconvArg(len(args))
|
||||
}
|
||||
if filter.TokenID != "" {
|
||||
args = append(args, strings.TrimSpace(filter.TokenID))
|
||||
query += " AND token_id = $" + strconvArg(len(args))
|
||||
}
|
||||
if filter.SubjectID != "" {
|
||||
args = append(args, strings.TrimSpace(filter.SubjectID))
|
||||
query += " AND subject_id = $" + strconvArg(len(args))
|
||||
}
|
||||
if filter.EventName != "" {
|
||||
args = append(args, strings.TrimSpace(filter.EventName))
|
||||
query += " AND event_name = $" + strconvArg(len(args))
|
||||
}
|
||||
if filter.ResultCode != "" {
|
||||
args = append(args, strings.TrimSpace(filter.ResultCode))
|
||||
query += " AND result_code = $" + strconvArg(len(args))
|
||||
}
|
||||
args = append(args, limit)
|
||||
query += " ORDER BY created_at DESC LIMIT $" + strconvArg(len(args))
|
||||
|
||||
rows, err := s.db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
events := make([]AuditEvent, 0, limit)
|
||||
for rows.Next() {
|
||||
var event AuditEvent
|
||||
if err := rows.Scan(
|
||||
&event.EventID,
|
||||
&event.EventName,
|
||||
&event.RequestID,
|
||||
&event.TokenID,
|
||||
&event.SubjectID,
|
||||
&event.Route,
|
||||
&event.ResultCode,
|
||||
&event.ClientIP,
|
||||
&event.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events = append(events, event)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 {
|
||||
events[i], events[j] = events[j], events[i]
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func strconvArg(position int) string {
|
||||
return strconv.Itoa(position)
|
||||
}
|
||||
@@ -0,0 +1,124 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type fakeAuditRows struct {
|
||||
rows [][]any
|
||||
idx int
|
||||
err error
|
||||
}
|
||||
|
||||
func (r *fakeAuditRows) Next() bool {
|
||||
if r.idx >= len(r.rows) {
|
||||
return false
|
||||
}
|
||||
r.idx++
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *fakeAuditRows) Scan(dest ...any) error {
|
||||
row := r.rows[r.idx-1]
|
||||
for i := range dest {
|
||||
switch d := dest[i].(type) {
|
||||
case *string:
|
||||
*d = row[i].(string)
|
||||
case *time.Time:
|
||||
*d = row[i].(time.Time)
|
||||
default:
|
||||
panic("unsupported audit scan destination")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *fakeAuditRows) Err() error { return r.err }
|
||||
func (r *fakeAuditRows) Close() {}
|
||||
|
||||
type fakeAuditDB struct {
|
||||
lastExecSQL string
|
||||
lastExecArgs []any
|
||||
lastQuerySQL string
|
||||
lastQueryArgs []any
|
||||
rows auditStoreRows
|
||||
execErr error
|
||||
queryErr error
|
||||
}
|
||||
|
||||
func (db *fakeAuditDB) Exec(_ context.Context, query string, args ...any) error {
|
||||
db.lastExecSQL = query
|
||||
db.lastExecArgs = append([]any(nil), args...)
|
||||
return db.execErr
|
||||
}
|
||||
|
||||
func (db *fakeAuditDB) Query(_ context.Context, query string, args ...any) (auditStoreRows, error) {
|
||||
db.lastQuerySQL = query
|
||||
db.lastQueryArgs = append([]any(nil), args...)
|
||||
if db.queryErr != nil {
|
||||
return nil, db.queryErr
|
||||
}
|
||||
return db.rows, nil
|
||||
}
|
||||
|
||||
func TestPostgresAuditStore_EmitPersistsAuditEvent(t *testing.T) {
|
||||
db := &fakeAuditDB{}
|
||||
store := newPostgresAuditStoreWithDB(db)
|
||||
|
||||
err := store.Emit(context.Background(), AuditEvent{
|
||||
EventID: "evt-1",
|
||||
EventName: EventTokenIssueSuccess,
|
||||
RequestID: "req-1",
|
||||
TokenID: "tok-1",
|
||||
SubjectID: "2001",
|
||||
Route: "/issue",
|
||||
ResultCode: "OK",
|
||||
ClientIP: "127.0.0.1",
|
||||
CreatedAt: time.Date(2026, 4, 17, 9, 10, 0, 0, time.UTC),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("emit audit event: %v", err)
|
||||
}
|
||||
if db.lastExecSQL == "" {
|
||||
t.Fatal("expected emit query to be executed")
|
||||
}
|
||||
if got := db.lastExecArgs[0].(string); got != "evt-1" {
|
||||
t.Fatalf("unexpected event id: got=%s want=evt-1", got)
|
||||
}
|
||||
if got := db.lastExecArgs[7].(string); got != "127.0.0.1" {
|
||||
t.Fatalf("unexpected client ip: got=%s want=127.0.0.1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostgresAuditStore_QueryEventsReturnsAscendingOrder(t *testing.T) {
|
||||
older := time.Date(2026, 4, 17, 9, 0, 0, 0, time.UTC)
|
||||
newer := older.Add(2 * time.Minute)
|
||||
db := &fakeAuditDB{
|
||||
rows: &fakeAuditRows{
|
||||
rows: [][]any{
|
||||
{"evt-2", EventTokenRevokeSuccess, "req-2", "tok-1", "2001", "/revoke", "OK", "127.0.0.1", newer},
|
||||
{"evt-1", EventTokenIssueSuccess, "req-1", "tok-1", "2001", "/issue", "OK", "127.0.0.1", older},
|
||||
},
|
||||
},
|
||||
}
|
||||
store := newPostgresAuditStoreWithDB(db)
|
||||
|
||||
events, err := store.QueryEvents(context.Background(), AuditEventFilter{
|
||||
TokenID: "tok-1",
|
||||
Limit: 600,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("query events: %v", err)
|
||||
}
|
||||
if len(events) != 2 {
|
||||
t.Fatalf("unexpected event count: got=%d want=2", len(events))
|
||||
}
|
||||
if events[0].EventID != "evt-1" {
|
||||
t.Fatalf("expected ascending order, got first event %s", events[0].EventID)
|
||||
}
|
||||
if got := db.lastQueryArgs[len(db.lastQueryArgs)-1].(int); got != 500 {
|
||||
t.Fatalf("unexpected limit: got=%d want=500", got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,221 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
type runtimeStoreRow interface {
|
||||
Scan(dest ...any) error
|
||||
}
|
||||
|
||||
type runtimeStoreDB interface {
|
||||
Exec(ctx context.Context, query string, args ...any) error
|
||||
QueryRow(ctx context.Context, query string, args ...any) runtimeStoreRow
|
||||
}
|
||||
|
||||
type pgxRuntimeStoreDB struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
func (db *pgxRuntimeStoreDB) Exec(ctx context.Context, query string, args ...any) error {
|
||||
_, err := db.pool.Exec(ctx, query, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *pgxRuntimeStoreDB) QueryRow(ctx context.Context, query string, args ...any) runtimeStoreRow {
|
||||
return db.pool.QueryRow(ctx, query, args...)
|
||||
}
|
||||
|
||||
type PostgresRuntimeStore struct {
|
||||
db runtimeStoreDB
|
||||
}
|
||||
|
||||
func NewPostgresRuntimeStore(pool *pgxpool.Pool) *PostgresRuntimeStore {
|
||||
return newPostgresRuntimeStoreWithDB(&pgxRuntimeStoreDB{pool: pool})
|
||||
}
|
||||
|
||||
func newPostgresRuntimeStoreWithDB(db runtimeStoreDB) *PostgresRuntimeStore {
|
||||
return &PostgresRuntimeStore{db: db}
|
||||
}
|
||||
|
||||
func (s *PostgresRuntimeStore) Save(ctx context.Context, record TokenRecord, idempotencyKey, requestHash string) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
if s == nil || s.db == nil {
|
||||
return errors.New("postgres runtime store is not configured")
|
||||
}
|
||||
|
||||
scopeJSON, err := json.Marshal(record.Scope)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tokenFingerprint := ""
|
||||
if strings.TrimSpace(record.AccessToken) != "" {
|
||||
tokenFingerprint = accessTokenFingerprint(record.AccessToken)
|
||||
}
|
||||
|
||||
var revokedAt any
|
||||
if record.Status == TokenStatusRevoked {
|
||||
revokedAt = time.Now().UTC()
|
||||
}
|
||||
|
||||
const query = `
|
||||
INSERT INTO auth_platform_tokens (
|
||||
token_id,
|
||||
token_fingerprint,
|
||||
hash_algo,
|
||||
subject_id,
|
||||
role_code,
|
||||
scope_json,
|
||||
status,
|
||||
issued_at,
|
||||
expires_at,
|
||||
revoked_reason,
|
||||
revoked_at,
|
||||
issue_request_id,
|
||||
issue_idempotency_key,
|
||||
issue_request_hash
|
||||
) VALUES (
|
||||
$1,
|
||||
NULLIF($2, ''),
|
||||
'SHA-256',
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6,
|
||||
$7,
|
||||
$8,
|
||||
NULLIF($9, ''),
|
||||
$10,
|
||||
NULLIF($11, ''),
|
||||
NULLIF($12, ''),
|
||||
NULLIF($13, '')
|
||||
)
|
||||
ON CONFLICT (token_id) DO UPDATE SET
|
||||
token_fingerprint = COALESCE(NULLIF(EXCLUDED.token_fingerprint, ''), auth_platform_tokens.token_fingerprint),
|
||||
subject_id = EXCLUDED.subject_id,
|
||||
role_code = EXCLUDED.role_code,
|
||||
scope_json = EXCLUDED.scope_json,
|
||||
status = EXCLUDED.status,
|
||||
issued_at = EXCLUDED.issued_at,
|
||||
expires_at = EXCLUDED.expires_at,
|
||||
revoked_reason = NULLIF(EXCLUDED.revoked_reason, ''),
|
||||
revoked_at = CASE
|
||||
WHEN EXCLUDED.status = 'revoked' THEN COALESCE(auth_platform_tokens.revoked_at, EXCLUDED.revoked_at, CURRENT_TIMESTAMP)
|
||||
ELSE NULL
|
||||
END,
|
||||
issue_request_id = COALESCE(NULLIF(EXCLUDED.issue_request_id, ''), auth_platform_tokens.issue_request_id),
|
||||
issue_idempotency_key = COALESCE(NULLIF(EXCLUDED.issue_idempotency_key, ''), auth_platform_tokens.issue_idempotency_key),
|
||||
issue_request_hash = COALESCE(NULLIF(EXCLUDED.issue_request_hash, ''), auth_platform_tokens.issue_request_hash),
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
`
|
||||
|
||||
return s.db.Exec(ctx, query,
|
||||
record.TokenID,
|
||||
tokenFingerprint,
|
||||
record.SubjectID,
|
||||
record.Role,
|
||||
scopeJSON,
|
||||
string(record.Status),
|
||||
record.IssuedAt,
|
||||
record.ExpiresAt,
|
||||
strings.TrimSpace(record.RevokedReason),
|
||||
revokedAt,
|
||||
strings.TrimSpace(record.RequestID),
|
||||
strings.TrimSpace(idempotencyKey),
|
||||
strings.TrimSpace(requestHash),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *PostgresRuntimeStore) GetByTokenID(ctx context.Context, tokenID string) (*TokenRecord, bool, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
return s.querySingleRecord(ctx, `
|
||||
SELECT token_id, subject_id, role_code, scope_json, status, issued_at, expires_at, issue_request_id, COALESCE(revoked_reason, '')
|
||||
FROM auth_platform_tokens
|
||||
WHERE token_id = $1
|
||||
`, strings.TrimSpace(tokenID))
|
||||
}
|
||||
|
||||
func (s *PostgresRuntimeStore) GetByAccessToken(ctx context.Context, accessToken string) (*TokenRecord, bool, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
return s.querySingleRecord(ctx, `
|
||||
SELECT token_id, subject_id, role_code, scope_json, status, issued_at, expires_at, issue_request_id, COALESCE(revoked_reason, '')
|
||||
FROM auth_platform_tokens
|
||||
WHERE token_fingerprint = $1
|
||||
`, accessTokenFingerprint(accessToken))
|
||||
}
|
||||
|
||||
func (s *PostgresRuntimeStore) LookupIdempotency(ctx context.Context, idempotencyKey string) (IdempotencyEntry, bool, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
if s == nil || s.db == nil {
|
||||
return IdempotencyEntry{}, false, errors.New("postgres runtime store is not configured")
|
||||
}
|
||||
|
||||
var entry IdempotencyEntry
|
||||
err := s.db.QueryRow(ctx, `
|
||||
SELECT COALESCE(issue_request_hash, ''), token_id
|
||||
FROM auth_platform_tokens
|
||||
WHERE issue_idempotency_key = $1
|
||||
`, strings.TrimSpace(idempotencyKey)).Scan(&entry.RequestHash, &entry.TokenID)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return IdempotencyEntry{}, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return IdempotencyEntry{}, false, err
|
||||
}
|
||||
return entry, true, nil
|
||||
}
|
||||
|
||||
func (s *PostgresRuntimeStore) querySingleRecord(ctx context.Context, query string, arg string) (*TokenRecord, bool, error) {
|
||||
if s == nil || s.db == nil {
|
||||
return nil, false, errors.New("postgres runtime store is not configured")
|
||||
}
|
||||
|
||||
var scopeJSON []byte
|
||||
var status string
|
||||
record := TokenRecord{}
|
||||
err := s.db.QueryRow(ctx, query, arg).Scan(
|
||||
&record.TokenID,
|
||||
&record.SubjectID,
|
||||
&record.Role,
|
||||
&scopeJSON,
|
||||
&status,
|
||||
&record.IssuedAt,
|
||||
&record.ExpiresAt,
|
||||
&record.RequestID,
|
||||
&record.RevokedReason,
|
||||
)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
if err := json.Unmarshal(scopeJSON, &record.Scope); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
record.Status = TokenStatus(status)
|
||||
return &record, true, nil
|
||||
}
|
||||
|
||||
func accessTokenFingerprint(accessToken string) string {
|
||||
sum := sha256.Sum256([]byte(strings.TrimSpace(accessToken)))
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
type fakeRuntimeRow struct {
|
||||
values []any
|
||||
err error
|
||||
}
|
||||
|
||||
func (r fakeRuntimeRow) Scan(dest ...any) error {
|
||||
if r.err != nil {
|
||||
return r.err
|
||||
}
|
||||
for i := range dest {
|
||||
switch d := dest[i].(type) {
|
||||
case *string:
|
||||
*d = r.values[i].(string)
|
||||
case *[]byte:
|
||||
*d = append((*d)[:0], r.values[i].([]byte)...)
|
||||
case *time.Time:
|
||||
*d = r.values[i].(time.Time)
|
||||
default:
|
||||
tpanic("unsupported scan destination")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakeRuntimeDB struct {
|
||||
lastExecSQL string
|
||||
lastExecArgs []any
|
||||
lastQuerySQL string
|
||||
lastQueryArgs []any
|
||||
queryRowFunc func(query string, args ...any) runtimeStoreRow
|
||||
execErr error
|
||||
}
|
||||
|
||||
func (db *fakeRuntimeDB) Exec(_ context.Context, query string, args ...any) error {
|
||||
db.lastExecSQL = query
|
||||
db.lastExecArgs = append([]any(nil), args...)
|
||||
return db.execErr
|
||||
}
|
||||
|
||||
func (db *fakeRuntimeDB) QueryRow(_ context.Context, query string, args ...any) runtimeStoreRow {
|
||||
db.lastQuerySQL = query
|
||||
db.lastQueryArgs = append([]any(nil), args...)
|
||||
if db.queryRowFunc == nil {
|
||||
return fakeRuntimeRow{err: pgx.ErrNoRows}
|
||||
}
|
||||
return db.queryRowFunc(query, args...)
|
||||
}
|
||||
|
||||
func TestPostgresRuntimeStore_SavePersistsTokenRecord(t *testing.T) {
|
||||
now := time.Date(2026, 4, 17, 9, 0, 0, 0, time.UTC)
|
||||
db := &fakeRuntimeDB{}
|
||||
store := newPostgresRuntimeStoreWithDB(db)
|
||||
|
||||
err := store.Save(context.Background(), TokenRecord{
|
||||
TokenID: "tok_123",
|
||||
AccessToken: "ptk_secret",
|
||||
SubjectID: "2001",
|
||||
Role: "owner",
|
||||
Scope: []string{"supply:*"},
|
||||
IssuedAt: now,
|
||||
ExpiresAt: now.Add(15 * time.Minute),
|
||||
Status: TokenStatusActive,
|
||||
RequestID: "req-1",
|
||||
RevokedReason: "",
|
||||
}, "idem-1", "hash-1")
|
||||
if err != nil {
|
||||
t.Fatalf("save record: %v", err)
|
||||
}
|
||||
if db.lastExecSQL == "" {
|
||||
t.Fatal("expected save query to be executed")
|
||||
}
|
||||
if got := db.lastExecArgs[1].(string); got == "" {
|
||||
t.Fatal("expected token fingerprint to be persisted")
|
||||
}
|
||||
if got := db.lastExecArgs[10].(string); got != "req-1" {
|
||||
t.Fatalf("unexpected request id: got=%s want=req-1", got)
|
||||
}
|
||||
if got := db.lastExecArgs[11].(string); got != "idem-1" {
|
||||
t.Fatalf("unexpected idempotency key: got=%s want=idem-1", got)
|
||||
}
|
||||
if got := db.lastExecArgs[12].(string); got != "hash-1" {
|
||||
t.Fatalf("unexpected request hash: got=%s want=hash-1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostgresRuntimeStore_LookupsUseFingerprintAndIdempotencyHash(t *testing.T) {
|
||||
now := time.Date(2026, 4, 17, 9, 5, 0, 0, time.UTC)
|
||||
db := &fakeRuntimeDB{
|
||||
queryRowFunc: func(query string, args ...any) runtimeStoreRow {
|
||||
switch {
|
||||
case strings.Contains(query, "token_fingerprint"):
|
||||
return fakeRuntimeRow{values: []any{
|
||||
"tok_123",
|
||||
"2001",
|
||||
"owner",
|
||||
[]byte(`["supply:*"]`),
|
||||
string(TokenStatusActive),
|
||||
now,
|
||||
now.Add(10 * time.Minute),
|
||||
"req-1",
|
||||
"",
|
||||
}}
|
||||
case strings.Contains(query, "issue_idempotency_key"):
|
||||
return fakeRuntimeRow{values: []any{"hash-1", "tok_123"}}
|
||||
default:
|
||||
return fakeRuntimeRow{err: pgx.ErrNoRows}
|
||||
}
|
||||
},
|
||||
}
|
||||
store := newPostgresRuntimeStoreWithDB(db)
|
||||
|
||||
record, ok, err := store.GetByAccessToken(context.Background(), "ptk_secret")
|
||||
if err != nil {
|
||||
t.Fatalf("get by access token: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("expected access token lookup to succeed")
|
||||
}
|
||||
if record.TokenID != "tok_123" {
|
||||
t.Fatalf("unexpected token id: got=%s want=tok_123", record.TokenID)
|
||||
}
|
||||
entry, ok, err := store.LookupIdempotency(context.Background(), "idem-1")
|
||||
if err != nil {
|
||||
t.Fatalf("lookup idempotency: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("expected idempotency lookup to succeed")
|
||||
}
|
||||
if entry.RequestHash != "hash-1" {
|
||||
t.Fatalf("unexpected request hash: got=%s want=hash-1", entry.RequestHash)
|
||||
}
|
||||
}
|
||||
|
||||
func tpanic(msg string) {
|
||||
panic(msg)
|
||||
}
|
||||
@@ -22,6 +22,7 @@ CREATE TABLE IF NOT EXISTS auth_platform_tokens (
|
||||
revoked_reason VARCHAR(256),
|
||||
issue_request_id VARCHAR(128) NOT NULL,
|
||||
issue_idempotency_key VARCHAR(128),
|
||||
issue_request_hash CHAR(64),
|
||||
last_seen_at TIMESTAMPTZ,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
@@ -29,6 +30,9 @@ CREATE TABLE IF NOT EXISTS auth_platform_tokens (
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uq_auth_platform_tokens_fingerprint
|
||||
ON auth_platform_tokens (token_fingerprint);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uq_auth_platform_tokens_issue_idempotency_key
|
||||
ON auth_platform_tokens (issue_idempotency_key)
|
||||
WHERE issue_idempotency_key IS NOT NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_auth_platform_tokens_subject_status
|
||||
ON auth_platform_tokens (subject_id, status);
|
||||
CREATE INDEX IF NOT EXISTS idx_auth_platform_tokens_expires_at
|
||||
|
||||
Reference in New Issue
Block a user