Skip to content

Commit af1c026

Browse files
committed
Added Receiver support to broadcast channels
I originally thought the need to keep everything lock-step was too much of a burden, but I realized by removing the usage of mpsc, I could support a Receiver interface by using send_async in an on_receive_async callback. While this adds overhead if the receiver is then turned into a callback, that's not the intended workflow -- and it still works just fine, just has a little more overhead.
1 parent 5741d4e commit af1c026

File tree

1 file changed

+50
-10
lines changed

1 file changed

+50
-10
lines changed

src/reactive/channel.rs

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ use std::fmt::{self, Debug};
9090
use std::future::Future;
9191
use std::ops::ControlFlow;
9292
use std::pin::Pin;
93-
use std::sync::{mpsc, Arc};
93+
use std::sync::Arc;
9494
use std::task::{ready, Context, Poll, Waker};
9595

9696
use builder::Builder;
@@ -412,8 +412,8 @@ impl<T> Debug for SingleCallback<T> {
412412

413413
enum BroadcastCallback<T> {
414414
Blocking {
415-
sender: mpsc::SyncSender<(T, Waker)>,
416-
result: mpsc::Receiver<()>,
415+
sender: Sender<(T, Autowaker)>,
416+
result: Receiver<()>,
417417
},
418418
NonBlocking(Box<dyn AnyChannelCallback<T>>),
419419
}
@@ -425,10 +425,10 @@ impl<T> BroadcastCallback<T> {
425425
where
426426
T: Send + 'static,
427427
{
428-
let (value_sender, value_receiver) = mpsc::sync_channel::<(T, Waker)>(1);
429-
let (result_sender, result_receiver) = mpsc::sync_channel(1);
428+
let (value_sender, value_receiver) = bounded::<(T, Autowaker)>(1);
429+
let (result_sender, result_receiver) = bounded(1);
430430
std::thread::spawn(move || {
431-
while let Ok((value, waker)) = value_receiver.recv() {
431+
while let Some((value, waker)) = value_receiver.receive() {
432432
if let Ok(()) = cb(value) {
433433
if result_sender.send(()).is_err() {
434434
return;
@@ -587,12 +587,12 @@ where
587587
else {
588588
unreachable!("valid state");
589589
};
590-
match result.try_recv() {
590+
match result.try_receive() {
591591
Ok(()) => {
592592
self.next_recipient += 1;
593593
}
594-
Err(mpsc::TryRecvError::Empty) => return Poll::Pending,
595-
Err(mpsc::TryRecvError::Disconnected) => {
594+
Err(TryReceiveError::Empty) => return Poll::Pending,
595+
Err(TryReceiveError::Disconnected) => {
596596
data.behavior.0.remove(self.next_recipient);
597597
}
598598
}
@@ -620,7 +620,7 @@ where
620620
BroadcastCallback::Blocking { sender, .. } => {
621621
if let Ok(()) = sender.send((
622622
this.value.next().expect("enough value clones"),
623-
cx.waker().clone(),
623+
Autowaker(Some(cx.waker().clone())),
624624
)) {
625625
this.current_is_blocking = true;
626626
drop(data_mutex);
@@ -897,6 +897,25 @@ where
897897
self.data.force_send_inner(value, channel_id(&self.data))
898898
}
899899

900+
/// Creates a new receiver for this channel.
901+
///
902+
/// All receivers and callbacks must receive each value before the next
903+
/// value is able to be received.
904+
#[must_use]
905+
pub fn create_receiver(&self) -> Receiver<T> {
906+
let (sender, receiver) = bounded(1);
907+
self.on_receive_async_try(move |value| {
908+
let sender = sender.clone();
909+
async move {
910+
sender
911+
.send_async(value)
912+
.await
913+
.map_err(|_| CallbackDisconnected)
914+
}
915+
});
916+
receiver
917+
}
918+
900919
/// Invokes `on_receive` each time a value is sent to this channel.
901920
///
902921
/// This function assumes `on_receive` may block while waiting on another
@@ -1464,6 +1483,27 @@ pub enum TryReceiveError {
14641483
Disconnected,
14651484
}
14661485

1486+
struct Autowaker(Option<Waker>);
1487+
1488+
impl Autowaker {
1489+
fn wake_by_ref(&mut self) {
1490+
let Some(waker) = self.0.take() else {
1491+
return;
1492+
};
1493+
waker.wake();
1494+
}
1495+
1496+
fn wake(mut self) {
1497+
self.wake_by_ref();
1498+
}
1499+
}
1500+
1501+
impl Drop for Autowaker {
1502+
fn drop(&mut self) {
1503+
self.wake_by_ref();
1504+
}
1505+
}
1506+
14671507
#[test]
14681508
fn channel_basics() {
14691509
let (result_sender, result_receiver) = unbounded();

0 commit comments

Comments
 (0)