89 lines
2.1 KiB
Go
89 lines
2.1 KiB
Go
package app
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"sub2api-cn-relay-manager/internal/store/sqlite"
	"sub2api-cn-relay-manager/internal/worker"
)
|
|
|
|
// batchImportBackgroundPollInterval is the interval handed to the worker
// runner that executes the batch import resume job.
const batchImportBackgroundPollInterval time.Duration = time.Second
|
|
|
|
func runBatchImportBackgroundScheduler(ctx context.Context, sqliteDSN string) {
|
|
worker.NewRunner(
|
|
[]worker.Job{batchImportResumeJob{sqliteDSN: sqliteDSN}},
|
|
batchImportBackgroundPollInterval,
|
|
log.Printf,
|
|
).Start(ctx)
|
|
}
|
|
|
|
// batchImportResumeJob is the worker job that resumes batch import runs
// recorded in a SQLite store.
type batchImportResumeJob struct {
	// sqliteDSN is passed to sqlite.Open each time the job runs.
	sqliteDSN string
}
|
|
|
|
func (j batchImportResumeJob) Name() string {
|
|
return "batch import runtime scheduler"
|
|
}
|
|
|
|
func (j batchImportResumeJob) Run(ctx context.Context) error {
|
|
return resumePendingBatchImportRuns(ctx, j.sqliteDSN)
|
|
}
|
|
|
|
func resumePendingBatchImportRuns(ctx context.Context, sqliteDSN string) error {
|
|
store, err := sqlite.Open(ctx, sqliteDSN)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer store.Close()
|
|
|
|
runs, err := store.ImportRuns().List(ctx, 1000)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, run := range runs {
|
|
if strings.TrimSpace(run.State) != "running" {
|
|
continue
|
|
}
|
|
runner, err := newBatchImportRuntimeRunnerFromStoredRun(ctx, store, run)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := runner.driveRun(ctx, run.RunID, 0); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func newBatchImportRuntimeRunnerFromStoredRun(ctx context.Context, store *sqlite.DB, run sqlite.ImportRun) (batchImportRuntimeRunner, error) {
|
|
hostRow, client, err := resolveManagedHost(ctx, store, run.HostID, "", CreateHostAuth{})
|
|
if err != nil {
|
|
return batchImportRuntimeRunner{}, err
|
|
}
|
|
return batchImportRuntimeRunner{
|
|
store: store,
|
|
hostRow: hostRow,
|
|
hostClient: client,
|
|
request: CreateBatchImportRunRequest{
|
|
HostID: run.HostID,
|
|
Mode: run.Mode,
|
|
AccessMode: run.AccessMode,
|
|
SubscriptionUsers: parseJSONStringList(run.SubscriptionUsersJSON),
|
|
SubscriptionDays: run.SubscriptionDays,
|
|
ProbeAPIKey: run.ProbeAPIKey,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// parseJSONStringList decodes raw, after trimming surrounding whitespace, as
// a JSON array of strings.
//
// It always returns a non-nil slice. Input that fails to decode (empty text,
// malformed JSON, a non-array value, or non-string elements) yields an empty
// slice, and so does the JSON literal null, which decodes without error but
// leaves the slice nil.
func parseJSONStringList(raw string) []string {
	var values []string
	if err := json.Unmarshal([]byte(strings.TrimSpace(raw)), &values); err != nil {
		return []string{}
	}
	if values == nil {
		// json.Unmarshal sets the slice to nil for a JSON null; keep the
		// "never nil" result that every other path provides.
		return []string{}
	}
	return values
}
|