354 lines
14 KiB
Go
354 lines
14 KiB
Go
package httpapi_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"supply-intelligence/internal/app"
|
|
"supply-intelligence/internal/domain"
|
|
)
|
|
|
|
func requireDockerForPostgresE2E(t *testing.T) {
|
|
t.Helper()
|
|
if _, err := exec.LookPath("docker"); err != nil {
|
|
t.Skip("docker not installed")
|
|
}
|
|
if _, err := exec.LookPath("pg_isready"); err != nil {
|
|
t.Skip("pg_isready not installed")
|
|
}
|
|
}
|
|
|
|
func freeTCPPort(t *testing.T) int {
|
|
t.Helper()
|
|
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
|
if err != nil {
|
|
t.Fatalf("allocate free tcp port: %v", err)
|
|
}
|
|
defer ln.Close()
|
|
addr, ok := ln.Addr().(*net.TCPAddr)
|
|
if !ok {
|
|
t.Fatalf("unexpected listener addr type: %T", ln.Addr())
|
|
}
|
|
return addr.Port
|
|
}
|
|
|
|
func waitForPostgresReady(t *testing.T, port int, user, dbName, containerName string) {
|
|
t.Helper()
|
|
deadline := time.Now().Add(45 * time.Second)
|
|
var lastOut string
|
|
for time.Now().Before(deadline) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
cmd := exec.CommandContext(ctx, "pg_isready", "-h", "127.0.0.1", "-p", strconv.Itoa(port), "-U", user, "-d", dbName)
|
|
out, err := cmd.CombinedOutput()
|
|
cancel()
|
|
lastOut = strings.TrimSpace(string(out))
|
|
if err == nil {
|
|
return
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
logs, _ := exec.Command("docker", "logs", containerName).CombinedOutput()
|
|
t.Fatalf("postgres container did not become ready on port %d within timeout; last pg_isready=%q logs=%s", port, lastOut, string(logs))
|
|
}
|
|
|
|
func newPostgresApplicationForE2E(t *testing.T) *app.Application {
|
|
t.Helper()
|
|
requireDockerForPostgresE2E(t)
|
|
_, currentFile, _, ok := runtime.Caller(0)
|
|
if !ok {
|
|
t.Fatal("resolve current test file")
|
|
}
|
|
projectRoot := filepath.Clean(filepath.Join(filepath.Dir(currentFile), "..", ".."))
|
|
migrationsDir := filepath.Join(projectRoot, "migrations")
|
|
|
|
hostPort := freeTCPPort(t)
|
|
containerName := fmt.Sprintf("supply-intelligence-e2e-%d", time.Now().UnixNano())
|
|
dbName := "supply_intelligence"
|
|
dbUser := "supply"
|
|
dbPassword := "supply123"
|
|
|
|
runArgs := []string{
|
|
"run", "-d",
|
|
"--name", containerName,
|
|
"-e", "POSTGRES_DB=" + dbName,
|
|
"-e", "POSTGRES_USER=" + dbUser,
|
|
"-e", "POSTGRES_PASSWORD=" + dbPassword,
|
|
"-p", fmt.Sprintf("127.0.0.1:%d:5432", hostPort),
|
|
"-v", migrationsDir + ":/docker-entrypoint-initdb.d:ro",
|
|
"postgres:16-alpine",
|
|
}
|
|
runCmd := exec.Command("docker", runArgs...)
|
|
runCmd.Dir = projectRoot
|
|
if out, err := runCmd.CombinedOutput(); err != nil {
|
|
t.Skipf("start isolated postgres container failed: %v output=%s", err, string(out))
|
|
}
|
|
t.Cleanup(func() {
|
|
rmCmd := exec.Command("docker", "rm", "-f", containerName)
|
|
rmCmd.Dir = projectRoot
|
|
_, _ = rmCmd.CombinedOutput()
|
|
})
|
|
|
|
waitForPostgresReady(t, hostPort, dbUser, dbName, containerName)
|
|
connString := fmt.Sprintf("postgres://%s:%s@127.0.0.1:%d/%s?sslmode=disable", dbUser, dbPassword, hostPort, dbName)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
t.Cleanup(cancel)
|
|
application, err := app.NewWithPostgres(ctx, connString)
|
|
if err != nil {
|
|
t.Fatalf("connect isolated postgres app: %v", err)
|
|
}
|
|
application.GatewayConsumerService.SetConsumer("gateway")
|
|
if application.GatewayConsumerService == nil {
|
|
t.Fatal("expected gateway consumer service")
|
|
}
|
|
t.Cleanup(application.Close)
|
|
return application
|
|
}
|
|
|
|
func TestPostgresE2EPublishConsumeAckAdmissionState(t *testing.T) {
|
|
application := newPostgresApplicationForE2E(t)
|
|
handler := application.Server.Routes()
|
|
|
|
model := fmt.Sprintf("gpt-4.1-e2e-%d", time.Now().UnixNano())
|
|
candidateID := fmt.Sprintf("cand-e2e-%d", time.Now().UnixNano())
|
|
eventID := fmt.Sprintf("evt-e2e-%d", time.Now().UnixNano())
|
|
|
|
application.Repo.UpsertSupplyAccount(context.Background(), domain.SupplyAccount{
|
|
AccountID: 8801,
|
|
Platform: "openai",
|
|
APIKey: "test-key",
|
|
ConsumerTag: "gateway",
|
|
Status: "active",
|
|
CreatedAt: time.Unix(90, 0).UTC(),
|
|
UpdatedAt: time.Unix(90, 0).UTC(),
|
|
})
|
|
application.Repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{
|
|
CandidateID: candidateID,
|
|
AccountID: 8801,
|
|
Platform: "openai",
|
|
Model: model,
|
|
Source: "admission",
|
|
Status: domain.DiscoveryCandidateStatusTestPassed,
|
|
DiscoveredAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(110, 0).UTC(),
|
|
Version: 2,
|
|
})
|
|
application.Repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{
|
|
Platform: "openai",
|
|
Model: model,
|
|
Status: "draft",
|
|
Source: "admission",
|
|
CreatedAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(110, 0).UTC(),
|
|
Version: 1,
|
|
})
|
|
|
|
publishReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/publish/package-event", bytes.NewBufferString(fmt.Sprintf(`{"event_id":"%s","platform":"openai","model":"%s","occurred_at":"2026-05-06T20:40:00Z"}`, eventID, model)))
|
|
publishRR := httptest.NewRecorder()
|
|
handler.ServeHTTP(publishRR, publishReq)
|
|
if publishRR.Code != http.StatusOK {
|
|
t.Fatalf("unexpected publish status: %d body=%s", publishRR.Code, publishRR.Body.String())
|
|
}
|
|
|
|
consumeReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/gateway/consume-once", bytes.NewBufferString(`{"consumer":"gateway"}`))
|
|
consumeRR := httptest.NewRecorder()
|
|
handler.ServeHTTP(consumeRR, consumeReq)
|
|
if consumeRR.Code != http.StatusOK {
|
|
t.Fatalf("unexpected consume status: %d body=%s", consumeRR.Code, consumeRR.Body.String())
|
|
}
|
|
var consumeBody struct {
|
|
Items []struct {
|
|
EventID string `json:"event_id"`
|
|
GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"`
|
|
Result domain.GatewayAckResult `json:"result"`
|
|
} `json:"items"`
|
|
}
|
|
if err := json.NewDecoder(consumeRR.Body).Decode(&consumeBody); err != nil {
|
|
t.Fatalf("decode consume response: %v", err)
|
|
}
|
|
if len(consumeBody.Items) != 1 {
|
|
t.Fatalf("expected one consumed item, got %+v", consumeBody.Items)
|
|
}
|
|
lastConsumed := consumeBody.Items[0]
|
|
if lastConsumed.EventID != eventID {
|
|
t.Fatalf("expected consumed event %s, got %+v", eventID, lastConsumed)
|
|
}
|
|
if lastConsumed.GatewaySyncStatus != domain.GatewaySyncStatusApplied || lastConsumed.Result != domain.GatewayAckResultApplied {
|
|
t.Fatalf("expected applied consume result, got %+v", lastConsumed)
|
|
}
|
|
|
|
stateReq := httptest.NewRequest(http.MethodGet, "/internal/supply-intelligence/models/openai/"+model+"/admission-state", nil)
|
|
stateRR := httptest.NewRecorder()
|
|
handler.ServeHTTP(stateRR, stateReq)
|
|
if stateRR.Code != http.StatusOK {
|
|
t.Fatalf("unexpected admission-state status after consume: %d body=%s", stateRR.Code, stateRR.Body.String())
|
|
}
|
|
var stateBody struct {
|
|
Candidate *domain.DiscoveryCandidate `json:"candidate"`
|
|
Package *domain.SupplyPackage `json:"package"`
|
|
GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"`
|
|
LastEvent *domain.PackageChangeEvent `json:"last_event"`
|
|
}
|
|
if err := json.NewDecoder(stateRR.Body).Decode(&stateBody); err != nil {
|
|
t.Fatalf("decode admission-state response: %v", err)
|
|
}
|
|
if stateBody.Candidate == nil || stateBody.Candidate.Status != domain.DiscoveryCandidateStatusPublished {
|
|
t.Fatalf("expected published candidate, got %+v", stateBody.Candidate)
|
|
}
|
|
if stateBody.Package == nil || stateBody.Package.Status != "active" {
|
|
t.Fatalf("expected active package, got %+v", stateBody.Package)
|
|
}
|
|
if stateBody.LastEvent == nil || stateBody.LastEvent.EventID != eventID {
|
|
t.Fatalf("expected latest event %s, got %+v", eventID, stateBody.LastEvent)
|
|
}
|
|
if stateBody.GatewaySyncStatus != domain.GatewaySyncStatusApplied {
|
|
t.Fatalf("expected applied sync status after consume, got %q", stateBody.GatewaySyncStatus)
|
|
}
|
|
|
|
ackReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/gateway/package-changes/"+eventID+"/ack", bytes.NewBufferString(`{"consumer":"gateway","result":"applied","detail":"manual confirm"}`))
|
|
ackRR := httptest.NewRecorder()
|
|
handler.ServeHTTP(ackRR, ackReq)
|
|
if ackRR.Code != http.StatusNoContent {
|
|
t.Fatalf("unexpected ack status: %d body=%s", ackRR.Code, ackRR.Body.String())
|
|
}
|
|
|
|
finalStateReq := httptest.NewRequest(http.MethodGet, "/internal/supply-intelligence/models/openai/"+model+"/admission-state", nil)
|
|
finalStateRR := httptest.NewRecorder()
|
|
handler.ServeHTTP(finalStateRR, finalStateReq)
|
|
if finalStateRR.Code != http.StatusOK {
|
|
t.Fatalf("unexpected final admission-state status: %d body=%s", finalStateRR.Code, finalStateRR.Body.String())
|
|
}
|
|
var finalStateBody struct {
|
|
Candidate *domain.DiscoveryCandidate `json:"candidate"`
|
|
Package *domain.SupplyPackage `json:"package"`
|
|
GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"`
|
|
LastEvent *domain.PackageChangeEvent `json:"last_event"`
|
|
}
|
|
if err := json.NewDecoder(finalStateRR.Body).Decode(&finalStateBody); err != nil {
|
|
t.Fatalf("decode final admission-state response: %v", err)
|
|
}
|
|
if finalStateBody.GatewaySyncStatus != domain.GatewaySyncStatusApplied {
|
|
t.Fatalf("expected applied sync status after explicit ack, got %q", finalStateBody.GatewaySyncStatus)
|
|
}
|
|
if finalStateBody.LastEvent == nil || finalStateBody.LastEvent.Consumer != "gateway" || finalStateBody.LastEvent.ConsumerDetail != "manual confirm" {
|
|
t.Fatalf("expected ack details persisted, got %+v", finalStateBody.LastEvent)
|
|
}
|
|
|
|
storedEvent, ok := application.Repo.GetLatestPackageEvent(context.Background(), "openai", model)
|
|
if !ok {
|
|
t.Fatal("expected stored package event")
|
|
}
|
|
if storedEvent.EventID != eventID || storedEvent.GatewaySyncStatus != domain.GatewaySyncStatusApplied {
|
|
t.Fatalf("unexpected stored event: %+v", storedEvent)
|
|
}
|
|
if storedEvent.AckedAt == nil {
|
|
t.Fatalf("expected stored ack timestamp, got %+v", storedEvent)
|
|
}
|
|
|
|
storedSnapshot, ok := application.Repo.GetGatewayAppliedSnapshot(context.Background(), "gateway")
|
|
if !ok {
|
|
t.Fatal("expected gateway applied snapshot")
|
|
}
|
|
if storedSnapshot.LastEventID != eventID || storedSnapshot.LastModel != model || storedSnapshot.LastResult != string(domain.GatewayAckResultApplied) {
|
|
t.Fatalf("unexpected gateway snapshot: %+v", storedSnapshot)
|
|
}
|
|
}
|
|
|
|
func TestPostgresE2EPublishConsumeAckAdmissionStateRequiresAuthorizedConsumer(t *testing.T) {
|
|
application := newPostgresApplicationForE2E(t)
|
|
handler := application.Server.Routes()
|
|
|
|
model := fmt.Sprintf("gpt-4.1-e2e-unauth-%d", time.Now().UnixNano())
|
|
candidateID := fmt.Sprintf("cand-e2e-unauth-%d", time.Now().UnixNano())
|
|
eventID := fmt.Sprintf("evt-e2e-unauth-%d", time.Now().UnixNano())
|
|
|
|
application.Repo.UpsertSupplyAccount(context.Background(), domain.SupplyAccount{
|
|
AccountID: 9901,
|
|
Platform: "openai",
|
|
APIKey: "test-key",
|
|
ConsumerTag: "other-consumer",
|
|
Status: "active",
|
|
CreatedAt: time.Unix(90, 0).UTC(),
|
|
UpdatedAt: time.Unix(90, 0).UTC(),
|
|
})
|
|
application.Repo.UpsertDiscoveryCandidateContext(context.Background(), domain.DiscoveryCandidate{
|
|
CandidateID: candidateID,
|
|
AccountID: 9901,
|
|
Platform: "openai",
|
|
Model: model,
|
|
Source: "admission",
|
|
Status: domain.DiscoveryCandidateStatusTestPassed,
|
|
DiscoveredAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(110, 0).UTC(),
|
|
Version: 2,
|
|
})
|
|
application.Repo.UpsertSupplyPackage(context.Background(), domain.SupplyPackage{
|
|
Platform: "openai",
|
|
Model: model,
|
|
Status: "draft",
|
|
Source: "admission",
|
|
CreatedAt: time.Unix(100, 0).UTC(),
|
|
UpdatedAt: time.Unix(110, 0).UTC(),
|
|
Version: 1,
|
|
})
|
|
|
|
publishReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/publish/package-event", bytes.NewBufferString(fmt.Sprintf(`{"event_id":"%s","platform":"openai","model":"%s","occurred_at":"2026-05-06T20:45:00Z"}`, eventID, model)))
|
|
publishRR := httptest.NewRecorder()
|
|
handler.ServeHTTP(publishRR, publishReq)
|
|
if publishRR.Code != http.StatusOK {
|
|
t.Fatalf("unexpected publish status: %d body=%s", publishRR.Code, publishRR.Body.String())
|
|
}
|
|
|
|
authorizedAccounts := application.Repo.ListSupplyAccountsByConsumer(context.Background(), "gateway")
|
|
if len(authorizedAccounts) != 0 {
|
|
t.Fatalf("expected no accounts authorized for gateway, got %+v", authorizedAccounts)
|
|
}
|
|
consumeReq := httptest.NewRequest(http.MethodPost, "/internal/supply-intelligence/gateway/consume-once", bytes.NewBufferString(`{"consumer":"gateway"}`))
|
|
consumeRR := httptest.NewRecorder()
|
|
handler.ServeHTTP(consumeRR, consumeReq)
|
|
if consumeRR.Code != http.StatusOK {
|
|
t.Fatalf("unexpected consume status: %d body=%s", consumeRR.Code, consumeRR.Body.String())
|
|
}
|
|
var consumeBody struct {
|
|
Items []any `json:"items"`
|
|
}
|
|
if err := json.NewDecoder(consumeRR.Body).Decode(&consumeBody); err != nil {
|
|
t.Fatalf("decode consume response: %v", err)
|
|
}
|
|
if len(consumeBody.Items) != 0 {
|
|
t.Fatalf("expected unauthorized event to be skipped, got %+v", consumeBody.Items)
|
|
}
|
|
|
|
stateReq := httptest.NewRequest(http.MethodGet, "/internal/supply-intelligence/models/openai/"+model+"/admission-state", nil)
|
|
stateRR := httptest.NewRecorder()
|
|
handler.ServeHTTP(stateRR, stateReq)
|
|
if stateRR.Code != http.StatusOK {
|
|
t.Fatalf("unexpected admission-state status: %d body=%s", stateRR.Code, stateRR.Body.String())
|
|
}
|
|
var stateBody struct {
|
|
GatewaySyncStatus domain.GatewaySyncStatus `json:"gateway_sync_status"`
|
|
LastEvent *domain.PackageChangeEvent `json:"last_event"`
|
|
}
|
|
if err := json.NewDecoder(stateRR.Body).Decode(&stateBody); err != nil {
|
|
t.Fatalf("decode admission-state response: %v", err)
|
|
}
|
|
if stateBody.GatewaySyncStatus != domain.GatewaySyncStatusPending {
|
|
t.Fatalf("expected pending sync status when unauthorized consumer skips event, got %q", stateBody.GatewaySyncStatus)
|
|
}
|
|
if stateBody.LastEvent == nil || !strings.EqualFold(stateBody.LastEvent.EventID, eventID) {
|
|
t.Fatalf("expected last event to remain pending, got %+v", stateBody.LastEvent)
|
|
}
|
|
}
|