From 61d3e636ae1d5cf03ee0536c11adb687f469e22b Mon Sep 17 00:00:00 2001 From: yuyr Date: Sun, 21 Jun 2026 12:46:23 +0800 Subject: [PATCH] =?UTF-8?q?20260620=20mmap=20PP=20cache=20index=E6=B6=88?= =?UTF-8?q?=E9=99=A4RocksDB=E5=85=A8=E9=87=8F=E6=89=AB=E6=8F=8F=E5=B0=96?= =?UTF-8?q?=E5=B3=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 1 + src/cli.rs | 24 ++ src/cli/tests.rs | 4 + src/storage.rs | 280 +++++++++++++++++--- src/storage/pp_cache_index.rs | 468 ++++++++++++++++++++++++++++++++++ src/storage/tests.rs | 103 ++++++++ 6 files changed, 841 insertions(+), 39 deletions(-) create mode 100644 src/storage/pp_cache_index.rs diff --git a/Cargo.toml b/Cargo.toml index 6105dea..a86519f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ serde_json = { version = "1.0.140", features = ["raw_value"] } toml = "0.8.20" rocksdb = { version = "0.22.0", optional = true, default-features = false, features = ["lz4"] } serde_cbor = "0.11.2" +memmap2 = "0.9.10" roxmltree = "0.20.0" quick-xml = "0.37.2" uuid = { version = "1.7.0", features = ["v4"] } diff --git a/src/cli.rs b/src/cli.rs index 2631b3f..5858f94 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -80,6 +80,8 @@ struct RunStageTiming { analysis_top_publication_point_steps: Vec, vcir_storage_summary_ms: Option, vcir_storage: Option, + publication_point_cache_index_load: Option, + publication_point_cache_index_refresh: Option, memory_telemetry: Option, } @@ -2459,6 +2461,26 @@ pub fn run(argv: &[String]) -> Result<(), String> { &total_started, store.as_ref(), ); + let publication_point_cache_index_refresh = if args.enable_publication_point_validation_cache + || args.publication_point_cache_observe_only + { + match store.refresh_publication_point_cache_mmap_index() { + Ok(stats) => stats, + Err(e) => { + crate::progress_log::emit( + "publication_point_cache_mmap_index_refresh", + serde_json::json!({ + "state": "failed", + "error": e.to_string(), + }), + ); + None + } + } + } else { + None + }; + let publication_point_cache_index_load = store.publication_point_cache_mmap_index_load_stats(); let timing_report_snapshot = timing .as_ref() .map(|(_, handle)| handle.report_snapshot(50)); @@ -2505,6 +2527,8 @@ pub fn run(argv: &[String]) -> Result<(), String> { .unwrap_or_default(), vcir_storage_summary_ms, vcir_storage, + publication_point_cache_index_load, + publication_point_cache_index_refresh, memory_telemetry: Some(MemoryTelemetrySummary { checkpoints: memory_checkpoints, object_graph: Some(estimate_shared_object_graph(&shared)), diff --git a/src/cli/tests.rs b/src/cli/tests.rs index e474a90..0219863 100644 --- a/src/cli/tests.rs +++ b/src/cli/tests.rs @@ -1675,6 +1675,8 @@ fn run_report_task_and_stage_timing_work() { local_output_projection_saved_bytes: 0, }], }), + publication_point_cache_index_load: None, + publication_point_cache_index_refresh: None, memory_telemetry: None, }; write_stage_timing(Some(&report_path), &stage_timing).expect("write stage timing"); @@ -1738,6 +1740,8 @@ fn stage_timing_serializes_memory_telemetry() { analysis_top_publication_point_steps: Vec::new(), vcir_storage_summary_ms: None, vcir_storage: None, + publication_point_cache_index_load: None, + publication_point_cache_index_refresh: None, memory_telemetry: Some(MemoryTelemetrySummary { checkpoints: vec![MemoryTelemetryCheckpoint { label: "after_validation".to_string(), diff --git a/src/storage.rs b/src/storage.rs index b068efe..5ccaa54 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,9 +1,10 @@ mod config; mod keys; mod pack; +mod pp_cache_index; use std::collections::{HashMap, HashSet}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use base64::Engine; @@ -23,6 +24,11 @@ pub use config::{ use keys::*; use pack::compute_sha256_32; pub use pack::{PackBytes, PackFile, PackTime}; +pub use pp_cache_index::{PpCacheIndexLoadStats, PpCacheIndexRefreshStats}; +use pp_cache_index::{ + PpCacheMmapIndexSet, default_pp_cache_index_dir, load_pp_cache_mmap_index_set, + write_pp_cache_index_atomic, write_pp_cache_index_segment, +}; #[derive(Debug, thiserror::Error)] pub enum StorageError { #[error("rocksdb error: {0}")] @@ -50,6 +56,7 @@ pub struct RocksStore { db: DB, external_raw_store: Option, external_repo_bytes: Option, + publication_point_cache_index_dir: PathBuf, publication_point_cache_projection_index: Mutex, } @@ -65,6 +72,12 @@ enum PublicationPointCacheProjectionIndexState { index: HashMap>, bytes: usize, }, + LoadedMmap { + mmap: PpCacheMmapIndexSet, + dirty: HashMap>, + dirty_bytes: usize, + load_stats: PpCacheIndexLoadStats, + }, } fn process_vm_rss_kb() -> Option { @@ -2351,6 +2364,7 @@ impl RocksStore { db, external_raw_store: None, external_repo_bytes: None, + publication_point_cache_index_dir: default_pp_cache_index_dir(path), publication_point_cache_projection_index: Mutex::new( PublicationPointCacheProjectionIndexState::Uninitialized, ), @@ -2412,6 +2426,19 @@ impl RocksStore { RocksDbMemorySnapshot { databases, totals } } + pub fn publication_point_cache_mmap_index_load_stats(&self) -> Option { + let guard = self.publication_point_cache_projection_index.lock().ok()?; + match &*guard { + PublicationPointCacheProjectionIndexState::LoadedMmap { load_stats, .. } => { + Some(load_stats.clone()) + } + PublicationPointCacheProjectionIndexState::Uninitialized + | PublicationPointCacheProjectionIndexState::Disabled + | PublicationPointCacheProjectionIndexState::BuildingFromEmpty { .. } + | PublicationPointCacheProjectionIndexState::Loaded { .. } => None, + } + } + fn cf(&self, name: &'static str) -> StorageResult<&ColumnFamily> { self.db .cf_handle(name) @@ -2905,7 +2932,8 @@ impl RocksStore { &self, manifest_rsync_uri: &str, ) -> StorageResult> { - let bytes = { + let mut owned_bytes: Option> = None; + { let mut guard = self .publication_point_cache_projection_index .lock() @@ -2920,11 +2948,6 @@ impl RocksStore { ) { let load_started = std::time::Instant::now(); let raw_index_enabled = pp_cache_raw_index_enabled(); - let (index, bytes) = if raw_index_enabled { - self.load_publication_point_cache_projection_index()? - } else { - (HashMap::new(), 0) - }; *guard = if !raw_index_enabled { crate::progress_log::emit( "publication_point_cache_raw_index", @@ -2936,47 +2959,95 @@ impl RocksStore { }), ); PublicationPointCacheProjectionIndexState::Disabled - } else if index.is_empty() { - let limit = pp_cache_raw_index_empty_build_limit_bytes(); - crate::progress_log::emit( - "publication_point_cache_raw_index", - serde_json::json!({ - "state": "empty_building_bounded", - "entries": 0, - "bytes": 0, - "empty_build_limit_bytes": limit, - "load_ms": load_started.elapsed().as_millis() as u64, - }), - ); - PublicationPointCacheProjectionIndexState::BuildingFromEmpty { - index, - bytes: 0, - limit, - } } else { - crate::progress_log::emit( - "publication_point_cache_raw_index", - serde_json::json!({ - "state": "loaded", - "entries": index.len(), - "bytes": bytes, - "load_ms": load_started.elapsed().as_millis() as u64, - }), - ); - PublicationPointCacheProjectionIndexState::Loaded { index, bytes } + match load_pp_cache_mmap_index_set(&self.publication_point_cache_index_dir) { + Ok((mmap, stats)) => { + crate::progress_log::emit( + "publication_point_cache_mmap_index_load", + serde_json::json!({ + "state": "loaded", + "entries": stats.entries, + "bytes": stats.bytes, + "file_bytes": stats.file_bytes, + "load_ms": stats.load_ms, + }), + ); + PublicationPointCacheProjectionIndexState::LoadedMmap { + mmap, + dirty: HashMap::new(), + dirty_bytes: 0, + load_stats: stats, + } + } + Err(e) => { + crate::progress_log::emit( + "publication_point_cache_mmap_index_load", + serde_json::json!({ + "state": "fallback_scan", + "error": e.to_string(), + "load_ms": load_started.elapsed().as_millis() as u64, + }), + ); + let scan_started = std::time::Instant::now(); + let (index, bytes) = + self.load_publication_point_cache_projection_index()?; + if index.is_empty() { + let limit = pp_cache_raw_index_empty_build_limit_bytes(); + crate::progress_log::emit( + "publication_point_cache_raw_index", + serde_json::json!({ + "state": "empty_building_bounded", + "entries": 0, + "bytes": 0, + "empty_build_limit_bytes": limit, + "load_ms": scan_started.elapsed().as_millis() as u64, + }), + ); + PublicationPointCacheProjectionIndexState::BuildingFromEmpty { + index, + bytes: 0, + limit, + } + } else { + crate::progress_log::emit( + "publication_point_cache_raw_index", + serde_json::json!({ + "state": "loaded", + "entries": index.len(), + "bytes": bytes, + "load_ms": scan_started.elapsed().as_millis() as u64, + }), + ); + PublicationPointCacheProjectionIndexState::Loaded { index, bytes } + } + } + } }; } match &*guard { PublicationPointCacheProjectionIndexState::Loaded { index, .. } => { - index.get(manifest_rsync_uri).cloned() + owned_bytes = index.get(manifest_rsync_uri).cloned(); } PublicationPointCacheProjectionIndexState::BuildingFromEmpty { index, .. } => { - index.get(manifest_rsync_uri).cloned() + owned_bytes = index.get(manifest_rsync_uri).cloned(); } - PublicationPointCacheProjectionIndexState::Disabled => None, - PublicationPointCacheProjectionIndexState::Uninitialized => None, + PublicationPointCacheProjectionIndexState::LoadedMmap { mmap, dirty, .. } => { + if let Some(bytes) = dirty.get(manifest_rsync_uri).cloned() { + owned_bytes = Some(bytes); + } else if let Some(bytes) = mmap.get(manifest_rsync_uri) { + let projection = decode_cbor::( + bytes, + "publication_point_cache_projection", + )?; + projection.validate_internal()?; + return Ok(Some(projection)); + } + } + PublicationPointCacheProjectionIndexState::Disabled + | PublicationPointCacheProjectionIndexState::Uninitialized => {} } - }; + } + let bytes = owned_bytes; let Some(bytes) = bytes else { return Ok(None); }; @@ -3029,6 +3100,24 @@ impl RocksStore { Ok((index, bytes_total)) } + fn load_publication_point_cache_projection_entries( + &self, + ) -> StorageResult)>> { + let cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?; + let mode = IteratorMode::Start; + let mut entries = Vec::new(); + for res in self.db.iterator_cf(cf, mode) { + let (key, value) = res.map_err(|e| StorageError::RocksDb(e.to_string()))?; + let Some(manifest_rsync_uri) = + publication_point_cache_projection_key_manifest_uri(&key) + else { + continue; + }; + entries.push((manifest_rsync_uri, value.to_vec())); + } + Ok(entries) + } + fn update_publication_point_cache_projection_index( &self, projection: Option<&PublicationPointCacheProjection>, @@ -3069,12 +3158,125 @@ impl RocksStore { *guard = PublicationPointCacheProjectionIndexState::Disabled; } } + PublicationPointCacheProjectionIndexState::LoadedMmap { + dirty, dirty_bytes, .. + } => { + *dirty_bytes = dirty_bytes.saturating_add(bytes.len()); + dirty.insert( + projection.manifest_rsync_uri.clone(), + Arc::<[u8]>::from(bytes), + ); + } PublicationPointCacheProjectionIndexState::Uninitialized | PublicationPointCacheProjectionIndexState::Disabled => {} } Ok(()) } + pub fn refresh_publication_point_cache_mmap_index( + &self, + ) -> StorageResult> { + if !pp_cache_raw_index_enabled() { + return Ok(None); + } + enum RefreshAction { + Entries { + entries: Vec<(String, Vec)>, + write_segment: bool, + old_entries: usize, + dirty_entries: usize, + }, + ScanDb, + } + let action = { + let guard = self + .publication_point_cache_projection_index + .lock() + .map_err(|e| { + StorageError::RocksDb(format!( + "publication point cache index lock poisoned: {e}" + )) + })?; + match &*guard { + PublicationPointCacheProjectionIndexState::LoadedMmap { mmap, dirty, .. } => { + RefreshAction::Entries { + entries: dirty + .iter() + .map(|(key, value)| (key.clone(), value.as_ref().to_vec())) + .collect::>(), + write_segment: true, + old_entries: mmap.entries(), + dirty_entries: dirty.len(), + } + } + PublicationPointCacheProjectionIndexState::Loaded { index, .. } + | PublicationPointCacheProjectionIndexState::BuildingFromEmpty { index, .. } => { + RefreshAction::Entries { + entries: index + .iter() + .map(|(key, value)| (key.clone(), value.as_ref().to_vec())) + .collect::>(), + write_segment: false, + old_entries: 0, + dirty_entries: 0, + } + } + PublicationPointCacheProjectionIndexState::Disabled + | PublicationPointCacheProjectionIndexState::Uninitialized => RefreshAction::ScanDb, + } + }; + let (entries, write_segment, old_entries, dirty_entries) = match action { + RefreshAction::Entries { + entries, + write_segment, + old_entries, + dirty_entries, + } => (entries, write_segment, old_entries, dirty_entries), + RefreshAction::ScanDb => ( + self.load_publication_point_cache_projection_entries()?, + false, + 0, + 0, + ), + }; + if entries.is_empty() { + return Ok(None); + } + let current_path = self.publication_point_cache_index_dir.join("current.idx"); + let mut stats = if write_segment && current_path.exists() { + write_pp_cache_index_segment(&self.publication_point_cache_index_dir, entries)? + } else { + write_pp_cache_index_atomic(¤t_path, entries)? + }; + stats.old_entries = old_entries; + stats.dirty_entries = dirty_entries; + crate::progress_log::emit( + "publication_point_cache_mmap_index_refresh", + serde_json::json!({ + "state": stats.state, + "old_entries": stats.old_entries, + "dirty_entries": stats.dirty_entries, + "new_entries": stats.new_entries, + "file_bytes": stats.file_bytes, + "write_ms": stats.write_ms, + }), + ); + let mut guard = self + .publication_point_cache_projection_index + .lock() + .map_err(|e| { + StorageError::RocksDb(format!("publication point cache index lock poisoned: {e}")) + })?; + if let PublicationPointCacheProjectionIndexState::LoadedMmap { + dirty, dirty_bytes, .. + } = &mut *guard + { + dirty.clear(); + *dirty_bytes = 0; + } + Ok(Some(stats)) + } + pub fn put_transport_prefetch_snapshot( &self, snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot, diff --git a/src/storage/pp_cache_index.rs b/src/storage/pp_cache_index.rs new file mode 100644 index 0000000..73a306a --- /dev/null +++ b/src/storage/pp_cache_index.rs @@ -0,0 +1,468 @@ +use std::collections::{BTreeMap, HashMap}; +use std::fs::{self, File}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +use memmap2::Mmap; +use serde::Serialize; + +use super::{StorageError, StorageResult}; + +const MAGIC: &[u8; 12] = b"RPKIPPIDX\0\0\0"; +const VERSION: u32 = 1; +const HEADER_LEN: usize = 64; +const ENTRY_LEN: usize = 24; + +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +pub struct PpCacheIndexLoadStats { + pub source: String, + pub entries: usize, + pub bytes: usize, + pub file_bytes: u64, + pub load_ms: u64, +} + +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)] +pub struct PpCacheIndexRefreshStats { + pub state: String, + pub old_entries: usize, + pub dirty_entries: usize, + pub new_entries: usize, + pub file_bytes: u64, + pub write_ms: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +struct PpCacheIndexEntry { + value_offset: usize, + value_len: usize, +} + +#[derive(Debug)] +pub struct PpCacheMmapIndex { + mmap: Arc, + entries: HashMap, + bytes: usize, + file_bytes: u64, +} + +impl PpCacheMmapIndex { + pub fn open(path: &Path) -> StorageResult { + let file = File::open(path).map_err(|e| StorageError::RocksDb(e.to_string()))?; + let file_len = file + .metadata() + .map_err(|e| StorageError::RocksDb(e.to_string()))? + .len() as usize; + if file_len < HEADER_LEN { + return Err(StorageError::InvalidData { + entity: "publication_point_cache_mmap_index", + detail: "file too small".to_string(), + }); + } + // SAFETY: The mmap is read-only, held by Arc for all returned slices, and the file + // format is bounds-checked before any slice is exposed. + let mmap = unsafe { Mmap::map(&file) }.map_err(|e| StorageError::RocksDb(e.to_string()))?; + if &mmap[0..MAGIC.len()] != MAGIC { + return Err(StorageError::InvalidData { + entity: "publication_point_cache_mmap_index", + detail: "invalid magic".to_string(), + }); + } + let version = read_u32(&mmap, 12)?; + if version != VERSION { + return Err(StorageError::InvalidData { + entity: "publication_point_cache_mmap_index", + detail: format!("unsupported version {version}"), + }); + } + let entry_count = read_u64(&mmap, 16)? as usize; + let entry_table_offset = read_u64(&mmap, 24)? as usize; + let entry_table_len = read_u64(&mmap, 32)? as usize; + let blob_offset = read_u64(&mmap, 40)? as usize; + let blob_len = read_u64(&mmap, 48)? as usize; + checked_range(file_len, entry_table_offset, entry_table_len)?; + checked_range(file_len, blob_offset, blob_len)?; + if entry_table_len != entry_count.saturating_mul(ENTRY_LEN) { + return Err(StorageError::InvalidData { + entity: "publication_point_cache_mmap_index", + detail: "entry table length mismatch".to_string(), + }); + } + + let mut entries = HashMap::with_capacity(entry_count); + let mut bytes = 0usize; + for offset in (entry_table_offset..entry_table_offset + entry_table_len).step_by(ENTRY_LEN) + { + let key_offset = read_u64(&mmap, offset)? as usize; + let key_len = read_u32(&mmap, offset + 8)? as usize; + let value_offset = read_u64(&mmap, offset + 12)? as usize; + let value_len = read_u32(&mmap, offset + 20)? as usize; + checked_range(blob_len, key_offset, key_len)?; + checked_range(blob_len, value_offset, value_len)?; + let key_start = blob_offset + key_offset; + let key_end = key_start + key_len; + let key = std::str::from_utf8(&mmap[key_start..key_end]) + .map_err(|e| StorageError::InvalidData { + entity: "publication_point_cache_mmap_index.key", + detail: e.to_string(), + })? + .to_string(); + bytes = bytes.saturating_add(value_len); + entries.insert( + key, + PpCacheIndexEntry { + value_offset, + value_len, + }, + ); + } + Ok(Self { + mmap: Arc::new(mmap), + entries, + bytes, + file_bytes: file_len as u64, + }) + } + + pub fn get(&self, manifest_rsync_uri: &str) -> Option<&[u8]> { + let entry = self.entries.get(manifest_rsync_uri)?; + let blob_offset = read_u64(self.mmap.as_ref(), 40).ok()? as usize; + let start = blob_offset + entry.value_offset; + let end = start + entry.value_len; + Some(&self.mmap[start..end]) + } + + pub fn entries(&self) -> usize { + self.entries.len() + } + + pub fn bytes(&self) -> usize { + self.bytes + } + + pub fn file_bytes(&self) -> u64 { + self.file_bytes + } +} + +#[derive(Debug)] +pub struct PpCacheMmapIndexSet { + indexes: Vec, + entries: usize, + bytes: usize, + file_bytes: u64, +} + +impl PpCacheMmapIndexSet { + pub fn get(&self, manifest_rsync_uri: &str) -> Option<&[u8]> { + self.indexes + .iter() + .find_map(|index| index.get(manifest_rsync_uri)) + } + + pub fn entries(&self) -> usize { + self.entries + } + + pub fn bytes(&self) -> usize { + self.bytes + } + + pub fn file_bytes(&self) -> u64 { + self.file_bytes + } +} + +pub fn default_pp_cache_index_dir(db_path: &Path) -> PathBuf { + let file_name = db_path + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("work-db"); + db_path.with_file_name(format!("{file_name}.pp-cache-index")) +} + +pub fn load_pp_cache_mmap_index( + path: &Path, +) -> StorageResult<(PpCacheMmapIndex, PpCacheIndexLoadStats)> { + let started = Instant::now(); + let index = PpCacheMmapIndex::open(path)?; + let file_bytes = fs::metadata(path) + .map_err(|e| StorageError::RocksDb(e.to_string()))? + .len(); + let stats = PpCacheIndexLoadStats { + source: "mmap".to_string(), + entries: index.entries(), + bytes: index.bytes(), + file_bytes, + load_ms: started.elapsed().as_millis() as u64, + }; + Ok((index, stats)) +} + +pub fn load_pp_cache_mmap_index_set( + dir: &Path, +) -> StorageResult<(PpCacheMmapIndexSet, PpCacheIndexLoadStats)> { + let started = Instant::now(); + let mut paths = Vec::new(); + if dir.exists() { + let mut segments = fs::read_dir(dir) + .map_err(|e| StorageError::RocksDb(e.to_string()))? + .filter_map(Result::ok) + .map(|entry| entry.path()) + .filter(|path| { + path.file_name() + .and_then(|name| name.to_str()) + .is_some_and(|name| name.starts_with("segment-") && name.ends_with(".idx")) + }) + .collect::>(); + segments.sort(); + segments.reverse(); + paths.extend(segments); + let current = dir.join("current.idx"); + if current.exists() { + paths.push(current); + } + } + if paths.is_empty() { + return Err(StorageError::InvalidData { + entity: "publication_point_cache_mmap_index", + detail: "index file missing".to_string(), + }); + } + + let mut indexes = Vec::new(); + let mut entries = 0usize; + let mut bytes = 0usize; + let mut file_bytes = 0u64; + for path in paths { + let (index, _) = load_pp_cache_mmap_index(&path)?; + entries = entries.saturating_add(index.entries()); + bytes = bytes.saturating_add(index.bytes()); + file_bytes = file_bytes.saturating_add(index.file_bytes()); + indexes.push(index); + } + let set = PpCacheMmapIndexSet { + indexes, + entries, + bytes, + file_bytes, + }; + let stats = PpCacheIndexLoadStats { + source: "mmap".to_string(), + entries: set.entries(), + bytes: set.bytes(), + file_bytes: set.file_bytes(), + load_ms: started.elapsed().as_millis() as u64, + }; + Ok((set, stats)) +} + +pub fn write_pp_cache_index_segment( + dir: &Path, + entries: I, +) -> StorageResult +where + I: IntoIterator)>, +{ + fs::create_dir_all(dir).map_err(|e| StorageError::RocksDb(e.to_string()))?; + let current = dir.join("current.idx"); + if !current.exists() { + return write_pp_cache_index_atomic(¤t, entries); + } + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| StorageError::RocksDb(e.to_string()))? + .as_nanos(); + let segment = dir.join(format!("segment-{now:020}-{}.idx", std::process::id())); + let mut stats = write_pp_cache_index_atomic(&segment, entries)?; + stats.state = "segment_written".to_string(); + Ok(stats) +} + +pub fn write_pp_cache_index_atomic( + path: &Path, + entries: I, +) -> StorageResult +where + I: IntoIterator)>, +{ + let started = Instant::now(); + let parent = path.parent().ok_or_else(|| StorageError::InvalidData { + entity: "publication_point_cache_mmap_index.path", + detail: "missing parent".to_string(), + })?; + fs::create_dir_all(parent).map_err(|e| StorageError::RocksDb(e.to_string()))?; + + let mut ordered = BTreeMap::>::new(); + for (key, value) in entries { + ordered.insert(key, value); + } + + let tmp_path = parent.join("next.tmp"); + write_pp_cache_index_file( + &tmp_path, + ordered.iter().map(|(k, v)| (k.as_str(), v.as_slice())), + )?; + let file_bytes = fs::metadata(&tmp_path) + .map_err(|e| StorageError::RocksDb(e.to_string()))? + .len(); + fs::rename(&tmp_path, path).map_err(|e| StorageError::RocksDb(e.to_string()))?; + Ok(PpCacheIndexRefreshStats { + state: "written".to_string(), + old_entries: 0, + dirty_entries: 0, + new_entries: ordered.len(), + file_bytes, + write_ms: started.elapsed().as_millis() as u64, + }) +} + +fn write_pp_cache_index_file<'a, I>(path: &Path, entries: I) -> StorageResult<()> +where + I: IntoIterator, +{ + let entries = entries.into_iter().collect::>(); + let entry_table_offset = HEADER_LEN; + let entry_table_len = entries.len() * ENTRY_LEN; + let blob_offset = entry_table_offset + entry_table_len; + let mut table = Vec::with_capacity(entry_table_len); + let mut blob = Vec::new(); + for (key, value) in entries.iter() { + let key_offset = blob.len(); + blob.extend_from_slice(key.as_bytes()); + let value_offset = blob.len(); + blob.extend_from_slice(value); + write_u64(&mut table, key_offset as u64); + write_u32(&mut table, key.len() as u32); + write_u64(&mut table, value_offset as u64); + write_u32(&mut table, value.len() as u32); + } + + let mut header = Vec::with_capacity(HEADER_LEN); + header.extend_from_slice(MAGIC); + write_u32(&mut header, VERSION); + write_u64(&mut header, entries.len() as u64); + write_u64(&mut header, entry_table_offset as u64); + write_u64(&mut header, entry_table_len as u64); + write_u64(&mut header, blob_offset as u64); + write_u64(&mut header, blob.len() as u64); + header.resize(HEADER_LEN, 0); + + let mut file = File::create(path).map_err(|e| StorageError::RocksDb(e.to_string()))?; + file.write_all(&header) + .map_err(|e| StorageError::RocksDb(e.to_string()))?; + file.write_all(&table) + .map_err(|e| StorageError::RocksDb(e.to_string()))?; + file.write_all(&blob) + .map_err(|e| StorageError::RocksDb(e.to_string()))?; + file.sync_all() + .map_err(|e| StorageError::RocksDb(e.to_string()))?; + Ok(()) +} + +fn checked_range(total: usize, offset: usize, len: usize) -> StorageResult<()> { + if offset.checked_add(len).is_none_or(|end| end > total) { + return Err(StorageError::InvalidData { + entity: "publication_point_cache_mmap_index.range", + detail: "out of bounds".to_string(), + }); + } + Ok(()) +} + +fn read_u32(bytes: &[u8], offset: usize) -> StorageResult { + checked_range(bytes.len(), offset, 4)?; + Ok(u32::from_le_bytes( + bytes[offset..offset + 4].try_into().unwrap(), + )) +} + +fn read_u64(bytes: &[u8], offset: usize) -> StorageResult { + checked_range(bytes.len(), offset, 8)?; + Ok(u64::from_le_bytes( + bytes[offset..offset + 8].try_into().unwrap(), + )) +} + +fn write_u32(out: &mut Vec, value: u32) { + out.extend_from_slice(&value.to_le_bytes()); +} + +fn write_u64(out: &mut Vec, value: u64) { + out.extend_from_slice(&value.to_le_bytes()); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn pp_cache_index_roundtrips_multiple_entries() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = dir.path().join("current.idx"); + let stats = write_pp_cache_index_atomic( + &path, + vec![ + ("rsync://example.test/a.mft".to_string(), b"aaa".to_vec()), + ("rsync://example.test/b.mft".to_string(), b"bbbb".to_vec()), + ], + ) + .expect("write index"); + assert_eq!(stats.new_entries, 2); + + let (index, load) = load_pp_cache_mmap_index(&path).expect("load index"); + assert_eq!(load.entries, 2); + assert_eq!(index.get("rsync://example.test/a.mft"), Some(&b"aaa"[..])); + assert_eq!(index.get("rsync://example.test/b.mft"), Some(&b"bbbb"[..])); + assert_eq!(index.get("rsync://example.test/missing.mft"), None); + } + + #[test] + fn pp_cache_index_rejects_bad_magic() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = dir.path().join("current.idx"); + fs::write(&path, b"bad").expect("write bad"); + let err = load_pp_cache_mmap_index(&path).expect_err("bad index rejected"); + assert!(err.to_string().contains("file too small")); + } + + #[test] + fn pp_cache_index_last_duplicate_wins() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = dir.path().join("current.idx"); + write_pp_cache_index_atomic( + &path, + vec![ + ("rsync://example.test/a.mft".to_string(), b"old".to_vec()), + ("rsync://example.test/a.mft".to_string(), b"new".to_vec()), + ], + ) + .expect("write index"); + + let (index, _) = load_pp_cache_mmap_index(&path).expect("load index"); + assert_eq!(index.entries(), 1); + assert_eq!(index.get("rsync://example.test/a.mft"), Some(&b"new"[..])); + } + + #[test] + fn pp_cache_index_set_prefers_newest_segment() { + let dir = tempfile::tempdir().expect("tempdir"); + let current = dir.path().join("current.idx"); + write_pp_cache_index_atomic( + ¤t, + vec![("rsync://example.test/a.mft".to_string(), b"old".to_vec())], + ) + .expect("write current index"); + write_pp_cache_index_segment( + dir.path(), + vec![("rsync://example.test/a.mft".to_string(), b"new".to_vec())], + ) + .expect("write segment index"); + + let (set, stats) = load_pp_cache_mmap_index_set(dir.path()).expect("load index set"); + assert_eq!(stats.entries, 2); + assert_eq!(set.get("rsync://example.test/a.mft"), Some(&b"new"[..])); + } +} diff --git a/src/storage/tests.rs b/src/storage/tests.rs index 0bb1bc5..ab4e906 100644 --- a/src/storage/tests.rs +++ b/src/storage/tests.rs @@ -454,6 +454,109 @@ fn publication_point_cache_projection_cached_empty_db_accepts_bounded_new_entrie ); } +#[test] +fn publication_point_cache_mmap_index_refresh_roundtrips_after_reopen() { + let td = tempfile::tempdir().expect("tempdir"); + let db_path = td.path().join("work-db"); + let vcir = sample_vcir("rsync://example.test/repo/current.mft"); + let projection = PublicationPointCacheProjection::from_vcir_with_context( + &vcir, + "rsync://example.test/repo/".to_string(), + Some("rsync://example.test/repo/ca.cer".to_string()), + sha256_32(b"ca-cert"), + sha256_32(b"manifest"), + sha256_32(b"ta-context"), + sha256_32(b"parent-context"), + sha256_32(b"policy"), + ) + .expect("build publication point projection"); + + { + let store = RocksStore::open(&db_path).expect("open rocksdb"); + store + .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection)) + .expect("put projection"); + let stats = store + .refresh_publication_point_cache_mmap_index() + .expect("refresh mmap index") + .expect("refresh stats"); + assert_eq!(stats.new_entries, 1); + assert_eq!(stats.state, "written"); + } + + let store = RocksStore::open(&db_path).expect("reopen rocksdb"); + let got = store + .get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri) + .expect("get cached projection from mmap") + .expect("projection exists"); + assert_eq!(got, projection); +} + +#[test] +fn publication_point_cache_mmap_index_dirty_overlay_wins_and_refreshes_segment() { + let td = tempfile::tempdir().expect("tempdir"); + let db_path = td.path().join("work-db"); + let vcir = sample_vcir("rsync://example.test/repo/current.mft"); + let mut projection = PublicationPointCacheProjection::from_vcir_with_context( + &vcir, + "rsync://example.test/repo/".to_string(), + Some("rsync://example.test/repo/ca.cer".to_string()), + sha256_32(b"ca-cert"), + sha256_32(b"manifest-old"), + sha256_32(b"ta-context"), + sha256_32(b"parent-context"), + sha256_32(b"policy"), + ) + .expect("build publication point projection"); + + { + let store = RocksStore::open(&db_path).expect("open rocksdb"); + store + .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection)) + .expect("put old projection"); + store + .refresh_publication_point_cache_mmap_index() + .expect("refresh base mmap index"); + } + + { + let store = RocksStore::open(&db_path).expect("reopen rocksdb"); + assert_eq!( + store + .get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri) + .expect("get old projection") + .expect("old projection exists") + .manifest_sha256, + sha256_32(b"manifest-old") + ); + projection.manifest_sha256 = sha256_32(b"manifest-new"); + store + .put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection)) + .expect("put new projection"); + assert_eq!( + store + .get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri) + .expect("get dirty projection") + .expect("dirty projection exists") + .manifest_sha256, + sha256_32(b"manifest-new") + ); + let stats = store + .refresh_publication_point_cache_mmap_index() + .expect("refresh dirty mmap segment") + .expect("refresh stats"); + assert_eq!(stats.state, "segment_written"); + assert_eq!(stats.dirty_entries, 1); + } + + let store = RocksStore::open(&db_path).expect("reopen rocksdb after segment"); + let got = store + .get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri) + .expect("get refreshed projection") + .expect("projection exists"); + assert_eq!(got.manifest_sha256, sha256_32(b"manifest-new")); +} + #[test] fn publication_point_cache_projection_rejects_version_mismatch() { let vcir = sample_vcir("rsync://example.test/repo/current.mft");