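// Change summary:
//   - Add the ErrorMetrics struct to record flush/write/drop error counts.
//   - Add the ErrorHandler callback for custom error handling.
//   - Support configuring an error handler in AsyncLogWriterOptions.
//   - Record flush error metrics and invoke the error handler in flushBatch.
//   - Record the number of dropped events in the enqueue fallback path.
//   - Add the Metrics() method to expose error statistics.
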
package routing
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"sub2api-cn-relay-manager/internal/store/sqlite"
|
|
)
|
|
|
|
// Defaults substituted by NewAsyncLogWriter for any AsyncLogWriterOptions
// field that is zero or negative.
const (
	// defaultQueueSize is the capacity of the buffered event queue.
	defaultQueueSize = 128

	// defaultFlushInterval is the period of the background flush ticker.
	defaultFlushInterval = 2 * time.Second

	// defaultMaxBatchSize is the batch length at which the background loop
	// flushes without waiting for the ticker.
	defaultMaxBatchSize = 32

	// defaultFallbackWriteTimeout bounds the synchronous write performed by
	// enqueue when the queue is full.
	defaultFallbackWriteTimeout = 3 * time.Second
)
|
|
|
|
// RouteDecisionEvent is the payload accepted by AppendDecision. The SQLite
// sink in this file copies every field one-to-one into a
// sqlite.RouteDecisionLog record.
type RouteDecisionEvent struct {
	// Request identity and routing target.
	RequestID      string
	LogicalGroupID string
	PublicModel    string

	// Caller keys.
	UserKey         string
	ConversationKey string

	// Sticky-key fields.
	StickyKey     string
	StickyKeyType string
	StickyHit     bool

	// Selected route fields.
	SelectedRouteID       string
	SelectedShadowGroupID string
	FallbackUsed          bool

	// Outcome fields.
	ErrorClass     string
	UpstreamStatus int // NOTE(review): presumably the upstream HTTP status code; confirm with callers.
	LatencyMS      int // NOTE(review): presumably milliseconds, per the MS suffix; confirm with callers.
}
|
|
|
|
// RouteFailoverEvent is the payload accepted by AppendFailover. The SQLite
// sink in this file copies every field one-to-one into a
// sqlite.RouteFailoverEvent record.
type RouteFailoverEvent struct {
	// Request identity and routing target.
	RequestID      string
	LogicalGroupID string
	PublicModel    string

	// Source and destination route of the failover.
	FromRouteID string
	ToRouteID   string

	// Why the failover happened and the associated failure count.
	Reason       string
	FailureCount int
}
|
|
|
|
// RouteStickyAuditEvent is the payload accepted by AppendStickyAudit. The
// SQLite sink in this file copies every field one-to-one into a
// sqlite.RouteStickyAudit record.
type RouteStickyAuditEvent struct {
	// Sticky key and its type.
	StickyKey     string
	StickyKeyType string

	// Routing target the sticky entry refers to.
	LogicalGroupID string
	PublicModel    string
	RouteID        string

	// Action is the audited operation; its value set is not defined here.
	Action string

	// ExpiresAt is carried as a string; its format is not defined here.
	ExpiresAt string
}
|
|
|
|
type RouteDecisionLogger interface {
|
|
AppendDecision(context.Context, RouteDecisionEvent) error
|
|
AppendFailover(context.Context, RouteFailoverEvent) error
|
|
AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
|
|
Flush(context.Context) error
|
|
Close() error
|
|
}
|
|
|
|
type routeLogSink interface {
|
|
AppendDecision(context.Context, RouteDecisionEvent) error
|
|
AppendFailover(context.Context, RouteFailoverEvent) error
|
|
AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
|
|
Close() error
|
|
}
|
|
|
|
// ErrorMetrics holds error counters for AsyncLogWriter.
//
// The Record* and Get* methods serialize access through mu. The counter
// fields are exported, so code that reads or writes them directly bypasses
// that lock.
type ErrorMetrics struct {
	// FlushErrors is incremented by RecordFlushError.
	FlushErrors int64
	// WriteErrors is incremented by RecordWriteError.
	WriteErrors int64
	// DroppedEvents is incremented by RecordDroppedEvent.
	DroppedEvents int64

	mu sync.RWMutex
}

// increment adds one to the given counter while holding the write lock.
func (e *ErrorMetrics) increment(counter *int64) {
	e.mu.Lock()
	*counter += 1
	e.mu.Unlock()
}

// load returns the value of the given counter while holding the read lock.
func (e *ErrorMetrics) load(counter *int64) int64 {
	e.mu.RLock()
	value := *counter
	e.mu.RUnlock()
	return value
}

// RecordFlushError adds one to FlushErrors.
func (e *ErrorMetrics) RecordFlushError() {
	e.increment(&e.FlushErrors)
}

// RecordWriteError adds one to WriteErrors.
func (e *ErrorMetrics) RecordWriteError() {
	e.increment(&e.WriteErrors)
}

// RecordDroppedEvent adds one to DroppedEvents.
func (e *ErrorMetrics) RecordDroppedEvent() {
	e.increment(&e.DroppedEvents)
}

// GetFlushErrors returns the current FlushErrors count.
func (e *ErrorMetrics) GetFlushErrors() int64 {
	return e.load(&e.FlushErrors)
}

// GetWriteErrors returns the current WriteErrors count.
func (e *ErrorMetrics) GetWriteErrors() int64 {
	return e.load(&e.WriteErrors)
}

// GetDroppedEvents returns the current DroppedEvents count.
func (e *ErrorMetrics) GetDroppedEvents() int64 {
	return e.load(&e.DroppedEvents)
}
|
|
|
|
// ErrorHandler is the callback type of AsyncLogWriterOptions.OnError.
//
// AsyncLogWriter calls it synchronously for every event whose write to the
// sink fails, both from the background flush and from the queue-full fallback
// write. err is the error returned by the sink, and eventType is one of
// "decision", "failover", "sticky_audit" or "unknown".
type ErrorHandler func(ctx context.Context, err error, eventType string)
|
|
|
|
type AsyncLogWriterOptions struct {
|
|
QueueSize int
|
|
FlushInterval time.Duration
|
|
MaxBatchSize int
|
|
FallbackWriteTimeout time.Duration
|
|
OnError ErrorHandler
|
|
}
|
|
|
|
type queuedLogEvent struct {
|
|
decision *RouteDecisionEvent
|
|
failover *RouteFailoverEvent
|
|
sticky *RouteStickyAuditEvent
|
|
}
|
|
|
|
type AsyncLogWriter struct {
|
|
sink routeLogSink
|
|
queue chan queuedLogEvent
|
|
flushRequests chan chan error
|
|
closeCh chan struct{}
|
|
closedCh chan struct{}
|
|
flushInterval time.Duration
|
|
maxBatchSize int
|
|
fallbackWriteTimeout time.Duration
|
|
once sync.Once
|
|
onError ErrorHandler
|
|
metrics ErrorMetrics
|
|
}
|
|
|
|
func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) {
|
|
store, err := sqlite.Open(ctx, sqliteDSN)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return NewAsyncLogWriter(newSQLiteRouteLogSink(store), opts), nil
|
|
}
|
|
|
|
func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogWriter {
|
|
if opts.QueueSize <= 0 {
|
|
opts.QueueSize = defaultQueueSize
|
|
}
|
|
if opts.FlushInterval <= 0 {
|
|
opts.FlushInterval = defaultFlushInterval
|
|
}
|
|
if opts.MaxBatchSize <= 0 {
|
|
opts.MaxBatchSize = defaultMaxBatchSize
|
|
}
|
|
if opts.FallbackWriteTimeout <= 0 {
|
|
opts.FallbackWriteTimeout = defaultFallbackWriteTimeout
|
|
}
|
|
|
|
writer := &AsyncLogWriter{
|
|
sink: sink,
|
|
queue: make(chan queuedLogEvent, opts.QueueSize),
|
|
flushRequests: make(chan chan error),
|
|
closeCh: make(chan struct{}),
|
|
closedCh: make(chan struct{}),
|
|
flushInterval: opts.FlushInterval,
|
|
maxBatchSize: opts.MaxBatchSize,
|
|
fallbackWriteTimeout: opts.FallbackWriteTimeout,
|
|
onError: opts.OnError,
|
|
}
|
|
go writer.loop()
|
|
return writer
|
|
}
|
|
|
|
func (w *AsyncLogWriter) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
|
|
return w.enqueue(ctx, queuedLogEvent{decision: &event})
|
|
}
|
|
|
|
func (w *AsyncLogWriter) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
|
|
return w.enqueue(ctx, queuedLogEvent{failover: &event})
|
|
}
|
|
|
|
func (w *AsyncLogWriter) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
|
|
return w.enqueue(ctx, queuedLogEvent{sticky: &event})
|
|
}
|
|
|
|
func (w *AsyncLogWriter) Flush(ctx context.Context) error {
|
|
resp := make(chan error, 1)
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-w.closedCh:
|
|
return nil
|
|
case w.flushRequests <- resp:
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case err := <-resp:
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (w *AsyncLogWriter) Close() error {
|
|
var err error
|
|
w.once.Do(func() {
|
|
close(w.closeCh)
|
|
<-w.closedCh
|
|
err = w.sink.Close()
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (w *AsyncLogWriter) loop() {
|
|
ticker := time.NewTicker(w.flushInterval)
|
|
defer ticker.Stop()
|
|
defer close(w.closedCh)
|
|
|
|
batch := make([]queuedLogEvent, 0, w.maxBatchSize)
|
|
for {
|
|
select {
|
|
case item := <-w.queue:
|
|
batch = append(batch, item)
|
|
if len(batch) >= w.maxBatchSize {
|
|
w.flushBatch(batch)
|
|
batch = batch[:0]
|
|
}
|
|
case resp := <-w.flushRequests:
|
|
batch = w.drainQueue(batch)
|
|
err := w.flushBatch(batch)
|
|
batch = batch[:0]
|
|
resp <- err
|
|
case <-ticker.C:
|
|
batch = w.drainQueue(batch)
|
|
if len(batch) == 0 {
|
|
continue
|
|
}
|
|
w.flushBatch(batch)
|
|
batch = batch[:0]
|
|
case <-w.closeCh:
|
|
batch = w.drainQueue(batch)
|
|
w.flushBatch(batch)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-w.closedCh:
|
|
return fmt.Errorf("route log writer is closed")
|
|
case w.queue <- item:
|
|
return nil
|
|
default:
|
|
// Queue is full, use fallback direct write
|
|
w.metrics.RecordDroppedEvent()
|
|
fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout)
|
|
defer cancel()
|
|
err := w.writeOne(fallbackCtx, item)
|
|
if err != nil && w.onError != nil {
|
|
w.onError(fallbackCtx, err, w.getEventType(item))
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error {
|
|
if len(batch) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var firstErr error
|
|
for _, item := range batch {
|
|
if err := w.writeOne(context.Background(), item); err != nil {
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
w.metrics.RecordFlushError()
|
|
log.Printf("routing: flush route log event failed: %v", err)
|
|
|
|
// Call error handler if configured
|
|
if w.onError != nil {
|
|
w.onError(context.Background(), err, w.getEventType(item))
|
|
}
|
|
}
|
|
}
|
|
if firstErr != nil {
|
|
w.metrics.RecordWriteError()
|
|
}
|
|
return firstErr
|
|
}
|
|
|
|
func (w *AsyncLogWriter) getEventType(item queuedLogEvent) string {
|
|
switch {
|
|
case item.decision != nil:
|
|
return "decision"
|
|
case item.failover != nil:
|
|
return "failover"
|
|
case item.sticky != nil:
|
|
return "sticky_audit"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|
|
|
|
// Metrics returns the error metrics for monitoring
|
|
func (w *AsyncLogWriter) Metrics() ErrorMetrics {
|
|
return ErrorMetrics{
|
|
FlushErrors: w.metrics.GetFlushErrors(),
|
|
WriteErrors: w.metrics.GetWriteErrors(),
|
|
DroppedEvents: w.metrics.GetDroppedEvents(),
|
|
}
|
|
}
|
|
|
|
func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent {
|
|
for {
|
|
select {
|
|
case item := <-w.queue:
|
|
batch = append(batch, item)
|
|
default:
|
|
return batch
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *AsyncLogWriter) writeOne(ctx context.Context, item queuedLogEvent) error {
|
|
switch {
|
|
case item.decision != nil:
|
|
return w.sink.AppendDecision(ctx, *item.decision)
|
|
case item.failover != nil:
|
|
return w.sink.AppendFailover(ctx, *item.failover)
|
|
case item.sticky != nil:
|
|
return w.sink.AppendStickyAudit(ctx, *item.sticky)
|
|
default:
|
|
return fmt.Errorf("route log event payload is empty")
|
|
}
|
|
}
|
|
|
|
type sqliteRouteLogSink struct {
|
|
store *sqlite.DB
|
|
}
|
|
|
|
func newSQLiteRouteLogSink(store *sqlite.DB) *sqliteRouteLogSink {
|
|
return &sqliteRouteLogSink{store: store}
|
|
}
|
|
|
|
func (s *sqliteRouteLogSink) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
|
|
_, err := s.store.RouteDecisionLogs().Create(ctx, sqlite.RouteDecisionLog{
|
|
RequestID: event.RequestID,
|
|
LogicalGroupID: event.LogicalGroupID,
|
|
PublicModel: event.PublicModel,
|
|
UserKey: event.UserKey,
|
|
ConversationKey: event.ConversationKey,
|
|
StickyKey: event.StickyKey,
|
|
StickyKeyType: event.StickyKeyType,
|
|
StickyHit: event.StickyHit,
|
|
SelectedRouteID: event.SelectedRouteID,
|
|
SelectedShadowGroupID: event.SelectedShadowGroupID,
|
|
FallbackUsed: event.FallbackUsed,
|
|
ErrorClass: event.ErrorClass,
|
|
UpstreamStatus: event.UpstreamStatus,
|
|
LatencyMS: event.LatencyMS,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (s *sqliteRouteLogSink) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
|
|
_, err := s.store.RouteFailoverEvents().Create(ctx, sqlite.RouteFailoverEvent{
|
|
RequestID: event.RequestID,
|
|
LogicalGroupID: event.LogicalGroupID,
|
|
PublicModel: event.PublicModel,
|
|
FromRouteID: event.FromRouteID,
|
|
ToRouteID: event.ToRouteID,
|
|
Reason: event.Reason,
|
|
FailureCount: event.FailureCount,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (s *sqliteRouteLogSink) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
|
|
_, err := s.store.RouteStickyAudit().Create(ctx, sqlite.RouteStickyAudit{
|
|
StickyKey: event.StickyKey,
|
|
StickyKeyType: event.StickyKeyType,
|
|
LogicalGroupID: event.LogicalGroupID,
|
|
PublicModel: event.PublicModel,
|
|
RouteID: event.RouteID,
|
|
Action: event.Action,
|
|
ExpiresAt: event.ExpiresAt,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (s *sqliteRouteLogSink) Close() error {
|
|
return s.store.Close()
|
|
}
|