#!/usr/bin/env bash
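#
# Prepare shared assets on the head container (idempotent): gsm8k RL and SFT
# parquet datasets, a persistent Hugging Face model cache with a stable
# common symlink, and a snapshot of the verl repo under shared storage.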
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"

MODEL_ID="${MODEL_ID:-Qwen/Qwen2.5-0.5B-Instruct}"

PPO_DATA_DIR="${SHARED_ROOT}/datasets/gsm8k"
SFT_DATA_DIR="${SHARED_ROOT}/datasets/gsm8k_sft"

CODE_SNAPSHOT_DIR="${SHARED_ROOT}/common/code/verl/verl_repo"
COMMON_DIR="${SHARED_ROOT}/common"
COMMON_DATASETS_DIR="${SHARED_ROOT}/common/datasets"
COMMON_HF_DIR="${SHARED_ROOT}/common/hf"

echo "[head] ensure dataset dirs exist"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p '${PPO_DATA_DIR}' '${SFT_DATA_DIR}'"

echo "[head] ensure v2.5 common links (idempotent)"
# In existing deployments, /private/common/{datasets,hf} may already exist as directories (not symlinks).
# For v2.5 taskspecs, we only require:
#   /private/common/datasets/gsm8k     -> /private/datasets/gsm8k
#   /private/common/datasets/gsm8k_sft -> /private/datasets/gsm8k_sft
#   /private/common/hf/{hub,transformers} -> /private/hf/{hub,transformers} (best-effort)
dexec "${HEAD_CONTAINER}" bash -lc "
set -euo pipefail
mkdir -p '${COMMON_DIR}' '${COMMON_DATASETS_DIR}' '${COMMON_HF_DIR}'
ln -sfn '${SHARED_ROOT}/datasets/gsm8k' '${COMMON_DATASETS_DIR}/gsm8k'
ln -sfn '${SHARED_ROOT}/datasets/gsm8k_sft' '${COMMON_DATASETS_DIR}/gsm8k_sft'
mkdir -p '${SHARED_ROOT}/hf/hub' '${SHARED_ROOT}/hf/transformers'
ln -sfn '${SHARED_ROOT}/hf/hub' '${COMMON_HF_DIR}/hub'
ln -sfn '${SHARED_ROOT}/hf/transformers' '${COMMON_HF_DIR}/transformers'
echo 'common_links_ok'
"

echo "[head] prepare PPO dataset (gsm8k RL parquet) -> ${PPO_DATA_DIR}"
dexec "${HEAD_CONTAINER}" bash -lc "if [[ -f '${PPO_DATA_DIR}/train.parquet' && -f '${PPO_DATA_DIR}/test.parquet' ]]; then echo 'ppo_dataset_exists: skip'; else PYTHONPATH=\"/workspace/verl:\${PYTHONPATH:-}\" python3 /workspace/verl/examples/data_preprocess/gsm8k.py --local_save_dir '${PPO_DATA_DIR}'; fi"

echo "[head] prepare SFT dataset (gsm8k messages parquet) -> ${SFT_DATA_DIR}"
if dexec "${HEAD_CONTAINER}" bash -lc "test -f '${SFT_DATA_DIR}/train.parquet'"; then
  echo "[head] sft_dataset_exists: skip"
else
  SFT_PY_CODE="$(cat <<'PY'
import os

import pandas as pd
from datasets import load_dataset

out_dir = os.environ["SFT_DATA_DIR"]
os.makedirs(out_dir, exist_ok=True)

ds = load_dataset("openai/gsm8k", "main")
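
# gsm8k answers end with a final "#### <number>" line; the instruction below
# presumably mirrors the RL-side prompt so SFT and PPO data stay consistent.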
instruction = "Let's think step by step and output the final answer after \"####\"."


def to_messages(example):
    q = example["question"].strip() + " " + instruction
    a = example["answer"]
    return {
        "messages": [
            {"role": "user", "content": q},
            {"role": "assistant", "content": a},
        ]
    }


train = ds["train"].map(to_messages, remove_columns=ds["train"].column_names)
test = ds["test"].map(to_messages, remove_columns=ds["test"].column_names)

pd.DataFrame(train).to_parquet(os.path.join(out_dir, "train.parquet"), index=False)
pd.DataFrame(test).to_parquet(os.path.join(out_dir, "test.parquet"), index=False)

print("sft_dataset_written_ok:", out_dir)
PY
)"
  printf "%s\n" "${SFT_PY_CODE}" | dexec "${HEAD_CONTAINER}" bash -lc "SFT_DATA_DIR='${SFT_DATA_DIR}' python3 -"
fi

echo "[head] ensure model cached to persistent HF_HOME (idempotent) -> ${MODEL_ID}"
PY_CODE="$(cat <<'PY'
import os

model_id = os.environ["MODEL_ID"]
link_name = model_id.replace("/", "--")

hf_home = os.environ.get("HF_HOME", "/private/hf")
os.environ.setdefault("HF_HOME", hf_home)
os.environ.setdefault("HUGGINGFACE_HUB_CACHE", os.path.join(hf_home, "hub"))
os.environ.setdefault("TRANSFORMERS_CACHE", os.path.join(hf_home, "transformers"))

from huggingface_hub import snapshot_download
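
# Probe the cache first: with local_files_only=True, snapshot_download raises
# (e.g. LocalEntryNotFoundError) instead of downloading when files are missing.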
try:
    path = snapshot_download(repo_id=model_id, local_files_only=True)
    print("model_cache_exists: skip", model_id, path)
except Exception:
    print("model_cache_missing: downloading", model_id)
    path = snapshot_download(repo_id=model_id)
    print("model_cached_ok:", model_id, path)

# v3.0 path policy: use a stable symlink under /private/common/models/...
common_models_dir = "/private/common/models"
os.makedirs(common_models_dir, exist_ok=True)
dst = os.path.join(common_models_dir, link_name)
try:
    if os.path.islink(dst) or os.path.exists(dst):
        os.unlink(dst)
except Exception:
    pass
try:
    os.symlink(path, dst)
    print("model_common_link_ok:", dst)
except Exception as e:
    print("WARN: model_common_link_failed:", dst, repr(e))
PY
)"

printf "%s\n" "${PY_CODE}" | dexec "${HEAD_CONTAINER}" bash -lc "MODEL_ID='${MODEL_ID}' python3 -"
echo "[head] snapshot verl repo into shared common code path (idempotent, best-effort) -> ${CODE_SNAPSHOT_DIR}"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p '${CODE_SNAPSHOT_DIR}' && if command -v rsync >/dev/null 2>&1; then rsync -a --delete /workspace/verl/ '${CODE_SNAPSHOT_DIR}/'; else rm -rf '${CODE_SNAPSHOT_DIR:?}/'* && cp -a /workspace/verl/. '${CODE_SNAPSHOT_DIR}/'; fi && echo 'code_snapshot_ok'"