@xyuzh commented Nov 19, 2025

Summary

This PR adds a new production-ready example demonstrating fault-tolerant distributed training using Megatron and Ray. The implementation showcases how to build resilient ML training systems that can automatically recover from actor failures without losing progress.

What's New

🆕 megatron_ray_fault_tolerant Example

A complete implementation of PPO-style distributed training with enterprise-grade fault tolerance:

  • Automatic Failure Recovery: Detects and recovers from actor failures mid-training
  • Backup Actor Pool: Pre-allocated spare GPUs enable instant worker replacement
  • Zero Progress Loss: Checkpoint-based recovery ensures training continues from the last saved state
  • Process Group Management: Handles NCCL re-initialization after topology changes
  • Multi-Dimensional Parallelism: Supports DP, TP, PP, and CP for large-scale training (see the parallelism sketch after this list)
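
As a rough illustration (not the example's actual code), the sketch below shows how the TP/PP/CP sizes could be handed to `megatron.core.parallel_state`, assuming `torch.distributed` has already been initialized on the Ray workers; the exact keyword set depends on the installed Megatron-Core version.

```python
# Illustrative only: maps configured parallel sizes onto Megatron-Core.
# Assumes torch.distributed.init_process_group() has already run on each worker.
import torch.distributed as dist
from megatron.core import parallel_state


def setup_parallelism(tp: int = 2, pp: int = 1, cp: int = 1) -> None:
    assert dist.is_initialized(), "init_process_group() must run first"
    parallel_state.initialize_model_parallel(
        tensor_model_parallel_size=tp,
        pipeline_model_parallel_size=pp,
        context_parallel_size=cp,
    )
    # Data parallelism takes the remaining ranks: DP = world_size / (TP * PP * CP).
    print("DP world size:", parallel_state.get_data_parallel_world_size())
```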

Key Features

Fault Tolerance Architecture

  1. Health Monitoring: Continuous actor health checks with timeout handling
  2. Graceful Recovery (sketched after this list):
    • Destroy stale process groups
    • Replace failed actors from backup pool
    • Re-initialize with updated world size
    • Reload state from distributed checkpoints
  3. Backup Strategy: Configurable spare GPU count for various failure scenarios
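
A minimal sketch of the recovery path, assuming hypothetical worker actors that expose `health_check()`, `shutdown_process_group()`, `init_process_group()`, and `load_checkpoint()`; the names and signatures here are illustrative, not the example's actual API.

```python
import ray
from ray.exceptions import GetTimeoutError, RayActorError


@ray.remote(num_cpus=1)  # the real example uses GPU workers (num_gpus=1)
class Worker:
    def health_check(self) -> bool:
        return True

    def shutdown_process_group(self) -> None:
        # Real workers would call torch.distributed.destroy_process_group().
        pass

    def init_process_group(self, rank: int, world_size: int) -> None:
        # Real workers would re-initialize NCCL for the new topology.
        pass

    def load_checkpoint(self, path: str) -> None:
        # Real workers would reload the Megatron distributed checkpoint.
        pass


def find_failed(workers, timeout_s: float = 10.0):
    """Return indices of workers that died or miss the health-check deadline."""
    failed = []
    for i, w in enumerate(workers):
        try:
            ray.get(w.health_check.remote(), timeout=timeout_s)
        except (GetTimeoutError, RayActorError):
            failed.append(i)
    return failed


def recover(workers, backups, ckpt_path: str):
    """Replace dead workers from the backup pool and rebuild the group."""
    failed = set(find_failed(workers))
    if not failed:
        return workers
    # 1. Tear down stale process groups on the survivors.
    ray.get([w.shutdown_process_group.remote()
             for i, w in enumerate(workers) if i not in failed])
    # 2. Swap in pre-allocated backup actors.
    for i in failed:
        workers[i] = backups.pop()
    # 3. Re-initialize the process group with the new membership.
    ray.get([w.init_process_group.remote(rank, len(workers))
             for rank, w in enumerate(workers)])
    # 4. Reload the last saved state so no progress is lost.
    ray.get([w.load_checkpoint.remote(ckpt_path) for w in workers])
    return workers
```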

Distributed Training Capabilities

  • Megatron Integration: Full support for Megatron-Core parallelism strategies
  • Flexible Dispatch System (sketched below):
    • MeshDispatch: Smart data sharding across the device mesh
    • PassThroughDispatch: Broadcast operations to all workers
    • Extensible registry for custom strategies
  • Cloud-Native Checkpointing: S3/GCS support with parallel I/O (see the upload sketch below)
  • PPO Training: Reference implementation with gradient accumulation
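
The dispatch layer is easiest to picture as a small registry of strategy classes. The sketch below is an assumption about the shape of that API (class and method names are illustrative): `MeshDispatch` shards a batch across workers, while `PassThroughDispatch` sends the same payload to everyone.

```python
import ray

DISPATCH_REGISTRY = {}


def register_dispatch(name):
    """Decorator that registers a dispatch strategy under a name."""
    def wrap(cls):
        DISPATCH_REGISTRY[name] = cls
        return cls
    return wrap


@register_dispatch("mesh")
class MeshDispatch:
    def dispatch(self, workers, batch):
        # Shard the batch so each worker sees a disjoint slice.
        n = len(workers)
        shards = [batch[i::n] for i in range(n)]
        return [w.run.remote(shard) for w, shard in zip(workers, shards)]


@register_dispatch("pass_through")
class PassThroughDispatch:
    def dispatch(self, workers, payload):
        # Broadcast the identical payload to every worker.
        return [w.run.remote(payload) for w in workers]


@ray.remote
class EchoWorker:
    def run(self, data):
        return data


if __name__ == "__main__":
    workers = [EchoWorker.remote() for _ in range(4)]
    refs = DISPATCH_REGISTRY["mesh"]().dispatch(workers, list(range(8)))
    print(ray.get(refs))  # each worker receives a 2-element shard
```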

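A hedged sketch of what "parallel I/O" for cloud checkpoints can look like, assuming the checkpoint is an on-disk directory and boto3 is available; bucket and prefix names are placeholders, not the example's configuration.

```python
import os
from concurrent.futures import ThreadPoolExecutor

import boto3


def upload_checkpoint(local_dir: str, bucket: str, prefix: str, workers: int = 8) -> None:
    """Upload every file under local_dir to s3://bucket/prefix/ concurrently."""
    s3 = boto3.client("s3")  # boto3 clients are safe to share across threads
    files = []
    for root, _, names in os.walk(local_dir):
        for name in names:
            path = os.path.join(root, name)
            key = f"{prefix}/{os.path.relpath(path, local_dir)}"
            files.append((path, key))
    # Upload the files concurrently for higher throughput.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(lambda pk: s3.upload_file(pk[0], bucket, pk[1]), files))
```
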
Testing & Validation

The example includes a built-in fault tolerance demonstration:

  1. ✅ Initialize 4 workers + 4 backup actors
  2. ✅ Run training step and save checkpoint
  3. ✅ Simulate failure by killing a data-parallel group (see the sketch after this list)
  4. ✅ Automatically recover using backup actors
  5. ✅ Resume training and verify correctness
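
The failure-injection step can be pictured with `ray.kill`, reusing the illustrative `workers`, `backups`, and `recover` names from the recovery sketch above; the group layout in the comment is an assumption.

```python
import ray


def kill_data_parallel_group(workers, dp_group_ranks):
    """Forcibly terminate the actors at the given ranks to simulate a failure."""
    for rank in dp_group_ranks:
        ray.kill(workers[rank], no_restart=True)


# Example: with 4 workers and TP=2, ranks (0, 1) might form one DP group.
# kill_data_parallel_group(workers, dp_group_ranks=(0, 1))
# workers = recover(workers, backups, ckpt_path="<checkpoint uri>")
```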

Run the demo with `run.sh`.

Submit to Anyscale:

```bash
anyscale job submit -f job.yaml
```

Resource Requirements:

  • GPU instances: g6e.12xlarge (4x L40S GPUs)
  • Scales: 0-2 nodes (0-8 GPUs + spares)
  • Storage: S3/GCS for checkpoints

Use Cases

  • Research: Fault-tolerant experiments on preemptible instances
  • Production: Reliable long-running training jobs
  • Cost Optimization: Leverage spot instances with auto-recovery
  • Large Models: Scale beyond single-node with parallelism
  • RL Training: PPO and similar on-policy algorithms

Related Work

This example builds on:

  • Megatron-LM parallelism strategies
  • Ray's actor model and placement groups

Future Enhancements

  • Virtual pipeline parallelism support
  • CPU offload optimization for faster recovery
  • Async checkpoint saving
  • Multi-node failure recovery testing
  • Integration with Ray Train

Note: This example requires GPU resources and cloud storage configuration. See the README for detailed setup instructions.

robertnishihara and others added 5 commits November 18, 2025 19:09
- Update Ray base image to 2.51.1 and vLLM to 0.11.0
- Add boto3 dependency for S3 operations
- Update transformers to 4.57.1 for compatibility
- Configure compute resources with auto-selection (max 520 CPU, 128 GPU)
- Add disk size configuration options for customer-hosted deployments
- Implement robust URL validation and error handling
- Add base64 image encoding for Arrow serialization
- Add JPEG format validation and 128x128 image resizing
- Scale model replicas from 1 to 32 for higher throughput
- Optimize batch sizes and memory usage for large-scale processing
- Implement session pooling for HTTP requests with retry logic
- Add timestamp-based output paths to /mnt/shared_storage
- Add run.sh script for job submission with HF_TOKEN
…rance

- Implements PPO-style training with Megatron and Ray
- Features automatic actor recovery from failures
- Includes backup actor pool for seamless replacement
- Supports DP, TP, PP, and CP parallelism
- Distributed checkpoint saving/loading
- Process group re-initialization after failures
- Added comprehensive documentation in README files
```python
pipeline_model_parallel_size: int = 1
context_parallel_size: int = 1
expert_model_parallel_size: int = 1
expert_tensor_parallel_size: int = 1
```

Need a large-scale recovery system.

```python
BasicType = Union[int, float, str, bool]


@ray.remote(num_gpus=1)
```

Also need actual GPU actors.
