Files
sub2api-cn-relay-manager/internal/app/http_batch_import.go

249 lines
8.0 KiB
Go

package app
import (
"context"
"net/http"
"strconv"
"strings"
"sub2api-cn-relay-manager/internal/batch"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
// BatchImportEntryRequest is one element of the "entries" array carried by
// CreateBatchImportRunRequest.
//
// NOTE(review): presumably BaseURL/APIKey identify the upstream endpoint and
// credential to import and RequestedModels the models to enable for it —
// confirm against the batch import runner; none of the fields is validated in
// this file.
type BatchImportEntryRequest struct {
BaseURL string `json:"base_url"`
APIKey string `json:"api_key"`
RequestedModels []string `json:"requested_models"`
}
// CreateBatchImportRunRequest is the JSON body accepted by the
// create-batch-import-run endpoint.
//
// validateCreateBatchImportRunRequest enforces the following before the request
// reaches the action:
//   - host_id, mode and access_mode must be non-blank;
//   - entries must contain at least one element;
//   - access_mode must be "subscription" or "self_service";
//   - "subscription" additionally requires subscription_users and a positive
//     subscription_days;
//   - "self_service" additionally requires a non-blank probe_api_key.
type CreateBatchImportRunRequest struct {
// HostID is required by validation even though its JSON tag carries omitempty.
HostID string `json:"host_id,omitempty"`
Mode string `json:"mode"`
// AccessMode selects which of the access-specific fields below are required.
AccessMode string `json:"access_mode"`
// ConfirmWaitTimeoutSec is not checked in this file.
// NOTE(review): presumably a timeout in seconds consumed by the runner — confirm.
ConfirmWaitTimeoutSec int `json:"confirm_wait_timeout_sec,omitempty"`
// SubscriptionUsers and SubscriptionDays are required when access_mode=subscription.
SubscriptionUsers []string `json:"subscription_users,omitempty"`
SubscriptionDays int `json:"subscription_days,omitempty"`
// ProbeAPIKey is required when access_mode=self_service.
ProbeAPIKey string `json:"probe_api_key,omitempty"`
Entries []BatchImportEntryRequest `json:"entries"`
}
// BatchImportRunCreateResponse is the value returned by the
// create-batch-import-run action; handleCreateBatchImportRun writes it as the
// JSON body of a 200 response.
//
// NOTE(review): the *Items fields look like per-outcome item counts for the run
// and ResultPage like a link to the run's result view — confirm against the
// runner that populates them.
type BatchImportRunCreateResponse struct {
RunID string `json:"run_id"`
State string `json:"state"`
ResultPage string `json:"result_page"`
TotalItems int `json:"total_items"`
ActiveItems int `json:"active_items"`
DegradedItems int `json:"degraded_items"`
BrokenItems int `json:"broken_items"`
WarningItems int `json:"warning_items"`
}
// ListBatchImportRunsRequest carries the filters and paging parameters of the
// list-batch-import-runs endpoint. handleListBatchImportRuns fills it from the
// URL query string; every string value is whitespace-trimmed there.
type ListBatchImportRunsRequest struct {
// State comes from the "state" parameter; empty means no state filter.
State string
// AccessMode comes from the "access_mode" parameter; empty means no filter.
AccessMode string
// Query comes from the "q" parameter; empty means no text filter.
Query string
// Cursor comes from the "cursor" parameter; the list action compares it
// against run IDs. Empty means start from the first run.
Cursor string
// Limit comes from the "limit" parameter and is 0 when the parameter is
// missing, non-numeric or not positive; the list action then uses 50.
Limit int
}
// ListBatchImportRunsResponse is one page of run summaries returned by the
// list-batch-import-runs endpoint.
type ListBatchImportRunsResponse struct {
// Runs holds the page contents. handleListBatchImportRuns replaces a nil
// slice with an empty one before encoding the response.
Runs []batch.RunSummaryProjection `json:"runs"`
// NextCursor is nil when the list action found no further matching run;
// the tag has no omitempty, so the field is always present in the JSON.
NextCursor *string `json:"next_cursor"`
}
// ListBatchImportRunItemsRequest carries the parameters for listing the items
// of one batch import run.
//
// NOTE(review): the code that populates and consumes this type is not in this
// part of the file. By analogy with ListBatchImportRunsRequest the string
// fields are presumably optional filters (empty = no filter) and Cursor/Limit
// paging controls — confirm against the handler.
type ListBatchImportRunItemsRequest struct {
RunID string
CurrentStage string
ConfirmationStatus string
AccessStatus string
// HasWarning is a pointer so that "not specified" can be told apart from
// false. NOTE(review): presumably nil disables the filter — confirm.
HasWarning *bool
ProviderID string
MatchedAccountState string
AccountResolution string
Query string
Cursor string
Limit int
}
// ListBatchImportRunItemsResponse is one page of item summaries for a batch
// import run.
//
// NOTE(review): presumably NextCursor follows the same convention as
// ListBatchImportRunsResponse.NextCursor (nil when there is no further page) —
// confirm against the action that builds it.
type ListBatchImportRunItemsResponse struct {
Items []batch.ItemSummaryProjection `json:"items"`
NextCursor *string `json:"next_cursor"`
}
// GetBatchImportRunItemRequest identifies a single item by the ID of its run
// and its own item ID.
//
// NOTE(review): the handler that fills this type is not in this part of the
// file; whether the values are trimmed or validated there is unconfirmed.
type GetBatchImportRunItemRequest struct {
RunID string
ItemID string
}
// handleCreateBatchImportRun serves the create-batch-import-run endpoint.
//
// The JSON request body is decoded into a CreateBatchImportRunRequest, checked
// with validateCreateBatchImportRunRequest and then passed to fn. A failure at
// any of these steps is reported through writeHTTPError (errors returned by fn
// are first mapped with classifyError). On success the value returned by fn is
// written as JSON with status 200. A nil fn is reported as a 500
// "server_misconfigured" error.
func handleCreateBatchImportRun(w http.ResponseWriter, r *http.Request, fn func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error)) {
	if fn == nil {
		writeHTTPError(w, &httpError{
			StatusCode: http.StatusInternalServerError,
			Code:       "server_misconfigured",
			Message:    "create-batch-import-run action is not configured",
		})
		return
	}

	var payload CreateBatchImportRunRequest
	if decodeErr := decodeJSON(r, &payload); decodeErr != nil {
		writeHTTPError(w, decodeErr)
		return
	}
	if invalid := validateCreateBatchImportRunRequest(payload); invalid != nil {
		writeHTTPError(w, invalid)
		return
	}

	created, runErr := fn(r.Context(), payload)
	if runErr != nil {
		writeHTTPError(w, classifyError(runErr))
		return
	}
	writeJSON(w, http.StatusOK, created)
}
// handleListBatchImportRuns serves the list-batch-import-runs endpoint.
//
// The request is built from the URL query parameters "state", "access_mode",
// "q", "cursor" (all whitespace-trimmed) and "limit" (parsed with
// parsePositiveInt, so anything that is not a positive integer becomes 0).
// Errors returned by fn are mapped with classifyError and written through
// writeHTTPError. On success the page is written as JSON with status 200; a
// nil Runs slice is swapped for an empty one first so that it is not encoded
// as null. A nil fn is reported as a 500 "server_misconfigured" error.
func handleListBatchImportRuns(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListBatchImportRunsRequest) (ListBatchImportRunsResponse, error)) {
	if fn == nil {
		writeHTTPError(w, &httpError{
			StatusCode: http.StatusInternalServerError,
			Code:       "server_misconfigured",
			Message:    "list-batch-import-runs action is not configured",
		})
		return
	}

	params := r.URL.Query()
	listReq := ListBatchImportRunsRequest{
		State:      strings.TrimSpace(params.Get("state")),
		AccessMode: strings.TrimSpace(params.Get("access_mode")),
		Query:      strings.TrimSpace(params.Get("q")),
		Cursor:     strings.TrimSpace(params.Get("cursor")),
		Limit:      parsePositiveInt(params.Get("limit")),
	}

	page, err := fn(r.Context(), listReq)
	if err != nil {
		writeHTTPError(w, classifyError(err))
		return
	}
	if page.Runs == nil {
		page.Runs = make([]batch.RunSummaryProjection, 0)
	}
	writeJSON(w, http.StatusOK, page)
}
// buildCreateBatchImportRunAction returns the action that backs the
// create-batch-import-run endpoint, bound to the SQLite database at sqliteDSN.
//
// Every invocation opens the store (closed again when the action returns),
// resolves the managed host named by req.HostID via resolveManagedHost, and
// runs a batchImportRuntimeRunner for the request. Errors from opening the
// store or resolving the host are returned unchanged together with a zero
// response.
func buildCreateBatchImportRunAction(sqliteDSN string) func(context.Context, CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
	return func(ctx context.Context, req CreateBatchImportRunRequest) (BatchImportRunCreateResponse, error) {
		var none BatchImportRunCreateResponse

		db, err := sqlite.Open(ctx, sqliteDSN)
		if err != nil {
			return none, err
		}
		defer db.Close()

		host, hostClient, err := resolveManagedHost(ctx, db, req.HostID, "", CreateHostAuth{})
		if err != nil {
			return none, err
		}

		// Kept in a local variable (not called on the literal) so execute can
		// be invoked whatever its receiver kind is.
		runner := batchImportRuntimeRunner{
			store:      db,
			hostRow:    host,
			hostClient: hostClient,
			request:    req,
		}
		return runner.execute(ctx)
	}
}
// buildListBatchImportRunsAction returns the action that backs the
// list-batch-import-runs endpoint, bound to the SQLite database at sqliteDSN.
//
// Every invocation opens the store (closed again when the action returns),
// loads runs with ImportRuns().List(ctx, 1000) and walks them in the order the
// store returned them, keeping the runs that pass the request filters:
//
//   - State and AccessMode, when non-empty, must equal the run's values.
//   - Query, when non-empty, must be a case-insensitive substring of the run
//     ID, or of the ProviderID or BaseURL of at least one item of the run.
//
// At most Limit runs are returned (50 when Limit is not positive). When a
// further matching run exists beyond the page, NextCursor is set to that run's
// ID; sending it back as Cursor resumes the listing at that run, inclusive.
// A Cursor that matches none of the loaded runs yields an empty page.
//
// Store errors are returned unchanged together with a zero response.
func buildListBatchImportRunsAction(sqliteDSN string) func(context.Context, ListBatchImportRunsRequest) (ListBatchImportRunsResponse, error) {
	return func(ctx context.Context, req ListBatchImportRunsRequest) (ListBatchImportRunsResponse, error) {
		store, err := sqlite.Open(ctx, sqliteDSN)
		if err != nil {
			return ListBatchImportRunsResponse{}, err
		}
		defer store.Close()

		// NOTE(review): 1000 is presumably a row cap; if so, runs beyond it
		// cannot be reached through filtering or pagination — confirm.
		runs, err := store.ImportRuns().List(ctx, 1000)
		if err != nil {
			return ListBatchImportRunsResponse{}, err
		}

		limit := defaultPositiveInt(req.Limit, 50)

		// The limit comes from the client; never reserve more room than there
		// are runs, so an absurd limit cannot force a huge allocation.
		capacity := limit
		if capacity > len(runs) {
			capacity = len(runs)
		}
		result := make([]batch.RunSummaryProjection, 0, capacity)

		// Loop-invariant values, computed once.
		cursor := strings.TrimSpace(req.Cursor)
		query := strings.ToLower(req.Query)

		var nextCursor *string
		started := cursor == ""
		for _, run := range runs {
			if !started {
				if run.RunID != cursor {
					continue
				}
				// The cursor names the first run that did NOT fit on the
				// previous page, so it must be evaluated here rather than
				// skipped; skipping it would drop that run from every page.
				started = true
			}
			if req.State != "" && run.State != req.State {
				continue
			}
			if req.AccessMode != "" && run.AccessMode != req.AccessMode {
				continue
			}
			if req.Query != "" && !strings.Contains(strings.ToLower(run.RunID), query) {
				// The run ID did not match; fall back to the run's items.
				items, err := store.ImportRunItems().ListByRunID(ctx, run.RunID)
				if err != nil {
					return ListBatchImportRunsResponse{}, err
				}
				matched := false
				for _, item := range items {
					if strings.Contains(strings.ToLower(item.ProviderID), query) || strings.Contains(strings.ToLower(item.BaseURL), query) {
						matched = true
						break
					}
				}
				if !matched {
					continue
				}
			}
			if len(result) >= limit {
				// Page is full and one more match exists: hand its ID out as
				// the resume point instead of returning it now.
				next := run.RunID
				nextCursor = &next
				break
			}
			result = append(result, batch.ProjectRunSummary(run))
		}
		return ListBatchImportRunsResponse{Runs: result, NextCursor: nextCursor}, nil
	}
}
// validateCreateBatchImportRunRequest checks a create-batch-import-run request
// and returns nil when it is acceptable, or a 400 "invalid_request" error
// describing the first problem found.
//
// Checks run in this order: host_id, mode and access_mode must be non-blank,
// entries must be non-empty, and then the access-mode specific fields:
// "subscription" needs subscription_users and a positive subscription_days,
// "self_service" needs a non-blank probe_api_key. Any other access_mode is
// rejected.
func validateCreateBatchImportRunRequest(req CreateBatchImportRunRequest) *httpError {
	invalid := func(message string) *httpError {
		return &httpError{StatusCode: http.StatusBadRequest, Code: "invalid_request", Message: message}
	}

	accessMode := strings.TrimSpace(req.AccessMode)

	// Fields required regardless of access mode; the first failing case wins.
	switch {
	case strings.TrimSpace(req.HostID) == "":
		return invalid("host_id is required")
	case strings.TrimSpace(req.Mode) == "":
		return invalid("mode is required")
	case accessMode == "":
		return invalid("access_mode is required")
	case len(req.Entries) == 0:
		return invalid("entries is required")
	}

	if accessMode == "subscription" {
		if len(req.SubscriptionUsers) == 0 {
			return invalid("subscription_users is required when access_mode=subscription")
		}
		if req.SubscriptionDays <= 0 {
			return invalid("subscription_days is required when access_mode=subscription")
		}
		return nil
	}

	if accessMode == "self_service" {
		if strings.TrimSpace(req.ProbeAPIKey) == "" {
			return invalid("probe_api_key is required when access_mode=self_service")
		}
		return nil
	}

	return invalid("access_mode must be subscription or self_service")
}
// parsePositiveInt converts raw to an int after trimming surrounding
// whitespace. It returns the parsed value when it is strictly positive and 0
// in every other case (empty input, non-numeric input, out-of-range input,
// zero or a negative number).
func parsePositiveInt(raw string) int {
	trimmed := strings.TrimSpace(raw)
	if n, err := strconv.Atoi(trimmed); err == nil && n > 0 {
		return n
	}
	return 0
}
// defaultPositiveInt returns value when it is strictly positive and fallback
// otherwise. fallback is returned as given, even if it is itself not positive.
func defaultPositiveInt(value, fallback int) int {
	if value <= 0 {
		return fallback
	}
	return value
}