324 lines
11 KiB
Rust
324 lines
11 KiB
Rust
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
|
|
use rocksdb::{DB, Options, WriteBatch};
|
|
|
|
use crate::storage::{RawByHashEntry, RocksStore, StorageError, StorageResult};
|
|
|
|
/// Namespace prefix placed in front of every raw-by-hash key.
const RAW_BY_HASH_KEY_PREFIX: &str = "rawbyhash:";

/// Builds the database key for a digest: [`RAW_BY_HASH_KEY_PREFIX`] followed
/// by `sha256_hex`.
///
/// The digest text is used verbatim; nothing is validated or normalised here.
fn raw_by_hash_key(sha256_hex: &str) -> String {
    let mut key = String::with_capacity(RAW_BY_HASH_KEY_PREFIX.len() + sha256_hex.len());
    key.push_str(RAW_BY_HASH_KEY_PREFIX);
    key.push_str(sha256_hex);
    key
}
|
|
|
|
pub trait RawObjectStore {
|
|
fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult<Option<RawByHashEntry>>;
|
|
|
|
fn get_raw_entries_batch(
|
|
&self,
|
|
sha256_hexes: &[String],
|
|
) -> StorageResult<Vec<Option<RawByHashEntry>>>;
|
|
|
|
fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult<Option<Vec<u8>>> {
|
|
self.get_raw_entry(sha256_hex)
|
|
.map(|entry| entry.map(|entry| entry.bytes))
|
|
}
|
|
|
|
fn get_blob_bytes_batch(
|
|
&self,
|
|
sha256_hexes: &[String],
|
|
) -> StorageResult<Vec<Option<Vec<u8>>>> {
|
|
self.get_raw_entries_batch(sha256_hexes).map(|entries| {
|
|
entries
|
|
.into_iter()
|
|
.map(|entry| entry.map(|entry| entry.bytes))
|
|
.collect()
|
|
})
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct ExternalRawStoreDb {
|
|
path: PathBuf,
|
|
db: Arc<DB>,
|
|
}
|
|
|
|
impl ExternalRawStoreDb {
|
|
pub fn open(path: impl Into<PathBuf>) -> StorageResult<Self> {
|
|
let path = path.into();
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::create_dir_all(parent).map_err(|e| StorageError::RocksDb(e.to_string()))?;
|
|
}
|
|
let mut opts = Options::default();
|
|
opts.create_if_missing(true);
|
|
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
|
let db = DB::open(&opts, &path).map_err(|e| StorageError::RocksDb(e.to_string()))?;
|
|
Ok(Self {
|
|
path,
|
|
db: Arc::new(db),
|
|
})
|
|
}
|
|
|
|
pub fn put_raw_entry(&self, entry: &RawByHashEntry) -> StorageResult<()> {
|
|
entry.validate_internal()?;
|
|
let key = raw_by_hash_key(&entry.sha256_hex);
|
|
let value =
|
|
serde_cbor::to_vec(entry).map_err(|e| StorageError::Codec { entity: "raw_by_hash", detail: e.to_string() })?;
|
|
self.db
|
|
.put(key.as_bytes(), value)
|
|
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn put_raw_entries_batch(&self, entries: &[RawByHashEntry]) -> StorageResult<()> {
|
|
if entries.is_empty() {
|
|
return Ok(());
|
|
}
|
|
let mut batch = WriteBatch::default();
|
|
for entry in entries {
|
|
entry.validate_internal()?;
|
|
let key = raw_by_hash_key(&entry.sha256_hex);
|
|
let value = serde_cbor::to_vec(entry).map_err(|e| StorageError::Codec {
|
|
entity: "raw_by_hash",
|
|
detail: e.to_string(),
|
|
})?;
|
|
batch.put(key.as_bytes(), value);
|
|
}
|
|
self.db
|
|
.write(batch)
|
|
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn delete_raw_entry(&self, sha256_hex: &str) -> StorageResult<()> {
|
|
let key = raw_by_hash_key(sha256_hex);
|
|
self.db
|
|
.delete(key.as_bytes())
|
|
.map_err(|e| StorageError::RocksDb(e.to_string()))
|
|
}
|
|
|
|
pub fn path(&self) -> &PathBuf {
|
|
&self.path
|
|
}
|
|
}
|
|
|
|
impl RawObjectStore for RocksStore {
|
|
fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult<Option<RawByHashEntry>> {
|
|
self.get_raw_by_hash_entry(sha256_hex)
|
|
}
|
|
|
|
fn get_raw_entries_batch(
|
|
&self,
|
|
sha256_hexes: &[String],
|
|
) -> StorageResult<Vec<Option<RawByHashEntry>>> {
|
|
self.get_raw_by_hash_entries_batch(sha256_hexes)
|
|
}
|
|
}
|
|
|
|
impl RawObjectStore for ExternalRawStoreDb {
|
|
fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult<Option<RawByHashEntry>> {
|
|
let key = raw_by_hash_key(sha256_hex);
|
|
let Some(bytes) = self
|
|
.db
|
|
.get(key.as_bytes())
|
|
.map_err(|e| StorageError::RocksDb(e.to_string()))?
|
|
else {
|
|
return Ok(None);
|
|
};
|
|
let entry = serde_cbor::from_slice::<RawByHashEntry>(&bytes).map_err(|e| StorageError::Codec {
|
|
entity: "raw_by_hash",
|
|
detail: e.to_string(),
|
|
})?;
|
|
entry.validate_internal()?;
|
|
Ok(Some(entry))
|
|
}
|
|
|
|
fn get_raw_entries_batch(
|
|
&self,
|
|
sha256_hexes: &[String],
|
|
) -> StorageResult<Vec<Option<RawByHashEntry>>> {
|
|
if sha256_hexes.is_empty() {
|
|
return Ok(Vec::new());
|
|
}
|
|
let keys: Vec<String> = sha256_hexes.iter().map(|hash| raw_by_hash_key(hash)).collect();
|
|
self.db
|
|
.multi_get(keys.iter().map(|key| key.as_bytes()))
|
|
.into_iter()
|
|
.map(|res| {
|
|
let maybe = res.map_err(|e| StorageError::RocksDb(e.to_string()))?;
|
|
match maybe {
|
|
Some(bytes) => {
|
|
let entry = serde_cbor::from_slice::<RawByHashEntry>(&bytes).map_err(|e| {
|
|
StorageError::Codec {
|
|
entity: "raw_by_hash",
|
|
detail: e.to_string(),
|
|
}
|
|
})?;
|
|
entry.validate_internal()?;
|
|
Ok(Some(entry))
|
|
}
|
|
None => Ok(None),
|
|
}
|
|
})
|
|
.collect()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::{ExternalRawStoreDb, RawObjectStore};
    use crate::storage::{RawByHashEntry, RocksStore, StorageError};

    /// Hex-encoded SHA-256 digest of `bytes`.
    fn sha256_hex(bytes: &[u8]) -> String {
        use sha2::{Digest, Sha256};
        let digest = Sha256::digest(bytes);
        hex::encode(digest)
    }

    /// Opens an [`ExternalRawStoreDb`] at `relative` inside `dir`.
    fn open_raw_store(dir: &std::path::Path, relative: &str) -> ExternalRawStoreDb {
        ExternalRawStoreDb::open(dir.join(relative)).expect("open raw store")
    }

    /// Builds an entry keyed by the SHA-256 digest of `content`.
    fn entry_for(content: &[u8]) -> RawByHashEntry {
        RawByHashEntry::from_bytes(sha256_hex(content), content.to_vec())
    }

    #[test]
    fn rocks_store_raw_object_store_reads_single_and_batch_entries() {
        let td = tempfile::tempdir().expect("tempdir");
        let store = RocksStore::open(td.path()).expect("open rocksdb");

        let a = b"object-a".to_vec();
        let b = b"object-b".to_vec();
        let a_hash = sha256_hex(&a);
        let b_hash = sha256_hex(&b);

        let entry_a = RawByHashEntry::from_bytes(a_hash.clone(), a.clone());
        store.put_raw_by_hash_entry(&entry_a).expect("put a");
        let entry_b = RawByHashEntry::from_bytes(b_hash.clone(), b.clone());
        store.put_raw_by_hash_entry(&entry_b).expect("put b");

        let single = store
            .get_raw_entry(&a_hash)
            .expect("get single")
            .expect("present");
        assert_eq!(single.bytes, a);

        // The middle digest was never stored and must come back as `None`.
        let requested = [a_hash.clone(), "00".repeat(32), b_hash.clone()];
        let batch = store.get_raw_entries_batch(&requested).expect("get batch");
        assert_eq!(batch.len(), 3);
        let payloads: Vec<Option<&[u8]>> = batch
            .iter()
            .map(|slot| slot.as_ref().map(|found| found.bytes.as_slice()))
            .collect();
        assert_eq!(payloads, vec![Some(a.as_slice()), None, Some(b.as_slice())]);
    }

    #[test]
    fn external_raw_store_db_roundtrips_entries() {
        let td = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store(td.path(), "raw-store.db");

        let mut entry = entry_for(b"blob");
        entry
            .origin_uris
            .push(String::from("rsync://example.test/repo/a.cer"));
        entry.object_type = Some(String::from("cer"));
        raw_store.put_raw_entry(&entry).expect("put raw entry");

        let got = raw_store
            .get_raw_entry(&entry.sha256_hex)
            .expect("read raw entry")
            .expect("entry exists");
        assert_eq!(got, entry);
    }

    #[test]
    fn external_raw_store_db_batch_writes_and_reads() {
        let td = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store(td.path(), "raw-store.db");

        let a = entry_for(b"a");
        let b = entry_for(b"b");
        raw_store
            .put_raw_entries_batch(&[a.clone(), b.clone()])
            .expect("batch put");

        let requested = [a.sha256_hex.clone(), b.sha256_hex.clone()];
        let batch = raw_store
            .get_raw_entries_batch(&requested)
            .expect("batch get");
        assert_eq!(batch.len(), 2);
        assert_eq!(batch, vec![Some(a), Some(b)]);
    }

    #[test]
    fn raw_object_store_default_blob_helpers_return_bytes_only() {
        let td = tempfile::tempdir().expect("tempdir");
        // The nested path exercises parent-directory creation in `open`.
        let raw_store = open_raw_store(td.path(), "nested/raw-store.db");

        let mut entry = entry_for(b"blob");
        entry
            .origin_uris
            .push(String::from("rsync://example.test/repo/blob.roa"));
        raw_store.put_raw_entry(&entry).expect("put raw entry");

        let single = raw_store
            .get_blob_bytes(&entry.sha256_hex)
            .expect("get blob bytes")
            .expect("entry exists");
        assert_eq!(single, b"blob".to_vec());

        let requested = [entry.sha256_hex.clone(), "00".repeat(32)];
        let batch = raw_store
            .get_blob_bytes_batch(&requested)
            .expect("get blob bytes batch");
        assert_eq!(batch, vec![Some(b"blob".to_vec()), None]);
    }

    #[test]
    fn external_raw_store_db_delete_removes_entry() {
        let td = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store(td.path(), "raw-store.db");

        let entry = entry_for(b"gone");
        raw_store.put_raw_entry(&entry).expect("put");
        let before = raw_store.get_raw_entry(&entry.sha256_hex).unwrap();
        assert!(before.is_some());

        raw_store
            .delete_raw_entry(&entry.sha256_hex)
            .expect("delete entry");
        let after = raw_store.get_raw_entry(&entry.sha256_hex).unwrap();
        assert!(after.is_none());
    }

    #[test]
    fn external_raw_store_db_rejects_invalid_entry_on_put() {
        let td = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store(td.path(), "raw-store.db");

        // The digest is deliberately not the SHA-256 of the bytes.
        let bad = RawByHashEntry {
            sha256_hex: "11".repeat(32),
            bytes: b"blob".to_vec(),
            origin_uris: Vec::new(),
            object_type: None,
            encoding: None,
        };
        let err = raw_store
            .put_raw_entry(&bad)
            .expect_err("invalid hash should fail");
        assert!(matches!(err, StorageError::InvalidData { .. }));
    }

    #[test]
    fn external_raw_store_db_reports_codec_error_for_corrupt_value() {
        let td = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store(td.path(), "raw-store.db");

        // Write straight to the underlying DB to bypass encoding and validation.
        raw_store
            .db
            .put(b"rawbyhash:deadbeef", b"not-cbor")
            .expect("inject corrupt bytes");

        let err = raw_store
            .get_raw_entry("deadbeef")
            .expect_err("corrupt value should fail");
        assert!(matches!(
            err,
            StorageError::Codec {
                entity: "raw_by_hash",
                ..
            }
        ));
    }

    #[test]
    fn external_raw_store_db_batch_returns_empty_for_empty_request() {
        let td = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store(td.path(), "raw-store.db");

        let entries = raw_store
            .get_raw_entries_batch(&[])
            .expect("empty batch succeeds");
        assert!(entries.is_empty());

        raw_store
            .put_raw_entries_batch(&[])
            .expect("empty put succeeds");
    }
}
|