324 lines
10 KiB
Go
324 lines
10 KiB
Go
package batch
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"sub2api-cn-relay-manager/internal/store/sqlite"
|
|
)
|
|
|
|
func TestConfirmationWorker(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("only processes pending confirm items that are due and leaseable", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
now := time.Date(2026, 5, 22, 13, 0, 0, 0, time.UTC)
|
|
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
|
{ItemID: "eligible", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"},
|
|
{ItemID: "wrong-stage", RunID: "run-1", CurrentStage: "probe", ConfirmationStatus: "pending"},
|
|
{ItemID: "not-pending", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "confirmed"},
|
|
{ItemID: "future-retry", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", NextRetryAt: now.Add(time.Minute).Format(time.RFC3339)},
|
|
{ItemID: "leased", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", LeaseUntil: now.Add(time.Minute).Format(time.RFC3339)},
|
|
})
|
|
|
|
worker := ConfirmationWorker{
|
|
WorkerID: "worker-a",
|
|
ItemStore: store,
|
|
EventStore: store,
|
|
LeaseDuration: time.Minute,
|
|
RetryDelay: time.Second,
|
|
Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
|
return ConfirmationResult{StatusCode: 200}, nil
|
|
},
|
|
}
|
|
|
|
if err := worker.Tick(context.Background(), now); err != nil {
|
|
t.Fatalf("Tick() error = %v", err)
|
|
}
|
|
if len(store.processed) != 1 || store.processed[0] != "eligible" {
|
|
t.Fatalf("processed = %#v, want only eligible item", store.processed)
|
|
}
|
|
})
|
|
|
|
t.Run("403 probe race becomes advisory and advances to validate", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
now := time.Date(2026, 5, 22, 13, 1, 0, 0, time.UTC)
|
|
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
|
{
|
|
ItemID: "advisory",
|
|
RunID: "run-1",
|
|
CurrentStage: "confirm",
|
|
ConfirmationStatus: "pending",
|
|
CapabilityProfileJSON: `{"transport_profile":{"known_advisories":["initial_probe_race_expected"]}}`,
|
|
},
|
|
})
|
|
|
|
worker := ConfirmationWorker{
|
|
WorkerID: "worker-a",
|
|
ItemStore: store,
|
|
EventStore: store,
|
|
LeaseDuration: time.Minute,
|
|
RetryDelay: time.Second,
|
|
Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
|
return ConfirmationResult{StatusCode: 403, Message: "forbidden"}, nil
|
|
},
|
|
}
|
|
|
|
if err := worker.Tick(context.Background(), now); err != nil {
|
|
t.Fatalf("Tick() error = %v", err)
|
|
}
|
|
|
|
got := store.mustItem(t, "advisory")
|
|
if got.ConfirmationStatus != string(ConfirmationAdvisory) {
|
|
t.Fatalf("ConfirmationStatus = %q, want advisory", got.ConfirmationStatus)
|
|
}
|
|
if got.CurrentStage != string(ItemStageValidate) {
|
|
t.Fatalf("CurrentStage = %q, want validate", got.CurrentStage)
|
|
}
|
|
if !strings.Contains(got.AdvisoryMessagesJSON, "initial_probe_race_expected") {
|
|
t.Fatalf("AdvisoryMessagesJSON = %q, want initial_probe_race_expected", got.AdvisoryMessagesJSON)
|
|
}
|
|
})
|
|
|
|
t.Run("initial 503 schedules retry then succeeds", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
now := time.Date(2026, 5, 22, 13, 2, 0, 0, time.UTC)
|
|
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
|
{ItemID: "retry", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"},
|
|
})
|
|
|
|
callCount := 0
|
|
worker := ConfirmationWorker{
|
|
WorkerID: "worker-a",
|
|
ItemStore: store,
|
|
EventStore: store,
|
|
LeaseDuration: time.Minute,
|
|
RetryDelay: time.Second,
|
|
Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
|
callCount++
|
|
if callCount == 1 {
|
|
return ConfirmationResult{StatusCode: 503, Message: "no available accounts"}, nil
|
|
}
|
|
return ConfirmationResult{StatusCode: 200}, nil
|
|
},
|
|
}
|
|
|
|
if err := worker.Tick(context.Background(), now); err != nil {
|
|
t.Fatalf("first Tick() error = %v", err)
|
|
}
|
|
first := store.mustItem(t, "retry")
|
|
if first.RetryCount != 1 {
|
|
t.Fatalf("RetryCount = %d, want 1", first.RetryCount)
|
|
}
|
|
if first.ConfirmationStatus != string(ConfirmationPending) {
|
|
t.Fatalf("ConfirmationStatus = %q, want pending", first.ConfirmationStatus)
|
|
}
|
|
if !strings.Contains(first.LastError, "no available accounts") {
|
|
t.Fatalf("LastError = %q, want transient message", first.LastError)
|
|
}
|
|
|
|
if err := worker.Tick(context.Background(), now.Add(2*time.Second)); err != nil {
|
|
t.Fatalf("second Tick() error = %v", err)
|
|
}
|
|
second := store.mustItem(t, "retry")
|
|
if second.ConfirmationStatus != string(ConfirmationConfirmed) {
|
|
t.Fatalf("ConfirmationStatus = %q, want confirmed", second.ConfirmationStatus)
|
|
}
|
|
if second.CurrentStage != string(ItemStageValidate) {
|
|
t.Fatalf("CurrentStage = %q, want validate", second.CurrentStage)
|
|
}
|
|
})
|
|
|
|
t.Run("lease prevents duplicate processing across workers", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
now := time.Date(2026, 5, 22, 13, 3, 0, 0, time.UTC)
|
|
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
|
{ItemID: "shared", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"},
|
|
})
|
|
|
|
confirmer := func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
|
return ConfirmationResult{StatusCode: 200}, nil
|
|
}
|
|
|
|
workerA := ConfirmationWorker{WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer}
|
|
workerB := ConfirmationWorker{WorkerID: "worker-b", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer}
|
|
|
|
if err := workerA.Tick(context.Background(), now); err != nil {
|
|
t.Fatalf("workerA.Tick() error = %v", err)
|
|
}
|
|
if err := workerB.Tick(context.Background(), now); err != nil {
|
|
t.Fatalf("workerB.Tick() error = %v", err)
|
|
}
|
|
if len(store.processed) != 1 {
|
|
t.Fatalf("processed = %#v, want single processing", store.processed)
|
|
}
|
|
})
|
|
|
|
t.Run("concurrent workers do not both call confirmer before lease is persisted", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
now := time.Date(2026, 5, 22, 13, 3, 30, 0, time.UTC)
|
|
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
|
{ItemID: "shared", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"},
|
|
})
|
|
|
|
started := make(chan struct{}, 2)
|
|
release := make(chan struct{})
|
|
var calls atomic.Int32
|
|
|
|
confirmer := func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
|
calls.Add(1)
|
|
started <- struct{}{}
|
|
<-release
|
|
return ConfirmationResult{StatusCode: 200}, nil
|
|
}
|
|
|
|
workerA := ConfirmationWorker{WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer}
|
|
workerB := ConfirmationWorker{WorkerID: "worker-b", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer}
|
|
|
|
errCh := make(chan error, 2)
|
|
go func() { errCh <- workerA.Tick(context.Background(), now) }()
|
|
go func() { errCh <- workerB.Tick(context.Background(), now) }()
|
|
|
|
<-started
|
|
select {
|
|
case <-started:
|
|
t.Fatal("second worker reached confirmer before lease was acquired")
|
|
case <-time.After(50 * time.Millisecond):
|
|
}
|
|
|
|
close(release)
|
|
for range 2 {
|
|
if err := <-errCh; err != nil {
|
|
t.Fatalf("Tick() error = %v", err)
|
|
}
|
|
}
|
|
if got := calls.Load(); got != 1 {
|
|
t.Fatalf("confirmer calls = %d, want 1", got)
|
|
}
|
|
})
|
|
|
|
t.Run("reactivated account metadata is preserved", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
now := time.Date(2026, 5, 22, 13, 4, 0, 0, time.UTC)
|
|
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
|
{
|
|
ItemID: "reactivated",
|
|
RunID: "run-1",
|
|
CurrentStage: "confirm",
|
|
ConfirmationStatus: "pending",
|
|
MatchedAccountState: string(MatchedAccountStateDeprecated),
|
|
AccountResolution: string(AccountResolutionReactivated),
|
|
},
|
|
})
|
|
|
|
worker := ConfirmationWorker{
|
|
WorkerID: "worker-a",
|
|
ItemStore: store,
|
|
EventStore: store,
|
|
LeaseDuration: time.Minute,
|
|
RetryDelay: time.Second,
|
|
Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
|
return ConfirmationResult{StatusCode: 200}, nil
|
|
},
|
|
}
|
|
|
|
if err := worker.Tick(context.Background(), now); err != nil {
|
|
t.Fatalf("Tick() error = %v", err)
|
|
}
|
|
got := store.mustItem(t, "reactivated")
|
|
if got.MatchedAccountState != string(MatchedAccountStateDeprecated) {
|
|
t.Fatalf("MatchedAccountState = %q, want deprecated", got.MatchedAccountState)
|
|
}
|
|
if got.AccountResolution != string(AccountResolutionReactivated) {
|
|
t.Fatalf("AccountResolution = %q, want reactivated", got.AccountResolution)
|
|
}
|
|
})
|
|
}
|
|
|
|
type fakeConfirmationStore struct {
|
|
mu sync.Mutex
|
|
items map[string]sqlite.ImportRunItem
|
|
processed []string
|
|
events []sqlite.ImportRunItemEvent
|
|
}
|
|
|
|
func newFakeConfirmationStore(items []sqlite.ImportRunItem) *fakeConfirmationStore {
|
|
store := &fakeConfirmationStore{
|
|
items: make(map[string]sqlite.ImportRunItem, len(items)),
|
|
events: []sqlite.ImportRunItemEvent{},
|
|
}
|
|
for _, item := range items {
|
|
store.items[item.ItemID] = item
|
|
}
|
|
return store
|
|
}
|
|
|
|
func (f *fakeConfirmationStore) List(ctx context.Context) ([]sqlite.ImportRunItem, error) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
|
|
items := make([]sqlite.ImportRunItem, 0, len(f.items))
|
|
for _, item := range f.items {
|
|
items = append(items, item)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (f *fakeConfirmationStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
|
|
f.items[item.ItemID] = item
|
|
f.processed = append(f.processed, item.ItemID)
|
|
return nil
|
|
}
|
|
|
|
func (f *fakeConfirmationStore) TryAcquireLease(ctx context.Context, itemID, workerID string, now time.Time, leaseDuration time.Duration) (sqlite.ImportRunItem, bool, error) {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
|
|
item, ok := f.items[itemID]
|
|
if !ok {
|
|
return sqlite.ImportRunItem{}, false, nil
|
|
}
|
|
if !isConfirmationCandidate(item, now) {
|
|
return sqlite.ImportRunItem{}, false, nil
|
|
}
|
|
item.ConfirmationAttempts++
|
|
item.LeaseOwner = workerID
|
|
item.LeaseUntil = now.Add(leaseDuration).Format(time.RFC3339)
|
|
f.items[itemID] = item
|
|
return item, true, nil
|
|
}
|
|
|
|
func (f *fakeConfirmationStore) Append(ctx context.Context, event sqlite.ImportRunItemEvent) error {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
|
|
f.events = append(f.events, event)
|
|
return nil
|
|
}
|
|
|
|
func (f *fakeConfirmationStore) mustItem(t *testing.T, itemID string) sqlite.ImportRunItem {
|
|
t.Helper()
|
|
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
|
|
item, ok := f.items[itemID]
|
|
if !ok {
|
|
t.Fatalf("item %q not found", itemID)
|
|
}
|
|
return item
|
|
}
|