refactor(supply-api): split background startup helpers
This commit is contained in:
127
docs/plans/2026-04-15-supply-api-background-helper-split-plan.md
Normal file
127
docs/plans/2026-04-15-supply-api-background-helper-split-plan.md
Normal file
@@ -0,0 +1,127 @@
|
||||
# Supply API Background Helper Split Implementation Plan
|
||||
|
||||
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||
|
||||
**Goal:** 把 `supply-api/internal/app/background.go` 中串行堆叠的后台启动逻辑拆成更小的 helper,降低单函数复杂度,同时保持现有启动语义不变。
|
||||
|
||||
**Architecture:** 保留 `StartBackgroundWorkers` 和 `startBackgroundWorkersWithFactory` 作为编排入口,把主动吊销订阅、Outbox 启动、分区维护和补偿 worker 启动拆到独立 helper。先补 helper 级失败测试锁住关键行为,再在现有高层测试基础上做回归验证。
|
||||
|
||||
**Tech Stack:** Go, Go test
|
||||
|
||||
---
|
||||
|
||||
### Task 1: 提取 Outbox 启动 helper
|
||||
|
||||
**Files:**
|
||||
- Modify: `supply-api/internal/app/background.go`
|
||||
- Modify: `supply-api/internal/app/runtime_test.go`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```go
|
||||
func TestStartOutboxProcessor_ProdRequiresBroker(t *testing.T) {
|
||||
err := startOutboxProcessor(context.Background(), runtime, factory)
|
||||
if err == nil {
|
||||
t.Fatal("expected missing broker to fail in prod")
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `cd "supply-api" && go test ./internal/app -run 'TestStartOutboxProcessor_ProdRequiresBroker' -v`
|
||||
Expected: FAIL,因为 helper 尚不存在
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```go
|
||||
func startOutboxProcessor(...) error { ... }
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `cd "supply-api" && go test ./internal/app -run 'TestStartOutboxProcessor_ProdRequiresBroker' -v`
|
||||
Expected: PASS
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add supply-api/internal/app/background.go supply-api/internal/app/runtime_test.go
|
||||
git commit -m "refactor(supply-api): extract background outbox startup"
|
||||
```
|
||||
|
||||
### Task 2: 提取补偿与订阅启动 helper
|
||||
|
||||
**Files:**
|
||||
- Modify: `supply-api/internal/app/background.go`
|
||||
- Modify: `supply-api/internal/app/runtime_test.go`
|
||||
|
||||
**Step 1: Write the failing test**
|
||||
|
||||
```go
|
||||
func TestStartCompensationWorker_UsesConfiguredInterval(t *testing.T) {
|
||||
var gotInterval time.Duration
|
||||
startCompensationWorker(context.Background(), runtime, factory)
|
||||
if gotInterval != 5*time.Minute {
|
||||
t.Fatalf("unexpected compensation interval: %s", gotInterval)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Step 2: Run test to verify it fails**
|
||||
|
||||
Run: `cd "supply-api" && go test ./internal/app -run 'TestStartCompensationWorker_UsesConfiguredInterval' -v`
|
||||
Expected: FAIL,因为 helper 尚不存在
|
||||
|
||||
**Step 3: Write minimal implementation**
|
||||
|
||||
```go
|
||||
func startRevocationSubscriber(...) { ... }
|
||||
func startCompensationWorker(...) { ... }
|
||||
```
|
||||
|
||||
**Step 4: Run test to verify it passes**
|
||||
|
||||
Run: `cd "supply-api" && go test ./internal/app -run 'TestStartCompensationWorker_UsesConfiguredInterval' -v`
|
||||
Expected: PASS
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add supply-api/internal/app/background.go supply-api/internal/app/runtime_test.go
|
||||
git commit -m "refactor(supply-api): extract background startup helpers"
|
||||
```
|
||||
|
||||
### Task 3: 回归验证与收尾
|
||||
|
||||
**Files:**
|
||||
- Modify: `supply-api/internal/app/background.go`
|
||||
- Verify: `supply-api/internal/app/runtime_test.go`
|
||||
- Verify: `supply-api/internal/app/runtime.go`
|
||||
|
||||
**Step 1: Run focused tests**
|
||||
|
||||
Run: `cd "supply-api" && go test ./internal/app -run 'Test(Runtime_StartBackgroundWorkers_.*|StartOutboxProcessor_ProdRequiresBroker|StartCompensationWorker_UsesConfiguredInterval)' -v`
|
||||
Expected: PASS
|
||||
|
||||
**Step 2: Run package regression**
|
||||
|
||||
Run: `cd "supply-api" && go test ./internal/app ./cmd/supply-api ./internal/httpapi`
|
||||
Expected: PASS
|
||||
|
||||
**Step 3: Run repo exit verification**
|
||||
|
||||
Run: `bash "scripts/ci/repo_integrity_check.sh"`
|
||||
Expected: PASS
|
||||
|
||||
**Step 4: Check formatting**
|
||||
|
||||
Run: `git diff --check`
|
||||
Expected: no output
|
||||
|
||||
**Step 5: Commit**
|
||||
|
||||
```bash
|
||||
git add docs/plans/2026-04-15-supply-api-background-helper-split-plan.md supply-api/internal/app/background.go supply-api/internal/app/runtime_test.go
|
||||
git commit -m "refactor(supply-api): split background startup helpers"
|
||||
```
|
||||
@@ -77,48 +77,17 @@ func startBackgroundWorkersWithFactory(
|
||||
|
||||
factory = withDefaultBackgroundFactory(factory, runtime.tuning)
|
||||
|
||||
if runtime.revocationSubscriber != nil && runtime.redisCache != nil {
|
||||
if err := runtime.revocationSubscriber.StartRevocationSubscriber(rootCtx); err != nil {
|
||||
warnf(runtime.logger, "启动主动吊销订阅失败: %v", err)
|
||||
} else {
|
||||
runtime.logger.Info("主动吊销机制: 已启动 (Redis Pub/Sub)", nil)
|
||||
}
|
||||
}
|
||||
startRevocationSubscriber(rootCtx, runtime)
|
||||
|
||||
if runtime.db == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
outboxRepo := factory.newOutboxRepository(runtime.db)
|
||||
msgBroker := factory.newMessageBroker(runtime.redisCache)
|
||||
if msgBroker == nil {
|
||||
if runtime.env == "prod" {
|
||||
return errors.New("outbox message broker unavailable")
|
||||
}
|
||||
runtime.logger.Warn("OutboxProcessor未启动 (message broker不可用)", nil)
|
||||
} else {
|
||||
stats := &messaging.NoOpOutboxStats{}
|
||||
runner := factory.newOutboxRunner(outboxRepo, msgBroker, stats)
|
||||
go runner.Start(rootCtx)
|
||||
runtime.logger.Info("OutboxProcessor已启动", nil)
|
||||
if err := startOutboxProcessor(rootCtx, runtime, factory); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partitionManager := factory.newPartitionManager(runtime.db)
|
||||
if err := partitionManager.EnsureFuturePartitions(initCtx); err != nil {
|
||||
warnf(runtime.logger, "预创建未来分区失败: %v", err)
|
||||
} else {
|
||||
runtime.logger.Info("分区管理: 未来分区已确保存在", nil)
|
||||
}
|
||||
|
||||
go startPartitionMaintenance(rootCtx, runtime.logger, partitionManager, runtime.tuning)
|
||||
|
||||
compensationStore := factory.newCompensationStore(runtime.db)
|
||||
compensationStats := &domain.NoOpCompensationStats{}
|
||||
compensationExecutor := factory.newCompensationExecutor()
|
||||
compensationProcessor := factory.newCompensationProcessor(compensationStore, compensationExecutor, compensationStats)
|
||||
runtime.logger.Info("批量补偿处理器: 已初始化", nil)
|
||||
compensationProcessor.StartBackgroundWorker(rootCtx, runtime.tuning.compensationCheckInterval)
|
||||
infof(runtime.logger, "批量补偿处理器: 后台worker已启动 (每%s检查一次)", runtime.tuning.compensationCheckInterval)
|
||||
startPartitionMaintenanceWorker(rootCtx, initCtx, runtime, factory)
|
||||
startCompensationWorker(rootCtx, runtime, factory)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -171,7 +140,82 @@ var compensationNewDefaultExecutor = func() domain.OperationExecutor {
|
||||
return compensation.NewDefaultCompensationExecutor()
|
||||
}
|
||||
|
||||
func startPartitionMaintenance(ctx context.Context, logger logging.Logger, manager partitionManager, tuning runtimeTuning) {
|
||||
func startRevocationSubscriber(ctx context.Context, runtime *Runtime) {
|
||||
if runtime == nil || runtime.revocationSubscriber == nil || runtime.redisCache == nil {
|
||||
return
|
||||
}
|
||||
if err := runtime.revocationSubscriber.StartRevocationSubscriber(ctx); err != nil {
|
||||
warnf(runtime.logger, "启动主动吊销订阅失败: %v", err)
|
||||
return
|
||||
}
|
||||
runtime.logger.Info("主动吊销机制: 已启动 (Redis Pub/Sub)", nil)
|
||||
}
|
||||
|
||||
func startOutboxProcessor(ctx context.Context, runtime *Runtime, factory backgroundFactory) error {
|
||||
if runtime == nil {
|
||||
return errors.New("runtime is required")
|
||||
}
|
||||
if runtime.db == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
factory = withDefaultBackgroundFactory(factory, runtime.tuning)
|
||||
|
||||
outboxRepo := factory.newOutboxRepository(runtime.db)
|
||||
msgBroker := factory.newMessageBroker(runtime.redisCache)
|
||||
if msgBroker == nil {
|
||||
if runtime.env == "prod" {
|
||||
return errors.New("outbox message broker unavailable")
|
||||
}
|
||||
runtime.logger.Warn("OutboxProcessor未启动 (message broker不可用)", nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
stats := &messaging.NoOpOutboxStats{}
|
||||
runner := factory.newOutboxRunner(outboxRepo, msgBroker, stats)
|
||||
go runner.Start(ctx)
|
||||
runtime.logger.Info("OutboxProcessor已启动", nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
func startPartitionMaintenanceWorker(
|
||||
rootCtx context.Context,
|
||||
initCtx context.Context,
|
||||
runtime *Runtime,
|
||||
factory backgroundFactory,
|
||||
) {
|
||||
if runtime == nil || runtime.db == nil {
|
||||
return
|
||||
}
|
||||
|
||||
factory = withDefaultBackgroundFactory(factory, runtime.tuning)
|
||||
manager := factory.newPartitionManager(runtime.db)
|
||||
if err := manager.EnsureFuturePartitions(initCtx); err != nil {
|
||||
warnf(runtime.logger, "预创建未来分区失败: %v", err)
|
||||
} else {
|
||||
runtime.logger.Info("分区管理: 未来分区已确保存在", nil)
|
||||
}
|
||||
|
||||
go runPartitionMaintenanceLoop(rootCtx, runtime.logger, manager, runtime.tuning)
|
||||
}
|
||||
|
||||
func startCompensationWorker(ctx context.Context, runtime *Runtime, factory backgroundFactory) {
|
||||
if runtime == nil || runtime.db == nil {
|
||||
return
|
||||
}
|
||||
|
||||
factory = withDefaultBackgroundFactory(factory, runtime.tuning)
|
||||
compensationStore := factory.newCompensationStore(runtime.db)
|
||||
compensationStats := &domain.NoOpCompensationStats{}
|
||||
compensationExecutor := factory.newCompensationExecutor()
|
||||
compensationProcessor := factory.newCompensationProcessor(compensationStore, compensationExecutor, compensationStats)
|
||||
|
||||
runtime.logger.Info("批量补偿处理器: 已初始化", nil)
|
||||
compensationProcessor.StartBackgroundWorker(ctx, runtime.tuning.compensationCheckInterval)
|
||||
infof(runtime.logger, "批量补偿处理器: 后台worker已启动 (每%s检查一次)", runtime.tuning.compensationCheckInterval)
|
||||
}
|
||||
|
||||
func runPartitionMaintenanceLoop(ctx context.Context, logger logging.Logger, manager partitionManager, tuning runtimeTuning) {
|
||||
ticker := time.NewTicker(tuning.partitionMaintenanceInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
||||
@@ -339,6 +339,28 @@ func TestRuntime_StartBackgroundWorkers_ProdRequiresOutboxBroker(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartOutboxProcessor_ProdRequiresBroker(t *testing.T) {
|
||||
err := startOutboxProcessor(context.Background(), &Runtime{
|
||||
env: "prod",
|
||||
logger: testLogger{},
|
||||
db: &repository.DB{},
|
||||
tuning: defaultRuntimeTuning(),
|
||||
}, backgroundFactory{
|
||||
newOutboxRepository: func(*repository.DB) outboxRepository {
|
||||
return stubOutboxRepository{}
|
||||
},
|
||||
newMessageBroker: func(*cache.RedisCache) messaging.MessageBroker {
|
||||
return nil
|
||||
},
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected missing outbox broker to fail in prod")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "outbox message broker unavailable") {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntime_StartBackgroundWorkers_UsesDefaultCompensationInterval(t *testing.T) {
|
||||
var gotInterval time.Duration
|
||||
|
||||
@@ -386,6 +408,38 @@ func TestRuntime_StartBackgroundWorkers_UsesDefaultCompensationInterval(t *testi
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartCompensationWorker_UsesConfiguredInterval(t *testing.T) {
|
||||
var gotInterval time.Duration
|
||||
|
||||
startCompensationWorker(context.Background(), &Runtime{
|
||||
env: "dev",
|
||||
logger: testLogger{},
|
||||
db: &repository.DB{},
|
||||
tuning: defaultRuntimeTuning(),
|
||||
}, backgroundFactory{
|
||||
newCompensationStore: func(*repository.DB) domain.CompensationStore {
|
||||
return stubCompensationStore{}
|
||||
},
|
||||
newCompensationExecutor: func() domain.OperationExecutor {
|
||||
return stubOperationExecutor{}
|
||||
},
|
||||
newCompensationProcessor: func(
|
||||
domain.CompensationStore,
|
||||
domain.OperationExecutor,
|
||||
domain.CompensationStats,
|
||||
) compensationWorker {
|
||||
return stubCompensationWorker{
|
||||
start: func(_ context.Context, interval time.Duration) {
|
||||
gotInterval = interval
|
||||
},
|
||||
}
|
||||
},
|
||||
})
|
||||
if gotInterval != 5*time.Minute {
|
||||
t.Fatalf("unexpected compensation interval: %s", gotInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntime_StartBackgroundWorkers_DevMissingOutboxBrokerLogsWarning(t *testing.T) {
|
||||
logger := &captureLogger{}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user