// rpki/src/fetch/rsync_system.rs
//
// 834 lines
// 29 KiB
// Rust

use std::cell::RefCell;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::process::Stdio;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use sha2::Digest;
use uuid::Uuid;
use crate::fetch::rsync::{
RsyncFetchError, RsyncFetchResult, RsyncFetcher, normalize_rsync_base_uri,
};
/// Selects which rsync URI is actually fetched for a given publication point.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum RsyncScopePolicy {
    /// Fetch the normalized publication point URI itself (the default).
    #[default]
    PublicationPoint,
    /// Fetch the enclosing rsync module root (`rsync://<authority>/<module>/`).
    ModuleRoot,
}

impl RsyncScopePolicy {
    /// Parses the value of the `--rsync-scope` command-line option.
    ///
    /// Accepts exactly `publication-point` or `module-root`.
    ///
    /// # Errors
    ///
    /// Returns a message naming the rejected value and listing the accepted
    /// spellings when `value` is anything else.
    pub fn parse_cli_value(value: &str) -> Result<Self, String> {
        match value {
            "publication-point" => Ok(Self::PublicationPoint),
            "module-root" => Ok(Self::ModuleRoot),
            _ => Err(format!(
                "invalid --rsync-scope: {value}; expected publication-point or module-root"
            )),
        }
    }
}
/// Settings that control how the system `rsync` binary is invoked.
#[derive(Clone, Debug)]
pub struct SystemRsyncConfig {
    /// Path or name of the rsync executable to spawn.
    pub rsync_bin: PathBuf,
    /// Connection timeout, passed as `--contimeout` (whole seconds, at least 1)
    /// when the source is an `rsync://` URI.
    pub connect_timeout: Duration,
    /// I/O timeout, passed as `--timeout` (whole seconds, at least 1) unless a
    /// scoped per-thread override is active.
    pub timeout: Duration,
    /// Additional arguments appended to every rsync invocation.
    pub extra_args: Vec<String>,
    /// Optional root directory for persistent rsync mirrors.
    ///
    /// When set, callers may sync into stable subdirectories under this root
    /// rather than a temporary directory, so that rsync can transfer
    /// incrementally across runs.
    ///
    /// Note: actual mirror behavior is implemented separately from config wiring.
    pub mirror_root: Option<PathBuf>,
    /// Which URI is fetched for a publication point.
    pub scope_policy: RsyncScopePolicy,
}

impl Default for SystemRsyncConfig {
    /// Uses `rsync` from the search path, a 15 s connect timeout, a 30 s I/O
    /// timeout, no extra arguments, no mirror root, and the default scope policy.
    fn default() -> Self {
        let rsync_bin = PathBuf::from("rsync");
        let connect_timeout = Duration::from_secs(15);
        let timeout = Duration::from_secs(30);
        Self {
            rsync_bin,
            connect_timeout,
            timeout,
            extra_args: Vec::new(),
            mirror_root: None,
            scope_policy: RsyncScopePolicy::default(),
        }
    }
}
/// A `RsyncFetcher` implementation backed by the system `rsync` binary.
///
/// Each fetch spawns `rsync` into either a persistent mirror directory (when
/// `mirror_root` is configured) or a temporary directory that is removed
/// afterwards, and then reads the synced files back from disk.
///
/// This is intended for live stage2 runs. For unit tests and offline fixtures,
/// prefer `LocalDirRsyncFetcher`.
#[derive(Clone, Debug)]
pub struct SystemRsyncFetcher {
// Invocation settings (binary, timeouts, extra args, mirror root, scope policy).
config: SystemRsyncConfig,
}
thread_local! {
    // Per-thread replacement for the configured rsync `--timeout`; `None` means
    // "use the configured value".
    static RSYNC_TIMEOUT_OVERRIDE: RefCell<Option<Duration>> = const { RefCell::new(None) };
    // Per-thread fail-fast retry profile; `None` means a single rsync attempt
    // without a wall-clock limit.
    static RSYNC_FAIL_FAST_PROFILE: RefCell<Option<RsyncFailFastProfile>> = const { RefCell::new(None) };
}

/// Puts the previous value back into a thread-local override slot when dropped.
///
/// Restoring from `Drop` means the override is also undone when the scoped
/// closure panics; otherwise it would stay installed on the thread after the
/// unwind.
struct ScopedOverrideGuard<T: 'static> {
    slot: &'static std::thread::LocalKey<RefCell<Option<T>>>,
    previous: Option<T>,
}

impl<T: 'static> ScopedOverrideGuard<T> {
    /// Installs `value` in `slot` and remembers what was there before.
    fn install(slot: &'static std::thread::LocalKey<RefCell<Option<T>>>, value: T) -> Self {
        let previous = slot.with(|cell| cell.replace(Some(value)));
        Self { slot, previous }
    }
}

impl<T: 'static> Drop for ScopedOverrideGuard<T> {
    fn drop(&mut self) {
        let previous = self.previous.take();
        // `try_with` rather than `with`: must not panic inside `drop` if the
        // thread-local has already been destroyed during thread teardown.
        let _ = self.slot.try_with(|cell| cell.replace(previous));
    }
}

/// Runs `f` with the rsync I/O timeout overridden to `timeout` on the current thread.
///
/// The previous override (if any) is restored when `f` returns or panics, so
/// calls may be nested.
pub fn with_scoped_rsync_timeout_override<R>(timeout: Duration, f: impl FnOnce() -> R) -> R {
    let _restore = ScopedOverrideGuard::install(&RSYNC_TIMEOUT_OVERRIDE, timeout);
    f()
}

/// Retry policy used when rsync runs in fail-fast mode.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RsyncFailFastProfile {
    /// Wall-clock limit applied to the first attempt.
    pub initial_wall_clock_timeout: Duration,
    /// Upper bound for the wall-clock limit as it doubles between attempts.
    pub max_wall_clock_timeout: Duration,
    /// Maximum number of rsync attempts.
    pub max_attempts: usize,
}

/// Runs `f` with fail-fast rsync retries enabled on the current thread.
///
/// The previous profile (if any) is restored when `f` returns or panics, so
/// calls may be nested.
pub fn with_scoped_rsync_fail_fast_profile<R>(
    profile: RsyncFailFastProfile,
    f: impl FnOnce() -> R,
) -> R {
    let _restore = ScopedOverrideGuard::install(&RSYNC_FAIL_FAST_PROFILE, profile);
    f()
}
impl SystemRsyncFetcher {
    /// Creates a fetcher that invokes rsync according to `config`.
    pub fn new(config: SystemRsyncConfig) -> Self {
        Self { config }
    }

    /// Returns the persistent mirror directory for `normalized_rsync_base_uri`,
    /// creating it (and the mirror root) if necessary.
    ///
    /// Returns `Ok(None)` when no mirror root is configured. The directory name
    /// is the hex-encoded SHA-256 digest of the URI, so each distinct URI maps
    /// to its own stable subdirectory.
    ///
    /// # Errors
    ///
    /// Returns a message containing the offending path when either the root or
    /// the per-URI directory cannot be created.
    fn mirror_dst_dir(&self, normalized_rsync_base_uri: &str) -> Result<Option<PathBuf>, String> {
        let Some(root) = self.config.mirror_root.as_ref() else {
            return Ok(None);
        };
        // Created separately from the subdirectory so a bad root gets its own message.
        std::fs::create_dir_all(root)
            .map_err(|e| format!("create rsync mirror root failed: {}: {e}", root.display()))?;
        let hash = hex::encode(sha2::Sha256::digest(normalized_rsync_base_uri.as_bytes()));
        let dir = root.join(hash);
        std::fs::create_dir_all(&dir).map_err(|e| {
            format!(
                "create rsync mirror directory failed: {}: {e}",
                dir.display()
            )
        })?;
        Ok(Some(dir))
    }

    /// Syncs `src` into `dst`, using fail-fast retries when a profile is
    /// installed on the current thread and a single unbounded attempt otherwise.
    fn run_rsync(&self, src: &str, dst: &Path) -> Result<(), String> {
        let fail_fast = RSYNC_FAIL_FAST_PROFILE.with(|cell| *cell.borrow());
        match fail_fast {
            Some(profile) => self.run_rsync_fail_fast(src, dst, profile),
            None => self.run_rsync_once(src, dst, None, false),
        }
    }

    /// Runs rsync once.
    ///
    /// * `wall_clock_timeout` - when set, the child is killed once it has run
    ///   this long and an error containing `wall-clock timeout` is returned.
    /// * `keep_partial` - adds `--partial` to the rsync command line.
    ///
    /// # Errors
    ///
    /// Returns a message when spawning or waiting fails, when rsync exits
    /// unsuccessfully (including its trimmed stdout/stderr), or on wall-clock
    /// timeout.
    fn run_rsync_once(
        &self,
        src: &str,
        dst: &Path,
        wall_clock_timeout: Option<Duration>,
        keep_partial: bool,
    ) -> Result<(), String> {
        // `--timeout` is I/O timeout in seconds (applies to network reads/writes).
        let timeout =
            RSYNC_TIMEOUT_OVERRIDE.with(|cell| cell.borrow().unwrap_or(self.config.timeout));
        let connect_timeout_secs = self.config.connect_timeout.as_secs().max(1).to_string();
        let timeout_secs = timeout.as_secs().max(1).to_string();
        let is_remote_rsync = src.starts_with("rsync://");
        let mut cmd = Command::new(&self.config.rsync_bin);
        cmd.arg("-rt")
            .arg("--delete")
            .arg("--timeout")
            .arg(timeout_secs)
            .args(&self.config.extra_args);
        if is_remote_rsync {
            cmd.arg("--contimeout").arg(connect_timeout_secs);
        }
        if keep_partial {
            cmd.arg("--partial");
        }
        cmd.arg(src)
            .arg(dst)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        let mut child = cmd
            .spawn()
            .map_err(|e| format!("rsync spawn failed: {e}"))?;

        let Some(limit) = wall_clock_timeout else {
            // No deadline: `wait_with_output` drains both pipes while waiting.
            let out = child
                .wait_with_output()
                .map_err(|e| format!("rsync wait_with_output failed: {e}"))?;
            return Self::rsync_exit_result(out.status, &out.stdout, &out.stderr);
        };

        // Polling `try_wait` does not read the pipes. Without these drain
        // threads a child that fills a pipe buffer would block on write until
        // the deadline and be misreported as a timeout.
        let stdout_drain = child.stdout.take().map(Self::spawn_pipe_drain);
        let stderr_drain = child.stderr.take().map(Self::spawn_pipe_drain);
        let started = Instant::now();
        // `Some(status)` when the child exited by itself, `None` when it was killed.
        let status = loop {
            match child.try_wait() {
                Ok(Some(status)) => break Some(status),
                Ok(None) if started.elapsed() >= limit => {
                    let _ = child.kill();
                    // Reap the killed child so it does not linger as a zombie.
                    child
                        .wait()
                        .map_err(|e| format!("rsync wait failed: {e}"))?;
                    break None;
                }
                Ok(None) => thread::sleep(Duration::from_millis(100)),
                Err(e) => {
                    // Do not leave the child running behind an error return.
                    let _ = child.kill();
                    let _ = child.wait();
                    return Err(format!("rsync wait failed: {e}"));
                }
            }
        };
        let stdout = Self::join_pipe_drain(stdout_drain);
        let stderr = Self::join_pipe_drain(stderr_drain);
        match status {
            Some(status) => Self::rsync_exit_result(status, &stdout, &stderr),
            None => Err(format!(
                "rsync wall-clock timeout after {}s: stdout={} stderr={}",
                limit.as_secs(),
                String::from_utf8_lossy(&stdout).trim(),
                String::from_utf8_lossy(&stderr).trim()
            )),
        }
    }

    /// Reads `pipe` to end-of-file on a background thread and returns the bytes.
    ///
    /// A read error ends the capture early; whatever was read so far is kept.
    fn spawn_pipe_drain<R>(mut pipe: R) -> thread::JoinHandle<Vec<u8>>
    where
        R: std::io::Read + Send + 'static,
    {
        thread::spawn(move || {
            let mut captured = Vec::new();
            let _ = std::io::Read::read_to_end(&mut pipe, &mut captured);
            captured
        })
    }

    /// Collects the bytes captured by a drain thread (empty if there was none
    /// or it panicked).
    fn join_pipe_drain(handle: Option<thread::JoinHandle<Vec<u8>>>) -> Vec<u8> {
        handle
            .and_then(|drain| drain.join().ok())
            .unwrap_or_default()
    }

    /// Maps a finished rsync process to `Ok(())` on success or to the standard
    /// `rsync failed: ...` message carrying its status and trimmed output.
    fn rsync_exit_result(
        status: std::process::ExitStatus,
        stdout: &[u8],
        stderr: &[u8],
    ) -> Result<(), String> {
        if status.success() {
            return Ok(());
        }
        Err(format!(
            "rsync failed: status={} stdout={} stderr={}",
            status,
            String::from_utf8_lossy(stdout).trim(),
            String::from_utf8_lossy(stderr).trim()
        ))
    }

    /// Runs rsync with a wall-clock limit, retrying while attempts keep
    /// making progress.
    ///
    /// Only wall-clock timeouts are retried; hard network failures and any
    /// other error end the loop immediately. Progress is the `(files, bytes)`
    /// total found under `dst` after a timed-out attempt. The limit doubles
    /// after each retry, capped at the larger of the profile's initial and
    /// maximum timeouts.
    ///
    /// # Errors
    ///
    /// Returns the last rsync error, prefixed with the reason for giving up
    /// when that reason is a hard failure, lack of progress, or attempt
    /// exhaustion.
    fn run_rsync_fail_fast(
        &self,
        src: &str,
        dst: &Path,
        profile: RsyncFailFastProfile,
    ) -> Result<(), String> {
        let mut attempt = 0usize;
        let mut timeout = profile.initial_wall_clock_timeout;
        let mut previous_progress = (0usize, 0u64);
        let mut zero_progress_attempts = 0usize;
        let max_timeout = std::cmp::max(
            profile.max_wall_clock_timeout,
            profile.initial_wall_clock_timeout,
        );
        loop {
            attempt += 1;
            match self.run_rsync_once(src, dst, Some(timeout), true) {
                Ok(()) => return Ok(()),
                Err(err) => {
                    if is_hard_fail_rsync_error(&err) {
                        return Err(format!(
                            "rsync fail-fast hard-fail on attempt {}: {}",
                            attempt, err
                        ));
                    }
                    // Anything that is not a wall-clock timeout is not retried.
                    if !err.contains("wall-clock timeout") {
                        return Err(err);
                    }
                    let progress = dir_progress(dst)
                        .map_err(|e| format!("rsync fail-fast progress stat failed: {e}"))?;
                    if progress == (0, 0) {
                        // Nothing on disk yet: tolerate one such attempt, give up on the second.
                        zero_progress_attempts += 1;
                        if zero_progress_attempts >= 2 || attempt >= profile.max_attempts {
                            return Err(format!(
                                "rsync fail-fast gave up after {} attempts with no progress: {}",
                                attempt, err
                            ));
                        }
                    } else if progress == previous_progress {
                        return Err(format!(
                            "rsync fail-fast gave up after {} attempts with no additional progress: {}",
                            attempt, err
                        ));
                    } else {
                        previous_progress = progress;
                    }
                    if attempt >= profile.max_attempts {
                        return Err(format!(
                            "rsync fail-fast exhausted {} attempts: {}",
                            profile.max_attempts, err
                        ));
                    }
                    timeout = std::cmp::min(timeout.saturating_mul(2), max_timeout);
                }
            }
        }
    }

    /// Returns the URI that is actually fetched for `rsync_base_uri` under the
    /// configured scope policy.
    ///
    /// With `ModuleRoot`, falls back to the normalized base URI when no module
    /// root can be derived from it.
    fn scope_fetch_uri(&self, rsync_base_uri: &str) -> String {
        match self.config.scope_policy {
            RsyncScopePolicy::PublicationPoint => normalize_rsync_base_uri(rsync_base_uri),
            RsyncScopePolicy::ModuleRoot => rsync_module_root_uri(rsync_base_uri)
                .unwrap_or_else(|| normalize_rsync_base_uri(rsync_base_uri)),
        }
    }
}
impl RsyncFetcher for SystemRsyncFetcher {
    /// Fetches `rsync_base_uri` and returns every object as a `(uri, bytes)` pair.
    fn fetch_objects(&self, rsync_base_uri: &str) -> RsyncFetchResult<Vec<(String, Vec<u8>)>> {
        let mut objects = Vec::new();
        let mut collect = |uri: String, bytes: Vec<u8>| -> Result<(), String> {
            objects.push((uri, bytes));
            Ok(())
        };
        self.visit_objects(rsync_base_uri, &mut collect)?;
        Ok(objects)
    }

    /// Syncs the scoped URI to disk and hands each regular file to `visitor`.
    ///
    /// The sync target is the persistent mirror directory when one is
    /// configured, otherwise a temporary directory that is removed once the
    /// walk is over. Returns the number of objects visited and their total
    /// size in bytes.
    fn visit_objects(
        &self,
        rsync_base_uri: &str,
        visitor: &mut dyn FnMut(String, Vec<u8>) -> Result<(), String>,
    ) -> RsyncFetchResult<(usize, u64)> {
        let fetch_uri = self.scope_fetch_uri(rsync_base_uri);
        let mut visited = 0usize;
        let mut visited_bytes = 0u64;
        // Tallies each object before forwarding it to the caller's visitor.
        let mut counting_visitor = |uri: String, bytes: Vec<u8>| -> Result<(), String> {
            visited_bytes += bytes.len() as u64;
            visited += 1;
            visitor(uri, bytes)
        };
        match self
            .mirror_dst_dir(&fetch_uri)
            .map_err(RsyncFetchError::Fetch)?
        {
            Some(mirror_dir) => {
                self.run_rsync(&fetch_uri, &mirror_dir)
                    .map_err(RsyncFetchError::Fetch)?;
                walk_dir_visit(&mirror_dir, &mirror_dir, &fetch_uri, &mut counting_visitor)
                    .map_err(RsyncFetchError::Fetch)?;
            }
            None => {
                let scratch = TempDir::new().map_err(RsyncFetchError::Fetch)?;
                self.run_rsync(&fetch_uri, scratch.path())
                    .map_err(RsyncFetchError::Fetch)?;
                walk_dir_visit(
                    scratch.path(),
                    scratch.path(),
                    &fetch_uri,
                    &mut counting_visitor,
                )
                .map_err(RsyncFetchError::Fetch)?;
            }
        }
        Ok((visited, visited_bytes))
    }

    /// Returns the scoped fetch URI, so requests that resolve to the same
    /// fetch share one key.
    fn dedup_key(&self, rsync_base_uri: &str) -> String {
        self.scope_fetch_uri(rsync_base_uri)
    }
}
/// A uniquely named scratch directory under the system temp dir that is
/// removed (best effort) when the value is dropped.
struct TempDir {
    path: PathBuf,
}

impl TempDir {
    /// Creates `<temp_dir>/rpki-system-rsync-<uuid>`.
    fn new() -> Result<Self, String> {
        let dir_name = format!("rpki-system-rsync-{}", Uuid::new_v4());
        let path = std::env::temp_dir().join(dir_name);
        match std::fs::create_dir_all(&path) {
            Ok(()) => Ok(Self { path }),
            Err(e) => Err(e.to_string()),
        }
    }

    /// Location of the scratch directory.
    fn path(&self) -> &Path {
        self.path.as_path()
    }
}

impl Drop for TempDir {
    fn drop(&mut self) {
        // Cleanup failures are deliberately ignored; there is nobody to report them to.
        let _ = std::fs::remove_dir_all(&self.path);
    }
}
/// Derives `rsync://<authority>/<module>/` from an rsync URI.
///
/// Returns `None` when the normalized URI does not start with `rsync://`, has
/// no `/` after the authority, or has no non-empty path segment to serve as
/// the module name.
fn rsync_module_root_uri(s: &str) -> Option<String> {
    let normalized = normalize_rsync_base_uri(s);
    let (authority, path) = normalized.strip_prefix("rsync://")?.split_once('/')?;
    // The module is the first non-empty path segment; repeated slashes are skipped.
    let module = path.split('/').find(|segment| !segment.is_empty())?;
    Some(format!("rsync://{authority}/{module}/"))
}
/// Recursively appends every regular file under `current` to `out` as a
/// `(uri, contents)` pair.
///
/// The URI is `rsync_base_uri` followed by the file's path relative to `root`,
/// with backslashes replaced by `/`. Entries that are neither directories nor
/// regular files are skipped.
///
/// # Errors
///
/// Returns the stringified I/O error from the first directory listing,
/// metadata lookup, or file read that fails, or the prefix error when a path
/// is not under `root`.
fn walk_dir_collect(
    root: &Path,
    current: &Path,
    rsync_base_uri: &str,
    out: &mut Vec<(String, Vec<u8>)>,
) -> Result<(), String> {
    for dir_entry in std::fs::read_dir(current).map_err(|e| e.to_string())? {
        let dir_entry = dir_entry.map_err(|e| e.to_string())?;
        let entry_path = dir_entry.path();
        let metadata = dir_entry.metadata().map_err(|e| e.to_string())?;
        if metadata.is_dir() {
            walk_dir_collect(root, &entry_path, rsync_base_uri, out)?;
        } else if metadata.is_file() {
            let relative = entry_path.strip_prefix(root).map_err(|e| e.to_string())?;
            let relative = relative.to_string_lossy().replace('\\', "/");
            let uri = format!("{rsync_base_uri}{relative}");
            let contents = std::fs::read(&entry_path).map_err(|e| e.to_string())?;
            out.push((uri, contents));
        }
    }
    Ok(())
}
/// Counts the regular files under `root` and sums their sizes.
///
/// Returns `(0, 0)` when `root` does not exist. Directories are traversed
/// with an explicit work list; entries that are neither directories nor
/// regular files are ignored.
///
/// # Errors
///
/// Returns the stringified I/O error from the first directory listing or
/// metadata lookup that fails.
fn dir_progress(root: &Path) -> Result<(usize, u64), String> {
    let mut file_count = 0usize;
    let mut byte_count = 0u64;
    let mut pending = Vec::new();
    if root.exists() {
        pending.push(root.to_path_buf());
    }
    while let Some(dir) = pending.pop() {
        for dir_entry in std::fs::read_dir(&dir).map_err(|e| e.to_string())? {
            let dir_entry = dir_entry.map_err(|e| e.to_string())?;
            let metadata = dir_entry.metadata().map_err(|e| e.to_string())?;
            if metadata.is_dir() {
                pending.push(dir_entry.path());
            } else if metadata.is_file() {
                file_count += 1;
                byte_count += metadata.len();
            }
        }
    }
    Ok((file_count, byte_count))
}
/// Reports whether `msg` contains, case-insensitively (ASCII), one of the
/// network failure phrases treated as a hard failure.
fn is_hard_fail_rsync_error(msg: &str) -> bool {
    const HARD_FAIL_MARKERS: [&str; 4] = [
        "no route to host",
        "network is unreachable",
        "connection refused",
        "name or service not known",
    ];
    let haystack = msg.to_ascii_lowercase();
    HARD_FAIL_MARKERS
        .iter()
        .any(|&marker| haystack.contains(marker))
}
/// Recursively passes every regular file under `current` to `visitor` as
/// `(uri, contents)`.
///
/// The URI is `rsync_base_uri` followed by the file's path relative to `root`,
/// with backslashes replaced by `/`. Entries that are neither directories nor
/// regular files are skipped.
///
/// # Errors
///
/// Stops at and returns the first failure: a stringified I/O error, a prefix
/// error when a path is not under `root`, or the error returned by `visitor`.
fn walk_dir_visit(
    root: &Path,
    current: &Path,
    rsync_base_uri: &str,
    visitor: &mut dyn FnMut(String, Vec<u8>) -> Result<(), String>,
) -> Result<(), String> {
    for dir_entry in std::fs::read_dir(current).map_err(|e| e.to_string())? {
        let dir_entry = dir_entry.map_err(|e| e.to_string())?;
        let entry_path = dir_entry.path();
        let metadata = dir_entry.metadata().map_err(|e| e.to_string())?;
        if metadata.is_dir() {
            walk_dir_visit(root, &entry_path, rsync_base_uri, visitor)?;
        } else if metadata.is_file() {
            let relative = entry_path.strip_prefix(root).map_err(|e| e.to_string())?;
            let relative = relative.to_string_lossy().replace('\\', "/");
            let uri = format!("{rsync_base_uri}{relative}");
            let contents = std::fs::read(&entry_path).map_err(|e| e.to_string())?;
            visitor(uri, contents)?;
        }
    }
    Ok(())
}
// Unit tests for the system rsync fetcher. Several tests replace the rsync
// binary with a small generated shell script so that no network access or
// real rsync installation is required.
#[cfg(test)]
mod tests {
use super::*;
// A trailing slash is appended when missing and left alone when present.
#[test]
fn normalize_rsync_base_uri_appends_slash_when_missing() {
assert_eq!(
normalize_rsync_base_uri("rsync://example.net/repo"),
"rsync://example.net/repo/".to_string()
);
assert_eq!(
normalize_rsync_base_uri("rsync://example.net/repo/"),
"rsync://example.net/repo/".to_string()
);
}
// Collects a nested file and a file whose name contains a literal backslash;
// the backslash must appear as `/` in the resulting URI.
#[test]
fn walk_dir_collect_collects_files_and_normalizes_backslashes_in_uri() {
let temp = tempfile::tempdir().expect("tempdir");
let root = temp.path();
std::fs::create_dir_all(root.join("sub")).expect("mkdir");
std::fs::write(root.join("sub").join("a.cer"), b"x").expect("write");
std::fs::write(root.join("b\\c.mft"), b"y").expect("write backslash file");
let mut out: Vec<(String, Vec<u8>)> = Vec::new();
walk_dir_collect(root, root, "rsync://example.net/repo/", &mut out).expect("walk");
// Sort before asserting: the walk order is whatever `read_dir` yields.
out.sort_by(|a, b| a.0.cmp(&b.0));
assert_eq!(out.len(), 2);
assert_eq!(out[0].0, "rsync://example.net/repo/b/c.mft");
assert_eq!(out[0].1, b"y");
assert_eq!(out[1].0, "rsync://example.net/repo/sub/a.cer");
assert_eq!(out[1].1, b"x");
}
// The module root keeps only the authority and the first path segment, and
// non-rsync schemes yield `None`.
#[test]
fn rsync_module_root_uri_returns_host_and_module_only() {
assert_eq!(
rsync_module_root_uri("rsync://example.net/repo/ta/ca/publication-point/"),
Some("rsync://example.net/repo/".to_string())
);
assert_eq!(
rsync_module_root_uri("rsync://example.net/repo/ta/"),
Some("rsync://example.net/repo/".to_string())
);
assert_eq!(
rsync_module_root_uri("rsync://example.net/repo/"),
Some("rsync://example.net/repo/".to_string())
);
assert_eq!(rsync_module_root_uri("https://example.net/repo"), None);
}
// With the default config the dedup key is the publication point URI itself.
#[test]
fn system_rsync_dedup_key_uses_publication_point_scope_by_default() {
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig::default());
assert_eq!(
fetcher.dedup_key("rsync://example.net/repo/ta/ca/publication-point/"),
"rsync://example.net/repo/ta/ca/publication-point/"
);
}
// With `ModuleRoot` the dedup key collapses to the module root URI.
#[test]
fn system_rsync_dedup_key_uses_module_root_when_configured() {
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
scope_policy: RsyncScopePolicy::ModuleRoot,
..SystemRsyncConfig::default()
});
assert_eq!(
fetcher.dedup_key("rsync://example.net/repo/ta/ca/publication-point/"),
"rsync://example.net/repo/"
);
}
// Checks the error text for a binary that cannot be spawned and for one that
// exits with a non-zero status (`false`).
#[test]
fn system_rsync_fetcher_reports_spawn_and_exit_errors() {
let dst = tempfile::tempdir().expect("tempdir");
// 1) Spawn error.
let f = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: PathBuf::from("/this/does/not/exist/rsync"),
connect_timeout: Duration::from_secs(1),
timeout: Duration::from_secs(1),
extra_args: Vec::new(),
mirror_root: None,
scope_policy: RsyncScopePolicy::default(),
});
let e = f
.run_rsync("rsync://example.net/repo/", dst.path())
.expect_err("spawn must fail");
assert!(e.contains("rsync spawn failed:"), "{e}");
// 2) Non-zero exit status.
let f = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: PathBuf::from("false"),
connect_timeout: Duration::from_secs(1),
timeout: Duration::from_secs(1),
extra_args: Vec::new(),
mirror_root: None,
scope_policy: RsyncScopePolicy::default(),
});
let e = f
.run_rsync("rsync://example.net/repo/", dst.path())
.expect_err("false must fail");
assert!(e.contains("rsync failed:"), "{e}");
assert!(e.contains("status="), "{e}");
}
// A mirror root that is a regular file must produce the "mirror root" error
// and mention the offending path.
#[test]
fn mirror_dst_dir_reports_root_creation_error() {
let temp = tempfile::tempdir().expect("tempdir");
let root_file = temp.path().join("mirror-root-file");
std::fs::write(&root_file, b"not a directory").expect("write root file");
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: PathBuf::from("rsync"),
connect_timeout: Duration::from_secs(1),
timeout: Duration::from_secs(1),
extra_args: Vec::new(),
mirror_root: Some(root_file.clone()),
scope_policy: RsyncScopePolicy::default(),
});
let err = fetcher
.mirror_dst_dir("rsync://example.net/repo/")
.expect_err("file mirror root must fail");
assert!(err.contains("create rsync mirror root failed"), "{err}");
assert!(err.contains(&root_file.display().to_string()), "{err}");
}
// A read-only mirror root must produce the "mirror directory" error when the
// per-URI subdirectory cannot be created.
#[cfg(unix)]
#[test]
fn mirror_dst_dir_reports_directory_creation_error_inside_root() {
use std::os::unix::fs::PermissionsExt;
let temp = tempfile::tempdir().expect("tempdir");
let root = temp.path().join("mirror");
std::fs::create_dir_all(&root).expect("mkdir root");
let mut perms = std::fs::metadata(&root).expect("metadata").permissions();
perms.set_mode(0o555);
std::fs::set_permissions(&root, perms).expect("chmod root readonly");
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: PathBuf::from("rsync"),
connect_timeout: Duration::from_secs(1),
timeout: Duration::from_secs(1),
extra_args: Vec::new(),
mirror_root: Some(root.clone()),
scope_policy: RsyncScopePolicy::default(),
});
let err = fetcher
.mirror_dst_dir("rsync://example.net/repo/")
.expect_err("readonly mirror root must fail");
assert!(
err.contains("create rsync mirror directory failed"),
"{err}"
);
// Make the directory writable again after the assertions.
let mut perms = std::fs::metadata(&root).expect("metadata").permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&root, perms).expect("restore perms");
}
// A Unix socket in the tree is neither a file nor a directory and must be skipped.
#[cfg(unix)]
#[test]
fn walk_dir_collect_ignores_non_file_entries() {
use std::os::unix::net::UnixListener;
let temp = tempfile::tempdir().expect("tempdir");
let root = temp.path();
std::fs::write(root.join("a.cer"), b"x").expect("write file");
let socket_path = root.join("skip.sock");
let _listener = UnixListener::bind(&socket_path).expect("bind socket");
let mut out: Vec<(String, Vec<u8>)> = Vec::new();
walk_dir_collect(root, root, "rsync://example.net/repo/", &mut out).expect("walk");
assert_eq!(out.len(), 1);
assert_eq!(out[0].0, "rsync://example.net/repo/a.cer");
}
// The fake rsync counts its invocations in a state file. The first run writes
// `part1` and then sleeps past the 1s initial limit; later runs write `part2`
// and exit at once. The retry must therefore succeed with both files present.
#[cfg(unix)]
#[test]
fn rsync_fail_fast_retries_when_progress_is_made() {
use std::os::unix::fs::PermissionsExt;
let temp = tempfile::tempdir().expect("tempdir");
let script = temp.path().join("fake-rsync.sh");
let state = temp.path().join("state.txt");
std::fs::write(
&script,
format!(
"#!/usr/bin/env bash\nset -euo pipefail\nSTATE=\"{}\"\nDST=\"${{@: -1}}\"\nCOUNT=0\nif [[ -f \"$STATE\" ]]; then COUNT=$(cat \"$STATE\"); fi\nCOUNT=$((COUNT+1))\necho \"$COUNT\" > \"$STATE\"\nmkdir -p \"$DST\"\nif [[ \"$COUNT\" -eq 1 ]]; then\n echo first > \"$DST/part1\"\n sleep 2\nelse\n echo second > \"$DST/part2\"\nfi\n",
state.display()
),
)
.expect("write script");
let mut perms = std::fs::metadata(&script).unwrap().permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script, perms).unwrap();
let dst = temp.path().join("dst");
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: script,
connect_timeout: Duration::from_secs(15),
timeout: Duration::from_secs(60),
extra_args: Vec::new(),
mirror_root: None,
scope_policy: RsyncScopePolicy::default(),
});
fetcher
.run_rsync_fail_fast(
"rsync://example.net/repo/",
&dst,
RsyncFailFastProfile {
initial_wall_clock_timeout: Duration::from_secs(1),
max_wall_clock_timeout: Duration::from_secs(4),
max_attempts: 3,
},
)
.expect("eventual success");
assert!(dst.join("part1").exists());
assert!(dst.join("part2").exists());
}
// The fake rsync only sleeps and never writes anything, so every attempt
// times out with zero progress and the loop must give up with "no progress".
#[cfg(unix)]
#[test]
fn rsync_fail_fast_gives_up_after_two_zero_progress_timeouts() {
use std::os::unix::fs::PermissionsExt;
let temp = tempfile::tempdir().expect("tempdir");
let script = temp.path().join("fake-rsync.sh");
std::fs::write(&script, "#!/usr/bin/env bash\nset -euo pipefail\nsleep 5\n")
.expect("write script");
let mut perms = std::fs::metadata(&script).unwrap().permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script, perms).unwrap();
let dst = temp.path().join("dst");
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: script,
connect_timeout: Duration::from_secs(15),
timeout: Duration::from_secs(60),
extra_args: Vec::new(),
mirror_root: None,
scope_policy: RsyncScopePolicy::default(),
});
let err = fetcher
.run_rsync_fail_fast(
"rsync://example.net/repo/",
&dst,
RsyncFailFastProfile {
initial_wall_clock_timeout: Duration::from_secs(1),
max_wall_clock_timeout: Duration::from_secs(2),
max_attempts: 4,
},
)
.expect_err("must fail");
assert!(err.contains("no progress"), "{err}");
}
// The fake rsync records each invocation, prints a "Connection refused"
// message on stderr and exits with status 10. The state file must show
// exactly one invocation, i.e. no retry after a hard failure.
#[cfg(unix)]
#[test]
fn rsync_fail_fast_hard_fail_stops_after_first_attempt() {
use std::os::unix::fs::PermissionsExt;
let temp = tempfile::tempdir().expect("tempdir");
let script = temp.path().join("fake-rsync.sh");
let state = temp.path().join("state.txt");
std::fs::write(
&script,
format!(
"#!/usr/bin/env bash\nset -euo pipefail\nSTATE=\"{}\"\nCOUNT=0\nif [[ -f \"$STATE\" ]]; then COUNT=$(cat \"$STATE\"); fi\nCOUNT=$((COUNT+1))\necho \"$COUNT\" > \"$STATE\"\necho 'rsync: [Receiver] failed to connect to host (1.2.3.4): Connection refused (111)' >&2\nexit 10\n",
state.display()
),
)
.expect("write script");
let mut perms = std::fs::metadata(&script).unwrap().permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script, perms).unwrap();
let dst = temp.path().join("dst");
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: script,
connect_timeout: Duration::from_secs(15),
timeout: Duration::from_secs(60),
extra_args: Vec::new(),
mirror_root: None,
scope_policy: RsyncScopePolicy::default(),
});
let err = fetcher
.run_rsync_fail_fast(
"rsync://example.net/repo/",
&dst,
RsyncFailFastProfile {
initial_wall_clock_timeout: Duration::from_secs(10),
max_wall_clock_timeout: Duration::from_secs(80),
max_attempts: 4,
},
)
.expect_err("must hard fail");
assert!(err.contains("hard-fail"), "{err}");
let count = std::fs::read_to_string(&state).unwrap();
assert_eq!(count.trim(), "1");
}
// The fake rsync dumps its arguments one per line into a file, which lets the
// test check that `--contimeout 15` and `--timeout 30` were passed for an
// `rsync://` source.
#[cfg(unix)]
#[test]
fn run_rsync_once_passes_contimeout_and_timeout_args() {
use std::os::unix::fs::PermissionsExt;
let temp = tempfile::tempdir().expect("tempdir");
let script = temp.path().join("capture-rsync.sh");
let args_file = temp.path().join("args.txt");
std::fs::write(
&script,
format!(
"#!/usr/bin/env bash\nset -euo pipefail\nprintf '%s\\n' \"$@\" > \"{}\"\nDST=\"${{@: -1}}\"\nmkdir -p \"$DST\"\n",
args_file.display()
),
)
.expect("write script");
let mut perms = std::fs::metadata(&script).unwrap().permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script, perms).unwrap();
let dst = temp.path().join("dst");
let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig {
rsync_bin: script,
connect_timeout: Duration::from_secs(15),
timeout: Duration::from_secs(30),
extra_args: Vec::new(),
mirror_root: None,
scope_policy: RsyncScopePolicy::default(),
});
fetcher
.run_rsync_once("rsync://example.net/repo/", &dst, None, false)
.expect("rsync");
let args = std::fs::read_to_string(&args_file).expect("read args");
assert!(args.contains("--contimeout\n15\n"), "{args}");
assert!(args.contains("--timeout\n30\n"), "{args}");
}
}