#!/usr/bin/env python3
"""Normalize historical real-host artifacts into repo-safe form.

Usage: ``migrate_historical_artifacts.py <artifact-root>``

Every immediate sub-directory of the artifact root is walked recursively.
Files that are considered sensitive are moved to a sibling directory named
``real-host-acceptance-sensitive`` (same relative layout); other known
artifact files are rewritten in place through the helpers imported from
``artifact_redaction``.
"""

import json
import pathlib
import shutil
import sys
from typing import Iterable, Optional

# The redaction helpers live next to this script; make them importable even
# when the script is invoked from another working directory.
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
from artifact_redaction import (  # noqa: E402
    redact_key,
    sanitize_group_state,
    sanitize_headers,
    sanitize_nested,
    sanitize_runtime_context,
)

# Exact file names that are always moved out of the artifact tree.
SENSITIVE_FILE_NAMES = {
    "00-managed-key.txt",
    "00-raw-user-key.txt",
    "05-subscription-access-prep.sql",
}

# A ``.txt`` file whose lower-cased name contains any of these tokens is
# moved out of the artifact tree.
SENSITIVE_TEXT_PATTERNS = (
    "managed-key",
    "raw-user-key",
    "probe-key",
    "key-preview",
    "key-corrected",
)

# JSON file names that are passed through ``sanitize_nested``.
ROOT_SENSITIVE_JSON_NAMES = {
    "deepseek.json",
    "minimax.json",
    "summary.json",
    "99-summary.json",
    "99-semantic-summary.json",
}

# JSON files that have their own dedicated migration in ``main`` and must
# therefore never go through the generic ``sanitize_nested`` pass.
_DEDICATED_JSON_NAMES = frozenset({
    "00-local-key-source.json",
    "01-runtime-context.json",
    "00-context.json",
    "08-subscription-group-state.json",
})

# Additional JSON files (beyond ROOT_SENSITIVE_JSON_NAMES) that are passed
# through ``sanitize_nested``.
_EXTRA_SANITIZED_JSON_NAMES = frozenset({
    "05a-batch-detail-pre-access.json",
    "07-access-status.json",
    "10-batch-detail.json",
})

_SQL_PREP_NAME = "05-subscription-access-prep.sql"
_RUNTIME_CONTEXT_NAMES = frozenset({"01-runtime-context.json", "00-context.json"})


def write_json(path: pathlib.Path, payload: object) -> None:
    """Serialize *payload* to *path* as UTF-8, 2-space-indented JSON.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``).
    """
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def migrate_key_source(path: pathlib.Path) -> None:
    """Rewrite a key-source JSON file so it only carries a redacted key.

    Files that already contain a ``redacted`` entry are left untouched, so
    the migration can be re-run safely on the same tree. Otherwise the file
    is replaced with ``source``, ``provider_id`` and a ``redacted`` value
    produced by ``redact_key`` from the concatenated
    ``upstream_key_prefix`` + ``upstream_key_suffix`` (empty string when
    both are missing or falsy).
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    if "redacted" in payload:
        return
    prefix = str(payload.get("upstream_key_prefix") or "")
    suffix = str(payload.get("upstream_key_suffix") or "")
    write_json(path, {
        "source": payload.get("source"),
        "provider_id": payload.get("provider_id"),
        "redacted": redact_key(prefix + suffix),
    })


def migrate_runtime_context(path: pathlib.Path) -> None:
    """Rewrite *path* in place with ``sanitize_runtime_context`` applied."""
    payload = json.loads(path.read_text(encoding="utf-8"))
    write_json(path, sanitize_runtime_context(payload))


def migrate_redis_invalidation(path: pathlib.Path) -> None:
    """Replace a raw redis-invalidation text log with a boolean JSON summary.

    The summary is written next to the original with a ``.json`` suffix and
    the original text file is then deleted. Each ``*_invalidated`` flag
    records whether the corresponding ``*_cache_key=`` marker occurs
    anywhere in the log.
    """
    raw = path.read_text(encoding="utf-8")
    stripped = raw.strip()
    # NOTE(review): a trailing "3" or "0" is treated as a successful run;
    # presumably this is the count printed by the redis DEL command --
    # confirm against the script that produced these logs.
    succeeded = stripped.endswith(("3", "0"))
    write_json(path.with_suffix(".json"), {
        "auth_cache_invalidated": "auth_cache_key=" in raw,
        "balance_cache_invalidated": "balance_cache_key=" in raw,
        "subscription_cache_invalidated": "subscription_cache_key=" in raw,
        "redis_del_exit_code": 0 if succeeded else None,
    })
    path.unlink()


def migrate_group_state(path: pathlib.Path) -> None:
    """Rewrite *path* in place with ``sanitize_group_state`` applied."""
    payload = json.loads(path.read_text(encoding="utf-8"))
    write_json(path, sanitize_group_state(payload))


def _int_token_after(line: str, marker: str) -> Optional[int]:
    """Return the first whitespace-delimited token after *marker* as an int.

    Trailing ``,`` / ``;`` are stripped from the token. Returns ``None``
    when nothing follows the marker or the token is not an integer.
    The caller guarantees that *marker* occurs in *line*.
    """
    try:
        token = line.split(marker, 1)[1].split()[0]
        return int(token.strip().strip(",;"))
    except (ValueError, IndexError):
        return None


def _interval_days(line: str) -> Optional[int]:
    """Return N from an ``interval 'N days'`` fragment, or ``None``.

    The caller guarantees that ``interval '`` occurs in *line*.
    """
    try:
        return int(line.split("interval '", 1)[1].split(" days'", 1)[0])
    except (ValueError, IndexError):
        return None


def migrate_sql_summary(path: pathlib.Path) -> None:
    """Extract a secret-free summary from the subscription-prep SQL script.

    For each field the first line that yields a parseable value wins; a
    line that contains the marker but fails to parse leaves the field unset
    so that a later line may still provide it. The summary is written to
    ``05-subscription-access-prep.summary.json`` in the same directory.
    The SQL file itself is not modified or removed here.
    """
    group_id: Optional[int] = None
    min_balance: Optional[int] = None
    subscription_days: Optional[int] = None
    key_value = ""

    for line in path.read_text(encoding="utf-8").splitlines():
        if group_id is None and "group_id = " in line:
            group_id = _int_token_after(line, "group_id = ")
        if min_balance is None and "balance < " in line:
            min_balance = _int_token_after(line, "balance < ")
        if subscription_days is None and "interval '" in line:
            subscription_days = _interval_days(line)
        if not key_value and "WHERE key = '" in line:
            # Everything up to the closing quote is the raw key; it is only
            # ever persisted after going through redact_key below.
            key_value = line.split("WHERE key = '", 1)[1].split("'", 1)[0]

    write_json(path.with_name("05-subscription-access-prep.summary.json"), {
        "subscription_group_id": group_id,
        "min_balance": min_balance,
        "subscription_days": subscription_days,
        "api_key": redact_key(key_value),
    })


def maybe_update_guide(path: pathlib.Path) -> None:
    """Insert the artifact-security banner into the guide, once.

    Nothing happens when the text already contains
    ``artifact security mode:``. Otherwise the banner is inserted directly
    after the first occurrence of the guide heading; if the heading is not
    present the text is written back unchanged.
    """
    raw = path.read_text(encoding="utf-8")
    if "artifact security mode:" in raw:
        return
    # The heading literal is matched verbatim and must not be altered
    # (roughly: "real-host acceptance artifacts -> quick-reference mapping").
    updated = raw.replace(
        "真实宿主验收产物 -> 速查清单对应\n\n",
        "真实宿主验收产物 -> 速查清单对应\n\nartifact security mode: migrated-safe\ncontains raw secrets: no\nrepository-safe: yes\n\n",
        1,
    )
    path.write_text(updated, encoding="utf-8")


def sanitize_header_file(path: pathlib.Path) -> None:
    """Rewrite a captured-headers text file through ``sanitize_headers``."""
    path.write_text(sanitize_headers(path.read_text(encoding="utf-8")), encoding="utf-8")


def sanitize_json_file(path: pathlib.Path) -> None:
    """Rewrite a JSON file in place with ``sanitize_nested`` applied."""
    payload = json.loads(path.read_text(encoding="utf-8"))
    write_json(path, sanitize_nested(payload))


def mirror_sensitive(root: pathlib.Path, sensitive_root: pathlib.Path, path: pathlib.Path) -> None:
    """Move *path* from under *root* to the same relative spot under *sensitive_root*.

    Missing parent directories of the destination are created.

    Raises:
        ValueError: if *path* is not located under *root*.
    """
    dst = sensitive_root / path.relative_to(root)
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(path), str(dst))


def walk_artifact_dirs(root: pathlib.Path) -> Iterable[pathlib.Path]:
    """Yield the immediate sub-directories of *root* in sorted order."""
    for child in sorted(root.iterdir()):
        if child.is_dir():
            yield child


def should_sanitize_json(path: pathlib.Path) -> bool:
    """Return True when *path* should go through the generic JSON sanitizer.

    Only ``.json`` files qualify. Files with a dedicated migration are
    excluded; files named in ``ROOT_SENSITIVE_JSON_NAMES`` or in the extra
    batch/access-status list are included; everything else is left alone.
    """
    if path.suffix != ".json":
        return False
    if path.name in _DEDICATED_JSON_NAMES:
        return False
    return path.name in ROOT_SENSITIVE_JSON_NAMES or path.name in _EXTRA_SANITIZED_JSON_NAMES


def should_mirror_sensitive_text(path: pathlib.Path) -> bool:
    """Return True for ``.txt`` files whose name contains a sensitive token.

    The name is lower-cased before matching against
    ``SENSITIVE_TEXT_PATTERNS``; the suffix check is case-sensitive.
    """
    if path.suffix != ".txt":
        return False
    lower = path.name.lower()
    return any(token in lower for token in SENSITIVE_TEXT_PATTERNS)


def _process_file(root: pathlib.Path, sensitive_root: pathlib.Path, path: pathlib.Path) -> None:
    """Apply the first matching migration to a single artifact file.

    The order of the checks matters: sensitive files are moved away before
    any in-place rewrite is considered, and the first rule that matches is
    the only one applied.
    """
    name = path.name
    if name in SENSITIVE_FILE_NAMES:
        if name == _SQL_PREP_NAME:
            # Keep a redacted summary in the repo before the raw SQL leaves.
            migrate_sql_summary(path)
        mirror_sensitive(root, sensitive_root, path)
    elif should_mirror_sensitive_text(path):
        mirror_sensitive(root, sensitive_root, path)
    elif name == "00-local-key-source.json":
        migrate_key_source(path)
    elif name in _RUNTIME_CONTEXT_NAMES:
        migrate_runtime_context(path)
    elif name == "07-redis-targeted-invalidation.txt":
        migrate_redis_invalidation(path)
    elif name == "08-subscription-group-state.json":
        migrate_group_state(path)
    elif path.suffix == ".txt" and "headers" in name:
        sanitize_header_file(path)
    elif name == "00-artifact-guide.txt":
        maybe_update_guide(path)
    elif should_sanitize_json(path):
        sanitize_json_file(path)


def main() -> None:
    """Command-line entry point: migrate every artifact directory under the root.

    Expects exactly one argument, the artifact root. On success a one-line
    JSON status object is printed to stdout.

    Raises:
        SystemExit: on a wrong argument count or when the root is not an
            existing directory.
    """
    if len(sys.argv) != 2:
        raise SystemExit("usage: migrate_historical_artifacts.py <artifact-root>")
    root = pathlib.Path(sys.argv[1]).resolve()
    if not root.is_dir():
        # Fail with a readable message instead of a traceback from iterdir().
        raise SystemExit(f"artifact root is not a directory: {root}")
    sensitive_root = root.parent / "real-host-acceptance-sensitive"

    for artifact_dir in walk_artifact_dirs(root):
        # Materialize the listing first: the migrations below move, delete
        # and create files inside the directory being walked.
        for path in sorted(artifact_dir.rglob("*")):
            if path.is_file():
                _process_file(root, sensitive_root, path)

    print(json.dumps({
        "root": str(root),
        "sensitive_root": str(sensitive_root),
        "status": "ok",
    }, ensure_ascii=False))


if __name__ == "__main__":
    main()