mvp v1.1 达成基本目标

This commit is contained in:
yuyr 2025-12-23 15:18:22 +08:00
parent c405adc4fc
commit 2a1168c43c
14 changed files with 60 additions and 830 deletions

View File

@ -15,26 +15,30 @@
`/home2/argus/infra/mvp/v1.1/` 下执行:
```bash
./scripts/run_all.sh
```
说明:
- `run_all.sh` 会按顺序提交 `ppo -> grpo -> sft`,并等待每个 job 结束后再提交下一个(避免 8 卡集群并发提交导致 “available GPUs 0” 直接失败)。
等价的“分步执行”:
```bash
./scripts/00_prereq_check.sh
./scripts/03_cleanup_v1_legacy.sh
./scripts/05_ensure_verl_repo.sh
./scripts/01_up.sh
./scripts/20_start_head.sh
./scripts/21_start_workers.sh
./scripts/30_prepare_data_and_model.sh
./scripts/12_install_py_deps.sh
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/ppo.yaml
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/grpo.yaml
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/sft.yaml
./scripts/40_submit_ppo_epoch1.sh
./scripts/41_submit_grpo_epoch1.sh
./scripts/42_submit_sft_minimal.sh
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/ppo.yaml # no-wait
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/grpo.yaml # no-wait
./scripts/44_submit_sdk.sh /workspace/mvp/v1.1/py/configs/dev.yaml /workspace/mvp/v1.1/py/jobspecs/sft.yaml # no-wait
./scripts/50_status.sh
```
说明:
- `scripts/40/41/42` 是历史的 “CLI 提交脚本”(仍可用),但 v1.1 的工程化目标是把提交机制迁移到 `scripts/44_submit_sdk.sh`Ray Python SDK + YAML 配置)。
停止并清理:
```bash
@ -48,7 +52,7 @@
- **必须通过 head 执行 `ray job submit`** 提交任务(满足“从 head 提交”要求)。
- **head 不跑训练**head 以 `--num-cpus=0 --num-gpus=0` 启动worker 具备自定义资源 `worker_node`;提交时 `--entrypoint-resources='{"worker_node": 1}'` 强制 driver 落 worker。
- **共享路径统一为 `/private`(容器内)**compose 将宿主机 `../shared` 挂载到容器内 `/private`,对齐生产环境。
- **多版本 verl**:通过 Ray Job `runtime_env.env_vars.PYTHONPATH` 注入 `${SHARED_ROOT}/common/code/verl/...`job 粒度选择代码快照
- **job 级别 code_path**:训练 JobSpec 中的 `code_path` 指向 `/private/common/code/verl/verl_repo`(由 `scripts/30_prepare_data_and_model.sh` 准备)
---

View File

@ -1,33 +0,0 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "mvp-v1.1-job-spec",
"type": "object",
"required": ["workload", "shared_root", "code_path", "model_id", "ray", "runtime_env"],
"properties": {
"submission_id": { "type": "string" },
"workload": { "type": "string", "enum": ["ppo", "grpo", "sft"] },
"shared_root": { "type": "string" },
"code_path": { "type": "string" },
"model_id": { "type": "string" },
"ppo": { "type": "object" },
"grpo": { "type": "object" },
"sft": { "type": "object" },
"ray": {
"type": "object",
"required": ["address", "entrypoint_num_cpus", "entrypoint_resources"],
"properties": {
"address": { "type": "string" },
"entrypoint_num_cpus": { "type": "number" },
"entrypoint_resources": { "type": "object" }
}
},
"runtime_env": {
"type": "object",
"required": ["env_vars"],
"properties": {
"env_vars": { "type": "object" }
}
}
}
}

View File

@ -1,57 +0,0 @@
"""
Job-scoped compatibility shims loaded automatically by Python at startup.
This is intentionally lightweight and safe-by-default:
- Only patches missing symbols.
- Never raises (best-effort).
Primary use case in MVP v1.1:
- Allow multiple `verl` versions (e.g. v0.6.0 vs v0.6.1) to run on the same
base image where `sglang` APIs may differ slightly.
"""
from __future__ import annotations
def _patch_sglang_get_ip() -> None:
try:
import sglang.srt.utils as srt_utils # type: ignore
except Exception:
return
if hasattr(srt_utils, "get_ip"):
return
def get_ip() -> str:
# Best-effort local IP without external dependency.
try:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# Doesn't send packets; used to pick the default route/interface.
s.connect(("8.8.8.8", 80))
return str(s.getsockname()[0])
finally:
s.close()
except Exception:
# Fallback: hostname resolution
try:
import socket
return str(socket.gethostbyname(socket.gethostname()))
except Exception:
return "127.0.0.1"
try:
setattr(srt_utils, "get_ip", get_ip)
except Exception:
return
try:
_patch_sglang_get_ip()
except Exception:
# Never block interpreter startup.
pass

View File

@ -1,39 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
VERL_REPO_URL="${VERL_REPO_URL:-https://github.com/volcengine/verl.git}"
DEST_BASE="${SHARED_ROOT}/common/code/verl"
TAGS=("v0.6.0" "v0.6.1")
echo "[head] ensure base dir: ${DEST_BASE}"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p '${DEST_BASE}'"
for tag in "${TAGS[@]}"; do
dest="${DEST_BASE}/verl_${tag}"
echo "[head] prepare verl tag ${tag} -> ${dest}"
verify_repo_cmd="test -d '${dest}/.git' && git -C '${dest}' rev-parse --is-inside-work-tree >/dev/null 2>&1"
if dexec "${HEAD_CONTAINER}" bash -lc "${verify_repo_cmd}"; then
echo "[head] exists: verified git repo: ${dest}"
else
echo "[head] cloning ${tag} (retry with HTTP/1.1 if needed)"
dexec "${HEAD_CONTAINER}" bash -lc "rm -rf '${dest}'"
# Retry a few times because GitHub over HTTP/2 can occasionally fail with curl framing errors/timeouts.
dexec "${HEAD_CONTAINER}" bash -lc "set -euo pipefail; for i in 1 2 3; do echo \"clone_attempt=\\$i\"; if git -c http.version=HTTP/1.1 clone --filter=blob:none --single-branch --branch '${tag}' --depth 1 '${VERL_REPO_URL}' '${dest}'; then exit 0; fi; rm -rf '${dest}'; sleep 3; done; exit 1"
dexec "${HEAD_CONTAINER}" bash -lc "${verify_repo_cmd}" || { echo \"[head] clone failed or repo invalid: ${dest}\" >&2; exit 1; }
fi
# Avoid git safe.directory issues when reading repo state
dexec "${HEAD_CONTAINER}" bash -lc "git config --global --add safe.directory '${dest}' >/dev/null 2>&1 || true"
dexec "${HEAD_CONTAINER}" bash -lc "printf 'tag='; git -C '${dest}' describe --tags --exact-match 2>/dev/null || true; printf '\\nhead='; git -C '${dest}' rev-parse HEAD; printf '\\n'"
# Add marker for multi-version verification in Ray job logs
dexec "${HEAD_CONTAINER}" bash -lc "printf \"%s\\n\" \"MARKER = '${tag}'\" > '${dest}/mvp_marker.py'"
done
echo "[head] done"

View File

@ -1,72 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
SUBMISSION_ID="${SUBMISSION_ID:-mvp11_ppo_$(timestamp)_$RANDOM}"
JOB_DIR="${SHARED_ROOT}/jobs/${SUBMISSION_ID}"
MODEL_ID="${MODEL_ID:-Qwen/Qwen2.5-0.5B-Instruct}"
TRAIN_FILE="${SHARED_ROOT}/datasets/gsm8k/train.parquet"
VAL_FILE="${SHARED_ROOT}/datasets/gsm8k/test.parquet"
CODE_PATH="${CODE_PATH:-${SHARED_ROOT}/common/code/verl/verl_repo}"
TOTAL_TRAINING_STEPS="${TOTAL_TRAINING_STEPS:-10}"
SAVE_FREQ="${SAVE_FREQ:-10}"
TEST_FREQ="${TEST_FREQ:--1}"
echo "[head] create job dir: ${JOB_DIR}"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p '${JOB_DIR}'/{logs,checkpoints,config,debug}"
SUBMIT_CMD="python3 -m verl.trainer.main_ppo \
data.train_files=${TRAIN_FILE} \
data.val_files=${VAL_FILE} \
data.train_batch_size=256 \
data.max_prompt_length=512 \
data.max_response_length=512 \
actor_rollout_ref.model.path=${MODEL_ID} \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.actor.ppo_mini_batch_size=64 \
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
actor_rollout_ref.rollout.name=sglang \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \
actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \
critic.optim.lr=1e-5 \
critic.model.path=${MODEL_ID} \
critic.ppo_micro_batch_size_per_gpu=4 \
algorithm.kl_ctrl.kl_coef=0.001 \
trainer.logger=console \
trainer.val_before_train=False \
trainer.n_gpus_per_node=4 \
trainer.nnodes=2 \
trainer.save_freq=${SAVE_FREQ} \
trainer.test_freq=${TEST_FREQ} \
trainer.total_epochs=1 \
trainer.total_training_steps=${TOTAL_TRAINING_STEPS} \
trainer.resume_mode=disable \
trainer.default_local_dir=${JOB_DIR}/checkpoints \
+ray_kwargs.ray_init.address=auto \
hydra.run.dir=${JOB_DIR}/logs/hydra"
printf "%s\n" "${SUBMIT_CMD}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/config/submit_cmd.txt'"
echo "[head] debug snapshot (pre-submit)"
dexec "${HEAD_CONTAINER}" bash -lc "ray status >'${JOB_DIR}/debug/ray_status_pre.txt' 2>&1 || true"
echo "[head] submit PPO via ray job submit (driver forced on worker)"
SUBMIT_OUT="$(dexec "${HEAD_CONTAINER}" bash -lc "ray job submit --address='${RAY_DASHBOARD_ADDR}' --submission-id='${SUBMISSION_ID}' --entrypoint-num-cpus=1 --entrypoint-resources='{\"worker_node\": 1}' --runtime-env-json='{\"env_vars\":{\"HF_HOME\":\"${SHARED_ROOT}/hf\",\"HUGGINGFACE_HUB_CACHE\":\"${SHARED_ROOT}/hf/hub\",\"TRANSFORMERS_CACHE\":\"${SHARED_ROOT}/hf/transformers\",\"HF_ENDPOINT\":\"https://hf-mirror.com\",\"PYTHONUNBUFFERED\":\"1\",\"PYTHONPATH\":\"${CODE_PATH}:${SHARED_ROOT}/user/code\"}}' --no-wait -- ${SUBMIT_CMD}")"
printf "%s\n" "${SUBMIT_OUT}"
printf "%s\n" "${SUBMIT_OUT}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/logs/ray_job_submit.out'"
echo "${SUBMISSION_ID}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/config/ray_submission_id.txt'"
echo "[head] debug snapshot (post-submit)"
dexec "${HEAD_CONTAINER}" bash -lc "ray job list >'${JOB_DIR}/debug/ray_job_list_post.txt' 2>&1 || true"
dexec "${HEAD_CONTAINER}" bash -lc "ray status >'${JOB_DIR}/debug/ray_status_post.txt' 2>&1 || true"
echo "submitted: ${SUBMISSION_ID}"
echo "job dir: ${JOB_DIR}"

View File

@ -1,73 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
SUBMISSION_ID="${SUBMISSION_ID:-mvp11_grpo_$(timestamp)_$RANDOM}"
JOB_DIR="${SHARED_ROOT}/jobs/${SUBMISSION_ID}"
MODEL_ID="${MODEL_ID:-Qwen/Qwen2.5-0.5B-Instruct}"
TRAIN_FILE="${SHARED_ROOT}/datasets/gsm8k/train.parquet"
VAL_FILE="${SHARED_ROOT}/datasets/gsm8k/test.parquet"
CODE_PATH="${CODE_PATH:-${SHARED_ROOT}/common/code/verl/verl_repo}"
TOTAL_TRAINING_STEPS="${TOTAL_TRAINING_STEPS:-10}"
SAVE_FREQ="${SAVE_FREQ:-10}"
TEST_FREQ="${TEST_FREQ:--1}"
echo "[head] create job dir: ${JOB_DIR}"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p '${JOB_DIR}'/{logs,checkpoints,config,debug}"
SUBMIT_CMD="python3 -m verl.trainer.main_ppo \
data.train_files=${TRAIN_FILE} \
data.val_files=${VAL_FILE} \
data.train_batch_size=256 \
data.max_prompt_length=512 \
data.max_response_length=512 \
actor_rollout_ref.model.path=${MODEL_ID} \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.actor.ppo_mini_batch_size=64 \
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
actor_rollout_ref.rollout.name=sglang \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \
actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \
critic.optim.lr=1e-5 \
critic.model.path=${MODEL_ID} \
critic.ppo_micro_batch_size_per_gpu=4 \
algorithm.adv_estimator=grpo \
algorithm.kl_ctrl.kl_coef=0.001 \
trainer.logger=console \
trainer.val_before_train=False \
trainer.n_gpus_per_node=4 \
trainer.nnodes=2 \
trainer.save_freq=${SAVE_FREQ} \
trainer.test_freq=${TEST_FREQ} \
trainer.total_epochs=1 \
trainer.total_training_steps=${TOTAL_TRAINING_STEPS} \
trainer.resume_mode=disable \
trainer.default_local_dir=${JOB_DIR}/checkpoints \
+ray_kwargs.ray_init.address=auto \
hydra.run.dir=${JOB_DIR}/logs/hydra"
printf "%s\n" "${SUBMIT_CMD}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/config/submit_cmd.txt'"
echo "[head] debug snapshot (pre-submit)"
dexec "${HEAD_CONTAINER}" bash -lc "ray status >'${JOB_DIR}/debug/ray_status_pre.txt' 2>&1 || true"
echo "[head] submit GRPO via ray job submit (driver forced on worker)"
SUBMIT_OUT="$(dexec "${HEAD_CONTAINER}" bash -lc "ray job submit --address='${RAY_DASHBOARD_ADDR}' --submission-id='${SUBMISSION_ID}' --entrypoint-num-cpus=1 --entrypoint-resources='{\"worker_node\": 1}' --runtime-env-json='{\"env_vars\":{\"HF_HOME\":\"${SHARED_ROOT}/hf\",\"HUGGINGFACE_HUB_CACHE\":\"${SHARED_ROOT}/hf/hub\",\"TRANSFORMERS_CACHE\":\"${SHARED_ROOT}/hf/transformers\",\"HF_ENDPOINT\":\"https://hf-mirror.com\",\"PYTHONUNBUFFERED\":\"1\",\"PYTHONPATH\":\"${CODE_PATH}:${SHARED_ROOT}/user/code\"}}' --no-wait -- ${SUBMIT_CMD}")"
printf "%s\n" "${SUBMIT_OUT}"
printf "%s\n" "${SUBMIT_OUT}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/logs/ray_job_submit.out'"
echo "${SUBMISSION_ID}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/config/ray_submission_id.txt'"
echo "[head] debug snapshot (post-submit)"
dexec "${HEAD_CONTAINER}" bash -lc "ray job list >'${JOB_DIR}/debug/ray_job_list_post.txt' 2>&1 || true"
dexec "${HEAD_CONTAINER}" bash -lc "ray status >'${JOB_DIR}/debug/ray_status_post.txt' 2>&1 || true"
echo "submitted: ${SUBMISSION_ID}"
echo "job dir: ${JOB_DIR}"

View File

@ -1,62 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
SUBMISSION_ID="${SUBMISSION_ID:-mvp11_sft_$(timestamp)_$RANDOM}"
JOB_DIR="${SHARED_ROOT}/jobs/${SUBMISSION_ID}"
MODEL_ID="${MODEL_ID:-Qwen/Qwen2.5-0.5B-Instruct}"
TRAIN_FILE="${SHARED_ROOT}/datasets/gsm8k_sft/train.parquet"
VAL_FILE="${SHARED_ROOT}/datasets/gsm8k_sft/test.parquet"
CODE_PATH="${CODE_PATH:-${SHARED_ROOT}/common/code/verl/verl_repo}"
TOTAL_TRAINING_STEPS="${TOTAL_TRAINING_STEPS:-10}"
SAVE_FREQ="${SAVE_FREQ:-10}"
SFT_DRIVER_DEVICE="${SFT_DRIVER_DEVICE:-cpu}"
echo "[head] create job dir: ${JOB_DIR}"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p '${JOB_DIR}'/{logs,checkpoints,config,debug}"
SUBMIT_CMD="python3 -m verl.trainer.sft_trainer_ray \
model.path=${MODEL_ID} \
data.train_files=${TRAIN_FILE} \
data.val_files=null \
data.train_batch_size=64 \
data.micro_batch_size_per_gpu=1 \
data.max_token_len_per_gpu=2048 \
data.max_length=1024 \
trainer.logger=console \
trainer.project_name=mvp11-sft \
trainer.experiment_name=${SUBMISSION_ID} \
trainer.total_epochs=1 \
trainer.total_training_steps=${TOTAL_TRAINING_STEPS} \
trainer.save_freq=${SAVE_FREQ} \
trainer.test_freq=-1 \
trainer.resume_mode=disable \
trainer.device=${SFT_DRIVER_DEVICE} \
trainer.default_local_dir=${JOB_DIR}/checkpoints \
trainer.nnodes=2 \
trainer.n_gpus_per_node=4 \
hydra.run.dir=${JOB_DIR}/logs/hydra"
printf "%s\n" "${SUBMIT_CMD}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/config/submit_cmd.txt'"
echo "[head] debug snapshot (pre-submit)"
dexec "${HEAD_CONTAINER}" bash -lc "ray status >'${JOB_DIR}/debug/ray_status_pre.txt' 2>&1 || true"
echo "[head] submit SFT via ray job submit (driver forced on worker)"
SUBMIT_OUT="$(dexec "${HEAD_CONTAINER}" bash -lc "ray job submit --address='${RAY_DASHBOARD_ADDR}' --submission-id='${SUBMISSION_ID}' --entrypoint-num-cpus=1 --entrypoint-resources='{\"worker_node\": 1}' --runtime-env-json='{\"env_vars\":{\"HF_HOME\":\"${SHARED_ROOT}/hf\",\"HUGGINGFACE_HUB_CACHE\":\"${SHARED_ROOT}/hf/hub\",\"TRANSFORMERS_CACHE\":\"${SHARED_ROOT}/hf/transformers\",\"HF_ENDPOINT\":\"https://hf-mirror.com\",\"PYTHONUNBUFFERED\":\"1\",\"RAY_ADDRESS\":\"auto\",\"PYTHONPATH\":\"${CODE_PATH}:${SHARED_ROOT}/user/code\"}}' --no-wait -- ${SUBMIT_CMD}")"
printf "%s\n" "${SUBMIT_OUT}"
printf "%s\n" "${SUBMIT_OUT}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/logs/ray_job_submit.out'"
echo "${SUBMISSION_ID}" | dexec "${HEAD_CONTAINER}" bash -lc "cat > '${JOB_DIR}/config/ray_submission_id.txt'"
echo "[head] debug snapshot (post-submit)"
dexec "${HEAD_CONTAINER}" bash -lc "ray job list >'${JOB_DIR}/debug/ray_job_list_post.txt' 2>&1 || true"
dexec "${HEAD_CONTAINER}" bash -lc "ray status >'${JOB_DIR}/debug/ray_status_post.txt' 2>&1 || true"
echo "submitted: ${SUBMISSION_ID}"
echo "job dir: ${JOB_DIR}"

View File

@ -1,17 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
SPEC_PATH="${1:-}"
if [[ -z "${SPEC_PATH}" ]]; then
echo "usage: $0 <spec_path_inside_container>" >&2
echo "example: $0 /workspace/mvp/v1.1/templates/ppo.json" >&2
exit 1
fi
# Submit from head container (required), but with driver forced onto worker via entrypoint resources in spec.
dexec "${HEAD_CONTAINER}" bash -lc "SHARED_ROOT='${SHARED_ROOT}' python3 /workspace/mvp/v1.1/submit_job.py --spec '${SPEC_PATH}' --no-wait"

View File

@ -1,86 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
CONFIG_PATH="${1:-/workspace/mvp/v1.1/py/configs/dev.yaml}"
TS="$(timestamp)"
BASE="/workspace/mvp/v1.1/py/jobspecs"
NNODES="${NNODES:-1}"
N_GPUS_PER_NODE="${N_GPUS_PER_NODE:-1}"
wait_job() {
local sid="$1"
echo "[head] wait: ${sid}"
while true; do
# Ray returns one of: PENDING/RUNNING/SUCCEEDED/FAILED/STOPPED
st="$(dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config '${CONFIG_PATH}' --action status --submission-id '${sid}'" | tr -d '\r' | tail -n 1)"
echo "[head] status: ${sid} -> ${st}"
case "${st}" in
*SUCCEEDED*)
return 0
;;
*FAILED*|*STOPPED*)
echo "[head] job not successful: ${sid} (${st})" >&2
echo "[head] last logs:"
dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config '${CONFIG_PATH}' --action logs --submission-id '${sid}' --tail 200" || true
return 1
;;
*)
sleep 10
;;
esac
done
}
show_precheck() {
local sid="$1"
echo "[head] verify precheck: ${sid}"
dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config '${CONFIG_PATH}' --action logs --submission-id '${sid}' --tail 2000 | egrep 'MVP_PRECHECK_VERL_FILE|MVP_PRECHECK_MARKER' || true"
}
make_spec() {
local tag="$1"
local code_path="$2"
local out_path="$3"
local sid="mvp11_ppo_${tag//./_}_${TS}"
dexec "${HEAD_CONTAINER}" bash -lc "cat > '${out_path}' <<'YAML'
workload: \"ppo\"
submission_id: \"${sid}\"
code_path: \"${code_path}\"
model_id: \"Qwen/Qwen2.5-0.5B-Instruct\"
train_file: \"/private/datasets/gsm8k/train.parquet\"
val_file: \"/private/datasets/gsm8k/test.parquet\"
nnodes: ${NNODES}
n_gpus_per_node: ${N_GPUS_PER_NODE}
total_epochs: 1
total_training_steps: 10
save_freq: 10
test_freq: -1
YAML"
echo "${sid}"
}
echo "[head] submit PPO sequentially with verl v0.6.0 then v0.6.1"
echo "[head] resources: nnodes=${NNODES} n_gpus_per_node=${N_GPUS_PER_NODE}"
sid0="$(make_spec "v0.6.0" "/private/common/code/verl/verl_v0.6.0" "${BASE}/tmp_ppo_verl_v0.6.0_${TS}.yaml")"
echo "[head] submit: ${sid0}"
dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config '${CONFIG_PATH}' --jobspec '${BASE}/tmp_ppo_verl_v0.6.0_${TS}.yaml' --action submit --no-wait"
wait_job "${sid0}"
show_precheck "${sid0}"
sid1="$(make_spec "v0.6.1" "/private/common/code/verl/verl_v0.6.1" "${BASE}/tmp_ppo_verl_v0.6.1_${TS}.yaml")"
echo "[head] submit: ${sid1}"
dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config '${CONFIG_PATH}' --jobspec '${BASE}/tmp_ppo_verl_v0.6.1_${TS}.yaml' --action submit --no-wait"
wait_job "${sid1}"
show_precheck "${sid1}"
echo "[head] done"
echo "submitted:"
echo " ${sid0} (verl v0.6.0)"
echo " ${sid1} (verl v0.6.1)"

View File

@ -2,14 +2,55 @@
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"
submit_and_wait() {
local jobspec_in_container="$1"
local sid
local out
echo "[host] submit via SDK: ${jobspec_in_container}"
out="$(dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --jobspec '${jobspec_in_container}' --action submit --no-wait" | tr -d '\r')"
sid="$(printf '%s\n' "${out}" | tail -n 1)"
if [[ -z "${sid}" ]]; then
echo "[host] failed to parse submission id from output:" >&2
printf '%s\n' "${out}" >&2
exit 1
fi
echo "[host] submitted: ${sid}"
while true; do
st="$(dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --action status --submission-id '${sid}'" | tr -d '\r' | tail -n 1)"
echo "[host] status: ${sid} -> ${st}"
case "${st}" in
*SUCCEEDED*)
return 0
;;
*FAILED*|*STOPPED*)
echo "[host] job failed: ${sid} (${st})" >&2
echo "[host] last logs:" >&2
dexec "${HEAD_CONTAINER}" bash -lc "python3 /workspace/mvp/v1.1/py/run.py --config /workspace/mvp/v1.1/py/configs/dev.yaml --action logs --submission-id '${sid}' --tail 200" >&2 || true
return 1
;;
*)
sleep 10
;;
esac
done
}
"${SCRIPT_DIR}/00_prereq_check.sh"
"${SCRIPT_DIR}/03_cleanup_v1_legacy.sh"
"${SCRIPT_DIR}/05_ensure_verl_repo.sh"
"${SCRIPT_DIR}/01_up.sh"
"${SCRIPT_DIR}/20_start_head.sh"
"${SCRIPT_DIR}/21_start_workers.sh"
"${SCRIPT_DIR}/30_prepare_data_and_model.sh"
"${SCRIPT_DIR}/40_submit_ppo_epoch1.sh"
"${SCRIPT_DIR}/41_submit_grpo_epoch1.sh"
"${SCRIPT_DIR}/42_submit_sft_minimal.sh"
"${SCRIPT_DIR}/12_install_py_deps.sh"
submit_and_wait /workspace/mvp/v1.1/py/jobspecs/ppo.yaml
submit_and_wait /workspace/mvp/v1.1/py/jobspecs/grpo.yaml
submit_and_wait /workspace/mvp/v1.1/py/jobspecs/sft.yaml
"${SCRIPT_DIR}/50_status.sh"

View File

@ -1,282 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
import os
import shlex
import subprocess
from datetime import datetime
from pathlib import Path
def _ts():
return datetime.now().strftime("%Y%m%d_%H%M%S")
def _expand(value: str) -> str:
return os.path.expandvars(value)
def _mkdir(path: Path) -> None:
path.mkdir(parents=True, exist_ok=True)
def _write_text(path: Path, content: str) -> None:
_mkdir(path.parent)
path.write_text(content, encoding="utf-8")
def _write_json(path: Path, obj) -> None:
_mkdir(path.parent)
path.write_text(json.dumps(obj, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
def _require(spec: dict, key: str):
if key not in spec:
raise SystemExit(f"missing required key in spec: {key}")
return spec[key]
def _default_submission_id(workload: str) -> str:
return f"mvp11_{workload}_{_ts()}_{os.getpid()}"
def _runtime_env(spec: dict) -> dict:
shared_root = _expand(_require(spec, "shared_root"))
code_path = _expand(_require(spec, "code_path"))
env_vars = dict(spec.get("runtime_env", {}).get("env_vars", {}))
env_vars.setdefault("HF_HOME", f"{shared_root}/hf")
env_vars.setdefault("HUGGINGFACE_HUB_CACHE", f"{shared_root}/hf/hub")
env_vars.setdefault("TRANSFORMERS_CACHE", f"{shared_root}/hf/transformers")
env_vars.setdefault("PYTHONUNBUFFERED", "1")
user_code = f"{shared_root}/user/code"
existing = env_vars.get("PYTHONPATH", "")
prefix = f"{code_path}:{user_code}"
env_vars["PYTHONPATH"] = f"{prefix}:{existing}" if existing else prefix
# Helpful marker for logs/debugging
env_vars.setdefault("MVP_CODE_PATH", code_path)
return {"env_vars": env_vars}
def _preflight_shell() -> str:
# Make multi-version/debugging observable in Ray job logs.
# `mvp_marker.py` is written by our snapshot script (optional); if missing, ignore.
py = r"""
import os
import sys
print("MVP_PRECHECK_PYTHON:", sys.executable)
print("MVP_PRECHECK_PYTHONPATH:", os.environ.get("PYTHONPATH"))
try:
import verl
print("MVP_PRECHECK_VERL_FILE:", getattr(verl, "__file__", None))
except Exception as e:
print("MVP_PRECHECK_VERL_IMPORT_ERROR:", repr(e))
try:
import mvp_marker
print("MVP_PRECHECK_MARKER:", getattr(mvp_marker, "MARKER", None))
except Exception as e:
print("MVP_PRECHECK_MARKER_MISSING:", repr(e))
"""
return f"python3 - <<'PY'\n{py.strip()}\nPY"
def _build_entrypoint(spec: dict, submission_id: str, job_dir: str) -> str:
workload = _require(spec, "workload")
model_id = _expand(_require(spec, "model_id"))
shared_root = _expand(_require(spec, "shared_root"))
if workload in ("ppo", "grpo"):
cfg = spec.get(workload, {})
train_file = _expand(cfg.get("train_file", f"{shared_root}/datasets/gsm8k/train.parquet"))
val_file = _expand(cfg.get("val_file", f"{shared_root}/datasets/gsm8k/test.parquet"))
nnodes = int(cfg.get("nnodes", 2))
gpus_per_node = int(cfg.get("n_gpus_per_node", 4))
total_epochs = int(cfg.get("total_epochs", 1))
total_steps = int(cfg.get("total_training_steps", 10))
save_freq = int(cfg.get("save_freq", 10))
test_freq = int(cfg.get("test_freq", -1))
algo_overrides = ""
if workload == "grpo":
algo_overrides = "algorithm.adv_estimator=grpo"
cmd = f"""python3 -m verl.trainer.main_ppo \
data.train_files={train_file} \
data.val_files={val_file} \
data.train_batch_size=256 \
data.max_prompt_length=512 \
data.max_response_length=512 \
actor_rollout_ref.model.path={model_id} \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.actor.ppo_mini_batch_size=64 \
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
actor_rollout_ref.rollout.name=sglang \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=8 \
actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=4 \
critic.optim.lr=1e-5 \
critic.model.path={model_id} \
critic.ppo_micro_batch_size_per_gpu=4 \
algorithm.kl_ctrl.kl_coef=0.001 \
{algo_overrides} \
trainer.logger=console \
trainer.val_before_train=False \
trainer.n_gpus_per_node={gpus_per_node} \
trainer.nnodes={nnodes} \
trainer.save_freq={save_freq} \
trainer.test_freq={test_freq} \
trainer.total_epochs={total_epochs} \
trainer.total_training_steps={total_steps} \
trainer.resume_mode=disable \
trainer.default_local_dir={job_dir}/checkpoints \
+ray_kwargs.ray_init.address=auto \
hydra.run.dir={job_dir}/logs/hydra"""
return "\n".join([_preflight_shell(), "exec " + cmd])
if workload == "sft":
cfg = spec.get("sft", {})
train_file = _expand(cfg.get("train_file", f"{shared_root}/datasets/gsm8k_sft/train.parquet"))
val_file = cfg.get("val_file", None)
nnodes = int(cfg.get("nnodes", 2))
gpus_per_node = int(cfg.get("n_gpus_per_node", 4))
total_epochs = int(cfg.get("total_epochs", 1))
total_steps = int(cfg.get("total_training_steps", 10))
save_freq = int(cfg.get("save_freq", 10))
device = cfg.get("device", "cpu")
val_override = "data.val_files=null" if val_file is None else f"data.val_files={_expand(val_file)}"
# Note: driver should not require CUDA under ray job submit (no entrypoint GPUs by default).
cmd = f"""python3 -m verl.trainer.sft_trainer_ray \
model.path={model_id} \
data.train_files={train_file} \
{val_override} \
data.train_batch_size=64 \
data.micro_batch_size_per_gpu=1 \
data.max_token_len_per_gpu=2048 \
data.max_length=1024 \
trainer.logger=console \
trainer.project_name=mvp11-sft \
trainer.experiment_name={submission_id} \
trainer.total_epochs={total_epochs} \
trainer.total_training_steps={total_steps} \
trainer.save_freq={save_freq} \
trainer.test_freq=-1 \
trainer.resume_mode=disable \
trainer.device={device} \
trainer.default_local_dir={job_dir}/checkpoints \
trainer.nnodes={nnodes} \
trainer.n_gpus_per_node={gpus_per_node} \
hydra.run.dir={job_dir}/logs/hydra"""
return "\n".join([_preflight_shell(), "exec " + cmd])
raise SystemExit(f"unsupported workload: {workload}")
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--spec", required=True, help="Path to JobSpec json (inside this container)")
parser.add_argument("--no-wait", action="store_true", help="Submit and return immediately")
parser.add_argument("--dry-run", action="store_true", help="Only print the submit command")
args = parser.parse_args()
spec_path = Path(args.spec)
spec = json.loads(spec_path.read_text(encoding="utf-8"))
shared_root = _expand(_require(spec, "shared_root"))
workload = _require(spec, "workload")
submission_id = spec.get("submission_id") or _default_submission_id(workload)
job_dir = f"{shared_root}/jobs/{submission_id}"
ray_cfg = _require(spec, "ray")
ray_addr = ray_cfg.get("address", "http://127.0.0.1:8265")
entrypoint_num_cpus = ray_cfg.get("entrypoint_num_cpus", 1)
entrypoint_resources = ray_cfg.get("entrypoint_resources", {"worker_node": 1})
runtime_env = _runtime_env(spec)
entrypoint = _build_entrypoint(spec, submission_id=submission_id, job_dir=job_dir)
# Prepare job dir
job_root = Path(job_dir)
_mkdir(job_root / "config")
_mkdir(job_root / "logs")
_mkdir(job_root / "checkpoints")
_mkdir(job_root / "debug")
# Snapshot config for audit/debug
_write_json(job_root / "config" / "job_spec.json", spec)
_write_json(job_root / "config" / "runtime_env.json", runtime_env)
_write_text(job_root / "config" / "ray_submission_id.txt", submission_id + "\n")
submit_cmd_txt = "\n".join(
[
"ray job submit",
f" --address={ray_addr}",
f" --submission-id={submission_id}",
f" --entrypoint-num-cpus={entrypoint_num_cpus}",
f" --entrypoint-resources={json.dumps(entrypoint_resources)}",
f" --runtime-env-json=<see runtime_env.json>",
f" {'--no-wait' if args.no_wait else ''}",
" -- bash -lc '<entrypoint>'",
]
)
_write_text(job_root / "config" / "submit_cmd.txt", submit_cmd_txt + "\n")
# Debug snapshot (pre-submit)
try:
pre = subprocess.run(["ray", "status"], capture_output=True, text=True, check=False)
_write_text(job_root / "debug" / "ray_status_pre.txt", (pre.stdout or "") + (pre.stderr or ""))
except FileNotFoundError:
_write_text(job_root / "debug" / "ray_status_pre.txt", "ray cli not found\n")
submit_args = [
"ray",
"job",
"submit",
"--address",
ray_addr,
"--submission-id",
submission_id,
"--entrypoint-num-cpus",
str(entrypoint_num_cpus),
"--entrypoint-resources",
json.dumps(entrypoint_resources),
"--runtime-env-json",
json.dumps(runtime_env),
]
if args.no_wait:
submit_args.append("--no-wait")
submit_args += ["--", "bash", "-lc", entrypoint]
if args.dry_run:
print(" ".join(shlex.quote(x) for x in submit_args))
return 0
proc = subprocess.run(submit_args, capture_output=True, text=True, check=False)
_write_text(job_root / "logs" / "ray_job_submit.out", (proc.stdout or "") + (proc.stderr or ""))
print(proc.stdout, end="")
if proc.returncode != 0:
print(proc.stderr, end="", file=os.sys.stderr)
return proc.returncode
# Debug snapshot (post-submit)
try:
post = subprocess.run(["ray", "job", "list", "--log-style=record", "-v"], capture_output=True, text=True, check=False)
_write_text(job_root / "debug" / "ray_job_list_post.txt", (post.stdout or "") + (post.stderr or ""))
post2 = subprocess.run(["ray", "status"], capture_output=True, text=True, check=False)
_write_text(job_root / "debug" / "ray_status_post.txt", (post2.stdout or "") + (post2.stderr or ""))
except FileNotFoundError:
pass
print(f"job_dir: {job_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -1,31 +0,0 @@
{
"submission_id": "",
"workload": "grpo",
"shared_root": "${SHARED_ROOT}",
"code_path": "${SHARED_ROOT}/common/code/verl/verl_repo",
"model_id": "Qwen/Qwen2.5-0.5B-Instruct",
"grpo": {
"train_file": "${SHARED_ROOT}/datasets/gsm8k/train.parquet",
"val_file": "${SHARED_ROOT}/datasets/gsm8k/test.parquet",
"nnodes": 2,
"n_gpus_per_node": 4,
"total_epochs": 1,
"total_training_steps": 10,
"save_freq": 10,
"test_freq": -1
},
"ray": {
"address": "http://127.0.0.1:8265",
"entrypoint_num_cpus": 1,
"entrypoint_resources": {
"worker_node": 1
}
},
"runtime_env": {
"env_vars": {
"HF_ENDPOINT": "https://hf-mirror.com",
"PYTHONUNBUFFERED": "1"
}
}
}

View File

@ -1,31 +0,0 @@
{
"submission_id": "",
"workload": "ppo",
"shared_root": "${SHARED_ROOT}",
"code_path": "${SHARED_ROOT}/common/code/verl/verl_repo",
"model_id": "Qwen/Qwen2.5-0.5B-Instruct",
"ppo": {
"train_file": "${SHARED_ROOT}/datasets/gsm8k/train.parquet",
"val_file": "${SHARED_ROOT}/datasets/gsm8k/test.parquet",
"nnodes": 2,
"n_gpus_per_node": 4,
"total_epochs": 1,
"total_training_steps": 10,
"save_freq": 10,
"test_freq": -1
},
"ray": {
"address": "http://127.0.0.1:8265",
"entrypoint_num_cpus": 1,
"entrypoint_resources": {
"worker_node": 1
}
},
"runtime_env": {
"env_vars": {
"HF_ENDPOINT": "https://hf-mirror.com",
"PYTHONUNBUFFERED": "1"
}
}
}

View File

@ -1,32 +0,0 @@
{
"submission_id": "",
"workload": "sft",
"shared_root": "${SHARED_ROOT}",
"code_path": "${SHARED_ROOT}/common/code/verl/verl_repo",
"model_id": "Qwen/Qwen2.5-0.5B-Instruct",
"sft": {
"train_file": "${SHARED_ROOT}/datasets/gsm8k_sft/train.parquet",
"val_file": null,
"nnodes": 2,
"n_gpus_per_node": 4,
"total_epochs": 1,
"total_training_steps": 10,
"save_freq": 10,
"device": "cpu"
},
"ray": {
"address": "http://127.0.0.1:8265",
"entrypoint_num_cpus": 1,
"entrypoint_resources": {
"worker_node": 1
}
},
"runtime_env": {
"env_vars": {
"HF_ENDPOINT": "https://hf-mirror.com",
"PYTHONUNBUFFERED": "1",
"RAY_ADDRESS": "auto"
}
}
}