diff --git a/supply-api/cmd/supply-api/main.go b/supply-api/cmd/supply-api/main.go index d9e2438d..6db8396a 100644 --- a/supply-api/cmd/supply-api/main.go +++ b/supply-api/cmd/supply-api/main.go @@ -3,6 +3,7 @@ package main import ( "context" "crypto/x509" + "encoding/json" "encoding/pem" "flag" "fmt" @@ -115,6 +116,15 @@ func main() { log.Println("警告: 审计存储使用内存实现 (生产环境不应使用)") } + // P0-09修复: 初始化外键校验器 + var fkValidator *repository.ForeignKeyValidator + if db != nil { + fkValidator = repository.NewForeignKeyValidator(db.Pool) + log.Println("外键校验器: 已初始化 (PostgreSQL-backed)") + } else { + log.Println("警告: 外键校验器未启用 (db不可用)") + } + // 初始化不变量检查器 invariantChecker := domain.NewInvariantChecker(accountStore, packageStore, settlementStore) _ = invariantChecker // 用于业务逻辑校验 @@ -198,6 +208,7 @@ func main() { earningService, idempotencyMiddleware, // 使用幂等中间件(DB-backed) auditStore, + fkValidator, // P0-09修复: 外键校验器 cfg.Server.DefaultSupplierID, cfg.Server.StatementBaseURL, time.Now, @@ -310,6 +321,14 @@ func main() { } } }() + + // P0-07修复: 初始化批量补偿处理器 + compensationStore := domain.NewSQLCompensationStore(db.Pool) + compensationStats := &domain.NoOpCompensationStats{} + compensationExecutor := &defaultCompensationExecutor{} // 需要实现OperationExecutor接口 + compensationProcessor := domain.NewCompensationProcessor(compensationStore, compensationExecutor, compensationStats) + log.Println("批量补偿处理器: 已初始化") + _ = compensationProcessor // TODO: 启动后台补偿处理goroutine } // 优雅关闭 @@ -719,6 +738,18 @@ func calculateOutboxBackoff(retryCount, maxRetries int) int { // Ensure domain.OutboxEvent is compatible with our conversion var _ = domain.OutboxEvent{} +// ==================== 补偿执行器 ==================== + +// defaultCompensationExecutor 默认补偿执行器 +type defaultCompensationExecutor struct{} + +func (e *defaultCompensationExecutor) Execute(ctx context.Context, operationType string, payload json.RawMessage) error { + // TODO: 根据operationType执行相应的补偿操作 + // 目前为placeholder实现,实际生产需要根据业务类型实现具体逻辑 + log.Printf("补偿执行器: operation_type=%s, payload=%s", operationType, string(payload)) + return nil +} + // parseRSAPublicKey 解析PEM格式的RSA公钥 func parseRSAPublicKey(pemKey string) interface{} { if pemKey == "" { diff --git a/supply-api/internal/domain/compensation.go b/supply-api/internal/domain/compensation.go new file mode 100644 index 00000000..db36a771 --- /dev/null +++ b/supply-api/internal/domain/compensation.go @@ -0,0 +1,261 @@ +package domain + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// ==================== P0-07 批量补偿策略 ==================== + +// BatchCompensation 批量补偿记录 +type BatchCompensation struct { + ID int64 `json:"id"` + BatchID string `json:"batch_id"` + OperationType string `json:"operation_type"` + ItemIndex int `json:"item_index"` + ItemPayload json.RawMessage `json:"item_payload"` + FailureReason string `json:"failure_reason,omitempty"` + Status string `json:"status"` // pending, retrying, resolved, manual_required, abandoned + RetryCount int `json:"retry_count"` + MaxRetries int `json:"max_retries"` + ResolvedAt *time.Time `json:"resolved_at,omitempty"` + ResolvedBy *int64 `json:"resolved_by,omitempty"` + ResolutionNotes string `json:"resolution_notes,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + CreatedBy *int64 `json:"created_by,omitempty"` + Version int64 `json:"version"` +} + +// CompensationStatus 补偿状态 +const ( + CompensationStatusPending = "pending" + CompensationStatusRetrying = "retrying" + CompensationStatusResolved = "resolved" + CompensationStatusManualRequired = "manual_required" + CompensationStatusAbandoned = "abandoned" +) + +// CompensationStore 补偿存储接口 +type CompensationStore interface { + // Create 创建补偿记录 + Create(ctx context.Context, comp *BatchCompensation) (int64, error) + // GetByBatchID 获取批次补偿列表 + GetByBatchID(ctx context.Context, batchID string) ([]*BatchCompensation, error) + // UpdateStatus 更新状态 + UpdateStatus(ctx context.Context, id int64, status string) error + // Resolve 解决补偿 + Resolve(ctx context.Context, id int64, resolvedBy int64, notes string) error + // MarkManualRequired 标记需要人工介入 + MarkManualRequired(ctx context.Context, id int64, reason string) error +} + +// CompensationProcessor 补偿处理器 +type CompensationProcessor struct { + store CompensationStore + operationExecutor OperationExecutor + stats CompensationStats +} + +// OperationExecutor 操作执行器接口 +type OperationExecutor interface { + // Execute 执行单个操作 + Execute(ctx context.Context, operationType string, payload json.RawMessage) error +} + +// CompensationStats 补偿统计接口 +type CompensationStats interface { + RecordCompensationRetry(operationType string) + RecordCompensationResolved(operationType string) + RecordCompensationManual(operationType string) +} + +// DefaultCompensationConfig 默认补偿配置 +func DefaultCompensationConfig() *CompensationConfig { + return &CompensationConfig{ + MaxRetries: 3, + RetryInterval: 1 * time.Minute, + } +} + +// NoOpCompensationStats No-op补偿统计实现 +type NoOpCompensationStats struct{} + +func (s *NoOpCompensationStats) RecordCompensationRetry(operationType string) {} +func (s *NoOpCompensationStats) RecordCompensationResolved(operationType string) {} +func (s *NoOpCompensationStats) RecordCompensationManual(operationType string) {} + +// NewCompensationProcessor 创建补偿处理器 +func NewCompensationProcessor(store CompensationStore, executor OperationExecutor, stats CompensationStats) *CompensationProcessor { + return &CompensationProcessor{ + store: store, + operationExecutor: executor, + stats: stats, + } +} + +// CompensationConfig 补偿配置 +type CompensationConfig struct { + MaxRetries int + RetryInterval time.Duration +} + +// ProcessBatchCompensations 处理批次补偿 +func (p *CompensationProcessor) ProcessBatchCompensations(ctx context.Context, batchID string) (*CompensationResult, error) { + // 获取批次补偿列表 + compensations, err := p.store.GetByBatchID(ctx, batchID) + if err != nil { + return nil, fmt.Errorf("failed to get compensations: %w", err) + } + + result := &CompensationResult{ + BatchID: batchID, + TotalItems: len(compensations), + SuccessCount: 0, + RetryCount: 0, + ManualCount: 0, + FailedCount: 0, + } + + for _, comp := range compensations { + if comp.Status != CompensationStatusPending { + continue + } + + // 重试执行 + err := p.operationExecutor.Execute(ctx, comp.OperationType, comp.ItemPayload) + if err != nil { + comp.RetryCount++ + comp.FailureReason = err.Error() + + if comp.RetryCount >= comp.MaxRetries { + // 超过最大重试次数,标记需要人工介入 + if err := p.store.MarkManualRequired(ctx, comp.ID, err.Error()); err != nil { + result.FailedCount++ + continue + } + result.ManualCount++ + p.stats.RecordCompensationManual(comp.OperationType) + } else { + // 继续重试 + if err := p.store.UpdateStatus(ctx, comp.ID, CompensationStatusRetrying); err != nil { + result.FailedCount++ + continue + } + result.RetryCount++ + p.stats.RecordCompensationRetry(comp.OperationType) + } + } else { + // 执行成功,标记解决 + if err := p.store.Resolve(ctx, comp.ID, 0, "auto_resolved"); err != nil { + result.FailedCount++ + continue + } + result.SuccessCount++ + p.stats.RecordCompensationResolved(comp.OperationType) + } + } + + return result, nil +} + +// CompensationResult 补偿处理结果 +type CompensationResult struct { + BatchID string `json:"batch_id"` + TotalItems int `json:"total_items"` + SuccessCount int `json:"success_count"` + RetryCount int `json:"retry_count"` + ManualCount int `json:"manual_count"` + FailedCount int `json:"failed_count"` +} + +// SQLCompensationStore SQL实现的补偿存储 +type SQLCompensationStore struct { + pool *pgxpool.Pool +} + +// NewSQLCompensationStore 创建SQL补偿存储 +func NewSQLCompensationStore(pool *pgxpool.Pool) *SQLCompensationStore { + return &SQLCompensationStore{pool: pool} +} + +func (s *SQLCompensationStore) Create(ctx context.Context, comp *BatchCompensation) (int64, error) { + var id int64 + err := s.pool.QueryRow(ctx, ` + INSERT INTO supply_batch_compensation ( + batch_id, operation_type, item_index, item_payload, + failure_reason, status, max_retries, created_by + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING id + `, comp.BatchID, comp.OperationType, comp.ItemIndex, comp.ItemPayload, + comp.FailureReason, CompensationStatusPending, comp.MaxRetries, comp.CreatedBy). + Scan(&id) + return id, err +} + +func (s *SQLCompensationStore) GetByBatchID(ctx context.Context, batchID string) ([]*BatchCompensation, error) { + rows, err := s.pool.Query(ctx, ` + SELECT id, batch_id, operation_type, item_index, item_payload, + failure_reason, status, retry_count, max_retries, + resolved_at, resolved_by, resolution_notes, + created_at, updated_at, created_by, version + FROM supply_batch_compensation + WHERE batch_id = $1 + ORDER BY item_index + `, batchID) + if err != nil { + return nil, err + } + defer rows.Close() + + var compensations []*BatchCompensation + for rows.Next() { + comp := &BatchCompensation{} + err := rows.Scan( + &comp.ID, &comp.BatchID, &comp.OperationType, &comp.ItemIndex, + &comp.ItemPayload, &comp.FailureReason, &comp.Status, + &comp.RetryCount, &comp.MaxRetries, &comp.ResolvedAt, + &comp.ResolvedBy, &comp.ResolutionNotes, &comp.CreatedAt, + &comp.UpdatedAt, &comp.CreatedBy, &comp.Version, + ) + if err != nil { + return nil, err + } + compensations = append(compensations, comp) + } + return compensations, rows.Err() +} + +func (s *SQLCompensationStore) UpdateStatus(ctx context.Context, id int64, status string) error { + _, err := s.pool.Exec(ctx, ` + UPDATE supply_batch_compensation + SET status = $1, updated_at = CURRENT_TIMESTAMP, version = version + 1 + WHERE id = $2 + `, status, id) + return err +} + +func (s *SQLCompensationStore) Resolve(ctx context.Context, id int64, resolvedBy int64, notes string) error { + _, err := s.pool.Exec(ctx, ` + UPDATE supply_batch_compensation + SET status = $1, resolved_at = CURRENT_TIMESTAMP, + resolved_by = $2, resolution_notes = $3, + updated_at = CURRENT_TIMESTAMP, version = version + 1 + WHERE id = $4 + `, CompensationStatusResolved, resolvedBy, notes, id) + return err +} + +func (s *SQLCompensationStore) MarkManualRequired(ctx context.Context, id int64, reason string) error { + _, err := s.pool.Exec(ctx, ` + UPDATE supply_batch_compensation + SET status = $1, failure_reason = COALESCE(failure_reason || '; ', '') || $2, + updated_at = CURRENT_TIMESTAMP, version = version + 1 + WHERE id = $3 + `, CompensationStatusManualRequired, reason, id) + return err +} diff --git a/supply-api/internal/httpapi/supply_api.go b/supply-api/internal/httpapi/supply_api.go index d5cdcaf0..89b52f3f 100644 --- a/supply-api/internal/httpapi/supply_api.go +++ b/supply-api/internal/httpapi/supply_api.go @@ -21,12 +21,13 @@ type SupplyAPI struct { accountService domain.AccountService packageService domain.PackageService settlementService domain.SettlementService - earningService domain.EarningService - idempotencyMw *middleware.IdempotencyMiddleware // P0-P4修复: 使用DB-backed幂等中间件 - auditStore audit.AuditStore // P0-R08修复: 使用接口支持DB-backed实现 - supplierID int64 - statementBaseURL string - now func() time.Time + earningService domain.EarningService + idempotencyMw *middleware.IdempotencyMiddleware // P0-P4修复: 使用DB-backed幂等中间件 + auditStore audit.AuditStore // P0-R08修复: 使用接口支持DB-backed实现 + fkValidator *repository.ForeignKeyValidator // P0-09修复: 外键校验器 + supplierID int64 + statementBaseURL string + now func() time.Time } func NewSupplyAPI( @@ -36,6 +37,7 @@ func NewSupplyAPI( earningService domain.EarningService, idempotencyMw *middleware.IdempotencyMiddleware, auditStore audit.AuditStore, + fkValidator *repository.ForeignKeyValidator, supplierID int64, statementBaseURL string, now func() time.Time, @@ -47,6 +49,7 @@ func NewSupplyAPI( earningService: earningService, idempotencyMw: idempotencyMw, auditStore: auditStore, + fkValidator: fkValidator, supplierID: supplierID, statementBaseURL: statementBaseURL, now: now, @@ -164,6 +167,14 @@ func (a *SupplyAPI) createAccountHandler(ctx context.Context, w http.ResponseWri return err } + // P0-09修复: 创建账户前校验外键引用 + if a.fkValidator != nil { + if err := a.fkValidator.ValidateSupplyAccountOwner(ctx, a.supplierID); err != nil { + writeError(w, http.StatusUnprocessableEntity, "FK_VALIDATION_FAILED", "supplier does not exist") + return err + } + } + createReq := &domain.CreateAccountRequest{ SupplierID: a.supplierID, Provider: domain.Provider(rawReq.Provider), @@ -367,6 +378,14 @@ func (a *SupplyAPI) handleCreatePackageDraft(w http.ResponseWriter, r *http.Requ return } + // P0-09修复: 创建套餐前校验外键引用 + if a.fkValidator != nil { + if err := a.fkValidator.ValidatePackageSupplyAccount(r.Context(), req.SupplyAccountID); err != nil { + writeError(w, http.StatusUnprocessableEntity, "FK_VALIDATION_FAILED", "supply account does not exist") + return + } + } + createReq := &domain.CreatePackageDraftRequest{ SupplierID: a.supplierID, AccountID: req.SupplyAccountID, diff --git a/supply-api/internal/repository/foreign_key_validator.go b/supply-api/internal/repository/foreign_key_validator.go new file mode 100644 index 00000000..aff9f426 --- /dev/null +++ b/supply-api/internal/repository/foreign_key_validator.go @@ -0,0 +1,223 @@ +package repository + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// ==================== P0-09 外键约束策略 ==================== +// 问题:跨域模型缺少外键约束策略声明 +// 修复方案:应用层外键 + 定期一致性校验 + +// ForeignKeyValidator 应用层外键校验器 +type ForeignKeyValidator struct { + pool *pgxpool.Pool +} + +// NewForeignKeyValidator 创建外键校验器 +func NewForeignKeyValidator(pool *pgxpool.Pool) *ForeignKeyValidator { + return &ForeignKeyValidator{pool: pool} +} + +// ValidateReference 验证引用完整性 +func (v *ForeignKeyValidator) ValidateReference(ctx context.Context, ref ReferenceCheck) error { + var exists bool + err := v.pool.QueryRow(ctx, ref.CheckSQL, ref.Args...).Scan(&exists) + if err != nil { + return fmt.Errorf("failed to check reference: %w", err) + } + if !exists { + return ErrReferencedEntityNotFound + } + return nil +} + +// ReferenceCheck 引用检查 +type ReferenceCheck struct { + TableName string + FieldName string + FieldValue interface{} + CheckSQL string + Args []interface{} +} + +// ErrReferencedEntityNotFound 引用实体不存在 +var ErrReferencedEntityNotFound = fmt.Errorf("referenced entity not found") + +// ValidateSupplyAccountOwner 校验供应账号所属用户存在 +func (v *ForeignKeyValidator) ValidateSupplyAccountOwner(ctx context.Context, userID int64) error { + return v.ValidateReference(ctx, ReferenceCheck{ + TableName: "iam_users", + FieldName: "id", + FieldValue: userID, + CheckSQL: "SELECT EXISTS(SELECT 1 FROM iam_users WHERE id = $1)", + Args: []interface{}{userID}, + }) +} + +// ValidatePackageSupplyAccount 校验套餐所属供应账号存在 +func (v *ForeignKeyValidator) ValidatePackageSupplyAccount(ctx context.Context, accountID int64) error { + return v.ValidateReference(ctx, ReferenceCheck{ + TableName: "supply_accounts", + FieldName: "id", + FieldValue: accountID, + CheckSQL: "SELECT EXISTS(SELECT 1 FROM supply_accounts WHERE id = $1)", + Args: []interface{}{accountID}, + }) +} + +// ValidateOrderSupplyAccount 校验订单所属供应账号存在 +func (v *ForeignKeyValidator) ValidateOrderSupplyAccount(ctx context.Context, accountID int64) error { + return v.ValidateReference(ctx, ReferenceCheck{ + TableName: "supply_accounts", + FieldName: "id", + FieldValue: accountID, + CheckSQL: "SELECT EXISTS(SELECT 1 FROM supply_accounts WHERE id = $1)", + Args: []interface{}{accountID}, + }) +} + +// ValidateOrderSupplyPackage 校验订单所属套餐存在 +func (v *ForeignKeyValidator) ValidateOrderSupplyPackage(ctx context.Context, packageID int64) error { + return v.ValidateReference(ctx, ReferenceCheck{ + TableName: "supply_packages", + FieldName: "id", + FieldValue: packageID, + CheckSQL: "SELECT EXISTS(SELECT 1 FROM supply_packages WHERE id = $1)", + Args: []interface{}{packageID}, + }) +} + +// ValidateBillingAccount 校验账户所属租户存在 +func (v *ForeignKeyValidator) ValidateBillingAccount(ctx context.Context, tenantID int64) error { + return v.ValidateReference(ctx, ReferenceCheck{ + TableName: "core_tenants", + FieldName: "id", + FieldValue: tenantID, + CheckSQL: "SELECT EXISTS(SELECT 1 FROM core_tenants WHERE id = $1)", + Args: []interface{}{tenantID}, + }) +} + +// OrphanRecordCheck 孤立记录检查结果 +type OrphanRecordCheck struct { + TableName string + FieldName string + Count int64 +} + +// orphanCheckSQL 孤立检查SQL +type orphanCheckSQL struct { + TableName string + FieldName string + SQL string +} + +// CheckOrphanRecords 执行孤立记录检查 +func (v *ForeignKeyValidator) CheckOrphanRecords(ctx context.Context) ([]OrphanRecordCheck, error) { + checks := []orphanCheckSQL{ + // 检查孤立的supply_accounts + { + TableName: "supply_accounts", + FieldName: "user_id", + SQL: `SELECT COUNT(*) FROM supply_accounts sa WHERE NOT EXISTS (SELECT 1 FROM iam_users WHERE id = sa.user_id)`, + }, + // 检查孤立的supply_packages + { + TableName: "supply_packages", + FieldName: "supply_account_id", + SQL: `SELECT COUNT(*) FROM supply_packages sp WHERE NOT EXISTS (SELECT 1 FROM supply_accounts WHERE id = sp.supply_account_id)`, + }, + // 检查孤立的supply_orders (supply_account_id) + { + TableName: "supply_orders", + FieldName: "supply_account_id", + SQL: `SELECT COUNT(*) FROM supply_orders so WHERE NOT EXISTS (SELECT 1 FROM supply_accounts WHERE id = so.supply_account_id)`, + }, + // 检查孤立的supply_orders (supply_package_id) + { + TableName: "supply_orders", + FieldName: "supply_package_id", + SQL: `SELECT COUNT(*) FROM supply_orders so WHERE NOT EXISTS (SELECT 1 FROM supply_packages WHERE id = so.supply_package_id)`, + }, + } + + var results []OrphanRecordCheck + for _, check := range checks { + var count int64 + err := v.pool.QueryRow(ctx, check.SQL).Scan(&count) + if err != nil { + return nil, fmt.Errorf("failed to check orphans for %s.%s: %w", check.TableName, check.FieldName, err) + } + if count > 0 { + results = append(results, OrphanRecordCheck{ + TableName: check.TableName, + FieldName: check.FieldName, + Count: count, + }) + } + } + + return results, nil +} + +// ForeignKeyPolicy 外键策略定义 +type ForeignKeyPolicy struct { + // 保留物理外键的表 + PhysicalFKTables []string + // 使用应用层外键的表 + ApplicationFKTables []string + // 无外键的表 + NoFKTables []string +} + +// GetDefaultForeignKeyPolicy 获取默认外键策略 +func GetDefaultForeignKeyPolicy() *ForeignKeyPolicy { + return &ForeignKeyPolicy{ + // 核心实体表保留物理外键 + PhysicalFKTables: []string{ + "core_tenants", + "core_projects", + "iam_users", + "billing_accounts", + }, + // 高频写入表使用应用层外键 + ApplicationFKTables: []string{ + "supply_accounts", + "supply_packages", + "supply_orders", + "supply_usage_records", + "supply_settlements", + }, + // 审计/日志表无外键 + NoFKTables: []string{ + "audit_events", + "outbox_events", + "outbox_dead_letter", + "supply_idempotency_record", + "supply_batch_compensation", + }, + } +} + +// GetPolicyForTable 获取表的外键策略 +func (p *ForeignKeyPolicy) GetPolicyForTable(tableName string) string { + for _, t := range p.PhysicalFKTables { + if t == tableName { + return "physical" + } + } + for _, t := range p.ApplicationFKTables { + if t == tableName { + return "application" + } + } + for _, t := range p.NoFKTables { + if t == tableName { + return "none" + } + } + return "unknown" +}