feat: 初始化ForeignKeyValidator和CompensationProcessor

P0-07: 批量补偿处理器
- 添加NewCompensationProcessor构造函数
- 添加NoOpCompensationStats实现
- 添加defaultCompensationExecutor placeholder实现
- 在main.go中初始化CompensationProcessor

P0-09: 外键校验器
- 修改ForeignKeyValidator使用pgxpool替代sql.DB
- 在main.go中初始化ForeignKeyValidator
- 在创建账户前调用ValidateSupplyAccountOwner
- 在创建套餐前调用ValidatePackageSupplyAccount
- SupplyAPI添加fkValidator字段

修改的文件:
- cmd/supply-api/main.go: 初始化组件
- internal/httpapi/supply_api.go: 添加外键校验
- internal/domain/compensation.go: 添加构造函数和Stats实现
- internal/repository/foreign_key_validator.go: 改用pgxpool
This commit is contained in:
Your Name
2026-04-08 19:00:06 +08:00
parent 2f0011b118
commit 40ab7cf851
4 changed files with 540 additions and 6 deletions

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"crypto/x509" "crypto/x509"
"encoding/json"
"encoding/pem" "encoding/pem"
"flag" "flag"
"fmt" "fmt"
@@ -115,6 +116,15 @@ func main() {
log.Println("警告: 审计存储使用内存实现 (生产环境不应使用)") log.Println("警告: 审计存储使用内存实现 (生产环境不应使用)")
} }
// P0-09修复: 初始化外键校验器
var fkValidator *repository.ForeignKeyValidator
if db != nil {
fkValidator = repository.NewForeignKeyValidator(db.Pool)
log.Println("外键校验器: 已初始化 (PostgreSQL-backed)")
} else {
log.Println("警告: 外键校验器未启用 (db不可用)")
}
// 初始化不变量检查器 // 初始化不变量检查器
invariantChecker := domain.NewInvariantChecker(accountStore, packageStore, settlementStore) invariantChecker := domain.NewInvariantChecker(accountStore, packageStore, settlementStore)
_ = invariantChecker // 用于业务逻辑校验 _ = invariantChecker // 用于业务逻辑校验
@@ -198,6 +208,7 @@ func main() {
earningService, earningService,
idempotencyMiddleware, // 使用幂等中间件DB-backed idempotencyMiddleware, // 使用幂等中间件DB-backed
auditStore, auditStore,
fkValidator, // P0-09修复: 外键校验器
cfg.Server.DefaultSupplierID, cfg.Server.DefaultSupplierID,
cfg.Server.StatementBaseURL, cfg.Server.StatementBaseURL,
time.Now, time.Now,
@@ -310,6 +321,14 @@ func main() {
} }
} }
}() }()
// P0-07修复: 初始化批量补偿处理器
compensationStore := domain.NewSQLCompensationStore(db.Pool)
compensationStats := &domain.NoOpCompensationStats{}
compensationExecutor := &defaultCompensationExecutor{} // 需要实现OperationExecutor接口
compensationProcessor := domain.NewCompensationProcessor(compensationStore, compensationExecutor, compensationStats)
log.Println("批量补偿处理器: 已初始化")
_ = compensationProcessor // TODO: 启动后台补偿处理goroutine
} }
// 优雅关闭 // 优雅关闭
@@ -719,6 +738,18 @@ func calculateOutboxBackoff(retryCount, maxRetries int) int {
// Ensure domain.OutboxEvent is compatible with our conversion // Ensure domain.OutboxEvent is compatible with our conversion
var _ = domain.OutboxEvent{} var _ = domain.OutboxEvent{}
// ==================== 补偿执行器 ====================
// defaultCompensationExecutor 默认补偿执行器
type defaultCompensationExecutor struct{}
func (e *defaultCompensationExecutor) Execute(ctx context.Context, operationType string, payload json.RawMessage) error {
// TODO: 根据operationType执行相应的补偿操作
// 目前为placeholder实现实际生产需要根据业务类型实现具体逻辑
log.Printf("补偿执行器: operation_type=%s, payload=%s", operationType, string(payload))
return nil
}
// parseRSAPublicKey 解析PEM格式的RSA公钥 // parseRSAPublicKey 解析PEM格式的RSA公钥
func parseRSAPublicKey(pemKey string) interface{} { func parseRSAPublicKey(pemKey string) interface{} {
if pemKey == "" { if pemKey == "" {

View File

@@ -0,0 +1,261 @@
package domain
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
// ==================== P0-07 批量补偿策略 ====================
// BatchCompensation 批量补偿记录
type BatchCompensation struct {
ID int64 `json:"id"`
BatchID string `json:"batch_id"`
OperationType string `json:"operation_type"`
ItemIndex int `json:"item_index"`
ItemPayload json.RawMessage `json:"item_payload"`
FailureReason string `json:"failure_reason,omitempty"`
Status string `json:"status"` // pending, retrying, resolved, manual_required, abandoned
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
ResolvedBy *int64 `json:"resolved_by,omitempty"`
ResolutionNotes string `json:"resolution_notes,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatedBy *int64 `json:"created_by,omitempty"`
Version int64 `json:"version"`
}
// CompensationStatus 补偿状态
const (
CompensationStatusPending = "pending"
CompensationStatusRetrying = "retrying"
CompensationStatusResolved = "resolved"
CompensationStatusManualRequired = "manual_required"
CompensationStatusAbandoned = "abandoned"
)
// CompensationStore 补偿存储接口
type CompensationStore interface {
// Create 创建补偿记录
Create(ctx context.Context, comp *BatchCompensation) (int64, error)
// GetByBatchID 获取批次补偿列表
GetByBatchID(ctx context.Context, batchID string) ([]*BatchCompensation, error)
// UpdateStatus 更新状态
UpdateStatus(ctx context.Context, id int64, status string) error
// Resolve 解决补偿
Resolve(ctx context.Context, id int64, resolvedBy int64, notes string) error
// MarkManualRequired 标记需要人工介入
MarkManualRequired(ctx context.Context, id int64, reason string) error
}
// CompensationProcessor 补偿处理器
type CompensationProcessor struct {
store CompensationStore
operationExecutor OperationExecutor
stats CompensationStats
}
// OperationExecutor 操作执行器接口
type OperationExecutor interface {
// Execute 执行单个操作
Execute(ctx context.Context, operationType string, payload json.RawMessage) error
}
// CompensationStats 补偿统计接口
type CompensationStats interface {
RecordCompensationRetry(operationType string)
RecordCompensationResolved(operationType string)
RecordCompensationManual(operationType string)
}
// DefaultCompensationConfig 默认补偿配置
func DefaultCompensationConfig() *CompensationConfig {
return &CompensationConfig{
MaxRetries: 3,
RetryInterval: 1 * time.Minute,
}
}
// NoOpCompensationStats No-op补偿统计实现
type NoOpCompensationStats struct{}
func (s *NoOpCompensationStats) RecordCompensationRetry(operationType string) {}
func (s *NoOpCompensationStats) RecordCompensationResolved(operationType string) {}
func (s *NoOpCompensationStats) RecordCompensationManual(operationType string) {}
// NewCompensationProcessor 创建补偿处理器
func NewCompensationProcessor(store CompensationStore, executor OperationExecutor, stats CompensationStats) *CompensationProcessor {
return &CompensationProcessor{
store: store,
operationExecutor: executor,
stats: stats,
}
}
// CompensationConfig 补偿配置
type CompensationConfig struct {
MaxRetries int
RetryInterval time.Duration
}
// ProcessBatchCompensations 处理批次补偿
func (p *CompensationProcessor) ProcessBatchCompensations(ctx context.Context, batchID string) (*CompensationResult, error) {
// 获取批次补偿列表
compensations, err := p.store.GetByBatchID(ctx, batchID)
if err != nil {
return nil, fmt.Errorf("failed to get compensations: %w", err)
}
result := &CompensationResult{
BatchID: batchID,
TotalItems: len(compensations),
SuccessCount: 0,
RetryCount: 0,
ManualCount: 0,
FailedCount: 0,
}
for _, comp := range compensations {
if comp.Status != CompensationStatusPending {
continue
}
// 重试执行
err := p.operationExecutor.Execute(ctx, comp.OperationType, comp.ItemPayload)
if err != nil {
comp.RetryCount++
comp.FailureReason = err.Error()
if comp.RetryCount >= comp.MaxRetries {
// 超过最大重试次数,标记需要人工介入
if err := p.store.MarkManualRequired(ctx, comp.ID, err.Error()); err != nil {
result.FailedCount++
continue
}
result.ManualCount++
p.stats.RecordCompensationManual(comp.OperationType)
} else {
// 继续重试
if err := p.store.UpdateStatus(ctx, comp.ID, CompensationStatusRetrying); err != nil {
result.FailedCount++
continue
}
result.RetryCount++
p.stats.RecordCompensationRetry(comp.OperationType)
}
} else {
// 执行成功,标记解决
if err := p.store.Resolve(ctx, comp.ID, 0, "auto_resolved"); err != nil {
result.FailedCount++
continue
}
result.SuccessCount++
p.stats.RecordCompensationResolved(comp.OperationType)
}
}
return result, nil
}
// CompensationResult 补偿处理结果
type CompensationResult struct {
BatchID string `json:"batch_id"`
TotalItems int `json:"total_items"`
SuccessCount int `json:"success_count"`
RetryCount int `json:"retry_count"`
ManualCount int `json:"manual_count"`
FailedCount int `json:"failed_count"`
}
// SQLCompensationStore SQL实现的补偿存储
type SQLCompensationStore struct {
pool *pgxpool.Pool
}
// NewSQLCompensationStore 创建SQL补偿存储
func NewSQLCompensationStore(pool *pgxpool.Pool) *SQLCompensationStore {
return &SQLCompensationStore{pool: pool}
}
func (s *SQLCompensationStore) Create(ctx context.Context, comp *BatchCompensation) (int64, error) {
var id int64
err := s.pool.QueryRow(ctx, `
INSERT INTO supply_batch_compensation (
batch_id, operation_type, item_index, item_payload,
failure_reason, status, max_retries, created_by
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id
`, comp.BatchID, comp.OperationType, comp.ItemIndex, comp.ItemPayload,
comp.FailureReason, CompensationStatusPending, comp.MaxRetries, comp.CreatedBy).
Scan(&id)
return id, err
}
func (s *SQLCompensationStore) GetByBatchID(ctx context.Context, batchID string) ([]*BatchCompensation, error) {
rows, err := s.pool.Query(ctx, `
SELECT id, batch_id, operation_type, item_index, item_payload,
failure_reason, status, retry_count, max_retries,
resolved_at, resolved_by, resolution_notes,
created_at, updated_at, created_by, version
FROM supply_batch_compensation
WHERE batch_id = $1
ORDER BY item_index
`, batchID)
if err != nil {
return nil, err
}
defer rows.Close()
var compensations []*BatchCompensation
for rows.Next() {
comp := &BatchCompensation{}
err := rows.Scan(
&comp.ID, &comp.BatchID, &comp.OperationType, &comp.ItemIndex,
&comp.ItemPayload, &comp.FailureReason, &comp.Status,
&comp.RetryCount, &comp.MaxRetries, &comp.ResolvedAt,
&comp.ResolvedBy, &comp.ResolutionNotes, &comp.CreatedAt,
&comp.UpdatedAt, &comp.CreatedBy, &comp.Version,
)
if err != nil {
return nil, err
}
compensations = append(compensations, comp)
}
return compensations, rows.Err()
}
func (s *SQLCompensationStore) UpdateStatus(ctx context.Context, id int64, status string) error {
_, err := s.pool.Exec(ctx, `
UPDATE supply_batch_compensation
SET status = $1, updated_at = CURRENT_TIMESTAMP, version = version + 1
WHERE id = $2
`, status, id)
return err
}
func (s *SQLCompensationStore) Resolve(ctx context.Context, id int64, resolvedBy int64, notes string) error {
_, err := s.pool.Exec(ctx, `
UPDATE supply_batch_compensation
SET status = $1, resolved_at = CURRENT_TIMESTAMP,
resolved_by = $2, resolution_notes = $3,
updated_at = CURRENT_TIMESTAMP, version = version + 1
WHERE id = $4
`, CompensationStatusResolved, resolvedBy, notes, id)
return err
}
func (s *SQLCompensationStore) MarkManualRequired(ctx context.Context, id int64, reason string) error {
_, err := s.pool.Exec(ctx, `
UPDATE supply_batch_compensation
SET status = $1, failure_reason = COALESCE(failure_reason || '; ', '') || $2,
updated_at = CURRENT_TIMESTAMP, version = version + 1
WHERE id = $3
`, CompensationStatusManualRequired, reason, id)
return err
}

View File

@@ -21,12 +21,13 @@ type SupplyAPI struct {
accountService domain.AccountService accountService domain.AccountService
packageService domain.PackageService packageService domain.PackageService
settlementService domain.SettlementService settlementService domain.SettlementService
earningService domain.EarningService earningService domain.EarningService
idempotencyMw *middleware.IdempotencyMiddleware // P0-P4修复: 使用DB-backed幂等中间件 idempotencyMw *middleware.IdempotencyMiddleware // P0-P4修复: 使用DB-backed幂等中间件
auditStore audit.AuditStore // P0-R08修复: 使用接口支持DB-backed实现 auditStore audit.AuditStore // P0-R08修复: 使用接口支持DB-backed实现
supplierID int64 fkValidator *repository.ForeignKeyValidator // P0-09修复: 外键校验器
statementBaseURL string supplierID int64
now func() time.Time statementBaseURL string
now func() time.Time
} }
func NewSupplyAPI( func NewSupplyAPI(
@@ -36,6 +37,7 @@ func NewSupplyAPI(
earningService domain.EarningService, earningService domain.EarningService,
idempotencyMw *middleware.IdempotencyMiddleware, idempotencyMw *middleware.IdempotencyMiddleware,
auditStore audit.AuditStore, auditStore audit.AuditStore,
fkValidator *repository.ForeignKeyValidator,
supplierID int64, supplierID int64,
statementBaseURL string, statementBaseURL string,
now func() time.Time, now func() time.Time,
@@ -47,6 +49,7 @@ func NewSupplyAPI(
earningService: earningService, earningService: earningService,
idempotencyMw: idempotencyMw, idempotencyMw: idempotencyMw,
auditStore: auditStore, auditStore: auditStore,
fkValidator: fkValidator,
supplierID: supplierID, supplierID: supplierID,
statementBaseURL: statementBaseURL, statementBaseURL: statementBaseURL,
now: now, now: now,
@@ -164,6 +167,14 @@ func (a *SupplyAPI) createAccountHandler(ctx context.Context, w http.ResponseWri
return err return err
} }
// P0-09修复: 创建账户前校验外键引用
if a.fkValidator != nil {
if err := a.fkValidator.ValidateSupplyAccountOwner(ctx, a.supplierID); err != nil {
writeError(w, http.StatusUnprocessableEntity, "FK_VALIDATION_FAILED", "supplier does not exist")
return err
}
}
createReq := &domain.CreateAccountRequest{ createReq := &domain.CreateAccountRequest{
SupplierID: a.supplierID, SupplierID: a.supplierID,
Provider: domain.Provider(rawReq.Provider), Provider: domain.Provider(rawReq.Provider),
@@ -367,6 +378,14 @@ func (a *SupplyAPI) handleCreatePackageDraft(w http.ResponseWriter, r *http.Requ
return return
} }
// P0-09修复: 创建套餐前校验外键引用
if a.fkValidator != nil {
if err := a.fkValidator.ValidatePackageSupplyAccount(r.Context(), req.SupplyAccountID); err != nil {
writeError(w, http.StatusUnprocessableEntity, "FK_VALIDATION_FAILED", "supply account does not exist")
return
}
}
createReq := &domain.CreatePackageDraftRequest{ createReq := &domain.CreatePackageDraftRequest{
SupplierID: a.supplierID, SupplierID: a.supplierID,
AccountID: req.SupplyAccountID, AccountID: req.SupplyAccountID,

View File

@@ -0,0 +1,223 @@
package repository
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
)
// ==================== P0-09 外键约束策略 ====================
// 问题:跨域模型缺少外键约束策略声明
// 修复方案:应用层外键 + 定期一致性校验
// ForeignKeyValidator 应用层外键校验器
type ForeignKeyValidator struct {
pool *pgxpool.Pool
}
// NewForeignKeyValidator 创建外键校验器
func NewForeignKeyValidator(pool *pgxpool.Pool) *ForeignKeyValidator {
return &ForeignKeyValidator{pool: pool}
}
// ValidateReference 验证引用完整性
func (v *ForeignKeyValidator) ValidateReference(ctx context.Context, ref ReferenceCheck) error {
var exists bool
err := v.pool.QueryRow(ctx, ref.CheckSQL, ref.Args...).Scan(&exists)
if err != nil {
return fmt.Errorf("failed to check reference: %w", err)
}
if !exists {
return ErrReferencedEntityNotFound
}
return nil
}
// ReferenceCheck 引用检查
type ReferenceCheck struct {
TableName string
FieldName string
FieldValue interface{}
CheckSQL string
Args []interface{}
}
// ErrReferencedEntityNotFound 引用实体不存在
var ErrReferencedEntityNotFound = fmt.Errorf("referenced entity not found")
// ValidateSupplyAccountOwner 校验供应账号所属用户存在
func (v *ForeignKeyValidator) ValidateSupplyAccountOwner(ctx context.Context, userID int64) error {
return v.ValidateReference(ctx, ReferenceCheck{
TableName: "iam_users",
FieldName: "id",
FieldValue: userID,
CheckSQL: "SELECT EXISTS(SELECT 1 FROM iam_users WHERE id = $1)",
Args: []interface{}{userID},
})
}
// ValidatePackageSupplyAccount 校验套餐所属供应账号存在
func (v *ForeignKeyValidator) ValidatePackageSupplyAccount(ctx context.Context, accountID int64) error {
return v.ValidateReference(ctx, ReferenceCheck{
TableName: "supply_accounts",
FieldName: "id",
FieldValue: accountID,
CheckSQL: "SELECT EXISTS(SELECT 1 FROM supply_accounts WHERE id = $1)",
Args: []interface{}{accountID},
})
}
// ValidateOrderSupplyAccount 校验订单所属供应账号存在
func (v *ForeignKeyValidator) ValidateOrderSupplyAccount(ctx context.Context, accountID int64) error {
return v.ValidateReference(ctx, ReferenceCheck{
TableName: "supply_accounts",
FieldName: "id",
FieldValue: accountID,
CheckSQL: "SELECT EXISTS(SELECT 1 FROM supply_accounts WHERE id = $1)",
Args: []interface{}{accountID},
})
}
// ValidateOrderSupplyPackage 校验订单所属套餐存在
func (v *ForeignKeyValidator) ValidateOrderSupplyPackage(ctx context.Context, packageID int64) error {
return v.ValidateReference(ctx, ReferenceCheck{
TableName: "supply_packages",
FieldName: "id",
FieldValue: packageID,
CheckSQL: "SELECT EXISTS(SELECT 1 FROM supply_packages WHERE id = $1)",
Args: []interface{}{packageID},
})
}
// ValidateBillingAccount 校验账户所属租户存在
func (v *ForeignKeyValidator) ValidateBillingAccount(ctx context.Context, tenantID int64) error {
return v.ValidateReference(ctx, ReferenceCheck{
TableName: "core_tenants",
FieldName: "id",
FieldValue: tenantID,
CheckSQL: "SELECT EXISTS(SELECT 1 FROM core_tenants WHERE id = $1)",
Args: []interface{}{tenantID},
})
}
// OrphanRecordCheck 孤立记录检查结果
type OrphanRecordCheck struct {
TableName string
FieldName string
Count int64
}
// orphanCheckSQL 孤立检查SQL
type orphanCheckSQL struct {
TableName string
FieldName string
SQL string
}
// CheckOrphanRecords 执行孤立记录检查
func (v *ForeignKeyValidator) CheckOrphanRecords(ctx context.Context) ([]OrphanRecordCheck, error) {
checks := []orphanCheckSQL{
// 检查孤立的supply_accounts
{
TableName: "supply_accounts",
FieldName: "user_id",
SQL: `SELECT COUNT(*) FROM supply_accounts sa WHERE NOT EXISTS (SELECT 1 FROM iam_users WHERE id = sa.user_id)`,
},
// 检查孤立的supply_packages
{
TableName: "supply_packages",
FieldName: "supply_account_id",
SQL: `SELECT COUNT(*) FROM supply_packages sp WHERE NOT EXISTS (SELECT 1 FROM supply_accounts WHERE id = sp.supply_account_id)`,
},
// 检查孤立的supply_orders (supply_account_id)
{
TableName: "supply_orders",
FieldName: "supply_account_id",
SQL: `SELECT COUNT(*) FROM supply_orders so WHERE NOT EXISTS (SELECT 1 FROM supply_accounts WHERE id = so.supply_account_id)`,
},
// 检查孤立的supply_orders (supply_package_id)
{
TableName: "supply_orders",
FieldName: "supply_package_id",
SQL: `SELECT COUNT(*) FROM supply_orders so WHERE NOT EXISTS (SELECT 1 FROM supply_packages WHERE id = so.supply_package_id)`,
},
}
var results []OrphanRecordCheck
for _, check := range checks {
var count int64
err := v.pool.QueryRow(ctx, check.SQL).Scan(&count)
if err != nil {
return nil, fmt.Errorf("failed to check orphans for %s.%s: %w", check.TableName, check.FieldName, err)
}
if count > 0 {
results = append(results, OrphanRecordCheck{
TableName: check.TableName,
FieldName: check.FieldName,
Count: count,
})
}
}
return results, nil
}
// ForeignKeyPolicy 外键策略定义
type ForeignKeyPolicy struct {
// 保留物理外键的表
PhysicalFKTables []string
// 使用应用层外键的表
ApplicationFKTables []string
// 无外键的表
NoFKTables []string
}
// GetDefaultForeignKeyPolicy 获取默认外键策略
func GetDefaultForeignKeyPolicy() *ForeignKeyPolicy {
return &ForeignKeyPolicy{
// 核心实体表保留物理外键
PhysicalFKTables: []string{
"core_tenants",
"core_projects",
"iam_users",
"billing_accounts",
},
// 高频写入表使用应用层外键
ApplicationFKTables: []string{
"supply_accounts",
"supply_packages",
"supply_orders",
"supply_usage_records",
"supply_settlements",
},
// 审计/日志表无外键
NoFKTables: []string{
"audit_events",
"outbox_events",
"outbox_dead_letter",
"supply_idempotency_record",
"supply_batch_compensation",
},
}
}
// GetPolicyForTable 获取表的外键策略
func (p *ForeignKeyPolicy) GetPolicyForTable(tableName string) string {
for _, t := range p.PhysicalFKTables {
if t == tableName {
return "physical"
}
}
for _, t := range p.ApplicationFKTables {
if t == tableName {
return "application"
}
}
for _, t := range p.NoFKTables {
if t == tableName {
return "none"
}
}
return "unknown"
}