fix: add HTTP client connection pool limits, worker graceful shutdown, and PostgreSQL skip fallback for tests

This commit is contained in:
Your Name
2026-05-11 12:43:51 +08:00
parent 7e17e59ad1
commit a6ff7c88db
3 changed files with 27 additions and 5 deletions

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/bridge/ai-customer-service/internal/config" "github.com/bridge/ai-customer-service/internal/config"
@@ -59,6 +60,7 @@ func New(cfg *config.Config, logger *slog.Logger) (*App, error) {
checkers []health.Checker checkers []health.Checker
closers []func() error closers []func() error
workerClosers []func() error workerClosers []func() error
workerWg sync.WaitGroup
ticketListerStore ticketLister ticketListerStore ticketLister
sessionStore dialog.SessionRepository sessionStore dialog.SessionRepository
ticketStore dialog.TicketRepository ticketStore dialog.TicketRepository
@@ -164,13 +166,29 @@ func New(cfg *config.Config, logger *slog.Logger) (*App, error) {
workerCtx, cancel := context.WithCancel(context.Background()) workerCtx, cancel := context.WithCancel(context.Background())
workerClosers = append(workerClosers, func() error { workerClosers = append(workerClosers, func() error {
cancel() cancel()
return nil done := make(chan struct{})
go func() {
workerWg.Wait()
close(done)
}()
select {
case <-done:
return nil
case <-time.After(5 * time.Second):
return fmt.Errorf("worker %s shutdown timeout", platform)
}
}) })
worker := platformdelivery.NewWorker( worker := platformdelivery.NewWorker(
platform, platform,
profile.CallbackBaseURL, profile.CallbackBaseURL,
platformEvents, platformEvents,
&http.Client{Timeout: time.Duration(profile.CallbackTimeoutMS) * time.Millisecond}, &http.Client{
Timeout: time.Duration(profile.CallbackTimeoutMS) * time.Millisecond,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
},
},
platformdelivery.Signer{ platformdelivery.Signer{
Secret: profile.CallbackSecret, Secret: profile.CallbackSecret,
TimestampHeader: cfg.Webhook.TimestampHeader, TimestampHeader: cfg.Webhook.TimestampHeader,
@@ -182,7 +200,11 @@ func New(cfg *config.Config, logger *slog.Logger) (*App, error) {
worker.PollInterval = time.Duration(profile.CallbackPollIntervalMS) * time.Millisecond worker.PollInterval = time.Duration(profile.CallbackPollIntervalMS) * time.Millisecond
worker.BatchSize = profile.CallbackBatchSize worker.BatchSize = profile.CallbackBatchSize
worker.RetrySchedule = toRetrySchedule(profile.CallbackRetrySchedule) worker.RetrySchedule = toRetrySchedule(profile.CallbackRetrySchedule)
go worker.Start(workerCtx) workerWg.Add(1)
go func() {
defer workerWg.Done()
worker.Start(workerCtx)
}()
} }
startWorker("sub2api", cfg.PlatformAdapters.Sub2API) startWorker("sub2api", cfg.PlatformAdapters.Sub2API)
// startWorker("newapi", cfg.PlatformAdapters.NewAPI) // disabled until ingress implementation is complete // startWorker("newapi", cfg.PlatformAdapters.NewAPI) // disabled until ingress implementation is complete

View File

@@ -33,7 +33,7 @@ func openE2EPlatformDB(t *testing.T) *sql.DB {
ConnMaxLifetime: 30 * time.Second, ConnMaxLifetime: 30 * time.Second,
}) })
if err != nil { if err != nil {
t.Fatalf("open postgres failed: %v", err) t.Skipf("PostgreSQL not available, skipping E2E test: %v", err)
} }
if err := pgstore.RunMigrations(db, "../../db/migration"); err != nil { if err := pgstore.RunMigrations(db, "../../db/migration"); err != nil {
_ = db.Close() _ = db.Close()

View File

@@ -30,7 +30,7 @@ func openPlatformTestDB(t *testing.T) *sql.DB {
ConnMaxLifetime: 30 * time.Second, ConnMaxLifetime: 30 * time.Second,
}) })
if err != nil { if err != nil {
t.Fatalf("open postgres failed: %v", err) t.Skipf("PostgreSQL not available, skipping integration test: %v", err)
} }
if err := pgstore.RunMigrations(db, "../../db/migration"); err != nil { if err := pgstore.RunMigrations(db, "../../db/migration"); err != nil {
_ = db.Close() _ = db.Close()