fix(supply-api): make compensation executor fail closed
This commit is contained in:
@@ -172,7 +172,7 @@ func withDefaultBackgroundFactory(factory backgroundFactory, tuning runtimeTunin
|
||||
}
|
||||
|
||||
var compensationNewDefaultExecutor = func() domain.OperationExecutor {
|
||||
return compensation.NewDefaultCompensationExecutor()
|
||||
return compensation.NewDefaultCompensationExecutor(compensation.ExecutorDependencies{})
|
||||
}
|
||||
|
||||
func startRevocationSubscriber(ctx context.Context, view runtimeBackgroundView) {
|
||||
|
||||
@@ -920,6 +920,18 @@ func TestRuntime_StartBackgroundWorkers_UsesDefaultCompensationInterval(t *testi
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompensationNewDefaultExecutor_FailsClosedWithoutRollbackDependencies(t *testing.T) {
|
||||
executor := compensationNewDefaultExecutor()
|
||||
|
||||
err := executor.Execute(context.Background(), "account.create", json.RawMessage(`{"account_id":101,"supplier_id":201}`))
|
||||
if err == nil {
|
||||
t.Fatal("expected default background compensation executor to fail closed")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "not implemented for production rollback") {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartCompensationWorker_UsesConfiguredInterval(t *testing.T) {
|
||||
var gotInterval time.Duration
|
||||
|
||||
|
||||
@@ -11,16 +11,56 @@ import (
|
||||
|
||||
type operationHandler func(ctx context.Context, payload json.RawMessage) error
|
||||
|
||||
type accountCreateRollbackFunc func(ctx context.Context, payload AccountCreatePayload) error
|
||||
type packagePublishRollbackFunc func(ctx context.Context, payload PackagePublishPayload) error
|
||||
type settlementWithdrawRollbackFunc func(ctx context.Context, payload SettlementWithdrawPayload) error
|
||||
type quotaDeductRollbackFunc func(ctx context.Context, payload QuotaDeductPayload) error
|
||||
|
||||
// ExecutorDependencies 定义补偿执行器在生产回滚中需要的外部依赖。
|
||||
type ExecutorDependencies struct {
|
||||
AccountCreateRollback accountCreateRollbackFunc
|
||||
PackagePublishRollback packagePublishRollbackFunc
|
||||
SettlementWithdrawRollback settlementWithdrawRollbackFunc
|
||||
QuotaDeductRollback quotaDeductRollbackFunc
|
||||
}
|
||||
|
||||
// AccountCreatePayload 定义 account.create 补偿所需的最小 payload。
|
||||
type AccountCreatePayload struct {
|
||||
AccountID int64 `json:"account_id"`
|
||||
SupplierID int64 `json:"supplier_id"`
|
||||
}
|
||||
|
||||
// PackagePublishPayload 定义 package.publish 补偿所需的最小 payload。
|
||||
type PackagePublishPayload struct {
|
||||
PackageID int64 `json:"package_id"`
|
||||
SupplierID int64 `json:"supplier_id"`
|
||||
}
|
||||
|
||||
// SettlementWithdrawPayload 定义 settlement.withdraw 补偿所需的最小 payload。
|
||||
type SettlementWithdrawPayload struct {
|
||||
SettlementID int64 `json:"settlement_id"`
|
||||
SupplierID int64 `json:"supplier_id"`
|
||||
}
|
||||
|
||||
// QuotaDeductPayload 定义 quota.deduct 补偿所需的最小 payload。
|
||||
type QuotaDeductPayload struct {
|
||||
PackageID int64 `json:"package_id"`
|
||||
SupplierID int64 `json:"supplier_id"`
|
||||
UsedQuota float64 `json:"used_quota"`
|
||||
}
|
||||
|
||||
// DefaultCompensationExecutor 默认补偿执行器
|
||||
type DefaultCompensationExecutor struct {
|
||||
sanitizer *sanitizer.Sanitizer // 用于脱敏日志输出
|
||||
handlers map[string]operationHandler
|
||||
sanitizer *sanitizer.Sanitizer // 用于脱敏日志输出
|
||||
dependencies ExecutorDependencies
|
||||
handlers map[string]operationHandler
|
||||
}
|
||||
|
||||
// NewDefaultCompensationExecutor 创建默认补偿执行器
|
||||
func NewDefaultCompensationExecutor() *DefaultCompensationExecutor {
|
||||
func NewDefaultCompensationExecutor(deps ExecutorDependencies) *DefaultCompensationExecutor {
|
||||
executor := &DefaultCompensationExecutor{
|
||||
sanitizer: sanitizer.NewSanitizer(),
|
||||
sanitizer: sanitizer.NewSanitizer(),
|
||||
dependencies: deps,
|
||||
}
|
||||
executor.handlers = map[string]operationHandler{
|
||||
"account.create": executor.CompensateAccountCreate,
|
||||
@@ -69,48 +109,157 @@ func (e *DefaultCompensationExecutor) maskPayload(payload json.RawMessage) strin
|
||||
|
||||
// CompensateAccountCreate 补偿账号创建
|
||||
func (e *DefaultCompensationExecutor) CompensateAccountCreate(ctx context.Context, payload json.RawMessage) error {
|
||||
parsed, err := parseAccountCreatePayload(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if e.dependencies.AccountCreateRollback == nil {
|
||||
return e.rollbackNotImplemented("account.create", payload)
|
||||
}
|
||||
|
||||
logger := logging.NewLogger("supply-api", logging.LogLevelInfo)
|
||||
logger.Info("compensation executor: executing account create compensation", map[string]interface{}{
|
||||
"masked_payload": e.maskPayload(payload),
|
||||
"account_id": parsed.AccountID,
|
||||
"supplier_id": parsed.SupplierID,
|
||||
})
|
||||
// 实际实现:删除已创建的账号资源
|
||||
// 1. 调用账号服务的删除接口
|
||||
// 2. 释放相关资源
|
||||
return nil
|
||||
return e.dependencies.AccountCreateRollback(ctx, parsed)
|
||||
}
|
||||
|
||||
// CompensatePackagePublish 补偿套餐发布
|
||||
func (e *DefaultCompensationExecutor) CompensatePackagePublish(ctx context.Context, payload json.RawMessage) error {
|
||||
parsed, err := parsePackagePublishPayload(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if e.dependencies.PackagePublishRollback == nil {
|
||||
return e.rollbackNotImplemented("package.publish", payload)
|
||||
}
|
||||
|
||||
logger := logging.NewLogger("supply-api", logging.LogLevelInfo)
|
||||
logger.Info("compensation executor: executing package publish compensation", map[string]interface{}{
|
||||
"masked_payload": e.maskPayload(payload),
|
||||
"package_id": parsed.PackageID,
|
||||
"supplier_id": parsed.SupplierID,
|
||||
})
|
||||
// 实际实现:回滚已发布的套餐
|
||||
// 1. 将套餐状态改为draft
|
||||
// 2. 或直接删除套餐
|
||||
return nil
|
||||
return e.dependencies.PackagePublishRollback(ctx, parsed)
|
||||
}
|
||||
|
||||
// CompensateSettlementWithdraw 补偿提现
|
||||
func (e *DefaultCompensationExecutor) CompensateSettlementWithdraw(ctx context.Context, payload json.RawMessage) error {
|
||||
parsed, err := parseSettlementWithdrawPayload(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if e.dependencies.SettlementWithdrawRollback == nil {
|
||||
return e.rollbackNotImplemented("settlement.withdraw", payload)
|
||||
}
|
||||
|
||||
logger := logging.NewLogger("supply-api", logging.LogLevelInfo)
|
||||
logger.Info("compensation executor: executing settlement withdraw compensation", map[string]interface{}{
|
||||
"masked_payload": e.maskPayload(payload),
|
||||
"masked_payload": e.maskPayload(payload),
|
||||
"settlement_id": parsed.SettlementID,
|
||||
"supplier_id": parsed.SupplierID,
|
||||
})
|
||||
// 实际实现:回滚提现状态
|
||||
// 1. 将结算单状态改为failed
|
||||
// 2. 恢复用户余额
|
||||
return nil
|
||||
return e.dependencies.SettlementWithdrawRollback(ctx, parsed)
|
||||
}
|
||||
|
||||
// CompensateQuotaDeduct 补偿配额扣减
|
||||
func (e *DefaultCompensationExecutor) CompensateQuotaDeduct(ctx context.Context, payload json.RawMessage) error {
|
||||
parsed, err := parseQuotaDeductPayload(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if e.dependencies.QuotaDeductRollback == nil {
|
||||
return e.rollbackNotImplemented("quota.deduct", payload)
|
||||
}
|
||||
|
||||
logger := logging.NewLogger("supply-api", logging.LogLevelInfo)
|
||||
logger.Info("compensation executor: executing quota deduct compensation", map[string]interface{}{
|
||||
"masked_payload": e.maskPayload(payload),
|
||||
"package_id": parsed.PackageID,
|
||||
"supplier_id": parsed.SupplierID,
|
||||
"used_quota": parsed.UsedQuota,
|
||||
})
|
||||
// 实际实现:回滚配额扣减
|
||||
// 1. 恢复套餐可用配额
|
||||
// 2. 减少已售配额
|
||||
return e.dependencies.QuotaDeductRollback(ctx, parsed)
|
||||
}
|
||||
|
||||
func (e *DefaultCompensationExecutor) rollbackNotImplemented(operationType string, payload json.RawMessage) error {
|
||||
err := fmt.Errorf("%s compensation not implemented for production rollback", operationType)
|
||||
logger := logging.NewLogger("supply-api", logging.LogLevelWarn)
|
||||
logger.Warn("compensation executor: rollback dependency unavailable", map[string]interface{}{
|
||||
"operation_type": operationType,
|
||||
"masked_payload": e.maskPayload(payload),
|
||||
"error": err.Error(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func parseAccountCreatePayload(payload json.RawMessage) (AccountCreatePayload, error) {
|
||||
var parsed AccountCreatePayload
|
||||
if err := unmarshalPayload(payload, &parsed); err != nil {
|
||||
return AccountCreatePayload{}, err
|
||||
}
|
||||
if parsed.AccountID <= 0 {
|
||||
return AccountCreatePayload{}, fmt.Errorf("invalid compensation payload: account_id is required")
|
||||
}
|
||||
if parsed.SupplierID <= 0 {
|
||||
return AccountCreatePayload{}, fmt.Errorf("invalid compensation payload: supplier_id is required")
|
||||
}
|
||||
return parsed, nil
|
||||
}
|
||||
|
||||
func parsePackagePublishPayload(payload json.RawMessage) (PackagePublishPayload, error) {
|
||||
var parsed PackagePublishPayload
|
||||
if err := unmarshalPayload(payload, &parsed); err != nil {
|
||||
return PackagePublishPayload{}, err
|
||||
}
|
||||
if parsed.PackageID <= 0 {
|
||||
return PackagePublishPayload{}, fmt.Errorf("invalid compensation payload: package_id is required")
|
||||
}
|
||||
if parsed.SupplierID <= 0 {
|
||||
return PackagePublishPayload{}, fmt.Errorf("invalid compensation payload: supplier_id is required")
|
||||
}
|
||||
return parsed, nil
|
||||
}
|
||||
|
||||
func parseSettlementWithdrawPayload(payload json.RawMessage) (SettlementWithdrawPayload, error) {
|
||||
var parsed SettlementWithdrawPayload
|
||||
if err := unmarshalPayload(payload, &parsed); err != nil {
|
||||
return SettlementWithdrawPayload{}, err
|
||||
}
|
||||
if parsed.SettlementID <= 0 {
|
||||
return SettlementWithdrawPayload{}, fmt.Errorf("invalid compensation payload: settlement_id is required")
|
||||
}
|
||||
if parsed.SupplierID <= 0 {
|
||||
return SettlementWithdrawPayload{}, fmt.Errorf("invalid compensation payload: supplier_id is required")
|
||||
}
|
||||
return parsed, nil
|
||||
}
|
||||
|
||||
func parseQuotaDeductPayload(payload json.RawMessage) (QuotaDeductPayload, error) {
|
||||
var parsed QuotaDeductPayload
|
||||
if err := unmarshalPayload(payload, &parsed); err != nil {
|
||||
return QuotaDeductPayload{}, err
|
||||
}
|
||||
if parsed.PackageID <= 0 {
|
||||
return QuotaDeductPayload{}, fmt.Errorf("invalid compensation payload: package_id is required")
|
||||
}
|
||||
if parsed.SupplierID <= 0 {
|
||||
return QuotaDeductPayload{}, fmt.Errorf("invalid compensation payload: supplier_id is required")
|
||||
}
|
||||
if parsed.UsedQuota <= 0 {
|
||||
return QuotaDeductPayload{}, fmt.Errorf("invalid compensation payload: used_quota must be greater than 0")
|
||||
}
|
||||
return parsed, nil
|
||||
}
|
||||
|
||||
func unmarshalPayload(payload json.RawMessage, target any) error {
|
||||
if len(payload) == 0 {
|
||||
return fmt.Errorf("invalid compensation payload: payload is empty")
|
||||
}
|
||||
if err := json.Unmarshal(payload, target); err != nil {
|
||||
return fmt.Errorf("invalid compensation payload: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
func TestDefaultCompensationExecutor_DispatchesByOperationType(t *testing.T) {
|
||||
executor := NewDefaultCompensationExecutor()
|
||||
executor := NewDefaultCompensationExecutor(ExecutorDependencies{})
|
||||
payload := json.RawMessage(`{"account":"acc-1"}`)
|
||||
called := ""
|
||||
|
||||
@@ -28,7 +28,7 @@ func TestDefaultCompensationExecutor_DispatchesByOperationType(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDefaultCompensationExecutor_RejectsUnknownOperationType(t *testing.T) {
|
||||
executor := NewDefaultCompensationExecutor()
|
||||
executor := NewDefaultCompensationExecutor(ExecutorDependencies{})
|
||||
|
||||
err := executor.Execute(context.Background(), "unknown.operation", json.RawMessage(`{"test":true}`))
|
||||
if err == nil {
|
||||
@@ -40,7 +40,7 @@ func TestDefaultCompensationExecutor_RejectsUnknownOperationType(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewDefaultCompensationExecutor_RegistersBuiltInHandlers(t *testing.T) {
|
||||
executor := NewDefaultCompensationExecutor()
|
||||
executor := NewDefaultCompensationExecutor(ExecutorDependencies{})
|
||||
|
||||
for _, operationType := range []string{
|
||||
"account.create",
|
||||
@@ -53,3 +53,99 @@ func TestNewDefaultCompensationExecutor_RegistersBuiltInHandlers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultCompensationExecutor_FailsClosedWithoutRollbackDependencies(t *testing.T) {
|
||||
executor := NewDefaultCompensationExecutor(ExecutorDependencies{})
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
operationType string
|
||||
payload json.RawMessage
|
||||
wantFragment string
|
||||
}{
|
||||
{
|
||||
name: "account create",
|
||||
operationType: "account.create",
|
||||
payload: json.RawMessage(`{"account_id":101,"supplier_id":201}`),
|
||||
wantFragment: "not implemented for production rollback",
|
||||
},
|
||||
{
|
||||
name: "package publish",
|
||||
operationType: "package.publish",
|
||||
payload: json.RawMessage(`{"package_id":102,"supplier_id":202}`),
|
||||
wantFragment: "not implemented for production rollback",
|
||||
},
|
||||
{
|
||||
name: "settlement withdraw",
|
||||
operationType: "settlement.withdraw",
|
||||
payload: json.RawMessage(`{"settlement_id":103,"supplier_id":203}`),
|
||||
wantFragment: "not implemented for production rollback",
|
||||
},
|
||||
{
|
||||
name: "quota deduct",
|
||||
operationType: "quota.deduct",
|
||||
payload: json.RawMessage(`{"package_id":104,"supplier_id":204,"used_quota":12.5}`),
|
||||
wantFragment: "not implemented for production rollback",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := executor.Execute(context.Background(), tt.operationType, tt.payload)
|
||||
if err == nil {
|
||||
t.Fatalf("expected %s compensation to fail closed", tt.operationType)
|
||||
}
|
||||
if !strings.Contains(err.Error(), tt.wantFragment) {
|
||||
t.Fatalf("expected error to contain %q, got %v", tt.wantFragment, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultCompensationExecutor_RejectsIncompletePayload(t *testing.T) {
|
||||
executor := NewDefaultCompensationExecutor(ExecutorDependencies{})
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
operationType string
|
||||
payload json.RawMessage
|
||||
wantFragment string
|
||||
}{
|
||||
{
|
||||
name: "account create missing supplier",
|
||||
operationType: "account.create",
|
||||
payload: json.RawMessage(`{"account_id":101}`),
|
||||
wantFragment: "supplier_id is required",
|
||||
},
|
||||
{
|
||||
name: "package publish missing package id",
|
||||
operationType: "package.publish",
|
||||
payload: json.RawMessage(`{"supplier_id":202}`),
|
||||
wantFragment: "package_id is required",
|
||||
},
|
||||
{
|
||||
name: "settlement withdraw missing settlement id",
|
||||
operationType: "settlement.withdraw",
|
||||
payload: json.RawMessage(`{"supplier_id":203}`),
|
||||
wantFragment: "settlement_id is required",
|
||||
},
|
||||
{
|
||||
name: "quota deduct missing used quota",
|
||||
operationType: "quota.deduct",
|
||||
payload: json.RawMessage(`{"package_id":104,"supplier_id":204}`),
|
||||
wantFragment: "used_quota must be greater than 0",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := executor.Execute(context.Background(), tt.operationType, tt.payload)
|
||||
if err == nil {
|
||||
t.Fatalf("expected %s compensation to reject incomplete payload", tt.operationType)
|
||||
}
|
||||
if !strings.Contains(err.Error(), tt.wantFragment) {
|
||||
t.Fatalf("expected error to contain %q, got %v", tt.wantFragment, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
50
supply-api/internal/compensation/payload_test.go
Normal file
50
supply-api/internal/compensation/payload_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package compensation
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseAccountCreatePayload(t *testing.T) {
|
||||
payload, err := parseAccountCreatePayload(json.RawMessage(`{"account_id":101,"supplier_id":201}`))
|
||||
if err != nil {
|
||||
t.Fatalf("expected valid payload, got %v", err)
|
||||
}
|
||||
if payload.AccountID != 101 {
|
||||
t.Fatalf("expected account id 101, got %d", payload.AccountID)
|
||||
}
|
||||
if payload.SupplierID != 201 {
|
||||
t.Fatalf("expected supplier id 201, got %d", payload.SupplierID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsePackagePublishPayload_RejectsMissingPackageID(t *testing.T) {
|
||||
_, err := parsePackagePublishPayload(json.RawMessage(`{"supplier_id":202}`))
|
||||
if err == nil {
|
||||
t.Fatal("expected missing package id to fail")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "package_id is required") {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSettlementWithdrawPayload_RejectsInvalidJSON(t *testing.T) {
|
||||
_, err := parseSettlementWithdrawPayload(json.RawMessage(`{invalid}`))
|
||||
if err == nil {
|
||||
t.Fatal("expected invalid json to fail")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "invalid compensation payload") {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseQuotaDeductPayload_RequiresPositiveUsedQuota(t *testing.T) {
|
||||
_, err := parseQuotaDeductPayload(json.RawMessage(`{"package_id":104,"supplier_id":204,"used_quota":0}`))
|
||||
if err == nil {
|
||||
t.Fatal("expected non-positive used quota to fail")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "used_quota must be greater than 0") {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user