187 lines
8.1 KiB
Go
187 lines
8.1 KiB
Go
package gatewayconsumer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/domain"
|
|
"supply-intelligence/internal/metrics"
|
|
)
|
|
|
|
// ErrInvalidConsumeInput is the sentinel error ConsumeOnce returns when the
// service is unusable: the receiver, its repository, or its applier is nil.
var ErrInvalidConsumeInput = errors.New("invalid consume input")
|
|
|
|
type GatewayApplyResult struct {
|
|
AckResult domain.GatewayAckResult
|
|
Retryable bool
|
|
FailureCategory domain.GatewayFailureCategory
|
|
Detail string
|
|
}
|
|
|
|
type PackageChangeRepository interface {
|
|
ListPackageEventsAfter(ctx context.Context, cursor string) ([]domain.PackageChangeEvent, string)
|
|
ListRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time, limit int) []domain.PackageChangeEvent
|
|
AckPackageEvent(ctx context.Context, eventID, consumer string, result domain.GatewayAckResult, detail string, ackedAt time.Time) (domain.PackageChangeEvent, error)
|
|
MarkPackageEventRetry(ctx context.Context, eventID string, retryCount int, nextRetryAt time.Time, category domain.GatewayFailureCategory, detail string, retriedAt time.Time) (domain.PackageChangeEvent, error)
|
|
CountPackageEventsBySyncStatus(ctx context.Context, status domain.GatewaySyncStatus) int
|
|
CountRetryablePendingPackageEvents(ctx context.Context, consumer string, now time.Time) int
|
|
UpsertGatewayAppliedSnapshot(ctx context.Context, snapshot domain.GatewayAppliedSnapshot) domain.GatewayAppliedSnapshot
|
|
// ListSupplyAccountsByConsumer returns accounts authorized for a given consumer tag.
|
|
ListSupplyAccountsByConsumer(ctx context.Context, consumerTag string) []domain.SupplyAccount
|
|
}
|
|
|
|
type Service struct {
|
|
repo PackageChangeRepository
|
|
now func() time.Time
|
|
applier func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error)
|
|
consumer string
|
|
}
|
|
|
|
func (s *Service) SetConsumer(consumer string) {
|
|
consumer = strings.TrimSpace(consumer)
|
|
if consumer == "" {
|
|
return
|
|
}
|
|
s.consumer = consumer
|
|
}
|
|
|
|
// ConsumeOnceInput carries the parameters for a single ConsumeOnce pass.
type ConsumeOnceInput struct {
	// Consumer names the consumer to act as. It is trimmed by ConsumeOnce; a
	// blank value falls back to the service's default consumer.
	Consumer string
	// Cursor is trimmed and handed to the repository to select the batch.
	Cursor string
}
|
|
|
|
type ConsumeOnceOutput struct {
|
|
Consumer string `json:"consumer"`
|
|
NextCursor string `json:"next_cursor"`
|
|
Items []ConsumedPackageChangeItem `json:"items"`
|
|
}
|
|
|
|
type ConsumedPackageChangeItem struct {
|
|
EventID string `json:"event_id"`
|
|
PackageID int64 `json:"package_id"`
|
|
GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"`
|
|
Result domain.GatewayAckResult `json:"result"`
|
|
Detail string `json:"detail,omitempty"`
|
|
RetryCount int `json:"retry_count,omitempty"`
|
|
NextRetryAt *time.Time `json:"next_retry_at,omitempty"`
|
|
FailureCategory domain.GatewayFailureCategory `json:"failure_category,omitempty"`
|
|
}
|
|
|
|
func (s *Service) buildAllowedAccountSetWithConsumer(ctx context.Context, consumer string) map[int64]bool {
|
|
allowed := make(map[int64]bool)
|
|
if s.repo == nil {
|
|
return allowed
|
|
}
|
|
accounts := s.repo.ListSupplyAccountsByConsumer(ctx, consumer)
|
|
for _, acc := range accounts {
|
|
allowed[acc.AccountID] = true
|
|
}
|
|
return allowed
|
|
}
|
|
|
|
func (s *Service) isAuthorizedForEvent(ctx context.Context, event domain.PackageChangeEvent, allowed map[int64]bool) bool {
|
|
if len(allowed) == 0 {
|
|
if s.repo == nil {
|
|
return true
|
|
}
|
|
if accountRepo, ok := s.repo.(interface {
|
|
ListSupplyAccounts(context.Context) []domain.SupplyAccount
|
|
}); ok {
|
|
allAccounts := accountRepo.ListSupplyAccounts(ctx)
|
|
if len(allAccounts) == 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
return allowed[event.AccountID]
|
|
}
|
|
|
|
func NewService(repo PackageChangeRepository) *Service {
|
|
return &Service{
|
|
repo: repo,
|
|
now: func() time.Time { return time.Now().UTC() },
|
|
consumer: "gateway",
|
|
applier: func(_ context.Context, event domain.PackageChangeEvent) (GatewayApplyResult, error) {
|
|
if strings.Contains(strings.ToLower(event.Model), "fail") {
|
|
return GatewayApplyResult{AckResult: domain.GatewayAckResultFailed, Retryable: false, FailureCategory: domain.GatewayFailureCategoryUnknown, Detail: "simulated apply failure"}, nil
|
|
}
|
|
return GatewayApplyResult{AckResult: domain.GatewayAckResultApplied, Detail: "applied to gateway snapshot"}, nil
|
|
},
|
|
}
|
|
}
|
|
|
|
func (s *Service) SetApplier(applier func(context.Context, domain.PackageChangeEvent) (GatewayApplyResult, error)) {
|
|
s.applier = applier
|
|
}
|
|
|
|
// retryDelay maps a retry attempt number to the wait before that retry:
// one minute for the first, five minutes for the second, and fifteen minutes
// for every other value (including zero and negatives).
func retryDelay(retryCount int) time.Duration {
	if retryCount == 1 {
		return time.Minute
	}
	if retryCount == 2 {
		return 5 * time.Minute
	}
	return 15 * time.Minute
}
|
|
|
|
func (s *Service) ConsumeOnce(ctx context.Context, input ConsumeOnceInput) (ConsumeOnceOutput, error) {
|
|
if s == nil || s.repo == nil || s.applier == nil {
|
|
return ConsumeOnceOutput{}, ErrInvalidConsumeInput
|
|
}
|
|
consumer := strings.TrimSpace(input.Consumer)
|
|
if consumer == "" {
|
|
consumer = s.consumer
|
|
}
|
|
items, nextCursor := s.repo.ListPackageEventsAfter(ctx, strings.TrimSpace(input.Cursor))
|
|
allowed := s.buildAllowedAccountSetWithConsumer(ctx, consumer)
|
|
result := ConsumeOnceOutput{Consumer: consumer, NextCursor: nextCursor, Items: make([]ConsumedPackageChangeItem, 0, len(items))}
|
|
for _, event := range items {
|
|
if !s.isAuthorizedForEvent(ctx, event, allowed) || event.GatewaySyncStatus != domain.GatewaySyncStatusPending {
|
|
continue
|
|
}
|
|
attempt, err := s.applier(ctx, event)
|
|
if err != nil {
|
|
return ConsumeOnceOutput{}, err
|
|
}
|
|
now := s.now()
|
|
switch {
|
|
case attempt.AckResult == domain.GatewayAckResultApplied:
|
|
s.repo.UpsertGatewayAppliedSnapshot(ctx, domain.GatewayAppliedSnapshot{Consumer: consumer, LastEventID: event.EventID, LastPackageID: event.PackageID, LastPlatform: event.Platform, LastModel: event.Model, LastAppliedVersion: event.Version, LastResult: string(attempt.AckResult), UpdatedAt: now})
|
|
updated, err := s.repo.AckPackageEvent(ctx, event.EventID, consumer, attempt.AckResult, attempt.Detail, now)
|
|
if err != nil {
|
|
return ConsumeOnceOutput{}, err
|
|
}
|
|
metrics.GatewayEventsProcessedTotal.WithLabelValues(event.Platform, event.EventType, string(attempt.AckResult)).Inc()
|
|
metrics.GatewayEventLatencySeconds.WithLabelValues(event.Platform).Observe(time.Since(event.OccurredAt).Seconds())
|
|
result.Items = append(result.Items, ConsumedPackageChangeItem{EventID: updated.EventID, PackageID: updated.PackageID, GatewaySyncStatus: updated.GatewaySyncStatus, Result: attempt.AckResult, Detail: attempt.Detail})
|
|
case attempt.Retryable && event.RetryCount < 2:
|
|
retryCount := event.RetryCount + 1
|
|
nextRetryAt := now.Add(retryDelay(retryCount))
|
|
updated, err := s.repo.MarkPackageEventRetry(ctx, event.EventID, retryCount, nextRetryAt, attempt.FailureCategory, attempt.Detail, now)
|
|
if err != nil {
|
|
return ConsumeOnceOutput{}, err
|
|
}
|
|
metrics.GatewayEventRetriesTotal.WithLabelValues(event.Platform, string(attempt.FailureCategory)).Inc()
|
|
metrics.GatewayPendingRetryEvents.WithLabelValues(consumer).Set(float64(s.repo.CountRetryablePendingPackageEvents(ctx, consumer, now)))
|
|
result.Items = append(result.Items, ConsumedPackageChangeItem{EventID: updated.EventID, PackageID: updated.PackageID, GatewaySyncStatus: updated.GatewaySyncStatus, Result: domain.GatewayAckResultPending, Detail: attempt.Detail, RetryCount: updated.RetryCount, NextRetryAt: updated.NextRetryAt, FailureCategory: updated.LastFailureCategory})
|
|
default:
|
|
updated, err := s.repo.AckPackageEvent(ctx, event.EventID, consumer, domain.GatewayAckResultFailed, attempt.Detail, now)
|
|
if err != nil {
|
|
return ConsumeOnceOutput{}, err
|
|
}
|
|
if attempt.FailureCategory != "" {
|
|
updated.LastFailureCategory = attempt.FailureCategory
|
|
updated.LastFailureDetail = attempt.Detail
|
|
}
|
|
metrics.GatewayEventsProcessedTotal.WithLabelValues(event.Platform, event.EventType, string(domain.GatewayAckResultFailed)).Inc()
|
|
metrics.GatewayFailedEvents.WithLabelValues(consumer).Set(float64(s.repo.CountPackageEventsBySyncStatus(ctx, domain.GatewaySyncStatusFailed)))
|
|
result.Items = append(result.Items, ConsumedPackageChangeItem{EventID: updated.EventID, PackageID: updated.PackageID, GatewaySyncStatus: updated.GatewaySyncStatus, Result: domain.GatewayAckResultFailed, Detail: attempt.Detail, FailureCategory: updated.LastFailureCategory})
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|