Files
llm-intelligence/scripts/materialize_daily_signals.go
phamnazage-jpg 475401bcbe
Some checks failed
CI / go-test (push) Has been cancelled
CI / scripts-regression (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / docker-build (push) Has been cancelled
feat(intraday): add discovery and verification watch pipeline
2026-05-27 18:54:32 +08:00

1225 lines
35 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//go:build llm_script
package main
import (
"database/sql"
"encoding/json"
"flag"
"fmt"
"log/slog"
"os"
"sort"
"strings"
"time"
_ "github.com/lib/pq"
)
// signalModelInfo is the per-model row shape scanned by the SQL loaders in this
// file (models joined with model_provider and the latest region_pricing row per
// model). It is the input of classifySignalFreeSource and buildSignalTrustLabel.
type signalModelInfo struct {
Name string
ProviderName string
ProviderCountry string
ContextLength int
// InputPrice / OutputPrice come from region_pricing.input_price_per_mtok /
// output_price_per_mtok.
InputPrice float64
OutputPrice float64
// Currency of the prices; the SQL defaults it to "USD" when no price row exists.
Currency string
IsFree bool
OperatorName string
// OperatorType is operator.type; the classifiers distinguish "official",
// "cloud" and "reseller" (the SQL defaults missing values to "reseller").
OperatorType string
}
// signalDailySignals holds the headline counters for one signal date; each
// field is persisted to the like-named daily_signal_snapshot column.
type signalDailySignals struct {
NewModels int `json:"new_models"`
PriceChanges int `json:"price_changes"`
OfficialFree int `json:"official_free"`
AggregatorFree int `json:"aggregator_free"`
UnknownFree int `json:"unknown_free"`
}
// signalModelEvent is one entry of the top_events JSON array. The loaders in
// this file emit EventType values new_model, official_release, promo_campaign,
// price_cut and price_increase. Events are sorted by descending Priority.
// Field order here determines the JSON key order of the serialized events.
type signalModelEvent struct {
EventType string `json:"event_type"`
ModelName string `json:"model_name"`
ProviderName string `json:"provider_name"`
OperatorName string `json:"operator_name"`
Audience string `json:"audience"`
TrustLabel string `json:"trust_label"`
SourceKindLabel string `json:"source_kind_label"`
PrimarySource string `json:"primary_source"`
SourceURL string `json:"source_url"`
// UpdatedAt is formatted as "2006-01-02 15:04".
UpdatedAt string `json:"updated_at"`
EvidenceDetail string `json:"evidence_detail"`
Baseline string `json:"baseline"`
Summary string `json:"summary"`
Currency string `json:"currency"`
OldInputPrice float64 `json:"old_input_price"`
NewInputPrice float64 `json:"new_input_price"`
OldOutputPrice float64 `json:"old_output_price"`
NewOutputPrice float64 `json:"new_output_price"`
// PriceChangePct is signed: negative for cuts, positive for increases.
PriceChangePct float64 `json:"price_change_pct"`
Priority int `json:"priority"`
}
// signalPromoCampaignDefinition is one record of report_promo_campaigns.json.
// Date is compared as an exact string with the --date value; blank optional
// fields fall back to defaults in loadSignalPromoCampaignEvents.
type signalPromoCampaignDefinition struct {
Date string `json:"date"`
ModelName string `json:"model_name"`
ProviderName string `json:"provider_name"`
OperatorName string `json:"operator_name"`
Summary string `json:"summary"`
Audience string `json:"audience"`
Baseline string `json:"baseline"`
TrustLabel string `json:"trust_label"`
SourceKindLabel string `json:"source_kind_label"`
PrimarySource string `json:"primary_source"`
EvidenceDetail string `json:"evidence_detail"`
Priority int `json:"priority"`
}
// dailySignalSnapshot is the in-memory form of one daily_signal_snapshot row
// as written by upsertDailySignalSnapshot.
type dailySignalSnapshot struct {
SignalDate string
Status string
Signals signalDailySignals
EventCount int
// PageMode is "hot", "calm" or "standard" (see buildSignalPageMode).
PageMode string
EventTypeCounts map[string]int
TopEvents []signalModelEvent
// SourceAudit is stored as SQL NULL when blank.
SourceAudit string
}
// materializeDailySignalsConfig carries the command-line options of this script.
type materializeDailySignalsConfig struct {
Date string
SourceAudit string
DryRun bool
}
// signalLogger is a structured JSON logger writing Info-and-above records to
// stderr. It is initialized at package initialization time, so no init()
// function is needed.
// NOTE(review): no call site in this file logs through it.
var signalLogger = slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))

// signalUSDToCNY is the fixed USD→CNY rate used when rendering non-CNY prices
// in yuan.
const signalUSDToCNY = 7.25
// main is the CLI entry point. It loads .env.local/.env into the environment,
// parses flags, validates --date, opens the Postgres handle and runs the
// materializer. Any failure is reported on stderr and exits with status 1.
//
// Because the env files are loaded before flag defaults are computed, they can
// supply REPORT_DATE (default of --date) and SIGNAL_SOURCE_AUDIT (default of
// --source-audit).
func main() {
	loadSignalEnv()

	var cfg materializeDailySignalsConfig
	flag.StringVar(&cfg.Date, "date", signalDateValue(), "信号日期,格式 YYYY-MM-DD")
	flag.StringVar(&cfg.SourceAudit, "source-audit", os.Getenv("SIGNAL_SOURCE_AUDIT"), "运行审计摘要")
	flag.BoolVar(&cfg.DryRun, "dry-run", false, "仅计算并打印摘要,不写入数据库")
	flag.Parse()

	// Fail fast on a malformed date. Postgres accepts variants such as
	// "2026-5-1", but promo campaigns are matched by exact string comparison
	// and would silently be skipped.
	if _, err := time.Parse("2006-01-02", cfg.Date); err != nil {
		fmt.Fprintf(os.Stderr, "materialize_daily_signals: invalid --date %q (want YYYY-MM-DD): %v\n", cfg.Date, err)
		os.Exit(1)
	}

	db, err := sql.Open("postgres", defaultSignalDSN())
	if err != nil {
		fmt.Fprintf(os.Stderr, "open db: %v\n", err)
		os.Exit(1)
	}
	runErr := runMaterializeDailySignals(db, cfg)
	// Close explicitly instead of deferring: os.Exit below would skip deferred
	// calls. A close error is not actionable for this one-shot script.
	_ = db.Close()
	if runErr != nil {
		fmt.Fprintf(os.Stderr, "materialize_daily_signals: %v\n", runErr)
		os.Exit(1)
	}
}
// loadSignalEnv copies KEY=VALUE pairs from ".env.local" and then ".env" (in the
// working directory) into the process environment without overriding anything
// already set. Since the first definition wins, the real environment beats
// .env.local, which beats .env.
//
// Missing or unreadable files, blank lines, "#" comments, lines without "=",
// and empty keys are ignored. Keys and values are whitespace-trimmed, and any
// leading/trailing ' or " characters are stripped from values.
func loadSignalEnv() {
	for _, envFile := range [...]string{".env.local", ".env"} {
		content, readErr := os.ReadFile(envFile)
		if readErr != nil {
			continue // env files are optional
		}
		for _, rawLine := range strings.Split(string(content), "\n") {
			entry := strings.TrimSpace(rawLine)
			if entry == "" || strings.HasPrefix(entry, "#") {
				continue
			}
			rawKey, rawValue, hasEquals := strings.Cut(entry, "=")
			k := strings.TrimSpace(rawKey)
			if !hasEquals || k == "" {
				continue
			}
			if _, alreadySet := os.LookupEnv(k); alreadySet {
				continue
			}
			v := strings.Trim(strings.TrimSpace(rawValue), `"'`)
			// Best effort: a failed Setenv simply leaves the variable unset.
			_ = os.Setenv(k, v)
		}
	}
}
// defaultSignalDSN returns DATABASE_URL when it is set and non-empty, and
// otherwise a local Unix-socket DSN for the llm_intelligence database.
func defaultSignalDSN() string {
	const fallbackDSN = "postgres://long@/llm_intelligence?host=/var/run/postgresql"
	dsn := os.Getenv("DATABASE_URL")
	if dsn == "" {
		return fallbackDSN
	}
	return dsn
}
// signalDateValue returns the default signal date: the whitespace-trimmed
// REPORT_DATE environment variable when it is non-blank, otherwise today's
// date in the local time zone formatted as YYYY-MM-DD.
func signalDateValue() string {
	override := strings.TrimSpace(os.Getenv("REPORT_DATE"))
	if override == "" {
		override = time.Now().Format("2006-01-02")
	}
	return override
}
// runMaterializeDailySignals computes the snapshot for cfg.Date and, unless
// cfg.DryRun is set, upserts it into daily_signal_snapshot. In both modes it
// prints a one-line key=value summary to stdout after the work succeeds.
//
// The counters come from loadSignalDailySignals, except the three free-source
// counters, which come from loadSignalFreeBreakdown. Events come from
// loadSignalModelEvents and are stored in full as TopEvents.
func runMaterializeDailySignals(db *sql.DB, cfg materializeDailySignalsConfig) error {
	signals, err := loadSignalDailySignals(db, cfg.Date)
	if err != nil {
		return err
	}
	free, err := loadSignalFreeBreakdown(db)
	if err != nil {
		return err
	}
	signals.OfficialFree, signals.AggregatorFree, signals.UnknownFree =
		free.OfficialFree, free.AggregatorFree, free.UnknownFree

	events, err := loadSignalModelEvents(db, cfg.Date)
	if err != nil {
		return err
	}

	snapshot := dailySignalSnapshot{
		SignalDate:      cfg.Date,
		Status:          "generated",
		Signals:         signals,
		EventCount:      len(events),
		PageMode:        buildSignalPageMode(signals, events),
		EventTypeCounts: summarizeSignalEventTypes(events),
		TopEvents:       events,
		SourceAudit:     strings.TrimSpace(cfg.SourceAudit),
	}

	if !cfg.DryRun {
		if err := upsertDailySignalSnapshot(db, snapshot); err != nil {
			return err
		}
	}
	// %t renders the flag as "true"/"false", matching the summary format.
	fmt.Printf("source=daily-signal-materializer date=%s new_models=%d price_changes=%d event_count=%d page_mode=%s dry_run=%t\n",
		snapshot.SignalDate, snapshot.Signals.NewModels, snapshot.Signals.PriceChanges, snapshot.EventCount, snapshot.PageMode, cfg.DryRun)
	return nil
}
// upsertDailySignalSnapshot writes snapshot into daily_signal_snapshot, keyed by
// signal_date. An existing row for the same date has every column overwritten,
// and generated_at/updated_at are reset to CURRENT_TIMESTAMP.
//
// EventTypeCounts and TopEvents are stored as jsonb. A nil map or slice is
// normalized to {} / [] first, because encoding/json renders nil as null, which
// would store a JSON null rather than an empty object/array. A blank
// SourceAudit is stored as SQL NULL.
func upsertDailySignalSnapshot(db *sql.DB, snapshot dailySignalSnapshot) error {
	counts := snapshot.EventTypeCounts
	if counts == nil {
		counts = map[string]int{}
	}
	events := snapshot.TopEvents
	if events == nil {
		events = []signalModelEvent{}
	}
	eventTypeCounts, err := json.Marshal(counts)
	if err != nil {
		return fmt.Errorf("marshal event_type_counts: %w", err)
	}
	topEvents, err := json.Marshal(events)
	if err != nil {
		return fmt.Errorf("marshal top_events: %w", err)
	}
	_, err = db.Exec(
		`INSERT INTO daily_signal_snapshot (
signal_date, status, new_models, price_changes,
official_free, aggregator_free, unknown_free,
event_count, page_mode, event_type_counts, top_events, source_audit
) VALUES (
$1::date, $2, $3, $4,
$5, $6, $7,
$8, $9, $10::jsonb, $11::jsonb, $12
)
ON CONFLICT (signal_date)
DO UPDATE SET
status = EXCLUDED.status,
new_models = EXCLUDED.new_models,
price_changes = EXCLUDED.price_changes,
official_free = EXCLUDED.official_free,
aggregator_free = EXCLUDED.aggregator_free,
unknown_free = EXCLUDED.unknown_free,
event_count = EXCLUDED.event_count,
page_mode = EXCLUDED.page_mode,
event_type_counts = EXCLUDED.event_type_counts,
top_events = EXCLUDED.top_events,
source_audit = EXCLUDED.source_audit,
generated_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP`,
		snapshot.SignalDate, snapshot.Status, snapshot.Signals.NewModels, snapshot.Signals.PriceChanges,
		snapshot.Signals.OfficialFree, snapshot.Signals.AggregatorFree, snapshot.Signals.UnknownFree,
		snapshot.EventCount, snapshot.PageMode, string(eventTypeCounts), string(topEvents), signalNullIfBlank(snapshot.SourceAudit),
	)
	if err != nil {
		return fmt.Errorf("upsert daily_signal_snapshot: %w", err)
	}
	return nil
}
// loadSignalDailySignals counts, for the given YYYY-MM-DD date, the non-deleted
// models created that day (NewModels) and the pricing_history rows changed that
// day (PriceChanges). The free-source counters are left at zero; they are
// filled in separately by the caller.
//
// On error the zero value is returned (never a half-filled struct), and the
// error says which count failed.
func loadSignalDailySignals(db *sql.DB, date string) (signalDailySignals, error) {
	var signals signalDailySignals
	if err := db.QueryRow(`
SELECT COUNT(*)
FROM models
WHERE deleted_at IS NULL
AND DATE(created_at) = $1::date
`, date).Scan(&signals.NewModels); err != nil {
		return signalDailySignals{}, fmt.Errorf("count new models for %s: %w", date, err)
	}
	if err := db.QueryRow(`
SELECT COUNT(*)
FROM pricing_history
WHERE DATE(changed_at) = $1::date
`, date).Scan(&signals.PriceChanges); err != nil {
		return signalDailySignals{}, fmt.Errorf("count price changes for %s: %w", date, err)
	}
	return signals, nil
}
// loadSignalFreeBreakdown classifies every non-deleted model whose latest
// region_pricing row is marked is_free, and returns only the three free-source
// counters (OfficialFree, AggregatorFree, UnknownFree). The classification is
// delegated to classifySignalFreeSource; any label other than 官方免费/聚合免费
// counts as unknown. The breakdown covers the whole catalogue, not one date.
func loadSignalFreeBreakdown(db *sql.DB) (signalDailySignals, error) {
	const query = `
WITH latest_prices AS (
SELECT
rp.model_id,
COALESCE(o.name, 'Unknown') AS operator_name,
COALESCE(o.type, 'reseller') AS operator_type,
rp.currency,
rp.input_price_per_mtok,
rp.output_price_per_mtok,
rp.is_free,
ROW_NUMBER() OVER (
PARTITION BY rp.model_id
ORDER BY rp.effective_date DESC NULLS LAST, rp.id DESC
) AS rn
FROM region_pricing rp
LEFT JOIN operator o ON rp.operator_id = o.id
)
SELECT
COALESCE(NULLIF(m.name, ''), m.external_id) AS model_name,
COALESCE(mp.name, split_part(m.external_id, '/', 1)) AS provider_name,
COALESCE(mp.country, 'unknown') AS provider_country,
COALESCE(m.context_length, 0) AS context_length,
COALESCE(lp.input_price_per_mtok, 0) AS input_price,
COALESCE(lp.output_price_per_mtok, 0) AS output_price,
COALESCE(lp.currency, 'USD') AS currency,
COALESCE(lp.operator_name, 'OpenRouter') AS operator_name,
COALESCE(lp.operator_type, 'reseller') AS operator_type
FROM models m
LEFT JOIN model_provider mp ON m.provider_id = mp.id
LEFT JOIN latest_prices lp ON lp.model_id = m.id AND lp.rn = 1
WHERE m.deleted_at IS NULL
AND COALESCE(lp.is_free, false) = true
`
	rows, err := db.Query(query)
	if err != nil {
		return signalDailySignals{}, err
	}
	defer rows.Close()

	var tally signalDailySignals
	for rows.Next() {
		var info signalModelInfo
		// Destination order must match the SELECT list above.
		dest := []any{
			&info.Name,
			&info.ProviderName,
			&info.ProviderCountry,
			&info.ContextLength,
			&info.InputPrice,
			&info.OutputPrice,
			&info.Currency,
			&info.OperatorName,
			&info.OperatorType,
		}
		if err := rows.Scan(dest...); err != nil {
			return signalDailySignals{}, err
		}
		switch label := classifySignalFreeSource(info); label {
		case "官方免费":
			tally.OfficialFree++
		case "聚合免费":
			tally.AggregatorFree++
		default:
			tally.UnknownFree++
		}
	}
	return tally, rows.Err()
}
// loadSignalModelEvents gathers all events for date: new models, official
// releases, promo campaigns (from the JSON fixture) and price changes. It then
// merges in verified discovery events, orders everything by descending
// Priority (ties by ascending ModelName), and drops duplicates of the same
// EventType+ModelName, keeping the first occurrence.
//
// The sort is stable on purpose. The SQL-backed loaders return rows newest
// first, and dedupeSignalEvents keeps the first event per key. With an unstable
// sort, a tie on priority and name could keep an older event (e.g. an earlier
// price change of the same model) instead of the most recent one.
func loadSignalModelEvents(db *sql.DB, date string) ([]signalModelEvent, error) {
	var events []signalModelEvent
	newModelEvents, err := loadSignalNewModelEvents(db, date)
	if err != nil {
		return nil, err
	}
	events = append(events, newModelEvents...)
	releaseEvents, err := loadSignalOfficialReleaseEvents(db, date)
	if err != nil {
		return nil, err
	}
	events = append(events, releaseEvents...)
	promoEvents, err := loadSignalPromoCampaignEvents(date)
	if err != nil {
		return nil, err
	}
	events = append(events, promoEvents...)
	priceEvents, err := loadSignalPriceChangeEvents(db, date)
	if err != nil {
		return nil, err
	}
	events = append(events, priceEvents...)
	discoveryEvents, err := loadVerifiedDiscoverySignalEvents(db, date)
	if err != nil {
		return nil, err
	}
	events = mergeVerifiedDiscoveryEvents(events, discoveryEvents)
	sort.SliceStable(events, func(i, j int) bool {
		if events[i].Priority != events[j].Priority {
			return events[i].Priority > events[j].Priority
		}
		return events[i].ModelName < events[j].ModelName
	})
	return dedupeSignalEvents(events), nil
}
// loadSignalPromoCampaignEvents reads the promo campaign fixture (located by
// resolveSignalPromoCampaignDataPath) and returns a promo_campaign event for
// every definition whose Date equals date exactly. A missing fixture is not an
// error: it yields no events. Blank optional fields receive defaults, and the
// priority is raised to at least 115.
//
// Read and parse failures are wrapped with the fixture path, so a malformed
// JSON file is identifiable from the error alone.
func loadSignalPromoCampaignEvents(date string) ([]signalModelEvent, error) {
	path, err := resolveSignalPromoCampaignDataPath()
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, err
	}
	body, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("read promo campaigns %s: %w", path, err)
	}
	var definitions []signalPromoCampaignDefinition
	if err := json.Unmarshal(body, &definitions); err != nil {
		return nil, fmt.Errorf("parse promo campaigns %s: %w", path, err)
	}
	events := make([]signalModelEvent, 0)
	for _, definition := range definitions {
		if definition.Date != date {
			continue
		}
		events = append(events, signalModelEvent{
			EventType:       "promo_campaign",
			ModelName:       definition.ModelName,
			ProviderName:    definition.ProviderName,
			OperatorName:    definition.OperatorName,
			Audience:        signalFirstNonEmpty(definition.Audience, "适合计划利用活动窗口压低成本的团队"),
			TrustLabel:      signalFirstNonEmpty(definition.TrustLabel, "官方来源 / 一级证据"),
			SourceKindLabel: signalFirstNonEmpty(definition.SourceKindLabel, "官方活动页"),
			PrimarySource:   definition.PrimarySource,
			SourceURL:       definition.PrimarySource,
			UpdatedAt:       signalFormatEventUpdatedAt("", definition.Date),
			EvidenceDetail:  definition.EvidenceDetail,
			Baseline:        signalFirstNonEmpty(definition.Baseline, "活动窗口开启"),
			Summary:         definition.Summary,
			Priority:        signalMaxInt(definition.Priority, 115),
		})
	}
	return events, nil
}
// resolveSignalPromoCampaignDataPath returns the first existing location of
// report_promo_campaigns.json, looking first in scripts/testdata and then in
// testdata (both relative to the working directory). When neither exists it
// returns os.ErrNotExist.
func resolveSignalPromoCampaignDataPath() (string, error) {
	const fileName = "report_promo_campaigns.json"
	for _, candidate := range [...]string{
		filepathJoin("scripts", "testdata", fileName),
		filepathJoin("testdata", fileName),
	} {
		if _, statErr := os.Stat(candidate); statErr == nil {
			return candidate, nil
		}
	}
	return "", os.ErrNotExist
}
// loadSignalOfficialReleaseEvents returns up to 8 official_release events for
// models whose release_date equals date. A model qualifies only if it has a
// non-blank source_url and its latest region_pricing operator is of type
// "official" or "cloud". Trust label, source kind and evidence text are
// derived from the model's date_source_kind / date_confidence metadata.
func loadSignalOfficialReleaseEvents(db *sql.DB, date string) ([]signalModelEvent, error) {
	const query = `
WITH latest_prices AS (
SELECT
rp.model_id,
COALESCE(o.name, 'Unknown') AS operator_name,
COALESCE(o.type, 'reseller') AS operator_type,
rp.currency,
ROW_NUMBER() OVER (
PARTITION BY rp.model_id
ORDER BY rp.effective_date DESC NULLS LAST, rp.id DESC
) AS rn
FROM region_pricing rp
LEFT JOIN operator o ON rp.operator_id = o.id
)
SELECT
COALESCE(NULLIF(m.name, ''), m.external_id) AS model_name,
COALESCE(mp.name, split_part(m.external_id, '/', 1)) AS provider_name,
COALESCE(lp.operator_name, 'Unknown') AS operator_name,
COALESCE(lp.operator_type, 'reseller') AS operator_type,
COALESCE(m.source_url, '') AS source_url,
COALESCE(m.date_confidence, 'unknown') AS date_confidence,
COALESCE(m.date_source_kind, 'unknown') AS date_source_kind,
COALESCE(mp.country, 'unknown') AS provider_country,
COALESCE(m.release_date, m.created_at::date) AS release_date,
COALESCE(lp.currency, 'USD') AS currency
FROM models m
LEFT JOIN model_provider mp ON m.provider_id = mp.id
LEFT JOIN latest_prices lp ON lp.model_id = m.id AND lp.rn = 1
WHERE m.deleted_at IS NULL
AND m.release_date = $1::date
AND COALESCE(m.source_url, '') <> ''
AND COALESCE(lp.operator_type, 'reseller') IN ('official', 'cloud')
ORDER BY m.release_date DESC, m.id DESC
LIMIT 8
`
	rows, err := db.Query(query, date)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []signalModelEvent
	for rows.Next() {
		// info only needs the fields the trust-label helper reads; context
		// length and prices are not selected by this query.
		var (
			info           signalModelInfo
			sourceURL      string
			dateConfidence string
			dateSourceKind string
			releasedOn     time.Time
		)
		if err := rows.Scan(
			&info.Name,
			&info.ProviderName,
			&info.OperatorName,
			&info.OperatorType,
			&sourceURL,
			&dateConfidence,
			&dateSourceKind,
			&info.ProviderCountry,
			&releasedOn,
			&info.Currency,
		); err != nil {
			return nil, err
		}
		out = append(out, signalModelEvent{
			EventType:       "official_release",
			ModelName:       info.Name,
			ProviderName:    info.ProviderName,
			OperatorName:    info.OperatorName,
			Audience:        "适合需要复查默认选型与路线图判断的团队",
			TrustLabel:      buildSignalReleaseTrustLabel(info, dateConfidence),
			SourceKindLabel: buildSignalReleaseSourceKindLabel(dateSourceKind, dateConfidence),
			PrimarySource:   sourceURL,
			SourceURL:       sourceURL,
			UpdatedAt:       releasedOn.Format("2006-01-02 15:04"),
			EvidenceDetail:  buildSignalReleaseEvidenceDetail(dateSourceKind, dateConfidence),
			Baseline:        "官方首次发布",
			Summary:         fmt.Sprintf("%s 官方发布新模型,值得优先复查默认选型。", info.ProviderName),
			Currency:        info.Currency,
			Priority:        120,
		})
	}
	return out, rows.Err()
}
// loadSignalNewModelEvents returns up to 8 new_model events for non-deleted
// models created on date, newest first. Each model is joined with its latest
// region_pricing row. The summary highlights free availability first, then
// context windows of at least 256K. Priority is 85 plus one point per 128K of
// context, capped at +10.
//
// NOTE(review): EvidenceDetail claims a latest price snapshot exists, but the
// price join is a LEFT JOIN, so models without any region_pricing row are also
// emitted — confirm whether the text or the query should change.
func loadSignalNewModelEvents(db *sql.DB, date string) ([]signalModelEvent, error) {
	const query = `
WITH latest_prices AS (
SELECT
rp.model_id,
COALESCE(o.name, 'Unknown') AS operator_name,
COALESCE(o.type, 'reseller') AS operator_type,
rp.currency,
rp.input_price_per_mtok,
rp.output_price_per_mtok,
rp.is_free,
ROW_NUMBER() OVER (
PARTITION BY rp.model_id
ORDER BY rp.effective_date DESC NULLS LAST, rp.id DESC
) AS rn
FROM region_pricing rp
LEFT JOIN operator o ON rp.operator_id = o.id
)
SELECT
COALESCE(NULLIF(m.name, ''), m.external_id) AS model_name,
COALESCE(mp.name, split_part(m.external_id, '/', 1)) AS provider_name,
COALESCE(lp.operator_name, 'OpenRouter') AS operator_name,
COALESCE(lp.operator_type, 'reseller') AS operator_type,
COALESCE(lp.currency, 'USD') AS currency,
COALESCE(lp.input_price_per_mtok, 0) AS input_price,
COALESCE(lp.output_price_per_mtok, 0) AS output_price,
COALESCE(lp.is_free, false) AS is_free,
COALESCE(m.context_length, 0) AS context_length,
COALESCE(mp.country, 'unknown') AS provider_country,
m.created_at
FROM models m
LEFT JOIN model_provider mp ON m.provider_id = mp.id
LEFT JOIN latest_prices lp ON lp.model_id = m.id AND lp.rn = 1
WHERE m.deleted_at IS NULL
AND DATE(m.created_at) = $1::date
ORDER BY m.created_at DESC, m.id DESC
LIMIT 8
`
	rows, err := db.Query(query, date)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var out []signalModelEvent
	for rows.Next() {
		var (
			info      signalModelInfo
			createdAt time.Time
		)
		if err := rows.Scan(
			&info.Name, &info.ProviderName, &info.OperatorName, &info.OperatorType,
			&info.Currency, &info.InputPrice, &info.OutputPrice, &info.IsFree,
			&info.ContextLength, &info.ProviderCountry, &createdAt,
		); err != nil {
			return nil, err
		}

		var summary string
		switch {
		case info.IsFree:
			summary = fmt.Sprintf("新模型首日可免费试用,需注意其免费来源属于%s。", classifySignalFreeSource(info))
		case info.ContextLength >= 1024*256:
			summary = fmt.Sprintf("新模型带来 %s 长上下文,值得复查 Agent 和代码场景。", signalFormatContextWindowCompact(info.ContextLength))
		default:
			summary = "新模型进入情报池,值得重新评估当前默认选择。"
		}
		source := buildSignalPrimarySource("region_pricing", info.OperatorName)

		out = append(out, signalModelEvent{
			EventType:       "new_model",
			ModelName:       info.Name,
			ProviderName:    info.ProviderName,
			OperatorName:    info.OperatorName,
			Audience:        "适合想尽快验证新模型价值的选型读者",
			TrustLabel:      buildSignalTrustLabel(info),
			SourceKindLabel: "模型快照",
			PrimarySource:   source,
			SourceURL:       source,
			UpdatedAt:       createdAt.Format("2006-01-02 15:04"),
			EvidenceDetail:  "models.created_at = 今日,且已存在最新价格快照",
			Baseline:        "首次出现",
			Summary:         summary,
			Currency:        info.Currency,
			NewInputPrice:   info.InputPrice,
			NewOutputPrice:  info.OutputPrice,
			Priority:        85 + signalMinInt(info.ContextLength/(1024*128), 10),
		})
	}
	return out, rows.Err()
}
// loadSignalPriceChangeEvents turns up to 16 pricing_history rows changed on
// date (newest first) into price_cut / price_increase events. The operator is
// taken from the model's latest region_pricing row. Rows whose signed change
// (see signalSignedPriceChangePct) is 0 are skipped. Priority is 70 plus the
// absolute change percentage, capped at +25.
//
// pricing_history.currency is COALESCEd to 'USD', like every other currency
// column in this file. Scanning a SQL NULL into a Go string fails, so a
// single NULL currency would otherwise abort the whole materialization.
//
// NOTE(review): EvidenceDetail always quotes the *input* prices, even when
// changePct was driven by the output price — confirm whether the detail text
// should name the dimension that actually changed.
func loadSignalPriceChangeEvents(db *sql.DB, date string) ([]signalModelEvent, error) {
	rows, err := db.Query(`
WITH latest_prices AS (
SELECT
rp.model_id,
COALESCE(o.name, 'Unknown') AS operator_name,
COALESCE(o.type, 'reseller') AS operator_type,
ROW_NUMBER() OVER (
PARTITION BY rp.model_id
ORDER BY rp.effective_date DESC NULLS LAST, rp.id DESC
) AS rn
FROM region_pricing rp
LEFT JOIN operator o ON rp.operator_id = o.id
)
SELECT
COALESCE(NULLIF(m.name, ''), m.external_id) AS model_name,
COALESCE(mp.name, split_part(m.external_id, '/', 1)) AS provider_name,
COALESCE(lp.operator_name, 'OpenRouter') AS operator_name,
COALESCE(lp.operator_type, 'reseller') AS operator_type,
COALESCE(ph.currency, 'USD') AS currency,
COALESCE(ph.old_input_price, 0),
COALESCE(ph.new_input_price, 0),
COALESCE(ph.old_output_price, 0),
COALESCE(ph.new_output_price, 0),
COALESCE(mp.country, 'unknown') AS provider_country,
ph.changed_at
FROM pricing_history ph
JOIN models m ON ph.model_id = m.id
LEFT JOIN model_provider mp ON m.provider_id = mp.id
LEFT JOIN latest_prices lp ON lp.model_id = m.id AND lp.rn = 1
WHERE DATE(ph.changed_at) = $1::date
ORDER BY ph.changed_at DESC, ph.id DESC
LIMIT 16
`, date)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var events []signalModelEvent
	for rows.Next() {
		var (
			model          signalModelInfo
			oldInputPrice  float64
			newInputPrice  float64
			oldOutputPrice float64
			newOutputPrice float64
			changedAt      time.Time
		)
		if err := rows.Scan(
			&model.Name,
			&model.ProviderName,
			&model.OperatorName,
			&model.OperatorType,
			&model.Currency,
			&oldInputPrice,
			&newInputPrice,
			&oldOutputPrice,
			&newOutputPrice,
			&model.ProviderCountry,
			&changedAt,
		); err != nil {
			return nil, err
		}
		changePct := signalSignedPriceChangePct(oldInputPrice, newInputPrice, oldOutputPrice, newOutputPrice)
		if changePct == 0 {
			continue // a history row with no effective change is not a signal
		}
		eventType := "price_increase"
		summary := "价格上调已足以影响默认成本,需要确认备用模型。"
		if changePct < 0 {
			eventType = "price_cut"
			summary = "价格下降已足以影响默认选型,值得重新评估同类模型。"
		}
		events = append(events, signalModelEvent{
			EventType:       eventType,
			ModelName:       model.Name,
			ProviderName:    model.ProviderName,
			OperatorName:    model.OperatorName,
			Audience:        buildSignalPriceEventAudience(changePct),
			TrustLabel:      buildSignalTrustLabel(model),
			SourceKindLabel: "价格快照",
			PrimarySource:   "pricing_history",
			SourceURL:       buildSignalPrimarySource("region_pricing", model.OperatorName),
			UpdatedAt:       changedAt.Format("2006-01-02 15:04"),
			EvidenceDetail:  buildSignalPriceEvidenceDetail(changePct, oldInputPrice, newInputPrice, model.Currency),
			Baseline:        fmt.Sprintf("较昨日 %+.0f%%", changePct),
			Summary:         summary,
			Currency:        model.Currency,
			OldInputPrice:   oldInputPrice,
			NewInputPrice:   newInputPrice,
			OldOutputPrice:  oldOutputPrice,
			NewOutputPrice:  newOutputPrice,
			PriceChangePct:  changePct,
			Priority:        70 + signalMinInt(int(signalAbs(changePct)), 25),
		})
	}
	return events, rows.Err()
}
// summarizeSignalEventTypes returns how many events carry each EventType. The
// result is always a non-nil map (empty when events is empty).
func summarizeSignalEventTypes(events []signalModelEvent) map[string]int {
	tally := map[string]int{}
	for i := range events {
		tally[events[i].EventType]++
	}
	return tally
}
// dedupeSignalEvents keeps only the first event for each (EventType, ModelName)
// pair and preserves input order. The result is always a non-nil slice, so
// callers should sort before calling it to control which duplicate survives.
func dedupeSignalEvents(events []signalModelEvent) []signalModelEvent {
	kept := make([]signalModelEvent, 0, len(events))
	seen := make(map[string]bool, len(events))
	for _, ev := range events {
		k := ev.EventType + "|" + ev.ModelName
		if seen[k] {
			continue
		}
		seen[k] = true
		kept = append(kept, ev)
	}
	return kept
}
// loadVerifiedDiscoverySignalEvents converts intraday_news_candidate rows for
// date that are status='verified' with verification_confidence=
// 'official_confirmed' into events, newest first. The first non-blank entry of
// candidate_urls (a JSON array of strings) becomes the cited source; rows
// without any URL are skipped. Only the four discovery event types survive
// (see filterVerifiedDiscoverySignalEvents).
//
// If the intraday_news_candidate table does not exist yet, it returns no events
// and no error. The table is detected from the driver's error text.
func loadVerifiedDiscoverySignalEvents(db *sql.DB, date string) ([]signalModelEvent, error) {
	const query = `
SELECT
event_type,
provider_name,
COALESCE(model_name, ''),
COALESCE(provider_country, ''),
title,
COALESCE(summary, ''),
COALESCE(candidate_urls::text, '[]'),
COALESCE(verification_notes, ''),
updated_at
FROM intraday_news_candidate
WHERE candidate_date = $1::date
AND status = 'verified'
AND verification_confidence = 'official_confirmed'
ORDER BY updated_at DESC, id DESC
`
	rows, err := db.Query(query, date)
	if err != nil {
		const missingTable = `relation "intraday_news_candidate" does not exist`
		if strings.Contains(err.Error(), missingTable) {
			return nil, nil
		}
		return nil, err
	}
	defer rows.Close()

	var collected []signalModelEvent
	for rows.Next() {
		var (
			kind, provider, model, country string
			title, summary, urlsJSON, notes string
			updated                         time.Time
		)
		if err := rows.Scan(&kind, &provider, &model, &country, &title, &summary, &urlsJSON, &notes, &updated); err != nil {
			return nil, err
		}
		var candidateURLs []string
		if err := json.Unmarshal([]byte(urlsJSON), &candidateURLs); err != nil {
			return nil, fmt.Errorf("unmarshal discovery candidate urls: %w", err)
		}
		primary := firstString(candidateURLs)
		if strings.TrimSpace(primary) == "" {
			continue // nothing citable
		}
		normalized := signalNormalizeIntradayEventType(kind)
		collected = append(collected, signalModelEvent{
			EventType:       normalized,
			ModelName:       signalFirstNonEmpty(model, title),
			ProviderName:    provider,
			OperatorName:    provider,
			Audience:        buildDiscoveryAudience(normalized),
			TrustLabel:      "官方来源 / discovery 验证",
			SourceKindLabel: buildDiscoverySourceKind(normalized),
			PrimarySource:   primary,
			SourceURL:       primary,
			UpdatedAt:       updated.Format("2006-01-02 15:04"),
			EvidenceDetail:  signalFirstNonEmpty(notes, summary),
			Baseline:        buildDiscoveryBaseline(normalized),
			Summary:         signalFirstNonEmpty(summary, title),
			Priority:        buildDiscoveryPriority(normalized),
		})
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return filterVerifiedDiscoverySignalEvents(collected), nil
}
// filterVerifiedDiscoverySignalEvents keeps, in order, only events whose
// EventType is official_release, promo_campaign, price_cut or price_increase.
// It always returns a non-nil slice.
func filterVerifiedDiscoverySignalEvents(events []signalModelEvent) []signalModelEvent {
	isDiscoveryType := func(eventType string) bool {
		switch eventType {
		case "official_release", "promo_campaign", "price_cut", "price_increase":
			return true
		}
		return false
	}
	kept := make([]signalModelEvent, 0, len(events))
	for i := range events {
		if isDiscoveryType(events[i].EventType) {
			kept = append(kept, events[i])
		}
	}
	return kept
}
// mergeVerifiedDiscoveryEvents folds discovery events into the native ones
// without mutating either input. When a discovery event shares a merge key
// (see signalEventMergeKey) with an earlier event, it only fills blank evidence
// fields on that event. Otherwise it is appended. Native events keep their
// original order and content precedence.
func mergeVerifiedDiscoveryEvents(nativeEvents, discoveryEvents []signalModelEvent) []signalModelEvent {
	out := make([]signalModelEvent, len(nativeEvents))
	copy(out, nativeEvents)

	position := make(map[string]int, len(out))
	for i := range out {
		position[signalEventMergeKey(out[i])] = i
	}
	for _, candidate := range filterVerifiedDiscoverySignalEvents(discoveryEvents) {
		key := signalEventMergeKey(candidate)
		idx, found := position[key]
		if !found {
			position[key] = len(out)
			out = append(out, candidate)
			continue
		}
		out[idx] = mergeSignalEventEvidence(out[idx], candidate)
	}
	return out
}
// mergeSignalEventEvidence returns native with its SourceKindLabel, SourceURL,
// PrimarySource, EvidenceDetail and TrustLabel copied from discovery wherever
// the native value is blank (whitespace-only counts as blank). All other fields
// come from native unchanged.
func mergeSignalEventEvidence(native, discovery signalModelEvent) signalModelEvent {
	merged := native
	fills := []struct {
		dst *string
		src string
	}{
		{&merged.SourceKindLabel, discovery.SourceKindLabel},
		{&merged.SourceURL, discovery.SourceURL},
		{&merged.PrimarySource, discovery.PrimarySource},
		{&merged.EvidenceDetail, discovery.EvidenceDetail},
		{&merged.TrustLabel, discovery.TrustLabel},
	}
	for _, f := range fills {
		if strings.TrimSpace(*f.dst) == "" {
			*f.dst = f.src
		}
	}
	return merged
}
// signalEventMergeKey identifies "the same event" across native and discovery
// sources: normalized event type, provider slug and model slug joined by "|".
func signalEventMergeKey(event signalModelEvent) string {
	return signalNormalizeIntradayEventType(event.EventType) + "|" +
		signalNormalizeWord(event.ProviderName) + "|" +
		signalNormalizeWord(event.ModelName)
}
// buildDiscoveryAudience returns the target-reader blurb for a normalized
// discovery event type, with a generic fallback for other types.
func buildDiscoveryAudience(eventType string) string {
	audience := "适合关注日内情报变化的读者"
	switch eventType {
	case "official_release":
		audience = "适合需要尽快复查默认选型与路线图影响的团队"
	case "promo_campaign":
		audience = "适合想利用活动窗口压低成本的团队"
	case "price_cut":
		audience = "适合准备趁降价重排默认模型的团队"
	case "price_increase":
		audience = "适合提前准备替代模型和预算回退方案的团队"
	}
	return audience
}
// buildDiscoverySourceKind returns the source-kind label for a normalized
// discovery event type. Both price directions share the pricing-page label.
func buildDiscoverySourceKind(eventType string) string {
	label := "discovery 验证"
	switch eventType {
	case "official_release":
		label = "discovery 验证 / 官方发布页"
	case "promo_campaign":
		label = "discovery 验证 / 官方活动页"
	case "price_cut", "price_increase":
		label = "discovery 验证 / 官方价格页"
	}
	return label
}
// buildDiscoveryBaseline returns the Baseline text for a normalized discovery
// event type, with "discovery verified" as the fallback.
func buildDiscoveryBaseline(eventType string) string {
	baseline := "discovery verified"
	switch eventType {
	case "official_release":
		baseline = "discovery 验证通过"
	case "promo_campaign":
		baseline = "活动窗口已验证"
	case "price_cut", "price_increase":
		baseline = "official_confirmed"
	}
	return baseline
}
// buildDiscoveryPriority returns the sort priority for a normalized discovery
// event type. Releases rank highest, then promos, then price moves; other
// types get 80.
func buildDiscoveryPriority(eventType string) int {
	priority := 80
	switch eventType {
	case "official_release":
		priority = 118
	case "promo_campaign":
		priority = 112
	case "price_cut":
		priority = 96
	case "price_increase":
		priority = 94
	}
	return priority
}
// firstString returns the first element of values that is not blank
// (whitespace-only counts as blank), unmodified, or "" if there is none.
func firstString(values []string) string {
	for i := 0; i < len(values); i++ {
		if v := values[i]; strings.TrimSpace(v) != "" {
			return v
		}
	}
	return ""
}
// signalNormalizeIntradayEventType lower-cases and trims value and returns it
// if it is one of the four recognized event types (price_cut, price_increase,
// official_release, promo_campaign); anything else becomes "unknown".
func signalNormalizeIntradayEventType(value string) string {
	normalized := strings.TrimSpace(strings.ToLower(value))
	switch normalized {
	case "price_cut", "price_increase", "official_release", "promo_campaign":
		return normalized
	}
	return "unknown"
}
// signalNormalizeWord folds a provider or model name into the slug used by
// signalEventMergeKey. The input is trimmed and lower-cased, every run of
// separator characters becomes a single "-", and leading/trailing "-" are
// removed. Blank or separator-only input yields "unknown".
//
// Word characters are ASCII letters and digits plus non-ASCII letters such as
// CJK ideographs (see signalIsWordRune). Keeping non-ASCII text matters: if
// every non-ASCII rune were a separator, all-Chinese names and titles would
// collapse to "unknown", distinct discovery events would share a merge key,
// and all but one would be silently merged away. ASCII input normalizes
// exactly as before.
func signalNormalizeWord(value string) string {
	value = strings.ToLower(strings.TrimSpace(value))
	var b strings.Builder
	b.Grow(len(value))
	lastDash := false
	for _, r := range value {
		if signalIsWordRune(r) {
			b.WriteRune(r)
			lastDash = false
			continue
		}
		// '_' and '-' are ASCII separators too, so they collapse like spaces.
		if !lastDash {
			b.WriteByte('-')
			lastDash = true
		}
	}
	result := strings.Trim(b.String(), "-")
	if result == "" {
		return "unknown"
	}
	return result
}

// signalIsWordRune reports whether signalNormalizeWord keeps r verbatim.
// In ASCII only a-z and 0-9 are kept (input is already lower-cased). Outside
// ASCII everything is kept except the common punctuation and space blocks:
// C1 controls and Latin-1 punctuation, General Punctuation, CJK Symbols and
// Punctuation, the BOM, and fullwidth ASCII punctuation. This avoids depending
// on package unicode for a slug.
func signalIsWordRune(r rune) bool {
	switch {
	case r >= 'a' && r <= 'z', r >= '0' && r <= '9':
		return true
	case r < 0x80:
		return false
	case r <= 0xBF: // C1 controls, NBSP, «, », ·, ...
		return false
	case r >= 0x2000 && r <= 0x206F: // General Punctuation (dashes, quotes, zero-width spaces)
		return false
	case r >= 0x3000 && r <= 0x303F: // CJK Symbols and Punctuation (、。「」, ideographic space)
		return false
	case r == 0xFEFF: // byte order mark
		return false
	case r >= 0xFF00 && r <= 0xFF0F, r >= 0xFF1A && r <= 0xFF20,
		r >= 0xFF3B && r <= 0xFF40, r >= 0xFF5B && r <= 0xFF65: // fullwidth punctuation
		return false
	}
	return true
}
// classifySignalFreeSource labels where a free offer comes from. Official and
// cloud operators give "官方免费"; resellers on the verified-aggregator list
// give "聚合免费"; everything else is "待确认" (unconfirmed).
func classifySignalFreeSource(model signalModelInfo) string {
	if model.OperatorType == "official" || model.OperatorType == "cloud" {
		return "官方免费"
	}
	if model.OperatorType == "reseller" && isSignalVerifiedAggregator(model.OperatorName) {
		return "聚合免费"
	}
	return "待确认"
}
// isSignalVerifiedAggregator reports whether name, compared case-insensitively
// after trimming, is one of the known aggregators: OpenRouter, SiliconFlow,
// Fireworks or Groq.
func isSignalVerifiedAggregator(name string) bool {
	normalized := strings.ToLower(strings.TrimSpace(name))
	for _, known := range [...]string{"openrouter", "siliconflow", "fireworks", "groq"} {
		if normalized == known {
			return true
		}
	}
	return false
}
// buildSignalPageMode picks the page layout. It returns "hot" whenever an
// official release or promo campaign is present, "calm" when there are no new
// models and no price changes, "hot" again when those two counts sum to 3 or
// more, and "standard" otherwise.
func buildSignalPageMode(signals signalDailySignals, events []signalModelEvent) string {
	headline := hasSignalEventType(events, "official_release") || hasSignalEventType(events, "promo_campaign")
	switch {
	case headline:
		return "hot"
	case signals.NewModels == 0 && signals.PriceChanges == 0:
		return "calm"
	case signals.NewModels+signals.PriceChanges >= 3:
		return "hot"
	default:
		return "standard"
	}
}
// hasSignalEventType reports whether any event has exactly the given EventType.
func hasSignalEventType(events []signalModelEvent, eventType string) bool {
	for i := range events {
		if events[i].EventType == eventType {
			return true
		}
	}
	return false
}
// buildSignalTrustLabel labels how trustworthy a model's source is. Official
// and cloud operators give "官方来源"; resellers on the verified-aggregator list
// give "聚合来源"; anything else is "待验证来源" (unverified).
func buildSignalTrustLabel(model signalModelInfo) string {
	if model.OperatorType == "official" || model.OperatorType == "cloud" {
		return "官方来源"
	}
	if model.OperatorType == "reseller" && isSignalVerifiedAggregator(model.OperatorName) {
		return "聚合来源"
	}
	return "待验证来源"
}
// buildSignalPrimarySource describes a data source. For "region_pricing" with
// a known operator it returns "<operator> / region_pricing"; in every other
// case it returns sourceKind unchanged.
func buildSignalPrimarySource(sourceKind, operatorName string) string {
	if sourceKind != "region_pricing" || operatorName == "" {
		return sourceKind
	}
	return operatorName + " / region_pricing"
}
// buildSignalPriceEvidenceDetail renders the evidence sentence for a price
// event: the old and new input prices (in yuan, via signalFormatPrice), then the
// direction (上涨/下降, by the sign of changePct) and the absolute percentage
// rounded to an integer.
//
// A "," now separates the new price from "较昨日". Without it the sentence
// read "…调整为 ¥1.0较昨日上涨 50%", running the price into the comparison
// clause; the file's other evidence strings also separate clauses with ",".
func buildSignalPriceEvidenceDetail(changePct, oldPrice, newPrice float64, currency string) string {
	direction := "上涨"
	if changePct < 0 {
		direction = "下降"
	}
	return fmt.Sprintf(
		"pricing_history 记录到输入价格由 %s 调整为 %s,较昨日%s %.0f%%",
		signalFormatPrice(oldPrice, currency),
		signalFormatPrice(newPrice, currency),
		direction,
		signalAbs(changePct),
	)
}
// buildSignalReleaseSourceKindLabel maps release-date provenance to a label.
// Checks run in this order: secondary authoritative evidence (by either kind
// or confidence) wins; then an official announcement with official_primary
// confidence; then official product page and catalog backfill by kind. Any
// other combination is labeled as a first-level official release.
func buildSignalReleaseSourceKindLabel(dateSourceKind, dateConfidence string) string {
	if dateSourceKind == "secondary_authoritative_report" || dateConfidence == "secondary_authoritative" {
		return "二级权威佐证发布"
	}
	if dateSourceKind == "official_announcement" && dateConfidence == "official_primary" {
		return "一级官方发布"
	}
	switch dateSourceKind {
	case "official_product_page":
		return "官方产品页"
	case "catalog_backfill":
		return "目录回填"
	}
	return "一级官方发布"
}
// buildSignalReleaseEvidenceDetail explains how a release date was
// established. It uses the same precedence as
// buildSignalReleaseSourceKindLabel: secondary authoritative evidence first,
// then official announcement with official_primary confidence, official
// product page, catalog backfill, and finally a generic sentence.
func buildSignalReleaseEvidenceDetail(dateSourceKind, dateConfidence string) string {
	if dateSourceKind == "secondary_authoritative_report" || dateConfidence == "secondary_authoritative" {
		return "models.release_date = 今日,发布日期采用次级权威报道佐证,模型来源页保留官方文档"
	}
	if dateSourceKind == "official_announcement" && dateConfidence == "official_primary" {
		return "models.release_date = 今日,且 source_url 指向官方发布页"
	}
	switch dateSourceKind {
	case "official_product_page":
		return "models.release_date = 今日,来源页为官方产品页,发布日期置信度待确认"
	case "catalog_backfill":
		return "models.release_date = 今日,发布日期来自目录级元数据回填"
	}
	return "models.release_date = 今日,且已记录发布日期证据元数据"
}
// buildSignalReleaseTrustLabel extends buildSignalTrustLabel with an evidence
// tier suffix: " / 一级证据" for official_primary, " / 二级佐证" for
// secondary_authoritative, and no suffix otherwise.
func buildSignalReleaseTrustLabel(model signalModelInfo, dateConfidence string) string {
	suffix := ""
	switch dateConfidence {
	case "official_primary":
		suffix = " / 一级证据"
	case "secondary_authoritative":
		suffix = " / 二级佐证"
	}
	return buildSignalTrustLabel(model) + suffix
}
// buildSignalPriceEventAudience returns the target-reader blurb for a price
// event. Only a strictly negative change counts as a cut; zero and everything
// else get the price-increase wording.
func buildSignalPriceEventAudience(changePct float64) string {
	switch {
	case changePct < 0:
		return "适合以成本为先、准备趁降价重排默认选型的团队"
	default:
		return "适合需要提前准备替代模型和预算回退方案的团队"
	}
}
// signalFormatEventUpdatedAt returns value unchanged when it is non-blank.
// Otherwise it returns midnight of fallbackDate ("<date> 00:00"), or "-" when
// fallbackDate is empty too.
func signalFormatEventUpdatedAt(value, fallbackDate string) string {
	switch {
	case strings.TrimSpace(value) != "":
		return value
	case fallbackDate != "":
		return fallbackDate + " 00:00"
	default:
		return "-"
	}
}
// signalFormatPrice renders a price in yuan. Non-positive prices render as
// "免费" (free). CNY prices are used as-is; any other currency is treated as
// USD and converted at signalUSDToCNY. Amounts under ¥1 get two decimals,
// larger ones get one.
func signalFormatPrice(price float64, currency string) string {
	if price <= 0 {
		return "免费"
	}
	yuan := price
	if currency != "CNY" {
		yuan = price * signalUSDToCNY
	}
	layout := "¥%.1f"
	if yuan < 1 {
		layout = "¥%.2f"
	}
	return fmt.Sprintf(layout, yuan)
}
// signalFormatContextWindowCompact renders a token count compactly. Whole
// multiples of 1024*1024 become "<n>M", whole multiples of 1024 become "<n>K",
// other positive values print verbatim, and non-positive values render as "-".
func signalFormatContextWindowCompact(value int) string {
	const kib, mib = 1024, 1024 * 1024
	switch {
	case value <= 0:
		return "-"
	case value%mib == 0:
		return fmt.Sprintf("%dM", value/mib)
	case value%kib == 0:
		return fmt.Sprintf("%dK", value/kib)
	}
	return fmt.Sprintf("%d", value)
}
// signalSignedPriceChangePct returns whichever of the input-price and
// output-price signed percentage changes has the larger magnitude. On a tie the
// input change wins.
func signalSignedPriceChangePct(oldInput, newInput, oldOutput, newOutput float64) float64 {
	inputDelta, outputDelta := signalSignedChange(oldInput, newInput), signalSignedChange(oldOutput, newOutput)
	if signalAbs(inputDelta) >= signalAbs(outputDelta) {
		return inputDelta
	}
	return outputDelta
}
// signalSignedChange returns the signed percentage change from oldValue to
// newValue. From a zero baseline it returns 0 if the value stays 0, and 100 for
// any other new value.
func signalSignedChange(oldValue, newValue float64) float64 {
	switch {
	case oldValue != 0:
		return ((newValue - oldValue) / oldValue) * 100
	case newValue == 0:
		return 0
	default:
		return 100
	}
}
// signalFirstNonEmpty returns the first argument that is not blank
// (whitespace-only counts as blank), unmodified, or "" if every argument is
// blank.
func signalFirstNonEmpty(values ...string) string {
	for _, candidate := range values {
		if strings.TrimSpace(candidate) == "" {
			continue
		}
		return candidate
	}
	return ""
}
// signalAbs returns the absolute value of v. Non-negative input, including
// -0, is returned unchanged.
func signalAbs(v float64) float64 {
	if v >= 0 {
		return v
	}
	return -v
}
// signalMinInt returns the smaller of a and b (built-in min, Go 1.21+, which
// this file already requires through log/slog).
func signalMinInt(a, b int) int {
	return min(a, b)
}
// signalMaxInt returns the larger of a and b (built-in max, Go 1.21+, which
// this file already requires through log/slog).
func signalMaxInt(a, b int) int {
	return max(a, b)
}
// filepathJoin concatenates parts with the OS path separator. Unlike
// path/filepath.Join it neither cleans the result nor drops empty elements.
func filepathJoin(parts ...string) string {
	var b strings.Builder
	for i, part := range parts {
		if i > 0 {
			b.WriteRune(os.PathSeparator)
		}
		b.WriteString(part)
	}
	return b.String()
}
// signalNullIfBlank returns nil (bound as SQL NULL by database/sql) for a blank
// or whitespace-only string, and the original, untrimmed string otherwise.
func signalNullIfBlank(value string) any {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return value
	}
	return nil
}