Files
2026-05-12 18:49:52 +08:00

187 lines
8.1 KiB
Go

package gatewayconsumer
import (
"context"
"errors"
"strings"
"time"
"supply-intelligence/internal/domain"
"supply-intelligence/internal/metrics"
)
var ErrInvalidConsumeInput = errors.New("invalid consume input")
type GatewayApplyResult struct {
AckResult domain.GatewayAckResult
Retryable bool
FailureCategory domain.GatewayFailureCategory
Detail string
}
type PackageChangeRepository interface {
ListPackageEventsAfter(ctx context.Context, cursor string) ([]domain.PackageChangeEvent, string)
ListRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time, limit int) []domain.PackageChangeEvent
AckPackageEvent(ctx context.Context, eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error)
MarkPackageEventRetry(ctx context.Context, eventID string, retryCount int, nextRetryAt time.Time, category domain.GatewayFailureCategory, detail string, retriedAt time.Time) (domain.PackageChangeEvent, error)
CountPackageEventsBySyncStatus(ctx context.Context, status domain.GatewaySyncStatus) int
CountRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time) int
UpsertGatewayAppliedSnapshot(ctx context.Context, snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot
// ListSupplyAccountsByConsumer returns accounts authorized for a given consumer tag.
ListSupplyAccountsByConsumer(ctx context.Context, consumerTag string) []domain.SupplyAccount
}
type Service struct {
repo PackageChangeRepository
now func() time.Time
applier func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error)
consumer string
}
func (s *Service) SetConsumer(consumer string) {
consumer = strings.TrimSpace(consumer)
if consumer == "" {
return
}
s.consumer = consumer
}
type ConsumeOnceInput struct {
Consumer string
Cursor string
}
type ConsumeOnceOutput struct {
Consumer string `json:"consumer"`
NextCursor string `json:"next_cursor"`
Items []ConsumedPackageChangeItem `json:"items"`
}
type ConsumedPackageChangeItem struct {
EventID string `json:"event_id"`
PackageID int64 `json:"package_id"`
GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"`
Result domain.GatewayAckResult `json:"result"`
Detail string `json:"detail,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
NextRetryAt *time.Time `json:"next_retry_at,omitempty"`
FailureCategory domain.GatewayFailureCategory `json:"failure_category,omitempty"`
}
func (s *Service) buildAllowedAccountSetWithConsumer(ctx context.Context, consumer string) map[int64]bool {
allowed := make(map[int64]bool)
if s.repo == nil {
return allowed
}
accounts := s.repo.ListSupplyAccountsByConsumer(ctx, consumer)
for _, acc := range accounts {
allowed[acc.AccountID] = true
}
return allowed
}
func (s *Service) isAuthorizedForEvent(ctx context.Context, event domain.PackageChangeEvent, allowed map[int64]bool) bool {
if len(allowed) == 0 {
if s.repo == nil {
return true
}
if accountRepo, ok := s.repo.(interface {
ListSupplyAccounts(context.Context) []domain.SupplyAccount
}); ok {
allAccounts := accountRepo.ListSupplyAccounts(ctx)
if len(allAccounts) == 0 {
return true
}
return false
}
return true
}
return allowed[event.AccountID]
}
func NewService(repo PackageChangeRepository) *Service {
return &Service{
repo: repo,
now: func() time.Time { return time.Now().UTC() },
consumer: "gateway",
applier: func(_ context.Context, event domain.PackageChangeEvent) (GatewayApplyResult, error) {
if strings.Contains(strings.ToLower(event.Model), "fail") {
return GatewayApplyResult{AckResult: domain.GatewayAckResultFailed, Retryable: false, FailureCategory: domain.GatewayFailureCategoryUnknown, Detail: "simulated apply failure"}, nil
}
return GatewayApplyResult{AckResult: domain.GatewayAckResultApplied, Detail: "applied to gateway snapshot"}, nil
},
}
}
func (s *Service) SetApplier(applier func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error)) {
s.applier = applier
}
func retryDelay(retryCount int) time.Duration {
switch retryCount {
case 1:
return time.Minute
case 2:
return 5 * time.Minute
default:
return 15 * time.Minute
}
}
func (s *Service) ConsumeOnce(ctx context.Context, input ConsumeOnceInput) (ConsumeOnceOutput, error) {
if s == nil || s.repo == nil || s.applier == nil {
return ConsumeOnceOutput{}, ErrInvalidConsumeInput
}
consumer := strings.TrimSpace(input.Consumer)
if consumer == "" {
consumer = s.consumer
}
items, nextCursor := s.repo.ListPackageEventsAfter(ctx, strings.TrimSpace(input.Cursor))
allowed := s.buildAllowedAccountSetWithConsumer(ctx, consumer)
result := ConsumeOnceOutput{Consumer: consumer, NextCursor: nextCursor, Items: make([]ConsumedPackageChangeItem, 0, len(items))}
for _, event := range items {
if !s.isAuthorizedForEvent(ctx, event, allowed) || event.GatewaySyncStatus != domain.GatewaySyncStatusPending {
continue
}
attempt, err := s.applier(ctx, event)
if err != nil {
return ConsumeOnceOutput{}, err
}
now := s.now()
switch {
case attempt.AckResult == domain.GatewayAckResultApplied:
s.repo.UpsertGatewayAppliedSnapshot(ctx, domain.GatewayAppliedSnapshot{Consumer: consumer, LastEventID: event.EventID, LastPackageID: event.PackageID, LastPlatform: event.Platform, LastModel: event.Model, LastAppliedVersion: event.Version, LastResult: string(attempt.AckResult), UpdatedAt: now})
updated, err := s.repo.AckPackageEvent(ctx, event.EventID, consumer, attempt.AckResult, attempt.Detail, now)
if err != nil {
return ConsumeOnceOutput{}, err
}
metrics.GatewayEventsProcessedTotal.WithLabelValues(event.Platform, event.EventType, string(attempt.AckResult)).Inc()
metrics.GatewayEventLatencySeconds.WithLabelValues(event.Platform).Observe(time.Since(event.OccurredAt).Seconds())
result.Items = append(result.Items, ConsumedPackageChangeItem{EventID: updated.EventID, PackageID: updated.PackageID, GatewaySyncStatus: updated.GatewaySyncStatus, Result: attempt.AckResult, Detail: attempt.Detail})
case attempt.Retryable && event.RetryCount < 2:
retryCount := event.RetryCount + 1
nextRetryAt := now.Add(retryDelay(retryCount))
updated, err := s.repo.MarkPackageEventRetry(ctx, event.EventID, retryCount, nextRetryAt, attempt.FailureCategory, attempt.Detail, now)
if err != nil {
return ConsumeOnceOutput{}, err
}
metrics.GatewayEventRetriesTotal.WithLabelValues(event.Platform, string(attempt.FailureCategory)).Inc()
metrics.GatewayPendingRetryEvents.WithLabelValues(consumer).Set(float64(s.repo.CountRetryablePendingPackageEvents(ctx, consumer, now)))
result.Items = append(result.Items, ConsumedPackageChangeItem{EventID: updated.EventID, PackageID: updated.PackageID, GatewaySyncStatus: updated.GatewaySyncStatus, Result: domain.GatewayAckResultPending, Detail: attempt.Detail, RetryCount: updated.RetryCount, NextRetryAt: updated.NextRetryAt, FailureCategory: updated.LastFailureCategory})
default:
updated, err := s.repo.AckPackageEvent(ctx, event.EventID, consumer, domain.GatewayAckResultFailed, attempt.Detail, now)
if err != nil {
return ConsumeOnceOutput{}, err
}
if attempt.FailureCategory != "" {
updated.LastFailureCategory = attempt.FailureCategory
updated.LastFailureDetail = attempt.Detail
}
metrics.GatewayEventsProcessedTotal.WithLabelValues(event.Platform, event.EventType, string(domain.GatewayAckResultFailed)).Inc()
metrics.GatewayFailedEvents.WithLabelValues(consumer).Set(float64(s.repo.CountPackageEventsBySyncStatus(ctx, domain.GatewaySyncStatusFailed)))
result.Items = append(result.Items, ConsumedPackageChangeItem{EventID: updated.EventID, PackageID: updated.PackageID, GatewaySyncStatus: updated.GatewaySyncStatus, Result: domain.GatewayAckResultFailed, Detail: attempt.Detail, FailureCategory: updated.LastFailureCategory})
}
}
return result, nil
}