#!/usr/bin/env bash
set -euo pipefail

# Delta sync + validation starting from a baseline snapshot DB.
#
# This script:
# 1) Copies BASE_DB_DIR -> DELTA_DB_DIR (so baseline is not modified)
# 2) Runs rpki validation again (RRDP will prefer delta if available)
# 3) Writes artifacts + a markdown delta analysis report
#
# Usage:
# ./scripts/manual_sync/delta_sync.sh <base_db_dir> <base_report_json>
#
# Outputs under OUT_DIR (default: target/live/manual_sync):
# - *_delta_db_* copied RocksDB directory
# - *_delta_report_*.json audit report
# - *_delta_run_*.log stdout/stderr log (includes summary)
# - *_delta_db_stats_*.txt db_stats --exact output
# - *_delta_rrdp_state_*.tsv rrdp_state_dump --view legacy-state output
# - *_delta_analysis_*.md base vs delta comparison report

# Resolve repo root relative to this script (scripts/manual_sync/ -> root).
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"

# Required positional arguments.
BASE_DB_DIR="${1:-}"
BASE_REPORT_JSON="${2:-}"
if [[ -z "${BASE_DB_DIR}" || -z "${BASE_REPORT_JSON}" ]]; then
  # Fix: the argument placeholders had been dropped from the usage string.
  echo "Usage: $0 <base_db_dir> <base_report_json>" >&2
  exit 2
fi
if [[ ! -d "${BASE_DB_DIR}" ]]; then
  echo "ERROR: base_db_dir is not a directory: ${BASE_DB_DIR}" >&2
  exit 2
fi
if [[ ! -f "${BASE_REPORT_JSON}" ]]; then
  echo "ERROR: base_report_json does not exist: ${BASE_REPORT_JSON}" >&2
  exit 2
fi

# Tunables, all overridable via the environment.
TAL_URL="${TAL_URL:-https://tal.apnic.net/tal-archive/apnic-rfc7730-https.tal}"
HTTP_TIMEOUT_SECS="${HTTP_TIMEOUT_SECS:-1800}"
RSYNC_TIMEOUT_SECS="${RSYNC_TIMEOUT_SECS:-1800}"
RSYNC_MIRROR_ROOT="${RSYNC_MIRROR_ROOT:-}"
VALIDATION_TIME="${VALIDATION_TIME:-}"

OUT_DIR="${OUT_DIR:-$ROOT_DIR/target/live/manual_sync}"
mkdir -p "$OUT_DIR"

# Artifact paths, all derived from RUN_NAME unless explicitly overridden.
TS="$(date -u +%Y%m%dT%H%M%SZ)"
RUN_NAME="${RUN_NAME:-apnic_delta_${TS}}"
DELTA_DB_DIR="${DELTA_DB_DIR:-$OUT_DIR/${RUN_NAME}_db}"
DELTA_REPORT_JSON="${DELTA_REPORT_JSON:-$OUT_DIR/${RUN_NAME}_report.json}"
DELTA_RUN_LOG="${DELTA_RUN_LOG:-$OUT_DIR/${RUN_NAME}_run.log}"
BASE_DB_STATS_TXT="${BASE_DB_STATS_TXT:-$OUT_DIR/${RUN_NAME}_base_db_stats.txt}"
DELTA_DB_STATS_TXT="${DELTA_DB_STATS_TXT:-$OUT_DIR/${RUN_NAME}_delta_db_stats.txt}"
BASE_RRDP_STATE_TSV="${BASE_RRDP_STATE_TSV:-$OUT_DIR/${RUN_NAME}_base_rrdp_state.tsv}"
DELTA_RRDP_STATE_TSV="${DELTA_RRDP_STATE_TSV:-$OUT_DIR/${RUN_NAME}_delta_rrdp_state.tsv}"
DELTA_ANALYSIS_MD="${DELTA_ANALYSIS_MD:-$OUT_DIR/${RUN_NAME}_delta_analysis.md}"
DELTA_META_JSON="${DELTA_META_JSON:-$OUT_DIR/${RUN_NAME}_meta.json}"

# Best-effort base meta discovery (produced by `full_sync.sh`).
BASE_META_JSON="${BASE_META_JSON:-}"
if [[ -z "${BASE_META_JSON}" ]]; then
  # Guess the meta path from the report path naming convention of full_sync.sh.
  guess="${BASE_REPORT_JSON%_report.json}_meta.json"
  if [[ -f "${guess}" ]]; then
    BASE_META_JSON="${guess}"
  fi
fi

echo "== rpki manual delta sync ==" >&2
echo "tal_url=$TAL_URL" >&2
echo "base_db=$BASE_DB_DIR" >&2
echo "base_report=$BASE_REPORT_JSON" >&2
echo "delta_db=$DELTA_DB_DIR" >&2
echo "delta_report=$DELTA_REPORT_JSON" >&2

echo "== copying base DB (baseline is not modified) ==" >&2
cp -a "$BASE_DB_DIR" "$DELTA_DB_DIR"

script_start_s="$(date +%s)"
run_start_s="$(date +%s)"

# Build the validator command as an array so arguments with spaces survive.
cmd=(cargo run --release --bin rpki -- \
  --db "$DELTA_DB_DIR" \
  --tal-url "$TAL_URL" \
  --http-timeout-secs "$HTTP_TIMEOUT_SECS" \
  --rsync-timeout-secs "$RSYNC_TIMEOUT_SECS" \
  --report-json "$DELTA_REPORT_JSON")
if [[ -n "${RSYNC_MIRROR_ROOT}" ]]; then
  cmd+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
fi
if [[ -n "${VALIDATION_TIME}" ]]; then
  cmd+=(--validation-time "$VALIDATION_TIME")
fi

# Log the exact (shell-quoted) command, then run it. All output goes to the
# run log only; with `set -o pipefail` a validator failure still aborts here.
#
# Fix: previously the command was echoed a second time instead of executed,
# so the delta validation never ran and $DELTA_REPORT_JSON was never written.
(
  echo "# command:"
  printf '%q ' "${cmd[@]}"
  echo
  "${cmd[@]}"
) 2>&1 | tee "$DELTA_RUN_LOG" >/dev/null
run_end_s="$(date +%s)"
run_duration_s="$((run_end_s - run_start_s))"

echo "== db_stats (exact) ==" >&2
db_stats_start_s="$(date +%s)"
cargo run --release --bin db_stats -- --db "$BASE_DB_DIR" --exact 2>&1 | tee "$BASE_DB_STATS_TXT" >/dev/null
cargo run --release --bin db_stats -- --db "$DELTA_DB_DIR" --exact 2>&1 | tee "$DELTA_DB_STATS_TXT" >/dev/null
db_stats_end_s="$(date +%s)"
db_stats_duration_s="$((db_stats_end_s - db_stats_start_s))"

echo "== rrdp_state_dump (legacy-state) ==" >&2
state_start_s="$(date +%s)"
cargo run --release --bin rrdp_state_dump -- --db "$BASE_DB_DIR" --view legacy-state >"$BASE_RRDP_STATE_TSV"
cargo run --release --bin rrdp_state_dump -- --db "$DELTA_DB_DIR" --view legacy-state >"$DELTA_RRDP_STATE_TSV"
state_end_s="$(date +%s)"
state_duration_s="$((state_end_s - state_start_s))"

script_end_s="$(date +%s)"
total_duration_s="$((script_end_s - script_start_s))"

echo "== delta analysis report ==" >&2
# Render the delta meta JSON + markdown analysis report.
# File paths are passed as argv to the embedded Python program; scalar run
# metadata (timings, timeouts, TAL URL, ...) is passed via the environment
# and read back with os.environ below.
TAL_URL="$TAL_URL" \
  BASE_DB_DIR="$BASE_DB_DIR" \
  DELTA_DB_DIR="$DELTA_DB_DIR" \
  DELTA_RUN_LOG="$DELTA_RUN_LOG" \
  VALIDATION_TIME_ARG="$VALIDATION_TIME" \
  HTTP_TIMEOUT_SECS="$HTTP_TIMEOUT_SECS" \
  RSYNC_TIMEOUT_SECS="$RSYNC_TIMEOUT_SECS" \
  RUN_DURATION_S="$run_duration_s" \
  DB_STATS_DURATION_S="$db_stats_duration_s" \
  STATE_DURATION_S="$state_duration_s" \
  TOTAL_DURATION_S="$total_duration_s" \
  python3 - "$BASE_REPORT_JSON" "$DELTA_REPORT_JSON" "$BASE_RRDP_STATE_TSV" "$DELTA_RRDP_STATE_TSV" \
  "$BASE_DB_STATS_TXT" "$DELTA_DB_STATS_TXT" "$BASE_META_JSON" "$DELTA_META_JSON" "$DELTA_ANALYSIS_MD" <<'PY'
"""Compare a base and a delta rpki audit run and emit:

- a delta meta JSON (argv[8]) with run parameters and timings, and
- a markdown analysis report (argv[9]) diffing reports, RRDP state,
  db_stats output and VRP sets.

The heredoc delimiter is quoted ('PY'), so nothing here is shell-expanded.
NOTE(review): `dict | None` below requires Python 3.10+.
"""
import json
import os
import sys
from collections import Counter, defaultdict  # NOTE(review): defaultdict appears unused here
from datetime import datetime, timezone
from pathlib import Path

# Positional inputs (see the shell invocation above for the ordering).
base_report_path = Path(sys.argv[1])
delta_report_path = Path(sys.argv[2])
base_state_path = Path(sys.argv[3])
delta_state_path = Path(sys.argv[4])
base_db_stats_path = Path(sys.argv[5])
delta_db_stats_path = Path(sys.argv[6])
base_meta_path_s = sys.argv[7]  # kept as str: may be "" when no base meta was found
delta_meta_path = Path(sys.argv[8])
out_md_path = Path(sys.argv[9])


def load_json(p: Path):
    """Load a JSON file, tolerating a stray literal backslash-n at the end."""
    s = p.read_text(encoding="utf-8")
    try:
        return json.loads(s)
    except json.JSONDecodeError:
        # Backwards-compat / robustness: tolerate accidental literal trailing "\\n".
        s2 = s.strip()
        if s2.endswith("\\n"):
            s2 = s2[:-2].rstrip()
        return json.loads(s2)


def load_optional_json(path_s: str):
    """Like load_json, but returns None for an empty path or a missing file."""
    if not path_s:
        return None
    p = Path(path_s)
    if not p.exists():
        return None
    return load_json(p)


def parse_rrdp_state_tsv(p: Path):
    """Parse rrdp_state_dump output into {notify_uri: (serial, session_id)}.

    Despite the .tsv name, fields are split on a single space here.
    """
    # format from `rrdp_state_dump --view legacy-state`:
    #   [legacy-state]
    #   notify_uri serial session_id
    out = {}
    for line in p.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        # Skip blanks, the "[legacy-state]" section header, and the column header.
        if not line or line.startswith("["):
            continue
        if line == "notify_uri serial session_id":
            continue
        parts = line.split(" ")
        if len(parts) != 3:
            raise SystemExit(f"invalid rrdp_state_dump line in {p}: {line!r}")
        uri, serial, session = parts
        out[uri] = (int(serial), session)
    return out


def parse_db_stats(p: Path):
    """Parse db_stats key=value lines; numeric-looking values become ints."""
    # lines: key=value
    out = {}
    for line in p.read_text(encoding="utf-8").splitlines():
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        k = k.strip()
        v = v.strip()
        if v.isdigit():
            out[k] = int(v)
        else:
            out[k] = v
    return out


def warnings_total(rep: dict) -> int:
    """Total warning count: tree-level plus all publication-point warnings."""
    return len(rep["tree"]["warnings"]) + sum(len(pp["warnings"]) for pp in rep["publication_points"])


def report_summary(rep: dict) -> dict:
    """Extract the headline metrics compared in the overview table."""
    return {
        "validation_time": rep["meta"]["validation_time_rfc3339_utc"],
        "publication_points_processed": rep["tree"]["instances_processed"],
        "publication_points_failed": rep["tree"]["instances_failed"],
        # Distinct non-empty RRDP notification URIs across all publication points.
        "rrdp_repos_unique": len({pp.get("rrdp_notification_uri") for pp in rep["publication_points"] if pp.get("rrdp_notification_uri")}),
        "vrps": len(rep["vrps"]),
        "aspas": len(rep["aspas"]),
        "audit_publication_points": len(rep["publication_points"]),
        "warnings_total": warnings_total(rep),
    }


def count_repo_sync_failed(rep: dict) -> int:
    """Count warnings whose message looks like a repo sync failure."""
    # Best-effort heuristic (we don't currently expose a structured counter in the audit report).
    # Keep the match conservative to avoid false positives.
    def is_repo_sync_failed(msg: str) -> bool:
        m = msg.lower()
        return "repo sync failed" in m or "rrdp fetch failed" in m or "rsync fetch failed" in m

    n = 0
    for w in rep["tree"]["warnings"]:
        if is_repo_sync_failed(w.get("message", "")):
            n += 1
    for pp in rep["publication_points"]:
        for w in pp.get("warnings", []):
            if is_repo_sync_failed(w.get("message", "")):
                n += 1
    return n


def pp_manifest_sha(pp: dict) -> str:
    """Return the sha256 of the publication point's manifest object, or ""."""
    # In our audit format, the first object is the manifest (synthetic entry) with sha256 of manifest_bytes.
    for o in pp["objects"]:
        if o["kind"] == "manifest":
            return o["sha256_hex"]
    return ""


def pp_objects_by_uri(rep: dict):
    """Flatten all audited objects into {rsync_uri: (sha256_hex, kind)}.

    NOTE(review): a URI appearing under multiple publication points keeps only
    the last occurrence.
    """
    m = {}
    for pp in rep["publication_points"]:
        for o in pp["objects"]:
            m[o["rsync_uri"]] = (o["sha256_hex"], o["kind"])
    return m


def vrp_set(rep: dict):
    """Deduplicated VRPs as a set of (asn, prefix, max_length) tuples."""
    return {(v["asn"], v["prefix"], v["max_length"]) for v in rep["vrps"]}


def rfc_refs_str(w: dict) -> str:
    """Join a warning's rfc_refs list; NOTE(review): currently unused below."""
    refs = w.get("rfc_refs") or []
    return ", ".join(refs) if refs else ""


# Load all inputs up front.
base = load_json(base_report_path)
delta = load_json(delta_report_path)
base_sum = report_summary(base)
delta_sum = report_summary(delta)
base_db = parse_db_stats(base_db_stats_path)
delta_db = parse_db_stats(delta_db_stats_path)
base_state = parse_rrdp_state_tsv(base_state_path)
delta_state = parse_rrdp_state_tsv(delta_state_path)
base_meta = load_optional_json(base_meta_path_s)  # None when missing -> durations shown as 'unknown'
download_stats = delta.get("download_stats") or {}

# Persist the delta run's metadata (mirrors what full_sync.sh writes for base).
delta_meta = {
    "recorded_at_utc": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    "tal_url": os.environ["TAL_URL"],
    "base_db_dir": os.environ["BASE_DB_DIR"],
    "delta_db_dir": os.environ["DELTA_DB_DIR"],
    "base_report_json": str(base_report_path),
    "delta_report_json": str(delta_report_path),
    "delta_run_log": os.environ["DELTA_RUN_LOG"],
    "validation_time_arg": os.environ.get("VALIDATION_TIME_ARG",""),
    "http_timeout_secs": int(os.environ["HTTP_TIMEOUT_SECS"]),
    "rsync_timeout_secs": int(os.environ["RSYNC_TIMEOUT_SECS"]),
    "durations_secs": {
        "rpki_run": int(os.environ["RUN_DURATION_S"]),
        "db_stats_exact": int(os.environ["DB_STATS_DURATION_S"]),
        "rrdp_state_dump": int(os.environ["STATE_DURATION_S"]),
        "total_script": int(os.environ["TOTAL_DURATION_S"]),
    },
    "download_stats": download_stats,
}
delta_meta_path.write_text(json.dumps(delta_meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")

now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

# RRDP state changes
serial_changed = 0
session_changed = 0
serial_deltas = []
for uri, (old_serial, old_sess) in base_state.items():
    # Only repos present in both snapshots are compared; new repos are ignored here.
    if uri not in delta_state:
        continue
    new_serial, new_sess = delta_state[uri]
    if new_serial != old_serial:
        serial_changed += 1
        serial_deltas.append((uri, old_serial, new_serial, new_serial - old_serial))
    if new_sess != old_sess:
        session_changed += 1
# Biggest serial advance first (x[3] is new - old).
serial_deltas.sort(key=lambda x: x[3], reverse=True)

# Publication point diffs, keyed by manifest_rsync_uri.
base_pp = {pp["manifest_rsync_uri"]: pp for pp in base["publication_points"]}
delta_pp = {pp["manifest_rsync_uri"]: pp for pp in delta["publication_points"]}
base_keys = set(base_pp.keys())
delta_keys = set(delta_pp.keys())
new_pp = sorted(delta_keys - base_keys)
missing_pp = sorted(base_keys - delta_keys)
updated_pp = 0
unchanged_pp = 0
for k in sorted(base_keys & delta_keys):
    # A publication point counts as "updated" when its manifest hash changed.
    if pp_manifest_sha(base_pp[k]) != pp_manifest_sha(delta_pp[k]):
        updated_pp += 1
    else:
        unchanged_pp += 1


# Cache usage + repo sync failure hints
def source_counts(rep: dict) -> Counter:
    """Histogram of publication-point `source` values ("" when absent)."""
    c = Counter()
    for pp in rep["publication_points"]:
        c[pp.get("source","")] += 1
    return c


base_sources = source_counts(base)
delta_sources = source_counts(delta)
base_repo_sync_failed = count_repo_sync_failed(base)
delta_repo_sync_failed = count_repo_sync_failed(delta)


def cache_reason_counts(rep: dict) -> Counter:
    """Why publication points fell back to the cached VCIR current instance."""
    c = Counter()
    for pp in rep.get("publication_points", []):
        if pp.get("source") != "vcir_current_instance":
            continue
        # Use warning messages as "reason". If missing, emit a fallback bucket.
        ws = pp.get("warnings", [])
        if not ws:
            c["(no warnings recorded)"] += 1
            continue
        for w in ws:
            msg = w.get("message", "").strip() or "(empty warning message)"
            c[msg] += 1
    return c


base_cache_reasons = cache_reason_counts(base)
delta_cache_reasons = cache_reason_counts(delta)

# Object change stats (by rsync URI, sha256)
base_obj = pp_objects_by_uri(base)
delta_obj = pp_objects_by_uri(delta)
kind_stats = {k: {"added": 0, "changed": 0, "removed": 0} for k in ["manifest","crl","certificate","roa","aspa","other"]}
all_uris = set(base_obj.keys()) | set(delta_obj.keys())
for uri in all_uris:
    b = base_obj.get(uri)
    d = delta_obj.get(uri)
    if b is None and d is not None:
        kind_stats[d[1]]["added"] += 1
    elif b is not None and d is None:
        kind_stats[b[1]]["removed"] += 1
    else:
        # Present in both: count as changed only if the hash moved.
        if b[0] != d[0]:
            kind_stats[d[1]]["changed"] += 1

# VRP diff
base_v = vrp_set(base)
delta_v = vrp_set(delta)
added_v = delta_v - base_v
removed_v = base_v - delta_v


def fmt_db_stats(db: dict) -> str:
    """Render db_stats as markdown bullets: known keys first, rest sorted."""
    ordered = [
        "mode",
        "repository_view",
        "raw_by_hash",
        "vcir",
        "audit_rule_index",
        "rrdp_source",
        "rrdp_source_member",
        "rrdp_uri_owner",
        "rrdp_state",
        "raw_objects",
        "rrdp_object_index",
        "group_current_repository_view",
        "group_current_validation_state",
        "group_current_rrdp_state",
        "group_legacy_compatibility",
        "total",
        "sst_files",
    ]
    out = []
    seen = set()
    for k in ordered:
        if k in db:
            out.append(f"- `{k}={db[k]}`")
            seen.add(k)
    for k in sorted(set(db) - seen):
        out.append(f"- `{k}={db[k]}`")
    return "\n".join(out) if out else "_(missing db_stats keys)_"


# ---- Markdown report assembly (Chinese section headings are intentional) ----
lines = []
lines.append("# APNIC RRDP 增量同步验收(manual_sync)\n\n")
lines.append(f"时间戳:`{now}`(UTC)\n\n")
lines.append("## 复现信息\n\n")
lines.append(f"- base_report:`{base_report_path}`\n")
lines.append(f"- delta_report:`{delta_report_path}`\n")
lines.append(f"- base_db_stats:`{base_db_stats_path}`\n")
lines.append(f"- delta_db_stats:`{delta_db_stats_path}`\n")
lines.append(f"- base_rrdp_state:`{base_state_path}`\n")
lines.append(f"- delta_rrdp_state:`{delta_state_path}`\n\n")
lines.append("## 运行结果概览\n\n")
lines.append("| metric | base | delta |\n")
lines.append("|---|---:|---:|\n")
for k in [
    "validation_time",
    "publication_points_processed",
    "publication_points_failed",
    "rrdp_repos_unique",
    "vrps",
    "aspas",
    "audit_publication_points",
    "warnings_total",
]:
    lines.append(f"| {k} | {base_sum[k]} | {delta_sum[k]} |\n")
lines.append("\n")


def dur(meta: dict | None, key: str):
    """Safe lookup of a duration from an (optional) meta dict; None if absent."""
    if not meta:
        return None
    return (meta.get("durations_secs") or {}).get(key)


base_rpki_run = dur(base_meta, "rpki_run")
delta_rpki_run = delta_meta["durations_secs"]["rpki_run"]
base_total = dur(base_meta, "total_script")
delta_total = delta_meta["durations_secs"]["total_script"]
lines.append("## 持续时间(seconds)\n\n")
lines.append("| step | base | delta |\n")
lines.append("|---|---:|---:|\n")
lines.append(f"| rpki_run | {base_rpki_run if base_rpki_run is not None else 'unknown'} | {delta_rpki_run} |\n")
lines.append(f"| total_script | {base_total if base_total is not None else 'unknown'} | {delta_total} |\n")
lines.append("\n")
if base_meta is None and base_meta_path_s:
    lines.append(f"> 注:未能读取 base meta:`{base_meta_path_s}`(文件不存在或不可读)。建议用 `full_sync.sh` 生成 baseline 以获得 base 时长对比。\n\n")
lines.append("RocksDB KV(`db_stats --exact`):\n\n")
lines.append("### 基线(base)\n\n")
lines.append(fmt_db_stats(base_db) + "\n\n")
lines.append("### 增量(delta)\n\n")
lines.append(fmt_db_stats(delta_db) + "\n\n")
lines.append("## RRDP 增量是否发生(基于 `rrdp_state` 变化)\n\n")
lines.append(f"- repo_total(base)={len(base_state)}\n")
lines.append(f"- repo_total(delta)={len(delta_state)}\n")
lines.append(f"- serial_changed={serial_changed}\n")
lines.append(f"- session_changed={session_changed}\n\n")
if serial_deltas:
    lines.append("serial 增长最大的 10 个 RRDP repo(old -> new):\n\n")
    for uri, old, new, diff in serial_deltas[:10]:
        lines.append(f"- `{uri}`:`{old} -> {new}`(+{diff})\n")
    lines.append("\n")
lines.append("## 发布点(Publication Point)变化统计\n\n")
lines.append("以 `manifest_rsync_uri` 作为发布点 key,对比 base vs delta:\n\n")
lines.append(f"- base PP:`{len(base_keys)}`\n")
lines.append(f"- delta PP:`{len(delta_keys)}`\n")
lines.append(f"- `new_pp={len(new_pp)}`\n")
lines.append(f"- `missing_pp={len(missing_pp)}`\n")
lines.append(f"- `updated_pp={updated_pp}`\n")
lines.append(f"- `unchanged_pp={unchanged_pp}`\n\n")
lines.append("> 注:`new_pp/missing_pp/updated_pp` 会混入“遍历范围变化”的影响(例如 validation_time 不同、或 base 中存在更多失败 PP)。\n\n")
lines.append("## fail fetch / VCIR 当前实例缓存复用情况\n\n")
lines.append(f"- repo sync failed(启发式:warning contains 'repo sync failed'/'rrdp fetch failed'/'rsync fetch failed')\n")
lines.append(f" - base:`{base_repo_sync_failed}`\n")
lines.append(f" - delta:`{delta_repo_sync_failed}`\n\n")
lines.append("- source 计数(按 `PublicationPointAudit.source`):\n\n")
lines.append(f" - base:`{dict(base_sources)}`\n")
lines.append(f" - delta:`{dict(delta_sources)}`\n\n")


def render_cache_reasons(title: str, c: Counter) -> str:
    """Render one cache-reuse section; shadows the outer `lines` deliberately."""
    if not c:
        return f"{title}:`0`(未使用 VCIR 当前实例缓存复用)\n\n"
    lines = []
    total = sum(c.values())
    lines.append(f"{title}:`{total}`\n\n")
    lines.append("Top reasons(按 warning message 聚合,可能一条 PP 有多条 warning):\n\n")
    for msg, n in c.most_common(10):
        lines.append(f"- `{n}` × {msg}\n")
    lines.append("\n")
    return "".join(lines)


lines.append(render_cache_reasons("- base `source=vcir_current_instance`", base_cache_reasons))
lines.append(render_cache_reasons("- delta `source=vcir_current_instance`", delta_cache_reasons))
lines.append("## 文件变更统计(按对象类型)\n\n")
lines.append("按 `ObjectAuditEntry.sha256_hex` 对比(同一 rsync URI 前后 hash 变化记为 `~changed`):\n\n")
lines.append("| kind | added | changed | removed |\n")
lines.append("|---|---:|---:|---:|\n")
for kind in ["manifest","crl","certificate","roa","aspa","other"]:
    st = kind_stats[kind]
    lines.append(f"| {kind} | {st['added']} | {st['changed']} | {st['removed']} |\n")
lines.append("\n")
lines.append("## VRP 影响(去重后集合 diff)\n\n")
lines.append("以 `(asn, prefix, max_length)` 为 key:\n\n")
lines.append(f"- base unique VRP:`{len(base_v)}`\n")
lines.append(f"- delta unique VRP:`{len(delta_v)}`\n")
lines.append(f"- `added={len(added_v)}`\n")
lines.append(f"- `removed={len(removed_v)}`\n")
lines.append(f"- `net={len(added_v) - len(removed_v)}`\n\n")
out_md_path.write_text("".join(lines), encoding="utf-8")
# Print the report path so callers can capture it.
print(out_md_path)
PY

# Final artifact summary on stderr.
echo "== done ==" >&2
echo "artifacts:" >&2
echo "- delta db: $DELTA_DB_DIR" >&2
echo "- delta report: $DELTA_REPORT_JSON" >&2
echo "- delta run log: $DELTA_RUN_LOG" >&2
echo "- delta meta json: $DELTA_META_JSON" >&2
echo "- analysis md: $DELTA_ANALYSIS_MD" >&2
echo "- base state tsv: $BASE_RRDP_STATE_TSV" >&2
echo "- delta state tsv: $DELTA_RRDP_STATE_TSV" >&2