rpki/src/fetch/rsync_system.rs
2026-02-09 19:35:54 +08:00

143 lines
3.9 KiB
Rust

use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::Duration;
use uuid::Uuid;
use crate::fetch::rsync::{RsyncFetchError, RsyncFetchResult, RsyncFetcher};
/// Settings that control how the system `rsync` binary is invoked.
#[derive(Clone, Debug)]
pub struct SystemRsyncConfig {
    /// Program to execute. A bare name such as `rsync` is looked up by the
    /// operating system's usual process-spawning rules.
    pub rsync_bin: PathBuf,
    /// Timeout handed to rsync via `--timeout`, in whole seconds.
    pub timeout: Duration,
    /// Additional command-line arguments placed before the source and
    /// destination arguments.
    pub extra_args: Vec<String>,
}

impl Default for SystemRsyncConfig {
    /// Returns a configuration that runs `rsync` with a 60 second timeout and
    /// no extra arguments.
    fn default() -> Self {
        let rsync_bin: PathBuf = "rsync".into();
        let timeout = Duration::new(60, 0);
        Self {
            rsync_bin,
            timeout,
            extra_args: vec![],
        }
    }
}
/// A `RsyncFetcher` implementation backed by the system `rsync` binary.
///
/// This is intended for live stage2 runs. For unit tests and offline fixtures,
/// prefer `LocalDirRsyncFetcher`.
#[derive(Clone, Debug)]
pub struct SystemRsyncFetcher {
    /// Binary location, timeout and extra arguments used for every invocation.
    config: SystemRsyncConfig,
}

impl SystemRsyncFetcher {
    /// Creates a fetcher that invokes rsync according to `config`.
    pub fn new(config: SystemRsyncConfig) -> Self {
        Self { config }
    }

    /// Runs `rsync -rt --delete --timeout <secs> <extra_args…> -- <src> <dst>`
    /// and waits for the process to exit.
    ///
    /// # Errors
    ///
    /// Returns a message when the process cannot be spawned, or when it exits
    /// unsuccessfully; in the latter case the message carries the exit status
    /// together with the trimmed stdout and stderr of the process.
    fn run_rsync(&self, src: &str, dst: &Path) -> Result<(), String> {
        // `--timeout` is I/O timeout in seconds (applies to network reads/writes).
        // rsync documents `--timeout=0` as "no timeout", so a sub-second
        // configuration is rounded up to 1 instead of truncating to 0.
        let timeout_secs = self.config.timeout.as_secs().max(1).to_string();

        let mut cmd = Command::new(&self.config.rsync_bin);
        cmd.arg("-rt")
            .arg("--delete")
            .arg("--timeout")
            .arg(timeout_secs)
            .args(&self.config.extra_args)
            // End option parsing before the positional arguments so that a
            // source or destination beginning with `-` can never be
            // interpreted by rsync as a command-line option.
            .arg("--")
            .arg(src)
            .arg(dst);

        let out = cmd
            .output()
            .map_err(|e| format!("rsync spawn failed: {e}"))?;
        if out.status.success() {
            return Ok(());
        }

        let stderr = String::from_utf8_lossy(&out.stderr);
        let stdout = String::from_utf8_lossy(&out.stdout);
        Err(format!(
            "rsync failed: status={} stdout={} stderr={}",
            out.status,
            stdout.trim(),
            stderr.trim()
        ))
    }
}
impl RsyncFetcher for SystemRsyncFetcher {
    /// Mirrors `rsync_base_uri` into a scratch directory using the system
    /// rsync binary and returns every regular file found there as a
    /// `(uri, contents)` pair.
    ///
    /// The base URI is given a trailing `/` if it lacks one. Failures to
    /// create the scratch directory, to run rsync, or to read the mirrored
    /// files are all reported as `RsyncFetchError::Fetch`.
    fn fetch_objects(&self, rsync_base_uri: &str) -> RsyncFetchResult<Vec<(String, Vec<u8>)>> {
        let base_uri = normalize_rsync_base_uri(rsync_base_uri);

        // The scratch directory is removed again when `scratch` is dropped,
        // i.e. on every exit path of this function.
        let scratch = TempDir::new().map_err(|msg| RsyncFetchError::Fetch(msg.to_string()))?;
        let scratch_root = scratch.path();

        let mut objects = Vec::new();
        self.run_rsync(&base_uri, scratch_root)
            .and_then(|()| walk_dir_collect(scratch_root, scratch_root, &base_uri, &mut objects))
            .map_err(RsyncFetchError::Fetch)?;
        Ok(objects)
    }
}
/// A uniquely named scratch directory below the system temporary directory.
///
/// The directory and everything inside it are removed when the value is
/// dropped.
struct TempDir {
    /// Absolute location of the scratch directory.
    path: PathBuf,
}

impl TempDir {
    /// Creates a fresh directory named `rpki-system-rsync-<uuid>` inside
    /// `std::env::temp_dir()`.
    ///
    /// Returns the I/O error rendered as a string when creation fails.
    fn new() -> Result<Self, String> {
        let dir_name = format!("rpki-system-rsync-{}", Uuid::new_v4());
        let path = std::env::temp_dir().join(dir_name);
        match std::fs::create_dir_all(&path) {
            Ok(()) => Ok(Self { path }),
            Err(err) => Err(err.to_string()),
        }
    }

    /// Location of the scratch directory.
    fn path(&self) -> &Path {
        self.path.as_path()
    }
}

impl Drop for TempDir {
    /// Removes the directory tree; failures are deliberately ignored.
    fn drop(&mut self) {
        if let Err(_ignored) = std::fs::remove_dir_all(&self.path) {
            // Best-effort cleanup: a destructor has no way to report the
            // error, and a leftover scratch directory is harmless.
        }
    }
}
/// Returns `s` as an owned string that is guaranteed to end with `/`.
///
/// A single `/` is appended when `s` does not already end with one; input
/// that already has a trailing slash is copied unchanged.
fn normalize_rsync_base_uri(s: &str) -> String {
    let mut uri = String::with_capacity(s.len() + 1);
    uri.push_str(s);
    if !uri.ends_with('/') {
        uri.push('/');
    }
    uri
}
/// Recursively collects every regular file below `current` into `out`.
///
/// Each file is reported as `(uri, contents)`, where `uri` is
/// `rsync_base_uri` followed by the file's path relative to `root`, with the
/// path components joined by `/`. `rsync_base_uri` is used verbatim, so it
/// should already end with `/`.
///
/// Entries within a directory are visited in file-name order, so the output
/// order is reproducible across platforms and filesystems. Entries that are
/// neither directories nor regular files are skipped; `DirEntry::metadata`
/// does not follow symlinks, so symlinks fall into that category.
///
/// # Errors
///
/// Returns the underlying I/O error rendered as a string when a directory
/// cannot be listed, an entry's metadata cannot be read, or a file cannot be
/// read. Also fails when `current` is not located below `root`.
fn walk_dir_collect(
    root: &Path,
    current: &Path,
    rsync_base_uri: &str,
    out: &mut Vec<(String, Vec<u8>)>,
) -> Result<(), String> {
    let mut entries = std::fs::read_dir(current)
        .map_err(|e| e.to_string())?
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| e.to_string())?;
    // `read_dir` yields entries in a platform- and filesystem-dependent
    // order; sort so that callers always see the same sequence.
    entries.sort_by_cached_key(|entry| entry.file_name());

    for entry in entries {
        let path = entry.path();
        let meta = entry.metadata().map_err(|e| e.to_string())?;
        if meta.is_dir() {
            walk_dir_collect(root, &path, rsync_base_uri, out)?;
        } else if meta.is_file() {
            // Join the individual components with `/` instead of rewriting
            // separator characters in the rendered path: on Unix a backslash
            // is an ordinary file-name character and must not be turned into
            // a URI path separator.
            let rel = path
                .strip_prefix(root)
                .map_err(|e| e.to_string())?
                .components()
                .map(|component| component.as_os_str().to_string_lossy())
                .collect::<Vec<_>>()
                .join("/");
            let bytes = std::fs::read(&path).map_err(|e| e.to_string())?;
            out.push((format!("{rsync_base_uri}{rel}"), bytes));
        }
    }
    Ok(())
}