Skip to content

Commit e16ef0f

Browse files
committed
Fixing channel edge cases
The channel_disconnection test was flaky, and it was due to two separate issues. First, when sending to a channel with the callback disconnected while the future hadn't been disconnected fully yet, the send would still be allowed rather than being reported as disconnected despite never being able to be received. Second, using connected as a distinction of whether the channel polling future should be enqueued was not correct, as connected was being checked as to whether to poll for notifications.
1 parent dd1a1cc commit e16ef0f

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

src/reactive/channel.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ where
215215
{
216216
fn should_poll(&self) -> bool {
217217
let channel = self.synced.lock();
218-
!channel.queue.is_empty() && channel.behavior.connected()
218+
!channel.queue.is_empty() && channel.behavior.should_poll()
219219
}
220220

221221
fn poll(&self, futures: &mut Vec<ChannelCallbackFuture>) -> bool {
@@ -396,6 +396,10 @@ where
396396
!matches!(self, Self::Disconnected)
397397
}
398398

399+
fn should_poll(&self) -> bool {
400+
matches!(self, Self::Callback(_))
401+
}
402+
399403
fn disconnect(&mut self) {
400404
*self = Self::Disconnected;
401405
}
@@ -479,6 +483,10 @@ where
479483
!self.0.is_empty()
480484
}
481485

486+
fn should_poll(&self) -> bool {
487+
self.connected()
488+
}
489+
482490
fn disconnect(&mut self) {
483491
self.0.clear();
484492
}
@@ -693,6 +701,7 @@ where
693701

694702
trait CallbackBehavior<T>: Sized + Send + 'static {
695703
fn connected(&self) -> bool;
704+
fn should_poll(&self) -> bool;
696705
fn disconnect(&mut self);
697706
fn invoke(
698707
&mut self,
@@ -762,7 +771,9 @@ where
762771
) -> ControlFlow<()>,
763772
) -> Result<(), TrySendError<T>> {
764773
let mut channel = self.synced.lock();
765-
while channel.receivers > 0 || channel.behavior.connected() {
774+
while !matches!(channel.handle_status, CallbackHandleStatus::Dropped)
775+
&& (channel.receivers > 0 || channel.behavior.connected())
776+
{
766777
if channel.queue.len() >= channel.limit {
767778
match when_full(&mut channel) {
768779
ControlFlow::Continue(()) => continue,

0 commit comments

Comments
 (0)