Files
sub2api-cn-relay-manager/internal/app/coverage_helpers_test.go
2026-05-28 07:30:02 +08:00

2643 lines
98 KiB
Go

package app
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"time"
"sub2api-cn-relay-manager/internal/batch"
"sub2api-cn-relay-manager/internal/config"
"sub2api-cn-relay-manager/internal/host/sub2api"
"sub2api-cn-relay-manager/internal/provision"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
func TestBatchImportResumeJobNameAndParseJSONStringList(t *testing.T) {
t.Parallel()
if got := (batchImportResumeJob{}).Name(); got != "batch import runtime scheduler" {
t.Fatalf("batchImportResumeJob.Name() = %q, want batch import runtime scheduler", got)
}
values := parseJSONStringList(`[" user-1 ","user-2"]`)
if len(values) != 2 || values[0] != " user-1 " || values[1] != "user-2" {
t.Fatalf("parseJSONStringList() = %v, want raw decoded values", values)
}
if got := parseJSONStringList("{"); len(got) != 0 {
t.Fatalf("parseJSONStringList(invalid) = %v, want empty", got)
}
}
func TestBatchImportResumeJobRunAndReconcileSweepJobName(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "self_service",
State: "completed",
})
job := batchImportResumeJob{sqliteDSN: appTestDSN(t, store)}
if err := job.Run(context.Background()); err != nil {
t.Fatalf("batchImportResumeJob.Run() error = %v", err)
}
if got := (reconcileSweepJob{}).Name(); got != "reconcile background scheduler" {
t.Fatalf("reconcileSweepJob.Name() = %q, want reconcile background scheduler", got)
}
}
func TestDefaultBackgroundSchedulersAndNewActionSet(t *testing.T) {
t.Parallel()
schedulers := defaultBackgroundSchedulers()
if schedulers.runBatchImport == nil || schedulers.runReconcile == nil {
t.Fatalf("defaultBackgroundSchedulers() = %+v, want non-nil functions", schedulers)
}
actions := NewActionSet("file:/tmp/nonexistent.db")
if actions.CreateBatchImportRun == nil || actions.ListBatchImportRuns == nil || actions.GetBatchImportRun == nil || actions.ListBatchImportRunItems == nil || actions.GetBatchImportRunItem == nil {
t.Fatalf("NewActionSet() returned nil batch actions: %+v", actions)
}
if actions.CreateHost == nil || actions.ListPacks == nil || actions.GetPack == nil || actions.ListPackProviders == nil || actions.PublishProviderDraft == nil {
t.Fatalf("NewActionSet() returned nil app actions: %+v", actions)
}
}
func TestStartBackgroundSchedulersAndBootstrap(t *testing.T) {
var batchCalls int
var reconcileCalls int
startBackgroundSchedulers(context.Background(), config.StartupConfig{
Database: config.DatabaseConfig{SQLiteDSN: "file:test.db"},
Reconcile: config.ReconcileConfig{
WorkerEnabled: false,
},
}, backgroundSchedulers{
runBatchImport: func(context.Context, string) { batchCalls++ },
runReconcile: func(context.Context, string, time.Duration) { reconcileCalls++ },
})
if batchCalls != 1 || reconcileCalls != 0 {
t.Fatalf("startBackgroundSchedulers(disabled) calls = (%d, %d), want (1, 0)", batchCalls, reconcileCalls)
}
startBackgroundSchedulers(context.Background(), config.StartupConfig{
Database: config.DatabaseConfig{SQLiteDSN: "file:test.db"},
Reconcile: config.ReconcileConfig{
WorkerEnabled: true,
PollInterval: time.Minute,
},
}, backgroundSchedulers{
runBatchImport: func(context.Context, string) { batchCalls++ },
runReconcile: func(context.Context, string, time.Duration) { reconcileCalls++ },
})
if batchCalls != 2 || reconcileCalls != 1 {
t.Fatalf("startBackgroundSchedulers(enabled) calls = (%d, %d), want (2, 1)", batchCalls, reconcileCalls)
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
t.Setenv(config.EnvListenAddr, "127.0.0.1:19090")
t.Setenv(config.EnvSQLiteDSN, "file:bootstrap-test.db?_foreign_keys=on&_busy_timeout=5000")
t.Setenv(config.EnvAdminToken, "bootstrap-token")
t.Setenv(config.EnvReconcileWorkerEnabled, "false")
server, err := Bootstrap(ctx)
if err != nil {
t.Fatalf("Bootstrap() error = %v", err)
}
if server.Addr() != "127.0.0.1:19090" {
t.Fatalf("Bootstrap() server.Addr() = %q, want 127.0.0.1:19090", server.Addr())
}
}
func TestBackgroundSchedulerEntryPoints(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
ctx, cancel := context.WithCancel(context.Background())
cancel()
runBatchImportBackgroundScheduler(ctx, appTestDSN(t, store))
runReconcileBackgroundScheduler(ctx, appTestDSN(t, store), 0)
job := reconcileSweepJob{sqliteDSN: appTestDSN(t, store), interval: time.Minute}
if err := job.Run(context.Background()); err != nil {
t.Fatalf("reconcileSweepJob.Run() error = %v", err)
}
}
func TestBatchImportProvisionerPatchAndSleepWithContext(t *testing.T) {
t.Parallel()
if err := (batchImportProvisioner{}).Patch(context.Background(), batch.PatchProvisionRequest{}); err != nil {
t.Fatalf("Patch() error = %v, want nil", err)
}
if err := sleepWithContext(context.Background(), 0); err != nil {
t.Fatalf("sleepWithContext() error = %v, want nil", err)
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := sleepWithContext(ctx, time.Second); err != context.Canceled {
t.Fatalf("sleepWithContext(canceled) error = %v, want %v", err, context.Canceled)
}
}
func TestProbeHostSnapshotAndResolveValidationAPIKey(t *testing.T) {
t.Parallel()
server := httptest.NewServer(newBatchImportActionStubServer(t))
defer server.Close()
client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "apikey", Token: "host-token"})
if err != nil {
t.Fatalf("newSub2APIClient() error = %v", err)
}
hostVersion, capabilities, err := probeHostSnapshot(context.Background(), client)
if err != nil {
t.Fatalf("probeHostSnapshot() error = %v", err)
}
if hostVersion != "0.1.126" || !capabilities.Groups || !capabilities.Subscriptions {
t.Fatalf("probeHostSnapshot() = (%q, %+v), want supported host snapshot", hostVersion, capabilities)
}
selfServiceRunner := batchImportRuntimeRunner{
request: CreateBatchImportRunRequest{
AccessMode: provision.AccessModeSelfService,
ProbeAPIKey: " probe-key ",
},
}
apiKey, err := selfServiceRunner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{})
if err != nil {
t.Fatalf("resolveValidationAPIKey(self_service) error = %v", err)
}
if apiKey != "probe-key" {
t.Fatalf("resolveValidationAPIKey(self_service) = %q, want probe-key", apiKey)
}
subscriptionRunner := batchImportRuntimeRunner{
request: CreateBatchImportRunRequest{
AccessMode: provision.AccessModeSubscription,
SubscriptionDays: 30,
},
}
if _, err := subscriptionRunner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{}); err == nil || err.Error() != "subscription_users is required" {
t.Fatalf("resolveValidationAPIKey(subscription missing users) error = %v, want subscription_users is required", err)
}
unsupportedRunner := batchImportRuntimeRunner{
request: CreateBatchImportRunRequest{
AccessMode: "other",
},
}
if _, err := unsupportedRunner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{}); err == nil || !strings.Contains(err.Error(), `unsupported access mode "other"`) {
t.Fatalf("resolveValidationAPIKey(unsupported) error = %v, want unsupported access mode", err)
}
}
func TestResolveManagedResourceHostIDAndConfirmItem(t *testing.T) {
t.Parallel()
if _, err := resolveManagedResourceHostID(context.Background(), nil, sqlite.ImportRunItem{}, "account"); err == nil || err.Error() != "store is required" {
t.Fatalf("resolveManagedResourceHostID(nil store) error = %v, want store is required", err)
}
if _, err := resolveManagedResourceHostID(context.Background(), openAppTestStore(t), sqlite.ImportRunItem{}, "account"); err == nil || err.Error() != "legacy_batch_id is required for account lookup" {
t.Fatalf("resolveManagedResourceHostID(missing batch) error = %v, want legacy_batch_id required", err)
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v1/admin/accounts/account-ok/test":
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":true,\"status\":\"passed\",\"message\":\"smoke passed\"}\n\n"))
case "/api/v1/admin/accounts/account-http/test":
w.WriteHeader(http.StatusForbidden)
_, _ = w.Write([]byte(`{"error":"forbidden"}`))
case "/api/v1/admin/accounts/account-busy/test":
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":false,\"status\":\"failed\",\"message\":\"No available accounts\"}\n\n"))
case "/api/v1/admin/accounts/account-forbidden/test":
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":false,\"status\":\"failed\",\"message\":\"Forbidden by upstream\"}\n\n"))
case "/api/v1/admin/accounts/account-bad/test":
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":false,\"status\":\"failed\",\"message\":\"model mismatch\"}\n\n"))
default:
http.NotFound(w, r)
}
}))
defer server.Close()
client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "apikey", Token: "host-token"})
if err != nil {
t.Fatalf("newSub2APIClient() error = %v", err)
}
tests := []struct {
name string
accountID string
wantStatus int
wantMsg string
}{
{name: "probe ok", accountID: "account-ok", wantStatus: http.StatusOK, wantMsg: "smoke passed"},
{name: "http error passthrough", accountID: "account-http", wantStatus: http.StatusForbidden, wantMsg: `{"error":"forbidden"}`},
{name: "busy advisory", accountID: "account-busy", wantStatus: http.StatusServiceUnavailable, wantMsg: "No available accounts"},
{name: "forbidden advisory", accountID: "account-forbidden", wantStatus: http.StatusForbidden, wantMsg: "Forbidden by upstream"},
{name: "generic bad request", accountID: "account-bad", wantStatus: http.StatusBadRequest, wantMsg: "model mismatch"},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostPK := createAppHostRecord(t, store, server.URL)
packPK := createAppPackRecord(t, store)
providerPK := createAppProviderRecord(t, store, packPK)
batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK)
if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{
BatchID: batchID,
HostID: hostPK,
ResourceType: "account",
HostResourceID: tc.accountID,
ResourceName: tc.accountID,
}); err != nil {
t.Fatalf("ManagedResources().Create() error = %v", err)
}
runner := batchImportRuntimeRunner{
store: store,
hostClient: client,
}
result, err := runner.confirmItem(context.Background(), sqlite.ImportRunItem{
LegacyBatchID: &batchID,
ResolvedSmokeModel: "kimi-k2.6",
})
if err != nil {
t.Fatalf("confirmItem() error = %v", err)
}
if result.StatusCode != tc.wantStatus || result.Message != tc.wantMsg {
t.Fatalf("confirmItem() = %+v, want status=%d message=%q", result, tc.wantStatus, tc.wantMsg)
}
})
}
}
func TestBatchImportRunItemStoreTryAcquireLease(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-lease-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "self_service",
State: "running",
})
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-lease-1",
RunID: "run-lease-1",
BaseURL: "https://deepseek.example.com/v1",
ProviderID: "deepseek",
APIKeyFingerprint: "sha256:lease",
ResolvedSmokeModel: "deepseek-v4-pro",
CurrentStage: "confirm",
ConfirmationStatus: "pending",
AccessStatus: "unknown",
MatchedAccountState: "active",
AccountResolution: "created",
})
leaseStore := batchImportRunItemStore{store: store, runID: "run-lease-1"}
now := time.Date(2026, 5, 23, 10, 0, 0, 0, time.UTC)
if _, _, err := leaseStore.TryAcquireLease(context.Background(), "", "worker-1", now, time.Minute); err == nil || err.Error() != "item_id is required" {
t.Fatalf("TryAcquireLease(missing item) error = %v, want item_id is required", err)
}
if _, _, err := leaseStore.TryAcquireLease(context.Background(), "item-lease-1", "", now, time.Minute); err == nil || err.Error() != "worker_id is required" {
t.Fatalf("TryAcquireLease(missing worker) error = %v, want worker_id is required", err)
}
item, claimed, err := leaseStore.TryAcquireLease(context.Background(), "item-lease-1", "worker-1", now, time.Minute)
if err != nil {
t.Fatalf("TryAcquireLease(first) error = %v", err)
}
if !claimed || item.LeaseOwner != "worker-1" {
t.Fatalf("TryAcquireLease(first) = (%+v, %v), want claimed by worker-1", item, claimed)
}
item, claimed, err = leaseStore.TryAcquireLease(context.Background(), "item-lease-1", "worker-2", now.Add(30*time.Second), time.Minute)
if err != nil {
t.Fatalf("TryAcquireLease(second) error = %v", err)
}
if claimed || item.ItemID != "" {
t.Fatalf("TryAcquireLease(second) = (%+v, %v), want unclaimed empty item", item, claimed)
}
}
func TestDriveRunValidationAndRetryBranches(t *testing.T) {
t.Parallel()
t.Run("validate stage completes run", func(t *testing.T) {
t.Parallel()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/v1/chat/completions":
writeJSON(w, http.StatusOK, map[string]any{
"id": "chatcmpl_validate",
"choices": []map[string]any{{
"index": 0,
"message": map[string]any{
"role": "assistant",
"content": "pong",
},
}},
})
default:
http.NotFound(w, r)
}
}))
defer server.Close()
client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "bearer", Token: "gateway-key"})
if err != nil {
t.Fatalf("newSub2APIClient() error = %v", err)
}
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-validate-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "self_service",
State: "running",
TotalItems: 1,
})
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-validate-1",
RunID: "run-validate-1",
BaseURL: server.URL,
ProviderID: "deepseek",
APIKeyFingerprint: "sha256:validate",
ResolvedSmokeModel: "deepseek-v4-pro",
CurrentStage: "validate",
ConfirmationStatus: "confirmed",
AccessStatus: "unknown",
MatchedAccountState: "active",
AccountResolution: "created",
})
runner := batchImportRuntimeRunner{
store: store,
hostClient: client,
request: CreateBatchImportRunRequest{
AccessMode: provision.AccessModeSelfService,
ProbeAPIKey: "gateway-key",
},
}
if err := runner.driveRun(context.Background(), "run-validate-1", 0); err != nil {
t.Fatalf("driveRun(validate) error = %v", err)
}
run, err := store.ImportRuns().GetByRunID(context.Background(), "run-validate-1")
if err != nil {
t.Fatalf("ImportRuns().GetByRunID() error = %v", err)
}
if run.State != "completed" || run.CompletedItems != 1 || run.ActiveItems != 1 {
t.Fatalf("run = %+v, want completed with one active item", run)
}
item, err := store.ImportRunItems().GetByItemID(context.Background(), "item-validate-1")
if err != nil {
t.Fatalf("ImportRunItems().GetByItemID() error = %v", err)
}
if item.CurrentStage != "done" || item.AccessStatus != "active" {
t.Fatalf("item = %+v, want done/active", item)
}
})
t.Run("confirm stage schedules retry when wait budget is zero", func(t *testing.T) {
t.Parallel()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v1/admin/accounts/account-busy/test":
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("data: {\"type\":\"test_complete\",\"ok\":false,\"status\":\"failed\",\"message\":\"No available accounts\"}\n\n"))
default:
http.NotFound(w, r)
}
}))
defer server.Close()
client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "bearer", Token: "gateway-key"})
if err != nil {
t.Fatalf("newSub2APIClient() error = %v", err)
}
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostPK := createAppHostRecord(t, store, server.URL)
packPK := createAppPackRecord(t, store)
providerPK := createAppProviderRecord(t, store, packPK)
legacyBatchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK)
if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{
BatchID: legacyBatchID,
HostID: hostPK,
ResourceType: "account",
HostResourceID: "account-busy",
ResourceName: "account-busy",
}); err != nil {
t.Fatalf("ManagedResources().Create(account) error = %v", err)
}
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-confirm-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "self_service",
State: "running",
TotalItems: 1,
})
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-confirm-1",
RunID: "run-confirm-1",
BaseURL: server.URL,
ProviderID: "deepseek",
APIKeyFingerprint: "sha256:confirm",
ResolvedSmokeModel: "deepseek-v4-pro",
CurrentStage: "confirm",
ConfirmationStatus: "pending",
AccessStatus: "unknown",
MatchedAccountState: "active",
AccountResolution: "created",
LegacyBatchID: &legacyBatchID,
CapabilityProfileJSON: `{"transport_profile":{"known_advisories":[]}}`,
})
runner := batchImportRuntimeRunner{
store: store,
hostClient: client,
request: CreateBatchImportRunRequest{
AccessMode: provision.AccessModeSelfService,
ProbeAPIKey: "gateway-key",
},
}
if err := runner.driveRun(context.Background(), "run-confirm-1", 0); err != nil {
t.Fatalf("driveRun(confirm retry) error = %v", err)
}
run, err := store.ImportRuns().GetByRunID(context.Background(), "run-confirm-1")
if err != nil {
t.Fatalf("ImportRuns().GetByRunID() error = %v", err)
}
if run.State != "running" || run.CompletedItems != 0 {
t.Fatalf("run = %+v, want still running with no completed items", run)
}
item, err := store.ImportRunItems().GetByItemID(context.Background(), "item-confirm-1")
if err != nil {
t.Fatalf("ImportRunItems().GetByItemID() error = %v", err)
}
if item.CurrentStage != "confirm" || item.RetryCount != 1 || strings.TrimSpace(item.NextRetryAt) == "" {
t.Fatalf("item = %+v, want confirm stage with scheduled retry", item)
}
})
}
func TestResolveValidationAPIKeySubscriptionSuccess(t *testing.T) {
t.Parallel()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && strings.HasPrefix(r.URL.RequestURI(), "/api/v1/admin/users?"):
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{}}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84, "email": "relay-sub-user-1@sub2api.local"}})
case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/users/84":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users/84/balance":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/subscriptions/assign":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 401}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/auth/login":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"access_token": "user-jwt"}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/keys":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 501, "key": "sk-relay-key", "name": "managed-key"}})
case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/api-keys/501":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"api_key": map[string]any{"id": 501}}})
default:
http.NotFound(w, r)
}
}))
defer server.Close()
client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "bearer", Token: "admin-token"})
if err != nil {
t.Fatalf("newSub2APIClient() error = %v", err)
}
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostPK := createAppHostRecord(t, store, server.URL)
packPK := createAppPackRecord(t, store)
providerPK := createAppProviderRecord(t, store, packPK)
batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK)
if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{
BatchID: batchID,
HostID: hostPK,
ResourceType: "group",
HostResourceID: "101",
ResourceName: "group-101",
}); err != nil {
t.Fatalf("ManagedResources().Create(group) error = %v", err)
}
runner := batchImportRuntimeRunner{
store: store,
hostClient: client,
request: CreateBatchImportRunRequest{
AccessMode: provision.AccessModeSubscription,
SubscriptionUsers: []string{"crm-user-1"},
SubscriptionDays: 30,
},
}
apiKey, err := runner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{
LegacyBatchID: &batchID,
})
if err != nil {
t.Fatalf("resolveValidationAPIKey(subscription) error = %v", err)
}
if !strings.HasPrefix(apiKey, "sk-relay-") {
t.Fatalf("resolveValidationAPIKey(subscription) = %q, want managed sk-relay-* key", apiKey)
}
}
func TestResolveValidationAPIKeySubscriptionRequiresStoredGroup(t *testing.T) {
t.Parallel()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && strings.HasPrefix(r.URL.RequestURI(), "/api/v1/admin/users?"):
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{}}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84, "email": "relay-sub-user-1@sub2api.local"}})
case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/users/84":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users/84/balance":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/subscriptions/assign":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 401}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/auth/login":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"access_token": "user-jwt"}})
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/keys":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 501, "key": "sk-relay-key", "name": "managed-key"}})
case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/api-keys/501":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"api_key": map[string]any{"id": 501}}})
default:
http.NotFound(w, r)
}
}))
defer server.Close()
client, err := newSub2APIClient(server.URL, CreateHostAuth{Type: "bearer", Token: "admin-token"})
if err != nil {
t.Fatalf("newSub2APIClient() error = %v", err)
}
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostPK := createAppHostRecord(t, store, server.URL)
packPK := createAppPackRecord(t, store)
providerPK := createAppProviderRecord(t, store, packPK)
batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK)
runner := batchImportRuntimeRunner{
store: store,
hostClient: client,
request: CreateBatchImportRunRequest{
AccessMode: provision.AccessModeSubscription,
SubscriptionUsers: []string{"crm-user-1"},
SubscriptionDays: 30,
},
}
_, err = runner.resolveValidationAPIKey(context.Background(), sqlite.ImportRunItem{
LegacyBatchID: &batchID,
})
if err == nil || err.Error() != fmt.Sprintf("%s resource not found for batch %d", "group", batchID) {
t.Fatalf("resolveValidationAPIKey(subscription missing group) error = %v, want missing group", err)
}
}
func TestCreateHostAuthFromLegacyFields(t *testing.T) {
t.Parallel()
if got := createHostAuthFromLegacyFields(" api-key ", " bearer-token "); got.Type != "bearer" || got.Token != "bearer-token" {
t.Fatalf("createHostAuthFromLegacyFields() = %+v, want bearer token", got)
}
if got := createHostAuthFromLegacyFields(" api-key ", ""); got.Type != "apikey" || got.Token != "api-key" {
t.Fatalf("createHostAuthFromLegacyFields() = %+v, want apikey", got)
}
}
func TestHostRecordToInfoAndPackRecordToInfo(t *testing.T) {
t.Parallel()
hostInfo := hostRecordToInfo(sqlite.Host{
HostID: "host-1",
BaseURL: "https://sub2api.example.com",
HostVersion: "0.1.126",
AuthType: "apikey",
CapabilityProbeJSON: `{"groups":true,"channels":true,"plans":true,"accounts":true,"account_test":true,"account_models":true,"subscriptions":true}`,
})
if hostInfo.Status != "supported" || hostInfo.Capabilities == nil || !hostInfo.Capabilities.Subscriptions {
t.Fatalf("hostRecordToInfo() = %+v, want supported capabilities", hostInfo)
}
hostInfo = hostRecordToInfo(sqlite.Host{
HostID: "host-2",
BaseURL: "https://bad.example.com",
CapabilityProbeJSON: "{",
})
if hostInfo.Capabilities != nil || hostInfo.Status != "" {
t.Fatalf("hostRecordToInfo(invalid json) = %+v, want nil capabilities and empty status", hostInfo)
}
packInfo := packRecordToInfo(sqlite.Pack{
PackID: "openai-cn-pack",
Version: "1.0.0",
Vendor: "OpenAI CN",
TargetHost: "sub2api",
MinHostVersion: "0.1.126",
MaxHostVersion: "0.2.x",
})
if packInfo.PackID != "openai-cn-pack" || packInfo.TargetHost != "sub2api" {
t.Fatalf("packRecordToInfo() = %+v, want projected pack info", packInfo)
}
}
func TestDeriveAccessStatusAndDefaultPositiveInt(t *testing.T) {
t.Parallel()
if got := deriveAccessStatus(sub2api.GatewayAccessResult{OK: true, HasExpectedModel: true, CompletionOK: true}); got != provision.AccessStatusSubscriptionReady {
t.Fatalf("deriveAccessStatus(ready) = %q, want %q", got, provision.AccessStatusSubscriptionReady)
}
if got := deriveAccessStatus(sub2api.GatewayAccessResult{OK: true, HasExpectedModel: true, CompletionOK: false}); got != provision.AccessStatusBroken {
t.Fatalf("deriveAccessStatus(broken) = %q, want %q", got, provision.AccessStatusBroken)
}
if got := defaultPositiveInt(3, 9); got != 3 {
t.Fatalf("defaultPositiveInt(3, 9) = %d, want 3", got)
}
if got := defaultPositiveInt(0, 9); got != 9 {
t.Fatalf("defaultPositiveInt(0, 9) = %d, want 9", got)
}
}
func TestMatchesItemFilters(t *testing.T) {
t.Parallel()
view := batch.ItemSummaryProjection{
ItemID: "item-1",
BaseURL: "https://kimi.example.com/v1",
ProviderID: "kimi-a7m",
CurrentStage: "done",
ConfirmationStatus: "advisory",
AccessStatus: "active",
MatchedAccountState: "active",
AccountResolution: "reused",
AdvisoryMessages: []string{"warning"},
}
hasWarning := true
if !matchesItemFilters(view, ListBatchImportRunItemsRequest{
CurrentStage: "done",
ConfirmationStatus: "advisory",
AccessStatus: "active",
ProviderID: "kimi-a7m",
MatchedAccountState: "active",
AccountResolution: "reused",
Query: "kimi",
HasWarning: &hasWarning,
}) {
t.Fatal("matchesItemFilters() = false, want match")
}
noWarning := false
if matchesItemFilters(view, ListBatchImportRunItemsRequest{HasWarning: &noWarning}) {
t.Fatal("matchesItemFilters(has_warning=false) = true, want false")
}
if matchesItemFilters(view, ListBatchImportRunItemsRequest{Query: "other"}) {
t.Fatal("matchesItemFilters(query=other) = true, want false")
}
}
func TestBuildGetBatchImportRunActionAndGetItemAction(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "self_service",
State: "running",
})
mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:00' WHERE run_id = 'run-1'`)
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-1",
RunID: "run-1",
BaseURL: "https://kimi.example.com/v1",
ProviderID: "kimi-a7m",
APIKeyFingerprint: "sha256:1",
CurrentStage: "done",
ConfirmationStatus: "confirmed",
AccessStatus: "active",
MatchedAccountState: "active",
AccountResolution: "reused",
AdvisoryMessagesJSON: `["gateway_warmup_retry_succeeded"]`,
})
action := buildGetBatchImportRunAction(appTestDSN(t, store))
run, err := action(context.Background(), "run-1")
if err != nil {
t.Fatalf("buildGetBatchImportRunAction() error = %v", err)
}
if run.RunID != "run-1" {
t.Fatalf("run.RunID = %q, want run-1", run.RunID)
}
if _, err := action(context.Background(), "missing"); err == nil || err.Error() != "run not found: missing" {
t.Fatalf("missing run error = %v, want run not found", err)
}
itemAction := buildGetBatchImportRunItemAction(appTestDSN(t, store))
item, err := itemAction(context.Background(), GetBatchImportRunItemRequest{RunID: "run-1", ItemID: "item-1"})
if err != nil {
t.Fatalf("buildGetBatchImportRunItemAction() error = %v", err)
}
if item.ItemID != "item-1" || item.AccountResolution != "reused" {
t.Fatalf("item = %+v, want projected item", item)
}
if _, err := itemAction(context.Background(), GetBatchImportRunItemRequest{RunID: "run-2", ItemID: "item-1"}); err == nil || err.Error() != "item not found in run run-2" {
t.Fatalf("wrong run error = %v, want item not found in run", err)
}
}
func TestResolveProvidersForQueryAndLatestAccessStatus(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
host1, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-1",
BaseURL: "https://one.example.com",
HostVersion: "0.1.126",
AuthToken: "token-1",
})
if err != nil {
t.Fatalf("Hosts().Create(host1) error = %v", err)
}
host2, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-2",
BaseURL: "https://two.example.com",
HostVersion: "0.1.126",
AuthToken: "token-2",
})
if err != nil {
t.Fatalf("Hosts().Create(host2) error = %v", err)
}
packID, err := store.Packs().Create(context.Background(), sqlite.Pack{
PackID: "openai-cn-pack",
Version: "1.0.0",
Checksum: "checksum-1",
})
if err != nil {
t.Fatalf("Packs().Create() error = %v", err)
}
providerID, err := store.Providers().Create(context.Background(), sqlite.Provider{
PackID: packID,
ProviderID: "deepseek",
DisplayName: "DeepSeek",
BaseURL: "https://api.example.com",
Platform: "openai",
})
if err != nil {
t.Fatalf("Providers().Create() error = %v", err)
}
batch1, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: host1,
PackID: packID,
ProviderID: providerID,
Mode: provision.ImportModePartial,
BatchStatus: provision.BatchStatusSucceeded,
AccessStatus: provision.AccessStatusSelfServiceReady,
})
if err != nil {
t.Fatalf("ImportBatches().Create(batch1) error = %v", err)
}
batch2, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: host2,
PackID: packID,
ProviderID: providerID,
Mode: provision.ImportModeStrict,
BatchStatus: provision.BatchStatusSucceeded,
AccessStatus: provision.AccessStatusSubscriptionReady,
})
if err != nil {
t.Fatalf("ImportBatches().Create(batch2) error = %v", err)
}
if _, err := store.AccessClosures().Create(context.Background(), sqlite.AccessClosureRecord{
BatchID: batch1,
ClosureType: provision.AccessModeSelfService,
Status: provision.AccessStatusSelfServiceReady,
}); err != nil {
t.Fatalf("AccessClosures().Create(batch1) error = %v", err)
}
if _, err := store.AccessClosures().Create(context.Background(), sqlite.AccessClosureRecord{
BatchID: batch2,
ClosureType: provision.AccessModeSubscription,
Status: provision.AccessStatusSubscriptionReady,
}); err != nil {
t.Fatalf("AccessClosures().Create(batch2) error = %v", err)
}
if _, err := resolveProvidersForQuery(context.Background(), nil, ProviderQueryRequest{}); err == nil || err.Error() != "store is required" {
t.Fatalf("resolveProvidersForQuery(nil store) error = %v, want store is required", err)
}
if _, err := resolveProvidersForQuery(context.Background(), store, ProviderQueryRequest{}); err == nil || err.Error() != "provider_id is required" {
t.Fatalf("resolveProvidersForQuery(missing provider) error = %v, want provider_id is required", err)
}
providers, err := resolveProvidersForQuery(context.Background(), store, ProviderQueryRequest{ProviderID: "deepseek", PackID: "openai-cn-pack"})
if err != nil {
t.Fatalf("resolveProvidersForQuery() error = %v", err)
}
if len(providers) != 1 || providers[0].ProviderID != "deepseek" {
t.Fatalf("resolveProvidersForQuery() = %+v, want deepseek provider", providers)
}
if _, err := resolveLatestAccessStatus(context.Background(), nil, sqlite.Provider{}, ""); err == nil || err.Error() != "store is required" {
t.Fatalf("resolveLatestAccessStatus(nil store) error = %v, want store is required", err)
}
status, err := resolveLatestAccessStatus(context.Background(), store, providers[0], "host-1")
if err != nil {
t.Fatalf("resolveLatestAccessStatus(host-1) error = %v", err)
}
if status != provision.AccessStatusSelfServiceReady {
t.Fatalf("resolveLatestAccessStatus(host-1) = %q, want %q", status, provision.AccessStatusSelfServiceReady)
}
if _, err := resolveLatestAccessStatus(context.Background(), store, providers[0], ""); err == nil || err.Error() != "provider exists on multiple hosts; host_id is required" {
t.Fatalf("resolveLatestAccessStatus(multi-host) error = %v, want multi-host error", err)
}
}
func TestBuildListBatchImportRunsActionAndItemsAction(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "self_service",
State: "running",
TotalItems: 1,
WarningItems: 1,
ActiveItems: 1,
BrokenItems: 0,
DegradedItems: 0,
})
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-2",
HostID: "host-1",
Mode: "strict",
AccessMode: "subscription",
State: "completed",
})
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-3",
HostID: "host-1",
Mode: "strict",
AccessMode: "subscription",
State: "completed",
})
mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:03' WHERE run_id = 'run-1'`)
mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:02' WHERE run_id = 'run-2'`)
mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:01' WHERE run_id = 'run-3'`)
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-1",
RunID: "run-1",
BaseURL: "https://kimi.example.com/v1",
ProviderID: "kimi-a7m",
APIKeyFingerprint: "sha256:1",
CurrentStage: "done",
ConfirmationStatus: "advisory",
AccessStatus: "active",
MatchedAccountState: "active",
AccountResolution: "reused",
AdvisoryMessagesJSON: `["warning"]`,
})
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-2",
RunID: "run-2",
BaseURL: "https://other.example.com/v1",
ProviderID: "deepseek",
APIKeyFingerprint: "sha256:2",
CurrentStage: "confirm",
ConfirmationStatus: "pending",
AccessStatus: "broken",
MatchedAccountState: "broken",
AccountResolution: "created",
AdvisoryMessagesJSON: `[]`,
})
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-3",
RunID: "run-3",
BaseURL: "https://other.example.com/v1",
ProviderID: "deepseek-backup",
APIKeyFingerprint: "sha256:3",
CurrentStage: "confirm",
ConfirmationStatus: "pending",
AccessStatus: "active",
MatchedAccountState: "active",
AccountResolution: "reused",
AdvisoryMessagesJSON: `[]`,
})
listRuns := buildListBatchImportRunsAction(appTestDSN(t, store))
runs, err := listRuns(context.Background(), ListBatchImportRunsRequest{
State: "completed",
Query: "other.example.com",
Limit: 1,
})
if err != nil {
t.Fatalf("buildListBatchImportRunsAction() error = %v", err)
}
if len(runs.Runs) != 1 || runs.Runs[0].RunID != "run-2" {
t.Fatalf("runs = %+v, want [run-2]", runs.Runs)
}
if runs.NextCursor == nil || *runs.NextCursor != "run-3" {
t.Fatalf("runs.NextCursor = %v, want run-3", runs.NextCursor)
}
listItems := buildListBatchImportRunItemsAction(appTestDSN(t, store))
hasWarning := true
items, err := listItems(context.Background(), ListBatchImportRunItemsRequest{
RunID: "run-1",
HasWarning: &hasWarning,
Query: "kimi",
Limit: 10,
})
if err != nil {
t.Fatalf("buildListBatchImportRunItemsAction() error = %v", err)
}
if len(items.Items) != 1 || items.Items[0].ItemID != "item-1" {
t.Fatalf("items = %+v, want [item-1]", items.Items)
}
if items.NextCursor != nil {
t.Fatalf("items.NextCursor = %v, want nil", items.NextCursor)
}
if _, err := listItems(context.Background(), ListBatchImportRunItemsRequest{RunID: "missing"}); err == nil || err.Error() != "run not found: missing" {
t.Fatalf("missing items run error = %v, want run not found", err)
}
}
func appTestDSN(t *testing.T, store *sqlite.DB) string {
t.Helper()
row := store.SQLDB().QueryRow(`PRAGMA database_list`)
var seq int
var name string
var file string
if err := row.Scan(&seq, &name, &file); err != nil {
t.Fatalf("PRAGMA database_list scan error = %v", err)
}
return "file:" + file + "?_busy_timeout=5000&_pragma=foreign_keys(0)"
}
func mustCreateAppImportRun(t *testing.T, store *sqlite.DB, run sqlite.ImportRun) {
t.Helper()
if err := store.ImportRuns().Create(context.Background(), run); err != nil {
t.Fatalf("ImportRuns().Create() error = %v", err)
}
}
func mustCreateAppImportRunItem(t *testing.T, store *sqlite.DB, item sqlite.ImportRunItem) {
t.Helper()
if err := store.ImportRunItems().Create(context.Background(), item); err != nil {
t.Fatalf("ImportRunItems().Create() error = %v", err)
}
}
func mustExecSQL(t *testing.T, store *sqlite.DB, query string, args ...any) {
t.Helper()
if _, err := store.SQLDB().Exec(query, args...); err != nil {
t.Fatalf("Exec(%q) error = %v", query, err)
}
}
func createAppHostRecord(t *testing.T, store *sqlite.DB, baseURL string) int64 {
t.Helper()
id, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-" + sanitizeAppTestName(t.Name()),
BaseURL: baseURL,
HostVersion: "0.1.126",
AuthType: "apikey",
AuthToken: "host-token",
})
if err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
return id
}
func createAppPackRecord(t *testing.T, store *sqlite.DB) int64 {
t.Helper()
id, err := store.Packs().Create(context.Background(), sqlite.Pack{
PackID: "pack-" + sanitizeAppTestName(t.Name()),
Version: "1.0.0",
Checksum: "checksum-" + sanitizeAppTestName(t.Name()),
})
if err != nil {
t.Fatalf("Packs().Create() error = %v", err)
}
return id
}
func createAppProviderRecord(t *testing.T, store *sqlite.DB, packID int64) int64 {
t.Helper()
id, err := store.Providers().Create(context.Background(), sqlite.Provider{
PackID: packID,
ProviderID: "provider-" + sanitizeAppTestName(t.Name()),
DisplayName: "Provider",
BaseURL: "https://provider.example.com",
Platform: "openai",
})
if err != nil {
t.Fatalf("Providers().Create() error = %v", err)
}
return id
}
func createAppBatchRecord(t *testing.T, store *sqlite.DB, hostID, packID, providerID int64) int64 {
t.Helper()
id, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: hostID,
PackID: packID,
ProviderID: providerID,
Mode: provision.ImportModePartial,
BatchStatus: provision.BatchStatusSucceeded,
AccessStatus: provision.AccessStatusSelfServiceReady,
})
if err != nil {
t.Fatalf("ImportBatches().Create() error = %v", err)
}
return id
}
func sanitizeAppTestName(name string) string {
var b strings.Builder
for _, c := range name {
switch {
case c >= 'a' && c <= 'z', c >= '0' && c <= '9':
b.WriteRune(c)
case c >= 'A' && c <= 'Z':
b.WriteRune(c + ('a' - 'A'))
}
}
if b.Len() == 0 {
return "default"
}
return b.String()
}
func TestResolveManagedHostAndNewSub2APIClient(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
if _, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-1",
BaseURL: "https://sub2api.example.com",
HostVersion: "0.1.126",
AuthType: "",
AuthToken: "host-token",
}); err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
hostRow, client, err := resolveManagedHost(context.Background(), store, "host-1", "", CreateHostAuth{})
if err != nil {
t.Fatalf("resolveManagedHost() error = %v", err)
}
if hostRow.HostID != "host-1" || client == nil {
t.Fatalf("resolveManagedHost() = (%+v, %v), want host-1 and client", hostRow, client)
}
if _, _, err := resolveManagedHost(context.Background(), store, "host-1", "https://other.example.com", CreateHostAuth{}); err == nil || !strings.Contains(err.Error(), `host "host-1" base_url mismatch`) {
t.Fatalf("resolveManagedHost(mismatch) error = %v, want mismatch", err)
}
if _, _, err := resolveManagedHost(context.Background(), store, "", "", CreateHostAuth{}); err == nil || err.Error() != "host_id is required" {
t.Fatalf("resolveManagedHost(empty) error = %v, want host_id is required", err)
}
if auth := authFromStoredHost(sqlite.Host{AuthType: "", AuthToken: " token "}); auth.Type != "apikey" || auth.Token != "token" {
t.Fatalf("authFromStoredHost(default) = %+v, want apikey/token", auth)
}
if _, err := newSub2APIClient("https://sub2api.example.com", CreateHostAuth{Type: "other", Token: "t"}); err == nil || !strings.Contains(err.Error(), `unsupported auth type "other"`) {
t.Fatalf("newSub2APIClient(unsupported) error = %v, want unsupported auth type", err)
}
if _, err := newSub2APIClient("https://sub2api.example.com", CreateHostAuth{Type: "apikey"}); err == nil || !strings.Contains(err.Error(), "auth.token is required") {
t.Fatalf("newSub2APIClient(missing token) error = %v, want auth.token is required", err)
}
}
func TestHandleAssignAccessSubscriptionsAndAccessPreview(t *testing.T) {
t.Parallel()
t.Run("handleAssignAccessSubscriptions returns success", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/providers/deepseek/access/subscriptions", map[string]any{
"host_id": "host-1",
"access_api_key": "user-key",
"subscription_users": []string{"user-1"},
}, "")
req.SetPathValue("providerID", "deepseek")
rec := &responseRecorder{header: map[string][]string{}}
handleAssignAccessSubscriptions(rec, req, func(_ context.Context, got AssignAccessSubscriptionsRequest) (AssignAccessSubscriptionsResult, error) {
if got.ProviderID != "deepseek" || got.HostID != "host-1" || got.AccessAPIKey != "user-key" || len(got.SubscriptionUsers) != 1 || got.SubscriptionUsers[0] != "user-1" {
t.Fatalf("AssignAccessSubscriptionsRequest = %+v, want projected request", got)
}
return AssignAccessSubscriptionsResult{Assigned: 1}, nil
})
assertStatusCode(t, rec, 200)
assertJSONContains(t, rec.Body().Bytes(), "assigned", float64(1))
})
t.Run("handleAccessPreview falls back to query values", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/providers/deepseek/access/preview?pack_id=openai-cn-pack&host_id=host-1", map[string]any{
"mode": "self_service",
}, "")
req.SetPathValue("providerID", "deepseek")
rec := &responseRecorder{header: map[string][]string{}}
handleAccessPreview(rec, req, func(_ context.Context, got AccessPreviewRequest) (AccessPreviewResult, error) {
if got.ProviderID != "deepseek" || got.PackID != "openai-cn-pack" || got.HostID != "host-1" {
t.Fatalf("AccessPreviewRequest = %+v, want query fallback values", got)
}
return AccessPreviewResult{Available: true, Mode: got.Mode}, nil
})
assertStatusCode(t, rec, 200)
assertJSONContains(t, rec.Body().Bytes(), "available", true)
assertJSONContains(t, rec.Body().Bytes(), "mode", "self_service")
})
}
func TestAdditionalHTTPWrappers(t *testing.T) {
t.Parallel()
t.Run("handleListProviderImportBatches returns query-scoped payload", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "GET", "/providers/deepseek/import-batches?pack_id=openai-cn-pack&host_id=host-1", map[string]any{}, "")
req.SetPathValue("providerID", "deepseek")
rec := &responseRecorder{header: map[string][]string{}}
handleListProviderImportBatches(rec, req, func(_ context.Context, got ProviderQueryRequest) ([]ImportBatchInfo, error) {
if got.ProviderID != "deepseek" || got.PackID != "openai-cn-pack" || got.HostID != "host-1" {
t.Fatalf("ProviderQueryRequest = %+v, want provider/pack/host filters", got)
}
return nil, nil
})
assertStatusCode(t, rec, 200)
batches, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "batches")
if !ok || len(batches) != 0 {
t.Fatalf("batches = %#v, want empty array", batches)
}
})
t.Run("handleRollbackBatch validates batch id", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/import-batches/bad/rollback", map[string]any{}, "")
req.SetPathValue("batchID", "bad")
rec := &responseRecorder{header: map[string][]string{}}
handleRollbackBatch(rec, req, func(context.Context, RollbackBatchRequest) (provision.RollbackReport, error) {
t.Fatal("rollback fn should not be called for invalid batch id")
return provision.RollbackReport{}, nil
})
assertStatusCode(t, rec, 400)
assertJSONContains(t, rec.Body().Bytes(), "error.message", "batch_id must be a positive integer")
})
t.Run("handleRollbackBatch returns summary", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/import-batches/17/rollback", map[string]any{}, "")
req.SetPathValue("batchID", "17")
rec := &responseRecorder{header: map[string][]string{}}
handleRollbackBatch(rec, req, func(_ context.Context, got RollbackBatchRequest) (provision.RollbackReport, error) {
if got.BatchID != 17 {
t.Fatalf("RollbackBatchRequest.BatchID = %d, want 17", got.BatchID)
}
return provision.RollbackReport{AccountsDeleted: 2, PlansDeleted: 1, ChannelsDeleted: 1, GroupsDeleted: 1}, nil
})
assertStatusCode(t, rec, 200)
assertJSONContains(t, rec.Body().Bytes(), "batch_id", float64(17))
assertJSONContains(t, rec.Body().Bytes(), "deleted_accounts", float64(2))
})
t.Run("handleCreateHost returns payload", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/hosts", map[string]any{
"name": "host-1",
"base_url": "https://sub2api.example.com",
"auth": map[string]any{"type": "apikey", "token": "host-token"},
}, "")
rec := &responseRecorder{header: map[string][]string{}}
handleCreateHost(rec, req, func(_ context.Context, got CreateHostRequest) (HostInfo, error) {
if got.Name != "host-1" || got.BaseURL != "https://sub2api.example.com" || got.Auth.Token != "host-token" {
t.Fatalf("CreateHostRequest = %+v, want decoded request", got)
}
return HostInfo{HostID: got.Name, BaseURL: got.BaseURL, HostVersion: "0.1.126", AuthType: "apikey"}, nil
})
assertStatusCode(t, rec, 200)
assertJSONContains(t, rec.Body().Bytes(), "host_id", "host-1")
assertJSONContains(t, rec.Body().Bytes(), "base_url", "https://sub2api.example.com")
})
}
func TestActionSetPackClosures(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
packID := createAppPackRecord(t, store)
if _, err := store.Providers().Create(context.Background(), sqlite.Provider{
PackID: packID,
ProviderID: "provider-a",
DisplayName: "Provider A",
BaseURL: "https://provider-a.example.com",
Platform: "openai",
}); err != nil {
t.Fatalf("Providers().Create() error = %v", err)
}
actions := NewActionSet(appTestDSN(t, store))
packs, err := actions.ListPacks(context.Background())
if err != nil {
t.Fatalf("ListPacks() error = %v", err)
}
if len(packs) != 1 || packs[0].PackID == "" {
t.Fatalf("ListPacks() = %+v, want single persisted pack", packs)
}
packInfo, err := actions.GetPack(context.Background(), packs[0].PackID)
if err != nil {
t.Fatalf("GetPack() error = %v", err)
}
if packInfo.PackID != packs[0].PackID {
t.Fatalf("GetPack() = %+v, want pack_id %q", packInfo, packs[0].PackID)
}
providers, err := actions.ListPackProviders(context.Background(), packs[0].PackID)
if err != nil {
t.Fatalf("ListPackProviders() error = %v", err)
}
if len(providers) != 1 || providers[0].ProviderID != "provider-a" {
t.Fatalf("ListPackProviders() = %+v, want provider-a", providers)
}
}
func TestActionSetCreateHostClosure(t *testing.T) {
t.Parallel()
server := httptest.NewServer(newBatchImportActionStubServer(t))
defer server.Close()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
actions := NewActionSet(appTestDSN(t, store))
host, err := actions.CreateHost(context.Background(), CreateHostRequest{
Name: "prod-sub2api",
BaseURL: server.URL,
Auth: CreateHostAuth{Type: "apikey", Token: "host-token"},
})
if err != nil {
t.Fatalf("CreateHost() error = %v", err)
}
if host.HostID != "prod-sub2api" || host.HostVersion != "0.1.126" || host.Status != "supported" {
t.Fatalf("CreateHost() = %+v, want stored supported host", host)
}
stored, err := store.Hosts().GetByHostID(context.Background(), "prod-sub2api")
if err != nil {
t.Fatalf("Hosts().GetByHostID() error = %v", err)
}
if stored.BaseURL != server.URL || stored.AuthToken != "host-token" {
t.Fatalf("stored host = %+v, want persisted connection details", stored)
}
}
func TestActionSetListProviderImportBatchesClosure(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostA, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-a",
BaseURL: "https://host-a.example.com",
HostVersion: "0.1.126",
AuthToken: "token-a",
})
if err != nil {
t.Fatalf("Hosts().Create(host-a) error = %v", err)
}
hostB, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-b",
BaseURL: "https://host-b.example.com",
HostVersion: "0.1.126",
AuthToken: "token-b",
})
if err != nil {
t.Fatalf("Hosts().Create(host-b) error = %v", err)
}
packID := createAppPackRecord(t, store)
providerID, err := store.Providers().Create(context.Background(), sqlite.Provider{
PackID: packID,
ProviderID: "shared-provider",
DisplayName: "Shared Provider",
BaseURL: "https://provider.example.com",
Platform: "openai",
})
if err != nil {
t.Fatalf("Providers().Create() error = %v", err)
}
if _, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: hostA,
PackID: packID,
ProviderID: providerID,
Mode: provision.ImportModePartial,
BatchStatus: provision.BatchStatusSucceeded,
AccessStatus: provision.AccessStatusSelfServiceReady,
}); err != nil {
t.Fatalf("ImportBatches().Create(host-a) error = %v", err)
}
if _, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: hostB,
PackID: packID,
ProviderID: providerID,
Mode: provision.ImportModeStrict,
BatchStatus: provision.BatchStatusPartial,
AccessStatus: provision.AccessStatusBroken,
}); err != nil {
t.Fatalf("ImportBatches().Create(host-b) error = %v", err)
}
actions := NewActionSet(appTestDSN(t, store))
if _, err := actions.ListProviderImportBatches(context.Background(), ProviderQueryRequest{ProviderID: "shared-provider"}); err == nil || err.Error() != "provider exists on multiple hosts; host_id is required" {
t.Fatalf("ListProviderImportBatches(multi-host) error = %v, want host_id required", err)
}
batches, err := actions.ListProviderImportBatches(context.Background(), ProviderQueryRequest{
ProviderID: "shared-provider",
HostID: "host-a",
})
if err != nil {
t.Fatalf("ListProviderImportBatches(host filter) error = %v", err)
}
if len(batches) != 1 || batches[0].BatchStatus != provision.BatchStatusSucceeded {
t.Fatalf("ListProviderImportBatches(host filter) = %+v, want single succeeded batch", batches)
}
}
func TestActionSetHostClosuresAndAccessPreview(t *testing.T) {
t.Parallel()
server := httptest.NewServer(newBatchImportActionStubServer(t))
defer server.Close()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostID, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-main",
BaseURL: server.URL,
HostVersion: "0.0.1",
CapabilityProbeJSON: "{}",
AuthType: "apikey",
AuthToken: "host-token",
})
if err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
packID := createAppPackRecord(t, store)
providerID, err := store.Providers().Create(context.Background(), sqlite.Provider{
PackID: packID,
ProviderID: "preview-provider",
DisplayName: "Preview Provider",
BaseURL: "https://provider.example.com",
Platform: "openai",
})
if err != nil {
t.Fatalf("Providers().Create() error = %v", err)
}
batchID, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: hostID,
PackID: packID,
ProviderID: providerID,
Mode: provision.ImportModePartial,
BatchStatus: provision.BatchStatusSucceeded,
AccessStatus: provision.AccessStatusSelfServiceReady,
})
if err != nil {
t.Fatalf("ImportBatches().Create() error = %v", err)
}
if _, err := store.AccessClosures().Create(context.Background(), sqlite.AccessClosureRecord{
BatchID: batchID,
ClosureType: provision.AccessModeSelfService,
Status: provision.AccessStatusSelfServiceReady,
DetailsJSON: `{}`,
}); err != nil {
t.Fatalf("AccessClosures().Create() error = %v", err)
}
actions := NewActionSet(appTestDSN(t, store))
hosts, err := actions.ListHosts(context.Background())
if err != nil {
t.Fatalf("ListHosts() error = %v", err)
}
if len(hosts) != 1 || hosts[0].HostID != "host-main" {
t.Fatalf("ListHosts() = %+v, want host-main", hosts)
}
hostInfo, err := actions.GetHost(context.Background(), "host-main")
if err != nil {
t.Fatalf("GetHost() error = %v", err)
}
if hostInfo.HostID != "host-main" || hostInfo.BaseURL != server.URL {
t.Fatalf("GetHost() = %+v, want stored host", hostInfo)
}
probed, err := actions.ProbeHost(context.Background(), ProbeHostRequest{HostID: "host-main"})
if err != nil {
t.Fatalf("ProbeHost() error = %v", err)
}
if probed.Status != "supported" || probed.HostVersion != "0.1.126" {
t.Fatalf("ProbeHost() = %+v, want supported host 0.1.126", probed)
}
packRow, err := store.Packs().GetByID(context.Background(), packID)
if err != nil {
t.Fatalf("Packs().GetByID() error = %v", err)
}
preview, err := actions.AccessPreview(context.Background(), AccessPreviewRequest{
ProviderID: "preview-provider",
PackID: packRow.PackID,
HostID: "host-main",
Mode: provision.AccessModeSelfService,
})
if err != nil {
t.Fatalf("AccessPreview(self_service) error = %v", err)
}
if !preview.Available {
t.Fatalf("AccessPreview(self_service) = %+v, want available=true", preview)
}
preview, err = actions.AccessPreview(context.Background(), AccessPreviewRequest{
ProviderID: "preview-provider",
PackID: packRow.PackID,
HostID: "host-main",
Mode: provision.AccessModeSubscription,
})
if err != nil {
t.Fatalf("AccessPreview(subscription) error = %v", err)
}
if preview.Available {
t.Fatalf("AccessPreview(subscription) = %+v, want available=false", preview)
}
if err := actions.DeleteHost(context.Background(), "host-main"); err == nil {
t.Fatal("DeleteHost() error = nil, want host_in_use conflict")
} else {
httpErr, ok := err.(*httpError)
if !ok || httpErr.StatusCode != http.StatusConflict || httpErr.Code != "host_in_use" {
t.Fatalf("DeleteHost() error = %T %v, want *httpError host_in_use conflict", err, err)
}
}
}
func TestAdditionalHostHTTPWrappers(t *testing.T) {
t.Parallel()
t.Run("handleListHosts returns empty array", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "GET", "/hosts", map[string]any{}, "")
rec := &responseRecorder{header: map[string][]string{}}
handleListHosts(rec, req, func(context.Context) ([]HostInfo, error) { return nil, nil })
assertStatusCode(t, rec, 200)
hosts, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "hosts")
if !ok || len(hosts) != 0 {
t.Fatalf("hosts = %#v, want empty array", hosts)
}
})
t.Run("handleGetHost requires host id", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "GET", "/hosts/", map[string]any{}, "")
rec := &responseRecorder{header: map[string][]string{}}
handleGetHost(rec, req, func(context.Context, string) (HostInfo, error) { return HostInfo{}, nil })
assertStatusCode(t, rec, 400)
assertJSONContains(t, rec.Body().Bytes(), "error.message", "host_id is required")
})
t.Run("handleProbeHost returns payload", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/hosts/host-1/probe", map[string]any{
"auth": map[string]any{"type": "apikey", "token": "token"},
}, "")
req.SetPathValue("hostID", "host-1")
rec := &responseRecorder{header: map[string][]string{}}
handleProbeHost(rec, req, func(_ context.Context, got ProbeHostRequest) (HostInfo, error) {
if got.HostID != "host-1" || got.Auth.Token != "token" {
t.Fatalf("ProbeHostRequest = %+v, want host-1/token", got)
}
return HostInfo{HostID: "host-1", HostVersion: "0.1.126", Status: "supported"}, nil
})
assertStatusCode(t, rec, 200)
assertJSONContains(t, rec.Body().Bytes(), "host_id", "host-1")
assertJSONContains(t, rec.Body().Bytes(), "status", "supported")
})
}
func TestHandlerWrappersForPackAndHostRoutes(t *testing.T) {
t.Parallel()
t.Run("handleListPacks returns empty array", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "GET", "/packs", map[string]any{}, "")
rec := &responseRecorder{header: map[string][]string{}}
handleListPacks(rec, req, func(context.Context) ([]PackInfo, error) { return nil, nil })
assertStatusCode(t, rec, 200)
packs, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "packs")
if !ok || len(packs) != 0 {
t.Fatalf("packs = %#v, want empty array", packs)
}
})
t.Run("handleGetPack requires pack id", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "GET", "/packs/", map[string]any{}, "")
rec := &responseRecorder{header: map[string][]string{}}
handleGetPack(rec, req, func(context.Context, string) (PackInfo, error) { return PackInfo{}, nil })
assertStatusCode(t, rec, 400)
assertJSONContains(t, rec.Body().Bytes(), "error.message", "pack_id is required")
})
t.Run("handleGetPack returns payload", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "GET", "/packs/openai-cn-pack", map[string]any{}, "")
req.SetPathValue("packID", "openai-cn-pack")
rec := &responseRecorder{header: map[string][]string{}}
handleGetPack(rec, req, func(_ context.Context, packID string) (PackInfo, error) {
if packID != "openai-cn-pack" {
t.Fatalf("packID = %q, want openai-cn-pack", packID)
}
return PackInfo{PackID: "openai-cn-pack", Version: "1.0.0"}, nil
})
assertStatusCode(t, rec, 200)
assertJSONContains(t, rec.Body().Bytes(), "pack_id", "openai-cn-pack")
})
t.Run("handleListPackProviders returns array", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "GET", "/packs/openai-cn-pack/providers", map[string]any{}, "")
req.SetPathValue("packID", "openai-cn-pack")
rec := &responseRecorder{header: map[string][]string{}}
handleListPackProviders(rec, req, func(_ context.Context, packID string) ([]PackProviderInfo, error) {
if packID != "openai-cn-pack" {
t.Fatalf("packID = %q, want openai-cn-pack", packID)
}
return nil, nil
})
assertStatusCode(t, rec, 200)
providers, ok := decodeTopLevelArray(t, rec.Body().Bytes(), "providers")
if !ok || len(providers) != 0 {
t.Fatalf("providers = %#v, want empty array", providers)
}
})
t.Run("handleCreateHost decodes request", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/hosts", map[string]any{
"name": "host-1",
"base_url": "https://sub2api.example.com",
"auth": map[string]any{"type": "apikey", "token": "host-token"},
}, "")
rec := &responseRecorder{header: map[string][]string{}}
handleCreateHost(rec, req, func(_ context.Context, req CreateHostRequest) (HostInfo, error) {
if req.BaseURL != "https://sub2api.example.com" || req.Auth.Token != "host-token" {
t.Fatalf("request = %+v, want decoded create host request", req)
}
return HostInfo{HostID: "host-1", BaseURL: req.BaseURL}, nil
})
assertStatusCode(t, rec, 200)
assertJSONContains(t, rec.Body().Bytes(), "host_id", "host-1")
})
t.Run("handleProbeHost requires host id", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/hosts/probe", map[string]any{
"auth": map[string]any{"type": "apikey", "token": "host-token"},
}, "")
rec := &responseRecorder{header: map[string][]string{}}
handleProbeHost(rec, req, func(context.Context, ProbeHostRequest) (HostInfo, error) { return HostInfo{}, nil })
assertStatusCode(t, rec, 400)
assertJSONContains(t, rec.Body().Bytes(), "error.message", "host_id is required")
})
}
func TestAdditionalHTTPWrapperErrorBranches(t *testing.T) {
t.Parallel()
t.Run("list and pack wrappers handle missing actions", func(t *testing.T) {
t.Parallel()
cases := []struct {
name string
path string
run func(*responseRecorder, *http.Request)
wantStatus int
wantCode string
}{
{
name: "list packs",
path: "/packs",
run: func(rec *responseRecorder, req *http.Request) {
handleListPacks(rec, req, nil)
},
wantStatus: http.StatusInternalServerError,
wantCode: "server_misconfigured",
},
{
name: "list provider import batches",
path: "/providers/deepseek/import-batches",
run: func(rec *responseRecorder, req *http.Request) {
req.SetPathValue("providerID", "deepseek")
handleListProviderImportBatches(rec, req, nil)
},
wantStatus: http.StatusInternalServerError,
wantCode: "server_misconfigured",
},
{
name: "get pack",
path: "/packs/openai-cn-pack",
run: func(rec *responseRecorder, req *http.Request) {
req.SetPathValue("packID", "openai-cn-pack")
handleGetPack(rec, req, nil)
},
wantStatus: http.StatusInternalServerError,
wantCode: "server_misconfigured",
},
{
name: "list pack providers",
path: "/packs/openai-cn-pack/providers",
run: func(rec *responseRecorder, req *http.Request) {
req.SetPathValue("packID", "openai-cn-pack")
handleListPackProviders(rec, req, nil)
},
wantStatus: http.StatusInternalServerError,
wantCode: "server_misconfigured",
},
{
name: "rollback batch",
path: "/import-batches/11/rollback",
run: func(rec *responseRecorder, req *http.Request) {
req.SetPathValue("batchID", "11")
handleRollbackBatch(rec, req, nil)
},
wantStatus: http.StatusInternalServerError,
wantCode: "server_misconfigured",
},
{
name: "probe host",
path: "/hosts/host-1/probe",
run: func(rec *responseRecorder, req *http.Request) {
req.SetPathValue("hostID", "host-1")
handleProbeHost(rec, req, nil)
},
wantStatus: http.StatusInternalServerError,
wantCode: "server_misconfigured",
},
{
name: "create host",
path: "/hosts",
run: func(rec *responseRecorder, req *http.Request) {
handleCreateHost(rec, req, nil)
},
wantStatus: http.StatusInternalServerError,
wantCode: "server_misconfigured",
},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
req := httptestRequest(t, "POST", tc.path, map[string]any{}, "")
rec := &responseRecorder{header: map[string][]string{}}
tc.run(rec, req)
assertStatusCode(t, rec, tc.wantStatus)
assertJSONContains(t, rec.Body().Bytes(), "error.code", tc.wantCode)
})
}
})
t.Run("access wrappers handle missing actions and decode errors", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "POST", "/providers/deepseek/access/subscriptions", map[string]any{}, "")
req.SetPathValue("providerID", "deepseek")
rec := &responseRecorder{header: map[string][]string{}}
handleAssignAccessSubscriptions(rec, req, nil)
assertStatusCode(t, rec, http.StatusInternalServerError)
assertJSONContains(t, rec.Body().Bytes(), "error.code", "server_misconfigured")
req = httptestRequest(t, "POST", "/providers/deepseek/access/preview", map[string]any{}, "")
req.SetPathValue("providerID", "deepseek")
rec = &responseRecorder{header: map[string][]string{}}
handleAccessPreview(rec, req, nil)
assertStatusCode(t, rec, http.StatusInternalServerError)
assertJSONContains(t, rec.Body().Bytes(), "error.code", "server_misconfigured")
badReq, err := http.NewRequest(http.MethodPost, "/providers/deepseek/access/subscriptions", strings.NewReader("{"))
if err != nil {
t.Fatalf("http.NewRequest(assign bad json) error = %v", err)
}
badReq.SetPathValue("providerID", "deepseek")
badReq.Header.Set("Content-Type", "application/json")
rec = &responseRecorder{header: map[string][]string{}}
handleAssignAccessSubscriptions(rec, badReq, func(context.Context, AssignAccessSubscriptionsRequest) (AssignAccessSubscriptionsResult, error) {
t.Fatal("assign action should not be called for bad json")
return AssignAccessSubscriptionsResult{}, nil
})
assertStatusCode(t, rec, http.StatusBadRequest)
assertJSONContains(t, rec.Body().Bytes(), "error.code", "bad_request")
badReq, err = http.NewRequest(http.MethodPost, "/providers/deepseek/access/preview", strings.NewReader("{"))
if err != nil {
t.Fatalf("http.NewRequest(preview bad json) error = %v", err)
}
badReq.SetPathValue("providerID", "deepseek")
badReq.Header.Set("Content-Type", "application/json")
rec = &responseRecorder{header: map[string][]string{}}
handleAccessPreview(rec, badReq, func(context.Context, AccessPreviewRequest) (AccessPreviewResult, error) {
t.Fatal("access preview action should not be called for bad json")
return AccessPreviewResult{}, nil
})
assertStatusCode(t, rec, http.StatusBadRequest)
assertJSONContains(t, rec.Body().Bytes(), "error.code", "bad_request")
})
t.Run("pack and host wrappers classify action errors", func(t *testing.T) {
t.Parallel()
req := httptestRequest(t, "GET", "/packs/openai-cn-pack", map[string]any{}, "")
req.SetPathValue("packID", "openai-cn-pack")
rec := &responseRecorder{header: map[string][]string{}}
handleGetPack(rec, req, func(context.Context, string) (PackInfo, error) {
return PackInfo{}, sql.ErrNoRows
})
assertStatusCode(t, rec, http.StatusInternalServerError)
assertJSONContains(t, rec.Body().Bytes(), "error.code", "internal_error")
req = httptestRequest(t, "POST", "/import-batches/11/rollback", map[string]any{}, "")
req.SetPathValue("batchID", "11")
rec = &responseRecorder{header: map[string][]string{}}
handleRollbackBatch(rec, req, func(context.Context, RollbackBatchRequest) (provision.RollbackReport, error) {
return provision.RollbackReport{}, fmt.Errorf("batch 11 not found")
})
assertStatusCode(t, rec, http.StatusNotFound)
assertJSONContains(t, rec.Body().Bytes(), "error.code", "not_found")
req = httptestRequest(t, "POST", "/hosts/host-1/probe", map[string]any{
"auth": map[string]any{"type": "apikey", "token": "host-token"},
}, "")
req.SetPathValue("hostID", "host-1")
rec = &responseRecorder{header: map[string][]string{}}
handleProbeHost(rec, req, func(context.Context, ProbeHostRequest) (HostInfo, error) {
return HostInfo{}, fmt.Errorf("host probe decode failed")
})
assertStatusCode(t, rec, http.StatusBadRequest)
assertJSONContains(t, rec.Body().Bytes(), "error.code", "bad_request")
req = httptestRequest(t, "POST", "/hosts", map[string]any{
"name": "host-1",
}, "")
rec = &responseRecorder{header: map[string][]string{}}
handleCreateHost(rec, req, func(context.Context, CreateHostRequest) (HostInfo, error) {
return HostInfo{}, fmt.Errorf("base_url is required")
})
assertStatusCode(t, rec, http.StatusBadRequest)
assertJSONContains(t, rec.Body().Bytes(), "error.code", "bad_request")
})
}
func TestActionSetBatchDetailAndProviderSnapshotClosures(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostPK := createAppHostRecord(t, store, "https://sub2api.example.com")
packPK := createAppPackRecord(t, store)
providerPK := createAppProviderRecord(t, store, packPK)
batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK)
if _, err := store.ImportBatchItems().Create(context.Background(), sqlite.ImportBatchItem{
BatchID: batchID,
KeyFingerprint: "sha256:test",
AccountStatus: "passed",
}); err != nil {
t.Fatalf("ImportBatchItems().Create() error = %v", err)
}
for _, resource := range []sqlite.ManagedResource{
{BatchID: batchID, HostID: hostPK, ResourceType: "group", HostResourceID: "group-1", ResourceName: "group-1"},
{BatchID: batchID, HostID: hostPK, ResourceType: "account", HostResourceID: "account-1", ResourceName: "account-1"},
} {
if _, err := store.ManagedResources().Create(context.Background(), resource); err != nil {
t.Fatalf("ManagedResources().Create(%s) error = %v", resource.ResourceType, err)
}
}
if _, err := store.AccessClosures().Create(context.Background(), sqlite.AccessClosureRecord{
BatchID: batchID,
ClosureType: provision.AccessModeSelfService,
Status: provision.AccessStatusSelfServiceReady,
DetailsJSON: `{"ok":true}`,
}); err != nil {
t.Fatalf("AccessClosures().Create() error = %v", err)
}
if _, err := store.ReconcileRuns().Create(context.Background(), sqlite.ReconcileRun{
BatchID: batchID,
HostID: hostPK,
ProviderID: providerPK,
Status: "drifted",
SummaryJSON: `{"missing_count":1}`,
}); err != nil {
t.Fatalf("ReconcileRuns().Create() error = %v", err)
}
packRow, err := store.Packs().GetByID(context.Background(), packPK)
if err != nil {
t.Fatalf("Packs().GetByID() error = %v", err)
}
providerRow, err := store.Providers().GetByID(context.Background(), providerPK)
if err != nil {
t.Fatalf("Providers().GetByID() error = %v", err)
}
hostRow, err := store.Hosts().GetByID(context.Background(), hostPK)
if err != nil {
t.Fatalf("Hosts().GetByID() error = %v", err)
}
actions := NewActionSet(appTestDSN(t, store))
detail, err := actions.BatchDetail(context.Background(), BatchDetailRequest{BatchID: batchID})
if err != nil {
t.Fatalf("BatchDetail() error = %v", err)
}
if detail.Batch.ID != batchID || len(detail.Items) != 1 || len(detail.ManagedResources) != 2 || len(detail.AccessClosures) != 1 || len(detail.ReconcileRuns) != 1 {
t.Fatalf("BatchDetail() = %+v, want populated batch detail", detail)
}
snapshotReq := ProviderQueryRequest{
ProviderID: providerRow.ProviderID,
PackID: packRow.PackID,
HostID: hostRow.HostID,
}
statusSnapshot, err := actions.GetProviderStatus(context.Background(), snapshotReq)
if err != nil {
t.Fatalf("GetProviderStatus() error = %v", err)
}
if statusSnapshot.Batch.ID != batchID || statusSnapshot.LatestAccessStatus != provision.AccessStatusSelfServiceReady || statusSnapshot.LatestReconcileStatus != "drifted" || statusSnapshot.ProviderStatus != provision.ProviderStatusActive {
t.Fatalf("GetProviderStatus() = %+v, want active snapshot with drifted reconcile metadata", statusSnapshot)
}
if got := statusSnapshot.LatestReconcileSummary["missing_count"]; got != float64(1) {
t.Fatalf("LatestReconcileSummary[missing_count] = %#v, want 1", got)
}
resourceSnapshot, err := actions.GetProviderResources(context.Background(), snapshotReq)
if err != nil {
t.Fatalf("GetProviderResources() error = %v", err)
}
if len(resourceSnapshot.ManagedResources) != 2 || len(resourceSnapshot.AccessClosures) != 1 || len(resourceSnapshot.ReconcileRuns) != 1 {
t.Fatalf("GetProviderResources() = %+v, want persisted resources/closures/runs", resourceSnapshot)
}
accessSnapshot, err := actions.GetProviderAccessStatus(context.Background(), snapshotReq)
if err != nil {
t.Fatalf("GetProviderAccessStatus() error = %v", err)
}
if accessSnapshot.Batch.ID != batchID || accessSnapshot.LatestAccessStatus != provision.AccessStatusSelfServiceReady || len(accessSnapshot.AccessClosures) != 1 {
t.Fatalf("GetProviderAccessStatus() = %+v, want latest access closure", accessSnapshot)
}
}
func TestActionSetRollbackBatchClosure(t *testing.T) {
t.Parallel()
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
t.Fatalf("method = %s, want DELETE", r.Method)
}
switch r.URL.Path {
case "/api/v1/admin/accounts/account-1",
"/api/v1/admin/payment/plans/plan-1",
"/api/v1/admin/channels/channel-1",
"/api/v1/admin/groups/group-1":
w.WriteHeader(http.StatusNoContent)
default:
http.NotFound(w, r)
}
}))
defer server.Close()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostPK := createAppHostRecord(t, store, server.URL)
packPK := createAppPackRecord(t, store)
providerPK := createAppProviderRecord(t, store, packPK)
batchID := createAppBatchRecord(t, store, hostPK, packPK, providerPK)
for _, resource := range []sqlite.ManagedResource{
{BatchID: batchID, HostID: hostPK, ResourceType: "group", HostResourceID: "group-1", ResourceName: "group-1"},
{BatchID: batchID, HostID: hostPK, ResourceType: "channel", HostResourceID: "channel-1", ResourceName: "channel-1"},
{BatchID: batchID, HostID: hostPK, ResourceType: "plan", HostResourceID: "plan-1", ResourceName: "plan-1"},
{BatchID: batchID, HostID: hostPK, ResourceType: "account", HostResourceID: "account-1", ResourceName: "account-1"},
} {
if _, err := store.ManagedResources().Create(context.Background(), resource); err != nil {
t.Fatalf("ManagedResources().Create(%s) error = %v", resource.ResourceType, err)
}
}
actions := NewActionSet(appTestDSN(t, store))
report, err := actions.RollbackBatch(context.Background(), RollbackBatchRequest{BatchID: batchID})
if err != nil {
t.Fatalf("RollbackBatch() error = %v", err)
}
if report.AccountsDeleted != 1 || report.PlansDeleted != 1 || report.ChannelsDeleted != 1 || report.GroupsDeleted != 1 {
t.Fatalf("RollbackBatch() = %+v, want one deleted resource per type", report)
}
batchRow, err := store.ImportBatches().GetByID(context.Background(), batchID)
if err != nil {
t.Fatalf("ImportBatches().GetByID() error = %v", err)
}
if batchRow.BatchStatus != provision.BatchStatusRolledBack {
t.Fatalf("batch status = %q, want %q", batchRow.BatchStatus, provision.BatchStatusRolledBack)
}
}
func TestActionSetCreateHostUpdateAndConflict(t *testing.T) {
t.Parallel()
t.Run("update existing host connection", func(t *testing.T) {
t.Parallel()
server := httptest.NewServer(newBatchImportActionStubServer(t))
defer server.Close()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
if _, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "prod-sub2api",
BaseURL: server.URL,
HostVersion: "0.0.1",
CapabilityProbeJSON: "{}",
AuthType: "apikey",
AuthToken: "old-token",
}); err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
actions := NewActionSet(appTestDSN(t, store))
host, err := actions.CreateHost(context.Background(), CreateHostRequest{
Name: "prod-sub2api",
BaseURL: server.URL,
Auth: CreateHostAuth{Type: "apikey", Token: "host-token"},
})
if err != nil {
t.Fatalf("CreateHost(update) error = %v", err)
}
if host.HostVersion != "0.1.126" || host.Status != "supported" {
t.Fatalf("CreateHost(update) = %+v, want reprobed supported host", host)
}
stored, err := store.Hosts().GetByHostID(context.Background(), "prod-sub2api")
if err != nil {
t.Fatalf("Hosts().GetByHostID() error = %v", err)
}
if stored.AuthToken != "host-token" || stored.HostVersion != "0.1.126" {
t.Fatalf("stored host = %+v, want updated token/version", stored)
}
})
t.Run("base url conflict returns 409", func(t *testing.T) {
t.Parallel()
server := httptest.NewServer(newBatchImportActionStubServer(t))
defer server.Close()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
if _, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "existing-host",
BaseURL: server.URL,
HostVersion: "0.1.126",
CapabilityProbeJSON: "{}",
AuthType: "apikey",
AuthToken: "host-token",
}); err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
actions := NewActionSet(appTestDSN(t, store))
_, err := actions.CreateHost(context.Background(), CreateHostRequest{
Name: "new-host",
BaseURL: server.URL,
Auth: CreateHostAuth{Type: "apikey", Token: "host-token"},
})
if err == nil {
t.Fatal("CreateHost(conflict) error = nil, want conflict")
}
httpErr, ok := err.(*httpError)
if !ok || httpErr.StatusCode != http.StatusConflict {
t.Fatalf("CreateHost(conflict) error = %T %v, want *httpError 409", err, err)
}
})
}
func TestActionSetInstallPreviewAndRollbackProviderClosures(t *testing.T) {
t.Parallel()
baseHandler := newBatchImportActionStubServer(t)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodDelete {
switch r.URL.Path {
case "/api/v1/admin/accounts/account-1",
"/api/v1/admin/payment/plans/plan-1",
"/api/v1/admin/channels/channel-1",
"/api/v1/admin/groups/group-1":
w.WriteHeader(http.StatusNoContent)
return
}
}
baseHandler.ServeHTTP(w, r)
}))
defer server.Close()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostPK := createAppHostRecord(t, store, server.URL)
actions := NewActionSet(appTestDSN(t, store))
packPath := filepath.Join("..", "..", "packs", "openai-cn-pack")
installResult, err := actions.InstallPack(context.Background(), InstallPackRequest{
HostBaseURL: server.URL,
HostAPIKey: "host-token",
PackPath: packPath,
})
if err != nil {
t.Fatalf("InstallPack() error = %v", err)
}
if installResult.Pack.PackID != "openai-cn-pack" || len(installResult.Providers) == 0 {
t.Fatalf("InstallPack() = %+v, want persisted pack with providers", installResult)
}
preview, err := actions.PreviewProvider(context.Background(), PreviewProviderRequest{
HostBaseURL: server.URL,
PackPath: packPath,
ProviderID: "deepseek",
Mode: provision.ImportModePartial,
Keys: []string{" key-1 ", "key-2"},
})
if err != nil {
t.Fatalf("PreviewProvider() error = %v", err)
}
if len(preview.AcceptedKeys) != 2 || preview.Decisions["group"].Action != provision.PreviewActionCreate {
t.Fatalf("PreviewProvider() = %+v, want accepted keys and create decisions", preview)
}
packRow, err := store.Packs().GetByPackID(context.Background(), "openai-cn-pack")
if err != nil {
t.Fatalf("Packs().GetByPackID() error = %v", err)
}
providerRow, err := store.Providers().GetByPackIDAndProviderID(context.Background(), packRow.ID, "deepseek")
if err != nil {
t.Fatalf("Providers().GetByPackIDAndProviderID() error = %v", err)
}
batchID, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: hostPK,
PackID: packRow.ID,
ProviderID: providerRow.ID,
Mode: provision.ImportModePartial,
BatchStatus: provision.BatchStatusSucceeded,
AccessStatus: provision.AccessStatusSelfServiceReady,
})
if err != nil {
t.Fatalf("ImportBatches().Create() error = %v", err)
}
for _, resource := range []sqlite.ManagedResource{
{BatchID: batchID, HostID: hostPK, ResourceType: "group", HostResourceID: "group-1", ResourceName: "group-1"},
{BatchID: batchID, HostID: hostPK, ResourceType: "channel", HostResourceID: "channel-1", ResourceName: "channel-1"},
{BatchID: batchID, HostID: hostPK, ResourceType: "plan", HostResourceID: "plan-1", ResourceName: "plan-1"},
{BatchID: batchID, HostID: hostPK, ResourceType: "account", HostResourceID: "account-1", ResourceName: "account-1"},
} {
if _, err := store.ManagedResources().Create(context.Background(), resource); err != nil {
t.Fatalf("ManagedResources().Create(%s) error = %v", resource.ResourceType, err)
}
}
report, err := actions.RollbackProvider(context.Background(), RollbackProviderRequest{
HostBaseURL: server.URL,
PackPath: packPath,
ProviderID: "deepseek",
})
if err != nil {
t.Fatalf("RollbackProvider() error = %v", err)
}
if report.AccountsDeleted != 1 || report.PlansDeleted != 1 || report.ChannelsDeleted != 1 || report.GroupsDeleted != 1 {
t.Fatalf("RollbackProvider() = %+v, want one deleted resource per type", report)
}
batchRow, err := store.ImportBatches().GetByID(context.Background(), batchID)
if err != nil {
t.Fatalf("ImportBatches().GetByID() error = %v", err)
}
if batchRow.BatchStatus != provision.BatchStatusRolledBack {
t.Fatalf("batch status = %q, want %q", batchRow.BatchStatus, provision.BatchStatusRolledBack)
}
}
func TestActionSetAssignAccessSubscriptionsClosure(t *testing.T) {
t.Parallel()
baseHandler := newBatchImportActionStubServer(t)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && strings.HasPrefix(r.URL.RequestURI(), "/api/v1/admin/users?"):
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{}}})
return
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84, "email": "relay-sub-user-1@sub2api.local"}})
return
case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/users/84":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}})
return
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/admin/users/84/balance":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 84}})
return
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/auth/login":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"access_token": "user-jwt"}})
return
case r.Method == http.MethodPost && r.URL.Path == "/api/v1/keys":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": 501, "key": "sk-relay-test", "name": "managed-key"}})
return
case r.Method == http.MethodPut && r.URL.Path == "/api/v1/admin/api-keys/501":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"api_key": map[string]any{"id": 501}}})
return
case r.URL.Path == "/v1/models" && strings.HasPrefix(strings.TrimSpace(r.Header.Get("Authorization")), "Bearer sk-relay-"):
writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "deepseek-v4-pro"}}})
return
case r.URL.Path == "/v1/chat/completions" && strings.HasPrefix(strings.TrimSpace(r.Header.Get("Authorization")), "Bearer sk-relay-"):
writeJSON(w, http.StatusOK, map[string]any{
"id": "chatcmpl_subscription",
"choices": []map[string]any{{
"index": 0,
"message": map[string]any{
"role": "assistant",
"content": "pong",
},
}},
})
return
}
baseHandler.ServeHTTP(w, r)
}))
defer server.Close()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
hostPK := createAppHostRecord(t, store, server.URL)
actions := NewActionSet(appTestDSN(t, store))
packPath := filepath.Join("..", "..", "packs", "openai-cn-pack")
if _, err := actions.InstallPack(context.Background(), InstallPackRequest{
HostBaseURL: server.URL,
HostAPIKey: "host-token",
PackPath: packPath,
}); err != nil {
t.Fatalf("InstallPack() error = %v", err)
}
packRow, err := store.Packs().GetByPackID(context.Background(), "openai-cn-pack")
if err != nil {
t.Fatalf("Packs().GetByPackID() error = %v", err)
}
providerRow, err := store.Providers().GetByPackIDAndProviderID(context.Background(), packRow.ID, "deepseek")
if err != nil {
t.Fatalf("Providers().GetByPackIDAndProviderID() error = %v", err)
}
batchID, err := store.ImportBatches().Create(context.Background(), sqlite.ImportBatch{
HostID: hostPK,
PackID: packRow.ID,
ProviderID: providerRow.ID,
Mode: provision.ImportModePartial,
BatchStatus: provision.BatchStatusSucceeded,
AccessStatus: provision.AccessStatusSelfServiceReady,
})
if err != nil {
t.Fatalf("ImportBatches().Create() error = %v", err)
}
if _, err := store.ManagedResources().Create(context.Background(), sqlite.ManagedResource{
BatchID: batchID,
HostID: hostPK,
ResourceType: "group",
HostResourceID: "101",
ResourceName: "group-101",
}); err != nil {
t.Fatalf("ManagedResources().Create(group) error = %v", err)
}
result, err := actions.AssignAccessSubscriptions(context.Background(), AssignAccessSubscriptionsRequest{
HostBaseURL: server.URL,
PackPath: packPath,
ProviderID: "deepseek",
AccessAPIKey: "caller-probe-key",
SubscriptionUsers: []string{"crm-user-1"},
SubscriptionDays: 30,
})
if err != nil {
t.Fatalf("AssignAccessSubscriptions() error = %v", err)
}
if result.Assigned != 1 || result.AccessStatus != provision.AccessStatusSubscriptionReady {
t.Fatalf("AssignAccessSubscriptions() = %+v, want one assigned subscription_ready result", result)
}
closures, err := store.AccessClosures().GetByBatchID(context.Background(), batchID)
if err != nil {
t.Fatalf("AccessClosures().GetByBatchID() error = %v", err)
}
if len(closures) != 1 || closures[0].Status != provision.AccessStatusSubscriptionReady {
t.Fatalf("AccessClosures() = %+v, want persisted subscription_ready closure", closures)
}
var payload map[string]any
if err := json.Unmarshal([]byte(closures[0].DetailsJSON), &payload); err != nil {
t.Fatalf("decode access closure payload: %v", err)
}
if _, ok := payload["probe_api_key"]; ok {
t.Fatalf("subscription access closure should omit probe_api_key, got %#v", payload["probe_api_key"])
}
if got, _ := payload["requested_probe_api_key"].(string); got != "caller-probe-key" {
t.Fatalf("requested_probe_api_key = %q, want caller-probe-key", got)
}
if got, _ := payload["effective_probe_key_source"].(string); got != "managed_subscription" {
t.Fatalf("effective_probe_key_source = %q, want managed_subscription", got)
}
batchRow, err := store.ImportBatches().GetByID(context.Background(), batchID)
if err != nil {
t.Fatalf("ImportBatches().GetByID() error = %v", err)
}
if batchRow.AccessStatus != provision.AccessStatusSubscriptionReady {
t.Fatalf("batch access_status = %q, want %q", batchRow.AccessStatus, provision.AccessStatusSubscriptionReady)
}
}
func TestActionSetImportAndReconcileProviderClosures(t *testing.T) {
t.Parallel()
baseHandler := newBatchImportActionStubServer(t)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v1/admin/groups":
if r.Method == http.MethodGet {
writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "group_1", "name": "DeepSeek 默认分组-self-service"}}})
return
}
if r.Method == http.MethodPut {
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": "group_1", "name": "DeepSeek 默认分组-self-service"}})
return
}
case "/api/v1/admin/channels":
if r.Method == http.MethodGet {
writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "channel_1", "name": "DeepSeek 默认渠道-self-service"}}})
return
}
case "/api/v1/admin/groups/group_1":
if r.Method == http.MethodPut {
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": "group_1", "name": "DeepSeek 默认分组-self-service"}})
return
}
case "/api/v1/admin/channels/channel_1":
if r.Method == http.MethodPut {
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"id": "channel_1", "name": "DeepSeek 默认渠道-self-service"}})
return
}
case "/api/v1/admin/accounts":
if r.Method == http.MethodGet {
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{{"id": "account_1", "name": "deepseek-01"}}, "pages": 1}})
return
}
case "/api/v1/admin/accounts/account_1/models":
writeJSON(w, http.StatusOK, map[string]any{"data": map[string]any{"items": []map[string]any{{"id": "deepseek-v4-pro", "display_name": "DeepSeek V4 Pro", "type": "chat"}}}})
return
case "/api/v1/admin/accounts/account_1":
if r.Method == http.MethodDelete {
w.WriteHeader(http.StatusNoContent)
return
}
case "/v1/models":
switch strings.TrimSpace(r.Header.Get("Authorization")) {
case "Bearer entry-key", "Bearer gateway-key":
writeJSON(w, http.StatusOK, map[string]any{"data": []map[string]any{{"id": "deepseek-v4-pro"}}})
return
}
case "/v1/chat/completions":
switch strings.TrimSpace(r.Header.Get("Authorization")) {
case "Bearer entry-key", "Bearer gateway-key":
writeJSON(w, http.StatusOK, map[string]any{
"id": "chatcmpl_deepseek",
"choices": []map[string]any{{
"index": 0,
"message": map[string]any{
"role": "assistant",
"content": "pong",
},
}},
})
return
}
}
baseHandler.ServeHTTP(w, r)
}))
defer server.Close()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
if _, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-deepseek",
BaseURL: server.URL,
HostVersion: "0.1.126",
CapabilityProbeJSON: "{}",
AuthType: "apikey",
AuthToken: "host-token",
}); err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
actions := NewActionSet(appTestDSN(t, store))
packPath := filepath.Join("..", "..", "packs", "openai-cn-pack")
importResult, err := actions.ImportProvider(context.Background(), ImportProviderRequest{
HostID: "host-deepseek",
PackPath: packPath,
ProviderID: "deepseek",
Keys: []string{"entry-key"},
Mode: provision.ImportModePartial,
AccessMode: provision.AccessModeSelfService,
AccessAPIKey: "gateway-key",
})
if err != nil {
t.Fatalf("ImportProvider() error = %v", err)
}
if importResult.BatchID <= 0 || importResult.Report.BatchStatus != provision.BatchStatusSucceeded || importResult.Report.AccessStatus != provision.AccessStatusSelfServiceReady {
t.Fatalf("ImportProvider() = %+v, want succeeded self_service_ready batch", importResult)
}
reconcileResult, err := actions.ReconcileProvider(context.Background(), ReconcileProviderRequest{
HostID: "host-deepseek",
PackPath: packPath,
ProviderID: "deepseek",
AccessAPIKey: "gateway-key",
})
if err != nil {
t.Fatalf("ReconcileProvider() error = %v", err)
}
if reconcileResult.BatchID != importResult.BatchID || reconcileResult.Status != "active" || reconcileResult.AccessStatus != provision.AccessStatusSelfServiceReady {
t.Fatalf("ReconcileProvider() = %+v, want active reconcile for imported batch", reconcileResult)
}
}
func TestBuildListBatchImportRunItemsActionCursor(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "self_service",
State: "running",
})
mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:00' WHERE run_id = 'run-1'`)
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-1",
RunID: "run-1",
BaseURL: "https://a.example.com/v1",
ProviderID: "a",
APIKeyFingerprint: "sha256:a",
CurrentStage: "done",
ConfirmationStatus: "confirmed",
AccessStatus: "active",
MatchedAccountState: "active",
AccountResolution: "created",
})
mustCreateAppImportRunItem(t, store, sqlite.ImportRunItem{
ItemID: "item-2",
RunID: "run-1",
BaseURL: "https://b.example.com/v1",
ProviderID: "b",
APIKeyFingerprint: "sha256:b",
CurrentStage: "done",
ConfirmationStatus: "confirmed",
AccessStatus: "active",
MatchedAccountState: "active",
AccountResolution: "created",
})
action := buildListBatchImportRunItemsAction(appTestDSN(t, store))
result, err := action(context.Background(), ListBatchImportRunItemsRequest{
RunID: "run-1",
Cursor: "item-1",
Limit: 1,
})
if err != nil {
t.Fatalf("buildListBatchImportRunItemsAction(cursor) error = %v", err)
}
if len(result.Items) != 1 || result.Items[0].ItemID != "item-2" {
t.Fatalf("result.Items = %+v, want [item-2]", result.Items)
}
}
func TestBuildListBatchImportRunsActionCursorAndDefaults(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
mustCreateAppImportRun(t, store, sqlite.ImportRun{RunID: "run-1", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "running"})
mustCreateAppImportRun(t, store, sqlite.ImportRun{RunID: "run-2", HostID: "host-1", Mode: "partial", AccessMode: "self_service", State: "running"})
mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:02' WHERE run_id = 'run-1'`)
mustExecSQL(t, store, `UPDATE import_runs SET started_at = '2026-05-23 10:00:01' WHERE run_id = 'run-2'`)
action := buildListBatchImportRunsAction(appTestDSN(t, store))
result, err := action(context.Background(), ListBatchImportRunsRequest{Cursor: "run-1", Limit: -1})
if err != nil {
t.Fatalf("buildListBatchImportRunsAction(cursor) error = %v", err)
}
if len(result.Runs) != 1 || result.Runs[0].RunID != "run-2" {
t.Fatalf("result.Runs = %+v, want [run-2]", result.Runs)
}
}
func TestBuildGetBatchImportRunActionPropagatesDBError(t *testing.T) {
t.Parallel()
action := buildGetBatchImportRunAction("file:/definitely-missing-path/does-not-exist.db?mode=ro")
if _, err := action(context.Background(), "run-1"); err == nil {
t.Fatal("buildGetBatchImportRunAction() error = nil, want open db error")
}
}
func TestBuildListBatchImportRunItemsActionPropagatesDBError(t *testing.T) {
t.Parallel()
action := buildListBatchImportRunItemsAction("file:/definitely-missing-path/does-not-exist.db?mode=ro")
if _, err := action(context.Background(), ListBatchImportRunItemsRequest{RunID: "run-1"}); err == nil {
t.Fatal("buildListBatchImportRunItemsAction() error = nil, want open db error")
}
}
func TestBuildListBatchImportRunsActionPropagatesDBError(t *testing.T) {
t.Parallel()
action := buildListBatchImportRunsAction("file:/definitely-missing-path/does-not-exist.db?mode=ro")
if _, err := action(context.Background(), ListBatchImportRunsRequest{}); err == nil {
t.Fatal("buildListBatchImportRunsAction() error = nil, want open db error")
}
}
func TestBuildGetBatchImportRunItemActionPropagatesMissingItem(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
action := buildGetBatchImportRunItemAction(appTestDSN(t, store))
if _, err := action(context.Background(), GetBatchImportRunItemRequest{RunID: "run-1", ItemID: "missing"}); err == nil || err.Error() != "item not found: missing" {
t.Fatalf("missing item error = %v, want item not found", err)
}
}
func TestResumePendingBatchImportRunsNoRunningRuns(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
mustCreateAppImportRun(t, store, sqlite.ImportRun{
RunID: "run-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "self_service",
State: "completed",
})
if err := resumePendingBatchImportRuns(context.Background(), appTestDSN(t, store)); err != nil {
t.Fatalf("resumePendingBatchImportRuns() error = %v", err)
}
}
func TestNewBatchImportRuntimeRunnerFromStoredRun(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
if _, err := store.Hosts().Create(context.Background(), sqlite.Host{
HostID: "host-1",
BaseURL: "https://sub2api.example.com",
HostVersion: "0.1.126",
AuthToken: "host-token",
}); err != nil {
t.Fatalf("Hosts().Create() error = %v", err)
}
runner, err := newBatchImportRuntimeRunnerFromStoredRun(context.Background(), store, sqlite.ImportRun{
RunID: "run-1",
HostID: "host-1",
Mode: "partial",
AccessMode: "subscription",
SubscriptionUsersJSON: `["user-1","user-2"]`,
SubscriptionDays: 30,
ProbeAPIKey: "probe-key",
})
if err != nil {
t.Fatalf("newBatchImportRuntimeRunnerFromStoredRun() error = %v", err)
}
if runner.request.HostID != "host-1" || len(runner.request.SubscriptionUsers) != 2 || runner.request.ProbeAPIKey != "probe-key" {
t.Fatalf("runner.request = %+v, want parsed stored run request", runner.request)
}
}
func TestBuildGetBatchImportRunActionClassifiesNotFoundOnlyOnSQLNoRows(t *testing.T) {
t.Parallel()
store := openAppTestStore(t)
defer closeAppTestStore(t, store)
action := buildGetBatchImportRunAction(appTestDSN(t, store))
_, err := action(context.Background(), "missing")
if err == nil || err.Error() != "run not found: missing" {
t.Fatalf("missing run error = %v, want run not found", err)
}
}
func TestSQLNoRowsReference(t *testing.T) {
t.Parallel()
if sql.ErrNoRows == nil {
t.Fatal("sql.ErrNoRows = nil")
}
}