test(e2e): fix stability - sync lifecycle shutdown-before-close + ordered event delivery via channel
This commit is contained in:
@@ -8,7 +8,6 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -63,7 +62,7 @@ func resetE2EPlatformDB(t *testing.T, db *sql.DB) {
|
||||
}
|
||||
}
|
||||
|
||||
func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, maxRetries int) *app.App {
|
||||
func newSub2APIE2EApp(t *testing.T, db *sql.DB, callbackURL string, callbackSecret string, maxRetries int) *app.App {
|
||||
t.Helper()
|
||||
cfg := &config.Config{}
|
||||
cfg.HTTP.Addr = ":0"
|
||||
@@ -101,6 +100,9 @@ func newSub2APIE2EApp(t *testing.T, callbackURL string, callbackSecret string, m
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
_ = application.Shutdown(context.Background())
|
||||
if db != nil {
|
||||
_ = db.Close()
|
||||
}
|
||||
})
|
||||
return application
|
||||
}
|
||||
@@ -117,56 +119,52 @@ func eventually(t *testing.T, timeout time.Duration, fn func() bool) {
|
||||
t.Fatal("condition not satisfied before timeout")
|
||||
}
|
||||
|
||||
func waitForSessionEvents(t *testing.T, timeout time.Duration, mu *sync.Mutex, received *[]platformevent.Event, sessionID string, want int) []platformevent.Event {
|
||||
func waitForSessionEvents(t *testing.T, timeout time.Duration, eventsCh <-chan platformevent.Event, sessionID string, want int) []platformevent.Event {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
var filtered []platformevent.Event
|
||||
for time.Now().Before(deadline) {
|
||||
mu.Lock()
|
||||
filtered := make([]platformevent.Event, 0, len(*received))
|
||||
for _, event := range *received {
|
||||
select {
|
||||
case event := <-eventsCh:
|
||||
if event.SessionID == sessionID {
|
||||
filtered = append(filtered, event)
|
||||
}
|
||||
if len(filtered) == want {
|
||||
return filtered
|
||||
}
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
}
|
||||
mu.Unlock()
|
||||
if len(filtered) == want {
|
||||
return filtered
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
snapshot := make([]string, 0, len(*received))
|
||||
for _, event := range *received {
|
||||
snapshot = append(snapshot, fmt.Sprintf("%s/%s", event.SessionID, event.EventType))
|
||||
snapshot := make([]string, 0)
|
||||
for {
|
||||
select {
|
||||
case event := <-eventsCh:
|
||||
snapshot = append(snapshot, fmt.Sprintf("%s/%s", event.SessionID, event.EventType))
|
||||
default:
|
||||
}
|
||||
break
|
||||
}
|
||||
t.Fatalf("session %s received %d events, want %d; snapshot=%v", sessionID, len(snapshot), want, snapshot)
|
||||
t.Fatalf("session %s received %d events, want %d; snapshot=%v", sessionID, len(filtered), want, snapshot)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *testing.T) {
|
||||
db := openE2EPlatformDB(t)
|
||||
defer db.Close()
|
||||
resetE2EPlatformDB(t, db)
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
received []platformevent.Event
|
||||
)
|
||||
eventsCh := make(chan platformevent.Event)
|
||||
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
var event platformevent.Event
|
||||
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
|
||||
t.Fatalf("decode callback body failed: %v", err)
|
||||
}
|
||||
received = append(received, event)
|
||||
eventsCh <- event
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer callbackServer.Close()
|
||||
|
||||
application := newSub2APIE2EApp(t, callbackServer.URL, "sub2api-callback-secret", 3)
|
||||
application := newSub2APIE2EApp(t, db, callbackServer.URL, "sub2api-callback-secret", 3)
|
||||
server := httptest.NewServer(application.Server.Handler)
|
||||
defer server.Close()
|
||||
|
||||
@@ -206,7 +204,7 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes
|
||||
t.Fatalf("ack session_id = %v, want non-empty", ack["session_id"])
|
||||
}
|
||||
|
||||
filtered := waitForSessionEvents(t, 8*time.Second, &mu, &received, sessionID, 6)
|
||||
filtered := waitForSessionEvents(t, 8*time.Second, eventsCh, sessionID, 6)
|
||||
wantTypes := []string{
|
||||
platformevent.TypeMessageReceived,
|
||||
platformevent.TypeMessageProcessing,
|
||||
@@ -247,7 +245,6 @@ func TestSub2APICallbackFlow_ShouldDeliverOrderedEventsWithStableEventIDs(t *tes
|
||||
|
||||
func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) {
|
||||
db := openE2EPlatformDB(t)
|
||||
defer db.Close()
|
||||
resetE2EPlatformDB(t, db)
|
||||
|
||||
callbackServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
@@ -256,7 +253,7 @@ func TestSub2APICallbackFlow_ShouldDeadLetterAfterMaxRetries(t *testing.T) {
|
||||
}))
|
||||
defer callbackServer.Close()
|
||||
|
||||
application := newSub2APIE2EApp(t, callbackServer.URL, "sub2api-callback-secret", 1)
|
||||
application := newSub2APIE2EApp(t, db, callbackServer.URL, "sub2api-callback-secret", 1)
|
||||
server := httptest.NewServer(application.Server.Handler)
|
||||
defer server.Close()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user