feat(routing): add route log writer and admin api
This commit is contained in:
@@ -45,6 +45,12 @@ type ActionSet struct {
|
||||
DeleteLogicalGroupRoute func(context.Context, DeleteLogicalGroupRouteRequest) error
|
||||
CreateLogicalGroupRouteModel func(context.Context, CreateLogicalGroupRouteModelRequest) (LogicalGroupRouteModelInfo, error)
|
||||
ListLogicalGroupRouteModels func(context.Context, ListLogicalGroupRouteModelsRequest) ([]LogicalGroupRouteModelInfo, error)
|
||||
AppendRouteDecisionLog func(context.Context, AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error)
|
||||
ListRouteDecisionLogs func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error)
|
||||
AppendRouteFailoverEvent func(context.Context, AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error)
|
||||
ListRouteFailoverEvents func(context.Context, ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error)
|
||||
AppendRouteStickyAudit func(context.Context, AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error)
|
||||
ListRouteStickyAudit func(context.Context, ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error)
|
||||
CreateProviderDraft func(context.Context, CreateProviderDraftRequest) (ProviderDraftInfo, error)
|
||||
ListProviderDrafts func(context.Context, ListProviderDraftsRequest) ([]ProviderDraftInfo, error)
|
||||
GetProviderDraft func(context.Context, string) (ProviderDraftInfo, error)
|
||||
@@ -368,6 +374,24 @@ func NewAPIHandlerWithAuth(adminAuth AdminAuthConfig, actions ActionSet) http.Ha
|
||||
mux.Handle("GET /api/logical-groups/{groupID}/routes/{routeID}/models", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleListLogicalGroupRouteModels(w, r, actions.ListLogicalGroupRouteModels)
|
||||
})))
|
||||
mux.Handle("POST /api/routing/logs/decisions", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleAppendRouteDecisionLog(w, r, actions.AppendRouteDecisionLog)
|
||||
})))
|
||||
mux.Handle("GET /api/routing/logs/decisions", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleListRouteDecisionLogs(w, r, actions.ListRouteDecisionLogs)
|
||||
})))
|
||||
mux.Handle("POST /api/routing/logs/failovers", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleAppendRouteFailoverEvent(w, r, actions.AppendRouteFailoverEvent)
|
||||
})))
|
||||
mux.Handle("GET /api/routing/logs/failovers", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleListRouteFailoverEvents(w, r, actions.ListRouteFailoverEvents)
|
||||
})))
|
||||
mux.Handle("POST /api/routing/logs/sticky-audit", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleAppendRouteStickyAudit(w, r, actions.AppendRouteStickyAudit)
|
||||
})))
|
||||
mux.Handle("GET /api/routing/logs/sticky-audit", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleListRouteStickyAudit(w, r, actions.ListRouteStickyAudit)
|
||||
})))
|
||||
mux.Handle("POST /api/provider-drafts", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handleCreateProviderDraft(w, r, actions.CreateProviderDraft)
|
||||
})))
|
||||
@@ -1172,6 +1196,7 @@ func classifyError(err error) *httpError {
|
||||
}
|
||||
|
||||
func NewActionSet(sqliteDSN string) ActionSet {
|
||||
routeLogWriter := newLazyRouteLogWriter(sqliteDSN)
|
||||
return ActionSet{
|
||||
CreateBatchImportRun: buildCreateBatchImportRunAction(sqliteDSN),
|
||||
ListBatchImportRuns: buildListBatchImportRunsAction(sqliteDSN),
|
||||
@@ -1192,6 +1217,12 @@ func NewActionSet(sqliteDSN string) ActionSet {
|
||||
DeleteLogicalGroupRoute: buildDeleteLogicalGroupRouteAction(sqliteDSN),
|
||||
CreateLogicalGroupRouteModel: buildCreateLogicalGroupRouteModelAction(sqliteDSN),
|
||||
ListLogicalGroupRouteModels: buildListLogicalGroupRouteModelsAction(sqliteDSN),
|
||||
AppendRouteDecisionLog: buildAppendRouteDecisionLogAction(routeLogWriter, sqliteDSN),
|
||||
ListRouteDecisionLogs: buildListRouteDecisionLogsAction(sqliteDSN),
|
||||
AppendRouteFailoverEvent: buildAppendRouteFailoverEventAction(routeLogWriter, sqliteDSN),
|
||||
ListRouteFailoverEvents: buildListRouteFailoverEventsAction(sqliteDSN),
|
||||
AppendRouteStickyAudit: buildAppendRouteStickyAuditAction(routeLogWriter, sqliteDSN),
|
||||
ListRouteStickyAudit: buildListRouteStickyAuditAction(sqliteDSN),
|
||||
CreateProviderDraft: func(ctx context.Context, req CreateProviderDraftRequest) (ProviderDraftInfo, error) {
|
||||
store, err := sqlite.Open(ctx, sqliteDSN)
|
||||
if err != nil {
|
||||
|
||||
636
internal/app/route_logging_api.go
Normal file
636
internal/app/route_logging_api.go
Normal file
@@ -0,0 +1,636 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/routing"
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
type AppendRouteDecisionLogRequest struct {
|
||||
RequestID string `json:"request_id"`
|
||||
LogicalGroupID string `json:"logical_group_id"`
|
||||
PublicModel string `json:"public_model"`
|
||||
UserKey string `json:"user_key,omitempty"`
|
||||
ConversationKey string `json:"conversation_key,omitempty"`
|
||||
StickyKey string `json:"sticky_key,omitempty"`
|
||||
StickyKeyType string `json:"sticky_key_type,omitempty"`
|
||||
StickyHit bool `json:"sticky_hit,omitempty"`
|
||||
SelectedRouteID string `json:"selected_route_id"`
|
||||
SelectedShadowGroupID string `json:"selected_shadow_group_id"`
|
||||
FallbackUsed bool `json:"fallback_used,omitempty"`
|
||||
ErrorClass string `json:"error_class,omitempty"`
|
||||
UpstreamStatus int `json:"upstream_status,omitempty"`
|
||||
LatencyMS int `json:"latency_ms,omitempty"`
|
||||
Sync bool `json:"sync,omitempty"`
|
||||
}
|
||||
|
||||
type ListRouteDecisionLogsRequest struct {
|
||||
RequestID string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
SelectedRouteID string
|
||||
StickyKey string
|
||||
Limit int
|
||||
}
|
||||
|
||||
type RouteDecisionLogInfo struct {
|
||||
ID int64 `json:"id"`
|
||||
RequestID string `json:"request_id"`
|
||||
LogicalGroupID string `json:"logical_group_id"`
|
||||
PublicModel string `json:"public_model"`
|
||||
UserKey string `json:"user_key,omitempty"`
|
||||
ConversationKey string `json:"conversation_key,omitempty"`
|
||||
StickyKey string `json:"sticky_key,omitempty"`
|
||||
StickyKeyType string `json:"sticky_key_type,omitempty"`
|
||||
StickyHit bool `json:"sticky_hit"`
|
||||
SelectedRouteID string `json:"selected_route_id"`
|
||||
SelectedShadowGroupID string `json:"selected_shadow_group_id"`
|
||||
FallbackUsed bool `json:"fallback_used"`
|
||||
ErrorClass string `json:"error_class,omitempty"`
|
||||
UpstreamStatus int `json:"upstream_status,omitempty"`
|
||||
LatencyMS int `json:"latency_ms,omitempty"`
|
||||
CreatedAt string `json:"created_at,omitempty"`
|
||||
}
|
||||
|
||||
type AppendRouteFailoverEventRequest struct {
|
||||
RequestID string `json:"request_id"`
|
||||
LogicalGroupID string `json:"logical_group_id"`
|
||||
PublicModel string `json:"public_model"`
|
||||
FromRouteID string `json:"from_route_id"`
|
||||
ToRouteID string `json:"to_route_id"`
|
||||
Reason string `json:"reason"`
|
||||
FailureCount int `json:"failure_count,omitempty"`
|
||||
Sync bool `json:"sync,omitempty"`
|
||||
}
|
||||
|
||||
type ListRouteFailoverEventsRequest struct {
|
||||
RequestID string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
FromRouteID string
|
||||
ToRouteID string
|
||||
Limit int
|
||||
}
|
||||
|
||||
type RouteFailoverEventInfo struct {
|
||||
ID int64 `json:"id"`
|
||||
RequestID string `json:"request_id"`
|
||||
LogicalGroupID string `json:"logical_group_id"`
|
||||
PublicModel string `json:"public_model"`
|
||||
FromRouteID string `json:"from_route_id"`
|
||||
ToRouteID string `json:"to_route_id"`
|
||||
Reason string `json:"reason"`
|
||||
FailureCount int `json:"failure_count"`
|
||||
CreatedAt string `json:"created_at,omitempty"`
|
||||
}
|
||||
|
||||
type AppendRouteStickyAuditRequest struct {
|
||||
StickyKey string `json:"sticky_key"`
|
||||
StickyKeyType string `json:"sticky_key_type"`
|
||||
LogicalGroupID string `json:"logical_group_id"`
|
||||
PublicModel string `json:"public_model"`
|
||||
RouteID string `json:"route_id"`
|
||||
Action string `json:"action"`
|
||||
ExpiresAt string `json:"expires_at,omitempty"`
|
||||
Sync bool `json:"sync,omitempty"`
|
||||
}
|
||||
|
||||
type ListRouteStickyAuditRequest struct {
|
||||
StickyKey string
|
||||
StickyKeyType string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
RouteID string
|
||||
Action string
|
||||
Limit int
|
||||
}
|
||||
|
||||
type RouteStickyAuditInfo struct {
|
||||
ID int64 `json:"id"`
|
||||
StickyKey string `json:"sticky_key"`
|
||||
StickyKeyType string `json:"sticky_key_type"`
|
||||
LogicalGroupID string `json:"logical_group_id"`
|
||||
PublicModel string `json:"public_model"`
|
||||
RouteID string `json:"route_id"`
|
||||
Action string `json:"action"`
|
||||
ExpiresAt string `json:"expires_at,omitempty"`
|
||||
CreatedAt string `json:"created_at,omitempty"`
|
||||
}
|
||||
|
||||
func handleAppendRouteDecisionLog(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error)) {
|
||||
if fn == nil {
|
||||
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-decision-log action is not configured"})
|
||||
return
|
||||
}
|
||||
var req AppendRouteDecisionLogRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeHTTPError(w, err)
|
||||
return
|
||||
}
|
||||
item, err := fn(r.Context(), req)
|
||||
if err != nil {
|
||||
writeHTTPError(w, classifyError(err))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, map[string]any{"decision_log": item})
|
||||
}
|
||||
|
||||
func handleListRouteDecisionLogs(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error)) {
|
||||
if fn == nil {
|
||||
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-decision-logs action is not configured"})
|
||||
return
|
||||
}
|
||||
req, err := decodeRouteDecisionLogFilter(r)
|
||||
if err != nil {
|
||||
writeHTTPError(w, err)
|
||||
return
|
||||
}
|
||||
items, actionErr := fn(r.Context(), req)
|
||||
if actionErr != nil {
|
||||
writeHTTPError(w, classifyError(actionErr))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"decision_logs": items})
|
||||
}
|
||||
|
||||
func handleAppendRouteFailoverEvent(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error)) {
|
||||
if fn == nil {
|
||||
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-failover-event action is not configured"})
|
||||
return
|
||||
}
|
||||
var req AppendRouteFailoverEventRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeHTTPError(w, err)
|
||||
return
|
||||
}
|
||||
item, err := fn(r.Context(), req)
|
||||
if err != nil {
|
||||
writeHTTPError(w, classifyError(err))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, map[string]any{"failover_event": item})
|
||||
}
|
||||
|
||||
func handleListRouteFailoverEvents(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error)) {
|
||||
if fn == nil {
|
||||
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-failover-events action is not configured"})
|
||||
return
|
||||
}
|
||||
req, err := decodeRouteFailoverEventFilter(r)
|
||||
if err != nil {
|
||||
writeHTTPError(w, err)
|
||||
return
|
||||
}
|
||||
items, actionErr := fn(r.Context(), req)
|
||||
if actionErr != nil {
|
||||
writeHTTPError(w, classifyError(actionErr))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"failover_events": items})
|
||||
}
|
||||
|
||||
func handleAppendRouteStickyAudit(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error)) {
|
||||
if fn == nil {
|
||||
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-sticky-audit action is not configured"})
|
||||
return
|
||||
}
|
||||
var req AppendRouteStickyAuditRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeHTTPError(w, err)
|
||||
return
|
||||
}
|
||||
item, err := fn(r.Context(), req)
|
||||
if err != nil {
|
||||
writeHTTPError(w, classifyError(err))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, map[string]any{"sticky_audit": item})
|
||||
}
|
||||
|
||||
func handleListRouteStickyAudit(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error)) {
|
||||
if fn == nil {
|
||||
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-sticky-audit action is not configured"})
|
||||
return
|
||||
}
|
||||
req, err := decodeRouteStickyAuditFilter(r)
|
||||
if err != nil {
|
||||
writeHTTPError(w, err)
|
||||
return
|
||||
}
|
||||
items, actionErr := fn(r.Context(), req)
|
||||
if actionErr != nil {
|
||||
writeHTTPError(w, classifyError(actionErr))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"sticky_audits": items})
|
||||
}
|
||||
|
||||
type lazyRouteLogWriter struct {
|
||||
sqliteDSN string
|
||||
mu sync.Mutex
|
||||
writer *routing.AsyncLogWriter
|
||||
}
|
||||
|
||||
func newLazyRouteLogWriter(sqliteDSN string) *lazyRouteLogWriter {
|
||||
return &lazyRouteLogWriter{sqliteDSN: sqliteDSN}
|
||||
}
|
||||
|
||||
func (l *lazyRouteLogWriter) get(ctx context.Context) (*routing.AsyncLogWriter, error) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if l.writer != nil {
|
||||
return l.writer, nil
|
||||
}
|
||||
|
||||
writer, err := routing.NewSQLiteLogWriter(ctx, l.sqliteDSN, routing.AsyncLogWriterOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.writer = writer
|
||||
return l.writer, nil
|
||||
}
|
||||
|
||||
func buildAppendRouteDecisionLogAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) {
|
||||
return func(ctx context.Context, req AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) {
|
||||
writer, err := writerSource.get(ctx)
|
||||
if err != nil {
|
||||
return RouteDecisionLogInfo{}, err
|
||||
}
|
||||
|
||||
event := routing.RouteDecisionEvent{
|
||||
RequestID: strings.TrimSpace(req.RequestID),
|
||||
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
|
||||
PublicModel: strings.TrimSpace(req.PublicModel),
|
||||
UserKey: strings.TrimSpace(req.UserKey),
|
||||
ConversationKey: strings.TrimSpace(req.ConversationKey),
|
||||
StickyKey: strings.TrimSpace(req.StickyKey),
|
||||
StickyKeyType: strings.TrimSpace(req.StickyKeyType),
|
||||
StickyHit: req.StickyHit,
|
||||
SelectedRouteID: strings.TrimSpace(req.SelectedRouteID),
|
||||
SelectedShadowGroupID: strings.TrimSpace(req.SelectedShadowGroupID),
|
||||
FallbackUsed: req.FallbackUsed,
|
||||
ErrorClass: strings.TrimSpace(req.ErrorClass),
|
||||
UpstreamStatus: req.UpstreamStatus,
|
||||
LatencyMS: req.LatencyMS,
|
||||
}
|
||||
if err := writer.AppendDecision(ctx, event); err != nil {
|
||||
return RouteDecisionLogInfo{}, err
|
||||
}
|
||||
if !req.Sync {
|
||||
return routeDecisionLogRowToInfo(sqlite.RouteDecisionLog{
|
||||
RequestID: event.RequestID,
|
||||
LogicalGroupID: event.LogicalGroupID,
|
||||
PublicModel: event.PublicModel,
|
||||
UserKey: event.UserKey,
|
||||
ConversationKey: event.ConversationKey,
|
||||
StickyKey: event.StickyKey,
|
||||
StickyKeyType: event.StickyKeyType,
|
||||
StickyHit: event.StickyHit,
|
||||
SelectedRouteID: event.SelectedRouteID,
|
||||
SelectedShadowGroupID: event.SelectedShadowGroupID,
|
||||
FallbackUsed: event.FallbackUsed,
|
||||
ErrorClass: event.ErrorClass,
|
||||
UpstreamStatus: event.UpstreamStatus,
|
||||
LatencyMS: event.LatencyMS,
|
||||
}), nil
|
||||
}
|
||||
if err := writer.Flush(ctx); err != nil {
|
||||
return RouteDecisionLogInfo{}, err
|
||||
}
|
||||
|
||||
store, err := sqlite.Open(ctx, sqliteDSN)
|
||||
if err != nil {
|
||||
return RouteDecisionLogInfo{}, err
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
items, err := store.RouteDecisionLogs().ListRecent(ctx, sqlite.RouteDecisionLogFilter{
|
||||
RequestID: event.RequestID,
|
||||
Limit: 1,
|
||||
})
|
||||
if err != nil {
|
||||
return RouteDecisionLogInfo{}, err
|
||||
}
|
||||
if len(items) == 0 {
|
||||
return RouteDecisionLogInfo{}, fmt.Errorf("route decision log %q not found after append", event.RequestID)
|
||||
}
|
||||
return routeDecisionLogRowToInfo(items[0]), nil
|
||||
}
|
||||
}
|
||||
|
||||
func buildListRouteDecisionLogsAction(sqliteDSN string) func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error) {
|
||||
return func(ctx context.Context, req ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error) {
|
||||
store, err := sqlite.Open(ctx, sqliteDSN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
rows, err := store.RouteDecisionLogs().ListRecent(ctx, sqlite.RouteDecisionLogFilter{
|
||||
RequestID: strings.TrimSpace(req.RequestID),
|
||||
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
|
||||
PublicModel: strings.TrimSpace(req.PublicModel),
|
||||
SelectedRouteID: strings.TrimSpace(req.SelectedRouteID),
|
||||
StickyKey: strings.TrimSpace(req.StickyKey),
|
||||
Limit: req.Limit,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return routeDecisionLogRowsToInfo(rows), nil
|
||||
}
|
||||
}
|
||||
|
||||
func buildAppendRouteFailoverEventAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error) {
|
||||
return func(ctx context.Context, req AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error) {
|
||||
writer, err := writerSource.get(ctx)
|
||||
if err != nil {
|
||||
return RouteFailoverEventInfo{}, err
|
||||
}
|
||||
|
||||
event := routing.RouteFailoverEvent{
|
||||
RequestID: strings.TrimSpace(req.RequestID),
|
||||
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
|
||||
PublicModel: strings.TrimSpace(req.PublicModel),
|
||||
FromRouteID: strings.TrimSpace(req.FromRouteID),
|
||||
ToRouteID: strings.TrimSpace(req.ToRouteID),
|
||||
Reason: strings.TrimSpace(req.Reason),
|
||||
FailureCount: req.FailureCount,
|
||||
}
|
||||
if err := writer.AppendFailover(ctx, event); err != nil {
|
||||
return RouteFailoverEventInfo{}, err
|
||||
}
|
||||
if !req.Sync {
|
||||
return routeFailoverEventRowToInfo(sqlite.RouteFailoverEvent{
|
||||
RequestID: event.RequestID,
|
||||
LogicalGroupID: event.LogicalGroupID,
|
||||
PublicModel: event.PublicModel,
|
||||
FromRouteID: event.FromRouteID,
|
||||
ToRouteID: event.ToRouteID,
|
||||
Reason: event.Reason,
|
||||
FailureCount: event.FailureCount,
|
||||
}), nil
|
||||
}
|
||||
if err := writer.Flush(ctx); err != nil {
|
||||
return RouteFailoverEventInfo{}, err
|
||||
}
|
||||
|
||||
store, err := sqlite.Open(ctx, sqliteDSN)
|
||||
if err != nil {
|
||||
return RouteFailoverEventInfo{}, err
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
items, err := store.RouteFailoverEvents().ListRecent(ctx, sqlite.RouteFailoverEventFilter{
|
||||
RequestID: event.RequestID,
|
||||
Limit: 1,
|
||||
})
|
||||
if err != nil {
|
||||
return RouteFailoverEventInfo{}, err
|
||||
}
|
||||
if len(items) == 0 {
|
||||
return RouteFailoverEventInfo{}, fmt.Errorf("route failover event %q not found after append", event.RequestID)
|
||||
}
|
||||
return routeFailoverEventRowToInfo(items[0]), nil
|
||||
}
|
||||
}
|
||||
|
||||
func buildListRouteFailoverEventsAction(sqliteDSN string) func(context.Context, ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error) {
|
||||
return func(ctx context.Context, req ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error) {
|
||||
store, err := sqlite.Open(ctx, sqliteDSN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
rows, err := store.RouteFailoverEvents().ListRecent(ctx, sqlite.RouteFailoverEventFilter{
|
||||
RequestID: strings.TrimSpace(req.RequestID),
|
||||
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
|
||||
PublicModel: strings.TrimSpace(req.PublicModel),
|
||||
FromRouteID: strings.TrimSpace(req.FromRouteID),
|
||||
ToRouteID: strings.TrimSpace(req.ToRouteID),
|
||||
Limit: req.Limit,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return routeFailoverEventRowsToInfo(rows), nil
|
||||
}
|
||||
}
|
||||
|
||||
func buildAppendRouteStickyAuditAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error) {
|
||||
return func(ctx context.Context, req AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error) {
|
||||
writer, err := writerSource.get(ctx)
|
||||
if err != nil {
|
||||
return RouteStickyAuditInfo{}, err
|
||||
}
|
||||
|
||||
event := routing.RouteStickyAuditEvent{
|
||||
StickyKey: strings.TrimSpace(req.StickyKey),
|
||||
StickyKeyType: strings.TrimSpace(req.StickyKeyType),
|
||||
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
|
||||
PublicModel: strings.TrimSpace(req.PublicModel),
|
||||
RouteID: strings.TrimSpace(req.RouteID),
|
||||
Action: strings.TrimSpace(req.Action),
|
||||
ExpiresAt: strings.TrimSpace(req.ExpiresAt),
|
||||
}
|
||||
if err := writer.AppendStickyAudit(ctx, event); err != nil {
|
||||
return RouteStickyAuditInfo{}, err
|
||||
}
|
||||
if !req.Sync {
|
||||
return routeStickyAuditRowToInfo(sqlite.RouteStickyAudit{
|
||||
StickyKey: event.StickyKey,
|
||||
StickyKeyType: event.StickyKeyType,
|
||||
LogicalGroupID: event.LogicalGroupID,
|
||||
PublicModel: event.PublicModel,
|
||||
RouteID: event.RouteID,
|
||||
Action: event.Action,
|
||||
ExpiresAt: event.ExpiresAt,
|
||||
}), nil
|
||||
}
|
||||
if err := writer.Flush(ctx); err != nil {
|
||||
return RouteStickyAuditInfo{}, err
|
||||
}
|
||||
|
||||
store, err := sqlite.Open(ctx, sqliteDSN)
|
||||
if err != nil {
|
||||
return RouteStickyAuditInfo{}, err
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
items, err := store.RouteStickyAudit().ListRecent(ctx, sqlite.RouteStickyAuditFilter{
|
||||
StickyKey: event.StickyKey,
|
||||
Action: event.Action,
|
||||
Limit: 1,
|
||||
})
|
||||
if err != nil {
|
||||
return RouteStickyAuditInfo{}, err
|
||||
}
|
||||
if len(items) == 0 {
|
||||
return RouteStickyAuditInfo{}, fmt.Errorf("route sticky audit %q not found after append", event.StickyKey)
|
||||
}
|
||||
return routeStickyAuditRowToInfo(items[0]), nil
|
||||
}
|
||||
}
|
||||
|
||||
func buildListRouteStickyAuditAction(sqliteDSN string) func(context.Context, ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error) {
|
||||
return func(ctx context.Context, req ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error) {
|
||||
store, err := sqlite.Open(ctx, sqliteDSN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
rows, err := store.RouteStickyAudit().ListRecent(ctx, sqlite.RouteStickyAuditFilter{
|
||||
StickyKey: strings.TrimSpace(req.StickyKey),
|
||||
StickyKeyType: strings.TrimSpace(req.StickyKeyType),
|
||||
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
|
||||
PublicModel: strings.TrimSpace(req.PublicModel),
|
||||
RouteID: strings.TrimSpace(req.RouteID),
|
||||
Action: strings.TrimSpace(req.Action),
|
||||
Limit: req.Limit,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return routeStickyAuditRowsToInfo(rows), nil
|
||||
}
|
||||
}
|
||||
|
||||
func decodeRouteDecisionLogFilter(r *http.Request) (ListRouteDecisionLogsRequest, *httpError) {
|
||||
limit, err := parseOptionalLimit(r.URL.Query().Get("limit"))
|
||||
if err != nil {
|
||||
return ListRouteDecisionLogsRequest{}, err
|
||||
}
|
||||
return ListRouteDecisionLogsRequest{
|
||||
RequestID: strings.TrimSpace(r.URL.Query().Get("request_id")),
|
||||
LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")),
|
||||
PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")),
|
||||
SelectedRouteID: strings.TrimSpace(r.URL.Query().Get("selected_route_id")),
|
||||
StickyKey: strings.TrimSpace(r.URL.Query().Get("sticky_key")),
|
||||
Limit: limit,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func decodeRouteFailoverEventFilter(r *http.Request) (ListRouteFailoverEventsRequest, *httpError) {
|
||||
limit, err := parseOptionalLimit(r.URL.Query().Get("limit"))
|
||||
if err != nil {
|
||||
return ListRouteFailoverEventsRequest{}, err
|
||||
}
|
||||
return ListRouteFailoverEventsRequest{
|
||||
RequestID: strings.TrimSpace(r.URL.Query().Get("request_id")),
|
||||
LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")),
|
||||
PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")),
|
||||
FromRouteID: strings.TrimSpace(r.URL.Query().Get("from_route_id")),
|
||||
ToRouteID: strings.TrimSpace(r.URL.Query().Get("to_route_id")),
|
||||
Limit: limit,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func decodeRouteStickyAuditFilter(r *http.Request) (ListRouteStickyAuditRequest, *httpError) {
|
||||
limit, err := parseOptionalLimit(r.URL.Query().Get("limit"))
|
||||
if err != nil {
|
||||
return ListRouteStickyAuditRequest{}, err
|
||||
}
|
||||
return ListRouteStickyAuditRequest{
|
||||
StickyKey: strings.TrimSpace(r.URL.Query().Get("sticky_key")),
|
||||
StickyKeyType: strings.TrimSpace(r.URL.Query().Get("sticky_key_type")),
|
||||
LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")),
|
||||
PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")),
|
||||
RouteID: strings.TrimSpace(r.URL.Query().Get("route_id")),
|
||||
Action: strings.TrimSpace(r.URL.Query().Get("action")),
|
||||
Limit: limit,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func parseOptionalLimit(raw string) (int, *httpError) {
|
||||
raw = strings.TrimSpace(raw)
|
||||
if raw == "" {
|
||||
return 0, nil
|
||||
}
|
||||
limit, err := strconv.Atoi(raw)
|
||||
if err != nil || limit <= 0 {
|
||||
return 0, &httpError{StatusCode: http.StatusBadRequest, Code: "bad_request", Message: "limit must be a positive integer"}
|
||||
}
|
||||
return limit, nil
|
||||
}
|
||||
|
||||
func routeDecisionLogRowToInfo(row sqlite.RouteDecisionLog) RouteDecisionLogInfo {
|
||||
return RouteDecisionLogInfo{
|
||||
ID: row.ID,
|
||||
RequestID: row.RequestID,
|
||||
LogicalGroupID: row.LogicalGroupID,
|
||||
PublicModel: row.PublicModel,
|
||||
UserKey: row.UserKey,
|
||||
ConversationKey: row.ConversationKey,
|
||||
StickyKey: row.StickyKey,
|
||||
StickyKeyType: row.StickyKeyType,
|
||||
StickyHit: row.StickyHit,
|
||||
SelectedRouteID: row.SelectedRouteID,
|
||||
SelectedShadowGroupID: row.SelectedShadowGroupID,
|
||||
FallbackUsed: row.FallbackUsed,
|
||||
ErrorClass: row.ErrorClass,
|
||||
UpstreamStatus: row.UpstreamStatus,
|
||||
LatencyMS: row.LatencyMS,
|
||||
CreatedAt: row.CreatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
func routeDecisionLogRowsToInfo(rows []sqlite.RouteDecisionLog) []RouteDecisionLogInfo {
|
||||
result := make([]RouteDecisionLogInfo, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
result = append(result, routeDecisionLogRowToInfo(row))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func routeFailoverEventRowToInfo(row sqlite.RouteFailoverEvent) RouteFailoverEventInfo {
|
||||
return RouteFailoverEventInfo{
|
||||
ID: row.ID,
|
||||
RequestID: row.RequestID,
|
||||
LogicalGroupID: row.LogicalGroupID,
|
||||
PublicModel: row.PublicModel,
|
||||
FromRouteID: row.FromRouteID,
|
||||
ToRouteID: row.ToRouteID,
|
||||
Reason: row.Reason,
|
||||
FailureCount: row.FailureCount,
|
||||
CreatedAt: row.CreatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
func routeFailoverEventRowsToInfo(rows []sqlite.RouteFailoverEvent) []RouteFailoverEventInfo {
|
||||
result := make([]RouteFailoverEventInfo, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
result = append(result, routeFailoverEventRowToInfo(row))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func routeStickyAuditRowToInfo(row sqlite.RouteStickyAudit) RouteStickyAuditInfo {
|
||||
return RouteStickyAuditInfo{
|
||||
ID: row.ID,
|
||||
StickyKey: row.StickyKey,
|
||||
StickyKeyType: row.StickyKeyType,
|
||||
LogicalGroupID: row.LogicalGroupID,
|
||||
PublicModel: row.PublicModel,
|
||||
RouteID: row.RouteID,
|
||||
Action: row.Action,
|
||||
ExpiresAt: row.ExpiresAt,
|
||||
CreatedAt: row.CreatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
func routeStickyAuditRowsToInfo(rows []sqlite.RouteStickyAudit) []RouteStickyAuditInfo {
|
||||
result := make([]RouteStickyAuditInfo, 0, len(rows))
|
||||
for _, row := range rows {
|
||||
result = append(result, routeStickyAuditRowToInfo(row))
|
||||
}
|
||||
return result
|
||||
}
|
||||
145
internal/app/route_logging_api_test.go
Normal file
145
internal/app/route_logging_api_test.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAPIAppendRouteDecisionLogReturnsCreated(t *testing.T) {
|
||||
handler := NewAPIHandler("secret-token", ActionSet{
|
||||
AppendRouteDecisionLog: func(_ context.Context, req AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) {
|
||||
if req.RequestID != "req-1" {
|
||||
t.Fatalf("RequestID = %q, want req-1", req.RequestID)
|
||||
}
|
||||
return RouteDecisionLogInfo{
|
||||
ID: 1,
|
||||
RequestID: req.RequestID,
|
||||
LogicalGroupID: req.LogicalGroupID,
|
||||
PublicModel: req.PublicModel,
|
||||
SelectedRouteID: req.SelectedRouteID,
|
||||
SelectedShadowGroupID: req.SelectedShadowGroupID,
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
|
||||
request := httptestRequest(t, http.MethodPost, "/api/routing/logs/decisions", map[string]any{
|
||||
"request_id": "req-1",
|
||||
"logical_group_id": "gpt-shared",
|
||||
"public_model": "gpt-5.4",
|
||||
"selected_route_id": "asxs",
|
||||
"selected_shadow_group_id": "gpt-shared__asxs",
|
||||
"sync": true,
|
||||
}, "secret-token")
|
||||
response := httptestRecorder(handler, request)
|
||||
assertStatusCode(t, response, http.StatusCreated)
|
||||
assertJSONContains(t, response.Body().Bytes(), "decision_log.request_id", "req-1")
|
||||
}
|
||||
|
||||
func TestAPIListRouteDecisionLogsRejectsInvalidLimit(t *testing.T) {
|
||||
handler := NewAPIHandler("secret-token", ActionSet{
|
||||
ListRouteDecisionLogs: func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error) {
|
||||
t.Fatal("ListRouteDecisionLogs should not be called")
|
||||
return nil, nil
|
||||
},
|
||||
})
|
||||
|
||||
request := httptestRequest(t, http.MethodGet, "/api/routing/logs/decisions?limit=zero", nil, "secret-token")
|
||||
response := httptestRecorder(handler, request)
|
||||
assertStatusCode(t, response, http.StatusBadRequest)
|
||||
}
|
||||
|
||||
func TestNewActionSetRouteLoggingFlow(t *testing.T) {
|
||||
dbPath := filepath.Join(t.TempDir(), "route-logging.db")
|
||||
dsn := "file:" + filepath.ToSlash(dbPath) + "?_busy_timeout=5000"
|
||||
actions := NewActionSet(dsn)
|
||||
ctx := context.Background()
|
||||
|
||||
decision, err := actions.AppendRouteDecisionLog(ctx, AppendRouteDecisionLogRequest{
|
||||
RequestID: "req-1",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
StickyKey: "sticky-1",
|
||||
StickyKeyType: "conversation",
|
||||
StickyHit: true,
|
||||
SelectedRouteID: "asxs",
|
||||
SelectedShadowGroupID: "gpt-shared__asxs",
|
||||
UpstreamStatus: 200,
|
||||
LatencyMS: 90,
|
||||
Sync: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("AppendRouteDecisionLog() error = %v", err)
|
||||
}
|
||||
if decision.SelectedRouteID != "asxs" {
|
||||
t.Fatalf("AppendRouteDecisionLog() = %+v, want selected route asxs", decision)
|
||||
}
|
||||
|
||||
failover, err := actions.AppendRouteFailoverEvent(ctx, AppendRouteFailoverEventRequest{
|
||||
RequestID: "req-2",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
FromRouteID: "asxs",
|
||||
ToRouteID: "codex2api",
|
||||
Reason: "timeout",
|
||||
FailureCount: 2,
|
||||
Sync: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("AppendRouteFailoverEvent() error = %v", err)
|
||||
}
|
||||
if failover.ToRouteID != "codex2api" {
|
||||
t.Fatalf("AppendRouteFailoverEvent() = %+v, want to_route_id codex2api", failover)
|
||||
}
|
||||
|
||||
sticky, err := actions.AppendRouteStickyAudit(ctx, AppendRouteStickyAuditRequest{
|
||||
StickyKey: "sticky-1",
|
||||
StickyKeyType: "conversation",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
RouteID: "asxs",
|
||||
Action: "bind",
|
||||
ExpiresAt: "2026-05-28T18:00:00Z",
|
||||
Sync: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("AppendRouteStickyAudit() error = %v", err)
|
||||
}
|
||||
if sticky.Action != "bind" {
|
||||
t.Fatalf("AppendRouteStickyAudit() = %+v, want action bind", sticky)
|
||||
}
|
||||
|
||||
decisions, err := actions.ListRouteDecisionLogs(ctx, ListRouteDecisionLogsRequest{
|
||||
RequestID: "req-1",
|
||||
Limit: 10,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("ListRouteDecisionLogs() error = %v", err)
|
||||
}
|
||||
if len(decisions) != 1 || decisions[0].StickyKey != "sticky-1" {
|
||||
t.Fatalf("ListRouteDecisionLogs() = %+v, want sticky-1", decisions)
|
||||
}
|
||||
|
||||
failovers, err := actions.ListRouteFailoverEvents(ctx, ListRouteFailoverEventsRequest{
|
||||
RequestID: "req-2",
|
||||
Limit: 10,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("ListRouteFailoverEvents() error = %v", err)
|
||||
}
|
||||
if len(failovers) != 1 || failovers[0].FailureCount != 2 {
|
||||
t.Fatalf("ListRouteFailoverEvents() = %+v, want failure_count 2", failovers)
|
||||
}
|
||||
|
||||
stickyAudits, err := actions.ListRouteStickyAudit(ctx, ListRouteStickyAuditRequest{
|
||||
StickyKey: "sticky-1",
|
||||
Limit: 10,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("ListRouteStickyAudit() error = %v", err)
|
||||
}
|
||||
if len(stickyAudits) != 1 || stickyAudits[0].RouteID != "asxs" {
|
||||
t.Fatalf("ListRouteStickyAudit() = %+v, want route asxs", stickyAudits)
|
||||
}
|
||||
}
|
||||
319
internal/routing/logwriter.go
Normal file
319
internal/routing/logwriter.go
Normal file
@@ -0,0 +1,319 @@
|
||||
package routing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultQueueSize = 128
|
||||
defaultFlushInterval = 2 * time.Second
|
||||
defaultMaxBatchSize = 32
|
||||
defaultFallbackWriteTimeout = 3 * time.Second
|
||||
)
|
||||
|
||||
type RouteDecisionEvent struct {
|
||||
RequestID string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
UserKey string
|
||||
ConversationKey string
|
||||
StickyKey string
|
||||
StickyKeyType string
|
||||
StickyHit bool
|
||||
SelectedRouteID string
|
||||
SelectedShadowGroupID string
|
||||
FallbackUsed bool
|
||||
ErrorClass string
|
||||
UpstreamStatus int
|
||||
LatencyMS int
|
||||
}
|
||||
|
||||
type RouteFailoverEvent struct {
|
||||
RequestID string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
FromRouteID string
|
||||
ToRouteID string
|
||||
Reason string
|
||||
FailureCount int
|
||||
}
|
||||
|
||||
type RouteStickyAuditEvent struct {
|
||||
StickyKey string
|
||||
StickyKeyType string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
RouteID string
|
||||
Action string
|
||||
ExpiresAt string
|
||||
}
|
||||
|
||||
type RouteDecisionLogger interface {
|
||||
AppendDecision(context.Context, RouteDecisionEvent) error
|
||||
AppendFailover(context.Context, RouteFailoverEvent) error
|
||||
AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
|
||||
Flush(context.Context) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
type routeLogSink interface {
|
||||
AppendDecision(context.Context, RouteDecisionEvent) error
|
||||
AppendFailover(context.Context, RouteFailoverEvent) error
|
||||
AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
type AsyncLogWriterOptions struct {
|
||||
QueueSize int
|
||||
FlushInterval time.Duration
|
||||
MaxBatchSize int
|
||||
FallbackWriteTimeout time.Duration
|
||||
}
|
||||
|
||||
type queuedLogEvent struct {
|
||||
decision *RouteDecisionEvent
|
||||
failover *RouteFailoverEvent
|
||||
sticky *RouteStickyAuditEvent
|
||||
}
|
||||
|
||||
type AsyncLogWriter struct {
|
||||
sink routeLogSink
|
||||
queue chan queuedLogEvent
|
||||
flushRequests chan chan error
|
||||
closeCh chan struct{}
|
||||
closedCh chan struct{}
|
||||
flushInterval time.Duration
|
||||
maxBatchSize int
|
||||
fallbackWriteTimeout time.Duration
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) {
|
||||
store, err := sqlite.Open(ctx, sqliteDSN)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewAsyncLogWriter(newSQLiteRouteLogSink(store), opts), nil
|
||||
}
|
||||
|
||||
func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogWriter {
|
||||
if opts.QueueSize <= 0 {
|
||||
opts.QueueSize = defaultQueueSize
|
||||
}
|
||||
if opts.FlushInterval <= 0 {
|
||||
opts.FlushInterval = defaultFlushInterval
|
||||
}
|
||||
if opts.MaxBatchSize <= 0 {
|
||||
opts.MaxBatchSize = defaultMaxBatchSize
|
||||
}
|
||||
if opts.FallbackWriteTimeout <= 0 {
|
||||
opts.FallbackWriteTimeout = defaultFallbackWriteTimeout
|
||||
}
|
||||
|
||||
writer := &AsyncLogWriter{
|
||||
sink: sink,
|
||||
queue: make(chan queuedLogEvent, opts.QueueSize),
|
||||
flushRequests: make(chan chan error),
|
||||
closeCh: make(chan struct{}),
|
||||
closedCh: make(chan struct{}),
|
||||
flushInterval: opts.FlushInterval,
|
||||
maxBatchSize: opts.MaxBatchSize,
|
||||
fallbackWriteTimeout: opts.FallbackWriteTimeout,
|
||||
}
|
||||
go writer.loop()
|
||||
return writer
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
|
||||
return w.enqueue(ctx, queuedLogEvent{decision: &event})
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
|
||||
return w.enqueue(ctx, queuedLogEvent{failover: &event})
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
|
||||
return w.enqueue(ctx, queuedLogEvent{sticky: &event})
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) Flush(ctx context.Context) error {
|
||||
resp := make(chan error, 1)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-w.closedCh:
|
||||
return nil
|
||||
case w.flushRequests <- resp:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-resp:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) Close() error {
|
||||
var err error
|
||||
w.once.Do(func() {
|
||||
close(w.closeCh)
|
||||
<-w.closedCh
|
||||
err = w.sink.Close()
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) loop() {
|
||||
ticker := time.NewTicker(w.flushInterval)
|
||||
defer ticker.Stop()
|
||||
defer close(w.closedCh)
|
||||
|
||||
batch := make([]queuedLogEvent, 0, w.maxBatchSize)
|
||||
for {
|
||||
select {
|
||||
case item := <-w.queue:
|
||||
batch = append(batch, item)
|
||||
if len(batch) >= w.maxBatchSize {
|
||||
w.flushBatch(batch)
|
||||
batch = batch[:0]
|
||||
}
|
||||
case resp := <-w.flushRequests:
|
||||
batch = w.drainQueue(batch)
|
||||
err := w.flushBatch(batch)
|
||||
batch = batch[:0]
|
||||
resp <- err
|
||||
case <-ticker.C:
|
||||
batch = w.drainQueue(batch)
|
||||
if len(batch) == 0 {
|
||||
continue
|
||||
}
|
||||
w.flushBatch(batch)
|
||||
batch = batch[:0]
|
||||
case <-w.closeCh:
|
||||
batch = w.drainQueue(batch)
|
||||
w.flushBatch(batch)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-w.closedCh:
|
||||
return fmt.Errorf("route log writer is closed")
|
||||
case w.queue <- item:
|
||||
return nil
|
||||
default:
|
||||
fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout)
|
||||
defer cancel()
|
||||
return w.writeOne(fallbackCtx, item)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error {
|
||||
if len(batch) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var firstErr error
|
||||
for _, item := range batch {
|
||||
if err := w.writeOne(context.Background(), item); err != nil {
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
log.Printf("routing: flush route log event failed: %v", err)
|
||||
}
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent {
|
||||
for {
|
||||
select {
|
||||
case item := <-w.queue:
|
||||
batch = append(batch, item)
|
||||
default:
|
||||
return batch
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *AsyncLogWriter) writeOne(ctx context.Context, item queuedLogEvent) error {
|
||||
switch {
|
||||
case item.decision != nil:
|
||||
return w.sink.AppendDecision(ctx, *item.decision)
|
||||
case item.failover != nil:
|
||||
return w.sink.AppendFailover(ctx, *item.failover)
|
||||
case item.sticky != nil:
|
||||
return w.sink.AppendStickyAudit(ctx, *item.sticky)
|
||||
default:
|
||||
return fmt.Errorf("route log event payload is empty")
|
||||
}
|
||||
}
|
||||
|
||||
type sqliteRouteLogSink struct {
|
||||
store *sqlite.DB
|
||||
}
|
||||
|
||||
func newSQLiteRouteLogSink(store *sqlite.DB) *sqliteRouteLogSink {
|
||||
return &sqliteRouteLogSink{store: store}
|
||||
}
|
||||
|
||||
func (s *sqliteRouteLogSink) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
|
||||
_, err := s.store.RouteDecisionLogs().Create(ctx, sqlite.RouteDecisionLog{
|
||||
RequestID: event.RequestID,
|
||||
LogicalGroupID: event.LogicalGroupID,
|
||||
PublicModel: event.PublicModel,
|
||||
UserKey: event.UserKey,
|
||||
ConversationKey: event.ConversationKey,
|
||||
StickyKey: event.StickyKey,
|
||||
StickyKeyType: event.StickyKeyType,
|
||||
StickyHit: event.StickyHit,
|
||||
SelectedRouteID: event.SelectedRouteID,
|
||||
SelectedShadowGroupID: event.SelectedShadowGroupID,
|
||||
FallbackUsed: event.FallbackUsed,
|
||||
ErrorClass: event.ErrorClass,
|
||||
UpstreamStatus: event.UpstreamStatus,
|
||||
LatencyMS: event.LatencyMS,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *sqliteRouteLogSink) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
|
||||
_, err := s.store.RouteFailoverEvents().Create(ctx, sqlite.RouteFailoverEvent{
|
||||
RequestID: event.RequestID,
|
||||
LogicalGroupID: event.LogicalGroupID,
|
||||
PublicModel: event.PublicModel,
|
||||
FromRouteID: event.FromRouteID,
|
||||
ToRouteID: event.ToRouteID,
|
||||
Reason: event.Reason,
|
||||
FailureCount: event.FailureCount,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *sqliteRouteLogSink) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
|
||||
_, err := s.store.RouteStickyAudit().Create(ctx, sqlite.RouteStickyAudit{
|
||||
StickyKey: event.StickyKey,
|
||||
StickyKeyType: event.StickyKeyType,
|
||||
LogicalGroupID: event.LogicalGroupID,
|
||||
PublicModel: event.PublicModel,
|
||||
RouteID: event.RouteID,
|
||||
Action: event.Action,
|
||||
ExpiresAt: event.ExpiresAt,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *sqliteRouteLogSink) Close() error {
|
||||
return s.store.Close()
|
||||
}
|
||||
203
internal/routing/logwriter_test.go
Normal file
203
internal/routing/logwriter_test.go
Normal file
@@ -0,0 +1,203 @@
|
||||
package routing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"sub2api-cn-relay-manager/internal/store/sqlite"
|
||||
)
|
||||
|
||||
func TestSQLiteLogWriterAppendAndFlush(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dbPath := filepath.Join(t.TempDir(), "route-logs.db")
|
||||
dsn := "file:" + filepath.ToSlash(dbPath) + "?_busy_timeout=5000"
|
||||
writer, err := NewSQLiteLogWriter(context.Background(), dsn, AsyncLogWriterOptions{
|
||||
QueueSize: 4,
|
||||
FlushInterval: time.Hour,
|
||||
MaxBatchSize: 8,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("NewSQLiteLogWriter() error = %v", err)
|
||||
}
|
||||
defer writer.Close()
|
||||
|
||||
if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{
|
||||
RequestID: "req-1",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
StickyKey: "sticky-1",
|
||||
StickyKeyType: "conversation",
|
||||
StickyHit: true,
|
||||
SelectedRouteID: "asxs",
|
||||
SelectedShadowGroupID: "gpt-shared__asxs",
|
||||
UpstreamStatus: 200,
|
||||
LatencyMS: 120,
|
||||
}); err != nil {
|
||||
t.Fatalf("AppendDecision() error = %v", err)
|
||||
}
|
||||
if err := writer.AppendFailover(context.Background(), RouteFailoverEvent{
|
||||
RequestID: "req-2",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
FromRouteID: "asxs",
|
||||
ToRouteID: "codex2api",
|
||||
Reason: "timeout",
|
||||
FailureCount: 2,
|
||||
}); err != nil {
|
||||
t.Fatalf("AppendFailover() error = %v", err)
|
||||
}
|
||||
if err := writer.AppendStickyAudit(context.Background(), RouteStickyAuditEvent{
|
||||
StickyKey: "sticky-1",
|
||||
StickyKeyType: "conversation",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
RouteID: "asxs",
|
||||
Action: "bind",
|
||||
ExpiresAt: "2026-05-28T18:00:00Z",
|
||||
}); err != nil {
|
||||
t.Fatalf("AppendStickyAudit() error = %v", err)
|
||||
}
|
||||
if err := writer.Flush(context.Background()); err != nil {
|
||||
t.Fatalf("Flush() error = %v", err)
|
||||
}
|
||||
|
||||
store, err := sqlite.Open(context.Background(), dsn)
|
||||
if err != nil {
|
||||
t.Fatalf("sqlite.Open() error = %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
decisions, err := store.RouteDecisionLogs().ListRecent(context.Background(), sqlite.RouteDecisionLogFilter{Limit: 10})
|
||||
if err != nil {
|
||||
t.Fatalf("RouteDecisionLogs().ListRecent() error = %v", err)
|
||||
}
|
||||
if len(decisions) != 1 || decisions[0].SelectedRouteID != "asxs" || !decisions[0].StickyHit {
|
||||
t.Fatalf("decision logs = %+v", decisions)
|
||||
}
|
||||
|
||||
failovers, err := store.RouteFailoverEvents().ListRecent(context.Background(), sqlite.RouteFailoverEventFilter{Limit: 10})
|
||||
if err != nil {
|
||||
t.Fatalf("RouteFailoverEvents().ListRecent() error = %v", err)
|
||||
}
|
||||
if len(failovers) != 1 || failovers[0].ToRouteID != "codex2api" {
|
||||
t.Fatalf("failover events = %+v", failovers)
|
||||
}
|
||||
|
||||
stickyAudit, err := store.RouteStickyAudit().ListRecent(context.Background(), sqlite.RouteStickyAuditFilter{Limit: 10})
|
||||
if err != nil {
|
||||
t.Fatalf("RouteStickyAudit().ListRecent() error = %v", err)
|
||||
}
|
||||
if len(stickyAudit) != 1 || stickyAudit[0].Action != "bind" {
|
||||
t.Fatalf("sticky audit = %+v", stickyAudit)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAsyncLogWriterFlushFailureDoesNotCrash(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
sink := &failingRouteLogSink{}
|
||||
writer := NewAsyncLogWriter(sink, AsyncLogWriterOptions{
|
||||
QueueSize: 2,
|
||||
FlushInterval: time.Hour,
|
||||
MaxBatchSize: 2,
|
||||
})
|
||||
defer writer.Close()
|
||||
|
||||
if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{
|
||||
RequestID: "req-fail",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
SelectedRouteID: "asxs",
|
||||
SelectedShadowGroupID: "gpt-shared__asxs",
|
||||
}); err != nil {
|
||||
t.Fatalf("AppendDecision() error = %v", err)
|
||||
}
|
||||
if err := writer.Flush(context.Background()); err == nil {
|
||||
t.Fatal("Flush() error = nil, want failure")
|
||||
}
|
||||
if sink.appendCalls == 0 {
|
||||
t.Fatal("appendCalls = 0, want at least one attempted write")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAsyncLogWriterFlushesQueuedEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
sink := &recordingRouteLogSink{}
|
||||
writer := NewAsyncLogWriter(sink, AsyncLogWriterOptions{
|
||||
QueueSize: 1,
|
||||
FlushInterval: time.Hour,
|
||||
MaxBatchSize: 100,
|
||||
})
|
||||
defer writer.Close()
|
||||
|
||||
if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{
|
||||
RequestID: "req-a",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
SelectedRouteID: "asxs",
|
||||
SelectedShadowGroupID: "gpt-shared__asxs",
|
||||
}); err != nil {
|
||||
t.Fatalf("AppendDecision() first error = %v", err)
|
||||
}
|
||||
if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{
|
||||
RequestID: "req-b",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
SelectedRouteID: "codex2api",
|
||||
SelectedShadowGroupID: "gpt-shared__codex2api",
|
||||
}); err != nil {
|
||||
t.Fatalf("AppendDecision() second error = %v", err)
|
||||
}
|
||||
if err := writer.Flush(context.Background()); err != nil {
|
||||
t.Fatalf("Flush() error = %v", err)
|
||||
}
|
||||
|
||||
if len(sink.decisions) != 2 {
|
||||
t.Fatalf("recorded decisions len = %d, want 2", len(sink.decisions))
|
||||
}
|
||||
}
|
||||
|
||||
type failingRouteLogSink struct {
|
||||
appendCalls int
|
||||
}
|
||||
|
||||
func (s *failingRouteLogSink) AppendDecision(context.Context, RouteDecisionEvent) error {
|
||||
s.appendCalls++
|
||||
return errors.New("boom")
|
||||
}
|
||||
|
||||
func (s *failingRouteLogSink) AppendFailover(context.Context, RouteFailoverEvent) error {
|
||||
s.appendCalls++
|
||||
return errors.New("boom")
|
||||
}
|
||||
|
||||
func (s *failingRouteLogSink) AppendStickyAudit(context.Context, RouteStickyAuditEvent) error {
|
||||
s.appendCalls++
|
||||
return errors.New("boom")
|
||||
}
|
||||
|
||||
func (s *failingRouteLogSink) Close() error { return nil }
|
||||
|
||||
type recordingRouteLogSink struct {
|
||||
decisions []RouteDecisionEvent
|
||||
}
|
||||
|
||||
func (s *recordingRouteLogSink) AppendDecision(_ context.Context, event RouteDecisionEvent) error {
|
||||
s.decisions = append(s.decisions, event)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *recordingRouteLogSink) AppendFailover(context.Context, RouteFailoverEvent) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *recordingRouteLogSink) AppendStickyAudit(context.Context, RouteStickyAuditEvent) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *recordingRouteLogSink) Close() error { return nil }
|
||||
52
internal/store/migrations/0011_route_logging.sql
Normal file
52
internal/store/migrations/0011_route_logging.sql
Normal file
@@ -0,0 +1,52 @@
|
||||
CREATE TABLE route_decision_logs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
request_id TEXT NOT NULL,
|
||||
logical_group_id TEXT NOT NULL,
|
||||
public_model TEXT NOT NULL,
|
||||
user_key TEXT NOT NULL DEFAULT '',
|
||||
conversation_key TEXT NOT NULL DEFAULT '',
|
||||
sticky_key TEXT NOT NULL DEFAULT '',
|
||||
sticky_key_type TEXT NOT NULL DEFAULT '',
|
||||
sticky_hit INTEGER NOT NULL DEFAULT 0,
|
||||
selected_route_id TEXT NOT NULL,
|
||||
selected_shadow_group_id TEXT NOT NULL,
|
||||
fallback_used INTEGER NOT NULL DEFAULT 0,
|
||||
error_class TEXT NOT NULL DEFAULT '',
|
||||
upstream_status INTEGER NOT NULL DEFAULT 0,
|
||||
latency_ms INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE INDEX idx_route_decision_logs_request_id ON route_decision_logs(request_id);
|
||||
CREATE INDEX idx_route_decision_logs_group_model ON route_decision_logs(logical_group_id, public_model, id DESC);
|
||||
CREATE INDEX idx_route_decision_logs_selected_route ON route_decision_logs(selected_route_id, id DESC);
|
||||
|
||||
CREATE TABLE route_failover_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
request_id TEXT NOT NULL,
|
||||
logical_group_id TEXT NOT NULL,
|
||||
public_model TEXT NOT NULL,
|
||||
from_route_id TEXT NOT NULL,
|
||||
to_route_id TEXT NOT NULL,
|
||||
reason TEXT NOT NULL,
|
||||
failure_count INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE INDEX idx_route_failover_events_request_id ON route_failover_events(request_id);
|
||||
CREATE INDEX idx_route_failover_events_group_model ON route_failover_events(logical_group_id, public_model, id DESC);
|
||||
|
||||
CREATE TABLE route_sticky_audit (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
sticky_key TEXT NOT NULL,
|
||||
sticky_key_type TEXT NOT NULL,
|
||||
logical_group_id TEXT NOT NULL,
|
||||
public_model TEXT NOT NULL,
|
||||
route_id TEXT NOT NULL,
|
||||
action TEXT NOT NULL,
|
||||
expires_at TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE INDEX idx_route_sticky_audit_sticky_key ON route_sticky_audit(sticky_key, id DESC);
|
||||
CREATE INDEX idx_route_sticky_audit_group_model ON route_sticky_audit(logical_group_id, public_model, id DESC);
|
||||
@@ -27,6 +27,9 @@ type Queries struct {
|
||||
LogicalGroupModels *LogicalGroupModelsRepo
|
||||
LogicalGroupRoutes *LogicalGroupRoutesRepo
|
||||
LogicalGroupRouteModels *LogicalGroupRouteModelsRepo
|
||||
RouteDecisionLogs *RouteDecisionLogsRepo
|
||||
RouteFailoverEvents *RouteFailoverEventsRepo
|
||||
RouteStickyAudit *RouteStickyAuditRepo
|
||||
ProviderDrafts *ProviderDraftsRepo
|
||||
ImportBatches *ImportBatchesRepo
|
||||
ImportBatchItems *ImportBatchItemsRepo
|
||||
@@ -112,6 +115,18 @@ func (db *DB) LogicalGroupRouteModels() *LogicalGroupRouteModelsRepo {
|
||||
return db.queries.LogicalGroupRouteModels
|
||||
}
|
||||
|
||||
func (db *DB) RouteDecisionLogs() *RouteDecisionLogsRepo {
|
||||
return db.queries.RouteDecisionLogs
|
||||
}
|
||||
|
||||
func (db *DB) RouteFailoverEvents() *RouteFailoverEventsRepo {
|
||||
return db.queries.RouteFailoverEvents
|
||||
}
|
||||
|
||||
func (db *DB) RouteStickyAudit() *RouteStickyAuditRepo {
|
||||
return db.queries.RouteStickyAudit
|
||||
}
|
||||
|
||||
func (db *DB) ProviderDrafts() *ProviderDraftsRepo {
|
||||
return db.queries.ProviderDrafts
|
||||
}
|
||||
@@ -188,6 +203,9 @@ func newQueries(db execQuerier) *Queries {
|
||||
LogicalGroupModels: newLogicalGroupModelsRepo(db),
|
||||
LogicalGroupRoutes: newLogicalGroupRoutesRepo(db),
|
||||
LogicalGroupRouteModels: newLogicalGroupRouteModelsRepo(db),
|
||||
RouteDecisionLogs: newRouteDecisionLogsRepo(db),
|
||||
RouteFailoverEvents: newRouteFailoverEventsRepo(db),
|
||||
RouteStickyAudit: newRouteStickyAuditRepo(db),
|
||||
ProviderDrafts: newProviderDraftsRepo(db),
|
||||
ImportBatches: newImportBatchesRepo(db),
|
||||
ImportBatchItems: newImportBatchItemsRepo(db),
|
||||
|
||||
@@ -111,6 +111,9 @@ func TestOpenAppliesLogicalRoutingTables(t *testing.T) {
|
||||
"logical_group_models",
|
||||
"logical_group_routes",
|
||||
"logical_group_route_models",
|
||||
"route_decision_logs",
|
||||
"route_failover_events",
|
||||
"route_sticky_audit",
|
||||
} {
|
||||
found, err := tableExists(context.Background(), db, table)
|
||||
if err != nil {
|
||||
|
||||
208
internal/store/sqlite/route_decision_logs_repo.go
Normal file
208
internal/store/sqlite/route_decision_logs_repo.go
Normal file
@@ -0,0 +1,208 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type RouteDecisionLog struct {
|
||||
ID int64
|
||||
RequestID string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
UserKey string
|
||||
ConversationKey string
|
||||
StickyKey string
|
||||
StickyKeyType string
|
||||
StickyHit bool
|
||||
SelectedRouteID string
|
||||
SelectedShadowGroupID string
|
||||
FallbackUsed bool
|
||||
ErrorClass string
|
||||
UpstreamStatus int
|
||||
LatencyMS int
|
||||
CreatedAt string
|
||||
}
|
||||
|
||||
type RouteDecisionLogFilter struct {
|
||||
RequestID string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
SelectedRouteID string
|
||||
StickyKey string
|
||||
Limit int
|
||||
}
|
||||
|
||||
type RouteDecisionLogsRepo struct {
|
||||
db execQuerier
|
||||
}
|
||||
|
||||
func newRouteDecisionLogsRepo(db execQuerier) *RouteDecisionLogsRepo {
|
||||
return &RouteDecisionLogsRepo{db: db}
|
||||
}
|
||||
|
||||
func (r *RouteDecisionLogsRepo) Create(ctx context.Context, row RouteDecisionLog) (int64, error) {
|
||||
row, err := normalizeRouteDecisionLog(row)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
result, err := r.db.ExecContext(
|
||||
ctx,
|
||||
`INSERT INTO route_decision_logs (
|
||||
request_id,
|
||||
logical_group_id,
|
||||
public_model,
|
||||
user_key,
|
||||
conversation_key,
|
||||
sticky_key,
|
||||
sticky_key_type,
|
||||
sticky_hit,
|
||||
selected_route_id,
|
||||
selected_shadow_group_id,
|
||||
fallback_used,
|
||||
error_class,
|
||||
upstream_status,
|
||||
latency_ms
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
row.RequestID,
|
||||
row.LogicalGroupID,
|
||||
row.PublicModel,
|
||||
row.UserKey,
|
||||
row.ConversationKey,
|
||||
row.StickyKey,
|
||||
row.StickyKeyType,
|
||||
boolToSQLiteInt(row.StickyHit),
|
||||
row.SelectedRouteID,
|
||||
row.SelectedShadowGroupID,
|
||||
boolToSQLiteInt(row.FallbackUsed),
|
||||
row.ErrorClass,
|
||||
row.UpstreamStatus,
|
||||
row.LatencyMS,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("insert route decision log %q: %w", row.RequestID, err)
|
||||
}
|
||||
|
||||
id, err := result.LastInsertId()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read inserted route decision log id for %q: %w", row.RequestID, err)
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (r *RouteDecisionLogsRepo) ListRecent(ctx context.Context, filter RouteDecisionLogFilter) ([]RouteDecisionLog, error) {
|
||||
clauses := make([]string, 0, 5)
|
||||
args := make([]any, 0, 6)
|
||||
|
||||
if requestID := strings.TrimSpace(filter.RequestID); requestID != "" {
|
||||
clauses = append(clauses, "request_id = ?")
|
||||
args = append(args, requestID)
|
||||
}
|
||||
if logicalGroupID := strings.TrimSpace(filter.LogicalGroupID); logicalGroupID != "" {
|
||||
clauses = append(clauses, "logical_group_id = ?")
|
||||
args = append(args, logicalGroupID)
|
||||
}
|
||||
if publicModel := strings.TrimSpace(filter.PublicModel); publicModel != "" {
|
||||
clauses = append(clauses, "public_model = ?")
|
||||
args = append(args, publicModel)
|
||||
}
|
||||
if selectedRouteID := strings.TrimSpace(filter.SelectedRouteID); selectedRouteID != "" {
|
||||
clauses = append(clauses, "selected_route_id = ?")
|
||||
args = append(args, selectedRouteID)
|
||||
}
|
||||
if stickyKey := strings.TrimSpace(filter.StickyKey); stickyKey != "" {
|
||||
clauses = append(clauses, "sticky_key = ?")
|
||||
args = append(args, stickyKey)
|
||||
}
|
||||
|
||||
query := `SELECT id, request_id, logical_group_id, public_model, user_key, conversation_key, sticky_key, sticky_key_type, sticky_hit, selected_route_id, selected_shadow_group_id, fallback_used, error_class, upstream_status, latency_ms, created_at
|
||||
FROM route_decision_logs`
|
||||
if len(clauses) > 0 {
|
||||
query += " WHERE " + strings.Join(clauses, " AND ")
|
||||
}
|
||||
query += " ORDER BY id DESC LIMIT ?"
|
||||
args = append(args, normalizeRouteLogListLimit(filter.Limit))
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list route decision logs: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
items := make([]RouteDecisionLog, 0)
|
||||
for rows.Next() {
|
||||
var (
|
||||
item RouteDecisionLog
|
||||
stickyHit int
|
||||
fallbackUsed int
|
||||
)
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
&item.RequestID,
|
||||
&item.LogicalGroupID,
|
||||
&item.PublicModel,
|
||||
&item.UserKey,
|
||||
&item.ConversationKey,
|
||||
&item.StickyKey,
|
||||
&item.StickyKeyType,
|
||||
&stickyHit,
|
||||
&item.SelectedRouteID,
|
||||
&item.SelectedShadowGroupID,
|
||||
&fallbackUsed,
|
||||
&item.ErrorClass,
|
||||
&item.UpstreamStatus,
|
||||
&item.LatencyMS,
|
||||
&item.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scan route decision log: %w", err)
|
||||
}
|
||||
item.StickyHit = stickyHit != 0
|
||||
item.FallbackUsed = fallbackUsed != 0
|
||||
items = append(items, item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterate route decision logs: %w", err)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func normalizeRouteDecisionLog(row RouteDecisionLog) (RouteDecisionLog, error) {
|
||||
row.RequestID = strings.TrimSpace(row.RequestID)
|
||||
row.LogicalGroupID = strings.TrimSpace(row.LogicalGroupID)
|
||||
row.PublicModel = strings.TrimSpace(row.PublicModel)
|
||||
row.UserKey = strings.TrimSpace(row.UserKey)
|
||||
row.ConversationKey = strings.TrimSpace(row.ConversationKey)
|
||||
row.StickyKey = strings.TrimSpace(row.StickyKey)
|
||||
row.StickyKeyType = strings.TrimSpace(row.StickyKeyType)
|
||||
row.SelectedRouteID = strings.TrimSpace(row.SelectedRouteID)
|
||||
row.SelectedShadowGroupID = strings.TrimSpace(row.SelectedShadowGroupID)
|
||||
row.ErrorClass = strings.TrimSpace(row.ErrorClass)
|
||||
|
||||
switch {
|
||||
case row.RequestID == "":
|
||||
return RouteDecisionLog{}, fmt.Errorf("request_id is required")
|
||||
case row.LogicalGroupID == "":
|
||||
return RouteDecisionLog{}, fmt.Errorf("logical_group_id is required")
|
||||
case row.PublicModel == "":
|
||||
return RouteDecisionLog{}, fmt.Errorf("public_model is required")
|
||||
case row.SelectedRouteID == "":
|
||||
return RouteDecisionLog{}, fmt.Errorf("selected_route_id is required")
|
||||
case row.SelectedShadowGroupID == "":
|
||||
return RouteDecisionLog{}, fmt.Errorf("selected_shadow_group_id is required")
|
||||
case row.UpstreamStatus < 0:
|
||||
return RouteDecisionLog{}, fmt.Errorf("upstream_status must be >= 0")
|
||||
case row.LatencyMS < 0:
|
||||
return RouteDecisionLog{}, fmt.Errorf("latency_ms must be >= 0")
|
||||
}
|
||||
|
||||
return row, nil
|
||||
}
|
||||
|
||||
func boolToSQLiteInt(value bool) int {
|
||||
if value {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
}
|
||||
163
internal/store/sqlite/route_failover_events_repo.go
Normal file
163
internal/store/sqlite/route_failover_events_repo.go
Normal file
@@ -0,0 +1,163 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type RouteFailoverEvent struct {
|
||||
ID int64
|
||||
RequestID string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
FromRouteID string
|
||||
ToRouteID string
|
||||
Reason string
|
||||
FailureCount int
|
||||
CreatedAt string
|
||||
}
|
||||
|
||||
type RouteFailoverEventFilter struct {
|
||||
RequestID string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
FromRouteID string
|
||||
ToRouteID string
|
||||
Limit int
|
||||
}
|
||||
|
||||
type RouteFailoverEventsRepo struct {
|
||||
db execQuerier
|
||||
}
|
||||
|
||||
func newRouteFailoverEventsRepo(db execQuerier) *RouteFailoverEventsRepo {
|
||||
return &RouteFailoverEventsRepo{db: db}
|
||||
}
|
||||
|
||||
func (r *RouteFailoverEventsRepo) Create(ctx context.Context, row RouteFailoverEvent) (int64, error) {
|
||||
row, err := normalizeRouteFailoverEvent(row)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
result, err := r.db.ExecContext(
|
||||
ctx,
|
||||
`INSERT INTO route_failover_events (
|
||||
request_id,
|
||||
logical_group_id,
|
||||
public_model,
|
||||
from_route_id,
|
||||
to_route_id,
|
||||
reason,
|
||||
failure_count
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
row.RequestID,
|
||||
row.LogicalGroupID,
|
||||
row.PublicModel,
|
||||
row.FromRouteID,
|
||||
row.ToRouteID,
|
||||
row.Reason,
|
||||
row.FailureCount,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("insert route failover event %q: %w", row.RequestID, err)
|
||||
}
|
||||
|
||||
id, err := result.LastInsertId()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read inserted route failover event id for %q: %w", row.RequestID, err)
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (r *RouteFailoverEventsRepo) ListRecent(ctx context.Context, filter RouteFailoverEventFilter) ([]RouteFailoverEvent, error) {
|
||||
clauses := make([]string, 0, 5)
|
||||
args := make([]any, 0, 6)
|
||||
|
||||
if requestID := strings.TrimSpace(filter.RequestID); requestID != "" {
|
||||
clauses = append(clauses, "request_id = ?")
|
||||
args = append(args, requestID)
|
||||
}
|
||||
if logicalGroupID := strings.TrimSpace(filter.LogicalGroupID); logicalGroupID != "" {
|
||||
clauses = append(clauses, "logical_group_id = ?")
|
||||
args = append(args, logicalGroupID)
|
||||
}
|
||||
if publicModel := strings.TrimSpace(filter.PublicModel); publicModel != "" {
|
||||
clauses = append(clauses, "public_model = ?")
|
||||
args = append(args, publicModel)
|
||||
}
|
||||
if fromRouteID := strings.TrimSpace(filter.FromRouteID); fromRouteID != "" {
|
||||
clauses = append(clauses, "from_route_id = ?")
|
||||
args = append(args, fromRouteID)
|
||||
}
|
||||
if toRouteID := strings.TrimSpace(filter.ToRouteID); toRouteID != "" {
|
||||
clauses = append(clauses, "to_route_id = ?")
|
||||
args = append(args, toRouteID)
|
||||
}
|
||||
|
||||
query := `SELECT id, request_id, logical_group_id, public_model, from_route_id, to_route_id, reason, failure_count, created_at
|
||||
FROM route_failover_events`
|
||||
if len(clauses) > 0 {
|
||||
query += " WHERE " + strings.Join(clauses, " AND ")
|
||||
}
|
||||
query += " ORDER BY id DESC LIMIT ?"
|
||||
args = append(args, normalizeRouteLogListLimit(filter.Limit))
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list route failover events: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
items := make([]RouteFailoverEvent, 0)
|
||||
for rows.Next() {
|
||||
var item RouteFailoverEvent
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
&item.RequestID,
|
||||
&item.LogicalGroupID,
|
||||
&item.PublicModel,
|
||||
&item.FromRouteID,
|
||||
&item.ToRouteID,
|
||||
&item.Reason,
|
||||
&item.FailureCount,
|
||||
&item.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scan route failover event: %w", err)
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterate route failover events: %w", err)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func normalizeRouteFailoverEvent(row RouteFailoverEvent) (RouteFailoverEvent, error) {
|
||||
row.RequestID = strings.TrimSpace(row.RequestID)
|
||||
row.LogicalGroupID = strings.TrimSpace(row.LogicalGroupID)
|
||||
row.PublicModel = strings.TrimSpace(row.PublicModel)
|
||||
row.FromRouteID = strings.TrimSpace(row.FromRouteID)
|
||||
row.ToRouteID = strings.TrimSpace(row.ToRouteID)
|
||||
row.Reason = strings.TrimSpace(row.Reason)
|
||||
|
||||
switch {
|
||||
case row.RequestID == "":
|
||||
return RouteFailoverEvent{}, fmt.Errorf("request_id is required")
|
||||
case row.LogicalGroupID == "":
|
||||
return RouteFailoverEvent{}, fmt.Errorf("logical_group_id is required")
|
||||
case row.PublicModel == "":
|
||||
return RouteFailoverEvent{}, fmt.Errorf("public_model is required")
|
||||
case row.FromRouteID == "":
|
||||
return RouteFailoverEvent{}, fmt.Errorf("from_route_id is required")
|
||||
case row.ToRouteID == "":
|
||||
return RouteFailoverEvent{}, fmt.Errorf("to_route_id is required")
|
||||
case row.Reason == "":
|
||||
return RouteFailoverEvent{}, fmt.Errorf("reason is required")
|
||||
case row.FailureCount < 0:
|
||||
return RouteFailoverEvent{}, fmt.Errorf("failure_count must be >= 0")
|
||||
}
|
||||
|
||||
return row, nil
|
||||
}
|
||||
17
internal/store/sqlite/route_logging_helpers.go
Normal file
17
internal/store/sqlite/route_logging_helpers.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package sqlite
|
||||
|
||||
const (
|
||||
defaultRouteLogListLimit = 50
|
||||
maxRouteLogListLimit = 200
|
||||
)
|
||||
|
||||
func normalizeRouteLogListLimit(limit int) int {
|
||||
switch {
|
||||
case limit <= 0:
|
||||
return defaultRouteLogListLimit
|
||||
case limit > maxRouteLogListLimit:
|
||||
return maxRouteLogListLimit
|
||||
default:
|
||||
return limit
|
||||
}
|
||||
}
|
||||
129
internal/store/sqlite/route_logging_repos_test.go
Normal file
129
internal/store/sqlite/route_logging_repos_test.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRouteDecisionLogsRepoCreateAndListRecent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
store := openTestDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
if _, err := store.RouteDecisionLogs().Create(ctx, RouteDecisionLog{
|
||||
RequestID: "req-1",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
UserKey: "user-a",
|
||||
ConversationKey: "conv-a",
|
||||
StickyKey: "sticky-a",
|
||||
StickyKeyType: "conversation",
|
||||
StickyHit: true,
|
||||
SelectedRouteID: "asxs",
|
||||
SelectedShadowGroupID: "gpt-shared__asxs",
|
||||
FallbackUsed: false,
|
||||
ErrorClass: "",
|
||||
UpstreamStatus: 200,
|
||||
LatencyMS: 123,
|
||||
}); err != nil {
|
||||
t.Fatalf("RouteDecisionLogs().Create() error = %v", err)
|
||||
}
|
||||
|
||||
logs, err := store.RouteDecisionLogs().ListRecent(ctx, RouteDecisionLogFilter{
|
||||
LogicalGroupID: "gpt-shared",
|
||||
Limit: 10,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("RouteDecisionLogs().ListRecent() error = %v", err)
|
||||
}
|
||||
if len(logs) != 1 {
|
||||
t.Fatalf("RouteDecisionLogs().ListRecent() len = %d, want 1", len(logs))
|
||||
}
|
||||
if logs[0].SelectedRouteID != "asxs" || !logs[0].StickyHit || logs[0].UpstreamStatus != 200 {
|
||||
t.Fatalf("RouteDecisionLogs().ListRecent()[0] = %+v", logs[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteFailoverEventsRepoCreateAndListRecent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
store := openTestDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
if _, err := store.RouteFailoverEvents().Create(ctx, RouteFailoverEvent{
|
||||
RequestID: "req-2",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4",
|
||||
FromRouteID: "asxs",
|
||||
ToRouteID: "codex2api",
|
||||
Reason: "upstream_5xx",
|
||||
FailureCount: 2,
|
||||
}); err != nil {
|
||||
t.Fatalf("RouteFailoverEvents().Create() error = %v", err)
|
||||
}
|
||||
|
||||
items, err := store.RouteFailoverEvents().ListRecent(ctx, RouteFailoverEventFilter{
|
||||
RequestID: "req-2",
|
||||
Limit: 10,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("RouteFailoverEvents().ListRecent() error = %v", err)
|
||||
}
|
||||
if len(items) != 1 {
|
||||
t.Fatalf("RouteFailoverEvents().ListRecent() len = %d, want 1", len(items))
|
||||
}
|
||||
if items[0].ToRouteID != "codex2api" || items[0].FailureCount != 2 {
|
||||
t.Fatalf("RouteFailoverEvents().ListRecent()[0] = %+v", items[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteStickyAuditRepoCreateAndListRecent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
store := openTestDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
if _, err := store.RouteStickyAudit().Create(ctx, RouteStickyAudit{
|
||||
StickyKey: "sticky-3",
|
||||
StickyKeyType: "user_model",
|
||||
LogicalGroupID: "gpt-shared",
|
||||
PublicModel: "gpt-5.4-mini",
|
||||
RouteID: "asxs",
|
||||
Action: "bind",
|
||||
ExpiresAt: "2026-05-28T18:00:00Z",
|
||||
}); err != nil {
|
||||
t.Fatalf("RouteStickyAudit().Create() error = %v", err)
|
||||
}
|
||||
|
||||
items, err := store.RouteStickyAudit().ListRecent(ctx, RouteStickyAuditFilter{
|
||||
StickyKey: "sticky-3",
|
||||
Limit: 10,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("RouteStickyAudit().ListRecent() error = %v", err)
|
||||
}
|
||||
if len(items) != 1 {
|
||||
t.Fatalf("RouteStickyAudit().ListRecent() len = %d, want 1", len(items))
|
||||
}
|
||||
if items[0].Action != "bind" || items[0].RouteID != "asxs" {
|
||||
t.Fatalf("RouteStickyAudit().ListRecent()[0] = %+v", items[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteLoggingReposRejectInvalidRows(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
store := openTestDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
if _, err := store.RouteDecisionLogs().Create(ctx, RouteDecisionLog{}); err == nil {
|
||||
t.Fatal("RouteDecisionLogs().Create() error = nil, want validation error")
|
||||
}
|
||||
if _, err := store.RouteFailoverEvents().Create(ctx, RouteFailoverEvent{}); err == nil {
|
||||
t.Fatal("RouteFailoverEvents().Create() error = nil, want validation error")
|
||||
}
|
||||
if _, err := store.RouteStickyAudit().Create(ctx, RouteStickyAudit{}); err == nil {
|
||||
t.Fatal("RouteStickyAudit().Create() error = nil, want validation error")
|
||||
}
|
||||
}
|
||||
167
internal/store/sqlite/route_sticky_audit_repo.go
Normal file
167
internal/store/sqlite/route_sticky_audit_repo.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type RouteStickyAudit struct {
|
||||
ID int64
|
||||
StickyKey string
|
||||
StickyKeyType string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
RouteID string
|
||||
Action string
|
||||
ExpiresAt string
|
||||
CreatedAt string
|
||||
}
|
||||
|
||||
type RouteStickyAuditFilter struct {
|
||||
StickyKey string
|
||||
StickyKeyType string
|
||||
LogicalGroupID string
|
||||
PublicModel string
|
||||
RouteID string
|
||||
Action string
|
||||
Limit int
|
||||
}
|
||||
|
||||
type RouteStickyAuditRepo struct {
|
||||
db execQuerier
|
||||
}
|
||||
|
||||
func newRouteStickyAuditRepo(db execQuerier) *RouteStickyAuditRepo {
|
||||
return &RouteStickyAuditRepo{db: db}
|
||||
}
|
||||
|
||||
func (r *RouteStickyAuditRepo) Create(ctx context.Context, row RouteStickyAudit) (int64, error) {
|
||||
row, err := normalizeRouteStickyAudit(row)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
result, err := r.db.ExecContext(
|
||||
ctx,
|
||||
`INSERT INTO route_sticky_audit (
|
||||
sticky_key,
|
||||
sticky_key_type,
|
||||
logical_group_id,
|
||||
public_model,
|
||||
route_id,
|
||||
action,
|
||||
expires_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
row.StickyKey,
|
||||
row.StickyKeyType,
|
||||
row.LogicalGroupID,
|
||||
row.PublicModel,
|
||||
row.RouteID,
|
||||
row.Action,
|
||||
row.ExpiresAt,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("insert route sticky audit %q: %w", row.StickyKey, err)
|
||||
}
|
||||
|
||||
id, err := result.LastInsertId()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read inserted route sticky audit id for %q: %w", row.StickyKey, err)
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (r *RouteStickyAuditRepo) ListRecent(ctx context.Context, filter RouteStickyAuditFilter) ([]RouteStickyAudit, error) {
|
||||
clauses := make([]string, 0, 6)
|
||||
args := make([]any, 0, 7)
|
||||
|
||||
if stickyKey := strings.TrimSpace(filter.StickyKey); stickyKey != "" {
|
||||
clauses = append(clauses, "sticky_key = ?")
|
||||
args = append(args, stickyKey)
|
||||
}
|
||||
if stickyKeyType := strings.TrimSpace(filter.StickyKeyType); stickyKeyType != "" {
|
||||
clauses = append(clauses, "sticky_key_type = ?")
|
||||
args = append(args, stickyKeyType)
|
||||
}
|
||||
if logicalGroupID := strings.TrimSpace(filter.LogicalGroupID); logicalGroupID != "" {
|
||||
clauses = append(clauses, "logical_group_id = ?")
|
||||
args = append(args, logicalGroupID)
|
||||
}
|
||||
if publicModel := strings.TrimSpace(filter.PublicModel); publicModel != "" {
|
||||
clauses = append(clauses, "public_model = ?")
|
||||
args = append(args, publicModel)
|
||||
}
|
||||
if routeID := strings.TrimSpace(filter.RouteID); routeID != "" {
|
||||
clauses = append(clauses, "route_id = ?")
|
||||
args = append(args, routeID)
|
||||
}
|
||||
if action := strings.TrimSpace(filter.Action); action != "" {
|
||||
clauses = append(clauses, "action = ?")
|
||||
args = append(args, action)
|
||||
}
|
||||
|
||||
query := `SELECT id, sticky_key, sticky_key_type, logical_group_id, public_model, route_id, action, expires_at, created_at
|
||||
FROM route_sticky_audit`
|
||||
if len(clauses) > 0 {
|
||||
query += " WHERE " + strings.Join(clauses, " AND ")
|
||||
}
|
||||
query += " ORDER BY id DESC LIMIT ?"
|
||||
args = append(args, normalizeRouteLogListLimit(filter.Limit))
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list route sticky audit: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
items := make([]RouteStickyAudit, 0)
|
||||
for rows.Next() {
|
||||
var item RouteStickyAudit
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
&item.StickyKey,
|
||||
&item.StickyKeyType,
|
||||
&item.LogicalGroupID,
|
||||
&item.PublicModel,
|
||||
&item.RouteID,
|
||||
&item.Action,
|
||||
&item.ExpiresAt,
|
||||
&item.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scan route sticky audit: %w", err)
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterate route sticky audit: %w", err)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func normalizeRouteStickyAudit(row RouteStickyAudit) (RouteStickyAudit, error) {
|
||||
row.StickyKey = strings.TrimSpace(row.StickyKey)
|
||||
row.StickyKeyType = strings.TrimSpace(row.StickyKeyType)
|
||||
row.LogicalGroupID = strings.TrimSpace(row.LogicalGroupID)
|
||||
row.PublicModel = strings.TrimSpace(row.PublicModel)
|
||||
row.RouteID = strings.TrimSpace(row.RouteID)
|
||||
row.Action = strings.TrimSpace(row.Action)
|
||||
row.ExpiresAt = strings.TrimSpace(row.ExpiresAt)
|
||||
|
||||
switch {
|
||||
case row.StickyKey == "":
|
||||
return RouteStickyAudit{}, fmt.Errorf("sticky_key is required")
|
||||
case row.StickyKeyType == "":
|
||||
return RouteStickyAudit{}, fmt.Errorf("sticky_key_type is required")
|
||||
case row.LogicalGroupID == "":
|
||||
return RouteStickyAudit{}, fmt.Errorf("logical_group_id is required")
|
||||
case row.PublicModel == "":
|
||||
return RouteStickyAudit{}, fmt.Errorf("public_model is required")
|
||||
case row.RouteID == "":
|
||||
return RouteStickyAudit{}, fmt.Errorf("route_id is required")
|
||||
case row.Action == "":
|
||||
return RouteStickyAudit{}, fmt.Errorf("action is required")
|
||||
}
|
||||
|
||||
return row, nil
|
||||
}
|
||||
@@ -141,6 +141,9 @@ func TestStoreAppliesLatestMigration(t *testing.T) {
|
||||
"logical_group_models",
|
||||
"logical_group_routes",
|
||||
"logical_group_route_models",
|
||||
"route_decision_logs",
|
||||
"route_failover_events",
|
||||
"route_sticky_audit",
|
||||
} {
|
||||
if !tableExists(t, store.SQLDB(), table) {
|
||||
t.Fatalf("table %q does not exist after latest migration", table)
|
||||
@@ -224,6 +227,51 @@ func TestStoreAppliesLatestMigration(t *testing.T) {
|
||||
t.Fatalf("column %q missing from logical_group_route_models", column)
|
||||
}
|
||||
}
|
||||
|
||||
for _, column := range []string{
|
||||
"request_id",
|
||||
"logical_group_id",
|
||||
"public_model",
|
||||
"sticky_key",
|
||||
"sticky_hit",
|
||||
"selected_route_id",
|
||||
"selected_shadow_group_id",
|
||||
"fallback_used",
|
||||
"upstream_status",
|
||||
"latency_ms",
|
||||
} {
|
||||
if !tableColumnExists(t, store.SQLDB(), "route_decision_logs", column) {
|
||||
t.Fatalf("column %q missing from route_decision_logs", column)
|
||||
}
|
||||
}
|
||||
|
||||
for _, column := range []string{
|
||||
"request_id",
|
||||
"logical_group_id",
|
||||
"public_model",
|
||||
"from_route_id",
|
||||
"to_route_id",
|
||||
"reason",
|
||||
"failure_count",
|
||||
} {
|
||||
if !tableColumnExists(t, store.SQLDB(), "route_failover_events", column) {
|
||||
t.Fatalf("column %q missing from route_failover_events", column)
|
||||
}
|
||||
}
|
||||
|
||||
for _, column := range []string{
|
||||
"sticky_key",
|
||||
"sticky_key_type",
|
||||
"logical_group_id",
|
||||
"public_model",
|
||||
"route_id",
|
||||
"action",
|
||||
"expires_at",
|
||||
} {
|
||||
if !tableColumnExists(t, store.SQLDB(), "route_sticky_audit", column) {
|
||||
t.Fatalf("column %q missing from route_sticky_audit", column)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreInitEnforcesLogicalRoutingConstraints(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user