feat(batch): add confirmation worker and retry handling
This commit is contained in:
255
internal/batch/confirmation.go
Normal file
255
internal/batch/confirmation.go
Normal file
@@ -0,0 +1,255 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
// ConfirmationResult is what a Confirmer reports back for a single item.
type ConfirmationResult struct {
	// StatusCode drives the outcome in ConfirmItem: 200-299 confirms the
	// item, 403 and 503 get dedicated handling, anything else fails it.
	StatusCode int
	// Message is stored (trimmed) as the item's last error on non-success
	// outcomes and is inspected to recognise the 503 warm-up condition.
	Message string
}
|
||||
|
||||
type ConfirmationItemStore interface {
|
||||
List(ctx context.Context) ([]sqlite.ImportRunItem, error)
|
||||
Upsert(ctx context.Context, item sqlite.ImportRunItem) error
|
||||
}
|
||||
|
||||
type ConfirmationEventStore interface {
|
||||
Append(ctx context.Context, event sqlite.ImportRunItemEvent) error
|
||||
}
|
||||
|
||||
type ConfirmationWorker struct {
|
||||
WorkerID string
|
||||
ItemStore ConfirmationItemStore
|
||||
EventStore ConfirmationEventStore
|
||||
LeaseDuration time.Duration
|
||||
RetryDelay time.Duration
|
||||
Confirmer func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error)
|
||||
}
|
||||
|
||||
func (w ConfirmationWorker) Tick(ctx context.Context, now time.Time) error {
|
||||
if w.ItemStore == nil {
|
||||
return fmt.Errorf("item store is required")
|
||||
}
|
||||
if w.EventStore == nil {
|
||||
return fmt.Errorf("event store is required")
|
||||
}
|
||||
if w.Confirmer == nil {
|
||||
return fmt.Errorf("confirmer is required")
|
||||
}
|
||||
|
||||
items, err := w.ItemStore.List(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, item := range items {
|
||||
if !isConfirmationCandidate(item, now) {
|
||||
continue
|
||||
}
|
||||
if err := w.ConfirmItem(ctx, item, now); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w ConfirmationWorker) ConfirmItem(ctx context.Context, item sqlite.ImportRunItem, now time.Time) error {
|
||||
result, err := w.Confirmer(ctx, item)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
item.ConfirmationAttempts++
|
||||
item.LeaseOwner = strings.TrimSpace(w.WorkerID)
|
||||
item.LeaseUntil = now.Add(defaultDuration(w.LeaseDuration, time.Minute)).Format(time.RFC3339)
|
||||
|
||||
switch {
|
||||
case result.StatusCode >= 200 && result.StatusCode < 300:
|
||||
item.ConfirmationStatus = string(ConfirmationConfirmed)
|
||||
item.CurrentStage = string(ItemStageValidate)
|
||||
item.NextRetryAt = ""
|
||||
item.LastError = ""
|
||||
item.LastErrorStage = ""
|
||||
item.LeaseOwner = ""
|
||||
item.LeaseUntil = ""
|
||||
if err := w.ItemStore.Upsert(ctx, item); err != nil {
|
||||
return err
|
||||
}
|
||||
return w.EventStore.Append(ctx, sqlite.ImportRunItemEvent{
|
||||
EventID: confirmationEventID(item.ItemID, "stage_transition", now),
|
||||
RunID: item.RunID,
|
||||
ItemID: item.ItemID,
|
||||
EventType: "stage_transition",
|
||||
Stage: string(ItemStageValidate),
|
||||
Attempt: item.ConfirmationAttempts,
|
||||
Message: "confirmation succeeded",
|
||||
PayloadJSON: `{"confirmation_status":"confirmed"}`,
|
||||
})
|
||||
case result.StatusCode == 403 && supportsProbe403Advisory(item.CapabilityProfileJSON):
|
||||
item.ConfirmationStatus = string(ConfirmationAdvisory)
|
||||
item.CurrentStage = string(ItemStageValidate)
|
||||
item.AdvisoryMessagesJSON = appendAdvisoryJSON(item.AdvisoryMessagesJSON, "initial_probe_race_expected")
|
||||
item.LastError = strings.TrimSpace(result.Message)
|
||||
item.LastErrorStage = string(ItemStageConfirm)
|
||||
item.NextRetryAt = ""
|
||||
item.LeaseOwner = ""
|
||||
item.LeaseUntil = ""
|
||||
if err := w.ItemStore.Upsert(ctx, item); err != nil {
|
||||
return err
|
||||
}
|
||||
return w.EventStore.Append(ctx, sqlite.ImportRunItemEvent{
|
||||
EventID: confirmationEventID(item.ItemID, "advisory_added", now),
|
||||
RunID: item.RunID,
|
||||
ItemID: item.ItemID,
|
||||
EventType: "advisory_added",
|
||||
Stage: string(ItemStageConfirm),
|
||||
Attempt: item.ConfirmationAttempts,
|
||||
Message: "initial probe race handled as advisory",
|
||||
PayloadJSON: `{"advisory":"initial_probe_race_expected"}`,
|
||||
})
|
||||
case isWarmupRetryCandidate(result):
|
||||
item.RetryCount++
|
||||
item.LastRetryAt = now.Format(time.RFC3339)
|
||||
item.NextRetryAt = now.Add(defaultDuration(w.RetryDelay, time.Second)).Format(time.RFC3339)
|
||||
item.LastError = strings.TrimSpace(result.Message)
|
||||
item.LastErrorStage = string(ItemStageConfirm)
|
||||
item.LeaseOwner = ""
|
||||
item.LeaseUntil = ""
|
||||
if err := w.ItemStore.Upsert(ctx, item); err != nil {
|
||||
return err
|
||||
}
|
||||
return w.EventStore.Append(ctx, sqlite.ImportRunItemEvent{
|
||||
EventID: confirmationEventID(item.ItemID, "retry_scheduled", now),
|
||||
RunID: item.RunID,
|
||||
ItemID: item.ItemID,
|
||||
EventType: "retry_scheduled",
|
||||
Stage: string(ItemStageConfirm),
|
||||
Attempt: item.ConfirmationAttempts,
|
||||
Message: "initial 503 no available accounts, retry scheduled",
|
||||
PayloadJSON: fmt.Sprintf(`{"next_retry_at":%q}`, item.NextRetryAt),
|
||||
})
|
||||
default:
|
||||
item.ConfirmationStatus = string(ConfirmationFailed)
|
||||
item.CurrentStage = string(ItemStageDone)
|
||||
item.LastError = strings.TrimSpace(result.Message)
|
||||
item.LastErrorStage = string(ItemStageConfirm)
|
||||
item.NextRetryAt = ""
|
||||
item.LeaseOwner = ""
|
||||
item.LeaseUntil = ""
|
||||
if err := w.ItemStore.Upsert(ctx, item); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func isConfirmationCandidate(item sqlite.ImportRunItem, now time.Time) bool {
|
||||
if item.CurrentStage != string(ItemStageConfirm) {
|
||||
return false
|
||||
}
|
||||
if item.ConfirmationStatus != string(ConfirmationPending) {
|
||||
return false
|
||||
}
|
||||
if !isRetryDue(item.NextRetryAt, now) {
|
||||
return false
|
||||
}
|
||||
if !leaseExpired(item.LeaseUntil, now) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// isRetryDue reports whether the RFC 3339 timestamp nextRetryAt is at or
// before now. A blank or unparseable timestamp counts as due.
func isRetryDue(nextRetryAt string, now time.Time) bool {
	trimmed := strings.TrimSpace(nextRetryAt)
	if trimmed == "" {
		return true
	}

	dueAt, err := time.Parse(time.RFC3339, trimmed)
	// A value that cannot be parsed must not block the item forever.
	return err != nil || !now.Before(dueAt)
}
|
||||
|
||||
// leaseExpired reports whether the RFC 3339 timestamp leaseUntil lies strictly
// before now. A blank or unparseable timestamp counts as expired; a lease
// ending exactly at now is still considered held.
func leaseExpired(leaseUntil string, now time.Time) bool {
	// Parsing the blank string fails as well, so it shares the error path.
	deadline, err := time.Parse(time.RFC3339, strings.TrimSpace(leaseUntil))
	if err != nil {
		return true
	}
	return now.After(deadline)
}
|
||||
|
||||
// supportsProbe403Advisory reports whether the capability profile JSON lists
// "initial_probe_race_expected" under transport_profile.known_advisories.
// Input that does not decode as JSON yields false.
func supportsProbe403Advisory(capabilityProfileJSON string) bool {
	type transportProfile struct {
		KnownAdvisories []string `json:"known_advisories"`
	}
	var profile struct {
		TransportProfile transportProfile `json:"transport_profile"`
	}

	raw := []byte(strings.TrimSpace(capabilityProfileJSON))
	if json.Unmarshal(raw, &profile) != nil {
		return false
	}

	advisories := profile.TransportProfile.KnownAdvisories
	for i := range advisories {
		if advisories[i] == "initial_probe_race_expected" {
			return true
		}
	}
	return false
}
|
||||
|
||||
func isWarmupRetryCandidate(result ConfirmationResult) bool {
|
||||
message := strings.ToLower(strings.TrimSpace(result.Message))
|
||||
return result.StatusCode == 503 && strings.Contains(message, "no available accounts")
|
||||
}
|
||||
|
||||
func appendAdvisoryJSON(rawJSON, advisory string) string {
|
||||
advisory = strings.TrimSpace(advisory)
|
||||
if advisory == "" {
|
||||
return defaultJSONString(rawJSON, "[]")
|
||||
}
|
||||
|
||||
values := []string{}
|
||||
if err := json.Unmarshal([]byte(defaultJSONString(rawJSON, "[]")), &values); err != nil {
|
||||
values = []string{}
|
||||
}
|
||||
for _, existing := range values {
|
||||
if existing == advisory {
|
||||
payload, _ := json.Marshal(values)
|
||||
return string(payload)
|
||||
}
|
||||
}
|
||||
values = append(values, advisory)
|
||||
payload, err := json.Marshal(values)
|
||||
if err != nil {
|
||||
return defaultJSONString(rawJSON, "[]")
|
||||
}
|
||||
return string(payload)
|
||||
}
|
||||
|
||||
// confirmationEventID builds an event ID of the form
// "<itemID>-<eventType>-<now in Unix nanoseconds>".
func confirmationEventID(itemID, eventType string, now time.Time) string {
	parts := []string{itemID, eventType, fmt.Sprint(now.UnixNano())}
	return strings.Join(parts, "-")
}
|
||||
|
||||
// defaultDuration returns value when it is positive and fallback otherwise.
func defaultDuration(value, fallback time.Duration) time.Duration {
	if value <= 0 {
		return fallback
	}
	return value
}
|
||||
|
||||
// defaultJSONString returns fallback when value is empty or whitespace only;
// otherwise it returns value exactly as given, without trimming.
func defaultJSONString(value, fallback string) string {
	if blank := strings.TrimSpace(value) == ""; blank {
		return fallback
	}
	return value
}
|
||||
246
internal/batch/confirmation_test.go
Normal file
246
internal/batch/confirmation_test.go
Normal file
@@ -0,0 +1,246 @@
|
||||
package batch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
func TestConfirmationWorker(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("only processes pending confirm items that are due and leaseable", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
now := time.Date(2026, 5, 22, 13, 0, 0, 0, time.UTC)
|
||||
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
||||
{ItemID: "eligible", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"},
|
||||
{ItemID: "wrong-stage", RunID: "run-1", CurrentStage: "probe", ConfirmationStatus: "pending"},
|
||||
{ItemID: "not-pending", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "confirmed"},
|
||||
{ItemID: "future-retry", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", NextRetryAt: now.Add(time.Minute).Format(time.RFC3339)},
|
||||
{ItemID: "leased", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending", LeaseUntil: now.Add(time.Minute).Format(time.RFC3339)},
|
||||
})
|
||||
|
||||
worker := ConfirmationWorker{
|
||||
WorkerID: "worker-a",
|
||||
ItemStore: store,
|
||||
EventStore: store,
|
||||
LeaseDuration: time.Minute,
|
||||
RetryDelay: time.Second,
|
||||
Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
||||
return ConfirmationResult{StatusCode: 200}, nil
|
||||
},
|
||||
}
|
||||
|
||||
if err := worker.Tick(context.Background(), now); err != nil {
|
||||
t.Fatalf("Tick() error = %v", err)
|
||||
}
|
||||
if len(store.processed) != 1 || store.processed[0] != "eligible" {
|
||||
t.Fatalf("processed = %#v, want only eligible item", store.processed)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("403 probe race becomes advisory and advances to validate", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
now := time.Date(2026, 5, 22, 13, 1, 0, 0, time.UTC)
|
||||
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
||||
{
|
||||
ItemID: "advisory",
|
||||
RunID: "run-1",
|
||||
CurrentStage: "confirm",
|
||||
ConfirmationStatus: "pending",
|
||||
CapabilityProfileJSON: `{"transport_profile":{"known_advisories":["initial_probe_race_expected"]}}`,
|
||||
},
|
||||
})
|
||||
|
||||
worker := ConfirmationWorker{
|
||||
WorkerID: "worker-a",
|
||||
ItemStore: store,
|
||||
EventStore: store,
|
||||
LeaseDuration: time.Minute,
|
||||
RetryDelay: time.Second,
|
||||
Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
||||
return ConfirmationResult{StatusCode: 403, Message: "forbidden"}, nil
|
||||
},
|
||||
}
|
||||
|
||||
if err := worker.Tick(context.Background(), now); err != nil {
|
||||
t.Fatalf("Tick() error = %v", err)
|
||||
}
|
||||
|
||||
got := store.mustItem(t, "advisory")
|
||||
if got.ConfirmationStatus != string(ConfirmationAdvisory) {
|
||||
t.Fatalf("ConfirmationStatus = %q, want advisory", got.ConfirmationStatus)
|
||||
}
|
||||
if got.CurrentStage != string(ItemStageValidate) {
|
||||
t.Fatalf("CurrentStage = %q, want validate", got.CurrentStage)
|
||||
}
|
||||
if !strings.Contains(got.AdvisoryMessagesJSON, "initial_probe_race_expected") {
|
||||
t.Fatalf("AdvisoryMessagesJSON = %q, want initial_probe_race_expected", got.AdvisoryMessagesJSON)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("initial 503 schedules retry then succeeds", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
now := time.Date(2026, 5, 22, 13, 2, 0, 0, time.UTC)
|
||||
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
||||
{ItemID: "retry", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"},
|
||||
})
|
||||
|
||||
callCount := 0
|
||||
worker := ConfirmationWorker{
|
||||
WorkerID: "worker-a",
|
||||
ItemStore: store,
|
||||
EventStore: store,
|
||||
LeaseDuration: time.Minute,
|
||||
RetryDelay: time.Second,
|
||||
Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
||||
callCount++
|
||||
if callCount == 1 {
|
||||
return ConfirmationResult{StatusCode: 503, Message: "no available accounts"}, nil
|
||||
}
|
||||
return ConfirmationResult{StatusCode: 200}, nil
|
||||
},
|
||||
}
|
||||
|
||||
if err := worker.Tick(context.Background(), now); err != nil {
|
||||
t.Fatalf("first Tick() error = %v", err)
|
||||
}
|
||||
first := store.mustItem(t, "retry")
|
||||
if first.RetryCount != 1 {
|
||||
t.Fatalf("RetryCount = %d, want 1", first.RetryCount)
|
||||
}
|
||||
if first.ConfirmationStatus != string(ConfirmationPending) {
|
||||
t.Fatalf("ConfirmationStatus = %q, want pending", first.ConfirmationStatus)
|
||||
}
|
||||
if !strings.Contains(first.LastError, "no available accounts") {
|
||||
t.Fatalf("LastError = %q, want transient message", first.LastError)
|
||||
}
|
||||
|
||||
if err := worker.Tick(context.Background(), now.Add(2*time.Second)); err != nil {
|
||||
t.Fatalf("second Tick() error = %v", err)
|
||||
}
|
||||
second := store.mustItem(t, "retry")
|
||||
if second.ConfirmationStatus != string(ConfirmationConfirmed) {
|
||||
t.Fatalf("ConfirmationStatus = %q, want confirmed", second.ConfirmationStatus)
|
||||
}
|
||||
if second.CurrentStage != string(ItemStageValidate) {
|
||||
t.Fatalf("CurrentStage = %q, want validate", second.CurrentStage)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("lease prevents duplicate processing across workers", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
now := time.Date(2026, 5, 22, 13, 3, 0, 0, time.UTC)
|
||||
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
||||
{ItemID: "shared", RunID: "run-1", CurrentStage: "confirm", ConfirmationStatus: "pending"},
|
||||
})
|
||||
|
||||
confirmer := func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
||||
return ConfirmationResult{StatusCode: 200}, nil
|
||||
}
|
||||
|
||||
workerA := ConfirmationWorker{WorkerID: "worker-a", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer}
|
||||
workerB := ConfirmationWorker{WorkerID: "worker-b", ItemStore: store, EventStore: store, LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: confirmer}
|
||||
|
||||
if err := workerA.Tick(context.Background(), now); err != nil {
|
||||
t.Fatalf("workerA.Tick() error = %v", err)
|
||||
}
|
||||
if err := workerB.Tick(context.Background(), now); err != nil {
|
||||
t.Fatalf("workerB.Tick() error = %v", err)
|
||||
}
|
||||
if len(store.processed) != 1 {
|
||||
t.Fatalf("processed = %#v, want single processing", store.processed)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("reactivated account metadata is preserved", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
now := time.Date(2026, 5, 22, 13, 4, 0, 0, time.UTC)
|
||||
store := newFakeConfirmationStore([]sqlite.ImportRunItem{
|
||||
{
|
||||
ItemID: "reactivated",
|
||||
RunID: "run-1",
|
||||
CurrentStage: "confirm",
|
||||
ConfirmationStatus: "pending",
|
||||
MatchedAccountState: string(MatchedAccountStateDeprecated),
|
||||
AccountResolution: string(AccountResolutionReactivated),
|
||||
},
|
||||
})
|
||||
|
||||
worker := ConfirmationWorker{
|
||||
WorkerID: "worker-a",
|
||||
ItemStore: store,
|
||||
EventStore: store,
|
||||
LeaseDuration: time.Minute,
|
||||
RetryDelay: time.Second,
|
||||
Confirmer: func(ctx context.Context, item sqlite.ImportRunItem) (ConfirmationResult, error) {
|
||||
return ConfirmationResult{StatusCode: 200}, nil
|
||||
},
|
||||
}
|
||||
|
||||
if err := worker.Tick(context.Background(), now); err != nil {
|
||||
t.Fatalf("Tick() error = %v", err)
|
||||
}
|
||||
got := store.mustItem(t, "reactivated")
|
||||
if got.MatchedAccountState != string(MatchedAccountStateDeprecated) {
|
||||
t.Fatalf("MatchedAccountState = %q, want deprecated", got.MatchedAccountState)
|
||||
}
|
||||
if got.AccountResolution != string(AccountResolutionReactivated) {
|
||||
t.Fatalf("AccountResolution = %q, want reactivated", got.AccountResolution)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type fakeConfirmationStore struct {
|
||||
items map[string]sqlite.ImportRunItem
|
||||
processed []string
|
||||
events []sqlite.ImportRunItemEvent
|
||||
}
|
||||
|
||||
func newFakeConfirmationStore(items []sqlite.ImportRunItem) *fakeConfirmationStore {
|
||||
store := &fakeConfirmationStore{
|
||||
items: make(map[string]sqlite.ImportRunItem, len(items)),
|
||||
events: []sqlite.ImportRunItemEvent{},
|
||||
}
|
||||
for _, item := range items {
|
||||
store.items[item.ItemID] = item
|
||||
}
|
||||
return store
|
||||
}
|
||||
|
||||
func (f *fakeConfirmationStore) List(ctx context.Context) ([]sqlite.ImportRunItem, error) {
|
||||
items := make([]sqlite.ImportRunItem, 0, len(f.items))
|
||||
for _, item := range f.items {
|
||||
items = append(items, item)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (f *fakeConfirmationStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error {
|
||||
f.items[item.ItemID] = item
|
||||
f.processed = append(f.processed, item.ItemID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeConfirmationStore) Append(ctx context.Context, event sqlite.ImportRunItemEvent) error {
|
||||
f.events = append(f.events, event)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeConfirmationStore) mustItem(t *testing.T, itemID string) sqlite.ImportRunItem {
|
||||
t.Helper()
|
||||
|
||||
item, ok := f.items[itemID]
|
||||
if !ok {
|
||||
t.Fatalf("item %q not found", itemID)
|
||||
}
|
||||
return item
|
||||
}
|
||||
Reference in New Issue
Block a user