package integration_test import ( "context" "fmt" "net/http" "net/http/httptest" "strings" "testing" "time" "sub2api-cn-relay-manager/internal/batch" "sub2api-cn-relay-manager/internal/host/sub2api" "sub2api-cn-relay-manager/internal/probe" "sub2api-cn-relay-manager/internal/store/sqlite" ) func TestBatchImportV2(t *testing.T) { t.Parallel() ctx := context.Background() harness := newBatchImportV2Harness(t) defer harness.Close(t) provisioner := &batchImportProvisionerStub{} runID, err := harness.RunBatchImport(ctx, provisioner) if err != nil { t.Fatalf("RunBatchImport() error = %v", err) } if runID == "" { t.Fatal("runID = empty, want persisted run") } if provisioner.provisionCalls != 3 { t.Fatalf("provision calls = %d, want 3 for new/advisory/retry items", provisioner.provisionCalls) } if provisioner.patchCalls != 1 { t.Fatalf("patch calls = %d, want 1 for alias-only reuse flow", provisioner.patchCalls) } if got := provisioner.lastPatch.Contract.ModelMapping["Kimi-K2.6"]; got != "kimi-2.6" { t.Fatalf("patch mapping = %#v, want raw alias mapped to canonical family", provisioner.lastPatch.Contract.ModelMapping) } run, err := harness.store.ImportRuns().GetByRunID(ctx, runID) if err != nil { t.Fatalf("ImportRuns().GetByRunID() error = %v", err) } runView := batch.ProjectRunSummary(run) if runView.State != string(batch.RunStateCompleted) { t.Fatalf("run state = %q, want completed", runView.State) } if runView.CompletedItems != 6 || runView.ActiveItems != 6 || runView.TotalItems != 6 { t.Fatalf("run view = %+v, want all 6 items completed and active", runView) } items, err := harness.store.ImportRunItems().ListByRunID(ctx, runID) if err != nil { t.Fatalf("ImportRunItems().ListByRunID() error = %v", err) } if len(items) != 6 { t.Fatalf("len(items) = %d, want 6", len(items)) } newItem := findItemByBaseURL(t, items, harness.baseURL+"/new") if got := batch.ProjectItemSummary(newItem).CanonicalModelFamilies; len(got) != 1 || got[0] != "deepseek-v4-pro" { t.Fatalf("new item canonical families = %#v, want [deepseek-v4-pro]", got) } activeItem := findItemByBaseURL(t, items, harness.baseURL+"/active") activeView := batch.ProjectItemSummary(activeItem) if activeView.MatchedAccountState != string(batch.MatchedAccountStateActive) || activeView.AccountResolution != string(batch.AccountResolutionReused) || !activeView.ProvisionReused { t.Fatalf("active duplicate projection = %+v, want active/reused/provision_reused", activeView) } deprecatedItem := findItemByBaseURL(t, items, harness.baseURL+"/deprecated") deprecatedView := batch.ProjectItemSummary(deprecatedItem) if deprecatedView.MatchedAccountState != string(batch.MatchedAccountStateDeprecated) || deprecatedView.AccountResolution != string(batch.AccountResolutionReactivated) || !deprecatedView.ProvisionReused { t.Fatalf("deprecated duplicate projection = %+v, want deprecated/reactivated/provision_reused", deprecatedView) } advisoryItem := findItemByBaseURL(t, items, harness.baseURL+"/advisory") advisoryEvents, err := harness.store.ImportRunEvents().ListByItemID(ctx, advisoryItem.ItemID) if err != nil { t.Fatalf("ImportRunEvents().ListByItemID(advisory) error = %v", err) } advisoryDetail, err := batch.ProjectItemDetail(advisoryItem, advisoryEvents) if err != nil { t.Fatalf("ProjectItemDetail(advisory) error = %v", err) } if advisoryDetail.ConfirmationStatus != string(batch.ConfirmationAdvisory) || advisoryDetail.AccessStatus != string(batch.AccessStatusActive) { t.Fatalf("advisory detail = %+v, want advisory confirmation and active access", advisoryDetail) } if !containsString(advisoryDetail.CapabilityProfile.TransportProfile.KnownAdvisories, "initial_probe_race_expected") { t.Fatalf("advisory capability profile = %+v, want initial_probe_race_expected", advisoryDetail.CapabilityProfile.TransportProfile.KnownAdvisories) } if !containsSubstring(advisoryDetail.AdvisoryMessages, "异步探测尚未稳定") { t.Fatalf("advisory messages = %#v, want mapped probe race advisory", advisoryDetail.AdvisoryMessages) } if !containsEventType(advisoryDetail.Events, "advisory_added") { t.Fatalf("advisory events = %+v, want advisory_added event", advisoryDetail.Events) } retryItem := findItemByBaseURL(t, items, harness.baseURL+"/retry") retryEvents, err := harness.store.ImportRunEvents().ListByItemID(ctx, retryItem.ItemID) if err != nil { t.Fatalf("ImportRunEvents().ListByItemID(retry) error = %v", err) } retryDetail, err := batch.ProjectItemDetail(retryItem, retryEvents) if err != nil { t.Fatalf("ProjectItemDetail(retry) error = %v", err) } if retryDetail.RetryCount != 1 || retryDetail.AccessStatus != string(batch.AccessStatusActive) { t.Fatalf("retry detail = %+v, want retry_count=1 and active access", retryDetail) } if !containsEventType(retryDetail.Events, "retry_scheduled") || !containsEventType(retryDetail.Events, "stage_transition") { t.Fatalf("retry events = %+v, want retry_scheduled and stage_transition", retryDetail.Events) } } type batchImportV2Harness struct { store *sqlite.DB server *httptest.Server baseURL string } func newBatchImportV2Harness(t *testing.T) *batchImportV2Harness { t.Helper() store := openTestStore(t) server := httptest.NewServer(newBatchImportUpstreamMux()) return &batchImportV2Harness{ store: store, server: server, baseURL: server.URL, } } func (h *batchImportV2Harness) Close(t *testing.T) { t.Helper() h.server.Close() closeTestStore(t, h.store) } func (h *batchImportV2Harness) RunBatchImport(ctx context.Context, provisioner *batchImportProvisionerStub) (string, error) { service := batch.BatchImportService{ RunStore: h.store.ImportRuns(), ItemStore: h.store.ImportRunItems(), ProbeModels: probe.ProviderModels, ProbeCapabilities: probe.ProbeCapabilities, InspectReuse: h.inspectReuse, Provisioner: provisioner, } result, err := service.StartRun(ctx, batch.BatchImportRunRequest{ RunID: "run-v2-int-001", HostID: "host-int-1", Mode: "strict", AccessMode: "self_service", Entries: []batch.BatchImportEntry{ {BaseURL: h.baseURL + "/new", APIKey: "sk-new", RequestedModels: []string{"DeepSeek V4 Pro"}}, {BaseURL: h.baseURL + "/active", APIKey: "sk-active", RequestedModels: []string{"kimi 2.6"}}, {BaseURL: h.baseURL + "/deprecated", APIKey: "sk-deprecated", RequestedModels: []string{"kimi 2.6"}}, {BaseURL: h.baseURL + "/patch", APIKey: "sk-patch", RequestedModels: []string{"kimi 2.6"}}, {BaseURL: h.baseURL + "/advisory", APIKey: "sk-advisory", RequestedModels: []string{"kimi-k2.6"}}, {BaseURL: h.baseURL + "/retry", APIKey: "sk-retry", RequestedModels: []string{"kimi-k2.6"}}, }, }) if err != nil { return "", err } worker := batch.ConfirmationWorker{ WorkerID: "worker-int-1", ItemStore: batchImportConfirmationStore{store: h.store, runID: result.RunID}, EventStore: h.store.ImportRunEvents(), LeaseDuration: time.Minute, RetryDelay: time.Second, Confirmer: h.confirm, } now := time.Date(2026, 5, 22, 13, 0, 0, 0, time.UTC) if err := worker.Tick(ctx, now); err != nil { return "", err } if err := worker.Tick(ctx, now.Add(2*time.Second)); err != nil { return "", err } items, err := h.store.ImportRunItems().ListByRunID(ctx, result.RunID) if err != nil { return "", err } validator := batch.ValidationService{ ItemStore: h.store.ImportRunItems(), RunStore: h.store.ImportRuns(), Validator: h.validate, } for _, item := range items { if item.CurrentStage != string(batch.ItemStageValidate) { continue } if err := validator.ValidateItem(ctx, item); err != nil { return "", err } } return result.RunID, nil } func (h *batchImportV2Harness) inspectReuse(_ context.Context, input batch.ReuseLookupInput) (batch.ReuseLookupResult, error) { switch { case strings.HasSuffix(input.BaseURL, "/active"): return batch.ReuseLookupResult{ ExistingProviderID: batch.NormalizeProviderID(input.BaseURL), ExistingAccessStatus: batch.AccessStatusActive, ExistingCanonicalFamilys: []string{"kimi 2.6"}, MatchedAccountID: 201, MatchedAccountState: batch.MatchedAccountStateActive, }, nil case strings.HasSuffix(input.BaseURL, "/deprecated"): return batch.ReuseLookupResult{ ExistingProviderID: batch.NormalizeProviderID(input.BaseURL), ExistingAccessStatus: batch.AccessStatusActive, ExistingCanonicalFamilys: []string{"kimi 2.6"}, MatchedAccountID: 301, MatchedAccountState: batch.MatchedAccountStateDeprecated, }, nil case strings.HasSuffix(input.BaseURL, "/patch"): return batch.ReuseLookupResult{ ExistingProviderID: batch.NormalizeProviderID(input.BaseURL), ExistingAccessStatus: batch.AccessStatusActive, ExistingCanonicalFamilys: []string{"kimi 2.6"}, MatchedAccountID: 401, MatchedAccountState: batch.MatchedAccountStateActive, ExistingModelMapping: map[string]string{"kimi-k2.6": "kimi-2.6"}, }, nil default: return batch.ReuseLookupResult{}, nil } } func (h *batchImportV2Harness) confirm(_ context.Context, item sqlite.ImportRunItem) (batch.ConfirmationResult, error) { switch { case strings.HasSuffix(item.BaseURL, "/advisory"): return batch.ConfirmationResult{StatusCode: http.StatusForbidden, Message: "probe race expected"}, nil case strings.HasSuffix(item.BaseURL, "/retry") && item.ConfirmationAttempts == 0: return batch.ConfirmationResult{StatusCode: http.StatusServiceUnavailable, Message: "no available accounts"}, nil default: return batch.ConfirmationResult{StatusCode: http.StatusOK, Message: "confirmation succeeded"}, nil } } func (h *batchImportV2Harness) validate(_ context.Context, item sqlite.ImportRunItem) (sub2api.GatewayCompletionResult, error) { return sub2api.GatewayCompletionResult{ OK: true, StatusCode: http.StatusOK, ContentType: "application/json", BodyPreview: fmt.Sprintf(`{"item_id":%q,"status":"ok"}`, item.ItemID), }, nil } type batchImportProvisionerStub struct { provisionCalls int patchCalls int lastPatch batch.PatchProvisionRequest } func (p *batchImportProvisionerStub) Provision(_ context.Context, req batch.ProvisionRequest) (batch.ProvisionResult, error) { p.provisionCalls++ legacyBatchID := int64(800 + p.provisionCalls) return batch.ProvisionResult{ LegacyBatchID: &legacyBatchID, LegacyProviderID: req.ProviderID, }, nil } func (p *batchImportProvisionerStub) Patch(_ context.Context, req batch.PatchProvisionRequest) error { p.patchCalls++ p.lastPatch = req return nil } type batchImportConfirmationStore struct { store *sqlite.DB runID string } func (s batchImportConfirmationStore) List(ctx context.Context) ([]sqlite.ImportRunItem, error) { return s.store.ImportRunItems().ListByRunID(ctx, s.runID) } func (s batchImportConfirmationStore) Upsert(ctx context.Context, item sqlite.ImportRunItem) error { return s.store.ImportRunItems().Upsert(ctx, item) } func newBatchImportUpstreamMux() http.Handler { modelsByToken := map[string][]string{ "sk-new": {"deepseek-ai/DeepSeek-V4-Pro"}, "sk-active": {"kimi-k2.6"}, "sk-deprecated": {"kimi-k2.6"}, "sk-patch": {"Kimi-K2.6"}, "sk-advisory": {"kimi-k2.6"}, "sk-retry": {"kimi-k2.6"}, } mux := http.NewServeMux() mux.HandleFunc("/v1/models", func(w http.ResponseWriter, r *http.Request) { token := strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")) models, ok := modelsByToken[token] if !ok { w.WriteHeader(http.StatusUnauthorized) _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) return } data := make([]map[string]any, 0, len(models)) for _, model := range models { data = append(data, map[string]any{"id": model}) } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(mustJSON(data))) }) mux.HandleFunc("/v1/responses", func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusForbidden) _, _ = w.Write([]byte(`{"error":"responses unsupported"}`)) }) mux.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) { token := strings.TrimSpace(strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")) if _, ok := modelsByToken[token]; !ok { w.WriteHeader(http.StatusUnauthorized) _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) return } if token == "sk-advisory" { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusForbidden) _, _ = w.Write([]byte(`{"error":"probe race expected"}`)) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"id":"chatcmpl_batch_import_v2","choices":[{"index":0,"message":{"role":"assistant","content":"pong"}}]}`)) }) return mux } func findItemByBaseURL(t *testing.T, items []sqlite.ImportRunItem, baseURL string) sqlite.ImportRunItem { t.Helper() for _, item := range items { if item.BaseURL == baseURL { return item } } t.Fatalf("item with base_url %q not found in %#v", baseURL, items) return sqlite.ImportRunItem{} } func containsString(values []string, want string) bool { for _, value := range values { if value == want { return true } } return false } func containsSubstring(values []string, fragment string) bool { for _, value := range values { if strings.Contains(value, fragment) { return true } } return false } func containsEventType(events []batch.EventProjection, want string) bool { for _, event := range events { if event.EventType == want { return true } } return false } func mustJSON(data []map[string]any) string { values := make([]string, 0, len(data)) for _, item := range data { values = append(values, fmt.Sprintf(`{"id":%q}`, item["id"])) } return `{"data":[` + strings.Join(values, ",") + `]}` }