// rpki/src/blob_store.rs
//
// 699 lines
// 24 KiB
// Rust

use std::path::PathBuf;
use std::sync::Arc;
use rocksdb::{DB, Options, WriteBatch};
use crate::storage::{
RawByHashEntry, RocksDbMemoryDbSnapshot, RocksStore, StorageError, StorageResult,
memory_db_snapshot_for_column_families,
};
/// Key prefix for CBOR-encoded `RawByHashEntry` records in the external raw store.
const RAW_BY_HASH_KEY_PREFIX: &str = "rawbyhash:";
/// Key prefix for the bare blob bytes stored alongside each record in the external raw store.
const RAW_BLOB_KEY_PREFIX: &str = "rawblob:";
/// Key prefix for blob bytes stored in `ExternalRepoBytesDb`.
const REPO_BYTES_KEY_PREFIX: &str = "sha256:";
/// Builds the database key of the `RawByHashEntry` record for `sha256_hex`.
///
/// The result is `RAW_BY_HASH_KEY_PREFIX` immediately followed by the hash text.
/// The hash is used verbatim; no validation or case normalisation happens here.
fn raw_by_hash_key(sha256_hex: &str) -> String {
    [RAW_BY_HASH_KEY_PREFIX, sha256_hex].concat()
}
/// Builds the database key of the bare blob bytes for `sha256_hex`.
///
/// The result is `RAW_BLOB_KEY_PREFIX` immediately followed by the hash text.
/// The hash is used verbatim; no validation or case normalisation happens here.
fn raw_blob_key(sha256_hex: &str) -> String {
    [RAW_BLOB_KEY_PREFIX, sha256_hex].concat()
}
/// Builds the database key of a repo-bytes blob for `sha256_hex`.
///
/// The result is `REPO_BYTES_KEY_PREFIX` immediately followed by the hash text.
/// The hash is used verbatim; no validation or case normalisation happens here.
fn repo_bytes_key(sha256_hex: &str) -> String {
    [REPO_BYTES_KEY_PREFIX, sha256_hex].concat()
}
/// Checks that `sha256_hex` has the textual shape of a SHA-256 digest.
///
/// A value is accepted when it is exactly 64 bytes long and every byte is an ASCII
/// hexadecimal digit (upper or lower case). Only the shape is checked; the digest is
/// not compared against any data.
///
/// # Errors
///
/// Returns `StorageError::InvalidData` with entity `"raw_blob"` for any other input.
fn validate_blob_sha256_hex(sha256_hex: &str) -> StorageResult<()> {
    let has_digest_length = sha256_hex.len() == 64;
    let well_formed = has_digest_length && sha256_hex.bytes().all(|byte| byte.is_ascii_hexdigit());
    if well_formed {
        return Ok(());
    }
    Err(StorageError::InvalidData {
        entity: "raw_blob",
        detail: format!("invalid sha256 hex: {sha256_hex}"),
    })
}
/// Checks that a blob payload is non-empty.
///
/// # Errors
///
/// Returns `StorageError::InvalidData` with entity `"raw_blob"` when `bytes` has
/// length zero.
fn validate_blob_bytes(bytes: &[u8]) -> StorageResult<()> {
    if !bytes.is_empty() {
        return Ok(());
    }
    Err(StorageError::InvalidData {
        entity: "raw_blob",
        detail: "bytes must not be empty".to_string(),
    })
}
/// Read-only access to raw objects addressed by their SHA-256 hex digest.
///
/// Implementors supply the two entry lookups. The blob-bytes helpers have default
/// implementations that fetch the full entry and keep only its `bytes` field;
/// implementors may override them.
pub trait RawObjectStore {
    /// Looks up the entry stored for `sha256_hex`, returning `Ok(None)` when absent.
    fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult<Option<RawByHashEntry>>;

    /// Looks up several entries at once.
    ///
    /// The implementations in this file return one slot per requested hash, in
    /// request order, with `None` for hashes that are not stored.
    fn get_raw_entries_batch(
        &self,
        sha256_hexes: &[String],
    ) -> StorageResult<Vec<Option<RawByHashEntry>>>;

    /// Returns only the payload bytes of the entry stored for `sha256_hex`.
    ///
    /// Default: delegates to [`RawObjectStore::get_raw_entry`] and drops everything
    /// but `bytes`. Errors from the lookup are passed through unchanged.
    fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult<Option<Vec<u8>>> {
        let found = self.get_raw_entry(sha256_hex)?;
        Ok(found.map(|raw| raw.bytes))
    }

    /// Returns only the payload bytes for each requested hash.
    ///
    /// Default: delegates to [`RawObjectStore::get_raw_entries_batch`] and maps every
    /// slot to its `bytes`, preserving slot order. Errors are passed through unchanged.
    fn get_blob_bytes_batch(&self, sha256_hexes: &[String]) -> StorageResult<Vec<Option<Vec<u8>>>> {
        let found = self.get_raw_entries_batch(sha256_hexes)?;
        let mut payloads = Vec::with_capacity(found.len());
        for slot in found {
            payloads.push(slot.map(|raw| raw.bytes));
        }
        Ok(payloads)
    }
}
/// Handle to a standalone RocksDB database holding raw objects keyed by SHA-256 hex.
///
/// Cloning the handle is cheap: clones share the same underlying `DB` through the `Arc`.
#[derive(Clone, Debug)]
pub struct ExternalRawStoreDb {
/// Filesystem path the database was opened at.
path: PathBuf,
/// Shared RocksDB handle.
db: Arc<DB>,
}
/// Handle to a standalone RocksDB database holding bare blob bytes keyed by SHA-256 hex.
///
/// Cloning the handle is cheap: clones share the same underlying `DB` through the `Arc`.
#[derive(Clone, Debug)]
pub struct ExternalRepoBytesDb {
/// Filesystem path the database was opened at.
path: PathBuf,
/// Shared RocksDB handle.
db: Arc<DB>,
}
impl ExternalRawStoreDb {
pub fn open(path: impl Into<PathBuf>) -> StorageResult<Self> {
let path = path.into();
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(|e| StorageError::RocksDb(e.to_string()))?;
}
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
let db = DB::open(&opts, &path).map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(Self {
path,
db: Arc::new(db),
})
}
pub fn put_raw_entry(&self, entry: &RawByHashEntry) -> StorageResult<()> {
entry.validate_internal()?;
let key = raw_by_hash_key(&entry.sha256_hex);
let blob_key = raw_blob_key(&entry.sha256_hex);
let value = serde_cbor::to_vec(entry).map_err(|e| StorageError::Codec {
entity: "raw_by_hash",
detail: e.to_string(),
})?;
let blob_value = entry.bytes.clone();
self.db
.write({
let mut batch = WriteBatch::default();
batch.put(key.as_bytes(), value);
batch.put(blob_key.as_bytes(), blob_value);
batch
})
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
pub fn put_raw_entries_batch(&self, entries: &[RawByHashEntry]) -> StorageResult<()> {
if entries.is_empty() {
return Ok(());
}
let mut batch = WriteBatch::default();
for entry in entries {
entry.validate_internal()?;
let key = raw_by_hash_key(&entry.sha256_hex);
let blob_key = raw_blob_key(&entry.sha256_hex);
let value = serde_cbor::to_vec(entry).map_err(|e| StorageError::Codec {
entity: "raw_by_hash",
detail: e.to_string(),
})?;
batch.put(key.as_bytes(), value);
batch.put(blob_key.as_bytes(), entry.bytes.as_slice());
}
self.db
.write(batch)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
pub fn put_blob_bytes_batch(&self, blobs: &[(String, Vec<u8>)]) -> StorageResult<()> {
if blobs.is_empty() {
return Ok(());
}
let mut batch = WriteBatch::default();
for (sha256_hex, bytes) in blobs {
validate_blob_sha256_hex(sha256_hex)?;
validate_blob_bytes(bytes)?;
let blob_key = raw_blob_key(sha256_hex);
batch.put(blob_key.as_bytes(), bytes.as_slice());
}
self.db
.write(batch)
.map_err(|e| StorageError::RocksDb(e.to_string()))?;
Ok(())
}
pub fn delete_raw_entry(&self, sha256_hex: &str) -> StorageResult<()> {
let key = raw_by_hash_key(sha256_hex);
let blob_key = raw_blob_key(sha256_hex);
self.db
.write({
let mut batch = WriteBatch::default();
batch.delete(key.as_bytes());
batch.delete(blob_key.as_bytes());
batch
})
.map_err(|e| StorageError::RocksDb(e.to_string()))
}
pub fn path(&self) -> &PathBuf {
&self.path
}
pub(crate) fn memory_snapshot(&self, label: impl Into<String>) -> RocksDbMemoryDbSnapshot {
memory_db_snapshot_for_column_families(label, self.db.as_ref(), None)
}
}
impl ExternalRepoBytesDb {
    /// Opens the repo-bytes database at `path` for reading and writing.
    ///
    /// Missing parent directories are created, and the database itself is created
    /// when absent. LZ4 compression is configured.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::RocksDb` when directory creation or the RocksDB open
    /// fails.
    pub fn open(path: impl Into<PathBuf>) -> StorageResult<Self> {
        let db_path: PathBuf = path.into();
        if let Some(parent_dir) = db_path.parent() {
            if let Err(io_err) = std::fs::create_dir_all(parent_dir) {
                return Err(StorageError::RocksDb(io_err.to_string()));
            }
        }

        let mut options = Options::default();
        options.create_if_missing(true);
        options.set_compression_type(rocksdb::DBCompressionType::Lz4);

        match DB::open(&options, &db_path) {
            Ok(handle) => Ok(Self {
                path: db_path,
                db: Arc::new(handle),
            }),
            Err(open_err) => Err(StorageError::RocksDb(open_err.to_string())),
        }
    }

    /// Opens an existing repo-bytes database at `path` in read-only mode.
    ///
    /// Nothing is created on disk by this function itself. The final `false`
    /// argument is RocksDB's `error_if_log_file_exist` flag.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::RocksDb` when RocksDB cannot open the database.
    pub fn open_read_only(path: impl Into<PathBuf>) -> StorageResult<Self> {
        let db_path: PathBuf = path.into();

        let mut options = Options::default();
        options.set_compression_type(rocksdb::DBCompressionType::Lz4);

        match DB::open_for_read_only(&options, &db_path, false) {
            Ok(handle) => Ok(Self {
                path: db_path,
                db: Arc::new(handle),
            }),
            Err(open_err) => Err(StorageError::RocksDb(open_err.to_string())),
        }
    }

    /// Stores `(sha256_hex, bytes)` pairs in one `WriteBatch`.
    ///
    /// An empty slice is a no-op. Every pair is validated before anything is
    /// written; the hash is checked for shape only.
    ///
    /// # Errors
    ///
    /// * `StorageError::InvalidData` for a malformed hash or an empty payload;
    /// * `StorageError::RocksDb` when the batch write fails.
    pub fn put_blob_bytes_batch(&self, blobs: &[(String, Vec<u8>)]) -> StorageResult<()> {
        if blobs.is_empty() {
            return Ok(());
        }

        let mut pending = WriteBatch::default();
        for (hash, payload) in blobs {
            validate_blob_sha256_hex(hash)?;
            validate_blob_bytes(payload)?;
            pending.put(repo_bytes_key(hash).as_bytes(), payload.as_slice());
        }

        self.db
            .write(pending)
            .map_err(|write_err| StorageError::RocksDb(write_err.to_string()))
    }

    /// Fetches the bytes stored for `sha256_hex`, or `None` when nothing is stored.
    ///
    /// # Errors
    ///
    /// * `StorageError::InvalidData` when the hash is malformed;
    /// * `StorageError::RocksDb` when the read fails.
    pub fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult<Option<Vec<u8>>> {
        validate_blob_sha256_hex(sha256_hex)?;
        let lookup = self.db.get(repo_bytes_key(sha256_hex).as_bytes());
        lookup.map_err(|read_err| StorageError::RocksDb(read_err.to_string()))
    }

    /// Fetches the bytes for several hashes with a single `multi_get`.
    ///
    /// The result has one slot per requested hash, in request order; absent hashes
    /// yield `None`. An empty request returns an empty vector without touching the
    /// database. All hashes are validated before the lookup is issued.
    ///
    /// # Errors
    ///
    /// * `StorageError::InvalidData` for the first malformed hash;
    /// * `StorageError::RocksDb` for the first failed read.
    pub fn get_blob_bytes_batch(
        &self,
        sha256_hexes: &[String],
    ) -> StorageResult<Vec<Option<Vec<u8>>>> {
        if sha256_hexes.is_empty() {
            return Ok(Vec::new());
        }

        let mut keys: Vec<String> = Vec::with_capacity(sha256_hexes.len());
        for hash in sha256_hexes {
            validate_blob_sha256_hex(hash)?;
            keys.push(repo_bytes_key(hash));
        }

        let fetched = self.db.multi_get(keys.iter().map(String::as_bytes));
        let mut payloads = Vec::with_capacity(fetched.len());
        for slot in fetched {
            payloads.push(slot.map_err(|read_err| StorageError::RocksDb(read_err.to_string()))?);
        }
        Ok(payloads)
    }

    /// Returns the filesystem path this database was opened at.
    pub fn path(&self) -> &PathBuf {
        &self.path
    }

    /// Produces a memory-usage snapshot of this database tagged with `label`.
    ///
    /// Delegates to `memory_db_snapshot_for_column_families`, passing `None` for the
    /// column-family argument; what `None` selects is defined by that helper.
    pub(crate) fn memory_snapshot(&self, label: impl Into<String>) -> RocksDbMemoryDbSnapshot {
        memory_db_snapshot_for_column_families(label, self.db.as_ref(), None)
    }
}
// `RawObjectStore` for `RocksStore`: every trait method is a one-line forward to a
// function on `RocksStore` itself.
impl RawObjectStore for RocksStore {
// Forwards to `RocksStore::get_raw_by_hash_entry`.
fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult<Option<RawByHashEntry>> {
self.get_raw_by_hash_entry(sha256_hex)
}
// Forwards to `RocksStore::get_raw_by_hash_entries_batch`.
fn get_raw_entries_batch(
&self,
sha256_hexes: &[String],
) -> StorageResult<Vec<Option<RawByHashEntry>>> {
self.get_raw_by_hash_entries_batch(sha256_hexes)
}
// Overrides the trait default with a path-qualified call on `RocksStore`.
// NOTE(review): presumably this resolves to an inherent `RocksStore::get_blob_bytes`
// defined in `crate::storage`; if only the trait method existed, this call would
// recurse into itself. Confirm against the `RocksStore` definition.
fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult<Option<Vec<u8>>> {
RocksStore::get_blob_bytes(self, sha256_hex)
}
// Batch counterpart of the override above; the same resolution caveat applies to
// `RocksStore::get_blob_bytes_batch`.
fn get_blob_bytes_batch(&self, sha256_hexes: &[String]) -> StorageResult<Vec<Option<Vec<u8>>>> {
RocksStore::get_blob_bytes_batch(self, sha256_hexes)
}
}
/// Decodes a stored `rawbyhash:` value into a validated [`RawByHashEntry`].
///
/// Shared by the single and batch lookups so both report corrupt data identically.
///
/// # Errors
///
/// * `StorageError::Codec` (entity `"raw_by_hash"`) when `bytes` is not valid CBOR
///   for a `RawByHashEntry`;
/// * whatever `RawByHashEntry::validate_internal` returns for a decoded entry that
///   fails it.
fn decode_raw_by_hash_entry(bytes: &[u8]) -> StorageResult<RawByHashEntry> {
    let entry =
        serde_cbor::from_slice::<RawByHashEntry>(bytes).map_err(|e| StorageError::Codec {
            entity: "raw_by_hash",
            detail: e.to_string(),
        })?;
    entry.validate_internal()?;
    Ok(entry)
}

impl RawObjectStore for ExternalRawStoreDb {
    /// Reads and decodes the record stored for `sha256_hex`.
    ///
    /// Returns `Ok(None)` when no record exists. The hash is used verbatim to build
    /// the key; it is not validated here.
    ///
    /// # Errors
    ///
    /// `StorageError::RocksDb` on read failure, otherwise the decode/validation
    /// errors described on `decode_raw_by_hash_entry`.
    fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult<Option<RawByHashEntry>> {
        let key = raw_by_hash_key(sha256_hex);
        self.db
            .get(key.as_bytes())
            .map_err(|e| StorageError::RocksDb(e.to_string()))?
            .map(|bytes| decode_raw_by_hash_entry(&bytes))
            .transpose()
    }

    /// Reads and decodes the records for several hashes with a single `multi_get`.
    ///
    /// The result has one slot per requested hash, in request order; absent hashes
    /// yield `None`. An empty request returns an empty vector without touching the
    /// database. The first failing slot aborts the whole call with its error.
    fn get_raw_entries_batch(
        &self,
        sha256_hexes: &[String],
    ) -> StorageResult<Vec<Option<RawByHashEntry>>> {
        if sha256_hexes.is_empty() {
            return Ok(Vec::new());
        }
        let keys: Vec<String> = sha256_hexes
            .iter()
            .map(|hash| raw_by_hash_key(hash))
            .collect();
        self.db
            .multi_get(keys.iter().map(|key| key.as_bytes()))
            .into_iter()
            .map(|res| {
                res.map_err(|e| StorageError::RocksDb(e.to_string()))?
                    .map(|bytes| decode_raw_by_hash_entry(&bytes))
                    .transpose()
            })
            .collect()
    }

    /// Reads the bare blob bytes stored under the `rawblob:` key for `sha256_hex`.
    ///
    /// Overrides the trait default so the CBOR record is neither read nor decoded.
    /// Returns `Ok(None)` when no blob is stored.
    fn get_blob_bytes(&self, sha256_hex: &str) -> StorageResult<Option<Vec<u8>>> {
        let key = raw_blob_key(sha256_hex);
        self.db
            .get(key.as_bytes())
            .map_err(|e| StorageError::RocksDb(e.to_string()))
    }

    /// Batch counterpart of [`RawObjectStore::get_blob_bytes`] using one `multi_get`.
    ///
    /// One slot per requested hash, in request order; an empty request returns an
    /// empty vector without touching the database.
    fn get_blob_bytes_batch(&self, sha256_hexes: &[String]) -> StorageResult<Vec<Option<Vec<u8>>>> {
        if sha256_hexes.is_empty() {
            return Ok(Vec::new());
        }
        let keys: Vec<String> = sha256_hexes.iter().map(|hash| raw_blob_key(hash)).collect();
        self.db
            .multi_get(keys.iter().map(|key| key.as_bytes()))
            .into_iter()
            .map(|res| res.map_err(|e| StorageError::RocksDb(e.to_string())))
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::{ExternalRawStoreDb, ExternalRepoBytesDb, RawObjectStore};
    use crate::storage::{RawByHashEntry, RocksStore, StorageError, StorageResult};
    use std::collections::HashMap;

    /// Hex-encoded SHA-256 digest of `bytes`.
    fn sha256_hex(bytes: &[u8]) -> String {
        use sha2::{Digest, Sha256};
        let digest = Sha256::digest(bytes);
        hex::encode(digest)
    }

    /// A well-formed hash that none of the tests ever stores.
    fn absent_hash() -> String {
        "00".repeat(32)
    }

    /// Builds an entry whose hash is the digest of `payload`.
    fn entry_of(payload: &[u8]) -> RawByHashEntry {
        RawByHashEntry::from_bytes(sha256_hex(payload), payload.to_vec())
    }

    /// Opens an `ExternalRawStoreDb` at `relative` inside the temporary directory.
    fn open_raw_store_at(dir: &tempfile::TempDir, relative: &str) -> ExternalRawStoreDb {
        ExternalRawStoreDb::open(dir.path().join(relative)).expect("open raw store")
    }

    /// Opens an `ExternalRepoBytesDb` named `repo-bytes.db` inside the temporary directory.
    fn open_repo_bytes(dir: &tempfile::TempDir) -> ExternalRepoBytesDb {
        ExternalRepoBytesDb::open(dir.path().join("repo-bytes.db")).expect("open repo bytes")
    }

    /// In-memory store that implements only the required trait methods, so the
    /// trait's default blob helpers are what gets exercised.
    #[derive(Default)]
    struct MockRawStore {
        entries: HashMap<String, RawByHashEntry>,
    }

    impl RawObjectStore for MockRawStore {
        fn get_raw_entry(&self, sha256_hex: &str) -> StorageResult<Option<RawByHashEntry>> {
            let found = self.entries.get(sha256_hex);
            Ok(found.cloned())
        }

        fn get_raw_entries_batch(
            &self,
            sha256_hexes: &[String],
        ) -> StorageResult<Vec<Option<RawByHashEntry>>> {
            let mut slots = Vec::with_capacity(sha256_hexes.len());
            for hash in sha256_hexes {
                slots.push(self.entries.get(hash).cloned());
            }
            Ok(slots)
        }
    }

    // RocksStore, accessed through the trait, serves single and batch entry reads.
    #[test]
    fn rocks_store_raw_object_store_reads_single_and_batch_entries() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let store = RocksStore::open(tmp.path()).expect("open rocksdb");

        let payload_a: &[u8] = b"object-a";
        let payload_b: &[u8] = b"object-b";
        let hash_a = sha256_hex(payload_a);
        let hash_b = sha256_hex(payload_b);

        store
            .put_raw_by_hash_entry(&RawByHashEntry::from_bytes(hash_a.clone(), payload_a.to_vec()))
            .expect("put a");
        store
            .put_raw_by_hash_entry(&RawByHashEntry::from_bytes(hash_b.clone(), payload_b.to_vec()))
            .expect("put b");

        let fetched = store
            .get_raw_entry(&hash_a)
            .expect("get single")
            .expect("present");
        assert_eq!(fetched.bytes, payload_a.to_vec());

        let batch = store
            .get_raw_entries_batch(&[hash_a.clone(), absent_hash(), hash_b.clone()])
            .expect("get batch");
        let observed: Vec<Option<&[u8]>> = batch
            .iter()
            .map(|slot| slot.as_ref().map(|entry| entry.bytes.as_slice()))
            .collect();
        assert_eq!(observed, vec![Some(payload_a), None, Some(payload_b)]);
    }

    // A fully populated entry survives a put/get round trip unchanged.
    #[test]
    fn external_raw_store_db_roundtrips_entries() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store_at(&tmp, "raw-store.db");

        let mut entry = entry_of(b"blob");
        entry
            .origin_uris
            .push("rsync://example.test/repo/a.cer".to_string());
        entry.object_type = Some("cer".to_string());

        raw_store.put_raw_entry(&entry).expect("put raw entry");

        let stored = raw_store
            .get_raw_entry(&entry.sha256_hex)
            .expect("read raw entry")
            .expect("entry exists");
        assert_eq!(stored, entry);
    }

    // Batch put followed by batch get returns the entries in request order.
    #[test]
    fn external_raw_store_db_batch_writes_and_reads() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store_at(&tmp, "raw-store.db");

        let first = entry_of(b"a");
        let second = entry_of(b"b");
        raw_store
            .put_raw_entries_batch(&[first.clone(), second.clone()])
            .expect("batch put");

        let hashes = [first.sha256_hex.clone(), second.sha256_hex.clone()];
        let batch = raw_store
            .get_raw_entries_batch(&hashes)
            .expect("batch get");
        assert_eq!(batch, vec![Some(first), Some(second)]);
    }

    // The blob helpers on the external raw store yield only the payload bytes.
    #[test]
    fn raw_object_store_default_blob_helpers_return_bytes_only() {
        let tmp = tempfile::tempdir().expect("tempdir");
        // The nested path also exercises parent-directory creation in `open`.
        let raw_store = open_raw_store_at(&tmp, "nested/raw-store.db");

        let mut entry = entry_of(b"blob");
        entry
            .origin_uris
            .push("rsync://example.test/repo/blob.roa".to_string());
        raw_store.put_raw_entry(&entry).expect("put raw entry");

        let payload = raw_store
            .get_blob_bytes(&entry.sha256_hex)
            .expect("get blob bytes")
            .expect("entry exists");
        assert_eq!(payload, b"blob".to_vec());

        let payloads = raw_store
            .get_blob_bytes_batch(&[entry.sha256_hex.clone(), absent_hash()])
            .expect("get blob bytes batch");
        assert_eq!(payloads, vec![Some(b"blob".to_vec()), None]);
    }

    // The trait's default blob helpers work for a store that overrides nothing.
    #[test]
    fn raw_object_store_default_blob_helpers_work_for_custom_store() {
        let first = entry_of(b"a");
        let second = entry_of(b"b");

        let mut store = MockRawStore::default();
        store.entries.insert(first.sha256_hex.clone(), first.clone());
        store.entries.insert(second.sha256_hex.clone(), second.clone());

        let payload = store
            .get_blob_bytes(&first.sha256_hex)
            .expect("single blob bytes")
            .expect("present");
        assert_eq!(payload, b"a".to_vec());

        let payloads = store
            .get_blob_bytes_batch(&[
                first.sha256_hex.clone(),
                absent_hash(),
                second.sha256_hex.clone(),
            ])
            .expect("batch blob bytes");
        assert_eq!(payloads, vec![Some(b"a".to_vec()), None, Some(b"b".to_vec())]);
    }

    // A RocksStore opened with an external raw store serves blob bytes written through it.
    #[test]
    fn rocks_store_blob_helpers_use_external_raw_store_fast_path() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let main_db_path = tmp.path().join("db");
        let raw_db_path = tmp.path().join("raw-store.db");
        let store = RocksStore::open_with_external_raw_store(&main_db_path, &raw_db_path)
            .expect("open store with external raw store");

        let entry = entry_of(b"blob-fast");
        store.put_raw_by_hash_entry(&entry).expect("put");

        let payload = store
            .get_blob_bytes(&entry.sha256_hex)
            .expect("single blob bytes")
            .expect("present");
        assert_eq!(payload, b"blob-fast".to_vec());

        let payloads = store
            .get_blob_bytes_batch(&[entry.sha256_hex.clone(), absent_hash()])
            .expect("batch blob bytes");
        assert_eq!(payloads, vec![Some(b"blob-fast".to_vec()), None]);
    }

    // Deleting an entry makes subsequent lookups return `None`.
    #[test]
    fn external_raw_store_db_delete_removes_entry() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store_at(&tmp, "raw-store.db");

        let entry = entry_of(b"gone");
        raw_store.put_raw_entry(&entry).expect("put");

        let before = raw_store.get_raw_entry(&entry.sha256_hex).unwrap();
        assert!(before.is_some());

        raw_store
            .delete_raw_entry(&entry.sha256_hex)
            .expect("delete entry");

        let after = raw_store.get_raw_entry(&entry.sha256_hex).unwrap();
        assert!(after.is_none());
    }

    // Blob-only writes are readable as bytes but create no entry record.
    #[test]
    fn put_blob_bytes_batch_round_trips_without_raw_entry() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store_at(&tmp, "raw-store.db");

        let (hash_a, bytes_a) = (sha256_hex(b"blob-a"), b"blob-a".to_vec());
        let (hash_b, bytes_b) = (sha256_hex(b"blob-b"), b"blob-b".to_vec());
        raw_store
            .put_blob_bytes_batch(&[
                (hash_a.clone(), bytes_a.clone()),
                (hash_b.clone(), bytes_b.clone()),
            ])
            .expect("put blobs");

        let read_a = raw_store.get_blob_bytes(&hash_a).expect("get blob a");
        assert_eq!(read_a, Some(bytes_a));
        let read_b = raw_store.get_blob_bytes(&hash_b).expect("get blob b");
        assert_eq!(read_b, Some(bytes_b));

        assert!(raw_store.get_raw_entry(&hash_a).expect("get raw a").is_none());
        assert!(raw_store.get_raw_entry(&hash_b).expect("get raw b").is_none());
    }

    // Malformed hashes and empty payloads are rejected as `InvalidData`.
    #[test]
    fn put_blob_bytes_batch_rejects_invalid_inputs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store_at(&tmp, "raw-store.db");

        let bad_hash = [("zz".repeat(32), b"blob".to_vec())];
        let err = raw_store
            .put_blob_bytes_batch(&bad_hash)
            .expect_err("invalid hash should fail");
        assert!(matches!(err, StorageError::InvalidData { .. }));

        let empty_payload = [(sha256_hex(b"blob"), Vec::new())];
        let err = raw_store
            .put_blob_bytes_batch(&empty_payload)
            .expect_err("empty bytes should fail");
        assert!(matches!(err, StorageError::InvalidData { .. }));
    }

    // An entry whose hash does not belong to its bytes is refused on write.
    #[test]
    fn external_raw_store_db_rejects_invalid_entry_on_put() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store_at(&tmp, "raw-store.db");

        let mismatched = RawByHashEntry {
            sha256_hex: "11".repeat(32),
            bytes: b"blob".to_vec(),
            origin_uris: Vec::new(),
            object_type: None,
            encoding: None,
        };

        let err = raw_store
            .put_raw_entry(&mismatched)
            .expect_err("invalid hash should fail");
        assert!(matches!(err, StorageError::InvalidData { .. }));
    }

    // A value that is not valid CBOR surfaces as a `Codec` error on read.
    #[test]
    fn external_raw_store_db_reports_codec_error_for_corrupt_value() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store_at(&tmp, "raw-store.db");

        // Write straight to the underlying DB to bypass validation and encoding.
        raw_store
            .db
            .put(b"rawbyhash:deadbeef", b"not-cbor")
            .expect("inject corrupt bytes");

        let err = raw_store
            .get_raw_entry("deadbeef")
            .expect_err("corrupt value should fail");
        assert!(matches!(
            err,
            StorageError::Codec {
                entity: "raw_by_hash",
                ..
            }
        ));
    }

    // Empty batch reads and writes succeed without doing anything.
    #[test]
    fn external_raw_store_db_batch_returns_empty_for_empty_request() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let raw_store = open_raw_store_at(&tmp, "raw-store.db");

        let entries = raw_store
            .get_raw_entries_batch(&[])
            .expect("empty batch succeeds");
        assert!(entries.is_empty());

        raw_store
            .put_raw_entries_batch(&[])
            .expect("empty put succeeds");
    }

    // The repo-bytes store round-trips payloads for single and batch reads.
    #[test]
    fn external_repo_bytes_db_roundtrips_blob_bytes_without_raw_entry() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let repo_bytes = open_repo_bytes(&tmp);

        let payload = b"repo-bytes-object".to_vec();
        let hash = sha256_hex(&payload);
        repo_bytes
            .put_blob_bytes_batch(&[(hash.clone(), payload.clone())])
            .expect("put repo bytes");

        let single = repo_bytes.get_blob_bytes(&hash).expect("get repo bytes");
        assert_eq!(single, Some(payload.clone()));

        let batch = repo_bytes
            .get_blob_bytes_batch(&[hash, absent_hash()])
            .expect("get repo bytes batch");
        assert_eq!(batch, vec![Some(payload), None]);
    }

    // The repo-bytes store validates hashes on both write and read, and payloads on write.
    #[test]
    fn external_repo_bytes_db_rejects_invalid_inputs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let repo_bytes = open_repo_bytes(&tmp);

        let bad_hash_write =
            repo_bytes.put_blob_bytes_batch(&[("not-a-valid-hash".to_string(), b"blob".to_vec())]);
        assert!(bad_hash_write.is_err());

        let empty_payload_write =
            repo_bytes.put_blob_bytes_batch(&[(sha256_hex(b"blob"), Vec::new())]);
        assert!(empty_payload_write.is_err());

        let bad_hash_read = repo_bytes.get_blob_bytes("not-a-valid-hash");
        assert!(bad_hash_read.is_err());
    }
}