add lifecycle run state
This commit is contained in:
parent
0610a8291c
commit
860bad2113
@ -46,6 +46,9 @@ FAILURE_SNAPSHOT_RESET=1
|
|||||||
# 0: keep existing behavior.
|
# 0: keep existing behavior.
|
||||||
# 1: after one successful snapshot, allow at most N successful delta runs;
|
# 1: after one successful snapshot, allow at most N successful delta runs;
|
||||||
# the next run is forced to snapshot and active state/db is rebuilt from empty.
|
# the next run is forced to snapshot and active state/db is rebuilt from empty.
|
||||||
|
# Lifecycle run state is persisted independently at:
|
||||||
|
# ${HOST_DATA_DIR}/state/run-lifecycle-state.json
|
||||||
|
# It is not affected by run retention or state/db reset.
|
||||||
PERIODIC_SNAPSHOT_RESET=0
|
PERIODIC_SNAPSHOT_RESET=0
|
||||||
PERIODIC_SNAPSHOT_MAX_DELTAS=100
|
PERIODIC_SNAPSHOT_MAX_DELTAS=100
|
||||||
DB_STATS_EXACT_EVERY=0
|
DB_STATS_EXACT_EVERY=0
|
||||||
|
|||||||
@ -86,6 +86,8 @@ Semantics:
|
|||||||
- when enabled, one successful snapshot is followed by at most `N` successful delta runs;
|
- when enabled, one successful snapshot is followed by at most `N` successful delta runs;
|
||||||
- after the threshold is reached, the next run is forced to snapshot;
|
- after the threshold is reached, the next run is forced to snapshot;
|
||||||
- before that forced snapshot, only the active `state/db` is reset, while `runs/`, `logs/`, `state/rsync-mirror`, `.env`, and Prometheus/Grafana data are preserved;
|
- before that forced snapshot, only the active `state/db` is reset, while `runs/`, `logs/`, `state/rsync-mirror`, `.env`, and Prometheus/Grafana data are preserved;
|
||||||
|
- the counter is persisted independently in `HOST_DATA_DIR/state/run-lifecycle-state.json`, so it does not depend on retained `runs/` history;
|
||||||
|
- if that lifecycle file is corrupt, it is backed up as `run-lifecycle-state.json.corrupt.<timestamp>.<pid>` before best-effort bootstrap from retained runs;
|
||||||
- after a successful forced snapshot, the old DB staging is deleted so disk usage does not keep growing elsewhere.
|
- after a successful forced snapshot, the old DB staging is deleted so disk usage does not keep growing elsewhere.
|
||||||
|
|
||||||
Check the latest `run-meta.json` for:
|
Check the latest `run-meta.json` for:
|
||||||
|
|||||||
@ -86,6 +86,8 @@ PERIODIC_SNAPSHOT_MAX_DELTAS=100
|
|||||||
- 开启后,一次成功 snapshot 后最多连续执行 `N` 个成功 delta;
|
- 开启后,一次成功 snapshot 后最多连续执行 `N` 个成功 delta;
|
||||||
- 达到阈值后,下一轮强制跑 snapshot;
|
- 达到阈值后,下一轮强制跑 snapshot;
|
||||||
- 强制 snapshot 前只重置 active `state/db`,保留 `runs/`、`logs/`、`state/rsync-mirror`、`.env`、Prometheus/Grafana 数据;
|
- 强制 snapshot 前只重置 active `state/db`,保留 `runs/`、`logs/`、`state/rsync-mirror`、`.env`、Prometheus/Grafana 数据;
|
||||||
|
- 周期计数保存在独立 lifecycle 文件 `HOST_DATA_DIR/state/run-lifecycle-state.json`,不依赖 `runs/` 保留窗口;
|
||||||
|
- lifecycle 文件损坏时会先备份为 `run-lifecycle-state.json.corrupt.<timestamp>.<pid>`,再从当前保留 run 尽力 bootstrap;
|
||||||
- 强制 snapshot 成功后旧 DB staging 会被删除,避免磁盘只是换目录继续增长。
|
- 强制 snapshot 成功后旧 DB staging 会被删除,避免磁盘只是换目录继续增长。
|
||||||
|
|
||||||
可通过最新 `run-meta.json` 中的以下字段确认:
|
可通过最新 `run-meta.json` 中的以下字段确认:
|
||||||
|
|||||||
@ -84,6 +84,13 @@ Then confirm the latest `run-meta.json` contains:
|
|||||||
snapshot_reason=periodic_snapshot_delta_limit
|
snapshot_reason=periodic_snapshot_delta_limit
|
||||||
```
|
```
|
||||||
|
|
||||||
|
And inspect the independent lifecycle state:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
jq '{last_run,last_success_snapshot,successful_deltas_since_snapshot,state_health}' \
|
||||||
|
"${HOST_DATA_DIR}/state/run-lifecycle-state.json"
|
||||||
|
```
|
||||||
|
|
||||||
After validation, restore:
|
After validation, restore:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|||||||
@ -84,6 +84,13 @@ PERIODIC_SNAPSHOT_MAX_DELTAS=2
|
|||||||
snapshot_reason=periodic_snapshot_delta_limit
|
snapshot_reason=periodic_snapshot_delta_limit
|
||||||
```
|
```
|
||||||
|
|
||||||
|
并检查独立 lifecycle 状态:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
jq '{last_run,last_success_snapshot,successful_deltas_since_snapshot,state_health}' \
|
||||||
|
"${HOST_DATA_DIR}/state/run-lifecycle-state.json"
|
||||||
|
```
|
||||||
|
|
||||||
验证完成后恢复:
|
验证完成后恢复:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|||||||
@ -81,3 +81,19 @@ For a threshold-triggered reset you should see:
|
|||||||
- `sync_mode: "snapshot"`
|
- `sync_mode: "snapshot"`
|
||||||
- `snapshot_reason: "periodic_snapshot_delta_limit"`
|
- `snapshot_reason: "periodic_snapshot_delta_limit"`
|
||||||
- `periodic_snapshot_forced: true`
|
- `periodic_snapshot_forced: true`
|
||||||
|
|
||||||
|
To confirm delta counting still advances after retained runs are pruned, also inspect:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
jq '{last_success_snapshot,successful_deltas_since_snapshot,recent_runs_count:(.recent_runs|length)}' \
|
||||||
|
"${HOST_DATA_DIR}/state/run-lifecycle-state.json"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Lifecycle State File Is Corrupt
|
||||||
|
|
||||||
|
The script backs up the corrupt file before best-effort bootstrap from retained runs:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ls -1 "${HOST_DATA_DIR}/state"/run-lifecycle-state.json.corrupt.*
|
||||||
|
jq . "${HOST_DATA_DIR}/state/run-lifecycle-state.json"
|
||||||
|
```
|
||||||
|
|||||||
@ -81,3 +81,19 @@ jq '{run_id,sync_mode,snapshot_reason,periodic_snapshot_delta_count,periodic_sna
|
|||||||
- `sync_mode: "snapshot"`
|
- `sync_mode: "snapshot"`
|
||||||
- `snapshot_reason: "periodic_snapshot_delta_limit"`
|
- `snapshot_reason: "periodic_snapshot_delta_limit"`
|
||||||
- `periodic_snapshot_forced: true`
|
- `periodic_snapshot_forced: true`
|
||||||
|
|
||||||
|
如需确认 retain 裁剪后仍在累计 delta,可继续检查:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
jq '{last_success_snapshot,successful_deltas_since_snapshot,recent_runs_count:(.recent_runs|length)}' \
|
||||||
|
"${HOST_DATA_DIR}/state/run-lifecycle-state.json"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Lifecycle 状态文件损坏
|
||||||
|
|
||||||
|
脚本会先备份损坏文件,再从当前保留的 `runs/` 尽力 bootstrap:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ls -1 "${HOST_DATA_DIR}/state"/run-lifecycle-state.json.corrupt.*
|
||||||
|
jq . "${HOST_DATA_DIR}/state/run-lifecycle-state.json"
|
||||||
|
```
|
||||||
|
|||||||
@ -57,6 +57,12 @@ RSYNC_SCOPE=module-root
|
|||||||
# 建议保持 1;设置为 0 时,检测到前一轮失败会直接停止。
|
# 建议保持 1;设置为 0 时,检测到前一轮失败会直接停止。
|
||||||
FAILURE_SNAPSHOT_RESET=1
|
FAILURE_SNAPSHOT_RESET=1
|
||||||
|
|
||||||
|
# 周期性 snapshot reset 独立生命周期状态文件固定保存到:
|
||||||
|
# ${RUN_ROOT}/state/run-lifecycle-state.json
|
||||||
|
# 该文件不会被 RETAIN_RUNS 清理,也不会随 state/db reset 删除。
|
||||||
|
PERIODIC_SNAPSHOT_RESET=0
|
||||||
|
PERIODIC_SNAPSHOT_MAX_DELTAS=100
|
||||||
|
|
||||||
# 每隔多少轮执行一次 db_stats --exact。设置为空或 0 表示关闭 exact 统计。
|
# 每隔多少轮执行一次 db_stats --exact。设置为空或 0 表示关闭 exact 统计。
|
||||||
DB_STATS_EXACT_EVERY=3
|
DB_STATS_EXACT_EVERY=3
|
||||||
|
|
||||||
|
|||||||
@ -51,6 +51,8 @@ LIVE_TA_REFRESH_DIR="${LIVE_TA_REFRESH_DIR:-$META_DIR/live-ta-refresh}"
|
|||||||
LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS="${LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS:-15}"
|
LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS="${LIVE_TA_REFRESH_CONNECT_TIMEOUT_SECS:-15}"
|
||||||
LIVE_TA_REFRESH_MAX_TIME_SECS="${LIVE_TA_REFRESH_MAX_TIME_SECS:-120}"
|
LIVE_TA_REFRESH_MAX_TIME_SECS="${LIVE_TA_REFRESH_MAX_TIME_SECS:-120}"
|
||||||
LIVE_TA_REFRESH_BEFORE_SNAPSHOT="${LIVE_TA_REFRESH_BEFORE_SNAPSHOT:-1}"
|
LIVE_TA_REFRESH_BEFORE_SNAPSHOT="${LIVE_TA_REFRESH_BEFORE_SNAPSHOT:-1}"
|
||||||
|
RUN_LIFECYCLE_STATE_PATH="$STATE_ROOT/run-lifecycle-state.json"
|
||||||
|
RUN_LIFECYCLE_RECENT_RUNS_LIMIT=200
|
||||||
|
|
||||||
RPKI_BIN="$BIN_DIR/rpki"
|
RPKI_BIN="$BIN_DIR/rpki"
|
||||||
RPKI_DAEMON_BIN="$BIN_DIR/rpki_daemon"
|
RPKI_DAEMON_BIN="$BIN_DIR/rpki_daemon"
|
||||||
@ -507,64 +509,348 @@ isolate_state_after_failure() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
periodic_snapshot_delta_scan() {
|
periodic_snapshot_delta_scan() {
|
||||||
python3 - "$RUNS_ROOT" <<'PY'
|
local command="$1"
|
||||||
|
shift
|
||||||
|
python3 - "$command" "$RUN_LIFECYCLE_STATE_PATH" "$RUNS_ROOT" "$RUN_LIFECYCLE_RECENT_RUNS_LIMIT" "$@" <<'PY'
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
import shlex
|
||||||
import sys
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
runs_root = pathlib.Path(sys.argv[1])
|
command = sys.argv[1]
|
||||||
delta_count = 0
|
state_path = pathlib.Path(sys.argv[2])
|
||||||
run_dirs = sorted(
|
runs_root = pathlib.Path(sys.argv[3])
|
||||||
[
|
recent_limit = int(sys.argv[4])
|
||||||
path
|
extra_args = sys.argv[5:]
|
||||||
for path in runs_root.glob("run_*")
|
|
||||||
if path.is_dir()
|
|
||||||
and path.name.startswith("run_")
|
def now_rfc3339() -> str:
|
||||||
and path.name[4:].isdigit()
|
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
],
|
|
||||||
key=lambda path: int(path.name[4:]),
|
|
||||||
reverse=True,
|
def to_int(value):
|
||||||
)
|
if value is None or value == "":
|
||||||
for run_dir in run_dirs:
|
return None
|
||||||
|
if isinstance(value, bool):
|
||||||
|
return None
|
||||||
|
return int(value)
|
||||||
|
|
||||||
|
|
||||||
|
def to_bool(value):
|
||||||
|
if isinstance(value, bool):
|
||||||
|
return value
|
||||||
|
if value in (None, ""):
|
||||||
|
return None
|
||||||
|
if isinstance(value, str):
|
||||||
|
lowered = value.strip().lower()
|
||||||
|
if lowered in {"1", "true", "yes", "on"}:
|
||||||
|
return True
|
||||||
|
if lowered in {"0", "false", "no", "off"}:
|
||||||
|
return False
|
||||||
|
raise ValueError(f"invalid bool value: {value!r}")
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_run_entry(entry):
|
||||||
|
if entry is None:
|
||||||
|
return None
|
||||||
|
if not isinstance(entry, dict):
|
||||||
|
raise ValueError("run entry must be an object")
|
||||||
|
run_id = entry.get("run_id") or entry.get("runId")
|
||||||
|
run_index = to_int(entry.get("run_index", entry.get("runSeq")))
|
||||||
|
if not run_id or run_index is None:
|
||||||
|
raise ValueError("run entry missing run_id/run_index")
|
||||||
|
return {
|
||||||
|
"run_id": run_id,
|
||||||
|
"run_index": run_index,
|
||||||
|
"status": entry.get("status") or "unknown",
|
||||||
|
"sync_mode": entry.get("sync_mode", entry.get("syncMode")),
|
||||||
|
"snapshot_reason": entry.get("snapshot_reason"),
|
||||||
|
"started_at_rfc3339_utc": entry.get("started_at_rfc3339_utc"),
|
||||||
|
"completed_at_rfc3339_utc": entry.get("completed_at_rfc3339_utc"),
|
||||||
|
"periodic_snapshot_reset_enabled": to_bool(entry.get("periodic_snapshot_reset_enabled")),
|
||||||
|
"periodic_snapshot_max_deltas": to_int(entry.get("periodic_snapshot_max_deltas")),
|
||||||
|
"periodic_snapshot_delta_count": to_int(entry.get("periodic_snapshot_delta_count")),
|
||||||
|
"periodic_snapshot_forced": to_bool(entry.get("periodic_snapshot_forced")),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def snapshot_ref(entry):
|
||||||
|
if entry is None:
|
||||||
|
return None
|
||||||
|
return {
|
||||||
|
"run_id": entry["run_id"],
|
||||||
|
"run_index": entry["run_index"],
|
||||||
|
"snapshot_reason": entry.get("snapshot_reason"),
|
||||||
|
"completed_at_rfc3339_utc": entry.get("completed_at_rfc3339_utc"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_snapshot_ref(entry):
|
||||||
|
if entry is None:
|
||||||
|
return None
|
||||||
|
if not isinstance(entry, dict):
|
||||||
|
raise ValueError("snapshot ref must be an object")
|
||||||
|
run_id = entry.get("run_id") or entry.get("runId")
|
||||||
|
run_index = to_int(entry.get("run_index", entry.get("runSeq")))
|
||||||
|
if not run_id or run_index is None:
|
||||||
|
raise ValueError("snapshot ref missing run_id/run_index")
|
||||||
|
return {
|
||||||
|
"run_id": run_id,
|
||||||
|
"run_index": run_index,
|
||||||
|
"snapshot_reason": entry.get("snapshot_reason"),
|
||||||
|
"completed_at_rfc3339_utc": entry.get("completed_at_rfc3339_utc"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def state_health(last_run, last_success_snapshot):
|
||||||
|
if last_success_snapshot is not None:
|
||||||
|
return "ready", "ok"
|
||||||
|
if last_run is None:
|
||||||
|
return "empty", "no_runs"
|
||||||
|
return "bootstrap_incomplete", "no_successful_snapshot_in_retained_runs"
|
||||||
|
|
||||||
|
|
||||||
|
def default_state():
|
||||||
|
health, detail = state_health(None, None)
|
||||||
|
return {
|
||||||
|
"version": 1,
|
||||||
|
"updated_at_rfc3339_utc": now_rfc3339(),
|
||||||
|
"state_health": health,
|
||||||
|
"state_detail": detail,
|
||||||
|
"recent_runs_limit": recent_limit,
|
||||||
|
"last_run": None,
|
||||||
|
"last_success_snapshot": None,
|
||||||
|
"successful_deltas_since_snapshot": None,
|
||||||
|
"recent_runs": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def finalize_state(data):
|
||||||
|
data = dict(data)
|
||||||
|
data["version"] = 1
|
||||||
|
data["updated_at_rfc3339_utc"] = now_rfc3339()
|
||||||
|
data["recent_runs_limit"] = recent_limit
|
||||||
|
recent_runs = []
|
||||||
|
for item in data.get("recent_runs", []):
|
||||||
|
recent_runs.append(normalize_run_entry(item))
|
||||||
|
if len(recent_runs) > recent_limit:
|
||||||
|
recent_runs = recent_runs[-recent_limit:]
|
||||||
|
data["recent_runs"] = recent_runs
|
||||||
|
data["last_run"] = normalize_run_entry(data.get("last_run"))
|
||||||
|
data["last_success_snapshot"] = normalize_snapshot_ref(data.get("last_success_snapshot"))
|
||||||
|
delta_count = data.get("successful_deltas_since_snapshot")
|
||||||
|
data["successful_deltas_since_snapshot"] = to_int(delta_count)
|
||||||
|
if data["last_success_snapshot"] is None:
|
||||||
|
data["successful_deltas_since_snapshot"] = None
|
||||||
|
elif data["successful_deltas_since_snapshot"] is None:
|
||||||
|
data["successful_deltas_since_snapshot"] = 0
|
||||||
|
health, detail = state_health(data["last_run"], data["last_success_snapshot"])
|
||||||
|
if health == "ready":
|
||||||
|
detail = data.get("state_detail") or detail
|
||||||
|
data["state_health"] = health
|
||||||
|
data["state_detail"] = detail
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def sorted_run_dirs():
|
||||||
|
candidates = []
|
||||||
|
for path in runs_root.glob("run_*"):
|
||||||
|
if not path.is_dir():
|
||||||
|
continue
|
||||||
|
suffix = path.name[4:]
|
||||||
|
if suffix.isdigit():
|
||||||
|
candidates.append((int(suffix), path))
|
||||||
|
candidates.sort()
|
||||||
|
return [path for _, path in candidates]
|
||||||
|
|
||||||
|
|
||||||
|
def parse_run_dir(run_dir):
|
||||||
meta_path = run_dir / "run-meta.json"
|
meta_path = run_dir / "run-meta.json"
|
||||||
summary_path = run_dir / "run-summary.json"
|
summary_path = run_dir / "run-summary.json"
|
||||||
|
if not meta_path.exists() or not summary_path.exists():
|
||||||
|
return None
|
||||||
|
with meta_path.open("r", encoding="utf-8") as handle:
|
||||||
|
meta = json.load(handle)
|
||||||
|
with summary_path.open("r", encoding="utf-8") as handle:
|
||||||
|
summary = json.load(handle)
|
||||||
|
entry = normalize_run_entry(meta)
|
||||||
|
entry["status"] = meta.get("status") or summary.get("status") or entry["status"]
|
||||||
|
return entry
|
||||||
|
|
||||||
|
|
||||||
|
def bootstrap_state(exclude_run_id=None):
|
||||||
|
data = default_state()
|
||||||
|
last_success_snapshot = None
|
||||||
|
delta_count = None
|
||||||
|
recent_runs = []
|
||||||
|
last_run = None
|
||||||
|
for run_dir in sorted_run_dirs():
|
||||||
|
if exclude_run_id and run_dir.name == exclude_run_id:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
entry = parse_run_dir(run_dir)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if entry is None:
|
||||||
|
continue
|
||||||
|
last_run = entry
|
||||||
|
recent_runs.append(entry)
|
||||||
|
if entry["status"] != "success":
|
||||||
|
continue
|
||||||
|
if entry.get("sync_mode") == "snapshot":
|
||||||
|
last_success_snapshot = snapshot_ref(entry)
|
||||||
|
delta_count = 0
|
||||||
|
elif entry.get("sync_mode") == "delta":
|
||||||
|
if last_success_snapshot is not None and delta_count is not None:
|
||||||
|
delta_count += 1
|
||||||
|
data["last_run"] = last_run
|
||||||
|
data["last_success_snapshot"] = last_success_snapshot
|
||||||
|
data["successful_deltas_since_snapshot"] = delta_count
|
||||||
|
data["recent_runs"] = recent_runs[-recent_limit:]
|
||||||
|
return finalize_state(data)
|
||||||
|
|
||||||
|
|
||||||
|
def backup_corrupt_state():
|
||||||
|
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||||||
|
backup_path = state_path.with_name(f"{state_path.name}.corrupt.{timestamp}.{os.getpid()}")
|
||||||
|
backup_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
os.replace(state_path, backup_path)
|
||||||
|
return backup_path
|
||||||
|
|
||||||
|
|
||||||
|
def load_state_or_bootstrap(exclude_run_id=None):
|
||||||
|
if not state_path.exists():
|
||||||
|
return bootstrap_state(exclude_run_id), "bootstrapped_from_runs_missing_file", "", ""
|
||||||
try:
|
try:
|
||||||
with meta_path.open("r", encoding="utf-8") as handle:
|
with state_path.open("r", encoding="utf-8") as handle:
|
||||||
meta = json.load(handle)
|
raw = json.load(handle)
|
||||||
with summary_path.open("r", encoding="utf-8") as handle:
|
if not isinstance(raw, dict):
|
||||||
summary = json.load(handle)
|
raise ValueError("state root must be an object")
|
||||||
|
if to_int(raw.get("version")) != 1:
|
||||||
|
raise ValueError("unsupported version")
|
||||||
|
state = finalize_state(raw)
|
||||||
|
return state, "state_file", "", ""
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
print(f"error\t{delta_count}\t{run_dir.name}\tjson_parse:{exc.__class__.__name__}")
|
backup_path = backup_corrupt_state()
|
||||||
sys.exit(0)
|
state = bootstrap_state(exclude_run_id)
|
||||||
if meta.get("status") != "success" or summary.get("status") != "success":
|
return state, "bootstrapped_from_runs_after_corrupt_backup", exc.__class__.__name__, str(backup_path)
|
||||||
continue
|
|
||||||
sync_mode = meta.get("sync_mode") or meta.get("syncMode")
|
|
||||||
if sync_mode == "delta":
|
def atomic_write_json(path, payload):
|
||||||
delta_count += 1
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
continue
|
tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}")
|
||||||
if sync_mode == "snapshot":
|
with tmp_path.open("w", encoding="utf-8") as handle:
|
||||||
print(f"ok\t{delta_count}\t{run_dir.name}\t")
|
json.dump(payload, handle, indent=2, sort_keys=True)
|
||||||
sys.exit(0)
|
handle.write("\n")
|
||||||
print(f"error\t{delta_count}\t{run_dir.name}\tmissing_sync_mode")
|
handle.flush()
|
||||||
sys.exit(0)
|
os.fsync(handle.fileno())
|
||||||
print(f"error\t{delta_count}\t\tmissing_success_snapshot")
|
os.replace(tmp_path, path)
|
||||||
|
try:
|
||||||
|
dir_fd = os.open(path.parent, os.O_DIRECTORY)
|
||||||
|
except OSError:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
os.fsync(dir_fd)
|
||||||
|
finally:
|
||||||
|
os.close(dir_fd)
|
||||||
|
|
||||||
|
|
||||||
|
def emit(name, value):
|
||||||
|
if value is None:
|
||||||
|
value = ""
|
||||||
|
elif isinstance(value, bool):
|
||||||
|
value = "true" if value else "false"
|
||||||
|
else:
|
||||||
|
value = str(value)
|
||||||
|
print(f"{name}={shlex.quote(value)}")
|
||||||
|
|
||||||
|
|
||||||
|
if command == "load":
|
||||||
|
max_deltas = int(extra_args[0])
|
||||||
|
state, source, detail, backup_path = load_state_or_bootstrap()
|
||||||
|
if source != "state_file":
|
||||||
|
atomic_write_json(state_path, state)
|
||||||
|
delta_count = state.get("successful_deltas_since_snapshot")
|
||||||
|
force_needed = bool(
|
||||||
|
state.get("last_success_snapshot") is not None
|
||||||
|
and delta_count is not None
|
||||||
|
and delta_count >= max_deltas
|
||||||
|
)
|
||||||
|
emit("PERIODIC_LIFECYCLE_SOURCE", source)
|
||||||
|
emit("PERIODIC_LIFECYCLE_DETAIL", detail or state.get("state_detail"))
|
||||||
|
emit("PERIODIC_LIFECYCLE_CORRUPT_BACKUP_PATH", backup_path)
|
||||||
|
emit("PERIODIC_LIFECYCLE_STATE_HEALTH", state.get("state_health"))
|
||||||
|
emit(
|
||||||
|
"PERIODIC_LIFECYCLE_LAST_SNAPSHOT_RUN_ID",
|
||||||
|
(state.get("last_success_snapshot") or {}).get("run_id"),
|
||||||
|
)
|
||||||
|
emit("PERIODIC_LIFECYCLE_DELTA_COUNT", delta_count)
|
||||||
|
emit("PERIODIC_LIFECYCLE_FORCE_NEEDED", force_needed)
|
||||||
|
elif command == "update":
|
||||||
|
run_dir = pathlib.Path(extra_args[0])
|
||||||
|
state, source, detail, backup_path = load_state_or_bootstrap(exclude_run_id=run_dir.name)
|
||||||
|
entry = parse_run_dir(run_dir)
|
||||||
|
if entry is None:
|
||||||
|
raise SystemExit(f"missing run metadata for lifecycle update: {run_dir}")
|
||||||
|
previous_snapshot = state.get("last_success_snapshot")
|
||||||
|
previous_delta_count = state.get("successful_deltas_since_snapshot")
|
||||||
|
state["last_run"] = entry
|
||||||
|
recent_runs = [item for item in state.get("recent_runs", []) if item.get("run_id") != entry["run_id"]]
|
||||||
|
recent_runs.append(entry)
|
||||||
|
state["recent_runs"] = recent_runs[-recent_limit:]
|
||||||
|
if entry["status"] == "success" and entry.get("sync_mode") == "snapshot":
|
||||||
|
state["last_success_snapshot"] = snapshot_ref(entry)
|
||||||
|
state["successful_deltas_since_snapshot"] = 0
|
||||||
|
state["state_detail"] = "ok"
|
||||||
|
elif entry["status"] == "success" and entry.get("sync_mode") == "delta":
|
||||||
|
if previous_snapshot is not None and previous_delta_count is not None:
|
||||||
|
state["last_success_snapshot"] = previous_snapshot
|
||||||
|
state["successful_deltas_since_snapshot"] = previous_delta_count + 1
|
||||||
|
state["state_detail"] = "ok"
|
||||||
|
else:
|
||||||
|
state["last_success_snapshot"] = previous_snapshot
|
||||||
|
state["successful_deltas_since_snapshot"] = None
|
||||||
|
state["state_detail"] = "success_delta_without_known_snapshot"
|
||||||
|
else:
|
||||||
|
state["last_success_snapshot"] = previous_snapshot
|
||||||
|
state["successful_deltas_since_snapshot"] = previous_delta_count
|
||||||
|
state = finalize_state(state)
|
||||||
|
atomic_write_json(state_path, state)
|
||||||
|
emit("RUN_LIFECYCLE_UPDATE_SOURCE", source)
|
||||||
|
emit("RUN_LIFECYCLE_UPDATE_DETAIL", detail or state.get("state_detail"))
|
||||||
|
emit("RUN_LIFECYCLE_UPDATE_CORRUPT_BACKUP_PATH", backup_path)
|
||||||
|
emit("RUN_LIFECYCLE_UPDATE_STATE_HEALTH", state.get("state_health"))
|
||||||
|
emit("RUN_LIFECYCLE_UPDATE_DELTA_COUNT", state.get("successful_deltas_since_snapshot"))
|
||||||
|
emit(
|
||||||
|
"RUN_LIFECYCLE_UPDATE_LAST_SNAPSHOT_RUN_ID",
|
||||||
|
(state.get("last_success_snapshot") or {}).get("run_id"),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise SystemExit(f"unknown lifecycle helper command: {command}")
|
||||||
PY
|
PY
|
||||||
}
|
}
|
||||||
|
|
||||||
periodic_snapshot_force_needed() {
|
load_periodic_snapshot_lifecycle_context() {
|
||||||
PERIODIC_SCAN_STATUS=""
|
eval "$(periodic_snapshot_delta_scan load "$PERIODIC_SNAPSHOT_MAX_DELTAS")"
|
||||||
PERIODIC_SCAN_DELTA_COUNT=""
|
if [[ -n "$PERIODIC_LIFECYCLE_CORRUPT_BACKUP_PATH" ]]; then
|
||||||
PERIODIC_SCAN_SNAPSHOT_RUN_ID=""
|
warn "run lifecycle state corrupt; backed up to $PERIODIC_LIFECYCLE_CORRUPT_BACKUP_PATH detail=${PERIODIC_LIFECYCLE_DETAIL:-unknown}"
|
||||||
PERIODIC_SCAN_DETAIL=""
|
fi
|
||||||
local scan_output
|
if [[ "$PERIODIC_LIFECYCLE_SOURCE" != "state_file" ]]; then
|
||||||
scan_output="$(periodic_snapshot_delta_scan)"
|
echo "run lifecycle state source=$PERIODIC_LIFECYCLE_SOURCE state_health=${PERIODIC_LIFECYCLE_STATE_HEALTH:-unknown} snapshot_run=${PERIODIC_LIFECYCLE_LAST_SNAPSHOT_RUN_ID:-none} delta_count=${PERIODIC_LIFECYCLE_DELTA_COUNT:-unknown}"
|
||||||
IFS=$'\t' read -r PERIODIC_SCAN_STATUS PERIODIC_SCAN_DELTA_COUNT PERIODIC_SCAN_SNAPSHOT_RUN_ID PERIODIC_SCAN_DETAIL <<< "$scan_output"
|
fi
|
||||||
if [[ "$PERIODIC_SCAN_STATUS" != "ok" ]]; then
|
if [[ "$PERIODIC_LIFECYCLE_STATE_HEALTH" == "bootstrap_incomplete" ]]; then
|
||||||
warn "periodic snapshot reset scan skipped status=${PERIODIC_SCAN_STATUS:-missing} snapshot_run=${PERIODIC_SCAN_SNAPSHOT_RUN_ID:-none} detail=${PERIODIC_SCAN_DETAIL:-unknown}"
|
warn "run lifecycle state bootstrap incomplete detail=${PERIODIC_LIFECYCLE_DETAIL:-unknown}; forced snapshot counting resumes after the next successful snapshot"
|
||||||
return 1
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
update_run_lifecycle_state() {
|
||||||
|
local run_dir="$1"
|
||||||
|
eval "$(periodic_snapshot_delta_scan update "$run_dir")"
|
||||||
|
if [[ -n "$RUN_LIFECYCLE_UPDATE_CORRUPT_BACKUP_PATH" ]]; then
|
||||||
|
warn "run lifecycle state corrupt during update; backed up to $RUN_LIFECYCLE_UPDATE_CORRUPT_BACKUP_PATH detail=${RUN_LIFECYCLE_UPDATE_DETAIL:-unknown}"
|
||||||
fi
|
fi
|
||||||
[[ -n "$PERIODIC_SCAN_DELTA_COUNT" ]] || PERIODIC_SCAN_DELTA_COUNT="0"
|
|
||||||
(( PERIODIC_SCAN_DELTA_COUNT >= PERIODIC_SNAPSHOT_MAX_DELTAS ))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
prepare_periodic_reset_state_db() {
|
prepare_periodic_reset_state_db() {
|
||||||
@ -1012,6 +1298,7 @@ run_one_round() {
|
|||||||
"$RUN_META_PERIODIC_ENABLED" "$RUN_META_PERIODIC_MAX_DELTAS" "$RUN_META_PERIODIC_DELTA_COUNT" \
|
"$RUN_META_PERIODIC_ENABLED" "$RUN_META_PERIODIC_MAX_DELTAS" "$RUN_META_PERIODIC_DELTA_COUNT" \
|
||||||
"$RUN_META_PERIODIC_FORCED" "$RUN_META_RESET_DB_STAGING_PATH" "$RUN_META_RESET_DB_CLEANUP_STATUS" \
|
"$RUN_META_PERIODIC_FORCED" "$RUN_META_RESET_DB_STAGING_PATH" "$RUN_META_RESET_DB_CLEANUP_STATUS" \
|
||||||
"$RUN_META_TMP_CLEANUP_STATUS" "$RUN_META_TMP_CLEANUP_REASON"
|
"$RUN_META_TMP_CLEANUP_STATUS" "$RUN_META_TMP_CLEANUP_REASON"
|
||||||
|
update_run_lifecycle_state "$run_dir"
|
||||||
printf '%s\n' "$run_id" > "$META_DIR/last-run-id"
|
printf '%s\n' "$run_id" > "$META_DIR/last-run-id"
|
||||||
if is_true "$CLEAN_TMP_AFTER_RUN"; then
|
if is_true "$CLEAN_TMP_AFTER_RUN"; then
|
||||||
rm -rf "$daemon_state_root"
|
rm -rf "$daemon_state_root"
|
||||||
@ -1091,14 +1378,21 @@ main() {
|
|||||||
INVALID_TMP_PATH=""
|
INVALID_TMP_PATH=""
|
||||||
TMP_CLEANUP_STATUS=""
|
TMP_CLEANUP_STATUS=""
|
||||||
TMP_CLEANUP_REASON=""
|
TMP_CLEANUP_REASON=""
|
||||||
PERIODIC_SCAN_STATUS=""
|
PERIODIC_LIFECYCLE_SOURCE=""
|
||||||
PERIODIC_SCAN_DELTA_COUNT=""
|
PERIODIC_LIFECYCLE_DETAIL=""
|
||||||
PERIODIC_SCAN_SNAPSHOT_RUN_ID=""
|
PERIODIC_LIFECYCLE_CORRUPT_BACKUP_PATH=""
|
||||||
PERIODIC_SCAN_DETAIL=""
|
PERIODIC_LIFECYCLE_STATE_HEALTH=""
|
||||||
|
PERIODIC_LIFECYCLE_LAST_SNAPSHOT_RUN_ID=""
|
||||||
|
PERIODIC_LIFECYCLE_DELTA_COUNT=""
|
||||||
|
PERIODIC_LIFECYCLE_FORCE_NEEDED="false"
|
||||||
RESET_DB_STAGING_PATH=""
|
RESET_DB_STAGING_PATH=""
|
||||||
RESET_DB_CLEANUP_STATUS=""
|
RESET_DB_CLEANUP_STATUS=""
|
||||||
if is_true "$PERIODIC_SNAPSHOT_RESET"; then
|
if is_true "$PERIODIC_SNAPSHOT_RESET"; then
|
||||||
RUN_META_PERIODIC_ENABLED="true"
|
RUN_META_PERIODIC_ENABLED="true"
|
||||||
|
load_periodic_snapshot_lifecycle_context
|
||||||
|
if [[ -n "$PERIODIC_LIFECYCLE_DELTA_COUNT" ]]; then
|
||||||
|
RUN_META_PERIODIC_DELTA_COUNT="$PERIODIC_LIFECYCLE_DELTA_COUNT"
|
||||||
|
fi
|
||||||
else
|
else
|
||||||
RUN_META_PERIODIC_ENABLED="false"
|
RUN_META_PERIODIC_ENABLED="false"
|
||||||
fi
|
fi
|
||||||
@ -1120,19 +1414,14 @@ main() {
|
|||||||
if delta_state_available; then
|
if delta_state_available; then
|
||||||
sync_mode="delta"
|
sync_mode="delta"
|
||||||
if is_true "$PERIODIC_SNAPSHOT_RESET"; then
|
if is_true "$PERIODIC_SNAPSHOT_RESET"; then
|
||||||
if periodic_snapshot_force_needed; then
|
if [[ "$PERIODIC_LIFECYCLE_FORCE_NEEDED" == "true" ]]; then
|
||||||
RUN_META_PERIODIC_DELTA_COUNT="$PERIODIC_SCAN_DELTA_COUNT"
|
|
||||||
RUN_META_PERIODIC_FORCED="true"
|
RUN_META_PERIODIC_FORCED="true"
|
||||||
sync_mode="snapshot"
|
sync_mode="snapshot"
|
||||||
snapshot_reason="periodic_snapshot_delta_limit"
|
snapshot_reason="periodic_snapshot_delta_limit"
|
||||||
prepare_periodic_reset_state_db "$(printf 'run_%04d' "$next_index")"
|
prepare_periodic_reset_state_db "$(printf 'run_%04d' "$next_index")"
|
||||||
RUN_META_RESET_DB_STAGING_PATH="$RESET_DB_STAGING_PATH"
|
RUN_META_RESET_DB_STAGING_PATH="$RESET_DB_STAGING_PATH"
|
||||||
RUN_META_RESET_DB_CLEANUP_STATUS="$RESET_DB_CLEANUP_STATUS"
|
RUN_META_RESET_DB_CLEANUP_STATUS="$RESET_DB_CLEANUP_STATUS"
|
||||||
echo "periodic snapshot reset forcing snapshot run=$(printf 'run_%04d' "$next_index") delta_count=$PERIODIC_SCAN_DELTA_COUNT max_deltas=$PERIODIC_SNAPSHOT_MAX_DELTAS"
|
echo "periodic snapshot reset forcing snapshot run=$(printf 'run_%04d' "$next_index") delta_count=$PERIODIC_LIFECYCLE_DELTA_COUNT max_deltas=$PERIODIC_SNAPSHOT_MAX_DELTAS"
|
||||||
else
|
|
||||||
if [[ "$PERIODIC_SCAN_STATUS" == "ok" ]]; then
|
|
||||||
RUN_META_PERIODIC_DELTA_COUNT="$PERIODIC_SCAN_DELTA_COUNT"
|
|
||||||
fi
|
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
else
|
else
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user