From 184d3cb95bf01d8c0d1275fc31bc9d95335a0805 Mon Sep 17 00:00:00 2001 From: yuyr Date: Sat, 20 Jun 2026 15:31:20 +0800 Subject: [PATCH] =?UTF-8?q?20260620=20=E4=BC=98=E5=8C=96delta=E9=95=BF?= =?UTF-8?q?=E5=B0=BE=E6=8E=A7=E5=88=B6=E9=9D=A2=E5=92=8CPP=20cache?= =?UTF-8?q?=E5=BF=AB=E8=B7=AF=E5=BE=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fixtures/live_20260619/manifest.json | 80 +++ fixtures/live_20260619/ta/afrinic-ta.cer | Bin 0 -> 1216 bytes fixtures/live_20260619/ta/apnic-ta.cer | Bin 0 -> 1222 bytes fixtures/live_20260619/ta/arin-ta.cer | Bin 0 -> 1143 bytes fixtures/live_20260619/ta/lacnic-ta.cer | Bin 0 -> 1166 bytes fixtures/live_20260619/ta/ripe-ncc-ta.cer | Bin 0 -> 1036 bytes fixtures/live_20260619/tal/afrinic.tal | 10 + .../live_20260619/tal/apnic-rfc7730-https.tal | 10 + fixtures/live_20260619/tal/apnic.tal | 10 + fixtures/live_20260619/tal/arin.tal | 19 + fixtures/live_20260619/tal/lacnic.tal | 4 + fixtures/live_20260619/tal/ripe-ncc.tal | 10 + scripts/soak/run_soak.sh | 85 ++- src/analysis/timing.rs | 41 +- src/cli.rs | 22 +- src/cli/tests.rs | 6 + src/fetch/http.rs | 25 + src/fetch/rsync.rs | 40 ++ src/fetch/rsync_system.rs | 28 + src/parallel/repo_runtime.rs | 140 +++- src/parallel/repo_scheduler.rs | 3 + src/parallel/repo_worker.rs | 76 ++- src/parallel/run_coordinator.rs | 9 +- src/parallel/transport_prefetch.rs | 110 ++++ src/parallel/types.rs | 2 + src/query/object_resolver.rs | 97 +++ src/storage.rs | 206 +++++- src/storage/keys.rs | 6 + src/storage/tests.rs | 81 +++ src/validation/from_tal.rs | 192 ++++-- src/validation/run_tree_from_tal.rs | 73 ++- src/validation/tree_parallel.rs | 65 +- src/validation/tree_runner.rs | 610 +++++++++++++++++- 33 files changed, 1951 insertions(+), 109 deletions(-) create mode 100644 fixtures/live_20260619/manifest.json create mode 100644 fixtures/live_20260619/ta/afrinic-ta.cer create mode 100644 fixtures/live_20260619/ta/apnic-ta.cer create mode 100644 fixtures/live_20260619/ta/arin-ta.cer create mode 100644 fixtures/live_20260619/ta/lacnic-ta.cer create mode 100644 fixtures/live_20260619/ta/ripe-ncc-ta.cer create mode 100644 fixtures/live_20260619/tal/afrinic.tal create mode 100644 fixtures/live_20260619/tal/apnic-rfc7730-https.tal create mode 100644 fixtures/live_20260619/tal/apnic.tal create mode 100644 fixtures/live_20260619/tal/arin.tal create mode 100644 fixtures/live_20260619/tal/lacnic.tal create mode 100644 fixtures/live_20260619/tal/ripe-ncc.tal diff --git a/fixtures/live_20260619/manifest.json b/fixtures/live_20260619/manifest.json new file mode 100644 index 0000000..877b518 --- /dev/null +++ b/fixtures/live_20260619/manifest.json @@ -0,0 +1,80 @@ +{ + "created_at_utc": "2026-06-19T08:08:03Z", + "items": [ + { + "rir": "afrinic", + "ta_bytes": 1216, + "ta_download": "200 1216 1.351147", + "ta_elapsed_s": 1.227, + "ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/afrinic-ta.cer", + "ta_sha256": "43a26fd28bafb9398e5b2ab19e036b450bd04f4973a7f5ad151cebdee0edac36", + "ta_uri": "https://rpki.afrinic.net/repository/AfriNIC.cer", + "tal_bytes": 496, + "tal_download": "200 496 1.361326", + "tal_elapsed_s": 1.238, + "tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/afrinic.tal", + "tal_sha256": "2838ef30ea27ce5705abf5f5adb131d8c35b1f50858338a2f3c84bb207c2fa35", + "tal_url": "https://rpki.afrinic.net/tal/afrinic.tal" + }, + { + "rir": "apnic", + "ta_bytes": 1222, + "ta_download": "200 1222 1.012321", + "ta_elapsed_s": 0.921, + "ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/apnic-ta.cer", + "ta_sha256": "2014230ad49b2777ac2bde0948ddfa4b8f207114c549e26d755de88c3593e3af", + "ta_uri": "https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer", + "tal_bytes": 532, + "tal_download": "200 466 1.007155", + "tal_elapsed_s": 0.917, + "tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/apnic.tal", + "tal_sha256": "472e551f7c551c2e999e582b7c9437d3bee4900fe53afff62aeb28d4940ade94", + "tal_url": "https://tal.apnic.net/apnic.tal" + }, + { + "rir": "arin", + "ta_bytes": 1143, + "ta_download": "200 1143 0.816714", + "ta_elapsed_s": 0.743, + "ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/arin-ta.cer", + "ta_sha256": "5b3c2f6f04abd19261084487a43c156f778b0b8926d63801d48c7a93ed349492", + "ta_uri": "https://rrdp.arin.net/arin-rpki-ta.cer", + "tal_bytes": 1258, + "tal_download": "200 1258 0.848581", + "tal_elapsed_s": 0.774, + "tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/arin.tal", + "tal_sha256": "1f8bdb03bcc30a3b8e11fd9a87102fba250c22137a3c8baa9c81b139cb412639", + "tal_url": "https://www.arin.net/resources/manage/rpki/arin.tal" + }, + { + "rir": "lacnic", + "ta_bytes": 1166, + "ta_download": "200 1166 1.025273", + "ta_elapsed_s": 0.932, + "ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/lacnic-ta.cer", + "ta_sha256": "f44bc51008fd6998de7597b72a79b07bf3ebcb0f14daa7c7c022c0e9d66e0ad0", + "ta_uri": "https://rrdp.lacnic.net/ta/rta-lacnic-rpki.cer", + "tal_bytes": 502, + "tal_download": "200 502 1.729122", + "tal_elapsed_s": 1.565, + "tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/lacnic.tal", + "tal_sha256": "d44bb9394ab009c8b53e5efebf2a1c9450bab61a27efe00de5a3e4587a3a2f6a", + "tal_url": "https://www.lacnic.net/innovaportal/file/4983/1/lacnic.tal" + }, + { + "rir": "ripe", + "ta_bytes": 1036, + "ta_download": "200 1036 1.060992", + "ta_elapsed_s": 4.536, + "ta_path": "rpki_2/rpki/fixtures/live_20260619/ta/ripe-ncc-ta.cer", + "ta_sha256": "3e3f7e4efc8d0cea03d9cc1fde6e168b45c26d7b3272e14abd8da4871886e539", + "ta_uri": "https://rpki.ripe.net/ta/ripe-ncc-ta.cer", + "tal_bytes": 482, + "tal_download": "200 482 1.092082", + "tal_elapsed_s": 0.992, + "tal_path": "rpki_2/rpki/fixtures/live_20260619/tal/ripe-ncc.tal", + "tal_sha256": "59ca27ef93f23682749fcefe7c6d70fbc723343549ff9e4d3996acaff79817fb", + "tal_url": "https://tal.rpki.ripe.net/ripe-ncc.tal" + } + ] +} diff --git a/fixtures/live_20260619/ta/afrinic-ta.cer b/fixtures/live_20260619/ta/afrinic-ta.cer new file mode 100644 index 0000000000000000000000000000000000000000..734586beee6fb83ae9642dfc0d176383eefee58c GIT binary patch literal 1216 zcmXqLV%cNR#JpqyGZP~d6DPyd^F_OI>gH(}@Un4gwRyCC=VfH%W@Ru?HdHi_XJZa! zVHTEfOe@Ot^K{k?%Fi#+bxtiR$xO>kPAo|^kQ3)MGB7YUHZZU>wJXl!2IeM4eg=akMlPl%Mn;BX2eg<(JFFk~-f;iBP~gmxHTgf-pLuMO z)lJIn+kNGPji-&o1NWDrYaU*yG+WoN!5TErcH84h`Gl=&y(eUSIlk}vt|J#KYkkj6 zG}jU~=1bR~v(4>pb;?A;@cf*&+kP7UZMfmZdp`AT^ptMhgCgB4Lpv*&8MK9T#IWbVx9Wya-y_6zsRzWY4iWD}SA#eBV_ zb*I|C+d^Z5eQM@J{ckVw`5)c@OYh!CC~MK za%|y0SU+Ry)sBK2>t5}j_dPMEXxD-zbKXBn*mCiT$qUx$l7URjj0}v6n;1VDG%>z4 z@MmKVl~rJ5{LkWQ;B4T?#-Yu|$jZvj$7En?U<~6iF^Z{Yl#~<{Tj}c;6=Y}XC8iZ+ z=4B@9<)xPB7v$$;CRge^2L$UCq@)?}gA5b~8OUnD%*goPfDa_V4-#NuW@2PBkOlGi zSj1RFUT5%Ie7$yDddo((9edd#0)%cEZZl|Nz-yltBB&@4(RKncyf zd}waS%+J%S$jvcOglS{s0EMBDfxrSj10Dk|CPoG(R)+uo|Nl2ohbv(RDUmP`GvH)l zVqi32W@cmnGnpVv)I7-y$_?e_559(8WLMAJn!oLcS$fa>gDK}E3?|e?%bfh+ywStG z;9cn6`1eNnVe*;VBT}j^Z4G89y!KV@=nCt*%u?L_N+N%5D&1q7v-nM**9Pg8zWr0? zI<1R%6Cuc37_cT|Yj51+=XWQCE(&hiFt^xi-^#;BIYd1-F&SF=JE7B2J_!m z&IzlycVk)JuDD(QpKU%Yc%iX&(M7#SmS@%LWDeI@9+Z;aWPZ%0N+fyJCi(uLYsTpg z-|sgw$Fv8hTuNc~)MK(zp0!<6GVSu>_763Z^}DiKF1^tDa{Z+4$?}hy=I`TEBK{mI g6SJ5V%pr2S(%{kx?>~R;3q@akmfdM2E0bsd0Fi*OnE(I) literal 0 HcmV?d00001 diff --git a/fixtures/live_20260619/ta/apnic-ta.cer b/fixtures/live_20260619/ta/apnic-ta.cer new file mode 100644 index 0000000000000000000000000000000000000000..d360b87037c07fb7b2bfb8a34fe291a2cdc99117 GIT binary patch literal 1222 zcmXqLVmV~c#Jp+&GZP~dlZeLw`MQM-D}Dwp{_1}uE_A8t6V2ZSylk9WZ60mkc^MhG zSs4t}3{?!2*qB3En1!Vi3-U6Pb&CqJGj)sd^GkFy6Y~;v^NTXmGxH4O#CeU(42+D- zjEoH|49ud$d5tWPxI~-N#HfVq1V&Z{<|amd27@L>E~X|%Muy{8!aVkEO<<7Nm8cYa z=Ff`S7T+N5%MnYD&Ma2g@0qAP(cqrbtm}WmlcJ}z&zh>Wz|wH>UX3H0RIBfu+kLi$ z*&68E*FxA+#`5Z&qN(0Q{aTKcp}zi8XC1MIUWHGkID zebTqLBsts9G;FrRk&YN?gERXAINdq56V{(ueE5fO)+Hw%EiSfs7oXj@qRn|n<9_$L zL)U7o7glwub-$09C$%KAW&g3ww~M>E7cF|;s`|%oFHmO= zomO-?^0%7Jd+XyenU(EIEY-VizG@NiHCq>6JS{QL>&k}PNorX)&qgK$FJ5<N^Jn>lLJ=8SsP55(b&YYQW6M_}_pJB)|_6U}0urWHXQj z@%dQ9SVXwzoSJZ#x2nrLEn8Bt{=p3g^X-cani$z&HZpRvH2ySb{AS<}6ER?Ib1o{b z%uBY?*9RpaBu5mb7UUOamgE;z>RT9FxHuUZni!gynHn0p8aWv|IU8A+8JZipxfr^+ z8R;A3!42eUOQM2-&Zbq~*?tCosrmWFQMtuYo)PisMy`&=C1##_xoIT^#&G+1+tgu! zSX7i!faHO^{F2PH%;dz9%=|pPirgFnMVKZ=4p8U`83-)kGvG1cVq#=uVrBUM|Nnmj zb+{6CkP-<4F#}E(CI&_WW@bhPFp~+wM9q%Ope&%g=($ey@wpipE7w2USN`Mp!P2Ii zz2Onto-1zoU-kH_Po}~(Ij^9jF7A~Nv%I_}ieGI{_B{7}@o}-NSt~u3NUNRCa;-RU z;K^H?W6#;!1*OBDZ!FfAifH|i{*u+``02n^Gd-?Fv#RXb5v(4eZk4z^w*Ttar0vr- z={KbxnsvQb>X@j}eU6=%kDmRT|KoKV^FxNT{3A}hN{jzYVcN!baceZo8JpgT$Nx+^ zG$EtKpY6)yscSYf$Ulv}^Hl6-(5#@TTERXi(u(d}=L~n->mEBpLnWSRs^uN-xvO+N uHeS_mvrQ8@KfN&^WbFpVr}rLnZ=9+m8O66{@6W4i7e+PS)c^76xB~!hN3xs% literal 0 HcmV?d00001 diff --git a/fixtures/live_20260619/ta/arin-ta.cer b/fixtures/live_20260619/ta/arin-ta.cer new file mode 100644 index 0000000000000000000000000000000000000000..8ae345405f267c55d6d65cfd92e61b24447e0233 GIT binary patch literal 1143 zcmXqLVktIgVvb(G%*4pVB*MtcGv8Sw!slt~0U52!6E}07+2mlr%f_kI=F#?@mywa1 zmBB#VP}D$}jX9KsS(qoWC^JvDs31F2wR&FqORkPjx^|{>kiRo(} zo>8-~S>hIwlP~en#Lz>UxpLbZzM17#e-HJCt}WYC&e49s!k~ZMrdSJ+)dnp00yiJ} zG1Ip`Wukp4qkDBJPY%K`EaOvOKUmh)6WjB+xF$Z%!r#);&|*UcdMp~V)NnV1K&Q23~NzTy1uw=|wh3 zFE_2kzzA*@Z<|_1Nl8I5$Q?x~1qgTK<(FipWhN(l#4l|UI9V94WAZEbH!o3zDSQ-BR|Nq}01Z1`XNY=~1!@v!ufse_+%D@7~V`3E3fjPdsyd2^7qSWI2 z(xT+lVsO~%Cl?g!Wt8ORASZ2ZaMEUEVEKHH!}Z$r3WfgXw%TjX`Tf7)H&>m~I&@bnBy*zxzv@t@o8Sf0!hhn{jNJgMsaxq+DAg zz61A^PtJVq)qRoCVCvS)W18Q#B}-4$IB&Uk_jQM#YPW90?YS#I#Z^V-q=W6<<8$_J neZDu|Nvmn`&)>(-hniYT*=XZ%tNeR=KW*aNOv(3$bmyJ`a&7qHU~XdMXE11Deq$cdKGQaS-XGb zgzP-a^vUz=HnS!8rYzd`gwuyJH_T4z)0B3`pCvPopHBIbvj48Kz|ZzG;XDi{JKj{w zRs>sTJjgg_!gSsDWIL<9Aw7_(U{Kcl) z?1*FEnmqPca>aPfty^$$(jB7(c0UT^s&jMSvaoMt-?(5(%Wb8a2}~#F&f|`6PP$^W zUC^TN{O2Y7Db2~eOw5c7jEkEXrx-LbPB7qSV-A%SW@P-&!fL?G$oStt79_yOBE}-} zr)$pUw-SEhm-$}F@h?w|d9rc6y8$0anjfT|g_((w&7ko)8;3R?o#qDsA-#N@opWWBu968)lr>`Z+KU*8}Ju8*rN zjCg&-rZ&CYv=ReLxE;K0x)~)U1;rq@6{QrQxT+|nKtC_PBr`2DIk6-&KToeBH^)E% z(!04WqQ5Lf^< zi;0neiIw61|NsAyQ!Y0+6`48-cQ@KB1l4Cob16V)S^e`9o>eH%FF(%3B^q?DEc; zw{C~V#2+)C{d;O3wmEYyYx$MOmo;}pTmIpe(+Zrmw72N**Y`W0g|7SMaN*1U#n!)- z<6^E>hUq`&4U5p|JM7Z^?DC_7s%5X1#jZQq$~Z;iQM-oVFJ6bb>HBs0U+<4$&qxjA zuX}q!>Pxh+!Jqhj+V$QK%qq?+y|jrIIM>f$BGA0`$mM%}#dq%V8olz;Jow!|*T2?m hU!L={M@)KZ8)xpU{9+xaX0d9PflyNVJl?R+a{wwEm{b4& literal 0 HcmV?d00001 diff --git a/fixtures/live_20260619/ta/ripe-ncc-ta.cer b/fixtures/live_20260619/ta/ripe-ncc-ta.cer new file mode 100644 index 0000000000000000000000000000000000000000..a31db56ba14a744dad88c074f5a6abd1cdff5cb5 GIT binary patch literal 1036 zcmXqLV&O1oV*0RvnTe5!iHT9gfR~L^tIebBJ1-+6H!FjIn4yS)5F2wS3$rkHQD#A^ zZeDV-Zb_nnoH(zMnSrUHv7wQXv8hQEm}>#&;?&o~sDx|(XBI|zzMmJ*+aa*#ul=Q))xDa`k-r%tk96?c z#;z0*jC0_AA^)}ObKwW(ys5WNY?%F7;#SejA1@1jb7r0kD_vfs*`MB9uE{qv;7T9ie& zFvxK2^`1>VovW|+Oz>-7c+$Nk{_IrE)VOB{Jo&u>vpO$7UVE2e`ue8p0?P7}W{SF} z-1L9_bN{-UoYI_^a{~Fpeg*mFIV>=eb$kBr>U;Z>**zSyrA>2UOup&+R0!UwbG)(9 zj(Kr+1*5Ujw{RwAMh3>kO^k5{O^i_nvTV$uvV1IJEFv#LwdJo}T=M-1%ds0zGkE3p zh}j=D;0H+yGcx{XVKrc8Wc+Wy2NK{139v9TF|rvnZe-)oW@BV!WoP7OX|xW55M7 zfse^R9mZy42T4m9h#7FQFflM1Ff%hUfSF7XrXpM+2S}lifxrT=Sxk%!Osow5|NsAw zoCdkUX^@dYSYm$cEWsXjwby?$7oMMF(NmS4eD>cdWBm;`1rGTgKhqM#x{%XHh|Q=@ zYDSp-I(~28K&E+rT9-}#Qel&Pu$@Q{7f}{6~6|*w^=FXYZ|gu`-?G&N5!X|IF5roS!Ne5cL&6W#~( TimingReportV1 { + let g = self.inner.lock().expect("timing lock"); + g.to_report(top_n) + } + /// Record a phase duration directly in nanoseconds. /// /// This is useful when aggregating sub-phase timings locally (to reduce lock contention) @@ -88,6 +93,22 @@ impl TimingHandle { g.phases.record(phase, nanos); } + pub fn record_publication_point_nanos(&self, manifest_rsync_uri: &str, nanos: u64) { + let mut g = self.inner.lock().expect("timing lock"); + g.publication_points.record(manifest_rsync_uri, nanos); + } + + pub fn record_publication_point_step_nanos( + &self, + manifest_rsync_uri: &str, + step: &'static str, + nanos: u64, + ) { + let mut g = self.inner.lock().expect("timing lock"); + g.publication_point_steps + .record(&format!("{manifest_rsync_uri}::{step}"), nanos); + } + pub fn write_json(&self, path: &Path, top_n: usize) -> Result<(), String> { let report = { let g = self.inner.lock().expect("timing lock"); @@ -153,7 +174,7 @@ enum TimingSpanKind<'a> { PublicationPoint(&'a str), } -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct DurationStats { pub count: u64, pub total_nanos: u64, @@ -199,6 +220,7 @@ struct TimingCollector { rrdp_repos: DurationStatsMap, rrdp_repo_steps: DurationStatsMap, publication_points: DurationStatsMap, + publication_point_steps: DurationStatsMap, } impl TimingCollector { @@ -210,6 +232,7 @@ impl TimingCollector { rrdp_repos: DurationStatsMap::default(), rrdp_repo_steps: DurationStatsMap::default(), publication_points: DurationStatsMap::default(), + publication_point_steps: DurationStatsMap::default(), } } @@ -231,6 +254,7 @@ impl TimingCollector { top_rrdp_repos: self.rrdp_repos.top(top_n), top_rrdp_repo_steps: self.rrdp_repo_steps.top(top_n), top_publication_points: self.publication_points.top(top_n), + top_publication_point_steps: self.publication_point_steps.top(top_n), } } } @@ -244,9 +268,10 @@ pub struct TimingReportV1 { pub top_rrdp_repos: Vec, pub top_rrdp_repo_steps: Vec, pub top_publication_points: Vec, + pub top_publication_point_steps: Vec, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct TopDurationEntry { pub key: String, pub count: u64, @@ -283,6 +308,12 @@ mod tests { let _pp = h.span_publication_point("rsync://example.test/repo/manifest.mft"); } h.record_count("vrps", 42); + h.record_publication_point_nanos("rsync://example.test/repo/manifest.mft", 1_000_000); + h.record_publication_point_step_nanos( + "rsync://example.test/repo/manifest.mft", + "fresh_snapshot_prepare", + 1_000_000, + ); let dir = tempfile::tempdir().expect("tempdir"); let path = dir.path().join("timing.json"); @@ -312,5 +343,11 @@ mod tests { .any(|e| e.key.contains("manifest.mft")), "expected PP in top list" ); + assert!( + rep.top_publication_point_steps + .iter() + .any(|e| e.key.contains("fresh_snapshot_prepare")), + "expected PP step in top list" + ); } } diff --git a/src/cli.rs b/src/cli.rs index 716fec5..2631b3f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -6,7 +6,9 @@ use crate::ccr::{ use crate::cir::{CirTrustAnchorBinding, export_cir_from_input_snapshot_multi}; use std::path::{Path, PathBuf}; -use crate::analysis::timing::{TimingHandle, TimingMeta, TimingMetaUpdate}; +use crate::analysis::timing::{ + DurationStats, TimingHandle, TimingMeta, TimingMetaUpdate, TopDurationEntry, +}; use crate::audit::AuditRepoSyncStats; #[cfg(test)] use crate::audit::{ @@ -73,6 +75,9 @@ struct RunStageTiming { download_bytes_total: u64, roa_validation_cache: crate::validation::objects::RoaValidationCacheStats, analysis_counts: HashMap, + analysis_phases: HashMap, + analysis_top_publication_points: Vec, + analysis_top_publication_point_steps: Vec, vcir_storage_summary_ms: Option, vcir_storage: Option, memory_telemetry: Option, @@ -2454,6 +2459,9 @@ pub fn run(argv: &[String]) -> Result<(), String> { &total_started, store.as_ref(), ); + let timing_report_snapshot = timing + .as_ref() + .map(|(_, handle)| handle.report_snapshot(50)); let stage_timing = RunStageTiming { validation_ms, enable_roa_validation_cache: args.enable_roa_validation_cache, @@ -2483,6 +2491,18 @@ pub fn run(argv: &[String]) -> Result<(), String> { .as_ref() .map(|(_, handle)| handle.counts_snapshot()) .unwrap_or_default(), + analysis_phases: timing_report_snapshot + .as_ref() + .map(|report| report.phases.clone()) + .unwrap_or_default(), + analysis_top_publication_points: timing_report_snapshot + .as_ref() + .map(|report| report.top_publication_points.clone()) + .unwrap_or_default(), + analysis_top_publication_point_steps: timing_report_snapshot + .as_ref() + .map(|report| report.top_publication_point_steps.clone()) + .unwrap_or_default(), vcir_storage_summary_ms, vcir_storage, memory_telemetry: Some(MemoryTelemetrySummary { diff --git a/src/cli/tests.rs b/src/cli/tests.rs index 9487784..e474a90 100644 --- a/src/cli/tests.rs +++ b/src/cli/tests.rs @@ -1625,6 +1625,9 @@ fn run_report_task_and_stage_timing_work() { download_bytes_total: 15, roa_validation_cache: crate::validation::objects::RoaValidationCacheStats::default(), analysis_counts: std::collections::HashMap::new(), + analysis_phases: std::collections::HashMap::new(), + analysis_top_publication_points: Vec::new(), + analysis_top_publication_point_steps: Vec::new(), vcir_storage_summary_ms: Some(16), vcir_storage: Some(VcirStorageSummary { entry_count: 2, @@ -1730,6 +1733,9 @@ fn stage_timing_serializes_memory_telemetry() { "roa_validation_cache_hit_roas".to_string(), 2, )]), + analysis_phases: std::collections::HashMap::new(), + analysis_top_publication_points: Vec::new(), + analysis_top_publication_point_steps: Vec::new(), vcir_storage_summary_ms: None, vcir_storage: None, memory_telemetry: Some(MemoryTelemetrySummary { diff --git a/src/fetch/http.rs b/src/fetch/http.rs index 1ceb448..604b521 100644 --- a/src/fetch/http.rs +++ b/src/fetch/http.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::io::Write; use std::time::Duration; @@ -6,6 +7,19 @@ use reqwest::header::HeaderMap; use crate::sync::rrdp::Fetcher; +thread_local! { + static HTTP_TIMEOUT_OVERRIDE: RefCell> = const { RefCell::new(None) }; +} + +pub fn with_scoped_http_timeout_override(timeout: Duration, f: impl FnOnce() -> R) -> R { + HTTP_TIMEOUT_OVERRIDE.with(|cell| { + let previous = cell.replace(Some(timeout)); + let result = f(); + let _ = cell.replace(previous); + result + }) +} + #[derive(Clone, Debug)] pub struct HttpFetcherConfig { /// Connection-establishment timeout for HTTP requests. @@ -37,6 +51,7 @@ impl Default for HttpFetcherConfig { pub struct BlockingHttpFetcher { short_client: Client, large_body_client: Client, + retry_short_client: Client, short_timeout: Duration, large_body_timeout: Duration, } @@ -58,9 +73,16 @@ impl BlockingHttpFetcher { .user_agent(config.user_agent) .build() .map_err(|e| e.to_string())?; + let retry_short_client = Client::builder() + .connect_timeout(Duration::from_secs(1)) + .timeout(Duration::from_secs(1)) + .user_agent("rpki-dev/0.1 (stage2)") + .build() + .map_err(|e| e.to_string())?; Ok(Self { short_client, large_body_client, + retry_short_client, short_timeout, large_body_timeout, }) @@ -176,6 +198,9 @@ impl BlockingHttpFetcher { } fn client_for_uri(&self, uri: &str) -> (&Client, &'static str, Duration) { + if let Some(timeout) = HTTP_TIMEOUT_OVERRIDE.with(|cell| *cell.borrow()) { + return (&self.retry_short_client, "retry_short", timeout); + } if uses_large_body_timeout(uri) { ( &self.large_body_client, diff --git a/src/fetch/rsync.rs b/src/fetch/rsync.rs index 146644e..a18ec2d 100644 --- a/src/fetch/rsync.rs +++ b/src/fetch/rsync.rs @@ -24,6 +24,20 @@ pub trait RsyncFetcher: Send + Sync { /// Return a list of objects as `(rsync_uri, bytes)` pairs. fn fetch_objects(&self, rsync_base_uri: &str) -> RsyncFetchResult)>>; + /// Fetch one object by exact rsync URI. + /// + /// The default implementation fetches the parent directory and filters the exact + /// object. Live fetchers should override this to avoid widening one-object TAL + /// bootstrap fetches into whole publication point or module synchronizations. + fn fetch_object(&self, rsync_uri: &str) -> RsyncFetchResult> { + let base = parent_rsync_uri(rsync_uri).map_err(RsyncFetchError::Fetch)?; + self.fetch_objects(&base)? + .into_iter() + .find(|(uri, _)| uri == rsync_uri) + .map(|(_, bytes)| bytes) + .ok_or_else(|| RsyncFetchError::Fetch(format!("rsync object not found: {rsync_uri}"))) + } + /// Stream fetched objects to a visitor without requiring callers to materialize the /// full result vector in memory. fn visit_objects( @@ -62,6 +76,32 @@ pub trait RsyncFetcher: Send + Sync { } } +fn parent_rsync_uri(rsync_uri: &str) -> Result { + let parsed = url::Url::parse(rsync_uri).map_err(|e| e.to_string())?; + if parsed.scheme() != "rsync" { + return Err(format!("not an rsync URI: {rsync_uri}")); + } + let host = parsed + .host_str() + .ok_or_else(|| format!("missing host in rsync URI: {rsync_uri}"))?; + let segments = parsed + .path_segments() + .ok_or_else(|| format!("missing path in rsync URI: {rsync_uri}"))? + .collect::>(); + if segments.is_empty() || segments.last().copied().unwrap_or_default().is_empty() { + return Err(format!( + "rsync URI must reference a file object: {rsync_uri}" + )); + } + let parent_segments = &segments[..segments.len() - 1]; + let mut parent = format!("rsync://{host}/"); + if !parent_segments.is_empty() { + parent.push_str(&parent_segments.join("/")); + parent.push('/'); + } + Ok(parent) +} + /// A simple "rsync" implementation backed by a local directory. /// /// This is primarily meant for offline tests and fixtures. The key generation mimics rsync URIs: diff --git a/src/fetch/rsync_system.rs b/src/fetch/rsync_system.rs index a44f5cb..aeb7913 100644 --- a/src/fetch/rsync_system.rs +++ b/src/fetch/rsync_system.rs @@ -317,6 +317,34 @@ impl RsyncFetcher for SystemRsyncFetcher { Ok(out) } + fn fetch_object(&self, rsync_uri: &str) -> RsyncFetchResult> { + let parsed = + url::Url::parse(rsync_uri).map_err(|e| RsyncFetchError::Fetch(e.to_string()))?; + if parsed.scheme() != "rsync" { + return Err(RsyncFetchError::Fetch(format!( + "not an rsync URI: {rsync_uri}" + ))); + } + let file_name = parsed + .path_segments() + .and_then(|segments| segments.filter(|segment| !segment.is_empty()).next_back()) + .ok_or_else(|| { + RsyncFetchError::Fetch(format!( + "rsync URI must reference a file object: {rsync_uri}" + )) + })?; + let tmp = TempDir::new().map_err(RsyncFetchError::Fetch)?; + self.run_rsync(rsync_uri, tmp.path()) + .map_err(RsyncFetchError::Fetch)?; + let object_path = tmp.path().join(file_name); + std::fs::read(&object_path).map_err(|e| { + RsyncFetchError::Fetch(format!( + "read fetched rsync object failed: {}: {e}", + object_path.display() + )) + }) + } + fn visit_objects( &self, rsync_base_uri: &str, diff --git a/src/parallel/repo_runtime.rs b/src/parallel/repo_runtime.rs index e22f57b..cbfb8df 100644 --- a/src/parallel/repo_runtime.rs +++ b/src/parallel/repo_runtime.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -67,6 +68,27 @@ pub trait RepoSyncRuntime: Send + Sync { timeout: Duration, ) -> Result, String>; + fn drain_repo_results_timeout( + &self, + timeout: Duration, + max_events: usize, + ) -> Result, String> { + let max_events = max_events.max(1); + let mut events = Vec::new(); + for index in 0..max_events { + let poll_timeout = if index == 0 { + timeout + } else { + Duration::from_millis(0) + }; + let Some(event) = self.recv_repo_result_timeout(poll_timeout)? else { + break; + }; + events.push(event); + } + Ok(events) + } + fn reset_run_state(&self) -> Result<(), String>; fn prefetch_discovered_children( @@ -87,6 +109,7 @@ pub struct Phase1RepoSyncRuntime { coordinator: Mutex, worker_pool: Mutex>, transport_prefetch_recorder: Option>, + retry_short_rsync_scopes: Mutex>, rsync_scope_resolver: Arc String + Send + Sync>, rsync_failure_scope_resolver: Arc Option + Send + Sync>, sync_preference: SyncPreference, @@ -119,6 +142,7 @@ impl Phase1RepoSyncRuntime { coordinator: Mutex::new(coordinator), worker_pool: Mutex::new(worker_pool), transport_prefetch_recorder: None, + retry_short_rsync_scopes: Mutex::new(HashSet::new()), rsync_scope_resolver, rsync_failure_scope_resolver, sync_preference, @@ -138,6 +162,7 @@ impl Phase1RepoSyncRuntime { worker_pool: Mutex::new(worker_pool), transport_prefetch_recorder: record_transport_prefetch_requests .then(|| Mutex::new(TransportPrefetchRecorder::default())), + retry_short_rsync_scopes: Mutex::new(HashSet::new()), rsync_scope_resolver, rsync_failure_scope_resolver, sync_preference, @@ -191,6 +216,7 @@ impl Phase1RepoSyncRuntime { rsync_scope_uri, rsync_failure_scope_uri, self.sync_preference, + false, ) }; @@ -310,6 +336,12 @@ impl Phase1RepoSyncRuntime { }; let transport_identity = envelope.repo_identity.clone(); let completed_envelope = envelope.clone(); + if let Some(recorder) = self.transport_prefetch_recorder.as_ref() { + recorder + .lock() + .expect("transport prefetch recorder lock poisoned") + .record_result(&envelope); + } crate::progress_log::emit( "phase1_repo_task_result", serde_json::json!({ @@ -334,7 +366,19 @@ impl Phase1RepoSyncRuntime { }; if !completion.follow_up_tasks.is_empty() { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); - for task in completion.follow_up_tasks { + for mut task in completion.follow_up_tasks { + if let crate::parallel::types::RepoDedupKey::RsyncScope { rsync_scope_uri } = + &task.dedup_key + { + if self + .retry_short_rsync_scopes + .lock() + .expect("retry short rsync scopes lock poisoned") + .contains(rsync_scope_uri) + { + task.retry_short_timeout = true; + } + } crate::progress_log::emit( "phase1_repo_task_enqueued", serde_json::json!({ @@ -384,6 +428,27 @@ impl Phase1RepoSyncRuntime { })) } + fn pump_transport_results( + &self, + timeout: Duration, + max_events: usize, + ) -> Result, String> { + let max_events = max_events.max(1); + let mut events = Vec::new(); + for index in 0..max_events { + let poll_timeout = if index == 0 { + timeout + } else { + Duration::from_millis(0) + }; + let Some(event) = self.pump_one_transport_result(poll_timeout)? else { + break; + }; + events.push(event); + } + Ok(events) + } + fn runtime_state_for_identity(&self, identity: &RepoIdentity) -> Option { let coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator @@ -445,6 +510,14 @@ impl RepoSyncRuntime for Phase1RepoSyncRuntime { self.pump_one_transport_result(timeout) } + fn drain_repo_results_timeout( + &self, + timeout: Duration, + max_events: usize, + ) -> Result, String> { + self.pump_transport_results(timeout, max_events) + } + fn reset_run_state(&self) -> Result<(), String> { { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); @@ -499,6 +572,12 @@ impl RepoSyncRuntime for Phase1RepoSyncRuntime { stats.skipped_incompatible += 1; continue; } + if request.retry_short_rsync_timeout() { + self.retry_short_rsync_scopes + .lock() + .expect("retry short rsync scopes lock poisoned") + .insert(current_rsync_scope_uri.clone()); + } let action = { let mut coordinator = self.coordinator.lock().expect("coordinator lock poisoned"); coordinator.register_transport_request( @@ -509,6 +588,7 @@ impl RepoSyncRuntime for Phase1RepoSyncRuntime { current_rsync_scope_uri, current_rsync_failure_scope_uri, self.sync_preference, + request.retry_short_timeout(), ) }; @@ -957,6 +1037,64 @@ mod tests { ); } + #[test] + fn phase1_runtime_drains_multiple_ready_transport_events() { + let count = Arc::new(AtomicUsize::new(0)); + let coordinator = GlobalRunCoordinator::new( + ParallelPhase1Config::default(), + vec![TalInputSpec::from_url("https://example.test/arin.tal")], + ); + let pool = RepoTransportWorkerPool::new( + RepoWorkerPoolConfig { max_workers: 2 }, + CountingSuccessTransportExecutor { + count: Arc::clone(&count), + }, + ) + .expect("pool"); + let runtime = Phase1RepoSyncRuntime::new( + coordinator, + pool, + Arc::new(|base: &str| base.to_string()), + SyncPreference::RrdpThenRsync, + ); + let ca1 = sample_ca("rsync://example.test/repo/root.mft"); + let mut ca2 = sample_ca("rsync://example.net/repo/root.mft"); + ca2.rsync_base_uri = "rsync://example.net/repo/".to_string(); + ca2.publication_point_rsync_uri = "rsync://example.net/repo/".to_string(); + ca2.rrdp_notification_uri = Some("https://example.net/notify.xml".to_string()); + + assert!(matches!( + runtime + .request_publication_point_repo(&ca1, 0) + .expect("request ca1"), + super::RepoSyncRequestStatus::Pending { .. } + )); + assert!(matches!( + runtime + .request_publication_point_repo(&ca2, 0) + .expect("request ca2"), + super::RepoSyncRequestStatus::Pending { .. } + )); + + let started = Instant::now(); + while count.load(Ordering::SeqCst) < 2 && started.elapsed() < Duration::from_secs(1) { + std::thread::sleep(Duration::from_millis(5)); + } + assert_eq!(count.load(Ordering::SeqCst), 2); + + let events = runtime + .drain_repo_results_timeout(Duration::from_millis(0), 8) + .expect("drain events"); + assert_eq!(events.len(), 2); + assert_eq!( + events + .iter() + .map(|event| event.completions.len()) + .sum::(), + 2 + ); + } + #[test] fn phase1_runtime_reset_run_state_clears_completed_transport_reuse() { let count = Arc::new(AtomicUsize::new(0)); diff --git a/src/parallel/repo_scheduler.rs b/src/parallel/repo_scheduler.rs index 62b5727..8666a23 100644 --- a/src/parallel/repo_scheduler.rs +++ b/src/parallel/repo_scheduler.rs @@ -284,6 +284,7 @@ impl TransportStateTables { rsync_failure_scope_uri: None, repo_identity: identity.clone(), mode: RepoTransportMode::Rrdp, + retry_short_timeout: false, tal_id: requester.tal_id.clone(), rir_id: requester.rir_id.clone(), validation_time, @@ -460,6 +461,7 @@ impl TransportStateTables { rsync_failure_scope_uri: rsync_failure_scope_uri.clone(), repo_identity: identity.clone(), mode: RepoTransportMode::Rsync, + retry_short_timeout: false, tal_id: requester.tal_id.clone(), rir_id: requester.rir_id.clone(), validation_time, @@ -557,6 +559,7 @@ impl TransportStateTables { rsync_failure_scope_uri: record.rsync_failure_scope_key.clone(), repo_identity: record.identity.clone(), mode: RepoTransportMode::Rsync, + retry_short_timeout: false, tal_id: first_requester.tal_id.clone(), rir_id: first_requester.rir_id.clone(), validation_time: record.validation_time, diff --git a/src/parallel/repo_worker.rs b/src/parallel/repo_worker.rs index efbd347..3e5526d 100644 --- a/src/parallel/repo_worker.rs +++ b/src/parallel/repo_worker.rs @@ -12,6 +12,9 @@ use crate::analysis::timing::TimingHandle; use crate::audit_downloads::DownloadLogHandle; use crate::current_repo_index::CurrentRepoIndexHandle; use crate::fetch::rsync::RsyncFetcher; +use crate::fetch::rsync_system::{ + RsyncFailFastProfile, with_scoped_rsync_fail_fast_profile, with_scoped_rsync_timeout_override, +}; use crate::policy::Policy; use crate::storage::RocksStore; use crate::sync::repo::{ @@ -19,6 +22,8 @@ use crate::sync::repo::{ }; use crate::sync::rrdp::Fetcher; +const RETRY_SHORT_TIMEOUT: Duration = Duration::from_secs(1); + #[derive(Clone, Debug, PartialEq, Eq)] pub struct RepoWorkerPoolConfig { pub max_workers: usize, @@ -75,14 +80,28 @@ impl RepoTransportExecutor for LiveRrdpTransportExecutor RepoTransportResultEnvelope { dedup_key: task.dedup_key, rsync_failure_scope_uri: task.rsync_failure_scope_uri, @@ -143,14 +162,37 @@ impl RepoTransportExecutor for LiveRsyncTransportExec fn execute_transport(&self, task: RepoTransportTask) -> RepoTransportResultEnvelope { let started = std::time::Instant::now(); debug_assert_eq!(task.mode, RepoTransportMode::Rsync); - match run_rsync_transport( - self.store.as_ref(), - &task.repo_identity.rsync_base_uri, - Some(&self.current_repo_index), - self.rsync_fetcher.as_ref(), - self.timing.as_ref(), - self.download_log.as_ref(), - ) { + let sync_result = if task.retry_short_timeout { + with_scoped_rsync_timeout_override(RETRY_SHORT_TIMEOUT, || { + with_scoped_rsync_fail_fast_profile( + RsyncFailFastProfile { + initial_wall_clock_timeout: RETRY_SHORT_TIMEOUT, + max_wall_clock_timeout: RETRY_SHORT_TIMEOUT, + max_attempts: 1, + }, + || { + run_rsync_transport( + self.store.as_ref(), + &task.repo_identity.rsync_base_uri, + Some(&self.current_repo_index), + self.rsync_fetcher.as_ref(), + self.timing.as_ref(), + self.download_log.as_ref(), + ) + }, + ) + }) + } else { + run_rsync_transport( + self.store.as_ref(), + &task.repo_identity.rsync_base_uri, + Some(&self.current_repo_index), + self.rsync_fetcher.as_ref(), + self.timing.as_ref(), + self.download_log.as_ref(), + ) + }; + match sync_result { Ok(_) => RepoTransportResultEnvelope { dedup_key: task.dedup_key, rsync_failure_scope_uri: task.rsync_failure_scope_uri, @@ -667,6 +709,7 @@ mod tests { rsync_failure_scope_uri: None, repo_identity: RepoIdentity::new(Some(notification_uri.to_string()), rsync_base_uri), mode: RepoTransportMode::Rrdp, + retry_short_timeout: false, tal_id: "arin".to_string(), rir_id: "arin".to_string(), validation_time: time::OffsetDateTime::UNIX_EPOCH, @@ -692,6 +735,7 @@ mod tests { rsync_failure_scope_uri: None, repo_identity: RepoIdentity::new(None, rsync_base_uri), mode: RepoTransportMode::Rsync, + retry_short_timeout: false, tal_id: "arin".to_string(), rir_id: "arin".to_string(), validation_time: time::OffsetDateTime::UNIX_EPOCH, diff --git a/src/parallel/run_coordinator.rs b/src/parallel/run_coordinator.rs index 38ce112..db5ffec 100644 --- a/src/parallel/run_coordinator.rs +++ b/src/parallel/run_coordinator.rs @@ -125,8 +125,9 @@ impl GlobalRunCoordinator { rsync_scope_uri: String, rsync_failure_scope_uri: Option, sync_preference: SyncPreference, + retry_short_timeout: bool, ) -> TransportRequestAction { - let action = self.transport_tables.register_transport_request( + let mut action = self.transport_tables.register_transport_request( identity, requester, validation_time, @@ -135,8 +136,11 @@ impl GlobalRunCoordinator { rsync_failure_scope_uri, sync_preference, ); - match &action { + match &mut action { TransportRequestAction::Enqueue(task) => { + if retry_short_timeout { + task.retry_short_timeout = true; + } self.stats.repo_tasks_total += 1; self.pending_transport_tasks.push_back(task.clone()); self.stats.repo_queue_depth = self.pending_transport_tasks.len(); @@ -308,6 +312,7 @@ mod tests { "rsync://example.test/repo/".to_string(), None, SyncPreference::RrdpThenRsync, + false, ); assert!(matches!( action, diff --git a/src/parallel/transport_prefetch.rs b/src/parallel/transport_prefetch.rs index fa0ce0a..85c471d 100644 --- a/src/parallel/transport_prefetch.rs +++ b/src/parallel/transport_prefetch.rs @@ -40,6 +40,10 @@ pub struct TransportPrefetchRequest { pub rsync_failure_scope_uri: Option, pub repo_identity: TransportPrefetchRepoIdentity, pub mode: TransportPrefetchMode, + #[serde(default)] + pub last_result: Option, + #[serde(default)] + pub last_rsync_result: Option, pub tal_id: String, pub rir_id: String, pub priority: u8, @@ -84,6 +88,8 @@ impl TransportPrefetchRequest { rsync_failure_scope_uri, repo_identity: TransportPrefetchRepoIdentity::from_identity(identity), mode, + last_result: None, + last_rsync_result: None, tal_id: requester.tal_id.clone(), rir_id: requester.rir_id.clone(), priority, @@ -98,6 +104,8 @@ impl TransportPrefetchRequest { rsync_failure_scope_uri: task.rsync_failure_scope_uri.clone(), repo_identity: TransportPrefetchRepoIdentity::from_identity(&task.repo_identity), mode: TransportPrefetchMode::from_mode(task.mode), + last_result: None, + last_rsync_result: None, tal_id: task.tal_id.clone(), rir_id: task.rir_id.clone(), priority: task.priority, @@ -133,6 +141,20 @@ impl TransportPrefetchRequest { fn recorder_key(&self) -> String { self.dedup_key.stable_key() } + + pub fn retry_short_timeout(&self) -> bool { + matches!( + self.last_result, + Some(TransportPrefetchLastResult { ok: false }) + ) + } + + pub fn retry_short_rsync_timeout(&self) -> bool { + matches!( + self.last_rsync_result, + Some(TransportPrefetchLastResult { ok: false }) + ) || (self.mode == TransportPrefetchMode::Rsync && self.retry_short_timeout()) + } } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -197,6 +219,11 @@ impl TransportPrefetchMode { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TransportPrefetchLastResult { + pub ok: bool, +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct TransportPrefetchRequester { pub tal_id: String, @@ -234,6 +261,8 @@ impl TransportPrefetchRequester { #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct TransportPrefetchRecorder { requests_by_key: BTreeMap, + results_by_key: BTreeMap, + rsync_results_by_scope: BTreeMap, request_order: Vec, } @@ -263,6 +292,31 @@ impl TransportPrefetchRecorder { self.record_request(request); } + pub fn record_result(&mut self, result: &crate::parallel::types::RepoTransportResultEnvelope) { + let key = TransportPrefetchDedupKey::from_repo_key(&result.dedup_key).stable_key(); + let last_result = TransportPrefetchLastResult { + ok: matches!( + result.result, + crate::parallel::types::RepoTransportResultKind::Success { .. } + ), + }; + self.results_by_key.insert(key.clone(), last_result); + if let Some(request) = self.requests_by_key.get_mut(&key) { + request.last_result = Some(last_result); + } + if let crate::parallel::types::RepoDedupKey::RsyncScope { rsync_scope_uri } = + &result.dedup_key + { + self.rsync_results_by_scope + .insert(rsync_scope_uri.clone(), last_result); + for request in self.requests_by_key.values_mut() { + if request.rsync_scope_uri == *rsync_scope_uri { + request.last_rsync_result = Some(last_result); + } + } + } + } + pub fn snapshot(&self, sync_preference: SyncPreference) -> TransportPrefetchSnapshot { TransportPrefetchSnapshot::new( sync_preference, @@ -282,6 +336,16 @@ impl TransportPrefetchRecorder { let key = request.recorder_key(); if !self.requests_by_key.contains_key(&key) { self.request_order.push(key.clone()); + let mut request = request; + if request.last_result.is_none() { + request.last_result = self.results_by_key.get(&key).copied(); + } + if request.last_rsync_result.is_none() { + request.last_rsync_result = self + .rsync_results_by_scope + .get(&request.rsync_scope_uri) + .copied(); + } self.requests_by_key.insert(key, request); } } @@ -322,6 +386,7 @@ mod tests { "rsync://example.test/repo/", ), mode: RepoTransportMode::Rrdp, + retry_short_timeout: false, tal_id: "apnic".to_string(), rir_id: "apnic".to_string(), validation_time: time::OffsetDateTime::UNIX_EPOCH, @@ -397,4 +462,49 @@ mod tests { assert!(snapshot.is_compatible_with(SyncPreference::RrdpThenRsync)); assert!(!snapshot.is_compatible_with(SyncPreference::RsyncOnly)); } + + #[test] + fn recorder_marks_failed_rrdp_and_rsync_for_next_run_short_timeouts() { + let mut recorder = TransportPrefetchRecorder::default(); + let rrdp_task = task( + "https://example.test/notification.xml", + "rsync://example.test/repo/a.mft", + ); + recorder.record_task(&rrdp_task, "rsync://example.test/repo/".to_string()); + recorder.record_result(&crate::parallel::types::RepoTransportResultEnvelope { + dedup_key: RepoDedupKey::RrdpNotify { + notification_uri: "https://example.test/notification.xml".to_string(), + }, + rsync_failure_scope_uri: None, + repo_identity: rrdp_task.repo_identity.clone(), + mode: RepoTransportMode::Rrdp, + tal_id: "apnic".to_string(), + rir_id: "apnic".to_string(), + timing_ms: 1, + result: crate::parallel::types::RepoTransportResultKind::Failed { + detail: "timeout".to_string(), + warnings: Vec::new(), + }, + }); + recorder.record_result(&crate::parallel::types::RepoTransportResultEnvelope { + dedup_key: RepoDedupKey::RsyncScope { + rsync_scope_uri: "rsync://example.test/repo/".to_string(), + }, + rsync_failure_scope_uri: None, + repo_identity: rrdp_task.repo_identity.clone(), + mode: RepoTransportMode::Rsync, + tal_id: "apnic".to_string(), + rir_id: "apnic".to_string(), + timing_ms: 1, + result: crate::parallel::types::RepoTransportResultKind::Failed { + detail: "timeout".to_string(), + warnings: Vec::new(), + }, + }); + + let snapshot = recorder.snapshot(SyncPreference::RrdpThenRsync); + assert_eq!(snapshot.requests.len(), 1); + assert!(snapshot.requests[0].retry_short_timeout()); + assert!(snapshot.requests[0].retry_short_rsync_timeout()); + } } diff --git a/src/parallel/types.rs b/src/parallel/types.rs index ce278d9..48a6a71 100644 --- a/src/parallel/types.rs +++ b/src/parallel/types.rs @@ -108,6 +108,7 @@ pub struct RepoTransportTask { pub rsync_failure_scope_uri: Option, pub repo_identity: RepoIdentity, pub mode: RepoTransportMode, + pub retry_short_timeout: bool, pub tal_id: String, pub rir_id: String, pub validation_time: time::OffsetDateTime, @@ -223,6 +224,7 @@ impl RepoSyncTask { rsync_failure_scope_uri: None, repo_identity: self.repo_key.as_identity(), mode, + retry_short_timeout: false, tal_id: self.tal_id.clone(), rir_id: self.rir_id.clone(), validation_time: self.validation_time, diff --git a/src/query/object_resolver.rs b/src/query/object_resolver.rs index 5c28dbd..1c1d03a 100644 --- a/src/query/object_resolver.rs +++ b/src/query/object_resolver.rs @@ -14,3 +14,100 @@ pub fn resolve_object_from_cache_or_report( ) -> QueryDbResult> { Ok(None) } + +#[cfg(test)] +mod tests { + use std::fs; + + use serde_json::json; + + use super::*; + use crate::query_db::{ArtifactIndexerConfig, index_artifacts}; + + #[test] + fn report_path_for_run_resolves_indexed_run_directory() { + let temp = tempfile::tempdir().expect("tempdir"); + let run_dir = temp.path().join("runs/run_0001"); + fs::create_dir_all(&run_dir).expect("run dir"); + write_sample_run(&run_dir); + + let query_db_path = temp.path().join("query-db"); + index_artifacts(&ArtifactIndexerConfig { + query_db_path: query_db_path.clone(), + run_root: Some(temp.path().to_path_buf()), + run_dir: None, + repo_bytes_db_path: None, + projection_entry_limit: 50, + min_run_seq: None, + retain_indexed_runs: None, + }) + .expect("index"); + + let db = QueryDb::open(&query_db_path).expect("open db"); + assert_eq!( + report_path_for_run(&db, "run_0001").expect("report path"), + Some(run_dir.join("report.json")) + ); + assert!( + report_path_for_run(&db, "missing") + .expect("missing report path") + .is_none() + ); + assert!( + resolve_object_from_cache_or_report(&db, "run_0001") + .expect("placeholder resolver") + .is_none() + ); + } + + fn write_sample_run(run_dir: &Path) { + let report = json!({ + "format_version": 2, + "meta": {"validation_time_rfc3339_utc": "2026-06-20T00:00:00Z"}, + "tree": {"warnings": []}, + "publication_points": [ + { + "node_id": 1, + "rsync_base_uri": "rsync://repo.example/rpki/", + "manifest_rsync_uri": "rsync://repo.example/rpki/m.mft", + "publication_point_rsync_uri": "rsync://repo.example/rpki/", + "rrdp_notification_uri": "https://repo.example/rrdp/notification.xml", + "source": "rrdp", + "repo_sync_source": "rrdp", + "repo_sync_phase": "rrdp_delta", + "repo_sync_duration_ms": 7, + "repo_terminal_state": "fresh", + "warnings": [], + "objects": [ + {"rsync_uri":"rsync://repo.example/rpki/m.mft","sha256_hex":"11","kind":"manifest","result":"ok"} + ] + } + ], + "vrps": [], + "aspas": [], + "downloads": [], + "download_stats": {}, + "repo_sync_stats": {} + }); + fs::write( + run_dir.join("report.json"), + serde_json::to_vec(&report).unwrap(), + ) + .expect("report"); + let summary = json!({ + "status": "success", + "runId": "run_0001", + "runSeq": 1, + "startedAtRfc3339Utc": "2026-06-20T00:00:00Z", + "finishedAtRfc3339Utc": "2026-06-20T00:00:01Z", + "wallMs": 1000, + "reportCounts": {"vrps": 0, "aspas": 0, "publicationPoints": 1, "warnings": 0} + }); + fs::write( + run_dir.join("run-summary.json"), + serde_json::to_vec(&summary).unwrap(), + ) + .expect("summary"); + fs::write(run_dir.join("stage-timing.json"), b"{}").expect("stage"); + } +} diff --git a/src/storage.rs b/src/storage.rs index 2336738..b068efe 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -4,6 +4,7 @@ mod pack; use std::collections::{HashMap, HashSet}; use std::path::Path; +use std::sync::{Arc, Mutex}; use base64::Engine; use rocksdb::{ColumnFamily, DB, Direction, IteratorMode, Options, WriteBatch}; @@ -49,6 +50,21 @@ pub struct RocksStore { db: DB, external_raw_store: Option, external_repo_bytes: Option, + publication_point_cache_projection_index: Mutex, +} + +enum PublicationPointCacheProjectionIndexState { + Uninitialized, + Disabled, + BuildingFromEmpty { + index: HashMap>, + bytes: usize, + limit: usize, + }, + Loaded { + index: HashMap>, + bytes: usize, + }, } fn process_vm_rss_kb() -> Option { @@ -76,6 +92,28 @@ const ROCKSDB_MEMORY_PROPERTY_NAMES: &[(&str, &str)] = &[ ("background_errors", "rocksdb.background-errors"), ]; +const PP_CACHE_RAW_INDEX_ENV: &str = "RPKI_PP_CACHE_RAW_INDEX"; +const PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_ENV: &str = + "RPKI_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES"; +const DEFAULT_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES: usize = 32 * 1024 * 1024; + +fn pp_cache_raw_index_enabled() -> bool { + match std::env::var(PP_CACHE_RAW_INDEX_ENV) { + Ok(value) => !matches!( + value.trim().to_ascii_lowercase().as_str(), + "0" | "false" | "off" | "no" + ), + Err(_) => true, + } +} + +fn pp_cache_raw_index_empty_build_limit_bytes() -> usize { + std::env::var(PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_ENV) + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(DEFAULT_PP_CACHE_RAW_INDEX_EMPTY_BUILD_LIMIT_BYTES) +} + #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] pub struct RocksDbMemoryProperties { pub cur_size_all_mem_tables: Option, @@ -2313,6 +2351,9 @@ impl RocksStore { db, external_raw_store: None, external_repo_bytes: None, + publication_point_cache_projection_index: Mutex::new( + PublicationPointCacheProjectionIndexState::Uninitialized, + ), }) } @@ -2715,7 +2756,9 @@ impl RocksStore { publication_point_projection, None, )?; - self.write_batch(batch) + self.write_batch(batch)?; + self.update_publication_point_cache_projection_index(publication_point_projection)?; + Ok(()) } pub fn replace_vcir_and_manifest_replay_meta( @@ -2791,6 +2834,7 @@ impl RocksStore { let write_batch_started = std::time::Instant::now(); self.write_batch(batch)?; + self.update_publication_point_cache_projection_index(publication_point_projection)?; timing.write_batch_ms = write_batch_started.elapsed().as_millis() as u64; timing.rss_after_write_batch_kb = process_vm_rss_kb(); Ok(timing) @@ -2853,6 +2897,100 @@ impl RocksStore { pub fn get_publication_point_cache_projection( &self, manifest_rsync_uri: &str, + ) -> StorageResult> { + self.get_publication_point_cache_projection_from_db(manifest_rsync_uri) + } + + pub fn get_publication_point_cache_projection_cached( + &self, + manifest_rsync_uri: &str, + ) -> StorageResult> { + let bytes = { + let mut guard = self + .publication_point_cache_projection_index + .lock() + .map_err(|e| { + StorageError::RocksDb(format!( + "publication point cache index lock poisoned: {e}" + )) + })?; + if matches!( + *guard, + PublicationPointCacheProjectionIndexState::Uninitialized + ) { + let load_started = std::time::Instant::now(); + let raw_index_enabled = pp_cache_raw_index_enabled(); + let (index, bytes) = if raw_index_enabled { + self.load_publication_point_cache_projection_index()? + } else { + (HashMap::new(), 0) + }; + *guard = if !raw_index_enabled { + crate::progress_log::emit( + "publication_point_cache_raw_index", + serde_json::json!({ + "state": "disabled_by_env", + "entries": 0, + "bytes": 0, + "load_ms": load_started.elapsed().as_millis() as u64, + }), + ); + PublicationPointCacheProjectionIndexState::Disabled + } else if index.is_empty() { + let limit = pp_cache_raw_index_empty_build_limit_bytes(); + crate::progress_log::emit( + "publication_point_cache_raw_index", + serde_json::json!({ + "state": "empty_building_bounded", + "entries": 0, + "bytes": 0, + "empty_build_limit_bytes": limit, + "load_ms": load_started.elapsed().as_millis() as u64, + }), + ); + PublicationPointCacheProjectionIndexState::BuildingFromEmpty { + index, + bytes: 0, + limit, + } + } else { + crate::progress_log::emit( + "publication_point_cache_raw_index", + serde_json::json!({ + "state": "loaded", + "entries": index.len(), + "bytes": bytes, + "load_ms": load_started.elapsed().as_millis() as u64, + }), + ); + PublicationPointCacheProjectionIndexState::Loaded { index, bytes } + }; + } + match &*guard { + PublicationPointCacheProjectionIndexState::Loaded { index, .. } => { + index.get(manifest_rsync_uri).cloned() + } + PublicationPointCacheProjectionIndexState::BuildingFromEmpty { index, .. } => { + index.get(manifest_rsync_uri).cloned() + } + PublicationPointCacheProjectionIndexState::Disabled => None, + PublicationPointCacheProjectionIndexState::Uninitialized => None, + } + }; + let Some(bytes) = bytes else { + return Ok(None); + }; + let projection = decode_cbor::( + bytes.as_ref(), + "publication_point_cache_projection", + )?; + projection.validate_internal()?; + Ok(Some(projection)) + } + + fn get_publication_point_cache_projection_from_db( + &self, + manifest_rsync_uri: &str, ) -> StorageResult> { let cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?; let key = publication_point_cache_projection_key(manifest_rsync_uri); @@ -2871,6 +3009,72 @@ impl RocksStore { Ok(Some(projection)) } + fn load_publication_point_cache_projection_index( + &self, + ) -> StorageResult<(HashMap>, usize)> { + let cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?; + let mode = IteratorMode::Start; + let mut index = HashMap::new(); + let mut bytes_total = 0usize; + for res in self.db.iterator_cf(cf, mode) { + let (key, value) = res.map_err(|e| StorageError::RocksDb(e.to_string()))?; + let Some(manifest_rsync_uri) = + publication_point_cache_projection_key_manifest_uri(&key) + else { + continue; + }; + bytes_total = bytes_total.saturating_add(value.len()); + index.insert(manifest_rsync_uri, Arc::<[u8]>::from(value.to_vec())); + } + Ok((index, bytes_total)) + } + + fn update_publication_point_cache_projection_index( + &self, + projection: Option<&PublicationPointCacheProjection>, + ) -> StorageResult<()> { + let Some(projection) = projection else { + return Ok(()); + }; + let mut guard = self + .publication_point_cache_projection_index + .lock() + .map_err(|e| { + StorageError::RocksDb(format!("publication point cache index lock poisoned: {e}")) + })?; + let bytes = encode_cbor(projection, "publication_point_cache_projection")?; + match &mut *guard { + PublicationPointCacheProjectionIndexState::Loaded { + index, + bytes: total_bytes, + } => { + *total_bytes = total_bytes.saturating_add(bytes.len()); + index.insert( + projection.manifest_rsync_uri.clone(), + Arc::<[u8]>::from(bytes), + ); + } + PublicationPointCacheProjectionIndexState::BuildingFromEmpty { + index, + bytes: total_bytes, + limit, + } => { + if total_bytes.saturating_add(bytes.len()) <= *limit { + *total_bytes += bytes.len(); + index.insert( + projection.manifest_rsync_uri.clone(), + Arc::<[u8]>::from(bytes), + ); + } else { + *guard = PublicationPointCacheProjectionIndexState::Disabled; + } + } + PublicationPointCacheProjectionIndexState::Uninitialized + | PublicationPointCacheProjectionIndexState::Disabled => {} + } + Ok(()) + } + pub fn put_transport_prefetch_snapshot( &self, snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot, diff --git a/src/storage/keys.rs b/src/storage/keys.rs index 9b6f0ac..eec3f0e 100644 --- a/src/storage/keys.rs +++ b/src/storage/keys.rs @@ -38,6 +38,12 @@ pub(super) fn publication_point_cache_projection_key(manifest_rsync_uri: &str) - format!("{PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX}{manifest_rsync_uri}") } +pub(super) fn publication_point_cache_projection_key_manifest_uri(key: &[u8]) -> Option { + let key = std::str::from_utf8(key).ok()?; + key.strip_prefix(PUBLICATION_POINT_CACHE_PROJECTION_KEY_PREFIX) + .map(str::to_string) +} + pub(super) fn rrdp_source_key(notify_uri: &str) -> String { format!("{RRDP_SOURCE_KEY_PREFIX}{notify_uri}") } diff --git a/src/storage/tests.rs b/src/storage/tests.rs index 7981d22..0bb1bc5 100644 --- a/src/storage/tests.rs +++ b/src/storage/tests.rs @@ -375,6 +375,85 @@ fn publication_point_cache_projection_roundtrips_with_vcir() { assert_eq!(got_projection.related_objects.len(), 2); } +#[test] +fn publication_point_cache_projection_index_updates_after_first_read() { + let td = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(td.path()).expect("open rocksdb"); + let vcir = sample_vcir("rsync://example.test/repo/current.mft"); + let mut projection = PublicationPointCacheProjection::from_vcir_with_context( + &vcir, + "rsync://example.test/repo/".to_string(), + Some("rsync://example.test/repo/ca.cer".to_string()), + sha256_32(b"ca-cert"), + sha256_32(b"manifest-old"), + sha256_32(b"ta-context"), + sha256_32(b"parent-context"), + sha256_32(b"policy"), + ) + .expect("build old publication point projection"); + + store + .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection)) + .expect("put old projection"); + let got_old = store + .get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri) + .expect("get old projection") + .expect("old projection exists"); + assert_eq!(got_old.manifest_sha256, sha256_32(b"manifest-old")); + + projection.manifest_sha256 = sha256_32(b"manifest-new"); + store + .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection)) + .expect("put new projection"); + let got_new = store + .get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri) + .expect("get new projection") + .expect("new projection exists"); + assert_eq!(got_new.manifest_sha256, sha256_32(b"manifest-new")); +} + +#[test] +fn publication_point_cache_projection_cached_empty_db_accepts_bounded_new_entries() { + let td = tempfile::tempdir().expect("tempdir"); + let store = RocksStore::open(td.path()).expect("open rocksdb"); + let vcir = sample_vcir("rsync://example.test/repo/current.mft"); + let projection = PublicationPointCacheProjection::from_vcir_with_context( + &vcir, + "rsync://example.test/repo/".to_string(), + Some("rsync://example.test/repo/ca.cer".to_string()), + sha256_32(b"ca-cert"), + sha256_32(b"manifest"), + sha256_32(b"ta-context"), + sha256_32(b"parent-context"), + sha256_32(b"policy"), + ) + .expect("build publication point projection"); + + assert!( + store + .get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri) + .expect("cached empty lookup") + .is_none() + ); + store + .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection)) + .expect("put projection after cached empty lookup"); + assert_eq!( + store + .get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri) + .expect("cached lookup sees bounded new entry") + .expect("cached projection exists"), + projection + ); + assert_eq!( + store + .get_publication_point_cache_projection(&vcir.manifest_rsync_uri) + .expect("direct db lookup") + .expect("projection exists"), + projection + ); +} + #[test] fn publication_point_cache_projection_rejects_version_mismatch() { let vcir = sample_vcir("rsync://example.test/repo/current.mft"); @@ -967,6 +1046,8 @@ fn transport_prefetch_snapshot_roundtrips() { rsync_base_uri: "rsync://example.test/repo/".to_string(), }, mode: TransportPrefetchMode::Rrdp, + last_result: None, + last_rsync_result: None, tal_id: "apnic".to_string(), rir_id: "apnic".to_string(), priority: 0, diff --git a/src/validation/from_tal.rs b/src/validation/from_tal.rs index a549c94..99abcab 100644 --- a/src/validation/from_tal.rs +++ b/src/validation/from_tal.rs @@ -72,6 +72,44 @@ pub fn discover_root_ca_instance_from_tal_url_with_strict_name( ) } +pub fn discover_root_ca_instance_from_tal_url_with_fetchers( + http_fetcher: &dyn Fetcher, + rsync_fetcher: &dyn RsyncFetcher, + tal_url: &str, +) -> Result { + let tal_bytes = http_fetcher + .fetch(tal_url) + .map_err(FromTalError::TalFetch)?; + let tal = Tal::decode_bytes(&tal_bytes)?; + discover_root_ca_instance_from_tal_with_fetchers_impl( + http_fetcher, + rsync_fetcher, + tal, + Some(tal_url.to_string()), + false, + TaUriOrder::HttpFirst, + ) +} + +pub fn discover_root_ca_instance_from_tal_url_with_fetchers_strict_name( + http_fetcher: &dyn Fetcher, + rsync_fetcher: &dyn RsyncFetcher, + tal_url: &str, +) -> Result { + let tal_bytes = http_fetcher + .fetch(tal_url) + .map_err(FromTalError::TalFetch)?; + let tal = Tal::decode_bytes(&tal_bytes)?; + discover_root_ca_instance_from_tal_with_fetchers_impl( + http_fetcher, + rsync_fetcher, + tal, + Some(tal_url.to_string()), + true, + TaUriOrder::HttpFirst, + ) +} + pub fn discover_root_ca_instance_from_tal( http_fetcher: &dyn Fetcher, tal: Tal, @@ -150,6 +188,7 @@ pub fn discover_root_ca_instance_from_tal_with_fetchers( tal, tal_url, false, + TaUriOrder::RsyncFirst, ) } @@ -165,15 +204,23 @@ pub fn discover_root_ca_instance_from_tal_with_fetchers_strict_name( tal, tal_url, true, + TaUriOrder::RsyncFirst, ) } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum TaUriOrder { + HttpFirst, + RsyncFirst, +} + fn discover_root_ca_instance_from_tal_with_fetchers_impl( http_fetcher: &dyn Fetcher, rsync_fetcher: &dyn RsyncFetcher, tal: Tal, tal_url: Option, strict_name: bool, + ta_uri_order: TaUriOrder, ) -> Result { if tal.ta_uris.is_empty() { return Err(FromTalError::NoTaUris); @@ -181,7 +228,7 @@ fn discover_root_ca_instance_from_tal_with_fetchers_impl( let mut last_err: Option = None; let mut ta_uris = tal.ta_uris.clone(); - ta_uris.sort_by_key(|uri| if uri.scheme() == "rsync" { 0 } else { 1 }); + ta_uris.sort_by_key(|uri| ta_uri_priority(uri, ta_uri_order)); for ta_uri in ta_uris.iter() { let ta_der = match fetch_ta_der(http_fetcher, rsync_fetcher, ta_uri) { Ok(b) => b, @@ -221,6 +268,16 @@ fn discover_root_ca_instance_from_tal_with_fetchers_impl( }))) } +fn ta_uri_priority(uri: &Url, order: TaUriOrder) -> u8 { + match (order, uri.scheme()) { + (TaUriOrder::HttpFirst, "https" | "http") => 0, + (TaUriOrder::HttpFirst, "rsync") => 1, + (TaUriOrder::RsyncFirst, "rsync") => 0, + (TaUriOrder::RsyncFirst, "https" | "http") => 1, + _ => 2, + } +} + fn fetch_ta_der( http_fetcher: &dyn Fetcher, rsync_fetcher: &dyn RsyncFetcher, @@ -250,41 +307,9 @@ fn fetch_ta_der_via_rsync( rsync_fetcher: &dyn RsyncFetcher, ta_rsync_uri: &str, ) -> Result, String> { - let base = rsync_parent_uri(ta_rsync_uri)?; - let objects = rsync_fetcher - .fetch_objects(&base) - .map_err(|e| e.to_string())?; - objects - .into_iter() - .find(|(uri, _)| uri == ta_rsync_uri) - .map(|(_, bytes)| bytes) - .ok_or_else(|| format!("TA rsync object not found in fetched subtree: {ta_rsync_uri}")) -} - -fn rsync_parent_uri(ta_rsync_uri: &str) -> Result { - let url = Url::parse(ta_rsync_uri).map_err(|e| e.to_string())?; - if url.scheme() != "rsync" { - return Err(format!("not an rsync URI: {ta_rsync_uri}")); - } - let host = url - .host_str() - .ok_or_else(|| format!("missing host in rsync URI: {ta_rsync_uri}"))?; - let segments = url - .path_segments() - .ok_or_else(|| format!("missing path in rsync URI: {ta_rsync_uri}"))? - .collect::>(); - if segments.is_empty() || segments.last().copied().unwrap_or_default().is_empty() { - return Err(format!( - "rsync URI must reference a file object: {ta_rsync_uri}" - )); - } - let parent_segments = &segments[..segments.len() - 1]; - let mut parent = format!("rsync://{host}/"); - if !parent_segments.is_empty() { - parent.push_str(&parent_segments.join("/")); - parent.push('/'); - } - Ok(parent) + rsync_fetcher + .fetch_object(ta_rsync_uri) + .map_err(|e| e.to_string()) } pub fn discover_root_ca_instance_from_tal_and_ta_der( @@ -334,6 +359,20 @@ pub fn canonical_tal_rsync_uri_from_bytes(tal_bytes: &[u8]) -> Result>, + } + + impl Fetcher for MapHttpFetcher { + fn fetch(&self, uri: &str) -> Result, String> { + self.map + .get(uri) + .cloned() + .ok_or_else(|| format!("no fixture mapped for {uri}")) + } + } struct FailingHttpFetcher; @@ -392,6 +431,84 @@ mod tests { .starts_with("rsync://") ); } + + struct PanicRsyncFetcher; + + impl RsyncFetcher for PanicRsyncFetcher { + fn fetch_objects( + &self, + rsync_base_uri: &str, + ) -> crate::fetch::rsync::RsyncFetchResult)>> { + panic!("rsync should not be used when HTTPS TA URI is available: {rsync_base_uri}") + } + } + + #[test] + fn discover_root_ca_instance_from_tal_url_with_fetchers_prefers_https_ta_uri() { + let tal_url = "https://example.test/apnic.tal"; + let tal_bytes = std::fs::read( + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/tal/apnic-rfc7730-https.tal"), + ) + .unwrap(); + let ta_der = std::fs::read( + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/ta/apnic-ta.cer"), + ) + .unwrap(); + let https_uri = "https://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer"; + let mut map = HashMap::new(); + map.insert(tal_url.to_string(), tal_bytes); + map.insert(https_uri.to_string(), ta_der); + let discovery = discover_root_ca_instance_from_tal_url_with_fetchers( + &MapHttpFetcher { map }, + &PanicRsyncFetcher, + tal_url, + ) + .expect("discover via HTTPS TA URI"); + assert_eq!( + discovery.trust_anchor.resolved_ta_uri.unwrap().as_str(), + https_uri + ); + } + + #[test] + fn discover_root_ca_instance_from_tal_url_with_fetchers_supports_rsync_only_tal() { + let tal_url = "https://example.test/apnic-rsync-only.tal"; + let tal_bytes = std::fs::read_to_string( + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/tal/apnic-rfc7730-https.tal"), + ) + .unwrap(); + let rsync_uri = "rsync://rpki.apnic.net/repository/apnic-rpki-root-iana-origin.cer"; + let key_material = tal_bytes + .split_once("\n\n") + .map(|(_, key)| key) + .expect("fixture contains TAL key material"); + let rsync_only_tal = format!("{rsync_uri}\n\n{key_material}"); + let ta_der = std::fs::read( + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests/fixtures/ta/apnic-ta.cer"), + ) + .unwrap(); + let td = tempfile::tempdir().unwrap(); + let mirror_root = td.path().join("repository"); + std::fs::create_dir_all(&mirror_root).unwrap(); + std::fs::write(mirror_root.join("apnic-rpki-root-iana-origin.cer"), ta_der).unwrap(); + + let mut map = HashMap::new(); + map.insert(tal_url.to_string(), rsync_only_tal.into_bytes()); + let discovery = discover_root_ca_instance_from_tal_url_with_fetchers( + &MapHttpFetcher { map }, + &LocalDirRsyncFetcher::new(mirror_root), + tal_url, + ) + .expect("discover via rsync-only TAL"); + assert_eq!( + discovery.trust_anchor.resolved_ta_uri.unwrap().as_str(), + rsync_uri + ); + } } pub fn run_root_from_tal_url_once( @@ -402,7 +519,8 @@ pub fn run_root_from_tal_url_once( rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher, validation_time: time::OffsetDateTime, ) -> Result { - let discovery = discover_root_ca_instance_from_tal_url(http_fetcher, tal_url)?; + let discovery = + discover_root_ca_instance_from_tal_url_with_fetchers(http_fetcher, rsync_fetcher, tal_url)?; let run = run_publication_point_once( store, diff --git a/src/validation/run_tree_from_tal.rs b/src/validation/run_tree_from_tal.rs index ac538c9..037783c 100644 --- a/src/validation/run_tree_from_tal.rs +++ b/src/validation/run_tree_from_tal.rs @@ -27,8 +27,8 @@ use crate::validation::from_tal::{ DiscoveredRootCaInstance, FromTalError, canonical_tal_rsync_uri_from_bytes, discover_root_ca_instance_from_tal_and_ta_der, discover_root_ca_instance_from_tal_and_ta_der_with_strict_name, - discover_root_ca_instance_from_tal_url, - discover_root_ca_instance_from_tal_url_with_strict_name, + discover_root_ca_instance_from_tal_url_with_fetchers, + discover_root_ca_instance_from_tal_url_with_fetchers_strict_name, discover_root_ca_instance_from_tal_with_fetchers, discover_root_ca_instance_from_tal_with_fetchers_strict_name, }; @@ -348,9 +348,17 @@ fn root_discovery_from_tal_input( match &tal_input.source { TalSource::Url(url) => { if strict_name { - discover_root_ca_instance_from_tal_url_with_strict_name(http_fetcher, url) + discover_root_ca_instance_from_tal_url_with_fetchers_strict_name( + http_fetcher, + rsync_fetcher, + url, + ) } else { - discover_root_ca_instance_from_tal_url(http_fetcher, url) + discover_root_ca_instance_from_tal_url_with_fetchers( + http_fetcher, + rsync_fetcher, + url, + ) } } TalSource::DerBytes { @@ -414,12 +422,17 @@ fn root_discovery_from_tal_input( fn discover_root_ca_instance_from_tal_url_with_policy( policy: &crate::policy::Policy, http_fetcher: &dyn Fetcher, + rsync_fetcher: &dyn crate::fetch::rsync::RsyncFetcher, tal_url: &str, ) -> Result { if policy.strict.name { - discover_root_ca_instance_from_tal_url_with_strict_name(http_fetcher, tal_url) + discover_root_ca_instance_from_tal_url_with_fetchers_strict_name( + http_fetcher, + rsync_fetcher, + tal_url, + ) } else { - discover_root_ca_instance_from_tal_url(http_fetcher, tal_url) + discover_root_ca_instance_from_tal_url_with_fetchers(http_fetcher, rsync_fetcher, tal_url) } } @@ -544,8 +557,12 @@ pub fn run_tree_from_tal_url_serial( validation_time: time::OffsetDateTime, config: &TreeRunConfig, ) -> Result { - let discovery = - discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; + let discovery = discover_root_ca_instance_from_tal_url_with_policy( + policy, + http_fetcher, + rsync_fetcher, + tal_url, + )?; let runner = make_live_runner( store, @@ -585,8 +602,12 @@ pub fn run_tree_from_tal_url_serial_audit( validation_time: time::OffsetDateTime, config: &TreeRunConfig, ) -> Result { - let discovery = - discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; + let discovery = discover_root_ca_instance_from_tal_url_with_policy( + policy, + http_fetcher, + rsync_fetcher, + tal_url, + )?; let download_log = DownloadLogHandle::new(); let runner = make_live_runner( @@ -648,8 +669,12 @@ pub fn run_tree_from_tal_url_serial_audit_with_timing( timing: &TimingHandle, ) -> Result { let _tal = timing.span_phase("tal_bootstrap"); - let discovery = - discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; + let discovery = discover_root_ca_instance_from_tal_url_with_policy( + policy, + http_fetcher, + rsync_fetcher, + tal_url, + )?; drop(_tal); let download_log = DownloadLogHandle::new(); @@ -935,8 +960,12 @@ where H: Fetcher + Clone + 'static, R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, { - let discovery = - discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; + let discovery = discover_root_ca_instance_from_tal_url_with_policy( + policy, + http_fetcher, + rsync_fetcher, + tal_url, + )?; run_single_root_parallel_audit_inner( store, policy, @@ -1051,8 +1080,12 @@ where H: Fetcher + Clone + 'static, R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, { - let discovery = - discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; + let discovery = discover_root_ca_instance_from_tal_url_with_policy( + policy, + http_fetcher, + rsync_fetcher, + tal_url, + )?; run_single_root_parallel_audit_inner( store, policy, @@ -1086,8 +1119,12 @@ where H: Fetcher + Clone + 'static, R: crate::fetch::rsync::RsyncFetcher + Clone + 'static, { - let discovery = - discover_root_ca_instance_from_tal_url_with_policy(policy, http_fetcher, tal_url)?; + let discovery = discover_root_ca_instance_from_tal_url_with_policy( + policy, + http_fetcher, + rsync_fetcher, + tal_url, + )?; run_single_root_parallel_audit_inner( store, policy, diff --git a/src/validation/tree_parallel.rs b/src/validation/tree_parallel.rs index b1d849d..9db51f8 100644 --- a/src/validation/tree_parallel.rs +++ b/src/validation/tree_parallel.rs @@ -405,6 +405,8 @@ struct RepoDrainMetrics { duration_ms: u64, } +const REPO_RESULT_DRAIN_MAX_EVENTS: usize = 64; + fn elapsed_ms(started: Instant) -> u64 { started.elapsed().as_millis() as u64 } @@ -1184,6 +1186,11 @@ fn stage_ready_publication_point( ) { ParallelObjectsPrepare::Complete(mut objects) => { metrics.prepare_ms = elapsed_ms(prepare_started); + runner.record_publication_point_step_ms( + &ready.node.handle.manifest_rsync_uri, + "fresh_objects_prepare", + metrics.prepare_ms, + ); metrics.complete_count = 1; metrics.roa_tasks = objects.stats.roa_total; metrics.aspa_objects = objects.stats.aspa_total; @@ -1210,15 +1217,30 @@ fn stage_ready_publication_point( metrics.direct_finalize_ms = finalize_metrics .finalize_ms .max(elapsed_ms(direct_finalize_started)); + runner.record_publication_point_step_ms( + &metrics.manifest_rsync_uri.clone().unwrap_or_default(), + "fresh_direct_finalize", + metrics.direct_finalize_ms, + ); } ParallelObjectsPrepare::Staged(objects_stage) => { metrics.prepare_ms = elapsed_ms(prepare_started); + runner.record_publication_point_step_ms( + &ready.node.handle.manifest_rsync_uri, + "fresh_objects_prepare", + metrics.prepare_ms, + ); metrics.staged_count = 1; metrics.locked_files = objects_stage.locked_file_count(); metrics.aspa_objects = objects_stage.aspa_task_count(); let build_tasks_started = Instant::now(); objects_stage.append_roa_tasks_to(pending_roa_dispatch); metrics.build_roa_tasks_ms = elapsed_ms(build_tasks_started); + runner.record_publication_point_step_ms( + &ready.node.handle.manifest_rsync_uri, + "fresh_build_roa_tasks", + metrics.build_roa_tasks_ms, + ); let task_count = objects_stage.roa_task_count(); metrics.roa_tasks = task_count; if task_count == 0 { @@ -1275,6 +1297,12 @@ fn stage_ready_publication_point( } } metrics.total_ms = elapsed_ms(publication_point_started); + if metrics.complete_count > 0 { + runner.record_publication_point_total_ms( + metrics.manifest_rsync_uri.as_deref().unwrap_or_default(), + metrics.total_ms, + ); + } emit_ready_publication_point_control_slow( metrics.manifest_rsync_uri.as_deref().unwrap_or_default(), metrics @@ -1916,10 +1944,20 @@ fn finalize_publication_point_state( .as_millis() as u64 }); let objects_processing_ms = objects_started_at.elapsed().as_millis() as u64; + runner.record_publication_point_step_ms( + &node.handle.manifest_rsync_uri, + "fresh_objects_processing_lifetime", + objects_processing_ms, + ); let reduce_started = Instant::now(); let locked_files = objects_stage.locked_file_count(); let reduce_result = reduce_parallel_roa_stage(objects_stage, results, runner.timing.as_ref()); let reduce_ms = elapsed_ms(reduce_started); + runner.record_publication_point_step_ms( + &node.handle.manifest_rsync_uri, + "fresh_roa_reduce", + reduce_ms, + ); let (result, mut metrics) = match reduce_result { Ok(mut objects) => { @@ -1985,6 +2023,25 @@ fn finalize_publication_point_state( }; let finalize_worker_ms = elapsed_ms(finalize_worker_started); metrics.finalize_worker_ms = finalize_worker_ms; + runner.record_publication_point_step_ms( + &node.handle.manifest_rsync_uri, + "fresh_finalize_worker", + finalize_worker_ms, + ); + runner.record_publication_point_step_ms( + &node.handle.manifest_rsync_uri, + "fresh_finalize_queue_wait", + finalize_queue_wait_ms.unwrap_or(0), + ); + runner.record_publication_point_step_ms( + &node.handle.manifest_rsync_uri, + "fresh_finalize", + metrics.finalize_ms, + ); + runner.record_publication_point_total_ms( + &node.handle.manifest_rsync_uri, + started_at.elapsed().as_millis() as u64, + ); emit_finalize_breakdown( "phase2_finalize_worker_breakdown", node.handle.manifest_rsync_uri.as_str(), @@ -2056,10 +2113,10 @@ fn drain_repo_events( ) -> Result { let started = Instant::now(); let mut metrics = RepoDrainMetrics::default(); - if let Some(event) = repo_runtime - .recv_repo_result_timeout(timeout) - .map_err(TreeRunError::Runner)? - { + let events = repo_runtime + .drain_repo_results_timeout(timeout, REPO_RESULT_DRAIN_MAX_EVENTS) + .map_err(TreeRunError::Runner)?; + for event in events { metrics.event_count += 1; metrics.completions += event.completions.len(); for completion in event.completions { diff --git a/src/validation/tree_runner.rs b/src/validation/tree_runner.rs index f145d97..d878de5 100644 --- a/src/validation/tree_runner.rs +++ b/src/validation/tree_runner.rs @@ -25,11 +25,12 @@ use crate::replay::archive::ReplayArchiveIndex; use crate::replay::delta_archive::ReplayDeltaArchiveIndex; use crate::report::{RfcRef, Warning}; use crate::storage::{ - PackFile, PackTime, PublicationPointCacheChild, PublicationPointCacheProjection, - RawByHashEntry, RocksStore, ValidatedCaInstanceResult, VcirArtifactKind, VcirArtifactRole, - VcirArtifactValidationStatus, VcirAuditSummary, VcirCcrManifestProjection, VcirChildEntry, - VcirInstanceGate, VcirLocalOutput, VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, - VcirReplaceTimingBreakdown, VcirSourceObjectType, VcirSummary, + PackFile, PackTime, PublicationPointCacheChild, PublicationPointCacheOutput, + PublicationPointCacheProjection, RawByHashEntry, RocksStore, ValidatedCaInstanceResult, + VcirArtifactKind, VcirArtifactRole, VcirArtifactValidationStatus, VcirAuditSummary, + VcirCcrManifestProjection, VcirChildEntry, VcirInstanceGate, VcirLocalOutput, + VcirLocalOutputPayload, VcirOutputType, VcirRelatedArtifact, VcirReplaceTimingBreakdown, + VcirSourceObjectType, VcirSummary, }; use crate::sync::repo::{ sync_publication_point, sync_publication_point_replay, sync_publication_point_replay_delta, @@ -241,7 +242,7 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { } let projection = match self .store - .get_publication_point_cache_projection(&ca.manifest_rsync_uri) + .get_publication_point_cache_projection_cached(&ca.manifest_rsync_uri) { Ok(Some(projection)) => projection, Ok(None) => { @@ -526,16 +527,12 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { let output_reuse_count = projection.outputs.len() as u64; let child_reuse_count = projection.children.len() as u64; let related_object_reuse_count = projection.related_objects.len() as u64; - let to_vcir_started = std::time::Instant::now(); - let mut vcir = projection.to_vcir_for_reuse(self.validation_time); - let to_vcir_ms = self.record_publication_point_cache_phase_ms( - "publication_point_cache_to_vcir_total", - to_vcir_started, - ); - vcir.parent_manifest_rsync_uri = ca.parent_manifest_rsync_uri.clone(); let build_objects_started = std::time::Instant::now(); - let mut objects = - build_objects_output_from_vcir(&vcir, self.validation_time, &mut warnings); + let mut objects = build_objects_output_from_publication_point_cache_projection( + &projection, + self.validation_time, + &mut warnings, + ); let build_objects_ms = self.record_publication_point_cache_phase_ms( "publication_point_cache_build_objects_total", build_objects_started, @@ -563,19 +560,18 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { ccr_append_started, ); let audit_build_started = std::time::Instant::now(); - let audit = build_publication_point_audit_from_vcir( + let audit = build_publication_point_audit_from_publication_point_cache_projection( ca, PublicationPointSource::PublicationPointCache, repo_sync_source, repo_sync_phase, Some(repo_sync_duration_ms), repo_sync_err, - Some(&vcir), - None, + &projection, + self.validation_time, &warnings, &objects, &child_audits, - &[], ); let audit_build_ms = self.record_publication_point_cache_phase_ms( "publication_point_cache_audit_build_total", @@ -614,7 +610,6 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { "children_reused": child_reuse_count, "related_objects_reused": related_object_reuse_count, "audit_objects_reused": audit_object_count, - "to_vcir_ms": to_vcir_ms, "build_objects_ms": build_objects_ms, "restore_children_ms": restore_children_ms, "restore_children_workers": child_restore_workers, @@ -649,6 +644,27 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { nanos / 1_000_000 } + pub(crate) fn record_publication_point_total_ms(&self, manifest_rsync_uri: &str, ms: u64) { + if let Some(timing) = self.timing.as_ref() { + timing.record_publication_point_nanos(manifest_rsync_uri, ms.saturating_mul(1_000_000)); + } + } + + pub(crate) fn record_publication_point_step_ms( + &self, + manifest_rsync_uri: &str, + step: &'static str, + ms: u64, + ) { + if let Some(timing) = self.timing.as_ref() { + timing.record_publication_point_step_nanos( + manifest_rsync_uri, + step, + ms.saturating_mul(1_000_000), + ); + } + } + fn publication_point_cache_child_restore_worker_count(&self) -> usize { self.parallel_phase2_config .as_ref() @@ -772,6 +788,88 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { error, snapshot_prepare_ms, })?; + if let Some(timing) = self.timing.as_ref() { + timing.record_phase_nanos( + "fresh_snapshot_prepare_total", + snapshot_prepare_ms.saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_snapshot_manifest_load_total", + snapshot_prepare_timing + .manifest_load_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_snapshot_manifest_decode_total", + snapshot_prepare_timing + .manifest_decode_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_snapshot_replay_guard_total", + snapshot_prepare_timing + .replay_guard_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_snapshot_manifest_entries_total", + snapshot_prepare_timing + .manifest_entries_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_snapshot_pack_files_total", + snapshot_prepare_timing + .pack_files_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_snapshot_ee_path_validate_total", + snapshot_prepare_timing + .ee_path_validate_ms + .saturating_mul(1_000_000), + ); + timing.record_count("fresh_publication_points", 1); + timing.record_count( + "fresh_manifest_files_total", + snapshot_prepare_timing.manifest_file_count as u64, + ); + } + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_snapshot_prepare", + snapshot_prepare_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_snapshot_manifest_load", + snapshot_prepare_timing.manifest_load_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_snapshot_manifest_decode", + snapshot_prepare_timing.manifest_decode_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_snapshot_replay_guard", + snapshot_prepare_timing.replay_guard_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_snapshot_manifest_entries", + snapshot_prepare_timing.manifest_entries_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_snapshot_pack_files", + snapshot_prepare_timing.pack_files_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_snapshot_ee_path_validate", + snapshot_prepare_timing.ee_path_validate_ms, + ); let child_discovery_started = std::time::Instant::now(); let out = { @@ -800,6 +898,26 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { ), }; let child_discovery_ms = child_discovery_started.elapsed().as_millis() as u64; + if let Some(timing) = self.timing.as_ref() { + timing.record_phase_nanos( + "fresh_child_discovery_total", + child_discovery_ms.saturating_mul(1_000_000), + ); + timing.record_count( + "fresh_children_discovered", + discovered_children.len() as u64, + ); + timing.record_count("fresh_child_audits", child_audits.len() as u64); + timing.record_count( + "fresh_router_keys_discovered", + discovered_router_keys.len() as u64, + ); + } + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_child_discovery", + child_discovery_ms, + ); Ok(FreshPublicationPointStage { fresh_point, @@ -829,6 +947,17 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { let snapshot_pack_started = std::time::Instant::now(); let pack = fresh_point.to_publication_point_snapshot(); let snapshot_pack_ms = snapshot_pack_started.elapsed().as_millis() as u64; + if let Some(timing) = self.timing.as_ref() { + timing.record_phase_nanos( + "fresh_snapshot_pack_total", + snapshot_pack_ms.saturating_mul(1_000_000), + ); + } + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_snapshot_pack", + snapshot_pack_ms, + ); let persist_vcir_started = std::time::Instant::now(); let persist_vcir_timing = if self.persist_vcir { @@ -850,6 +979,64 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { PersistVcirTimingBreakdown::default() }; let persist_vcir_ms = persist_vcir_started.elapsed().as_millis() as u64; + if let Some(timing) = self.timing.as_ref() { + timing.record_phase_nanos( + "fresh_persist_vcir_total", + persist_vcir_ms.saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_persist_embedded_store_total", + persist_vcir_timing + .embedded_store_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_persist_build_vcir_total", + persist_vcir_timing.build_vcir_ms.saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_persist_replace_vcir_total", + persist_vcir_timing + .replace_vcir_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_persist_local_outputs_total", + persist_vcir_timing + .build_vcir + .local_outputs_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_persist_child_entries_total", + persist_vcir_timing + .build_vcir + .child_entries_ms + .saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_persist_related_artifacts_total", + persist_vcir_timing + .build_vcir + .related_artifacts_ms + .saturating_mul(1_000_000), + ); + } + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_persist_vcir", + persist_vcir_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_persist_build_vcir", + persist_vcir_timing.build_vcir_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_persist_replace_vcir", + persist_vcir_timing.replace_vcir_ms, + ); // local_outputs_cache only exists to build/persist VCIR. Release it before the // publication point result is retained for the rest of the run. @@ -868,6 +1055,26 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { self.append_ccr_manifest_projection(&ccr_manifest_projection)?; ccr_append_ms = ccr_append_started.elapsed().as_millis() as u64; } + if let Some(timing) = self.timing.as_ref() { + timing.record_phase_nanos( + "fresh_ccr_projection_build_total", + ccr_projection_build_ms.saturating_mul(1_000_000), + ); + timing.record_phase_nanos( + "fresh_ccr_append_total", + ccr_append_ms.saturating_mul(1_000_000), + ); + } + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_ccr_projection_build", + ccr_projection_build_ms, + ); + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_ccr_append", + ccr_append_ms, + ); let audit_build_started = std::time::Instant::now(); let audit = build_publication_point_audit_from_snapshot( @@ -883,6 +1090,17 @@ impl<'a> Rpkiv1PublicationPointRunner<'a> { &child_audits, ); let audit_build_ms = audit_build_started.elapsed().as_millis() as u64; + if let Some(timing) = self.timing.as_ref() { + timing.record_phase_nanos( + "fresh_audit_build_total", + audit_build_ms.saturating_mul(1_000_000), + ); + } + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_audit_build", + audit_build_ms, + ); Ok(FreshPublicationPointFinalizeOutput { result: PublicationPointRunResult { @@ -1296,6 +1514,11 @@ impl<'a> PublicationPointRunner for Rpkiv1PublicationPointRunner<'a> { } }; let objects_processing_ms = objects_processing_started.elapsed().as_millis() as u64; + self.record_publication_point_step_ms( + &ca.manifest_rsync_uri, + "fresh_objects_processing", + objects_processing_ms, + ); objects.router_keys.extend(discovered_router_keys); objects @@ -2741,6 +2964,98 @@ fn build_publication_point_audit_from_vcir( } } +fn build_publication_point_audit_from_publication_point_cache_projection( + ca: &CaInstanceHandle, + source: PublicationPointSource, + repo_sync_source: Option<&str>, + repo_sync_phase: Option<&str>, + repo_sync_duration_ms: Option, + repo_sync_error: Option<&str>, + projection: &PublicationPointCacheProjection, + validation_time: time::OffsetDateTime, + runner_warnings: &[Warning], + objects: &crate::validation::objects::ObjectsOutput, + child_audits: &[ObjectAuditEntry], +) -> PublicationPointAudit { + let mut warnings = Vec::new(); + warnings.extend(runner_warnings.iter().map(AuditWarning::from)); + warnings.extend(objects.warnings.iter().map(AuditWarning::from)); + + let mut audit_by_uri: HashMap = HashMap::new(); + for artifact in &projection.related_objects { + let Some(uri) = artifact.uri.as_ref() else { + continue; + }; + audit_by_uri.insert( + uri.clone(), + ObjectAuditEntry { + rsync_uri: uri.clone(), + sha256_hex: artifact.sha256.clone(), + kind: kind_from_vcir_artifact_kind(artifact.artifact_kind), + result: audit_result_from_vcir_status(artifact.validation_status), + detail: None, + }, + ); + } + for entry in child_audits { + audit_by_uri.insert(entry.rsync_uri.clone(), entry.clone()); + } + for entry in &objects.audit { + audit_by_uri.insert(entry.rsync_uri.clone(), entry.clone()); + } + + let mut ordered_uris: Vec = projection + .related_objects + .iter() + .filter_map(|artifact| artifact.uri.clone()) + .collect(); + ordered_uris.sort(); + ordered_uris.dedup(); + + let mut objects_out: Vec = Vec::with_capacity(ordered_uris.len().max(1)); + if let Some(entry) = audit_by_uri.remove(&projection.manifest_rsync_uri) { + objects_out.push(entry); + } else { + objects_out.push(ObjectAuditEntry { + rsync_uri: projection.manifest_rsync_uri.clone(), + sha256_hex: hex::encode(projection.manifest_sha256), + kind: AuditObjectKind::Manifest, + result: AuditObjectResult::Ok, + detail: None, + }); + } + + for uri in ordered_uris { + if uri == projection.manifest_rsync_uri { + continue; + } + if let Some(entry) = audit_by_uri.remove(&uri) { + objects_out.push(entry); + } + } + + PublicationPointAudit { + node_id: None, + parent_node_id: None, + discovered_from: None, + rsync_base_uri: ca.rsync_base_uri.clone(), + manifest_rsync_uri: ca.manifest_rsync_uri.clone(), + publication_point_rsync_uri: ca.publication_point_rsync_uri.clone(), + rrdp_notification_uri: ca.rrdp_notification_uri.clone(), + source: source_label(source), + repo_sync_source: repo_sync_source.map(ToString::to_string), + repo_sync_phase: repo_sync_phase.map(ToString::to_string), + repo_sync_duration_ms, + repo_sync_error: repo_sync_error.map(ToString::to_string), + repo_terminal_state: terminal_state_label(source).to_string(), + this_update_rfc3339_utc: projection.manifest_this_update.rfc3339_utc.clone(), + next_update_rfc3339_utc: projection.manifest_next_update.rfc3339_utc.clone(), + verified_at_rfc3339_utc: PackTime::from_utc_offset_datetime(validation_time).rfc3339_utc, + warnings, + objects: objects_out, + } +} + fn parse_snapshot_time_value(pack_time: &PackTime) -> Result { time::OffsetDateTime::parse( &pack_time.rfc3339_utc, @@ -3160,6 +3475,202 @@ fn build_objects_output_from_vcir( output } +fn build_objects_output_from_publication_point_cache_projection( + projection: &PublicationPointCacheProjection, + validation_time: time::OffsetDateTime, + warnings: &mut Vec, +) -> crate::validation::objects::ObjectsOutput { + let mut output = empty_objects_output(); + let mut audit_by_uri: HashMap = HashMap::new(); + let mut roa_total: HashSet = HashSet::new(); + let mut aspa_total: HashSet = HashSet::new(); + let mut roa_ok: HashSet = HashSet::new(); + let mut aspa_ok: HashSet = HashSet::new(); + + for artifact in &projection.related_objects { + if artifact.artifact_role != VcirArtifactRole::SignedObject { + continue; + } + if let Some(uri) = artifact.uri.as_ref() { + match artifact.artifact_kind { + VcirArtifactKind::Roa => { + roa_total.insert(uri.clone()); + } + VcirArtifactKind::Aspa => { + aspa_total.insert(uri.clone()); + } + _ => {} + } + } + } + + for projected in &projection.outputs { + let effective_until = match parse_snapshot_time_value(&projected.item_effective_until) { + Ok(value) => value, + Err(err) => { + warnings.push( + Warning::new(format!( + "publication-point cached local output has invalid item_effective_until: {err}" + )) + .with_context(&projected.source_object_uri), + ); + audit_by_uri.insert( + projected.source_object_uri.clone(), + ObjectAuditEntry { + rsync_uri: projected.source_object_uri.clone(), + sha256_hex: hex::encode(projected.source_object_hash), + kind: audit_kind_for_vcir_output_type(projected.output_type), + result: AuditObjectResult::Error, + detail: Some( + "publication-point cached local output has invalid item_effective_until" + .to_string(), + ), + }, + ); + continue; + } + }; + if validation_time > effective_until { + audit_by_uri + .entry(projected.source_object_uri.clone()) + .or_insert_with(|| ObjectAuditEntry { + rsync_uri: projected.source_object_uri.clone(), + sha256_hex: hex::encode(projected.source_object_hash), + kind: audit_kind_for_vcir_output_type(projected.output_type), + result: AuditObjectResult::Skipped, + detail: Some( + "skipped: publication-point cached local output expired".to_string(), + ), + }); + continue; + } + + match projected.output_type { + VcirOutputType::Vrp => match parse_publication_point_cache_vrp_output(projected) { + Ok(vrp) => { + roa_ok.insert(projected.source_object_uri.clone()); + output.vrps.push(vrp); + audit_by_uri.insert( + projected.source_object_uri.clone(), + ObjectAuditEntry { + rsync_uri: projected.source_object_uri.clone(), + sha256_hex: hex::encode(projected.source_object_hash), + kind: AuditObjectKind::Roa, + result: AuditObjectResult::Ok, + detail: None, + }, + ); + } + Err(err) => { + warnings.push( + Warning::new(format!( + "publication-point cached ROA local output parse failed: {err}" + )) + .with_context(&projected.source_object_uri), + ); + audit_by_uri.insert( + projected.source_object_uri.clone(), + ObjectAuditEntry { + rsync_uri: projected.source_object_uri.clone(), + sha256_hex: hex::encode(projected.source_object_hash), + kind: AuditObjectKind::Roa, + result: AuditObjectResult::Error, + detail: Some(format!( + "publication-point cached ROA local output parse failed: {err}" + )), + }, + ); + } + }, + VcirOutputType::Aspa => match parse_publication_point_cache_aspa_output(projected) { + Ok(aspa) => { + aspa_ok.insert(projected.source_object_uri.clone()); + output.aspas.push(aspa); + audit_by_uri.insert( + projected.source_object_uri.clone(), + ObjectAuditEntry { + rsync_uri: projected.source_object_uri.clone(), + sha256_hex: hex::encode(projected.source_object_hash), + kind: AuditObjectKind::Aspa, + result: AuditObjectResult::Ok, + detail: None, + }, + ); + } + Err(err) => { + warnings.push( + Warning::new(format!( + "publication-point cached ASPA local output parse failed: {err}" + )) + .with_context(&projected.source_object_uri), + ); + audit_by_uri.insert( + projected.source_object_uri.clone(), + ObjectAuditEntry { + rsync_uri: projected.source_object_uri.clone(), + sha256_hex: hex::encode(projected.source_object_hash), + kind: AuditObjectKind::Aspa, + result: AuditObjectResult::Error, + detail: Some(format!( + "publication-point cached ASPA local output parse failed: {err}" + )), + }, + ); + } + }, + VcirOutputType::RouterKey => { + match parse_publication_point_cache_router_key_output(projected) { + Ok(router_key) => { + output.router_keys.push(router_key); + audit_by_uri.insert( + projected.source_object_uri.clone(), + ObjectAuditEntry { + rsync_uri: projected.source_object_uri.clone(), + sha256_hex: hex::encode(projected.source_object_hash), + kind: AuditObjectKind::RouterCertificate, + result: AuditObjectResult::Ok, + detail: Some( + "publication-point cached Router Key local output restored" + .to_string(), + ), + }, + ); + } + Err(err) => { + warnings.push( + Warning::new(format!( + "publication-point cached Router Key local output parse failed: {err}" + )) + .with_context(&projected.source_object_uri), + ); + audit_by_uri.insert( + projected.source_object_uri.clone(), + ObjectAuditEntry { + rsync_uri: projected.source_object_uri.clone(), + sha256_hex: hex::encode(projected.source_object_hash), + kind: AuditObjectKind::RouterCertificate, + result: AuditObjectResult::Error, + detail: Some(format!( + "publication-point cached Router Key local output parse failed: {err}" + )), + }, + ); + } + } + } + } + } + + output.stats.roa_total = roa_total.len(); + output.stats.roa_ok = roa_ok.len(); + output.stats.aspa_total = aspa_total.len(); + output.stats.aspa_ok = aspa_ok.len(); + let mut audit: Vec<_> = audit_by_uri.into_values().collect(); + audit.sort_by(|left, right| left.rsync_uri.cmp(&right.rsync_uri)); + output.audit = audit; + output +} + fn parse_vcir_vrp_output(local: &VcirLocalOutput) -> Result { match &local.payload { VcirLocalOutputPayload::Vrp { @@ -3213,6 +3724,65 @@ fn parse_vcir_router_key_output(local: &VcirLocalOutput) -> Result Result { + match &projected.payload { + VcirLocalOutputPayload::Vrp { + asn, + afi, + prefix_len, + addr, + max_length, + } => Ok(Vrp { + asn: *asn, + prefix: crate::data_model::roa::IpPrefix { + afi: *afi, + prefix_len: *prefix_len, + addr: *addr, + }, + max_length: *max_length, + }), + _ => Err("publication-point cache output payload is not VRP".to_string()), + } +} + +fn parse_publication_point_cache_aspa_output( + projected: &PublicationPointCacheOutput, +) -> Result { + match &projected.payload { + VcirLocalOutputPayload::Aspa { + customer_as_id, + provider_as_ids, + } => Ok(AspaAttestation { + customer_as_id: *customer_as_id, + provider_as_ids: provider_as_ids.clone(), + }), + _ => Err("publication-point cache output payload is not ASPA".to_string()), + } +} + +fn parse_publication_point_cache_router_key_output( + projected: &PublicationPointCacheOutput, +) -> Result { + match &projected.payload { + VcirLocalOutputPayload::RouterKey { + as_id, + ski, + spki_der, + } => Ok(RouterKeyPayload { + as_id: *as_id, + ski: ski.clone(), + spki_der: spki_der.clone(), + source_object_uri: projected.source_object_uri.clone(), + source_object_hash: hex::encode(projected.source_object_hash), + source_ee_cert_hash: hex::encode(projected.source_ee_cert_hash), + item_effective_until: projected.item_effective_until.clone(), + }), + _ => Err("publication-point cache output payload is not Router Key".to_string()), + } +} + fn restore_children_from_vcir( store: &RocksStore, ca: &CaInstanceHandle,