20260326 完成数据库model迁移;20260327 增加一键replay脚本

This commit is contained in:
yuyr 2026-03-27 11:24:34 +08:00
parent fe8b89d829
commit cd0ba15286
28 changed files with 1956 additions and 930 deletions

View File

@ -0,0 +1,45 @@
# Replay Verify Scripts
## `run_multi_rir_ccr_replay_verify.sh`
用途:
- 通用 multi-RIR CCR replay verify 入口
- 通过 `--rir` 指定一个或多个 RIR按顺序执行
- 通过 `--mode` 指定 `snapshot``delta``both`
- 默认每个 RIR 的 RocksDB 目录在 compare/verify 结束后自动删除;传 `--keep-db` 才保留
- 同一次执行的所有产物都会先落到 `rpki/target/replay/<timestamp>/`
- 该时间戳目录下再按 RIR 分目录:
- `<rir>_ccr_replay_<timestamp>`
默认输入:
- bundle root: `/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3`
- 每个 RIR 的 TAL / TA / validation time / record CSV 由 `scripts/payload_replay/multi_rir_case_info.py` 解析
用法:
- 单个 RIR
- `./scripts/replay_verify/run_multi_rir_ccr_replay_verify.sh --rir apnic --mode both`
- `./scripts/replay_verify/run_multi_rir_ccr_replay_verify.sh --rir apnic --mode snapshot`
- `./scripts/replay_verify/run_multi_rir_ccr_replay_verify.sh --rir apnic --mode delta`
- `./scripts/replay_verify/run_multi_rir_ccr_replay_verify.sh --rir apnic --mode both`
- `./scripts/replay_verify/run_multi_rir_ccr_replay_verify.sh --rir apnic,ripe --mode snapshot`
- `./scripts/replay_verify/run_multi_rir_ccr_replay_verify.sh --rir afrinic,apnic,arin,lacnic,ripe --mode both`
- `./scripts/replay_verify/run_multi_rir_ccr_replay_verify.sh --rir apnic --mode delta --keep-db`
可覆盖环境变量:
- `BUNDLE_ROOT`
- `OUT_ROOT`(默认:`rpki/target/replay`
- `RUN_TAG`
主要产物:
- 单次执行根目录:
- `rpki/target/replay/<timestamp>/`
- 每个 RIR 子目录下:
- `<rir>_snapshot.ccr`
- `<rir>_delta.ccr`
- `<rir>_*_report.json`
- `<rir>_*_ccr_vrps.csv`
- `<rir>_*_ccr_compare_summary.md`
- `<rir>_*_ccr_verify.json`
- 同次执行总汇总:
- `multi_rir_ccr_replay_verify_<timestamp>_summary.md`
- `multi_rir_ccr_replay_verify_<timestamp>_summary.json`

View File

@ -0,0 +1,284 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
cd "$ROOT_DIR"
usage() {
cat <<'USAGE'
Usage:
run_multi_rir_ccr_replay_verify.sh --rir <rir[,rir...]> [--mode snapshot|delta|both] [--keep-db]
Options:
--rir <list> Comma-separated RIR list, e.g. apnic or apnic,ripe
--mode <mode> snapshot | delta | both (default: both)
--keep-db Keep per-run RocksDB directories (default: remove after verify)
--bundle-root <p> Override bundle root
--out-root <p> Override output root (default: rpki/target/replay)
--run-tag <tag> Override timestamp suffix for all RIR runs
USAGE
}
MODE="both"
KEEP_DB=0
RIR_LIST=""
BUNDLE_ROOT="${BUNDLE_ROOT:-/home/yuyr/dev/rust_playground/routinator/bench/multi_rir_demo/runs/20260316-112341-multi-final3}"
OUT_ROOT="${OUT_ROOT:-$ROOT_DIR/target/replay}"
RUN_TAG="${RUN_TAG:-$(date -u +%Y%m%dT%H%M%SZ)}"
while [[ $# -gt 0 ]]; do
case "$1" in
--rir)
shift
RIR_LIST="${1:-}"
;;
--mode)
shift
MODE="${1:-}"
;;
--keep-db)
KEEP_DB=1
;;
--bundle-root)
shift
BUNDLE_ROOT="${1:-}"
;;
--out-root)
shift
OUT_ROOT="${1:-}"
;;
--run-tag)
shift
RUN_TAG="${1:-}"
;;
-h|--help)
usage
exit 0
;;
*)
echo "unknown argument: $1" >&2
usage >&2
exit 2
;;
esac
shift || true
done
if [[ -z "$RIR_LIST" ]]; then
echo "--rir is required" >&2
usage >&2
exit 2
fi
case "$MODE" in
snapshot|delta|both) ;;
*)
echo "invalid --mode: $MODE" >&2
usage >&2
exit 2
;;
esac
CASE_INFO_SCRIPT="$ROOT_DIR/scripts/payload_replay/multi_rir_case_info.py"
mkdir -p "$OUT_ROOT"
RUN_ROOT="$OUT_ROOT/$RUN_TAG"
mkdir -p "$RUN_ROOT"
cargo build --release --bin rpki --bin ccr_to_routinator_csv --bin ccr_verify >/dev/null
summary_md="$RUN_ROOT/multi_rir_ccr_replay_verify_${RUN_TAG}_summary.md"
summary_json="$RUN_ROOT/multi_rir_ccr_replay_verify_${RUN_TAG}_summary.json"
python3 - <<'PY' >/dev/null
PY
summary_json_tmp="$(mktemp)"
printf '[]' > "$summary_json_tmp"
run_one_mode() {
local rir="$1"
local mode="$2"
local run_dir="$3"
local trust_anchor="$4"
local tal_path="$5"
local ta_path="$6"
local base_archive="$7"
local base_locks="$8"
local base_csv="$9"
local delta_archive="${10}"
local delta_locks="${11}"
local delta_csv="${12}"
local snapshot_validation_time="${13}"
local delta_validation_time="${14}"
local db_dir="$run_dir/${rir}_${mode}_db"
local report_json="$run_dir/${rir}_${mode}_report.json"
local run_log="$run_dir/${rir}_${mode}_run.log"
local ccr_path="$run_dir/${rir}_${mode}.ccr"
local csv_path="$run_dir/${rir}_${mode}_ccr_vrps.csv"
local compare_md="$run_dir/${rir}_${mode}_ccr_compare_summary.md"
local only_ours="$run_dir/${rir}_${mode}_ccr_only_in_ours.csv"
local only_record="$run_dir/${rir}_${mode}_ccr_only_in_record.csv"
local verify_json="$run_dir/${rir}_${mode}_ccr_verify.json"
local meta_json="$run_dir/${rir}_${mode}_meta.json"
rm -rf "$db_dir"
local -a cmd=(target/release/rpki --db "$db_dir" --tal-path "$tal_path" --ta-path "$ta_path")
if [[ "$mode" == "snapshot" ]]; then
cmd+=(--payload-replay-archive "$base_archive" --payload-replay-locks "$base_locks" --validation-time "$snapshot_validation_time")
else
cmd+=(
--payload-base-archive "$base_archive"
--payload-base-locks "$base_locks"
--payload-base-validation-time "$snapshot_validation_time"
--payload-delta-archive "$delta_archive"
--payload-delta-locks "$delta_locks"
--validation-time "$delta_validation_time"
)
fi
cmd+=(--report-json "$report_json" --ccr-out "$ccr_path")
local start_s end_s duration_s
start_s="$(date +%s)"
(
echo "# ${rir} ${mode} command:"
printf '%q ' "${cmd[@]}"
echo
echo
"${cmd[@]}"
) 2>&1 | tee "$run_log" >/dev/null
end_s="$(date +%s)"
duration_s="$((end_s - start_s))"
target/release/ccr_to_routinator_csv \
--ccr "$ccr_path" \
--out "$csv_path" \
--trust-anchor "$trust_anchor" >/dev/null
local record_csv
if [[ "$mode" == "snapshot" ]]; then
record_csv="$base_csv"
else
record_csv="$delta_csv"
fi
./scripts/payload_replay/compare_with_routinator_record.sh \
"$csv_path" \
"$record_csv" \
"$compare_md" \
"$only_ours" \
"$only_record" >/dev/null
target/release/ccr_verify \
--ccr "$ccr_path" \
--db "$db_dir" > "$verify_json"
python3 - "$report_json" "$meta_json" "$mode" "$duration_s" <<'PY'
import json, sys
from pathlib import Path
report = json.loads(Path(sys.argv[1]).read_text(encoding='utf-8'))
meta = {
'mode': sys.argv[3],
'duration_seconds': int(sys.argv[4]),
'validation_time': report.get('validation_time_rfc3339_utc'),
'publication_points_processed': report['tree']['instances_processed'],
'publication_points_failed': report['tree']['instances_failed'],
'vrps': len(report['vrps']),
'aspas': len(report['aspas']),
}
Path(sys.argv[2]).write_text(json.dumps(meta, ensure_ascii=False, indent=2)+'\n', encoding='utf-8')
PY
if [[ "$KEEP_DB" -eq 0 ]]; then
rm -rf "$db_dir"
fi
}
IFS=',' read -r -a RIRS <<< "$RIR_LIST"
for rir in "${RIRS[@]}"; do
rir="$(echo "$rir" | xargs)"
[[ -n "$rir" ]] || continue
eval "$(python3 "$CASE_INFO_SCRIPT" --bundle-root "$BUNDLE_ROOT" --rir "$rir" --format env)"
run_dir="$RUN_ROOT/${rir}_ccr_replay_${RUN_TAG}"
mkdir -p "$run_dir"
if [[ "$MODE" == "snapshot" || "$MODE" == "both" ]]; then
run_one_mode \
"$rir" snapshot "$run_dir" "$TRUST_ANCHOR" "$TAL_PATH" "$TA_PATH" \
"$PAYLOAD_REPLAY_ARCHIVE" "$PAYLOAD_REPLAY_LOCKS" "$ROUTINATOR_BASE_RECORD_CSV" \
"$PAYLOAD_DELTA_ARCHIVE" "$PAYLOAD_DELTA_LOCKS" "$ROUTINATOR_DELTA_RECORD_CSV" \
"$SNAPSHOT_VALIDATION_TIME" "$DELTA_VALIDATION_TIME"
fi
if [[ "$MODE" == "delta" || "$MODE" == "both" ]]; then
run_one_mode \
"$rir" delta "$run_dir" "$TRUST_ANCHOR" "$TAL_PATH" "$TA_PATH" \
"$PAYLOAD_REPLAY_ARCHIVE" "$PAYLOAD_REPLAY_LOCKS" "$ROUTINATOR_BASE_RECORD_CSV" \
"$PAYLOAD_DELTA_ARCHIVE" "$PAYLOAD_DELTA_LOCKS" "$ROUTINATOR_DELTA_RECORD_CSV" \
"$SNAPSHOT_VALIDATION_TIME" "$DELTA_VALIDATION_TIME"
fi
python3 - "$summary_json_tmp" "$run_dir" "$rir" "$MODE" <<'PY'
import json, sys
from pathlib import Path
summary_path = Path(sys.argv[1])
run_dir = Path(sys.argv[2])
rir = sys.argv[3]
mode = sys.argv[4]
rows = json.loads(summary_path.read_text(encoding='utf-8'))
for submode in ['snapshot','delta']:
if mode not in ('both', submode):
continue
compare = run_dir / f'{rir}_{submode}_ccr_compare_summary.md'
meta = run_dir / f'{rir}_{submode}_meta.json'
verify = run_dir / f'{rir}_{submode}_ccr_verify.json'
if not compare.exists() or not meta.exists() or not verify.exists():
continue
compare_text = compare.read_text(encoding='utf-8')
meta_obj = json.loads(meta.read_text(encoding='utf-8'))
verify_obj = json.loads(verify.read_text(encoding='utf-8'))
def metric(name):
prefix = f'| {name} | '
for line in compare_text.splitlines():
if line.startswith(prefix):
return int(line.split('|')[2].strip())
raise SystemExit(f'missing metric {name} in {compare}')
rows.append({
'rir': rir,
'mode': submode,
'run_dir': str(run_dir),
'duration_seconds': meta_obj['duration_seconds'],
'vrps': meta_obj['vrps'],
'aspas': meta_obj['aspas'],
'only_in_ours': metric('only_in_ours'),
'only_in_record': metric('only_in_record'),
'intersection': metric('intersection'),
'state_hashes_ok': verify_obj.get('state_hashes_ok'),
})
summary_path.write_text(json.dumps(rows, ensure_ascii=False, indent=2)+'\n', encoding='utf-8')
PY
done
python3 - "$summary_json_tmp" "$summary_json" "$summary_md" "$RUN_TAG" <<'PY'
import json, sys
from pathlib import Path
rows = json.loads(Path(sys.argv[1]).read_text(encoding='utf-8'))
out_json = Path(sys.argv[2])
out_md = Path(sys.argv[3])
run_tag = sys.argv[4]
out_json.write_text(json.dumps(rows, ensure_ascii=False, indent=2)+'\n', encoding='utf-8')
parts = []
parts.append('# Multi-RIR CCR Replay Verify Summary\n\n')
parts.append(f'- run_tag: `{run_tag}`\n\n')
parts.append('| rir | mode | duration_s | vrps | aspas | only_in_ours | only_in_record | state_hashes_ok |\n')
parts.append('|---|---|---:|---:|---:|---:|---:|---|\n')
for row in rows:
parts.append(f"| {row['rir']} | {row['mode']} | {row['duration_seconds']} | {row['vrps']} | {row['aspas']} | {row['only_in_ours']} | {row['only_in_record']} | {row['state_hashes_ok']} |\n")
out_md.write_text(''.join(parts), encoding='utf-8')
PY
rm -f "$summary_json_tmp"
echo "== multi-rir replay verify complete ==" >&2
echo "- summary: $summary_md" >&2
echo "- summary json: $summary_json" >&2

View File

@ -3,6 +3,36 @@
"version": 2,
"source": "https://marketplace.visualstudio.com/items?itemName=pomdtr.excalidraw-editor",
"elements": [
{
"id": "qRHdw-Ut-vBgaTaqBgwLz",
"type": "rectangle",
"x": 902.3138754863095,
"y": 2678.0908137043048,
"width": 236.65964942812968,
"height": 375.87119006484846,
"angle": 0,
"strokeColor": "transparent",
"backgroundColor": "#b2f2bb",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "Zz",
"roundness": {
"type": 3
},
"seed": 1898623381,
"version": 94,
"versionNonce": 843496469,
"isDeleted": false,
"boundElements": null,
"updated": 1774497955573,
"link": null,
"locked": false
},
{
"id": "6TX3XEpY6zQ8YD2gCks-J",
"type": "ellipse",
@ -2494,7 +2524,7 @@
"version": 72,
"versionNonce": 1545966256,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773114633322,
"link": null,
"locked": false,
@ -3116,7 +3146,7 @@
"version": 7,
"versionNonce": 1888566864,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773114395646,
"link": null,
"locked": false,
@ -3190,7 +3220,7 @@
"version": 91,
"versionNonce": 336574032,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115366206,
"link": null,
"locked": false
@ -3218,7 +3248,7 @@
"version": 52,
"versionNonce": 1619956816,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115399843,
"link": null,
"locked": false,
@ -3296,7 +3326,7 @@
"version": 5,
"versionNonce": 1422040240,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115711518,
"link": null,
"locked": false,
@ -3370,7 +3400,7 @@
"version": 7,
"versionNonce": 1849437264,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115714936,
"link": null,
"locked": false,
@ -3452,7 +3482,7 @@
"version": 5,
"versionNonce": 1494146128,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115718725,
"link": null,
"locked": false,
@ -3634,7 +3664,7 @@
"version": 5,
"versionNonce": 1110543440,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115722655,
"link": null,
"locked": false,
@ -3708,7 +3738,7 @@
"version": 5,
"versionNonce": 32818864,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115726419,
"link": null,
"locked": false,
@ -3853,7 +3883,7 @@
"version": 80,
"versionNonce": 2063705264,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115581907,
"link": null,
"locked": false
@ -3881,7 +3911,7 @@
"version": 8,
"versionNonce": 468362320,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115592009,
"link": null,
"locked": false,
@ -3918,7 +3948,7 @@
"version": 47,
"versionNonce": 355656368,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115597983,
"link": null,
"locked": false
@ -3946,7 +3976,7 @@
"version": 57,
"versionNonce": 1310461616,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115603065,
"link": null,
"locked": false
@ -4048,7 +4078,7 @@
"version": 59,
"versionNonce": 603605072,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115639925,
"link": null,
"locked": false,
@ -4091,7 +4121,7 @@
"version": 83,
"versionNonce": 1951717968,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773115646441,
"link": null,
"locked": false,
@ -5023,7 +5053,7 @@
"version": 41,
"versionNonce": 218425424,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773116118218,
"link": null,
"locked": false,
@ -5123,7 +5153,7 @@
"version": 18,
"versionNonce": 1830824624,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773116135062,
"link": null,
"locked": false,
@ -5223,7 +5253,7 @@
"version": 6,
"versionNonce": 258842800,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773116144968,
"link": null,
"locked": false,
@ -5323,7 +5353,7 @@
"version": 6,
"versionNonce": 1402610256,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773116155635,
"link": null,
"locked": false,
@ -5497,7 +5527,7 @@
"version": 16,
"versionNonce": 1175871664,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773116197849,
"link": null,
"locked": false,
@ -5597,7 +5627,7 @@
"version": 20,
"versionNonce": 153771696,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773116217438,
"link": null,
"locked": false,
@ -5701,7 +5731,7 @@
"version": 6,
"versionNonce": 602646096,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773116228554,
"link": null,
"locked": false,
@ -5775,7 +5805,7 @@
"version": 39,
"versionNonce": 313531472,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773129151744,
"link": null,
"locked": false,
@ -7509,7 +7539,7 @@
"version": 20,
"versionNonce": 771730096,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773128132466,
"link": null,
"locked": false,
@ -7546,7 +7576,7 @@
"version": 185,
"versionNonce": 2079384144,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773129151744,
"link": null,
"locked": false,
@ -8022,10 +8052,7 @@
],
"pressures": [],
"simulatePressure": true,
"lastCommittedPoint": [
170.39252493058768,
-5.1117753579201235
]
"lastCommittedPoint": null
},
{
"id": "ES3H6HxkXXAdU_TAK1q-Y",
@ -8109,7 +8136,7 @@
"version": 5,
"versionNonce": 1913455792,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773128156983,
"link": null,
"locked": false,
@ -8205,7 +8232,7 @@
"version": 5,
"versionNonce": 909726896,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773128159933,
"link": null,
"locked": false,
@ -10058,7 +10085,7 @@
"version": 9,
"versionNonce": 775627440,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773128137154,
"link": null,
"locked": false,
@ -10450,7 +10477,7 @@
"version": 5,
"versionNonce": 1293264048,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773128162365,
"link": null,
"locked": false,
@ -10546,7 +10573,7 @@
"version": 5,
"versionNonce": 111277232,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773128164616,
"link": null,
"locked": false,
@ -15921,7 +15948,7 @@
"version": 9,
"versionNonce": 134388400,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773128141149,
"link": null,
"locked": false,
@ -16013,7 +16040,7 @@
"version": 9,
"versionNonce": 1230377648,
"isDeleted": false,
"boundElements": null,
"boundElements": [],
"updated": 1773128144654,
"link": null,
"locked": false,
@ -16049,8 +16076,8 @@
"type": 2
},
"seed": 1572300368,
"version": 98,
"versionNonce": 1871666352,
"version": 99,
"versionNonce": 1923189141,
"isDeleted": false,
"boundElements": [
{
@ -16058,7 +16085,7 @@
"id": "buUBs6ZWoW0_XJOiVjU2X"
}
],
"updated": 1773129151974,
"updated": 1774497963434,
"link": null,
"locked": false,
"points": [
@ -16085,8 +16112,8 @@
{
"id": "buUBs6ZWoW0_XJOiVjU2X",
"type": "text",
"x": 952.6392707639827,
"y": 2621.571169580318,
"x": 961.1589223603503,
"y": 2771.516554079474,
"width": 40,
"height": 25,
"angle": 0,
@ -16102,11 +16129,11 @@
"index": "b6bV",
"roundness": null,
"seed": 1251791024,
"version": 9,
"versionNonce": 3508912,
"version": 10,
"versionNonce": 618429141,
"isDeleted": false,
"boundElements": null,
"updated": 1773128148385,
"boundElements": [],
"updated": 1774497962077,
"link": null,
"locked": false,
"text": "包含",
@ -16188,6 +16215,43 @@
"originalText": "PP3",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "G2dlYSYSyBcTaCSWE_9Fg",
"type": "text",
"x": 993.7959422946816,
"y": 2645.276672426363,
"width": 50.03996276855469,
"height": 25,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "#b2f2bb",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "b6e",
"roundness": null,
"seed": 1833275125,
"version": 27,
"versionNonce": 1513809621,
"isDeleted": false,
"boundElements": null,
"updated": 1774497979403,
"link": null,
"locked": false,
"text": "VCIR",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "VCIR",
"autoResize": true,
"lineHeight": 1.25
}
],
"appState": {

629
specs/sync.excalidraw Normal file
View File

@ -0,0 +1,629 @@
{
"type": "excalidraw",
"version": 2,
"source": "https://marketplace.visualstudio.com/items?itemName=pomdtr.excalidraw-editor",
"elements": [
{
"id": "782wmN2vbn0vYfClUbwVT",
"type": "rectangle",
"x": 458.5143563406808,
"y": 224.57136099679133,
"width": 335.08570861816406,
"height": 143.99998474121094,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a0",
"roundness": {
"type": 3
},
"seed": 264486616,
"version": 304,
"versionNonce": 505039016,
"isDeleted": false,
"boundElements": [
{
"type": "text",
"id": "491W0AyWpioiNNXTRuXMb"
}
],
"updated": 1774499907328,
"link": null,
"locked": false
},
{
"id": "491W0AyWpioiNNXTRuXMb",
"type": "text",
"x": 514.987287248884,
"y": 246.5713533673968,
"width": 222.1398468017578,
"height": 100,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a1",
"roundness": null,
"seed": 1658952360,
"version": 360,
"versionNonce": 404419496,
"isDeleted": false,
"boundElements": null,
"updated": 1774499907328,
"link": null,
"locked": false,
"text": "RAW BY HASH\nsha256 -> file content\n(.mft/.roa/.cer)\n通过hash找原始文件",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "center",
"verticalAlign": "middle",
"containerId": "782wmN2vbn0vYfClUbwVT",
"originalText": "RAW BY HASH\nsha256 -> file content (.mft/.roa/.cer)\n通过hash找原始文件",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "4v-5vJwc-YSKwDDA6wnNy",
"type": "rectangle",
"x": 86.74286869594027,
"y": 224.1142785208566,
"width": 332.79998561314164,
"height": 141.99999128069192,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a2",
"roundness": {
"type": 3
},
"seed": 1176722904,
"version": 312,
"versionNonce": 1631513048,
"isDeleted": false,
"boundElements": [
{
"type": "text",
"id": "tug_6QGIm4LrnsrYiV18D"
}
],
"updated": 1774499913411,
"link": null,
"locked": false
},
{
"id": "tug_6QGIm4LrnsrYiV18D",
"type": "text",
"x": 109.85297502790172,
"y": 245.11427416120256,
"width": 286.57977294921875,
"height": 100,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a3",
"roundness": null,
"seed": 1923736280,
"version": 409,
"versionNonce": 980676312,
"isDeleted": false,
"boundElements": [],
"updated": 1774499913411,
"link": null,
"locked": false,
"text": "REPOSITORY VIEW\nuri -> sha256(current version\nfile)\n对象uri 查找最新hash",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "center",
"verticalAlign": "middle",
"containerId": "4v-5vJwc-YSKwDDA6wnNy",
"originalText": "REPOSITORY VIEW\nuri -> sha256(current version file)\n对象uri 查找最新hash",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "V2AcY1746pbG544Yh7A7q",
"type": "rectangle",
"x": 90.85720498221252,
"y": -2.5142985752651725,
"width": 218.51431492396773,
"height": 205.71430751255576,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a4",
"roundness": {
"type": 3
},
"seed": 436378328,
"version": 498,
"versionNonce": 113477080,
"isDeleted": false,
"boundElements": [
{
"type": "text",
"id": "WVFatIOjK3SB8FDetV4ts"
}
],
"updated": 1774499431839,
"link": null,
"locked": false
},
{
"id": "WVFatIOjK3SB8FDetV4ts",
"type": "text",
"x": 110.4543816702706,
"y": 37.84285518101271,
"width": 179.31996154785156,
"height": 125,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a5",
"roundness": null,
"seed": 1637449688,
"version": 675,
"versionNonce": 1502150312,
"isDeleted": false,
"boundElements": [],
"updated": 1774499518593,
"link": null,
"locked": false,
"text": "RRDP SOURCE\nSTATE\nnotify -> state\n(session, serial)\n不同rrdp源同步状态",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "center",
"verticalAlign": "middle",
"containerId": "V2AcY1746pbG544Yh7A7q",
"originalText": "RRDP SOURCE STATE\nnotify -> state (session, serial)\n不同rrdp源同步状态",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "KaZIF4nN5lJcP8jlzm2ze",
"type": "rectangle",
"x": 333.25717054094576,
"y": -5.54285212925501,
"width": 217.3714316231864,
"height": 206.7142813546317,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a6",
"roundness": {
"type": 3
},
"seed": 1602529240,
"version": 528,
"versionNonce": 138363048,
"isDeleted": false,
"boundElements": [
{
"type": "text",
"id": "zs86EXmSVt0DGU7Yilr06"
}
],
"updated": 1774499548936,
"link": null,
"locked": false
},
{
"id": "zs86EXmSVt0DGU7Yilr06",
"type": "text",
"x": 341.94288635253895,
"y": 22.814288548060844,
"width": 200,
"height": 150,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a7",
"roundness": null,
"seed": 626126040,
"version": 865,
"versionNonce": 1330001880,
"isDeleted": false,
"boundElements": [],
"updated": 1774499555053,
"link": null,
"locked": false,
"text": "RRDP SOURCE\nMEMBER\nsource+ uri ->\npresent/withdraw\n前缀遍历获取rrdp源下\n全部对象不同源混放",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "center",
"verticalAlign": "middle",
"containerId": "KaZIF4nN5lJcP8jlzm2ze",
"originalText": "RRDP SOURCE MEMBER\nsource+ uri -> present/withdraw\n前缀遍历获取rrdp源下全部对象不同源混放",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "w4ratSPiaf_sJhmxzs_zB",
"type": "rectangle",
"x": 575.7714941842216,
"y": -5.842858450753411,
"width": 217.3714316231864,
"height": 206.7142813546317,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a8",
"roundness": {
"type": 3
},
"seed": 1385028264,
"version": 564,
"versionNonce": 1503452584,
"isDeleted": false,
"boundElements": [
{
"type": "text",
"id": "t7Y2vDpAmPdo00qNs6Lxp"
}
],
"updated": 1774499442854,
"link": null,
"locked": false
},
{
"id": "t7Y2vDpAmPdo00qNs6Lxp",
"type": "text",
"x": 584.7972292218891,
"y": 47.51428222656244,
"width": 199.31996154785156,
"height": 100,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "a9",
"roundness": null,
"seed": 1548502440,
"version": 945,
"versionNonce": 1851848664,
"isDeleted": false,
"boundElements": [],
"updated": 1774499514167,
"link": null,
"locked": false,
"text": "RRDP URI OWNER\nuri -> source\n反查对象所属rrdp源\n防止跨源误删",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "center",
"verticalAlign": "middle",
"containerId": "w4ratSPiaf_sJhmxzs_zB",
"originalText": "RRDP URI OWNER\nuri -> source\n反查对象所属rrdp源防止跨源误删",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "iHWL3p3MaRLZ-l7Es74es",
"type": "rectangle",
"x": 89.02865600585938,
"y": 398.08574567522317,
"width": 339.42862374441967,
"height": 161.14283970424117,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "aA",
"roundness": {
"type": 3
},
"seed": 600580568,
"version": 174,
"versionNonce": 863743704,
"isDeleted": false,
"boundElements": [
{
"type": "text",
"id": "gi3C5lbily2-D96ZNdUB_"
}
],
"updated": 1774499918860,
"link": null,
"locked": false
},
{
"id": "gi3C5lbily2-D96ZNdUB_",
"type": "text",
"x": 105.99298313685827,
"y": 441.15716552734375,
"width": 305.4999694824219,
"height": 75,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "aAV",
"roundness": null,
"seed": 1096467112,
"version": 228,
"versionNonce": 315626456,
"isDeleted": false,
"boundElements": null,
"updated": 1774499918860,
"link": null,
"locked": false,
"text": "VCIR\n按照CA为单元记录已验证缓存的\nRPKI对象产物树状结构",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "center",
"verticalAlign": "middle",
"containerId": "iHWL3p3MaRLZ-l7Es74es",
"originalText": "VCIR\n按照CA为单元记录已验证缓存的RPKI对象产物树状结构",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "J6aHbqCN1b8plYxKAKfYT",
"type": "rectangle",
"x": 454.17152186802457,
"y": 399.80005972725996,
"width": 339.42862374441967,
"height": 161.14283970424117,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "aC",
"roundness": {
"type": 3
},
"seed": 1274007512,
"version": 244,
"versionNonce": 877648088,
"isDeleted": false,
"boundElements": [
{
"type": "text",
"id": "ernK0EMAzhxJvpYUaWPiS"
}
],
"updated": 1774499918860,
"link": null,
"locked": false
},
{
"id": "ernK0EMAzhxJvpYUaWPiS",
"type": "text",
"x": 461.35585021972656,
"y": 442.87147957938055,
"width": 325.0599670410156,
"height": 75,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "aD",
"roundness": null,
"seed": 706221272,
"version": 431,
"versionNonce": 2072968664,
"isDeleted": false,
"boundElements": [],
"updated": 1774499918860,
"link": null,
"locked": false,
"text": "AUDIT RULE INDEX\n溯源审计用户通过产物规则hash反\n向查找对应VCIR节点",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "center",
"verticalAlign": "middle",
"containerId": "J6aHbqCN1b8plYxKAKfYT",
"originalText": "AUDIT RULE INDEX\n溯源审计用户通过产物规则hash反向查找对应VCIR节点",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "kkkXT2D6yQsceW2UyfJPF",
"type": "text",
"x": -121.25701032366032,
"y": 73.5142887660437,
"width": 186.9999542236328,
"height": 25,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "aE",
"roundness": null,
"seed": 2098948056,
"version": 64,
"versionNonce": 1921034200,
"isDeleted": false,
"boundElements": null,
"updated": 1774499954136,
"link": null,
"locked": false,
"text": "RRDP 同步状态数据",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "RRDP 同步状态数据",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "BKIRb0Geq874XYWd0OLtS",
"type": "text",
"x": -92.68558175223177,
"y": 276.9428296770369,
"width": 120,
"height": 25,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "aF",
"roundness": null,
"seed": 791319000,
"version": 31,
"versionNonce": 703934936,
"isDeleted": false,
"boundElements": null,
"updated": 1774499970564,
"link": null,
"locked": false,
"text": "原始文件数据",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "原始文件数据",
"autoResize": true,
"lineHeight": 1.25
},
{
"id": "wNzjeS0S_Ji1bKTrKDoyd",
"type": "text",
"x": -101.82843017578091,
"y": 473.51426696777366,
"width": 140,
"height": 25,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 2,
"strokeStyle": "solid",
"roughness": 1,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "aG",
"roundness": null,
"seed": 1183095768,
"version": 26,
"versionNonce": 1711685800,
"isDeleted": false,
"boundElements": null,
"updated": 1774499982445,
"link": null,
"locked": false,
"text": "已验证产物数据",
"fontSize": 20,
"fontFamily": 5,
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "已验证产物数据",
"autoResize": true,
"lineHeight": 1.25
}
],
"appState": {
"gridSize": 20,
"gridStep": 5,
"gridModeEnabled": false,
"viewBackgroundColor": "#ffffff"
},
"files": {}
}

View File

@ -384,7 +384,7 @@ mod tests {
]
}
fn put_raw(store: &RocksStore, bytes: &[u8], uri: &str, object_type: &str) {
fn put_raw_evidence(store: &RocksStore, bytes: &[u8], uri: &str, object_type: &str) {
let mut entry = RawByHashEntry::from_bytes(sha256_hex(bytes), bytes.to_vec());
entry.origin_uris.push(uri.to_string());
entry.object_type = Some(object_type.to_string());
@ -432,17 +432,17 @@ mod tests {
.put_audit_rule_index_entry(&rule_entry)
.expect("put rule index");
put_raw(&store, leaf_manifest.as_bytes(), leaf_manifest, "mft");
put_raw(
put_raw_evidence(&store, leaf_manifest.as_bytes(), leaf_manifest, "mft");
put_raw_evidence(
&store,
format!("{}-crl", leaf_manifest).as_bytes(),
&leaf_manifest.replace(".mft", ".crl"),
"crl",
);
put_raw(&store, b"roa-raw", &local.source_object_uri, "roa");
put_raw(&store, b"roa-ee", "rsync://example.test/leaf/a.ee", "cer");
put_raw(&store, root_manifest.as_bytes(), root_manifest, "mft");
put_raw(
put_raw_evidence(&store, b"roa-raw", &local.source_object_uri, "roa");
put_raw_evidence(&store, b"roa-ee", "rsync://example.test/leaf/a.ee", "cer");
put_raw_evidence(&store, root_manifest.as_bytes(), root_manifest, "mft");
put_raw_evidence(
&store,
format!("{}-crl", root_manifest).as_bytes(),
&root_manifest.replace(".mft", ".crl"),

View File

@ -3,9 +3,9 @@ use std::path::PathBuf;
use rocksdb::{DB, IteratorMode, Options};
use rpki::storage::{
ALL_COLUMN_FAMILY_NAMES, CF_AUDIT_RULE_INDEX, CF_RAW_BY_HASH, CF_RAW_OBJECTS,
CF_REPOSITORY_VIEW, CF_RRDP_OBJECT_INDEX, CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_STATE,
CF_RRDP_URI_OWNER, CF_VCIR, column_family_descriptors,
ALL_COLUMN_FAMILY_NAMES, CF_AUDIT_RULE_INDEX, CF_RAW_BY_HASH, CF_REPOSITORY_VIEW,
CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, CF_VCIR,
column_family_descriptors,
};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@ -48,8 +48,7 @@ Options:
Output groups:
- current_repository_view: repository_view + raw_by_hash
- current_validation_state: vcir + audit_rule_index
- current_rrdp_state: rrdp_state + rrdp_source + rrdp_source_member + rrdp_uri_owner
- legacy_compatibility: raw_objects + rrdp_object_index
- current_rrdp_state: rrdp_source + rrdp_source_member + rrdp_uri_owner
"
)
}
@ -78,10 +77,9 @@ fn cf_group(cf_name: &str) -> CfGroup {
match cf_name {
CF_REPOSITORY_VIEW | CF_RAW_BY_HASH => CfGroup::CurrentRepositoryView,
CF_VCIR | CF_AUDIT_RULE_INDEX => CfGroup::CurrentValidationState,
CF_RRDP_STATE | CF_RRDP_SOURCE | CF_RRDP_SOURCE_MEMBER | CF_RRDP_URI_OWNER => {
CF_RRDP_SOURCE | CF_RRDP_SOURCE_MEMBER | CF_RRDP_URI_OWNER => {
CfGroup::CurrentRrdpState
}
CF_RAW_OBJECTS | CF_RRDP_OBJECT_INDEX => CfGroup::LegacyCompatibility,
_ => CfGroup::LegacyCompatibility,
}
}
@ -175,7 +173,7 @@ mod tests {
);
assert_eq!(cf_group(CF_RRDP_SOURCE), CfGroup::CurrentRrdpState);
assert_eq!(cf_group(CF_RRDP_URI_OWNER), CfGroup::CurrentRrdpState);
assert_eq!(cf_group(CF_RAW_OBJECTS), CfGroup::LegacyCompatibility);
assert_eq!(cf_group("unknown_legacy"), CfGroup::LegacyCompatibility);
}
#[test]
@ -185,15 +183,13 @@ mod tests {
(CF_RAW_BY_HASH, 7),
(CF_VCIR, 11),
(CF_AUDIT_RULE_INDEX, 13),
(CF_RRDP_STATE, 17),
(CF_RRDP_SOURCE_MEMBER, 19),
(CF_RRDP_OBJECT_INDEX, 29),
]);
assert_eq!(grouped.get(&CfGroup::CurrentRepositoryView), Some(&12));
assert_eq!(grouped.get(&CfGroup::CurrentValidationState), Some(&24));
assert_eq!(grouped.get(&CfGroup::CurrentRrdpState), Some(&36));
assert_eq!(grouped.get(&CfGroup::LegacyCompatibility), Some(&29));
assert_eq!(grouped.get(&CfGroup::CurrentRrdpState), Some(&19));
assert_eq!(grouped.get(&CfGroup::LegacyCompatibility), None);
}
#[test]
@ -201,6 +197,6 @@ mod tests {
let text = usage();
assert!(text.contains("--exact"), "{text}");
assert!(text.contains("current_validation_state"), "{text}");
assert!(text.contains("legacy_compatibility"), "{text}");
assert!(text.contains("current_rrdp_state"), "{text}");
}
}

View File

@ -2,14 +2,12 @@ use std::path::PathBuf;
use rocksdb::{DB, IteratorMode, Options};
use rpki::storage::{
CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_STATE, CF_RRDP_URI_OWNER,
RrdpSourceMemberRecord, RrdpSourceRecord, RrdpUriOwnerRecord, column_family_descriptors,
CF_RRDP_SOURCE, CF_RRDP_SOURCE_MEMBER, CF_RRDP_URI_OWNER, RrdpSourceMemberRecord,
RrdpSourceRecord, RrdpUriOwnerRecord, column_family_descriptors,
};
use rpki::sync::rrdp::RrdpState;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DumpView {
LegacyState,
Source,
Members,
Owners,
@ -19,13 +17,12 @@ enum DumpView {
impl DumpView {
fn parse(value: &str) -> Result<Self, String> {
match value {
"legacy-state" => Ok(Self::LegacyState),
"source" => Ok(Self::Source),
"members" => Ok(Self::Members),
"owners" => Ok(Self::Owners),
"all" => Ok(Self::All),
other => Err(format!(
"invalid --view: {other} (expected one of: legacy-state, source, members, owners, all)"
"invalid --view: {other} (expected one of: source, members, owners, all)"
)),
}
}
@ -42,7 +39,7 @@ fn usage() -> String {
format!(
"\
Usage:
{bin} --db <path> [--view <legacy-state|source|members|owners|all>]
{bin} --db <path> [--view <source|members|owners|all>]
Options:
--db <path> RocksDB directory
@ -94,22 +91,6 @@ fn open_db(path: &std::path::Path) -> Result<DB, Box<dyn std::error::Error>> {
)?)
}
fn collect_legacy_state(db: &DB) -> Result<Vec<(String, RrdpState)>, Box<dyn std::error::Error>> {
let cf = db
.cf_handle(CF_RRDP_STATE)
.ok_or("missing column family: rrdp_state")?;
let mut out = Vec::new();
for res in db.iterator_cf(cf, IteratorMode::Start) {
let (k, v) = res?;
let notify_uri = String::from_utf8_lossy(&k).to_string();
let state = RrdpState::decode(&v)
.map_err(|e| format!("decode rrdp_state failed for {notify_uri}: {e}"))?;
out.push((notify_uri, state));
}
out.sort_by(|a, b| a.0.cmp(&b.0));
Ok(out)
}
fn collect_source_records(db: &DB) -> Result<Vec<RrdpSourceRecord>, Box<dyn std::error::Error>> {
let cf = db
.cf_handle(CF_RRDP_SOURCE)
@ -163,14 +144,6 @@ fn collect_uri_owner_records(
Ok(out)
}
fn print_legacy_state(entries: &[(String, RrdpState)]) {
println!("[legacy-state]");
println!("notify_uri\tserial\tsession_id");
for (notify_uri, state) in entries {
println!("{notify_uri}\t{}\t{}", state.serial, state.session_id);
}
}
fn print_source_records(entries: &[RrdpSourceRecord]) {
println!("[source]");
println!("notify_uri\tlast_serial\tlast_session_id\tsync_state\tlast_snapshot_uri\tlast_error");
@ -228,12 +201,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let db = open_db(&args.db_path)?;
match args.view {
DumpView::LegacyState => print_legacy_state(&collect_legacy_state(&db)?),
DumpView::Source => print_source_records(&collect_source_records(&db)?),
DumpView::Members => print_source_member_records(&collect_source_member_records(&db)?),
DumpView::Owners => print_uri_owner_records(&collect_uri_owner_records(&db)?),
DumpView::All => {
print_legacy_state(&collect_legacy_state(&db)?);
print_source_records(&collect_source_records(&db)?);
print_source_member_records(&collect_source_member_records(&db)?);
print_uri_owner_records(&collect_uri_owner_records(&db)?);
@ -275,19 +246,9 @@ mod tests {
}
#[test]
fn collect_rrdp_views_reads_legacy_and_current_records() {
fn collect_rrdp_views_reads_current_records() {
let dir = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(dir.path()).expect("open store");
let legacy_state = RrdpState {
session_id: "session-1".to_string(),
serial: 42,
};
store
.put_rrdp_state(
"https://example.test/notify.xml",
&legacy_state.encode().expect("encode legacy state"),
)
.expect("put legacy state");
store
.put_rrdp_source_record(&RrdpSourceRecord {
notify_uri: "https://example.test/notify.xml".to_string(),
@ -328,14 +289,10 @@ mod tests {
drop(store);
let db = open_db(dir.path()).expect("open db");
let legacy = collect_legacy_state(&db).expect("legacy dump");
let sources = collect_source_records(&db).expect("source dump");
let members = collect_source_member_records(&db).expect("members dump");
let owners = collect_uri_owner_records(&db).expect("owners dump");
assert_eq!(legacy.len(), 1);
assert_eq!(legacy[0].0, "https://example.test/notify.xml");
assert_eq!(legacy[0].1.serial, 42);
assert_eq!(sources.len(), 1);
assert_eq!(sources[0].sync_state, RrdpSourceSyncState::DeltaReady);
assert_eq!(members.len(), 1);

View File

@ -10,9 +10,6 @@ use sha2::Digest;
use crate::data_model::rc::{AsResourceSet, IpResourceSet};
pub const CF_RAW_OBJECTS: &str = "raw_objects";
pub const CF_RRDP_STATE: &str = "rrdp_state";
pub const CF_RRDP_OBJECT_INDEX: &str = "rrdp_object_index";
pub const CF_REPOSITORY_VIEW: &str = "repository_view";
pub const CF_RAW_BY_HASH: &str = "raw_by_hash";
pub const CF_VCIR: &str = "vcir";
@ -22,9 +19,6 @@ pub const CF_RRDP_SOURCE_MEMBER: &str = "rrdp_source_member";
pub const CF_RRDP_URI_OWNER: &str = "rrdp_uri_owner";
pub const ALL_COLUMN_FAMILY_NAMES: &[&str] = &[
CF_RAW_OBJECTS,
CF_RRDP_STATE,
CF_RRDP_OBJECT_INDEX,
CF_REPOSITORY_VIEW,
CF_RAW_BY_HASH,
CF_VCIR,
@ -34,7 +28,6 @@ pub const ALL_COLUMN_FAMILY_NAMES: &[&str] = &[
CF_RRDP_URI_OWNER,
];
const RRDP_OBJECT_INDEX_PREFIX: &[u8] = b"rrdp_obj:";
const REPOSITORY_VIEW_KEY_PREFIX: &str = "repo_view:";
const RAW_BY_HASH_KEY_PREFIX: &str = "rawbyhash:";
const VCIR_KEY_PREFIX: &str = "vcir:";
@ -785,224 +778,6 @@ impl RocksStore {
.ok_or(StorageError::MissingColumnFamily(name))
}
pub fn put_raw(&self, rsync_uri: &str, bytes: &[u8]) -> StorageResult<()> {
let cf = self.cf(CF_RAW_OBJECTS)?;
self.db
.put_cf(cf, rsync_uri.as_bytes(), bytes)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
pub fn put_raw_batch(&self, objects: Vec<(String, Vec<u8>)>) -> StorageResult<usize> {
if objects.is_empty() {
return Ok(0);
}
let cf = self.cf(CF_RAW_OBJECTS)?;
let mut batch = WriteBatch::default();
for (rsync_uri, bytes) in &objects {
batch.put_cf(cf, rsync_uri.as_bytes(), bytes.as_slice());
}
self.write_batch(batch)?;
Ok(objects.len())
}
pub fn get_raw(&self, rsync_uri: &str) -> StorageResult<Option<Vec<u8>>> {
let cf = self.cf(CF_RAW_OBJECTS)?;
let v = self
.db
.get_cf(cf, rsync_uri.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(v)
}
pub fn delete_raw(&self, rsync_uri: &str) -> StorageResult<()> {
let cf = self.cf(CF_RAW_OBJECTS)?;
self.db
.delete_cf(cf, rsync_uri.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
pub fn put_rrdp_state(&self, notification_uri: &str, bytes: &[u8]) -> StorageResult<()> {
let cf = self.cf(CF_RRDP_STATE)?;
self.db
.put_cf(cf, notification_uri.as_bytes(), bytes)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
pub fn get_rrdp_state(&self, notification_uri: &str) -> StorageResult<Option<Vec<u8>>> {
let cf = self.cf(CF_RRDP_STATE)?;
let v = self
.db
.get_cf(cf, notification_uri.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(v)
}
#[allow(dead_code)]
pub fn delete_rrdp_state(&self, notification_uri: &str) -> StorageResult<()> {
let cf = self.cf(CF_RRDP_STATE)?;
self.db
.delete_cf(cf, notification_uri.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
fn rrdp_object_index_key(notification_uri: &str, rsync_uri: &str) -> Vec<u8> {
let mut out = Vec::with_capacity(
RRDP_OBJECT_INDEX_PREFIX.len() + notification_uri.len() + 1 + rsync_uri.len(),
);
out.extend_from_slice(RRDP_OBJECT_INDEX_PREFIX);
out.extend_from_slice(notification_uri.as_bytes());
out.push(0);
out.extend_from_slice(rsync_uri.as_bytes());
out
}
fn rrdp_object_index_prefix(notification_uri: &str) -> Vec<u8> {
let mut out =
Vec::with_capacity(RRDP_OBJECT_INDEX_PREFIX.len() + notification_uri.len() + 1);
out.extend_from_slice(RRDP_OBJECT_INDEX_PREFIX);
out.extend_from_slice(notification_uri.as_bytes());
out.push(0);
out
}
#[allow(dead_code)]
pub fn rrdp_object_index_contains(
&self,
notification_uri: &str,
rsync_uri: &str,
) -> StorageResult<bool> {
let cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
let k = Self::rrdp_object_index_key(notification_uri, rsync_uri);
Ok(self
.db
.get_cf(cf, k)
.map_err(|e| StorageError::RocksDb(e.to_string()))?
.is_some())
}
#[allow(dead_code)]
pub fn rrdp_object_index_iter(
&self,
notification_uri: &str,
) -> StorageResult<impl Iterator<Item = String> + '_> {
let cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
let prefix = Self::rrdp_object_index_prefix(notification_uri);
let prefix_len = prefix.len();
let mode = IteratorMode::From(prefix.as_slice(), Direction::Forward);
Ok(self
.db
.iterator_cf(cf, mode)
.take_while(move |res| match res {
Ok((k, _v)) => k.starts_with(prefix.as_slice()),
Err(_) => false,
})
.filter_map(move |res| {
let (k, _v) = res.ok()?;
let rsync_part = k.get(prefix_len..)?;
let s = std::str::from_utf8(rsync_part).ok()?;
Some(s.to_string())
}))
}
#[allow(dead_code)]
pub fn rrdp_object_index_clear(&self, notification_uri: &str) -> StorageResult<usize> {
let cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
let prefix = Self::rrdp_object_index_prefix(notification_uri);
let mode = IteratorMode::From(prefix.as_slice(), Direction::Forward);
let keys: Vec<Box<[u8]>> = self
.db
.iterator_cf(cf, mode)
.take_while(|res| match res {
Ok((k, _v)) => k.starts_with(prefix.as_slice()),
Err(_) => false,
})
.filter_map(|res| res.ok().map(|(k, _v)| k))
.collect();
if keys.is_empty() {
return Ok(0);
}
let mut batch = WriteBatch::default();
for k in &keys {
batch.delete_cf(cf, k);
}
self.write_batch(batch)?;
Ok(keys.len())
}
pub fn apply_rrdp_snapshot(
&self,
notification_uri: &str,
published: &[(String, Vec<u8>)],
) -> StorageResult<usize> {
let raw_cf = self.cf(CF_RAW_OBJECTS)?;
let idx_cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
let mut new_set: HashSet<&str> = HashSet::with_capacity(published.len());
for (u, _b) in published {
new_set.insert(u.as_str());
}
let old_uris: Vec<String> = self.rrdp_object_index_iter(notification_uri)?.collect();
let mut batch = WriteBatch::default();
for old in &old_uris {
if !new_set.contains(old.as_str()) {
batch.delete_cf(raw_cf, old.as_bytes());
let k = Self::rrdp_object_index_key(notification_uri, old.as_str());
batch.delete_cf(idx_cf, k);
}
}
for (uri, bytes) in published {
batch.put_cf(raw_cf, uri.as_bytes(), bytes.as_slice());
let k = Self::rrdp_object_index_key(notification_uri, uri.as_str());
batch.put_cf(idx_cf, k, b"");
}
self.write_batch(batch)?;
Ok(published.len())
}
pub fn apply_rrdp_delta(
&self,
notification_uri: &str,
ops: &[RrdpDeltaOp],
) -> StorageResult<usize> {
if ops.is_empty() {
return Ok(0);
}
let raw_cf = self.cf(CF_RAW_OBJECTS)?;
let idx_cf = self.cf(CF_RRDP_OBJECT_INDEX)?;
let mut batch = WriteBatch::default();
for op in ops {
match op {
RrdpDeltaOp::Upsert { rsync_uri, bytes } => {
batch.put_cf(raw_cf, rsync_uri.as_bytes(), bytes.as_slice());
let k = Self::rrdp_object_index_key(notification_uri, rsync_uri.as_str());
batch.put_cf(idx_cf, k, b"");
}
RrdpDeltaOp::Delete { rsync_uri } => {
batch.delete_cf(raw_cf, rsync_uri.as_bytes());
let k = Self::rrdp_object_index_key(notification_uri, rsync_uri.as_str());
batch.delete_cf(idx_cf, k);
}
}
}
self.write_batch(batch)?;
Ok(ops.len())
}
pub fn put_repository_view_entry(&self, entry: &RepositoryViewEntry) -> StorageResult<()> {
entry.validate_internal()?;
let cf = self.cf(CF_REPOSITORY_VIEW)?;
@ -1145,6 +920,16 @@ impl RocksStore {
self.write_batch(batch)
}
pub fn delete_raw_by_hash_entry(&self, sha256_hex: &str) -> StorageResult<()> {
validate_sha256_hex("raw_by_hash.sha256_hex", sha256_hex)?;
let cf = self.cf(CF_RAW_BY_HASH)?;
let key = raw_by_hash_key(sha256_hex);
self.db
.delete_cf(cf, key.as_bytes())
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
pub fn get_raw_by_hash_entry(&self, sha256_hex: &str) -> StorageResult<Option<RawByHashEntry>> {
let cf = self.cf(CF_RAW_BY_HASH)?;
let key = raw_by_hash_key(sha256_hex);
@ -1411,6 +1196,55 @@ impl RocksStore {
.collect()
}
pub fn list_current_rrdp_source_members(
&self,
notify_uri: &str,
) -> StorageResult<Vec<RrdpSourceMemberRecord>> {
let mut records = self.list_rrdp_source_member_records(notify_uri)?;
records.retain(|record| record.present);
records.sort_by(|a, b| a.rsync_uri.cmp(&b.rsync_uri));
Ok(records)
}
pub fn is_current_rrdp_source_member(
&self,
notify_uri: &str,
rsync_uri: &str,
) -> StorageResult<bool> {
Ok(matches!(
self.get_rrdp_source_member_record(notify_uri, rsync_uri)?,
Some(record) if record.present
))
}
pub fn load_current_object_bytes_by_uri(&self, rsync_uri: &str) -> StorageResult<Option<Vec<u8>>> {
let Some(view) = self.get_repository_view_entry(rsync_uri)? else {
return Ok(None);
};
match view.state {
RepositoryViewState::Withdrawn => Ok(None),
RepositoryViewState::Present | RepositoryViewState::Replaced => {
let hash = view
.current_hash
.as_deref()
.ok_or(StorageError::InvalidData {
entity: "repository_view",
detail: format!(
"current_hash missing for current object URI: {rsync_uri}"
),
})?;
let raw = self.get_raw_by_hash_entry(hash)?.ok_or(StorageError::InvalidData {
entity: "repository_view",
detail: format!(
"raw_by_hash entry missing for current object URI: {rsync_uri} (hash={hash})"
),
})?;
Ok(Some(raw.bytes))
}
}
}
pub fn put_rrdp_uri_owner_record(&self, record: &RrdpUriOwnerRecord) -> StorageResult<()> {
record.validate_internal()?;
let cf = self.cf(CF_RRDP_URI_OWNER)?;
@ -1449,31 +1283,6 @@ impl RocksStore {
Ok(())
}
#[allow(dead_code)]
pub fn raw_iter_prefix<'a>(
&'a self,
prefix: &'a [u8],
) -> StorageResult<impl Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
let cf = self.cf(CF_RAW_OBJECTS)?;
let mode = IteratorMode::From(prefix, Direction::Forward);
Ok(self
.db
.iterator_cf(cf, mode)
.take_while(move |res| match res {
Ok((k, _v)) => k.starts_with(prefix),
Err(_) => false,
})
.filter_map(|res| res.ok()))
}
#[allow(dead_code)]
pub fn raw_iter_all<'a>(
&'a self,
) -> StorageResult<impl Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
let cf = self.cf(CF_RAW_OBJECTS)?;
let mode = IteratorMode::Start;
Ok(self.db.iterator_cf(cf, mode).filter_map(|res| res.ok()))
}
#[allow(dead_code)]
@ -1865,121 +1674,6 @@ mod tests {
}
}
#[test]
fn rrdp_object_index_and_snapshot_delta_helpers_work_end_to_end() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let notification_uri = "https://rrdp.example.test/notification.xml";
let u1 = "rsync://rpki.example.test/repo/obj1.cer".to_string();
let u2 = "rsync://rpki.example.test/repo/obj2.mft".to_string();
assert_eq!(
store
.rrdp_object_index_clear(notification_uri)
.expect("clear empty"),
0
);
let published_v1 = vec![(u1.clone(), vec![1u8, 2, 3]), (u2.clone(), vec![9u8, 8, 7])];
let n = store
.apply_rrdp_snapshot(notification_uri, &published_v1)
.expect("apply snapshot v1");
assert_eq!(n, 2);
assert_eq!(
store.get_raw(&u1).expect("get_raw u1"),
Some(vec![1u8, 2, 3])
);
assert_eq!(
store.get_raw(&u2).expect("get_raw u2"),
Some(vec![9u8, 8, 7])
);
assert!(
store
.rrdp_object_index_contains(notification_uri, &u1)
.expect("contains u1")
);
assert!(
store
.rrdp_object_index_contains(notification_uri, &u2)
.expect("contains u2")
);
let mut listed: Vec<String> = store
.rrdp_object_index_iter(notification_uri)
.expect("iter")
.collect();
listed.sort();
assert_eq!(listed, vec![u1.clone(), u2.clone()]);
let published_v2 = vec![(u2.clone(), vec![0u8, 1, 2, 3])];
store
.apply_rrdp_snapshot(notification_uri, &published_v2)
.expect("apply snapshot v2");
assert_eq!(store.get_raw(&u1).expect("get_raw removed"), None);
assert_eq!(
store.get_raw(&u2).expect("get_raw updated"),
Some(vec![0u8, 1, 2, 3])
);
let u3 = "rsync://rpki.example.test/repo/obj3.crl".to_string();
let ops = vec![
RrdpDeltaOp::Upsert {
rsync_uri: u3.clone(),
bytes: vec![4u8, 5, 6],
},
RrdpDeltaOp::Delete {
rsync_uri: u2.clone(),
},
];
let applied = store
.apply_rrdp_delta(notification_uri, &ops)
.expect("apply delta");
assert_eq!(applied, 2);
assert_eq!(store.get_raw(&u2).expect("get_raw deleted"), None);
assert_eq!(
store.get_raw(&u3).expect("get_raw u3"),
Some(vec![4u8, 5, 6])
);
let prefix = b"rsync://rpki.example.test/repo/";
let mut got: Vec<String> = store
.raw_iter_prefix(prefix)
.expect("raw_iter_prefix")
.map(|(k, _v)| String::from_utf8(k.to_vec()).expect("utf8 key"))
.collect();
got.sort();
assert_eq!(got, vec![u3.clone()]);
let all: Vec<String> = store
.raw_iter_all()
.expect("raw_iter_all")
.map(|(k, _v)| String::from_utf8(k.to_vec()).expect("utf8 key"))
.collect();
assert!(all.contains(&u3));
let cleared = store
.rrdp_object_index_clear(notification_uri)
.expect("clear");
assert!(cleared >= 1);
assert!(
!store
.rrdp_object_index_contains(notification_uri, &u3)
.expect("contains after clear")
);
}
#[test]
fn apply_rrdp_delta_empty_ops_is_noop() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let n = store
.apply_rrdp_delta("https://rrdp.example.test/notification.xml", &[])
.expect("apply empty delta");
assert_eq!(n, 0);
}
#[test]
fn repository_view_and_raw_by_hash_roundtrip() {
let td = tempfile::tempdir().expect("tempdir");
@ -2271,47 +1965,6 @@ mod tests {
.validate_internal()
.expect_err("duplicate origin URI must fail");
assert!(err.to_string().contains("duplicate origin URI"));
store
.put_raw_batch(vec![(
"rsync://example.test/repo/raw.cer".to_string(),
vec![1, 2, 3],
)])
.expect("put_raw_batch stores entries");
assert_eq!(
store
.get_raw("rsync://example.test/repo/raw.cer")
.expect("get raw"),
Some(vec![1, 2, 3])
);
store
.delete_raw("rsync://example.test/repo/raw.cer")
.expect("delete raw entry");
assert!(
store
.get_raw("rsync://example.test/repo/raw.cer")
.expect("get deleted raw")
.is_none()
);
store
.put_rrdp_state("https://rrdp.example.test/notification.xml", b"state")
.expect("put rrdp state");
assert_eq!(
store
.get_rrdp_state("https://rrdp.example.test/notification.xml")
.expect("get rrdp state"),
Some(b"state".to_vec())
);
store
.delete_rrdp_state("https://rrdp.example.test/notification.xml")
.expect("delete rrdp state");
assert!(
store
.get_rrdp_state("https://rrdp.example.test/notification.xml")
.expect("get deleted rrdp state")
.is_none()
);
}
#[test]
@ -2494,4 +2147,167 @@ mod tests {
);
}
#[test]
fn current_rrdp_source_member_helpers_filter_present_records() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let notify_uri = "https://rrdp.example.test/notification.xml";
let mut present_a =
sample_rrdp_source_member_record(notify_uri, "rsync://example.test/repo/a.cer", 1);
let mut withdrawn_b =
sample_rrdp_source_member_record(notify_uri, "rsync://example.test/repo/b.roa", 2);
withdrawn_b.present = false;
let present_c =
sample_rrdp_source_member_record(notify_uri, "rsync://example.test/repo/c.crl", 3);
let other_source = sample_rrdp_source_member_record(
"https://other.example.test/notification.xml",
"rsync://other.example.test/repo/x.cer",
4,
);
present_a.last_confirmed_serial = 10;
store
.put_rrdp_source_member_record(&present_a)
.expect("put present a");
store
.put_rrdp_source_member_record(&withdrawn_b)
.expect("put withdrawn b");
store
.put_rrdp_source_member_record(&present_c)
.expect("put present c");
store
.put_rrdp_source_member_record(&other_source)
.expect("put other source");
let members = store
.list_current_rrdp_source_members(notify_uri)
.expect("list current members");
assert_eq!(
members
.iter()
.map(|record| record.rsync_uri.as_str())
.collect::<Vec<_>>(),
vec![
"rsync://example.test/repo/a.cer",
"rsync://example.test/repo/c.crl",
]
);
assert!(
store
.is_current_rrdp_source_member(notify_uri, &present_a.rsync_uri)
.expect("current a")
);
assert!(
!store
.is_current_rrdp_source_member(notify_uri, &withdrawn_b.rsync_uri)
.expect("withdrawn b")
);
assert!(
!store
.is_current_rrdp_source_member(notify_uri, &other_source.rsync_uri)
.expect("other source")
);
}
#[test]
fn load_current_object_bytes_by_uri_uses_repository_view_and_raw_by_hash() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let present_bytes = b"present-object".to_vec();
let present_hash = sha256_hex(&present_bytes);
let mut present_raw = RawByHashEntry::from_bytes(present_hash.clone(), present_bytes.clone());
present_raw.origin_uris.push("rsync://example.test/repo/present.roa".to_string());
present_raw.object_type = Some("roa".to_string());
store
.put_raw_by_hash_entry(&present_raw)
.expect("put present raw");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: "rsync://example.test/repo/present.roa".to_string(),
current_hash: Some(present_hash),
repository_source: Some("https://rrdp.example.test/notification.xml".to_string()),
object_type: Some("roa".to_string()),
state: RepositoryViewState::Present,
})
.expect("put present view");
let replaced_bytes = b"replaced-object".to_vec();
let replaced_hash = sha256_hex(&replaced_bytes);
let mut replaced_raw =
RawByHashEntry::from_bytes(replaced_hash.clone(), replaced_bytes.clone());
replaced_raw.origin_uris.push("rsync://example.test/repo/replaced.cer".to_string());
replaced_raw.object_type = Some("cer".to_string());
store
.put_raw_by_hash_entry(&replaced_raw)
.expect("put replaced raw");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: "rsync://example.test/repo/replaced.cer".to_string(),
current_hash: Some(replaced_hash),
repository_source: Some("https://rrdp.example.test/notification.xml".to_string()),
object_type: Some("cer".to_string()),
state: RepositoryViewState::Replaced,
})
.expect("put replaced view");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: "rsync://example.test/repo/withdrawn.crl".to_string(),
current_hash: Some(sha256_hex(b"withdrawn")),
repository_source: Some("https://rrdp.example.test/notification.xml".to_string()),
object_type: Some("crl".to_string()),
state: RepositoryViewState::Withdrawn,
})
.expect("put withdrawn view");
assert_eq!(
store
.load_current_object_bytes_by_uri("rsync://example.test/repo/present.roa")
.expect("load present"),
Some(present_bytes)
);
assert_eq!(
store
.load_current_object_bytes_by_uri("rsync://example.test/repo/replaced.cer")
.expect("load replaced"),
Some(replaced_bytes)
);
assert_eq!(
store
.load_current_object_bytes_by_uri("rsync://example.test/repo/withdrawn.crl")
.expect("load withdrawn"),
None
);
assert_eq!(
store
.load_current_object_bytes_by_uri("rsync://example.test/repo/missing.roa")
.expect("load missing"),
None
);
}
#[test]
fn load_current_object_bytes_by_uri_errors_when_raw_by_hash_is_missing() {
let td = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(td.path()).expect("open rocksdb");
let rsync_uri = "rsync://example.test/repo/missing.cer";
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(hex::encode([0x11; 32])),
repository_source: Some("https://rrdp.example.test/notification.xml".to_string()),
object_type: Some("cer".to_string()),
state: RepositoryViewState::Present,
})
.expect("put view");
let err = store
.load_current_object_bytes_by_uri(rsync_uri)
.expect_err("missing raw_by_hash should error");
assert!(matches!(err, StorageError::InvalidData { .. }));
}
}

View File

@ -8,7 +8,7 @@ use crate::replay::delta_archive::{ReplayDeltaArchiveIndex, ReplayDeltaRrdpKind}
use crate::report::{RfcRef, Warning};
use crate::storage::RocksStore;
use crate::sync::rrdp::sync_from_notification_with_timing_and_download_log;
use crate::sync::rrdp::{Fetcher as HttpFetcher, RrdpState, RrdpSyncError};
use crate::sync::rrdp::{Fetcher as HttpFetcher, RrdpSyncError, load_rrdp_local_state};
use crate::sync::store_projection::{
build_repository_view_present_entry,
build_repository_view_withdrawn_entry,
@ -18,6 +18,11 @@ use std::collections::HashSet;
use std::thread;
use std::time::Duration;
#[cfg(test)]
use crate::storage::RrdpSourceSyncState;
#[cfg(test)]
use crate::sync::rrdp::persist_rrdp_local_state;
const RRDP_RETRY_BACKOFFS_PROD: [Duration; 3] = [
Duration::from_millis(200),
Duration::from_millis(500),
@ -54,7 +59,7 @@ pub enum RepoSyncError {
Storage(String),
}
/// Sync a publication point into `raw_objects`.
/// Sync a publication point into the current repository view.
///
/// v1 behavior:
/// - If `rrdp_notification_uri` is present and `policy.sync_preference` is `rrdp_then_rsync`,
@ -100,7 +105,7 @@ pub fn sync_publication_point(
.with_rfc_refs(&[RfcRef("RFC 8182 §3.4.5")])
.with_context(notification_uri),
];
let written = rsync_sync_into_raw_objects(
let written = rsync_sync_into_current_store(
store,
rsync_base_uri,
rsync_fetcher,
@ -120,7 +125,7 @@ pub fn sync_publication_point(
}
}
_ => {
let written = rsync_sync_into_raw_objects(
let written = rsync_sync_into_current_store(
store,
rsync_base_uri,
rsync_fetcher,
@ -170,7 +175,7 @@ pub fn sync_publication_point_replay(
})
}
ReplayResolvedTransport::Rsync => {
let written = rsync_sync_into_raw_objects(
let written = rsync_sync_into_current_store(
store,
rsync_base_uri,
rsync_fetcher,
@ -221,7 +226,7 @@ pub fn sync_publication_point_replay_delta(
})
}
ReplayDeltaResolvedTransport::Rsync => {
let written = rsync_sync_into_raw_objects(
let written = rsync_sync_into_current_store(
store,
rsync_base_uri,
rsync_fetcher,
@ -325,22 +330,17 @@ fn validate_delta_replay_base_state_for_repo(
) -> Result<(), RepoSyncError> {
match base.transport {
ReplayTransport::Rrdp => {
let bytes = store
.get_rrdp_state(notification_uri)
.map_err(|e| RepoSyncError::Storage(e.to_string()))?
let state = load_rrdp_local_state(store, notification_uri)
.map_err(RepoSyncError::Storage)?
.ok_or_else(|| {
RepoSyncError::Replay(format!(
"delta replay base state missing for {notification_uri}: expected RRDP session={} serial={}"
,
"delta replay base state missing for {notification_uri}: expected RRDP session={} serial={}",
base.session.as_deref().unwrap_or("<none>"),
base.serial.map(|v| v.to_string()).unwrap_or_else(|| "<none>".to_string())
base.serial
.map(|v| v.to_string())
.unwrap_or_else(|| "<none>".to_string())
))
})?;
let state = RrdpState::decode(&bytes).map_err(|e| {
RepoSyncError::Replay(format!(
"decode base RRDP state failed for {notification_uri}: {e}"
))
})?;
let expected_session = base.session.as_deref().unwrap_or("");
let expected_serial = base.serial.unwrap_or_default();
if state.session_id != expected_session || state.serial != expected_serial {
@ -496,7 +496,7 @@ fn try_rrdp_sync_with_retry(
}
}
fn rsync_sync_into_raw_objects(
fn rsync_sync_into_current_store(
store: &RocksStore,
rsync_base_uri: &str,
rsync_fetcher: &dyn RsyncFetcher,
@ -538,17 +538,9 @@ fn rsync_sync_into_raw_objects(
.map_err(|e| RepoSyncError::Storage(e.to_string()))?;
let new_set: HashSet<&str> = objects.iter().map(|(uri, _)| uri.as_str()).collect();
let _w = timing
.as_ref()
.map(|t| t.span_phase("rsync_write_raw_objects_total"));
store
.put_raw_batch(objects.clone())
.map_err(|e| RepoSyncError::Storage(e.to_string()))?;
drop(_w);
let _proj = timing
.as_ref()
.map(|t| t.span_phase("rsync_write_repository_view_total"));
.map(|t| t.span_phase("rsync_write_current_store_total"));
let prepared_raw = prepare_raw_by_hash_evidence_batch(store, &objects)
.map_err(RepoSyncError::Storage)?;
let mut repository_view_entries = Vec::new();
@ -635,6 +627,15 @@ mod tests {
}
}
fn assert_current_object(store: &RocksStore, uri: &str, expected: &[u8]) {
assert_eq!(
store
.load_current_object_bytes_by_uri(uri)
.expect("load current object"),
Some(expected.to_vec())
);
}
fn notification_xml(
session_id: &str,
serial: u64,
@ -952,7 +953,7 @@ mod tests {
}
#[test]
fn rsync_sync_writes_raw_objects_with_batch_and_records_counts() {
fn rsync_sync_writes_current_store_and_records_counts() {
let temp = tempfile::tempdir().expect("tempdir");
let repo_dir = temp.path().join("repo");
@ -1003,22 +1004,9 @@ mod tests {
assert_eq!(objects.objects_count, 3);
assert_eq!(objects.objects_bytes_total, 9);
assert_eq!(
store.get_raw("rsync://example.test/repo/a.mft").unwrap(),
Some(b"mft".to_vec())
);
assert_eq!(
store
.get_raw("rsync://example.test/repo/sub/b.roa")
.unwrap(),
Some(b"roa".to_vec())
);
assert_eq!(
store
.get_raw("rsync://example.test/repo/sub/c.cer")
.unwrap(),
Some(b"cer".to_vec())
);
assert_current_object(&store, "rsync://example.test/repo/a.mft", b"mft");
assert_current_object(&store, "rsync://example.test/repo/sub/b.roa", b"roa");
assert_current_object(&store, "rsync://example.test/repo/sub/c.cer", b"cer");
let view = store
.get_repository_view_entry("rsync://example.test/repo/a.mft")
@ -1208,10 +1196,7 @@ mod tests {
.expect("sync ok");
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(
store.get_raw(published_uri).unwrap(),
Some(published_bytes.to_vec())
);
assert_current_object(&store, published_uri, published_bytes);
let events = download_log.snapshot_events();
assert_eq!(events.len(), 4, "expected 3x notification + 1x snapshot");
@ -1400,10 +1385,15 @@ mod tests {
session_id: sid.to_string(),
serial: 1,
};
let state_bytes = state.encode().expect("encode state");
store
.put_rrdp_state(notification_uri, &state_bytes)
.expect("seed state");
persist_rrdp_local_state(
&store,
notification_uri,
&state,
RrdpSourceSyncState::DeltaReady,
Some(snapshot_uri),
None,
)
.expect("seed state");
let delta_2 = format!(
r#"<delta xmlns="http://www.ripe.net/rpki/rrdp" version="1" session_id="{sid}" serial="2"></delta>"#
@ -1450,10 +1440,7 @@ mod tests {
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(out.objects_written, 1);
assert_eq!(
store.get_raw(published_uri).unwrap(),
Some(published_bytes.to_vec())
);
assert_current_object(&store, published_uri, published_bytes);
let events = download_log.snapshot_events();
assert_eq!(events.len(), 4);
@ -1517,10 +1504,7 @@ mod tests {
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(out.objects_written, 1);
assert_eq!(
store.get_raw(&published_uri).unwrap(),
Some(b"mft".to_vec())
);
assert_current_object(&store, &published_uri, b"mft");
}
#[test]
@ -1560,12 +1544,7 @@ mod tests {
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 1);
assert_eq!(out.warnings.len(), 0);
assert_eq!(
store
.get_raw("rsync://rsync.example.test/repo/sub/fallback.cer")
.unwrap(),
Some(b"cer".to_vec())
);
assert_current_object(&store, "rsync://rsync.example.test/repo/sub/fallback.cer", b"cer");
}
#[test]
@ -1633,9 +1612,15 @@ mod tests {
session_id: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(),
serial: 10,
};
store
.put_rrdp_state(&notify_uri, &state.encode().unwrap())
.expect("seed base state");
persist_rrdp_local_state(
&store,
&notify_uri,
&state,
RrdpSourceSyncState::DeltaReady,
None,
None,
)
.expect("seed base state");
let out = sync_publication_point_replay_delta(
&store,
@ -1651,21 +1636,11 @@ mod tests {
assert_eq!(out.source, RepoSyncSource::Rrdp);
assert_eq!(out.objects_written, 2);
assert_eq!(
store.get_raw("rsync://example.test/repo/a.mft").unwrap(),
Some(b"delta-a".to_vec())
);
assert_eq!(
store
.get_raw("rsync://example.test/repo/sub/b.roa")
.unwrap(),
Some(b"delta-b".to_vec())
);
let state_bytes = store
.get_rrdp_state(&notify_uri)
.unwrap()
assert_current_object(&store, "rsync://example.test/repo/a.mft", b"delta-a");
assert_current_object(&store, "rsync://example.test/repo/sub/b.roa", b"delta-b");
let new_state = load_rrdp_local_state(&store, &notify_uri)
.expect("load current state")
.expect("rrdp state present");
let new_state = RrdpState::decode(&state_bytes).expect("decode state");
assert_eq!(new_state.serial, 12);
}
@ -1727,9 +1702,15 @@ mod tests {
session_id: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(),
serial: 10,
};
store
.put_rrdp_state(&notify_uri, &state.encode().unwrap())
.expect("seed base state");
persist_rrdp_local_state(
&store,
&notify_uri,
&state,
RrdpSourceSyncState::DeltaReady,
None,
None,
)
.expect("seed base state");
let base_locks_body = std::fs::read_to_string(&base_locks).expect("read base locks");
let base_locks_sha = sha256_hex(base_locks_body.as_bytes());
@ -1808,18 +1789,8 @@ mod tests {
.expect("fallback-rsync delta sync ok");
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 2);
assert_eq!(
store
.get_raw("rsync://rsync.example.test/repo/a.mft")
.unwrap(),
Some(b"base".to_vec())
);
assert_eq!(
store
.get_raw("rsync://rsync.example.test/repo/sub/x.cer")
.unwrap(),
Some(b"overlay-cer".to_vec())
);
assert_current_object(&store, "rsync://rsync.example.test/repo/a.mft", b"base");
assert_current_object(&store, "rsync://rsync.example.test/repo/sub/x.cer", b"overlay-cer");
}
#[test]
@ -1845,9 +1816,15 @@ mod tests {
session_id: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa".to_string(),
serial: 10,
};
store
.put_rrdp_state(&notify_uri, &state.encode().unwrap())
.expect("seed base state");
persist_rrdp_local_state(
&store,
&notify_uri,
&state,
RrdpSourceSyncState::DeltaReady,
None,
None,
)
.expect("seed base state");
let locks_body = std::fs::read_to_string(&delta_locks).expect("read delta locks");
let rewritten =

View File

@ -210,6 +210,41 @@ impl RrdpState {
}
}
pub(crate) fn load_rrdp_local_state(
store: &RocksStore,
notification_uri: &str,
) -> Result<Option<RrdpState>, String> {
if let Some(record) = store
.get_rrdp_source_record(notification_uri)
.map_err(|e| e.to_string())?
{
if let (Some(session_id), Some(serial)) = (record.last_session_id, record.last_serial) {
return Ok(Some(RrdpState { session_id, serial }));
}
}
Ok(None)
}
pub(crate) fn persist_rrdp_local_state(
store: &RocksStore,
notification_uri: &str,
state: &RrdpState,
sync_state: RrdpSourceSyncState,
last_snapshot_uri: Option<&str>,
last_snapshot_hash_hex: Option<&str>,
) -> Result<(), String> {
update_rrdp_source_record_on_success(
store,
notification_uri,
state.session_id.as_str(),
state.serial,
sync_state,
last_snapshot_uri,
last_snapshot_hash_hex,
)
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NotificationSnapshot {
pub session_id: Uuid,
@ -582,15 +617,10 @@ fn sync_from_notification_snapshot_inner(
session_id: notif.session_id.to_string(),
serial: notif.serial,
};
let bytes = state.encode().map_err(RrdpSyncError::Storage)?;
store
.put_rrdp_state(notification_uri, &bytes)
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
update_rrdp_source_record_on_success(
persist_rrdp_local_state(
store,
notification_uri,
notif.session_id.to_string().as_str(),
notif.serial,
&state,
RrdpSourceSyncState::SnapshotOnly,
Some(&notif.snapshot_uri),
Some(&hex::encode(notif.snapshot_hash_sha256)),
@ -683,10 +713,7 @@ fn sync_from_notification_inner(
let _read_state_total = timing
.as_ref()
.map(|t| t.span_phase("rrdp_read_state_total"));
let state = store
.get_rrdp_state(notification_uri)
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?
.and_then(|bytes| RrdpState::decode(&bytes).ok());
let state = load_rrdp_local_state(store, notification_uri).map_err(RrdpSyncError::Storage)?;
drop(_read_state_step);
drop(_read_state_total);
@ -811,15 +838,10 @@ fn sync_from_notification_inner(
session_id: notif.session_id.to_string(),
serial: notif.serial,
};
let bytes = new_state.encode().map_err(RrdpSyncError::Storage)?;
store
.put_rrdp_state(notification_uri, &bytes)
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
update_rrdp_source_record_on_success(
persist_rrdp_local_state(
store,
notification_uri,
notif.session_id.to_string().as_str(),
notif.serial,
&new_state,
RrdpSourceSyncState::DeltaReady,
Some(&notif.snapshot_uri),
Some(&hex::encode(notif.snapshot_hash_sha256)),
@ -913,15 +935,10 @@ fn sync_from_notification_inner(
session_id: notif.session_id.to_string(),
serial: notif.serial,
};
let bytes = new_state.encode().map_err(RrdpSyncError::Storage)?;
store
.put_rrdp_state(notification_uri, &bytes)
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
update_rrdp_source_record_on_success(
persist_rrdp_local_state(
store,
notification_uri,
notif.session_id.to_string().as_str(),
notif.serial,
&new_state,
RrdpSourceSyncState::SnapshotOnly,
Some(&notif.snapshot_uri),
Some(&hex::encode(notif.snapshot_hash_sha256)),
@ -984,7 +1001,7 @@ fn apply_delta(
bytes,
} => {
let is_member = store
.rrdp_object_index_contains(notification_uri, uri.as_str())
.is_current_rrdp_source_member(notification_uri, uri.as_str())
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
if !is_member {
return Err(RrdpError::DeltaTargetNotFromRepository { rsync_uri: uri }.into());
@ -992,7 +1009,7 @@ fn apply_delta(
ensure_rrdp_uri_can_be_owned_by(store, notification_uri, uri.as_str())
.map_err(RrdpSyncError::Storage)?;
let old_bytes = store
.get_raw(uri.as_str())
.load_current_object_bytes_by_uri(uri.as_str())
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?
.ok_or_else(|| RrdpError::DeltaTargetMissing {
rsync_uri: uri.clone(),
@ -1017,7 +1034,7 @@ fn apply_delta(
bytes,
} => {
let is_member = store
.rrdp_object_index_contains(notification_uri, uri.as_str())
.is_current_rrdp_source_member(notification_uri, uri.as_str())
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
if is_member {
return Err(
@ -1037,7 +1054,7 @@ fn apply_delta(
}
DeltaElement::Withdraw { uri, hash_sha256 } => {
let is_member = store
.rrdp_object_index_contains(notification_uri, uri.as_str())
.is_current_rrdp_source_member(notification_uri, uri.as_str())
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
if !is_member {
return Err(RrdpError::DeltaTargetNotFromRepository { rsync_uri: uri }.into());
@ -1045,7 +1062,7 @@ fn apply_delta(
ensure_rrdp_uri_can_be_owned_by(store, notification_uri, uri.as_str())
.map_err(RrdpSyncError::Storage)?;
let old_bytes = store
.get_raw(uri.as_str())
.load_current_object_bytes_by_uri(uri.as_str())
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?
.ok_or_else(|| RrdpError::DeltaTargetMissing {
rsync_uri: uri.clone(),
@ -1066,10 +1083,6 @@ fn apply_delta(
}
}
store
.apply_rrdp_delta(notification_uri, ops.as_slice())
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
for effect in projection {
match effect {
DeltaProjectionEffect::Upsert { rsync_uri, bytes } => {
@ -1188,8 +1201,10 @@ fn apply_snapshot(
}
let previous_members: Vec<String> = store
.rrdp_object_index_iter(notification_uri)
.list_current_rrdp_source_members(notification_uri)
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?
.into_iter()
.map(|record| record.rsync_uri)
.collect();
let new_set: std::collections::HashSet<&str> =
published.iter().map(|(uri, _)| uri.as_str()).collect();
@ -1204,7 +1219,7 @@ fn apply_snapshot(
.and_then(|entry| entry.current_hash)
.or_else(|| {
store
.get_raw(old_uri)
.load_current_object_bytes_by_uri(old_uri)
.ok()
.flatten()
.map(|bytes| compute_sha256_hex(&bytes))
@ -1212,10 +1227,6 @@ fn apply_snapshot(
withdrawn.push((old_uri.clone(), previous_hash));
}
store
.apply_rrdp_snapshot(notification_uri, published.as_slice())
.map_err(|e| RrdpSyncError::Storage(e.to_string()))?;
let session_id = expected_session_id.to_string();
let prepared_raw = prepare_raw_by_hash_evidence_batch(store, &published)
.map_err(RrdpSyncError::Storage)?;
@ -1404,6 +1415,15 @@ mod tests {
}
}
fn assert_current_object(store: &RocksStore, uri: &str, expected: &[u8]) {
assert_eq!(
store
.load_current_object_bytes_by_uri(uri)
.expect("load current object"),
Some(expected.to_vec())
);
}
fn notification_xml(
session_id: &str,
serial: u64,
@ -1682,8 +1702,8 @@ mod tests {
.expect("sync snapshot");
let old_b = store
.get_raw("rsync://example.net/repo/b.roa")
.expect("get_raw")
.load_current_object_bytes_by_uri("rsync://example.net/repo/b.roa")
.expect("load current b")
.expect("b present");
let old_b_hash = hex::encode(sha2::Sha256::digest(old_b.as_slice()));
@ -1721,34 +1741,34 @@ mod tests {
.expect("apply delta");
assert_eq!(applied, 3);
assert!(
assert_eq!(
store
.get_raw("rsync://example.net/repo/a.mft")
.expect("get_raw")
.is_none(),
.load_current_object_bytes_by_uri("rsync://example.net/repo/a.mft")
.expect("load current a"),
None,
"a withdrawn"
);
let b = store
.get_raw("rsync://example.net/repo/b.roa")
.expect("get_raw")
.load_current_object_bytes_by_uri("rsync://example.net/repo/b.roa")
.expect("load current b")
.expect("b present");
assert_eq!(b, b"b2");
let c = store
.get_raw("rsync://example.net/repo/c.crl")
.expect("get_raw")
.load_current_object_bytes_by_uri("rsync://example.net/repo/c.crl")
.expect("load current c")
.expect("c present");
assert_eq!(c, b"c2");
assert!(
!store
.rrdp_object_index_contains(notif_uri, "rsync://example.net/repo/a.mft")
.expect("contains"),
.is_current_rrdp_source_member(notif_uri, "rsync://example.net/repo/a.mft")
.expect("is member"),
"a removed from rrdp repo index"
);
assert!(
store
.rrdp_object_index_contains(notif_uri, "rsync://example.net/repo/c.crl")
.expect("contains"),
.is_current_rrdp_source_member(notif_uri, "rsync://example.net/repo/c.crl")
.expect("is member"),
"c added to rrdp repo index"
);
@ -1779,6 +1799,19 @@ mod tests {
.expect("get a member")
.expect("a member exists");
assert!(!a_member.present);
let current_members = store
.list_current_rrdp_source_members(notif_uri)
.expect("list current members");
assert_eq!(
current_members
.iter()
.map(|record| record.rsync_uri.as_str())
.collect::<Vec<_>>(),
vec![
"rsync://example.net/repo/b.roa",
"rsync://example.net/repo/c.crl",
]
);
}
#[test]
@ -1893,7 +1926,7 @@ mod tests {
sync_from_notification_snapshot(&store, notif_uri, &notif, &fetcher).expect("seed");
let old_bytes = store
.get_raw("rsync://example.net/repo/a.mft")
.load_current_object_bytes_by_uri("rsync://example.net/repo/a.mft")
.expect("get")
.expect("present");
let old_hash = hex::encode(sha2::Sha256::digest(old_bytes.as_slice()));
@ -1926,8 +1959,8 @@ mod tests {
// Target missing in local cache (index still says it's a member).
store
.delete_raw("rsync://example.net/repo/a.mft")
.expect("delete");
.delete_repository_view_entry("rsync://example.net/repo/a.mft")
.expect("delete current repository view entry");
let delta = delta_xml(
sid,
2,
@ -2061,23 +2094,12 @@ mod tests {
sync_from_notification_snapshot(&store, notif_uri, &notif, &fetcher).expect("sync");
assert_eq!(published, 2);
let a = store
.get_raw("rsync://example.net/repo/a.mft")
.expect("get_raw")
.expect("a present");
assert_eq!(a, b"mft-bytes");
assert_current_object(&store, "rsync://example.net/repo/a.mft", b"mft-bytes");
assert_current_object(&store, "rsync://example.net/repo/b.roa", b"roa-bytes");
let b = store
.get_raw("rsync://example.net/repo/b.roa")
.expect("get_raw")
.expect("b present");
assert_eq!(b, b"roa-bytes");
let state_bytes = store
.get_rrdp_state(notif_uri)
.expect("get_rrdp_state")
let state = load_rrdp_local_state(&store, notif_uri)
.expect("get rrdp state")
.expect("state present");
let state = RrdpState::decode(&state_bytes).expect("decode state");
assert_eq!(state.session_id, sid);
assert_eq!(state.serial, serial);
@ -2166,25 +2188,16 @@ mod tests {
};
sync_from_notification_snapshot(&store, notif_uri, &notif_2, &fetcher_2).expect("sync 2");
let a = store
.get_raw("rsync://example.net/repo/a.mft")
.expect("get_raw");
assert!(
a.is_none(),
store
.load_current_object_bytes_by_uri("rsync://example.net/repo/a.mft")
.expect("get current a")
.is_none(),
"a should be deleted by full-state snapshot apply"
);
let b = store
.get_raw("rsync://example.net/repo/b.roa")
.expect("get_raw")
.expect("b present");
assert_eq!(b, b"b2");
let c = store
.get_raw("rsync://example.net/repo/c.crl")
.expect("get_raw")
.expect("c present");
assert_eq!(c, b"c2");
assert_current_object(&store, "rsync://example.net/repo/b.roa", b"b2");
assert_current_object(&store, "rsync://example.net/repo/c.crl", b"c2");
}
#[test]
@ -2271,20 +2284,51 @@ mod tests {
// Delta 2 publishes c then delta 3 withdraws it => final state should not contain c.
assert!(
store
.get_raw("rsync://example.net/repo/c.crl")
.expect("get_raw")
.load_current_object_bytes_by_uri("rsync://example.net/repo/c.crl")
.expect("get current")
.is_none()
);
let state_bytes = store
.get_rrdp_state(notif_uri)
.expect("get_rrdp_state")
let state = load_rrdp_local_state(&store, notif_uri)
.expect("get rrdp state")
.expect("state present");
let state = RrdpState::decode(&state_bytes).expect("decode");
assert_eq!(state.session_id, Uuid::parse_str(sid).unwrap().to_string());
assert_eq!(state.serial, 3);
}
#[test]
fn load_rrdp_local_state_uses_source_record_only() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(tmp.path()).expect("open rocksdb");
let notif_uri = "https://example.net/notification.xml";
assert_eq!(
load_rrdp_local_state(&store, notif_uri).expect("load empty"),
None
);
update_rrdp_source_record_on_success(
&store,
notif_uri,
"source-session",
9,
crate::storage::RrdpSourceSyncState::DeltaReady,
Some("https://example.net/snapshot.xml"),
Some(&hex::encode([0x11; 32])),
)
.expect("write source record");
let got = load_rrdp_local_state(&store, notif_uri)
.expect("load source preferred")
.expect("source present");
assert_eq!(
got,
RrdpState {
session_id: "source-session".to_string(),
serial: 9,
}
);
}
#[test]
fn sync_from_notification_falls_back_to_snapshot_if_missing_required_deltas() {
let tmp = tempfile::tempdir().expect("tempdir");
@ -2331,8 +2375,8 @@ mod tests {
assert_eq!(published, 1);
assert!(
store
.get_raw("rsync://example.net/repo/z.roa")
.expect("get_raw")
.load_current_object_bytes_by_uri("rsync://example.net/repo/z.roa")
.expect("get current")
.is_some()
);
}

View File

@ -136,7 +136,7 @@ pub enum ManifestFreshError {
RepoSyncFailed { detail: String },
#[error(
"manifest not found in raw_objects: {manifest_rsync_uri} (RFC 9286 §6.2; RFC 9286 §6.6)"
"manifest not found in current repository view: {manifest_rsync_uri} (RFC 9286 §6.2; RFC 9286 §6.6)"
)]
MissingManifest { manifest_rsync_uri: String },
@ -204,7 +204,7 @@ pub enum ManifestFreshError {
},
#[error(
"manifest referenced file missing in raw_objects: {rsync_uri} (RFC 9286 §6.4; RFC 9286 §6.6)"
"manifest referenced file missing in current repository view: {rsync_uri} (RFC 9286 §6.4; RFC 9286 §6.6)"
)]
MissingFile { rsync_uri: String },
@ -548,7 +548,7 @@ pub(crate) fn try_build_fresh_publication_point(
}
let manifest_bytes = store
.get_raw(manifest_rsync_uri)
.load_current_object_bytes_by_uri(manifest_rsync_uri)
.map_err(|e| ManifestFreshError::MissingManifest {
manifest_rsync_uri: format!("{manifest_rsync_uri} ({e})"),
})?
@ -642,7 +642,7 @@ pub(crate) fn try_build_fresh_publication_point(
let rsync_uri =
join_rsync_dir_and_file(publication_point_rsync_uri, entry.file_name.as_str());
let bytes = store
.get_raw(&rsync_uri)
.load_current_object_bytes_by_uri(&rsync_uri)
.map_err(|_e| ManifestFreshError::MissingFile {
rsync_uri: rsync_uri.clone(),
})?
@ -836,6 +836,27 @@ mod tests {
entry
}
fn put_current_object(
store: &RocksStore,
rsync_uri: &str,
bytes: Vec<u8>,
object_type: &str,
) {
let hash = hex::encode(sha2::Sha256::digest(&bytes));
store
.put_raw_by_hash_entry(&raw_by_hash_entry(rsync_uri, bytes, object_type))
.expect("put raw_by_hash entry");
store
.put_repository_view_entry(&crate::storage::RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(hash),
repository_source: Some("https://example.test/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: crate::storage::RepositoryViewState::Present,
})
.expect("put repository view entry");
}
fn sample_current_instance_vcir(
manifest_rsync_uri: &str,
publication_point_rsync_uri: &str,
@ -985,9 +1006,7 @@ mod tests {
publication_point_rsync_uri,
validation_time,
) = load_manifest_fixture();
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes, "mft");
let first_non_crl = manifest
.manifest
.parse_files()
@ -1001,9 +1020,8 @@ mod tests {
.join(first_non_crl.file_name.as_str());
let bytes = std::fs::read(&file_path).expect("read fixture file");
let rsync_uri = format!("{publication_point_rsync_uri}{}", first_non_crl.file_name);
store
.put_raw(&rsync_uri, &bytes)
.expect("store single file");
let object_type = rsync_uri.rsplit('.').next().unwrap_or("bin");
put_current_object(&store, &rsync_uri, bytes, object_type);
let err = try_build_fresh_publication_point(
&store,

View File

@ -64,7 +64,7 @@ pub struct Rpkiv1PublicationPointRunner<'a> {
/// In-run RRDP dedup: when RRDP is enabled, only sync each `rrdp_notification_uri` once per run.
///
/// - If RRDP succeeded for a repo, later publication points referencing that same RRDP repo
/// skip network fetches and reuse the already-populated `raw_objects`.
/// skip network fetches and reuse the already-populated current repository view.
/// - If RRDP failed for a repo, later publication points skip RRDP attempts and go straight
/// to rsync for their own `rsync_base_uri` (still per-publication-point).
pub rrdp_dedup: bool,

View File

@ -7,7 +7,9 @@ use rpki::data_model::rc::{
use rpki::data_model::roa::RoaObject;
use rpki::storage::PackFile;
use rpki::storage::RawByHashEntry;
use rpki::storage::RocksStore;
use sha2::Digest;
use std::path::{Path, PathBuf};
use std::time::Instant;
@ -304,7 +306,12 @@ fn landing_packfile_cbor_put(store: &RocksStore, obj_type: ObjType, sample: &str
let pf = PackFile::from_bytes_compute_sha256(rsync_uri, bytes.to_vec());
let encoded = serde_cbor::to_vec(std::hint::black_box(&pf)).expect("cbor encode packfile");
let key = format!("bench:packfile:{}:{}", obj_type.as_str(), sample);
store.put_raw(&key, &encoded).expect("store put_raw");
let sha256_hex = hex::encode(sha2::Sha256::digest(&encoded));
let mut entry = RawByHashEntry::from_bytes(sha256_hex, encoded);
entry.origin_uris.push(key);
entry.object_type = Some("cbor".to_string());
entry.encoding = Some("cbor".to_string());
store.put_raw_by_hash_entry(&entry).expect("store raw_by_hash");
}
#[derive(Clone, Debug, serde::Serialize)]
@ -560,7 +567,7 @@ fn stage2_decode_validate_and_landing_benchmark_selected_der_v2() {
}
if mode.do_landing() {
// Landing benchmark: PackFile(from_bytes_compute_sha256) + CBOR + RocksDB put_raw.
// Landing benchmark: PackFile(from_bytes_compute_sha256) + CBOR + RocksDB current-state landing.
let mut per_round_ns_per_op = Vec::with_capacity(rounds as usize);
for _ in 0..warmup_iters {
landing_packfile_cbor_put(&store, s.obj_type, &s.name, bytes.as_slice());
@ -617,7 +624,7 @@ fn stage2_decode_validate_and_landing_benchmark_selected_der_v2() {
println!();
}
if mode.do_landing() {
println!("## landing (PackFile::from_bytes_compute_sha256 + CBOR + RocksDB put_raw)");
println!("## landing (PackFile::from_bytes_compute_sha256 + CBOR + RocksDB current-state landing)");
println!();
println!("| type | sample | size_bytes | complexity | avg ns/op | ops/s |");
println!("|---|---|---:|---:|---:|---:|");
@ -693,7 +700,7 @@ fn stage2_decode_validate_and_landing_benchmark_selected_der_v2() {
if let Some(path) = out_md_landing {
let md = render_markdown(
"Stage2 landing benchmark (PackFile CBOR + RocksDB put_raw)",
"Stage2 landing benchmark (PackFile CBOR + RocksDB current-state landing)",
&landing_rows,
);
write_text_file(&path, &md);

View File

@ -168,18 +168,17 @@ fn apnic_live_delta_only_from_persistent_db() {
.as_deref()
.expect("APNIC root must have rrdpNotification");
let state_bytes = store
.get_rrdp_state(rrdp_notification_uri)
.expect("get rrdp_state")
let source = store
.get_rrdp_source_record(rrdp_notification_uri)
.expect("get rrdp source")
.unwrap_or_else(|| {
panic!(
"missing rrdp_state for APNIC notification URI; run bootstrap test first. db_dir={}",
"missing RRDP source state for APNIC notification URI; run bootstrap test first. db_dir={}",
db_dir.display()
)
});
let state = rpki::sync::rrdp::RrdpState::decode(&state_bytes).expect("decode rrdp_state");
let old_serial = state.serial;
let old_session = state.session_id;
let old_serial = source.last_serial.expect("last serial");
let old_session = source.last_session_id.expect("last session");
let max_wait_secs: u64 = std::env::var("RPKI_LIVE_MAX_WAIT_SECS")
.ok()

View File

@ -212,11 +212,12 @@ fn apnic_tree_full_stats_serial() {
let mut raw_total = 0usize;
let mut raw_by_ext: BTreeMap<String, usize> = BTreeMap::new();
for (k, _v) in store.raw_iter_all().expect("raw_iter_all") {
for entry in store
.list_repository_view_entries_with_prefix("rsync://")
.expect("list repository view")
{
raw_total += 1;
if let Ok(uri) = std::str::from_utf8(&k) {
*raw_by_ext.entry(ext_of_uri(uri)).or_insert(0) += 1;
}
*raw_by_ext.entry(ext_of_uri(&entry.rsync_uri)).or_insert(0) += 1;
}
println!("APNIC Stage2 full-tree serial stats");
@ -270,7 +271,7 @@ fn apnic_tree_full_stats_serial() {
);
println!();
println!(
"rocksdb_raw_objects_total={} raw_by_ext={:?}",
"current_repository_view_total={} by_ext={:?}",
raw_total, raw_by_ext
);

View File

@ -5,7 +5,8 @@ use sha2::Digest;
use rpki::data_model::manifest::ManifestObject;
use rpki::policy::{CaFailedFetchPolicy, Policy};
use rpki::storage::{
PackTime, RawByHashEntry, RocksStore, ValidatedCaInstanceResult, ValidatedManifestMeta,
PackTime, RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore,
ValidatedCaInstanceResult, ValidatedManifestMeta,
VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary,
VcirInstanceGate, VcirRelatedArtifact, VcirSummary,
};
@ -115,6 +116,26 @@ fn store_validated_manifest_baseline(
.expect("store validated manifest baseline");
}
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: Vec<u8>, object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(&bytes));
let mut raw_entry = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes);
raw_entry.origin_uris.push(rsync_uri.to_string());
raw_entry.object_type = Some(object_type.to_string());
raw_entry.encoding = Some("der".to_string());
store
.put_raw_by_hash_entry(&raw_entry)
.expect("store raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.test/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("store repository view");
}
#[test]
fn manifest_success_returns_validated_publication_point_data() {
let manifest_path = Path::new(
@ -131,9 +152,7 @@ fn manifest_success_returns_validated_publication_point_data() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let entries = manifest
.manifest
.parse_files()
@ -146,7 +165,7 @@ fn manifest_success_returns_validated_publication_point_data() {
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(&store, &rsync_uri, bytes, rsync_uri.rsplit('.').next().unwrap_or("bin"));
}
let policy = Policy::default();
@ -186,9 +205,7 @@ fn manifest_hash_mismatch_reuses_current_instance_vcir_when_enabled() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let entries = manifest
.manifest
.parse_files()
@ -201,7 +218,7 @@ fn manifest_hash_mismatch_reuses_current_instance_vcir_when_enabled() {
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(&store, &rsync_uri, bytes, rsync_uri.rsplit('.').next().unwrap_or("bin"));
}
let policy = Policy::default();
@ -234,11 +251,11 @@ fn manifest_hash_mismatch_reuses_current_instance_vcir_when_enabled() {
let victim = entries.first().expect("non-empty file list");
let victim_uri = format!("{publication_point_rsync_uri}{}", victim.file_name);
let mut tampered = store
.get_raw(&victim_uri)
.expect("get victim raw")
.load_current_object_bytes_by_uri(&victim_uri)
.expect("load victim raw")
.expect("victim raw exists");
tampered[0] ^= 0xFF;
store.put_raw(&victim_uri, &tampered).expect("tamper raw");
put_current_object(&store, &victim_uri, tampered, victim_uri.rsplit('.').next().unwrap_or("bin"));
let second = process_manifest_publication_point(
&store,
@ -274,9 +291,7 @@ fn manifest_failed_fetch_stop_all_output() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let entries = manifest
.manifest
.parse_files()
@ -289,7 +304,7 @@ fn manifest_failed_fetch_stop_all_output() {
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(&store, &rsync_uri, bytes, rsync_uri.rsplit('.').next().unwrap_or("bin"));
}
let mut policy = Policy::default();
@ -313,11 +328,11 @@ fn manifest_failed_fetch_stop_all_output() {
let victim = entries.first().expect("non-empty file list");
let victim_uri = format!("{publication_point_rsync_uri}{}", victim.file_name);
let mut tampered = store
.get_raw(&victim_uri)
.expect("get victim raw")
.load_current_object_bytes_by_uri(&victim_uri)
.expect("load victim raw")
.expect("victim raw exists");
tampered[0] ^= 0xFF;
store.put_raw(&victim_uri, &tampered).expect("tamper raw");
put_current_object(&store, &victim_uri, tampered, victim_uri.rsplit('.').next().unwrap_or("bin"));
policy.ca_failed_fetch_policy = CaFailedFetchPolicy::StopAllOutput;
let err = process_manifest_publication_point(
@ -350,9 +365,7 @@ fn manifest_failed_fetch_rejects_stale_current_instance_vcir() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let entries = manifest
.manifest
.parse_files()
@ -365,7 +378,7 @@ fn manifest_failed_fetch_rejects_stale_current_instance_vcir() {
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(&store, &rsync_uri, bytes, rsync_uri.rsplit('.').next().unwrap_or("bin"));
}
let policy = Policy::default();
@ -391,8 +404,16 @@ fn manifest_failed_fetch_rejects_stale_current_instance_vcir() {
);
store
.delete_raw(&manifest_rsync_uri)
.expect("delete manifest raw to force fallback");
.delete_raw_by_hash_entry(
store
.get_repository_view_entry(&manifest_rsync_uri)
.expect("get repository view")
.expect("manifest repository view exists")
.current_hash
.as_deref()
.expect("manifest current hash"),
)
.expect("delete manifest raw_by_hash to force missing current object");
let err = process_manifest_publication_point(
&store,
@ -425,9 +446,7 @@ fn manifest_revalidation_with_unchanged_manifest_is_fresh() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let entries = manifest
.manifest
.parse_files()
@ -440,7 +459,7 @@ fn manifest_revalidation_with_unchanged_manifest_is_fresh() {
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(&store, &rsync_uri, bytes, rsync_uri.rsplit('.').next().unwrap_or("bin"));
}
let policy = Policy::default();
@ -507,9 +526,7 @@ fn manifest_rollback_is_treated_as_failed_fetch_and_reuses_current_instance_vcir
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let entries = manifest
.manifest
.parse_files()
@ -522,7 +539,7 @@ fn manifest_rollback_is_treated_as_failed_fetch_and_reuses_current_instance_vcir
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(&store, &rsync_uri, bytes, rsync_uri.rsplit('.').next().unwrap_or("bin"));
}
let policy = Policy::default();

View File

@ -1,8 +1,10 @@
use std::path::Path;
use sha2::Digest;
use rpki::data_model::manifest::ManifestObject;
use rpki::policy::Policy;
use rpki::storage::RocksStore;
use rpki::storage::{RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore};
use rpki::validation::manifest::{
ManifestProcessError, PublicationPointSource, process_manifest_publication_point,
};
@ -40,6 +42,26 @@ fn fixture_dir_to_rsync_uri(dir: &Path) -> String {
s
}
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: Vec<u8>, object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(&bytes));
let mut raw_entry = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes);
raw_entry.origin_uris.push(rsync_uri.to_string());
raw_entry.object_type = Some(object_type.to_string());
raw_entry.encoding = Some("der".to_string());
store
.put_raw_by_hash_entry(&raw_entry)
.expect("store raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.test/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("store repository view");
}
#[test]
fn manifest_outside_publication_point_yields_no_usable_cache() {
let manifest_path = Path::new(
@ -56,9 +78,7 @@ fn manifest_outside_publication_point_yields_no_usable_cache() {
let store = RocksStore::open(temp.path()).expect("open rocksdb");
// Store manifest and its locked files so Fresh would otherwise succeed.
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
for entry in manifest
.manifest
.parse_files()
@ -71,7 +91,12 @@ fn manifest_outside_publication_point_yields_no_usable_cache() {
.join(entry.file_name.as_str());
let bytes = std::fs::read(&file_path).expect("read referenced file");
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(
&store,
&rsync_uri,
bytes,
rsync_uri.rsplit('.').next().unwrap_or("bin"),
);
}
let policy = Policy::default();
@ -112,9 +137,7 @@ fn manifest_outside_publication_point_detects_current_instance_snapshot_pp_misma
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
for entry in manifest
.manifest
.parse_files()
@ -127,7 +150,12 @@ fn manifest_outside_publication_point_detects_current_instance_snapshot_pp_misma
.join(entry.file_name.as_str());
let bytes = std::fs::read(&file_path).expect("read referenced file");
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(
&store,
&rsync_uri,
bytes,
rsync_uri.rsplit('.').next().unwrap_or("bin"),
);
}
let policy = Policy::default();

View File

@ -5,7 +5,8 @@ use sha2::Digest;
use rpki::data_model::manifest::ManifestObject;
use rpki::policy::{CaFailedFetchPolicy, Policy};
use rpki::storage::{
PackTime, RawByHashEntry, RocksStore, ValidatedCaInstanceResult, ValidatedManifestMeta,
PackTime, RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore,
ValidatedCaInstanceResult, ValidatedManifestMeta,
VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary,
VcirInstanceGate, VcirRelatedArtifact, VcirSummary,
};
@ -118,6 +119,26 @@ fn store_validated_manifest_baseline(
.expect("store validated manifest baseline");
}
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: Vec<u8>, object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(&bytes));
let mut raw_entry = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes);
raw_entry.origin_uris.push(rsync_uri.to_string());
raw_entry.object_type = Some(object_type.to_string());
raw_entry.encoding = Some("der".to_string());
store
.put_raw_by_hash_entry(&raw_entry)
.expect("store raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.test/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("store repository view");
}
fn store_manifest_and_locked_files(
store: &RocksStore,
manifest_path: &Path,
@ -126,9 +147,7 @@ fn store_manifest_and_locked_files(
manifest: &ManifestObject,
manifest_bytes: &[u8],
) {
store
.put_raw(manifest_rsync_uri, manifest_bytes)
.expect("store manifest");
put_current_object(store, manifest_rsync_uri, manifest_bytes.to_vec(), "mft");
for entry in manifest
.manifest
.parse_files()
@ -142,7 +161,7 @@ fn store_manifest_and_locked_files(
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
put_current_object(&store, &rsync_uri, bytes, rsync_uri.rsplit('.').next().unwrap_or("bin"));
}
}
@ -258,9 +277,7 @@ fn manifest_missing_locked_file_is_treated_as_failed_fetch() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest only (no locked files)");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let issuer_ca_der = issuer_ca_fixture_der();
let policy = Policy::default();
@ -279,7 +296,8 @@ fn manifest_missing_locked_file_is_treated_as_failed_fetch() {
"{err}"
);
assert!(
err.to_string().contains("file missing in raw_objects"),
err.to_string()
.contains("file missing in current repository view"),
"unexpected error: {err}"
);
}

View File

@ -1,8 +1,10 @@
use std::path::Path;
use sha2::Digest;
use rpki::data_model::manifest::ManifestObject;
use rpki::policy::{CaFailedFetchPolicy, Policy};
use rpki::storage::RocksStore;
use rpki::storage::{RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore};
use rpki::validation::manifest::process_manifest_publication_point;
fn issuer_ca_fixture() -> Vec<u8> {
@ -16,6 +18,26 @@ fn issuer_ca_rsync_uri() -> &'static str {
"rsync://rpki.apnic.net/repository/B527EF581D6611E2BB468F7C72FD1FF2/BfycW4hQb3wNP4YsiJW-1n6fjro.cer"
}
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: Vec<u8>, object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(&bytes));
let mut raw_entry = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes);
raw_entry.origin_uris.push(rsync_uri.to_string());
raw_entry.object_type = Some(object_type.to_string());
raw_entry.encoding = Some("der".to_string());
store
.put_raw_by_hash_entry(&raw_entry)
.expect("store raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.test/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("store repository view");
}
#[test]
fn manifest_outside_publication_point_is_failed_fetch_rfc9286_section6_1() {
let fixture_manifest_path = Path::new(
@ -35,9 +57,7 @@ fn manifest_outside_publication_point_is_failed_fetch_rfc9286_section6_1() {
let store = RocksStore::open(temp.path()).expect("open rocksdb");
// Store the manifest at its rsync URI.
store
.put_raw(manifest_rsync_uri, &manifest_bytes)
.expect("store manifest raw");
put_current_object(&store, manifest_rsync_uri, manifest_bytes.clone(), "mft");
// Store all referenced files under the (different) publication point so that §6.4/§6.5
// would otherwise succeed if §6.1 was not enforced.
@ -50,7 +70,8 @@ fn manifest_outside_publication_point_is_failed_fetch_rfc9286_section6_1() {
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file raw");
let object_type = rsync_uri.rsplit('.').next().unwrap_or("bin");
put_current_object(&store, &rsync_uri, bytes, object_type);
}
let mut policy = Policy::default();

View File

@ -1,10 +1,14 @@
use std::path::Path;
use sha2::Digest;
use rpki::data_model::crl::RpkixCrl;
use rpki::data_model::manifest::ManifestObject;
use rpki::data_model::rc::ResourceCertificate;
use rpki::policy::{Policy, SignedObjectFailurePolicy};
use rpki::storage::{PackFile, RocksStore};
use rpki::storage::{
PackFile, RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore,
};
use rpki::validation::manifest::process_manifest_publication_point;
use rpki::validation::objects::process_publication_point_snapshot_for_issuer;
@ -30,6 +34,26 @@ fn fixture_dir_to_rsync_uri(dir: &Path) -> String {
s
}
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: Vec<u8>, object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(&bytes));
let mut raw_entry = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes);
raw_entry.origin_uris.push(rsync_uri.to_string());
raw_entry.object_type = Some(object_type.to_string());
raw_entry.encoding = Some("der".to_string());
store
.put_raw_by_hash_entry(&raw_entry)
.expect("store raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.test/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("store repository view");
}
fn build_cernet_pack_and_validation_time() -> (
rpki::validation::publication_point::PublicationPointSnapshot,
time::OffsetDateTime,
@ -48,9 +72,7 @@ fn build_cernet_pack_and_validation_time() -> (
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let entries = manifest
.manifest
.parse_files()
@ -63,7 +85,8 @@ fn build_cernet_pack_and_validation_time() -> (
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
let object_type = rsync_uri.rsplit('.').next().unwrap_or("bin");
put_current_object(&store, &rsync_uri, bytes, object_type);
}
let issuer_ca_der = std::fs::read(

View File

@ -1,10 +1,14 @@
use std::path::Path;
use sha2::Digest;
use rpki::data_model::crl::RpkixCrl;
use rpki::data_model::manifest::ManifestObject;
use rpki::data_model::rc::ResourceCertificate;
use rpki::policy::{Policy, SignedObjectFailurePolicy};
use rpki::storage::{PackFile, RocksStore};
use rpki::storage::{
PackFile, RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore,
};
use rpki::validation::manifest::process_manifest_publication_point;
use rpki::validation::objects::process_publication_point_snapshot_for_issuer;
@ -30,6 +34,26 @@ fn fixture_dir_to_rsync_uri(dir: &Path) -> String {
s
}
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: Vec<u8>, object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(&bytes));
let mut raw_entry = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes);
raw_entry.origin_uris.push(rsync_uri.to_string());
raw_entry.object_type = Some(object_type.to_string());
raw_entry.encoding = Some("der".to_string());
store
.put_raw_by_hash_entry(&raw_entry)
.expect("store raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.test/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("store repository view");
}
fn build_cernet_pack_and_validation_time() -> (
rpki::validation::publication_point::PublicationPointSnapshot,
time::OffsetDateTime,
@ -48,9 +72,7 @@ fn build_cernet_pack_and_validation_time() -> (
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw(&manifest_rsync_uri, &manifest_bytes)
.expect("store manifest");
put_current_object(&store, &manifest_rsync_uri, manifest_bytes.clone(), "mft");
let entries = manifest
.manifest
.parse_files()
@ -63,7 +85,8 @@ fn build_cernet_pack_and_validation_time() -> (
let bytes = std::fs::read(&file_path)
.unwrap_or_else(|_| panic!("read fixture file referenced by manifest: {file_path:?}"));
let rsync_uri = format!("{publication_point_rsync_uri}{}", entry.file_name);
store.put_raw(&rsync_uri, &bytes).expect("store file");
let object_type = rsync_uri.rsplit('.').next().unwrap_or("bin");
put_current_object(&store, &rsync_uri, bytes, object_type);
}
let issuer_ca_der = std::fs::read(

View File

@ -95,7 +95,7 @@ fn build_publication_point_snapshot_from_local_rsync_fixture(
None,
None,
)
.expect("sync into raw_objects");
.expect("sync into current repository view");
let pp = process_manifest_publication_point(
&store,

View File

@ -20,6 +20,15 @@ impl Fetcher for MapFetcher {
}
}
fn assert_current_object(store: &RocksStore, uri: &str, expected: &[u8]) {
assert_eq!(
store
.load_current_object_bytes_by_uri(uri)
.expect("load current object"),
Some(expected.to_vec())
);
}
struct CountingRsyncFetcher<F> {
inner: F,
calls: Arc<Mutex<usize>>,
@ -92,13 +101,7 @@ fn repo_sync_uses_rrdp_when_available() {
assert_eq!(out.objects_written, 2);
assert_eq!(*calls.lock().unwrap(), 0);
assert_eq!(
store
.get_raw("rsync://example.net/repo/obj1.cer")
.unwrap()
.unwrap(),
b"abc"
);
assert_current_object(&store, "rsync://example.net/repo/obj1.cer", b"abc");
}
#[test]
@ -166,13 +169,7 @@ fn repo_sync_skips_snapshot_when_state_unchanged() {
"expected no rsync fallback calls"
);
assert_eq!(
store
.get_raw("rsync://example.net/repo/obj1.cer")
.unwrap()
.unwrap(),
b"abc"
);
assert_current_object(&store, "rsync://example.net/repo/obj1.cer", b"abc");
}
#[test]
@ -221,17 +218,11 @@ fn repo_sync_falls_back_to_rsync_on_rrdp_failure() {
.any(|r| r.0 == "RFC 8182 §3.4.5")
);
assert_eq!(
store
.get_raw("rsync://example.net/repo/sub/obj.cer")
.unwrap()
.unwrap(),
b"hello"
);
assert_current_object(&store, "rsync://example.net/repo/sub/obj.cer", b"hello");
}
#[test]
fn repo_sync_rsync_populates_raw_objects() {
fn repo_sync_rsync_populates_current_repository_view() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
@ -263,18 +254,6 @@ fn repo_sync_rsync_populates_raw_objects() {
assert_eq!(out.source, RepoSyncSource::Rsync);
assert_eq!(out.objects_written, 2);
assert_eq!(
store
.get_raw("rsync://example.net/repo/a/one.cer")
.unwrap()
.unwrap(),
b"1"
);
assert_eq!(
store
.get_raw("rsync://example.net/repo/a/b/two.crl")
.unwrap()
.unwrap(),
b"2"
);
assert_current_object(&store, "rsync://example.net/repo/a/one.cer", b"1");
assert_current_object(&store, "rsync://example.net/repo/a/b/two.crl", b"2");
}

View File

@ -17,6 +17,15 @@ impl Fetcher for MapFetcher {
}
}
fn assert_current_object(store: &RocksStore, uri: &str, expected: &[u8]) {
assert_eq!(
store
.load_current_object_bytes_by_uri(uri)
.expect("load current object"),
Some(expected.to_vec())
);
}
#[test]
fn notification_parses_and_snapshot_is_applied_and_state_written() {
let notification_xml =
@ -47,28 +56,18 @@ fn notification_parses_and_snapshot_is_applied_and_state_written() {
.expect("sync");
assert_eq!(published, 2);
let obj1 = store
.get_raw("rsync://example.net/repo/obj1.cer")
.expect("get obj1")
.expect("obj1 exists");
assert_eq!(obj1, b"abc");
assert_current_object(&store, "rsync://example.net/repo/obj1.cer", b"abc");
assert_current_object(&store, "rsync://example.net/repo/obj2.crl", b"def");
let obj2 = store
.get_raw("rsync://example.net/repo/obj2.crl")
.expect("get obj2")
.expect("obj2 exists");
assert_eq!(obj2, b"def");
let state_bytes = store
.get_rrdp_state("https://example.net/rrdp/notification.xml")
.expect("get state")
.expect("state exists");
let state = rpki::sync::rrdp::RrdpState::decode(&state_bytes).expect("decode state");
let source = store
.get_rrdp_source_record("https://example.net/rrdp/notification.xml")
.expect("get source")
.expect("source exists");
assert_eq!(
state.session_id,
source.last_session_id.expect("session id"),
"9df4b597-af9e-4dca-bdda-719cce2c4e28".to_string()
);
assert_eq!(state.serial, 1);
assert_eq!(source.last_serial, Some(1));
}
#[test]

View File

@ -2,7 +2,8 @@ use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
use rpki::storage::RocksStore;
use rpki::storage::{RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore};
use sha2::Digest;
const RSYNC_BASE_URI: &str = "rsync://rpki.luys.cloud/repo/LY-RPKI/1/";
@ -90,7 +91,31 @@ fn rsync_fallback_breakdown_luys_cloud() {
let t3 = Instant::now();
for (uri, bytes) in &objects {
store.put_raw(uri, bytes).expect("put_raw");
let sha256_hex = hex::encode(sha2::Sha256::digest(bytes));
let mut raw = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes.clone());
raw.origin_uris.push(uri.clone());
raw.object_type = Some(
uri.rsplit('.')
.next()
.unwrap_or("bin")
.to_ascii_lowercase(),
);
raw.encoding = Some("der".to_string());
store.put_raw_by_hash_entry(&raw).expect("put raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: uri.clone(),
current_hash: Some(sha256_hex),
repository_source: Some(rsync_base_uri.clone()),
object_type: Some(
uri.rsplit('.')
.next()
.unwrap_or("bin")
.to_ascii_lowercase(),
),
state: RepositoryViewState::Present,
})
.expect("put repository view");
}
let write_wall = t3.elapsed();
@ -114,7 +139,7 @@ fn rsync_fallback_breakdown_luys_cloud() {
pct(read_wall)
);
println!(
"| rocksdb_put_raw | {:>11.3} | {:>5.1}% | write raw_objects ({} keys) |",
"| rocksdb_put_current | {:>11.3} | {:>5.1}% | write repository_view + raw_by_hash ({} keys) |",
write_wall.as_secs_f64() * 1000.0,
pct(write_wall),
objects.len()

View File

@ -1,21 +1,37 @@
use rpki::storage::RocksStore;
use rpki::storage::{RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore};
use sha2::Digest;
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: &[u8], object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(bytes));
let mut raw = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes.to_vec());
raw.origin_uris.push(rsync_uri.to_string());
raw.object_type = Some(object_type.to_string());
raw.encoding = Some("der".to_string());
store.put_raw_by_hash_entry(&raw).expect("put raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.test/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("put repository view");
}
#[test]
fn storage_iter_all_lists_raw_entries() {
fn storage_iter_all_lists_repository_view_entries() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw("rsync://example.test/repo/a.cer", b"a")
.expect("put_raw a");
store
.put_raw("rsync://example.test/repo/b.roa", b"b")
.expect("put_raw b");
put_current_object(&store, "rsync://example.test/repo/a.cer", b"a", "cer");
put_current_object(&store, "rsync://example.test/repo/b.roa", b"b", "roa");
let raw_keys = store
.raw_iter_all()
.expect("raw_iter_all")
.map(|(k, _v)| String::from_utf8(k.to_vec()).expect("utf8 key"))
.list_repository_view_entries_with_prefix("rsync://")
.expect("list repository_view")
.into_iter()
.map(|entry| entry.rsync_uri)
.collect::<Vec<_>>();
assert_eq!(raw_keys.len(), 2);
assert!(raw_keys.contains(&"rsync://example.test/repo/a.cer".to_string()));

View File

@ -1,32 +1,24 @@
use rocksdb::WriteBatch;
use rpki::storage::RocksStore;
#[test]
fn storage_delete_rrdp_state_works() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
use rpki::storage::{RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore};
use sha2::Digest;
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: &[u8], object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(bytes));
let mut raw = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes.to_vec());
raw.origin_uris.push(rsync_uri.to_string());
raw.object_type = Some(object_type.to_string());
raw.encoding = Some("der".to_string());
store.put_raw_by_hash_entry(&raw).expect("put raw_by_hash");
store
.put_rrdp_state("https://example.net/rrdp/notification.xml", b"state")
.expect("put state");
assert_eq!(
store
.get_rrdp_state("https://example.net/rrdp/notification.xml")
.unwrap()
.unwrap(),
b"state"
);
store
.delete_rrdp_state("https://example.net/rrdp/notification.xml")
.expect("delete state");
assert!(
store
.get_rrdp_state("https://example.net/rrdp/notification.xml")
.unwrap()
.is_none()
);
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.net/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("put repository view");
}
#[test]
@ -34,25 +26,20 @@ fn storage_raw_iter_prefix_filters_by_prefix() {
let temp = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(temp.path()).expect("open rocksdb");
store
.put_raw("rsync://example.net/repo/a/1.cer", b"1")
.unwrap();
store
.put_raw("rsync://example.net/repo/a/2.cer", b"2")
.unwrap();
store
.put_raw("rsync://example.net/repo/b/1.cer", b"3")
.unwrap();
put_current_object(&store, "rsync://example.net/repo/a/1.cer", b"1", "cer");
put_current_object(&store, "rsync://example.net/repo/a/2.cer", b"2", "cer");
put_current_object(&store, "rsync://example.net/repo/b/1.cer", b"3", "cer");
let prefix = b"rsync://example.net/repo/a/";
let prefix = "rsync://example.net/repo/a/";
let items = store
.raw_iter_prefix(prefix)
.list_repository_view_entries_with_prefix(prefix)
.expect("iter")
.map(|(k, v)| (String::from_utf8_lossy(&k).to_string(), v.to_vec()))
.into_iter()
.map(|entry| entry.rsync_uri)
.collect::<Vec<_>>();
assert_eq!(items.len(), 2);
for (k, _v) in &items {
for k in &items {
assert!(k.starts_with("rsync://example.net/repo/a/"));
}
}

View File

@ -1,6 +1,28 @@
use std::path::Path;
use rpki::storage::RocksStore;
use rpki::storage::{
PackTime, RawByHashEntry, RepositoryViewEntry, RepositoryViewState, RocksStore,
RrdpSourceRecord, RrdpSourceSyncState,
};
use sha2::Digest;
fn put_current_object(store: &RocksStore, rsync_uri: &str, bytes: &[u8], object_type: &str) {
let sha256_hex = hex::encode(sha2::Sha256::digest(bytes));
let mut raw = RawByHashEntry::from_bytes(sha256_hex.clone(), bytes.to_vec());
raw.origin_uris.push(rsync_uri.to_string());
raw.object_type = Some(object_type.to_string());
raw.encoding = Some("der".to_string());
store.put_raw_by_hash_entry(&raw).expect("put raw_by_hash");
store
.put_repository_view_entry(&RepositoryViewEntry {
rsync_uri: rsync_uri.to_string(),
current_hash: Some(sha256_hex),
repository_source: Some("https://example.invalid/notification.xml".to_string()),
object_type: Some(object_type.to_string()),
state: RepositoryViewState::Present,
})
.expect("put repository view");
}
#[test]
fn storage_opens_and_creates_column_families() {
@ -9,32 +31,65 @@ fn storage_opens_and_creates_column_families() {
}
#[test]
fn raw_objects_roundtrip_by_rsync_uri() {
fn current_object_roundtrip_by_rsync_uri() {
let dir = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(dir.path()).expect("open rocksdb");
let key = "rsync://example.invalid/repo/a.cer";
let value = b"hello";
store.put_raw(key, value).expect("put raw");
let got = store.get_raw(key).expect("get raw");
put_current_object(&store, key, value, "cer");
let got = store
.load_current_object_bytes_by_uri(key)
.expect("get current object");
assert_eq!(got.as_deref(), Some(value.as_slice()));
store.delete_raw(key).expect("delete raw");
let got = store.get_raw(key).expect("get raw after delete");
let current_hash = store
.get_repository_view_entry(key)
.expect("get repository view")
.expect("view exists")
.current_hash
.expect("current hash");
store
.delete_repository_view_entry(key)
.expect("delete repository view");
store
.delete_raw_by_hash_entry(&current_hash)
.expect("delete raw_by_hash");
let got = store
.load_current_object_bytes_by_uri(key)
.expect("get current object after delete");
assert!(got.is_none());
}
#[test]
fn rrdp_state_roundtrip_by_notification_uri() {
fn rrdp_source_roundtrip_by_notification_uri() {
let dir = tempfile::tempdir().expect("tempdir");
let store = RocksStore::open(dir.path()).expect("open rocksdb");
let notif = "https://example.invalid/rrdp/notification.xml";
let state = b"{\"session_id\":\"00000000-0000-0000-0000-000000000000\",\"last_serial\":1}";
store.put_rrdp_state(notif, state).expect("put rrdp_state");
let record = RrdpSourceRecord {
notify_uri: notif.to_string(),
last_session_id: Some("00000000-0000-0000-0000-000000000000".to_string()),
last_serial: Some(1),
first_seen_at: PackTime::from_utc_offset_datetime(time::OffsetDateTime::now_utc()),
last_seen_at: PackTime::from_utc_offset_datetime(time::OffsetDateTime::now_utc()),
last_sync_at: None,
sync_state: RrdpSourceSyncState::SnapshotOnly,
last_snapshot_uri: None,
last_snapshot_hash: None,
last_error: None,
};
store.put_rrdp_source_record(&record).expect("put rrdp_source");
let got = store.get_rrdp_state(notif).expect("get rrdp_state");
assert_eq!(got.as_deref(), Some(state.as_slice()));
let got = store
.get_rrdp_source_record(notif)
.expect("get rrdp_source")
.expect("rrdp_source present");
assert_eq!(got.last_serial, Some(1));
assert_eq!(
got.last_session_id.as_deref(),
Some("00000000-0000-0000-0000-000000000000")
);
}
#[test]
@ -43,14 +98,12 @@ fn store_is_reopenable() {
let path: &Path = dir.path();
let store = RocksStore::open(path).expect("open rocksdb");
store
.put_raw("rsync://example.invalid/repo/x", b"x")
.expect("put");
put_current_object(&store, "rsync://example.invalid/repo/x", b"x", "bin");
drop(store);
let store = RocksStore::open(path).expect("reopen rocksdb");
let got = store
.get_raw("rsync://example.invalid/repo/x")
.load_current_object_bytes_by_uri("rsync://example.invalid/repo/x")
.expect("get after reopen");
assert_eq!(got.as_deref(), Some(b"x".as_slice()));
}