diff --git a/internal/app/http_api.go b/internal/app/http_api.go index b83533b0..427c214c 100644 --- a/internal/app/http_api.go +++ b/internal/app/http_api.go @@ -45,6 +45,12 @@ type ActionSet struct { DeleteLogicalGroupRoute func(context.Context, DeleteLogicalGroupRouteRequest) error CreateLogicalGroupRouteModel func(context.Context, CreateLogicalGroupRouteModelRequest) (LogicalGroupRouteModelInfo, error) ListLogicalGroupRouteModels func(context.Context, ListLogicalGroupRouteModelsRequest) ([]LogicalGroupRouteModelInfo, error) + AppendRouteDecisionLog func(context.Context, AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) + ListRouteDecisionLogs func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error) + AppendRouteFailoverEvent func(context.Context, AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error) + ListRouteFailoverEvents func(context.Context, ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error) + AppendRouteStickyAudit func(context.Context, AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error) + ListRouteStickyAudit func(context.Context, ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error) CreateProviderDraft func(context.Context, CreateProviderDraftRequest) (ProviderDraftInfo, error) ListProviderDrafts func(context.Context, ListProviderDraftsRequest) ([]ProviderDraftInfo, error) GetProviderDraft func(context.Context, string) (ProviderDraftInfo, error) @@ -368,6 +374,24 @@ func NewAPIHandlerWithAuth(adminAuth AdminAuthConfig, actions ActionSet) http.Ha mux.Handle("GET /api/logical-groups/{groupID}/routes/{routeID}/models", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handleListLogicalGroupRouteModels(w, r, actions.ListLogicalGroupRouteModels) }))) + mux.Handle("POST /api/routing/logs/decisions", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleAppendRouteDecisionLog(w, r, 
actions.AppendRouteDecisionLog) + }))) + mux.Handle("GET /api/routing/logs/decisions", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleListRouteDecisionLogs(w, r, actions.ListRouteDecisionLogs) + }))) + mux.Handle("POST /api/routing/logs/failovers", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleAppendRouteFailoverEvent(w, r, actions.AppendRouteFailoverEvent) + }))) + mux.Handle("GET /api/routing/logs/failovers", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleListRouteFailoverEvents(w, r, actions.ListRouteFailoverEvents) + }))) + mux.Handle("POST /api/routing/logs/sticky-audit", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleAppendRouteStickyAudit(w, r, actions.AppendRouteStickyAudit) + }))) + mux.Handle("GET /api/routing/logs/sticky-audit", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handleListRouteStickyAudit(w, r, actions.ListRouteStickyAudit) + }))) mux.Handle("POST /api/provider-drafts", requireAdminAccess(adminAuth, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { handleCreateProviderDraft(w, r, actions.CreateProviderDraft) }))) @@ -1172,6 +1196,7 @@ func classifyError(err error) *httpError { } func NewActionSet(sqliteDSN string) ActionSet { + routeLogWriter := newLazyRouteLogWriter(sqliteDSN) return ActionSet{ CreateBatchImportRun: buildCreateBatchImportRunAction(sqliteDSN), ListBatchImportRuns: buildListBatchImportRunsAction(sqliteDSN), @@ -1192,6 +1217,12 @@ func NewActionSet(sqliteDSN string) ActionSet { DeleteLogicalGroupRoute: buildDeleteLogicalGroupRouteAction(sqliteDSN), CreateLogicalGroupRouteModel: buildCreateLogicalGroupRouteModelAction(sqliteDSN), ListLogicalGroupRouteModels: buildListLogicalGroupRouteModelsAction(sqliteDSN), + AppendRouteDecisionLog: 
buildAppendRouteDecisionLogAction(routeLogWriter, sqliteDSN), + ListRouteDecisionLogs: buildListRouteDecisionLogsAction(sqliteDSN), + AppendRouteFailoverEvent: buildAppendRouteFailoverEventAction(routeLogWriter, sqliteDSN), + ListRouteFailoverEvents: buildListRouteFailoverEventsAction(sqliteDSN), + AppendRouteStickyAudit: buildAppendRouteStickyAuditAction(routeLogWriter, sqliteDSN), + ListRouteStickyAudit: buildListRouteStickyAuditAction(sqliteDSN), CreateProviderDraft: func(ctx context.Context, req CreateProviderDraftRequest) (ProviderDraftInfo, error) { store, err := sqlite.Open(ctx, sqliteDSN) if err != nil { diff --git a/internal/app/route_logging_api.go b/internal/app/route_logging_api.go new file mode 100644 index 00000000..6a16017c --- /dev/null +++ b/internal/app/route_logging_api.go @@ -0,0 +1,636 @@ +package app + +import ( + "context" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + + "sub2api-cn-relay-manager/internal/routing" + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +type AppendRouteDecisionLogRequest struct { + RequestID string `json:"request_id"` + LogicalGroupID string `json:"logical_group_id"` + PublicModel string `json:"public_model"` + UserKey string `json:"user_key,omitempty"` + ConversationKey string `json:"conversation_key,omitempty"` + StickyKey string `json:"sticky_key,omitempty"` + StickyKeyType string `json:"sticky_key_type,omitempty"` + StickyHit bool `json:"sticky_hit,omitempty"` + SelectedRouteID string `json:"selected_route_id"` + SelectedShadowGroupID string `json:"selected_shadow_group_id"` + FallbackUsed bool `json:"fallback_used,omitempty"` + ErrorClass string `json:"error_class,omitempty"` + UpstreamStatus int `json:"upstream_status,omitempty"` + LatencyMS int `json:"latency_ms,omitempty"` + Sync bool `json:"sync,omitempty"` +} + +type ListRouteDecisionLogsRequest struct { + RequestID string + LogicalGroupID string + PublicModel string + SelectedRouteID string + StickyKey string + Limit int +} + +type 
RouteDecisionLogInfo struct { + ID int64 `json:"id"` + RequestID string `json:"request_id"` + LogicalGroupID string `json:"logical_group_id"` + PublicModel string `json:"public_model"` + UserKey string `json:"user_key,omitempty"` + ConversationKey string `json:"conversation_key,omitempty"` + StickyKey string `json:"sticky_key,omitempty"` + StickyKeyType string `json:"sticky_key_type,omitempty"` + StickyHit bool `json:"sticky_hit"` + SelectedRouteID string `json:"selected_route_id"` + SelectedShadowGroupID string `json:"selected_shadow_group_id"` + FallbackUsed bool `json:"fallback_used"` + ErrorClass string `json:"error_class,omitempty"` + UpstreamStatus int `json:"upstream_status,omitempty"` + LatencyMS int `json:"latency_ms,omitempty"` + CreatedAt string `json:"created_at,omitempty"` +} + +type AppendRouteFailoverEventRequest struct { + RequestID string `json:"request_id"` + LogicalGroupID string `json:"logical_group_id"` + PublicModel string `json:"public_model"` + FromRouteID string `json:"from_route_id"` + ToRouteID string `json:"to_route_id"` + Reason string `json:"reason"` + FailureCount int `json:"failure_count,omitempty"` + Sync bool `json:"sync,omitempty"` +} + +type ListRouteFailoverEventsRequest struct { + RequestID string + LogicalGroupID string + PublicModel string + FromRouteID string + ToRouteID string + Limit int +} + +type RouteFailoverEventInfo struct { + ID int64 `json:"id"` + RequestID string `json:"request_id"` + LogicalGroupID string `json:"logical_group_id"` + PublicModel string `json:"public_model"` + FromRouteID string `json:"from_route_id"` + ToRouteID string `json:"to_route_id"` + Reason string `json:"reason"` + FailureCount int `json:"failure_count"` + CreatedAt string `json:"created_at,omitempty"` +} + +type AppendRouteStickyAuditRequest struct { + StickyKey string `json:"sticky_key"` + StickyKeyType string `json:"sticky_key_type"` + LogicalGroupID string `json:"logical_group_id"` + PublicModel string `json:"public_model"` + RouteID 
string `json:"route_id"` + Action string `json:"action"` + ExpiresAt string `json:"expires_at,omitempty"` + Sync bool `json:"sync,omitempty"` +} + +type ListRouteStickyAuditRequest struct { + StickyKey string + StickyKeyType string + LogicalGroupID string + PublicModel string + RouteID string + Action string + Limit int +} + +type RouteStickyAuditInfo struct { + ID int64 `json:"id"` + StickyKey string `json:"sticky_key"` + StickyKeyType string `json:"sticky_key_type"` + LogicalGroupID string `json:"logical_group_id"` + PublicModel string `json:"public_model"` + RouteID string `json:"route_id"` + Action string `json:"action"` + ExpiresAt string `json:"expires_at,omitempty"` + CreatedAt string `json:"created_at,omitempty"` +} + +func handleAppendRouteDecisionLog(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-decision-log action is not configured"}) + return + } + var req AppendRouteDecisionLogRequest + if err := decodeJSON(r, &req); err != nil { + writeHTTPError(w, err) + return + } + item, err := fn(r.Context(), req) + if err != nil { + writeHTTPError(w, classifyError(err)) + return + } + writeJSON(w, http.StatusCreated, map[string]any{"decision_log": item}) +} + +func handleListRouteDecisionLogs(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-decision-logs action is not configured"}) + return + } + req, err := decodeRouteDecisionLogFilter(r) + if err != nil { + writeHTTPError(w, err) + return + } + items, actionErr := fn(r.Context(), req) + if actionErr != nil { + writeHTTPError(w, classifyError(actionErr)) + return + } + 
writeJSON(w, http.StatusOK, map[string]any{"decision_logs": items}) +} + +func handleAppendRouteFailoverEvent(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-failover-event action is not configured"}) + return + } + var req AppendRouteFailoverEventRequest + if err := decodeJSON(r, &req); err != nil { + writeHTTPError(w, err) + return + } + item, err := fn(r.Context(), req) + if err != nil { + writeHTTPError(w, classifyError(err)) + return + } + writeJSON(w, http.StatusCreated, map[string]any{"failover_event": item}) +} + +func handleListRouteFailoverEvents(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-failover-events action is not configured"}) + return + } + req, err := decodeRouteFailoverEventFilter(r) + if err != nil { + writeHTTPError(w, err) + return + } + items, actionErr := fn(r.Context(), req) + if actionErr != nil { + writeHTTPError(w, classifyError(actionErr)) + return + } + writeJSON(w, http.StatusOK, map[string]any{"failover_events": items}) +} + +func handleAppendRouteStickyAudit(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-sticky-audit action is not configured"}) + return + } + var req AppendRouteStickyAuditRequest + if err := decodeJSON(r, &req); err != nil { + writeHTTPError(w, err) + return + } + item, err := fn(r.Context(), req) + if err != nil { + writeHTTPError(w, 
classifyError(err)) + return + } + writeJSON(w, http.StatusCreated, map[string]any{"sticky_audit": item}) +} + +func handleListRouteStickyAudit(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error)) { + if fn == nil { + writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-sticky-audit action is not configured"}) + return + } + req, err := decodeRouteStickyAuditFilter(r) + if err != nil { + writeHTTPError(w, err) + return + } + items, actionErr := fn(r.Context(), req) + if actionErr != nil { + writeHTTPError(w, classifyError(actionErr)) + return + } + writeJSON(w, http.StatusOK, map[string]any{"sticky_audits": items}) +} + +type lazyRouteLogWriter struct { + sqliteDSN string + mu sync.Mutex + writer *routing.AsyncLogWriter +} + +func newLazyRouteLogWriter(sqliteDSN string) *lazyRouteLogWriter { + return &lazyRouteLogWriter{sqliteDSN: sqliteDSN} +} + +func (l *lazyRouteLogWriter) get(ctx context.Context) (*routing.AsyncLogWriter, error) { + l.mu.Lock() + defer l.mu.Unlock() + + if l.writer != nil { + return l.writer, nil + } + + writer, err := routing.NewSQLiteLogWriter(ctx, l.sqliteDSN, routing.AsyncLogWriterOptions{}) + if err != nil { + return nil, err + } + l.writer = writer + return l.writer, nil +} + +func buildAppendRouteDecisionLogAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) { + return func(ctx context.Context, req AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) { + writer, err := writerSource.get(ctx) + if err != nil { + return RouteDecisionLogInfo{}, err + } + + event := routing.RouteDecisionEvent{ + RequestID: strings.TrimSpace(req.RequestID), + LogicalGroupID: strings.TrimSpace(req.LogicalGroupID), + PublicModel: strings.TrimSpace(req.PublicModel), + UserKey: strings.TrimSpace(req.UserKey), + 
ConversationKey: strings.TrimSpace(req.ConversationKey), + StickyKey: strings.TrimSpace(req.StickyKey), + StickyKeyType: strings.TrimSpace(req.StickyKeyType), + StickyHit: req.StickyHit, + SelectedRouteID: strings.TrimSpace(req.SelectedRouteID), + SelectedShadowGroupID: strings.TrimSpace(req.SelectedShadowGroupID), + FallbackUsed: req.FallbackUsed, + ErrorClass: strings.TrimSpace(req.ErrorClass), + UpstreamStatus: req.UpstreamStatus, + LatencyMS: req.LatencyMS, + } + if err := writer.AppendDecision(ctx, event); err != nil { + return RouteDecisionLogInfo{}, err + } + if !req.Sync { + return routeDecisionLogRowToInfo(sqlite.RouteDecisionLog{ + RequestID: event.RequestID, + LogicalGroupID: event.LogicalGroupID, + PublicModel: event.PublicModel, + UserKey: event.UserKey, + ConversationKey: event.ConversationKey, + StickyKey: event.StickyKey, + StickyKeyType: event.StickyKeyType, + StickyHit: event.StickyHit, + SelectedRouteID: event.SelectedRouteID, + SelectedShadowGroupID: event.SelectedShadowGroupID, + FallbackUsed: event.FallbackUsed, + ErrorClass: event.ErrorClass, + UpstreamStatus: event.UpstreamStatus, + LatencyMS: event.LatencyMS, + }), nil + } + if err := writer.Flush(ctx); err != nil { + return RouteDecisionLogInfo{}, err + } + + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return RouteDecisionLogInfo{}, err + } + defer store.Close() + + items, err := store.RouteDecisionLogs().ListRecent(ctx, sqlite.RouteDecisionLogFilter{ + RequestID: event.RequestID, + Limit: 1, + }) + if err != nil { + return RouteDecisionLogInfo{}, err + } + if len(items) == 0 { + return RouteDecisionLogInfo{}, fmt.Errorf("route decision log %q not found after append", event.RequestID) + } + return routeDecisionLogRowToInfo(items[0]), nil + } +} + +func buildListRouteDecisionLogsAction(sqliteDSN string) func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error) { + return func(ctx context.Context, req ListRouteDecisionLogsRequest) 
([]RouteDecisionLogInfo, error) { + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return nil, err + } + defer store.Close() + + rows, err := store.RouteDecisionLogs().ListRecent(ctx, sqlite.RouteDecisionLogFilter{ + RequestID: strings.TrimSpace(req.RequestID), + LogicalGroupID: strings.TrimSpace(req.LogicalGroupID), + PublicModel: strings.TrimSpace(req.PublicModel), + SelectedRouteID: strings.TrimSpace(req.SelectedRouteID), + StickyKey: strings.TrimSpace(req.StickyKey), + Limit: req.Limit, + }) + if err != nil { + return nil, err + } + return routeDecisionLogRowsToInfo(rows), nil + } +} + +func buildAppendRouteFailoverEventAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error) { + return func(ctx context.Context, req AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error) { + writer, err := writerSource.get(ctx) + if err != nil { + return RouteFailoverEventInfo{}, err + } + + event := routing.RouteFailoverEvent{ + RequestID: strings.TrimSpace(req.RequestID), + LogicalGroupID: strings.TrimSpace(req.LogicalGroupID), + PublicModel: strings.TrimSpace(req.PublicModel), + FromRouteID: strings.TrimSpace(req.FromRouteID), + ToRouteID: strings.TrimSpace(req.ToRouteID), + Reason: strings.TrimSpace(req.Reason), + FailureCount: req.FailureCount, + } + if err := writer.AppendFailover(ctx, event); err != nil { + return RouteFailoverEventInfo{}, err + } + if !req.Sync { + return routeFailoverEventRowToInfo(sqlite.RouteFailoverEvent{ + RequestID: event.RequestID, + LogicalGroupID: event.LogicalGroupID, + PublicModel: event.PublicModel, + FromRouteID: event.FromRouteID, + ToRouteID: event.ToRouteID, + Reason: event.Reason, + FailureCount: event.FailureCount, + }), nil + } + if err := writer.Flush(ctx); err != nil { + return RouteFailoverEventInfo{}, err + } + + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return RouteFailoverEventInfo{}, err + } + 
defer store.Close() + + items, err := store.RouteFailoverEvents().ListRecent(ctx, sqlite.RouteFailoverEventFilter{ + RequestID: event.RequestID, + Limit: 1, + }) + if err != nil { + return RouteFailoverEventInfo{}, err + } + if len(items) == 0 { + return RouteFailoverEventInfo{}, fmt.Errorf("route failover event %q not found after append", event.RequestID) + } + return routeFailoverEventRowToInfo(items[0]), nil + } +} + +func buildListRouteFailoverEventsAction(sqliteDSN string) func(context.Context, ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error) { + return func(ctx context.Context, req ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error) { + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return nil, err + } + defer store.Close() + + rows, err := store.RouteFailoverEvents().ListRecent(ctx, sqlite.RouteFailoverEventFilter{ + RequestID: strings.TrimSpace(req.RequestID), + LogicalGroupID: strings.TrimSpace(req.LogicalGroupID), + PublicModel: strings.TrimSpace(req.PublicModel), + FromRouteID: strings.TrimSpace(req.FromRouteID), + ToRouteID: strings.TrimSpace(req.ToRouteID), + Limit: req.Limit, + }) + if err != nil { + return nil, err + } + return routeFailoverEventRowsToInfo(rows), nil + } +} + +func buildAppendRouteStickyAuditAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error) { + return func(ctx context.Context, req AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error) { + writer, err := writerSource.get(ctx) + if err != nil { + return RouteStickyAuditInfo{}, err + } + + event := routing.RouteStickyAuditEvent{ + StickyKey: strings.TrimSpace(req.StickyKey), + StickyKeyType: strings.TrimSpace(req.StickyKeyType), + LogicalGroupID: strings.TrimSpace(req.LogicalGroupID), + PublicModel: strings.TrimSpace(req.PublicModel), + RouteID: strings.TrimSpace(req.RouteID), + Action: strings.TrimSpace(req.Action), + ExpiresAt: 
strings.TrimSpace(req.ExpiresAt), + } + if err := writer.AppendStickyAudit(ctx, event); err != nil { + return RouteStickyAuditInfo{}, err + } + if !req.Sync { + return routeStickyAuditRowToInfo(sqlite.RouteStickyAudit{ + StickyKey: event.StickyKey, + StickyKeyType: event.StickyKeyType, + LogicalGroupID: event.LogicalGroupID, + PublicModel: event.PublicModel, + RouteID: event.RouteID, + Action: event.Action, + ExpiresAt: event.ExpiresAt, + }), nil + } + if err := writer.Flush(ctx); err != nil { + return RouteStickyAuditInfo{}, err + } + + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return RouteStickyAuditInfo{}, err + } + defer store.Close() + + items, err := store.RouteStickyAudit().ListRecent(ctx, sqlite.RouteStickyAuditFilter{ + StickyKey: event.StickyKey, + Action: event.Action, + Limit: 1, + }) + if err != nil { + return RouteStickyAuditInfo{}, err + } + if len(items) == 0 { + return RouteStickyAuditInfo{}, fmt.Errorf("route sticky audit %q not found after append", event.StickyKey) + } + return routeStickyAuditRowToInfo(items[0]), nil + } +} + +func buildListRouteStickyAuditAction(sqliteDSN string) func(context.Context, ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error) { + return func(ctx context.Context, req ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error) { + store, err := sqlite.Open(ctx, sqliteDSN) + if err != nil { + return nil, err + } + defer store.Close() + + rows, err := store.RouteStickyAudit().ListRecent(ctx, sqlite.RouteStickyAuditFilter{ + StickyKey: strings.TrimSpace(req.StickyKey), + StickyKeyType: strings.TrimSpace(req.StickyKeyType), + LogicalGroupID: strings.TrimSpace(req.LogicalGroupID), + PublicModel: strings.TrimSpace(req.PublicModel), + RouteID: strings.TrimSpace(req.RouteID), + Action: strings.TrimSpace(req.Action), + Limit: req.Limit, + }) + if err != nil { + return nil, err + } + return routeStickyAuditRowsToInfo(rows), nil + } +} + +func decodeRouteDecisionLogFilter(r *http.Request) 
(ListRouteDecisionLogsRequest, *httpError) { + limit, err := parseOptionalLimit(r.URL.Query().Get("limit")) + if err != nil { + return ListRouteDecisionLogsRequest{}, err + } + return ListRouteDecisionLogsRequest{ + RequestID: strings.TrimSpace(r.URL.Query().Get("request_id")), + LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")), + PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")), + SelectedRouteID: strings.TrimSpace(r.URL.Query().Get("selected_route_id")), + StickyKey: strings.TrimSpace(r.URL.Query().Get("sticky_key")), + Limit: limit, + }, nil +} + +func decodeRouteFailoverEventFilter(r *http.Request) (ListRouteFailoverEventsRequest, *httpError) { + limit, err := parseOptionalLimit(r.URL.Query().Get("limit")) + if err != nil { + return ListRouteFailoverEventsRequest{}, err + } + return ListRouteFailoverEventsRequest{ + RequestID: strings.TrimSpace(r.URL.Query().Get("request_id")), + LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")), + PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")), + FromRouteID: strings.TrimSpace(r.URL.Query().Get("from_route_id")), + ToRouteID: strings.TrimSpace(r.URL.Query().Get("to_route_id")), + Limit: limit, + }, nil +} + +func decodeRouteStickyAuditFilter(r *http.Request) (ListRouteStickyAuditRequest, *httpError) { + limit, err := parseOptionalLimit(r.URL.Query().Get("limit")) + if err != nil { + return ListRouteStickyAuditRequest{}, err + } + return ListRouteStickyAuditRequest{ + StickyKey: strings.TrimSpace(r.URL.Query().Get("sticky_key")), + StickyKeyType: strings.TrimSpace(r.URL.Query().Get("sticky_key_type")), + LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")), + PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")), + RouteID: strings.TrimSpace(r.URL.Query().Get("route_id")), + Action: strings.TrimSpace(r.URL.Query().Get("action")), + Limit: limit, + }, nil +} + +func parseOptionalLimit(raw string) (int, *httpError) { 
+ raw = strings.TrimSpace(raw) + if raw == "" { + return 0, nil + } + limit, err := strconv.Atoi(raw) + if err != nil || limit <= 0 { + return 0, &httpError{StatusCode: http.StatusBadRequest, Code: "bad_request", Message: "limit must be a positive integer"} + } + return limit, nil +} + +func routeDecisionLogRowToInfo(row sqlite.RouteDecisionLog) RouteDecisionLogInfo { + return RouteDecisionLogInfo{ + ID: row.ID, + RequestID: row.RequestID, + LogicalGroupID: row.LogicalGroupID, + PublicModel: row.PublicModel, + UserKey: row.UserKey, + ConversationKey: row.ConversationKey, + StickyKey: row.StickyKey, + StickyKeyType: row.StickyKeyType, + StickyHit: row.StickyHit, + SelectedRouteID: row.SelectedRouteID, + SelectedShadowGroupID: row.SelectedShadowGroupID, + FallbackUsed: row.FallbackUsed, + ErrorClass: row.ErrorClass, + UpstreamStatus: row.UpstreamStatus, + LatencyMS: row.LatencyMS, + CreatedAt: row.CreatedAt, + } +} + +func routeDecisionLogRowsToInfo(rows []sqlite.RouteDecisionLog) []RouteDecisionLogInfo { + result := make([]RouteDecisionLogInfo, 0, len(rows)) + for _, row := range rows { + result = append(result, routeDecisionLogRowToInfo(row)) + } + return result +} + +func routeFailoverEventRowToInfo(row sqlite.RouteFailoverEvent) RouteFailoverEventInfo { + return RouteFailoverEventInfo{ + ID: row.ID, + RequestID: row.RequestID, + LogicalGroupID: row.LogicalGroupID, + PublicModel: row.PublicModel, + FromRouteID: row.FromRouteID, + ToRouteID: row.ToRouteID, + Reason: row.Reason, + FailureCount: row.FailureCount, + CreatedAt: row.CreatedAt, + } +} + +func routeFailoverEventRowsToInfo(rows []sqlite.RouteFailoverEvent) []RouteFailoverEventInfo { + result := make([]RouteFailoverEventInfo, 0, len(rows)) + for _, row := range rows { + result = append(result, routeFailoverEventRowToInfo(row)) + } + return result +} + +func routeStickyAuditRowToInfo(row sqlite.RouteStickyAudit) RouteStickyAuditInfo { + return RouteStickyAuditInfo{ + ID: row.ID, + StickyKey: row.StickyKey, + 
StickyKeyType: row.StickyKeyType, + LogicalGroupID: row.LogicalGroupID, + PublicModel: row.PublicModel, + RouteID: row.RouteID, + Action: row.Action, + ExpiresAt: row.ExpiresAt, + CreatedAt: row.CreatedAt, + } +} + +func routeStickyAuditRowsToInfo(rows []sqlite.RouteStickyAudit) []RouteStickyAuditInfo { + result := make([]RouteStickyAuditInfo, 0, len(rows)) + for _, row := range rows { + result = append(result, routeStickyAuditRowToInfo(row)) + } + return result +} diff --git a/internal/app/route_logging_api_test.go b/internal/app/route_logging_api_test.go new file mode 100644 index 00000000..69a4f7e8 --- /dev/null +++ b/internal/app/route_logging_api_test.go @@ -0,0 +1,145 @@ +package app + +import ( + "context" + "net/http" + "path/filepath" + "testing" +) + +func TestAPIAppendRouteDecisionLogReturnsCreated(t *testing.T) { + handler := NewAPIHandler("secret-token", ActionSet{ + AppendRouteDecisionLog: func(_ context.Context, req AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) { + if req.RequestID != "req-1" { + t.Fatalf("RequestID = %q, want req-1", req.RequestID) + } + return RouteDecisionLogInfo{ + ID: 1, + RequestID: req.RequestID, + LogicalGroupID: req.LogicalGroupID, + PublicModel: req.PublicModel, + SelectedRouteID: req.SelectedRouteID, + SelectedShadowGroupID: req.SelectedShadowGroupID, + }, nil + }, + }) + + request := httptestRequest(t, http.MethodPost, "/api/routing/logs/decisions", map[string]any{ + "request_id": "req-1", + "logical_group_id": "gpt-shared", + "public_model": "gpt-5.4", + "selected_route_id": "asxs", + "selected_shadow_group_id": "gpt-shared__asxs", + "sync": true, + }, "secret-token") + response := httptestRecorder(handler, request) + assertStatusCode(t, response, http.StatusCreated) + assertJSONContains(t, response.Body().Bytes(), "decision_log.request_id", "req-1") +} + +func TestAPIListRouteDecisionLogsRejectsInvalidLimit(t *testing.T) { + handler := NewAPIHandler("secret-token", ActionSet{ + ListRouteDecisionLogs: 
func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error) { + t.Fatal("ListRouteDecisionLogs should not be called") + return nil, nil + }, + }) + + request := httptestRequest(t, http.MethodGet, "/api/routing/logs/decisions?limit=zero", nil, "secret-token") + response := httptestRecorder(handler, request) + assertStatusCode(t, response, http.StatusBadRequest) +} + +func TestNewActionSetRouteLoggingFlow(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "route-logging.db") + dsn := "file:" + filepath.ToSlash(dbPath) + "?_busy_timeout=5000" + actions := NewActionSet(dsn) + ctx := context.Background() + + decision, err := actions.AppendRouteDecisionLog(ctx, AppendRouteDecisionLogRequest{ + RequestID: "req-1", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + StickyKey: "sticky-1", + StickyKeyType: "conversation", + StickyHit: true, + SelectedRouteID: "asxs", + SelectedShadowGroupID: "gpt-shared__asxs", + UpstreamStatus: 200, + LatencyMS: 90, + Sync: true, + }) + if err != nil { + t.Fatalf("AppendRouteDecisionLog() error = %v", err) + } + if decision.SelectedRouteID != "asxs" { + t.Fatalf("AppendRouteDecisionLog() = %+v, want selected route asxs", decision) + } + + failover, err := actions.AppendRouteFailoverEvent(ctx, AppendRouteFailoverEventRequest{ + RequestID: "req-2", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + FromRouteID: "asxs", + ToRouteID: "codex2api", + Reason: "timeout", + FailureCount: 2, + Sync: true, + }) + if err != nil { + t.Fatalf("AppendRouteFailoverEvent() error = %v", err) + } + if failover.ToRouteID != "codex2api" { + t.Fatalf("AppendRouteFailoverEvent() = %+v, want to_route_id codex2api", failover) + } + + sticky, err := actions.AppendRouteStickyAudit(ctx, AppendRouteStickyAuditRequest{ + StickyKey: "sticky-1", + StickyKeyType: "conversation", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + RouteID: "asxs", + Action: "bind", + ExpiresAt: "2026-05-28T18:00:00Z", + Sync: true, + }) 
+ if err != nil { + t.Fatalf("AppendRouteStickyAudit() error = %v", err) + } + if sticky.Action != "bind" { + t.Fatalf("AppendRouteStickyAudit() = %+v, want action bind", sticky) + } + + decisions, err := actions.ListRouteDecisionLogs(ctx, ListRouteDecisionLogsRequest{ + RequestID: "req-1", + Limit: 10, + }) + if err != nil { + t.Fatalf("ListRouteDecisionLogs() error = %v", err) + } + if len(decisions) != 1 || decisions[0].StickyKey != "sticky-1" { + t.Fatalf("ListRouteDecisionLogs() = %+v, want sticky-1", decisions) + } + + failovers, err := actions.ListRouteFailoverEvents(ctx, ListRouteFailoverEventsRequest{ + RequestID: "req-2", + Limit: 10, + }) + if err != nil { + t.Fatalf("ListRouteFailoverEvents() error = %v", err) + } + if len(failovers) != 1 || failovers[0].FailureCount != 2 { + t.Fatalf("ListRouteFailoverEvents() = %+v, want failure_count 2", failovers) + } + + stickyAudits, err := actions.ListRouteStickyAudit(ctx, ListRouteStickyAuditRequest{ + StickyKey: "sticky-1", + Limit: 10, + }) + if err != nil { + t.Fatalf("ListRouteStickyAudit() error = %v", err) + } + if len(stickyAudits) != 1 || stickyAudits[0].RouteID != "asxs" { + t.Fatalf("ListRouteStickyAudit() = %+v, want route asxs", stickyAudits) + } +} diff --git a/internal/routing/logwriter.go b/internal/routing/logwriter.go new file mode 100644 index 00000000..a3dd3325 --- /dev/null +++ b/internal/routing/logwriter.go @@ -0,0 +1,319 @@ +package routing + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +const ( + defaultQueueSize = 128 + defaultFlushInterval = 2 * time.Second + defaultMaxBatchSize = 32 + defaultFallbackWriteTimeout = 3 * time.Second +) + +type RouteDecisionEvent struct { + RequestID string + LogicalGroupID string + PublicModel string + UserKey string + ConversationKey string + StickyKey string + StickyKeyType string + StickyHit bool + SelectedRouteID string + SelectedShadowGroupID string + FallbackUsed bool + ErrorClass 
string
+	UpstreamStatus        int
+	LatencyMS             int
+}
+
+// RouteFailoverEvent is the record accepted by AppendFailover. The SQLite
+// sink copies it field-for-field into sqlite.RouteFailoverEvent.
+type RouteFailoverEvent struct {
+	RequestID      string
+	LogicalGroupID string
+	PublicModel    string
+	FromRouteID    string
+	ToRouteID      string
+	Reason         string
+	FailureCount   int
+}
+
+// RouteStickyAuditEvent is the record accepted by AppendStickyAudit. The
+// SQLite sink copies it field-for-field into sqlite.RouteStickyAudit.
+type RouteStickyAuditEvent struct {
+	StickyKey      string
+	StickyKeyType  string
+	LogicalGroupID string
+	PublicModel    string
+	RouteID        string
+	Action         string
+	ExpiresAt      string
+}
+
+// RouteDecisionLogger is the caller-facing API for recording routing events.
+// AsyncLogWriter implements it.
+type RouteDecisionLogger interface {
+	AppendDecision(context.Context, RouteDecisionEvent) error
+	AppendFailover(context.Context, RouteFailoverEvent) error
+	AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
+	Flush(context.Context) error
+	Close() error
+}
+
+// routeLogSink is the synchronous destination AsyncLogWriter writes to.
+// It is called both from the writer's background goroutine and, on the
+// queue-full fallback path, from the goroutine that called Append*, so an
+// implementation must tolerate concurrent calls.
+type routeLogSink interface {
+	AppendDecision(context.Context, RouteDecisionEvent) error
+	AppendFailover(context.Context, RouteFailoverEvent) error
+	AppendStickyAudit(context.Context, RouteStickyAuditEvent) error
+	Close() error
+}
+
+// AsyncLogWriterOptions tunes an AsyncLogWriter. Any field that is zero or
+// negative is replaced by the package default in NewAsyncLogWriter.
+type AsyncLogWriterOptions struct {
+	// QueueSize is the capacity of the in-memory event queue.
+	QueueSize int
+	// FlushInterval is the period of the background flush ticker.
+	FlushInterval time.Duration
+	// MaxBatchSize is the number of buffered events that triggers a flush.
+	MaxBatchSize int
+	// FallbackWriteTimeout bounds the synchronous write performed when the
+	// queue is full.
+	FallbackWriteTimeout time.Duration
+}
+
+// queuedLogEvent carries one event through the queue. writeOne dispatches on
+// the first non-nil pointer and reports an error when all three are nil.
+type queuedLogEvent struct {
+	decision *RouteDecisionEvent
+	failover *RouteFailoverEvent
+	sticky   *RouteStickyAuditEvent
+}
+
+// AsyncLogWriter buffers routing events in a channel and writes them to a
+// routeLogSink from a single background goroutine (see loop).
+type AsyncLogWriter struct {
+	sink                 routeLogSink
+	queue                chan queuedLogEvent
+	flushRequests        chan chan error
+	closeCh              chan struct{} // closed by Close to ask loop to stop
+	closedCh             chan struct{} // closed by loop once it has returned
+	flushInterval        time.Duration
+	maxBatchSize         int
+	fallbackWriteTimeout time.Duration
+	once                 sync.Once
+}
+
+// Compile-time check that AsyncLogWriter satisfies RouteDecisionLogger.
+var _ RouteDecisionLogger = (*AsyncLogWriter)(nil)
+
+// NewSQLiteLogWriter opens the SQLite store at sqliteDSN and returns an
+// AsyncLogWriter that persists events into it. The store is closed by the
+// writer's Close method.
+func NewSQLiteLogWriter(ctx context.Context, sqliteDSN string, opts AsyncLogWriterOptions) (*AsyncLogWriter, error) {
+	store, err := sqlite.Open(ctx, sqliteDSN)
+	if err != nil {
+		return nil, err
+	}
+	return NewAsyncLogWriter(newSQLiteRouteLogSink(store), opts), nil
+}
+
+// NewAsyncLogWriter applies defaults to opts, builds the writer and starts its
+// background goroutine. The caller must call Close to stop that goroutine and
+// close the sink.
+func NewAsyncLogWriter(sink routeLogSink, opts AsyncLogWriterOptions) *AsyncLogWriter {
+	if opts.QueueSize <= 0 {
+		opts.QueueSize = defaultQueueSize
+	}
+	if opts.FlushInterval <= 0 {
+		opts.FlushInterval = defaultFlushInterval
+	}
+	if opts.MaxBatchSize <= 0 {
+		opts.MaxBatchSize = defaultMaxBatchSize
+	}
+	if opts.FallbackWriteTimeout <= 0 {
+		opts.FallbackWriteTimeout = defaultFallbackWriteTimeout
+	}
+
+	writer := &AsyncLogWriter{
+		sink:                 sink,
+		queue:                make(chan queuedLogEvent, opts.QueueSize),
+		flushRequests:        make(chan chan error),
+		closeCh:              make(chan struct{}),
+		closedCh:             make(chan struct{}),
+		flushInterval:        opts.FlushInterval,
+		maxBatchSize:         opts.MaxBatchSize,
+		fallbackWriteTimeout: opts.FallbackWriteTimeout,
+	}
+	go writer.loop()
+	return writer
+}
+
+// AppendDecision queues a route decision event; see enqueue for the error and
+// fallback behavior.
+func (w *AsyncLogWriter) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
+	return w.enqueue(ctx, queuedLogEvent{decision: &event})
+}
+
+// AppendFailover queues a failover event; see enqueue for the error and
+// fallback behavior.
+func (w *AsyncLogWriter) AppendFailover(ctx context.Context, event RouteFailoverEvent) error {
+	return w.enqueue(ctx, queuedLogEvent{failover: &event})
+}
+
+// AppendStickyAudit queues a sticky-audit event; see enqueue for the error and
+// fallback behavior.
+func (w *AsyncLogWriter) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error {
+	return w.enqueue(ctx, queuedLogEvent{sticky: &event})
+}
+
+// Flush asks the background loop to drain the queue and write everything
+// buffered so far. It returns the first sink error of that flush, ctx.Err()
+// if ctx ends first, and nil if the loop has already exited.
+func (w *AsyncLogWriter) Flush(ctx context.Context) error {
+	// Buffered so the loop never blocks on the reply if we stop waiting.
+	resp := make(chan error, 1)
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-w.closedCh:
+		return nil
+	case w.flushRequests <- resp:
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case err := <-resp:
+		return err
+	}
+}
+
+// Close stops the background loop, waits for its final flush and then closes
+// the sink. Only the first call does any work and reports the sink's Close
+// error; later calls return nil.
+func (w *AsyncLogWriter) Close() error {
+	var err error
+	w.once.Do(func() {
+		close(w.closeCh)
+		<-w.closedCh
+		err = w.sink.Close()
+	})
+	return err
+}
+
+// loop is the writer's background goroutine. It accumulates queued events
+// into a batch and flushes when the batch reaches maxBatchSize, when the
+// ticker fires, when Flush asks for it, and one last time when Close is
+// called. Only the Flush path hands the flush error back to a caller; on the
+// other paths flushBatch has already logged it.
+func (w *AsyncLogWriter) loop() {
+	ticker := time.NewTicker(w.flushInterval)
+	defer ticker.Stop()
+	defer close(w.closedCh)
+
+	batch := make([]queuedLogEvent, 0, w.maxBatchSize)
+	for {
+		select {
+		case item := <-w.queue:
+			batch = append(batch, item)
+			if len(batch) >= w.maxBatchSize {
+				w.flushBatch(batch)
+				batch = batch[:0]
+			}
+		case resp := <-w.flushRequests:
+			batch = w.drainQueue(batch)
+			err := w.flushBatch(batch)
+			batch = batch[:0]
+			resp <- err
+		case <-ticker.C:
+			batch = w.drainQueue(batch)
+			if len(batch) == 0 {
+				continue
+			}
+			w.flushBatch(batch)
+			batch = batch[:0]
+		case <-w.closeCh:
+			batch = w.drainQueue(batch)
+			w.flushBatch(batch)
+			return
+		}
+	}
+}
+
+// enqueue hands item to the background loop without blocking. It returns
+// ctx.Err() when ctx is already done and an error once Close has been called.
+// When the queue is full the event is written synchronously on the caller's
+// goroutine, bounded by fallbackWriteTimeout, and the sink's error is
+// returned.
+func (w *AsyncLogWriter) enqueue(ctx context.Context, item queuedLogEvent) error {
+	// Cancellation and shutdown are checked before the send on purpose: a
+	// select chooses at random among ready cases, so keeping them in the same
+	// select as the send would let a cancelled or closed writer still accept
+	// the event and report success.
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	// closeCh (Close was called) is checked instead of closedCh (loop has
+	// exited): an event queued after the loop's final drain would never be
+	// written. A narrow check-then-send window remains.
+	select {
+	case <-w.closeCh:
+		return fmt.Errorf("route log writer is closed")
+	default:
+	}
+
+	select {
+	case w.queue <- item:
+		return nil
+	default:
+		// Queue full. The write is detached from ctx so that a cancelled
+		// request does not abort it.
+		fallbackCtx, cancel := context.WithTimeout(context.Background(), w.fallbackWriteTimeout)
+		defer cancel()
+		return w.writeOne(fallbackCtx, item)
+	}
+}
+
+// flushBatch writes every event in batch to the sink. A failed write is
+// logged and does not stop the remaining writes; the first error is returned.
+func (w *AsyncLogWriter) flushBatch(batch []queuedLogEvent) error {
+	if len(batch) == 0 {
+		return nil
+	}
+
+	var firstErr error
+	for _, item := range batch {
+		if err := w.writeOne(context.Background(), item); err != nil {
+			if firstErr == nil {
+				firstErr = err
+			}
+			// Logged here because most callers in loop discard the result.
+			log.Printf("routing: flush route log event failed: %v", err)
+		}
+	}
+	return firstErr
+}
+
+// drainQueue appends every event currently in the queue to batch without
+// blocking and returns the extended slice.
+func (w *AsyncLogWriter) drainQueue(batch []queuedLogEvent) []queuedLogEvent {
+	for {
+		select {
+		case item := <-w.queue:
+			batch = append(batch, item)
+		default:
+			return batch
+		}
+	}
+}
+
+// writeOne sends item to the sink method matching its payload and returns an
+// error when item carries no payload.
+func (w *AsyncLogWriter) writeOne(ctx context.Context, item queuedLogEvent) error {
+	switch {
+	case item.decision != nil:
+		return w.sink.AppendDecision(ctx, *item.decision)
+	case item.failover != nil:
+		return w.sink.AppendFailover(ctx, *item.failover)
+	case item.sticky != nil:
+		return w.sink.AppendStickyAudit(ctx, *item.sticky)
+	default:
+		return fmt.Errorf("route log event payload is empty")
+	}
+}
+
+// sqliteRouteLogSink is the routeLogSink backed by a *sqlite.DB.
+type sqliteRouteLogSink struct {
+	store *sqlite.DB
+}
+
+// newSQLiteRouteLogSink wraps store; the sink's Close closes the store.
+func newSQLiteRouteLogSink(store *sqlite.DB) *sqliteRouteLogSink {
+	return &sqliteRouteLogSink{store: store}
+}
+
+// AppendDecision inserts event through the store's RouteDecisionLogs repo.
+func (s *sqliteRouteLogSink) AppendDecision(ctx context.Context, event RouteDecisionEvent) error {
+	_, err := s.store.RouteDecisionLogs().Create(ctx, sqlite.RouteDecisionLog{
+		RequestID:             event.RequestID,
+		LogicalGroupID:        event.LogicalGroupID,
+		PublicModel:           event.PublicModel,
+
UserKey: event.UserKey, + ConversationKey: event.ConversationKey, + StickyKey: event.StickyKey, + StickyKeyType: event.StickyKeyType, + StickyHit: event.StickyHit, + SelectedRouteID: event.SelectedRouteID, + SelectedShadowGroupID: event.SelectedShadowGroupID, + FallbackUsed: event.FallbackUsed, + ErrorClass: event.ErrorClass, + UpstreamStatus: event.UpstreamStatus, + LatencyMS: event.LatencyMS, + }) + return err +} + +func (s *sqliteRouteLogSink) AppendFailover(ctx context.Context, event RouteFailoverEvent) error { + _, err := s.store.RouteFailoverEvents().Create(ctx, sqlite.RouteFailoverEvent{ + RequestID: event.RequestID, + LogicalGroupID: event.LogicalGroupID, + PublicModel: event.PublicModel, + FromRouteID: event.FromRouteID, + ToRouteID: event.ToRouteID, + Reason: event.Reason, + FailureCount: event.FailureCount, + }) + return err +} + +func (s *sqliteRouteLogSink) AppendStickyAudit(ctx context.Context, event RouteStickyAuditEvent) error { + _, err := s.store.RouteStickyAudit().Create(ctx, sqlite.RouteStickyAudit{ + StickyKey: event.StickyKey, + StickyKeyType: event.StickyKeyType, + LogicalGroupID: event.LogicalGroupID, + PublicModel: event.PublicModel, + RouteID: event.RouteID, + Action: event.Action, + ExpiresAt: event.ExpiresAt, + }) + return err +} + +func (s *sqliteRouteLogSink) Close() error { + return s.store.Close() +} diff --git a/internal/routing/logwriter_test.go b/internal/routing/logwriter_test.go new file mode 100644 index 00000000..ccd6334b --- /dev/null +++ b/internal/routing/logwriter_test.go @@ -0,0 +1,203 @@ +package routing + +import ( + "context" + "errors" + "path/filepath" + "testing" + "time" + + "sub2api-cn-relay-manager/internal/store/sqlite" +) + +func TestSQLiteLogWriterAppendAndFlush(t *testing.T) { + t.Parallel() + + dbPath := filepath.Join(t.TempDir(), "route-logs.db") + dsn := "file:" + filepath.ToSlash(dbPath) + "?_busy_timeout=5000" + writer, err := NewSQLiteLogWriter(context.Background(), dsn, AsyncLogWriterOptions{ + 
QueueSize: 4, + FlushInterval: time.Hour, + MaxBatchSize: 8, + }) + if err != nil { + t.Fatalf("NewSQLiteLogWriter() error = %v", err) + } + defer writer.Close() + + if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{ + RequestID: "req-1", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + StickyKey: "sticky-1", + StickyKeyType: "conversation", + StickyHit: true, + SelectedRouteID: "asxs", + SelectedShadowGroupID: "gpt-shared__asxs", + UpstreamStatus: 200, + LatencyMS: 120, + }); err != nil { + t.Fatalf("AppendDecision() error = %v", err) + } + if err := writer.AppendFailover(context.Background(), RouteFailoverEvent{ + RequestID: "req-2", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + FromRouteID: "asxs", + ToRouteID: "codex2api", + Reason: "timeout", + FailureCount: 2, + }); err != nil { + t.Fatalf("AppendFailover() error = %v", err) + } + if err := writer.AppendStickyAudit(context.Background(), RouteStickyAuditEvent{ + StickyKey: "sticky-1", + StickyKeyType: "conversation", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + RouteID: "asxs", + Action: "bind", + ExpiresAt: "2026-05-28T18:00:00Z", + }); err != nil { + t.Fatalf("AppendStickyAudit() error = %v", err) + } + if err := writer.Flush(context.Background()); err != nil { + t.Fatalf("Flush() error = %v", err) + } + + store, err := sqlite.Open(context.Background(), dsn) + if err != nil { + t.Fatalf("sqlite.Open() error = %v", err) + } + defer store.Close() + + decisions, err := store.RouteDecisionLogs().ListRecent(context.Background(), sqlite.RouteDecisionLogFilter{Limit: 10}) + if err != nil { + t.Fatalf("RouteDecisionLogs().ListRecent() error = %v", err) + } + if len(decisions) != 1 || decisions[0].SelectedRouteID != "asxs" || !decisions[0].StickyHit { + t.Fatalf("decision logs = %+v", decisions) + } + + failovers, err := store.RouteFailoverEvents().ListRecent(context.Background(), sqlite.RouteFailoverEventFilter{Limit: 10}) + if err != nil { + 
t.Fatalf("RouteFailoverEvents().ListRecent() error = %v", err) + } + if len(failovers) != 1 || failovers[0].ToRouteID != "codex2api" { + t.Fatalf("failover events = %+v", failovers) + } + + stickyAudit, err := store.RouteStickyAudit().ListRecent(context.Background(), sqlite.RouteStickyAuditFilter{Limit: 10}) + if err != nil { + t.Fatalf("RouteStickyAudit().ListRecent() error = %v", err) + } + if len(stickyAudit) != 1 || stickyAudit[0].Action != "bind" { + t.Fatalf("sticky audit = %+v", stickyAudit) + } +} + +func TestAsyncLogWriterFlushFailureDoesNotCrash(t *testing.T) { + t.Parallel() + + sink := &failingRouteLogSink{} + writer := NewAsyncLogWriter(sink, AsyncLogWriterOptions{ + QueueSize: 2, + FlushInterval: time.Hour, + MaxBatchSize: 2, + }) + defer writer.Close() + + if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{ + RequestID: "req-fail", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + SelectedRouteID: "asxs", + SelectedShadowGroupID: "gpt-shared__asxs", + }); err != nil { + t.Fatalf("AppendDecision() error = %v", err) + } + if err := writer.Flush(context.Background()); err == nil { + t.Fatal("Flush() error = nil, want failure") + } + if sink.appendCalls == 0 { + t.Fatal("appendCalls = 0, want at least one attempted write") + } +} + +func TestAsyncLogWriterFlushesQueuedEvents(t *testing.T) { + t.Parallel() + + sink := &recordingRouteLogSink{} + writer := NewAsyncLogWriter(sink, AsyncLogWriterOptions{ + QueueSize: 1, + FlushInterval: time.Hour, + MaxBatchSize: 100, + }) + defer writer.Close() + + if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{ + RequestID: "req-a", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + SelectedRouteID: "asxs", + SelectedShadowGroupID: "gpt-shared__asxs", + }); err != nil { + t.Fatalf("AppendDecision() first error = %v", err) + } + if err := writer.AppendDecision(context.Background(), RouteDecisionEvent{ + RequestID: "req-b", + LogicalGroupID: "gpt-shared", + 
PublicModel: "gpt-5.4", + SelectedRouteID: "codex2api", + SelectedShadowGroupID: "gpt-shared__codex2api", + }); err != nil { + t.Fatalf("AppendDecision() second error = %v", err) + } + if err := writer.Flush(context.Background()); err != nil { + t.Fatalf("Flush() error = %v", err) + } + + if len(sink.decisions) != 2 { + t.Fatalf("recorded decisions len = %d, want 2", len(sink.decisions)) + } +} + +type failingRouteLogSink struct { + appendCalls int +} + +func (s *failingRouteLogSink) AppendDecision(context.Context, RouteDecisionEvent) error { + s.appendCalls++ + return errors.New("boom") +} + +func (s *failingRouteLogSink) AppendFailover(context.Context, RouteFailoverEvent) error { + s.appendCalls++ + return errors.New("boom") +} + +func (s *failingRouteLogSink) AppendStickyAudit(context.Context, RouteStickyAuditEvent) error { + s.appendCalls++ + return errors.New("boom") +} + +func (s *failingRouteLogSink) Close() error { return nil } + +type recordingRouteLogSink struct { + decisions []RouteDecisionEvent +} + +func (s *recordingRouteLogSink) AppendDecision(_ context.Context, event RouteDecisionEvent) error { + s.decisions = append(s.decisions, event) + return nil +} + +func (s *recordingRouteLogSink) AppendFailover(context.Context, RouteFailoverEvent) error { + return nil +} + +func (s *recordingRouteLogSink) AppendStickyAudit(context.Context, RouteStickyAuditEvent) error { + return nil +} + +func (s *recordingRouteLogSink) Close() error { return nil } diff --git a/internal/store/migrations/0011_route_logging.sql b/internal/store/migrations/0011_route_logging.sql new file mode 100644 index 00000000..5bb7d394 --- /dev/null +++ b/internal/store/migrations/0011_route_logging.sql @@ -0,0 +1,52 @@ +CREATE TABLE route_decision_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + request_id TEXT NOT NULL, + logical_group_id TEXT NOT NULL, + public_model TEXT NOT NULL, + user_key TEXT NOT NULL DEFAULT '', + conversation_key TEXT NOT NULL DEFAULT '', + sticky_key TEXT NOT 
NULL DEFAULT '', + sticky_key_type TEXT NOT NULL DEFAULT '', + sticky_hit INTEGER NOT NULL DEFAULT 0, + selected_route_id TEXT NOT NULL, + selected_shadow_group_id TEXT NOT NULL, + fallback_used INTEGER NOT NULL DEFAULT 0, + error_class TEXT NOT NULL DEFAULT '', + upstream_status INTEGER NOT NULL DEFAULT 0, + latency_ms INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX idx_route_decision_logs_request_id ON route_decision_logs(request_id); +CREATE INDEX idx_route_decision_logs_group_model ON route_decision_logs(logical_group_id, public_model, id DESC); +CREATE INDEX idx_route_decision_logs_selected_route ON route_decision_logs(selected_route_id, id DESC); + +CREATE TABLE route_failover_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + request_id TEXT NOT NULL, + logical_group_id TEXT NOT NULL, + public_model TEXT NOT NULL, + from_route_id TEXT NOT NULL, + to_route_id TEXT NOT NULL, + reason TEXT NOT NULL, + failure_count INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX idx_route_failover_events_request_id ON route_failover_events(request_id); +CREATE INDEX idx_route_failover_events_group_model ON route_failover_events(logical_group_id, public_model, id DESC); + +CREATE TABLE route_sticky_audit ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sticky_key TEXT NOT NULL, + sticky_key_type TEXT NOT NULL, + logical_group_id TEXT NOT NULL, + public_model TEXT NOT NULL, + route_id TEXT NOT NULL, + action TEXT NOT NULL, + expires_at TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX idx_route_sticky_audit_sticky_key ON route_sticky_audit(sticky_key, id DESC); +CREATE INDEX idx_route_sticky_audit_group_model ON route_sticky_audit(logical_group_id, public_model, id DESC); diff --git a/internal/store/sqlite/db.go b/internal/store/sqlite/db.go index 619a461c..e92de045 100644 --- a/internal/store/sqlite/db.go +++ 
b/internal/store/sqlite/db.go @@ -27,6 +27,9 @@ type Queries struct { LogicalGroupModels *LogicalGroupModelsRepo LogicalGroupRoutes *LogicalGroupRoutesRepo LogicalGroupRouteModels *LogicalGroupRouteModelsRepo + RouteDecisionLogs *RouteDecisionLogsRepo + RouteFailoverEvents *RouteFailoverEventsRepo + RouteStickyAudit *RouteStickyAuditRepo ProviderDrafts *ProviderDraftsRepo ImportBatches *ImportBatchesRepo ImportBatchItems *ImportBatchItemsRepo @@ -112,6 +115,18 @@ func (db *DB) LogicalGroupRouteModels() *LogicalGroupRouteModelsRepo { return db.queries.LogicalGroupRouteModels } +func (db *DB) RouteDecisionLogs() *RouteDecisionLogsRepo { + return db.queries.RouteDecisionLogs +} + +func (db *DB) RouteFailoverEvents() *RouteFailoverEventsRepo { + return db.queries.RouteFailoverEvents +} + +func (db *DB) RouteStickyAudit() *RouteStickyAuditRepo { + return db.queries.RouteStickyAudit +} + func (db *DB) ProviderDrafts() *ProviderDraftsRepo { return db.queries.ProviderDrafts } @@ -188,6 +203,9 @@ func newQueries(db execQuerier) *Queries { LogicalGroupModels: newLogicalGroupModelsRepo(db), LogicalGroupRoutes: newLogicalGroupRoutesRepo(db), LogicalGroupRouteModels: newLogicalGroupRouteModelsRepo(db), + RouteDecisionLogs: newRouteDecisionLogsRepo(db), + RouteFailoverEvents: newRouteFailoverEventsRepo(db), + RouteStickyAudit: newRouteStickyAuditRepo(db), ProviderDrafts: newProviderDraftsRepo(db), ImportBatches: newImportBatchesRepo(db), ImportBatchItems: newImportBatchItemsRepo(db), diff --git a/internal/store/sqlite/db_test.go b/internal/store/sqlite/db_test.go index 77d4dd34..879ad416 100644 --- a/internal/store/sqlite/db_test.go +++ b/internal/store/sqlite/db_test.go @@ -111,6 +111,9 @@ func TestOpenAppliesLogicalRoutingTables(t *testing.T) { "logical_group_models", "logical_group_routes", "logical_group_route_models", + "route_decision_logs", + "route_failover_events", + "route_sticky_audit", } { found, err := tableExists(context.Background(), db, table) if err != nil { 
diff --git a/internal/store/sqlite/route_decision_logs_repo.go b/internal/store/sqlite/route_decision_logs_repo.go new file mode 100644 index 00000000..e1205b9c --- /dev/null +++ b/internal/store/sqlite/route_decision_logs_repo.go @@ -0,0 +1,208 @@ +package sqlite + +import ( + "context" + "fmt" + "strings" +) + +type RouteDecisionLog struct { + ID int64 + RequestID string + LogicalGroupID string + PublicModel string + UserKey string + ConversationKey string + StickyKey string + StickyKeyType string + StickyHit bool + SelectedRouteID string + SelectedShadowGroupID string + FallbackUsed bool + ErrorClass string + UpstreamStatus int + LatencyMS int + CreatedAt string +} + +type RouteDecisionLogFilter struct { + RequestID string + LogicalGroupID string + PublicModel string + SelectedRouteID string + StickyKey string + Limit int +} + +type RouteDecisionLogsRepo struct { + db execQuerier +} + +func newRouteDecisionLogsRepo(db execQuerier) *RouteDecisionLogsRepo { + return &RouteDecisionLogsRepo{db: db} +} + +func (r *RouteDecisionLogsRepo) Create(ctx context.Context, row RouteDecisionLog) (int64, error) { + row, err := normalizeRouteDecisionLog(row) + if err != nil { + return 0, err + } + + result, err := r.db.ExecContext( + ctx, + `INSERT INTO route_decision_logs ( + request_id, + logical_group_id, + public_model, + user_key, + conversation_key, + sticky_key, + sticky_key_type, + sticky_hit, + selected_route_id, + selected_shadow_group_id, + fallback_used, + error_class, + upstream_status, + latency_ms + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + row.RequestID, + row.LogicalGroupID, + row.PublicModel, + row.UserKey, + row.ConversationKey, + row.StickyKey, + row.StickyKeyType, + boolToSQLiteInt(row.StickyHit), + row.SelectedRouteID, + row.SelectedShadowGroupID, + boolToSQLiteInt(row.FallbackUsed), + row.ErrorClass, + row.UpstreamStatus, + row.LatencyMS, + ) + if err != nil { + return 0, fmt.Errorf("insert route decision log %q: %w", row.RequestID, err) + } 
+ + id, err := result.LastInsertId() + if err != nil { + return 0, fmt.Errorf("read inserted route decision log id for %q: %w", row.RequestID, err) + } + return id, nil +} + +func (r *RouteDecisionLogsRepo) ListRecent(ctx context.Context, filter RouteDecisionLogFilter) ([]RouteDecisionLog, error) { + clauses := make([]string, 0, 5) + args := make([]any, 0, 6) + + if requestID := strings.TrimSpace(filter.RequestID); requestID != "" { + clauses = append(clauses, "request_id = ?") + args = append(args, requestID) + } + if logicalGroupID := strings.TrimSpace(filter.LogicalGroupID); logicalGroupID != "" { + clauses = append(clauses, "logical_group_id = ?") + args = append(args, logicalGroupID) + } + if publicModel := strings.TrimSpace(filter.PublicModel); publicModel != "" { + clauses = append(clauses, "public_model = ?") + args = append(args, publicModel) + } + if selectedRouteID := strings.TrimSpace(filter.SelectedRouteID); selectedRouteID != "" { + clauses = append(clauses, "selected_route_id = ?") + args = append(args, selectedRouteID) + } + if stickyKey := strings.TrimSpace(filter.StickyKey); stickyKey != "" { + clauses = append(clauses, "sticky_key = ?") + args = append(args, stickyKey) + } + + query := `SELECT id, request_id, logical_group_id, public_model, user_key, conversation_key, sticky_key, sticky_key_type, sticky_hit, selected_route_id, selected_shadow_group_id, fallback_used, error_class, upstream_status, latency_ms, created_at + FROM route_decision_logs` + if len(clauses) > 0 { + query += " WHERE " + strings.Join(clauses, " AND ") + } + query += " ORDER BY id DESC LIMIT ?" + args = append(args, normalizeRouteLogListLimit(filter.Limit)) + + rows, err := r.db.QueryContext(ctx, query, args...) 
+ if err != nil { + return nil, fmt.Errorf("list route decision logs: %w", err) + } + defer rows.Close() + + items := make([]RouteDecisionLog, 0) + for rows.Next() { + var ( + item RouteDecisionLog + stickyHit int + fallbackUsed int + ) + if err := rows.Scan( + &item.ID, + &item.RequestID, + &item.LogicalGroupID, + &item.PublicModel, + &item.UserKey, + &item.ConversationKey, + &item.StickyKey, + &item.StickyKeyType, + &stickyHit, + &item.SelectedRouteID, + &item.SelectedShadowGroupID, + &fallbackUsed, + &item.ErrorClass, + &item.UpstreamStatus, + &item.LatencyMS, + &item.CreatedAt, + ); err != nil { + return nil, fmt.Errorf("scan route decision log: %w", err) + } + item.StickyHit = stickyHit != 0 + item.FallbackUsed = fallbackUsed != 0 + items = append(items, item) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate route decision logs: %w", err) + } + return items, nil +} + +func normalizeRouteDecisionLog(row RouteDecisionLog) (RouteDecisionLog, error) { + row.RequestID = strings.TrimSpace(row.RequestID) + row.LogicalGroupID = strings.TrimSpace(row.LogicalGroupID) + row.PublicModel = strings.TrimSpace(row.PublicModel) + row.UserKey = strings.TrimSpace(row.UserKey) + row.ConversationKey = strings.TrimSpace(row.ConversationKey) + row.StickyKey = strings.TrimSpace(row.StickyKey) + row.StickyKeyType = strings.TrimSpace(row.StickyKeyType) + row.SelectedRouteID = strings.TrimSpace(row.SelectedRouteID) + row.SelectedShadowGroupID = strings.TrimSpace(row.SelectedShadowGroupID) + row.ErrorClass = strings.TrimSpace(row.ErrorClass) + + switch { + case row.RequestID == "": + return RouteDecisionLog{}, fmt.Errorf("request_id is required") + case row.LogicalGroupID == "": + return RouteDecisionLog{}, fmt.Errorf("logical_group_id is required") + case row.PublicModel == "": + return RouteDecisionLog{}, fmt.Errorf("public_model is required") + case row.SelectedRouteID == "": + return RouteDecisionLog{}, fmt.Errorf("selected_route_id is required") + case 
row.SelectedShadowGroupID == "": + return RouteDecisionLog{}, fmt.Errorf("selected_shadow_group_id is required") + case row.UpstreamStatus < 0: + return RouteDecisionLog{}, fmt.Errorf("upstream_status must be >= 0") + case row.LatencyMS < 0: + return RouteDecisionLog{}, fmt.Errorf("latency_ms must be >= 0") + } + + return row, nil +} + +func boolToSQLiteInt(value bool) int { + if value { + return 1 + } + return 0 +} diff --git a/internal/store/sqlite/route_failover_events_repo.go b/internal/store/sqlite/route_failover_events_repo.go new file mode 100644 index 00000000..2eb49297 --- /dev/null +++ b/internal/store/sqlite/route_failover_events_repo.go @@ -0,0 +1,163 @@ +package sqlite + +import ( + "context" + "fmt" + "strings" +) + +type RouteFailoverEvent struct { + ID int64 + RequestID string + LogicalGroupID string + PublicModel string + FromRouteID string + ToRouteID string + Reason string + FailureCount int + CreatedAt string +} + +type RouteFailoverEventFilter struct { + RequestID string + LogicalGroupID string + PublicModel string + FromRouteID string + ToRouteID string + Limit int +} + +type RouteFailoverEventsRepo struct { + db execQuerier +} + +func newRouteFailoverEventsRepo(db execQuerier) *RouteFailoverEventsRepo { + return &RouteFailoverEventsRepo{db: db} +} + +func (r *RouteFailoverEventsRepo) Create(ctx context.Context, row RouteFailoverEvent) (int64, error) { + row, err := normalizeRouteFailoverEvent(row) + if err != nil { + return 0, err + } + + result, err := r.db.ExecContext( + ctx, + `INSERT INTO route_failover_events ( + request_id, + logical_group_id, + public_model, + from_route_id, + to_route_id, + reason, + failure_count + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, + row.RequestID, + row.LogicalGroupID, + row.PublicModel, + row.FromRouteID, + row.ToRouteID, + row.Reason, + row.FailureCount, + ) + if err != nil { + return 0, fmt.Errorf("insert route failover event %q: %w", row.RequestID, err) + } + + id, err := result.LastInsertId() + if err != nil { + 
return 0, fmt.Errorf("read inserted route failover event id for %q: %w", row.RequestID, err) + } + return id, nil +} + +func (r *RouteFailoverEventsRepo) ListRecent(ctx context.Context, filter RouteFailoverEventFilter) ([]RouteFailoverEvent, error) { + clauses := make([]string, 0, 5) + args := make([]any, 0, 6) + + if requestID := strings.TrimSpace(filter.RequestID); requestID != "" { + clauses = append(clauses, "request_id = ?") + args = append(args, requestID) + } + if logicalGroupID := strings.TrimSpace(filter.LogicalGroupID); logicalGroupID != "" { + clauses = append(clauses, "logical_group_id = ?") + args = append(args, logicalGroupID) + } + if publicModel := strings.TrimSpace(filter.PublicModel); publicModel != "" { + clauses = append(clauses, "public_model = ?") + args = append(args, publicModel) + } + if fromRouteID := strings.TrimSpace(filter.FromRouteID); fromRouteID != "" { + clauses = append(clauses, "from_route_id = ?") + args = append(args, fromRouteID) + } + if toRouteID := strings.TrimSpace(filter.ToRouteID); toRouteID != "" { + clauses = append(clauses, "to_route_id = ?") + args = append(args, toRouteID) + } + + query := `SELECT id, request_id, logical_group_id, public_model, from_route_id, to_route_id, reason, failure_count, created_at + FROM route_failover_events` + if len(clauses) > 0 { + query += " WHERE " + strings.Join(clauses, " AND ") + } + query += " ORDER BY id DESC LIMIT ?" + args = append(args, normalizeRouteLogListLimit(filter.Limit)) + + rows, err := r.db.QueryContext(ctx, query, args...) 
+ if err != nil { + return nil, fmt.Errorf("list route failover events: %w", err) + } + defer rows.Close() + + items := make([]RouteFailoverEvent, 0) + for rows.Next() { + var item RouteFailoverEvent + if err := rows.Scan( + &item.ID, + &item.RequestID, + &item.LogicalGroupID, + &item.PublicModel, + &item.FromRouteID, + &item.ToRouteID, + &item.Reason, + &item.FailureCount, + &item.CreatedAt, + ); err != nil { + return nil, fmt.Errorf("scan route failover event: %w", err) + } + items = append(items, item) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate route failover events: %w", err) + } + return items, nil +} + +func normalizeRouteFailoverEvent(row RouteFailoverEvent) (RouteFailoverEvent, error) { + row.RequestID = strings.TrimSpace(row.RequestID) + row.LogicalGroupID = strings.TrimSpace(row.LogicalGroupID) + row.PublicModel = strings.TrimSpace(row.PublicModel) + row.FromRouteID = strings.TrimSpace(row.FromRouteID) + row.ToRouteID = strings.TrimSpace(row.ToRouteID) + row.Reason = strings.TrimSpace(row.Reason) + + switch { + case row.RequestID == "": + return RouteFailoverEvent{}, fmt.Errorf("request_id is required") + case row.LogicalGroupID == "": + return RouteFailoverEvent{}, fmt.Errorf("logical_group_id is required") + case row.PublicModel == "": + return RouteFailoverEvent{}, fmt.Errorf("public_model is required") + case row.FromRouteID == "": + return RouteFailoverEvent{}, fmt.Errorf("from_route_id is required") + case row.ToRouteID == "": + return RouteFailoverEvent{}, fmt.Errorf("to_route_id is required") + case row.Reason == "": + return RouteFailoverEvent{}, fmt.Errorf("reason is required") + case row.FailureCount < 0: + return RouteFailoverEvent{}, fmt.Errorf("failure_count must be >= 0") + } + + return row, nil +} diff --git a/internal/store/sqlite/route_logging_helpers.go b/internal/store/sqlite/route_logging_helpers.go new file mode 100644 index 00000000..71dd0bf8 --- /dev/null +++ 
b/internal/store/sqlite/route_logging_helpers.go @@ -0,0 +1,17 @@ +package sqlite + +const ( + defaultRouteLogListLimit = 50 + maxRouteLogListLimit = 200 +) + +func normalizeRouteLogListLimit(limit int) int { + switch { + case limit <= 0: + return defaultRouteLogListLimit + case limit > maxRouteLogListLimit: + return maxRouteLogListLimit + default: + return limit + } +} diff --git a/internal/store/sqlite/route_logging_repos_test.go b/internal/store/sqlite/route_logging_repos_test.go new file mode 100644 index 00000000..fb34f342 --- /dev/null +++ b/internal/store/sqlite/route_logging_repos_test.go @@ -0,0 +1,129 @@ +package sqlite + +import ( + "context" + "testing" +) + +func TestRouteDecisionLogsRepoCreateAndListRecent(t *testing.T) { + t.Parallel() + + store := openTestDB(t) + ctx := context.Background() + + if _, err := store.RouteDecisionLogs().Create(ctx, RouteDecisionLog{ + RequestID: "req-1", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + UserKey: "user-a", + ConversationKey: "conv-a", + StickyKey: "sticky-a", + StickyKeyType: "conversation", + StickyHit: true, + SelectedRouteID: "asxs", + SelectedShadowGroupID: "gpt-shared__asxs", + FallbackUsed: false, + ErrorClass: "", + UpstreamStatus: 200, + LatencyMS: 123, + }); err != nil { + t.Fatalf("RouteDecisionLogs().Create() error = %v", err) + } + + logs, err := store.RouteDecisionLogs().ListRecent(ctx, RouteDecisionLogFilter{ + LogicalGroupID: "gpt-shared", + Limit: 10, + }) + if err != nil { + t.Fatalf("RouteDecisionLogs().ListRecent() error = %v", err) + } + if len(logs) != 1 { + t.Fatalf("RouteDecisionLogs().ListRecent() len = %d, want 1", len(logs)) + } + if logs[0].SelectedRouteID != "asxs" || !logs[0].StickyHit || logs[0].UpstreamStatus != 200 { + t.Fatalf("RouteDecisionLogs().ListRecent()[0] = %+v", logs[0]) + } +} + +func TestRouteFailoverEventsRepoCreateAndListRecent(t *testing.T) { + t.Parallel() + + store := openTestDB(t) + ctx := context.Background() + + if _, err := 
store.RouteFailoverEvents().Create(ctx, RouteFailoverEvent{ + RequestID: "req-2", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4", + FromRouteID: "asxs", + ToRouteID: "codex2api", + Reason: "upstream_5xx", + FailureCount: 2, + }); err != nil { + t.Fatalf("RouteFailoverEvents().Create() error = %v", err) + } + + items, err := store.RouteFailoverEvents().ListRecent(ctx, RouteFailoverEventFilter{ + RequestID: "req-2", + Limit: 10, + }) + if err != nil { + t.Fatalf("RouteFailoverEvents().ListRecent() error = %v", err) + } + if len(items) != 1 { + t.Fatalf("RouteFailoverEvents().ListRecent() len = %d, want 1", len(items)) + } + if items[0].ToRouteID != "codex2api" || items[0].FailureCount != 2 { + t.Fatalf("RouteFailoverEvents().ListRecent()[0] = %+v", items[0]) + } +} + +func TestRouteStickyAuditRepoCreateAndListRecent(t *testing.T) { + t.Parallel() + + store := openTestDB(t) + ctx := context.Background() + + if _, err := store.RouteStickyAudit().Create(ctx, RouteStickyAudit{ + StickyKey: "sticky-3", + StickyKeyType: "user_model", + LogicalGroupID: "gpt-shared", + PublicModel: "gpt-5.4-mini", + RouteID: "asxs", + Action: "bind", + ExpiresAt: "2026-05-28T18:00:00Z", + }); err != nil { + t.Fatalf("RouteStickyAudit().Create() error = %v", err) + } + + items, err := store.RouteStickyAudit().ListRecent(ctx, RouteStickyAuditFilter{ + StickyKey: "sticky-3", + Limit: 10, + }) + if err != nil { + t.Fatalf("RouteStickyAudit().ListRecent() error = %v", err) + } + if len(items) != 1 { + t.Fatalf("RouteStickyAudit().ListRecent() len = %d, want 1", len(items)) + } + if items[0].Action != "bind" || items[0].RouteID != "asxs" { + t.Fatalf("RouteStickyAudit().ListRecent()[0] = %+v", items[0]) + } +} + +func TestRouteLoggingReposRejectInvalidRows(t *testing.T) { + t.Parallel() + + store := openTestDB(t) + ctx := context.Background() + + if _, err := store.RouteDecisionLogs().Create(ctx, RouteDecisionLog{}); err == nil { + t.Fatal("RouteDecisionLogs().Create() error = nil, want 
validation error") + } + if _, err := store.RouteFailoverEvents().Create(ctx, RouteFailoverEvent{}); err == nil { + t.Fatal("RouteFailoverEvents().Create() error = nil, want validation error") + } + if _, err := store.RouteStickyAudit().Create(ctx, RouteStickyAudit{}); err == nil { + t.Fatal("RouteStickyAudit().Create() error = nil, want validation error") + } +} diff --git a/internal/store/sqlite/route_sticky_audit_repo.go b/internal/store/sqlite/route_sticky_audit_repo.go new file mode 100644 index 00000000..a42bacb3 --- /dev/null +++ b/internal/store/sqlite/route_sticky_audit_repo.go @@ -0,0 +1,167 @@ +package sqlite + +import ( + "context" + "fmt" + "strings" +) + +type RouteStickyAudit struct { + ID int64 + StickyKey string + StickyKeyType string + LogicalGroupID string + PublicModel string + RouteID string + Action string + ExpiresAt string + CreatedAt string +} + +type RouteStickyAuditFilter struct { + StickyKey string + StickyKeyType string + LogicalGroupID string + PublicModel string + RouteID string + Action string + Limit int +} + +type RouteStickyAuditRepo struct { + db execQuerier +} + +func newRouteStickyAuditRepo(db execQuerier) *RouteStickyAuditRepo { + return &RouteStickyAuditRepo{db: db} +} + +func (r *RouteStickyAuditRepo) Create(ctx context.Context, row RouteStickyAudit) (int64, error) { + row, err := normalizeRouteStickyAudit(row) + if err != nil { + return 0, err + } + + result, err := r.db.ExecContext( + ctx, + `INSERT INTO route_sticky_audit ( + sticky_key, + sticky_key_type, + logical_group_id, + public_model, + route_id, + action, + expires_at + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, + row.StickyKey, + row.StickyKeyType, + row.LogicalGroupID, + row.PublicModel, + row.RouteID, + row.Action, + row.ExpiresAt, + ) + if err != nil { + return 0, fmt.Errorf("insert route sticky audit %q: %w", row.StickyKey, err) + } + + id, err := result.LastInsertId() + if err != nil { + return 0, fmt.Errorf("read inserted route sticky audit id for %q: %w", 
row.StickyKey, err) + } + return id, nil +} + +func (r *RouteStickyAuditRepo) ListRecent(ctx context.Context, filter RouteStickyAuditFilter) ([]RouteStickyAudit, error) { + clauses := make([]string, 0, 6) + args := make([]any, 0, 7) + + if stickyKey := strings.TrimSpace(filter.StickyKey); stickyKey != "" { + clauses = append(clauses, "sticky_key = ?") + args = append(args, stickyKey) + } + if stickyKeyType := strings.TrimSpace(filter.StickyKeyType); stickyKeyType != "" { + clauses = append(clauses, "sticky_key_type = ?") + args = append(args, stickyKeyType) + } + if logicalGroupID := strings.TrimSpace(filter.LogicalGroupID); logicalGroupID != "" { + clauses = append(clauses, "logical_group_id = ?") + args = append(args, logicalGroupID) + } + if publicModel := strings.TrimSpace(filter.PublicModel); publicModel != "" { + clauses = append(clauses, "public_model = ?") + args = append(args, publicModel) + } + if routeID := strings.TrimSpace(filter.RouteID); routeID != "" { + clauses = append(clauses, "route_id = ?") + args = append(args, routeID) + } + if action := strings.TrimSpace(filter.Action); action != "" { + clauses = append(clauses, "action = ?") + args = append(args, action) + } + + query := `SELECT id, sticky_key, sticky_key_type, logical_group_id, public_model, route_id, action, expires_at, created_at + FROM route_sticky_audit` + if len(clauses) > 0 { + query += " WHERE " + strings.Join(clauses, " AND ") + } + query += " ORDER BY id DESC LIMIT ?" + args = append(args, normalizeRouteLogListLimit(filter.Limit)) + + rows, err := r.db.QueryContext(ctx, query, args...) 
+ if err != nil { + return nil, fmt.Errorf("list route sticky audit: %w", err) + } + defer rows.Close() + + items := make([]RouteStickyAudit, 0) + for rows.Next() { + var item RouteStickyAudit + if err := rows.Scan( + &item.ID, + &item.StickyKey, + &item.StickyKeyType, + &item.LogicalGroupID, + &item.PublicModel, + &item.RouteID, + &item.Action, + &item.ExpiresAt, + &item.CreatedAt, + ); err != nil { + return nil, fmt.Errorf("scan route sticky audit: %w", err) + } + items = append(items, item) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate route sticky audit: %w", err) + } + return items, nil +} + +func normalizeRouteStickyAudit(row RouteStickyAudit) (RouteStickyAudit, error) { + row.StickyKey = strings.TrimSpace(row.StickyKey) + row.StickyKeyType = strings.TrimSpace(row.StickyKeyType) + row.LogicalGroupID = strings.TrimSpace(row.LogicalGroupID) + row.PublicModel = strings.TrimSpace(row.PublicModel) + row.RouteID = strings.TrimSpace(row.RouteID) + row.Action = strings.TrimSpace(row.Action) + row.ExpiresAt = strings.TrimSpace(row.ExpiresAt) + + switch { + case row.StickyKey == "": + return RouteStickyAudit{}, fmt.Errorf("sticky_key is required") + case row.StickyKeyType == "": + return RouteStickyAudit{}, fmt.Errorf("sticky_key_type is required") + case row.LogicalGroupID == "": + return RouteStickyAudit{}, fmt.Errorf("logical_group_id is required") + case row.PublicModel == "": + return RouteStickyAudit{}, fmt.Errorf("public_model is required") + case row.RouteID == "": + return RouteStickyAudit{}, fmt.Errorf("route_id is required") + case row.Action == "": + return RouteStickyAudit{}, fmt.Errorf("action is required") + } + + return row, nil +} diff --git a/tests/integration/store_init_test.go b/tests/integration/store_init_test.go index ee74b2c0..d2c83852 100644 --- a/tests/integration/store_init_test.go +++ b/tests/integration/store_init_test.go @@ -141,6 +141,9 @@ func TestStoreAppliesLatestMigration(t *testing.T) { 
"logical_group_models", "logical_group_routes", "logical_group_route_models", + "route_decision_logs", + "route_failover_events", + "route_sticky_audit", } { if !tableExists(t, store.SQLDB(), table) { t.Fatalf("table %q does not exist after latest migration", table) @@ -224,6 +227,51 @@ func TestStoreAppliesLatestMigration(t *testing.T) { t.Fatalf("column %q missing from logical_group_route_models", column) } } + + for _, column := range []string{ + "request_id", + "logical_group_id", + "public_model", + "sticky_key", + "sticky_hit", + "selected_route_id", + "selected_shadow_group_id", + "fallback_used", + "upstream_status", + "latency_ms", + } { + if !tableColumnExists(t, store.SQLDB(), "route_decision_logs", column) { + t.Fatalf("column %q missing from route_decision_logs", column) + } + } + + for _, column := range []string{ + "request_id", + "logical_group_id", + "public_model", + "from_route_id", + "to_route_id", + "reason", + "failure_count", + } { + if !tableColumnExists(t, store.SQLDB(), "route_failover_events", column) { + t.Fatalf("column %q missing from route_failover_events", column) + } + } + + for _, column := range []string{ + "sticky_key", + "sticky_key_type", + "logical_group_id", + "public_model", + "route_id", + "action", + "expires_at", + } { + if !tableColumnExists(t, store.SQLDB(), "route_sticky_audit", column) { + t.Fatalf("column %q missing from route_sticky_audit", column) + } + } } func TestStoreInitEnforcesLogicalRoutingConstraints(t *testing.T) {