use std::path::PathBuf; use std::sync::Arc; use rocksdb::{DB, Options, WriteBatch}; use crate::storage::{ RawByHashEntry, RocksDbMemoryDbSnapshot, RocksStore, StorageError, StorageResult, memory_db_snapshot_for_column_families, }; const RAW_BY_HASH_KEY_PREFIX: &str = "rawbyhash:"; const RAW_BLOB_KEY_PREFIX: &str = "rawblob:"; const REPO_BYTES_KEY_PREFIX: &str = "sha256:"; fn raw_by_hash_key(sha256_hex: &str) -> String { format!("{RAW_BY_HASH_KEY_PREFIX}{sha256_hex}") } fn raw_blob_key(sha256_hex: &str) -> String { format!("{RAW_BLOB_KEY_PREFIX}{sha256_hex}") } fn repo_bytes_key(sha256_hex: &str) -> String { format!("{REPO_BYTES_KEY_PREFIX}{sha256_hex}") } fn validate_blob_sha256_hex(sha256_hex: &str) -> StorageResult<()> { if sha256_hex.len() != 64 || !sha256_hex.as_bytes().iter().all(u8::is_ascii_hexdigit) { return Err(StorageError::InvalidData { entity: "raw_blob", detail: format!("invalid sha256 hex: {sha256_hex}"), }); } Ok(()) } fn validate_blob_bytes(bytes: &[u8]) -> StorageResult<()> { if bytes.is_empty() { return Err(StorageError::InvalidData { entity: "raw_blob", detail: "bytes must not be empty".to_string(), }); } Ok(()) } pub trait RawObjectStore { fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult>; fn get_raw_entries_batch( &self, sha256_hexes: &[String], ) -> StorageResult>>; fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult>> { self.get_raw_entry(sha256_hex) .map(|entry| entry.map(|entry| entry.bytes)) } fn get_blob_bytes_batch(&self, sha256_hexes: &[String]) -> StorageResult>>> { self.get_raw_entries_batch(sha256_hexes).map(|entries| { entries .into_iter() .map(|entry| entry.map(|entry| entry.bytes)) .collect() }) } } #[derive(Clone, Debug)] pub struct ExternalRawStoreDb { path: PathBuf, db: Arc, } #[derive(Clone, Debug)] pub struct ExternalRepoBytesDb { path: PathBuf, db: Arc, } impl ExternalRawStoreDb { pub fn open(path: impl Into) -> StorageResult { let path = path.into(); if let Some(parent) = path.parent() { std::fs::create_dir_all(parent).map_err(|e| StorageError::RocksDb(e.to_string()))?; } let mut opts = Options::default(); opts.create_if_missing(true); opts.set_compression_type(rocksdb::DBCompressionType::Lz4); let db = DB::open(&opts, &path).map_err(|e| StorageError::RocksDb(e.to_string()))?; Ok(Self { path, db: Arc::new(db), }) } pub fn put_raw_entry(&self, entry: &RawByHashEntry) -> StorageResult<()> { entry.validate_internal()?; let key = raw_by_hash_key(&entry.sha256_hex); let blob_key = raw_blob_key(&entry.sha256_hex); let value = serde_cbor::to_vec(entry).map_err(|e| StorageError::Codec { entity: "raw_by_hash", detail: e.to_string(), })?; let blob_value = entry.bytes.clone(); self.db .write({ let mut batch = WriteBatch::default(); batch.put(key.as_bytes(), value); batch.put(blob_key.as_bytes(), blob_value); batch }) .map_err(|e| StorageError::RocksDb(e.to_string()))?; Ok(()) } pub fn put_raw_entries_batch(&self, entries: &[RawByHashEntry]) -> StorageResult<()> { if entries.is_empty() { return Ok(()); } let mut batch = WriteBatch::default(); for entry in entries { entry.validate_internal()?; let key = raw_by_hash_key(&entry.sha256_hex); let blob_key = raw_blob_key(&entry.sha256_hex); let value = serde_cbor::to_vec(entry).map_err(|e| StorageError::Codec { entity: "raw_by_hash", detail: e.to_string(), })?; batch.put(key.as_bytes(), value); batch.put(blob_key.as_bytes(), entry.bytes.as_slice()); } self.db .write(batch) .map_err(|e| StorageError::RocksDb(e.to_string()))?; Ok(()) } pub fn put_blob_bytes_batch(&self, blobs: &[(String, Vec)]) -> StorageResult<()> { if blobs.is_empty() { return Ok(()); } let mut batch = WriteBatch::default(); for (sha256_hex, bytes) in blobs { validate_blob_sha256_hex(sha256_hex)?; validate_blob_bytes(bytes)?; let blob_key = raw_blob_key(sha256_hex); batch.put(blob_key.as_bytes(), bytes.as_slice()); } self.db .write(batch) .map_err(|e| StorageError::RocksDb(e.to_string()))?; Ok(()) } pub fn delete_raw_entry(&self, sha256_hex: &str) -> StorageResult<()> { let key = raw_by_hash_key(sha256_hex); let blob_key = raw_blob_key(sha256_hex); self.db .write({ let mut batch = WriteBatch::default(); batch.delete(key.as_bytes()); batch.delete(blob_key.as_bytes()); batch }) .map_err(|e| StorageError::RocksDb(e.to_string())) } pub fn path(&self) -> &PathBuf { &self.path } pub(crate) fn memory_snapshot(&self, label: impl Into) -> RocksDbMemoryDbSnapshot { memory_db_snapshot_for_column_families(label, self.db.as_ref(), None) } } impl ExternalRepoBytesDb { pub fn open(path: impl Into) -> StorageResult { let path = path.into(); if let Some(parent) = path.parent() { std::fs::create_dir_all(parent).map_err(|e| StorageError::RocksDb(e.to_string()))?; } let mut opts = Options::default(); opts.create_if_missing(true); opts.set_compression_type(rocksdb::DBCompressionType::Lz4); let db = DB::open(&opts, &path).map_err(|e| StorageError::RocksDb(e.to_string()))?; Ok(Self { path, db: Arc::new(db), }) } pub fn open_read_only(path: impl Into) -> StorageResult { let path = path.into(); let mut opts = Options::default(); opts.set_compression_type(rocksdb::DBCompressionType::Lz4); let db = DB::open_for_read_only(&opts, &path, false) .map_err(|e| StorageError::RocksDb(e.to_string()))?; Ok(Self { path, db: Arc::new(db), }) } pub fn put_blob_bytes_batch(&self, blobs: &[(String, Vec)]) -> StorageResult<()> { if blobs.is_empty() { return Ok(()); } let mut batch = WriteBatch::default(); for (sha256_hex, bytes) in blobs { validate_blob_sha256_hex(sha256_hex)?; validate_blob_bytes(bytes)?; let key = repo_bytes_key(sha256_hex); batch.put(key.as_bytes(), bytes.as_slice()); } self.db .write(batch) .map_err(|e| StorageError::RocksDb(e.to_string()))?; Ok(()) } pub fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult>> { validate_blob_sha256_hex(sha256_hex)?; let key = repo_bytes_key(sha256_hex); self.db .get(key.as_bytes()) .map_err(|e| StorageError::RocksDb(e.to_string())) } pub fn get_blob_bytes_batch( &self, sha256_hexes: &[String], ) -> StorageResult>>> { if sha256_hexes.is_empty() { return Ok(Vec::new()); } let keys: Vec = sha256_hexes .iter() .map(|hash| { validate_blob_sha256_hex(hash)?; Ok::(repo_bytes_key(hash)) }) .collect::>()?; self.db .multi_get(keys.iter().map(|key| key.as_bytes())) .into_iter() .map(|res| res.map_err(|e| StorageError::RocksDb(e.to_string()))) .collect() } pub fn path(&self) -> &PathBuf { &self.path } pub(crate) fn memory_snapshot(&self, label: impl Into) -> RocksDbMemoryDbSnapshot { memory_db_snapshot_for_column_families(label, self.db.as_ref(), None) } } impl RawObjectStore for RocksStore { fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult> { self.get_raw_by_hash_entry(sha256_hex) } fn get_raw_entries_batch( &self, sha256_hexes: &[String], ) -> StorageResult>> { self.get_raw_by_hash_entries_batch(sha256_hexes) } fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult>> { RocksStore::get_blob_bytes(self, sha256_hex) } fn get_blob_bytes_batch(&self, sha256_hexes: &[String]) -> StorageResult>>> { RocksStore::get_blob_bytes_batch(self, sha256_hexes) } } impl RawObjectStore for ExternalRawStoreDb { fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult> { let key = raw_by_hash_key(sha256_hex); let Some(bytes) = self .db .get(key.as_bytes()) .map_err(|e| StorageError::RocksDb(e.to_string()))? else { return Ok(None); }; let entry = serde_cbor::from_slice::(&bytes).map_err(|e| StorageError::Codec { entity: "raw_by_hash", detail: e.to_string(), })?; entry.validate_internal()?; Ok(Some(entry)) } fn get_raw_entries_batch( &self, sha256_hexes: &[String], ) -> StorageResult>> { if sha256_hexes.is_empty() { return Ok(Vec::new()); } let keys: Vec = sha256_hexes .iter() .map(|hash| raw_by_hash_key(hash)) .collect(); self.db .multi_get(keys.iter().map(|key| key.as_bytes())) .into_iter() .map(|res| { let maybe = res.map_err(|e| StorageError::RocksDb(e.to_string()))?; match maybe { Some(bytes) => { let entry = serde_cbor::from_slice::(&bytes).map_err(|e| { StorageError::Codec { entity: "raw_by_hash", detail: e.to_string(), } })?; entry.validate_internal()?; Ok(Some(entry)) } None => Ok(None), } }) .collect() } fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult>> { let key = raw_blob_key(sha256_hex); self.db .get(key.as_bytes()) .map_err(|e| StorageError::RocksDb(e.to_string())) } fn get_blob_bytes_batch(&self, sha256_hexes: &[String]) -> StorageResult>>> { if sha256_hexes.is_empty() { return Ok(Vec::new()); } let keys: Vec = sha256_hexes.iter().map(|hash| raw_blob_key(hash)).collect(); self.db .multi_get(keys.iter().map(|key| key.as_bytes())) .into_iter() .map(|res| res.map_err(|e| StorageError::RocksDb(e.to_string()))) .collect() } } #[cfg(test)] mod tests { use super::{ExternalRawStoreDb, ExternalRepoBytesDb, RawObjectStore}; use crate::storage::{RawByHashEntry, RocksStore, StorageError, StorageResult}; use std::collections::HashMap; fn sha256_hex(bytes: &[u8]) -> String { use sha2::{Digest, Sha256}; hex::encode(Sha256::digest(bytes)) } #[derive(Default)] struct MockRawStore { entries: HashMap, } impl RawObjectStore for MockRawStore { fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult> { Ok(self.entries.get(sha256_hex).cloned()) } fn get_raw_entries_batch( &self, sha256_hexes: &[String], ) -> StorageResult>> { Ok(sha256_hexes .iter() .map(|hash| self.entries.get(hash).cloned()) .collect()) } } #[test] fn rocks_store_raw_object_store_reads_single_and_batch_entries() { let td = tempfile::tempdir().expect("tempdir"); let store = RocksStore::open(td.path()).expect("open rocksdb"); let a = b"object-a".to_vec(); let b = b"object-b".to_vec(); let a_hash = sha256_hex(&a); let b_hash = sha256_hex(&b); store .put_raw_by_hash_entry(&RawByHashEntry::from_bytes(a_hash.clone(), a.clone())) .expect("put a"); store .put_raw_by_hash_entry(&RawByHashEntry::from_bytes(b_hash.clone(), b.clone())) .expect("put b"); let single = store .get_raw_entry(&a_hash) .expect("get single") .expect("present"); assert_eq!(single.bytes, a); let batch = store .get_raw_entries_batch(&[a_hash.clone(), "00".repeat(32), b_hash.clone()]) .expect("get batch"); assert_eq!(batch.len(), 3); assert_eq!( batch[0].as_ref().map(|entry| entry.bytes.as_slice()), Some(a.as_slice()) ); assert!(batch[1].is_none()); assert_eq!( batch[2].as_ref().map(|entry| entry.bytes.as_slice()), Some(b.as_slice()) ); } #[test] fn external_raw_store_db_roundtrips_entries() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("raw-store.db")).expect("open raw store"); let mut entry = RawByHashEntry::from_bytes(sha256_hex(b"blob"), b"blob".to_vec()); entry .origin_uris .push("rsync://example.test/repo/a.cer".to_string()); entry.object_type = Some("cer".to_string()); raw_store.put_raw_entry(&entry).expect("put raw entry"); let got = raw_store .get_raw_entry(&entry.sha256_hex) .expect("read raw entry") .expect("entry exists"); assert_eq!(got, entry); } #[test] fn external_raw_store_db_batch_writes_and_reads() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("raw-store.db")).expect("open raw store"); let a = RawByHashEntry::from_bytes(sha256_hex(b"a"), b"a".to_vec()); let b = RawByHashEntry::from_bytes(sha256_hex(b"b"), b"b".to_vec()); raw_store .put_raw_entries_batch(&[a.clone(), b.clone()]) .expect("batch put"); let batch = raw_store .get_raw_entries_batch(&[a.sha256_hex.clone(), b.sha256_hex.clone()]) .expect("batch get"); assert_eq!(batch.len(), 2); assert_eq!(batch[0], Some(a)); assert_eq!(batch[1], Some(b)); } #[test] fn raw_object_store_default_blob_helpers_return_bytes_only() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("nested/raw-store.db")) .expect("open raw store"); let mut entry = RawByHashEntry::from_bytes(sha256_hex(b"blob"), b"blob".to_vec()); entry .origin_uris .push("rsync://example.test/repo/blob.roa".to_string()); raw_store.put_raw_entry(&entry).expect("put raw entry"); let single = raw_store .get_blob_bytes(&entry.sha256_hex) .expect("get blob bytes") .expect("entry exists"); assert_eq!(single, b"blob".to_vec()); let batch = raw_store .get_blob_bytes_batch(&[entry.sha256_hex.clone(), "00".repeat(32)]) .expect("get blob bytes batch"); assert_eq!(batch, vec![Some(b"blob".to_vec()), None]); } #[test] fn raw_object_store_default_blob_helpers_work_for_custom_store() { let mut store = MockRawStore::default(); let a = RawByHashEntry::from_bytes(sha256_hex(b"a"), b"a".to_vec()); let b = RawByHashEntry::from_bytes(sha256_hex(b"b"), b"b".to_vec()); store.entries.insert(a.sha256_hex.clone(), a.clone()); store.entries.insert(b.sha256_hex.clone(), b.clone()); let single = store .get_blob_bytes(&a.sha256_hex) .expect("single blob bytes") .expect("present"); assert_eq!(single, b"a".to_vec()); let batch = store .get_blob_bytes_batch(&[a.sha256_hex.clone(), "00".repeat(32), b.sha256_hex.clone()]) .expect("batch blob bytes"); assert_eq!(batch, vec![Some(b"a".to_vec()), None, Some(b"b".to_vec())]); } #[test] fn rocks_store_blob_helpers_use_external_raw_store_fast_path() { let td = tempfile::tempdir().expect("tempdir"); let store = RocksStore::open_with_external_raw_store( &td.path().join("db"), &td.path().join("raw-store.db"), ) .expect("open store with external raw store"); let entry = RawByHashEntry::from_bytes(sha256_hex(b"blob-fast"), b"blob-fast".to_vec()); store.put_raw_by_hash_entry(&entry).expect("put"); let single = store .get_blob_bytes(&entry.sha256_hex) .expect("single blob bytes") .expect("present"); assert_eq!(single, b"blob-fast".to_vec()); let batch = store .get_blob_bytes_batch(&[entry.sha256_hex.clone(), "00".repeat(32)]) .expect("batch blob bytes"); assert_eq!(batch, vec![Some(b"blob-fast".to_vec()), None]); } #[test] fn external_raw_store_db_delete_removes_entry() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("raw-store.db")).expect("open raw store"); let entry = RawByHashEntry::from_bytes(sha256_hex(b"gone"), b"gone".to_vec()); raw_store.put_raw_entry(&entry).expect("put"); assert!( raw_store .get_raw_entry(&entry.sha256_hex) .unwrap() .is_some() ); raw_store .delete_raw_entry(&entry.sha256_hex) .expect("delete entry"); assert!( raw_store .get_raw_entry(&entry.sha256_hex) .unwrap() .is_none() ); } #[test] fn put_blob_bytes_batch_round_trips_without_raw_entry() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("raw-store.db")).expect("open raw store"); let a = (sha256_hex(b"blob-a"), b"blob-a".to_vec()); let b = (sha256_hex(b"blob-b"), b"blob-b".to_vec()); raw_store .put_blob_bytes_batch(&[a.clone(), b.clone()]) .expect("put blobs"); assert_eq!( raw_store.get_blob_bytes(&a.0).expect("get blob a"), Some(a.1.clone()) ); assert_eq!( raw_store.get_blob_bytes(&b.0).expect("get blob b"), Some(b.1.clone()) ); assert!(raw_store.get_raw_entry(&a.0).expect("get raw a").is_none()); assert!(raw_store.get_raw_entry(&b.0).expect("get raw b").is_none()); } #[test] fn put_blob_bytes_batch_rejects_invalid_inputs() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("raw-store.db")).expect("open raw store"); let err = raw_store .put_blob_bytes_batch(&[("zz".repeat(32), b"blob".to_vec())]) .expect_err("invalid hash should fail"); assert!(matches!(err, StorageError::InvalidData { .. })); let err = raw_store .put_blob_bytes_batch(&[(sha256_hex(b"blob"), Vec::new())]) .expect_err("empty bytes should fail"); assert!(matches!(err, StorageError::InvalidData { .. })); } #[test] fn external_raw_store_db_rejects_invalid_entry_on_put() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("raw-store.db")).expect("open raw store"); let bad = RawByHashEntry { sha256_hex: "11".repeat(32), bytes: b"blob".to_vec(), origin_uris: Vec::new(), object_type: None, encoding: None, }; let err = raw_store .put_raw_entry(&bad) .expect_err("invalid hash should fail"); assert!(matches!(err, StorageError::InvalidData { .. })); } #[test] fn external_raw_store_db_reports_codec_error_for_corrupt_value() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("raw-store.db")).expect("open raw store"); raw_store .db .put(b"rawbyhash:deadbeef", b"not-cbor") .expect("inject corrupt bytes"); let err = raw_store .get_raw_entry("deadbeef") .expect_err("corrupt value should fail"); assert!(matches!( err, StorageError::Codec { entity: "raw_by_hash", .. } )); } #[test] fn external_raw_store_db_batch_returns_empty_for_empty_request() { let td = tempfile::tempdir().expect("tempdir"); let raw_store = ExternalRawStoreDb::open(td.path().join("raw-store.db")).expect("open raw store"); let entries = raw_store .get_raw_entries_batch(&[]) .expect("empty batch succeeds"); assert!(entries.is_empty()); raw_store .put_raw_entries_batch(&[]) .expect("empty put succeeds"); } #[test] fn external_repo_bytes_db_roundtrips_blob_bytes_without_raw_entry() { let td = tempfile::tempdir().expect("tempdir"); let repo_bytes = ExternalRepoBytesDb::open(td.path().join("repo-bytes.db")).expect("open repo bytes"); let bytes = b"repo-bytes-object".to_vec(); let hash = sha256_hex(&bytes); repo_bytes .put_blob_bytes_batch(&[(hash.clone(), bytes.clone())]) .expect("put repo bytes"); assert_eq!( repo_bytes.get_blob_bytes(&hash).expect("get repo bytes"), Some(bytes.clone()) ); assert_eq!( repo_bytes .get_blob_bytes_batch(&[hash, "00".repeat(32)]) .expect("get repo bytes batch"), vec![Some(bytes), None] ); } #[test] fn external_repo_bytes_db_rejects_invalid_inputs() { let td = tempfile::tempdir().expect("tempdir"); let repo_bytes = ExternalRepoBytesDb::open(td.path().join("repo-bytes.db")).expect("open repo bytes"); assert!( repo_bytes .put_blob_bytes_batch(&[("not-a-valid-hash".to_string(), b"blob".to_vec())]) .is_err() ); assert!( repo_bytes .put_blob_bytes_batch(&[(sha256_hex(b"blob"), Vec::new())]) .is_err() ); assert!(repo_bytes.get_blob_bytes("not-a-valid-hash").is_err()); } }