rpki/scripts/manual_sync/delta_sync.sh
2026-03-04 11:12:53 +08:00

514 lines
19 KiB
Bash
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env bash
set -euo pipefail
# Delta sync + validation starting from a baseline snapshot DB.
#
# This script:
# 1) Copies BASE_DB_DIR -> DELTA_DB_DIR (so baseline is not modified)
# 2) Runs rpki validation again (RRDP will prefer delta if available)
# 3) Writes artifacts + a markdown delta analysis report
#
# Usage:
# ./scripts/manual_sync/delta_sync.sh <base_db_dir> <base_report_json>
#
# Outputs under OUT_DIR (default: target/live/manual_sync):
# - *_delta_db_* copied RocksDB directory
# - *_delta_report_*.json audit report
# - *_delta_run_*.log stdout/stderr log (includes summary)
# - *_delta_db_stats_*.txt db_stats --exact output
# - *_delta_rrdp_state_*.tsv rrdp_state_dump output
# - *_delta_analysis_*.md base vs delta comparison report

# Repo root is two directories above this script; run from there so cargo
# invocations and the OUT_DIR default resolve consistently.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"

BASE_DB_DIR="${1:-}"
BASE_REPORT_JSON="${2:-}"

# Validate both mandatory positional arguments up front.
# All usage errors exit with status 2, diagnostics on stderr.
if [[ -z "${BASE_DB_DIR}" || -z "${BASE_REPORT_JSON}" ]]; then
  echo "Usage: $0 <base_db_dir> <base_report_json>" >&2
  exit 2
fi
if [[ ! -d "${BASE_DB_DIR}" ]]; then
  echo "ERROR: base_db_dir is not a directory: ${BASE_DB_DIR}" >&2
  exit 2
fi
if [[ ! -f "${BASE_REPORT_JSON}" ]]; then
  echo "ERROR: base_report_json does not exist: ${BASE_REPORT_JSON}" >&2
  exit 2
fi
# ---- Tunables (all overridable from the environment) ------------------------
: "${TAL_URL:=https://tal.apnic.net/tal-archive/apnic-rfc7730-https.tal}"
: "${HTTP_TIMEOUT_SECS:=1800}"
: "${RSYNC_TIMEOUT_SECS:=1800}"
: "${RSYNC_MIRROR_ROOT:=}"   # optional; forwarded to rpki when non-empty
: "${VALIDATION_TIME:=}"     # optional; forwarded to rpki when non-empty
: "${OUT_DIR:=$ROOT_DIR/target/live/manual_sync}"
mkdir -p "$OUT_DIR"

# ---- Artifact paths, all derived from one timestamped run name --------------
TS="$(date -u +%Y%m%dT%H%M%SZ)"
: "${RUN_NAME:=apnic_delta_${TS}}"
: "${DELTA_DB_DIR:=$OUT_DIR/${RUN_NAME}_db}"
: "${DELTA_REPORT_JSON:=$OUT_DIR/${RUN_NAME}_report.json}"
: "${DELTA_RUN_LOG:=$OUT_DIR/${RUN_NAME}_run.log}"
: "${BASE_DB_STATS_TXT:=$OUT_DIR/${RUN_NAME}_base_db_stats.txt}"
: "${DELTA_DB_STATS_TXT:=$OUT_DIR/${RUN_NAME}_delta_db_stats.txt}"
: "${BASE_RRDP_STATE_TSV:=$OUT_DIR/${RUN_NAME}_base_rrdp_state.tsv}"
: "${DELTA_RRDP_STATE_TSV:=$OUT_DIR/${RUN_NAME}_delta_rrdp_state.tsv}"
: "${DELTA_ANALYSIS_MD:=$OUT_DIR/${RUN_NAME}_delta_analysis.md}"
: "${DELTA_META_JSON:=$OUT_DIR/${RUN_NAME}_meta.json}"

# Best-effort base meta discovery (produced by `full_sync.sh`).
: "${BASE_META_JSON:=}"
if [[ -z "${BASE_META_JSON}" ]]; then
  candidate="${BASE_REPORT_JSON%_report.json}_meta.json"
  if [[ -f "${candidate}" ]]; then
    BASE_META_JSON="${candidate}"
  fi
fi
echo "== rpki manual delta sync ==" >&2
echo "tal_url=$TAL_URL" >&2
echo "base_db=$BASE_DB_DIR" >&2
echo "base_report=$BASE_REPORT_JSON" >&2
echo "delta_db=$DELTA_DB_DIR" >&2
echo "delta_report=$DELTA_REPORT_JSON" >&2

echo "== copying base DB (baseline is not modified) ==" >&2
# Guard against reruns: `cp -a SRC DST` with an existing DST directory copies
# SRC *into* DST (DST/$(basename SRC)) instead of cloning it, which would
# silently nest the DB and corrupt the run. Fail fast instead.
if [[ -e "$DELTA_DB_DIR" ]]; then
  echo "ERROR: delta db dir already exists: $DELTA_DB_DIR (pick a new RUN_NAME or remove it)" >&2
  exit 2
fi
cp -a "$BASE_DB_DIR" "$DELTA_DB_DIR"

script_start_s="$(date +%s)"
run_start_s="$(date +%s)"

# Build the validator command; optional flags are appended only when their
# corresponding env vars are non-empty.
cmd=(cargo run --release --bin rpki -- \
  --db "$DELTA_DB_DIR" \
  --tal-url "$TAL_URL" \
  --http-timeout-secs "$HTTP_TIMEOUT_SECS" \
  --rsync-timeout-secs "$RSYNC_TIMEOUT_SECS" \
  --report-json "$DELTA_REPORT_JSON")
if [[ -n "${RSYNC_MIRROR_ROOT}" ]]; then
  cmd+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT")
fi
if [[ -n "${VALIDATION_TIME}" ]]; then
  cmd+=(--validation-time "$VALIDATION_TIME")
fi

# Run the validator. The %q-quoted command line plus the full transcript goes
# to the run log (stdout is discarded; the log is the record). With pipefail
# set above, a validator failure still aborts the script through the tee.
(
  echo "# command:"
  printf '%q ' "${cmd[@]}"
  echo
  echo
  "${cmd[@]}"
) 2>&1 | tee "$DELTA_RUN_LOG" >/dev/null
run_end_s="$(date +%s)"
run_duration_s="$((run_end_s - run_start_s))"
# Capture `db_stats --exact` for one DB dir; full transcript goes to a file.
dump_db_stats() {
  cargo run --release --bin db_stats -- --db "$1" --exact 2>&1 | tee "$2" >/dev/null
}

# Dump the RRDP notification state for one DB dir into a TSV file.
dump_rrdp_state() {
  cargo run --release --bin rrdp_state_dump -- --db "$1" >"$2"
}

echo "== db_stats (exact) ==" >&2
db_stats_start_s="$(date +%s)"
dump_db_stats "$BASE_DB_DIR" "$BASE_DB_STATS_TXT"
dump_db_stats "$DELTA_DB_DIR" "$DELTA_DB_STATS_TXT"
db_stats_end_s="$(date +%s)"
db_stats_duration_s="$((db_stats_end_s - db_stats_start_s))"

echo "== rrdp_state_dump ==" >&2
state_start_s="$(date +%s)"
dump_rrdp_state "$BASE_DB_DIR" "$BASE_RRDP_STATE_TSV"
dump_rrdp_state "$DELTA_DB_DIR" "$DELTA_RRDP_STATE_TSV"
state_end_s="$(date +%s)"
state_duration_s="$((state_end_s - state_start_s))"

script_end_s="$(date +%s)"
total_duration_s="$((script_end_s - script_start_s))"
echo "== delta analysis report ==" >&2
# Produce the base-vs-delta markdown analysis with the embedded python3 script
# below. Scalar settings and measured durations travel via environment
# variables; file paths are passed positionally, in exactly the order the
# python side reads them from sys.argv.
TAL_URL="$TAL_URL" \
BASE_DB_DIR="$BASE_DB_DIR" \
DELTA_DB_DIR="$DELTA_DB_DIR" \
DELTA_RUN_LOG="$DELTA_RUN_LOG" \
VALIDATION_TIME_ARG="$VALIDATION_TIME" \
HTTP_TIMEOUT_SECS="$HTTP_TIMEOUT_SECS" \
RSYNC_TIMEOUT_SECS="$RSYNC_TIMEOUT_SECS" \
RUN_DURATION_S="$run_duration_s" \
DB_STATS_DURATION_S="$db_stats_duration_s" \
STATE_DURATION_S="$state_duration_s" \
TOTAL_DURATION_S="$total_duration_s" \
python3 - "$BASE_REPORT_JSON" "$DELTA_REPORT_JSON" "$BASE_RRDP_STATE_TSV" "$DELTA_RRDP_STATE_TSV" \
  "$BASE_DB_STATS_TXT" "$DELTA_DB_STATS_TXT" "$BASE_META_JSON" "$DELTA_META_JSON" "$DELTA_ANALYSIS_MD" <<'PY'
import json
import os
import sys
# NOTE(review): defaultdict appears unused in this script — confirm before removing.
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path

# Positional arguments, passed by the wrapping shell script in this exact order.
base_report_path = Path(sys.argv[1])
delta_report_path = Path(sys.argv[2])
base_state_path = Path(sys.argv[3])
delta_state_path = Path(sys.argv[4])
base_db_stats_path = Path(sys.argv[5])
delta_db_stats_path = Path(sys.argv[6])
base_meta_path_s = sys.argv[7]  # may be "" when no base meta file was discovered
delta_meta_path = Path(sys.argv[8])
out_md_path = Path(sys.argv[9])
def load_json(p: Path):
    """Parse a JSON file, tolerating an accidental literal trailing "\\n"."""
    text = p.read_text(encoding="utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Backwards-compat / robustness: some earlier runs left the two
        # literal characters backslash+n at the end of the file.
        trimmed = text.strip()
        if trimmed.endswith("\\n"):
            trimmed = trimmed[:-2].rstrip()
        return json.loads(trimmed)

def load_optional_json(path_s: str):
    """Like load_json, but returns None for an empty path or a missing file."""
    if not path_s:
        return None
    candidate = Path(path_s)
    return load_json(candidate) if candidate.exists() else None
def parse_rrdp_state_tsv(p: Path):
    """Parse rrdp_state_dump output ("<notify_uri>\\t<serial>\\t<session_id>").

    Blank lines are skipped; any malformed line aborts the script via
    SystemExit. Returns {notify_uri: (serial:int, session_id:str)}.
    """
    state = {}
    for line in p.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        fields = line.split("\t")
        if len(fields) != 3:
            raise SystemExit(f"invalid rrdp_state_dump line in {p}: {line!r}")
        uri, serial, session = fields
        state[uri] = (int(serial), session)
    return state
def parse_db_stats(p: Path):
    """Parse db_stats output ("key=value" per line) into a dict.

    Purely-numeric values become ints, everything else stays a string;
    lines without "=" are ignored.
    """
    stats = {}
    for line in p.read_text(encoding="utf-8").splitlines():
        if "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip()
        stats[key] = int(value) if value.isdigit() else value
    return stats
def warnings_total(rep: dict) -> int:
    """Count warnings across the tree plus every publication point."""
    tree_count = len(rep["tree"]["warnings"])
    pp_count = sum(len(pp["warnings"]) for pp in rep["publication_points"])
    return tree_count + pp_count

def report_summary(rep: dict) -> dict:
    """Condense an audit report into the headline metrics shown in the table."""
    rrdp_uris = {
        pp.get("rrdp_notification_uri")
        for pp in rep["publication_points"]
        if pp.get("rrdp_notification_uri")
    }
    return {
        "validation_time": rep["meta"]["validation_time_rfc3339_utc"],
        "publication_points_processed": rep["tree"]["instances_processed"],
        "publication_points_failed": rep["tree"]["instances_failed"],
        "rrdp_repos_unique": len(rrdp_uris),
        "vrps": len(rep["vrps"]),
        "aspas": len(rep["aspas"]),
        "audit_publication_points": len(rep["publication_points"]),
        "warnings_total": warnings_total(rep),
    }
def count_repo_sync_failed(rep: dict) -> int:
    """Best-effort count of repo-sync failures in a report's warnings.

    There is no structured counter for this in the audit report yet, so we
    match a small, conservative set of phrases to avoid false positives.
    """
    needles = ("repo sync failed", "rrdp fetch failed", "rsync fetch failed")

    def is_repo_sync_failed(msg: str) -> bool:
        lowered = msg.lower()
        return any(needle in lowered for needle in needles)

    all_warnings = list(rep["tree"]["warnings"])
    for pp in rep["publication_points"]:
        all_warnings.extend(pp.get("warnings", []))
    return sum(1 for w in all_warnings if is_repo_sync_failed(w.get("message", "")))
def pp_manifest_sha(pp: dict) -> str:
    """Return this publication point's manifest sha256 ("" if none found).

    In our audit format the manifest appears as a synthetic object entry
    whose sha256 covers manifest_bytes.
    """
    return next(
        (obj["sha256_hex"] for obj in pp["objects"] if obj["kind"] == "manifest"),
        "",
    )

def pp_objects_by_uri(rep: dict):
    """Flatten all audited objects into {rsync_uri: (sha256_hex, kind)}.

    If the same rsync URI occurs under several publication points, the last
    occurrence in traversal order wins.
    """
    flat = {}
    for pp in rep["publication_points"]:
        for obj in pp["objects"]:
            flat[obj["rsync_uri"]] = (obj["sha256_hex"], obj["kind"])
    return flat
def vrp_set(rep: dict):
    """Deduplicate a report's VRPs into a set of (asn, prefix, max_length)."""
    triples = set()
    for v in rep["vrps"]:
        triples.add((v["asn"], v["prefix"], v["max_length"]))
    return triples

def rfc_refs_str(w: dict) -> str:
    """Join a warning's RFC references with ", " ("" when none).

    NOTE(review): not referenced anywhere else in this script — confirm
    whether it is still needed before removing.
    """
    refs = w.get("rfc_refs") or []
    if not refs:
        return ""
    return ", ".join(refs)
# Load both audit reports and their derived summaries/stats.
base = load_json(base_report_path)
delta = load_json(delta_report_path)
base_sum = report_summary(base)
delta_sum = report_summary(delta)
base_db = parse_db_stats(base_db_stats_path)
delta_db = parse_db_stats(delta_db_stats_path)
base_state = parse_rrdp_state_tsv(base_state_path)
delta_state = parse_rrdp_state_tsv(delta_state_path)
base_meta = load_optional_json(base_meta_path_s)  # None if absent
# Record this delta run's configuration and timings (all handed over by the
# wrapping shell script via environment variables) as a machine-readable
# meta JSON, mirroring what full_sync.sh writes for the baseline.
delta_meta = {
    "recorded_at_utc": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    "tal_url": os.environ["TAL_URL"],
    "base_db_dir": os.environ["BASE_DB_DIR"],
    "delta_db_dir": os.environ["DELTA_DB_DIR"],
    "base_report_json": str(base_report_path),
    "delta_report_json": str(delta_report_path),
    "delta_run_log": os.environ["DELTA_RUN_LOG"],
    "validation_time_arg": os.environ.get("VALIDATION_TIME_ARG",""),
    "http_timeout_secs": int(os.environ["HTTP_TIMEOUT_SECS"]),
    "rsync_timeout_secs": int(os.environ["RSYNC_TIMEOUT_SECS"]),
    "durations_secs": {
        "rpki_run": int(os.environ["RUN_DURATION_S"]),
        "db_stats_exact": int(os.environ["DB_STATS_DURATION_S"]),
        "rrdp_state_dump": int(os.environ["STATE_DURATION_S"]),
        "total_script": int(os.environ["TOTAL_DURATION_S"]),
    },
}
delta_meta_path.write_text(json.dumps(delta_meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# RRDP state changes: compare serial/session per notify URI across runs.
# URIs present only in one run are ignored here (counted via PP diff below).
serial_changed = 0
session_changed = 0
serial_deltas = []  # (uri, old_serial, new_serial, growth)
for uri, (old_serial, old_sess) in base_state.items():
    if uri not in delta_state:
        continue
    new_serial, new_sess = delta_state[uri]
    if new_serial != old_serial:
        serial_changed += 1
        serial_deltas.append((uri, old_serial, new_serial, new_serial - old_serial))
    if new_sess != old_sess:
        session_changed += 1
# Largest serial growth first (used for the top-10 list in the report).
serial_deltas.sort(key=lambda x: x[3], reverse=True)
# Publication point diffs, keyed by manifest rsync URI.
base_pp = {pp["manifest_rsync_uri"]: pp for pp in base["publication_points"]}
delta_pp = {pp["manifest_rsync_uri"]: pp for pp in delta["publication_points"]}
base_keys = set(base_pp.keys())
delta_keys = set(delta_pp.keys())
new_pp = sorted(delta_keys - base_keys)
missing_pp = sorted(base_keys - delta_keys)
# For PPs present in both runs, a differing manifest sha counts as "updated".
updated_pp = 0
unchanged_pp = 0
for k in sorted(base_keys & delta_keys):
    if pp_manifest_sha(base_pp[k]) != pp_manifest_sha(delta_pp[k]):
        updated_pp += 1
    else:
        unchanged_pp += 1
# Cache usage + repo sync failure hints
def source_counts(rep: dict) -> Counter:
    """Tally publication points by their `source` field ("" when missing)."""
    return Counter(pp.get("source", "") for pp in rep["publication_points"])
# Per-run tallies used in the "fail fetch / cache" section of the report.
base_sources = source_counts(base)
delta_sources = source_counts(delta)
base_repo_sync_failed = count_repo_sync_failed(base)
delta_repo_sync_failed = count_repo_sync_failed(delta)
def cache_reason_counts(rep: dict) -> Counter:
    """Histogram of why publication points fell back to the fetch cache.

    Only PPs with source == "fetch_cache_pp" are considered. Their warning
    messages serve as the "reason" buckets, with explicit fallback buckets
    for PPs that carry no warnings or empty messages.
    """
    reasons = Counter()
    cached_pps = (
        pp for pp in rep.get("publication_points", [])
        if pp.get("source") == "fetch_cache_pp"
    )
    for pp in cached_pps:
        warnings = pp.get("warnings", [])
        if not warnings:
            reasons["(no warnings recorded)"] += 1
        else:
            for w in warnings:
                reasons[w.get("message", "").strip() or "(empty warning message)"] += 1
    return reasons
base_cache_reasons = cache_reason_counts(base)
delta_cache_reasons = cache_reason_counts(delta)

# Object change stats (by rsync URI, sha256): classify every URI seen in
# either run as added / changed / removed, bucketed by object kind.
kind_stats = {k: {"added": 0, "changed": 0, "removed": 0} for k in ["manifest","crl","certificate","roa","aspa","other"]}

def _kind_bucket(kind: str) -> str:
    # Fix: a kind outside the fixed table above previously raised KeyError.
    # Fold unknown kinds into "other" — the only bucket the markdown table
    # can render them under anyway.
    return kind if kind in kind_stats else "other"

base_obj = pp_objects_by_uri(base)
delta_obj = pp_objects_by_uri(delta)
all_uris = set(base_obj.keys()) | set(delta_obj.keys())
for uri in all_uris:
    b = base_obj.get(uri)
    d = delta_obj.get(uri)
    if b is None and d is not None:
        kind_stats[_kind_bucket(d[1])]["added"] += 1
    elif b is not None and d is None:
        kind_stats[_kind_bucket(b[1])]["removed"] += 1
    elif b[0] != d[0]:
        kind_stats[_kind_bucket(d[1])]["changed"] += 1

# VRP diff: set difference of deduplicated (asn, prefix, max_length) triples.
base_v = vrp_set(base)
delta_v = vrp_set(delta)
added_v = delta_v - base_v
removed_v = base_v - delta_v
def fmt_db_stats(db: dict) -> str:
    """Render the interesting db_stats keys as markdown bullet lines."""
    wanted = ["raw_objects", "rrdp_object_index", "fetch_cache_pp", "rrdp_state", "total"]
    bullets = [f"- `{k}={db[k]}`" for k in wanted if k in db]
    return "\n".join(bullets) if bullets else "_(missing db_stats keys)_"
# Build the markdown report as a list of string fragments, joined at the end.
# NOTE(review): some CJK punctuation in these literals looks truncated
# (e.g. unmatched parentheses) — possibly stripped by a display tool.
# Literals are kept byte-for-byte; verify against the original file.
lines = []
lines.append("# APNIC RRDP 增量同步验收manual_sync\n\n")
lines.append(f"时间戳:`{now}`UTC\n\n")
lines.append("## 复现信息\n\n")
lines.append(f"- base_report`{base_report_path}`\n")
lines.append(f"- delta_report`{delta_report_path}`\n")
lines.append(f"- base_db_stats`{base_db_stats_path}`\n")
lines.append(f"- delta_db_stats`{delta_db_stats_path}`\n")
lines.append(f"- base_rrdp_state`{base_state_path}`\n")
lines.append(f"- delta_rrdp_state`{delta_state_path}`\n\n")
# Headline metrics table: one row per summary key, base vs delta.
lines.append("## 运行结果概览\n\n")
lines.append("| metric | base | delta |\n")
lines.append("|---|---:|---:|\n")
for k in [
    "validation_time",
    "publication_points_processed",
    "publication_points_failed",
    "rrdp_repos_unique",
    "vrps",
    "aspas",
    "audit_publication_points",
    "warnings_total",
]:
    lines.append(f"| {k} | {base_sum[k]} | {delta_sum[k]} |\n")
lines.append("\n")
def dur(meta: dict | None, key: str):
if not meta:
return None
return (meta.get("durations_secs") or {}).get(key)
# Durations table: base values come from the optional base meta (may be
# missing -> rendered as "unknown"); delta values from this run's meta.
base_rpki_run = dur(base_meta, "rpki_run")
delta_rpki_run = delta_meta["durations_secs"]["rpki_run"]
base_total = dur(base_meta, "total_script")
delta_total = delta_meta["durations_secs"]["total_script"]
lines.append("## 持续时间seconds\n\n")
lines.append("| step | base | delta |\n")
lines.append("|---|---:|---:|\n")
lines.append(f"| rpki_run | {base_rpki_run if base_rpki_run is not None else 'unknown'} | {delta_rpki_run} |\n")
lines.append(f"| total_script | {base_total if base_total is not None else 'unknown'} | {delta_total} |\n")
lines.append("\n")
# Only warn about the missing base meta when a path was actually guessed.
if base_meta is None and base_meta_path_s:
    lines.append(f"> 注:未能读取 base meta`{base_meta_path_s}`(文件不存在或不可读)。建议用 `full_sync.sh` 生成 baseline 以获得 base 时长对比。\n\n")
# RocksDB key-value counts for both DBs.
lines.append("RocksDB KV`db_stats --exact`\n\n")
lines.append("### 基线base\n\n")
lines.append(fmt_db_stats(base_db) + "\n\n")
lines.append("### 增量delta\n\n")
lines.append(fmt_db_stats(delta_db) + "\n\n")
# RRDP state movement between the two runs.
lines.append("## RRDP 增量是否发生(基于 `rrdp_state` 变化)\n\n")
lines.append(f"- repo_total(base)={len(base_state)}\n")
lines.append(f"- repo_total(delta)={len(delta_state)}\n")
lines.append(f"- serial_changed={serial_changed}\n")
lines.append(f"- session_changed={session_changed}\n\n")
if serial_deltas:
    lines.append("serial 增长最大的 10 个 RRDP repoold -> new\n\n")
    for uri, old, new, diff in serial_deltas[:10]:
        lines.append(f"- `{uri}``{old} -> {new}`+{diff}\n")
    lines.append("\n")
# Publication-point level diff summary.
lines.append("## 发布点Publication Point变化统计\n\n")
lines.append("以 `manifest_rsync_uri` 作为发布点 key对比 base vs delta\n\n")
lines.append(f"- base PP`{len(base_keys)}`\n")
lines.append(f"- delta PP`{len(delta_keys)}`\n")
lines.append(f"- `new_pp={len(new_pp)}`\n")
lines.append(f"- `missing_pp={len(missing_pp)}`\n")
lines.append(f"- `updated_pp={updated_pp}`\n")
lines.append(f"- `unchanged_pp={unchanged_pp}`\n\n")
lines.append("> 注:`new_pp/missing_pp/updated_pp` 会混入“遍历范围变化”的影响(例如 validation_time 不同、或 base 中存在更多失败 PP。\n\n")
# Fetch failures and cache fallback usage.
lines.append("## fail fetch / cache 使用情况\n\n")
lines.append(f"- repo sync failed启发式warning contains 'repo sync failed'/'rrdp fetch failed'/'rsync fetch failed'\n")
lines.append(f" - base`{base_repo_sync_failed}`\n")
lines.append(f" - delta`{delta_repo_sync_failed}`\n\n")
lines.append("- source 计数(按 `PublicationPointAudit.source`\n\n")
lines.append(f" - base`{dict(base_sources)}`\n")
lines.append(f" - delta`{dict(delta_sources)}`\n\n")
def render_cache_reasons(title: str, c: Counter) -> str:
    """Render one run's fetch_cache_pp usage (total + top reasons) as markdown."""
    if not c:
        return f"{title}`0`(未使用 fetch_cache_pp\n\n"
    total = sum(c.values())
    parts = [f"{title}`{total}`\n\n"]
    parts.append("Top reasons按 warning message 聚合,可能一条 PP 有多条 warning\n\n")
    parts.extend(f"- `{n}` × {msg}\n" for msg, n in c.most_common(10))
    parts.append("\n")
    return "".join(parts)
lines.append(render_cache_reasons("- base `source=fetch_cache_pp`", base_cache_reasons))
lines.append(render_cache_reasons("- delta `source=fetch_cache_pp`", delta_cache_reasons))
# Per-kind object change table (added/changed/removed, from kind_stats).
lines.append("## 文件变更统计(按对象类型)\n\n")
lines.append("按 `ObjectAuditEntry.sha256_hex` 对比(同一 rsync URI 前后 hash 变化记为 `~changed`\n\n")
lines.append("| kind | added | changed | removed |\n")
lines.append("|---|---:|---:|---:|\n")
for kind in ["manifest","crl","certificate","roa","aspa","other"]:
    st = kind_stats[kind]
    lines.append(f"| {kind} | {st['added']} | {st['changed']} | {st['removed']} |\n")
lines.append("\n")
# VRP set diff summary.
lines.append("## VRP 影响(去重后集合 diff\n\n")
lines.append("以 `(asn, prefix, max_length)` 为 key\n\n")
lines.append(f"- base unique VRP`{len(base_v)}`\n")
lines.append(f"- delta unique VRP`{len(delta_v)}`\n")
lines.append(f"- `added={len(added_v)}`\n")
lines.append(f"- `removed={len(removed_v)}`\n")
lines.append(f"- `net={len(added_v) - len(removed_v)}`\n\n")
# Write the assembled report and print its path (the script's only stdout).
out_md_path.write_text("".join(lines), encoding="utf-8")
print(out_md_path)
PY
# Final artifact index for the operator; stderr, like all progress output.
{
  echo "== done =="
  echo "artifacts:"
  echo "- delta db: $DELTA_DB_DIR"
  echo "- delta report: $DELTA_REPORT_JSON"
  echo "- delta run log: $DELTA_RUN_LOG"
  echo "- delta meta json: $DELTA_META_JSON"
  echo "- analysis md: $DELTA_ANALYSIS_MD"
  echo "- base state tsv: $BASE_RRDP_STATE_TSV"
  echo "- delta state tsv: $DELTA_RRDP_STATE_TSV"
} >&2