From 615f8709af1fa8a839b7096a055c399c7cf5b058 Mon Sep 17 00:00:00 2001 From: yuyr Date: Fri, 15 May 2026 17:17:41 +0800 Subject: [PATCH] =?UTF-8?q?20260514=20rsync=E9=BB=98=E8=AE=A4=E9=99=90?= =?UTF-8?q?=E5=AE=9A=E5=8F=91=E5=B8=83=E7=82=B9scope?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/soak/portable-soak.env.example | 4 ++ scripts/soak/run_soak.sh | 98 +++++++++++++++++++++++++- src/cli.rs | 43 ++++++++++- src/fetch/rsync_system.rs | 61 ++++++++++++++-- 4 files changed, 196 insertions(+), 10 deletions(-) diff --git a/scripts/soak/portable-soak.env.example b/scripts/soak/portable-soak.env.example index 447b1fe..6748667 100644 --- a/scripts/soak/portable-soak.env.example +++ b/scripts/soak/portable-soak.env.example @@ -24,6 +24,10 @@ OUTPUT_COMPACT_REPORT=1 # 是否复用持久 rsync mirror。1 表示跨 run 复用;失败隔离数据库时也不会清理 mirror。 ALLOW_RSYNC_MIRROR_REUSE=1 +# rsync 同步 scope。 +# publication-point 表示默认只同步当前发布点;module-root 表示扩大到 rsync module 根目录。 +RSYNC_SCOPE=publication-point + # 前一轮失败或不完整时,是否隔离旧数据库和运行态目录后强制下一轮 snapshot。 # 建议保持 1;设置为 0 时,检测到前一轮失败会直接停止。 FAILURE_SNAPSHOT_RESET=1 diff --git a/scripts/soak/run_soak.sh b/scripts/soak/run_soak.sh index 96fd0f0..fc564cb 100755 --- a/scripts/soak/run_soak.sh +++ b/scripts/soak/run_soak.sh @@ -17,6 +17,7 @@ RUN_ROOT="${RUN_ROOT:-$PACKAGE_ROOT}" RETAIN_RUNS="${RETAIN_RUNS:-10}" OUTPUT_COMPACT_REPORT="${OUTPUT_COMPACT_REPORT:-1}" ALLOW_RSYNC_MIRROR_REUSE="${ALLOW_RSYNC_MIRROR_REUSE:-1}" +RSYNC_SCOPE="${RSYNC_SCOPE:-publication-point}" FAILURE_SNAPSHOT_RESET="${FAILURE_SNAPSHOT_RESET:-1}" DB_STATS_EXACT_EVERY="${DB_STATS_EXACT_EVERY:-3}" RPKI_PROGRESS_LOG="${RPKI_PROGRESS_LOG:-1}" @@ -76,6 +77,16 @@ validate_non_negative_int() { [[ "$value" =~ ^[0-9]+$ ]] || die "$name must be an integer: $value" } +validate_rsync_scope() { + case "$RSYNC_SCOPE" in + publication-point|module-root) + ;; + *) + die "RSYNC_SCOPE must be publication-point or module-root: $RSYNC_SCOPE" + ;; + esac +} + normalize_token() { local token="$1" token="${token#"${token%%[![:space:]]*}"}" @@ -335,6 +346,7 @@ build_child_args() { CHILD_ARGS=( --db "$DB_DIR/work-db" --repo-bytes-db "$DB_DIR/repo-bytes.db" + --rsync-scope "$RSYNC_SCOPE" ) if is_true "$ALLOW_RSYNC_MIRROR_REUSE"; then CHILD_ARGS+=(--rsync-mirror-root "$RSYNC_MIRROR_ROOT") @@ -382,6 +394,8 @@ build_child_args() { copy_inner_run_outputs() { local daemon_state_root="$1" local run_dir="$2" + local outer_run_index="$3" + local outer_run_id="$4" local inner_run_dir inner_run_dir="$(find "$daemon_state_root/runs" -mindepth 1 -maxdepth 1 -type d 2>/dev/null | sort | tail -n 1 || true)" if [[ -n "$inner_run_dir" && -d "$inner_run_dir" ]]; then @@ -389,8 +403,85 @@ copy_inner_run_outputs() { cp -a "$inner_run_dir"/. "$run_dir"/ shopt -u dotglob nullglob fi - [[ -f "$daemon_state_root/daemon-status.json" ]] && cp "$daemon_state_root/daemon-status.json" "$run_dir/daemon-status.json" - [[ -f "$daemon_state_root/daemon-runs.jsonl" ]] && cp "$daemon_state_root/daemon-runs.jsonl" "$run_dir/daemon-runs.jsonl" + [[ -f "$daemon_state_root/daemon-status.json" ]] && cp "$daemon_state_root/daemon-status.json" "$run_dir/daemon-status.inner.json" + [[ -f "$daemon_state_root/daemon-runs.jsonl" ]] && cp "$daemon_state_root/daemon-runs.jsonl" "$run_dir/daemon-runs.inner.jsonl" + normalize_outer_run_metadata "$run_dir" "$outer_run_index" "$outer_run_id" "$inner_run_dir" "$daemon_state_root" +} + +normalize_outer_run_metadata() { + local run_dir="$1" + local outer_run_index="$2" + local outer_run_id="$3" + local inner_run_dir="$4" + local daemon_state_root="$5" + python3 - "$run_dir" "$outer_run_index" "$outer_run_id" "$inner_run_dir" "$daemon_state_root" <<'PY' +import json +import pathlib +import sys + +run_dir = pathlib.Path(sys.argv[1]).resolve() +outer_run_index = int(sys.argv[2]) +outer_run_id = sys.argv[3] +inner_run_dir = sys.argv[4] +daemon_state_root = pathlib.Path(sys.argv[5]) + +def replace_paths(value): + if isinstance(value, dict): + return {key: replace_paths(item) for key, item in value.items()} + if isinstance(value, list): + return [replace_paths(item) for item in value] + if isinstance(value, str) and inner_run_dir: + return value.replace(inner_run_dir, str(run_dir)) + return value + +def normalize_summary(summary): + summary = dict(summary) + summary.setdefault("innerRunSeq", summary.get("runSeq")) + summary.setdefault("innerRunId", summary.get("runId")) + summary.setdefault("innerRunDir", summary.get("runDir")) + summary = replace_paths(summary) + summary["runSeq"] = outer_run_index + summary["runId"] = outer_run_id + summary["runDir"] = str(run_dir) + return summary + +summary_path = run_dir / "run-summary.json" +if summary_path.exists(): + summary = json.loads(summary_path.read_text(encoding="utf-8")) + summary_path.write_text( + json.dumps(normalize_summary(summary), indent=2, sort_keys=True) + "\n", + encoding="utf-8", + ) + +inner_status_path = run_dir / "daemon-status.inner.json" +if not inner_status_path.exists(): + raw_status_path = daemon_state_root / "daemon-status.json" + if raw_status_path.exists(): + inner_status_path.write_text(raw_status_path.read_text(encoding="utf-8"), encoding="utf-8") +if inner_status_path.exists(): + status = json.loads(inner_status_path.read_text(encoding="utf-8")) + status.setdefault("innerLastRunId", status.get("lastRunId")) + status["lastRunId"] = outer_run_id + status["outerRunId"] = outer_run_id + status["outerRunIndex"] = outer_run_index + (run_dir / "daemon-status.json").write_text( + json.dumps(status, indent=2, sort_keys=True) + "\n", + encoding="utf-8", + ) + +inner_runs_path = run_dir / "daemon-runs.inner.jsonl" +if not inner_runs_path.exists(): + raw_runs_path = daemon_state_root / "daemon-runs.jsonl" + if raw_runs_path.exists(): + inner_runs_path.write_text(raw_runs_path.read_text(encoding="utf-8"), encoding="utf-8") +if inner_runs_path.exists(): + lines = [] + for line in inner_runs_path.read_text(encoding="utf-8").splitlines(): + if not line.strip(): + continue + lines.append(json.dumps(normalize_summary(json.loads(line)), sort_keys=True)) + (run_dir / "daemon-runs.jsonl").write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8") +PY } apply_outer_retention() { @@ -459,7 +550,7 @@ run_one_round() { daemon_exit_code=$? set -e - copy_inner_run_outputs "$daemon_state_root" "$run_dir" + copy_inner_run_outputs "$daemon_state_root" "$run_dir" "$run_index" "$run_id" completed_at="$(date -u +%Y-%m-%dT%H:%M:%SZ)" summary_state="$(summary_status "$run_dir/run-summary.json")" local final_status="failed" @@ -485,6 +576,7 @@ main() { validate_positive_int "MAX_RUNS" "$MAX_RUNS" validate_non_negative_int "INTERVAL_SECS" "$INTERVAL_SECS" validate_positive_int "RETAIN_RUNS" "$RETAIN_RUNS" + validate_rsync_scope if [[ -n "${DB_STATS_EXACT_EVERY:-}" && "$DB_STATS_EXACT_EVERY" != "0" ]]; then validate_positive_int "DB_STATS_EXACT_EVERY" "$DB_STATS_EXACT_EVERY" fi diff --git a/src/cli.rs b/src/cli.rs index 0a8944d..a6ed0fd 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -13,7 +13,7 @@ use crate::audit::{ use crate::ccr::canonical_vrp_prefix; use crate::fetch::http::{BlockingHttpFetcher, HttpFetcherConfig}; use crate::fetch::rsync::LocalDirRsyncFetcher; -use crate::fetch::rsync_system::{SystemRsyncConfig, SystemRsyncFetcher}; +use crate::fetch::rsync_system::{RsyncScopePolicy, SystemRsyncConfig, SystemRsyncFetcher}; use crate::parallel::config::{ParallelPhase1Config, ParallelPhase2Config}; use crate::parallel::types::TalInputSpec; use crate::policy::{Policy, StrictPolicy}; @@ -99,6 +99,7 @@ pub struct CliArgs { pub http_timeout_secs: u64, pub rsync_timeout_secs: u64, pub rsync_mirror_root: Option, + pub rsync_scope_policy: RsyncScopePolicy, pub max_depth: Option, pub max_instances: Option, @@ -175,6 +176,7 @@ Options: --http-timeout-secs HTTP fetch timeout seconds (default: 20) --rsync-timeout-secs rsync I/O timeout seconds (default: 60) --rsync-mirror-root Persist rsync mirrors under this directory (default: disabled) + --rsync-scope rsync scope policy: publication-point or module-root (default: publication-point) --max-depth Max CA instance depth (0 = root only) --max-instances Max number of CA instances to process --validation-time Validation time in RFC3339 (default: now UTC) @@ -225,6 +227,7 @@ pub fn parse_args(argv: &[String]) -> Result { let mut http_timeout_secs: u64 = 30; let mut rsync_timeout_secs: u64 = 30; let mut rsync_mirror_root: Option = None; + let mut rsync_scope_policy = RsyncScopePolicy::default(); let mut max_depth: Option = None; let mut max_instances: Option = None; let mut validation_time: Option = None; @@ -525,6 +528,11 @@ pub fn parse_args(argv: &[String]) -> Result { let v = argv.get(i).ok_or("--rsync-mirror-root requires a value")?; rsync_mirror_root = Some(PathBuf::from(v)); } + "--rsync-scope" => { + i += 1; + let v = argv.get(i).ok_or("--rsync-scope requires a value")?; + rsync_scope_policy = RsyncScopePolicy::parse_cli_value(v)?; + } "--max-depth" => { i += 1; let v = argv.get(i).ok_or("--max-depth requires a value")?; @@ -838,6 +846,7 @@ pub fn parse_args(argv: &[String]) -> Result { http_timeout_secs, rsync_timeout_secs, rsync_mirror_root, + rsync_scope_policy, max_depth, max_instances, validation_time, @@ -1712,6 +1721,7 @@ pub fn run(argv: &[String]) -> Result<(), String> { .unwrap_or_else(|| PathBuf::from("rsync")), timeout: std::time::Duration::from_secs(args.rsync_timeout_secs.max(1)), mirror_root: args.rsync_mirror_root.clone(), + scope_policy: args.rsync_scope_policy, ..SystemRsyncConfig::default() }); run_online_validation_with_fetchers( @@ -1990,6 +2000,7 @@ mod tests { assert!(err.contains("Usage:"), "{err}"); assert!(err.contains("--db"), "{err}"); assert!(err.contains("--rsync-mirror-root"), "{err}"); + assert!(err.contains("--rsync-scope"), "{err}"); assert!(err.contains("--parallel-phase2-object-workers"), "{err}"); assert!(!err.contains("--parallel-phase1"), "{err}"); assert!(!err.contains("--parallel-phase2 "), "{err}"); @@ -2084,6 +2095,36 @@ mod tests { assert!(args.report_json_compact); } + #[test] + fn parse_accepts_rsync_scope_policy() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/x.tal".to_string(), + "--rsync-scope".to_string(), + "module-root".to_string(), + ]; + let args = parse_args(&argv).expect("parse args"); + assert_eq!(args.rsync_scope_policy, RsyncScopePolicy::ModuleRoot); + } + + #[test] + fn parse_rejects_invalid_rsync_scope_policy() { + let argv = vec![ + "rpki".to_string(), + "--db".to_string(), + "db".to_string(), + "--tal-url".to_string(), + "https://example.test/x.tal".to_string(), + "--rsync-scope".to_string(), + "wide".to_string(), + ]; + let err = parse_args(&argv).expect_err("invalid rsync scope should fail"); + assert!(err.contains("invalid --rsync-scope"), "{err}"); + } + #[test] fn parse_accepts_strict_policy_list() { let argv = vec![ diff --git a/src/fetch/rsync_system.rs b/src/fetch/rsync_system.rs index 2f9407c..8433465 100644 --- a/src/fetch/rsync_system.rs +++ b/src/fetch/rsync_system.rs @@ -13,6 +13,30 @@ use crate::fetch::rsync::{ RsyncFetchError, RsyncFetchResult, RsyncFetcher, normalize_rsync_base_uri, }; +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum RsyncScopePolicy { + PublicationPoint, + ModuleRoot, +} + +impl Default for RsyncScopePolicy { + fn default() -> Self { + Self::PublicationPoint + } +} + +impl RsyncScopePolicy { + pub fn parse_cli_value(value: &str) -> Result { + match value { + "publication-point" => Ok(Self::PublicationPoint), + "module-root" => Ok(Self::ModuleRoot), + _ => Err(format!( + "invalid --rsync-scope: {value}; expected publication-point or module-root" + )), + } + } +} + #[derive(Clone, Debug)] pub struct SystemRsyncConfig { pub rsync_bin: PathBuf, @@ -27,6 +51,7 @@ pub struct SystemRsyncConfig { /// /// Note: actual mirror behavior is implemented separately from config wiring. pub mirror_root: Option, + pub scope_policy: RsyncScopePolicy, } impl Default for SystemRsyncConfig { @@ -37,6 +62,7 @@ impl Default for SystemRsyncConfig { timeout: Duration::from_secs(30), extra_args: Vec::new(), mirror_root: None, + scope_policy: RsyncScopePolicy::default(), } } } @@ -268,9 +294,12 @@ impl SystemRsyncFetcher { } } - fn module_fetch_uri(&self, rsync_base_uri: &str) -> String { - rsync_module_root_uri(rsync_base_uri) - .unwrap_or_else(|| normalize_rsync_base_uri(rsync_base_uri)) + fn scope_fetch_uri(&self, rsync_base_uri: &str) -> String { + match self.config.scope_policy { + RsyncScopePolicy::PublicationPoint => normalize_rsync_base_uri(rsync_base_uri), + RsyncScopePolicy::ModuleRoot => rsync_module_root_uri(rsync_base_uri) + .unwrap_or_else(|| normalize_rsync_base_uri(rsync_base_uri)), + } } } @@ -289,7 +318,7 @@ impl RsyncFetcher for SystemRsyncFetcher { rsync_base_uri: &str, visitor: &mut dyn FnMut(String, Vec) -> Result<(), String>, ) -> RsyncFetchResult<(usize, u64)> { - let base = self.module_fetch_uri(rsync_base_uri); + let base = self.scope_fetch_uri(rsync_base_uri); let mut count = 0usize; let mut bytes_total = 0u64; let mut wrapped = |uri: String, bytes: Vec| -> Result<(), String> { @@ -317,7 +346,7 @@ impl RsyncFetcher for SystemRsyncFetcher { } fn dedup_key(&self, rsync_base_uri: &str) -> String { - self.module_fetch_uri(rsync_base_uri) + self.scope_fetch_uri(rsync_base_uri) } } @@ -506,8 +535,20 @@ mod tests { } #[test] - fn system_rsync_dedup_key_uses_module_root() { + fn system_rsync_dedup_key_uses_publication_point_scope_by_default() { let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig::default()); + assert_eq!( + fetcher.dedup_key("rsync://example.net/repo/ta/ca/publication-point/"), + "rsync://example.net/repo/ta/ca/publication-point/" + ); + } + + #[test] + fn system_rsync_dedup_key_uses_module_root_when_configured() { + let fetcher = SystemRsyncFetcher::new(SystemRsyncConfig { + scope_policy: RsyncScopePolicy::ModuleRoot, + ..SystemRsyncConfig::default() + }); assert_eq!( fetcher.dedup_key("rsync://example.net/repo/ta/ca/publication-point/"), "rsync://example.net/repo/" @@ -525,6 +566,7 @@ mod tests { timeout: Duration::from_secs(1), extra_args: Vec::new(), mirror_root: None, + scope_policy: RsyncScopePolicy::default(), }); let e = f .run_rsync("rsync://example.net/repo/", dst.path()) @@ -538,6 +580,7 @@ mod tests { timeout: Duration::from_secs(1), extra_args: Vec::new(), mirror_root: None, + scope_policy: RsyncScopePolicy::default(), }); let e = f .run_rsync("rsync://example.net/repo/", dst.path()) @@ -558,6 +601,7 @@ mod tests { timeout: Duration::from_secs(1), extra_args: Vec::new(), mirror_root: Some(root_file.clone()), + scope_policy: RsyncScopePolicy::default(), }); let err = fetcher @@ -585,6 +629,7 @@ mod tests { timeout: Duration::from_secs(1), extra_args: Vec::new(), mirror_root: Some(root.clone()), + scope_policy: RsyncScopePolicy::default(), }); let err = fetcher @@ -645,6 +690,7 @@ mod tests { timeout: Duration::from_secs(60), extra_args: Vec::new(), mirror_root: None, + scope_policy: RsyncScopePolicy::default(), }); fetcher @@ -683,6 +729,7 @@ mod tests { timeout: Duration::from_secs(60), extra_args: Vec::new(), mirror_root: None, + scope_policy: RsyncScopePolicy::default(), }); let err = fetcher @@ -726,6 +773,7 @@ mod tests { timeout: Duration::from_secs(60), extra_args: Vec::new(), mirror_root: None, + scope_policy: RsyncScopePolicy::default(), }); let err = fetcher @@ -771,6 +819,7 @@ mod tests { timeout: Duration::from_secs(30), extra_args: Vec::new(), mirror_root: None, + scope_policy: RsyncScopePolicy::default(), }); fetcher