diff --git a/scripts/batch_launch_online.sh b/scripts/batch_launch_online.sh new file mode 100755 index 0000000..3424c40 --- /dev/null +++ b/scripts/batch_launch_online.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +export HALO_DIR=/home/sl2998/workspace/HALOs + +for v in 1 2 3; do + sbatch ${HALO_DIR}/scripts/launch_llama_instruct_dpo_online_v${v}.sh 0.1 1e-6 + sbatch ${HALO_DIR}/scripts/launch_llama_instruct_kto_online_v${v}.sh 0.5 1e-6 + sbatch ${HALO_DIR}/scripts/launch_llama_instruct_ppo_online_v${v}.sh 0.025 1e-6 0.7 + sbatch ${HALO_DIR}/scripts/launch_llama_instruct_grpo_online_v${v}.sh 0.005 1e-6 0.005 1 1 +done diff --git a/scripts/launch_llama_instruct_dpo_online_v1.sh b/scripts/launch_llama_instruct_dpo_online_v1.sh new file mode 100755 index 0000000..8f6cb5f --- /dev/null +++ b/scripts/launch_llama_instruct_dpo_online_v1.sh @@ -0,0 +1,136 @@ +#!/bin/bash +#SBATCH --job-name=llama-odpo-v1 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=4 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 +#SBATCH --output=logs/llama-odpo-v1-%j.out +#SBATCH --error=logs/llama-odpo-v1-%j.err + + +BETA=$1 +LR=$2 +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=1024 + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:False +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip show vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export HALO_DIR=/home/sl2998/workspace/HALOs +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-dpo-online-prompt-v1 + +if [ ! -d \$CACHE_DIR ]; then + mkdir -p \$CACHE_DIR +fi + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-dpo-prompt-v1-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v1 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train DPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=dpo ++loss.beta=${BETA} model=llama exp_name=\$EXP_NAME \ + train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] ++lr=${LR} \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++model.batch_size=4 ++model.gradient_accumulation_steps=8 ++model.eval_batch_size=4 ++model.max_grad_norm=0.1 \ + ++online=true ++sync_reference=true + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done + +" \ No newline at end of file diff --git a/scripts/launch_llama_instruct_dpo_online_v2.sh b/scripts/launch_llama_instruct_dpo_online_v2.sh new file mode 100755 index 0000000..ac363ba --- /dev/null +++ b/scripts/launch_llama_instruct_dpo_online_v2.sh @@ -0,0 +1,135 @@ +#!/bin/bash +#SBATCH --job-name=llama-odpo-v2 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 +#SBATCH --output=logs/llama-odpo-v2-%j.out +#SBATCH --error=logs/llama-odpo-v2-%j.err + + +BETA=$1 +LR=$2 +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=1024 + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:False +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip show vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-dpo-online-prompt-v2 + +if [ ! -d \$CACHE_DIR ]; then + mkdir -p \$CACHE_DIR +fi + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-dpo-prompt-v2-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v2 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train DPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=dpo ++loss.beta=${BETA} model=llama exp_name=\$EXP_NAME \ + train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] ++lr=${LR} \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++model.batch_size=4 ++model.gradient_accumulation_steps=8 ++model.eval_batch_size=4 ++model.max_grad_norm=0.1 \ + ++online=true ++sync_reference=true + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done + +" diff --git a/scripts/launch_llama_instruct_dpo_online_v3.sh b/scripts/launch_llama_instruct_dpo_online_v3.sh new file mode 100755 index 0000000..d2014de --- /dev/null +++ b/scripts/launch_llama_instruct_dpo_online_v3.sh @@ -0,0 +1,130 @@ +#!/bin/bash +#SBATCH --job-name=llama-odpo-v3 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 +#SBATCH --output=logs/llama-odpo-v3-%j.out +#SBATCH --error=logs/llama-odpo-v3-%j.err + +BETA=$1 +LR=$2 +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=1024 + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:False +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-dpo-online-prompt-v3 + +mkdir \$CACHE_DIR + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-dpo-prompt-v3-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v1 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train DPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=dpo ++loss.beta=${BETA} model=llama exp_name=\$EXP_NAME \ + train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] ++lr=${LR} \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++model.batch_size=4 ++model.gradient_accumulation_steps=8 ++model.eval_batch_size=4 ++model.max_grad_norm=0.1 \ + ++online=true ++sync_reference=true + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done + +" diff --git a/scripts/launch_llama_instruct_grpo_online_v1.sh b/scripts/launch_llama_instruct_grpo_online_v1.sh new file mode 100644 index 0000000..ec3ce67 --- /dev/null +++ b/scripts/launch_llama_instruct_grpo_online_v1.sh @@ -0,0 +1,158 @@ +#!/bin/bash +#SBATCH --job-name=llama-ogrpo-v1 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --output=logs/llama-ogrpo-v1-%j.out +#SBATCH --error=logs/llama-ogrpo-v1-%j.err +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 + + +BETA=$1 # 0.005 +LR=$2 # 1e-6 +EPS=$3 # 0.005 +EPOCHS=$4 # 1 +GRADACC=$5 # 1 + +# Function to find an available port +find_free_port() { + local port + while true; do + # Generate a random port number between 20000 and 65000 + port=$(shuf -i 29500-29510 -n 1) + # Check if the port is in use + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment and print diagnostic information +# very important that this is run within srun for training to work!!! +init_env() { + # Load necessary modules (adjust as needed for your system) + module load anaconda3/2024.2 + + # Activate your conda environment + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + + echo "Master node: $MASTER_ADDR" + echo "Number of nodes: $SLURM_JOB_NUM_NODES" + echo "GPUs per node: $SLURM_GPUS_PER_NODE" +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip install --upgrade vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-grpo-online-prompt-v1 + +if [ ! -d \$CACHE_DIR ]; then + mkdir -p \$CACHE_DIR +fi + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-grpo-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v1 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train GRPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=grpo model=llama train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] exp_name=\$EXP_NAME \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++lr=${LR} \ + ++loss.beta=${BETA} ++loss.epsilon=${EPS} \ + ++humanline=false ++n_epochs=${EPOCHS} ++sync_reference=true ++model.max_grad_norm=0.1 \ + ++model.batch_size=32 ++model.gradient_accumulation_steps=${GRADACC} ++model.eval_batch_size=32 + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done + +# lm_eval --model hf \ +# --model_args pretrained=\$CKPT,tokenizer=\$CKPT,parallelize=True \ +# --tasks arc_easy,arc_challenge,winogrande,bbh_cot_fewshot,gsm8k_cot \ +# --batch_size 4 + +python -m train.sample \$CKPT --gpu_count 4 --output_file outputs/\$EXP_NAME.json + +ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + alpaca_eval evaluate --annotators_config /home/sl2998/workspace/HALOs/alpaca_eval_gpt-4.1.yaml --model_outputs=outputs/\$EXP_NAME.json \" + +" diff --git a/scripts/launch_llama_instruct_grpo_online_v2.sh b/scripts/launch_llama_instruct_grpo_online_v2.sh new file mode 100644 index 0000000..fb7af3e --- /dev/null +++ b/scripts/launch_llama_instruct_grpo_online_v2.sh @@ -0,0 +1,159 @@ +#!/bin/bash +#SBATCH --job-name=llama-ogrpo-v2 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --output=logs/llama-ogrpo-v2-%j.out +#SBATCH --error=logs/llama-ogrpo-v2-%j.err +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 + + +BETA=$1 # 0.005 +LR=$2 # 1e-6 +EPS=$3 # 0.005 +EPOCHS=$4 # 1 +GRADACC=$5 # 1 + +# Function to find an available port +find_free_port() { + local port + while true; do + # Generate a random port number between 20000 and 65000 + port=$(shuf -i 29500-29510 -n 1) + # Check if the port is in use + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment and print diagnostic information +# very important that this is run within srun for training to work!!! +init_env() { + # Load necessary modules (adjust as needed for your system) + module load anaconda3/2024.2 + + # Activate your conda environment + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + + echo "Master node: $MASTER_ADDR" + echo "Number of nodes: $SLURM_JOB_NUM_NODES" + echo "GPUs per node: $SLURM_GPUS_PER_NODE" +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip show vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-grpo-online-prompt-v2 + +if [ ! -d \$CACHE_DIR ]; then + mkdir -p \$CACHE_DIR +fi + + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-grpo-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v2 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train GRPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=grpo model=llama train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] exp_name=\$EXP_NAME \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++lr=${LR} \ + ++loss.beta=${BETA} ++loss.epsilon=${EPS} \ + ++humanline=false ++n_epochs=${EPOCHS} ++sync_reference=true ++model.max_grad_norm=0.1 \ + ++model.batch_size=32 ++model.gradient_accumulation_steps=${GRADACC} ++model.eval_batch_size=32 + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done + +# lm_eval --model hf \ +# --model_args pretrained=\$CKPT,tokenizer=\$CKPT,parallelize=True \ +# --tasks arc_easy,arc_challenge,winogrande,bbh_cot_fewshot,gsm8k_cot \ +# --batch_size 4 + +python -m train.sample \$CKPT --gpu_count 4 --output_file outputs/\$EXP_NAME.json + +ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + alpaca_eval evaluate --annotators_config /home/sl2998/workspace/HALOs/alpaca_eval_gpt-4.1.yaml --model_outputs=outputs/\$EXP_NAME.json \" + +" \ No newline at end of file diff --git a/scripts/launch_llama_instruct_grpo_online_v3.sh b/scripts/launch_llama_instruct_grpo_online_v3.sh new file mode 100644 index 0000000..c84dddb --- /dev/null +++ b/scripts/launch_llama_instruct_grpo_online_v3.sh @@ -0,0 +1,159 @@ +#!/bin/bash +#SBATCH --job-name=llama-ogrpo-v3 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --output=logs/llama-ogrpo-v3-%j.out +#SBATCH --error=logs/llama-ogrpo-v3-%j.err +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 + + +BETA=$1 # 0.005 +LR=$2 # 1e-6 +EPS=$3 # 0.005 +EPOCHS=$4 # 1 +GRADACC=$5 # 1 + +# Function to find an available port +find_free_port() { + local port + while true; do + # Generate a random port number between 20000 and 65000 + port=$(shuf -i 29500-29510 -n 1) + # Check if the port is in use + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment and print diagnostic information +# very important that this is run within srun for training to work!!! +init_env() { + # Load necessary modules (adjust as needed for your system) + module load anaconda3/2024.2 + + # Activate your conda environment + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + + echo "Master node: $MASTER_ADDR" + echo "Number of nodes: $SLURM_JOB_NUM_NODES" + echo "GPUs per node: $SLURM_GPUS_PER_NODE" +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip show vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-grpo-online-prompt-v3 + +if [ ! -d \$CACHE_DIR ]; then + mkdir -p \$CACHE_DIR +fi + + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-grpo-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v3 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train GRPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=grpo model=llama train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] exp_name=\$EXP_NAME \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++lr=${LR} \ + ++loss.beta=${BETA} ++loss.epsilon=${EPS} \ + ++humanline=false ++n_epochs=${EPOCHS} ++sync_reference=true ++model.max_grad_norm=0.1 \ + ++model.batch_size=32 ++model.gradient_accumulation_steps=${GRADACC} ++model.eval_batch_size=32 + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done + +# lm_eval --model hf \ +# --model_args pretrained=\$CKPT,tokenizer=\$CKPT,parallelize=True \ +# --tasks arc_easy,arc_challenge,winogrande,bbh_cot_fewshot,gsm8k_cot \ +# --batch_size 4 + +python -m train.sample \$CKPT --gpu_count 4 --output_file outputs/\$EXP_NAME.json + +ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + alpaca_eval evaluate --annotators_config /home/sl2998/workspace/HALOs/alpaca_eval_gpt-4.1.yaml --model_outputs=outputs/\$EXP_NAME.json \" + +" \ No newline at end of file diff --git a/scripts/launch_llama_instruct_kto_online_v1.sh b/scripts/launch_llama_instruct_kto_online_v1.sh new file mode 100644 index 0000000..3edaffe --- /dev/null +++ b/scripts/launch_llama_instruct_kto_online_v1.sh @@ -0,0 +1,132 @@ +#!/bin/bash +#SBATCH --job-name=llama-okto-v1 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --output=logs/llama-okto-v1-%j.out +#SBATCH --error=logs/llama-okto-v1-%j.err +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 + + +BETA=$1 +LR=$2 +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=1024 + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip install --upgrade vllm +pip show vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-kto-online-prompt-v1 + +mkdir \$CACHE_DIR + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-kto-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v1 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train KTO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=kto ++loss.beta=${BETA} model=llama exp_name=\$EXP_NAME \ + train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] ++lr=${LR} \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++model.batch_size=8 ++model.gradient_accumulation_steps=4 ++model.eval_batch_size=8 ++model.max_grad_norm=0.1 \ + ++online=true ++sync_reference=true + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done +" diff --git a/scripts/launch_llama_instruct_kto_online_v2.sh b/scripts/launch_llama_instruct_kto_online_v2.sh new file mode 100644 index 0000000..4017262 --- /dev/null +++ b/scripts/launch_llama_instruct_kto_online_v2.sh @@ -0,0 +1,131 @@ +#!/bin/bash +#SBATCH --job-name=llama-okto-v2 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --output=logs/llama-okto-v2-%j.out +#SBATCH --error=logs/llama-okto-v2-%j.err +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 + + +BETA=$1 +LR=$2 +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=1024 + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip install --upgrade vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-kto-online-prompt-v2 + +mkdir \$CACHE_DIR + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-kto-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v2 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train KTO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=kto ++loss.beta=${BETA} model=llama exp_name=\$EXP_NAME \ + train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] ++lr=${LR} \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++model.batch_size=8 ++model.gradient_accumulation_steps=4 ++model.eval_batch_size=8 ++model.max_grad_norm=0.1 \ + ++online=true ++sync_reference=true + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done +" diff --git a/scripts/launch_llama_instruct_kto_online_v3.sh b/scripts/launch_llama_instruct_kto_online_v3.sh new file mode 100644 index 0000000..0ed49fe --- /dev/null +++ b/scripts/launch_llama_instruct_kto_online_v3.sh @@ -0,0 +1,131 @@ +#!/bin/bash +#SBATCH --job-name=llama-okto-v3 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --output=logs/llama-okto-v3-%j.out +#SBATCH --error=logs/llama-okto-v3-%j.err +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 + + +BETA=$1 +LR=$2 +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=1024 + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip install --upgrade vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-instruct-kto-online-prompt-v3 + +mkdir \$CACHE_DIR + +CURRENT_CKPT=\$MODEL_PATH +CUMULATIVE_PROMPTS=0 + +while [ \$CUMULATIVE_PROMPTS -lt ${TOTAL_PROMPTS} ]; do + END_CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + SAMPLES_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_samples.json + DATA_FILE=\${CACHE_DIR}/\${CUMULATIVE_PROMPTS}_\${END_CUMULATIVE_PROMPTS}_data.json + + # if creating a pairwise dataset, you need to oversample, since ~half of pairs will be discarded (e.g., both outputs have the same score) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --mode train \ + --split train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --num_epochs 1 + PYTHON_EXIT_CODE=\$? + + if [ \$PYTHON_EXIT_CODE -eq 0 ]; then + echo \"Training on \$CUMULATIVE_PROMPTS through \$END_CUMULATIVE_PROMPTS prompts ... \" + + EXP_NAME=llama3-8B-instruct-kto-\${CUMULATIVE_PROMPTS} + + # Label samples with API (must ssh into the login node for internet access) + if [ ! -f \$DATA_FILE ]; then + echo \"Labeling samples with API...\" + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v2 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + else + echo \"\$DATA_FILE already exists, skipping labeling...\" + fi + + if [ \$CUMULATIVE_PROMPTS -eq 0 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=null \" + else + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT ++model.from_checkpoint=\$CURRENT_CKPT \" + fi + + # Train KTO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=kto ++loss.beta=${BETA} model=llama exp_name=\$EXP_NAME \ + train_datasets=[\$DATA_FILE] test_datasets=[ultrafeedback_armorm] ++lr=${LR} \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \$MODEL_LOAD_ARG \ + ++model.batch_size=8 ++model.gradient_accumulation_steps=4 ++model.eval_batch_size=8 ++model.max_grad_norm=0.1 \ + ++online=true ++sync_reference=true + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + rm -rf \$CURRENT_CKPT + fi + + CURRENT_CKPT=\$NEW_CKPT + CUMULATIVE_PROMPTS=\$END_CUMULATIVE_PROMPTS + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done +" diff --git a/scripts/launch_llama_instruct_ppo_online_v1.sh b/scripts/launch_llama_instruct_ppo_online_v1.sh new file mode 100644 index 0000000..3028e4b --- /dev/null +++ b/scripts/launch_llama_instruct_ppo_online_v1.sh @@ -0,0 +1,185 @@ +#!/bin/bash +#SBATCH --job-name=llama-oppo-binary-v1 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 +#SBATCH --output=logs/%x_%j.out +#SBATCH --error=logs/%x_%j.err + +# example usage: +# sbatch launch_llama_ppo_online_binary.sh 0.05 1e-6 0.2 + +KL_COEF=$1 # 0.05 +LR=$2 # 1e-6 +CLIP_RANGE=$3 # 0.2 + +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=512 # Train on fewer examples before sampling +NUM_ROUNDS=$(($TOTAL_PROMPTS / $PROMPTS_PER_ROUND)) # Calculate this once at the start + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + export HF_ALLOW_CODE_EVAL=1 +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip show vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export HALO_DIR=/scratch/gpfs/sl2998/workspace/HALOs +export REWARD_CKPT=RLHFlow/ArmoRM-Llama3-8B-v0.1 +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v1 + +if [ ! -d \"\$CACHE_DIR\" ]; then + mkdir -p \$CACHE_DIR +fi + +# Start iterative training from SFT checkpoint or last saved checkpoint +CURRENT_CKPT=\$MODEL_PATH +ROUND=1 +CUMULATIVE_PROMPTS=0 + +# Check if there are previous checkpoints to resume from +for r in \$(seq ${NUM_ROUNDS} -1 1); do + LAST_EXP_NAME=llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v1-R\${r} + LAST_CKPT=\${CACHE_DIR}/\${LAST_EXP_NAME}/FINAL + if [ -d \"\$LAST_CKPT\" ]; then + CURRENT_CKPT=\$LAST_CKPT + ROUND=\$((r + 1)) + CUMULATIVE_PROMPTS=\$((r * PROMPTS_PER_ROUND)) + echo \"Resuming from round \$r checkpoint: \$CURRENT_CKPT\" + echo \"Starting at round \$ROUND with \$CUMULATIVE_PROMPTS prompts already processed\" + break + fi +done + +while [ \$ROUND -le ${NUM_ROUNDS} ]; do + echo \"Starting round \$ROUND of ${NUM_ROUNDS} \" + SAMPLES_FILE=\${CACHE_DIR}/R\${ROUND}_samples.json + + echo \"Sampling from current model...\" + # Sample from current model (first round uses SFT checkpoint) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --split train \ + --mode train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --batch_size ${PROMPTS_PER_ROUND} + echo \"Sampling complete. Labeling samples...\" + + # if no more prompts left, end training early + NUM_SAMPLES=\$(jq '. | length' \$SAMPLES_FILE) + + if [[ \$NUM_SAMPLES -gt 0 ]]; then + EXP_NAME=llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v1-R\${ROUND} + CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + DATA_FILE=\${CACHE_DIR}/R\${ROUND}_data.json + + # Label samples with API (must ssh into the login node for internet access) + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v1 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + + # accelerate launch --config_file accelerate_config/fsdp_4gpu.yaml \ + # -m train.label \$SAMPLES_FILE \$DATA_FILE \ + # --reward_model_path \$REWARD_CKPT \ + # --feedback_type binary --batch_size 16 + + # First round: load from checkpoint + # Subsequent rounds: policy resumes from previous checkpoint; reference model stays at SFT checkpoint + if [ \$ROUND -eq 1 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT\" + else + echo \"Loading from checkpoint: \$CURRENT_CKPT\" + MODEL_LOAD_ARG=\"++model.from_checkpoint=\$CURRENT_CKPT ++model.load_from=\$MODEL_PATH\" + fi + + # Train PPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=ppo model=llama train_datasets=[\$DATA_FILE] test_datasets=[ultrabin] exp_name=\$EXP_NAME \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \ + ++lr=${LR} \ + ++intermediate_checkpoints=true \ + ++online=true \ + ++eval_every=256 \ + ++save_every=5120 \ + ++weight_decay=0.0 \ + ++beta1=0.9 \ + ++beta2=0.95 \ + ++eps=1e-5 \ + ++warmup=0.10 \ + ++loss.ppo_epochs=1 \ + ++loss.cliprange=${CLIP_RANGE} \ + ++loss.lam=0.95 \ + ++loss.gamma=1.0 \ + ++loss.critic_coef=0.1 \ + ++loss.KL_coef=${KL_COEF} \ + ++n_epochs=1 \ + ++model.batch_size=4 \ + ++model.max_grad_norm=1 \ + ++model.gradient_accumulation_steps=8 \ + ++model.eval_batch_size=16 \ + \$MODEL_LOAD_ARG 2>&1 | tee \$HALO_DIR/logs/\${EXP_NAME}.log + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + # Clean up old checkpoint directory if it's not the SFT checkpoint + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + OLD_EXP_DIR=\$(dirname \$CURRENT_CKPT) + echo \"Cleaning up \$OLD_EXP_DIR\" + rm -rf \$OLD_EXP_DIR + fi + + CURRENT_CKPT=\$NEW_CKPT + ROUND=\$((ROUND + 1)) + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done +" diff --git a/scripts/launch_llama_instruct_ppo_online_v2.sh b/scripts/launch_llama_instruct_ppo_online_v2.sh new file mode 100644 index 0000000..801a70f --- /dev/null +++ b/scripts/launch_llama_instruct_ppo_online_v2.sh @@ -0,0 +1,184 @@ +#!/bin/bash +#SBATCH --job-name=llama-oppo-binary-v2 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 +#SBATCH --output=logs/%x_%j.out +#SBATCH --error=logs/%x_%j.err + +# example usage: +# sbatch launch_llama_ppo_online_binary.sh 0.05 1e-6 0.2 + +KL_COEF=$1 # 0.05 +LR=$2 # 1e-6 +CLIP_RANGE=$3 # 0.2 + +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=512 # Train on fewer examples before sampling +NUM_ROUNDS=$(($TOTAL_PROMPTS / $PROMPTS_PER_ROUND)) # Calculate this once at the start + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + export HF_ALLOW_CODE_EVAL=1 +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip show vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export HALO_DIR=/scratch/gpfs/sl2998/workspace/HALOs +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v2 + +if [ ! -d \"\$CACHE_DIR\" ]; then + mkdir -p \$CACHE_DIR +fi + +# Start iterative training from SFT checkpoint or last saved checkpoint +CURRENT_CKPT=\$MODEL_PATH +ROUND=1 +CUMULATIVE_PROMPTS=0 + +# Check if there are previous checkpoints to resume from +for r in \$(seq ${NUM_ROUNDS} -1 1); do + LAST_EXP_NAME=llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v2-R\${r} + LAST_CKPT=\${CACHE_DIR}/\${LAST_EXP_NAME}/FINAL + if [ -d \"\$LAST_CKPT\" ]; then + CURRENT_CKPT=\$LAST_CKPT + ROUND=\$((r + 1)) + CUMULATIVE_PROMPTS=\$((r * PROMPTS_PER_ROUND)) + echo \"Resuming from round \$r checkpoint: \$CURRENT_CKPT\" + echo \"Starting at round \$ROUND with \$CUMULATIVE_PROMPTS prompts already processed\" + break + fi +done + +while [ \$ROUND -le ${NUM_ROUNDS} ]; do + echo \"Starting round \$ROUND of ${NUM_ROUNDS} \" + SAMPLES_FILE=\${CACHE_DIR}/R\${ROUND}_samples.json + + echo \"Sampling from current model...\" + # Sample from current model (first round uses SFT checkpoint) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --split train \ + --mode train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --batch_size ${PROMPTS_PER_ROUND} + echo \"Sampling complete. Labeling samples...\" + + # if no more prompts left, end training early + NUM_SAMPLES=\$(jq '. | length' \$SAMPLES_FILE) + + if [[ \$NUM_SAMPLES -gt 0 ]]; then + EXP_NAME=llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v2-R\${ROUND} + CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + DATA_FILE=\${CACHE_DIR}/R\${ROUND}_data.json + + # Label samples with API (must ssh into the login node for internet access) + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v2 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + + # accelerate launch --config_file accelerate_config/fsdp_4gpu.yaml \ + # -m train.label \$SAMPLES_FILE \$DATA_FILE \ + # --reward_model_path \$REWARD_CKPT \ + # --feedback_type binary --batch_size 16 + + # First round: load from checkpoint + # Subsequent rounds: policy resumes from previous checkpoint; reference model stays at SFT checkpoint + if [ \$ROUND -eq 1 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT\" + else + echo \"Loading from checkpoint: \$CURRENT_CKPT\" + MODEL_LOAD_ARG=\"++model.from_checkpoint=\$CURRENT_CKPT ++model.load_from=\$MODEL_PATH\" + fi + + # Train PPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=ppo model=llama train_datasets=[\$DATA_FILE] test_datasets=[ultrabin] exp_name=\$EXP_NAME \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \ + ++lr=${LR} \ + ++intermediate_checkpoints=true \ + ++online=true \ + ++eval_every=256 \ + ++save_every=5120 \ + ++weight_decay=0.0 \ + ++beta1=0.9 \ + ++beta2=0.95 \ + ++eps=1e-5 \ + ++warmup=0.10 \ + ++loss.ppo_epochs=1 \ + ++loss.cliprange=${CLIP_RANGE} \ + ++loss.lam=0.95 \ + ++loss.gamma=1.0 \ + ++loss.critic_coef=0.1 \ + ++loss.KL_coef=${KL_COEF} \ + ++n_epochs=1 \ + ++model.batch_size=4 \ + ++model.max_grad_norm=1 \ + ++model.gradient_accumulation_steps=8 \ + ++model.eval_batch_size=16 \ + \$MODEL_LOAD_ARG 2>&1 | tee \$HALO_DIR/logs/\${EXP_NAME}.log + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + # Clean up old checkpoint directory if it's not the SFT checkpoint + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + OLD_EXP_DIR=\$(dirname \$CURRENT_CKPT) + echo \"Cleaning up \$OLD_EXP_DIR\" + rm -rf \$OLD_EXP_DIR + fi + + CURRENT_CKPT=\$NEW_CKPT + ROUND=\$((ROUND + 1)) + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done +" diff --git a/scripts/launch_llama_instruct_ppo_online_v3.sh b/scripts/launch_llama_instruct_ppo_online_v3.sh new file mode 100644 index 0000000..74d1d4e --- /dev/null +++ b/scripts/launch_llama_instruct_ppo_online_v3.sh @@ -0,0 +1,184 @@ +#!/bin/bash +#SBATCH --job-name=llama-oppo-binary-v3 +#SBATCH --nodes=1 +#SBATCH --mem=100G +#SBATCH --ntasks-per-node=1 +#SBATCH --gres=gpu:4 +#SBATCH --cpus-per-task=8 +#SBATCH --time=23:55:00 +#SBATCH --partition=pli-c +#SBATCH --exclude=della-j14g1 +#SBATCH --constraint=rh9|rh8 +#SBATCH --output=logs/%x_%j.out +#SBATCH --error=logs/%x_%j.err + +# example usage: +# sbatch launch_llama_ppo_online_binary.sh 0.05 1e-6 0.2 + +KL_COEF=$1 # 0.05 +LR=$2 # 1e-6 +CLIP_RANGE=$3 # 0.2 + +TOTAL_PROMPTS=11264 +PROMPTS_PER_ROUND=512 # Train on fewer examples before sampling +NUM_ROUNDS=$(($TOTAL_PROMPTS / $PROMPTS_PER_ROUND)) # Calculate this once at the start + +# Function to find an available port +find_free_port() { + local port + while true; do + port=$(shuf -i 29500-29510 -n 1) + if ! netstat -tuln | grep -q ":$port "; then + echo "$port" + break + fi + done +} + +# Function to initialize the environment +init_env() { + module load anaconda3/2024.2 + source $(conda info --base)/etc/profile.d/conda.sh + conda activate halos + + echo "Running on node: $(hostname)" + echo "Machine Rank: $SLURM_PROCID" + + export MASTER_ADDR=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + export MASTER_PORT=$(find_free_port | tr -d '\n') + export HF_DATASETS_OFFLINE=1 + export HF_HUB_OFFLINE=1 + export HF_ALLOW_CODE_EVAL=1 +} + +export -f find_free_port +export -f init_env + +# Run the training script using srun +srun --jobid=$SLURM_JOB_ID --nodes=$SLURM_JOB_NUM_NODES --ntasks-per-node=1 bash -c " +init_env +pip show vllm + +export MODEL_PATH=meta-llama/Meta-Llama-3-8B-Instruct +export HALO_DIR=/scratch/gpfs/sl2998/workspace/HALOs +export CACHE_DIR=/scratch/gpfs/sl2998/models/llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v3 + +if [ ! -d \"\$CACHE_DIR\" ]; then + mkdir -p \$CACHE_DIR +fi + +# Start iterative training from SFT checkpoint or last saved checkpoint +CURRENT_CKPT=\$MODEL_PATH +ROUND=1 +CUMULATIVE_PROMPTS=0 + +# Check if there are previous checkpoints to resume from +for r in \$(seq ${NUM_ROUNDS} -1 1); do + LAST_EXP_NAME=llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v3-R\${r} + LAST_CKPT=\${CACHE_DIR}/\${LAST_EXP_NAME}/FINAL + if [ -d \"\$LAST_CKPT\" ]; then + CURRENT_CKPT=\$LAST_CKPT + ROUND=\$((r + 1)) + CUMULATIVE_PROMPTS=\$((r * PROMPTS_PER_ROUND)) + echo \"Resuming from round \$r checkpoint: \$CURRENT_CKPT\" + echo \"Starting at round \$ROUND with \$CUMULATIVE_PROMPTS prompts already processed\" + break + fi +done + +while [ \$ROUND -le ${NUM_ROUNDS} ]; do + echo \"Starting round \$ROUND of ${NUM_ROUNDS} \" + SAMPLES_FILE=\${CACHE_DIR}/R\${ROUND}_samples.json + + echo \"Sampling from current model...\" + # Sample from current model (first round uses SFT checkpoint) + python -m train.sample \$CURRENT_CKPT \ + --output_file \$SAMPLES_FILE \ + --gpu_count 4 \ + --datasets ultrafeedback_armorm \ + --split train \ + --mode train \ + --num_samples_per_prompt 4 \ + --num_prompts ${PROMPTS_PER_ROUND} \ + --num_skip \$CUMULATIVE_PROMPTS \ + --batch_size ${PROMPTS_PER_ROUND} + echo \"Sampling complete. Labeling samples...\" + + # if no more prompts left, end training early + NUM_SAMPLES=\$(jq '. | length' \$SAMPLES_FILE) + + if [[ \$NUM_SAMPLES -gt 0 ]]; then + EXP_NAME=llama-3-8b-instruct-online-ppo-kl-${KL_COEF}-lr-${LR}-clip-${CLIP_RANGE}-binary-v3-R\${ROUND} + CUMULATIVE_PROMPTS=\$((CUMULATIVE_PROMPTS + ${PROMPTS_PER_ROUND})) + DATA_FILE=\${CACHE_DIR}/R\${ROUND}_data.json + + # Label samples with API (must ssh into the login node for internet access) + ssh della-gpu \"source ~/.bashrc && \ + conda activate halos && \ + cd /home/sl2998/workspace/HALOs && \ + accelerate launch -m train.label_v3 \$SAMPLES_FILE \$DATA_FILE \ + --api_type openai --api_key \$OPENAI_API_KEY --feedback_type pairwise --batch_size 16 \" + + # accelerate launch --config_file accelerate_config/fsdp_4gpu.yaml \ + # -m train.label \$SAMPLES_FILE \$DATA_FILE \ + # --reward_model_path \$REWARD_CKPT \ + # --feedback_type binary --batch_size 16 + + # First round: load from checkpoint + # Subsequent rounds: policy resumes from previous checkpoint; reference model stays at SFT checkpoint + if [ \$ROUND -eq 1 ]; then + MODEL_LOAD_ARG=\"++model.load_from=\$CURRENT_CKPT\" + else + echo \"Loading from checkpoint: \$CURRENT_CKPT\" + MODEL_LOAD_ARG=\"++model.from_checkpoint=\$CURRENT_CKPT ++model.load_from=\$MODEL_PATH\" + fi + + # Train PPO model on the newly sampled and labeled data + accelerate launch \ + --config_file accelerate_config/fsdp_4gpu.yaml \ + --machine_rank \$SLURM_PROCID \ + --main_process_ip \$MASTER_ADDR \ + --main_process_port \$MASTER_PORT \ + launch.py loss=ppo model=llama train_datasets=[\$DATA_FILE] test_datasets=[ultrabin] exp_name=\$EXP_NAME \ + ++cache_dir=\$CACHE_DIR \ + ++model.name_or_path=\$MODEL_PATH \ + ++lr=${LR} \ + ++intermediate_checkpoints=true \ + ++online=true \ + ++eval_every=256 \ + ++save_every=5120 \ + ++weight_decay=0.0 \ + ++beta1=0.9 \ + ++beta2=0.95 \ + ++eps=1e-5 \ + ++warmup=0.10 \ + ++loss.ppo_epochs=1 \ + ++loss.cliprange=${CLIP_RANGE} \ + ++loss.lam=0.95 \ + ++loss.gamma=1.0 \ + ++loss.critic_coef=0.1 \ + ++loss.KL_coef=${KL_COEF} \ + ++n_epochs=1 \ + ++model.batch_size=4 \ + ++model.max_grad_norm=1 \ + ++model.gradient_accumulation_steps=8 \ + ++model.eval_batch_size=16 \ + \$MODEL_LOAD_ARG 2>&1 | tee \$HALO_DIR/logs/\${EXP_NAME}.log + + NEW_CKPT=\${CACHE_DIR}/\${EXP_NAME}/FINAL + + # Clean up old checkpoint directory if it's not the SFT checkpoint + if [ \$CURRENT_CKPT != \$MODEL_PATH ] && [ \$SLURM_PROCID -eq 0 ]; then + OLD_EXP_DIR=\$(dirname \$CURRENT_CKPT) + echo \"Cleaning up \$OLD_EXP_DIR\" + rm -rf \$OLD_EXP_DIR + fi + + CURRENT_CKPT=\$NEW_CKPT + ROUND=\$((ROUND + 1)) + else + echo \"Ending training early due to zero new samples ...\" + break + fi +done +" diff --git a/train/label.py b/train/label.py index 76f8b54..f67c886 100644 --- a/train/label.py +++ b/train/label.py @@ -23,10 +23,11 @@ import random from accelerate import Accelerator from transformers import AutoTokenizer, AutoModelForSequenceClassification -from tqdm import tqdm, trange +from tqdm import tqdm, trange, trange from typing import List, Dict, Optional, Union import re import os +import os from .utils import StreamingJSONWriter, get_api_completion from .dataloader import SFTDataLoader from collections import defaultdict @@ -39,9 +40,38 @@ def process_batch_with_reward_model( samples: List, reward_model: AutoModelForSequenceClassification, tokenizer: AutoTokenizer, + tokenizer: AutoTokenizer, accelerator: Accelerator ) -> List[Dict]: """Process a batch through the reward model using the already tokenized sequences.""" + + processed_samples = [] + chop = lambda txt: re.sub(r'([.!?])[^.!?]*\Z', r'\1', txt.strip()) + + for i in trange(len(samples), disable=(not accelerator.is_main_process)): + if i % accelerator.num_processes == accelerator.process_index: + input_ids = tokenizer.apply_chat_template( + samples[i]["prompt"] + [{"role":"assistant", "content":chop(samples[i]["output"])}], + return_tensors="pt", + max_length=2048, + ).to(accelerator.device) + + with torch.no_grad(): + outputs = reward_model( + input_ids=input_ids, + ) + if args.reward_model_path == "RLHFlow/ArmoRM-Llama3-8B-v0.1": + reward_scores = outputs.score + else: + reward_scores = outputs.logits[:, 1] + + processed_sample = samples[i].copy() + processed_sample['reward'] = reward_scores[0].item() + # since a dataloader isn't used, the output has to be explicitly formatted + processed_sample['output'] = [{ "role" : "assistant", "content" : processed_sample['output'] }] + processed_samples.append(processed_sample) + + return processed_samples processed_samples = [] chop = lambda txt: re.sub(r'([.!?])[^.!?]*\Z', r'\1', txt.strip()) @@ -72,6 +102,7 @@ def process_batch_with_reward_model( async def process_samples_with_api( + samples: List, samples: List, client: openai.AsyncOpenAI, system_prompt: str, @@ -171,7 +202,7 @@ def convert_to_pairwise_feedback(samples: List[Dict], seed: int, threshold=0) -> for i in range(0, len(group) - 1, 2): sample_A, sample_B = group[i], group[i + 1] - if abs(float(sample_A['reward']) - float(sample_B['reward'])) <= float(threshold): + if abs(float(float(sample_A['reward'])) - float(float(sample_B['reward']))) <= float(float(threshold)): continue label = int(sample_A['reward'] > sample_B['reward']) @@ -217,7 +248,7 @@ async def main(args): print(f"Labelled {len(processed_samples)} samples using {args.api_type} API") else: if accelerator.is_main_process: - accelerator.print(f"Loading reward model from {args.reward_model_path}") + accelerator.accelerator.print(f"Loading reward model from {args.reward_model_path}") # trust_remote_code is necessary for Armo RM to be downloaded correctly reward_model = AutoModelForSequenceClassification.from_pretrained(args.reward_model_path, trust_remote_code=True) @@ -228,10 +259,14 @@ async def main(args): if accelerator.num_processes > 1 and not dist.is_initialized(): accelerator.wait_for_everyone() samples = samples[:((len(samples) // accelerator.num_processes) * accelerator.num_processes)] + samples = samples[:((len(samples) // accelerator.num_processes) * accelerator.num_processes)] if accelerator.is_main_process: dist.init_process_group(backend='nccl') + processed_samples = process_batch_with_reward_model(samples, reward_model, tokenizer, accelerator) + json.dump(processed_samples, open(f'temp_{accelerator.process_index}.json', 'w')) + accelerator.wait_for_everyone() processed_samples = process_batch_with_reward_model(samples, reward_model, tokenizer, accelerator) json.dump(processed_samples, open(f'temp_{accelerator.process_index}.json', 'w')) accelerator.wait_for_everyone() @@ -244,6 +279,13 @@ async def main(args): accelerator.print(f"Labelled {len(processed_samples)} samples using {args.reward_model_path}") + if accelerator.is_main_process: + for i in range(accelerator.num_processes): + processed_samples.extend(json.load(open(f'temp_{i}.json'))) + os.remove(f'temp_{i}.json') + + accelerator.print(f"Labelled {len(processed_samples)} samples using {args.reward_model_path}") + # Set up output writer if accelerator.is_main_process: accelerator.print(f"Writing feedback to {args.output_path}") diff --git a/train/label_v1.py b/train/label_v1.py new file mode 100644 index 0000000..d9f36a7 --- /dev/null +++ b/train/label_v1.py @@ -0,0 +1,339 @@ +""" +A script for creating feedback datasets from data that was sampled with train.sample. +Supports both reward model-based and API-based labeling. + +Sample usage with Accelerate for reward model: +accelerate launch --config_file accelerate_config/fsdp_4gpu.yaml --main_process_port 29500 \ + -m train.label --reward_model_path models/llama3-8B-bt/FINAL outputs.json reward_data.json --feedback_type pairwise + +Sample usage for API labeling (accelerate not needed): +python -m train.label --api_type openai --api_key YOUR_KEY --api_model gpt-4 \ + --label_prompt "Rate this response's quality from 0 to 1:" \ + outputs.json reward_data.json --feedback_type binary + +Sample usage for pairwise labeling of two sample files: +python -m train.label --second_samples_path baseline_samples.json --api_type openai --api_key YOUR_KEY \ + outputs.json reward_data.json --feedback_type pairwise +""" + +import argparse +import json +import numpy as np +import torch +import random +from accelerate import Accelerator +from transformers import AutoTokenizer, AutoModelForSequenceClassification +from tqdm import tqdm +from typing import List, Dict, Optional, Union +import re +from .utils import StreamingJSONWriter, get_api_completion +from .dataloader import SFTDataLoader +from collections import defaultdict +import openai +import asyncio +import torch.distributed as dist + + +def process_batch_with_reward_model( + batch: Dict, + reward_model: AutoModelForSequenceClassification, + accelerator: Accelerator +) -> List[Dict]: + """Process a batch through the reward model using the already tokenized sequences.""" + reward_model.eval() + with torch.no_grad(): + outputs = reward_model( + input_ids=batch['target_combined_input_ids'], + attention_mask=batch['target_combined_attention_mask'] + ) + if args.reward_model_path == "RLHFlow/ArmoRM-Llama3-8B-v0.1": + reward_scores = outputs.score.cpu().float() + else: + reward_scores = outputs.logits[:, 1] + + processed_samples = [] + for i in range(len(batch['prompt'])): + sample = { + 'prompt': batch['prompt'][i], + 'instruction': batch['original_prompt'][i] if 'original_prompt' in batch else batch['prompt'][i][0]['content'], + 'output': batch['target'][i], + 'reward': reward_scores[i].item(), + 'prompt_id': batch['prompt_id'][i], + } + processed_samples.append(sample) + + + gathered_samples = [None] * accelerator.num_processes + + # Return gathered samples from all processes, not just main process + # This ensures samples from all processes are included + dist.all_gather_object(gathered_samples, processed_samples) + + if accelerator.is_main_process: + return [item for sublist in gathered_samples for item in sublist] + + return [] + + +async def process_samples_with_api( + samples: Dict, + client: openai.AsyncOpenAI, + system_prompt: str, + label_prompt: str, + model: str, + batch_size: int = 10 +) -> List[Dict]: + """Process all samples through the API.""" + scores = [] + processed_samples = [] + total_samples = len(samples) + + # Create progress bar for overall progress + pbar = tqdm(total=total_samples, desc="Processing samples through API") + + for i in range(0, len(samples), batch_size): + batch = samples[i:i + batch_size] + tasks = [] + + for sample in batch: + prompt = f"INSTRUCTION: {sample['prompt'][0]['content']}\n\nRESPONSE: {sample['output']}\n\n{label_prompt}" + tasks.append(get_api_completion(client, system_prompt, prompt, model)) + + batch_scores = await asyncio.gather(*tasks) + scores.extend(batch_scores) + + # Update progress bar + pbar.update(len(batch)) + pbar.set_postfix({'Batch': f'{i//batch_size + 1}/{(total_samples + batch_size - 1)//batch_size}', + 'Samples': f'{min(i + batch_size, total_samples)}/{total_samples}'}) + + pbar.close() + + for sample, score in zip(samples, scores): + try: + # Try to extract 'Final Score: X' from the response + match = re.search(r"Final Score:\s*([0-9]+(?:\.[0-9]+)?)", str(score), re.IGNORECASE) + if match: + score = float(match.group(1)) + else: + # Fallback: extract first number + score = float(re.search(r'\d+(?:\.\d+)?', str(score)).group()) + except Exception: + print(f"Warning: Could not parse API response {score} as float. skipping prompt {sample['prompt_id']}") + continue + + processed_sample = sample.copy() + processed_sample['reward'] = score + # since a dataloader isn't used, the output has to be explicitly formatted + processed_sample['output'] = [{ "role" : "assistant", "content" : processed_sample['output'] }] + processed_samples.append(processed_sample) + + return processed_samples + + +def convert_to_binary_feedback(samples: List[Dict], threshold=0) -> List[Dict]: + """Convert samples to binary feedback format.""" + feedback = [] + + if threshold == 'mean': + rewards = [ sample['reward'] for sample in samples ] + threshold = np.mean(rewards) + elif threshold == 'median': + rewards = [ sample['reward'] for sample in samples ] + threshold = np.median(rewards) + else: + threshold = int(threshold) + + for sample in samples: + feedback_item = { + 'prompt_id': sample['prompt_id'], + 'prompt': sample['prompt'], + 'output': sample['output'], + 'label': 1 if sample['reward'] >= threshold else 0, + 'reward': sample['reward'], + 'type': 'binary_feedback', + } + feedback.append(feedback_item) + return feedback + + +def convert_to_pairwise_feedback(samples: List[Dict], seed: int) -> List[Dict]: + """Convert samples to pairwise feedback format.""" + random.seed(seed) + + grouped = defaultdict(list) + for sample in samples: + grouped[sample['prompt_id']].append(sample) + + feedback = [] + for prompt_id, group in grouped.items(): + if len(group) < 2: + continue + + group.sort(key=lambda x: x['reward'], reverse=True) + + for i in range(len(group) - 1): + higher_reward, lower_reward = group[i], group[i + 1] + if higher_reward['reward'] == lower_reward['reward']: + continue + + if random.random() < 0.5: + sample_A, sample_B = higher_reward, lower_reward + label = 1 + else: + sample_A, sample_B = lower_reward, higher_reward + label = 0 + + feedback_item = { + 'prompt_id': prompt_id, + 'prompt': sample_A['prompt'], + 'output_A': sample_A['output'], + 'output_B': sample_B['output'], + 'label': label, + 'reward_A': sample_A['reward'], + 'reward_B': sample_B['reward'], + 'reward_difference': abs(sample_A['reward'] - sample_B['reward']), + 'type': 'pairwise_feedback', + } + feedback.append(feedback_item) + + return feedback + + +async def main(args): + accelerator = Accelerator() + # Load samples + with open(args.samples_path, 'r') as f: + samples = json.load(f) + + if args.api_type: + # API-based labeling path + print(f"Processing {len(samples)} samples using {args.api_type} API") + + if args.api_type == "openai": + if not args.api_key: + raise ValueError("API key must be provided when using API labeling") + client = openai.AsyncOpenAI(api_key=args.api_key) + else: + raise ValueError(f"Unsupported API type: {args.api_type}") + + processed_samples = await process_samples_with_api( + samples, client, args.system_prompt, args.label_prompt, + args.api_model, args.batch_size + ) + + print(f"Labelled {len(processed_samples)} samples using {args.api_type} API") + else: + if accelerator.is_main_process: + print(f"Loading reward model from {args.reward_model_path}") + + reward_model = AutoModelForSequenceClassification.from_pretrained( + args.reward_model_path, + local_files_only=True, + trust_remote_code=True + ) + tokenizer = AutoTokenizer.from_pretrained( + args.reward_model_path, + local_files_only=True, + trust_remote_code=True + ) + reward_model = accelerator.prepare(reward_model) + + # Initialize the dataloader for reward model processing + dataloader = SFTDataLoader( + dataset_names=[args.samples_path], + tokenizer=tokenizer, + process_index=accelerator.process_index, + num_processes=accelerator.num_processes, + split='train', + microbatch_size=(args.batch_size // accelerator.num_processes), + max_length=args.max_length, + max_prompt_length=args.max_prompt_length, + n_epochs=1, + seed=args.seed + ) + dataloader, reward_model = accelerator.prepare(dataloader, reward_model) + + # Initialize distributed setup if not already done + if accelerator.num_processes > 1 and not dist.is_initialized(): + accelerator.wait_for_everyone() + + if accelerator.is_main_process: + dist.init_process_group(backend='nccl') + + processed_samples = [] + for batch in tqdm(dataloader, disable=not accelerator.is_main_process): + batch_samples = process_batch_with_reward_model(batch, reward_model, accelerator) + if accelerator.is_main_process: + processed_samples.extend(batch_samples) + + # Wait for all processes to complete + accelerator.wait_for_everyone() + + print(f"Labelled {len(processed_samples)} samples using {args.reward_model_path}") + + # Set up output writer + if accelerator.is_main_process: + accelerator.print(f"Writing feedback to {args.output_path}") + with open(args.output_path, 'w') as f: + writer = StreamingJSONWriter(f) + + # Process and write feedback + if args.feedback_type == 'binary': + feedback = convert_to_binary_feedback(processed_samples, threshold=args.threshold) + elif args.feedback_type == 'pairwise': + feedback = convert_to_pairwise_feedback(processed_samples, args.seed) + else: + feedback = processed_samples + for x in feedback: + x['type'] = 'scalar_feedback' + + # Include split if specified + if args.split: + for x in feedback: + item['split'] = args.split + + # Add split information and write + for item in feedback: + writer.write_item(item) + + writer.close() + + # Clean up distributed training resources if using reward model + accelerator.end_training() + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Label samples using either a reward model or API") + + # Input/output arguments + parser.add_argument("samples_path", type=str, help="Path to the JSON file containing samples") + parser.add_argument("output_path", type=str, help="Path to save the feedback file") + + # Labeling method arguments + labeling_group = parser.add_mutually_exclusive_group(required=True) + labeling_group.add_argument("--reward_model_path", type=str, help="Path to the reward model") + labeling_group.add_argument("--api_type", type=str, choices=['openai'], help="Type of API to use for labeling") + + # API-specific arguments + parser.add_argument("--api_key", type=str, help="API key for the chosen API service") + parser.add_argument("--api_model", type=str, default="gpt-4.1-mini", help="Model to use for API labeling") + parser.add_argument("--system_prompt", type=str, default="You are a helpful assistant that rates the quality of responses to given instructions.", help="System prompt for API labeling") + parser.add_argument("--label_prompt", type=str, default="Provide only a RATING from 0 to 10 based on how well the RESPONSE satisfied the INSTRUCTION: ", help="Prompt template for API labeling") + # Processing arguments + parser.add_argument("--batch_size", type=int, default=16, help="Batch size for processing") + parser.add_argument("--max_length", type=int, default=2048, help="Maximum sequence length for input") + parser.add_argument("--max_prompt_length", type=int, default=1024, help="Maximum prompt length for input") + parser.add_argument("--feedback_type", type=str, choices=['binary', 'pairwise', None], default=None, help="Type of feedback to generate") + parser.add_argument("--threshold", type=str, default="median", help="How the reward threshold is calculated; this can also be a number (e.g., 0.5)") + parser.add_argument("--seed", type=int, default=0, help="Random seed for reproducibility") + parser.add_argument("--split", type=str, default=None, help="Split of data") + + args = parser.parse_args() + + if args.api_type and not args.api_key: + parser.error("--api_key is required when using --api_type") + + asyncio.run(main(args)) \ No newline at end of file diff --git a/train/label_v2.py b/train/label_v2.py new file mode 100644 index 0000000..6ad47c1 --- /dev/null +++ b/train/label_v2.py @@ -0,0 +1,339 @@ +""" +A script for creating feedback datasets from data that was sampled with train.sample. +Supports both reward model-based and API-based labeling. + +Sample usage with Accelerate for reward model: +accelerate launch --config_file accelerate_config/fsdp_4gpu.yaml --main_process_port 29500 \ + -m train.label --reward_model_path models/llama3-8B-bt/FINAL outputs.json reward_data.json --feedback_type pairwise + +Sample usage for API labeling (accelerate not needed): +python -m train.label --api_type openai --api_key YOUR_KEY --api_model gpt-4 \ + --label_prompt "Rate this response's quality from 0 to 1:" \ + outputs.json reward_data.json --feedback_type binary + +Sample usage for pairwise labeling of two sample files: +python -m train.label --second_samples_path baseline_samples.json --api_type openai --api_key YOUR_KEY \ + outputs.json reward_data.json --feedback_type pairwise +""" + +import argparse +import json +import numpy as np +import torch +import random +from accelerate import Accelerator +from transformers import AutoTokenizer, AutoModelForSequenceClassification +from tqdm import tqdm +from typing import List, Dict, Optional, Union +import re +from .utils import StreamingJSONWriter, get_api_completion +from .dataloader import SFTDataLoader +from collections import defaultdict +import openai +import asyncio +import torch.distributed as dist + + +def process_batch_with_reward_model( + batch: Dict, + reward_model: AutoModelForSequenceClassification, + accelerator: Accelerator +) -> List[Dict]: + """Process a batch through the reward model using the already tokenized sequences.""" + + reward_model.eval() + with torch.no_grad(): + outputs = reward_model( + input_ids=batch['target_combined_input_ids'], + attention_mask=batch['target_combined_attention_mask'] + ) + if args.reward_model_path == "RLHFlow/ArmoRM-Llama3-8B-v0.1": + reward_scores = outputs.score.cpu().float() + else: + reward_scores = outputs.logits[:, 1] + + processed_samples = [] + for i in range(len(batch['prompt'])): + sample = { + 'prompt': batch['prompt'][i], + 'instruction': batch['original_prompt'][i] if 'original_prompt' in batch else batch['prompt'][i][0]['content'], + 'output': batch['target'][i], + 'reward': reward_scores[i].item(), + 'prompt_id': batch['prompt_id'][i], + } + processed_samples.append(sample) + + gathered_samples = [None] * accelerator.num_processes + + # Return gathered samples from all processes, not just main process + # This ensures samples from all processes are included + dist.all_gather_object(gathered_samples, processed_samples) + + if accelerator.is_main_process: + return [item for sublist in gathered_samples for item in sublist] + + return [] + + +async def process_samples_with_api( + samples: Dict, + client: openai.AsyncOpenAI, + system_prompt: str, + label_prompt: str, + model: str, + batch_size: int = 10 +) -> List[Dict]: + """Process all samples through the API.""" + scores = [] + processed_samples = [] + total_samples = len(samples) + + # Create progress bar for overall progress + pbar = tqdm(total=total_samples, desc="Processing samples through API") + + for i in range(0, len(samples), batch_size): + batch = samples[i:i + batch_size] + tasks = [] + + for sample in batch: + prompt = f"INSTRUCTION: {sample['prompt'][0]['content']}\n\nRESPONSE: {sample['output']}\n\n{label_prompt}: " + tasks.append(get_api_completion(client, system_prompt, prompt, model)) + + batch_scores = await asyncio.gather(*tasks) + scores.extend(batch_scores) + + # Update progress bar + pbar.update(len(batch)) + pbar.set_postfix({'Batch': f'{i//batch_size + 1}/{(total_samples + batch_size - 1)//batch_size}', + 'Samples': f'{min(i + batch_size, total_samples)}/{total_samples}'}) + + pbar.close() + + for sample, score in zip(samples, scores): + try: + # Try to extract 'Final Score: X' from the response + match = re.search(r"Final Score:\s*([0-9]+(?:\.[0-9]+)?)", str(score), re.IGNORECASE) + if match: + score = float(match.group(1)) + else: + # Fallback: extract first number + score = float(re.search(r'\d+(?:\.\d+)?', str(score)).group()) + except Exception: + print(f"Warning: Could not parse API response {score} as float. skipping prompt {sample['prompt_id']}") + continue + + processed_sample = sample.copy() + processed_sample['reward'] = score + # since a dataloader isn't used, the output has to be explicitly formatted + processed_sample['output'] = [{ "role" : "assistant", "content" : processed_sample['output'] }] + processed_samples.append(processed_sample) + + return processed_samples + + +def convert_to_binary_feedback(samples: List[Dict], threshold=0) -> List[Dict]: + """Convert samples to binary feedback format.""" + feedback = [] + + if threshold == 'mean': + rewards = [ sample['reward'] for sample in samples ] + threshold = np.mean(rewards) + elif threshold == 'median': + rewards = [ sample['reward'] for sample in samples ] + threshold = np.median(rewards) + else: + threshold = int(threshold) + + for sample in samples: + feedback_item = { + 'prompt_id': sample['prompt_id'], + 'prompt': sample['prompt'], + 'output': sample['output'], + 'label': 1 if sample['reward'] >= threshold else 0, + 'reward': sample['reward'], + 'type': 'binary_feedback', + } + feedback.append(feedback_item) + return feedback + + +def convert_to_pairwise_feedback(samples: List[Dict], seed: int) -> List[Dict]: + """Convert samples to pairwise feedback format.""" + random.seed(seed) + + grouped = defaultdict(list) + for sample in samples: + grouped[sample['prompt_id']].append(sample) + + feedback = [] + for prompt_id, group in grouped.items(): + if len(group) < 2: + continue + + group.sort(key=lambda x: x['reward'], reverse=True) + + for i in range(len(group) - 1): + higher_reward, lower_reward = group[i], group[i + 1] + if higher_reward['reward'] == lower_reward['reward']: + continue + + if random.random() < 0.5: + sample_A, sample_B = higher_reward, lower_reward + label = 1 + else: + sample_A, sample_B = lower_reward, higher_reward + label = 0 + + feedback_item = { + 'prompt_id': prompt_id, + 'prompt': sample_A['prompt'], + 'output_A': sample_A['output'], + 'output_B': sample_B['output'], + 'label': label, + 'reward_A': sample_A['reward'], + 'reward_B': sample_B['reward'], + 'reward_difference': abs(sample_A['reward'] - sample_B['reward']), + 'type': 'pairwise_feedback', + } + feedback.append(feedback_item) + + return feedback + + +async def main(args): + accelerator = Accelerator() + # Load samples + with open(args.samples_path, 'r') as f: + samples = json.load(f) + + if args.api_type: + # API-based labeling path + print(f"Processing {len(samples)} samples using {args.api_type} API") + + if args.api_type == "openai": + if not args.api_key: + raise ValueError("API key must be provided when using API labeling") + client = openai.AsyncOpenAI(api_key=args.api_key) + else: + raise ValueError(f"Unsupported API type: {args.api_type}") + + processed_samples = await process_samples_with_api( + samples, client, args.system_prompt, args.label_prompt, + args.api_model, args.batch_size + ) + + print(f"Labelled {len(processed_samples)} samples using {args.api_type} API") + else: + if accelerator.is_main_process: + print(f"Loading reward model from {args.reward_model_path}") + + reward_model = AutoModelForSequenceClassification.from_pretrained( + args.reward_model_path, + local_files_only=True, + trust_remote_code=True + ) + tokenizer = AutoTokenizer.from_pretrained( + args.reward_model_path, + local_files_only=True, + trust_remote_code=True + ) + reward_model = accelerator.prepare(reward_model) + + # Initialize the dataloader for reward model processing + dataloader = SFTDataLoader( + dataset_names=[args.samples_path], + tokenizer=tokenizer, + process_index=accelerator.process_index, + num_processes=accelerator.num_processes, + split='train', + microbatch_size=(args.batch_size // accelerator.num_processes), + max_length=args.max_length, + max_prompt_length=args.max_prompt_length, + n_epochs=1, + seed=args.seed + ) + dataloader, reward_model = accelerator.prepare(dataloader, reward_model) + + # Initialize distributed setup if not already done + if accelerator.num_processes > 1 and not dist.is_initialized(): + accelerator.wait_for_everyone() + + if accelerator.is_main_process: + dist.init_process_group(backend='nccl') + + processed_samples = [] + for batch in tqdm(dataloader, disable=not accelerator.is_main_process): + batch_samples = process_batch_with_reward_model(batch, reward_model, accelerator) + if accelerator.is_main_process: + processed_samples.extend(batch_samples) + + # Wait for all processes to complete + accelerator.wait_for_everyone() + + print(f"Labelled {len(processed_samples)} samples using {args.reward_model_path}") + + # Set up output writer + if accelerator.is_main_process: + accelerator.print(f"Writing feedback to {args.output_path}") + with open(args.output_path, 'w') as f: + writer = StreamingJSONWriter(f) + + # Process and write feedback + if args.feedback_type == 'binary': + feedback = convert_to_binary_feedback(processed_samples, threshold=args.threshold) + elif args.feedback_type == 'pairwise': + feedback = convert_to_pairwise_feedback(processed_samples, args.seed) + else: + feedback = processed_samples + for x in feedback: + x['type'] = 'scalar_feedback' + + # Include split if specified + if args.split: + for x in feedback: + item['split'] = args.split + + # Add split information and write + for item in feedback: + writer.write_item(item) + + writer.close() + + # Clean up distributed training resources if using reward model + accelerator.end_training() + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Label samples using either a reward model or API") + + # Input/output arguments + parser.add_argument("samples_path", type=str, help="Path to the JSON file containing samples") + parser.add_argument("output_path", type=str, help="Path to save the feedback file") + + # Labeling method arguments + labeling_group = parser.add_mutually_exclusive_group(required=True) + labeling_group.add_argument("--reward_model_path", type=str, help="Path to the reward model") + labeling_group.add_argument("--api_type", type=str, choices=['openai'], help="Type of API to use for labeling") + + # API-specific arguments + parser.add_argument("--api_key", type=str, help="API key for the chosen API service") + parser.add_argument("--api_model", type=str, default="gpt-4.1-mini", help="Model to use for API labeling") + parser.add_argument("--system_prompt", type=str, default="You are a helpful assistant that rates the quality of responses to given instructions.", help="System prompt for API labeling") + parser.add_argument("--label_prompt", type=str, default="""# Instructions for Rating the Response\n\nYou are an expert evaluator. Please rate the quality of the RESPONSE to the INSTRUCTION according to the following rules:\n\n- Consider how well the RESPONSE satisfies the INSTRUCTION.\n- Evaluate correctness, completeness, clarity, and helpfulness.\n- Penalize hallucinations, factual errors, or irrelevant content.\n- If the RESPONSE is harmful, unsafe, or violates guidelines, assign a low score.\n\n# Workflow\n\n1. Briefly explain your reasoning for the score (1-2 sentences).\n2. At the end, provide your final rating as a single number from 0 (worst) to 10 (best) on a new line, in the format: Final Score: X\n\n# Example\n\nReasoning: The response is accurate, clear, and directly addresses the instruction.\n\nFinal Score: 9\n\n---\n\n""", help="Prompt template for API labeling") + # Processing arguments + parser.add_argument("--batch_size", type=int, default=16, help="Batch size for processing") + parser.add_argument("--max_length", type=int, default=2048, help="Maximum sequence length for input") + parser.add_argument("--max_prompt_length", type=int, default=1024, help="Maximum prompt length for input") + parser.add_argument("--feedback_type", type=str, choices=['binary', 'pairwise', None], default=None, help="Type of feedback to generate") + parser.add_argument("--threshold", type=str, default="median", help="How the reward threshold is calculated; this can also be a number (e.g., 0.5)") + parser.add_argument("--seed", type=int, default=0, help="Random seed for reproducibility") + parser.add_argument("--split", type=str, default=None, help="Split of data") + + args = parser.parse_args() + + if args.api_type and not args.api_key: + parser.error("--api_key is required when using --api_type") + + asyncio.run(main(args)) \ No newline at end of file diff --git a/train/label_v3.py b/train/label_v3.py new file mode 100644 index 0000000..9cc6168 --- /dev/null +++ b/train/label_v3.py @@ -0,0 +1,384 @@ +""" +A script for creating feedback datasets from data that was sampled with train.sample. +Supports both reward model-based and API-based labeling. + +Sample usage with Accelerate for reward model: +accelerate launch --config_file accelerate_config/fsdp_4gpu.yaml --main_process_port 29500 \ + -m train.label --reward_model_path models/llama3-8B-bt/FINAL outputs.json reward_data.json --feedback_type pairwise + +Sample usage for API labeling (accelerate not needed): +python -m train.label --api_type openai --api_key YOUR_KEY --api_model gpt-4 \ + --label_prompt "Rate this response's quality from 0 to 1:" \ + outputs.json reward_data.json --feedback_type binary + +Sample usage for pairwise labeling of two sample files: +python -m train.label --second_samples_path baseline_samples.json --api_type openai --api_key YOUR_KEY \ + outputs.json reward_data.json --feedback_type pairwise +""" + +import argparse +import json +import numpy as np +import torch +import random +from accelerate import Accelerator +from transformers import AutoTokenizer, AutoModelForSequenceClassification +from tqdm import tqdm +from typing import List, Dict, Optional, Union +import re +from .utils import StreamingJSONWriter, get_api_completion +from .dataloader import SFTDataLoader +from collections import defaultdict +import openai +import asyncio +import torch.distributed as dist + + +def process_batch_with_reward_model( + batch: Dict, + reward_model: AutoModelForSequenceClassification, + accelerator: Accelerator +) -> List[Dict]: + """Process a batch through the reward model using the already tokenized sequences.""" + + reward_model.eval() + with torch.no_grad(): + outputs = reward_model( + input_ids=batch['target_combined_input_ids'], + attention_mask=batch['target_combined_attention_mask'] + ) + if args.reward_model_path == "RLHFlow/ArmoRM-Llama3-8B-v0.1": + reward_scores = outputs.score.cpu().float() + else: + reward_scores = outputs.logits[:, 1] + + processed_samples = [] + for i in range(len(batch['prompt'])): + sample = { + 'prompt': batch['prompt'][i], + 'instruction': batch['original_prompt'][i] if 'original_prompt' in batch else batch['prompt'][i][0]['content'], + 'output': batch['target'][i], + 'reward': reward_scores[i].item(), + 'prompt_id': batch['prompt_id'][i], + } + processed_samples.append(sample) + + gathered_samples = [None] * accelerator.num_processes + + # Return gathered samples from all processes, not just main process + # This ensures samples from all processes are included + dist.all_gather_object(gathered_samples, processed_samples) + + if accelerator.is_main_process: + return [item for sublist in gathered_samples for item in sublist] + + return [] + + +async def process_samples_with_api( + samples: Dict, + client: openai.AsyncOpenAI, + system_prompt: str, + label_prompt: str, + model: str, + batch_size: int = 10 +) -> List[Dict]: + """Process all samples through the API.""" + scores = [] + processed_samples = [] + total_samples = len(samples) + + # Create progress bar for overall progress + pbar = tqdm(total=total_samples, desc="Processing samples through API") + + for i in range(0, len(samples), batch_size): + batch = samples[i:i + batch_size] + tasks = [] + + for sample in batch: + prompt = f"INSTRUCTION: {sample['prompt'][0]['content']}\n\nRESPONSE: {sample['output']}\n\n{label_prompt}: " + tasks.append(get_api_completion(client, system_prompt, prompt, model)) + + batch_scores = await asyncio.gather(*tasks) + scores.extend(batch_scores) + + # Update progress bar + pbar.update(len(batch)) + pbar.set_postfix({'Batch': f'{i//batch_size + 1}/{(total_samples + batch_size - 1)//batch_size}', + 'Samples': f'{min(i + batch_size, total_samples)}/{total_samples}'}) + + pbar.close() + + for sample, score in zip(samples, scores): + try: + # Try to extract 'Final Score: X' from the response + match = re.search(r"Final Score:\s*([0-9]+(?:\.[0-9]+)?)", str(score), re.IGNORECASE) + if match: + score = float(match.group(1)) + else: + # Fallback: extract first number + score = float(re.search(r'\d+(?:\.\d+)?', str(score)).group()) + except Exception: + print(f"Warning: Could not parse API response {score} as float. skipping prompt {sample['prompt_id']}") + continue + + processed_sample = sample.copy() + processed_sample['reward'] = score + # since a dataloader isn't used, the output has to be explicitly formatted + processed_sample['output'] = [{ "role" : "assistant", "content" : processed_sample['output'] }] + processed_samples.append(processed_sample) + + return processed_samples + + +def convert_to_binary_feedback(samples: List[Dict], threshold=0) -> List[Dict]: + """Convert samples to binary feedback format.""" + feedback = [] + + if threshold == 'mean': + rewards = [ sample['reward'] for sample in samples ] + threshold = np.mean(rewards) + elif threshold == 'median': + rewards = [ sample['reward'] for sample in samples ] + threshold = np.median(rewards) + else: + threshold = int(threshold) + + for sample in samples: + feedback_item = { + 'prompt_id': sample['prompt_id'], + 'prompt': sample['prompt'], + 'output': sample['output'], + 'label': 1 if sample['reward'] >= threshold else 0, + 'reward': sample['reward'], + 'type': 'binary_feedback', + } + feedback.append(feedback_item) + return feedback + + +def convert_to_pairwise_feedback(samples: List[Dict], seed: int) -> List[Dict]: + """Convert samples to pairwise feedback format.""" + random.seed(seed) + + grouped = defaultdict(list) + for sample in samples: + grouped[sample['prompt_id']].append(sample) + + feedback = [] + for prompt_id, group in grouped.items(): + if len(group) < 2: + continue + + group.sort(key=lambda x: x['reward'], reverse=True) + + for i in range(len(group) - 1): + higher_reward, lower_reward = group[i], group[i + 1] + if higher_reward['reward'] == lower_reward['reward']: + continue + + if random.random() < 0.5: + sample_A, sample_B = higher_reward, lower_reward + label = 1 + else: + sample_A, sample_B = lower_reward, higher_reward + label = 0 + + feedback_item = { + 'prompt_id': prompt_id, + 'prompt': sample_A['prompt'], + 'output_A': sample_A['output'], + 'output_B': sample_B['output'], + 'label': label, + 'reward_A': sample_A['reward'], + 'reward_B': sample_B['reward'], + 'reward_difference': abs(sample_A['reward'] - sample_B['reward']), + 'type': 'pairwise_feedback', + } + feedback.append(feedback_item) + + return feedback + + +async def main(args): + accelerator = Accelerator() + # Load samples + with open(args.samples_path, 'r') as f: + samples = json.load(f) + + if args.api_type: + # API-based labeling path + print(f"Processing {len(samples)} samples using {args.api_type} API") + + if args.api_type == "openai": + if not args.api_key: + raise ValueError("API key must be provided when using API labeling") + client = openai.AsyncOpenAI(api_key=args.api_key) + else: + raise ValueError(f"Unsupported API type: {args.api_type}") + + processed_samples = await process_samples_with_api( + samples, client, args.system_prompt, args.label_prompt, + args.api_model, args.batch_size + ) + + print(f"Labelled {len(processed_samples)} samples using {args.api_type} API") + else: + if accelerator.is_main_process: + print(f"Loading reward model from {args.reward_model_path}") + + reward_model = AutoModelForSequenceClassification.from_pretrained( + args.reward_model_path, + local_files_only=True, + trust_remote_code=True + ) + tokenizer = AutoTokenizer.from_pretrained( + args.reward_model_path, + local_files_only=True, + trust_remote_code=True + ) + reward_model = accelerator.prepare(reward_model) + + # Initialize the dataloader for reward model processing + dataloader = SFTDataLoader( + dataset_names=[args.samples_path], + tokenizer=tokenizer, + process_index=accelerator.process_index, + num_processes=accelerator.num_processes, + split='train', + microbatch_size=(args.batch_size // accelerator.num_processes), + max_length=args.max_length, + max_prompt_length=args.max_prompt_length, + n_epochs=1, + seed=args.seed + ) + dataloader, reward_model = accelerator.prepare(dataloader, reward_model) + + # Initialize distributed setup if not already done + if accelerator.num_processes > 1 and not dist.is_initialized(): + accelerator.wait_for_everyone() + + if accelerator.is_main_process: + dist.init_process_group(backend='nccl') + + processed_samples = [] + for batch in tqdm(dataloader, disable=not accelerator.is_main_process): + batch_samples = process_batch_with_reward_model(batch, reward_model, accelerator) + if accelerator.is_main_process: + processed_samples.extend(batch_samples) + + # Wait for all processes to complete + accelerator.wait_for_everyone() + + print(f"Labelled {len(processed_samples)} samples using {args.reward_model_path}") + + # Set up output writer + if accelerator.is_main_process: + accelerator.print(f"Writing feedback to {args.output_path}") + with open(args.output_path, 'w') as f: + writer = StreamingJSONWriter(f) + + # Process and write feedback + if args.feedback_type == 'binary': + feedback = convert_to_binary_feedback(processed_samples, threshold=args.threshold) + elif args.feedback_type == 'pairwise': + feedback = convert_to_pairwise_feedback(processed_samples, args.seed) + else: + feedback = processed_samples + for x in feedback: + x['type'] = 'scalar_feedback' + + # Include split if specified + if args.split: + for x in feedback: + item['split'] = args.split + + # Add split information and write + for item in feedback: + writer.write_item(item) + + writer.close() + + # Clean up distributed training resources if using reward model + accelerator.end_training() + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Label samples using either a reward model or API") + + # Input/output arguments + parser.add_argument("samples_path", type=str, help="Path to the JSON file containing samples") + parser.add_argument("output_path", type=str, help="Path to save the feedback file") + + # Labeling method arguments + labeling_group = parser.add_mutually_exclusive_group(required=True) + labeling_group.add_argument("--reward_model_path", type=str, help="Path to the reward model") + labeling_group.add_argument("--api_type", type=str, choices=['openai'], help="Type of API to use for labeling") + + # API-specific arguments + parser.add_argument("--api_key", type=str, help="API key for the chosen API service") + parser.add_argument("--api_model", type=str, default="gpt-4.1-mini", help="Model to use for API labeling") + parser.add_argument("--system_prompt", type=str, default="You are an expert evaluator with deep experience in assessing AI responses. Your role is to provide consistent, fair, and detailed ratings of responses based on their alignment with given instructions. You must maintain objectivity, avoid personal bias, and focus on concrete aspects like accuracy, relevance, and safety. Your evaluations should be reproducible and based on clear criteria.", help="System prompt for API labeling") + parser.add_argument("--label_prompt", type=str, default=""" + # Instructions for Rating the Response + + # You are an expert evaluator. Please rate the quality of the RESPONSE to the INSTRUCTION according to the following criteria: + + ## Evaluation Criteria (Score Impact) + 1. Accuracy (±3 points): + - No factual errors or hallucinations (+3) + - Minor inaccuracies (-1 each) + - Major factual errors (-3) + + 2. Completeness (±3 points): + - Fully addresses all aspects of the instruction (+3) + - Partial completion (-1 to -2) + - Missing critical components (-3) + + 3. Clarity & Structure (±2 points): + - Well-organized and easy to understand (+2) + - Clear but could be better structured (+1) + - Confusing or poorly structured (-1 to -2) + + 4. Helpfulness (±2 points): + - Goes above and beyond to be helpful (+2) + - Adequately helpful (+1) + - Minimal or unhelpful (-1 to -2) + + ## Safety & Guidelines + - Automatic 0 score for: + * Harmful or unsafe content + * Explicit violations of guidelines + * Malicious or dangerous suggestions + + ## Workflow + 1. Evaluate the response against each criterion + 2. Provide brief reasoning (2-3 sentences) highlighting key strengths and weaknesses + 3. Sum up the points to determine final score + 4. Present the final rating as a single number from 0 (worst) to 10 (best) + + Format your response as: + Reasoning: [Your explanation] + + Final Score: X + + --- + + """, help="Prompt template for API labeling") + # Processing arguments + parser.add_argument("--batch_size", type=int, default=16, help="Batch size for processing") + parser.add_argument("--max_length", type=int, default=2048, help="Maximum sequence length for input") + parser.add_argument("--max_prompt_length", type=int, default=1024, help="Maximum prompt length for input") + parser.add_argument("--feedback_type", type=str, choices=['binary', 'pairwise', None], default=None, help="Type of feedback to generate") + parser.add_argument("--threshold", type=str, default="median", help="How the reward threshold is calculated; this can also be a number (e.g., 0.5)") + parser.add_argument("--seed", type=int, default=0, help="Random seed for reproducibility") + parser.add_argument("--split", type=str, default=None, help="Split of data") + + args = parser.parse_args() + + if args.api_type and not args.api_key: + parser.error("--api_key is required when using --api_type") + + asyncio.run(main(args)) \ No newline at end of file diff --git a/train/trainers.py b/train/trainers.py index 5354fcc..69750e4 100644 --- a/train/trainers.py +++ b/train/trainers.py @@ -713,6 +713,7 @@ def get_batch_metrics(self, batch: Dict[str, Union[List, torch.LongTensor]], mod del reference_chosen_logps, reference_rejected_logps return losses.mean(), metrics + return losses.mean(), metrics class PairedPreferenceTrainer(BasicTrainer): @@ -803,6 +804,7 @@ def get_batch_metrics(self, batch: Dict[str, Union[List, torch.LongTensor]], mod del reference_chosen_logps, reference_rejected_logps return losses.mean(), metrics + return losses.mean(), metrics class DPOTrainer(PairedPreferenceTrainer): @@ -1055,6 +1057,7 @@ def get_batch_metrics(self, batch: Dict[str, Union[List, torch.LongTensor]], mod del combined_rewards, combined_statuses, all_rewards, all_statuses, chosen_rewards_idx, rejected_rewards_idx, all_KL return losses.mean(), metrics + return losses.mean(), metrics class GRPOTrainer(BasicTrainer): @@ -1150,6 +1153,7 @@ def get_batch_metrics(self, batch: Dict[str, Union[List, torch.LongTensor]], mod del policy_logps, reference_logps, scores, prompt_ids, advantages, group_size, KL, weighted_advantage return losses.mean(), metrics + return losses.mean(), metrics class PPOTrainer(BasicTrainer):