Files
2026-05-12 18:49:52 +08:00

238 lines
8.8 KiB
Go

package app
import (
"context"
"fmt"
"time"
"supply-intelligence/internal/admission"
"supply-intelligence/internal/discovery"
"supply-intelligence/internal/domain"
"supply-intelligence/internal/gatewayconsumer"
"supply-intelligence/internal/httpapi"
"supply-intelligence/internal/integration"
"supply-intelligence/internal/poller"
"supply-intelligence/internal/probe"
"supply-intelligence/internal/publish"
"supply-intelligence/internal/repository"
)
type Application struct {
Repo repository.Repository
ProbeService *probe.Service
PublishService *publish.Service
DiscoveryService *discovery.Service
GatewayConsumerService *gatewayconsumer.Service
GatewayPoller *poller.GatewayPackagePoller
GatewayRuntime *poller.Runtime
DiscoveryRuntime *poller.DiscoveryRuntime
AdmissionService *admission.Service
AdmissionRuntime *poller.AdmissionRuntime
DiscoveryScheduler *discovery.DiscoveryScheduler
Server *httpapi.Server
cleanup func()
}
// New creates an Application backed by an in-memory repository.
// For production with PostgreSQL, use NewWithPostgres.
func New() *Application {
repo := repository.NewMemoryRepository()
return buildApp(repo, func() {})
}
// NewWithPostgres creates an Application backed by PostgreSQL.
// All services are wired to use the shared postgres repository.
func NewWithPostgres(ctx context.Context, connString string) (*Application, error) {
if connString == "" {
return nil, fmt.Errorf("empty connection string")
}
postgresRepo, err := repository.NewPostgresRepository(ctx, connString)
if err != nil {
return nil, fmt.Errorf("connect postgres: %w", err)
}
app := buildApp(postgresRepo, func() { postgresRepo.Close() })
return app, nil
}
// buildApp constructs all services wired to the given repository.
func buildApp(repo repository.Repository, cleanup func()) *Application {
// ── Probe ──────────────────────────────────────────────────────────────────
probeService := probe.NewService(repo)
// ── Publish ─────────────────────────────────────────────────────────────────
publishService := publish.NewService(repo)
// ── Discovery ──────────────────────────────────────────────────────────────
discoveryService := discovery.NewService(repo)
// ── Gateway Consumer ────────────────────────────────────────────────────────
gatewayConsumerService := gatewayconsumer.NewService(repo)
gatewayPoller := poller.NewGatewayPackagePoller(gatewayConsumerService)
gatewayRuntime := poller.NewRuntime(gatewayPoller, time.Second)
// ── Admission ───────────────────────────────────────────────────────────────
candidateRepo := &admissionCandidateAdapter{repo: repo}
packageRepo := &admissionPackageAdapter{repo: repo}
runner := admission.NewHTTPTestRunner()
testLogger := admission.NewTestLoggerAdapter(repo)
suites := []admission.TestSuite{
admission.BuildTestSuiteForPlatform("openai", "https://api.openai.com", ""),
admission.BuildTestSuiteForPlatform("anthropic", "https://api.anthropic.com", ""),
}
admissionService := admission.NewService(candidateRepo, packageRepo, suites, runner, testLogger)
admissionRuntime := poller.NewAdmissionRuntime(admissionService, 5*time.Minute)
// ── Discovery Scheduler & Runtime ───────────────────────────────────────────
adapterRegistry := discovery.NewSupplierAdapterRegistry()
httpClient := integration.NewDefaultHTTPClient()
adapterRegistry.Register(integration.NewOpenAIAdapter(httpClient))
adapterRegistry.Register(integration.NewAnthropicAdapter(httpClient))
discoveryScheduler := discovery.NewDiscoveryScheduler(discoveryService, adapterRegistry, repo)
discoveryRuntime := poller.NewDiscoveryRuntime(discoveryScheduler, 10*time.Minute)
// ── HTTP Server ──────────────────────────────────────────────────────────────
server := httpapi.NewServer(
repo, probeService, publishService,
gatewayConsumerService, gatewayRuntime, discoveryService,
admissionService, discoveryScheduler,
httpapi.NewDashboardHandler(repo),
)
return &Application{
Repo: repo,
ProbeService: probeService,
PublishService: publishService,
DiscoveryService: discoveryService,
GatewayConsumerService: gatewayConsumerService,
GatewayPoller: gatewayPoller,
GatewayRuntime: gatewayRuntime,
DiscoveryRuntime: discoveryRuntime,
AdmissionService: admissionService,
AdmissionRuntime: admissionRuntime,
DiscoveryScheduler: discoveryScheduler,
Server: server,
cleanup: cleanup,
}
}
func (a *Application) StartBackground(ctx context.Context) {
if a == nil || a.GatewayRuntime == nil {
return
}
a.GatewayRuntime.Start(ctx)
a.DiscoveryRuntime.Start(ctx)
a.AdmissionRuntime.Start(ctx)
}
func (a *Application) StopBackground() {
if a == nil {
return
}
if a.GatewayRuntime != nil {
a.GatewayRuntime.Stop()
}
if a.DiscoveryRuntime != nil {
a.DiscoveryRuntime.Stop()
}
if a.AdmissionRuntime != nil {
a.AdmissionRuntime.Stop()
}
}
// IsInMemoryGatewayState returns true when the application is backed by an in-memory repository.
func (a *Application) IsInMemoryGatewayState() bool {
if a == nil || a.Repo == nil {
return false
}
_, ok := a.Repo.(*repository.MemoryRepository)
return ok
}
func (a *Application) Close() {
if a == nil || a.cleanup == nil {
return
}
a.StopBackground()
a.cleanup()
}
// ─── Adapters: repository.Repository → admission package interfaces ───────────
type admissionCandidateAdapter struct {
repo repository.Repository
}
func (a *admissionCandidateAdapter) GetCandidateByIDContext(ctx context.Context, candidateID string) (admission.Candidate, bool) {
c, ok := a.repo.GetDiscoveryCandidateByIDContext(ctx, candidateID)
if !ok {
return admission.Candidate{}, false
}
return toAdmissionCandidate(c), true
}
func (a *admissionCandidateAdapter) UpdateCandidateStatus(ctx context.Context, candidateID string, status admission.CandidateStatus, failureCode, failureSummary string) error {
return a.repo.UpdateCandidateStatus(ctx, candidateID, domain.DiscoveryCandidateStatus(status), failureCode, failureSummary)
}
func (a *admissionCandidateAdapter) ListCandidatesByStatus(ctx context.Context, status admission.CandidateStatus) []admission.Candidate {
candidates := a.repo.ListDiscoveryCandidatesContext(ctx, domain.DiscoveryCandidateStatus(status))
result := make([]admission.Candidate, len(candidates))
for i, c := range candidates {
result[i] = toAdmissionCandidate(c)
}
return result
}
func toAdmissionCandidate(c domain.DiscoveryCandidate) admission.Candidate {
return admission.Candidate{
CandidateID: c.CandidateID,
AccountID: c.AccountID,
Platform: c.Platform,
Model: c.Model,
Status: admission.CandidateStatus(c.Status),
Source: c.Source,
ReasonCode: c.ReasonCode,
DiscoveredAt: c.DiscoveredAt,
UpdatedAt: c.UpdatedAt,
Version: c.Version,
}
}
type admissionPackageAdapter struct {
repo repository.Repository
}
func (a *admissionPackageAdapter) UpsertDraftPackage(ctx context.Context, platform, model, source string) (int64, error) {
if existing, ok := a.repo.GetSupplyPackage(ctx, platform, model); ok {
return existing.PackageID, nil
}
pkg := domain.SupplyPackage{
Platform: platform,
Model: model,
Status: "draft",
Source: source,
}
if err := a.repo.UpsertSupplyPackage(ctx, pkg); err != nil {
return 0, err
}
if newPkg, ok := a.repo.GetSupplyPackage(ctx, platform, model); ok {
return newPkg.PackageID, nil
}
return 0, nil
}
func (a *admissionPackageAdapter) GetDraftPackage(ctx context.Context, platform, model string) (admission.DraftPackage, bool) {
pkg, ok := a.repo.GetSupplyPackage(ctx, platform, model)
if !ok {
return admission.DraftPackage{}, false
}
return admission.DraftPackage{
PackageID: pkg.PackageID,
Platform: pkg.Platform,
Model: pkg.Model,
Status: pkg.Status,
Source: pkg.Source,
}, true
}