diff --git a/supply-api/internal/app/background.go b/supply-api/internal/app/background.go index f9bb7d95..02bf552b 100644 --- a/supply-api/internal/app/background.go +++ b/supply-api/internal/app/background.go @@ -172,7 +172,7 @@ func withDefaultBackgroundFactory(factory backgroundFactory, tuning runtimeTunin } var compensationNewDefaultExecutor = func() domain.OperationExecutor { - return compensation.NewDefaultCompensationExecutor() + return compensation.NewDefaultCompensationExecutor(compensation.ExecutorDependencies{}) } func startRevocationSubscriber(ctx context.Context, view runtimeBackgroundView) { diff --git a/supply-api/internal/app/runtime_test.go b/supply-api/internal/app/runtime_test.go index 69b0a39f..5a728585 100644 --- a/supply-api/internal/app/runtime_test.go +++ b/supply-api/internal/app/runtime_test.go @@ -920,6 +920,18 @@ func TestRuntime_StartBackgroundWorkers_UsesDefaultCompensationInterval(t *testi } } +func TestCompensationNewDefaultExecutor_FailsClosedWithoutRollbackDependencies(t *testing.T) { + executor := compensationNewDefaultExecutor() + + err := executor.Execute(context.Background(), "account.create", json.RawMessage(`{"account_id":101,"supplier_id":201}`)) + if err == nil { + t.Fatal("expected default background compensation executor to fail closed") + } + if !strings.Contains(err.Error(), "not implemented for production rollback") { + t.Fatalf("unexpected error: %v", err) + } +} + func TestStartCompensationWorker_UsesConfiguredInterval(t *testing.T) { var gotInterval time.Duration diff --git a/supply-api/internal/compensation/compensation.go b/supply-api/internal/compensation/compensation.go index 9fcb4c7a..7fc5e26b 100644 --- a/supply-api/internal/compensation/compensation.go +++ b/supply-api/internal/compensation/compensation.go @@ -11,16 +11,56 @@ import ( type operationHandler func(ctx context.Context, payload json.RawMessage) error +type accountCreateRollbackFunc func(ctx context.Context, payload AccountCreatePayload) error +type packagePublishRollbackFunc func(ctx context.Context, payload PackagePublishPayload) error +type settlementWithdrawRollbackFunc func(ctx context.Context, payload SettlementWithdrawPayload) error +type quotaDeductRollbackFunc func(ctx context.Context, payload QuotaDeductPayload) error + +// ExecutorDependencies 定义补偿执行器在生产回滚中需要的外部依赖。 +type ExecutorDependencies struct { + AccountCreateRollback accountCreateRollbackFunc + PackagePublishRollback packagePublishRollbackFunc + SettlementWithdrawRollback settlementWithdrawRollbackFunc + QuotaDeductRollback quotaDeductRollbackFunc +} + +// AccountCreatePayload 定义 account.create 补偿所需的最小 payload。 +type AccountCreatePayload struct { + AccountID int64 `json:"account_id"` + SupplierID int64 `json:"supplier_id"` +} + +// PackagePublishPayload 定义 package.publish 补偿所需的最小 payload。 +type PackagePublishPayload struct { + PackageID int64 `json:"package_id"` + SupplierID int64 `json:"supplier_id"` +} + +// SettlementWithdrawPayload 定义 settlement.withdraw 补偿所需的最小 payload。 +type SettlementWithdrawPayload struct { + SettlementID int64 `json:"settlement_id"` + SupplierID int64 `json:"supplier_id"` +} + +// QuotaDeductPayload 定义 quota.deduct 补偿所需的最小 payload。 +type QuotaDeductPayload struct { + PackageID int64 `json:"package_id"` + SupplierID int64 `json:"supplier_id"` + UsedQuota float64 `json:"used_quota"` +} + // DefaultCompensationExecutor 默认补偿执行器 type DefaultCompensationExecutor struct { - sanitizer *sanitizer.Sanitizer // 用于脱敏日志输出 - handlers map[string]operationHandler + sanitizer *sanitizer.Sanitizer // 用于脱敏日志输出 + dependencies ExecutorDependencies + handlers map[string]operationHandler } // NewDefaultCompensationExecutor 创建默认补偿执行器 -func NewDefaultCompensationExecutor() *DefaultCompensationExecutor { +func NewDefaultCompensationExecutor(deps ExecutorDependencies) *DefaultCompensationExecutor { executor := &DefaultCompensationExecutor{ - sanitizer: sanitizer.NewSanitizer(), + sanitizer: sanitizer.NewSanitizer(), + dependencies: deps, } executor.handlers = map[string]operationHandler{ "account.create": executor.CompensateAccountCreate, @@ -69,48 +109,157 @@ func (e *DefaultCompensationExecutor) maskPayload(payload json.RawMessage) strin // CompensateAccountCreate 补偿账号创建 func (e *DefaultCompensationExecutor) CompensateAccountCreate(ctx context.Context, payload json.RawMessage) error { + parsed, err := parseAccountCreatePayload(payload) + if err != nil { + return err + } + if e.dependencies.AccountCreateRollback == nil { + return e.rollbackNotImplemented("account.create", payload) + } + logger := logging.NewLogger("supply-api", logging.LogLevelInfo) logger.Info("compensation executor: executing account create compensation", map[string]interface{}{ "masked_payload": e.maskPayload(payload), + "account_id": parsed.AccountID, + "supplier_id": parsed.SupplierID, }) - // 实际实现:删除已创建的账号资源 - // 1. 调用账号服务的删除接口 - // 2. 释放相关资源 - return nil + return e.dependencies.AccountCreateRollback(ctx, parsed) } // CompensatePackagePublish 补偿套餐发布 func (e *DefaultCompensationExecutor) CompensatePackagePublish(ctx context.Context, payload json.RawMessage) error { + parsed, err := parsePackagePublishPayload(payload) + if err != nil { + return err + } + if e.dependencies.PackagePublishRollback == nil { + return e.rollbackNotImplemented("package.publish", payload) + } + logger := logging.NewLogger("supply-api", logging.LogLevelInfo) logger.Info("compensation executor: executing package publish compensation", map[string]interface{}{ "masked_payload": e.maskPayload(payload), + "package_id": parsed.PackageID, + "supplier_id": parsed.SupplierID, }) - // 实际实现:回滚已发布的套餐 - // 1. 将套餐状态改为draft - // 2. 或直接删除套餐 - return nil + return e.dependencies.PackagePublishRollback(ctx, parsed) } // CompensateSettlementWithdraw 补偿提现 func (e *DefaultCompensationExecutor) CompensateSettlementWithdraw(ctx context.Context, payload json.RawMessage) error { + parsed, err := parseSettlementWithdrawPayload(payload) + if err != nil { + return err + } + if e.dependencies.SettlementWithdrawRollback == nil { + return e.rollbackNotImplemented("settlement.withdraw", payload) + } + logger := logging.NewLogger("supply-api", logging.LogLevelInfo) logger.Info("compensation executor: executing settlement withdraw compensation", map[string]interface{}{ - "masked_payload": e.maskPayload(payload), + "masked_payload": e.maskPayload(payload), + "settlement_id": parsed.SettlementID, + "supplier_id": parsed.SupplierID, }) - // 实际实现:回滚提现状态 - // 1. 将结算单状态改为failed - // 2. 恢复用户余额 - return nil + return e.dependencies.SettlementWithdrawRollback(ctx, parsed) } // CompensateQuotaDeduct 补偿配额扣减 func (e *DefaultCompensationExecutor) CompensateQuotaDeduct(ctx context.Context, payload json.RawMessage) error { + parsed, err := parseQuotaDeductPayload(payload) + if err != nil { + return err + } + if e.dependencies.QuotaDeductRollback == nil { + return e.rollbackNotImplemented("quota.deduct", payload) + } + logger := logging.NewLogger("supply-api", logging.LogLevelInfo) logger.Info("compensation executor: executing quota deduct compensation", map[string]interface{}{ "masked_payload": e.maskPayload(payload), + "package_id": parsed.PackageID, + "supplier_id": parsed.SupplierID, + "used_quota": parsed.UsedQuota, }) - // 实际实现:回滚配额扣减 - // 1. 恢复套餐可用配额 - // 2. 减少已售配额 + return e.dependencies.QuotaDeductRollback(ctx, parsed) +} + +func (e *DefaultCompensationExecutor) rollbackNotImplemented(operationType string, payload json.RawMessage) error { + err := fmt.Errorf("%s compensation not implemented for production rollback", operationType) + logger := logging.NewLogger("supply-api", logging.LogLevelWarn) + logger.Warn("compensation executor: rollback dependency unavailable", map[string]interface{}{ + "operation_type": operationType, + "masked_payload": e.maskPayload(payload), + "error": err.Error(), + }) + return err +} + +func parseAccountCreatePayload(payload json.RawMessage) (AccountCreatePayload, error) { + var parsed AccountCreatePayload + if err := unmarshalPayload(payload, &parsed); err != nil { + return AccountCreatePayload{}, err + } + if parsed.AccountID <= 0 { + return AccountCreatePayload{}, fmt.Errorf("invalid compensation payload: account_id is required") + } + if parsed.SupplierID <= 0 { + return AccountCreatePayload{}, fmt.Errorf("invalid compensation payload: supplier_id is required") + } + return parsed, nil +} + +func parsePackagePublishPayload(payload json.RawMessage) (PackagePublishPayload, error) { + var parsed PackagePublishPayload + if err := unmarshalPayload(payload, &parsed); err != nil { + return PackagePublishPayload{}, err + } + if parsed.PackageID <= 0 { + return PackagePublishPayload{}, fmt.Errorf("invalid compensation payload: package_id is required") + } + if parsed.SupplierID <= 0 { + return PackagePublishPayload{}, fmt.Errorf("invalid compensation payload: supplier_id is required") + } + return parsed, nil +} + +func parseSettlementWithdrawPayload(payload json.RawMessage) (SettlementWithdrawPayload, error) { + var parsed SettlementWithdrawPayload + if err := unmarshalPayload(payload, &parsed); err != nil { + return SettlementWithdrawPayload{}, err + } + if parsed.SettlementID <= 0 { + return SettlementWithdrawPayload{}, fmt.Errorf("invalid compensation payload: settlement_id is required") + } + if parsed.SupplierID <= 0 { + return SettlementWithdrawPayload{}, fmt.Errorf("invalid compensation payload: supplier_id is required") + } + return parsed, nil +} + +func parseQuotaDeductPayload(payload json.RawMessage) (QuotaDeductPayload, error) { + var parsed QuotaDeductPayload + if err := unmarshalPayload(payload, &parsed); err != nil { + return QuotaDeductPayload{}, err + } + if parsed.PackageID <= 0 { + return QuotaDeductPayload{}, fmt.Errorf("invalid compensation payload: package_id is required") + } + if parsed.SupplierID <= 0 { + return QuotaDeductPayload{}, fmt.Errorf("invalid compensation payload: supplier_id is required") + } + if parsed.UsedQuota <= 0 { + return QuotaDeductPayload{}, fmt.Errorf("invalid compensation payload: used_quota must be greater than 0") + } + return parsed, nil +} + +func unmarshalPayload(payload json.RawMessage, target any) error { + if len(payload) == 0 { + return fmt.Errorf("invalid compensation payload: payload is empty") + } + if err := json.Unmarshal(payload, target); err != nil { + return fmt.Errorf("invalid compensation payload: %w", err) + } return nil } diff --git a/supply-api/internal/compensation/compensation_test.go b/supply-api/internal/compensation/compensation_test.go index 7f949dd7..84f143ab 100644 --- a/supply-api/internal/compensation/compensation_test.go +++ b/supply-api/internal/compensation/compensation_test.go @@ -8,7 +8,7 @@ import ( ) func TestDefaultCompensationExecutor_DispatchesByOperationType(t *testing.T) { - executor := NewDefaultCompensationExecutor() + executor := NewDefaultCompensationExecutor(ExecutorDependencies{}) payload := json.RawMessage(`{"account":"acc-1"}`) called := "" @@ -28,7 +28,7 @@ func TestDefaultCompensationExecutor_DispatchesByOperationType(t *testing.T) { } func TestDefaultCompensationExecutor_RejectsUnknownOperationType(t *testing.T) { - executor := NewDefaultCompensationExecutor() + executor := NewDefaultCompensationExecutor(ExecutorDependencies{}) err := executor.Execute(context.Background(), "unknown.operation", json.RawMessage(`{"test":true}`)) if err == nil { @@ -40,7 +40,7 @@ func TestDefaultCompensationExecutor_RejectsUnknownOperationType(t *testing.T) { } func TestNewDefaultCompensationExecutor_RegistersBuiltInHandlers(t *testing.T) { - executor := NewDefaultCompensationExecutor() + executor := NewDefaultCompensationExecutor(ExecutorDependencies{}) for _, operationType := range []string{ "account.create", @@ -53,3 +53,99 @@ func TestNewDefaultCompensationExecutor_RegistersBuiltInHandlers(t *testing.T) { } } } + +func TestDefaultCompensationExecutor_FailsClosedWithoutRollbackDependencies(t *testing.T) { + executor := NewDefaultCompensationExecutor(ExecutorDependencies{}) + + tests := []struct { + name string + operationType string + payload json.RawMessage + wantFragment string + }{ + { + name: "account create", + operationType: "account.create", + payload: json.RawMessage(`{"account_id":101,"supplier_id":201}`), + wantFragment: "not implemented for production rollback", + }, + { + name: "package publish", + operationType: "package.publish", + payload: json.RawMessage(`{"package_id":102,"supplier_id":202}`), + wantFragment: "not implemented for production rollback", + }, + { + name: "settlement withdraw", + operationType: "settlement.withdraw", + payload: json.RawMessage(`{"settlement_id":103,"supplier_id":203}`), + wantFragment: "not implemented for production rollback", + }, + { + name: "quota deduct", + operationType: "quota.deduct", + payload: json.RawMessage(`{"package_id":104,"supplier_id":204,"used_quota":12.5}`), + wantFragment: "not implemented for production rollback", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := executor.Execute(context.Background(), tt.operationType, tt.payload) + if err == nil { + t.Fatalf("expected %s compensation to fail closed", tt.operationType) + } + if !strings.Contains(err.Error(), tt.wantFragment) { + t.Fatalf("expected error to contain %q, got %v", tt.wantFragment, err) + } + }) + } +} + +func TestDefaultCompensationExecutor_RejectsIncompletePayload(t *testing.T) { + executor := NewDefaultCompensationExecutor(ExecutorDependencies{}) + + tests := []struct { + name string + operationType string + payload json.RawMessage + wantFragment string + }{ + { + name: "account create missing supplier", + operationType: "account.create", + payload: json.RawMessage(`{"account_id":101}`), + wantFragment: "supplier_id is required", + }, + { + name: "package publish missing package id", + operationType: "package.publish", + payload: json.RawMessage(`{"supplier_id":202}`), + wantFragment: "package_id is required", + }, + { + name: "settlement withdraw missing settlement id", + operationType: "settlement.withdraw", + payload: json.RawMessage(`{"supplier_id":203}`), + wantFragment: "settlement_id is required", + }, + { + name: "quota deduct missing used quota", + operationType: "quota.deduct", + payload: json.RawMessage(`{"package_id":104,"supplier_id":204}`), + wantFragment: "used_quota must be greater than 0", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := executor.Execute(context.Background(), tt.operationType, tt.payload) + if err == nil { + t.Fatalf("expected %s compensation to reject incomplete payload", tt.operationType) + } + if !strings.Contains(err.Error(), tt.wantFragment) { + t.Fatalf("expected error to contain %q, got %v", tt.wantFragment, err) + } + }) + } +} diff --git a/supply-api/internal/compensation/payload_test.go b/supply-api/internal/compensation/payload_test.go new file mode 100644 index 00000000..635f145a --- /dev/null +++ b/supply-api/internal/compensation/payload_test.go @@ -0,0 +1,50 @@ +package compensation + +import ( + "encoding/json" + "strings" + "testing" +) + +func TestParseAccountCreatePayload(t *testing.T) { + payload, err := parseAccountCreatePayload(json.RawMessage(`{"account_id":101,"supplier_id":201}`)) + if err != nil { + t.Fatalf("expected valid payload, got %v", err) + } + if payload.AccountID != 101 { + t.Fatalf("expected account id 101, got %d", payload.AccountID) + } + if payload.SupplierID != 201 { + t.Fatalf("expected supplier id 201, got %d", payload.SupplierID) + } +} + +func TestParsePackagePublishPayload_RejectsMissingPackageID(t *testing.T) { + _, err := parsePackagePublishPayload(json.RawMessage(`{"supplier_id":202}`)) + if err == nil { + t.Fatal("expected missing package id to fail") + } + if !strings.Contains(err.Error(), "package_id is required") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestParseSettlementWithdrawPayload_RejectsInvalidJSON(t *testing.T) { + _, err := parseSettlementWithdrawPayload(json.RawMessage(`{invalid}`)) + if err == nil { + t.Fatal("expected invalid json to fail") + } + if !strings.Contains(err.Error(), "invalid compensation payload") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestParseQuotaDeductPayload_RequiresPositiveUsedQuota(t *testing.T) { + _, err := parseQuotaDeductPayload(json.RawMessage(`{"package_id":104,"supplier_id":204,"used_quota":0}`)) + if err == nil { + t.Fatal("expected non-positive used quota to fail") + } + if !strings.Contains(err.Error(), "used_quota must be greater than 0") { + t.Fatalf("unexpected error: %v", err) + } +}