Files
llm-intelligence/cmd/server/main.go
phamnazage-jpg a8999abcb0 feat(runtime): harden daily pipeline audit and verification
Tighten real-ingestion success rules, separate scheduled reports from historical rebuilds, and persist source-level runtime audit across daily pipeline runs.

Also add the Phase 5 CI workflow contract plus verification updates and supporting docs so the full uncommitted change set can be validated together.
2026-05-14 16:17:39 +08:00

407 lines
12 KiB
Go

package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
_ "github.com/lib/pq"
)
// modelResponse is the JSON representation of one model as served by the
// /api/v1/models endpoint. Field order is the order of keys in the encoded
// object.
type modelResponse struct {
	// Identity: the model's external id, its display name, and the provider
	// name in its default and "CN" variants.
	ID         string `json:"id"`
	Name       string `json:"name"`
	Provider   string `json:"provider"`
	ProviderCN string `json:"providerCN"`

	// Capabilities.
	Modality      string `json:"modality"`
	ContextLength int    `json:"contextLength"`

	// Pricing. fetchModels fills the prices from the *_per_mtok pricing
	// columns and reports 0 when the database value is NULL.
	InputPrice  float64 `json:"inputPrice"`
	OutputPrice float64 `json:"outputPrice"`
	Currency    string  `json:"currency"`
	IsFree      bool    `json:"isFree"`

	// Data quality. fetchModels sets Stale when DataConfidence is "stale".
	Stale          bool   `json:"stale"`
	DataConfidence string `json:"dataConfidence"`
}
// subscriptionPlanResponse is the JSON representation of one subscription
// plan as served by the /api/v1/subscription-plans endpoint. Field order is
// the order of keys in the encoded object.
type subscriptionPlanResponse struct {
	// Plan identity.
	PlanFamily string `json:"planFamily"`
	PlanCode   string `json:"planCode"`
	PlanName   string `json:"planName"`
	Tier       string `json:"tier"`

	// Provider and operator names, each with a "CN" variant.
	Provider   string `json:"provider"`
	ProviderCN string `json:"providerCN"`
	Operator   string `json:"operator"`
	OperatorCN string `json:"operatorCN"`

	// Pricing.
	Currency  string  `json:"currency"`
	ListPrice float64 `json:"listPrice"`
	PriceUnit string  `json:"priceUnit"`

	// Quota and scope. ModelScope is decoded from the JSON text stored in
	// the plan's model_scope column.
	QuotaValue    int64    `json:"quotaValue"`
	QuotaUnit     string   `json:"quotaUnit"`
	ContextWindow int      `json:"contextWindow"`
	ModelScope    []string `json:"modelScope"`

	// Provenance. The two dates are pre-formatted strings produced by the
	// SQL query, not time.Time values.
	SourceURL     string `json:"sourceUrl"`
	PublishedAt   string `json:"publishedAt"`
	EffectiveDate string `json:"effectiveDate"`
}
// apiEnvelope is the top-level JSON object used by the /api/v1 JSON
// endpoints: the payload is nested under a single "data" key.
type apiEnvelope struct {
	// Data holds whatever value the handler wants to return.
	Data any `json:"data"`
}
// Data-access function signatures accepted by newMux. Passing them in as
// values lets the HTTP handlers run against substitutes for the SQL-backed
// implementations.
type (
	// modelFetcher returns the models to expose on /api/v1/models.
	modelFetcher func(ctx context.Context, db *sql.DB) ([]modelResponse, error)

	// subscriptionPlanFetcher returns the plans to expose on
	// /api/v1/subscription-plans.
	subscriptionPlanFetcher func(ctx context.Context, db *sql.DB) ([]subscriptionPlanResponse, error)

	// latestReportFetcher returns the report behind the
	// /api/v1/reports/latest routes. Handlers treat sql.ErrNoRows as
	// "no report available".
	latestReportFetcher func(ctx context.Context, db *sql.DB) (*latestReportResponse, error)
)
// latestReportResponse is the JSON representation of the most recent daily
// report as served by /api/v1/reports/latest. Field order is the order of
// keys in the encoded object.
type latestReportResponse struct {
	// Report metadata. ReportDate is formatted as YYYY-MM-DD by the query.
	ReportDate string `json:"reportDate"`
	Status     string `json:"status"`
	ModelCount int    `json:"modelCount"`
	SummaryMD  string `json:"summaryMD"`

	// Slash-separated artifact locations. MarkdownPath comes from the
	// database; the other three are derived from it by fetchLatestReport.
	MarkdownPath        string `json:"markdownPath"`
	HTMLPath            string `json:"htmlPath"`
	ArchiveMarkdownPath string `json:"archiveMarkdownPath"`
	ArchiveHTMLPath     string `json:"archiveHtmlPath"`

	// API routes that stream the artifacts.
	MarkdownURL string `json:"markdownUrl"`
	HTMLURL     string `json:"htmlUrl"`

	// UpdatedAt is a pre-formatted timestamp string; empty when the
	// database value is NULL.
	UpdatedAt string `json:"updatedAt"`
}
// main wires the HTTP API together and serves it until the listener fails.
//
// Environment:
//   - PORT: TCP port to listen on; defaults to "8080" when unset or empty.
//   - DATABASE_URL: connection string handed to the "postgres" driver. When
//     it is unset, or sql.Open rejects it, the server still starts with a nil
//     database handle and the database-backed routes answer 503.
func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	var db *sql.DB
	if databaseURL := os.Getenv("DATABASE_URL"); databaseURL != "" {
		conn, err := sql.Open("postgres", databaseURL)
		if err != nil {
			// Deliberately non-fatal: the process keeps serving and reports
			// the missing database through its handlers.
			log.Printf("database open failed: %v", err)
		} else {
			conn.SetConnMaxLifetime(5 * time.Minute)
			conn.SetMaxOpenConns(5)
			conn.SetMaxIdleConns(5)
			db = conn
		}
	}

	// http.ListenAndServe uses a server without any timeouts, which lets a
	// slow or stalled client pin a connection indefinitely. Configure the
	// server explicitly instead.
	srv := &http.Server{
		Addr:              ":" + port,
		Handler:           newMux(db, fetchModels, fetchSubscriptionPlans, fetchLatestReport),
		ReadHeaderTimeout: 10 * time.Second,
		ReadTimeout:       30 * time.Second,
		WriteTimeout:      60 * time.Second,
		IdleTimeout:       120 * time.Second,
	}

	log.Printf("server listening on :%s", port)
	if err := srv.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}
// newMux builds the router for the service.
//
// Routes:
//   - /health: pings the database and answers {"status":"ok"}.
//   - /api/v1/models: models wrapped in apiEnvelope.
//   - /api/v1/subscription-plans: plans wrapped in apiEnvelope.
//   - /api/v1/reports/latest: latest report metadata wrapped in apiEnvelope.
//   - /api/v1/reports/latest/html and /api/v1/reports/latest/markdown: the
//     report artifacts, delegated to serveLatestReportArtifact.
//
// db may be nil, in which case every route answers 503. The three fetch
// functions supply the data for their respective routes.
func newMux(db *sql.DB, fetchModelsFn modelFetcher, fetchPlansFn subscriptionPlanFetcher, fetchLatestReportFn latestReportFetcher) *http.ServeMux {
	// requireDB rejects the request with 503 when no database handle was
	// configured and otherwise hands it to next.
	requireDB := func(next http.HandlerFunc) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			if db == nil {
				http.Error(w, "database not configured", http.StatusServiceUnavailable)
				return
			}
			next(w, r)
		}
	}

	// artifact builds a handler that streams one rendering of the latest
	// report; serveLatestReportArtifact performs its own database check.
	artifact := func(kind string) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			serveLatestReportArtifact(w, r, db, fetchLatestReportFn, kind)
		}
	}

	router := http.NewServeMux()

	router.HandleFunc("/health", requireDB(func(w http.ResponseWriter, r *http.Request) {
		if pingErr := db.PingContext(r.Context()); pingErr != nil {
			http.Error(w, "database unavailable", http.StatusServiceUnavailable)
			return
		}
		writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
	}))

	router.HandleFunc("/api/v1/models", requireDB(func(w http.ResponseWriter, r *http.Request) {
		list, fetchErr := fetchModelsFn(r.Context(), db)
		if fetchErr != nil {
			http.Error(w, "query failed", http.StatusInternalServerError)
			log.Printf("fetch models failed: %v", fetchErr)
			return
		}
		writeJSON(w, http.StatusOK, apiEnvelope{Data: list})
	}))

	router.HandleFunc("/api/v1/subscription-plans", requireDB(func(w http.ResponseWriter, r *http.Request) {
		list, fetchErr := fetchPlansFn(r.Context(), db)
		if fetchErr != nil {
			http.Error(w, "query failed", http.StatusInternalServerError)
			log.Printf("fetch subscription plans failed: %v", fetchErr)
			return
		}
		writeJSON(w, http.StatusOK, apiEnvelope{Data: list})
	}))

	router.HandleFunc("/api/v1/reports/latest/html", artifact("html"))
	router.HandleFunc("/api/v1/reports/latest/markdown", artifact("markdown"))

	router.HandleFunc("/api/v1/reports/latest", requireDB(func(w http.ResponseWriter, r *http.Request) {
		latest, fetchErr := fetchLatestReportFn(r.Context(), db)
		switch {
		case fetchErr == sql.ErrNoRows:
			// No qualifying report row exists yet.
			http.Error(w, "latest report not found", http.StatusNotFound)
		case fetchErr != nil:
			http.Error(w, "query failed", http.StatusInternalServerError)
			log.Printf("fetch latest report failed: %v", fetchErr)
		default:
			writeJSON(w, http.StatusOK, apiEnvelope{Data: latest})
		}
	}))

	return router
}
// fetchModels loads every model whose deleted_at is NULL, newest id first,
// joined with its provider and with the most recent region_pricing row for
// that model (latest effective_date, ties broken by highest pricing id).
//
// The returned slice is never nil on success, so an empty table is encoded
// as JSON [] rather than null. On any failure the result is nil and the
// error is wrapped with the step that failed.
func fetchModels(ctx context.Context, db *sql.DB) ([]modelResponse, error) {
	rows, err := db.QueryContext(ctx, `
WITH latest_prices AS (
SELECT
rp.model_id,
rp.input_price_per_mtok,
rp.output_price_per_mtok,
rp.currency,
rp.is_free,
ROW_NUMBER() OVER (
PARTITION BY rp.model_id
ORDER BY rp.effective_date DESC NULLS LAST, rp.id DESC
) AS rn
FROM region_pricing rp
)
SELECT
m.external_id,
COALESCE(NULLIF(m.name, ''), m.external_id),
COALESCE(mp.name_cn, mp.name, split_part(m.external_id, '/', 1)),
COALESCE(mp.name, split_part(m.external_id, '/', 1)),
COALESCE(m.modality, 'text'),
COALESCE(m.context_length, 0),
lp.input_price_per_mtok,
lp.output_price_per_mtok,
COALESCE(lp.currency, 'USD'),
COALESCE(lp.is_free, m.is_free, false),
COALESCE(m.data_confidence, 'official')
FROM models m
LEFT JOIN model_provider mp ON mp.id = m.provider_id
LEFT JOIN latest_prices lp ON lp.model_id = m.id AND lp.rn = 1
WHERE m.deleted_at IS NULL
ORDER BY m.id DESC
`)
	if err != nil {
		return nil, fmt.Errorf("query models: %w", err)
	}
	defer rows.Close()

	// Non-nil on purpose: encoding/json renders a nil slice as null.
	models := []modelResponse{}
	for rows.Next() {
		var (
			model       modelResponse
			inputPrice  sql.NullFloat64
			outputPrice sql.NullFloat64
		)
		// Column order matters: the third column is the CN provider name and
		// the fourth is the default provider name.
		if err := rows.Scan(
			&model.ID,
			&model.Name,
			&model.ProviderCN,
			&model.Provider,
			&model.Modality,
			&model.ContextLength,
			&inputPrice,
			&outputPrice,
			&model.Currency,
			&model.IsFree,
			&model.DataConfidence,
		); err != nil {
			return nil, fmt.Errorf("scan model row: %w", err)
		}
		// Prices are NULL when the model has no pricing row; report 0 then.
		if inputPrice.Valid {
			model.InputPrice = inputPrice.Float64
		}
		if outputPrice.Valid {
			model.OutputPrice = outputPrice.Float64
		}
		model.Stale = model.DataConfidence == "stale"
		models = append(models, model)
	}
	// Do not hand back a partially populated list alongside an error.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate model rows: %w", err)
	}
	return models, nil
}
// fetchLatestReport returns the newest daily_report row (by report_date,
// then updated_at) that has a non-empty output_path, status 'generated', and
// is_official_daily either true or NULL.
//
// The stored output_path is treated as the markdown artifact; the HTML and
// archive locations are derived from it. The scan error is returned as-is, so
// callers see sql.ErrNoRows when no row qualifies.
func fetchLatestReport(ctx context.Context, db *sql.DB) (*latestReportResponse, error) {
	const latestReportQuery = `
SELECT
TO_CHAR(report_date, 'YYYY-MM-DD'),
status,
COALESCE(model_count, 0),
COALESCE(summary_md, ''),
COALESCE(output_path, ''),
COALESCE(TO_CHAR(updated_at, 'YYYY-MM-DD"T"HH24:MI:SS'), '')
FROM daily_report
WHERE output_path IS NOT NULL
AND output_path <> ''
AND status = 'generated'
AND COALESCE(is_official_daily, true) = true
ORDER BY report_date DESC, updated_at DESC
LIMIT 1
`
	var (
		out        latestReportResponse
		sourcePath string
	)
	row := db.QueryRowContext(ctx, latestReportQuery)
	if scanErr := row.Scan(
		&out.ReportDate,
		&out.Status,
		&out.ModelCount,
		&out.SummaryMD,
		&sourcePath,
		&out.UpdatedAt,
	); scanErr != nil {
		return nil, scanErr
	}

	// Everything below is derived from the stored path and the report date.
	htmlPath := deriveReportHTMLPath(sourcePath, out.ReportDate)
	out.MarkdownPath = filepath.ToSlash(sourcePath)
	out.HTMLPath = htmlPath
	out.ArchiveMarkdownPath = deriveReportArchivePath(sourcePath, out.ReportDate)
	out.ArchiveHTMLPath = deriveReportArchivePath(htmlPath, out.ReportDate)
	out.MarkdownURL = "/api/v1/reports/latest/markdown"
	out.HTMLURL = "/api/v1/reports/latest/html"
	return &out, nil
}
// serveLatestReportArtifact streams one artifact of the latest report.
//
// artifactType "html" selects the report's HTMLPath and is served as
// text/html; any other value selects MarkdownPath and is served as
// text/markdown.
//
// Responses:
//   - 503 when db is nil.
//   - 404 when the fetcher reports sql.ErrNoRows or returns no report, or when
//     the artifact path cannot be stat'ed or is a directory.
//   - 500 when the fetcher fails for any other reason.
func serveLatestReportArtifact(w http.ResponseWriter, r *http.Request, db *sql.DB, fetchLatestReportFn latestReportFetcher, artifactType string) {
	if db == nil {
		http.Error(w, "database not configured", http.StatusServiceUnavailable)
		return
	}

	report, err := fetchLatestReportFn(r.Context(), db)
	if err != nil {
		if err == sql.ErrNoRows {
			http.Error(w, "latest report not found", http.StatusNotFound)
			return
		}
		http.Error(w, "query failed", http.StatusInternalServerError)
		log.Printf("fetch latest report failed: %v", err)
		return
	}
	// The fetcher is injectable; guard against a (nil, nil) result instead of
	// dereferencing a nil report below.
	if report == nil {
		http.Error(w, "latest report not found", http.StatusNotFound)
		return
	}

	targetPath, contentType := report.MarkdownPath, "text/markdown; charset=utf-8"
	if artifactType == "html" {
		targetPath, contentType = report.HTMLPath, "text/html; charset=utf-8"
	}

	info, err := os.Stat(targetPath)
	if err != nil {
		// A missing file is an expected condition; anything else (for
		// example a permission problem) is worth surfacing in the log.
		if !os.IsNotExist(err) {
			log.Printf("stat report artifact %q failed: %v", targetPath, err)
		}
		http.Error(w, "report artifact not found", http.StatusNotFound)
		return
	}
	// http.ServeFile would render a listing for a directory; an artifact must
	// be a file.
	if info.IsDir() {
		http.Error(w, "report artifact not found", http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", contentType)
	http.ServeFile(w, r, targetPath)
}
// deriveReportHTMLPath maps a markdown report path to the location of its
// HTML rendering: an "html" directory next to the markdown file, same base
// name, ".html" extension.
//
// When markdownPath has no file name the name falls back to
// "daily_report_<reportDate>.md"; when it has no directory component the
// directory falls back to "reports/daily". The result uses forward slashes.
func deriveReportHTMLPath(markdownPath, reportDate string) string {
	dir, name := filepath.Dir(markdownPath), filepath.Base(markdownPath)

	switch name {
	case ".", "":
		name = fmt.Sprintf("daily_report_%s.md", reportDate)
	}
	switch dir {
	case ".", "":
		dir = "reports/daily"
	}

	stem := strings.TrimSuffix(name, filepath.Ext(name))
	return filepath.ToSlash(filepath.Join(dir, "html", stem+".html"))
}
// deriveReportArchivePath returns the archive location of a report file:
// "reports/daily/<YYYY>/<MM>/<file>", where the year and month are taken from
// byte positions 0-3 and 5-6 of reportDate (a "YYYY-MM-DD" string).
//
// When reportPath has no file name the name falls back to
// "daily_report_<reportDate>.md". When reportDate is too short to contain a
// year and month, the file is placed directly under "reports/daily" instead
// of panicking on the slice expression. The result uses forward slashes.
func deriveReportArchivePath(reportPath, reportDate string) string {
	reportFile := filepath.Base(reportPath)
	if reportFile == "." || reportFile == "" {
		reportFile = fmt.Sprintf("daily_report_%s.md", reportDate)
	}

	// "YYYY-MM" is the shortest prefix that supports both slices below.
	const minDateLen = len("YYYY-MM")
	if len(reportDate) < minDateLen {
		return filepath.ToSlash(filepath.Join("reports/daily", reportFile))
	}

	year, month := reportDate[:4], reportDate[5:7]
	return filepath.ToSlash(filepath.Join("reports/daily", year, month, reportFile))
}
// fetchSubscriptionPlans loads all subscription plans joined with their
// provider and (optionally) operator, ordered by list price and then plan
// name.
//
// The returned slice is never nil on success, and each plan's ModelScope is
// never nil, so empty values are encoded as JSON [] rather than null. A
// model_scope value that is not a valid JSON string array is logged and
// reported as an empty scope; it does not fail the whole request. On any
// other failure the result is nil and the error is wrapped with the step that
// failed.
func fetchSubscriptionPlans(ctx context.Context, db *sql.DB) ([]subscriptionPlanResponse, error) {
	rows, err := db.QueryContext(ctx, `
SELECT
sp.plan_family,
sp.plan_code,
sp.plan_name,
sp.tier,
COALESCE(mp.name, 'unknown') AS provider_name,
COALESCE(mp.name_cn, mp.name, 'unknown') AS provider_name_cn,
COALESCE(o.name, 'unknown') AS operator_name,
COALESCE(o.name_cn, o.name, 'unknown') AS operator_name_cn,
sp.currency,
sp.list_price,
sp.price_unit,
COALESCE(sp.quota_value, 0),
COALESCE(sp.quota_unit, ''),
COALESCE(sp.context_window, 0),
COALESCE(sp.model_scope, '[]'),
COALESCE(sp.source_url, ''),
COALESCE(to_char(sp.published_at, 'YYYY-MM-DD"T"HH24:MI:SS'), ''),
COALESCE(to_char(sp.effective_date, 'YYYY-MM-DD'), '')
FROM subscription_plan sp
JOIN model_provider mp ON mp.id = sp.provider_id
LEFT JOIN operator o ON o.id = sp.operator_id
ORDER BY sp.list_price ASC, sp.plan_name ASC
`)
	if err != nil {
		return nil, fmt.Errorf("query subscription plans: %w", err)
	}
	defer rows.Close()

	// Non-nil on purpose: encoding/json renders a nil slice as null.
	plans := []subscriptionPlanResponse{}
	for rows.Next() {
		var (
			plan          subscriptionPlanResponse
			modelScopeRaw string
		)
		if err := rows.Scan(
			&plan.PlanFamily,
			&plan.PlanCode,
			&plan.PlanName,
			&plan.Tier,
			&plan.Provider,
			&plan.ProviderCN,
			&plan.Operator,
			&plan.OperatorCN,
			&plan.Currency,
			&plan.ListPrice,
			&plan.PriceUnit,
			&plan.QuotaValue,
			&plan.QuotaUnit,
			&plan.ContextWindow,
			&modelScopeRaw,
			&plan.SourceURL,
			&plan.PublishedAt,
			&plan.EffectiveDate,
		); err != nil {
			return nil, fmt.Errorf("scan subscription plan row: %w", err)
		}

		// Best effort: a malformed scope must not hide the plan, but it
		// should leave a trace for whoever maintains the data.
		var scope []string
		if err := json.Unmarshal([]byte(modelScopeRaw), &scope); err != nil {
			log.Printf("subscription plan %q: invalid model_scope JSON: %v", plan.PlanCode, err)
			scope = nil
		}
		// Covers both the failure above and a stored JSON null, matching the
		// '[]' default the query applies to SQL NULL.
		if scope == nil {
			scope = []string{}
		}
		plan.ModelScope = scope

		plans = append(plans, plan)
	}
	// Do not hand back a partially populated list alongside an error.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate subscription plan rows: %w", err)
	}
	return plans, nil
}
// writeJSON sends value as a JSON response with the given status code and an
// application/json content type. The body is the encoded value followed by a
// newline.
//
// The value is encoded before anything is written to w. If encoding fails the
// client receives a clean 500 "encode failed" response instead of the
// requested status with an error message appended to it.
func writeJSON(w http.ResponseWriter, status int, value any) {
	payload, err := json.Marshal(value)
	if err != nil {
		log.Printf("encode response failed: %v", err)
		http.Error(w, "encode failed", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	// Trailing newline keeps the body identical to what json.Encoder emits.
	if _, err := w.Write(append(payload, '\n')); err != nil {
		// Headers are already on the wire; all that is left is to record it.
		log.Printf("write response failed: %v", err)
	}
}