From 34b175b130628dd900ce6202bd51c27c733a49c2 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Mon, 11 May 2026 13:16:28 +0800
Subject: [PATCH] feat(outbox): implement concurrent claim mechanism with UPDATE RETURNING + SKIP LOCKED

- Add migration 0004 to introduce 'claiming' status and timeout index
- Add StatusClaiming to platformevent domain and allow it in Validate()
- Rewrite ListDue as transactional UPDATE ... RETURNING with FOR UPDATE SKIP LOCKED
- Add ReleaseStaleClaims to reset expired claiming events back to retrying
- Worker Start() now runs a 30s ticker for stale claim recovery (5m timeout)
- Update stubEventStore in tests to satisfy new EventStore interface

Refs: D-02
---
 .../0004_outbox_claiming_status.up.sql        | 11 ++++
 internal/domain/platformevent/event.go        |  3 +-
 internal/service/platformdelivery/worker.go   | 22 ++++++++
 .../service/platformdelivery/worker_test.go   |  4 ++
 .../store/postgres/platform_event_store.go    | 57 ++++++++++++++++---
 5 files changed, 88 insertions(+), 9 deletions(-)
 create mode 100644 db/migration/0004_outbox_claiming_status.up.sql

diff --git a/db/migration/0004_outbox_claiming_status.up.sql b/db/migration/0004_outbox_claiming_status.up.sql
new file mode 100644
index 0000000..98ffe88
--- /dev/null
+++ b/db/migration/0004_outbox_claiming_status.up.sql
@@ -0,0 +1,11 @@
+-- Add 'claiming' status to outbox CHECK constraint and add claim timeout index
+
+ALTER TABLE cs_platform_event_outbox
+DROP CONSTRAINT IF EXISTS chk_cs_platform_event_outbox_status;
+
+ALTER TABLE cs_platform_event_outbox
+ADD CONSTRAINT chk_cs_platform_event_outbox_status
+CHECK (status IN ('pending','retrying','delivered','dead_letter','claiming'));
+
+CREATE INDEX IF NOT EXISTS idx_cs_platform_event_outbox_claiming_timeout
+ON cs_platform_event_outbox(status, updated_at);
diff --git a/internal/domain/platformevent/event.go b/internal/domain/platformevent/event.go
index 1fbb5d8..65af2fd 100644
--- a/internal/domain/platformevent/event.go
+++ b/internal/domain/platformevent/event.go
@@ -13,6 +13,7 @@ const (
 	StatusRetrying   Status = "retrying"
 	StatusDelivered  Status = "delivered"
 	StatusDeadLetter Status = "dead_letter"
+	StatusClaiming   Status = "claiming"
 )
 
 const (
@@ -60,7 +61,7 @@ func (e Event) Validate() error {
 		return fmt.Errorf("event type is required")
 	}
 	switch e.Status {
-	case StatusPending, StatusRetrying, StatusDelivered, StatusDeadLetter:
+	case StatusPending, StatusRetrying, StatusDelivered, StatusDeadLetter, StatusClaiming:
 	default:
 		return fmt.Errorf("invalid status: %s", e.Status)
 	}
diff --git a/internal/service/platformdelivery/worker.go b/internal/service/platformdelivery/worker.go
index e235064..53af6f8 100644
--- a/internal/service/platformdelivery/worker.go
+++ b/internal/service/platformdelivery/worker.go
@@ -20,6 +20,7 @@ type EventStore interface {
 	RecordDeliveryAttempt(ctx context.Context, eventID string, attemptNo int, responseStatus int, responseBody string, errorMessage string) error
 	MarkRetry(ctx context.Context, eventID string, attemptCount int, nextAttemptAt time.Time, lastError string) error
 	MarkDeadLetter(ctx context.Context, eventID string, attemptCount int, finalError string) error
+	ReleaseStaleClaims(ctx context.Context, timeout time.Duration) (int, error)
 }
 
 type Worker struct {
@@ -31,6 +32,7 @@ type Worker struct {
 	MaxRetries    int
 	BatchSize     int
 	PollInterval  time.Duration
+	ClaimTimeout  time.Duration
 	RetrySchedule []time.Duration
 	Now           func() time.Time
 	Logger        *slog.Logger
@@ -52,6 +54,7 @@ func NewWorker(platform, callbackURL string, store EventStore, client *http.Clie
 		MaxRetries:    maxRetries,
 		BatchSize:     20,
 		PollInterval:  5 * time.Second,
+		ClaimTimeout:  5 * time.Minute,
 		RetrySchedule: []time.Duration{10 * time.Second, 30 * time.Second, 60 * time.Second, 5 * time.Minute, 15 * time.Minute},
 		Now:           time.Now,
 	}
@@ -63,6 +66,8 @@ func (w *Worker) Start(ctx context.Context) {
 	}
 	ticker := time.NewTicker(w.pollInterval())
 	defer ticker.Stop()
+	claimTicker := time.NewTicker(30 * time.Second)
+	defer claimTicker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
@@ -77,6 +82,14 @@ func (w *Worker) Start(ctx context.Context) {
 			return
 		case <-ticker.C:
+		case <-claimTicker.C:
+			// Handled in the same select as the poll ticker: a separate blocking
+			// select here would hold every poll until the 30s claim ticker fired.
+			if w.Store != nil {
+				if _, err := w.Store.ReleaseStaleClaims(ctx, w.claimTimeout()); err != nil && w.Logger != nil {
+					w.Logger.Error("release stale claims failed", "platform", w.Platform, "error", err.Error())
+				}
+			}
 		}
 	}
 }
 
@@ -169,6 +182,15 @@ func (w *Worker) pollInterval() time.Duration {
 	return w.PollInterval
 }
 
+// claimTimeout returns the configured ClaimTimeout, falling back to five
+// minutes when the field is zero or negative.
+func (w *Worker) claimTimeout() time.Duration {
+	if w.ClaimTimeout <= 0 {
+		return 5 * time.Minute
+	}
+	return w.ClaimTimeout
+}
+
 func (w *Worker) now() time.Time {
 	if w.Now == nil {
 		return time.Now()
diff --git a/internal/service/platformdelivery/worker_test.go b/internal/service/platformdelivery/worker_test.go
index c7ffd9b..e64545f 100644
--- a/internal/service/platformdelivery/worker_test.go
+++ b/internal/service/platformdelivery/worker_test.go
@@ -64,6 +64,10 @@ func (s *stubEventStore) MarkDeadLetter(_ context.Context, eventID string, attem
 	return nil
 }
 
+func (s *stubEventStore) ReleaseStaleClaims(_ context.Context, _ time.Duration) (int, error) {
+	return 0, nil
+}
+
 func TestWorker_ShouldDeliverPendingEventToCallbackServer(t *testing.T) {
 	now := time.Now().UTC().Truncate(time.Second)
 	store := &stubEventStore{
diff --git a/internal/store/postgres/platform_event_store.go b/internal/store/postgres/platform_event_store.go
index 1fb6a0d..66ff719 100644
--- a/internal/store/postgres/platform_event_store.go
+++ b/internal/store/postgres/platform_event_store.go
@@ -75,14 +75,28 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe
 	if platform == "" {
 		return nil, fmt.Errorf("platform is required")
 	}
-	rows, err := s.db.QueryContext(ctx, `
-		SELECT id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''),
-			payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at,
-			delivered_at, COALESCE(last_error, '')
-		FROM cs_platform_event_outbox
-		WHERE platform = $1 AND status IN ('pending', 'retrying') AND next_attempt_at <= $2
-		ORDER BY next_attempt_at ASC, occurred_at ASC, created_at ASC, id ASC
-		LIMIT $3
+
+	tx, err := s.db.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	// Roll back on every exit path, including returns that use a shadowed err.
+	// After a successful Commit this call only returns sql.ErrTxDone.
+	defer func() { _ = tx.Rollback() }()
+
+	rows, err := tx.QueryContext(ctx, `
+		UPDATE cs_platform_event_outbox
+		SET status = 'claiming', updated_at = NOW()
+		WHERE id IN (
+			SELECT id FROM cs_platform_event_outbox
+			WHERE platform = $1 AND status IN ('pending','retrying') AND next_attempt_at <= $2
+			ORDER BY next_attempt_at ASC, occurred_at ASC, created_at ASC, id ASC
+			LIMIT $3
+			FOR UPDATE SKIP LOCKED
+		)
+		RETURNING id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''),
+			payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at,
+			delivered_at, COALESCE(last_error, '')
 	`, platform, dueBefore, limit)
 	if err != nil {
 		return nil, err
@@ -126,9 +140,36 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe
 	if err := rows.Err(); err != nil {
 		return nil, err
 	}
+
+	if err := tx.Commit(); err != nil {
+		return nil, err
+	}
 	return events, nil
 }
 
+// ReleaseStaleClaims moves events that have stayed in 'claiming' for longer
+// than timeout back to 'retrying' and returns the number of rows reset.
+func (s *PlatformEventStore) ReleaseStaleClaims(ctx context.Context, timeout time.Duration) (int, error) {
+	if s.db == nil {
+		return 0, fmt.Errorf("db is nil")
+	}
+	// make_interval takes the seconds as double precision, so the float64
+	// parameter never has to be encoded as an interval by the driver.
+	res, err := s.db.ExecContext(ctx, `
+		UPDATE cs_platform_event_outbox
+		SET status = 'retrying', updated_at = NOW()
+		WHERE status = 'claiming' AND updated_at < NOW() - make_interval(secs => $1)
+	`, timeout.Seconds())
+	if err != nil {
+		return 0, err
+	}
+	n, err := res.RowsAffected()
+	if err != nil {
+		return 0, err
+	}
+	return int(n), nil
+}
+
 func (s *PlatformEventStore) MarkDelivered(ctx context.Context, eventID string, deliveredAt time.Time) error {
 	if s.db == nil {
 		return fmt.Errorf("db is nil")