#!/usr/bin/env bash
#
# Prepare shared assets from the head container:
#   1. create the dataset directories,
#   2. build the gsm8k PPO (RL) parquet files,
#   3. build the gsm8k SFT (chat "messages") parquet files,
#   4. make sure the model snapshot is present in the Hugging Face cache,
#   5. copy /workspace/verl into the shared code snapshot directory.
# Every step is skipped (or is a no-op) when its output already exists.
#
# Environment:
#   MODEL_ID        Hugging Face repo id (default: Qwen/Qwen2.5-0.5B-Instruct)
#   SHARED_ROOT     root of the shared storage tree (required)
#   HEAD_CONTAINER  container the commands are executed in (required)
#
# NOTE(review): SHARED_ROOT, HEAD_CONTAINER and the `dexec` helper are not
# defined in this file; presumably they come from lib.sh and `dexec` runs a
# command inside the named container while forwarding stdin -- confirm.
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# shellcheck source=lib.sh
source "${SCRIPT_DIR}/lib.sh"

# Fail early with a clear message. An empty SHARED_ROOT would silently turn
# every path below (including the `rm -rf` target) into a root-level path.
: "${SHARED_ROOT:?SHARED_ROOT must be set and non-empty}"
: "${HEAD_CONTAINER:?HEAD_CONTAINER must be set and non-empty}"

MODEL_ID="${MODEL_ID:-Qwen/Qwen2.5-0.5B-Instruct}"
PPO_DATA_DIR="${SHARED_ROOT}/datasets/gsm8k"
SFT_DATA_DIR="${SHARED_ROOT}/datasets/gsm8k_sft"
CODE_SNAPSHOT_DIR="${SHARED_ROOT}/common/code/verl/verl_repo"

# The command strings handed to `bash -lc` are expanded here, on the calling
# side; the single quotes only protect the already-expanded paths inside the
# container shell.

echo "[head] ensure dataset dirs exist"
dexec "${HEAD_CONTAINER}" bash -lc "mkdir -p '${PPO_DATA_DIR}' '${SFT_DATA_DIR}'"

echo "[head] prepare PPO dataset (gsm8k RL parquet) -> ${PPO_DATA_DIR}"
# Regenerate only when either split is missing.
dexec "${HEAD_CONTAINER}" bash -lc "\
if [[ -f '${PPO_DATA_DIR}/train.parquet' && -f '${PPO_DATA_DIR}/test.parquet' ]]; then \
echo 'ppo_dataset_exists: skip'; \
else \
python3 /workspace/verl/examples/data_preprocess/gsm8k.py --local_save_dir '${PPO_DATA_DIR}'; \
fi"

echo "[head] prepare SFT dataset (gsm8k messages parquet) -> ${SFT_DATA_DIR}"
# Require BOTH splits before skipping: the generator writes train.parquet
# first and test.parquet second, so an interrupted run can leave only the
# former behind.
if dexec "${HEAD_CONTAINER}" bash -lc \
  "test -f '${SFT_DATA_DIR}/train.parquet' && test -f '${SFT_DATA_DIR}/test.parquet'"; then
  echo "[head] sft_dataset_exists: skip"
else
  # The quoted heredoc delimiter keeps the Python source literal (no shell
  # expansion); it is streamed to `python3 -` on stdin and the output
  # directory is passed through the SFT_DATA_DIR environment variable.
  dexec "${HEAD_CONTAINER}" bash -lc "SFT_DATA_DIR='${SFT_DATA_DIR}' python3 -" <<'PY'
import os
import pandas as pd
from datasets import load_dataset

out_dir = os.environ["SFT_DATA_DIR"]
os.makedirs(out_dir, exist_ok=True)

ds = load_dataset("openai/gsm8k", "main")
instruction = "Let's think step by step and output the final answer after \"####\"."

def to_messages(example):
    q = example["question"].strip() + " " + instruction
    a = example["answer"]
    return {
        "messages": [
            {"role": "user", "content": q},
            {"role": "assistant", "content": a},
        ]
    }

train = ds["train"].map(to_messages, remove_columns=ds["train"].column_names)
test = ds["test"].map(to_messages, remove_columns=ds["test"].column_names)

pd.DataFrame(train).to_parquet(os.path.join(out_dir, "train.parquet"), index=False)
pd.DataFrame(test).to_parquet(os.path.join(out_dir, "test.parquet"), index=False)
print("sft_dataset_written_ok:", out_dir)
PY
fi

echo "[head] ensure model cached to persistent HF_HOME (idempotent) -> ${MODEL_ID}"
# First try to resolve the snapshot from local files only; any failure there
# is treated as "not cached" and triggers a regular download.
dexec "${HEAD_CONTAINER}" bash -lc "MODEL_ID='${MODEL_ID}' python3 -" <<'PY'
import os

model_id = os.environ["MODEL_ID"]
hf_home = os.environ.get("HF_HOME", "/private/hf")
os.environ.setdefault("HF_HOME", hf_home)
os.environ.setdefault("HUGGINGFACE_HUB_CACHE", os.path.join(hf_home, "hub"))
os.environ.setdefault("TRANSFORMERS_CACHE", os.path.join(hf_home, "transformers"))

from huggingface_hub import snapshot_download

try:
    snapshot_download(repo_id=model_id, local_files_only=True)
    print("model_cache_exists: skip", model_id)
except Exception:
    print("model_cache_missing: downloading", model_id)
    snapshot_download(repo_id=model_id)
    print("model_cached_ok:", model_id)
PY

echo "[head] snapshot verl repo into shared common code path (idempotent best-effort) -> ${CODE_SNAPSHOT_DIR}"
# Prefer rsync (mirrors the tree and deletes stale files). The fallback wipes
# the directory contents and copies afresh; its `*` glob does not match
# dotfiles, so stale hidden entries may survive on that path (best-effort).
dexec "${HEAD_CONTAINER}" bash -lc "\
mkdir -p '${CODE_SNAPSHOT_DIR}' && \
if command -v rsync >/dev/null 2>&1; then \
rsync -a --delete /workspace/verl/ '${CODE_SNAPSHOT_DIR}/'; \
else \
rm -rf '${CODE_SNAPSHOT_DIR:?}/'* && cp -a /workspace/verl/. '${CODE_SNAPSHOT_DIR}/'; \
fi && echo 'code_snapshot_ok'"