Skip to content

Commit 5dacc2e

Browse files
task: add tests for spawn_local and spawn_local_on (#7609)
Add tests for task collections (TaskTracker, JoinSet, JoinMap).
1 parent 444d3f5 commit 5dacc2e

File tree

4 files changed

+773
-4
lines changed

4 files changed

+773
-4
lines changed

tokio-util/tests/task_join_map.rs

Lines changed: 277 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,76 @@
33

44
use std::panic::AssertUnwindSafe;
55

6+
use futures::future::{pending, FutureExt};
67
use tokio::sync::oneshot;
8+
use tokio::task::LocalSet;
79
use tokio::time::Duration;
810
use tokio_util::task::JoinMap;
911

10-
use futures::future::FutureExt;
11-
1212
fn rt() -> tokio::runtime::Runtime {
1313
tokio::runtime::Builder::new_current_thread()
1414
.build()
1515
.unwrap()
1616
}
1717

18+
// Spawn `N` tasks that return their index (`i`).
19+
fn spawn_index_tasks(map: &mut JoinMap<usize, usize>, n: usize, on: Option<&LocalSet>) {
20+
for i in 0..n {
21+
let rc = std::rc::Rc::new(i);
22+
match on {
23+
None => map.spawn_local(i, async move { *rc }),
24+
Some(local) => map.spawn_local_on(i, async move { *rc }, local),
25+
};
26+
}
27+
}
28+
29+
// Spawn `N` “pending” tasks that own a `oneshot::Sender`.
30+
// When the task is aborted the sender is dropped, which is observed
31+
// via the returned `Receiver`s.
32+
fn spawn_pending_tasks(
33+
map: &mut JoinMap<usize, ()>,
34+
receivers: &mut Vec<oneshot::Receiver<()>>,
35+
n: usize,
36+
on: Option<&LocalSet>,
37+
) {
38+
for i in 0..n {
39+
let (tx, rx) = oneshot::channel::<()>();
40+
receivers.push(rx);
41+
42+
let fut = async move {
43+
pending::<()>().await;
44+
drop(tx);
45+
};
46+
match on {
47+
None => map.spawn_local(i, fut),
48+
Some(local) => map.spawn_local_on(i, fut, local),
49+
};
50+
}
51+
}
52+
53+
/// Await every task in JoinMap and assert every task returns its own key.
54+
async fn drain_joinmap_and_assert(mut map: JoinMap<usize, usize>, n: usize) {
55+
let mut seen = vec![false; n];
56+
while let Some((k, res)) = map.join_next().await {
57+
let v = res.expect("task panicked");
58+
assert_eq!(k, v);
59+
seen[v] = true;
60+
}
61+
assert!(seen.into_iter().all(|b| b));
62+
assert!(map.is_empty());
63+
}
64+
65+
// Await every receiver and assert they all return `Err` because the
66+
// corresponding sender (inside an aborted task) was dropped.
67+
async fn await_receivers_and_assert(receivers: Vec<oneshot::Receiver<()>>) {
68+
for rx in receivers {
69+
assert!(
70+
rx.await.is_err(),
71+
"task should have been aborted and sender dropped"
72+
);
73+
}
74+
}
75+
1876
#[tokio::test(start_paused = true)]
1977
async fn test_with_sleep() {
2078
let mut map = JoinMap::new();
@@ -376,3 +434,220 @@ async fn duplicate_keys_drop() {
376434

377435
assert!(map.join_next().await.is_none());
378436
}
437+
438+
mod spawn_local {
439+
use super::*;
440+
441+
#[cfg(tokio_unstable)]
442+
mod local_runtime {
443+
use super::*;
444+
445+
/// Spawn several tasks, and then join all tasks.
446+
#[tokio::test(flavor = "local")]
447+
async fn spawn_then_join_next() {
448+
const N: usize = 8;
449+
450+
let mut map = JoinMap::new();
451+
spawn_index_tasks(&mut map, N, None);
452+
453+
assert!(map.join_next().now_or_never().is_none());
454+
drain_joinmap_and_assert(map, N).await;
455+
}
456+
457+
/// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
458+
#[tokio::test(flavor = "local")]
459+
async fn spawn_then_shutdown() {
460+
const N: usize = 8;
461+
462+
let mut map = JoinMap::new();
463+
let mut receivers = Vec::new();
464+
465+
spawn_pending_tasks(&mut map, &mut receivers, N, None);
466+
assert!(map.join_next().now_or_never().is_none());
467+
468+
map.shutdown().await;
469+
assert!(map.is_empty());
470+
await_receivers_and_assert(receivers).await;
471+
}
472+
473+
/// Spawn several pending-forever tasks, and then drop the [`JoinMap`].
474+
#[tokio::test(flavor = "local")]
475+
async fn spawn_then_drop() {
476+
const N: usize = 8;
477+
478+
let mut map = JoinMap::new();
479+
let mut receivers = Vec::new();
480+
481+
spawn_pending_tasks(&mut map, &mut receivers, N, None);
482+
assert!(map.join_next().now_or_never().is_none());
483+
484+
drop(map);
485+
await_receivers_and_assert(receivers).await;
486+
}
487+
}
488+
489+
mod local_set {
490+
use super::*;
491+
492+
/// Spawn several tasks, and then join all tasks.
493+
#[tokio::test(flavor = "current_thread")]
494+
async fn spawn_then_join_next() {
495+
const N: usize = 8;
496+
let local = LocalSet::new();
497+
498+
local
499+
.run_until(async move {
500+
let mut map = JoinMap::new();
501+
spawn_index_tasks(&mut map, N, None);
502+
drain_joinmap_and_assert(map, N).await;
503+
})
504+
.await;
505+
}
506+
507+
/// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
508+
#[tokio::test(flavor = "current_thread")]
509+
async fn spawn_then_shutdown() {
510+
const N: usize = 8;
511+
let local = LocalSet::new();
512+
513+
local
514+
.run_until(async {
515+
let mut map = JoinMap::new();
516+
let mut receivers = Vec::new();
517+
518+
spawn_pending_tasks(&mut map, &mut receivers, N, None);
519+
assert!(map.join_next().now_or_never().is_none());
520+
521+
map.shutdown().await;
522+
assert!(map.is_empty());
523+
await_receivers_and_assert(receivers).await;
524+
})
525+
.await;
526+
}
527+
528+
/// Spawn several pending-forever tasks, and then drop the [`JoinMap`].
529+
#[tokio::test(flavor = "current_thread")]
530+
async fn spawn_then_drop() {
531+
const N: usize = 8;
532+
let local = LocalSet::new();
533+
534+
local
535+
.run_until(async {
536+
let mut map = JoinMap::new();
537+
let mut receivers = Vec::new();
538+
539+
spawn_pending_tasks(&mut map, &mut receivers, N, None);
540+
assert!(map.join_next().now_or_never().is_none());
541+
542+
drop(map);
543+
await_receivers_and_assert(receivers).await;
544+
})
545+
.await;
546+
}
547+
}
548+
}
549+
550+
mod spawn_local_on {
551+
use super::*;
552+
553+
#[cfg(tokio_unstable)]
554+
mod local_runtime {
555+
use super::*;
556+
557+
/// Spawn several tasks, and then join all tasks.
558+
#[tokio::test(flavor = "local")]
559+
async fn spawn_then_join_next() {
560+
const N: usize = 8;
561+
562+
let local = LocalSet::new();
563+
let mut map = JoinMap::new();
564+
565+
spawn_index_tasks(&mut map, N, Some(&local));
566+
assert!(map.join_next().now_or_never().is_none());
567+
568+
local
569+
.run_until(async move {
570+
drain_joinmap_and_assert(map, N).await;
571+
})
572+
.await;
573+
}
574+
}
575+
576+
mod local_set {
577+
use super::*;
578+
579+
/// Spawn several tasks, and then join all tasks.
580+
#[tokio::test(flavor = "current_thread")]
581+
async fn spawn_then_join_next() {
582+
const N: usize = 8;
583+
let local = LocalSet::new();
584+
let mut pending_map = JoinMap::new();
585+
586+
spawn_index_tasks(&mut pending_map, N, Some(&local));
587+
assert!(pending_map.join_next().now_or_never().is_none());
588+
589+
local
590+
.run_until(async move {
591+
drain_joinmap_and_assert(pending_map, N).await;
592+
})
593+
.await;
594+
}
595+
596+
/// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
597+
#[tokio::test(flavor = "current_thread")]
598+
async fn spawn_then_shutdown() {
599+
const N: usize = 8;
600+
let local = LocalSet::new();
601+
let mut map = JoinMap::new();
602+
let mut receivers = Vec::new();
603+
604+
spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
605+
assert!(map.join_next().now_or_never().is_none());
606+
607+
local
608+
.run_until(async move {
609+
map.shutdown().await;
610+
assert!(map.is_empty());
611+
await_receivers_and_assert(receivers).await;
612+
})
613+
.await;
614+
}
615+
616+
/// Spawn several pending-forever tasks and then drop the [`JoinMap`]
617+
/// before the `LocalSet` is driven and while the `LocalSet` is already driven.
618+
#[tokio::test(flavor = "current_thread")]
619+
async fn spawn_then_drop() {
620+
const N: usize = 8;
621+
622+
{
623+
let local = LocalSet::new();
624+
let mut map = JoinMap::new();
625+
let mut receivers = Vec::new();
626+
627+
spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
628+
assert!(map.join_next().now_or_never().is_none());
629+
630+
drop(map);
631+
local
632+
.run_until(async move { await_receivers_and_assert(receivers).await })
633+
.await;
634+
}
635+
636+
{
637+
let local = LocalSet::new();
638+
let mut map = JoinMap::new();
639+
let mut receivers = Vec::new();
640+
641+
spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
642+
assert!(map.join_next().now_or_never().is_none());
643+
644+
local
645+
.run_until(async move {
646+
drop(map);
647+
await_receivers_and_assert(receivers).await;
648+
})
649+
.await;
650+
}
651+
}
652+
}
653+
}

0 commit comments

Comments
 (0)