Files
sub2api-cn-relay-manager/internal/app/route_logging_api.go
2026-05-28 21:24:05 +08:00

637 lines
22 KiB
Go

package app
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"sub2api-cn-relay-manager/internal/routing"
"sub2api-cn-relay-manager/internal/store/sqlite"
)
type AppendRouteDecisionLogRequest struct {
RequestID string `json:"request_id"`
LogicalGroupID string `json:"logical_group_id"`
PublicModel string `json:"public_model"`
UserKey string `json:"user_key,omitempty"`
ConversationKey string `json:"conversation_key,omitempty"`
StickyKey string `json:"sticky_key,omitempty"`
StickyKeyType string `json:"sticky_key_type,omitempty"`
StickyHit bool `json:"sticky_hit,omitempty"`
SelectedRouteID string `json:"selected_route_id"`
SelectedShadowGroupID string `json:"selected_shadow_group_id"`
FallbackUsed bool `json:"fallback_used,omitempty"`
ErrorClass string `json:"error_class,omitempty"`
UpstreamStatus int `json:"upstream_status,omitempty"`
LatencyMS int `json:"latency_ms,omitempty"`
Sync bool `json:"sync,omitempty"`
}
type ListRouteDecisionLogsRequest struct {
RequestID string
LogicalGroupID string
PublicModel string
SelectedRouteID string
StickyKey string
Limit int
}
type RouteDecisionLogInfo struct {
ID int64 `json:"id"`
RequestID string `json:"request_id"`
LogicalGroupID string `json:"logical_group_id"`
PublicModel string `json:"public_model"`
UserKey string `json:"user_key,omitempty"`
ConversationKey string `json:"conversation_key,omitempty"`
StickyKey string `json:"sticky_key,omitempty"`
StickyKeyType string `json:"sticky_key_type,omitempty"`
StickyHit bool `json:"sticky_hit"`
SelectedRouteID string `json:"selected_route_id"`
SelectedShadowGroupID string `json:"selected_shadow_group_id"`
FallbackUsed bool `json:"fallback_used"`
ErrorClass string `json:"error_class,omitempty"`
UpstreamStatus int `json:"upstream_status,omitempty"`
LatencyMS int `json:"latency_ms,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
}
type AppendRouteFailoverEventRequest struct {
RequestID string `json:"request_id"`
LogicalGroupID string `json:"logical_group_id"`
PublicModel string `json:"public_model"`
FromRouteID string `json:"from_route_id"`
ToRouteID string `json:"to_route_id"`
Reason string `json:"reason"`
FailureCount int `json:"failure_count,omitempty"`
Sync bool `json:"sync,omitempty"`
}
type ListRouteFailoverEventsRequest struct {
RequestID string
LogicalGroupID string
PublicModel string
FromRouteID string
ToRouteID string
Limit int
}
type RouteFailoverEventInfo struct {
ID int64 `json:"id"`
RequestID string `json:"request_id"`
LogicalGroupID string `json:"logical_group_id"`
PublicModel string `json:"public_model"`
FromRouteID string `json:"from_route_id"`
ToRouteID string `json:"to_route_id"`
Reason string `json:"reason"`
FailureCount int `json:"failure_count"`
CreatedAt string `json:"created_at,omitempty"`
}
type AppendRouteStickyAuditRequest struct {
StickyKey string `json:"sticky_key"`
StickyKeyType string `json:"sticky_key_type"`
LogicalGroupID string `json:"logical_group_id"`
PublicModel string `json:"public_model"`
RouteID string `json:"route_id"`
Action string `json:"action"`
ExpiresAt string `json:"expires_at,omitempty"`
Sync bool `json:"sync,omitempty"`
}
type ListRouteStickyAuditRequest struct {
StickyKey string
StickyKeyType string
LogicalGroupID string
PublicModel string
RouteID string
Action string
Limit int
}
type RouteStickyAuditInfo struct {
ID int64 `json:"id"`
StickyKey string `json:"sticky_key"`
StickyKeyType string `json:"sticky_key_type"`
LogicalGroupID string `json:"logical_group_id"`
PublicModel string `json:"public_model"`
RouteID string `json:"route_id"`
Action string `json:"action"`
ExpiresAt string `json:"expires_at,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
}
func handleAppendRouteDecisionLog(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-decision-log action is not configured"})
return
}
var req AppendRouteDecisionLogRequest
if err := decodeJSON(r, &req); err != nil {
writeHTTPError(w, err)
return
}
item, err := fn(r.Context(), req)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
writeJSON(w, http.StatusCreated, map[string]any{"decision_log": item})
}
func handleListRouteDecisionLogs(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-decision-logs action is not configured"})
return
}
req, err := decodeRouteDecisionLogFilter(r)
if err != nil {
writeHTTPError(w, err)
return
}
items, actionErr := fn(r.Context(), req)
if actionErr != nil {
writeHTTPError(w, classifyError(actionErr))
return
}
writeJSON(w, http.StatusOK, map[string]any{"decision_logs": items})
}
func handleAppendRouteFailoverEvent(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-failover-event action is not configured"})
return
}
var req AppendRouteFailoverEventRequest
if err := decodeJSON(r, &req); err != nil {
writeHTTPError(w, err)
return
}
item, err := fn(r.Context(), req)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
writeJSON(w, http.StatusCreated, map[string]any{"failover_event": item})
}
func handleListRouteFailoverEvents(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-failover-events action is not configured"})
return
}
req, err := decodeRouteFailoverEventFilter(r)
if err != nil {
writeHTTPError(w, err)
return
}
items, actionErr := fn(r.Context(), req)
if actionErr != nil {
writeHTTPError(w, classifyError(actionErr))
return
}
writeJSON(w, http.StatusOK, map[string]any{"failover_events": items})
}
func handleAppendRouteStickyAudit(w http.ResponseWriter, r *http.Request, fn func(context.Context, AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "append-route-sticky-audit action is not configured"})
return
}
var req AppendRouteStickyAuditRequest
if err := decodeJSON(r, &req); err != nil {
writeHTTPError(w, err)
return
}
item, err := fn(r.Context(), req)
if err != nil {
writeHTTPError(w, classifyError(err))
return
}
writeJSON(w, http.StatusCreated, map[string]any{"sticky_audit": item})
}
func handleListRouteStickyAudit(w http.ResponseWriter, r *http.Request, fn func(context.Context, ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error)) {
if fn == nil {
writeHTTPError(w, &httpError{StatusCode: http.StatusInternalServerError, Code: "server_misconfigured", Message: "list-route-sticky-audit action is not configured"})
return
}
req, err := decodeRouteStickyAuditFilter(r)
if err != nil {
writeHTTPError(w, err)
return
}
items, actionErr := fn(r.Context(), req)
if actionErr != nil {
writeHTTPError(w, classifyError(actionErr))
return
}
writeJSON(w, http.StatusOK, map[string]any{"sticky_audits": items})
}
type lazyRouteLogWriter struct {
sqliteDSN string
mu sync.Mutex
writer *routing.AsyncLogWriter
}
func newLazyRouteLogWriter(sqliteDSN string) *lazyRouteLogWriter {
return &lazyRouteLogWriter{sqliteDSN: sqliteDSN}
}
func (l *lazyRouteLogWriter) get(ctx context.Context) (*routing.AsyncLogWriter, error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.writer != nil {
return l.writer, nil
}
writer, err := routing.NewSQLiteLogWriter(ctx, l.sqliteDSN, routing.AsyncLogWriterOptions{})
if err != nil {
return nil, err
}
l.writer = writer
return l.writer, nil
}
func buildAppendRouteDecisionLogAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) {
return func(ctx context.Context, req AppendRouteDecisionLogRequest) (RouteDecisionLogInfo, error) {
writer, err := writerSource.get(ctx)
if err != nil {
return RouteDecisionLogInfo{}, err
}
event := routing.RouteDecisionEvent{
RequestID: strings.TrimSpace(req.RequestID),
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
PublicModel: strings.TrimSpace(req.PublicModel),
UserKey: strings.TrimSpace(req.UserKey),
ConversationKey: strings.TrimSpace(req.ConversationKey),
StickyKey: strings.TrimSpace(req.StickyKey),
StickyKeyType: strings.TrimSpace(req.StickyKeyType),
StickyHit: req.StickyHit,
SelectedRouteID: strings.TrimSpace(req.SelectedRouteID),
SelectedShadowGroupID: strings.TrimSpace(req.SelectedShadowGroupID),
FallbackUsed: req.FallbackUsed,
ErrorClass: strings.TrimSpace(req.ErrorClass),
UpstreamStatus: req.UpstreamStatus,
LatencyMS: req.LatencyMS,
}
if err := writer.AppendDecision(ctx, event); err != nil {
return RouteDecisionLogInfo{}, err
}
if !req.Sync {
return routeDecisionLogRowToInfo(sqlite.RouteDecisionLog{
RequestID: event.RequestID,
LogicalGroupID: event.LogicalGroupID,
PublicModel: event.PublicModel,
UserKey: event.UserKey,
ConversationKey: event.ConversationKey,
StickyKey: event.StickyKey,
StickyKeyType: event.StickyKeyType,
StickyHit: event.StickyHit,
SelectedRouteID: event.SelectedRouteID,
SelectedShadowGroupID: event.SelectedShadowGroupID,
FallbackUsed: event.FallbackUsed,
ErrorClass: event.ErrorClass,
UpstreamStatus: event.UpstreamStatus,
LatencyMS: event.LatencyMS,
}), nil
}
if err := writer.Flush(ctx); err != nil {
return RouteDecisionLogInfo{}, err
}
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return RouteDecisionLogInfo{}, err
}
defer store.Close()
items, err := store.RouteDecisionLogs().ListRecent(ctx, sqlite.RouteDecisionLogFilter{
RequestID: event.RequestID,
Limit: 1,
})
if err != nil {
return RouteDecisionLogInfo{}, err
}
if len(items) == 0 {
return RouteDecisionLogInfo{}, fmt.Errorf("route decision log %q not found after append", event.RequestID)
}
return routeDecisionLogRowToInfo(items[0]), nil
}
}
func buildListRouteDecisionLogsAction(sqliteDSN string) func(context.Context, ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error) {
return func(ctx context.Context, req ListRouteDecisionLogsRequest) ([]RouteDecisionLogInfo, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return nil, err
}
defer store.Close()
rows, err := store.RouteDecisionLogs().ListRecent(ctx, sqlite.RouteDecisionLogFilter{
RequestID: strings.TrimSpace(req.RequestID),
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
PublicModel: strings.TrimSpace(req.PublicModel),
SelectedRouteID: strings.TrimSpace(req.SelectedRouteID),
StickyKey: strings.TrimSpace(req.StickyKey),
Limit: req.Limit,
})
if err != nil {
return nil, err
}
return routeDecisionLogRowsToInfo(rows), nil
}
}
func buildAppendRouteFailoverEventAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error) {
return func(ctx context.Context, req AppendRouteFailoverEventRequest) (RouteFailoverEventInfo, error) {
writer, err := writerSource.get(ctx)
if err != nil {
return RouteFailoverEventInfo{}, err
}
event := routing.RouteFailoverEvent{
RequestID: strings.TrimSpace(req.RequestID),
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
PublicModel: strings.TrimSpace(req.PublicModel),
FromRouteID: strings.TrimSpace(req.FromRouteID),
ToRouteID: strings.TrimSpace(req.ToRouteID),
Reason: strings.TrimSpace(req.Reason),
FailureCount: req.FailureCount,
}
if err := writer.AppendFailover(ctx, event); err != nil {
return RouteFailoverEventInfo{}, err
}
if !req.Sync {
return routeFailoverEventRowToInfo(sqlite.RouteFailoverEvent{
RequestID: event.RequestID,
LogicalGroupID: event.LogicalGroupID,
PublicModel: event.PublicModel,
FromRouteID: event.FromRouteID,
ToRouteID: event.ToRouteID,
Reason: event.Reason,
FailureCount: event.FailureCount,
}), nil
}
if err := writer.Flush(ctx); err != nil {
return RouteFailoverEventInfo{}, err
}
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return RouteFailoverEventInfo{}, err
}
defer store.Close()
items, err := store.RouteFailoverEvents().ListRecent(ctx, sqlite.RouteFailoverEventFilter{
RequestID: event.RequestID,
Limit: 1,
})
if err != nil {
return RouteFailoverEventInfo{}, err
}
if len(items) == 0 {
return RouteFailoverEventInfo{}, fmt.Errorf("route failover event %q not found after append", event.RequestID)
}
return routeFailoverEventRowToInfo(items[0]), nil
}
}
func buildListRouteFailoverEventsAction(sqliteDSN string) func(context.Context, ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error) {
return func(ctx context.Context, req ListRouteFailoverEventsRequest) ([]RouteFailoverEventInfo, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return nil, err
}
defer store.Close()
rows, err := store.RouteFailoverEvents().ListRecent(ctx, sqlite.RouteFailoverEventFilter{
RequestID: strings.TrimSpace(req.RequestID),
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
PublicModel: strings.TrimSpace(req.PublicModel),
FromRouteID: strings.TrimSpace(req.FromRouteID),
ToRouteID: strings.TrimSpace(req.ToRouteID),
Limit: req.Limit,
})
if err != nil {
return nil, err
}
return routeFailoverEventRowsToInfo(rows), nil
}
}
func buildAppendRouteStickyAuditAction(writerSource *lazyRouteLogWriter, sqliteDSN string) func(context.Context, AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error) {
return func(ctx context.Context, req AppendRouteStickyAuditRequest) (RouteStickyAuditInfo, error) {
writer, err := writerSource.get(ctx)
if err != nil {
return RouteStickyAuditInfo{}, err
}
event := routing.RouteStickyAuditEvent{
StickyKey: strings.TrimSpace(req.StickyKey),
StickyKeyType: strings.TrimSpace(req.StickyKeyType),
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
PublicModel: strings.TrimSpace(req.PublicModel),
RouteID: strings.TrimSpace(req.RouteID),
Action: strings.TrimSpace(req.Action),
ExpiresAt: strings.TrimSpace(req.ExpiresAt),
}
if err := writer.AppendStickyAudit(ctx, event); err != nil {
return RouteStickyAuditInfo{}, err
}
if !req.Sync {
return routeStickyAuditRowToInfo(sqlite.RouteStickyAudit{
StickyKey: event.StickyKey,
StickyKeyType: event.StickyKeyType,
LogicalGroupID: event.LogicalGroupID,
PublicModel: event.PublicModel,
RouteID: event.RouteID,
Action: event.Action,
ExpiresAt: event.ExpiresAt,
}), nil
}
if err := writer.Flush(ctx); err != nil {
return RouteStickyAuditInfo{}, err
}
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return RouteStickyAuditInfo{}, err
}
defer store.Close()
items, err := store.RouteStickyAudit().ListRecent(ctx, sqlite.RouteStickyAuditFilter{
StickyKey: event.StickyKey,
Action: event.Action,
Limit: 1,
})
if err != nil {
return RouteStickyAuditInfo{}, err
}
if len(items) == 0 {
return RouteStickyAuditInfo{}, fmt.Errorf("route sticky audit %q not found after append", event.StickyKey)
}
return routeStickyAuditRowToInfo(items[0]), nil
}
}
func buildListRouteStickyAuditAction(sqliteDSN string) func(context.Context, ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error) {
return func(ctx context.Context, req ListRouteStickyAuditRequest) ([]RouteStickyAuditInfo, error) {
store, err := sqlite.Open(ctx, sqliteDSN)
if err != nil {
return nil, err
}
defer store.Close()
rows, err := store.RouteStickyAudit().ListRecent(ctx, sqlite.RouteStickyAuditFilter{
StickyKey: strings.TrimSpace(req.StickyKey),
StickyKeyType: strings.TrimSpace(req.StickyKeyType),
LogicalGroupID: strings.TrimSpace(req.LogicalGroupID),
PublicModel: strings.TrimSpace(req.PublicModel),
RouteID: strings.TrimSpace(req.RouteID),
Action: strings.TrimSpace(req.Action),
Limit: req.Limit,
})
if err != nil {
return nil, err
}
return routeStickyAuditRowsToInfo(rows), nil
}
}
func decodeRouteDecisionLogFilter(r *http.Request) (ListRouteDecisionLogsRequest, *httpError) {
limit, err := parseOptionalLimit(r.URL.Query().Get("limit"))
if err != nil {
return ListRouteDecisionLogsRequest{}, err
}
return ListRouteDecisionLogsRequest{
RequestID: strings.TrimSpace(r.URL.Query().Get("request_id")),
LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")),
PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")),
SelectedRouteID: strings.TrimSpace(r.URL.Query().Get("selected_route_id")),
StickyKey: strings.TrimSpace(r.URL.Query().Get("sticky_key")),
Limit: limit,
}, nil
}
func decodeRouteFailoverEventFilter(r *http.Request) (ListRouteFailoverEventsRequest, *httpError) {
limit, err := parseOptionalLimit(r.URL.Query().Get("limit"))
if err != nil {
return ListRouteFailoverEventsRequest{}, err
}
return ListRouteFailoverEventsRequest{
RequestID: strings.TrimSpace(r.URL.Query().Get("request_id")),
LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")),
PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")),
FromRouteID: strings.TrimSpace(r.URL.Query().Get("from_route_id")),
ToRouteID: strings.TrimSpace(r.URL.Query().Get("to_route_id")),
Limit: limit,
}, nil
}
func decodeRouteStickyAuditFilter(r *http.Request) (ListRouteStickyAuditRequest, *httpError) {
limit, err := parseOptionalLimit(r.URL.Query().Get("limit"))
if err != nil {
return ListRouteStickyAuditRequest{}, err
}
return ListRouteStickyAuditRequest{
StickyKey: strings.TrimSpace(r.URL.Query().Get("sticky_key")),
StickyKeyType: strings.TrimSpace(r.URL.Query().Get("sticky_key_type")),
LogicalGroupID: strings.TrimSpace(r.URL.Query().Get("logical_group_id")),
PublicModel: strings.TrimSpace(r.URL.Query().Get("public_model")),
RouteID: strings.TrimSpace(r.URL.Query().Get("route_id")),
Action: strings.TrimSpace(r.URL.Query().Get("action")),
Limit: limit,
}, nil
}
func parseOptionalLimit(raw string) (int, *httpError) {
raw = strings.TrimSpace(raw)
if raw == "" {
return 0, nil
}
limit, err := strconv.Atoi(raw)
if err != nil || limit <= 0 {
return 0, &httpError{StatusCode: http.StatusBadRequest, Code: "bad_request", Message: "limit must be a positive integer"}
}
return limit, nil
}
func routeDecisionLogRowToInfo(row sqlite.RouteDecisionLog) RouteDecisionLogInfo {
return RouteDecisionLogInfo{
ID: row.ID,
RequestID: row.RequestID,
LogicalGroupID: row.LogicalGroupID,
PublicModel: row.PublicModel,
UserKey: row.UserKey,
ConversationKey: row.ConversationKey,
StickyKey: row.StickyKey,
StickyKeyType: row.StickyKeyType,
StickyHit: row.StickyHit,
SelectedRouteID: row.SelectedRouteID,
SelectedShadowGroupID: row.SelectedShadowGroupID,
FallbackUsed: row.FallbackUsed,
ErrorClass: row.ErrorClass,
UpstreamStatus: row.UpstreamStatus,
LatencyMS: row.LatencyMS,
CreatedAt: row.CreatedAt,
}
}
func routeDecisionLogRowsToInfo(rows []sqlite.RouteDecisionLog) []RouteDecisionLogInfo {
result := make([]RouteDecisionLogInfo, 0, len(rows))
for _, row := range rows {
result = append(result, routeDecisionLogRowToInfo(row))
}
return result
}
func routeFailoverEventRowToInfo(row sqlite.RouteFailoverEvent) RouteFailoverEventInfo {
return RouteFailoverEventInfo{
ID: row.ID,
RequestID: row.RequestID,
LogicalGroupID: row.LogicalGroupID,
PublicModel: row.PublicModel,
FromRouteID: row.FromRouteID,
ToRouteID: row.ToRouteID,
Reason: row.Reason,
FailureCount: row.FailureCount,
CreatedAt: row.CreatedAt,
}
}
func routeFailoverEventRowsToInfo(rows []sqlite.RouteFailoverEvent) []RouteFailoverEventInfo {
result := make([]RouteFailoverEventInfo, 0, len(rows))
for _, row := range rows {
result = append(result, routeFailoverEventRowToInfo(row))
}
return result
}
func routeStickyAuditRowToInfo(row sqlite.RouteStickyAudit) RouteStickyAuditInfo {
return RouteStickyAuditInfo{
ID: row.ID,
StickyKey: row.StickyKey,
StickyKeyType: row.StickyKeyType,
LogicalGroupID: row.LogicalGroupID,
PublicModel: row.PublicModel,
RouteID: row.RouteID,
Action: row.Action,
ExpiresAt: row.ExpiresAt,
CreatedAt: row.CreatedAt,
}
}
func routeStickyAuditRowsToInfo(rows []sqlite.RouteStickyAudit) []RouteStickyAuditInfo {
result := make([]RouteStickyAuditInfo, 0, len(rows))
for _, row := range rows {
result = append(result, routeStickyAuditRowToInfo(row))
}
return result
}