Merged
csrc/config.hpp (28 changes: 24 additions & 4 deletions)
@@ -133,8 +133,11 @@ struct LowLatencyLayout {
         return reinterpret_cast<out_ptr_t>(reinterpret_cast<count_ptr_t>(ptr) + count);
     }

-    LowLatencyLayout(void* rdma_buffer, int num_max_dispatch_tokens_per_rank, int hidden, int num_ranks, int num_experts) {
+    LowLatencyLayout(
+        bool disable_ll_layered, void* rdma_buffer, int num_max_dispatch_tokens_per_rank, int hidden, int num_ranks, int num_experts) {
         const int num_scales = hidden / 128;
+        const int num_nodes = num_ranks / NUM_MAX_NVL_PEERS;  // TODO: derive NUM_MAX_NVL_PEERS automatically from the
+                                                              // runtime topology of the running processes

         // Dispatch and combine layout:
         // - 2 symmetric odd/even send buffer
@@ -145,7 +148,12 @@ struct LowLatencyLayout {
         // NOTES: you should add a control `int4` for combine messages if you want to do data transformation
         // NOTES: `num_scales * sizeof(nv_bfloat162)` means the per-128-channel min/max
         EP_HOST_ASSERT(num_scales * sizeof(float) <= hidden);
+        size_t per_meta_data_size = sizeof(int4);
+        size_t per_token_size = std::max(hidden * sizeof(nv_bfloat16), hidden + num_scales * sizeof(float));
         size_t num_bytes_per_dispatch_msg = sizeof(int4) + std::max(hidden * sizeof(nv_bfloat16), hidden + num_scales * sizeof(float));
+        if (!disable_ll_layered) {
+            num_bytes_per_dispatch_msg = per_meta_data_size + per_token_size;
+        }
         size_t num_bytes_per_combine_msg = num_scales * sizeof(nv_bfloat162) + hidden * sizeof(nv_bfloat16);

         // Send buffer
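
Each dispatch message is a 16-byte `int4` metadata header plus a payload sized for the larger of the two token encodings: BF16 (`hidden * sizeof(nv_bfloat16)`) or FP8 with one float scale per 128 channels (`hidden + num_scales * sizeof(float)`). The `!disable_ll_layered` branch assigns the same total as the original expression; the point of the split is that `per_meta_data_size` and `per_token_size` get sized separately by the layered receive buffer below. A minimal standalone sketch of the arithmetic, assuming `hidden = 7168` purely as an example value (nothing in this PR fixes it):

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
        const std::size_t hidden = 7168;              // assumed example hidden size
        const std::size_t num_scales = hidden / 128;  // one FP8 scale per 128 channels -> 56
        const std::size_t per_meta_data_size = 16;    // sizeof(int4)
        const std::size_t bf16_payload = hidden * 2;              // 14336 B
        const std::size_t fp8_payload = hidden + num_scales * 4;  // 7168 + 224 = 7392 B
        const std::size_t per_token_size = std::max(bf16_payload, fp8_payload);  // 14336 B
        std::printf("num_bytes_per_dispatch_msg = %zu\n", per_meta_data_size + per_token_size);  // 14352
        return 0;
    }
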
@@ -158,13 +166,23 @@ struct LowLatencyLayout {
         // Symmetric receive buffers
         // TODO: optimize memory usages
         size_t dispatch_recv_data_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg;
+        if (!disable_ll_layered) {
+            dispatch_recv_data_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * per_meta_data_size +
+                num_nodes * num_max_dispatch_tokens_per_rank * per_token_size;  // NOTES: num_experts == num_local_experts * num_ranks
+        }
         size_t combine_recv_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg;
         size_t recv_buffer_bytes = std::max(dispatch_recv_data_buffer_bytes, combine_recv_buffer_bytes);
         EP_HOST_ASSERT(recv_buffer_bytes % sizeof(int4) == 0);
         total_bytes += recv_buffer_bytes * 2;

         // Symmetric signaling buffers
-        size_t dispatch_recv_count_buffer_bytes = num_experts * sizeof(int);
+        size_t dispatch_recv_count_buffer_bytes =
+            num_experts * sizeof(int);  // NOTES: num_experts == num_local_experts * num_ranks == num_local_experts * NUM_MAX_NVL_PEERS * num_nodes;
+                                        // half is used in dispatch, and the other half is used in combine
+        if (!disable_ll_layered) {
+            dispatch_recv_count_buffer_bytes +=
+                NUM_MAX_NVL_PEERS * num_nodes * num_max_dispatch_tokens_per_rank * sizeof(int) + NUM_MAX_NVL_PEERS * sizeof(int);
+        }
         size_t combine_recv_flag_buffer_bytes = dispatch_recv_count_buffer_bytes;
         size_t signaling_buffer_bytes = std::max(dispatch_recv_count_buffer_bytes, combine_recv_flag_buffer_bytes);
         size_t signaling_buffer_bytes_aligned = align_up<size_t>(signaling_buffer_bytes, 128);
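
To see what the layered layout buys, the sketch below evaluates the two `dispatch_recv_data_buffer_bytes` formulas side by side. In the flat layout every expert slot holds full messages (metadata plus payload) for every possible token; in the layered layout the metadata stays per expert slot, while the token payload is kept once per source node. All parameter values are assumed examples, not mandated by the PR:

    #include <cstddef>
    #include <cstdio>

    int main() {
        const std::size_t NUM_MAX_NVL_PEERS = 8;                      // assumed, matching the usual DeepEP setting
        const std::size_t num_ranks = 32, num_experts = 256;          // example: 4 nodes x 8 GPUs
        const std::size_t num_nodes = num_ranks / NUM_MAX_NVL_PEERS;  // 4
        const std::size_t num_tokens = 128;                           // num_max_dispatch_tokens_per_rank
        const std::size_t per_meta = 16, per_token = 14336;           // from the hidden = 7168 example above
        const std::size_t per_msg = per_meta + per_token;             // 14352 B

        const std::size_t flat = num_experts * num_tokens * per_msg;
        const std::size_t layered = num_experts * num_tokens * per_meta   // metadata: still one slot per expert
                                  + num_nodes * num_tokens * per_token;   // payload: one copy per source node
        std::printf("flat:    %zu B (~%.1f MiB)\n", flat, flat / 1048576.0);       // ~448.5 MiB
        std::printf("layered: %zu B (~%.1f MiB)\n", layered, layered / 1048576.0); // ~7.5 MiB
        return 0;
    }

Note that `recv_buffer_bytes` is still the max with `combine_recv_buffer_bytes`, which this PR leaves at `num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg`, so the combine path can remain the dominant term in the final allocation.
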
@@ -187,8 +205,10 @@ struct LowLatencyLayout {
     }
 };

-size_t get_low_latency_rdma_size_hint(int num_max_dispatch_tokens_per_rank, int hidden, int num_ranks, int num_experts) {
-    auto num_bytes = LowLatencyLayout(nullptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts).total_bytes;
+size_t get_low_latency_rdma_size_hint(
+    bool dispatch_ll_dispatch_opt, int num_max_dispatch_tokens_per_rank, int hidden, int num_ranks, int num_experts) {
+    auto num_bytes =
+        LowLatencyLayout(dispatch_ll_dispatch_opt, nullptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts).total_bytes;
     return ((num_bytes + NUM_BUFFER_ALIGNMENT_BYTES) / NUM_BUFFER_ALIGNMENT_BYTES) * NUM_BUFFER_ALIGNMENT_BYTES;
 }

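The size hint rounds the layout total up to `NUM_BUFFER_ALIGNMENT_BYTES`. Note the expression adds the full alignment rather than the usual `+ alignment - 1`, so an already-aligned total still grows by one extra block. A small sketch, with 4096 as an assumed stand-in for `NUM_BUFFER_ALIGNMENT_BYTES`:

    #include <cstddef>
    #include <cstdio>

    // Mirrors the rounding in get_low_latency_rdma_size_hint.
    static std::size_t round_hint(std::size_t num_bytes, std::size_t alignment) {
        return ((num_bytes + alignment) / alignment) * alignment;
    }

    int main() {
        const std::size_t a = 4096;  // assumed alignment value, for illustration only
        std::printf("%zu\n", round_hint(5000, a));  // 8192: rounded up as expected
        std::printf("%zu\n", round_hint(8192, a));  // 12288: an aligned input still gains a block
        return 0;
    }
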
csrc/deep_ep.cpp (18 changes: 11 additions & 7 deletions)
@@ -132,12 +132,14 @@ Buffer::Buffer(int rank,
                bool low_latency_mode,
                bool explicitly_destroy,
                bool enable_shrink,
-               bool use_fabric)
+               bool use_fabric,
+               bool disable_ll_layered)
     : rank(rank),
       num_ranks(num_ranks),
       num_nvl_bytes(num_nvl_bytes),
       num_rdma_bytes(num_rdma_bytes),
       enable_shrink(enable_shrink),
+      _disable_ll_layered(disable_ll_layered),
       low_latency_mode(low_latency_mode),
       explicitly_destroy(explicitly_destroy),
       comm_stream(at::cuda::getStreamFromPool(true)),
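
The constructor stores the flag once, and every subsequent `LowLatencyLayout` is built from the same member. Since `get_low_latency_rdma_size_hint` now takes the flag as a separate parameter, callers must pass the same value to both, or the layout can outgrow the allocation and trip the `EP_HOST_ASSERT(layout.total_bytes <= num_rdma_bytes)` checks in the dispatch/combine paths. A hedged sketch of that invariant; the token/hidden/rank counts are example values only:

    #include <cstddef>

    // Declared in csrc/config.hpp (as changed by this PR).
    std::size_t get_low_latency_rdma_size_hint(
        bool dispatch_ll_dispatch_opt, int num_max_dispatch_tokens_per_rank, int hidden, int num_ranks, int num_experts);

    std::size_t size_rdma_buffer() {
        const bool disable_ll_layered = false;  // must match the flag later given to Buffer
        // Allocate this many RDMA bytes, then construct Buffer(..., disable_ll_layered)
        // with the same flag so layout.total_bytes <= num_rdma_bytes holds.
        return get_low_latency_rdma_size_hint(disable_ll_layered, 128, 7168, 32, 256);
    }
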
@@ -1499,7 +1501,7 @@ void Buffer::clean_low_latency_buffer(int num_max_dispatch_tokens_per_rank, int
 #ifndef DISABLE_NVSHMEM
     EP_HOST_ASSERT(low_latency_mode);

-    auto layout = LowLatencyLayout(rdma_buffer_ptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts);
+    auto layout = LowLatencyLayout(_disable_ll_layered, rdma_buffer_ptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts);
     auto clean_meta_0 = layout.buffers[0].clean_meta();
     auto clean_meta_1 = layout.buffers[1].clean_meta();

@@ -1571,7 +1573,7 @@ Buffer::low_latency_dispatch(const torch::Tensor& x,
     auto num_local_experts = num_experts / num_ranks;

     // Buffer control
-    LowLatencyLayout layout(rdma_buffer_ptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts);
+    LowLatencyLayout layout(_disable_ll_layered, rdma_buffer_ptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts);
     EP_HOST_ASSERT(layout.total_bytes <= num_rdma_bytes);
     auto buffer = layout.buffers[low_latency_buffer_idx];
     auto next_buffer = layout.buffers[low_latency_buffer_idx ^= 1];
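
A note on the index arithmetic above, which is easy to misread: `buffer` is taken at the current parity, and the `^= 1` inside the second subscript flips `low_latency_buffer_idx` in place, so `next_buffer` names the other of the two symmetric odd/even buffers and the following call starts from there. A minimal sketch of the toggle, with plain structs standing in for the layout's buffers:

    #include <cstdio>

    struct Slot { const char* name; };

    int main() {
        Slot buffers[2] = {{"even"}, {"odd"}};
        int idx = 0;  // stands in for low_latency_buffer_idx
        for (int call = 0; call < 4; ++call) {
            Slot buffer = buffers[idx];            // buffer used by this call
            Slot next_buffer = buffers[idx ^= 1];  // flips idx; the next call starts here
            std::printf("call %d: buffer=%s, next=%s\n", call, buffer.name, next_buffer.name);
        }
        return 0;
    }

The `next_clean_meta` taken from `next_buffer` below lets the kernel zero the idle buffer's signaling metadata ahead of the next call.
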
@@ -1616,6 +1618,7 @@
     auto next_clean_meta = next_buffer.clean_meta();
     auto launcher = [=](int phases) {
         internode_ll::dispatch(
+            _disable_ll_layered,
             packed_recv_x.data_ptr(),
             packed_recv_x_scales_ptr,
             packed_recv_src_info.data_ptr<int64_t>(),
@@ -1729,7 +1732,7 @@ std::tuple<torch::Tensor, std::optional<EventHandle>, std::optional<std::functio
     auto num_combined_tokens = static_cast<int>(topk_weights.size(0));

     // Buffer control
-    LowLatencyLayout layout(rdma_buffer_ptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts);
+    LowLatencyLayout layout(_disable_ll_layered, rdma_buffer_ptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts);
     EP_HOST_ASSERT(layout.total_bytes <= num_rdma_bytes);
     auto buffer = layout.buffers[low_latency_buffer_idx];
     auto next_buffer = layout.buffers[low_latency_buffer_idx ^= 1];
@@ -1756,7 +1759,8 @@ std::tuple<torch::Tensor, std::optional<EventHandle>, std::optional<std::functio
     // Kernel launch
     auto next_clean_meta = next_buffer.clean_meta();
     auto launcher = [=](int phases) {
-        internode_ll::combine(combined_x.data_ptr(),
+        internode_ll::combine(_disable_ll_layered,
+                              combined_x.data_ptr(),
                               buffer.combine_rdma_recv_data_buffer,
                               buffer.combine_rdma_recv_flag_buffer,
                               buffer.combine_rdma_send_buffer,
@@ -1816,7 +1820,7 @@ std::tuple<torch::Tensor, std::optional<EventHandle>, std::optional<std::functio

 torch::Tensor Buffer::get_next_low_latency_combine_buffer(int num_max_dispatch_tokens_per_rank, int hidden, int num_experts) const {
 #ifndef DISABLE_NVSHMEM
-    LowLatencyLayout layout(rdma_buffer_ptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts);
+    LowLatencyLayout layout(_disable_ll_layered, rdma_buffer_ptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts);

     auto buffer = layout.buffers[low_latency_buffer_idx];
     auto dtype = torch::kBFloat16;
@@ -1881,7 +1885,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
         .def("current_stream_wait", &deep_ep::EventHandle::current_stream_wait);

     pybind11::class_<deep_ep::Buffer>(m, "Buffer")
-        .def(pybind11::init<int, int, int64_t, int64_t, bool, bool, bool, bool>())
+        .def(pybind11::init<int, int, int64_t, int64_t, bool, bool, bool, bool, bool>())
         .def("is_available", &deep_ep::Buffer::is_available)
         .def("get_num_rdma_ranks", &deep_ep::Buffer::get_num_rdma_ranks)
         .def("get_rdma_rank", &deep_ep::Buffer::get_rdma_rank)
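The `pybind11::init<...>` change widens the bound constructor from eight to nine arguments, so anything constructing `Buffer` through this binding (including the Python-side wrapper) must now supply the extra trailing flag. A hedged C++ sketch of the call the new signature expects; every concrete value is a placeholder, and the argument order follows the `init<>` template above:

    #include <cstdint>
    #include <memory>
    #include "deep_ep.hpp"

    // Constructs a Buffer with the widened 9-argument signature.
    std::unique_ptr<deep_ep::Buffer> make_example_buffer() {
        return std::make_unique<deep_ep::Buffer>(
            /*rank=*/0,
            /*num_ranks=*/16,
            /*num_nvl_bytes=*/int64_t(1) << 30,
            /*num_rdma_bytes=*/int64_t(1) << 30,
            /*low_latency_mode=*/true,
            /*explicitly_destroy=*/false,
            /*enable_shrink=*/false,
            /*use_fabric=*/false,
            /*disable_ll_layered=*/false);  // new trailing flag; false keeps the layered layout enabled
    }
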
csrc/deep_ep.hpp (4 changes: 3 additions & 1 deletion)
@@ -70,6 +70,7 @@ struct Buffer {

     // Shrink mode buffer
     bool enable_shrink = false;
+    bool _disable_ll_layered = false;
     int* mask_buffer_ptr = nullptr;
     int* sync_buffer_ptr = nullptr;

@@ -120,7 +121,8 @@ struct Buffer {
            bool low_latency_mode,
            bool explicitly_destroy,
            bool enable_shrink,
-           bool use_fabric);
+           bool use_fabric,
+           bool _disable_ll_layered);

     ~Buffer() noexcept(false);

csrc/kernels/api.cuh (6 changes: 4 additions & 2 deletions)
@@ -282,7 +282,8 @@ void clean_low_latency_buffer(int* clean_0,
                               int* sync_buffer,
                               cudaStream_t stream);

-void dispatch(void* packed_recv_x,
+void dispatch(bool dispatch_ll_dispatch_opt,
+              void* packed_recv_x,
               void* packed_recv_x_scales,
               int64_t* packed_recv_src_info,
               int64_t* packed_recv_layout_range,
@@ -312,7 +313,8 @@ void dispatch(void* packed_recv_x,
               cudaStream_t stream,
               int phases);

-void combine(void* combined_x,
+void combine(bool dispatch_ll_dispatch_opt,
+             void* combined_x,
              void* rdma_recv_x,
              int* rdma_recv_flag,
              void* rdma_send_x,
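Taken together, the header changes thread one host-side boolean from construction down to both kernels. One detail worth flagging for review: these declarations name the new leading parameter `dispatch_ll_dispatch_opt` (as does `get_low_latency_rdma_size_hint` in config.hpp), while the deep_ep.cpp call sites pass the member `_disable_ll_layered`; arguments bind positionally, so behavior is unchanged. A compile-only sketch of the flow under those names; the real signatures carry the full buffer, size, stream, and phase parameter lists elided here:

    // Abbreviated stand-ins for the declarations above.
    namespace internode_ll {
    void dispatch(bool dispatch_ll_dispatch_opt /*, ...buffers, sizes, stream, phases */);
    void combine(bool dispatch_ll_dispatch_opt /*, ...buffers, sizes, stream, phases */);
    }  // namespace internode_ll

    struct BufferSketch {
        bool _disable_ll_layered;  // set once from the constructor flag
        void launch_both() {
            internode_ll::dispatch(_disable_ll_layered);  // names differ, binding is positional
            internode_ll::combine(_disable_ll_layered);
        }
    };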