diff --git a/internal/service/platformevents/builder.go b/internal/service/platformevents/builder.go index 97734bd..1fc269e 100644 --- a/internal/service/platformevents/builder.go +++ b/internal/service/platformevents/builder.go @@ -34,7 +34,7 @@ func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta } eventIndex := 0 baseEvent := func(eventType string, payload map[string]any) platformevent.Event { - eventTime := now.Add(time.Duration(eventIndex) * time.Millisecond) + eventTime := now.Add(time.Duration(eventIndex) * time.Nanosecond) eventIndex++ return platformevent.Event{ ID: uuid.New().String(), @@ -47,7 +47,7 @@ func BuildInboundEvents(msg *message.UnifiedMessage, result *dialog.Result, meta Payload: payload, Status: platformevent.StatusPending, AttemptCount: 0, - NextAttemptAt: now, + NextAttemptAt: eventTime, OccurredAt: eventTime, CreatedAt: eventTime, UpdatedAt: eventTime, diff --git a/internal/store/postgres/platform_event_store.go b/internal/store/postgres/platform_event_store.go index 8081b0c..4f6e8ca 100644 --- a/internal/store/postgres/platform_event_store.go +++ b/internal/store/postgres/platform_event_store.go @@ -81,7 +81,7 @@ func (s *PlatformEventStore) ListDue(ctx context.Context, platform string, dueBe delivered_at, COALESCE(last_error, '') FROM cs_platform_event_outbox WHERE platform = $1 AND status IN ('pending', 'retrying') AND next_attempt_at <= $2 - ORDER BY next_attempt_at ASC, created_at ASC + ORDER BY next_attempt_at ASC, occurred_at ASC, created_at ASC, id ASC LIMIT $3 `, platform, dueBefore, limit) if err != nil { diff --git a/internal/store/postgres/platform_event_store_test.go b/internal/store/postgres/platform_event_store_test.go index 828dede..bf5aab4 100644 --- a/internal/store/postgres/platform_event_store_test.go +++ b/internal/store/postgres/platform_event_store_test.go @@ -59,33 +59,31 @@ func TestPlatformEventStore_ShouldListPendingEventsInOrder(t *testing.T) { store := NewPlatformEventStore(db) now := time.Now().UTC().Truncate(time.Second) - firstID := uniqueID("evt") - secondID := uniqueID("evt") - platformName := "s2a-" + firstID[:8] + platformName := "s2a-" + uniqueID("plt")[:8] first := &platformevent.Event{ - ID: firstID, + ID: uniqueID("evt"), Platform: platformName, - EventType: platformevent.TypeMessageProcessing, + EventType: platformevent.TypeMessageReceived, CallbackTarget: "default", Payload: map[string]any{"step": 1}, Status: platformevent.StatusPending, - NextAttemptAt: now.Add(-2 * time.Minute), - OccurredAt: now.Add(-2 * time.Minute), - CreatedAt: now.Add(-2 * time.Minute), - UpdatedAt: now.Add(-2 * time.Minute), + NextAttemptAt: now, + OccurredAt: now.Add(-5 * time.Nanosecond), + CreatedAt: now.Add(-5 * time.Nanosecond), + UpdatedAt: now.Add(-5 * time.Nanosecond), } second := &platformevent.Event{ - ID: secondID, + ID: uniqueID("evt"), Platform: platformName, - EventType: platformevent.TypeReplyGenerated, + EventType: platformevent.TypeMessageProcessing, CallbackTarget: "default", Payload: map[string]any{"step": 2}, Status: platformevent.StatusPending, - NextAttemptAt: now.Add(-1 * time.Minute), - OccurredAt: now.Add(-1 * time.Minute), - CreatedAt: now.Add(-1 * time.Minute), - UpdatedAt: now.Add(-1 * time.Minute), + NextAttemptAt: now, + OccurredAt: now.Add(-4 * time.Nanosecond), + CreatedAt: now.Add(-4 * time.Nanosecond), + UpdatedAt: now.Add(-4 * time.Nanosecond), } if err := store.InsertPending(context.Background(), second); err != nil { @@ -103,21 +101,14 @@ func TestPlatformEventStore_ShouldListPendingEventsInOrder(t *testing.T) { t.Fatalf("due events count = %d, want at least 2", len(events)) } - firstPos := -1 - secondPos := -1 - for i, event := range events { - if event.ID == firstID { - firstPos = i - } - if event.ID == secondID { - secondPos = i - } + if events[0].EventType != platformevent.TypeMessageReceived { + t.Fatalf("first due event type = %s, want %s", events[0].EventType, platformevent.TypeMessageReceived) } - if firstPos == -1 || secondPos == -1 { - t.Fatalf("did not find inserted events in due list: first=%d second=%d", firstPos, secondPos) + if events[1].EventType != platformevent.TypeMessageProcessing { + t.Fatalf("second due event type = %s, want %s", events[1].EventType, platformevent.TypeMessageProcessing) } - if firstPos >= secondPos { - t.Fatalf("event order invalid: firstPos=%d secondPos=%d", firstPos, secondPos) + if events[0].OccurredAt.After(events[1].OccurredAt) { + t.Fatalf("due events out of order: first occurred_at=%s second occurred_at=%s", events[0].OccurredAt, events[1].OccurredAt) } }