Files
phamnazage-jpg 8984451845 feat(log): H-03 日志 flush 错误监控
- 添加 ErrorMetrics 结构体记录 flush/write/drop 错误数
- 添加 ErrorHandler 回调接口用于自定义错误处理
- 在 AsyncLogWriterOptions 中支持配置错误处理器
- 在 flushBatch 中记录 flush 错误指标并回调错误处理器
- 在 enqueue fallback 路径中记录丢弃事件数
- 添加 Metrics() 方法暴露错误统计
2026-06-02 06:51:14 +08:00

406 lines
10 KiB
Go

package routing
import (
"context"
"fmt"
"log"
"sync"
"time"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
// Defaults applied by NewAsyncLogWriter when the corresponding
// AsyncLogWriterOptions field is zero or negative.
const (
	// defaultQueueSize is the capacity of the buffered event queue.
	defaultQueueSize = 128
	// defaultFlushInterval is the period of the background flush ticker.
	defaultFlushInterval = 2 * time.Second
	// defaultMaxBatchSize is the number of pending events that triggers an
	// immediate flush.
	defaultMaxBatchSize = 32
	// defaultFallbackWriteTimeout bounds the synchronous sink write used when
	// the queue is full.
	defaultFallbackWriteTimeout = 3 * time.Second
)
// RouteDecisionEvent describes the routing decision made for one request.
// The SQLite sink copies every field unchanged into a sqlite.RouteDecisionLog
// row.
type RouteDecisionEvent struct {
	RequestID       string
	LogicalGroupID  string
	PublicModel     string
	UserKey         string
	ConversationKey string
	// StickyKey and StickyKeyType identify the sticky-routing key consulted
	// for this request; StickyHit reports whether it matched.
	StickyKey             string
	StickyKeyType         string
	StickyHit             bool
	SelectedRouteID       string
	SelectedShadowGroupID string
	FallbackUsed          bool
	ErrorClass            string
	UpstreamStatus        int
	// LatencyMS is presumably in milliseconds, going by the name — confirm
	// with the producers of this event.
	LatencyMS int
}
// RouteFailoverEvent records a switch from one route to another for a request.
// The SQLite sink copies every field unchanged into a sqlite.RouteFailoverEvent
// row.
type RouteFailoverEvent struct {
	RequestID      string
	LogicalGroupID string
	PublicModel    string
	FromRouteID    string
	ToRouteID      string
	Reason         string
	// FailureCount is stored verbatim; presumably the number of failures that
	// led to the failover — confirm with the producer.
	FailureCount int
}
// RouteStickyAuditEvent records an action taken on a sticky-routing entry.
// The SQLite sink copies every field unchanged into a sqlite.RouteStickyAudit
// row.
type RouteStickyAuditEvent struct {
	StickyKey      string
	StickyKeyType  string
	LogicalGroupID string
	PublicModel    string
	RouteID        string
	Action         string
	// ExpiresAt is an opaque string stored as-is; its time format is defined
	// by the producer (NOTE(review): confirm the expected layout).
	ExpiresAt string
}
// RouteDecisionLogger is the public logging interface used by routing code.
// *AsyncLogWriter implements it.
type RouteDecisionLogger interface {
	AppendDecision(context.Context, RouteDecisionEvent) error
	AppendFailover(context.Context, RouteFailoverEvent) error
	AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
	// Flush writes any buffered events before returning.
	Flush(context.Context) error
	Close() error
}
// routeLogSink is the storage backend an AsyncLogWriter writes events to, one
// call per event. sqliteRouteLogSink is the SQLite implementation. Close is
// called once by AsyncLogWriter.Close after the final flush.
type routeLogSink interface {
	AppendDecision(context.Context, RouteDecisionEvent) error
	AppendFailover(context.Context, RouteFailoverEvent) error
	AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
	Close() error
}
// ErrorMetrics holds failure counters maintained by an AsyncLogWriter.
//
// All three counters are guarded by mu. Code that may run concurrently with
// the writer should update them through the Record* methods and read them
// through the Get* methods; touching the exported fields directly bypasses
// the lock. Because it contains a mutex, an ErrorMetrics must not be copied
// after first use.
type ErrorMetrics struct {
	FlushErrors   int64 // incremented by RecordFlushError
	WriteErrors   int64 // incremented by RecordWriteError
	DroppedEvents int64 // incremented by RecordDroppedEvent
	mu            sync.RWMutex
}

// bump adds one to *counter while holding the write lock. counter must point
// at one of e's own fields.
func (e *ErrorMetrics) bump(counter *int64) {
	e.mu.Lock()
	defer e.mu.Unlock()
	*counter++
}

// read returns *counter while holding the read lock. counter must point at
// one of e's own fields.
func (e *ErrorMetrics) read(counter *int64) int64 {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return *counter
}

// RecordFlushError increments FlushErrors.
func (e *ErrorMetrics) RecordFlushError() { e.bump(&e.FlushErrors) }

// RecordWriteError increments WriteErrors.
func (e *ErrorMetrics) RecordWriteError() { e.bump(&e.WriteErrors) }

// RecordDroppedEvent increments DroppedEvents.
func (e *ErrorMetrics) RecordDroppedEvent() { e.bump(&e.DroppedEvents) }

// GetFlushErrors returns the current FlushErrors count.
func (e *ErrorMetrics) GetFlushErrors() int64 { return e.read(&e.FlushErrors) }

// GetWriteErrors returns the current WriteErrors count.
func (e *ErrorMetrics) GetWriteErrors() int64 { return e.read(&e.WriteErrors) }

// GetDroppedEvents returns the current DroppedEvents count.
func (e *ErrorMetrics) GetDroppedEvents() int64 { return e.read(&e.DroppedEvents) }
// ErrorHandler is an optional callback invoked when the writer fails to
// persist an event. err is the sink error; eventType is "decision",
// "failover", "sticky_audit", or "unknown" for an event with no payload.
// It is called synchronously, either from the writer's background flush
// goroutine or from the goroutine of an Append* call whose event had to be
// written directly because the queue was full. It should therefore return
// quickly and be safe for concurrent use.
type ErrorHandler func(ctx context.Context, err error, eventType string)
// AsyncLogWriterOptions configures NewAsyncLogWriter and NewSQLiteLogWriter.
// Zero or negative numeric values fall back to the package defaults.
type AsyncLogWriterOptions struct {
	// QueueSize is the capacity of the in-memory event queue (default
	// defaultQueueSize). When the queue is full, the appending goroutine
	// writes the event to the sink synchronously instead.
	QueueSize int
	// FlushInterval is how often the background goroutine flushes pending
	// events (default defaultFlushInterval).
	FlushInterval time.Duration
	// MaxBatchSize is the number of pending events that triggers an
	// immediate flush (default defaultMaxBatchSize).
	MaxBatchSize int
	// FallbackWriteTimeout bounds the synchronous write used when the queue
	// is full (default defaultFallbackWriteTimeout).
	FallbackWriteTimeout time.Duration
	// OnError, if non-nil, is called for each event that fails to be written.
	OnError ErrorHandler
}
// queuedLogEvent is one entry in the writer's queue. Exactly one pointer is
// expected to be set. writeOne checks decision, then failover, then sticky,
// and reports an error when none is set.
type queuedLogEvent struct {
	decision *RouteDecisionEvent
	failover *RouteFailoverEvent
	sticky   *RouteStickyAuditEvent
}
// AsyncLogWriter buffers route log events in a channel and writes them to a
// routeLogSink from a single background goroutine (see loop). Create it with
// NewAsyncLogWriter or NewSQLiteLogWriter and release it with Close. It holds
// an ErrorMetrics value, which contains a mutex, so it must not be copied.
type AsyncLogWriter struct {
	sink  routeLogSink
	queue chan queuedLogEvent // buffered; non-blocking sends from enqueue
	// flushRequests carries a reply channel from Flush to the loop.
	flushRequests chan chan error
	closeCh       chan struct{} // closed by Close to ask the loop to stop
	closedCh      chan struct{} // closed by the loop when it has exited
	flushInterval time.Duration
	maxBatchSize  int
	// fallbackWriteTimeout bounds direct sink writes made when the queue is full.
	fallbackWriteTimeout time.Duration
	once                 sync.Once // makes Close run its shutdown only once
	onError              ErrorHandler
	metrics              ErrorMetrics
}
// NewSQLiteLogWriter opens the SQLite database named by sqliteDSN and returns
// an AsyncLogWriter that persists events into it. Any error from sqlite.Open
// is returned unchanged. Closing the returned writer also closes the database.
func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) {
	db, openErr := sqlite.Open(ctx, sqliteDSN)
	if openErr != nil {
		return nil, openErr
	}
	sink := newSQLiteRouteLogSink(db)
	return NewAsyncLogWriter(sink, opts), nil
}
// NewAsyncLogWriter wraps sink in an AsyncLogWriter and starts its background
// goroutine, which runs until Close is called. Non-positive option values are
// replaced by defaultQueueSize, defaultFlushInterval, defaultMaxBatchSize and
// defaultFallbackWriteTimeout respectively.
func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogWriter {
	queueSize := opts.QueueSize
	if queueSize <= 0 {
		queueSize = defaultQueueSize
	}
	interval := opts.FlushInterval
	if interval <= 0 {
		interval = defaultFlushInterval
	}
	batchSize := opts.MaxBatchSize
	if batchSize <= 0 {
		batchSize = defaultMaxBatchSize
	}
	fallbackTimeout := opts.FallbackWriteTimeout
	if fallbackTimeout <= 0 {
		fallbackTimeout = defaultFallbackWriteTimeout
	}

	w := &AsyncLogWriter{
		sink:                 sink,
		queue:                make(chan queuedLogEvent, queueSize),
		flushRequests:        make(chan chan error),
		closeCh:              make(chan struct{}),
		closedCh:             make(chan struct{}),
		flushInterval:        interval,
		maxBatchSize:         batchSize,
		fallbackWriteTimeout: fallbackTimeout,
		onError:              opts.OnError,
	}
	go w.loop()
	return w
}
// AppendDecision queues a routing decision event for asynchronous writing.
// See enqueue for the full-queue and closed-writer behavior.
func (w *AsyncLogWriter) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
	item := queuedLogEvent{decision: &event}
	return w.enqueue(ctx, item)
}

// AppendFailover queues a failover event for asynchronous writing.
// See enqueue for the full-queue and closed-writer behavior.
func (w *AsyncLogWriter) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
	item := queuedLogEvent{failover: &event}
	return w.enqueue(ctx, item)
}

// AppendStickyAudit queues a sticky-routing audit event for asynchronous
// writing. See enqueue for the full-queue and closed-writer behavior.
func (w *AsyncLogWriter) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
	item := queuedLogEvent{sticky: &event}
	return w.enqueue(ctx, item)
}
// Flush asks the background goroutine to write every event queued so far and
// waits for the outcome.
//
// It returns the first sink error hit during that flush, ctx.Err() if ctx
// finishes first, or nil if the writer has already shut down. If ctx expires
// after the request was accepted, the flush still runs; only the wait is
// abandoned.
func (w *AsyncLogWriter) Flush(ctx context.Context) error {
	// Buffered so the loop can always deliver the result, even if this call
	// has already returned because ctx expired.
	reply := make(chan error, 1)

	select {
	case w.flushRequests <- reply:
	case <-w.closedCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}

	select {
	case flushErr := <-reply:
		return flushErr
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Close stops the background goroutine, waits for its final drain-and-flush
// of queued events, and then closes the sink. Only the first call does any
// work and can return the sink's Close error; later calls return nil.
func (w *AsyncLogWriter) Close() error {
	var closeErr error
	w.once.Do(func() {
		close(w.closeCh) // signal the loop to drain and exit
		<-w.closedCh     // wait until it has finished the final flush
		closeErr = w.sink.Close()
	})
	return closeErr
}
// loop is the writer's single background goroutine. It gathers queued events
// into a pending batch and writes that batch to the sink when:
//   - the batch reaches maxBatchSize,
//   - Flush requests it (the result is sent back on the request channel),
//   - the flush ticker fires and something is pending, or
//   - Close is called (final drain and flush, then exit).
//
// closedCh is closed when loop returns.
func (w *AsyncLogWriter) loop() {
	ticker := time.NewTicker(w.flushInterval)
	defer ticker.Stop()
	defer close(w.closedCh)

	pending := make([]queuedLogEvent, 0, w.maxBatchSize)
	for {
		select {
		case <-w.closeCh:
			// Final flush. Failures are logged and reported by flushBatch
			// but are not returned to Close.
			pending = w.drainQueue(pending)
			w.flushBatch(pending)
			return
		case reply := <-w.flushRequests:
			pending = w.drainQueue(pending)
			flushErr := w.flushBatch(pending)
			pending = pending[:0]
			reply <- flushErr
		case <-ticker.C:
			pending = w.drainQueue(pending)
			if len(pending) > 0 {
				w.flushBatch(pending)
				pending = pending[:0]
			}
		case next := <-w.queue:
			pending = append(pending, next)
			if len(pending) >= w.maxBatchSize {
				w.flushBatch(pending)
				pending = pending[:0]
			}
		}
	}
}
// enqueue hands item to the background loop without blocking.
//
// It returns an error once Close has been called, and ctx.Err() if ctx is
// already done. When the queue is full, the event is written to the sink
// synchronously on the caller's goroutine, bounded by fallbackWriteTimeout
// and independent of ctx, instead of being discarded. Only if that direct
// write fails is the event lost. In that case it is counted as a write error
// and a dropped event, reported to onError, and the error is returned.
func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error {
	// Check for shutdown before anything else. In a single select, a closed
	// channel and a queue with free capacity are both ready, and Go picks
	// between them at random. After Close, about half of the sends would
	// "succeed" into a queue that no goroutine drains any more.
	select {
	case <-w.closeCh:
		return fmt.Errorf("route log writer is closed")
	default:
	}
	// NOTE(review): an append racing with Close can still pass the check
	// above and enqueue after the loop's final drain. Closing that window
	// completely requires a lock shared with Close.

	select {
	case <-ctx.Done():
		return ctx.Err()
	case w.queue <- item:
		return nil
	default:
		// Queue is full: fall back to a direct write below.
	}

	fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout)
	defer cancel()
	if err := w.writeOne(fallbackCtx, item); err != nil {
		// The event could neither be queued nor written, so it is lost.
		w.metrics.RecordWriteError()
		w.metrics.RecordDroppedEvent()
		if w.onError != nil {
			w.onError(fallbackCtx, err, w.getEventType(item))
		}
		return err
	}
	return nil
}
// flushBatch writes every event in batch to the sink, in order, and returns
// the first error encountered (nil for an empty or fully successful batch).
//
// A failed event does not stop the rest of the batch. For each failure it
// increments FlushErrors, logs the error, and calls onError if one is set.
// A batch with at least one failure also increments WriteErrors once.
// NOTE(review): sink writes use context.Background() with no deadline, so a
// stalled sink blocks the loop (and therefore Flush/Close) indefinitely.
func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error {
	var firstErr error
	for i := range batch {
		item := batch[i]
		err := w.writeOne(context.Background(), item)
		if err == nil {
			continue
		}
		w.metrics.RecordFlushError()
		log.Printf("routing: flush route log event failed: %v", err)
		if w.onError != nil {
			w.onError(context.Background(), err, w.getEventType(item))
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	if firstErr != nil {
		w.metrics.RecordWriteError()
	}
	return firstErr
}
// getEventType names the payload kind carried by item, for error reporting.
// The checks use the same precedence as writeOne; an empty item yields
// "unknown".
func (w *AsyncLogWriter) getEventType(item queuedLogEvent) string {
	if item.decision != nil {
		return "decision"
	}
	if item.failover != nil {
		return "failover"
	}
	if item.sticky != nil {
		return "sticky_audit"
	}
	return "unknown"
}
// Metrics returns a point-in-time copy of the writer's error counters for
// monitoring.
//
// All three counters are read under a single read lock rather than three
// separate acquisitions, so a concurrent update cannot land between reads.
// The result is a fresh ErrorMetrics with its own unlocked mutex; it does not
// change afterwards.
func (w *AsyncLogWriter) Metrics() ErrorMetrics {
	w.metrics.mu.RLock()
	defer w.metrics.mu.RUnlock()
	return ErrorMetrics{
		FlushErrors:   w.metrics.FlushErrors,
		WriteErrors:   w.metrics.WriteErrors,
		DroppedEvents: w.metrics.DroppedEvents,
	}
}
// drainQueue moves every event currently sitting in the queue onto the end of
// batch without blocking. It returns the extended slice as soon as a receive
// would block.
func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent {
	for {
		select {
		case queued := <-w.queue:
			batch = append(batch, queued)
			continue
		default:
		}
		return batch
	}
}
// writeOne forwards item's payload to the matching sink method. If several
// pointers are set, decision wins over failover, which wins over sticky. An
// item with no payload yields an error.
func (w *AsyncLogWriter) writeOne(ctx context.Context, item queuedLogEvent) error {
	if d := item.decision; d != nil {
		return w.sink.AppendDecision(ctx, *d)
	}
	if f := item.failover; f != nil {
		return w.sink.AppendFailover(ctx, *f)
	}
	if s := item.sticky; s != nil {
		return w.sink.AppendStickyAudit(ctx, *s)
	}
	return fmt.Errorf("route log event payload is empty")
}
// sqliteRouteLogSink is the routeLogSink backed by a *sqlite.DB. Each event
// becomes one Create call on the matching repository, with fields copied
// one-to-one.
type sqliteRouteLogSink struct {
	store *sqlite.DB
}

// newSQLiteRouteLogSink returns a sink that writes into store. The sink's
// Close closes store.
func newSQLiteRouteLogSink(store *sqlite.DB) *sqliteRouteLogSink {
	sink := &sqliteRouteLogSink{}
	sink.store = store
	return sink
}

// AppendDecision inserts event as a sqlite.RouteDecisionLog row.
func (s *sqliteRouteLogSink) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
	row := sqlite.RouteDecisionLog{
		RequestID:             event.RequestID,
		LogicalGroupID:        event.LogicalGroupID,
		PublicModel:           event.PublicModel,
		UserKey:               event.UserKey,
		ConversationKey:       event.ConversationKey,
		StickyKey:             event.StickyKey,
		StickyKeyType:         event.StickyKeyType,
		StickyHit:             event.StickyHit,
		SelectedRouteID:       event.SelectedRouteID,
		SelectedShadowGroupID: event.SelectedShadowGroupID,
		FallbackUsed:          event.FallbackUsed,
		ErrorClass:            event.ErrorClass,
		UpstreamStatus:        event.UpstreamStatus,
		LatencyMS:             event.LatencyMS,
	}
	repo := s.store.RouteDecisionLogs()
	_, createErr := repo.Create(ctx, row)
	return createErr
}

// AppendFailover inserts event as a sqlite.RouteFailoverEvent row.
func (s *sqliteRouteLogSink) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
	row := sqlite.RouteFailoverEvent{
		RequestID:      event.RequestID,
		LogicalGroupID: event.LogicalGroupID,
		PublicModel:    event.PublicModel,
		FromRouteID:    event.FromRouteID,
		ToRouteID:      event.ToRouteID,
		Reason:         event.Reason,
		FailureCount:   event.FailureCount,
	}
	repo := s.store.RouteFailoverEvents()
	_, createErr := repo.Create(ctx, row)
	return createErr
}

// AppendStickyAudit inserts event as a sqlite.RouteStickyAudit row.
func (s *sqliteRouteLogSink) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
	row := sqlite.RouteStickyAudit{
		StickyKey:      event.StickyKey,
		StickyKeyType:  event.StickyKeyType,
		LogicalGroupID: event.LogicalGroupID,
		PublicModel:    event.PublicModel,
		RouteID:        event.RouteID,
		Action:         event.Action,
		ExpiresAt:      event.ExpiresAt,
	}
	repo := s.store.RouteStickyAudit()
	_, createErr := repo.Create(ctx, row)
	return createErr
}

// Close closes the underlying SQLite store.
func (s *sqliteRouteLogSink) Close() error {
	return s.store.Close()
}