102 changes: 102 additions & 0 deletions README.md
@@ -0,0 +1,102 @@
# Examples

This repository contains examples for deploying and running distributed applications.

## Job Examples

### 1. Hello World Job
**Directory:** `01_job_hello_world/`

A simple "Hello World" example demonstrating how to submit and run basic jobs.

### 2. Image Processing
**Directory:** `image_processing/`

Process large-scale image datasets using Ray Data. This example demonstrates processing the ReLAION-2B dataset with over 2 billion rows.

### 3. Megatron + Ray Fault Tolerant Training
**Directory:** `megatron_ray_fault_tolerant/`

Implements PPO-style distributed training with Megatron and Ray, featuring comprehensive fault tolerance capabilities:
- Automatic actor recovery from failures
- Backup actor groups for seamless replacement
- Distributed checkpoint saving/loading
- Process group re-initialization after failures
- Support for tensor, pipeline, data, and context parallelism

## Service Examples

### 1. Hello World Service
**Directory:** `02_service_hello_world/`

A simple service deployment example demonstrating the basics of Ray Serve.

### 2. Deploy Llama 3.1 8B
**Directory:** `03_deploy_llama_3_8b/`

Deploy Llama 3.1 8B model using Ray Serve and vLLM with autoscaling capabilities.

### 3. Deploy Llama 3.1 70B
**Directory:** `deploy_llama_3_1_70b/`

Deploy the larger Llama 3.1 70B model with optimized serving configuration.

### 4. Tensor Parallel Serving
**Directory:** `serve_tensor_parallel/`

Demonstrates tensor parallelism for serving large language models across multiple GPUs.

### 5. FastVideo Generation
**Directory:** `video_generation_with_fastvideo/`

Deploy a video generation service using the FastVideo framework.

## Reinforcement Learning Examples

### SkyRL
**Directory:** `skyrl/`

Reinforcement learning training example using Ray and distributed computing.

## Getting Started

Most examples include their own README with specific instructions. In general, you'll need to:

1. Install the Anyscale CLI:
   ```bash
   pip install -U anyscale
   anyscale login
   ```

2. Navigate to the example directory:
   ```bash
   cd <example_directory>
   ```

3. Deploy the service or submit the job:
   ```bash
   # For services
   anyscale service deploy -f service.yaml

   # For jobs
   anyscale job submit -f job.yaml
   ```

## Requirements

- Anyscale account and CLI access
- Appropriate cloud credentials configured
- GPU resources for ML/LLM examples

## Contributing

When adding new examples:
1. Create a descriptive directory name
2. Include a README.md with setup and usage instructions
3. Add appropriate YAML configuration files
4. Update this main README with your example

## License

See individual example directories for specific licensing information.

1 change: 1 addition & 0 deletions megatron_ray_fault_tolerant/.gitignore
@@ -0,0 +1 @@
__pycache__/
20 changes: 20 additions & 0 deletions megatron_ray_fault_tolerant/.pre-commit-config.yaml
@@ -0,0 +1,20 @@
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.11.9
    hooks:
      - id: ruff
        args: [ --fix, --exit-non-zero-on-fix ]
        exclude: (^(skyagent)/.*)$

  # Black needs to be run after ruff with --fix
  - repo: https://github.com/psf/black
    rev: 24.10.0
    hooks:
      - id: black
        exclude: (^(skyagent)/.*)$

  # Detect secrets and sensitive information
  - repo: https://github.com/gitleaks/gitleaks
    rev: v8.24.2
    hooks:
      - id: gitleaks
34 changes: 34 additions & 0 deletions megatron_ray_fault_tolerant/Dockerfile
@@ -0,0 +1,34 @@
FROM anyscale/ray:2.51.0-slim-py312-cu128

RUN sudo apt-get update -y && sudo apt-get install -y wget kmod libxml2 build-essential libnuma-dev

# The CUDA compiler is needed here for DeepSpeed
RUN wget https://developer.download.nvidia.com/compute/cuda/12.8.0/local_installers/cuda_12.8.0_570.86.10_linux.run \
&& sudo sh cuda_12.8.0_570.86.10_linux.run --silent --toolkit && rm -rf cuda_12.8.0_570.86.10_linux.run

RUN curl -LsSf https://astral.sh/uv/0.9.4/install.sh | sh
RUN echo "export RAY_RUNTIME_ENV_HOOK=ray._private.runtime_env.uv_runtime_env_hook.hook" >> /home/ray/.bashrc


RUN sudo apt-get update \
&& sudo apt-get install -y openssh-server iputils-ping net-tools iproute2 traceroute netcat \
libopenexr-dev libxi-dev libglfw3-dev libglew-dev libomp-dev libxinerama-dev libxcursor-dev tzdata \
&& sudo apt-get clean && sudo rm -rf /var/lib/apt/lists/*

RUN sudo apt update && sudo apt install --fix-broken && sudo apt install -y default-jre-headless openjdk-8-jdk \
&& sudo apt-get clean \
&& sudo rm -rf /var/lib/apt/lists/*

# ---------- PyTorch + cuDNN + Transformer Engine ----------
RUN pip install --no-cache-dir "torch==2.7.1" "nvidia-cudnn-cu12>=9.3" && \
CUDNN_PATH="$(python -c 'import inspect, nvidia.cudnn as c, os; print(os.path.dirname(inspect.getfile(c)))')" && \
sudo mkdir -p /opt && sudo ln -sfn "$CUDNN_PATH" /opt/cudnn && \
echo "/opt/cudnn/lib" | sudo tee /etc/ld.so.conf.d/cudnn.conf >/dev/null && sudo ldconfig

ENV CUDNN_PATH=/opt/cudnn
ENV CPATH=${CUDNN_PATH}/include:${CPATH}
ENV LD_LIBRARY_PATH=${CUDNN_PATH}/lib:${LD_LIBRARY_PATH}

RUN pip install --no-cache-dir --no-build-isolation "transformer_engine[pytorch]==2.5.0"
# --------------------
191 changes: 191 additions & 0 deletions megatron_ray_fault_tolerant/README.md
@@ -0,0 +1,191 @@
# Megatron + Ray Fault Tolerant Training

This example implements PPO-style distributed training using Megatron and Ray with comprehensive fault tolerance capabilities. The system can automatically recover from actor failures during training by utilizing backup actors and re-initializing process groups.

## Key Features

### Fault Tolerance Mechanisms

1. **Actor Health Monitoring**: Continuously monitors the health of distributed training actors (see the health-check sketch after this list)
2. **Backup Actor Pool**: Pre-allocated backup actors ready to replace failed workers
3. **Automatic Recovery**: Seamlessly recovers from failures by:
   - Detecting dead actors
   - Destroying old process groups
   - Replacing failed actors with backup actors
   - Re-initializing process groups with the new world size
   - Reloading model and optimizer state from checkpoints

4. **Distributed Checkpointing**: Implements efficient sharded checkpoint saving/loading using Megatron's distributed checkpointing
5. **Process Group Management**: Handles NCCL process group initialization, destruction, and re-initialization
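
A minimal sketch of the health-check idea: call a trivial remote method on every actor and treat any actor that raises or times out as dead. The `ping` method and `actors` list are hypothetical stand-ins; the actual check in `MegatronActorGroup` (`megatron_actor.py`) may differ.

```python
import ray
from ray.exceptions import GetTimeoutError, RayActorError


def find_dead_ranks(actors, timeout_s: float = 30.0) -> list[int]:
    """Return the ranks whose actors no longer respond to a ping."""
    # Fire all pings concurrently, then check each result individually.
    refs = [actor.ping.remote() for actor in actors]
    dead = []
    for rank, ref in enumerate(refs):
        try:
            ray.get(ref, timeout=timeout_s)
        except (RayActorError, GetTimeoutError):
            dead.append(rank)
    return dead
```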

### Parallelism Support

- **Data Parallelism (DP)**: Distributes training data across multiple GPUs (the dimension that remains once the others are fixed; see the sizing sketch after this list)
- **Tensor Parallelism (TP)**: Splits model tensors across GPUs
- **Pipeline Parallelism (PP)**: Distributes model layers across GPUs
- **Context Parallelism (CP)**: Enables sequence parallelism for long contexts
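
In the usual Megatron layout (expert parallelism aside), these dimensions multiply to the world size, so the data-parallel degree is whatever remains once TP, PP, and CP are chosen. A quick sizing check:

```python
def data_parallel_size(world_size: int, tp: int = 1, pp: int = 1, cp: int = 1) -> int:
    """DP degree implied by the other parallel dimensions in a Megatron-style layout."""
    assert world_size % (tp * pp * cp) == 0, "TP * PP * CP must divide the world size"
    return world_size // (tp * pp * cp)


# Example: 4 GPUs with TP=2, PP=1, CP=1 leaves DP=2.
assert data_parallel_size(4, tp=2) == 2
```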

### Advanced Training Features

- **PPO Training**: Implements Proximal Policy Optimization with micro-batch accumulation
- **Mixed Precision**: Supports BF16 training for improved performance
- **Gradient Accumulation**: Handles micro-batches with automatic gradient accumulation (illustrated in the sketch after this list)
- **Distributed Optimizer**: Uses Megatron's distributed optimizer for memory efficiency
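
A plain-PyTorch sketch of how micro-batch accumulation and BF16 autocast fit together. This is not the Megatron code path, just the underlying pattern; `loss_fn` and the batch format are placeholders.

```python
import torch


def train_mini_batch(model, optimizer, micro_batches, loss_fn):
    """One optimizer step after accumulating gradients over all micro-batches."""
    optimizer.zero_grad()
    num_micro = len(micro_batches)
    for inputs, targets in micro_batches:
        # BF16 autocast mirrors the mixed-precision setting used in this example.
        with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
            loss = loss_fn(model(inputs), targets)
        # Scale so the accumulated gradient equals the mini-batch average.
        (loss / num_micro).backward()
    optimizer.step()
```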

## Architecture

### Core Components

1. **MegatronActor** (`megatron_actor.py`):
   - Individual training actor wrapping Megatron models
   - Handles model initialization, forward/backward passes, and checkpointing
   - Supports dynamic process group re-initialization

2. **MegatronActorGroup** (`megatron_actor.py`):
   - Manages a group of distributed actors
   - Implements fault recovery logic
   - Coordinates distributed training operations

3. **Dispatch System** (`dispatch.py`):
   - **MeshDispatch**: Distributes data across the device mesh (DP, SP, TP, PP)
   - **PassThroughDispatch**: Broadcasts the same data/commands to all actors
   - Handles data sharding and result collection

4. **Training Batch** (`training_batch.py`):
   - Defines input/output batch structures for PPO training
   - Supports chunking and concatenation for distributed operations (see the chunking sketch after this list)

5. **Checkpoint I/O** (`file_io.py`):
   - Cloud-aware file I/O supporting S3, GCS, and local storage
   - Efficient checkpoint upload/download with parallel transfers
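
For intuition, chunking a batch across data-parallel ranks and concatenating the results back amounts to the following. This is a generic dict-of-tensors sketch with hypothetical helper names; the real structures in `training_batch.py` carry more metadata.

```python
import torch


def chunk_batch(batch: dict, num_chunks: int) -> list[dict]:
    """Split every tensor in the batch along dim 0 into one shard per rank."""
    shards = {k: torch.chunk(v, num_chunks, dim=0) for k, v in batch.items()}
    return [{k: shards[k][i] for k in batch} for i in range(num_chunks)]


def concat_batches(batches: list[dict]) -> dict:
    """Inverse of chunk_batch: gather per-rank outputs back into one batch."""
    return {k: torch.cat([b[k] for b in batches], dim=0) for k in batches[0]}
```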

## Getting Started

### Quick Start

```bash
uv run --isolated main.py
```

This will:
1. Create a placement group with workers and backup GPUs
2. Initialize the actor group and model
3. Run a training step
4. Save a checkpoint
5. Simulate a failure by killing actors (see the sketch after this list)
6. Recover from the failure using backup actors
7. Resume training after recovery
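
Step 5 relies on Ray's ability to terminate actors on demand. The sketch below shows the idea with a hypothetical list of actor handles; in this example the handles are managed inside the actor group.

```python
import ray


def simulate_failure(actors, ranks_to_kill):
    """Forcibly terminate selected actors, emulating a node or process failure."""
    for rank in ranks_to_kill:
        ray.kill(actors[rank], no_restart=True)
```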

### Configuration

Edit the `Config` class in `main.py` to customize:

```python
@dataclass
class Config:
    model: str = "Qwen/Qwen3-0.6B"  # HuggingFace model name
    num_nodes: int = 1
    num_gpus_per_node: int = 4
    num_spare_gpus: int = 4  # Backup actors for fault tolerance
    mini_batch_size: int = 16
    micro_train_batch_size_per_gpu: int = 2

    # Megatron parallelism settings
    megatron_config: MegatronConfig = field(default_factory=MegatronConfig)
```

### Megatron Parallelism Configuration

```python
@dataclass
class MegatronConfig:
    tensor_model_parallel_size: int = 1  # TP degree
    pipeline_model_parallel_size: int = 1  # PP degree
    context_parallel_size: int = 1  # CP degree
    expert_model_parallel_size: int = 1  # For MoE models
```

## Fault Recovery Workflow

1. **Training Phase**:
   - Actors perform distributed training using Megatron
   - Periodic checkpoints are saved to cloud storage

2. **Failure Detection**:
   - The system detects actor failures via health checks
   - Identifies the affected data-parallel groups

3. **Recovery Process** (sketched after this list):
   - Destroy old process groups on healthy actors
   - Pop backup actors from the backup pool
   - Insert backup actors at the failed ranks
   - Update the world size and reassign ranks
   - Re-initialize process groups with the new configuration
   - Reload model/optimizer state from the checkpoint

4. **Resume Training**:
   - Continue training with the recovered actor group
   - No training progress is lost beyond the last checkpoint
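
A compressed sketch of the recovery path, with hypothetical method names standing in for the real `MegatronActorGroup` API in `megatron_actor.py`:

```python
def recover(actor_group, backup_pool, ckpt_path):
    """Replace dead actors with backups and rebuild the distributed state."""
    dead_ranks = actor_group.find_dead_ranks()        # hypothetical health check
    actor_group.destroy_process_groups()              # tear down stale NCCL groups

    for rank in dead_ranks:
        actor_group.replace(rank, backup_pool.pop())  # promote a backup actor

    actor_group.init_process_groups()                 # new world size / rank mapping
    actor_group.load_checkpoint(ckpt_path)            # sharded Megatron checkpoint
```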

## Advanced Usage

### Custom Dispatch Types

Register custom dispatch strategies:

```python
from dispatch import register_dispatch_type, Dispatch

class CustomDispatch(Dispatch):
    # Implement dispatch, collect, and validate methods
    pass

register_dispatch_type("custom", CustomDispatch)
```

### CPU Offloading (Experimental)

For faster recovery, offload model/optimizer state to CPU memory:

```python
# Before failure
ray.get(actor_group.async_run_ray_method("pass_through", "offload_to_cpu"))

# After recovery, on healthy actors
ray.get(actor_group.async_run_ray_method("pass_through", "backload_to_gpu"))
```

## Dependencies

See `pyproject.toml` for full dependency list. Key dependencies:
- Ray for distributed orchestration
- Megatron-Core for model parallelism
- PyTorch with CUDA support
- Transformers for model loading
- vLLM and related libraries

## Running on Anyscale

Submit the job using:

```bash
anyscale job submit -f job.yaml
```

The job configuration in `job.yaml` specifies (an illustrative sketch follows this list):
- Container image with dependencies
- GPU instance types (g6e.12xlarge, 4 GPUs per node)
- Resource limits and scaling
- Environment variables for NCCL configuration
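
For orientation, an Anyscale job config for this example might look roughly like the snippet below. Field names follow the Anyscale job schema, but the image, compute config, and environment variables are placeholders; the `job.yaml` in this directory is the source of truth.

```yaml
name: megatron-ray-fault-tolerant
image_uri: <your-registry>/megatron-ray-ft:latest  # built from the Dockerfile in this directory
compute_config: <compute-config-with-4-gpu-nodes>  # e.g. g6e.12xlarge workers
working_dir: .
entrypoint: uv run --isolated main.py
env_vars:
  NCCL_DEBUG: WARN  # placeholder; the real file sets its own NCCL variables
```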

## Limitations and Future Work

- Virtual pipeline parallelism not yet supported
- CPU offloading optimization in progress
- Async checkpoint saving planned for future releases

## References

- [Megatron-LM](https://github.com/NVIDIA/Megatron-LM)
- [Ray Documentation](https://docs.ray.io/)
- [Anyscale Platform](https://docs.anyscale.com/)