Files
llm-intelligence/scripts/run_real_pipeline.sh
phamnazage-jpg a8999abcb0 feat(runtime): harden daily pipeline audit and verification
Tighten real-ingestion success rules, separate scheduled reports from historical rebuilds, and persist source-level runtime audit across daily pipeline runs.

Also add the Phase 5 CI workflow contract plus verification updates and supporting docs so the full uncommitted change set can be validated together.
2026-05-14 16:17:39 +08:00

183 lines
5.6 KiB
Bash
Executable File

#!/usr/bin/env bash
# Real-data pipeline entry point: environment bootstrap and audit defaults.
set -euo pipefail

# Resolve the project root (parent of this script's directory), pull in the
# shared report helpers, and run everything else relative to the root.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
. "$ROOT_DIR/scripts/report_utils.sh"
cd "$ROOT_DIR"

# Optional env files, sourced in this order when present. ".env" is sourced
# last, so plain assignments in it replace values set by ".env.local".
# NOTE(review): many projects give ".env.local" the final say - confirm this
# order is intentional.
for _pipeline_env_file in ".env.local" ".env"; do
  if [[ -f "$_pipeline_env_file" ]]; then
    # shellcheck disable=SC1090,SC1091
    source "$_pipeline_env_file"
  fi
done
unset _pipeline_env_file

# Both settings are mandatory; stop before any stage runs if one is missing.
[[ -n "${DATABASE_URL:-}" ]] || {
  echo "DATABASE_URL 未设置" >&2
  exit 1
}
[[ -n "${OPENROUTER_API_KEY:-}" ]] || {
  echo "OPENROUTER_API_KEY 未设置,无法执行真实采集" >&2
  exit 1
}

# Run-scoped state. REPORT_DATE comes from the sourced report helpers.
REPORT_DATE="$(report_date_value)"
FETCH_OUT="$ROOT_DIR/models.json"
FETCH_TOTAL="0"

# Fields folded into the runtime audit line by refresh_pipeline_audit.
PIPELINE_STAGE_SET="openrouter,multi_source,official_imports,daily_report"
PIPELINE_SOURCE_SET="openrouter,moonshot,deepseek,openai,zhipu,baidu,bytedance"
PIPELINE_FAILED_SOURCE_SET="none"
MULTI_SOURCE_AUDIT="multi_source_audit=unavailable"
PIPELINE_AUDIT_SUMMARY=""
#######################################
# Flatten a text file into a single line.
# Newlines become spaces, every run of whitespace collapses to one space,
# and one leading / one trailing space is removed.
# Arguments: $1 - path of the file to flatten
# Outputs:   the flattened text on stdout; nothing when the file is missing
# Returns:   0 when the file is missing, otherwise the pipeline's status
#######################################
normalize_summary_file() {
  local path="$1"
  if [[ ! -f "$path" ]]; then
    return
  fi
  # `\{1,\}` is the POSIX basic-regex spelling of "one or more"; `\+` is a
  # GNU extension that other sed implementations may treat as a literal '+'.
  tr '\n' ' ' < "$path" | sed 's/[[:space:]]\{1,\}/ /g; s/^ //; s/ $//'
}
#######################################
# Pull the value of the `failed_source_keys=` field out of a summary string.
# The value is everything up to the next space. When a line carries the
# field more than once, the last occurrence is the one reported.
# Arguments: $1 - summary text
# Outputs:   the field value on stdout; nothing for lines without the field
#######################################
extract_failed_source_keys() {
  local summary="$1"
  local -r field_program='s/.*failed_source_keys=\([^ ]*\).*/\1/p'
  sed -n "$field_program" <<<"$summary"
}
#######################################
# Add source keys to the pipeline-wide failed-source list.
# Globals:   PIPELINE_FAILED_SOURCE_SET (read and written)
# Arguments: $1 - comma-separated keys; empty or "none" leaves the list as is
#######################################
merge_failed_source_keys() {
  local incoming="$1"
  case "$incoming" in
    "" | none) return ;;
  esac
  if [[ "$PIPELINE_FAILED_SOURCE_SET" != "none" ]]; then
    # Something is already recorded: extend the comma-separated list.
    PIPELINE_FAILED_SOURCE_SET+=",${incoming}"
  else
    # First failure replaces the "none" placeholder.
    PIPELINE_FAILED_SOURCE_SET="$incoming"
  fi
}
#######################################
# Rebuild the one-line runtime audit summary from the current pipeline state.
# Globals:   PIPELINE_STAGE_SET, PIPELINE_SOURCE_SET,
#            PIPELINE_FAILED_SOURCE_SET, FETCH_TOTAL, MULTI_SOURCE_AUDIT (read)
#            PIPELINE_AUDIT_SUMMARY (written)
#######################################
refresh_pipeline_audit() {
  # An unset or empty FETCH_TOTAL is reported as 0.
  printf -v PIPELINE_AUDIT_SUMMARY \
    'runtime_audit stage_set=%s selected_source_keys=%s failed_source_keys=%s openrouter_total=%s %s' \
    "$PIPELINE_STAGE_SET" \
    "$PIPELINE_SOURCE_SET" \
    "$PIPELINE_FAILED_SOURCE_SET" \
    "${FETCH_TOTAL:-0}" \
    "$MULTI_SOURCE_AUDIT"
}
#######################################
# Record the current run as failed through track_report_state.
# The audit summary is refreshed first so the record carries the latest
# stage/source state. Recording is best-effort: a tracking error never
# changes this function's status, but it is reported on stderr so the
# operator knows the failure was not persisted.
# Globals:   REPORT_DATE, DATABASE_URL (read)
#            PIPELINE_AUDIT_SUMMARY (refreshed, then read)
# Arguments: $1 - error message describing why the run failed
# Outputs:   a warning on stderr when the state could not be recorded
# Returns:   0 always
#######################################
record_failure() {
  local error_message="$1"
  local output_path=""
  local markdown_path

  refresh_pipeline_audit

  # Attach the report markdown path only when that file already exists.
  markdown_path="$(report_markdown_path "$REPORT_DATE")" || markdown_path=""
  if [[ -n "$markdown_path" && -f "$markdown_path" ]]; then
    output_path="$markdown_path"
  fi

  # The helper's own output stays suppressed, as before; only the fact that
  # it failed is surfaced.
  if ! track_report_state "$DATABASE_URL" "$REPORT_DATE" "failed" "" \
    "$PIPELINE_AUDIT_SUMMARY" "$output_path" "$error_message" \
    "manual" "pipeline" "false" >/dev/null 2>&1; then
    printf '警告: 失败状态记录未能写入: %s\n' "$error_message" >&2
  fi
}
# ---- Stage: schema migration + OpenRouter fetch -----------------------------
refresh_pipeline_audit

# A migration failure aborts the script through `set -e`; it is not recorded
# as a pipeline failure.
"$ROOT_DIR/scripts/apply_migration.sh"

if ! go run "./scripts/fetch_openrouter.go" \
  -api-key "$OPENROUTER_API_KEY" \
  -db "$DATABASE_URL" \
  -out "$FETCH_OUT" \
  -strict-real; then
  merge_failed_source_keys "openrouter"
  record_failure "真实采集失败"
  exit 1
fi

# Read the "total" field from the fetch output. The parse is guarded so that
# a missing or malformed file is recorded as a pipeline failure instead of
# killing the script through `set -e` with no failure record.
if ! FETCH_TOTAL=$(
  python3 - "$FETCH_OUT" <<'PY'
import json, sys
path = sys.argv[1]
with open(path, 'r', encoding='utf-8') as f:
    data = json.load(f)
print(int(data.get("total", 0)))
PY
); then
  FETCH_TOTAL="0"
  merge_failed_source_keys "openrouter"
  record_failure "采集结果解析失败: ${FETCH_OUT}"
  exit 1
fi

# Fewer than 10 entries is treated as an abnormal fetch.
if [[ "${FETCH_TOTAL:-0}" -lt 10 ]]; then
  merge_failed_source_keys "openrouter"
  record_failure "本次采集结果异常: total=${FETCH_TOTAL:-0} < 10"
  exit 1
fi
refresh_pipeline_audit
# ---- Stage: multi-source supplementary sync ---------------------------------
# The fetcher's stdout is captured to a temp file so it can be both folded
# into the audit summary and echoed to this script's stdout afterwards.
multi_source_keys="moonshot,deepseek,openai"
MULTI_SOURCE_OUTPUT="$(mktemp)"

multi_source_status=0
go run "./scripts/fetch_multi_source.go" --sources "$multi_source_keys" \
  > "$MULTI_SOURCE_OUTPUT" || multi_source_status=$?

MULTI_SOURCE_SUMMARY="$(normalize_summary_file "$MULTI_SOURCE_OUTPUT")"

if (( multi_source_status != 0 )); then
  if [[ -z "$MULTI_SOURCE_SUMMARY" ]]; then
    # The fetcher printed nothing: mark the stage and every requested source.
    MULTI_SOURCE_AUDIT="multi_source_audit=stage_failed"
    merge_failed_source_keys "$multi_source_keys"
  else
    # Use whatever the fetcher reported, including its failed-key field.
    MULTI_SOURCE_AUDIT="multi_source_audit=${MULTI_SOURCE_SUMMARY}"
    merge_failed_source_keys "$(extract_failed_source_keys "$MULTI_SOURCE_SUMMARY")"
  fi
  cat "$MULTI_SOURCE_OUTPUT"
  rm -f "$MULTI_SOURCE_OUTPUT"
  record_failure "多源补充同步失败"
  exit 1
fi

# Success: the summary may still name sources that failed individually.
MULTI_SOURCE_AUDIT="multi_source_audit=${MULTI_SOURCE_SUMMARY:-none}"
merge_failed_source_keys "$(extract_failed_source_keys "$MULTI_SOURCE_SUMMARY")"
refresh_pipeline_audit
cat "$MULTI_SOURCE_OUTPUT"
rm -f "$MULTI_SOURCE_OUTPUT"
# ---- Stage: official imports ------------------------------------------------
# Each step is a Go script built with the `llm_script` tag. Steps run in the
# listed order; the first failing step records its key and message, then
# stops the pipeline.
official_step_scripts=(
  "./scripts/import_zhipu_data.go"
  "./scripts/export_official_seed_json.go"
  "./scripts/import_phase2_data.go"
  "./scripts/import_bytedance_data.go"
)
official_step_keys=(
  "zhipu"
  "official_seed_export"
  "baidu"
  "bytedance"
)
official_step_errors=(
  "智谱官方导入失败"
  "官方种子导出失败"
  "百度官方导入失败"
  "字节官方导入失败"
)

for official_step_index in "${!official_step_scripts[@]}"; do
  if go run -tags llm_script "${official_step_scripts[official_step_index]}"; then
    continue
  fi
  merge_failed_source_keys "${official_step_keys[official_step_index]}"
  record_failure "${official_step_errors[official_step_index]}"
  exit 1
done
# ---- Stage: daily report generation and verification ------------------------
refresh_pipeline_audit

# The generator receives the run metadata and audit line through its
# environment.
if ! REPORT_RUN_KIND="manual" \
  REPORT_TRIGGER_SOURCE="pipeline" \
  REPORT_IS_OFFICIAL_DAILY="false" \
  REPORT_RUNTIME_AUDIT="$PIPELINE_AUDIT_SUMMARY" \
  go run "./scripts/generate_daily_report.go"; then
  record_failure "日报生成失败"
  exit 1
fi

# Both archived renderings (markdown and HTML) must exist for REPORT_DATE.
if ! [[ -f "$(report_archive_markdown_path "$REPORT_DATE")" \
  && -f "$(report_archive_html_path "$REPORT_DATE")" ]]; then
  record_failure "日报归档缺失"
  exit 1
fi

# Each table must hold at least one 'generated' row for the database's
# current_date.
# NOTE(review): these checks use SQL current_date while the rest of the
# script keys on REPORT_DATE - confirm the two always agree.
for generated_table in daily_report report_runs; do
  generated_sql="select count(*) from ${generated_table} where report_date = current_date and status = 'generated';"
  if ! psql "$DATABASE_URL" -Atqc "$generated_sql" | awk '{ exit !($1 >= 1) }'; then
    record_failure "${generated_table} 未写入 generated 记录"
    exit 1
  fi
done

# Closing row-count summary printed for the operator.
row_count_sql="select 'daily_report', count(*) from daily_report where report_date = current_date
union all
select 'models', count(*) from models
union all
select 'region_pricing', count(*) from region_pricing
union all
select 'report_runs', count(*) from report_runs where report_date = current_date
order by 1;"
psql "$DATABASE_URL" -Atqc "$row_count_sql"