20260620 优化delta长尾控制面和PP cache快路径

This commit is contained in:
yuyr 2026-06-20 15:31:20 +08:00
parent bd266ef2a5
commit 184d3cb95b
33 changed files with 1951 additions and 109 deletions

View File

@ -0,0 +1,80 @@
{
"created_at_utc": "2026-06-19T08:08:03Z",
"items": [
{
"rir": "afrinic",
"ta_bytes": 1216,
"ta_download": "200 1216 1.351147",
"ta_elapsed_s": 1.227,
"ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/afrinic-ta.cer",
"ta_sha256": "43a26fd28bafb9398e5b2ab19e036b450bd04f4973a7f5ad151cebdee0edac36",
"ta_uri": "https://rpki.afrinic.net/repository/AfriNIC.cer",
"tal_bytes": 496,
"tal_download": "200 496 1.361326",
"tal_elapsed_s": 1.238,
"tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/afrinic.tal",
"tal_sha256": "2838ef30ea27ce5705abf5f5adb131d8c35b1f50858338a2f3c84bb207c2fa35",
"tal_url": "https://rpki.afrinic.net/tal/afrinic.tal"
},
{
"rir": "apnic",
"ta_bytes": 1222,
"ta_download": "200 1222 1.012321",
"ta_elapsed_s": 0.921,
"ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/apnic-ta.cer",
"ta_sha256": "2014230ad49b2777ac2bde0948ddfa4b8f207114c549e26d755de88c3593e3af",
"ta_uri": "https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer",
"tal_bytes": 532,
"tal_download": "200 466 1.007155",
"tal_elapsed_s": 0.917,
"tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/apnic.tal",
"tal_sha256": "472e551f7c551c2e999e582b7c9437d3bee4900fe53afff62aeb28d4940ade94",
"tal_url": "https://tal.apnic.net/apnic.tal"
},
{
"rir": "arin",
"ta_bytes": 1143,
"ta_download": "200 1143 0.816714",
"ta_elapsed_s": 0.743,
"ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/arin-ta.cer",
"ta_sha256": "5b3c2f6f04abd19261084487a43c156f778b0b8926d63801d48c7a93ed349492",
"ta_uri": "https://rrdp.arin.net/arin-rpki-ta.cer",
"tal_bytes": 1258,
"tal_download": "200 1258 0.848581",
"tal_elapsed_s": 0.774,
"tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/arin.tal",
"tal_sha256": "1f8bdb03bcc30a3b8e11fd9a87102fba250c22137a3c8baa9c81b139cb412639",
"tal_url": "https://www.arin.net/resources/manage/rpki/arin.tal"
},
{
"rir": "lacnic",
"ta_bytes": 1166,
"ta_download": "200 1166 1.025273",
"ta_elapsed_s": 0.932,
"ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/lacnic-ta.cer",
"ta_sha256": "f44bc51008fd6998de7597b72a79b07bf3ebcb0f14daa7c7c022c0e9d66e0ad0",
"ta_uri": "https://rrdp.lacnic.net/ta/rta-lacnic-rpki.cer",
"tal_bytes": 502,
"tal_download": "200 502 1.729122",
"tal_elapsed_s": 1.565,
"tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/lacnic.tal",
"tal_sha256": "d44bb9394ab009c8b53e5efebf2a1c9450bab61a27efe00de5a3e4587a3a2f6a",
"tal_url": "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal"
},
{
"rir": "ripe",
"ta_bytes": 1036,
"ta_download": "200 1036 1.060992",
"ta_elapsed_s": 4.536,
"ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/ripe-ncc-ta.cer",
"ta_sha256": "3e3f7e4efc8d0cea03d9cc1fde6e168b45c26d7b3272e14abd8da4871886e539",
"ta_uri": "https://rpki.ripe.net/ta/ripe-ncc-ta.cer",
"tal_bytes": 482,
"tal_download": "200 482 1.092082",
"tal_elapsed_s": 0.992,
"tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/ripe-ncc.tal",
"tal_sha256": "59ca27ef93f23682749fcefe7c6d70fbc723343549ff9e4d3996acaff79817fb",
"tal_url": "https://tal.rpki.ripe.net/ripe-ncc.tal"
}
]
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,10 @@
rsync://rpki.afrinic.net/repository/AfriNIC.cer
https://rpki.afrinic.net/repository/AfriNIC.cer
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxsAqAhWIO+ON2Ef9oRDM
pKxv+AfmSLIdLWJtjrvUyDxJPBjgR+kVrOHUeTaujygFUp49tuN5H2C1rUuQavTH
vve6xNF5fU3OkTcqEzMOZy+ctkbde2SRMVdvbO22+TH9gNhKDc9l7Vu01qU4LeJH
k3X0f5uu5346YrGAOSv6AaYBXVgXxa0s9ZvgqFpim50pReQe/WI3QwFKNgpPzfQL
6Y7fDPYdYaVOXPXSKtx7P4s4KLA/ZWmRL/bobw/i2fFviAGhDrjqqqum+/9w1hEl
L/vqihVnV18saKTnLvkItA/Bf5i11Yhw2K7qv573YWxyuqCknO/iYLTR1DToBZcZ
UQIDAQAB

View File

@ -0,0 +1,10 @@
https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer
rsync://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx9RWSL61YAAYumEiU8z8
qH2ETVIL01ilxZlzIL9JYSORMN5Cmtf8V2JblIealSqgOTGjvSjEsiV73s67zYQI
7C/iSOb96uf3/s86NqbxDiFQGN8qG7RNcdgVuUlAidl8WxvLNI8VhqbAB5uSg/Mr
LeSOvXRja041VptAxIhcGzDMvlAJRwkrYK/Mo8P4E2rSQgwqCgae0ebY1CsJ3Cjf
i67C1nw7oXqJJovvXJ4apGmEv8az23OLC6Ki54Ul/E6xk227BFttqFV3YMtKx42H
cCcDVZZy01n7JjzvO8ccaXmHIgR7utnqhBRNNq5Xc5ZhbkrUsNtiJmrZzVlgU6Ou
0wIDAQAB

View File

@ -0,0 +1,10 @@
https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer
rsync://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx9RWSL61YAAYumEiU8z8
qH2ETVIL01ilxZlzIL9JYSORMN5Cmtf8V2JblIealSqgOTGjvSjEsiV73s67zYQI
7C/iSOb96uf3/s86NqbxDiFQGN8qG7RNcdgVuUlAidl8WxvLNI8VhqbAB5uSg/Mr
LeSOvXRja041VptAxIhcGzDMvlAJRwkrYK/Mo8P4E2rSQgwqCgae0ebY1CsJ3Cjf
i67C1nw7oXqJJovvXJ4apGmEv8az23OLC6Ki54Ul/E6xk227BFttqFV3YMtKx42H
cCcDVZZy01n7JjzvO8ccaXmHIgR7utnqhBRNNq5Xc5ZhbkrUsNtiJmrZzVlgU6Ou
0wIDAQAB

View File

@ -0,0 +1,19 @@
# THIS TRUST ANCHOR LOCATOR IS PROVIDED BY THE AMERICAN REGISTRY FOR
# INTERNET NUMBERS (ARIN) "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL ARIN BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS PUBLIC KEY, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
rsync://rpki.arin.net/repository/arin-rpki-ta.cer
https://rrdp.arin.net/arin-rpki-ta.cer
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3lZPjbHvMRV5sDDqfLc/685th5FnreHMJjg8
pEZUbG8Y8TQxSBsDebbsDpl3Ov3Cj1WtdrJ3CIfQODCPrrJdOBSrMATeUbPC+JlNf2SRP3UB+VJFgtTj
0RN8cEYIuhBW5t6AxQbHhdNQH+A1F/OJdw0q9da2U29Lx85nfFxvnC1EpK9CbLJS4m37+RlpNbT1cba+
b+loXpx0Qcb1C4UpJCGDy7uNf5w6/+l7RpATAHqqsX4qCtwwDYlbHzp2xk9owF3mkCxzl0HwncO+sEHH
eaL3OjtwdIGrRGeHi2Mpt+mvWHhtQqVG+51MHTyg+nIjWFKKGx1Q9+KDx4wJStwveQIDAQAB

View File

@ -0,0 +1,4 @@
https://rrdp.lacnic.net/ta/rta-lacnic-rpki.cer
rsync://repository.lacnic.net/rpki/lacnic/rta-lacnic-rpki.cer
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqZEzhYK0+PtDOPfub/KRc3MeWx3neXx4/wbnJWGbNAtbYqXg3uU5J4HFzPgk/VIppgSKAhlO0H60DRP48by9gr5/yDHu2KXhOmnMg46sYsUIpfgtBS9+VtrqWziJfb+pkGtuOWeTnj6zBmBNZKK+5AlMCW1WPhrylIcB+XSZx8tk9GS/3SMQ+YfMVwwAyYjsex14Uzto4GjONALE5oh1M3+glRQduD6vzSwOD+WahMbc9vCOTED+2McLHRKgNaQf0YJ9a1jG9oJIvDkKXEqdfqDRktwyoD74cV57bW3tBAexB7GglITbInyQAsmdngtfg2LUMrcROHHP86QPZINjDQIDAQAB

View File

@ -0,0 +1,10 @@
https://rpki.ripe.net/ta/ripe-ncc-ta.cer
rsync://rpki.ripe.net/ta/ripe-ncc-ta.cer
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA0URYSGqUz2myBsOzeW1j
Q6NsxNvlLMyhWknvnl8NiBCs/T/S2XuNKQNZ+wBZxIgPPV2pFBFeQAvoH/WK83Hw
A26V2siwm/MY2nKZ+Olw+wlpzlZ1p3Ipj2eNcKrmit8BwBC8xImzuCGaV0jkRB0G
Z0hoH6Ml03umLprRsn6v0xOP0+l6Qc1ZHMFVFb385IQ7FQQTcVIxrdeMsoyJq9eM
kE6DoclHhF/NlSllXubASQ9KUWqJ0+Ot3QCXr4LXECMfkpkVR2TZT+v5v658bHVs
6ZxRD1b6Uk1uQKAyHUbn/tXvP8lrjAibGzVsXDT2L0x4Edx+QdixPgOji3gBMyL2
VwIDAQAB

View File

@ -14,6 +14,7 @@ MAX_RUNS="${MAX_RUNS:-3}"
INTERVAL_SECS="${INTERVAL_SECS:-0}" INTERVAL_SECS="${INTERVAL_SECS:-0}"
STOP_AFTER_SECS="${STOP_AFTER_SECS:-0}" STOP_AFTER_SECS="${STOP_AFTER_SECS:-0}"
RIRS="${RIRS:-afrinic,apnic,arin,lacnic,ripe}" RIRS="${RIRS:-afrinic,apnic,arin,lacnic,ripe}"
TAL_INPUT_MODE="${TAL_INPUT_MODE:-file-with-ta}"
RUN_ROOT="${RUN_ROOT:-$PACKAGE_ROOT}" RUN_ROOT="${RUN_ROOT:-$PACKAGE_ROOT}"
RETAIN_RUNS="${RETAIN_RUNS:-10}" RETAIN_RUNS="${RETAIN_RUNS:-10}"
CLEAN_TMP_AFTER_RUN="${CLEAN_TMP_AFTER_RUN:-0}" CLEAN_TMP_AFTER_RUN="${CLEAN_TMP_AFTER_RUN:-0}"
@ -100,6 +101,16 @@ validate_rsync_scope() {
esac esac
} }
# Abort via `die` unless TAL_INPUT_MODE is one of the supported values:
# file-with-ta, file-live-ta or url.
validate_tal_input_mode() {
  case "$TAL_INPUT_MODE" in
    file-with-ta | file-live-ta | url)
      return 0
      ;;
  esac
  die "TAL_INPUT_MODE must be file-with-ta, file-live-ta or url: $TAL_INPUT_MODE"
}
normalize_token() { normalize_token() {
local token="$1" local token="$1"
token="${token#"${token%%[![:space:]]*}"}" token="${token#"${token%%[![:space:]]*}"}"
@ -149,10 +160,10 @@ ta_file_for_rir() {
esac esac
} }
cir_tal_uri_for_rir() { tal_url_for_rir() {
case "$1" in case "$1" in
afrinic) printf '%s' "https://rpki.afrinic.net/tal/afrinic.tal" ;; afrinic) printf '%s' "https://rpki.afrinic.net/tal/afrinic.tal" ;;
apnic) printf '%s' "https://rpki.apnic.net/tal/apnic-rfc7730-https.tal" ;; apnic) printf '%s' "https://tal.apnic.net/apnic.tal" ;;
arin) printf '%s' "https://www.arin.net/resources/manage/rpki/arin.tal" ;; arin) printf '%s' "https://www.arin.net/resources/manage/rpki/arin.tal" ;;
lacnic) printf '%s' "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal" ;; lacnic) printf '%s' "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal" ;;
ripe) printf '%s' "https://tal.rpki.ripe.net/ripe-ncc.tal" ;; ripe) printf '%s' "https://tal.rpki.ripe.net/ripe-ncc.tal" ;;
@ -160,6 +171,47 @@ cir_tal_uri_for_rir() {
esac esac
} }
# Print the TAL URL for the RIR named in "$1".
# Thin forwarder to tal_url_for_rir, kept under the older function name.
cir_tal_uri_for_rir() {
tal_url_for_rir "$1"
}
# Print the first http:// or https:// URI line found in the TAL file "$1".
# Comment lines (starting with '#') and blank lines are ignored, and
# surrounding whitespace is trimmed before matching. Prints nothing when the
# file contains no such line.
tal_https_uri_from_fixture() {
  local fixture="$1"
  awk '
    /^[[:space:]]*#/ || /^[[:space:]]*$/ { next }
    {
      sub(/^[[:space:]]+/, "", $0)
      sub(/[[:space:]]+$/, "", $0)
    }
    /^https?:\/\// {
      print
      exit 0
    }
  ' "$fixture"
}
# Print the path of the live (freshly downloaded) TA certificate for the RIR
# named in "$1": $STATE_ROOT/live-ta/<TAL fixture basename without .tal>.cer.
# The basename comes from tal_file_for_rir, so the name tracks the TAL fixture.
live_ta_file_for_rir() {
printf '%s' "$STATE_ROOT/live-ta/$(basename "$(tal_file_for_rir "$1")" .tal).cer"
}
# Download the TA certificate for the RIR named in "$1" and store it at the
# path reported by live_ta_file_for_rir.
#
# The download URI is the first http(s) URI in the RIR's TAL fixture. The
# function calls `die` when the fixture has no such URI, when curl fails, or
# when the downloaded file is empty.
refresh_live_ta_for_rir() {
local rir_name="$1"
local tal_path
local ta_uri
local ta_file
local tmp_file
tal_path="$(tal_file_for_rir "$rir_name")"
ta_uri="$(tal_https_uri_from_fixture "$tal_path")"
[[ -n "$ta_uri" ]] || die "missing http(s) TA URI in TAL fixture for $rir_name: $tal_path"
ta_file="$(live_ta_file_for_rir "$rir_name")"
mkdir -p "$(dirname "$ta_file")"
# Download into a PID-suffixed temp file next to the target so that a failed
# or empty download never replaces an existing TA file.
tmp_file="${ta_file}.tmp.$$"
# curl: -f fail on HTTP errors, -sS quiet but still report errors, -L follow redirects.
curl -fsSL --connect-timeout 15 --max-time 120 "$ta_uri" -o "$tmp_file" \
|| { rm -f "$tmp_file"; die "failed to refresh TA for $rir_name from $ta_uri"; }
# -s: file exists and is non-empty.
[[ -s "$tmp_file" ]] || { rm -f "$tmp_file"; die "empty TA download for $rir_name from $ta_uri"; }
mv "$tmp_file" "$ta_file"
}
compare_view_trust_anchor() { compare_view_trust_anchor() {
if [[ "${#RIR_LIST[@]}" -eq 1 ]]; then if [[ "${#RIR_LIST[@]}" -eq 1 ]]; then
printf '%s' "${RIR_LIST[0]}" printf '%s' "${RIR_LIST[0]}"
@ -377,8 +429,16 @@ build_child_args() {
local rir_name local rir_name
for rir_name in "${RIR_LIST[@]}"; do for rir_name in "${RIR_LIST[@]}"; do
CHILD_ARGS+=(--tal-path "$(tal_file_for_rir "$rir_name")") if [[ "$TAL_INPUT_MODE" == "url" ]]; then
CHILD_ARGS+=(--ta-path "$(ta_file_for_rir "$rir_name")") CHILD_ARGS+=(--tal-url "$(tal_url_for_rir "$rir_name")")
elif [[ "$TAL_INPUT_MODE" == "file-live-ta" ]]; then
refresh_live_ta_for_rir "$rir_name"
CHILD_ARGS+=(--tal-path "$(tal_file_for_rir "$rir_name")")
CHILD_ARGS+=(--ta-path "$(live_ta_file_for_rir "$rir_name")")
else
CHILD_ARGS+=(--tal-path "$(tal_file_for_rir "$rir_name")")
CHILD_ARGS+=(--ta-path "$(ta_file_for_rir "$rir_name")")
fi
done done
CHILD_ARGS+=( CHILD_ARGS+=(
@ -555,7 +615,7 @@ run_one_round() {
build_child_args build_child_args
if is_true "$RPKI_ANALYZE"; then if is_true "$RPKI_ANALYZE"; then
CHILD_ARGS+=(--analysis-out "$run_dir/analyze") CHILD_ARGS+=(--analyze --analysis-out "$run_dir/analyze")
fi fi
local daemon_args=( local daemon_args=(
--state-root "$daemon_state_root" --state-root "$daemon_state_root"
@ -612,11 +672,15 @@ main() {
require_command python3 require_command python3
require_command date require_command date
require_command find require_command find
if [[ "$TAL_INPUT_MODE" == "file-live-ta" ]]; then
require_command curl
fi
validate_max_runs validate_max_runs
validate_non_negative_int "INTERVAL_SECS" "$INTERVAL_SECS" validate_non_negative_int "INTERVAL_SECS" "$INTERVAL_SECS"
validate_non_negative_int "STOP_AFTER_SECS" "$STOP_AFTER_SECS" validate_non_negative_int "STOP_AFTER_SECS" "$STOP_AFTER_SECS"
validate_positive_int "RETAIN_RUNS" "$RETAIN_RUNS" validate_positive_int "RETAIN_RUNS" "$RETAIN_RUNS"
validate_rsync_scope validate_rsync_scope
validate_tal_input_mode
if [[ -n "${DB_STATS_EXACT_EVERY:-}" && "$DB_STATS_EXACT_EVERY" != "0" ]]; then if [[ -n "${DB_STATS_EXACT_EVERY:-}" && "$DB_STATS_EXACT_EVERY" != "0" ]]; then
validate_positive_int "DB_STATS_EXACT_EVERY" "$DB_STATS_EXACT_EVERY" validate_positive_int "DB_STATS_EXACT_EVERY" "$DB_STATS_EXACT_EVERY"
fi fi
@ -626,8 +690,15 @@ main() {
local rir_name local rir_name
for rir_name in "${RIR_LIST[@]}"; do for rir_name in "${RIR_LIST[@]}"; do
[[ -f "$(tal_file_for_rir "$rir_name")" ]] || die "missing TAL fixture for $rir_name" if [[ "$TAL_INPUT_MODE" == "url" ]]; then
[[ -f "$(ta_file_for_rir "$rir_name")" ]] || die "missing TA fixture for $rir_name" [[ -n "$(tal_url_for_rir "$rir_name")" ]] || die "missing TAL URL for $rir_name"
elif [[ "$TAL_INPUT_MODE" == "file-live-ta" ]]; then
[[ -f "$(tal_file_for_rir "$rir_name")" ]] || die "missing TAL fixture for $rir_name"
[[ -n "$(tal_https_uri_from_fixture "$(tal_file_for_rir "$rir_name")")" ]] || die "missing http(s) TA URI in TAL fixture for $rir_name"
else
[[ -f "$(tal_file_for_rir "$rir_name")" ]] || die "missing TAL fixture for $rir_name"
[[ -f "$(ta_file_for_rir "$rir_name")" ]] || die "missing TA fixture for $rir_name"
fi
done done
mkdir -p "$RUNS_ROOT" "$LOG_ROOT" "$DB_DIR" "$META_DIR" "$TMP_DIR" "$INVALID_ROOT" mkdir -p "$RUNS_ROOT" "$LOG_ROOT" "$DB_DIR" "$META_DIR" "$TMP_DIR" "$INVALID_ROOT"

View File

@ -79,6 +79,11 @@ impl TimingHandle {
.collect() .collect()
} }
/// Build a [`TimingReportV1`] from the collector's current state.
///
/// `top_n` is forwarded to the collector's `to_report`. The timing lock is
/// held only while the report is produced.
///
/// # Panics
///
/// Panics if the timing lock is poisoned.
pub fn report_snapshot(&self, top_n: usize) -> TimingReportV1 {
    let collector = self.inner.lock().expect("timing lock");
    let report = collector.to_report(top_n);
    report
}
/// Record a phase duration directly in nanoseconds. /// Record a phase duration directly in nanoseconds.
/// ///
/// This is useful when aggregating sub-phase timings locally (to reduce lock contention) /// This is useful when aggregating sub-phase timings locally (to reduce lock contention)
@ -88,6 +93,22 @@ impl TimingHandle {
g.phases.record(phase, nanos); g.phases.record(phase, nanos);
} }
/// Record a publication-point duration directly in nanoseconds, keyed by the
/// manifest rsync URI.
///
/// # Panics
///
/// Panics if the timing lock is poisoned.
pub fn record_publication_point_nanos(&self, manifest_rsync_uri: &str, nanos: u64) {
    self.inner
        .lock()
        .expect("timing lock")
        .publication_points
        .record(manifest_rsync_uri, nanos);
}
/// Record the duration of one step of a publication point, in nanoseconds.
///
/// The entry is keyed as `"{manifest_rsync_uri}::{step}"`.
///
/// # Panics
///
/// Panics if the timing lock is poisoned.
pub fn record_publication_point_step_nanos(
    &self,
    manifest_rsync_uri: &str,
    step: &'static str,
    nanos: u64,
) {
    // Build the composite key first so the lock only covers the map update.
    let step_key = format!("{manifest_rsync_uri}::{step}");
    let mut collector = self.inner.lock().expect("timing lock");
    collector.publication_point_steps.record(&step_key, nanos);
}
pub fn write_json(&self, path: &Path, top_n: usize) -> Result<(), String> { pub fn write_json(&self, path: &Path, top_n: usize) -> Result<(), String> {
let report = { let report = {
let g = self.inner.lock().expect("timing lock"); let g = self.inner.lock().expect("timing lock");
@ -153,7 +174,7 @@ enum TimingSpanKind<'a> {
PublicationPoint(&'a str), PublicationPoint(&'a str),
} }
#[derive(Clone, Debug, Default, Serialize, Deserialize)] #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct DurationStats { pub struct DurationStats {
pub count: u64, pub count: u64,
pub total_nanos: u64, pub total_nanos: u64,
@ -199,6 +220,7 @@ struct TimingCollector {
rrdp_repos: DurationStatsMap, rrdp_repos: DurationStatsMap,
rrdp_repo_steps: DurationStatsMap, rrdp_repo_steps: DurationStatsMap,
publication_points: DurationStatsMap, publication_points: DurationStatsMap,
publication_point_steps: DurationStatsMap,
} }
impl TimingCollector { impl TimingCollector {
@ -210,6 +232,7 @@ impl TimingCollector {
rrdp_repos: DurationStatsMap::default(), rrdp_repos: DurationStatsMap::default(),
rrdp_repo_steps: DurationStatsMap::default(), rrdp_repo_steps: DurationStatsMap::default(),
publication_points: DurationStatsMap::default(), publication_points: DurationStatsMap::default(),
publication_point_steps: DurationStatsMap::default(),
} }
} }
@ -231,6 +254,7 @@ impl TimingCollector {
top_rrdp_repos: self.rrdp_repos.top(top_n), top_rrdp_repos: self.rrdp_repos.top(top_n),
top_rrdp_repo_steps: self.rrdp_repo_steps.top(top_n), top_rrdp_repo_steps: self.rrdp_repo_steps.top(top_n),
top_publication_points: self.publication_points.top(top_n), top_publication_points: self.publication_points.top(top_n),
top_publication_point_steps: self.publication_point_steps.top(top_n),
} }
} }
} }
@ -244,9 +268,10 @@ pub struct TimingReportV1 {
pub top_rrdp_repos: Vec<TopDurationEntry>, pub top_rrdp_repos: Vec<TopDurationEntry>,
pub top_rrdp_repo_steps: Vec<TopDurationEntry>, pub top_rrdp_repo_steps: Vec<TopDurationEntry>,
pub top_publication_points: Vec<TopDurationEntry>, pub top_publication_points: Vec<TopDurationEntry>,
pub top_publication_point_steps: Vec<TopDurationEntry>,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TopDurationEntry { pub struct TopDurationEntry {
pub key: String, pub key: String,
pub count: u64, pub count: u64,
@ -283,6 +308,12 @@ mod tests {
let _pp = h.span_publication_point("rsync://example.test/repo/manifest.mft"); let _pp = h.span_publication_point("rsync://example.test/repo/manifest.mft");
} }
h.record_count("vrps", 42); h.record_count("vrps", 42);
h.record_publication_point_nanos("rsync://example.test/repo/manifest.mft", 1_000_000);
h.record_publication_point_step_nanos(
"rsync://example.test/repo/manifest.mft",
"fresh_snapshot_prepare",
1_000_000,
);
let dir = tempfile::tempdir().expect("tempdir"); let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("timing.json"); let path = dir.path().join("timing.json");
@ -312,5 +343,11 @@ mod tests {
.any(|e| e.key.contains("manifest.mft")), .any(|e| e.key.contains("manifest.mft")),
"expected PP in top list" "expected PP in top list"
); );
assert!(
rep.top_publication_point_steps
.iter()
.any(|e| e.key.contains("fresh_snapshot_prepare")),
"expected PP step in top list"
);
} }
} }

View File

@ -6,7 +6,9 @@ use crate::ccr::{
use crate::cir::{CirTrustAnchorBinding, export_cir_from_input_snapshot_multi}; use crate::cir::{CirTrustAnchorBinding, export_cir_from_input_snapshot_multi};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate}; use crate::analysis::timing::{
DurationStats, TimingHandle, TimingMeta, TimingMetaUpdate, TopDurationEntry,
};
use crate::audit::AuditRepoSyncStats; use crate::audit::AuditRepoSyncStats;
#[cfg(test)] #[cfg(test)]
use crate::audit::{ use crate::audit::{
@ -73,6 +75,9 @@ struct RunStageTiming {
download_bytes_total: u64, download_bytes_total: u64,
roa_validation_cache: crate::validation::objects::RoaValidationCacheStats, roa_validation_cache: crate::validation::objects::RoaValidationCacheStats,
analysis_counts: HashMap<String, u64>, analysis_counts: HashMap<String, u64>,
analysis_phases: HashMap<String, DurationStats>,
analysis_top_publication_points: Vec<TopDurationEntry>,
analysis_top_publication_point_steps: Vec<TopDurationEntry>,
vcir_storage_summary_ms: Option<u64>, vcir_storage_summary_ms: Option<u64>,
vcir_storage: Option<VcirStorageSummary>, vcir_storage: Option<VcirStorageSummary>,
memory_telemetry: Option<MemoryTelemetrySummary>, memory_telemetry: Option<MemoryTelemetrySummary>,
@ -2454,6 +2459,9 @@ pub fn run(argv: &[String]) -> Result<(), String> {
&total_started, &total_started,
store.as_ref(), store.as_ref(),
); );
let timing_report_snapshot = timing
.as_ref()
.map(|(_, handle)| handle.report_snapshot(50));
let stage_timing = RunStageTiming { let stage_timing = RunStageTiming {
validation_ms, validation_ms,
enable_roa_validation_cache: args.enable_roa_validation_cache, enable_roa_validation_cache: args.enable_roa_validation_cache,
@ -2483,6 +2491,18 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.as_ref() .as_ref()
.map(|(_, handle)| handle.counts_snapshot()) .map(|(_, handle)| handle.counts_snapshot())
.unwrap_or_default(), .unwrap_or_default(),
analysis_phases: timing_report_snapshot
.as_ref()
.map(|report| report.phases.clone())
.unwrap_or_default(),
analysis_top_publication_points: timing_report_snapshot
.as_ref()
.map(|report| report.top_publication_points.clone())
.unwrap_or_default(),
analysis_top_publication_point_steps: timing_report_snapshot
.as_ref()
.map(|report| report.top_publication_point_steps.clone())
.unwrap_or_default(),
vcir_storage_summary_ms, vcir_storage_summary_ms,
vcir_storage, vcir_storage,
memory_telemetry: Some(MemoryTelemetrySummary { memory_telemetry: Some(MemoryTelemetrySummary {

View File

@ -1625,6 +1625,9 @@ fn run_report_task_and_stage_timing_work() {
download_bytes_total: 15, download_bytes_total: 15,
roa_validation_cache: crate::validation::objects::RoaValidationCacheStats::default(), roa_validation_cache: crate::validation::objects::RoaValidationCacheStats::default(),
analysis_counts: std::collections::HashMap::new(), analysis_counts: std::collections::HashMap::new(),
analysis_phases: std::collections::HashMap::new(),
analysis_top_publication_points: Vec::new(),
analysis_top_publication_point_steps: Vec::new(),
vcir_storage_summary_ms: Some(16), vcir_storage_summary_ms: Some(16),
vcir_storage: Some(VcirStorageSummary { vcir_storage: Some(VcirStorageSummary {
entry_count: 2, entry_count: 2,
@ -1730,6 +1733,9 @@ fn stage_timing_serializes_memory_telemetry() {
"roa_validation_cache_hit_roas".to_string(), "roa_validation_cache_hit_roas".to_string(),
2, 2,
)]), )]),
analysis_phases: std::collections::HashMap::new(),
analysis_top_publication_points: Vec::new(),
analysis_top_publication_point_steps: Vec::new(),
vcir_storage_summary_ms: None, vcir_storage_summary_ms: None,
vcir_storage: None, vcir_storage: None,
memory_telemetry: Some(MemoryTelemetrySummary { memory_telemetry: Some(MemoryTelemetrySummary {

View File

@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::io::Write; use std::io::Write;
use std::time::Duration; use std::time::Duration;
@ -6,6 +7,19 @@ use reqwest::header::HeaderMap;
use crate::sync::rrdp::Fetcher; use crate::sync::rrdp::Fetcher;
thread_local! {
    // Per-thread HTTP timeout override; `None` means no override is active.
    static HTTP_TIMEOUT_OVERRIDE: RefCell<Option<Duration>> = const { RefCell::new(None) };
}

/// Run `f` with the calling thread's HTTP timeout override set to `timeout`.
///
/// The override that was active before the call (possibly none) is restored
/// when `f` returns, so calls may be nested. The restore is performed by a drop
/// guard, so it also happens when `f` panics and the unwind is caught further
/// up the stack; otherwise the thread would keep the override for every later
/// request.
///
/// Returns whatever `f` returns.
pub fn with_scoped_http_timeout_override<R>(timeout: Duration, f: impl FnOnce() -> R) -> R {
    /// Puts the saved override back when dropped.
    struct RestoreOverride(Option<Duration>);

    impl Drop for RestoreOverride {
        fn drop(&mut self) {
            let previous = self.0.take();
            // `try_with` avoids a second panic if the thread-local has already
            // been destroyed during thread teardown.
            let _ = HTTP_TIMEOUT_OVERRIDE.try_with(|cell| cell.replace(previous));
        }
    }

    let previous = HTTP_TIMEOUT_OVERRIDE.with(|cell| cell.replace(Some(timeout)));
    let _restore = RestoreOverride(previous);
    f()
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct HttpFetcherConfig { pub struct HttpFetcherConfig {
/// Connection-establishment timeout for HTTP requests. /// Connection-establishment timeout for HTTP requests.
@ -37,6 +51,7 @@ impl Default for HttpFetcherConfig {
pub struct BlockingHttpFetcher { pub struct BlockingHttpFetcher {
short_client: Client, short_client: Client,
large_body_client: Client, large_body_client: Client,
retry_short_client: Client,
short_timeout: Duration, short_timeout: Duration,
large_body_timeout: Duration, large_body_timeout: Duration,
} }
@ -58,9 +73,16 @@ impl BlockingHttpFetcher {
.user_agent(config.user_agent) .user_agent(config.user_agent)
.build() .build()
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let retry_short_client = Client::builder()
.connect_timeout(Duration::from_secs(1))
.timeout(Duration::from_secs(1))
.user_agent("rpki-dev/0.1 (stage2)")
.build()
.map_err(|e| e.to_string())?;
Ok(Self { Ok(Self {
short_client, short_client,
large_body_client, large_body_client,
retry_short_client,
short_timeout, short_timeout,
large_body_timeout, large_body_timeout,
}) })
@ -176,6 +198,9 @@ impl BlockingHttpFetcher {
} }
fn client_for_uri(&self, uri: &str) -> (&Client, &'static str, Duration) { fn client_for_uri(&self, uri: &str) -> (&Client, &'static str, Duration) {
if let Some(timeout) = HTTP_TIMEOUT_OVERRIDE.with(|cell| *cell.borrow()) {
return (&self.retry_short_client, "retry_short", timeout);
}
if uses_large_body_timeout(uri) { if uses_large_body_timeout(uri) {
( (
&self.large_body_client, &self.large_body_client,

View File

@ -24,6 +24,20 @@ pub trait RsyncFetcher: Send + Sync {
/// Return a list of objects as `(rsync_uri, bytes)` pairs. /// Return a list of objects as `(rsync_uri, bytes)` pairs.
fn fetch_objects(&self, rsync_base_uri: &str) -> RsyncFetchResult<Vec<(String, Vec<u8>)>>; fn fetch_objects(&self, rsync_base_uri: &str) -> RsyncFetchResult<Vec<(String, Vec<u8>)>>;
/// Fetch a single object identified by its exact rsync URI.
///
/// This default implementation fetches the object's parent directory via
/// `fetch_objects` and returns the bytes of the entry whose URI equals
/// `rsync_uri`. Live fetchers should override it so that a one-object TAL
/// bootstrap fetch is not widened into a whole publication point or module
/// synchronization.
///
/// # Errors
///
/// Returns `RsyncFetchError::Fetch` when the parent URI cannot be derived or
/// when no fetched entry matches `rsync_uri`; errors from `fetch_objects` are
/// propagated unchanged.
fn fetch_object(&self, rsync_uri: &str) -> RsyncFetchResult<Vec<u8>> {
    let parent_dir = parent_rsync_uri(rsync_uri).map_err(RsyncFetchError::Fetch)?;
    for (uri, bytes) in self.fetch_objects(&parent_dir)? {
        if uri == rsync_uri {
            return Ok(bytes);
        }
    }
    Err(RsyncFetchError::Fetch(format!(
        "rsync object not found: {rsync_uri}"
    )))
}
/// Stream fetched objects to a visitor without requiring callers to materialize the /// Stream fetched objects to a visitor without requiring callers to materialize the
/// full result vector in memory. /// full result vector in memory.
fn visit_objects( fn visit_objects(
@ -62,6 +76,32 @@ pub trait RsyncFetcher: Send + Sync {
} }
} }
/// Derive the directory URI (with trailing `/`) that contains the file object
/// named by `rsync_uri`.
///
/// For example `rsync://host/module/dir/file.cer` yields
/// `rsync://host/module/dir/`. An explicit port in the input
/// (`rsync://host:8873/...`) is kept in the result, so the parent fetch goes to
/// the same endpoint as the object URI.
///
/// # Errors
///
/// Returns a message when the input does not parse as a URL, is not an
/// `rsync` URI, has no host or path, or ends in `/` (i.e. does not reference a
/// file object).
fn parent_rsync_uri(rsync_uri: &str) -> Result<String, String> {
    let parsed = url::Url::parse(rsync_uri).map_err(|e| e.to_string())?;
    if parsed.scheme() != "rsync" {
        return Err(format!("not an rsync URI: {rsync_uri}"));
    }
    let host = parsed
        .host_str()
        .ok_or_else(|| format!("missing host in rsync URI: {rsync_uri}"))?;
    let segments = parsed
        .path_segments()
        .ok_or_else(|| format!("missing path in rsync URI: {rsync_uri}"))?
        .collect::<Vec<_>>();
    // An empty final segment means the URI ends in '/', i.e. names a directory.
    let Some((file_name, parent_segments)) = segments.split_last() else {
        return Err(format!(
            "rsync URI must reference a file object: {rsync_uri}"
        ));
    };
    if file_name.is_empty() {
        return Err(format!(
            "rsync URI must reference a file object: {rsync_uri}"
        ));
    }

    let mut parent = format!("rsync://{host}");
    // `host_str` never includes the port; re-attach an explicit one so the
    // parent URI targets the same rsync daemon as the object URI.
    if let Some(port) = parsed.port() {
        parent.push(':');
        parent.push_str(&port.to_string());
    }
    parent.push('/');
    for segment in parent_segments {
        parent.push_str(segment);
        parent.push('/');
    }
    Ok(parent)
}
/// A simple "rsync" implementation backed by a local directory. /// A simple "rsync" implementation backed by a local directory.
/// ///
/// This is primarily meant for offline tests and fixtures. The key generation mimics rsync URIs: /// This is primarily meant for offline tests and fixtures. The key generation mimics rsync URIs:

View File

@ -317,6 +317,34 @@ impl RsyncFetcher for SystemRsyncFetcher {
Ok(out) Ok(out)
} }
/// Fetch exactly one object by running rsync on the object URI itself.
///
/// The object is downloaded into a fresh temporary directory and read back
/// from there under the URI's final path segment.
///
/// # Errors
///
/// Returns `RsyncFetchError::Fetch` when the URI does not parse, is not an
/// `rsync` URI, does not end in a file name (for example it ends in `/`),
/// when the temporary directory cannot be created, when rsync fails, or when
/// the fetched file cannot be read.
fn fetch_object(&self, rsync_uri: &str) -> RsyncFetchResult<Vec<u8>> {
    let parsed =
        url::Url::parse(rsync_uri).map_err(|e| RsyncFetchError::Fetch(e.to_string()))?;
    if parsed.scheme() != "rsync" {
        return Err(RsyncFetchError::Fetch(format!(
            "not an rsync URI: {rsync_uri}"
        )));
    }
    // Only the final segment may name the object. An empty final segment means
    // the URI ends in '/' and names a directory; rejecting it here keeps this
    // method from turning into a directory synchronization.
    let file_name = parsed
        .path_segments()
        .and_then(|mut segments| segments.next_back())
        .filter(|segment| !segment.is_empty())
        .ok_or_else(|| {
            RsyncFetchError::Fetch(format!(
                "rsync URI must reference a file object: {rsync_uri}"
            ))
        })?;
    let tmp = TempDir::new().map_err(RsyncFetchError::Fetch)?;
    self.run_rsync(rsync_uri, tmp.path())
        .map_err(RsyncFetchError::Fetch)?;
    let object_path = tmp.path().join(file_name);
    std::fs::read(&object_path).map_err(|e| {
        RsyncFetchError::Fetch(format!(
            "read fetched rsync object failed: {}: {e}",
            object_path.display()
        ))
    })
}
fn visit_objects( fn visit_objects(
&self, &self,
rsync_base_uri: &str, rsync_base_uri: &str,

View File

@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
@ -67,6 +68,27 @@ pub trait RepoSyncRuntime: Send + Sync {
timeout: Duration, timeout: Duration,
) -> Result<Option<RepoSyncRuntimeEvent>, String>; ) -> Result<Option<RepoSyncRuntimeEvent>, String>;
/// Collect up to `max_events` repo sync results in one call.
///
/// The first poll waits up to `timeout`; every following poll uses a zero
/// timeout, so only results that are already available are added. Collection
/// stops at the first poll that yields no event. A `max_events` of zero is
/// treated as one.
///
/// # Errors
///
/// Propagates the first error returned by `recv_repo_result_timeout`.
fn drain_repo_results_timeout(
    &self,
    timeout: Duration,
    max_events: usize,
) -> Result<Vec<RepoSyncRuntimeEvent>, String> {
    let limit = max_events.max(1);
    let mut drained = Vec::new();
    let mut wait = timeout;
    while drained.len() < limit {
        match self.recv_repo_result_timeout(wait)? {
            Some(event) => drained.push(event),
            None => break,
        }
        // Only the first poll is allowed to block.
        wait = Duration::from_millis(0);
    }
    Ok(drained)
}
fn reset_run_state(&self) -> Result<(), String>; fn reset_run_state(&self) -> Result<(), String>;
fn prefetch_discovered_children( fn prefetch_discovered_children(
@ -87,6 +109,7 @@ pub struct Phase1RepoSyncRuntime<E: RepoTransportExecutor> {
coordinator: Mutex<GlobalRunCoordinator>, coordinator: Mutex<GlobalRunCoordinator>,
worker_pool: Mutex<RepoTransportWorkerPool<E>>, worker_pool: Mutex<RepoTransportWorkerPool<E>>,
transport_prefetch_recorder: Option<Mutex<TransportPrefetchRecorder>>, transport_prefetch_recorder: Option<Mutex<TransportPrefetchRecorder>>,
retry_short_rsync_scopes: Mutex<HashSet<String>>,
rsync_scope_resolver: Arc<dyn Fn(&str) -> String + Send + Sync>, rsync_scope_resolver: Arc<dyn Fn(&str) -> String + Send + Sync>,
rsync_failure_scope_resolver: Arc<dyn Fn(&str) -> Option<String> + Send + Sync>, rsync_failure_scope_resolver: Arc<dyn Fn(&str) -> Option<String> + Send + Sync>,
sync_preference: SyncPreference, sync_preference: SyncPreference,
@ -119,6 +142,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
coordinator: Mutex::new(coordinator), coordinator: Mutex::new(coordinator),
worker_pool: Mutex::new(worker_pool), worker_pool: Mutex::new(worker_pool),
transport_prefetch_recorder: None, transport_prefetch_recorder: None,
retry_short_rsync_scopes: Mutex::new(HashSet::new()),
rsync_scope_resolver, rsync_scope_resolver,
rsync_failure_scope_resolver, rsync_failure_scope_resolver,
sync_preference, sync_preference,
@ -138,6 +162,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
worker_pool: Mutex::new(worker_pool), worker_pool: Mutex::new(worker_pool),
transport_prefetch_recorder: record_transport_prefetch_requests transport_prefetch_recorder: record_transport_prefetch_requests
.then(|| Mutex::new(TransportPrefetchRecorder::default())), .then(|| Mutex::new(TransportPrefetchRecorder::default())),
retry_short_rsync_scopes: Mutex::new(HashSet::new()),
rsync_scope_resolver, rsync_scope_resolver,
rsync_failure_scope_resolver, rsync_failure_scope_resolver,
sync_preference, sync_preference,
@ -191,6 +216,7 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
rsync_scope_uri, rsync_scope_uri,
rsync_failure_scope_uri, rsync_failure_scope_uri,
self.sync_preference, self.sync_preference,
false,
) )
}; };
@ -310,6 +336,12 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
}; };
let transport_identity = envelope.repo_identity.clone(); let transport_identity = envelope.repo_identity.clone();
let completed_envelope = envelope.clone(); let completed_envelope = envelope.clone();
if let Some(recorder) = self.transport_prefetch_recorder.as_ref() {
recorder
.lock()
.expect("transport prefetch recorder lock poisoned")
.record_result(&envelope);
}
crate::progress_log::emit( crate::progress_log::emit(
"phase1_repo_task_result", "phase1_repo_task_result",
serde_json::json!({ serde_json::json!({
@ -334,7 +366,19 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
}; };
if !completion.follow_up_tasks.is_empty() { if !completion.follow_up_tasks.is_empty() {
let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
for task in completion.follow_up_tasks { for mut task in completion.follow_up_tasks {
if let crate::parallel::types::RepoDedupKey::RsyncScope { rsync_scope_uri } =
&task.dedup_key
{
if self
.retry_short_rsync_scopes
.lock()
.expect("retry short rsync scopes lock poisoned")
.contains(rsync_scope_uri)
{
task.retry_short_timeout = true;
}
}
crate::progress_log::emit( crate::progress_log::emit(
"phase1_repo_task_enqueued", "phase1_repo_task_enqueued",
serde_json::json!({ serde_json::json!({
@ -384,6 +428,27 @@ impl<E: RepoTransportExecutor> Phase1RepoSyncRuntime<E> {
})) }))
} }
/// Collects up to `max_events` transport results in a single call.
///
/// Only the first poll is given the caller's `timeout`; every later poll uses
/// a zero-length `Duration`. Collection stops at the first poll that yields no
/// event. A `max_events` of zero is treated as one.
///
/// # Errors
/// Propagates the error string returned by `pump_one_transport_result`.
fn pump_transport_results(
    &self,
    timeout: Duration,
    max_events: usize,
) -> Result<Vec<RepoSyncRuntimeEvent>, String> {
    let budget = max_events.max(1);
    let mut drained = Vec::new();
    // The caller's timeout applies to the first poll only.
    let mut wait = timeout;
    while drained.len() < budget {
        match self.pump_one_transport_result(wait)? {
            Some(event) => drained.push(event),
            None => break,
        }
        wait = Duration::from_millis(0);
    }
    Ok(drained)
}
fn runtime_state_for_identity(&self, identity: &RepoIdentity) -> Option<RepoRuntimeState> { fn runtime_state_for_identity(&self, identity: &RepoIdentity) -> Option<RepoRuntimeState> {
let coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); let coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
coordinator coordinator
@ -445,6 +510,14 @@ impl<E: RepoTransportExecutor> RepoSyncRuntime for Phase1RepoSyncRuntime<E> {
self.pump_one_transport_result(timeout) self.pump_one_transport_result(timeout)
} }
/// Drains up to `max_events` transport results in one call.
///
/// Delegates to `pump_transport_results`, passing `timeout` and `max_events`
/// through unchanged.
fn drain_repo_results_timeout(
&self,
timeout: Duration,
max_events: usize,
) -> Result<Vec<RepoSyncRuntimeEvent>, String> {
self.pump_transport_results(timeout, max_events)
}
fn reset_run_state(&self) -> Result<(), String> { fn reset_run_state(&self) -> Result<(), String> {
{ {
let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
@ -499,6 +572,12 @@ impl<E: RepoTransportExecutor> RepoSyncRuntime for Phase1RepoSyncRuntime<E> {
stats.skipped_incompatible += 1; stats.skipped_incompatible += 1;
continue; continue;
} }
if request.retry_short_rsync_timeout() {
self.retry_short_rsync_scopes
.lock()
.expect("retry short rsync scopes lock poisoned")
.insert(current_rsync_scope_uri.clone());
}
let action = { let action = {
let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned");
coordinator.register_transport_request( coordinator.register_transport_request(
@ -509,6 +588,7 @@ impl<E: RepoTransportExecutor> RepoSyncRuntime for Phase1RepoSyncRuntime<E> {
current_rsync_scope_uri, current_rsync_scope_uri,
current_rsync_failure_scope_uri, current_rsync_failure_scope_uri,
self.sync_preference, self.sync_preference,
request.retry_short_timeout(),
) )
}; };
@ -957,6 +1037,64 @@ mod tests {
); );
} }
// Verifies that one `drain_repo_results_timeout` call can return several
// transport events when more than one repository request has been issued.
#[test]
fn phase1_runtime_drains_multiple_ready_transport_events() {
let count = Arc::new(AtomicUsize::new(0));
let coordinator = GlobalRunCoordinator::new(
ParallelPhase1Config::default(),
vec![TalInputSpec::from_url("https://example.test/arin.tal")],
);
let pool = RepoTransportWorkerPool::new(
RepoWorkerPoolConfig { max_workers: 2 },
CountingSuccessTransportExecutor {
count: Arc::clone(&count),
},
)
.expect("pool");
let runtime = Phase1RepoSyncRuntime::new(
coordinator,
pool,
Arc::new(|base: &str| base.to_string()),
SyncPreference::RrdpThenRsync,
);
// Two CAs on different hosts, so each request gets its own pending transport.
let ca1 = sample_ca("rsync://example.test/repo/root.mft");
let mut ca2 = sample_ca("rsync://example.net/repo/root.mft");
ca2.rsync_base_uri = "rsync://example.net/repo/".to_string();
ca2.publication_point_rsync_uri = "rsync://example.net/repo/".to_string();
ca2.rrdp_notification_uri = Some("https://example.net/notify.xml".to_string());
assert!(matches!(
runtime
.request_publication_point_repo(&ca1, 0)
.expect("request ca1"),
super::RepoSyncRequestStatus::Pending { .. }
));
assert!(matches!(
runtime
.request_publication_point_repo(&ca2, 0)
.expect("request ca2"),
super::RepoSyncRequestStatus::Pending { .. }
));
// Poll the shared counter for up to one second until it reaches two.
let started = Instant::now();
while count.load(Ordering::SeqCst) < 2 && started.elapsed() < Duration::from_secs(1) {
std::thread::sleep(Duration::from_millis(5));
}
assert_eq!(count.load(Ordering::SeqCst), 2);
// NOTE(review): the counter is presumably bumped inside the executor, which may
// happen before the worker hands its result to the runtime; with a zero timeout
// this drain could then see fewer than two events — confirm this is not flaky.
let events = runtime
.drain_repo_results_timeout(Duration::from_millis(0), 8)
.expect("drain events");
assert_eq!(events.len(), 2);
assert_eq!(
events
.iter()
.map(|event| event.completions.len())
.sum::<usize>(),
2
);
}
#[test] #[test]
fn phase1_runtime_reset_run_state_clears_completed_transport_reuse() { fn phase1_runtime_reset_run_state_clears_completed_transport_reuse() {
let count = Arc::new(AtomicUsize::new(0)); let count = Arc::new(AtomicUsize::new(0));

View File

@ -284,6 +284,7 @@ impl TransportStateTables {
rsync_failure_scope_uri: None, rsync_failure_scope_uri: None,
repo_identity: identity.clone(), repo_identity: identity.clone(),
mode: RepoTransportMode::Rrdp, mode: RepoTransportMode::Rrdp,
retry_short_timeout: false,
tal_id: requester.tal_id.clone(), tal_id: requester.tal_id.clone(),
rir_id: requester.rir_id.clone(), rir_id: requester.rir_id.clone(),
validation_time, validation_time,
@ -460,6 +461,7 @@ impl TransportStateTables {
rsync_failure_scope_uri: rsync_failure_scope_uri.clone(), rsync_failure_scope_uri: rsync_failure_scope_uri.clone(),
repo_identity: identity.clone(), repo_identity: identity.clone(),
mode: RepoTransportMode::Rsync, mode: RepoTransportMode::Rsync,
retry_short_timeout: false,
tal_id: requester.tal_id.clone(), tal_id: requester.tal_id.clone(),
rir_id: requester.rir_id.clone(), rir_id: requester.rir_id.clone(),
validation_time, validation_time,
@ -557,6 +559,7 @@ impl TransportStateTables {
rsync_failure_scope_uri: record.rsync_failure_scope_key.clone(), rsync_failure_scope_uri: record.rsync_failure_scope_key.clone(),
repo_identity: record.identity.clone(), repo_identity: record.identity.clone(),
mode: RepoTransportMode::Rsync, mode: RepoTransportMode::Rsync,
retry_short_timeout: false,
tal_id: first_requester.tal_id.clone(), tal_id: first_requester.tal_id.clone(),
rir_id: first_requester.rir_id.clone(), rir_id: first_requester.rir_id.clone(),
validation_time: record.validation_time, validation_time: record.validation_time,

View File

@ -12,6 +12,9 @@ use crate::analysis::timing::TimingHandle;
use crate::audit_downloads::DownloadLogHandle; use crate::audit_downloads::DownloadLogHandle;
use crate::current_repo_index::CurrentRepoIndexHandle; use crate::current_repo_index::CurrentRepoIndexHandle;
use crate::fetch::rsync::RsyncFetcher; use crate::fetch::rsync::RsyncFetcher;
use crate::fetch::rsync_system::{
RsyncFailFastProfile, with_scoped_rsync_fail_fast_profile, with_scoped_rsync_timeout_override,
};
use crate::policy::Policy; use crate::policy::Policy;
use crate::storage::RocksStore; use crate::storage::RocksStore;
use crate::sync::repo::{ use crate::sync::repo::{
@ -19,6 +22,8 @@ use crate::sync::repo::{
}; };
use crate::sync::rrdp::Fetcher; use crate::sync::rrdp::Fetcher;
/// One-second timeout used for transport tasks whose `retry_short_timeout` flag is set.
const RETRY_SHORT_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepoWorkerPoolConfig { pub struct RepoWorkerPoolConfig {
pub max_workers: usize, pub max_workers: usize,
@ -75,14 +80,28 @@ impl<H: Fetcher + 'static> RepoTransportExecutor for LiveRrdpTransportExecutor<H
.notification_uri .notification_uri
.as_deref() .as_deref()
.expect("rrdp transport requires notification uri"); .expect("rrdp transport requires notification uri");
match run_rrdp_transport( let sync_result = if task.retry_short_timeout {
self.store.as_ref(), crate::fetch::http::with_scoped_http_timeout_override(RETRY_SHORT_TIMEOUT, || {
notification_uri, run_rrdp_transport(
Some(&self.current_repo_index), self.store.as_ref(),
self.http_fetcher.as_ref(), notification_uri,
self.timing.as_ref(), Some(&self.current_repo_index),
self.download_log.as_ref(), self.http_fetcher.as_ref(),
) { self.timing.as_ref(),
self.download_log.as_ref(),
)
})
} else {
run_rrdp_transport(
self.store.as_ref(),
notification_uri,
Some(&self.current_repo_index),
self.http_fetcher.as_ref(),
self.timing.as_ref(),
self.download_log.as_ref(),
)
};
match sync_result {
Ok(_) => RepoTransportResultEnvelope { Ok(_) => RepoTransportResultEnvelope {
dedup_key: task.dedup_key, dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri, rsync_failure_scope_uri: task.rsync_failure_scope_uri,
@ -143,14 +162,37 @@ impl<R: RsyncFetcher + 'static> RepoTransportExecutor for LiveRsyncTransportExec
fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope { fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope {
let started = std::time::Instant::now(); let started = std::time::Instant::now();
debug_assert_eq!(task.mode, RepoTransportMode::Rsync); debug_assert_eq!(task.mode, RepoTransportMode::Rsync);
match run_rsync_transport( let sync_result = if task.retry_short_timeout {
self.store.as_ref(), with_scoped_rsync_timeout_override(RETRY_SHORT_TIMEOUT, || {
&task.repo_identity.rsync_base_uri, with_scoped_rsync_fail_fast_profile(
Some(&self.current_repo_index), RsyncFailFastProfile {
self.rsync_fetcher.as_ref(), initial_wall_clock_timeout: RETRY_SHORT_TIMEOUT,
self.timing.as_ref(), max_wall_clock_timeout: RETRY_SHORT_TIMEOUT,
self.download_log.as_ref(), max_attempts: 1,
) { },
|| {
run_rsync_transport(
self.store.as_ref(),
&task.repo_identity.rsync_base_uri,
Some(&self.current_repo_index),
self.rsync_fetcher.as_ref(),
self.timing.as_ref(),
self.download_log.as_ref(),
)
},
)
})
} else {
run_rsync_transport(
self.store.as_ref(),
&task.repo_identity.rsync_base_uri,
Some(&self.current_repo_index),
self.rsync_fetcher.as_ref(),
self.timing.as_ref(),
self.download_log.as_ref(),
)
};
match sync_result {
Ok(_) => RepoTransportResultEnvelope { Ok(_) => RepoTransportResultEnvelope {
dedup_key: task.dedup_key, dedup_key: task.dedup_key,
rsync_failure_scope_uri: task.rsync_failure_scope_uri, rsync_failure_scope_uri: task.rsync_failure_scope_uri,
@ -667,6 +709,7 @@ mod tests {
rsync_failure_scope_uri: None, rsync_failure_scope_uri: None,
repo_identity: RepoIdentity::new(Some(notification_uri.to_string()), rsync_base_uri), repo_identity: RepoIdentity::new(Some(notification_uri.to_string()), rsync_base_uri),
mode: RepoTransportMode::Rrdp, mode: RepoTransportMode::Rrdp,
retry_short_timeout: false,
tal_id: "arin".to_string(), tal_id: "arin".to_string(),
rir_id: "arin".to_string(), rir_id: "arin".to_string(),
validation_time: time::OffsetDateTime::UNIX_EPOCH, validation_time: time::OffsetDateTime::UNIX_EPOCH,
@ -692,6 +735,7 @@ mod tests {
rsync_failure_scope_uri: None, rsync_failure_scope_uri: None,
repo_identity: RepoIdentity::new(None, rsync_base_uri), repo_identity: RepoIdentity::new(None, rsync_base_uri),
mode: RepoTransportMode::Rsync, mode: RepoTransportMode::Rsync,
retry_short_timeout: false,
tal_id: "arin".to_string(), tal_id: "arin".to_string(),
rir_id: "arin".to_string(), rir_id: "arin".to_string(),
validation_time: time::OffsetDateTime::UNIX_EPOCH, validation_time: time::OffsetDateTime::UNIX_EPOCH,

View File

@ -125,8 +125,9 @@ impl GlobalRunCoordinator {
rsync_scope_uri: String, rsync_scope_uri: String,
rsync_failure_scope_uri: Option<String>, rsync_failure_scope_uri: Option<String>,
sync_preference: SyncPreference, sync_preference: SyncPreference,
retry_short_timeout: bool,
) -> TransportRequestAction { ) -> TransportRequestAction {
let action = self.transport_tables.register_transport_request( let mut action = self.transport_tables.register_transport_request(
identity, identity,
requester, requester,
validation_time, validation_time,
@ -135,8 +136,11 @@ impl GlobalRunCoordinator {
rsync_failure_scope_uri, rsync_failure_scope_uri,
sync_preference, sync_preference,
); );
match &action { match &mut action {
TransportRequestAction::Enqueue(task) => { TransportRequestAction::Enqueue(task) => {
if retry_short_timeout {
task.retry_short_timeout = true;
}
self.stats.repo_tasks_total += 1; self.stats.repo_tasks_total += 1;
self.pending_transport_tasks.push_back(task.clone()); self.pending_transport_tasks.push_back(task.clone());
self.stats.repo_queue_depth = self.pending_transport_tasks.len(); self.stats.repo_queue_depth = self.pending_transport_tasks.len();
@ -308,6 +312,7 @@ mod tests {
"rsync://example.test/repo/".to_string(), "rsync://example.test/repo/".to_string(),
None, None,
SyncPreference::RrdpThenRsync, SyncPreference::RrdpThenRsync,
false,
); );
assert!(matches!( assert!(matches!(
action, action,

View File

@ -40,6 +40,10 @@ pub struct TransportPrefetchRequest {
pub rsync_failure_scope_uri: Option<String>, pub rsync_failure_scope_uri: Option<String>,
pub repo_identity: TransportPrefetchRepoIdentity, pub repo_identity: TransportPrefetchRepoIdentity,
pub mode: TransportPrefetchMode, pub mode: TransportPrefetchMode,
#[serde(default)]
pub last_result: Option<TransportPrefetchLastResult>,
#[serde(default)]
pub last_rsync_result: Option<TransportPrefetchLastResult>,
pub tal_id: String, pub tal_id: String,
pub rir_id: String, pub rir_id: String,
pub priority: u8, pub priority: u8,
@ -84,6 +88,8 @@ impl TransportPrefetchRequest {
rsync_failure_scope_uri, rsync_failure_scope_uri,
repo_identity: TransportPrefetchRepoIdentity::from_identity(identity), repo_identity: TransportPrefetchRepoIdentity::from_identity(identity),
mode, mode,
last_result: None,
last_rsync_result: None,
tal_id: requester.tal_id.clone(), tal_id: requester.tal_id.clone(),
rir_id: requester.rir_id.clone(), rir_id: requester.rir_id.clone(),
priority, priority,
@ -98,6 +104,8 @@ impl TransportPrefetchRequest {
rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(),
repo_identity: TransportPrefetchRepoIdentity::from_identity(&task.repo_identity), repo_identity: TransportPrefetchRepoIdentity::from_identity(&task.repo_identity),
mode: TransportPrefetchMode::from_mode(task.mode), mode: TransportPrefetchMode::from_mode(task.mode),
last_result: None,
last_rsync_result: None,
tal_id: task.tal_id.clone(), tal_id: task.tal_id.clone(),
rir_id: task.rir_id.clone(), rir_id: task.rir_id.clone(),
priority: task.priority, priority: task.priority,
@ -133,6 +141,20 @@ impl TransportPrefetchRequest {
fn recorder_key(&self) -> String { fn recorder_key(&self) -> String {
self.dedup_key.stable_key() self.dedup_key.stable_key()
} }
pub fn retry_short_timeout(&self) -> bool {
matches!(
self.last_result,
Some(TransportPrefetchLastResult { ok: false })
)
}
pub fn retry_short_rsync_timeout(&self) -> bool {
matches!(
self.last_rsync_result,
Some(TransportPrefetchLastResult { ok: false })
) || (self.mode == TransportPrefetchMode::Rsync && self.retry_short_timeout())
}
} }
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
@ -197,6 +219,11 @@ impl TransportPrefetchMode {
} }
} }
/// Outcome of the most recent transport attempt, reduced to a success flag.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransportPrefetchLastResult {
/// `true` when the recorded transport result was a success, `false` otherwise.
pub ok: bool,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransportPrefetchRequester { pub struct TransportPrefetchRequester {
pub tal_id: String, pub tal_id: String,
@ -234,6 +261,8 @@ impl TransportPrefetchRequester {
#[derive(Clone, Debug, Default, PartialEq, Eq)] #[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct TransportPrefetchRecorder { pub struct TransportPrefetchRecorder {
requests_by_key: BTreeMap<String, TransportPrefetchRequest>, requests_by_key: BTreeMap<String, TransportPrefetchRequest>,
results_by_key: BTreeMap<String, TransportPrefetchLastResult>,
rsync_results_by_scope: BTreeMap<String, TransportPrefetchLastResult>,
request_order: Vec<String>, request_order: Vec<String>,
} }
@ -263,6 +292,31 @@ impl TransportPrefetchRecorder {
self.record_request(request); self.record_request(request);
} }
/// Records the outcome of a finished transport so it is carried on the
/// matching prefetch request(s).
///
/// The outcome is stored under the result's stable dedup key and copied onto
/// the already-recorded request with that key, if any. When the result belongs
/// to an rsync scope, it is also stored per scope URI and copied onto every
/// recorded request whose `rsync_scope_uri` equals that scope.
pub fn record_result(&mut self, result: &crate::parallel::types::RepoTransportResultEnvelope) {
    let outcome = TransportPrefetchLastResult {
        ok: matches!(
            result.result,
            crate::parallel::types::RepoTransportResultKind::Success { .. }
        ),
    };
    let stable_key = TransportPrefetchDedupKey::from_repo_key(&result.dedup_key).stable_key();
    if let Some(existing) = self.requests_by_key.get_mut(&stable_key) {
        existing.last_result = Some(outcome);
    }
    self.results_by_key.insert(stable_key, outcome);

    // Only rsync-scope results feed the per-scope bookkeeping below.
    let crate::parallel::types::RepoDedupKey::RsyncScope { rsync_scope_uri } = &result.dedup_key
    else {
        return;
    };
    self.rsync_results_by_scope
        .insert(rsync_scope_uri.clone(), outcome);
    self.requests_by_key
        .values_mut()
        .filter(|request| request.rsync_scope_uri == *rsync_scope_uri)
        .for_each(|request| request.last_rsync_result = Some(outcome));
}
pub fn snapshot(&self, sync_preference: SyncPreference) -> TransportPrefetchSnapshot { pub fn snapshot(&self, sync_preference: SyncPreference) -> TransportPrefetchSnapshot {
TransportPrefetchSnapshot::new( TransportPrefetchSnapshot::new(
sync_preference, sync_preference,
@ -282,6 +336,16 @@ impl TransportPrefetchRecorder {
let key = request.recorder_key(); let key = request.recorder_key();
if !self.requests_by_key.contains_key(&key) { if !self.requests_by_key.contains_key(&key) {
self.request_order.push(key.clone()); self.request_order.push(key.clone());
let mut request = request;
if request.last_result.is_none() {
request.last_result = self.results_by_key.get(&key).copied();
}
if request.last_rsync_result.is_none() {
request.last_rsync_result = self
.rsync_results_by_scope
.get(&request.rsync_scope_uri)
.copied();
}
self.requests_by_key.insert(key, request); self.requests_by_key.insert(key, request);
} }
} }
@ -322,6 +386,7 @@ mod tests {
"rsync://example.test/repo/", "rsync://example.test/repo/",
), ),
mode: RepoTransportMode::Rrdp, mode: RepoTransportMode::Rrdp,
retry_short_timeout: false,
tal_id: "apnic".to_string(), tal_id: "apnic".to_string(),
rir_id: "apnic".to_string(), rir_id: "apnic".to_string(),
validation_time: time::OffsetDateTime::UNIX_EPOCH, validation_time: time::OffsetDateTime::UNIX_EPOCH,
@ -397,4 +462,49 @@ mod tests {
assert!(snapshot.is_compatible_with(SyncPreference::RrdpThenRsync)); assert!(snapshot.is_compatible_with(SyncPreference::RrdpThenRsync));
assert!(!snapshot.is_compatible_with(SyncPreference::RsyncOnly)); assert!(!snapshot.is_compatible_with(SyncPreference::RsyncOnly));
} }
// Verifies that failed RRDP and rsync results recorded after a task are both
// reflected on the snapshot request's short-timeout predicates.
#[test]
fn recorder_marks_failed_rrdp_and_rsync_for_next_run_short_timeouts() {
let mut recorder = TransportPrefetchRecorder::default();
let rrdp_task = task(
"https://example.test/notification.xml",
"rsync://example.test/repo/a.mft",
);
recorder.record_task(&rrdp_task, "rsync://example.test/repo/".to_string());
// Failed RRDP result keyed by the notification URI.
recorder.record_result(&crate::parallel::types::RepoTransportResultEnvelope {
dedup_key: RepoDedupKey::RrdpNotify {
notification_uri: "https://example.test/notification.xml".to_string(),
},
rsync_failure_scope_uri: None,
repo_identity: rrdp_task.repo_identity.clone(),
mode: RepoTransportMode::Rrdp,
tal_id: "apnic".to_string(),
rir_id: "apnic".to_string(),
timing_ms: 1,
result: crate::parallel::types::RepoTransportResultKind::Failed {
detail: "timeout".to_string(),
warnings: Vec::new(),
},
});
// Failed rsync result keyed by the same scope URI passed to `record_task` above.
recorder.record_result(&crate::parallel::types::RepoTransportResultEnvelope {
dedup_key: RepoDedupKey::RsyncScope {
rsync_scope_uri: "rsync://example.test/repo/".to_string(),
},
rsync_failure_scope_uri: None,
repo_identity: rrdp_task.repo_identity.clone(),
mode: RepoTransportMode::Rsync,
tal_id: "apnic".to_string(),
rir_id: "apnic".to_string(),
timing_ms: 1,
result: crate::parallel::types::RepoTransportResultKind::Failed {
detail: "timeout".to_string(),
warnings: Vec::new(),
},
});
let snapshot = recorder.snapshot(SyncPreference::RrdpThenRsync);
assert_eq!(snapshot.requests.len(), 1);
assert!(snapshot.requests[0].retry_short_timeout());
assert!(snapshot.requests[0].retry_short_rsync_timeout());
}
} }

View File

@ -108,6 +108,7 @@ pub struct RepoTransportTask {
pub rsync_failure_scope_uri: Option<String>, pub rsync_failure_scope_uri: Option<String>,
pub repo_identity: RepoIdentity, pub repo_identity: RepoIdentity,
pub mode: RepoTransportMode, pub mode: RepoTransportMode,
pub retry_short_timeout: bool,
pub tal_id: String, pub tal_id: String,
pub rir_id: String, pub rir_id: String,
pub validation_time: time::OffsetDateTime, pub validation_time: time::OffsetDateTime,
@ -223,6 +224,7 @@ impl RepoSyncTask {
rsync_failure_scope_uri: None, rsync_failure_scope_uri: None,
repo_identity: self.repo_key.as_identity(), repo_identity: self.repo_key.as_identity(),
mode, mode,
retry_short_timeout: false,
tal_id: self.tal_id.clone(), tal_id: self.tal_id.clone(),
rir_id: self.rir_id.clone(), rir_id: self.rir_id.clone(),
validation_time: self.validation_time, validation_time: self.validation_time,

View File

@ -14,3 +14,100 @@ pub fn resolve_object_from_cache_or_report(
) -> QueryDbResult<Option<ObjectInstanceRecord>> { ) -> QueryDbResult<Option<ObjectInstanceRecord>> {
Ok(None) Ok(None)
} }
// Unit tests for the report-path resolver and the placeholder object resolver.
#[cfg(test)]
mod tests {
use std::fs;
use serde_json::json;
use super::*;
use crate::query_db::{ArtifactIndexerConfig, index_artifacts};
// Indexes one sample run from a temp directory, then checks that the indexed
// run id resolves to `<run_dir>/report.json`, that an unknown run id resolves
// to `None`, and that the placeholder resolver returns `None`.
#[test]
fn report_path_for_run_resolves_indexed_run_directory() {
let temp = tempfile::tempdir().expect("tempdir");
let run_dir = temp.path().join("runs/run_0001");
fs::create_dir_all(&run_dir).expect("run dir");
write_sample_run(&run_dir);
let query_db_path = temp.path().join("query-db");
index_artifacts(&ArtifactIndexerConfig {
query_db_path: query_db_path.clone(),
run_root: Some(temp.path().to_path_buf()),
run_dir: None,
repo_bytes_db_path: None,
projection_entry_limit: 50,
min_run_seq: None,
retain_indexed_runs: None,
})
.expect("index");
let db = QueryDb::open(&query_db_path).expect("open db");
assert_eq!(
report_path_for_run(&db, "run_0001").expect("report path"),
Some(run_dir.join("report.json"))
);
assert!(
report_path_for_run(&db, "missing")
.expect("missing report path")
.is_none()
);
assert!(
resolve_object_from_cache_or_report(&db, "run_0001")
.expect("placeholder resolver")
.is_none()
);
}
// Writes the three files of a sample run into `run_dir`: `report.json`,
// `run-summary.json` and an empty-object `stage-timing.json`.
fn write_sample_run(run_dir: &Path) {
let report = json!({
"format_version": 2,
"meta": {"validation_time_rfc3339_utc": "2026-06-20T00:00:00Z"},
"tree": {"warnings": []},
"publication_points": [
{
"node_id": 1,
"rsync_base_uri": "rsync://repo.example/rpki/",
"manifest_rsync_uri": "rsync://repo.example/rpki/m.mft",
"publication_point_rsync_uri": "rsync://repo.example/rpki/",
"rrdp_notification_uri": "https://repo.example/rrdp/notification.xml",
"source": "rrdp",
"repo_sync_source": "rrdp",
"repo_sync_phase": "rrdp_delta",
"repo_sync_duration_ms": 7,
"repo_terminal_state": "fresh",
"warnings": [],
"objects": [
{"rsync_uri":"rsync://repo.example/rpki/m.mft","sha256_hex":"11","kind":"manifest","result":"ok"}
]
}
],
"vrps": [],
"aspas": [],
"downloads": [],
"download_stats": {},
"repo_sync_stats": {}
});
fs::write(
run_dir.join("report.json"),
serde_json::to_vec(&report).unwrap(),
)
.expect("report");
let summary = json!({
"status": "success",
"runId": "run_0001",
"runSeq": 1,
"startedAtRfc3339Utc": "2026-06-20T00:00:00Z",
"finishedAtRfc3339Utc": "2026-06-20T00:00:01Z",
"wallMs": 1000,
"reportCounts": {"vrps": 0, "aspas": 0, "publicationPoints": 1, "warnings": 0}
});
fs::write(
run_dir.join("run-summary.json"),
serde_json::to_vec(&summary).unwrap(),
)
.expect("summary");
fs::write(run_dir.join("stage-timing.json"), b"{}").expect("stage");
}
}

View File

@ -4,6 +4,7 @@ mod pack;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::path::Path; use std::path::Path;
use std::sync::{Arc, Mutex};
use base64::Engine; use base64::Engine;
use rocksdb::{ColumnFamily, DB, Direction, IteratorMode, Options, WriteBatch}; use rocksdb::{ColumnFamily, DB, Direction, IteratorMode, Options, WriteBatch};
@ -49,6 +50,21 @@ pub struct RocksStore {
db: DB, db: DB,
external_raw_store: Option<ExternalRawStoreDb>, external_raw_store: Option<ExternalRawStoreDb>,
external_repo_bytes: Option<ExternalRepoBytesDb>, external_repo_bytes: Option<ExternalRepoBytesDb>,
publication_point_cache_projection_index: Mutex<PublicationPointCacheProjectionIndexState>,
}
/// State of the in-memory index of encoded publication point cache projections.
///
/// Index maps are keyed by manifest rsync URI and hold the encoded projection bytes.
enum PublicationPointCacheProjectionIndexState {
/// Initial state set when the store is constructed; nothing has been loaded yet.
Uninitialized,
/// The index is not in use; cached lookups return `None` and updates are ignored.
Disabled,
/// The column family was empty at first load; the index is filled from
/// subsequent writes as long as the tracked byte total stays within `limit`.
BuildingFromEmpty {
index: HashMap<String, Arc<[u8]>>,
// Running total of encoded value bytes tracked for the index.
bytes: usize,
// Upper bound on `bytes`; exceeding it switches the state to `Disabled`.
limit: usize,
},
/// The index was populated from existing column family contents at first load.
Loaded {
index: HashMap<String, Arc<[u8]>>,
// Running total of encoded value bytes tracked for the index.
bytes: usize,
},
}
fn process_vm_rss_kb() -> Option<u64> { fn process_vm_rss_kb() -> Option<u64> {
@ -76,6 +92,28 @@ const ROCKSDB_MEMORY_PROPERTY_NAMES: &[(&str, &str)] = &[
("background_errors", "rocksdb.background-errors"), ("background_errors", "rocksdb.background-errors"),
]; ];
/// Environment variable that switches the raw publication point cache index off
/// when set to `0`, `false`, `off` or `no` (case-insensitive, trimmed).
const PP_CACHE_RAW_INDEX_ENV: &str = "RPKI_PP_CACHE_RAW_INDEX";
/// Environment variable overriding the byte limit used while the index is
/// being built from an empty column family.
const PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_ENV: &str =
"RPKI_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES";
/// Default for that byte limit: 32 MiB.
const DEFAULT_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES: usize = 32 * 1024 * 1024;
/// Reports whether the raw publication point cache index may be used.
///
/// The index is enabled unless the `PP_CACHE_RAW_INDEX_ENV` variable is
/// readable and, after trimming and ASCII lower-casing, equals one of
/// `0`, `false`, `off` or `no`.
fn pp_cache_raw_index_enabled() -> bool {
    let Ok(raw) = std::env::var(PP_CACHE_RAW_INDEX_ENV) else {
        // Unset (or unreadable) variable leaves the index enabled.
        return true;
    };
    let normalized = raw.trim().to_ascii_lowercase();
    !["0", "false", "off", "no"].contains(&normalized.as_str())
}
/// Returns the byte limit for building the index from an empty column family.
///
/// Reads `PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_ENV`; when it is unset,
/// unreadable, or does not parse as `usize` after trimming, the default
/// `DEFAULT_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES` is used.
fn pp_cache_raw_index_empty_build_limit_bytes() -> usize {
    match std::env::var(PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_ENV) {
        Ok(raw) => raw
            .trim()
            .parse::<usize>()
            .unwrap_or(DEFAULT_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES),
        Err(_) => DEFAULT_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES,
    }
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct RocksDbMemoryProperties { pub struct RocksDbMemoryProperties {
pub cur_size_all_mem_tables: Option<u64>, pub cur_size_all_mem_tables: Option<u64>,
@ -2313,6 +2351,9 @@ impl RocksStore {
db, db,
external_raw_store: None, external_raw_store: None,
external_repo_bytes: None, external_repo_bytes: None,
publication_point_cache_projection_index: Mutex::new(
PublicationPointCacheProjectionIndexState::Uninitialized,
),
}) })
} }
@ -2715,7 +2756,9 @@ impl RocksStore {
publication_point_projection, publication_point_projection,
None, None,
)?; )?;
self.write_batch(batch) self.write_batch(batch)?;
self.update_publication_point_cache_projection_index(publication_point_projection)?;
Ok(())
} }
pub fn replace_vcir_and_manifest_replay_meta( pub fn replace_vcir_and_manifest_replay_meta(
@ -2791,6 +2834,7 @@ impl RocksStore {
let write_batch_started = std::time::Instant::now(); let write_batch_started = std::time::Instant::now();
self.write_batch(batch)?; self.write_batch(batch)?;
self.update_publication_point_cache_projection_index(publication_point_projection)?;
timing.write_batch_ms = write_batch_started.elapsed().as_millis() as u64; timing.write_batch_ms = write_batch_started.elapsed().as_millis() as u64;
timing.rss_after_write_batch_kb = process_vm_rss_kb(); timing.rss_after_write_batch_kb = process_vm_rss_kb();
Ok(timing) Ok(timing)
@ -2853,6 +2897,100 @@ impl RocksStore {
pub fn get_publication_point_cache_projection( pub fn get_publication_point_cache_projection(
&self, &self,
manifest_rsync_uri: &str, manifest_rsync_uri: &str,
) -> StorageResult<Option<PublicationPointCacheProjection>> {
self.get_publication_point_cache_projection_from_db(manifest_rsync_uri)
}
/// Looks up a publication point cache projection through the in-memory index.
///
/// On the first call (state `Uninitialized`) the index is initialised while the
/// lock is held:
/// - disabled via environment: state becomes `Disabled`;
/// - enabled and the column family yields no entries: state becomes
///   `BuildingFromEmpty` with the configured byte limit;
/// - otherwise: state becomes `Loaded` with everything read from the column family.
/// A `publication_point_cache_raw_index` progress event is emitted for each outcome.
///
/// Returns `Ok(None)` when the index has no entry for `manifest_rsync_uri` or
/// when the index is disabled.
/// NOTE(review): in the `Disabled` state this returns `None` without consulting
/// RocksDB; presumably callers fall back to the uncached getter — confirm.
///
/// # Errors
/// Fails when the index lock is poisoned, when loading the index fails, when
/// the stored bytes cannot be decoded, or when `validate_internal` rejects the
/// decoded projection.
pub fn get_publication_point_cache_projection_cached(
&self,
manifest_rsync_uri: &str,
) -> StorageResult<Option<PublicationPointCacheProjection>> {
// The lock is scoped to this block; decoding below runs after it is released.
let bytes = {
let mut guard = self
.publication_point_cache_projection_index
.lock()
.map_err(|e| {
StorageError::RocksDb(format!(
"publication point cache index lock poisoned: {e}"
))
})?;
if matches!(
*guard,
PublicationPointCacheProjectionIndexState::Uninitialized
) {
let load_started = std::time::Instant::now();
let raw_index_enabled = pp_cache_raw_index_enabled();
// A load error returns early and leaves the state `Uninitialized`.
let (index, bytes) = if raw_index_enabled {
self.load_publication_point_cache_projection_index()?
} else {
(HashMap::new(), 0)
};
*guard = if !raw_index_enabled {
crate::progress_log::emit(
"publication_point_cache_raw_index",
serde_json::json!({
"state": "disabled_by_env",
"entries": 0,
"bytes": 0,
"load_ms": load_started.elapsed().as_millis() as u64,
}),
);
PublicationPointCacheProjectionIndexState::Disabled
} else if index.is_empty() {
let limit = pp_cache_raw_index_empty_build_limit_bytes();
crate::progress_log::emit(
"publication_point_cache_raw_index",
serde_json::json!({
"state": "empty_building_bounded",
"entries": 0,
"bytes": 0,
"empty_build_limit_bytes": limit,
"load_ms": load_started.elapsed().as_millis() as u64,
}),
);
PublicationPointCacheProjectionIndexState::BuildingFromEmpty {
index,
bytes: 0,
limit,
}
} else {
crate::progress_log::emit(
"publication_point_cache_raw_index",
serde_json::json!({
"state": "loaded",
"entries": index.len(),
"bytes": bytes,
"load_ms": load_started.elapsed().as_millis() as u64,
}),
);
PublicationPointCacheProjectionIndexState::Loaded { index, bytes }
};
}
// Cloning the `Arc` shares the encoded bytes rather than copying them.
match &*guard {
PublicationPointCacheProjectionIndexState::Loaded { index, .. } => {
index.get(manifest_rsync_uri).cloned()
}
PublicationPointCacheProjectionIndexState::BuildingFromEmpty { index, .. } => {
index.get(manifest_rsync_uri).cloned()
}
PublicationPointCacheProjectionIndexState::Disabled => None,
PublicationPointCacheProjectionIndexState::Uninitialized => None,
}
};
let Some(bytes) = bytes else {
return Ok(None);
};
let projection = decode_cbor::<PublicationPointCacheProjection>(
bytes.as_ref(),
"publication_point_cache_projection",
)?;
projection.validate_internal()?;
Ok(Some(projection))
}
fn get_publication_point_cache_projection_from_db(
&self,
manifest_rsync_uri: &str,
) -> StorageResult<Option<PublicationPointCacheProjection>> { ) -> StorageResult<Option<PublicationPointCacheProjection>> {
let cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?; let cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?;
let key = publication_point_cache_projection_key(manifest_rsync_uri); let key = publication_point_cache_projection_key(manifest_rsync_uri);
@ -2871,6 +3009,72 @@ impl RocksStore {
Ok(Some(projection)) Ok(Some(projection))
} }
/// Reads the whole publication point cache projection column family into memory.
///
/// Returns a map from manifest rsync URI to the stored value bytes, together
/// with the summed length of the values that were indexed. Keys that do not
/// yield a manifest URI (not UTF-8 or missing the expected prefix) are skipped.
///
/// # Errors
/// Fails when the column family cannot be resolved or the iterator reports an error.
fn load_publication_point_cache_projection_index(
    &self,
) -> StorageResult<(HashMap<String, Arc<[u8]>>, usize)> {
    let cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?;
    let mut entries: HashMap<String, Arc<[u8]>> = HashMap::new();
    let mut total_len = 0usize;
    for item in self.db.iterator_cf(cf, IteratorMode::Start) {
        let (raw_key, raw_value) = match item {
            Ok(pair) => pair,
            Err(e) => return Err(StorageError::RocksDb(e.to_string())),
        };
        if let Some(uri) = publication_point_cache_projection_key_manifest_uri(&raw_key) {
            total_len = total_len.saturating_add(raw_value.len());
            entries.insert(uri, Arc::<[u8]>::from(raw_value.to_vec()));
        }
    }
    Ok((entries, total_len))
}
/// Mirrors a freshly written projection into the in-memory index.
///
/// Does nothing when `projection` is `None` or when the index is
/// `Uninitialized` or `Disabled`; in those states the projection is not even
/// encoded, since the bytes would be discarded.
///
/// Byte accounting is replacement-aware: when an entry for the same manifest
/// URI already exists, its length is subtracted before the new length is
/// added. Without this, rewriting the same manifest repeatedly would inflate
/// the tracked total and, in `BuildingFromEmpty`, disable the index even
/// though its real size is far below the limit.
///
/// In `BuildingFromEmpty`, an update that would push the tracked total above
/// `limit` is not inserted and switches the state to `Disabled`.
///
/// # Errors
/// Fails when the index lock is poisoned or the projection cannot be encoded.
fn update_publication_point_cache_projection_index(
    &self,
    projection: Option<&PublicationPointCacheProjection>,
) -> StorageResult<()> {
    let Some(projection) = projection else {
        return Ok(());
    };
    let mut guard = self
        .publication_point_cache_projection_index
        .lock()
        .map_err(|e| {
            StorageError::RocksDb(format!("publication point cache index lock poisoned: {e}"))
        })?;
    // Nothing to maintain: skip the encoding work entirely.
    if matches!(
        *guard,
        PublicationPointCacheProjectionIndexState::Uninitialized
            | PublicationPointCacheProjectionIndexState::Disabled
    ) {
        return Ok(());
    }
    let bytes = encode_cbor(projection, "publication_point_cache_projection")?;
    let new_len = bytes.len();
    let limit_exceeded = match &mut *guard {
        PublicationPointCacheProjectionIndexState::Loaded {
            index,
            bytes: total_bytes,
        } => {
            // `insert` hands back the replaced value, whose size must leave the total.
            let old_len = index
                .insert(
                    projection.manifest_rsync_uri.clone(),
                    Arc::<[u8]>::from(bytes),
                )
                .map_or(0, |old| old.len());
            *total_bytes = total_bytes.saturating_sub(old_len).saturating_add(new_len);
            false
        }
        PublicationPointCacheProjectionIndexState::BuildingFromEmpty {
            index,
            bytes: total_bytes,
            limit,
        } => {
            let old_len = index
                .get(&projection.manifest_rsync_uri)
                .map_or(0, |old| old.len());
            let next_total = total_bytes.saturating_sub(old_len).saturating_add(new_len);
            if next_total <= *limit {
                *total_bytes = next_total;
                index.insert(
                    projection.manifest_rsync_uri.clone(),
                    Arc::<[u8]>::from(bytes),
                );
                false
            } else {
                true
            }
        }
        // Already handled by the early return above; listed to keep the match exhaustive.
        PublicationPointCacheProjectionIndexState::Uninitialized
        | PublicationPointCacheProjectionIndexState::Disabled => false,
    };
    if limit_exceeded {
        // Dropping the partial index; it can no longer be kept complete within the limit.
        *guard = PublicationPointCacheProjectionIndexState::Disabled;
    }
    Ok(())
}
pub fn put_transport_prefetch_snapshot( pub fn put_transport_prefetch_snapshot(
&self, &self,
snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot, snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot,

View File

@ -38,6 +38,12 @@ pub(super) fn publication_point_cache_projection_key(manifest_rsync_uri: &str) -
format!("{PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}") format!("{PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}")
} }
/// Recovers the manifest rsync URI from a raw projection key.
///
/// Returns `None` when `key` is not valid UTF-8 or does not start with
/// `PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX`; otherwise returns the
/// remainder of the key after that prefix as an owned string.
pub(super) fn publication_point_cache_projection_key_manifest_uri(key: &[u8]) -> Option<String> {
    match std::str::from_utf8(key) {
        Ok(text) => text
            .strip_prefix(PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX)
            .map(|uri| uri.to_owned()),
        Err(_) => None,
    }
}
pub(super) fn rrdp_source_key(notify_uri: &str) -> String { pub(super) fn rrdp_source_key(notify_uri: &str) -> String {
format!("{RRDP_SOURCE_KEY_PREFIX}{notify_uri}") format!("{RRDP_SOURCE_KEY_PREFIX}{notify_uri}")
} }

View File

@ -375,6 +375,85 @@ fn publication_point_cache_projection_roundtrips_with_vcir() {
assert_eq!(got_projection.related_objects.len(), 2); assert_eq!(got_projection.related_objects.len(), 2);
} }
/// A cached read followed by a second write for the same manifest URI must return the
/// newer projection on the next cached read.
#[test]
fn publication_point_cache_projection_index_updates_after_first_read() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(tmp.path()).expect("open rocksdb");
    let vcir = sample_vcir("rsync://example.test/repo/current.mft");
    let cached_lookup =
        || store.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri);

    let mut projection = PublicationPointCacheProjection::from_vcir_with_context(
        &vcir,
        "rsync://example.test/repo/".to_string(),
        Some("rsync://example.test/repo/ca.cer".to_string()),
        sha256_32(b"ca-cert"),
        sha256_32(b"manifest-old"),
        sha256_32(b"ta-context"),
        sha256_32(b"parent-context"),
        sha256_32(b"policy"),
    )
    .expect("build old publication point projection");

    // First write, then a cached read.
    store
        .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
        .expect("put old projection");
    let first_read = cached_lookup()
        .expect("get old projection")
        .expect("old projection exists");
    assert_eq!(first_read.manifest_sha256, sha256_32(b"manifest-old"));

    // Overwrite the same key with a different manifest hash and read through the cache again.
    projection.manifest_sha256 = sha256_32(b"manifest-new");
    store
        .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
        .expect("put new projection");
    let second_read = cached_lookup()
        .expect("get new projection")
        .expect("new projection exists");
    assert_eq!(second_read.manifest_sha256, sha256_32(b"manifest-new"));
}
/// A cached lookup against an empty store returns nothing; a projection written afterwards
/// must be visible through both the cached lookup and the direct lookup.
#[test]
fn publication_point_cache_projection_cached_empty_db_accepts_bounded_new_entries() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let store = RocksStore::open(tmp.path()).expect("open rocksdb");
    let vcir = sample_vcir("rsync://example.test/repo/current.mft");
    let cached_lookup =
        || store.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri);

    let projection = PublicationPointCacheProjection::from_vcir_with_context(
        &vcir,
        "rsync://example.test/repo/".to_string(),
        Some("rsync://example.test/repo/ca.cer".to_string()),
        sha256_32(b"ca-cert"),
        sha256_32(b"manifest"),
        sha256_32(b"ta-context"),
        sha256_32(b"parent-context"),
        sha256_32(b"policy"),
    )
    .expect("build publication point projection");

    // Cached lookup before anything has been written.
    let before_put = cached_lookup().expect("cached empty lookup");
    assert!(before_put.is_none());

    store
        .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
        .expect("put projection after cached empty lookup");

    let via_cache = cached_lookup()
        .expect("cached lookup sees bounded new entry")
        .expect("cached projection exists");
    assert_eq!(via_cache, projection);

    let via_db = store
        .get_publication_point_cache_projection(&vcir.manifest_rsync_uri)
        .expect("direct db lookup")
        .expect("projection exists");
    assert_eq!(via_db, projection);
}
#[test] #[test]
fn publication_point_cache_projection_rejects_version_mismatch() { fn publication_point_cache_projection_rejects_version_mismatch() {
let vcir = sample_vcir("rsync://example.test/repo/current.mft"); let vcir = sample_vcir("rsync://example.test/repo/current.mft");
@ -967,6 +1046,8 @@ fn transport_prefetch_snapshot_roundtrips() {
rsync_base_uri: "rsync://example.test/repo/".to_string(), rsync_base_uri: "rsync://example.test/repo/".to_string(),
}, },
mode: TransportPrefetchMode::Rrdp, mode: TransportPrefetchMode::Rrdp,
last_result: None,
last_rsync_result: None,
tal_id: "apnic".to_string(), tal_id: "apnic".to_string(),
rir_id: "apnic".to_string(), rir_id: "apnic".to_string(),
priority: 0, priority: 0,

View File

@ -72,6 +72,44 @@ pub fn discover_root_ca_instance_from_tal_url_with_strict_name(
) )
} }
/// Downloads the TAL at `tal_url`, decodes it, and discovers the root CA instance using
/// both an HTTP and an rsync fetcher for the trust anchor certificate.
///
/// The TAL itself is always fetched through `http_fetcher`. Discovery is delegated to the
/// shared implementation with `strict_name = false` and `TaUriOrder::HttpFirst`.
///
/// # Errors
///
/// Returns `FromTalError::TalFetch` when the TAL download fails, and propagates TAL
/// decoding and discovery errors.
pub fn discover_root_ca_instance_from_tal_url_with_fetchers(
    http_fetcher: &dyn Fetcher,
    rsync_fetcher: &dyn RsyncFetcher,
    tal_url: &str,
) -> Result<DiscoveredRootCaInstance, FromTalError> {
    let raw_tal = match http_fetcher.fetch(tal_url) {
        Ok(body) => body,
        Err(fetch_err) => return Err(FromTalError::TalFetch(fetch_err)),
    };
    let decoded_tal = Tal::decode_bytes(&raw_tal)?;
    let source_url = Some(tal_url.to_string());
    discover_root_ca_instance_from_tal_with_fetchers_impl(
        http_fetcher,
        rsync_fetcher,
        decoded_tal,
        source_url,
        false,
        TaUriOrder::HttpFirst,
    )
}
/// Strict-name variant of TAL-URL discovery with both an HTTP and an rsync fetcher.
///
/// The TAL is fetched through `http_fetcher` and decoded. Discovery is then delegated to
/// the shared implementation with `strict_name = true` and `TaUriOrder::HttpFirst`.
/// NOTE(review): the exact checks enabled by `strict_name` live in the shared
/// implementation — confirm there.
///
/// # Errors
///
/// Returns `FromTalError::TalFetch` when the TAL download fails, and propagates TAL
/// decoding and discovery errors.
pub fn discover_root_ca_instance_from_tal_url_with_fetchers_strict_name(
    http_fetcher: &dyn Fetcher,
    rsync_fetcher: &dyn RsyncFetcher,
    tal_url: &str,
) -> Result<DiscoveredRootCaInstance, FromTalError> {
    let raw_tal = match http_fetcher.fetch(tal_url) {
        Ok(body) => body,
        Err(fetch_err) => return Err(FromTalError::TalFetch(fetch_err)),
    };
    let decoded_tal = Tal::decode_bytes(&raw_tal)?;
    let source_url = Some(tal_url.to_string());
    discover_root_ca_instance_from_tal_with_fetchers_impl(
        http_fetcher,
        rsync_fetcher,
        decoded_tal,
        source_url,
        true,
        TaUriOrder::HttpFirst,
    )
}
pub fn discover_root_ca_instance_from_tal( pub fn discover_root_ca_instance_from_tal(
http_fetcher: &dyn Fetcher, http_fetcher: &dyn Fetcher,
tal: Tal, tal: Tal,
@ -150,6 +188,7 @@ pub fn discover_root_ca_instance_from_tal_with_fetchers(
tal, tal,
tal_url, tal_url,
false, false,
TaUriOrder::RsyncFirst,
) )
} }
@ -165,15 +204,23 @@ pub fn discover_root_ca_instance_from_tal_with_fetchers_strict_name(
tal, tal,
tal_url, tal_url,
true, true,
TaUriOrder::RsyncFirst,
) )
} }
/// Selects which URI scheme is tried first when a TAL lists several trust anchor URIs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TaUriOrder {
/// Try `https`/`http` URIs before `rsync` URIs.
HttpFirst,
/// Try `rsync` URIs before `https`/`http` URIs.
RsyncFirst,
}
fn discover_root_ca_instance_from_tal_with_fetchers_impl( fn discover_root_ca_instance_from_tal_with_fetchers_impl(
http_fetcher: &dyn Fetcher, http_fetcher: &dyn Fetcher,
rsync_fetcher: &dyn RsyncFetcher, rsync_fetcher: &dyn RsyncFetcher,
tal: Tal, tal: Tal,
tal_url: Option<String>, tal_url: Option<String>,
strict_name: bool, strict_name: bool,
ta_uri_order: TaUriOrder,
) -> Result<DiscoveredRootCaInstance, FromTalError> { ) -> Result<DiscoveredRootCaInstance, FromTalError> {
if tal.ta_uris.is_empty() { if tal.ta_uris.is_empty() {
return Err(FromTalError::NoTaUris); return Err(FromTalError::NoTaUris);
@ -181,7 +228,7 @@ fn discover_root_ca_instance_from_tal_with_fetchers_impl(
let mut last_err: Option<String> = None; let mut last_err: Option<String> = None;
let mut ta_uris = tal.ta_uris.clone(); let mut ta_uris = tal.ta_uris.clone();
ta_uris.sort_by_key(|uri| if uri.scheme() == "rsync" { 0 } else { 1 }); ta_uris.sort_by_key(|uri| ta_uri_priority(uri, ta_uri_order));
for ta_uri in ta_uris.iter() { for ta_uri in ta_uris.iter() {
let ta_der = match fetch_ta_der(http_fetcher, rsync_fetcher, ta_uri) { let ta_der = match fetch_ta_der(http_fetcher, rsync_fetcher, ta_uri) {
Ok(b) => b, Ok(b) => b,
@ -221,6 +268,16 @@ fn discover_root_ca_instance_from_tal_with_fetchers_impl(
}))) })))
} }
/// Sort key for a trust anchor URI under the requested scheme ordering.
///
/// The preferred scheme family gets `0`, the other known family gets `1`, and any scheme
/// that is neither `http`/`https` nor `rsync` gets `2`.
fn ta_uri_priority(uri: &Url, order: TaUriOrder) -> u8 {
    match uri.scheme() {
        "https" | "http" => match order {
            TaUriOrder::HttpFirst => 0,
            TaUriOrder::RsyncFirst => 1,
        },
        "rsync" => match order {
            TaUriOrder::RsyncFirst => 0,
            TaUriOrder::HttpFirst => 1,
        },
        _ => 2,
    }
}
fn fetch_ta_der( fn fetch_ta_der(
http_fetcher: &dyn Fetcher, http_fetcher: &dyn Fetcher,
rsync_fetcher: &dyn RsyncFetcher, rsync_fetcher: &dyn RsyncFetcher,
@ -250,41 +307,9 @@ fn fetch_ta_der_via_rsync(
rsync_fetcher: &dyn RsyncFetcher, rsync_fetcher: &dyn RsyncFetcher,
ta_rsync_uri: &str, ta_rsync_uri: &str,
) -> Result<Vec<u8>, String> { ) -> Result<Vec<u8>, String> {
let base = rsync_parent_uri(ta_rsync_uri)?; rsync_fetcher
let objects = rsync_fetcher .fetch_object(ta_rsync_uri)
.fetch_objects(&base) .map_err(|e| e.to_string())
.map_err(|e| e.to_string())?;
objects
.into_iter()
.find(|(uri, _)| uri == ta_rsync_uri)
.map(|(_, bytes)| bytes)
.ok_or_else(|| format!("TA rsync object not found in fetched subtree: {ta_rsync_uri}"))
}
fn rsync_parent_uri(ta_rsync_uri: &str) -> Result<String, String> {
let url = Url::parse(ta_rsync_uri).map_err(|e| e.to_string())?;
if url.scheme() != "rsync" {
return Err(format!("not an rsync URI: {ta_rsync_uri}"));
}
let host = url
.host_str()
.ok_or_else(|| format!("missing host in rsync URI: {ta_rsync_uri}"))?;
let segments = url
.path_segments()
.ok_or_else(|| format!("missing path in rsync URI: {ta_rsync_uri}"))?
.collect::<Vec<_>>();
if segments.is_empty() || segments.last().copied().unwrap_or_default().is_empty() {
return Err(format!(
"rsync URI must reference a file object: {ta_rsync_uri}"
));
}
let parent_segments = &segments[..segments.len() - 1];
let mut parent = format!("rsync://{host}/");
if !parent_segments.is_empty() {
parent.push_str(&parent_segments.join("/"));
parent.push('/');
}
Ok(parent)
} }
pub fn discover_root_ca_instance_from_tal_and_ta_der( pub fn discover_root_ca_instance_from_tal_and_ta_der(
@ -334,6 +359,20 @@ pub fn canonical_tal_rsync_uri_from_bytes(tal_bytes: &[u8]) -> Result<Url, FromT
mod tests { mod tests {
use super::*; use super::*;
use crate::fetch::rsync::LocalDirRsyncFetcher; use crate::fetch::rsync::LocalDirRsyncFetcher;
use std::collections::HashMap;
/// Test fetcher that serves canned response bodies keyed by the exact request URI.
struct MapHttpFetcher {
    map: HashMap<String, Vec<u8>>,
}

impl Fetcher for MapHttpFetcher {
    /// Returns a copy of the body registered for `uri`, or an error naming the unmapped URI.
    fn fetch(&self, uri: &str) -> Result<Vec<u8>, String> {
        match self.map.get(uri) {
            Some(body) => Ok(body.clone()),
            None => Err(format!("no fixture mapped for {uri}")),
        }
    }
}
struct FailingHttpFetcher; struct FailingHttpFetcher;
@ -392,6 +431,84 @@ mod tests {
.starts_with("rsync://") .starts_with("rsync://")
); );
} }
/// Test double whose `fetch_objects` panics, so a test fails loudly if discovery reaches
/// for rsync through that method.
struct PanicRsyncFetcher;
impl RsyncFetcher for PanicRsyncFetcher {
// Any call is a test failure; the panic message names the base URI that was requested.
fn fetch_objects(
&self,
rsync_base_uri: &str,
) -> crate::fetch::rsync::RsyncFetchResult<Vec<(String, Vec<u8>)>> {
panic!("rsync should not be used when HTTPS TA URI is available: {rsync_base_uri}")
}
}
/// With the HTTPS TA URI served by the HTTP fetcher, discovery must resolve the trust
/// anchor over HTTPS; the rsync fetcher panics if `fetch_objects` is called.
#[test]
fn discover_root_ca_instance_from_tal_url_with_fetchers_prefers_https_ta_uri() {
    let tal_url = "https://example.test/apnic.tal";
    let https_uri = "https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer";

    let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    let tal_fixture =
        std::fs::read(manifest_dir.join("tests/fixtures/tal/apnic-rfc7730-https.tal")).unwrap();
    let ta_fixture = std::fs::read(manifest_dir.join("tests/fixtures/ta/apnic-ta.cer")).unwrap();

    // Serve both the TAL and the TA certificate over the fake HTTP fetcher.
    let http_fetcher = MapHttpFetcher {
        map: HashMap::from([
            (tal_url.to_string(), tal_fixture),
            (https_uri.to_string(), ta_fixture),
        ]),
    };

    let discovered = discover_root_ca_instance_from_tal_url_with_fetchers(
        &http_fetcher,
        &PanicRsyncFetcher,
        tal_url,
    )
    .expect("discover via HTTPS TA URI");

    let resolved = discovered.trust_anchor.resolved_ta_uri.unwrap();
    assert_eq!(resolved.as_str(), https_uri);
}
/// A TAL that lists only an rsync TA URI must still be discoverable: the TAL comes from
/// the HTTP fetcher and the TA certificate from a local-directory rsync fetcher.
#[test]
fn discover_root_ca_instance_from_tal_url_with_fetchers_supports_rsync_only_tal() {
    let tal_url = "https://example.test/apnic-rsync-only.tal";
    let rsync_uri = "rsync://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer";

    let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    let tal_text =
        std::fs::read_to_string(manifest_dir.join("tests/fixtures/tal/apnic-rfc7730-https.tal"))
            .unwrap();

    // Reuse the fixture's key material (everything after the blank line) under a URI
    // section that lists only the rsync location.
    let (_, key_material) = tal_text
        .split_once("\n\n")
        .expect("fixture contains TAL key material");
    let rsync_only_tal = format!("{rsync_uri}\n\n{key_material}");

    let ta_fixture = std::fs::read(manifest_dir.join("tests/fixtures/ta/apnic-ta.cer")).unwrap();

    // Lay the TA certificate out in a temporary directory for the local rsync fetcher.
    let scratch = tempfile::tempdir().unwrap();
    let mirror_root = scratch.path().join("repository");
    std::fs::create_dir_all(&mirror_root).unwrap();
    std::fs::write(
        mirror_root.join("apnic-rpki-root-iana-origin.cer"),
        ta_fixture,
    )
    .unwrap();

    let http_fetcher = MapHttpFetcher {
        map: HashMap::from([(tal_url.to_string(), rsync_only_tal.into_bytes())]),
    };

    let discovered = discover_root_ca_instance_from_tal_url_with_fetchers(
        &http_fetcher,
        &LocalDirRsyncFetcher::new(mirror_root),
        tal_url,
    )
    .expect("discover via rsync-only TAL");

    let resolved = discovered.trust_anchor.resolved_ta_uri.unwrap();
    assert_eq!(resolved.as_str(), rsync_uri);
}
} }
pub fn run_root_from_tal_url_once( pub fn run_root_from_tal_url_once(
@ -402,7 +519,8 @@ pub fn run_root_from_tal_url_once(
rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher, rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher,
validation_time: time::OffsetDateTime, validation_time: time::OffsetDateTime,
) -> Result<RunFromTalOutput, FromTalError> { ) -> Result<RunFromTalOutput, FromTalError> {
let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?; let discovery =
discover_root_ca_instance_from_tal_url_with_fetchers(http_fetcher, rsync_fetcher, tal_url)?;
let run = run_publication_point_once( let run = run_publication_point_once(
store, store,

View File

@ -27,8 +27,8 @@ use crate::validation::from_tal::{
DiscoveredRootCaInstance, FromTalError, canonical_tal_rsync_uri_from_bytes, DiscoveredRootCaInstance, FromTalError, canonical_tal_rsync_uri_from_bytes,
discover_root_ca_instance_from_tal_and_ta_der, discover_root_ca_instance_from_tal_and_ta_der,
discover_root_ca_instance_from_tal_and_ta_der_with_strict_name, discover_root_ca_instance_from_tal_and_ta_der_with_strict_name,
discover_root_ca_instance_from_tal_url, discover_root_ca_instance_from_tal_url_with_fetchers,
discover_root_ca_instance_from_tal_url_with_strict_name, discover_root_ca_instance_from_tal_url_with_fetchers_strict_name,
discover_root_ca_instance_from_tal_with_fetchers, discover_root_ca_instance_from_tal_with_fetchers,
discover_root_ca_instance_from_tal_with_fetchers_strict_name, discover_root_ca_instance_from_tal_with_fetchers_strict_name,
}; };
@ -348,9 +348,17 @@ fn root_discovery_from_tal_input(
match &tal_input.source { match &tal_input.source {
TalSource::Url(url) => { TalSource::Url(url) => {
if strict_name { if strict_name {
discover_root_ca_instance_from_tal_url_with_strict_name(http_fetcher, url) discover_root_ca_instance_from_tal_url_with_fetchers_strict_name(
http_fetcher,
rsync_fetcher,
url,
)
} else { } else {
discover_root_ca_instance_from_tal_url(http_fetcher, url) discover_root_ca_instance_from_tal_url_with_fetchers(
http_fetcher,
rsync_fetcher,
url,
)
} }
} }
TalSource::DerBytes { TalSource::DerBytes {
@ -414,12 +422,17 @@ fn root_discovery_from_tal_input(
fn discover_root_ca_instance_from_tal_url_with_policy( fn discover_root_ca_instance_from_tal_url_with_policy(
policy: &crate::policy::Policy, policy: &crate::policy::Policy,
http_fetcher: &dyn Fetcher, http_fetcher: &dyn Fetcher,
rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher,
tal_url: &str, tal_url: &str,
) -> Result<DiscoveredRootCaInstance, FromTalError> { ) -> Result<DiscoveredRootCaInstance, FromTalError> {
if policy.strict.name { if policy.strict.name {
discover_root_ca_instance_from_tal_url_with_strict_name(http_fetcher, tal_url) discover_root_ca_instance_from_tal_url_with_fetchers_strict_name(
http_fetcher,
rsync_fetcher,
tal_url,
)
} else { } else {
discover_root_ca_instance_from_tal_url(http_fetcher, tal_url) discover_root_ca_instance_from_tal_url_with_fetchers(http_fetcher, rsync_fetcher, tal_url)
} }
} }
@ -544,8 +557,12 @@ pub fn run_tree_from_tal_url_serial(
validation_time: time::OffsetDateTime, validation_time: time::OffsetDateTime,
config: &TreeRunConfig, config: &TreeRunConfig,
) -> Result<RunTreeFromTalOutput, RunTreeFromTalError> { ) -> Result<RunTreeFromTalOutput, RunTreeFromTalError> {
let discovery = let discovery = discover_root_ca_instance_from_tal_url_with_policy(
discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; policy,
http_fetcher,
rsync_fetcher,
tal_url,
)?;
let runner = make_live_runner( let runner = make_live_runner(
store, store,
@ -585,8 +602,12 @@ pub fn run_tree_from_tal_url_serial_audit(
validation_time: time::OffsetDateTime, validation_time: time::OffsetDateTime,
config: &TreeRunConfig, config: &TreeRunConfig,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError> { ) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError> {
let discovery = let discovery = discover_root_ca_instance_from_tal_url_with_policy(
discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; policy,
http_fetcher,
rsync_fetcher,
tal_url,
)?;
let download_log = DownloadLogHandle::new(); let download_log = DownloadLogHandle::new();
let runner = make_live_runner( let runner = make_live_runner(
@ -648,8 +669,12 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing(
timing: &TimingHandle, timing: &TimingHandle,
) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError> { ) -> Result<RunTreeFromTalAuditOutput, RunTreeFromTalError> {
let _tal = timing.span_phase("tal_bootstrap"); let _tal = timing.span_phase("tal_bootstrap");
let discovery = let discovery = discover_root_ca_instance_from_tal_url_with_policy(
discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; policy,
http_fetcher,
rsync_fetcher,
tal_url,
)?;
drop(_tal); drop(_tal);
let download_log = DownloadLogHandle::new(); let download_log = DownloadLogHandle::new();
@ -935,8 +960,12 @@ where
H: Fetcher + Clone + 'static, H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{ {
let discovery = let discovery = discover_root_ca_instance_from_tal_url_with_policy(
discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; policy,
http_fetcher,
rsync_fetcher,
tal_url,
)?;
run_single_root_parallel_audit_inner( run_single_root_parallel_audit_inner(
store, store,
policy, policy,
@ -1051,8 +1080,12 @@ where
H: Fetcher + Clone + 'static, H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{ {
let discovery = let discovery = discover_root_ca_instance_from_tal_url_with_policy(
discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; policy,
http_fetcher,
rsync_fetcher,
tal_url,
)?;
run_single_root_parallel_audit_inner( run_single_root_parallel_audit_inner(
store, store,
policy, policy,
@ -1086,8 +1119,12 @@ where
H: Fetcher + Clone + 'static, H: Fetcher + Clone + 'static,
R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, R: crate::fetch::rsync::RsyncFetcher + Clone + 'static,
{ {
let discovery = let discovery = discover_root_ca_instance_from_tal_url_with_policy(
discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; policy,
http_fetcher,
rsync_fetcher,
tal_url,
)?;
run_single_root_parallel_audit_inner( run_single_root_parallel_audit_inner(
store, store,
policy, policy,

View File

@ -405,6 +405,8 @@ struct RepoDrainMetrics {
duration_ms: u64, duration_ms: u64,
} }
const REPO_RESULT_DRAIN_MAX_EVENTS: usize = 64;
fn elapsed_ms(started: Instant) -> u64 { fn elapsed_ms(started: Instant) -> u64 {
started.elapsed().as_millis() as u64 started.elapsed().as_millis() as u64
} }
@ -1184,6 +1186,11 @@ fn stage_ready_publication_point(
) { ) {
ParallelObjectsPrepare::Complete(mut objects) => { ParallelObjectsPrepare::Complete(mut objects) => {
metrics.prepare_ms = elapsed_ms(prepare_started); metrics.prepare_ms = elapsed_ms(prepare_started);
runner.record_publication_point_step_ms(
&ready.node.handle.manifest_rsync_uri,
"fresh_objects_prepare",
metrics.prepare_ms,
);
metrics.complete_count = 1; metrics.complete_count = 1;
metrics.roa_tasks = objects.stats.roa_total; metrics.roa_tasks = objects.stats.roa_total;
metrics.aspa_objects = objects.stats.aspa_total; metrics.aspa_objects = objects.stats.aspa_total;
@ -1210,15 +1217,30 @@ fn stage_ready_publication_point(
metrics.direct_finalize_ms = finalize_metrics metrics.direct_finalize_ms = finalize_metrics
.finalize_ms .finalize_ms
.max(elapsed_ms(direct_finalize_started)); .max(elapsed_ms(direct_finalize_started));
runner.record_publication_point_step_ms(
&metrics.manifest_rsync_uri.clone().unwrap_or_default(),
"fresh_direct_finalize",
metrics.direct_finalize_ms,
);
} }
ParallelObjectsPrepare::Staged(objects_stage) => { ParallelObjectsPrepare::Staged(objects_stage) => {
metrics.prepare_ms = elapsed_ms(prepare_started); metrics.prepare_ms = elapsed_ms(prepare_started);
runner.record_publication_point_step_ms(
&ready.node.handle.manifest_rsync_uri,
"fresh_objects_prepare",
metrics.prepare_ms,
);
metrics.staged_count = 1; metrics.staged_count = 1;
metrics.locked_files = objects_stage.locked_file_count(); metrics.locked_files = objects_stage.locked_file_count();
metrics.aspa_objects = objects_stage.aspa_task_count(); metrics.aspa_objects = objects_stage.aspa_task_count();
let build_tasks_started = Instant::now(); let build_tasks_started = Instant::now();
objects_stage.append_roa_tasks_to(pending_roa_dispatch); objects_stage.append_roa_tasks_to(pending_roa_dispatch);
metrics.build_roa_tasks_ms = elapsed_ms(build_tasks_started); metrics.build_roa_tasks_ms = elapsed_ms(build_tasks_started);
runner.record_publication_point_step_ms(
&ready.node.handle.manifest_rsync_uri,
"fresh_build_roa_tasks",
metrics.build_roa_tasks_ms,
);
let task_count = objects_stage.roa_task_count(); let task_count = objects_stage.roa_task_count();
metrics.roa_tasks = task_count; metrics.roa_tasks = task_count;
if task_count == 0 { if task_count == 0 {
@ -1275,6 +1297,12 @@ fn stage_ready_publication_point(
} }
} }
metrics.total_ms = elapsed_ms(publication_point_started); metrics.total_ms = elapsed_ms(publication_point_started);
if metrics.complete_count > 0 {
runner.record_publication_point_total_ms(
metrics.manifest_rsync_uri.as_deref().unwrap_or_default(),
metrics.total_ms,
);
}
emit_ready_publication_point_control_slow( emit_ready_publication_point_control_slow(
metrics.manifest_rsync_uri.as_deref().unwrap_or_default(), metrics.manifest_rsync_uri.as_deref().unwrap_or_default(),
metrics metrics
@ -1916,10 +1944,20 @@ fn finalize_publication_point_state(
.as_millis() as u64 .as_millis() as u64
}); });
let objects_processing_ms = objects_started_at.elapsed().as_millis() as u64; let objects_processing_ms = objects_started_at.elapsed().as_millis() as u64;
runner.record_publication_point_step_ms(
&node.handle.manifest_rsync_uri,
"fresh_objects_processing_lifetime",
objects_processing_ms,
);
let reduce_started = Instant::now(); let reduce_started = Instant::now();
let locked_files = objects_stage.locked_file_count(); let locked_files = objects_stage.locked_file_count();
let reduce_result = reduce_parallel_roa_stage(objects_stage, results, runner.timing.as_ref()); let reduce_result = reduce_parallel_roa_stage(objects_stage, results, runner.timing.as_ref());
let reduce_ms = elapsed_ms(reduce_started); let reduce_ms = elapsed_ms(reduce_started);
runner.record_publication_point_step_ms(
&node.handle.manifest_rsync_uri,
"fresh_roa_reduce",
reduce_ms,
);
let (result, mut metrics) = match reduce_result { let (result, mut metrics) = match reduce_result {
Ok(mut objects) => { Ok(mut objects) => {
@ -1985,6 +2023,25 @@ fn finalize_publication_point_state(
}; };
let finalize_worker_ms = elapsed_ms(finalize_worker_started); let finalize_worker_ms = elapsed_ms(finalize_worker_started);
metrics.finalize_worker_ms = finalize_worker_ms; metrics.finalize_worker_ms = finalize_worker_ms;
runner.record_publication_point_step_ms(
&node.handle.manifest_rsync_uri,
"fresh_finalize_worker",
finalize_worker_ms,
);
runner.record_publication_point_step_ms(
&node.handle.manifest_rsync_uri,
"fresh_finalize_queue_wait",
finalize_queue_wait_ms.unwrap_or(0),
);
runner.record_publication_point_step_ms(
&node.handle.manifest_rsync_uri,
"fresh_finalize",
metrics.finalize_ms,
);
runner.record_publication_point_total_ms(
&node.handle.manifest_rsync_uri,
started_at.elapsed().as_millis() as u64,
);
emit_finalize_breakdown( emit_finalize_breakdown(
"phase2_finalize_worker_breakdown", "phase2_finalize_worker_breakdown",
node.handle.manifest_rsync_uri.as_str(), node.handle.manifest_rsync_uri.as_str(),
@ -2056,10 +2113,10 @@ fn drain_repo_events(
) -> Result<RepoDrainMetrics, TreeRunError> { ) -> Result<RepoDrainMetrics, TreeRunError> {
let started = Instant::now(); let started = Instant::now();
let mut metrics = RepoDrainMetrics::default(); let mut metrics = RepoDrainMetrics::default();
if let Some(event) = repo_runtime let events = repo_runtime
.recv_repo_result_timeout(timeout) .drain_repo_results_timeout(timeout, REPO_RESULT_DRAIN_MAX_EVENTS)
.map_err(TreeRunError::Runner)? .map_err(TreeRunError::Runner)?;
{ for event in events {
metrics.event_count += 1; metrics.event_count += 1;
metrics.completions += event.completions.len(); metrics.completions += event.completions.len();
for completion in event.completions { for completion in event.completions {

View File

@ -25,11 +25,12 @@ use crate::replay::archive::ReplayArchiveIndex;
use crate::replay::delta_archive::ReplayDeltaArchiveIndex; use crate::replay::delta_archive::ReplayDeltaArchiveIndex;
use crate::report::{RfcRef, Warning}; use crate::report::{RfcRef, Warning};
use crate::storage::{ use crate::storage::{
PackFile, PackTime, PublicationPointCacheChild, PublicationPointCacheProjection, PackFile, PackTime, PublicationPointCacheChild, PublicationPointCacheOutput,
RawByHashEntry, RocksStore, ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, PublicationPointCacheProjection, RawByHashEntry, RocksStore, ValidatedCaInstanceResult,
VcirArtifactValidationStatus, VcirAuditSummary, VcirCcrManifestProjection, VcirChildEntry, VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary,
VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, VcirCcrManifestProjection, VcirChildEntry, VcirInstanceGate, VcirLocalOutput,
VcirReplaceTimingBreakdown, VcirSourceObjectType, VcirSummary, VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, VcirReplaceTimingBreakdown,
VcirSourceObjectType, VcirSummary,
}; };
use crate::sync::repo::{ use crate::sync::repo::{
sync_publication_point, sync_publication_point_replay, sync_publication_point_replay_delta, sync_publication_point, sync_publication_point_replay, sync_publication_point_replay_delta,
@ -241,7 +242,7 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
} }
let projection = match self let projection = match self
.store .store
.get_publication_point_cache_projection(&ca.manifest_rsync_uri) .get_publication_point_cache_projection_cached(&ca.manifest_rsync_uri)
{ {
Ok(Some(projection)) => projection, Ok(Some(projection)) => projection,
Ok(None) => { Ok(None) => {
@ -526,16 +527,12 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
let output_reuse_count = projection.outputs.len() as u64; let output_reuse_count = projection.outputs.len() as u64;
let child_reuse_count = projection.children.len() as u64; let child_reuse_count = projection.children.len() as u64;
let related_object_reuse_count = projection.related_objects.len() as u64; let related_object_reuse_count = projection.related_objects.len() as u64;
let to_vcir_started = std::time::Instant::now();
let mut vcir = projection.to_vcir_for_reuse(self.validation_time);
let to_vcir_ms = self.record_publication_point_cache_phase_ms(
"publication_point_cache_to_vcir_total",
to_vcir_started,
);
vcir.parent_manifest_rsync_uri = ca.parent_manifest_rsync_uri.clone();
let build_objects_started = std::time::Instant::now(); let build_objects_started = std::time::Instant::now();
let mut objects = let mut objects = build_objects_output_from_publication_point_cache_projection(
build_objects_output_from_vcir(&vcir, self.validation_time, &mut warnings); &projection,
self.validation_time,
&mut warnings,
);
let build_objects_ms = self.record_publication_point_cache_phase_ms( let build_objects_ms = self.record_publication_point_cache_phase_ms(
"publication_point_cache_build_objects_total", "publication_point_cache_build_objects_total",
build_objects_started, build_objects_started,
@ -563,19 +560,18 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
ccr_append_started, ccr_append_started,
); );
let audit_build_started = std::time::Instant::now(); let audit_build_started = std::time::Instant::now();
let audit = build_publication_point_audit_from_vcir( let audit = build_publication_point_audit_from_publication_point_cache_projection(
ca, ca,
PublicationPointSource::PublicationPointCache, PublicationPointSource::PublicationPointCache,
repo_sync_source, repo_sync_source,
repo_sync_phase, repo_sync_phase,
Some(repo_sync_duration_ms), Some(repo_sync_duration_ms),
repo_sync_err, repo_sync_err,
Some(&vcir), &projection,
None, self.validation_time,
&warnings, &warnings,
&objects, &objects,
&child_audits, &child_audits,
&[],
); );
let audit_build_ms = self.record_publication_point_cache_phase_ms( let audit_build_ms = self.record_publication_point_cache_phase_ms(
"publication_point_cache_audit_build_total", "publication_point_cache_audit_build_total",
@ -614,7 +610,6 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
"children_reused": child_reuse_count, "children_reused": child_reuse_count,
"related_objects_reused": related_object_reuse_count, "related_objects_reused": related_object_reuse_count,
"audit_objects_reused": audit_object_count, "audit_objects_reused": audit_object_count,
"to_vcir_ms": to_vcir_ms,
"build_objects_ms": build_objects_ms, "build_objects_ms": build_objects_ms,
"restore_children_ms": restore_children_ms, "restore_children_ms": restore_children_ms,
"restore_children_workers": child_restore_workers, "restore_children_workers": child_restore_workers,
@ -649,6 +644,27 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
nanos / 1_000_000 nanos / 1_000_000
} }
/// Reports the total time spent on a publication point to the timing handle, if one is
/// attached. The millisecond value is converted to nanoseconds with saturating
/// multiplication.
pub(crate) fn record_publication_point_total_ms(&self, manifest_rsync_uri: &str, ms: u64) {
    let Some(handle) = self.timing.as_ref() else {
        return;
    };
    let nanos = ms.saturating_mul(1_000_000);
    handle.record_publication_point_nanos(manifest_rsync_uri, nanos);
}
/// Reports the duration of one named step of a publication point to the timing handle, if
/// one is attached. The millisecond value is converted to nanoseconds with saturating
/// multiplication.
pub(crate) fn record_publication_point_step_ms(
    &self,
    manifest_rsync_uri: &str,
    step: &'static str,
    ms: u64,
) {
    let Some(handle) = self.timing.as_ref() else {
        return;
    };
    let nanos = ms.saturating_mul(1_000_000);
    handle.record_publication_point_step_nanos(manifest_rsync_uri, step, nanos);
}
fn publication_point_cache_child_restore_worker_count(&self) -> usize { fn publication_point_cache_child_restore_worker_count(&self) -> usize {
self.parallel_phase2_config self.parallel_phase2_config
.as_ref() .as_ref()
@ -772,6 +788,88 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
error, error,
snapshot_prepare_ms, snapshot_prepare_ms,
})?; })?;
if let Some(timing) = self.timing.as_ref() {
timing.record_phase_nanos(
"fresh_snapshot_prepare_total",
snapshot_prepare_ms.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_snapshot_manifest_load_total",
snapshot_prepare_timing
.manifest_load_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_snapshot_manifest_decode_total",
snapshot_prepare_timing
.manifest_decode_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_snapshot_replay_guard_total",
snapshot_prepare_timing
.replay_guard_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_snapshot_manifest_entries_total",
snapshot_prepare_timing
.manifest_entries_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_snapshot_pack_files_total",
snapshot_prepare_timing
.pack_files_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_snapshot_ee_path_validate_total",
snapshot_prepare_timing
.ee_path_validate_ms
.saturating_mul(1_000_000),
);
timing.record_count("fresh_publication_points", 1);
timing.record_count(
"fresh_manifest_files_total",
snapshot_prepare_timing.manifest_file_count as u64,
);
}
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_snapshot_prepare",
snapshot_prepare_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_snapshot_manifest_load",
snapshot_prepare_timing.manifest_load_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_snapshot_manifest_decode",
snapshot_prepare_timing.manifest_decode_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_snapshot_replay_guard",
snapshot_prepare_timing.replay_guard_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_snapshot_manifest_entries",
snapshot_prepare_timing.manifest_entries_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_snapshot_pack_files",
snapshot_prepare_timing.pack_files_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_snapshot_ee_path_validate",
snapshot_prepare_timing.ee_path_validate_ms,
);
let child_discovery_started = std::time::Instant::now(); let child_discovery_started = std::time::Instant::now();
let out = { let out = {
@ -800,6 +898,26 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
), ),
}; };
let child_discovery_ms = child_discovery_started.elapsed().as_millis() as u64; let child_discovery_ms = child_discovery_started.elapsed().as_millis() as u64;
if let Some(timing) = self.timing.as_ref() {
timing.record_phase_nanos(
"fresh_child_discovery_total",
child_discovery_ms.saturating_mul(1_000_000),
);
timing.record_count(
"fresh_children_discovered",
discovered_children.len() as u64,
);
timing.record_count("fresh_child_audits", child_audits.len() as u64);
timing.record_count(
"fresh_router_keys_discovered",
discovered_router_keys.len() as u64,
);
}
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_child_discovery",
child_discovery_ms,
);
Ok(FreshPublicationPointStage { Ok(FreshPublicationPointStage {
fresh_point, fresh_point,
@ -829,6 +947,17 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
let snapshot_pack_started = std::time::Instant::now(); let snapshot_pack_started = std::time::Instant::now();
let pack = fresh_point.to_publication_point_snapshot(); let pack = fresh_point.to_publication_point_snapshot();
let snapshot_pack_ms = snapshot_pack_started.elapsed().as_millis() as u64; let snapshot_pack_ms = snapshot_pack_started.elapsed().as_millis() as u64;
if let Some(timing) = self.timing.as_ref() {
timing.record_phase_nanos(
"fresh_snapshot_pack_total",
snapshot_pack_ms.saturating_mul(1_000_000),
);
}
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_snapshot_pack",
snapshot_pack_ms,
);
let persist_vcir_started = std::time::Instant::now(); let persist_vcir_started = std::time::Instant::now();
let persist_vcir_timing = if self.persist_vcir { let persist_vcir_timing = if self.persist_vcir {
@ -850,6 +979,64 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
PersistVcirTimingBreakdown::default() PersistVcirTimingBreakdown::default()
}; };
let persist_vcir_ms = persist_vcir_started.elapsed().as_millis() as u64; let persist_vcir_ms = persist_vcir_started.elapsed().as_millis() as u64;
if let Some(timing) = self.timing.as_ref() {
timing.record_phase_nanos(
"fresh_persist_vcir_total",
persist_vcir_ms.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_persist_embedded_store_total",
persist_vcir_timing
.embedded_store_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_persist_build_vcir_total",
persist_vcir_timing.build_vcir_ms.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_persist_replace_vcir_total",
persist_vcir_timing
.replace_vcir_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_persist_local_outputs_total",
persist_vcir_timing
.build_vcir
.local_outputs_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_persist_child_entries_total",
persist_vcir_timing
.build_vcir
.child_entries_ms
.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_persist_related_artifacts_total",
persist_vcir_timing
.build_vcir
.related_artifacts_ms
.saturating_mul(1_000_000),
);
}
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_persist_vcir",
persist_vcir_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_persist_build_vcir",
persist_vcir_timing.build_vcir_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_persist_replace_vcir",
persist_vcir_timing.replace_vcir_ms,
);
// local_outputs_cache only exists to build/persist VCIR. Release it before the // local_outputs_cache only exists to build/persist VCIR. Release it before the
// publication point result is retained for the rest of the run. // publication point result is retained for the rest of the run.
@ -868,6 +1055,26 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
self.append_ccr_manifest_projection(&ccr_manifest_projection)?; self.append_ccr_manifest_projection(&ccr_manifest_projection)?;
ccr_append_ms = ccr_append_started.elapsed().as_millis() as u64; ccr_append_ms = ccr_append_started.elapsed().as_millis() as u64;
} }
if let Some(timing) = self.timing.as_ref() {
timing.record_phase_nanos(
"fresh_ccr_projection_build_total",
ccr_projection_build_ms.saturating_mul(1_000_000),
);
timing.record_phase_nanos(
"fresh_ccr_append_total",
ccr_append_ms.saturating_mul(1_000_000),
);
}
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_ccr_projection_build",
ccr_projection_build_ms,
);
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_ccr_append",
ccr_append_ms,
);
let audit_build_started = std::time::Instant::now(); let audit_build_started = std::time::Instant::now();
let audit = build_publication_point_audit_from_snapshot( let audit = build_publication_point_audit_from_snapshot(
@ -883,6 +1090,17 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> {
&child_audits, &child_audits,
); );
let audit_build_ms = audit_build_started.elapsed().as_millis() as u64; let audit_build_ms = audit_build_started.elapsed().as_millis() as u64;
if let Some(timing) = self.timing.as_ref() {
timing.record_phase_nanos(
"fresh_audit_build_total",
audit_build_ms.saturating_mul(1_000_000),
);
}
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_audit_build",
audit_build_ms,
);
Ok(FreshPublicationPointFinalizeOutput { Ok(FreshPublicationPointFinalizeOutput {
result: PublicationPointRunResult { result: PublicationPointRunResult {
@ -1296,6 +1514,11 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> {
} }
}; };
let objects_processing_ms = objects_processing_started.elapsed().as_millis() as u64; let objects_processing_ms = objects_processing_started.elapsed().as_millis() as u64;
self.record_publication_point_step_ms(
&ca.manifest_rsync_uri,
"fresh_objects_processing",
objects_processing_ms,
);
objects.router_keys.extend(discovered_router_keys); objects.router_keys.extend(discovered_router_keys);
objects objects
@ -2741,6 +2964,98 @@ fn build_publication_point_audit_from_vcir(
} }
} }
fn build_publication_point_audit_from_publication_point_cache_projection(
ca: &CaInstanceHandle,
source: PublicationPointSource,
repo_sync_source: Option<&str>,
repo_sync_phase: Option<&str>,
repo_sync_duration_ms: Option<u64>,
repo_sync_error: Option<&str>,
projection: &PublicationPointCacheProjection,
validation_time: time::OffsetDateTime,
runner_warnings: &[Warning],
objects: &crate::validation::objects::ObjectsOutput,
child_audits: &[ObjectAuditEntry],
) -> PublicationPointAudit {
let mut warnings = Vec::new();
warnings.extend(runner_warnings.iter().map(AuditWarning::from));
warnings.extend(objects.warnings.iter().map(AuditWarning::from));
let mut audit_by_uri: HashMap<String, ObjectAuditEntry> = HashMap::new();
for artifact in &projection.related_objects {
let Some(uri) = artifact.uri.as_ref() else {
continue;
};
audit_by_uri.insert(
uri.clone(),
ObjectAuditEntry {
rsync_uri: uri.clone(),
sha256_hex: artifact.sha256.clone(),
kind: kind_from_vcir_artifact_kind(artifact.artifact_kind),
result: audit_result_from_vcir_status(artifact.validation_status),
detail: None,
},
);
}
for entry in child_audits {
audit_by_uri.insert(entry.rsync_uri.clone(), entry.clone());
}
for entry in &objects.audit {
audit_by_uri.insert(entry.rsync_uri.clone(), entry.clone());
}
let mut ordered_uris: Vec<String> = projection
.related_objects
.iter()
.filter_map(|artifact| artifact.uri.clone())
.collect();
ordered_uris.sort();
ordered_uris.dedup();
let mut objects_out: Vec<ObjectAuditEntry> = Vec::with_capacity(ordered_uris.len().max(1));
if let Some(entry) = audit_by_uri.remove(&projection.manifest_rsync_uri) {
objects_out.push(entry);
} else {
objects_out.push(ObjectAuditEntry {
rsync_uri: projection.manifest_rsync_uri.clone(),
sha256_hex: hex::encode(projection.manifest_sha256),
kind: AuditObjectKind::Manifest,
result: AuditObjectResult::Ok,
detail: None,
});
}
for uri in ordered_uris {
if uri == projection.manifest_rsync_uri {
continue;
}
if let Some(entry) = audit_by_uri.remove(&uri) {
objects_out.push(entry);
}
}
PublicationPointAudit {
node_id: None,
parent_node_id: None,
discovered_from: None,
rsync_base_uri: ca.rsync_base_uri.clone(),
manifest_rsync_uri: ca.manifest_rsync_uri.clone(),
publication_point_rsync_uri: ca.publication_point_rsync_uri.clone(),
rrdp_notification_uri: ca.rrdp_notification_uri.clone(),
source: source_label(source),
repo_sync_source: repo_sync_source.map(ToString::to_string),
repo_sync_phase: repo_sync_phase.map(ToString::to_string),
repo_sync_duration_ms,
repo_sync_error: repo_sync_error.map(ToString::to_string),
repo_terminal_state: terminal_state_label(source).to_string(),
this_update_rfc3339_utc: projection.manifest_this_update.rfc3339_utc.clone(),
next_update_rfc3339_utc: projection.manifest_next_update.rfc3339_utc.clone(),
verified_at_rfc3339_utc: PackTime::from_utc_offset_datetime(validation_time).rfc3339_utc,
warnings,
objects: objects_out,
}
}
fn parse_snapshot_time_value(pack_time: &PackTime) -> Result<time::OffsetDateTime, String> { fn parse_snapshot_time_value(pack_time: &PackTime) -> Result<time::OffsetDateTime, String> {
time::OffsetDateTime::parse( time::OffsetDateTime::parse(
&pack_time.rfc3339_utc, &pack_time.rfc3339_utc,
@ -3160,6 +3475,202 @@ fn build_objects_output_from_vcir(
output output
} }
fn build_objects_output_from_publication_point_cache_projection(
projection: &PublicationPointCacheProjection,
validation_time: time::OffsetDateTime,
warnings: &mut Vec<Warning>,
) -> crate::validation::objects::ObjectsOutput {
let mut output = empty_objects_output();
let mut audit_by_uri: HashMap<String, ObjectAuditEntry> = HashMap::new();
let mut roa_total: HashSet<String> = HashSet::new();
let mut aspa_total: HashSet<String> = HashSet::new();
let mut roa_ok: HashSet<String> = HashSet::new();
let mut aspa_ok: HashSet<String> = HashSet::new();
for artifact in &projection.related_objects {
if artifact.artifact_role != VcirArtifactRole::SignedObject {
continue;
}
if let Some(uri) = artifact.uri.as_ref() {
match artifact.artifact_kind {
VcirArtifactKind::Roa => {
roa_total.insert(uri.clone());
}
VcirArtifactKind::Aspa => {
aspa_total.insert(uri.clone());
}
_ => {}
}
}
}
for projected in &projection.outputs {
let effective_until = match parse_snapshot_time_value(&projected.item_effective_until) {
Ok(value) => value,
Err(err) => {
warnings.push(
Warning::new(format!(
"publication-point cached local output has invalid item_effective_until: {err}"
))
.with_context(&projected.source_object_uri),
);
audit_by_uri.insert(
projected.source_object_uri.clone(),
ObjectAuditEntry {
rsync_uri: projected.source_object_uri.clone(),
sha256_hex: hex::encode(projected.source_object_hash),
kind: audit_kind_for_vcir_output_type(projected.output_type),
result: AuditObjectResult::Error,
detail: Some(
"publication-point cached local output has invalid item_effective_until"
.to_string(),
),
},
);
continue;
}
};
if validation_time > effective_until {
audit_by_uri
.entry(projected.source_object_uri.clone())
.or_insert_with(|| ObjectAuditEntry {
rsync_uri: projected.source_object_uri.clone(),
sha256_hex: hex::encode(projected.source_object_hash),
kind: audit_kind_for_vcir_output_type(projected.output_type),
result: AuditObjectResult::Skipped,
detail: Some(
"skipped: publication-point cached local output expired".to_string(),
),
});
continue;
}
match projected.output_type {
VcirOutputType::Vrp => match parse_publication_point_cache_vrp_output(projected) {
Ok(vrp) => {
roa_ok.insert(projected.source_object_uri.clone());
output.vrps.push(vrp);
audit_by_uri.insert(
projected.source_object_uri.clone(),
ObjectAuditEntry {
rsync_uri: projected.source_object_uri.clone(),
sha256_hex: hex::encode(projected.source_object_hash),
kind: AuditObjectKind::Roa,
result: AuditObjectResult::Ok,
detail: None,
},
);
}
Err(err) => {
warnings.push(
Warning::new(format!(
"publication-point cached ROA local output parse failed: {err}"
))
.with_context(&projected.source_object_uri),
);
audit_by_uri.insert(
projected.source_object_uri.clone(),
ObjectAuditEntry {
rsync_uri: projected.source_object_uri.clone(),
sha256_hex: hex::encode(projected.source_object_hash),
kind: AuditObjectKind::Roa,
result: AuditObjectResult::Error,
detail: Some(format!(
"publication-point cached ROA local output parse failed: {err}"
)),
},
);
}
},
VcirOutputType::Aspa => match parse_publication_point_cache_aspa_output(projected) {
Ok(aspa) => {
aspa_ok.insert(projected.source_object_uri.clone());
output.aspas.push(aspa);
audit_by_uri.insert(
projected.source_object_uri.clone(),
ObjectAuditEntry {
rsync_uri: projected.source_object_uri.clone(),
sha256_hex: hex::encode(projected.source_object_hash),
kind: AuditObjectKind::Aspa,
result: AuditObjectResult::Ok,
detail: None,
},
);
}
Err(err) => {
warnings.push(
Warning::new(format!(
"publication-point cached ASPA local output parse failed: {err}"
))
.with_context(&projected.source_object_uri),
);
audit_by_uri.insert(
projected.source_object_uri.clone(),
ObjectAuditEntry {
rsync_uri: projected.source_object_uri.clone(),
sha256_hex: hex::encode(projected.source_object_hash),
kind: AuditObjectKind::Aspa,
result: AuditObjectResult::Error,
detail: Some(format!(
"publication-point cached ASPA local output parse failed: {err}"
)),
},
);
}
},
VcirOutputType::RouterKey => {
match parse_publication_point_cache_router_key_output(projected) {
Ok(router_key) => {
output.router_keys.push(router_key);
audit_by_uri.insert(
projected.source_object_uri.clone(),
ObjectAuditEntry {
rsync_uri: projected.source_object_uri.clone(),
sha256_hex: hex::encode(projected.source_object_hash),
kind: AuditObjectKind::RouterCertificate,
result: AuditObjectResult::Ok,
detail: Some(
"publication-point cached Router Key local output restored"
.to_string(),
),
},
);
}
Err(err) => {
warnings.push(
Warning::new(format!(
"publication-point cached Router Key local output parse failed: {err}"
))
.with_context(&projected.source_object_uri),
);
audit_by_uri.insert(
projected.source_object_uri.clone(),
ObjectAuditEntry {
rsync_uri: projected.source_object_uri.clone(),
sha256_hex: hex::encode(projected.source_object_hash),
kind: AuditObjectKind::RouterCertificate,
result: AuditObjectResult::Error,
detail: Some(format!(
"publication-point cached Router Key local output parse failed: {err}"
)),
},
);
}
}
}
}
}
output.stats.roa_total = roa_total.len();
output.stats.roa_ok = roa_ok.len();
output.stats.aspa_total = aspa_total.len();
output.stats.aspa_ok = aspa_ok.len();
let mut audit: Vec<_> = audit_by_uri.into_values().collect();
audit.sort_by(|left, right| left.rsync_uri.cmp(&right.rsync_uri));
output.audit = audit;
output
}
fn parse_vcir_vrp_output(local: &VcirLocalOutput) -> Result<Vrp, String> { fn parse_vcir_vrp_output(local: &VcirLocalOutput) -> Result<Vrp, String> {
match &local.payload { match &local.payload {
VcirLocalOutputPayload::Vrp { VcirLocalOutputPayload::Vrp {
@ -3213,6 +3724,65 @@ fn parse_vcir_router_key_output(local: &VcirLocalOutput) -> Result<RouterKeyPayl
} }
} }
/// Converts a cached publication-point output into a [`Vrp`].
///
/// # Errors
///
/// Returns an error string when the output's payload is not the `Vrp` variant.
fn parse_publication_point_cache_vrp_output(
    projected: &PublicationPointCacheOutput,
) -> Result<Vrp, String> {
    let VcirLocalOutputPayload::Vrp {
        asn,
        afi,
        prefix_len,
        addr,
        max_length,
    } = &projected.payload
    else {
        return Err("publication-point cache output payload is not VRP".to_string());
    };

    let prefix = crate::data_model::roa::IpPrefix {
        afi: *afi,
        prefix_len: *prefix_len,
        addr: *addr,
    };
    Ok(Vrp {
        asn: *asn,
        prefix,
        max_length: *max_length,
    })
}
/// Converts a cached publication-point output into an [`AspaAttestation`].
///
/// # Errors
///
/// Returns an error string when the output's payload is not the `Aspa` variant.
fn parse_publication_point_cache_aspa_output(
    projected: &PublicationPointCacheOutput,
) -> Result<AspaAttestation, String> {
    let VcirLocalOutputPayload::Aspa {
        customer_as_id,
        provider_as_ids,
    } = &projected.payload
    else {
        return Err("publication-point cache output payload is not ASPA".to_string());
    };

    Ok(AspaAttestation {
        customer_as_id: *customer_as_id,
        provider_as_ids: provider_as_ids.clone(),
    })
}
/// Converts a cached publication-point output into a [`RouterKeyPayload`].
///
/// The key material comes from the payload; the provenance fields (source object URI,
/// hex-encoded source object and EE certificate hashes, effective-until time) are copied
/// from the cached output itself.
///
/// # Errors
///
/// Returns an error string when the output's payload is not the `RouterKey` variant.
fn parse_publication_point_cache_router_key_output(
    projected: &PublicationPointCacheOutput,
) -> Result<RouterKeyPayload, String> {
    let VcirLocalOutputPayload::RouterKey {
        as_id,
        ski,
        spki_der,
    } = &projected.payload
    else {
        return Err("publication-point cache output payload is not Router Key".to_string());
    };

    Ok(RouterKeyPayload {
        as_id: *as_id,
        ski: ski.clone(),
        spki_der: spki_der.clone(),
        source_object_uri: projected.source_object_uri.clone(),
        source_object_hash: hex::encode(projected.source_object_hash),
        source_ee_cert_hash: hex::encode(projected.source_ee_cert_hash),
        item_effective_until: projected.item_effective_until.clone(),
    })
}
fn restore_children_from_vcir( fn restore_children_from_vcir(
store: &RocksStore, store: &RocksStore,
ca: &CaInstanceHandle, ca: &CaInstanceHandle,