diff --git a/db/migration/0004_outbox_claiming_status.up.sql b/db/migration/0004_outbox_claiming_status.up.sql new file mode 100644 index 0000000..98ffe88 --- /dev/null +++ b/db/migration/0004_outbox_claiming_status.up.sql @@ -0,0 +1,11 @@ +-- Add 'claiming' status to outbox CHECK constraint and add claim timeout index + +ALTER TABLE cs_platform_event_outbox +DROP CONSTRAINT IF EXISTS chk_cs_platform_event_outbox_status; + +ALTER TABLE cs_platform_event_outbox +ADD CONSTRAINT chk_cs_platform_event_outbox_status +CHECK (status IN ('pending','retrying','delivered','dead_letter','claiming')); + +CREATE INDEX IF NOT EXISTS idx_cs_platform_event_outbox_claiming_timeout +ON cs_platform_event_outbox(status, updated_at); diff --git a/internal/domain/platformevent/event.go b/internal/domain/platformevent/event.go index 1fbb5d8..65af2fd 100644 --- a/internal/domain/platformevent/event.go +++ b/internal/domain/platformevent/event.go @@ -13,6 +13,7 @@ const ( StatusRetrying Status = "retrying" StatusDelivered Status = "delivered" StatusDeadLetter Status = "dead_letter" + StatusClaiming Status = "claiming" ) const ( @@ -60,7 +61,7 @@ func (e Event) Validate() error { return fmt.Errorf("event type is required") } switch e.Status { - case StatusPending, StatusRetrying, StatusDelivered, StatusDeadLetter: + case StatusPending, StatusRetrying, StatusDelivered, StatusDeadLetter, StatusClaiming: default: return fmt.Errorf("invalid status: %s", e.Status) } diff --git a/internal/service/platformdelivery/worker.go b/internal/service/platformdelivery/worker.go index e235064..53af6f8 100644 --- a/internal/service/platformdelivery/worker.go +++ b/internal/service/platformdelivery/worker.go @@ -20,6 +20,7 @@ type EventStore interface { RecordDeliveryAttempt(ctx context.Context, eventID string, attemptNo int, responseStatus int, responseBody string, errorMessage string) error MarkRetry(ctx context.Context, eventID string, attemptCount int, nextAttemptAt time.Time, lastError string) error MarkDeadLetter(ctx context.Context, eventID string, attemptCount int, finalError string) error + ReleaseStaleClaims(ctx context.Context, timeout time.Duration) (int, error) } type Worker struct { @@ -31,6 +32,7 @@ type Worker struct { MaxRetries int BatchSize int PollInterval time.Duration + ClaimTimeout time.Duration RetrySchedule []time.Duration Now func() time.Time Logger *slog.Logger @@ -52,6 +54,7 @@ func NewWorker(platform, callbackURL string, store EventStore, client *http.Clie MaxRetries: maxRetries, BatchSize: 20, PollInterval: 5 * time.Second, + ClaimTimeout: 5 * time.Minute, RetrySchedule: []time.Duration{10 * time.Second, 30 * time.Second, 60 * time.Second, 5 * time.Minute, 15 * time.Minute}, Now: time.Now, } @@ -63,6 +66,8 @@ func (w *Worker) Start(ctx context.Context) { } ticker := time.NewTicker(w.pollInterval()) defer ticker.Stop() + claimTicker := time.NewTicker(30 * time.Second) + defer claimTicker.Stop() for { select { case <-ctx.Done(): @@ -77,6 +82,16 @@ func (w *Worker) Start(ctx context.Context) { return case <-ticker.C: } + select { + case <-ctx.Done(): + return + case <-claimTicker.C: + if w.Store != nil { + if _, err := w.Store.ReleaseStaleClaims(ctx, w.claimTimeout()); err != nil && w.Logger != nil { + w.Logger.Error("release stale claims failed", "platform", w.Platform, "error", err.Error()) + } + } + } } } @@ -169,6 +184,13 @@ func (w *Worker) pollInterval() time.Duration { return w.PollInterval } +func (w *Worker) claimTimeout() time.Duration { + if w.ClaimTimeout <= 0 { + return 5 * time.Minute + } + return w.ClaimTimeout +} + func (w *Worker) now() time.Time { if w.Now == nil { return time.Now() diff --git a/internal/service/platformdelivery/worker_test.go b/internal/service/platformdelivery/worker_test.go index c7ffd9b..e64545f 100644 --- a/internal/service/platformdelivery/worker_test.go +++ b/internal/service/platformdelivery/worker_test.go @@ -64,6 +64,10 @@ func (s *stubEventStore) MarkDeadLetter(_ context.Context, eventID string, attem return nil } +func (s *stubEventStore) ReleaseStaleClaims(_ context.Context, _ time.Duration) (int, error) { + return 0, nil +} + func TestWorker_ShouldDeliverPendingEventToCallbackServer(t *testing.T) { now := time.Now().UTC().Truncate(time.Second) store := &stubEventStore{ diff --git a/internal/store/postgres/platform_event_store.go b/internal/store/postgres/platform_event_store.go index 1fb6a0d..66ff719 100644 --- a/internal/store/postgres/platform_event_store.go +++ b/internal/store/postgres/platform_event_store.go @@ -75,14 +75,30 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe if platform == "" { return nil, fmt.Errorf("platform is required") } - rows, err := s.db.QueryContext(ctx, ` - SELECT id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''), - payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at, - delivered_at, COALESCE(last_error, '') - FROM cs_platform_event_outbox - WHERE platform = $1 AND status IN ('pending', 'retrying') AND next_attempt_at <= $2 - ORDER BY next_attempt_at ASC, occurred_at ASC, created_at ASC, id ASC - LIMIT $3 + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + _ = tx.Rollback() + } + }() + + rows, err := tx.QueryContext(ctx, ` + UPDATE cs_platform_event_outbox + SET status = 'claiming', updated_at = NOW() + WHERE id IN ( + SELECT id FROM cs_platform_event_outbox + WHERE platform = $1 AND status IN ('pending','retrying') AND next_attempt_at <= $2 + ORDER BY next_attempt_at ASC, occurred_at ASC, created_at ASC, id ASC + LIMIT $3 + FOR UPDATE SKIP LOCKED + ) + RETURNING id, platform, event_type, COALESCE(session_id::text, ''), COALESCE(ticket_id::text, ''), COALESCE(source_message_id, ''), + payload, status, attempt_count, next_attempt_at, occurred_at, created_at, updated_at, + delivered_at, COALESCE(last_error, '') `, platform, dueBefore, limit) if err != nil { return nil, err @@ -126,9 +142,32 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe if err := rows.Err(); err != nil { return nil, err } + + if err := tx.Commit(); err != nil { + return nil, err + } return events, nil } +func (s *PlatformEventStore) ReleaseStaleClaims(ctx context.Context, timeout time.Duration) (int, error) { + if s.db == nil { + return 0, fmt.Errorf("db is nil") + } + res, err := s.db.ExecContext(ctx, ` + UPDATE cs_platform_event_outbox + SET status = 'retrying', updated_at = NOW() + WHERE status = 'claiming' AND updated_at < NOW() - $1::interval + `, timeout.Seconds()) + if err != nil { + return 0, err + } + n, err := res.RowsAffected() + if err != nil { + return 0, err + } + return int(n), nil +} + func (s *PlatformEventStore) MarkDelivered(ctx context.Context, eventID string, deliveredAt time.Time) error { if s.db == nil { return fmt.Errorf("db is nil")