package app import ( "context" "database/sql" "encoding/json" "fmt" "net/http" "net/http/httptest" "path/filepath" "strings" "testing" "time" "sub2api-cn-relay-manager/internal/batch" "sub2api-cn-relay-manager/internal/config" "sub2api-cn-relay-manager/internal/host/sub2api" "sub2api-cn-relay-manager/internal/provision" "sub2api-cn-relay-manager/internal/store/sqlite" ) func TestBatchImportResumeJobNameAndParseJSONStringList(t *testing.T) { t.Parallel() if got := (batchImportResumeJob{}).Name(); got != "batch import runtime scheduler" { t.Fatalf("batchImportResumeJob.Name() = %q, want batch import runtime scheduler", got) } values := parseJSONStringList(`[" user-1 ","user-2"]`) if len(values) != 2 || values[0] != " user-1 " || values[1] != "user-2" { t.Fatalf("parseJSONStringList() = %v, want raw decoded values", values) } if got := parseJSONStringList("{"); len(got) != 0 { t.Fatalf("parseJSONStringList(invalid) = %v, want empty", got) } } func TestBatchImportResumeJobRunAndReconcileSweepJobName(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) mustCreateAppImportRun(t, store, sqlite.ImportRun{ RunID: "run-1", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "completed", }) job := batchImportResumeJob{sqliteDSN: appTestDSN(t, store)} if err := job.Run(context.Background()); err != nil { t.Fatalf("batchImportResumeJob.Run() error = %v", err) } if got := (reconcileSweepJob{}).Name(); got != "reconcile background scheduler" { t.Fatalf("reconcileSweepJob.Name() = %q, want reconcile background scheduler", got) } } func TestDefaultBackgroundSchedulersAndNewActionSet(t *testing.T) { t.Parallel() schedulers := defaultBackgroundSchedulers() if schedulers.runBatchImport == nil || schedulers.runReconcile == nil { t.Fatalf("defaultBackgroundSchedulers() = %+v, want non-nil functions", schedulers) } actions := NewActionSet("file:/tmp/nonexistent.db") if actions.CreateBatchImportRun == nil || 
actions.ListBatchImportRuns == nil || actions.GetBatchImportRun == nil || actions.ListBatchImportRunItems == nil || actions.GetBatchImportRunItem == nil { t.Fatalf("NewActionSet() returned nil batch actions: %+v", actions) } if actions.CreateHost == nil || actions.ListPacks == nil || actions.GetPack == nil || actions.ListPackProviders == nil || actions.PublishProviderDraft == nil { t.Fatalf("NewActionSet() returned nil app actions: %+v", actions) } } func TestStartBackgroundSchedulersAndBootstrap(t *testing.T) { var batchCalls int var reconcileCalls int startBackgroundSchedulers(context.Background(), config.StartupConfig{ Database: config.DatabaseConfig{SQLiteDSN: "file:test.db"}, Reconcile: config.ReconcileConfig{ WorkerEnabled: false, }, }, backgroundSchedulers{ runBatchImport: func(context.Context, string) { batchCalls++ }, runReconcile: func(context.Context, string, time.Duration) { reconcileCalls++ }, }) if batchCalls != 1 || reconcileCalls != 0 { t.Fatalf("startBackgroundSchedulers(disabled) calls = (%d, %d), want (1, 0)", batchCalls, reconcileCalls) } startBackgroundSchedulers(context.Background(), config.StartupConfig{ Database: config.DatabaseConfig{SQLiteDSN: "file:test.db"}, Reconcile: config.ReconcileConfig{ WorkerEnabled: true, PollInterval: time.Minute, }, }, backgroundSchedulers{ runBatchImport: func(context.Context, string) { batchCalls++ }, runReconcile: func(context.Context, string, time.Duration) { reconcileCalls++ }, }) if batchCalls != 2 || reconcileCalls != 1 { t.Fatalf("startBackgroundSchedulers(enabled) calls = (%d, %d), want (2, 1)", batchCalls, reconcileCalls) } ctx, cancel := context.WithCancel(context.Background()) cancel() t.Setenv(config.EnvListenAddr, "127.0.0.1:19090") t.Setenv(config.EnvSQLiteDSN, "file:bootstrap-test.db?_foreign_keys=on&_busy_timeout=5000") t.Setenv(config.EnvAdminToken, "bootstrap-token") t.Setenv(config.EnvReconcileWorkerEnabled, "false") server, err := Bootstrap(ctx) if err != nil { t.Fatalf("Bootstrap() error = 
%v", err) } if server.Addr() != "127.0.0.1:19090" { t.Fatalf("Bootstrap() server.Addr() = %q, want 127.0.0.1:19090", server.Addr()) } } func TestBackgroundSchedulerEntryPoints(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) ctx, cancel := context.WithCancel(context.Background()) cancel() runBatchImportBackgroundScheduler(ctx, appTestDSN(t, store)) runReconcileBackgroundScheduler(ctx, appTestDSN(t, store), 0) job := reconcileSweepJob{sqliteDSN: appTestDSN(t, store), interval: time.Minute} if err := job.Run(context.Background()); err != nil { t.Fatalf("reconcileSweepJob.Run() error = %v", err) } } func TestBatchImportProvisionerPatchAndSleepWithContext(t *testing.T) { t.Parallel() if err := (batchImportProvisioner{}).Patch(context.Background(), batch.PatchProvisionRequest{}); err != nil { t.Fatalf("Patch() error = %v, want nil", err) } if err := sleepWithContext(context.Background(), 0); err != nil { t.Fatalf("sleepWithContext() error = %v, want nil", err) } ctx, cancel := context.WithCancel(context.Background()) cancel() if err := sleepWithContext(ctx, time.Second); err != context.Canceled { t.Fatalf("sleepWithContext(canceled) error = %v, want %v", err, context.Canceled) } } func TestProbeHostSnapshotAndResolveValidationAPIKey(t *testing.T) { t.Parallel() server := httptest.NewServer(newBatchImportActionStubServer(t)) defer server.Close() client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "apikey", Token: "host-token"}) if err != nil { t.Fatalf("newSub2APIClient() error = %v", err) } hostVersion, capabilities, err := probeHostSnapshot(context.Background(), client) if err != nil { t.Fatalf("probeHostSnapshot() error = %v", err) } if hostVersion != "0.1.126" || !capabilities.Groups || !capabilities.Subscriptions { t.Fatalf("probeHostSnapshot() = (%q, %+v), want supported host snapshot", hostVersion, capabilities) } selfServiceRunner := batchImportRuntimeRunner{ request: CreateBatchImportRunRequest{ AccessMode: 
provision.AccessModeSelfService, ProbeAPIKey: " probe-key ", }, } apiKey, err := selfServiceRunner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{}) if err != nil { t.Fatalf("resolveValidationAPIKey(self_service) error = %v", err) } if apiKey != "probe-key" { t.Fatalf("resolveValidationAPIKey(self_service) = %q, want probe-key", apiKey) } subscriptionRunner := batchImportRuntimeRunner{ request: CreateBatchImportRunRequest{ AccessMode: provision.AccessModeSubscription, SubscriptionDays: 30, }, } if _, err := subscriptionRunner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{}); err == nil || err.Error() != "subscription_users is required" { t.Fatalf("resolveValidationAPIKey(subscription missing users) error = %v, want subscription_users is required", err) } unsupportedRunner := batchImportRuntimeRunner{ request: CreateBatchImportRunRequest{ AccessMode: "other", }, } if _, err := unsupportedRunner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{}); err == nil || !strings.Contains(err.Error(), `unsupported access mode "other"`) { t.Fatalf("resolveValidationAPIKey(unsupported) error = %v, want unsupported access mode", err) } } func TestResolveManagedResourceHostIDAndConfirmItem(t *testing.T) { t.Parallel() if _, err := resolveManagedResourceHostID(context.Background(), nil, sqlite.ImportRunItem{}, "account"); err == nil || err.Error() != "store is required" { t.Fatalf("resolveManagedResourceHostID(nil store) error = %v, want store is required", err) } if _, err := resolveManagedResourceHostID(context.Background(), openAppTestStore(t), sqlite.ImportRunItem{}, "account"); err == nil || err.Error() != "legacy_batch_id is required for account lookup" { t.Fatalf("resolveManagedResourceHostID(missing batch) error = %v, want legacy_batch_id required", err) } server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/api/v1/admin/accounts/account-ok/test": 
w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":true,\"status\":\"passed\",\"message\":\"smoke passed\"}\n\n")) case "/api/v1/admin/accounts/account-http/test": w.WriteHeader(http.StatusForbidden) _, _ = w.Write([]byte(`{"error":"forbidden"}`)) case "/api/v1/admin/accounts/account-busy/test": w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":false,\"status\":\"failed\",\"message\":\"No available accounts\"}\n\n")) case "/api/v1/admin/accounts/account-forbidden/test": w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":false,\"status\":\"failed\",\"message\":\"Forbidden by upstream\"}\n\n")) case "/api/v1/admin/accounts/account-bad/test": w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":false,\"status\":\"failed\",\"message\":\"model mismatch\"}\n\n")) default: http.NotFound(w, r) } })) defer server.Close() client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "apikey", Token: "host-token"}) if err != nil { t.Fatalf("newSub2APIClient() error = %v", err) } tests := []struct { name string accountID string wantStatus int wantMsg string }{ {name: "probe ok", accountID: "account-ok", wantStatus: http.StatusOK, wantMsg: "smoke passed"}, {name: "http error passthrough", accountID: "account-http", wantStatus: http.StatusForbidden, wantMsg: `{"error":"forbidden"}`}, {name: "busy advisory", accountID: "account-busy", wantStatus: http.StatusServiceUnavailable, wantMsg: "No available accounts"}, {name: "forbidden advisory", accountID: "account-forbidden", wantStatus: http.StatusForbidden, wantMsg: "Forbidden by upstream"}, {name: "generic bad request", accountID: "account-bad", wantStatus: 
http.StatusBadRequest, wantMsg: "model mismatch"}, } for _, tc := range tests { tc := tc t.Run(tc.name, func(t *testing.T) { store := openAppTestStore(t) defer closeAppTestStore(t, store) hostPK := createAppHostRecord(t, store, server.URL) packPK := createAppPackRecord(t, store) providerPK := createAppProviderRecord(t, store, packPK) batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK) if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{ BatchID: batchID, HostID: hostPK, ResourceType: "account", HostResourceID: tc.accountID, ResourceName: tc.accountID, }); err != nil { t.Fatalf("ManagedResources().Create() error = %v", err) } runner := batchImportRuntimeRunner{ store: store, hostClient: client, } result, err := runner.confirmItem(context.Background(), sqlite.ImportRunItem{ LegacyBatchID: &batchID, ResolvedSmokeModel: "kimi-k2.6", }) if err != nil { t.Fatalf("confirmItem() error = %v", err) } if result.StatusCode != tc.wantStatus || result.Message != tc.wantMsg { t.Fatalf("confirmItem() = %+v, want status=%d message=%q", result, tc.wantStatus, tc.wantMsg) } }) } } func TestBatchImportRunItemStoreTryAcquireLease(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) mustCreateAppImportRun(t, store, sqlite.ImportRun{ RunID: "run-lease-1", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "running", }) mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{ ItemID: "item-lease-1", RunID: "run-lease-1", BaseURL: "https://deepseek.example.com/v1", ProviderID: "deepseek", APIKeyFingerprint: "sha256:lease", ResolvedSmokeModel: "deepseek-v4-pro", CurrentStage: "confirm", ConfirmationStatus: "pending", AccessStatus: "unknown", MatchedAccountState: "active", AccountResolution: "created", }) leaseStore := batchImportRunItemStore{store: store, runID: "run-lease-1"} now := time.Date(2026, 5, 23, 10, 0, 0, 0, time.UTC) if _, _, err := 
leaseStore.TryAcquireLease(context.Background(), "", "worker-1", now, time.Minute); err == nil || err.Error() != "item_id is required" { t.Fatalf("TryAcquireLease(missing item) error = %v, want item_id is required", err) } if _, _, err := leaseStore.TryAcquireLease(context.Background(), "item-lease-1", "", now, time.Minute); err == nil || err.Error() != "worker_id is required" { t.Fatalf("TryAcquireLease(missing worker) error = %v, want worker_id is required", err) } item, claimed, err := leaseStore.TryAcquireLease(context.Background(), "item-lease-1", "worker-1", now, time.Minute) if err != nil { t.Fatalf("TryAcquireLease(first) error = %v", err) } if !claimed || item.LeaseOwner != "worker-1" { t.Fatalf("TryAcquireLease(first) = (%+v, %v), want claimed by worker-1", item, claimed) } item, claimed, err = leaseStore.TryAcquireLease(context.Background(), "item-lease-1", "worker-2", now.Add(30*time.Second), time.Minute) if err != nil { t.Fatalf("TryAcquireLease(second) error = %v", err) } if claimed || item.ItemID != "" { t.Fatalf("TryAcquireLease(second) = (%+v, %v), want unclaimed empty item", item, claimed) } } func TestDriveRunValidationAndRetryBranches(t *testing.T) { t.Parallel() t.Run("validate stage completes run", func(t *testing.T) { t.Parallel() server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/v1/chat/completions": writeJSON(w, http.StatusOK, map[string]any{ "id": "chatcmpl_validate", "choices": []map[string]any{{ "index": 0, "message": map[string]any{ "role": "assistant", "content": "pong", }, }}, }) default: http.NotFound(w, r) } })) defer server.Close() client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "bearer", Token: "gateway-key"}) if err != nil { t.Fatalf("newSub2APIClient() error = %v", err) } store := openAppTestStore(t) defer closeAppTestStore(t, store) mustCreateAppImportRun(t, store, sqlite.ImportRun{ RunID: "run-validate-1", HostID: "host-1", Mode: "partial", 
AccessMode: "self_service", State: "running", TotalItems: 1, }) mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{ ItemID: "item-validate-1", RunID: "run-validate-1", BaseURL: server.URL, ProviderID: "deepseek", APIKeyFingerprint: "sha256:validate", ResolvedSmokeModel: "deepseek-v4-pro", CurrentStage: "validate", ConfirmationStatus: "confirmed", AccessStatus: "unknown", MatchedAccountState: "active", AccountResolution: "created", }) runner := batchImportRuntimeRunner{ store: store, hostClient: client, request: CreateBatchImportRunRequest{ AccessMode: provision.AccessModeSelfService, ProbeAPIKey: "gateway-key", }, } if err := runner.driveRun(context.Background(), "run-validate-1", 0); err != nil { t.Fatalf("driveRun(validate) error = %v", err) } run, err := store.ImportRuns().GetByRunID(context.Background(), "run-validate-1") if err != nil { t.Fatalf("ImportRuns().GetByRunID() error = %v", err) } if run.State != "completed" || run.CompletedItems != 1 || run.ActiveItems != 1 { t.Fatalf("run = %+v, want completed with one active item", run) } item, err := store.ImportRunItems().GetByItemID(context.Background(), "item-validate-1") if err != nil { t.Fatalf("ImportRunItems().GetByItemID() error = %v", err) } if item.CurrentStage != "done" || item.AccessStatus != "active" { t.Fatalf("item = %+v, want done/active", item) } }) t.Run("confirm stage schedules retry when wait budget is zero", func(t *testing.T) { t.Parallel() server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/api/v1/admin/accounts/account-busy/test": w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":false,\"status\":\"failed\",\"message\":\"No available accounts\"}\n\n")) default: http.NotFound(w, r) } })) defer server.Close() client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "bearer", Token: "gateway-key"}) if err != nil { 
t.Fatalf("newSub2APIClient() error = %v", err) } store := openAppTestStore(t) defer closeAppTestStore(t, store) hostPK := createAppHostRecord(t, store, server.URL) packPK := createAppPackRecord(t, store) providerPK := createAppProviderRecord(t, store, packPK) legacyBatchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK) if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{ BatchID: legacyBatchID, HostID: hostPK, ResourceType: "account", HostResourceID: "account-busy", ResourceName: "account-busy", }); err != nil { t.Fatalf("ManagedResources().Create(account) error = %v", err) } mustCreateAppImportRun(t, store, sqlite.ImportRun{ RunID: "run-confirm-1", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "running", TotalItems: 1, }) mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{ ItemID: "item-confirm-1", RunID: "run-confirm-1", BaseURL: server.URL, ProviderID: "deepseek", APIKeyFingerprint: "sha256:confirm", ResolvedSmokeModel: "deepseek-v4-pro", CurrentStage: "confirm", ConfirmationStatus: "pending", AccessStatus: "unknown", MatchedAccountState: "active", AccountResolution: "created", LegacyBatchID: &legacyBatchID, CapabilityProfileJSON: `{"transport_profile":{"known_advisories":[]}}`, }) runner := batchImportRuntimeRunner{ store: store, hostClient: client, request: CreateBatchImportRunRequest{ AccessMode: provision.AccessModeSelfService, ProbeAPIKey: "gateway-key", }, } if err := runner.driveRun(context.Background(), "run-confirm-1", 0); err != nil { t.Fatalf("driveRun(confirm retry) error = %v", err) } run, err := store.ImportRuns().GetByRunID(context.Background(), "run-confirm-1") if err != nil { t.Fatalf("ImportRuns().GetByRunID() error = %v", err) } if run.State != "running" || run.CompletedItems != 0 { t.Fatalf("run = %+v, want still running with no completed items", run) } item, err := store.ImportRunItems().GetByItemID(context.Background(), "item-confirm-1") if err != nil { 
t.Fatalf("ImportRunItems().GetByItemID() error = %v", err) } if item.CurrentStage != "confirm" || item.RetryCount != 1 || strings.TrimSpace(item.NextRetryAt) == "" { t.Fatalf("item = %+v, want confirm stage with scheduled retry", item) } }) } func TestResolveValidationAPIKeySubscriptionSuccess(t *testing.T) { t.Parallel() server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && strings.HasPrefix(r.URL.RequestURI(), "/api/v1/admin/users?"): writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{}}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84, "email": "relay-sub-user-1@sub2api.local"}}) case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/users/84": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users/84/balance": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/subscriptions/assign": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 401}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/auth/login": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"access_token": "user-jwt"}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/keys": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 501, "key": "sk-relay-key", "name": "managed-key"}}) case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/api-keys/501": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"api_key": map[string]any{"id": 501}}}) default: http.NotFound(w, r) } })) defer server.Close() client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "bearer", Token: "admin-token"}) if 
err != nil { t.Fatalf("newSub2APIClient() error = %v", err) } store := openAppTestStore(t) defer closeAppTestStore(t, store) hostPK := createAppHostRecord(t, store, server.URL) packPK := createAppPackRecord(t, store) providerPK := createAppProviderRecord(t, store, packPK) batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK) if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{ BatchID: batchID, HostID: hostPK, ResourceType: "group", HostResourceID: "101", ResourceName: "group-101", }); err != nil { t.Fatalf("ManagedResources().Create(group) error = %v", err) } runner := batchImportRuntimeRunner{ store: store, hostClient: client, request: CreateBatchImportRunRequest{ AccessMode: provision.AccessModeSubscription, SubscriptionUsers: []string{"crm-user-1"}, SubscriptionDays: 30, }, } apiKey, err := runner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{ LegacyBatchID: &batchID, }) if err != nil { t.Fatalf("resolveValidationAPIKey(subscription) error = %v", err) } if !strings.HasPrefix(apiKey, "sk-relay-") { t.Fatalf("resolveValidationAPIKey(subscription) = %q, want managed sk-relay-* key", apiKey) } } func TestResolveValidationAPIKeySubscriptionRequiresStoredGroup(t *testing.T) { t.Parallel() server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && strings.HasPrefix(r.URL.RequestURI(), "/api/v1/admin/users?"): writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{}}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84, "email": "relay-sub-user-1@sub2api.local"}}) case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/users/84": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}}) case r.Method == http.MethodPost && r.URL.Path == 
"/api/v1/admin/users/84/balance": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/subscriptions/assign": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 401}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/auth/login": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"access_token": "user-jwt"}}) case r.Method == http.MethodPost && r.URL.Path == "/api/v1/keys": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 501, "key": "sk-relay-key", "name": "managed-key"}}) case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/api-keys/501": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"api_key": map[string]any{"id": 501}}}) default: http.NotFound(w, r) } })) defer server.Close() client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "bearer", Token: "admin-token"}) if err != nil { t.Fatalf("newSub2APIClient() error = %v", err) } store := openAppTestStore(t) defer closeAppTestStore(t, store) hostPK := createAppHostRecord(t, store, server.URL) packPK := createAppPackRecord(t, store) providerPK := createAppProviderRecord(t, store, packPK) batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK) runner := batchImportRuntimeRunner{ store: store, hostClient: client, request: CreateBatchImportRunRequest{ AccessMode: provision.AccessModeSubscription, SubscriptionUsers: []string{"crm-user-1"}, SubscriptionDays: 30, }, } _, err = runner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{ LegacyBatchID: &batchID, }) if err == nil || err.Error() != fmt.Sprintf("%s resource not found for batch %d", "group", batchID) { t.Fatalf("resolveValidationAPIKey(subscription missing group) error = %v, want missing group", err) } } func TestCreateHostAuthFromLegacyFields(t *testing.T) { t.Parallel() if got := createHostAuthFromLegacyFields(" api-key ", " 
bearer-token "); got.Type != "bearer" || got.Token != "bearer-token" { t.Fatalf("createHostAuthFromLegacyFields() = %+v, want bearer token", got) } if got := createHostAuthFromLegacyFields(" api-key ", ""); got.Type != "apikey" || got.Token != "api-key" { t.Fatalf("createHostAuthFromLegacyFields() = %+v, want apikey", got) } } func TestHostRecordToInfoAndPackRecordToInfo(t *testing.T) { t.Parallel() hostInfo := hostRecordToInfo(sqlite.Host{ HostID: "host-1", BaseURL: "https://sub2api.example.com", HostVersion: "0.1.126", AuthType: "apikey", CapabilityProbeJSON: `{"groups":true,"channels":true,"plans":true,"accounts":true,"account_test":true,"account_models":true,"subscriptions":true}`, }) if hostInfo.Status != "supported" || hostInfo.Capabilities == nil || !hostInfo.Capabilities.Subscriptions { t.Fatalf("hostRecordToInfo() = %+v, want supported capabilities", hostInfo) } hostInfo = hostRecordToInfo(sqlite.Host{ HostID: "host-2", BaseURL: "https://bad.example.com", CapabilityProbeJSON: "{", }) if hostInfo.Capabilities != nil || hostInfo.Status != "" { t.Fatalf("hostRecordToInfo(invalid json) = %+v, want nil capabilities and empty status", hostInfo) } packInfo := packRecordToInfo(sqlite.Pack{ PackID: "openai-cn-pack", Version: "1.0.0", Vendor: "OpenAI CN", TargetHost: "sub2api", MinHostVersion: "0.1.126", MaxHostVersion: "0.2.x", }) if packInfo.PackID != "openai-cn-pack" || packInfo.TargetHost != "sub2api" { t.Fatalf("packRecordToInfo() = %+v, want projected pack info", packInfo) } } func TestDeriveAccessStatusAndDefaultPositiveInt(t *testing.T) { t.Parallel() if got := deriveAccessStatus(sub2api.GatewayAccessResult{OK: true, HasExpectedModel: true, CompletionOK: true}); got != provision.AccessStatusSubscriptionReady { t.Fatalf("deriveAccessStatus(ready) = %q, want %q", got, provision.AccessStatusSubscriptionReady) } if got := deriveAccessStatus(sub2api.GatewayAccessResult{OK: true, HasExpectedModel: true, CompletionOK: false}); got != provision.AccessStatusBroken { 
t.Fatalf("deriveAccessStatus(broken) = %q, want %q", got, provision.AccessStatusBroken) } if got := defaultPositiveInt(3, 9); got != 3 { t.Fatalf("defaultPositiveInt(3, 9) = %d, want 3", got) } if got := defaultPositiveInt(0, 9); got != 9 { t.Fatalf("defaultPositiveInt(0, 9) = %d, want 9", got) } } func TestMatchesItemFilters(t *testing.T) { t.Parallel() view := batch.ItemSummaryProjection{ ItemID: "item-1", BaseURL: "https://kimi.example.com/v1", ProviderID: "kimi-a7m", CurrentStage: "done", ConfirmationStatus: "advisory", AccessStatus: "active", MatchedAccountState: "active", AccountResolution: "reused", AdvisoryMessages: []string{"warning"}, } hasWarning := true if !matchesItemFilters(view, ListBatchImportRunItemsRequest{ CurrentStage: "done", ConfirmationStatus: "advisory", AccessStatus: "active", ProviderID: "kimi-a7m", MatchedAccountState: "active", AccountResolution: "reused", Query: "kimi", HasWarning: &hasWarning, }) { t.Fatal("matchesItemFilters() = false, want match") } noWarning := false if matchesItemFilters(view, ListBatchImportRunItemsRequest{HasWarning: &noWarning}) { t.Fatal("matchesItemFilters(has_warning=false) = true, want false") } if matchesItemFilters(view, ListBatchImportRunItemsRequest{Query: "other"}) { t.Fatal("matchesItemFilters(query=other) = true, want false") } } func TestBuildGetBatchImportRunActionAndGetItemAction(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) mustCreateAppImportRun(t, store, sqlite.ImportRun{ RunID: "run-1", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "running", }) mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:00' WHERE run_id = 'run-1'`) mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{ ItemID: "item-1", RunID: "run-1", BaseURL: "https://kimi.example.com/v1", ProviderID: "kimi-a7m", APIKeyFingerprint: "sha256:1", CurrentStage: "done", ConfirmationStatus: "confirmed", AccessStatus: "active", 
MatchedAccountState: "active", AccountResolution: "reused", AdvisoryMessagesJSON: `["gateway_warmup_retry_succeeded"]`, }) action := buildGetBatchImportRunAction(appTestDSN(t, store)) run, err := action(context.Background(), "run-1") if err != nil { t.Fatalf("buildGetBatchImportRunAction() error = %v", err) } if run.RunID != "run-1" { t.Fatalf("run.RunID = %q, want run-1", run.RunID) } if _, err := action(context.Background(), "missing"); err == nil || err.Error() != "run not found: missing" { t.Fatalf("missing run error = %v, want run not found", err) } itemAction := buildGetBatchImportRunItemAction(appTestDSN(t, store)) item, err := itemAction(context.Background(), GetBatchImportRunItemRequest{RunID: "run-1", ItemID: "item-1"}) if err != nil { t.Fatalf("buildGetBatchImportRunItemAction() error = %v", err) } if item.ItemID != "item-1" || item.AccountResolution != "reused" { t.Fatalf("item = %+v, want projected item", item) } if _, err := itemAction(context.Background(), GetBatchImportRunItemRequest{RunID: "run-2", ItemID: "item-1"}); err == nil || err.Error() != "item not found in run run-2" { t.Fatalf("wrong run error = %v, want item not found in run", err) } } func TestResolveProvidersForQueryAndLatestAccessStatus(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) host1, err := store.Hosts().Create(context.Background(), sqlite.Host{ HostID: "host-1", BaseURL: "https://one.example.com", HostVersion: "0.1.126", AuthToken: "token-1", }) if err != nil { t.Fatalf("Hosts().Create(host1) error = %v", err) } host2, err := store.Hosts().Create(context.Background(), sqlite.Host{ HostID: "host-2", BaseURL: "https://two.example.com", HostVersion: "0.1.126", AuthToken: "token-2", }) if err != nil { t.Fatalf("Hosts().Create(host2) error = %v", err) } packID, err := store.Packs().Create(context.Background(), sqlite.Pack{ PackID: "openai-cn-pack", Version: "1.0.0", Checksum: "checksum-1", }) if err != nil { t.Fatalf("Packs().Create() 
error = %v", err)
	}
	providerID, err := store.Providers().Create(context.Background(), sqlite.Provider{
		PackID:      packID,
		ProviderID:  "deepseek",
		DisplayName: "DeepSeek",
		BaseURL:     "https://api.example.com",
		Platform:    "openai",
	})
	if err != nil {
		t.Fatalf("Providers().Create() error = %v", err)
	}
	batch1, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
		HostID:       host1,
		PackID:       packID,
		ProviderID:   providerID,
		Mode:         provision.ImportModePartial,
		BatchStatus:  provision.BatchStatusSucceeded,
		AccessStatus: provision.AccessStatusSelfServiceReady,
	})
	if err != nil {
		t.Fatalf("ImportBatches().Create(batch1) error = %v", err)
	}
	batch2, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
		HostID:       host2,
		PackID:       packID,
		ProviderID:   providerID,
		Mode:         provision.ImportModeStrict,
		BatchStatus:  provision.BatchStatusSucceeded,
		AccessStatus: provision.AccessStatusSubscriptionReady,
	})
	if err != nil {
		t.Fatalf("ImportBatches().Create(batch2) error = %v", err)
	}
	if _, err := store.AccessClosures().Create(context.Background(), sqlite.AccessClosureRecord{
		BatchID:     batch1,
		ClosureType: provision.AccessModeSelfService,
		Status:      provision.AccessStatusSelfServiceReady,
	}); err != nil {
		t.Fatalf("AccessClosures().Create(batch1) error = %v", err)
	}
	if _, err := store.AccessClosures().Create(context.Background(), sqlite.AccessClosureRecord{
		BatchID:     batch2,
		ClosureType: provision.AccessModeSubscription,
		Status:      provision.AccessStatusSubscriptionReady,
	}); err != nil {
		t.Fatalf("AccessClosures().Create(batch2) error = %v", err)
	}
	if _, err := resolveProvidersForQuery(context.Background(), nil, ProviderQueryRequest{}); err == nil || err.Error() != "store is required" {
		t.Fatalf("resolveProvidersForQuery(nil store) error = %v, want store is required", err)
	}
	if _, err := resolveProvidersForQuery(context.Background(), store, ProviderQueryRequest{}); err == nil || err.Error() != "provider_id is required" {
		t.Fatalf("resolveProvidersForQuery(missing provider) error = %v, want provider_id is required", err)
	}
	providers, err := resolveProvidersForQuery(context.Background(), store, ProviderQueryRequest{ProviderID: "deepseek", PackID: "openai-cn-pack"})
	if err != nil {
		t.Fatalf("resolveProvidersForQuery() error = %v", err)
	}
	if len(providers) != 1 || providers[0].ProviderID != "deepseek" {
		t.Fatalf("resolveProvidersForQuery() = %+v, want deepseek provider", providers)
	}
	if _, err := resolveLatestAccessStatus(context.Background(), nil, sqlite.Provider{}, ""); err == nil || err.Error() != "store is required" {
		t.Fatalf("resolveLatestAccessStatus(nil store) error = %v, want store is required", err)
	}
	status, err := resolveLatestAccessStatus(context.Background(), store, providers[0], "host-1")
	if err != nil {
		t.Fatalf("resolveLatestAccessStatus(host-1) error = %v", err)
	}
	if status != provision.AccessStatusSelfServiceReady {
		t.Fatalf("resolveLatestAccessStatus(host-1) = %q, want %q", status, provision.AccessStatusSelfServiceReady)
	}
	if _, err := resolveLatestAccessStatus(context.Background(), store, providers[0], ""); err == nil || err.Error() != "provider exists on multiple hosts; host_id is required" {
		t.Fatalf("resolveLatestAccessStatus(multi-host) error = %v, want multi-host error", err)
	}
}

// TestBuildListBatchImportRunsActionAndItemsAction seeds three import runs
// (one item each) and exercises the list-runs and list-items actions built
// from the test store's DSN.
//
// Expected results:
//   - runs filtered by state "completed" and query "other.example.com" with
//     limit 1 yield run-2 and a next cursor of "run-3";
//   - items of run-1 filtered by HasWarning and query "kimi" yield only item-1
//     with no next cursor;
//   - listing items of an unknown run fails with "run not found: missing".
func TestBuildListBatchImportRunsActionAndItemsAction(t *testing.T) {
	t.Parallel()
	store := openAppTestStore(t)
	defer closeAppTestStore(t, store)
	mustCreateAppImportRun(t, store, sqlite.ImportRun{
		RunID:         "run-1",
		HostID:        "host-1",
		Mode:          "partial",
		AccessMode:    "self_service",
		State:         "running",
		TotalItems:    1,
		WarningItems:  1,
		ActiveItems:   1,
		BrokenItems:   0,
		DegradedItems: 0,
	})
	mustCreateAppImportRun(t, store, sqlite.ImportRun{
		RunID:      "run-2",
		HostID:     "host-1",
		Mode:       "strict",
		AccessMode: "subscription",
		State:      "completed",
	})
	mustCreateAppImportRun(t, store, sqlite.ImportRun{
		RunID:      "run-3",
		HostID:     "host-1",
		Mode:       "strict",
		AccessMode: "subscription",
		State:      "completed",
	})
	// Overwrite started_at with distinct, descending timestamps (run-1 latest,
	// run-3 earliest) so the rows have known relative start times.
	mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:03' WHERE run_id = 'run-1'`)
	mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:02' WHERE run_id = 'run-2'`)
	mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:01' WHERE run_id = 'run-3'`)
	mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
		ItemID:               "item-1",
		RunID:                "run-1",
		BaseURL:              "https://kimi.example.com/v1",
		ProviderID:           "kimi-a7m",
		APIKeyFingerprint:    "sha256:1",
		CurrentStage:         "done",
		ConfirmationStatus:   "advisory",
		AccessStatus:         "active",
		MatchedAccountState:  "active",
		AccountResolution:    "reused",
		AdvisoryMessagesJSON: `["warning"]`,
	})
	mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
		ItemID:               "item-2",
		RunID:                "run-2",
		BaseURL:              "https://other.example.com/v1",
		ProviderID:           "deepseek",
		APIKeyFingerprint:    "sha256:2",
		CurrentStage:         "confirm",
		ConfirmationStatus:   "pending",
		AccessStatus:         "broken",
		MatchedAccountState:  "broken",
		AccountResolution:    "created",
		AdvisoryMessagesJSON: `[]`,
	})
	mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
		ItemID:               "item-3",
		RunID:                "run-3",
		BaseURL:              "https://other.example.com/v1",
		ProviderID:           "deepseek-backup",
		APIKeyFingerprint:    "sha256:3",
		CurrentStage:         "confirm",
		ConfirmationStatus:   "pending",
		AccessStatus:         "active",
		MatchedAccountState:  "active",
		AccountResolution:    "reused",
		AdvisoryMessagesJSON: `[]`,
	})
	listRuns := buildListBatchImportRunsAction(appTestDSN(t, store))
	// Both completed runs match the query; Limit: 1 forces a second page.
	runs, err := listRuns(context.Background(), ListBatchImportRunsRequest{
		State: "completed",
		Query: "other.example.com",
		Limit: 1,
	})
	if err != nil {
		t.Fatalf("buildListBatchImportRunsAction() error = %v", err)
	}
	if len(runs.Runs) != 1 || runs.Runs[0].RunID != "run-2" {
		t.Fatalf("runs = %+v, want [run-2]", runs.Runs)
	}
	if runs.NextCursor == nil || *runs.NextCursor != "run-3" {
		t.Fatalf("runs.NextCursor = %v, want run-3", runs.NextCursor)
	}
	listItems := buildListBatchImportRunItemsAction(appTestDSN(t, store))
	hasWarning := true
	items, err :=
listItems(context.Background(), ListBatchImportRunItemsRequest{
		RunID:      "run-1",
		HasWarning: &hasWarning,
		Query:      "kimi",
		Limit:      10,
	})
	if err != nil {
		t.Fatalf("buildListBatchImportRunItemsAction() error = %v", err)
	}
	if len(items.Items) != 1 || items.Items[0].ItemID != "item-1" {
		t.Fatalf("items = %+v, want [item-1]", items.Items)
	}
	if items.NextCursor != nil {
		t.Fatalf("items.NextCursor = %v, want nil", items.NextCursor)
	}
	if _, err := listItems(context.Background(), ListBatchImportRunItemsRequest{RunID: "missing"}); err == nil || err.Error() != "run not found: missing" {
		t.Fatalf("missing items run error = %v, want run not found", err)
	}
}

// appTestDSN builds a DSN for the database behind store. It reads the first
// row of `PRAGMA database_list`, takes the file column and wraps it as
// "file:<file>?_busy_timeout=5000&_pragma=foreign_keys(0)". A scan failure
// aborts the test.
func appTestDSN(t *testing.T, store *sqlite.DB) string {
	t.Helper()
	const dsnOptions = "?_busy_timeout=5000&_pragma=foreign_keys(0)"
	var (
		seq      int
		schema   string
		filePath string
	)
	// seq and schema are scanned only because the pragma row has three columns.
	scanErr := store.SQLDB().QueryRow(`PRAGMA database_list`).Scan(&seq, &schema, &filePath)
	if scanErr != nil {
		t.Fatalf("PRAGMA database_list scan error = %v", scanErr)
	}
	return "file:" + filePath + dsnOptions
}

// mustCreateAppImportRun inserts run through the store's ImportRuns
// repository and fails the test immediately when the insert errors.
func mustCreateAppImportRun(t *testing.T, store *sqlite.DB, run sqlite.ImportRun) {
	t.Helper()
	createErr := store.ImportRuns().Create(context.Background(), run)
	if createErr != nil {
		t.Fatalf("ImportRuns().Create() error = %v", createErr)
	}
}

// mustCreateAppImportRunItem inserts item through the store's ImportRunItems
// repository and fails the test immediately when the insert errors.
func mustCreateAppImportRunItem(t *testing.T, store *sqlite.DB, item sqlite.ImportRunItem) {
	t.Helper()
	createErr := store.ImportRunItems().Create(context.Background(), item)
	if createErr != nil {
		t.Fatalf("ImportRunItems().Create() error = %v", createErr)
	}
}

// mustExecSQL runs query with args directly on the store's *sql.DB and fails
// the test, quoting the statement, when execution errors.
func mustExecSQL(t *testing.T, store *sqlite.DB, query string, args ...any) {
	t.Helper()
	_, execErr := store.SQLDB().Exec(query, args...)
	if execErr != nil {
		t.Fatalf("Exec(%q) error = %v", query, execErr)
	}
}

// createAppHostRecord stores a host row pointing at baseURL and returns the
// ID reported by Hosts().Create. The HostID is "host-" plus the sanitized
// name of the running test.
func createAppHostRecord(t *testing.T, store *sqlite.DB, baseURL string) int64 {
	t.Helper()
	record := sqlite.Host{
		HostID:      "host-" + sanitizeAppTestName(t.Name()),
		BaseURL:     baseURL,
		HostVersion: "0.1.126",
		AuthType:    "apikey",
		AuthToken:   "host-token",
	}
	pk, createErr := store.Hosts().Create(context.Background(), record)
	if createErr != nil {
		t.Fatalf("Hosts().Create() error = %v", createErr)
	}
	return pk
}

// createAppPackRecord stores a version 1.0.0 pack whose PackID and Checksum
// are derived from the sanitized test name, and returns the ID reported by
// Packs().Create.
func createAppPackRecord(t *testing.T, store *sqlite.DB) int64 {
	t.Helper()
	suffix := sanitizeAppTestName(t.Name())
	record := sqlite.Pack{
		PackID:   "pack-" + suffix,
		Version:  "1.0.0",
		Checksum: "checksum-" + suffix,
	}
	pk, createErr := store.Packs().Create(context.Background(), record)
	if createErr != nil {
		t.Fatalf("Packs().Create() error = %v", createErr)
	}
	return pk
}

// createAppProviderRecord stores an "openai" platform provider under packID,
// named after the sanitized test name, and returns the ID reported by
// Providers().Create.
func createAppProviderRecord(t *testing.T, store *sqlite.DB, packID int64) int64 {
	t.Helper()
	record := sqlite.Provider{
		PackID:      packID,
		ProviderID:  "provider-" + sanitizeAppTestName(t.Name()),
		DisplayName: "Provider",
		BaseURL:     "https://provider.example.com",
		Platform:    "openai",
	}
	pk, createErr := store.Providers().Create(context.Background(), record)
	if createErr != nil {
		t.Fatalf("Providers().Create() error = %v", createErr)
	}
	return pk
}

// createAppBatchRecord stores a partial-mode, succeeded, self-service-ready
// import batch linking the given host, pack and provider IDs, and returns the
// ID reported by ImportBatches().Create.
func createAppBatchRecord(t *testing.T, store *sqlite.DB, hostID, packID, providerID int64) int64 {
	t.Helper()
	record := sqlite.ImportBatch{
		HostID:       hostID,
		PackID:       packID,
		ProviderID:   providerID,
		Mode:         provision.ImportModePartial,
		BatchStatus:  provision.BatchStatusSucceeded,
		AccessStatus: provision.AccessStatusSelfServiceReady,
	}
	pk, createErr := store.ImportBatches().Create(context.Background(), record)
	if createErr != nil {
		t.Fatalf("ImportBatches().Create() error = %v", createErr)
	}
	return pk
}

// sanitizeAppTestName reduces name to lowercase ASCII letters and digits:
// 'A'-'Z' are lowercased, 'a'-'z' and '0'-'9' are kept, and every other rune
// is dropped. It returns "default" when nothing survives.
func sanitizeAppTestName(name string) string {
	cleaned := strings.Map(func(r rune) rune {
		if r >= 'A' && r <= 'Z' {
			return r - 'A' + 'a'
		}
		if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') {
			return r
		}
		return -1 // a negative result makes strings.Map drop the rune
	}, name)
	if cleaned == "" {
		return "default"
	}
	return cleaned
}

// TestResolveManagedHostAndNewSub2APIClient stores host-1 with an empty
// AuthType and checks resolveManagedHost (success, base_url mismatch, empty
// host_id), authFromStoredHost (defaults to "apikey" and trims the token) and
// newSub2APIClient (rejects an unsupported auth type and a missing token).
func TestResolveManagedHostAndNewSub2APIClient(t *testing.T) {
	t.Parallel()
	store := openAppTestStore(t)
	defer closeAppTestStore(t, store)
	if _, err := store.Hosts().Create(context.Background(), sqlite.Host{
		HostID:      "host-1",
		BaseURL:     "https://sub2api.example.com",
		HostVersion: "0.1.126",
		AuthType:    "",
		AuthToken:   "host-token",
	}); err != nil {
		t.Fatalf("Hosts().Create() error = %v", err)
	}
	hostRow, client, err :=
resolveManagedHost(context.Background(), store, "host-1", "", CreateHostAuth{})
	if err != nil {
		t.Fatalf("resolveManagedHost() error = %v", err)
	}
	if hostRow.HostID != "host-1" || client == nil {
		t.Fatalf("resolveManagedHost() = (%+v, %v), want host-1 and client", hostRow, client)
	}
	// A caller-supplied base URL that differs from the stored one is rejected.
	if _, _, err := resolveManagedHost(context.Background(), store, "host-1", "https://other.example.com", CreateHostAuth{}); err == nil || !strings.Contains(err.Error(), `host "host-1" base_url mismatch`) {
		t.Fatalf("resolveManagedHost(mismatch) error = %v, want mismatch", err)
	}
	if _, _, err := resolveManagedHost(context.Background(), store, "", "", CreateHostAuth{}); err == nil || err.Error() != "host_id is required" {
		t.Fatalf("resolveManagedHost(empty) error = %v, want host_id is required", err)
	}
	// An empty stored auth type is expected to come back as "apikey", with the
	// surrounding whitespace removed from the token.
	if auth := authFromStoredHost(sqlite.Host{AuthType: "", AuthToken: " token "}); auth.Type != "apikey" || auth.Token != "token" {
		t.Fatalf("authFromStoredHost(default) = %+v, want apikey/token", auth)
	}
	if _, err := newSub2APIClient("https://sub2api.example.com", CreateHostAuth{Type: "other", Token: "t"}); err == nil || !strings.Contains(err.Error(), `unsupported auth type "other"`) {
		t.Fatalf("newSub2APIClient(unsupported) error = %v, want unsupported auth type", err)
	}
	if _, err := newSub2APIClient("https://sub2api.example.com", CreateHostAuth{Type: "apikey"}); err == nil || !strings.Contains(err.Error(), "auth.token is required") {
		t.Fatalf("newSub2APIClient(missing token) error = %v, want auth.token is required", err)
	}
}

// TestHandleAssignAccessSubscriptionsAndAccessPreview drives the
// handleAssignAccessSubscriptions and handleAccessPreview HTTP wrappers with
// stub actions. It checks the request value passed to each action (path value,
// JSON body and, for the preview, query-string values) and the status code and
// JSON fields of the response.
func TestHandleAssignAccessSubscriptionsAndAccessPreview(t *testing.T) {
	t.Parallel()
	t.Run("handleAssignAccessSubscriptions returns success", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "POST", "/providers/deepseek/access/subscriptions", map[string]any{
			"host_id":            "host-1",
			"access_api_key":     "user-key",
			"subscription_users": []string{"user-1"},
		}, "")
		req.SetPathValue("providerID", "deepseek")
		rec := &responseRecorder{header: map[string][]string{}}
handleAssignAccessSubscriptions(rec, req, func(_ context.Context, got AssignAccessSubscriptionsRequest) (AssignAccessSubscriptionsResult, error) {
			if got.ProviderID != "deepseek" || got.HostID != "host-1" || got.AccessAPIKey != "user-key" || len(got.SubscriptionUsers) != 1 || got.SubscriptionUsers[0] != "user-1" {
				t.Fatalf("AssignAccessSubscriptionsRequest = %+v, want projected request", got)
			}
			return AssignAccessSubscriptionsResult{Assigned: 1}, nil
		})
		assertStatusCode(t, rec, 200)
		assertJSONContains(t, rec.Body().Bytes(), "assigned", float64(1))
	})
	t.Run("handleAccessPreview falls back to query values", func(t *testing.T) {
		t.Parallel()
		// pack_id and host_id are supplied only in the query string; the JSON
		// body carries just the mode.
		req := httptestRequest(t, "POST", "/providers/deepseek/access/preview?pack_id=openai-cn-pack&host_id=host-1", map[string]any{
			"mode": "self_service",
		}, "")
		req.SetPathValue("providerID", "deepseek")
		rec := &responseRecorder{header: map[string][]string{}}
		handleAccessPreview(rec, req, func(_ context.Context, got AccessPreviewRequest) (AccessPreviewResult, error) {
			if got.ProviderID != "deepseek" || got.PackID != "openai-cn-pack" || got.HostID != "host-1" {
				t.Fatalf("AccessPreviewRequest = %+v, want query fallback values", got)
			}
			return AccessPreviewResult{Available: true, Mode: got.Mode}, nil
		})
		assertStatusCode(t, rec, 200)
		assertJSONContains(t, rec.Body().Bytes(), "available", true)
		assertJSONContains(t, rec.Body().Bytes(), "mode", "self_service")
	})
}

// TestAdditionalHTTPWrappers covers further HTTP wrappers with stub actions:
// handleListProviderImportBatches (query filters forwarded, nil result
// rendered as an empty "batches" array), handleRollbackBatch (non-numeric
// batch ID answered with 400 without calling the action; numeric ID forwarded
// and summarised) and handleCreateHost (decoded body forwarded, host echoed).
func TestAdditionalHTTPWrappers(t *testing.T) {
	t.Parallel()
	t.Run("handleListProviderImportBatches returns query-scoped payload", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "GET", "/providers/deepseek/import-batches?pack_id=openai-cn-pack&host_id=host-1", map[string]any{}, "")
		req.SetPathValue("providerID", "deepseek")
		rec := &responseRecorder{header: map[string][]string{}}
		handleListProviderImportBatches(rec, req, func(_ context.Context, got ProviderQueryRequest) ([]ImportBatchInfo, error) {
			if got.ProviderID != "deepseek" || got.PackID != "openai-cn-pack" ||
				got.HostID != "host-1" {
				t.Fatalf("ProviderQueryRequest = %+v, want provider/pack/host filters", got)
			}
			return nil, nil
		})
		assertStatusCode(t, rec, 200)
		batches, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "batches")
		if !ok || len(batches) != 0 {
			t.Fatalf("batches = %#v, want empty array", batches)
		}
	})
	t.Run("handleRollbackBatch validates batch id", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "POST", "/import-batches/bad/rollback", map[string]any{}, "")
		req.SetPathValue("batchID", "bad")
		rec := &responseRecorder{header: map[string][]string{}}
		handleRollbackBatch(rec, req, func(context.Context, RollbackBatchRequest) (provision.RollbackReport, error) {
			t.Fatal("rollback fn should not be called for invalid batch id")
			return provision.RollbackReport{}, nil
		})
		assertStatusCode(t, rec, 400)
		assertJSONContains(t, rec.Body().Bytes(), "error.message", "batch_id must be a positive integer")
	})
	t.Run("handleRollbackBatch returns summary", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "POST", "/import-batches/17/rollback", map[string]any{}, "")
		req.SetPathValue("batchID", "17")
		rec := &responseRecorder{header: map[string][]string{}}
		handleRollbackBatch(rec, req, func(_ context.Context, got RollbackBatchRequest) (provision.RollbackReport, error) {
			if got.BatchID != 17 {
				t.Fatalf("RollbackBatchRequest.BatchID = %d, want 17", got.BatchID)
			}
			return provision.RollbackReport{AccountsDeleted: 2, PlansDeleted: 1, ChannelsDeleted: 1, GroupsDeleted: 1}, nil
		})
		assertStatusCode(t, rec, 200)
		assertJSONContains(t, rec.Body().Bytes(), "batch_id", float64(17))
		assertJSONContains(t, rec.Body().Bytes(), "deleted_accounts", float64(2))
	})
	t.Run("handleCreateHost returns payload", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "POST", "/hosts", map[string]any{
			"name":     "host-1",
			"base_url": "https://sub2api.example.com",
			"auth":     map[string]any{"type": "apikey", "token": "host-token"},
		}, "")
		rec := &responseRecorder{header: map[string][]string{}}
handleCreateHost(rec, req, func(_ context.Context, got CreateHostRequest) (HostInfo, error) {
			if got.Name != "host-1" || got.BaseURL != "https://sub2api.example.com" || got.Auth.Token != "host-token" {
				t.Fatalf("CreateHostRequest = %+v, want decoded request", got)
			}
			return HostInfo{HostID: got.Name, BaseURL: got.BaseURL, HostVersion: "0.1.126", AuthType: "apikey"}, nil
		})
		assertStatusCode(t, rec, 200)
		assertJSONContains(t, rec.Body().Bytes(), "host_id", "host-1")
		assertJSONContains(t, rec.Body().Bytes(), "base_url", "https://sub2api.example.com")
	})
}

// TestActionSetPackClosures persists one pack with one provider and checks
// that the ListPacks, GetPack and ListPackProviders actions of an ActionSet
// built from the store's DSN return that pack and its "provider-a" provider.
func TestActionSetPackClosures(t *testing.T) {
	t.Parallel()
	store := openAppTestStore(t)
	defer closeAppTestStore(t, store)
	packID := createAppPackRecord(t, store)
	if _, err := store.Providers().Create(context.Background(), sqlite.Provider{
		PackID:      packID,
		ProviderID:  "provider-a",
		DisplayName: "Provider A",
		BaseURL:     "https://provider-a.example.com",
		Platform:    "openai",
	}); err != nil {
		t.Fatalf("Providers().Create() error = %v", err)
	}
	actions := NewActionSet(appTestDSN(t, store))
	packs, err := actions.ListPacks(context.Background())
	if err != nil {
		t.Fatalf("ListPacks() error = %v", err)
	}
	if len(packs) != 1 || packs[0].PackID == "" {
		t.Fatalf("ListPacks() = %+v, want single persisted pack", packs)
	}
	// The remaining lookups reuse the PackID reported by ListPacks.
	packInfo, err := actions.GetPack(context.Background(), packs[0].PackID)
	if err != nil {
		t.Fatalf("GetPack() error = %v", err)
	}
	if packInfo.PackID != packs[0].PackID {
		t.Fatalf("GetPack() = %+v, want pack_id %q", packInfo, packs[0].PackID)
	}
	providers, err := actions.ListPackProviders(context.Background(), packs[0].PackID)
	if err != nil {
		t.Fatalf("ListPackProviders() error = %v", err)
	}
	if len(providers) != 1 || providers[0].ProviderID != "provider-a" {
		t.Fatalf("ListPackProviders() = %+v, want provider-a", providers)
	}
}

// TestActionSetCreateHostClosure calls the CreateHost action against a stub
// HTTP server and checks the returned host info (ID, version "0.1.126",
// status "supported") and that the base URL and token were persisted.
func TestActionSetCreateHostClosure(t *testing.T) {
	t.Parallel()
	server := httptest.NewServer(newBatchImportActionStubServer(t))
	defer server.Close()
	store := openAppTestStore(t)
	defer
closeAppTestStore(t, store)
	actions := NewActionSet(appTestDSN(t, store))
	host, err := actions.CreateHost(context.Background(), CreateHostRequest{
		Name:    "prod-sub2api",
		BaseURL: server.URL,
		Auth:    CreateHostAuth{Type: "apikey", Token: "host-token"},
	})
	if err != nil {
		t.Fatalf("CreateHost() error = %v", err)
	}
	if host.HostID != "prod-sub2api" || host.HostVersion != "0.1.126" || host.Status != "supported" {
		t.Fatalf("CreateHost() = %+v, want stored supported host", host)
	}
	// Read the row back through the store to confirm it was persisted.
	stored, err := store.Hosts().GetByHostID(context.Background(), "prod-sub2api")
	if err != nil {
		t.Fatalf("Hosts().GetByHostID() error = %v", err)
	}
	if stored.BaseURL != server.URL || stored.AuthToken != "host-token" {
		t.Fatalf("stored host = %+v, want persisted connection details", stored)
	}
}

// TestActionSetListProviderImportBatchesClosure imports the same provider on
// two hosts and checks that ListProviderImportBatches demands a host_id when
// none is given, and returns only host-a's succeeded batch when HostID is
// "host-a".
func TestActionSetListProviderImportBatchesClosure(t *testing.T) {
	t.Parallel()
	store := openAppTestStore(t)
	defer closeAppTestStore(t, store)
	hostA, err := store.Hosts().Create(context.Background(), sqlite.Host{
		HostID:      "host-a",
		BaseURL:     "https://host-a.example.com",
		HostVersion: "0.1.126",
		AuthToken:   "token-a",
	})
	if err != nil {
		t.Fatalf("Hosts().Create(host-a) error = %v", err)
	}
	hostB, err := store.Hosts().Create(context.Background(), sqlite.Host{
		HostID:      "host-b",
		BaseURL:     "https://host-b.example.com",
		HostVersion: "0.1.126",
		AuthToken:   "token-b",
	})
	if err != nil {
		t.Fatalf("Hosts().Create(host-b) error = %v", err)
	}
	packID := createAppPackRecord(t, store)
	providerID, err := store.Providers().Create(context.Background(), sqlite.Provider{
		PackID:      packID,
		ProviderID:  "shared-provider",
		DisplayName: "Shared Provider",
		BaseURL:     "https://provider.example.com",
		Platform:    "openai",
	})
	if err != nil {
		t.Fatalf("Providers().Create() error = %v", err)
	}
	if _, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
		HostID:       hostA,
		PackID:       packID,
		ProviderID:   providerID,
		Mode:         provision.ImportModePartial,
		BatchStatus:  provision.BatchStatusSucceeded,
		AccessStatus:
provision.AccessStatusSelfServiceReady,
	}); err != nil {
		t.Fatalf("ImportBatches().Create(host-a) error = %v", err)
	}
	if _, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
		HostID:       hostB,
		PackID:       packID,
		ProviderID:   providerID,
		Mode:         provision.ImportModeStrict,
		BatchStatus:  provision.BatchStatusPartial,
		AccessStatus: provision.AccessStatusBroken,
	}); err != nil {
		t.Fatalf("ImportBatches().Create(host-b) error = %v", err)
	}
	actions := NewActionSet(appTestDSN(t, store))
	if _, err := actions.ListProviderImportBatches(context.Background(), ProviderQueryRequest{ProviderID: "shared-provider"}); err == nil || err.Error() != "provider exists on multiple hosts; host_id is required" {
		t.Fatalf("ListProviderImportBatches(multi-host) error = %v, want host_id required", err)
	}
	batches, err := actions.ListProviderImportBatches(context.Background(), ProviderQueryRequest{
		ProviderID: "shared-provider",
		HostID:     "host-a",
	})
	if err != nil {
		t.Fatalf("ListProviderImportBatches(host filter) error = %v", err)
	}
	if len(batches) != 1 || batches[0].BatchStatus != provision.BatchStatusSucceeded {
		t.Fatalf("ListProviderImportBatches(host filter) = %+v, want single succeeded batch", batches)
	}
}

// TestActionSetHostClosuresAndAccessPreview stores a host (recorded as version
// "0.0.1") that points at a stub server, plus a provider, a batch and a
// self-service access closure. It then checks ListHosts, GetHost, ProbeHost
// (expects status "supported" and version "0.1.126"), AccessPreview
// (self-service available, subscription unavailable) and that DeleteHost fails
// with a 409 *httpError whose code is "host_in_use".
func TestActionSetHostClosuresAndAccessPreview(t *testing.T) {
	t.Parallel()
	server := httptest.NewServer(newBatchImportActionStubServer(t))
	defer server.Close()
	store := openAppTestStore(t)
	defer closeAppTestStore(t, store)
	hostID, err := store.Hosts().Create(context.Background(), sqlite.Host{
		HostID:              "host-main",
		BaseURL:             server.URL,
		HostVersion:         "0.0.1",
		CapabilityProbeJSON: "{}",
		AuthType:            "apikey",
		AuthToken:           "host-token",
	})
	if err != nil {
		t.Fatalf("Hosts().Create() error = %v", err)
	}
	packID := createAppPackRecord(t, store)
	providerID, err := store.Providers().Create(context.Background(), sqlite.Provider{
		PackID:      packID,
		ProviderID:  "preview-provider",
		DisplayName: "Preview Provider",
		BaseURL:     "https://provider.example.com",
		Platform:    "openai",
	})
	if err != nil {
		t.Fatalf("Providers().Create() error = %v", err)
	}
	batchID, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
		HostID:       hostID,
		PackID:       packID,
		ProviderID:   providerID,
		Mode:         provision.ImportModePartial,
		BatchStatus:  provision.BatchStatusSucceeded,
		AccessStatus: provision.AccessStatusSelfServiceReady,
	})
	if err != nil {
		t.Fatalf("ImportBatches().Create() error = %v", err)
	}
	if _, err := store.AccessClosures().Create(context.Background(), sqlite.AccessClosureRecord{
		BatchID:     batchID,
		ClosureType: provision.AccessModeSelfService,
		Status:      provision.AccessStatusSelfServiceReady,
		DetailsJSON: `{}`,
	}); err != nil {
		t.Fatalf("AccessClosures().Create() error = %v", err)
	}
	actions := NewActionSet(appTestDSN(t, store))
	hosts, err := actions.ListHosts(context.Background())
	if err != nil {
		t.Fatalf("ListHosts() error = %v", err)
	}
	if len(hosts) != 1 || hosts[0].HostID != "host-main" {
		t.Fatalf("ListHosts() = %+v, want host-main", hosts)
	}
	hostInfo, err := actions.GetHost(context.Background(), "host-main")
	if err != nil {
		t.Fatalf("GetHost() error = %v", err)
	}
	if hostInfo.HostID != "host-main" || hostInfo.BaseURL != server.URL {
		t.Fatalf("GetHost() = %+v, want stored host", hostInfo)
	}
	// The probe result is expected to report 0.1.126 rather than the 0.0.1
	// that was stored above.
	probed, err := actions.ProbeHost(context.Background(), ProbeHostRequest{HostID: "host-main"})
	if err != nil {
		t.Fatalf("ProbeHost() error = %v", err)
	}
	if probed.Status != "supported" || probed.HostVersion != "0.1.126" {
		t.Fatalf("ProbeHost() = %+v, want supported host 0.1.126", probed)
	}
	packRow, err := store.Packs().GetByID(context.Background(), packID)
	if err != nil {
		t.Fatalf("Packs().GetByID() error = %v", err)
	}
	preview, err := actions.AccessPreview(context.Background(), AccessPreviewRequest{
		ProviderID: "preview-provider",
		PackID:     packRow.PackID,
		HostID:     "host-main",
		Mode:       provision.AccessModeSelfService,
	})
	if err != nil {
		t.Fatalf("AccessPreview(self_service) error = %v", err)
	}
	if !preview.Available {
		t.Fatalf("AccessPreview(self_service) = %+v, want available=true", preview)
	}
	// Only a self-service closure was stored, so the subscription preview is
	// expected to be unavailable.
	preview, err = actions.AccessPreview(context.Background(), AccessPreviewRequest{
		ProviderID: "preview-provider",
		PackID:     packRow.PackID,
		HostID:     "host-main",
		Mode:       provision.AccessModeSubscription,
	})
	if err != nil {
		t.Fatalf("AccessPreview(subscription) error = %v", err)
	}
	if preview.Available {
		t.Fatalf("AccessPreview(subscription) = %+v, want available=false", preview)
	}
	// The host still has an import batch attached, so deletion must be refused.
	if err := actions.DeleteHost(context.Background(), "host-main"); err == nil {
		t.Fatal("DeleteHost() error = nil, want host_in_use conflict")
	} else {
		httpErr, ok := err.(*httpError)
		if !ok || httpErr.StatusCode != http.StatusConflict || httpErr.Code != "host_in_use" {
			t.Fatalf("DeleteHost() error = %T %v, want *httpError host_in_use conflict", err, err)
		}
	}
}

// TestAdditionalHostHTTPWrappers checks the host HTTP wrappers with stub
// actions: handleListHosts renders a nil result as an empty "hosts" array,
// handleGetHost answers 400 when no host ID path value is set, and
// handleProbeHost forwards the path host ID and body auth token.
func TestAdditionalHostHTTPWrappers(t *testing.T) {
	t.Parallel()
	t.Run("handleListHosts returns empty array", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "GET", "/hosts", map[string]any{}, "")
		rec := &responseRecorder{header: map[string][]string{}}
		handleListHosts(rec, req, func(context.Context) ([]HostInfo, error) {
			return nil, nil
		})
		assertStatusCode(t, rec, 200)
		hosts, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "hosts")
		if !ok || len(hosts) != 0 {
			t.Fatalf("hosts = %#v, want empty array", hosts)
		}
	})
	t.Run("handleGetHost requires host id", func(t *testing.T) {
		t.Parallel()
		// No SetPathValue call: the handler sees an empty host ID.
		req := httptestRequest(t, "GET", "/hosts/", map[string]any{}, "")
		rec := &responseRecorder{header: map[string][]string{}}
		handleGetHost(rec, req, func(context.Context, string) (HostInfo, error) {
			return HostInfo{}, nil
		})
		assertStatusCode(t, rec, 400)
		assertJSONContains(t, rec.Body().Bytes(), "error.message", "host_id is required")
	})
	t.Run("handleProbeHost returns payload", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "POST", "/hosts/host-1/probe", map[string]any{
			"auth": map[string]any{"type": "apikey", "token": "token"},
		}, "")
		req.SetPathValue("hostID", "host-1")
		rec := &responseRecorder{header:
map[string][]string{}}
		handleProbeHost(rec, req, func(_ context.Context, got ProbeHostRequest) (HostInfo, error) {
			if got.HostID != "host-1" || got.Auth.Token != "token" {
				t.Fatalf("ProbeHostRequest = %+v, want host-1/token", got)
			}
			return HostInfo{HostID: "host-1", HostVersion: "0.1.126", Status: "supported"}, nil
		})
		assertStatusCode(t, rec, 200)
		assertJSONContains(t, rec.Body().Bytes(), "host_id", "host-1")
		assertJSONContains(t, rec.Body().Bytes(), "status", "supported")
	})
}

// TestHandlerWrappersForPackAndHostRoutes checks the pack and host HTTP
// wrappers with stub actions: nil list results are rendered as empty arrays,
// a missing pack or host ID path value is answered with 400, and path values
// and decoded bodies are forwarded to the action.
func TestHandlerWrappersForPackAndHostRoutes(t *testing.T) {
	t.Parallel()
	t.Run("handleListPacks returns empty array", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "GET", "/packs", map[string]any{}, "")
		rec := &responseRecorder{header: map[string][]string{}}
		handleListPacks(rec, req, func(context.Context) ([]PackInfo, error) {
			return nil, nil
		})
		assertStatusCode(t, rec, 200)
		packs, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "packs")
		if !ok || len(packs) != 0 {
			t.Fatalf("packs = %#v, want empty array", packs)
		}
	})
	t.Run("handleGetPack requires pack id", func(t *testing.T) {
		t.Parallel()
		// No SetPathValue call: the handler sees an empty pack ID.
		req := httptestRequest(t, "GET", "/packs/", map[string]any{}, "")
		rec := &responseRecorder{header: map[string][]string{}}
		handleGetPack(rec, req, func(context.Context, string) (PackInfo, error) {
			return PackInfo{}, nil
		})
		assertStatusCode(t, rec, 400)
		assertJSONContains(t, rec.Body().Bytes(), "error.message", "pack_id is required")
	})
	t.Run("handleGetPack returns payload", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "GET", "/packs/openai-cn-pack", map[string]any{}, "")
		req.SetPathValue("packID", "openai-cn-pack")
		rec := &responseRecorder{header: map[string][]string{}}
		handleGetPack(rec, req, func(_ context.Context, packID string) (PackInfo, error) {
			if packID != "openai-cn-pack" {
				t.Fatalf("packID = %q, want openai-cn-pack", packID)
			}
			return PackInfo{PackID: "openai-cn-pack", Version: "1.0.0"}, nil
		})
		assertStatusCode(t, rec, 200)
		assertJSONContains(t, rec.Body().Bytes(),
			"pack_id", "openai-cn-pack")
	})
	t.Run("handleListPackProviders returns array", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "GET", "/packs/openai-cn-pack/providers", map[string]any{}, "")
		req.SetPathValue("packID", "openai-cn-pack")
		rec := &responseRecorder{header: map[string][]string{}}
		handleListPackProviders(rec, req, func(_ context.Context, packID string) ([]PackProviderInfo, error) {
			if packID != "openai-cn-pack" {
				t.Fatalf("packID = %q, want openai-cn-pack", packID)
			}
			return nil, nil
		})
		assertStatusCode(t, rec, 200)
		providers, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "providers")
		if !ok || len(providers) != 0 {
			t.Fatalf("providers = %#v, want empty array", providers)
		}
	})
	t.Run("handleCreateHost decodes request", func(t *testing.T) {
		t.Parallel()
		req := httptestRequest(t, "POST", "/hosts", map[string]any{
			"name":     "host-1",
			"base_url": "https://sub2api.example.com",
			"auth":     map[string]any{"type": "apikey", "token": "host-token"},
		}, "")
		rec := &responseRecorder{header: map[string][]string{}}
		handleCreateHost(rec, req, func(_ context.Context, req CreateHostRequest) (HostInfo, error) {
			if req.BaseURL != "https://sub2api.example.com" || req.Auth.Token != "host-token" {
				t.Fatalf("request = %+v, want decoded create host request", req)
			}
			return HostInfo{HostID: "host-1", BaseURL: req.BaseURL}, nil
		})
		assertStatusCode(t, rec, 200)
		assertJSONContains(t, rec.Body().Bytes(), "host_id", "host-1")
	})
	t.Run("handleProbeHost requires host id", func(t *testing.T) {
		t.Parallel()
		// No SetPathValue call: the handler sees an empty host ID.
		req := httptestRequest(t, "POST", "/hosts/probe", map[string]any{
			"auth": map[string]any{"type": "apikey", "token": "host-token"},
		}, "")
		rec := &responseRecorder{header: map[string][]string{}}
		handleProbeHost(rec, req, func(context.Context, ProbeHostRequest) (HostInfo, error) {
			return HostInfo{}, nil
		})
		assertStatusCode(t, rec, 400)
		assertJSONContains(t, rec.Body().Bytes(), "error.message", "host_id is required")
	})
}

// TestAdditionalHTTPWrapperErrorBranches covers the failure paths of the HTTP
// wrappers: a nil action is answered with 500 "server_misconfigured",
// malformed JSON with 400 "bad_request" without invoking the action, and
// errors returned by actions are mapped to the status and error code asserted
// in each case.
func TestAdditionalHTTPWrapperErrorBranches(t *testing.T) {
t.Parallel()
	t.Run("list and pack wrappers handle missing actions", func(t *testing.T) {
		t.Parallel()
		// Each case calls one wrapper with a nil action; every wrapper is
		// expected to answer 500 with code "server_misconfigured".
		cases := []struct {
			name       string
			path       string
			run        func(*responseRecorder, *http.Request)
			wantStatus int
			wantCode   string
		}{
			{
				name: "list packs",
				path: "/packs",
				run: func(rec *responseRecorder, req *http.Request) {
					handleListPacks(rec, req, nil)
				},
				wantStatus: http.StatusInternalServerError,
				wantCode:   "server_misconfigured",
			},
			{
				name: "list provider import batches",
				path: "/providers/deepseek/import-batches",
				run: func(rec *responseRecorder, req *http.Request) {
					req.SetPathValue("providerID", "deepseek")
					handleListProviderImportBatches(rec, req, nil)
				},
				wantStatus: http.StatusInternalServerError,
				wantCode:   "server_misconfigured",
			},
			{
				name: "get pack",
				path: "/packs/openai-cn-pack",
				run: func(rec *responseRecorder, req *http.Request) {
					req.SetPathValue("packID", "openai-cn-pack")
					handleGetPack(rec, req, nil)
				},
				wantStatus: http.StatusInternalServerError,
				wantCode:   "server_misconfigured",
			},
			{
				name: "list pack providers",
				path: "/packs/openai-cn-pack/providers",
				run: func(rec *responseRecorder, req *http.Request) {
					req.SetPathValue("packID", "openai-cn-pack")
					handleListPackProviders(rec, req, nil)
				},
				wantStatus: http.StatusInternalServerError,
				wantCode:   "server_misconfigured",
			},
			{
				name: "rollback batch",
				path: "/import-batches/11/rollback",
				run: func(rec *responseRecorder, req *http.Request) {
					req.SetPathValue("batchID", "11")
					handleRollbackBatch(rec, req, nil)
				},
				wantStatus: http.StatusInternalServerError,
				wantCode:   "server_misconfigured",
			},
			{
				name: "probe host",
				path: "/hosts/host-1/probe",
				run: func(rec *responseRecorder, req *http.Request) {
					req.SetPathValue("hostID", "host-1")
					handleProbeHost(rec, req, nil)
				},
				wantStatus: http.StatusInternalServerError,
				wantCode:   "server_misconfigured",
			},
			{
				name: "create host",
				path: "/hosts",
				run: func(rec *responseRecorder, req *http.Request) {
					handleCreateHost(rec, req, nil)
				},
				wantStatus: http.StatusInternalServerError,
				wantCode:   "server_misconfigured",
			},
		}
		for _, tc := range cases {
			tc := tc // per-iteration copy for the subtest closure
			t.Run(tc.name, func(t *testing.T) {
				// Every case is sent as a POST, whatever the route is.
				req := httptestRequest(t, "POST", tc.path, map[string]any{}, "")
				rec := &responseRecorder{header: map[string][]string{}}
				tc.run(rec, req)
				assertStatusCode(t, rec, tc.wantStatus)
				assertJSONContains(t, rec.Body().Bytes(), "error.code", tc.wantCode)
			})
		}
	})
	t.Run("access wrappers handle missing actions and decode errors", func(t *testing.T) {
		t.Parallel()
		// Nil actions first: both access wrappers must answer 500.
		req := httptestRequest(t, "POST", "/providers/deepseek/access/subscriptions", map[string]any{}, "")
		req.SetPathValue("providerID", "deepseek")
		rec := &responseRecorder{header: map[string][]string{}}
		handleAssignAccessSubscriptions(rec, req, nil)
		assertStatusCode(t, rec, http.StatusInternalServerError)
		assertJSONContains(t, rec.Body().Bytes(), "error.code", "server_misconfigured")
		req = httptestRequest(t, "POST", "/providers/deepseek/access/preview", map[string]any{}, "")
		req.SetPathValue("providerID", "deepseek")
		rec = &responseRecorder{header: map[string][]string{}}
		handleAccessPreview(rec, req, nil)
		assertStatusCode(t, rec, http.StatusInternalServerError)
		assertJSONContains(t, rec.Body().Bytes(), "error.code", "server_misconfigured")
		// Then a truncated JSON body ("{"): both wrappers must answer 400 and
		// must not invoke the action.
		badReq, err := http.NewRequest(http.MethodPost, "/providers/deepseek/access/subscriptions", strings.NewReader("{"))
		if err != nil {
			t.Fatalf("http.NewRequest(assign bad json) error = %v", err)
		}
		badReq.SetPathValue("providerID", "deepseek")
		badReq.Header.Set("Content-Type", "application/json")
		rec = &responseRecorder{header: map[string][]string{}}
		handleAssignAccessSubscriptions(rec, badReq, func(context.Context, AssignAccessSubscriptionsRequest) (AssignAccessSubscriptionsResult, error) {
			t.Fatal("assign action should not be called for bad json")
			return AssignAccessSubscriptionsResult{}, nil
		})
		assertStatusCode(t, rec, http.StatusBadRequest)
		assertJSONContains(t, rec.Body().Bytes(), "error.code", "bad_request")
		badReq, err = http.NewRequest(http.MethodPost,
			"/providers/deepseek/access/preview", strings.NewReader("{"))
		if err != nil {
			t.Fatalf("http.NewRequest(preview bad json) error = %v", err)
		}
		badReq.SetPathValue("providerID", "deepseek")
		badReq.Header.Set("Content-Type", "application/json")
		rec = &responseRecorder{header: map[string][]string{}}
		handleAccessPreview(rec, badReq, func(context.Context, AccessPreviewRequest) (AccessPreviewResult, error) {
			t.Fatal("access preview action should not be called for bad json")
			return AccessPreviewResult{}, nil
		})
		assertStatusCode(t, rec, http.StatusBadRequest)
		assertJSONContains(t, rec.Body().Bytes(), "error.code", "bad_request")
	})
	t.Run("pack and host wrappers classify action errors", func(t *testing.T) {
		t.Parallel()
		// sql.ErrNoRows from the GetPack action is expected as 500 internal_error.
		req := httptestRequest(t, "GET", "/packs/openai-cn-pack", map[string]any{}, "")
		req.SetPathValue("packID", "openai-cn-pack")
		rec := &responseRecorder{header: map[string][]string{}}
		handleGetPack(rec, req, func(context.Context, string) (PackInfo, error) {
			return PackInfo{}, sql.ErrNoRows
		})
		assertStatusCode(t, rec, http.StatusInternalServerError)
		assertJSONContains(t, rec.Body().Bytes(), "error.code", "internal_error")
		// A "batch 11 not found" error from the rollback action is expected as
		// 404 not_found.
		req = httptestRequest(t, "POST", "/import-batches/11/rollback", map[string]any{}, "")
		req.SetPathValue("batchID", "11")
		rec = &responseRecorder{header: map[string][]string{}}
		handleRollbackBatch(rec, req, func(context.Context, RollbackBatchRequest) (provision.RollbackReport, error) {
			return provision.RollbackReport{}, fmt.Errorf("batch 11 not found")
		})
		assertStatusCode(t, rec, http.StatusNotFound)
		assertJSONContains(t, rec.Body().Bytes(), "error.code", "not_found")
		req = httptestRequest(t, "POST", "/hosts/host-1/probe", map[string]any{
			"auth": map[string]any{"type": "apikey", "token": "host-token"},
		}, "")
		req.SetPathValue("hostID", "host-1")
		rec = &responseRecorder{header: map[string][]string{}}
		handleProbeHost(rec, req, func(context.Context, ProbeHostRequest) (HostInfo, error) {
			return HostInfo{}, fmt.Errorf("host probe decode failed")
		})
assertStatusCode(t, rec, http.StatusBadRequest)
		assertJSONContains(t, rec.Body().Bytes(), "error.code", "bad_request")
		req = httptestRequest(t, "POST", "/hosts", map[string]any{
			"name": "host-1",
		}, "")
		rec = &responseRecorder{header: map[string][]string{}}
		handleCreateHost(rec, req, func(context.Context, CreateHostRequest) (HostInfo, error) {
			return HostInfo{}, fmt.Errorf("base_url is required")
		})
		assertStatusCode(t, rec, http.StatusBadRequest)
		assertJSONContains(t, rec.Body().Bytes(), "error.code", "bad_request")
	})
}

// TestActionSetBatchDetailAndProviderSnapshotClosures persists one batch with
// an item, two managed resources, an access closure and a "drifted" reconcile
// run. It then checks that the BatchDetail, GetProviderStatus,
// GetProviderResources and GetProviderAccessStatus actions of an ActionSet
// built from the store's DSN report those records.
func TestActionSetBatchDetailAndProviderSnapshotClosures(t *testing.T) {
	t.Parallel()
	store := openAppTestStore(t)
	defer closeAppTestStore(t, store)
	hostPK := createAppHostRecord(t, store, "https://sub2api.example.com")
	packPK := createAppPackRecord(t, store)
	providerPK := createAppProviderRecord(t, store, packPK)
	batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK)
	if _, err := store.ImportBatchItems().Create(context.Background(), sqlite.ImportBatchItem{
		BatchID:        batchID,
		KeyFingerprint: "sha256:test",
		AccountStatus:  "passed",
	}); err != nil {
		t.Fatalf("ImportBatchItems().Create() error = %v", err)
	}
	for _, resource := range []sqlite.ManagedResource{
		{BatchID: batchID, HostID: hostPK, ResourceType: "group", HostResourceID: "group-1", ResourceName: "group-1"},
		{BatchID: batchID, HostID: hostPK, ResourceType: "account", HostResourceID: "account-1", ResourceName: "account-1"},
	} {
		if _, err := store.ManagedResources().Create(context.Background(), resource); err != nil {
			t.Fatalf("ManagedResources().Create(%s) error = %v", resource.ResourceType, err)
		}
	}
	if _, err := store.AccessClosures().Create(context.Background(), sqlite.AccessClosureRecord{
		BatchID:     batchID,
		ClosureType: provision.AccessModeSelfService,
		Status:      provision.AccessStatusSelfServiceReady,
		DetailsJSON: `{"ok":true}`,
	}); err != nil {
		t.Fatalf("AccessClosures().Create() error = %v", err)
	}
	if _, err := store.ReconcileRuns().Create(context.Background(),
		sqlite.ReconcileRun{
			BatchID:     batchID,
			HostID:      hostPK,
			ProviderID:  providerPK,
			Status:      "drifted",
			SummaryJSON: `{"missing_count":1}`,
		}); err != nil {
		t.Fatalf("ReconcileRuns().Create() error = %v", err)
	}
	// Load the rows back to obtain the string identifiers used by the
	// provider query below.
	packRow, err := store.Packs().GetByID(context.Background(), packPK)
	if err != nil {
		t.Fatalf("Packs().GetByID() error = %v", err)
	}
	providerRow, err := store.Providers().GetByID(context.Background(), providerPK)
	if err != nil {
		t.Fatalf("Providers().GetByID() error = %v", err)
	}
	hostRow, err := store.Hosts().GetByID(context.Background(), hostPK)
	if err != nil {
		t.Fatalf("Hosts().GetByID() error = %v", err)
	}
	actions := NewActionSet(appTestDSN(t, store))
	detail, err := actions.BatchDetail(context.Background(), BatchDetailRequest{BatchID: batchID})
	if err != nil {
		t.Fatalf("BatchDetail() error = %v", err)
	}
	if detail.Batch.ID != batchID || len(detail.Items) != 1 || len(detail.ManagedResources) != 2 || len(detail.AccessClosures) != 1 || len(detail.ReconcileRuns) != 1 {
		t.Fatalf("BatchDetail() = %+v, want populated batch detail", detail)
	}
	snapshotReq := ProviderQueryRequest{
		ProviderID: providerRow.ProviderID,
		PackID:     packRow.PackID,
		HostID:     hostRow.HostID,
	}
	statusSnapshot, err := actions.GetProviderStatus(context.Background(), snapshotReq)
	if err != nil {
		t.Fatalf("GetProviderStatus() error = %v", err)
	}
	if statusSnapshot.Batch.ID != batchID || statusSnapshot.LatestAccessStatus != provision.AccessStatusSelfServiceReady || statusSnapshot.LatestReconcileStatus != "drifted" || statusSnapshot.ProviderStatus != provision.ProviderStatusActive {
		t.Fatalf("GetProviderStatus() = %+v, want active snapshot with drifted reconcile metadata", statusSnapshot)
	}
	// The summary value is compared as float64, the type encoding/json uses
	// for numbers decoded into an interface value.
	if got := statusSnapshot.LatestReconcileSummary["missing_count"]; got != float64(1) {
		t.Fatalf("LatestReconcileSummary[missing_count] = %#v, want 1", got)
	}
	resourceSnapshot, err := actions.GetProviderResources(context.Background(), snapshotReq)
	if err != nil {
		t.Fatalf("GetProviderResources() error = %v", err)
	}
	if len(resourceSnapshot.ManagedResources) != 2 || len(resourceSnapshot.AccessClosures) != 1 || len(resourceSnapshot.ReconcileRuns) != 1 {
		t.Fatalf("GetProviderResources() = %+v, want persisted resources/closures/runs", resourceSnapshot)
	}
	accessSnapshot, err := actions.GetProviderAccessStatus(context.Background(), snapshotReq)
	if err != nil {
		t.Fatalf("GetProviderAccessStatus() error = %v", err)
	}
	if accessSnapshot.Batch.ID != batchID || accessSnapshot.LatestAccessStatus != provision.AccessStatusSelfServiceReady || len(accessSnapshot.AccessClosures) != 1 {
		t.Fatalf("GetProviderAccessStatus() = %+v, want latest access closure", accessSnapshot)
	}
}

func TestActionSetRollbackBatchClosure(t *testing.T) {
	t.Parallel()
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodDelete {
			t.Fatalf("method = %s, want DELETE", r.Method)
		}
		switch r.URL.Path {
		case "/api/v1/admin/accounts/account-1", "/api/v1/admin/payment/plans/plan-1", "/api/v1/admin/channels/channel-1", "/api/v1/admin/groups/group-1":
			w.WriteHeader(http.StatusNoContent)
		default:
			http.NotFound(w, r)
		}
	}))
	defer server.Close()
	store := openAppTestStore(t)
	defer closeAppTestStore(t, store)
	hostPK := createAppHostRecord(t, store, server.URL)
	packPK := createAppPackRecord(t, store)
	providerPK := createAppProviderRecord(t, store, packPK)
	batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK)
	for _, resource := range []sqlite.ManagedResource{
		{BatchID: batchID, HostID: hostPK, ResourceType: "group", HostResourceID: "group-1", ResourceName: "group-1"},
		{BatchID: batchID, HostID: hostPK, ResourceType: "channel", HostResourceID: "channel-1", ResourceName: "channel-1"},
		{BatchID: batchID, HostID: hostPK, ResourceType: "plan", HostResourceID: "plan-1", ResourceName: "plan-1"},
		{BatchID: batchID, HostID: hostPK, ResourceType: "account", HostResourceID: "account-1", ResourceName: "account-1"},
	} {
		if _, err :=
store.ManagedResources().Create(context.Background(), resource); err != nil { t.Fatalf("ManagedResources().Create(%s) error = %v", resource.ResourceType, err) } } actions := NewActionSet(appTestDSN(t, store)) report, err := actions.RollbackBatch(context.Background(), RollbackBatchRequest{BatchID: batchID}) if err != nil { t.Fatalf("RollbackBatch() error = %v", err) } if report.AccountsDeleted != 1 || report.PlansDeleted != 1 || report.ChannelsDeleted != 1 || report.GroupsDeleted != 1 { t.Fatalf("RollbackBatch() = %+v, want one deleted resource per type", report) } batchRow, err := store.ImportBatches().GetByID(context.Background(), batchID) if err != nil { t.Fatalf("ImportBatches().GetByID() error = %v", err) } if batchRow.BatchStatus != provision.BatchStatusRolledBack { t.Fatalf("batch status = %q, want %q", batchRow.BatchStatus, provision.BatchStatusRolledBack) } } func TestActionSetCreateHostUpdateAndConflict(t *testing.T) { t.Parallel() t.Run("update existing host connection", func(t *testing.T) { t.Parallel() server := httptest.NewServer(newBatchImportActionStubServer(t)) defer server.Close() store := openAppTestStore(t) defer closeAppTestStore(t, store) if _, err := store.Hosts().Create(context.Background(), sqlite.Host{ HostID: "prod-sub2api", BaseURL: server.URL, HostVersion: "0.0.1", CapabilityProbeJSON: "{}", AuthType: "apikey", AuthToken: "old-token", }); err != nil { t.Fatalf("Hosts().Create() error = %v", err) } actions := NewActionSet(appTestDSN(t, store)) host, err := actions.CreateHost(context.Background(), CreateHostRequest{ Name: "prod-sub2api", BaseURL: server.URL, Auth: CreateHostAuth{Type: "apikey", Token: "host-token"}, }) if err != nil { t.Fatalf("CreateHost(update) error = %v", err) } if host.HostVersion != "0.1.126" || host.Status != "supported" { t.Fatalf("CreateHost(update) = %+v, want reprobed supported host", host) } stored, err := store.Hosts().GetByHostID(context.Background(), "prod-sub2api") if err != nil { 
t.Fatalf("Hosts().GetByHostID() error = %v", err) } if stored.AuthToken != "host-token" || stored.HostVersion != "0.1.126" { t.Fatalf("stored host = %+v, want updated token/version", stored) } }) t.Run("base url conflict returns 409", func(t *testing.T) { t.Parallel() server := httptest.NewServer(newBatchImportActionStubServer(t)) defer server.Close() store := openAppTestStore(t) defer closeAppTestStore(t, store) if _, err := store.Hosts().Create(context.Background(), sqlite.Host{ HostID: "existing-host", BaseURL: server.URL, HostVersion: "0.1.126", CapabilityProbeJSON: "{}", AuthType: "apikey", AuthToken: "host-token", }); err != nil { t.Fatalf("Hosts().Create() error = %v", err) } actions := NewActionSet(appTestDSN(t, store)) _, err := actions.CreateHost(context.Background(), CreateHostRequest{ Name: "new-host", BaseURL: server.URL, Auth: CreateHostAuth{Type: "apikey", Token: "host-token"}, }) if err == nil { t.Fatal("CreateHost(conflict) error = nil, want conflict") } httpErr, ok := err.(*httpError) if !ok || httpErr.StatusCode != http.StatusConflict { t.Fatalf("CreateHost(conflict) error = %T %v, want *httpError 409", err, err) } }) } func TestActionSetInstallPreviewAndRollbackProviderClosures(t *testing.T) { t.Parallel() baseHandler := newBatchImportActionStubServer(t) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodDelete { switch r.URL.Path { case "/api/v1/admin/accounts/account-1", "/api/v1/admin/payment/plans/plan-1", "/api/v1/admin/channels/channel-1", "/api/v1/admin/groups/group-1": w.WriteHeader(http.StatusNoContent) return } } baseHandler.ServeHTTP(w, r) })) defer server.Close() store := openAppTestStore(t) defer closeAppTestStore(t, store) hostPK := createAppHostRecord(t, store, server.URL) actions := NewActionSet(appTestDSN(t, store)) packPath := filepath.Join("..", "..", "packs", "openai-cn-pack") installResult, err := actions.InstallPack(context.Background(), InstallPackRequest{ 
HostBaseURL: server.URL, HostAPIKey: "host-token", PackPath: packPath, }) if err != nil { t.Fatalf("InstallPack() error = %v", err) } if installResult.Pack.PackID != "openai-cn-pack" || len(installResult.Providers) == 0 { t.Fatalf("InstallPack() = %+v, want persisted pack with providers", installResult) } preview, err := actions.PreviewProvider(context.Background(), PreviewProviderRequest{ HostBaseURL: server.URL, PackPath: packPath, ProviderID: "deepseek", Mode: provision.ImportModePartial, Keys: []string{" key-1 ", "key-2"}, }) if err != nil { t.Fatalf("PreviewProvider() error = %v", err) } if len(preview.AcceptedKeys) != 2 || preview.Decisions["group"].Action != provision.PreviewActionCreate { t.Fatalf("PreviewProvider() = %+v, want accepted keys and create decisions", preview) } packRow, err := store.Packs().GetByPackID(context.Background(), "openai-cn-pack") if err != nil { t.Fatalf("Packs().GetByPackID() error = %v", err) } providerRow, err := store.Providers().GetByPackIDAndProviderID(context.Background(), packRow.ID, "deepseek") if err != nil { t.Fatalf("Providers().GetByPackIDAndProviderID() error = %v", err) } batchID, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{ HostID: hostPK, PackID: packRow.ID, ProviderID: providerRow.ID, Mode: provision.ImportModePartial, BatchStatus: provision.BatchStatusSucceeded, AccessStatus: provision.AccessStatusSelfServiceReady, }) if err != nil { t.Fatalf("ImportBatches().Create() error = %v", err) } for _, resource := range []sqlite.ManagedResource{ {BatchID: batchID, HostID: hostPK, ResourceType: "group", HostResourceID: "group-1", ResourceName: "group-1"}, {BatchID: batchID, HostID: hostPK, ResourceType: "channel", HostResourceID: "channel-1", ResourceName: "channel-1"}, {BatchID: batchID, HostID: hostPK, ResourceType: "plan", HostResourceID: "plan-1", ResourceName: "plan-1"}, {BatchID: batchID, HostID: hostPK, ResourceType: "account", HostResourceID: "account-1", ResourceName: 
"account-1"}, } { if _, err := store.ManagedResources().Create(context.Background(), resource); err != nil { t.Fatalf("ManagedResources().Create(%s) error = %v", resource.ResourceType, err) } } report, err := actions.RollbackProvider(context.Background(), RollbackProviderRequest{ HostBaseURL: server.URL, PackPath: packPath, ProviderID: "deepseek", }) if err != nil { t.Fatalf("RollbackProvider() error = %v", err) } if report.AccountsDeleted != 1 || report.PlansDeleted != 1 || report.ChannelsDeleted != 1 || report.GroupsDeleted != 1 { t.Fatalf("RollbackProvider() = %+v, want one deleted resource per type", report) } batchRow, err := store.ImportBatches().GetByID(context.Background(), batchID) if err != nil { t.Fatalf("ImportBatches().GetByID() error = %v", err) } if batchRow.BatchStatus != provision.BatchStatusRolledBack { t.Fatalf("batch status = %q, want %q", batchRow.BatchStatus, provision.BatchStatusRolledBack) } } func TestActionSetAssignAccessSubscriptionsClosure(t *testing.T) { t.Parallel() baseHandler := newBatchImportActionStubServer(t) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && strings.HasPrefix(r.URL.RequestURI(), "/api/v1/admin/users?"): writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{}}}) return case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84, "email": "relay-sub-user-1@sub2api.local"}}) return case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/users/84": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}}) return case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users/84/balance": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}}) return case r.Method == http.MethodPost && r.URL.Path == "/api/v1/auth/login": writeJSON(w, http.StatusOK, 
map[string]any{"data": map[string]any{"access_token": "user-jwt"}}) return case r.Method == http.MethodPost && r.URL.Path == "/api/v1/keys": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 501, "key": "sk-relay-test", "name": "managed-key"}}) return case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/api-keys/501": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"api_key": map[string]any{"id": 501}}}) return case r.URL.Path == "/v1/models" && strings.HasPrefix(strings.TrimSpace(r.Header.Get("Authorization")), "Bearer sk-relay-"): writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "deepseek-v4-pro"}}}) return case r.URL.Path == "/v1/chat/completions" && strings.HasPrefix(strings.TrimSpace(r.Header.Get("Authorization")), "Bearer sk-relay-"): writeJSON(w, http.StatusOK, map[string]any{ "id": "chatcmpl_subscription", "choices": []map[string]any{{ "index": 0, "message": map[string]any{ "role": "assistant", "content": "pong", }, }}, }) return } baseHandler.ServeHTTP(w, r) })) defer server.Close() store := openAppTestStore(t) defer closeAppTestStore(t, store) hostPK := createAppHostRecord(t, store, server.URL) actions := NewActionSet(appTestDSN(t, store)) packPath := filepath.Join("..", "..", "packs", "openai-cn-pack") if _, err := actions.InstallPack(context.Background(), InstallPackRequest{ HostBaseURL: server.URL, HostAPIKey: "host-token", PackPath: packPath, }); err != nil { t.Fatalf("InstallPack() error = %v", err) } packRow, err := store.Packs().GetByPackID(context.Background(), "openai-cn-pack") if err != nil { t.Fatalf("Packs().GetByPackID() error = %v", err) } providerRow, err := store.Providers().GetByPackIDAndProviderID(context.Background(), packRow.ID, "deepseek") if err != nil { t.Fatalf("Providers().GetByPackIDAndProviderID() error = %v", err) } batchID, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{ HostID: hostPK, PackID: packRow.ID, ProviderID: 
providerRow.ID, Mode: provision.ImportModePartial, BatchStatus: provision.BatchStatusSucceeded, AccessStatus: provision.AccessStatusSelfServiceReady, }) if err != nil { t.Fatalf("ImportBatches().Create() error = %v", err) } if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{ BatchID: batchID, HostID: hostPK, ResourceType: "group", HostResourceID: "101", ResourceName: "group-101", }); err != nil { t.Fatalf("ManagedResources().Create(group) error = %v", err) } result, err := actions.AssignAccessSubscriptions(context.Background(), AssignAccessSubscriptionsRequest{ HostBaseURL: server.URL, PackPath: packPath, ProviderID: "deepseek", AccessAPIKey: "caller-probe-key", SubscriptionUsers: []string{"crm-user-1"}, SubscriptionDays: 30, }) if err != nil { t.Fatalf("AssignAccessSubscriptions() error = %v", err) } if result.Assigned != 1 || result.AccessStatus != provision.AccessStatusSubscriptionReady { t.Fatalf("AssignAccessSubscriptions() = %+v, want one assigned subscription_ready result", result) } closures, err := store.AccessClosures().GetByBatchID(context.Background(), batchID) if err != nil { t.Fatalf("AccessClosures().GetByBatchID() error = %v", err) } if len(closures) != 1 || closures[0].Status != provision.AccessStatusSubscriptionReady { t.Fatalf("AccessClosures() = %+v, want persisted subscription_ready closure", closures) } var payload map[string]any if err := json.Unmarshal([]byte(closures[0].DetailsJSON), &payload); err != nil { t.Fatalf("decode access closure payload: %v", err) } if _, ok := payload["probe_api_key"]; ok { t.Fatalf("subscription access closure should omit probe_api_key, got %#v", payload["probe_api_key"]) } if got, _ := payload["requested_probe_api_key"].(string); got != "caller-probe-key" { t.Fatalf("requested_probe_api_key = %q, want caller-probe-key", got) } if got, _ := payload["effective_probe_key_source"].(string); got != "managed_subscription" { t.Fatalf("effective_probe_key_source = %q, want 
managed_subscription", got) } batchRow, err := store.ImportBatches().GetByID(context.Background(), batchID) if err != nil { t.Fatalf("ImportBatches().GetByID() error = %v", err) } if batchRow.AccessStatus != provision.AccessStatusSubscriptionReady { t.Fatalf("batch access_status = %q, want %q", batchRow.AccessStatus, provision.AccessStatusSubscriptionReady) } } func TestActionSetImportAndReconcileProviderClosures(t *testing.T) { t.Parallel() baseHandler := newBatchImportActionStubServer(t) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/api/v1/admin/groups": if r.Method == http.MethodGet { writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "group_1", "name": "DeepSeek 默认分组-self-service"}}}) return } if r.Method == http.MethodPut { writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": "group_1", "name": "DeepSeek 默认分组-self-service"}}) return } case "/api/v1/admin/channels": if r.Method == http.MethodGet { writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "channel_1", "name": "DeepSeek 默认渠道-self-service"}}}) return } case "/api/v1/admin/groups/group_1": if r.Method == http.MethodPut { writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": "group_1", "name": "DeepSeek 默认分组-self-service"}}) return } case "/api/v1/admin/channels/channel_1": if r.Method == http.MethodPut { writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": "channel_1", "name": "DeepSeek 默认渠道-self-service"}}) return } case "/api/v1/admin/accounts": if r.Method == http.MethodGet { writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{{"id": "account_1", "name": "deepseek-01"}}, "pages": 1}}) return } case "/api/v1/admin/accounts/account_1/models": writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{{"id": "deepseek-v4-pro", "display_name": "DeepSeek V4 
Pro", "type": "chat"}}}}) return case "/api/v1/admin/accounts/account_1": if r.Method == http.MethodDelete { w.WriteHeader(http.StatusNoContent) return } case "/v1/models": switch strings.TrimSpace(r.Header.Get("Authorization")) { case "Bearer entry-key", "Bearer gateway-key": writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "deepseek-v4-pro"}}}) return } case "/v1/chat/completions": switch strings.TrimSpace(r.Header.Get("Authorization")) { case "Bearer entry-key", "Bearer gateway-key": writeJSON(w, http.StatusOK, map[string]any{ "id": "chatcmpl_deepseek", "choices": []map[string]any{{ "index": 0, "message": map[string]any{ "role": "assistant", "content": "pong", }, }}, }) return } } baseHandler.ServeHTTP(w, r) })) defer server.Close() store := openAppTestStore(t) defer closeAppTestStore(t, store) if _, err := store.Hosts().Create(context.Background(), sqlite.Host{ HostID: "host-deepseek", BaseURL: server.URL, HostVersion: "0.1.126", CapabilityProbeJSON: "{}", AuthType: "apikey", AuthToken: "host-token", }); err != nil { t.Fatalf("Hosts().Create() error = %v", err) } actions := NewActionSet(appTestDSN(t, store)) packPath := filepath.Join("..", "..", "packs", "openai-cn-pack") importResult, err := actions.ImportProvider(context.Background(), ImportProviderRequest{ HostID: "host-deepseek", PackPath: packPath, ProviderID: "deepseek", Keys: []string{"entry-key"}, Mode: provision.ImportModePartial, AccessMode: provision.AccessModeSelfService, AccessAPIKey: "gateway-key", }) if err != nil { t.Fatalf("ImportProvider() error = %v", err) } if importResult.BatchID <= 0 || importResult.Report.BatchStatus != provision.BatchStatusSucceeded || importResult.Report.AccessStatus != provision.AccessStatusSelfServiceReady { t.Fatalf("ImportProvider() = %+v, want succeeded self_service_ready batch", importResult) } reconcileResult, err := actions.ReconcileProvider(context.Background(), ReconcileProviderRequest{ HostID: "host-deepseek", PackPath: packPath, 
ProviderID: "deepseek", AccessAPIKey: "gateway-key", }) if err != nil { t.Fatalf("ReconcileProvider() error = %v", err) } if reconcileResult.BatchID != importResult.BatchID || reconcileResult.Status != "active" || reconcileResult.AccessStatus != provision.AccessStatusSelfServiceReady { t.Fatalf("ReconcileProvider() = %+v, want active reconcile for imported batch", reconcileResult) } } func TestBuildListBatchImportRunItemsActionCursor(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) mustCreateAppImportRun(t, store, sqlite.ImportRun{ RunID: "run-1", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "running", }) mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:00' WHERE run_id = 'run-1'`) mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{ ItemID: "item-1", RunID: "run-1", BaseURL: "https://a.example.com/v1", ProviderID: "a", APIKeyFingerprint: "sha256:a", CurrentStage: "done", ConfirmationStatus: "confirmed", AccessStatus: "active", MatchedAccountState: "active", AccountResolution: "created", }) mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{ ItemID: "item-2", RunID: "run-1", BaseURL: "https://b.example.com/v1", ProviderID: "b", APIKeyFingerprint: "sha256:b", CurrentStage: "done", ConfirmationStatus: "confirmed", AccessStatus: "active", MatchedAccountState: "active", AccountResolution: "created", }) action := buildListBatchImportRunItemsAction(appTestDSN(t, store)) result, err := action(context.Background(), ListBatchImportRunItemsRequest{ RunID: "run-1", Cursor: "item-1", Limit: 1, }) if err != nil { t.Fatalf("buildListBatchImportRunItemsAction(cursor) error = %v", err) } if len(result.Items) != 1 || result.Items[0].ItemID != "item-2" { t.Fatalf("result.Items = %+v, want [item-2]", result.Items) } } func TestBuildListBatchImportRunsActionCursorAndDefaults(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) 
mustCreateAppImportRun(t, store, sqlite.ImportRun{RunID: "run-1", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "running"}) mustCreateAppImportRun(t, store, sqlite.ImportRun{RunID: "run-2", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "running"}) mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:02' WHERE run_id = 'run-1'`) mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:01' WHERE run_id = 'run-2'`) action := buildListBatchImportRunsAction(appTestDSN(t, store)) result, err := action(context.Background(), ListBatchImportRunsRequest{Cursor: "run-1", Limit: -1}) if err != nil { t.Fatalf("buildListBatchImportRunsAction(cursor) error = %v", err) } if len(result.Runs) != 1 || result.Runs[0].RunID != "run-2" { t.Fatalf("result.Runs = %+v, want [run-2]", result.Runs) } } func TestBuildGetBatchImportRunActionPropagatesDBError(t *testing.T) { t.Parallel() action := buildGetBatchImportRunAction("file:/definitely-missing-path/does-not-exist.db?mode=ro") if _, err := action(context.Background(), "run-1"); err == nil { t.Fatal("buildGetBatchImportRunAction() error = nil, want open db error") } } func TestBuildListBatchImportRunItemsActionPropagatesDBError(t *testing.T) { t.Parallel() action := buildListBatchImportRunItemsAction("file:/definitely-missing-path/does-not-exist.db?mode=ro") if _, err := action(context.Background(), ListBatchImportRunItemsRequest{RunID: "run-1"}); err == nil { t.Fatal("buildListBatchImportRunItemsAction() error = nil, want open db error") } } func TestBuildListBatchImportRunsActionPropagatesDBError(t *testing.T) { t.Parallel() action := buildListBatchImportRunsAction("file:/definitely-missing-path/does-not-exist.db?mode=ro") if _, err := action(context.Background(), ListBatchImportRunsRequest{}); err == nil { t.Fatal("buildListBatchImportRunsAction() error = nil, want open db error") } } func 
TestBuildGetBatchImportRunItemActionPropagatesMissingItem(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) action := buildGetBatchImportRunItemAction(appTestDSN(t, store)) if _, err := action(context.Background(), GetBatchImportRunItemRequest{RunID: "run-1", ItemID: "missing"}); err == nil || err.Error() != "item not found: missing" { t.Fatalf("missing item error = %v, want item not found", err) } } func TestResumePendingBatchImportRunsNoRunningRuns(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) mustCreateAppImportRun(t, store, sqlite.ImportRun{ RunID: "run-1", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "completed", }) if err := resumePendingBatchImportRuns(context.Background(), appTestDSN(t, store)); err != nil { t.Fatalf("resumePendingBatchImportRuns() error = %v", err) } } func TestNewBatchImportRuntimeRunnerFromStoredRun(t *testing.T) { t.Parallel() store := openAppTestStore(t) defer closeAppTestStore(t, store) if _, err := store.Hosts().Create(context.Background(), sqlite.Host{ HostID: "host-1", BaseURL: "https://sub2api.example.com", HostVersion: "0.1.126", AuthToken: "host-token", }); err != nil { t.Fatalf("Hosts().Create() error = %v", err) } runner, err := newBatchImportRuntimeRunnerFromStoredRun(context.Background(), store, sqlite.ImportRun{ RunID: "run-1", HostID: "host-1", Mode: "partial", AccessMode: "subscription", SubscriptionUsersJSON: `["user-1","user-2"]`, SubscriptionDays: 30, ProbeAPIKey: "probe-key", }) if err != nil { t.Fatalf("newBatchImportRuntimeRunnerFromStoredRun() error = %v", err) } if runner.request.HostID != "host-1" || len(runner.request.SubscriptionUsers) != 2 || runner.request.ProbeAPIKey != "probe-key" { t.Fatalf("runner.request = %+v, want parsed stored run request", runner.request) } } func TestBuildGetBatchImportRunActionClassifiesNotFoundOnlyOnSQLNoRows(t *testing.T) { t.Parallel() store := openAppTestStore(t) 
defer closeAppTestStore(t, store) action := buildGetBatchImportRunAction(appTestDSN(t, store)) _, err := action(context.Background(), "missing") if err == nil || err.Error() != "run not found: missing" { t.Fatalf("missing run error = %v, want run not found", err) } } func TestSQLNoRowsReference(t *testing.T) { t.Parallel() if sql.ErrNoRows == nil { t.Fatal("sql.ErrNoRows = nil") } }