From 16e53361f2ee5a0f8cafc3b18c2019431eba20d8 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 6 May 2026 10:54:08 +0800 Subject: [PATCH] feat(adapter): externalize platform worker runtime tuning --- .env.platform-adapters.example | 26 ++++++++++ docs/CONFIG_CONTRACT_BASELINE.md | 9 ++++ docs/RUNBOOK_PLATFORM_CALLBACKS.md | 21 +++++++++ internal/app/app.go | 16 +++++++ internal/app/app_test.go | 10 ++++ internal/config/config.go | 76 +++++++++++++++++++++++------- internal/config/config_test.go | 43 +++++++++++++++++ 7 files changed, 183 insertions(+), 18 deletions(-) create mode 100644 .env.platform-adapters.example diff --git a/.env.platform-adapters.example b/.env.platform-adapters.example new file mode 100644 index 0000000..7190282 --- /dev/null +++ b/.env.platform-adapters.example @@ -0,0 +1,26 @@ +# Platform adapters +AI_CS_PLATFORM_ADAPTERS_ENABLED=true + +# Sub2API ingress +AI_CS_PLATFORM_SUB2API_ENABLED=true +AI_CS_PLATFORM_SUB2API_INGRESS_SECRET=replace-with-sub2api-ingress-secret + +# Sub2API callback +AI_CS_PLATFORM_SUB2API_CALLBACK_BASE_URL=https://sub2api.example.com/callbacks/ai-customer-service +AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET=replace-with-sub2api-callback-secret +AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS=3000 +AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES=5 +AI_CS_PLATFORM_SUB2API_CALLBACK_POLL_INTERVAL_MS=5000 +AI_CS_PLATFORM_SUB2API_CALLBACK_BATCH_SIZE=20 +AI_CS_PLATFORM_SUB2API_CALLBACK_RETRY_SCHEDULE_SEC=10,30,60,300,900 + +# NewAPI profile placeholder +AI_CS_PLATFORM_NEWAPI_ENABLED=false +AI_CS_PLATFORM_NEWAPI_INGRESS_SECRET= +AI_CS_PLATFORM_NEWAPI_CALLBACK_BASE_URL= +AI_CS_PLATFORM_NEWAPI_CALLBACK_SECRET= +AI_CS_PLATFORM_NEWAPI_CALLBACK_TIMEOUT_MS=3000 +AI_CS_PLATFORM_NEWAPI_CALLBACK_MAX_RETRIES=5 +AI_CS_PLATFORM_NEWAPI_CALLBACK_POLL_INTERVAL_MS=5000 +AI_CS_PLATFORM_NEWAPI_CALLBACK_BATCH_SIZE=20 +AI_CS_PLATFORM_NEWAPI_CALLBACK_RETRY_SCHEDULE_SEC=10,30,60,300,900 diff --git a/docs/CONFIG_CONTRACT_BASELINE.md b/docs/CONFIG_CONTRACT_BASELINE.md index 1d4ed92..97b4972 100644 --- a/docs/CONFIG_CONTRACT_BASELINE.md +++ b/docs/CONFIG_CONTRACT_BASELINE.md @@ -62,12 +62,18 @@ | `AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET` | 空 | `sub2api` 回调签名 secret | 当前仅解析,不强校验 | 视后续出站回调批次决定 | | `AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS` | `3000` | `sub2api` 回调超时(毫秒) | 必须 > 0(启用时) | 可 | | `AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES` | `5` | `sub2api` 回调最大重试次数 | 必须 >= 0(启用时) | 可 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_POLL_INTERVAL_MS` | `5000` | `sub2api` callback worker 轮询间隔(毫秒) | 必须 > 0(启用时) | 可 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_BATCH_SIZE` | `20` | `sub2api` callback worker 单轮最大投递数 | 必须 > 0(启用时) | 可 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_RETRY_SCHEDULE_SEC` | `10,30,60,300,900` | `sub2api` callback 重试退避序列(秒) | 必须为正整数列表(启用时) | 可 | | `AI_CS_PLATFORM_NEWAPI_ENABLED` | `false` | 是否启用 `newapi` 入站适配 | 解析布尔值 | 视接入计划决定 | | `AI_CS_PLATFORM_NEWAPI_INGRESS_SECRET` | 空 | `newapi` 平台 webhook HMAC secret | 启用 `newapi` 时必填 | **不允许为空** | | `AI_CS_PLATFORM_NEWAPI_CALLBACK_BASE_URL` | 空 | `newapi` 回调基地址 | 当前仅解析,不强校验 | 视后续出站回调批次决定 | | `AI_CS_PLATFORM_NEWAPI_CALLBACK_SECRET` | 空 | `newapi` 回调签名 secret | 当前仅解析,不强校验 | 视后续出站回调批次决定 | | `AI_CS_PLATFORM_NEWAPI_CALLBACK_TIMEOUT_MS` | `3000` | `newapi` 回调超时(毫秒) | 必须 > 0(启用时) | 可 | | `AI_CS_PLATFORM_NEWAPI_CALLBACK_MAX_RETRIES` | `5` | `newapi` 回调最大重试次数 | 必须 >= 0(启用时) | 可 | +| `AI_CS_PLATFORM_NEWAPI_CALLBACK_POLL_INTERVAL_MS` | `5000` | `newapi` callback worker 轮询间隔(毫秒) | 必须 > 0(启用时) | 可 | +| `AI_CS_PLATFORM_NEWAPI_CALLBACK_BATCH_SIZE` | `20` | `newapi` callback worker 单轮最大投递数 | 必须 > 0(启用时) | 可 | +| `AI_CS_PLATFORM_NEWAPI_CALLBACK_RETRY_SCHEDULE_SEC` | `10,30,60,300,900` | `newapi` callback 重试退避序列(秒) | 必须为正整数列表(启用时) | 可 | --- @@ -85,6 +91,9 @@ 8. `AI_CS_PLATFORM_ADAPTERS_ENABLED=true` 且对应平台 `*_ENABLED=true` 时,`*_INGRESS_SECRET` 不允许为空 9. `AI_CS_PLATFORM_*_CALLBACK_TIMEOUT_MS` 在对应平台启用时必须为正数 10. `AI_CS_PLATFORM_*_CALLBACK_MAX_RETRIES` 在对应平台启用时不允许为负数 +11. `AI_CS_PLATFORM_*_CALLBACK_POLL_INTERVAL_MS` 在对应平台启用时必须为正数 +12. `AI_CS_PLATFORM_*_CALLBACK_BATCH_SIZE` 在对应平台启用时必须为正数 +13. `AI_CS_PLATFORM_*_CALLBACK_RETRY_SCHEDULE_SEC` 在对应平台启用时必须是正整数列表,且不允许为空 --- diff --git a/docs/RUNBOOK_PLATFORM_CALLBACKS.md b/docs/RUNBOOK_PLATFORM_CALLBACKS.md index 31c6266..a605de2 100644 --- a/docs/RUNBOOK_PLATFORM_CALLBACKS.md +++ b/docs/RUNBOOK_PLATFORM_CALLBACKS.md @@ -20,6 +20,27 @@ --- +## 1.1 关键运行参数 + +当前 callback worker 已支持通过环境变量外显这些参数: + +| 变量 | 默认值 | 说明 | +|---|---|---| +| `AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS` | `3000` | 单次 callback HTTP 超时 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES` | `5` | 最大重试次数 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_POLL_INTERVAL_MS` | `5000` | worker 轮询间隔 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_BATCH_SIZE` | `20` | 单轮最大拉取事件数 | +| `AI_CS_PLATFORM_SUB2API_CALLBACK_RETRY_SCHEDULE_SEC` | `10,30,60,300,900` | 重试退避序列 | + +`newapi` 侧使用同构变量名: +- `AI_CS_PLATFORM_NEWAPI_CALLBACK_TIMEOUT_MS` +- `AI_CS_PLATFORM_NEWAPI_CALLBACK_MAX_RETRIES` +- `AI_CS_PLATFORM_NEWAPI_CALLBACK_POLL_INTERVAL_MS` +- `AI_CS_PLATFORM_NEWAPI_CALLBACK_BATCH_SIZE` +- `AI_CS_PLATFORM_NEWAPI_CALLBACK_RETRY_SCHEDULE_SEC` + +--- + ## 2. 常用查询 ### 2.1 查看待投递事件 diff --git a/internal/app/app.go b/internal/app/app.go index fb5a409..356d00b 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -178,6 +178,9 @@ func New(cfg *config.Config, logger *slog.Logger) (*App, error) { profile.CallbackMaxRetries, ) worker.Logger = logger + worker.PollInterval = time.Duration(profile.CallbackPollIntervalMS) * time.Millisecond + worker.BatchSize = profile.CallbackBatchSize + worker.RetrySchedule = toRetrySchedule(profile.CallbackRetrySchedule) go worker.Start(workerCtx) } startWorker("sub2api", cfg.PlatformAdapters.Sub2API) @@ -202,6 +205,19 @@ func New(cfg *config.Config, logger *slog.Logger) (*App, error) { }, nil } +func toRetrySchedule(seconds []int) []time.Duration { + if len(seconds) == 0 { + return nil + } + result := make([]time.Duration, 0, len(seconds)) + for _, value := range seconds { + if value > 0 { + result = append(result, time.Duration(value)*time.Second) + } + } + return result +} + func (a *App) TicketStore() ticketLister { return a.ticketStore } diff --git a/internal/app/app_test.go b/internal/app/app_test.go index af92cc4..830fdcc 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -110,6 +110,9 @@ func TestNew_RegistersPlatformWebhookRouteWhenSub2APIEnabled(t *testing.T) { cfg.PlatformAdapters.Enabled = true cfg.PlatformAdapters.Sub2API.Enabled = true cfg.PlatformAdapters.Sub2API.IngressSecret = "sub2api-secret" + cfg.PlatformAdapters.Sub2API.CallbackPollIntervalMS = 2500 + cfg.PlatformAdapters.Sub2API.CallbackBatchSize = 8 + cfg.PlatformAdapters.Sub2API.CallbackRetrySchedule = []int{5, 15, 45} app, err := New(cfg, logging.New()) if err != nil { @@ -124,6 +127,13 @@ func TestNew_RegistersPlatformWebhookRouteWhenSub2APIEnabled(t *testing.T) { } } +func TestToRetrySchedule(t *testing.T) { + got := toRetrySchedule([]int{5, 15, 45}) + if len(got) != 3 || got[0] != 5*time.Second || got[1] != 15*time.Second || got[2] != 45*time.Second { + t.Fatalf("toRetrySchedule() = %v, want [5s 15s 45s]", got) + } +} + func TestApp_TicketStore(t *testing.T) { cfg := minimalHTTPConfig() cfg.Webhook.Secret = "test-secret" diff --git a/internal/config/config.go b/internal/config/config.go index 5e14cce..0d1be22 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -52,12 +52,15 @@ type PlatformAdaptersConfig struct { } type PlatformAdapterProfileConfig struct { - Enabled bool - IngressSecret string - CallbackBaseURL string - CallbackSecret string - CallbackTimeoutMS int - CallbackMaxRetries int + Enabled bool + IngressSecret string + CallbackBaseURL string + CallbackSecret string + CallbackTimeoutMS int + CallbackMaxRetries int + CallbackPollIntervalMS int + CallbackBatchSize int + CallbackRetrySchedule []int } func Load() (*Config, error) { @@ -88,20 +91,26 @@ func Load() (*Config, error) { PlatformAdapters: PlatformAdaptersConfig{ Enabled: getEnvBool("AI_CS_PLATFORM_ADAPTERS_ENABLED", false), Sub2API: PlatformAdapterProfileConfig{ - Enabled: getEnvBool("AI_CS_PLATFORM_SUB2API_ENABLED", false), - IngressSecret: getEnv("AI_CS_PLATFORM_SUB2API_INGRESS_SECRET", ""), - CallbackBaseURL: getEnv("AI_CS_PLATFORM_SUB2API_CALLBACK_BASE_URL", ""), - CallbackSecret: getEnv("AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET", ""), - CallbackTimeoutMS: getEnvInt("AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS", 3000), - CallbackMaxRetries: getEnvInt("AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES", 5), + Enabled: getEnvBool("AI_CS_PLATFORM_SUB2API_ENABLED", false), + IngressSecret: getEnv("AI_CS_PLATFORM_SUB2API_INGRESS_SECRET", ""), + CallbackBaseURL: getEnv("AI_CS_PLATFORM_SUB2API_CALLBACK_BASE_URL", ""), + CallbackSecret: getEnv("AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET", ""), + CallbackTimeoutMS: getEnvInt("AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS", 3000), + CallbackMaxRetries: getEnvInt("AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES", 5), + CallbackPollIntervalMS: getEnvInt("AI_CS_PLATFORM_SUB2API_CALLBACK_POLL_INTERVAL_MS", 5000), + CallbackBatchSize: getEnvInt("AI_CS_PLATFORM_SUB2API_CALLBACK_BATCH_SIZE", 20), + CallbackRetrySchedule: getEnvIntList("AI_CS_PLATFORM_SUB2API_CALLBACK_RETRY_SCHEDULE_SEC", []int{10, 30, 60, 300, 900}), }, NewAPI: PlatformAdapterProfileConfig{ - Enabled: getEnvBool("AI_CS_PLATFORM_NEWAPI_ENABLED", false), - IngressSecret: getEnv("AI_CS_PLATFORM_NEWAPI_INGRESS_SECRET", ""), - CallbackBaseURL: getEnv("AI_CS_PLATFORM_NEWAPI_CALLBACK_BASE_URL", ""), - CallbackSecret: getEnv("AI_CS_PLATFORM_NEWAPI_CALLBACK_SECRET", ""), - CallbackTimeoutMS: getEnvInt("AI_CS_PLATFORM_NEWAPI_CALLBACK_TIMEOUT_MS", 3000), - CallbackMaxRetries: getEnvInt("AI_CS_PLATFORM_NEWAPI_CALLBACK_MAX_RETRIES", 5), + Enabled: getEnvBool("AI_CS_PLATFORM_NEWAPI_ENABLED", false), + IngressSecret: getEnv("AI_CS_PLATFORM_NEWAPI_INGRESS_SECRET", ""), + CallbackBaseURL: getEnv("AI_CS_PLATFORM_NEWAPI_CALLBACK_BASE_URL", ""), + CallbackSecret: getEnv("AI_CS_PLATFORM_NEWAPI_CALLBACK_SECRET", ""), + CallbackTimeoutMS: getEnvInt("AI_CS_PLATFORM_NEWAPI_CALLBACK_TIMEOUT_MS", 3000), + CallbackMaxRetries: getEnvInt("AI_CS_PLATFORM_NEWAPI_CALLBACK_MAX_RETRIES", 5), + CallbackPollIntervalMS: getEnvInt("AI_CS_PLATFORM_NEWAPI_CALLBACK_POLL_INTERVAL_MS", 5000), + CallbackBatchSize: getEnvInt("AI_CS_PLATFORM_NEWAPI_CALLBACK_BATCH_SIZE", 20), + CallbackRetrySchedule: getEnvIntList("AI_CS_PLATFORM_NEWAPI_CALLBACK_RETRY_SCHEDULE_SEC", []int{10, 30, 60, 300, 900}), }, }, Runtime: RuntimeConfig{ @@ -152,6 +161,20 @@ func validatePlatformProfile(platform string, adaptersEnabled bool, profile Plat if profile.CallbackMaxRetries < 0 { return fmt.Errorf("AI_CS_PLATFORM_%s_CALLBACK_MAX_RETRIES must not be negative", upperPlatform) } + if profile.CallbackPollIntervalMS <= 0 { + return fmt.Errorf("AI_CS_PLATFORM_%s_CALLBACK_POLL_INTERVAL_MS must be positive", upperPlatform) + } + if profile.CallbackBatchSize <= 0 { + return fmt.Errorf("AI_CS_PLATFORM_%s_CALLBACK_BATCH_SIZE must be positive", upperPlatform) + } + if len(profile.CallbackRetrySchedule) == 0 { + return fmt.Errorf("AI_CS_PLATFORM_%s_CALLBACK_RETRY_SCHEDULE_SEC must not be empty", upperPlatform) + } + for _, seconds := range profile.CallbackRetrySchedule { + if seconds <= 0 { + return fmt.Errorf("AI_CS_PLATFORM_%s_CALLBACK_RETRY_SCHEDULE_SEC must contain only positive integers", upperPlatform) + } + } return nil } @@ -213,3 +236,20 @@ func getEnvBool(key string, fallback bool) bool { return fallback } } + +func getEnvIntList(key string, fallback []int) []int { + value := strings.TrimSpace(os.Getenv(key)) + if value == "" { + return append([]int(nil), fallback...) + } + parts := strings.Split(value, ",") + result := make([]int, 0, len(parts)) + for _, part := range parts { + parsed, err := strconv.Atoi(strings.TrimSpace(part)) + if err != nil { + return append([]int(nil), fallback...) + } + result = append(result, parsed) + } + return result +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 02a8660..e1073d0 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -85,6 +85,22 @@ func TestGetEnvInt64_ValidValue(t *testing.T) { } } +func TestGetEnvIntList_ValidValue(t *testing.T) { + t.Setenv("TEST_INT_LIST", "10,30,60") + got := getEnvIntList("TEST_INT_LIST", []int{1}) + if len(got) != 3 || got[0] != 10 || got[1] != 30 || got[2] != 60 { + t.Fatalf("getEnvIntList(TEST_INT_LIST) = %v, want [10 30 60]", got) + } +} + +func TestGetEnvIntList_InvalidValueFallsBack(t *testing.T) { + t.Setenv("TEST_INT_LIST", "10,oops,60") + got := getEnvIntList("TEST_INT_LIST", []int{1, 2}) + if len(got) != 2 || got[0] != 1 || got[1] != 2 { + t.Fatalf("getEnvIntList(invalid) = %v, want [1 2]", got) + } +} + func TestLoadDefaults(t *testing.T) { t.Setenv("AI_CS_ADDR", "") cfg, err := Load() @@ -240,6 +256,21 @@ func TestLoad_RejectsEnabledSub2APIWithoutIngressSecret(t *testing.T) { } } +func TestLoad_RejectsEnabledSub2APIWithInvalidWorkerPollingConfig(t *testing.T) { + t.Setenv("AI_CS_PLATFORM_ADAPTERS_ENABLED", "true") + t.Setenv("AI_CS_PLATFORM_SUB2API_ENABLED", "true") + t.Setenv("AI_CS_PLATFORM_SUB2API_INGRESS_SECRET", "sub2api-secret") + t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_POLL_INTERVAL_MS", "0") + + _, err := Load() + if err == nil { + t.Fatal("expected error when sub2api callback poll interval is invalid") + } + if !strings.Contains(err.Error(), "AI_CS_PLATFORM_SUB2API_CALLBACK_POLL_INTERVAL_MS") { + t.Fatalf("unexpected error: %v", err) + } +} + func TestLoad_PlatformAdapterOverrides(t *testing.T) { t.Setenv("AI_CS_PLATFORM_ADAPTERS_ENABLED", "true") t.Setenv("AI_CS_PLATFORM_SUB2API_ENABLED", "true") @@ -248,6 +279,9 @@ func TestLoad_PlatformAdapterOverrides(t *testing.T) { t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_SECRET", "cb-secret") t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_TIMEOUT_MS", "4000") t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_MAX_RETRIES", "7") + t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_POLL_INTERVAL_MS", "2500") + t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_BATCH_SIZE", "12") + t.Setenv("AI_CS_PLATFORM_SUB2API_CALLBACK_RETRY_SCHEDULE_SEC", "5,15,45") cfg, err := Load() if err != nil { @@ -274,4 +308,13 @@ func TestLoad_PlatformAdapterOverrides(t *testing.T) { if cfg.PlatformAdapters.Sub2API.CallbackMaxRetries != 7 { t.Fatalf("sub2api callback max retries = %d, want 7", cfg.PlatformAdapters.Sub2API.CallbackMaxRetries) } + if cfg.PlatformAdapters.Sub2API.CallbackPollIntervalMS != 2500 { + t.Fatalf("sub2api callback poll interval ms = %d, want 2500", cfg.PlatformAdapters.Sub2API.CallbackPollIntervalMS) + } + if cfg.PlatformAdapters.Sub2API.CallbackBatchSize != 12 { + t.Fatalf("sub2api callback batch size = %d, want 12", cfg.PlatformAdapters.Sub2API.CallbackBatchSize) + } + if len(cfg.PlatformAdapters.Sub2API.CallbackRetrySchedule) != 3 || cfg.PlatformAdapters.Sub2API.CallbackRetrySchedule[0] != 5 || cfg.PlatformAdapters.Sub2API.CallbackRetrySchedule[1] != 15 || cfg.PlatformAdapters.Sub2API.CallbackRetrySchedule[2] != 45 { + t.Fatalf("sub2api callback retry schedule = %v, want [5 15 45]", cfg.PlatformAdapters.Sub2API.CallbackRetrySchedule) + } }