20260620 mmap PP cache index消除RocksDB全量扫描尖峰

This commit is contained in:
yuyr 2026-06-21 12:46:23 +08:00
parent 184d3cb95b
commit 61d3e636ae
6 changed files with 841 additions and 39 deletions

View File

@ -25,6 +25,7 @@ serde_json = { version = "1.0.140", features = ["raw_value"] }
toml = "0.8.20" toml = "0.8.20"
rocksdb = { version = "0.22.0", optional = true, default-features = false, features = ["lz4"] } rocksdb = { version = "0.22.0", optional = true, default-features = false, features = ["lz4"] }
serde_cbor = "0.11.2" serde_cbor = "0.11.2"
memmap2 = "0.9.10"
roxmltree = "0.20.0" roxmltree = "0.20.0"
quick-xml = "0.37.2" quick-xml = "0.37.2"
uuid = { version = "1.7.0", features = ["v4"] } uuid = { version = "1.7.0", features = ["v4"] }

View File

@ -80,6 +80,8 @@ struct RunStageTiming {
analysis_top_publication_point_steps: Vec<TopDurationEntry>, analysis_top_publication_point_steps: Vec<TopDurationEntry>,
vcir_storage_summary_ms: Option<u64>, vcir_storage_summary_ms: Option<u64>,
vcir_storage: Option<VcirStorageSummary>, vcir_storage: Option<VcirStorageSummary>,
publication_point_cache_index_load: Option<crate::storage::PpCacheIndexLoadStats>,
publication_point_cache_index_refresh: Option<crate::storage::PpCacheIndexRefreshStats>,
memory_telemetry: Option<MemoryTelemetrySummary>, memory_telemetry: Option<MemoryTelemetrySummary>,
} }
@ -2459,6 +2461,26 @@ pub fn run(argv: &[String]) -> Result<(), String> {
&total_started, &total_started,
store.as_ref(), store.as_ref(),
); );
let publication_point_cache_index_refresh = if args.enable_publication_point_validation_cache
|| args.publication_point_cache_observe_only
{
match store.refresh_publication_point_cache_mmap_index() {
Ok(stats) => stats,
Err(e) => {
crate::progress_log::emit(
"publication_point_cache_mmap_index_refresh",
serde_json::json!({
"state": "failed",
"error": e.to_string(),
}),
);
None
}
}
} else {
None
};
let publication_point_cache_index_load = store.publication_point_cache_mmap_index_load_stats();
let timing_report_snapshot = timing let timing_report_snapshot = timing
.as_ref() .as_ref()
.map(|(_, handle)| handle.report_snapshot(50)); .map(|(_, handle)| handle.report_snapshot(50));
@ -2505,6 +2527,8 @@ pub fn run(argv: &[String]) -> Result<(), String> {
.unwrap_or_default(), .unwrap_or_default(),
vcir_storage_summary_ms, vcir_storage_summary_ms,
vcir_storage, vcir_storage,
publication_point_cache_index_load,
publication_point_cache_index_refresh,
memory_telemetry: Some(MemoryTelemetrySummary { memory_telemetry: Some(MemoryTelemetrySummary {
checkpoints: memory_checkpoints, checkpoints: memory_checkpoints,
object_graph: Some(estimate_shared_object_graph(&shared)), object_graph: Some(estimate_shared_object_graph(&shared)),

View File

@ -1675,6 +1675,8 @@ fn run_report_task_and_stage_timing_work() {
local_output_projection_saved_bytes: 0, local_output_projection_saved_bytes: 0,
}], }],
}), }),
publication_point_cache_index_load: None,
publication_point_cache_index_refresh: None,
memory_telemetry: None, memory_telemetry: None,
}; };
write_stage_timing(Some(&report_path), &stage_timing).expect("write stage timing"); write_stage_timing(Some(&report_path), &stage_timing).expect("write stage timing");
@ -1738,6 +1740,8 @@ fn stage_timing_serializes_memory_telemetry() {
analysis_top_publication_point_steps: Vec::new(), analysis_top_publication_point_steps: Vec::new(),
vcir_storage_summary_ms: None, vcir_storage_summary_ms: None,
vcir_storage: None, vcir_storage: None,
publication_point_cache_index_load: None,
publication_point_cache_index_refresh: None,
memory_telemetry: Some(MemoryTelemetrySummary { memory_telemetry: Some(MemoryTelemetrySummary {
checkpoints: vec![MemoryTelemetryCheckpoint { checkpoints: vec![MemoryTelemetryCheckpoint {
label: "after_validation".to_string(), label: "after_validation".to_string(),

View File

@ -1,9 +1,10 @@
mod config; mod config;
mod keys; mod keys;
mod pack; mod pack;
mod pp_cache_index;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::path::Path; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use base64::Engine; use base64::Engine;
@ -23,6 +24,11 @@ pub use config::{
use keys::*; use keys::*;
use pack::compute_sha256_32; use pack::compute_sha256_32;
pub use pack::{PackBytes, PackFile, PackTime}; pub use pack::{PackBytes, PackFile, PackTime};
pub use pp_cache_index::{PpCacheIndexLoadStats, PpCacheIndexRefreshStats};
use pp_cache_index::{
PpCacheMmapIndexSet, default_pp_cache_index_dir, load_pp_cache_mmap_index_set,
write_pp_cache_index_atomic, write_pp_cache_index_segment,
};
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum StorageError { pub enum StorageError {
#[error("rocksdb error: {0}")] #[error("rocksdb error: {0}")]
@ -50,6 +56,7 @@ pub struct RocksStore {
db: DB, db: DB,
external_raw_store: Option<ExternalRawStoreDb>, external_raw_store: Option<ExternalRawStoreDb>,
external_repo_bytes: Option<ExternalRepoBytesDb>, external_repo_bytes: Option<ExternalRepoBytesDb>,
publication_point_cache_index_dir: PathBuf,
publication_point_cache_projection_index: Mutex<PublicationPointCacheProjectionIndexState>, publication_point_cache_projection_index: Mutex<PublicationPointCacheProjectionIndexState>,
} }
@ -65,6 +72,12 @@ enum PublicationPointCacheProjectionIndexState {
index: HashMap<String, Arc<[u8]>>, index: HashMap<String, Arc<[u8]>>,
bytes: usize, bytes: usize,
}, },
LoadedMmap {
mmap: PpCacheMmapIndexSet,
dirty: HashMap<String, Arc<[u8]>>,
dirty_bytes: usize,
load_stats: PpCacheIndexLoadStats,
},
} }
fn process_vm_rss_kb() -> Option<u64> { fn process_vm_rss_kb() -> Option<u64> {
@ -2351,6 +2364,7 @@ impl RocksStore {
db, db,
external_raw_store: None, external_raw_store: None,
external_repo_bytes: None, external_repo_bytes: None,
publication_point_cache_index_dir: default_pp_cache_index_dir(path),
publication_point_cache_projection_index: Mutex::new( publication_point_cache_projection_index: Mutex::new(
PublicationPointCacheProjectionIndexState::Uninitialized, PublicationPointCacheProjectionIndexState::Uninitialized,
), ),
@ -2412,6 +2426,19 @@ impl RocksStore {
RocksDbMemorySnapshot { databases, totals } RocksDbMemorySnapshot { databases, totals }
} }
pub fn publication_point_cache_mmap_index_load_stats(&self) -> Option<PpCacheIndexLoadStats> {
let guard = self.publication_point_cache_projection_index.lock().ok()?;
match &*guard {
PublicationPointCacheProjectionIndexState::LoadedMmap { load_stats, .. } => {
Some(load_stats.clone())
}
PublicationPointCacheProjectionIndexState::Uninitialized
| PublicationPointCacheProjectionIndexState::Disabled
| PublicationPointCacheProjectionIndexState::BuildingFromEmpty { .. }
| PublicationPointCacheProjectionIndexState::Loaded { .. } => None,
}
}
fn cf(&self, name: &'static str) -> StorageResult<&ColumnFamily> { fn cf(&self, name: &'static str) -> StorageResult<&ColumnFamily> {
self.db self.db
.cf_handle(name) .cf_handle(name)
@ -2905,7 +2932,8 @@ impl RocksStore {
&self, &self,
manifest_rsync_uri: &str, manifest_rsync_uri: &str,
) -> StorageResult<Option<PublicationPointCacheProjection>> { ) -> StorageResult<Option<PublicationPointCacheProjection>> {
let bytes = { let mut owned_bytes: Option<Arc<[u8]>> = None;
{
let mut guard = self let mut guard = self
.publication_point_cache_projection_index .publication_point_cache_projection_index
.lock() .lock()
@ -2920,11 +2948,6 @@ impl RocksStore {
) { ) {
let load_started = std::time::Instant::now(); let load_started = std::time::Instant::now();
let raw_index_enabled = pp_cache_raw_index_enabled(); let raw_index_enabled = pp_cache_raw_index_enabled();
let (index, bytes) = if raw_index_enabled {
self.load_publication_point_cache_projection_index()?
} else {
(HashMap::new(), 0)
};
*guard = if !raw_index_enabled { *guard = if !raw_index_enabled {
crate::progress_log::emit( crate::progress_log::emit(
"publication_point_cache_raw_index", "publication_point_cache_raw_index",
@ -2936,47 +2959,95 @@ impl RocksStore {
}), }),
); );
PublicationPointCacheProjectionIndexState::Disabled PublicationPointCacheProjectionIndexState::Disabled
} else if index.is_empty() {
let limit = pp_cache_raw_index_empty_build_limit_bytes();
crate::progress_log::emit(
"publication_point_cache_raw_index",
serde_json::json!({
"state": "empty_building_bounded",
"entries": 0,
"bytes": 0,
"empty_build_limit_bytes": limit,
"load_ms": load_started.elapsed().as_millis() as u64,
}),
);
PublicationPointCacheProjectionIndexState::BuildingFromEmpty {
index,
bytes: 0,
limit,
}
} else { } else {
crate::progress_log::emit( match load_pp_cache_mmap_index_set(&self.publication_point_cache_index_dir) {
"publication_point_cache_raw_index", Ok((mmap, stats)) => {
serde_json::json!({ crate::progress_log::emit(
"state": "loaded", "publication_point_cache_mmap_index_load",
"entries": index.len(), serde_json::json!({
"bytes": bytes, "state": "loaded",
"load_ms": load_started.elapsed().as_millis() as u64, "entries": stats.entries,
}), "bytes": stats.bytes,
); "file_bytes": stats.file_bytes,
PublicationPointCacheProjectionIndexState::Loaded { index, bytes } "load_ms": stats.load_ms,
}),
);
PublicationPointCacheProjectionIndexState::LoadedMmap {
mmap,
dirty: HashMap::new(),
dirty_bytes: 0,
load_stats: stats,
}
}
Err(e) => {
crate::progress_log::emit(
"publication_point_cache_mmap_index_load",
serde_json::json!({
"state": "fallback_scan",
"error": e.to_string(),
"load_ms": load_started.elapsed().as_millis() as u64,
}),
);
let scan_started = std::time::Instant::now();
let (index, bytes) =
self.load_publication_point_cache_projection_index()?;
if index.is_empty() {
let limit = pp_cache_raw_index_empty_build_limit_bytes();
crate::progress_log::emit(
"publication_point_cache_raw_index",
serde_json::json!({
"state": "empty_building_bounded",
"entries": 0,
"bytes": 0,
"empty_build_limit_bytes": limit,
"load_ms": scan_started.elapsed().as_millis() as u64,
}),
);
PublicationPointCacheProjectionIndexState::BuildingFromEmpty {
index,
bytes: 0,
limit,
}
} else {
crate::progress_log::emit(
"publication_point_cache_raw_index",
serde_json::json!({
"state": "loaded",
"entries": index.len(),
"bytes": bytes,
"load_ms": scan_started.elapsed().as_millis() as u64,
}),
);
PublicationPointCacheProjectionIndexState::Loaded { index, bytes }
}
}
}
}; };
} }
match &*guard { match &*guard {
PublicationPointCacheProjectionIndexState::Loaded { index, .. } => { PublicationPointCacheProjectionIndexState::Loaded { index, .. } => {
index.get(manifest_rsync_uri).cloned() owned_bytes = index.get(manifest_rsync_uri).cloned();
} }
PublicationPointCacheProjectionIndexState::BuildingFromEmpty { index, .. } => { PublicationPointCacheProjectionIndexState::BuildingFromEmpty { index, .. } => {
index.get(manifest_rsync_uri).cloned() owned_bytes = index.get(manifest_rsync_uri).cloned();
} }
PublicationPointCacheProjectionIndexState::Disabled => None, PublicationPointCacheProjectionIndexState::LoadedMmap { mmap, dirty, .. } => {
PublicationPointCacheProjectionIndexState::Uninitialized => None, if let Some(bytes) = dirty.get(manifest_rsync_uri).cloned() {
owned_bytes = Some(bytes);
} else if let Some(bytes) = mmap.get(manifest_rsync_uri) {
let projection = decode_cbor::<PublicationPointCacheProjection>(
bytes,
"publication_point_cache_projection",
)?;
projection.validate_internal()?;
return Ok(Some(projection));
}
}
PublicationPointCacheProjectionIndexState::Disabled
| PublicationPointCacheProjectionIndexState::Uninitialized => {}
} }
}; }
let bytes = owned_bytes;
let Some(bytes) = bytes else { let Some(bytes) = bytes else {
return Ok(None); return Ok(None);
}; };
@ -3029,6 +3100,24 @@ impl RocksStore {
Ok((index, bytes_total)) Ok((index, bytes_total))
} }
/// Scans the publication point cache projection column family from the start and returns
/// every `(manifest rsync URI, raw value bytes)` pair.
///
/// Keys for which `publication_point_cache_projection_key_manifest_uri` yields no URI are
/// skipped. Any iterator error aborts the scan and is reported as `StorageError::RocksDb`.
fn load_publication_point_cache_projection_entries(
    &self,
) -> StorageResult<Vec<(String, Vec<u8>)>> {
    let cf = self.cf(CF_PUBLICATION_POINT_CACHE_PROJECTION)?;
    let mut collected = Vec::new();
    for item in self.db.iterator_cf(cf, IteratorMode::Start) {
        let (raw_key, raw_value) = item.map_err(|e| StorageError::RocksDb(e.to_string()))?;
        if let Some(manifest_rsync_uri) =
            publication_point_cache_projection_key_manifest_uri(&raw_key)
        {
            collected.push((manifest_rsync_uri, raw_value.to_vec()));
        }
    }
    Ok(collected)
}
fn update_publication_point_cache_projection_index( fn update_publication_point_cache_projection_index(
&self, &self,
projection: Option<&PublicationPointCacheProjection>, projection: Option<&PublicationPointCacheProjection>,
@ -3069,12 +3158,125 @@ impl RocksStore {
*guard = PublicationPointCacheProjectionIndexState::Disabled; *guard = PublicationPointCacheProjectionIndexState::Disabled;
} }
} }
PublicationPointCacheProjectionIndexState::LoadedMmap {
dirty, dirty_bytes, ..
} => {
*dirty_bytes = dirty_bytes.saturating_add(bytes.len());
dirty.insert(
projection.manifest_rsync_uri.clone(),
Arc::<[u8]>::from(bytes),
);
}
PublicationPointCacheProjectionIndexState::Uninitialized PublicationPointCacheProjectionIndexState::Uninitialized
| PublicationPointCacheProjectionIndexState::Disabled => {} | PublicationPointCacheProjectionIndexState::Disabled => {}
} }
Ok(()) Ok(())
} }
pub fn refresh_publication_point_cache_mmap_index(
&self,
) -> StorageResult<Option<PpCacheIndexRefreshStats>> {
if !pp_cache_raw_index_enabled() {
return Ok(None);
}
enum RefreshAction {
Entries {
entries: Vec<(String, Vec<u8>)>,
write_segment: bool,
old_entries: usize,
dirty_entries: usize,
},
ScanDb,
}
let action = {
let guard = self
.publication_point_cache_projection_index
.lock()
.map_err(|e| {
StorageError::RocksDb(format!(
"publication point cache index lock poisoned: {e}"
))
})?;
match &*guard {
PublicationPointCacheProjectionIndexState::LoadedMmap { mmap, dirty, .. } => {
RefreshAction::Entries {
entries: dirty
.iter()
.map(|(key, value)| (key.clone(), value.as_ref().to_vec()))
.collect::<Vec<_>>(),
write_segment: true,
old_entries: mmap.entries(),
dirty_entries: dirty.len(),
}
}
PublicationPointCacheProjectionIndexState::Loaded { index, .. }
| PublicationPointCacheProjectionIndexState::BuildingFromEmpty { index, .. } => {
RefreshAction::Entries {
entries: index
.iter()
.map(|(key, value)| (key.clone(), value.as_ref().to_vec()))
.collect::<Vec<_>>(),
write_segment: false,
old_entries: 0,
dirty_entries: 0,
}
}
PublicationPointCacheProjectionIndexState::Disabled
| PublicationPointCacheProjectionIndexState::Uninitialized => RefreshAction::ScanDb,
}
};
let (entries, write_segment, old_entries, dirty_entries) = match action {
RefreshAction::Entries {
entries,
write_segment,
old_entries,
dirty_entries,
} => (entries, write_segment, old_entries, dirty_entries),
RefreshAction::ScanDb => (
self.load_publication_point_cache_projection_entries()?,
false,
0,
0,
),
};
if entries.is_empty() {
return Ok(None);
}
let current_path = self.publication_point_cache_index_dir.join("current.idx");
let mut stats = if write_segment && current_path.exists() {
write_pp_cache_index_segment(&self.publication_point_cache_index_dir, entries)?
} else {
write_pp_cache_index_atomic(&current_path, entries)?
};
stats.old_entries = old_entries;
stats.dirty_entries = dirty_entries;
crate::progress_log::emit(
"publication_point_cache_mmap_index_refresh",
serde_json::json!({
"state": stats.state,
"old_entries": stats.old_entries,
"dirty_entries": stats.dirty_entries,
"new_entries": stats.new_entries,
"file_bytes": stats.file_bytes,
"write_ms": stats.write_ms,
}),
);
let mut guard = self
.publication_point_cache_projection_index
.lock()
.map_err(|e| {
StorageError::RocksDb(format!("publication point cache index lock poisoned: {e}"))
})?;
if let PublicationPointCacheProjectionIndexState::LoadedMmap {
dirty, dirty_bytes, ..
} = &mut *guard
{
dirty.clear();
*dirty_bytes = 0;
}
Ok(Some(stats))
}
pub fn put_transport_prefetch_snapshot( pub fn put_transport_prefetch_snapshot(
&self, &self,
snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot, snapshot: &crate::parallel::transport_prefetch::TransportPrefetchSnapshot,

View File

@ -0,0 +1,468 @@
use std::collections::{BTreeMap, HashMap};
use std::fs::{self, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use memmap2::Mmap;
use serde::Serialize;
use super::{StorageError, StorageResult};
// On-disk index layout (all integers little-endian):
//   header (HEADER_LEN bytes, zero padded):
//     0..12   MAGIC
//     12..16  VERSION (u32)
//     16..24  entry count (u64)
//     24..32  entry table offset (u64)
//     32..40  entry table length in bytes (u64)
//     40..48  blob offset (u64)
//     48..56  blob length in bytes (u64)
//   entry table: one ENTRY_LEN record per entry:
//     key offset (u64), key length (u32), value offset (u64), value length (u32),
//     with both offsets relative to the start of the blob
//   blob: key bytes and value bytes.
const MAGIC: &[u8; 12] = b"RPKIPPIDX\0\0\0";
const VERSION: u32 = 1;
const HEADER_LEN: usize = 64;
const ENTRY_LEN: usize = 24;
/// Statistics describing one load of the publication point cache mmap index.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct PpCacheIndexLoadStats {
/// Where the index came from; the loaders in this module set it to `"mmap"`.
pub source: String,
/// Number of index entries, summed over all loaded index files.
pub entries: usize,
/// Total length in bytes of the indexed values.
pub bytes: usize,
/// Total size in bytes of the loaded index files.
pub file_bytes: u64,
/// Wall-clock time spent loading, in milliseconds.
pub load_ms: u64,
}
/// Statistics describing one write of a publication point cache index file.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct PpCacheIndexRefreshStats {
/// Outcome label; the writers in this module use `"written"` and `"segment_written"`.
pub state: String,
/// Entry count of the previously loaded index; the writers leave it at 0 for the caller
/// to fill in.
pub old_entries: usize,
/// Number of dirty overlay entries flushed; the writers leave it at 0 for the caller to
/// fill in.
pub dirty_entries: usize,
/// Number of distinct keys written to the new file.
pub new_entries: usize,
/// Size in bytes of the written file.
pub file_bytes: u64,
/// Wall-clock time spent writing, in milliseconds.
pub write_ms: u64,
}
/// Location of one value inside an index file's blob section.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PpCacheIndexEntry {
/// Offset of the value, relative to the start of the blob section.
value_offset: usize,
/// Length of the value in bytes.
value_len: usize,
}
/// A single memory-mapped index file plus an in-memory key lookup table.
#[derive(Debug)]
pub struct PpCacheMmapIndex {
/// Read-only mapping of the whole index file.
mmap: Arc<Mmap>,
/// Manifest rsync URI -> location of its value inside the mapped blob.
entries: HashMap<String, PpCacheIndexEntry>,
/// Sum of the value lengths of all entry records read from the file.
bytes: usize,
/// Size of the index file in bytes.
file_bytes: u64,
}
impl PpCacheMmapIndex {
pub fn open(path: &Path) -> StorageResult<Self> {
let file = File::open(path).map_err(|e| StorageError::RocksDb(e.to_string()))?;
let file_len = file
.metadata()
.map_err(|e| StorageError::RocksDb(e.to_string()))?
.len() as usize;
if file_len < HEADER_LEN {
return Err(StorageError::InvalidData {
entity: "publication_point_cache_mmap_index",
detail: "file too small".to_string(),
});
}
// SAFETY: The mmap is read-only, held by Arc for all returned slices, and the file
// format is bounds-checked before any slice is exposed.
let mmap = unsafe { Mmap::map(&file) }.map_err(|e| StorageError::RocksDb(e.to_string()))?;
if &mmap[0..MAGIC.len()] != MAGIC {
return Err(StorageError::InvalidData {
entity: "publication_point_cache_mmap_index",
detail: "invalid magic".to_string(),
});
}
let version = read_u32(&mmap, 12)?;
if version != VERSION {
return Err(StorageError::InvalidData {
entity: "publication_point_cache_mmap_index",
detail: format!("unsupported version {version}"),
});
}
let entry_count = read_u64(&mmap, 16)? as usize;
let entry_table_offset = read_u64(&mmap, 24)? as usize;
let entry_table_len = read_u64(&mmap, 32)? as usize;
let blob_offset = read_u64(&mmap, 40)? as usize;
let blob_len = read_u64(&mmap, 48)? as usize;
checked_range(file_len, entry_table_offset, entry_table_len)?;
checked_range(file_len, blob_offset, blob_len)?;
if entry_table_len != entry_count.saturating_mul(ENTRY_LEN) {
return Err(StorageError::InvalidData {
entity: "publication_point_cache_mmap_index",
detail: "entry table length mismatch".to_string(),
});
}
let mut entries = HashMap::with_capacity(entry_count);
let mut bytes = 0usize;
for offset in (entry_table_offset..entry_table_offset + entry_table_len).step_by(ENTRY_LEN)
{
let key_offset = read_u64(&mmap, offset)? as usize;
let key_len = read_u32(&mmap, offset + 8)? as usize;
let value_offset = read_u64(&mmap, offset + 12)? as usize;
let value_len = read_u32(&mmap, offset + 20)? as usize;
checked_range(blob_len, key_offset, key_len)?;
checked_range(blob_len, value_offset, value_len)?;
let key_start = blob_offset + key_offset;
let key_end = key_start + key_len;
let key = std::str::from_utf8(&mmap[key_start..key_end])
.map_err(|e| StorageError::InvalidData {
entity: "publication_point_cache_mmap_index.key",
detail: e.to_string(),
})?
.to_string();
bytes = bytes.saturating_add(value_len);
entries.insert(
key,
PpCacheIndexEntry {
value_offset,
value_len,
},
);
}
Ok(Self {
mmap: Arc::new(mmap),
entries,
bytes,
file_bytes: file_len as u64,
})
}
pub fn get(&self, manifest_rsync_uri: &str) -> Option<&[u8]> {
let entry = self.entries.get(manifest_rsync_uri)?;
let blob_offset = read_u64(self.mmap.as_ref(), 40).ok()? as usize;
let start = blob_offset + entry.value_offset;
let end = start + entry.value_len;
Some(&self.mmap[start..end])
}
pub fn entries(&self) -> usize {
self.entries.len()
}
pub fn bytes(&self) -> usize {
self.bytes
}
pub fn file_bytes(&self) -> u64 {
self.file_bytes
}
}
/// An ordered collection of mapped index files that are queried as one index.
#[derive(Debug)]
pub struct PpCacheMmapIndexSet {
/// Index files in lookup priority order; the first file containing a key wins.
indexes: Vec<PpCacheMmapIndex>,
/// Sum of the per-file entry counts (a key present in several files is counted each time).
entries: usize,
/// Sum of the per-file value byte totals.
bytes: usize,
/// Sum of the per-file sizes in bytes.
file_bytes: u64,
}
impl PpCacheMmapIndexSet {
    /// Looks `manifest_rsync_uri` up in each index file in priority order and returns the
    /// value bytes from the first file that contains it.
    pub fn get(&self, manifest_rsync_uri: &str) -> Option<&[u8]> {
        for index in &self.indexes {
            if let Some(value) = index.get(manifest_rsync_uri) {
                return Some(value);
            }
        }
        None
    }

    /// Sum of the entry counts of all index files in the set.
    pub fn entries(&self) -> usize {
        self.entries
    }

    /// Sum of the value byte totals of all index files in the set.
    pub fn bytes(&self) -> usize {
        self.bytes
    }

    /// Sum of the sizes in bytes of all index files in the set.
    pub fn file_bytes(&self) -> u64 {
        self.file_bytes
    }
}
/// Derives the index directory for a database path: a sibling of `db_path` named
/// `<db file name>.pp-cache-index`.
///
/// When the final path component is missing or not valid UTF-8, `work-db` is used as the
/// base name.
pub fn default_pp_cache_index_dir(db_path: &Path) -> PathBuf {
    let base_name = match db_path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => "work-db",
    };
    let mut sibling_name = String::from(base_name);
    sibling_name.push_str(".pp-cache-index");
    db_path.with_file_name(sibling_name)
}
/// Opens a single index file and reports how the load went.
///
/// The reported `file_bytes` is the size of the file that was actually mapped. It is not
/// taken from a second `stat` of `path`, which could observe a different file if the index
/// is atomically replaced in between, or fail after the load itself already succeeded.
///
/// # Errors
///
/// Propagates any error from [`PpCacheMmapIndex::open`].
pub fn load_pp_cache_mmap_index(
    path: &Path,
) -> StorageResult<(PpCacheMmapIndex, PpCacheIndexLoadStats)> {
    let started = Instant::now();
    let index = PpCacheMmapIndex::open(path)?;
    let stats = PpCacheIndexLoadStats {
        source: "mmap".to_string(),
        entries: index.entries(),
        bytes: index.bytes(),
        file_bytes: index.file_bytes(),
        load_ms: started.elapsed().as_millis() as u64,
    };
    Ok((index, stats))
}
pub fn load_pp_cache_mmap_index_set(
dir: &Path,
) -> StorageResult<(PpCacheMmapIndexSet, PpCacheIndexLoadStats)> {
let started = Instant::now();
let mut paths = Vec::new();
if dir.exists() {
let mut segments = fs::read_dir(dir)
.map_err(|e| StorageError::RocksDb(e.to_string()))?
.filter_map(Result::ok)
.map(|entry| entry.path())
.filter(|path| {
path.file_name()
.and_then(|name| name.to_str())
.is_some_and(|name| name.starts_with("segment-") && name.ends_with(".idx"))
})
.collect::<Vec<_>>();
segments.sort();
segments.reverse();
paths.extend(segments);
let current = dir.join("current.idx");
if current.exists() {
paths.push(current);
}
}
if paths.is_empty() {
return Err(StorageError::InvalidData {
entity: "publication_point_cache_mmap_index",
detail: "index file missing".to_string(),
});
}
let mut indexes = Vec::new();
let mut entries = 0usize;
let mut bytes = 0usize;
let mut file_bytes = 0u64;
for path in paths {
let (index, _) = load_pp_cache_mmap_index(&path)?;
entries = entries.saturating_add(index.entries());
bytes = bytes.saturating_add(index.bytes());
file_bytes = file_bytes.saturating_add(index.file_bytes());
indexes.push(index);
}
let set = PpCacheMmapIndexSet {
indexes,
entries,
bytes,
file_bytes,
};
let stats = PpCacheIndexLoadStats {
source: "mmap".to_string(),
entries: set.entries(),
bytes: set.bytes(),
file_bytes: set.file_bytes(),
load_ms: started.elapsed().as_millis() as u64,
};
Ok((set, stats))
}
pub fn write_pp_cache_index_segment<I>(
dir: &Path,
entries: I,
) -> StorageResult<PpCacheIndexRefreshStats>
where
I: IntoIterator<Item = (String, Vec<u8>)>,
{
fs::create_dir_all(dir).map_err(|e| StorageError::RocksDb(e.to_string()))?;
let current = dir.join("current.idx");
if !current.exists() {
return write_pp_cache_index_atomic(&current, entries);
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| StorageError::RocksDb(e.to_string()))?
.as_nanos();
let segment = dir.join(format!("segment-{now:020}-{}.idx", std::process::id()));
let mut stats = write_pp_cache_index_atomic(&segment, entries)?;
stats.state = "segment_written".to_string();
Ok(stats)
}
/// Writes `entries` to `path` by building a temp file next to it and renaming it into place.
///
/// Entries are deduplicated by key (the last value for a key wins) and written in key order.
/// The temp file name is derived from the target file name and the process id, so writers
/// targeting different files, or running in different processes, do not overwrite each
/// other's partially written data. The temp file is removed again if any step fails.
///
/// Returns stats with state `"written"`; `old_entries` and `dirty_entries` are left at 0.
///
/// # Errors
///
/// Returns `StorageError::InvalidData` when `path` has no parent or no file name, and
/// `StorageError::RocksDb` for I/O failures; errors from the file writer are propagated.
pub fn write_pp_cache_index_atomic<I>(
    path: &Path,
    entries: I,
) -> StorageResult<PpCacheIndexRefreshStats>
where
    I: IntoIterator<Item = (String, Vec<u8>)>,
{
    let started = Instant::now();
    let parent = path.parent().ok_or_else(|| StorageError::InvalidData {
        entity: "publication_point_cache_mmap_index.path",
        detail: "missing parent".to_string(),
    })?;
    let file_name = path.file_name().ok_or_else(|| StorageError::InvalidData {
        entity: "publication_point_cache_mmap_index.path",
        detail: "missing file name".to_string(),
    })?;
    fs::create_dir_all(parent).map_err(|e| StorageError::RocksDb(e.to_string()))?;
    let mut ordered = BTreeMap::<String, Vec<u8>>::new();
    for (key, value) in entries {
        ordered.insert(key, value);
    }
    // The `.tmp` suffix keeps the file from matching the `*.idx` names the loader picks up.
    let tmp_path = parent.join(format!(
        "{}.{}.tmp",
        file_name.to_string_lossy(),
        std::process::id()
    ));
    let published = write_pp_cache_index_file(
        &tmp_path,
        ordered.iter().map(|(k, v)| (k.as_str(), v.as_slice())),
    )
    .and_then(|()| {
        fs::metadata(&tmp_path)
            .map(|meta| meta.len())
            .map_err(|e| StorageError::RocksDb(e.to_string()))
    })
    .and_then(|len| {
        fs::rename(&tmp_path, path)
            .map(|()| len)
            .map_err(|e| StorageError::RocksDb(e.to_string()))
    });
    let file_bytes = match published {
        Ok(len) => len,
        Err(e) => {
            // Best-effort cleanup; the original failure is the error worth reporting.
            let _ = fs::remove_file(&tmp_path);
            return Err(e);
        }
    };
    Ok(PpCacheIndexRefreshStats {
        state: "written".to_string(),
        old_entries: 0,
        dirty_entries: 0,
        new_entries: ordered.len(),
        file_bytes,
        write_ms: started.elapsed().as_millis() as u64,
    })
}
/// Serializes `entries` into the index file format and writes them to `path`.
///
/// The file consists of a zero-padded header, a fixed-width entry table, and a blob holding
/// each key immediately followed by its value. The file is synced to disk before returning.
///
/// # Errors
///
/// Returns `StorageError::InvalidData` when a key or value is too long for the format's
/// 32-bit length fields (nothing is written in that case), and `StorageError::RocksDb` for
/// I/O failures.
fn write_pp_cache_index_file<'a, I>(path: &Path, entries: I) -> StorageResult<()>
where
    I: IntoIterator<Item = (&'a str, &'a [u8])>,
{
    let entries = entries.into_iter().collect::<Vec<_>>();
    let entry_table_offset = HEADER_LEN;
    let entry_table_len = entries.len() * ENTRY_LEN;
    let blob_offset = entry_table_offset + entry_table_len;
    let mut table = Vec::with_capacity(entry_table_len);
    let mut blob = Vec::new();
    for (key, value) in entries.iter() {
        // Lengths are stored as u32; refuse oversized items rather than truncating the
        // length and producing an index that points at the wrong bytes.
        let key_len = u32::try_from(key.len()).map_err(|_| StorageError::InvalidData {
            entity: "publication_point_cache_mmap_index.key",
            detail: format!("key length {} exceeds u32 range", key.len()),
        })?;
        let value_len = u32::try_from(value.len()).map_err(|_| StorageError::InvalidData {
            entity: "publication_point_cache_mmap_index.value",
            detail: format!("value length {} exceeds u32 range", value.len()),
        })?;
        // Offsets are relative to the start of the blob section.
        let key_offset = blob.len();
        blob.extend_from_slice(key.as_bytes());
        let value_offset = blob.len();
        blob.extend_from_slice(value);
        write_u64(&mut table, key_offset as u64);
        write_u32(&mut table, key_len);
        write_u64(&mut table, value_offset as u64);
        write_u32(&mut table, value_len);
    }
    let mut header = Vec::with_capacity(HEADER_LEN);
    header.extend_from_slice(MAGIC);
    write_u32(&mut header, VERSION);
    write_u64(&mut header, entries.len() as u64);
    write_u64(&mut header, entry_table_offset as u64);
    write_u64(&mut header, entry_table_len as u64);
    write_u64(&mut header, blob_offset as u64);
    write_u64(&mut header, blob.len() as u64);
    // Pad the header with zeros up to its fixed size.
    header.resize(HEADER_LEN, 0);
    let mut file = File::create(path).map_err(|e| StorageError::RocksDb(e.to_string()))?;
    file.write_all(&header)
        .map_err(|e| StorageError::RocksDb(e.to_string()))?;
    file.write_all(&table)
        .map_err(|e| StorageError::RocksDb(e.to_string()))?;
    file.write_all(&blob)
        .map_err(|e| StorageError::RocksDb(e.to_string()))?;
    file.sync_all()
        .map_err(|e| StorageError::RocksDb(e.to_string()))?;
    Ok(())
}
/// Verifies that the range `offset..offset + len` lies within `0..=total`.
///
/// Fails with `StorageError::InvalidData` ("out of bounds") when the end of the range
/// overflows `usize` or exceeds `total`.
fn checked_range(total: usize, offset: usize, len: usize) -> StorageResult<()> {
    match offset.checked_add(len) {
        Some(end) if end <= total => Ok(()),
        _ => Err(StorageError::InvalidData {
            entity: "publication_point_cache_mmap_index.range",
            detail: "out of bounds".to_string(),
        }),
    }
}
/// Reads a little-endian `u32` from `bytes` at `offset`, failing with an out-of-bounds
/// error when fewer than four bytes are available there.
fn read_u32(bytes: &[u8], offset: usize) -> StorageResult<u32> {
    checked_range(bytes.len(), offset, 4)?;
    let mut raw = [0u8; 4];
    // The range was validated above, so this slice is exactly four bytes long.
    raw.copy_from_slice(&bytes[offset..offset + 4]);
    Ok(u32::from_le_bytes(raw))
}
/// Reads a little-endian `u64` from `bytes` at `offset`, failing with an out-of-bounds
/// error when fewer than eight bytes are available there.
fn read_u64(bytes: &[u8], offset: usize) -> StorageResult<u64> {
    checked_range(bytes.len(), offset, 8)?;
    let mut raw = [0u8; 8];
    // The range was validated above, so this slice is exactly eight bytes long.
    raw.copy_from_slice(&bytes[offset..offset + 8]);
    Ok(u64::from_le_bytes(raw))
}
/// Appends `value` to `out` as four little-endian bytes.
fn write_u32(out: &mut Vec<u8>, value: u32) {
    let encoded = value.to_le_bytes();
    out.extend(encoded);
}
/// Appends `value` to `out` as eight little-endian bytes.
fn write_u64(out: &mut Vec<u8>, value: u64) {
    let encoded = value.to_le_bytes();
    out.extend(encoded);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Values written for distinct keys can be read back, and unknown keys yield `None`.
    #[test]
    fn pp_cache_index_roundtrips_multiple_entries() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("current.idx");
        let stats = write_pp_cache_index_atomic(
            &path,
            vec![
                ("rsync://example.test/a.mft".to_string(), b"aaa".to_vec()),
                ("rsync://example.test/b.mft".to_string(), b"bbbb".to_vec()),
            ],
        )
        .expect("write index");
        assert_eq!(stats.new_entries, 2);
        let (index, load) = load_pp_cache_mmap_index(&path).expect("load index");
        assert_eq!(load.entries, 2);
        assert_eq!(index.get("rsync://example.test/a.mft"), Some(&b"aaa"[..]));
        assert_eq!(index.get("rsync://example.test/b.mft"), Some(&b"bbbb"[..]));
        assert_eq!(index.get("rsync://example.test/missing.mft"), None);
    }

    /// A file shorter than the fixed header is rejected before the magic is inspected.
    #[test]
    fn pp_cache_index_rejects_truncated_file() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("current.idx");
        fs::write(&path, b"bad").expect("write bad");
        let err = load_pp_cache_mmap_index(&path).expect_err("bad index rejected");
        assert!(err.to_string().contains("file too small"));
    }

    /// A header-sized file whose leading bytes are not the magic is rejected as such.
    #[test]
    fn pp_cache_index_rejects_bad_magic() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("current.idx");
        fs::write(&path, [0u8; HEADER_LEN]).expect("write bad");
        let err = load_pp_cache_mmap_index(&path).expect_err("bad index rejected");
        assert!(err.to_string().contains("invalid magic"));
    }

    /// When the same key is supplied twice, only the last value is stored.
    #[test]
    fn pp_cache_index_last_duplicate_wins() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("current.idx");
        write_pp_cache_index_atomic(
            &path,
            vec![
                ("rsync://example.test/a.mft".to_string(), b"old".to_vec()),
                ("rsync://example.test/a.mft".to_string(), b"new".to_vec()),
            ],
        )
        .expect("write index");
        let (index, _) = load_pp_cache_mmap_index(&path).expect("load index");
        assert_eq!(index.entries(), 1);
        assert_eq!(index.get("rsync://example.test/a.mft"), Some(&b"new"[..]));
    }

    /// A segment written after `current.idx` takes priority for keys present in both.
    #[test]
    fn pp_cache_index_set_prefers_newest_segment() {
        let dir = tempfile::tempdir().expect("tempdir");
        let current = dir.path().join("current.idx");
        write_pp_cache_index_atomic(
            &current,
            vec![("rsync://example.test/a.mft".to_string(), b"old".to_vec())],
        )
        .expect("write current index");
        write_pp_cache_index_segment(
            dir.path(),
            vec![("rsync://example.test/a.mft".to_string(), b"new".to_vec())],
        )
        .expect("write segment index");
        let (set, stats) = load_pp_cache_mmap_index_set(dir.path()).expect("load index set");
        // Entry counts are summed per file, so the shared key is counted twice.
        assert_eq!(stats.entries, 2);
        assert_eq!(set.get("rsync://example.test/a.mft"), Some(&b"new"[..]));
    }
}

View File

@ -454,6 +454,109 @@ fn publication_point_cache_projection_cached_empty_db_accepts_bounded_new_entrie
); );
} }
// Stores a projection, refreshes the mmap index (expecting a fresh "written" file with one
// entry), then reopens the store and checks that the cached lookup returns the same
// projection.
#[test]
fn publication_point_cache_mmap_index_refresh_roundtrips_after_reopen() {
let td = tempfile::tempdir().expect("tempdir");
let db_path = td.path().join("work-db");
let vcir = sample_vcir("rsync://example.test/repo/current.mft");
let projection = PublicationPointCacheProjection::from_vcir_with_context(
&vcir,
"rsync://example.test/repo/".to_string(),
Some("rsync://example.test/repo/ca.cer".to_string()),
sha256_32(b"ca-cert"),
sha256_32(b"manifest"),
sha256_32(b"ta-context"),
sha256_32(b"parent-context"),
sha256_32(b"policy"),
)
.expect("build publication point projection");
// Scope the first store so it is dropped before the database is reopened.
{
let store = RocksStore::open(&db_path).expect("open rocksdb");
store
.put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
.expect("put projection");
let stats = store
.refresh_publication_point_cache_mmap_index()
.expect("refresh mmap index")
.expect("refresh stats");
assert_eq!(stats.new_entries, 1);
assert_eq!(stats.state, "written");
}
let store = RocksStore::open(&db_path).expect("reopen rocksdb");
let got = store
.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri)
.expect("get cached projection from mmap")
.expect("projection exists");
assert_eq!(got, projection);
}
// Exercises the dirty overlay across three store lifetimes: write a base index, reopen and
// overwrite the projection (the overlay must win over the mapped value) and flush it as a
// segment, then reopen once more and check that the new value is the one returned.
#[test]
fn publication_point_cache_mmap_index_dirty_overlay_wins_and_refreshes_segment() {
let td = tempfile::tempdir().expect("tempdir");
let db_path = td.path().join("work-db");
let vcir = sample_vcir("rsync://example.test/repo/current.mft");
let mut projection = PublicationPointCacheProjection::from_vcir_with_context(
&vcir,
"rsync://example.test/repo/".to_string(),
Some("rsync://example.test/repo/ca.cer".to_string()),
sha256_32(b"ca-cert"),
sha256_32(b"manifest-old"),
sha256_32(b"ta-context"),
sha256_32(b"parent-context"),
sha256_32(b"policy"),
)
.expect("build publication point projection");
// First lifetime: persist the old projection and write the base index file.
{
let store = RocksStore::open(&db_path).expect("open rocksdb");
store
.put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
.expect("put old projection");
store
.refresh_publication_point_cache_mmap_index()
.expect("refresh base mmap index");
}
// Second lifetime: read the old value, overwrite it, and flush the change.
{
let store = RocksStore::open(&db_path).expect("reopen rocksdb");
assert_eq!(
store
.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri)
.expect("get old projection")
.expect("old projection exists")
.manifest_sha256,
sha256_32(b"manifest-old")
);
projection.manifest_sha256 = sha256_32(b"manifest-new");
store
.put_vcir_with_publication_point_cache_projection(&vcir, Some(&projection))
.expect("put new projection");
assert_eq!(
store
.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri)
.expect("get dirty projection")
.expect("dirty projection exists")
.manifest_sha256,
sha256_32(b"manifest-new")
);
let stats = store
.refresh_publication_point_cache_mmap_index()
.expect("refresh dirty mmap segment")
.expect("refresh stats");
assert_eq!(stats.state, "segment_written");
assert_eq!(stats.dirty_entries, 1);
}
// Third lifetime: the value must now come from the index files on disk.
let store = RocksStore::open(&db_path).expect("reopen rocksdb after segment");
let got = store
.get_publication_point_cache_projection_cached(&vcir.manifest_rsync_uri)
.expect("get refreshed projection")
.expect("projection exists");
assert_eq!(got.manifest_sha256, sha256_32(b"manifest-new"));
}
#[test] #[test]
fn publication_point_cache_projection_rejects_version_mismatch() { fn publication_point_cache_projection_rejects_version_mismatch() {
let vcir = sample_vcir("rsync://example.test/repo/current.mft"); let vcir = sample_vcir("rsync://example.test/repo/current.mft");