Skip to content

Commit c6c26c4

Browse files
committed
trap on blocking call in sync task before return
This implements a spec change (PR pending) such that tasks created for calls to synchronous exports may not call potentially-blocking imports or return `wait` or `poll` callback codes prior to returning a value. Specifically, the following are prohibited in that scenario: - returning callback-code.{wait,poll} - sync calling async import - sync calling subtask.cancel - sync calling {stream,future}.{read,write} - sync calling {stream,future}.cancel-{read,write} - calling waitable-set.{wait,poll} - calling thread.suspend This breaks a number of tests, which will be addressed in follow-up commits: - The `{tcp,udp}-socket.bind` implementation in `wasmtime-wasi` is implemented using `Linker::func_wrap_concurrent` and thus assumed to be async, whereas the WIT interface says they're sync, leading to a type mismatch error at runtime. Alex and I have discussed this and have a general plan to address it. - A number of tests in the tests/component-model submodule that points to the spec repo are failing. Those will presumably be fixed as part of the upcoming spec PR (although some could be due to bugs in this implementation, in which case I'll fix them). - A number of tests in tests/misc_testsuite are failing. I'll address those in a follow-up commit. Signed-off-by: Joel Dice <[email protected]>
1 parent b570b4f commit c6c26c4

File tree

10 files changed

+141
-21
lines changed

10 files changed

+141
-21
lines changed

crates/environ/src/component.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ macro_rules! foreach_builtin_component_function {
132132
caller_instance: u32,
133133
callee_instance: u32,
134134
task_return_type: u32,
135+
callee_async: u32,
135136
string_encoding: u32,
136137
result_count_or_max_if_async: u32,
137138
storage: ptr_u8,

crates/environ/src/fact.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub static PREPARE_CALL_FIXED_PARAMS: &[ValType] = &[
4747
ValType::I32, // caller_instance
4848
ValType::I32, // callee_instance
4949
ValType::I32, // task_return_type
50+
ValType::I32, // callee_async
5051
ValType::I32, // string_encoding
5152
ValType::I32, // result_count_or_max_if_async
5253
];

crates/environ/src/fact/trampoline.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,11 @@ impl<'a, 'b> Compiler<'a, 'b> {
544544
self.instruction(I32Const(
545545
i32::try_from(self.types[adapter.lift.ty].results.as_u32()).unwrap(),
546546
));
547+
self.instruction(I32Const(if self.types[adapter.lift.ty].async_ {
548+
1
549+
} else {
550+
0
551+
}));
547552
self.instruction(I32Const(i32::from(
548553
adapter.lift.options.string_encoding as u8,
549554
)));

crates/environ/src/trap_encoding.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ pub enum Trap {
112112
/// scenario where a component instance tried to call an import or intrinsic
113113
/// when it wasn't allowed to, e.g. from a post-return function.
114114
CannotLeaveComponent,
115+
116+
/// A synchronous task attempted to make a potentially blocking call.
117+
CannotBlockSyncTask,
115118
// if adding a variant here be sure to update the `check!` macro below
116119
}
117120

@@ -154,6 +157,7 @@ impl Trap {
154157
DisabledOpcode
155158
AsyncDeadlock
156159
CannotLeaveComponent
160+
CannotBlockSyncTask
157161
}
158162

159163
None
@@ -190,6 +194,7 @@ impl fmt::Display for Trap {
190194
DisabledOpcode => "pulley opcode disabled at compile time was executed",
191195
AsyncDeadlock => "deadlock detected: event loop cannot make further progress",
192196
CannotLeaveComponent => "cannot leave component instance",
197+
CannotBlockSyncTask => "cannot block a synchronous task",
193198
};
194199
write!(f, "wasm trap: {desc}")
195200
}

crates/misc/component-async-tests/wit/test.wit

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ interface resource-stream {
122122
foo: func();
123123
}
124124

125-
foo: func(count: u32) -> stream<x>;
125+
foo: async func(count: u32) -> stream<x>;
126126
}
127127

128128
interface closed {
@@ -157,7 +157,7 @@ interface cancel {
157157
leak-task-after-cancel,
158158
}
159159

160-
run: func(mode: mode, cancel-delay-millis: u64);
160+
run: async func(mode: mode, cancel-delay-millis: u64);
161161
}
162162

163163
interface intertask {

crates/test-programs/src/bin/async_read_resource_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ struct Component;
1515
impl Guest for Component {
1616
async fn run() {
1717
let mut count = 7;
18-
let mut stream = resource_stream::foo(count);
18+
let mut stream = resource_stream::foo(count).await;
1919

2020
while let Some(x) = stream.next().await {
2121
if count > 0 {

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,12 @@ pub(crate) enum WaitResult {
705705
Completed,
706706
}
707707

708+
/// Raise a trap if the calling task is synchronous and trying to block prior to
709+
/// returning a value.
710+
pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> {
711+
store.concurrent_state_mut().check_blocking()
712+
}
713+
708714
/// Poll the specified future until it completes on behalf of a guest->host call
709715
/// using a sync-lowered import.
710716
///
@@ -1643,6 +1649,8 @@ impl Instance {
16431649
}));
16441650
}
16451651
callback_code::WAIT | callback_code::POLL => {
1652+
state.check_blocking_for(guest_thread.task)?;
1653+
16461654
let set = get_set(store, set)?;
16471655
let state = store.concurrent_state_mut();
16481656

@@ -2038,6 +2046,7 @@ impl Instance {
20382046
caller_instance: RuntimeComponentInstanceIndex,
20392047
callee_instance: RuntimeComponentInstanceIndex,
20402048
task_return_type: TypeTupleIndex,
2049+
callee_async: bool,
20412050
memory: *mut VMMemoryDefinition,
20422051
string_encoding: u8,
20432052
caller_info: CallerInfo,
@@ -2181,6 +2190,7 @@ impl Instance {
21812190
},
21822191
None,
21832192
callee_instance,
2193+
callee_async,
21842194
)?;
21852195

21862196
let guest_task = state.push(new_task)?;
@@ -2848,6 +2858,10 @@ impl Instance {
28482858
set: u32,
28492859
payload: u32,
28502860
) -> Result<u32> {
2861+
if !self.options(store, options).async_ {
2862+
store.concurrent_state_mut().check_blocking()?;
2863+
}
2864+
28512865
self.id().get(store).check_may_leave(caller)?;
28522866
let &CanonicalOptions {
28532867
cancellable,
@@ -2877,6 +2891,10 @@ impl Instance {
28772891
set: u32,
28782892
payload: u32,
28792893
) -> Result<u32> {
2894+
if !self.options(store, options).async_ {
2895+
store.concurrent_state_mut().check_blocking()?;
2896+
}
2897+
28802898
self.id().get(store).check_may_leave(caller)?;
28812899
let &CanonicalOptions {
28822900
cancellable,
@@ -3056,6 +3074,11 @@ impl Instance {
30563074
yielding: bool,
30573075
to_thread: Option<u32>,
30583076
) -> Result<WaitResult> {
3077+
if to_thread.is_none() && !yielding {
3078+
// This is a `thread.suspend` call
3079+
store.concurrent_state_mut().check_blocking()?;
3080+
}
3081+
30593082
// There could be a pending cancellation from a previous uncancellable wait
30603083
if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
30613084
return Ok(WaitResult::Cancelled);
@@ -3185,6 +3208,10 @@ impl Instance {
31853208
async_: bool,
31863209
task_id: u32,
31873210
) -> Result<u32> {
3211+
if !async_ {
3212+
store.concurrent_state_mut().check_blocking()?;
3213+
}
3214+
31883215
self.id().get(store).check_may_leave(caller_instance)?;
31893216
let (rep, is_host) =
31903217
self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
@@ -3345,6 +3372,7 @@ pub trait VMComponentAsyncStore {
33453372
caller_instance: RuntimeComponentInstanceIndex,
33463373
callee_instance: RuntimeComponentInstanceIndex,
33473374
task_return_type: TypeTupleIndex,
3375+
callee_async: bool,
33483376
string_encoding: u8,
33493377
result_count: u32,
33503378
storage: *mut ValRaw,
@@ -3504,6 +3532,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
35043532
caller_instance: RuntimeComponentInstanceIndex,
35053533
callee_instance: RuntimeComponentInstanceIndex,
35063534
task_return_type: TypeTupleIndex,
3535+
callee_async: bool,
35073536
string_encoding: u8,
35083537
result_count_or_max_if_async: u32,
35093538
storage: *mut ValRaw,
@@ -3522,6 +3551,7 @@ impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
35223551
caller_instance,
35233552
callee_instance,
35243553
task_return_type,
3554+
callee_async,
35253555
memory,
35263556
string_encoding,
35273557
match result_count_or_max_if_async {
@@ -4062,6 +4092,9 @@ pub(crate) struct GuestTask {
40624092
/// The state of the host future that represents an async task, which must
40634093
/// be dropped before we can delete the task.
40644094
host_future_state: HostFutureState,
4095+
/// Indicates whether this task was created for a call to an async-lifted
4096+
/// export.
4097+
async_function: bool,
40654098
}
40664099

40674100
impl GuestTask {
@@ -4102,6 +4135,7 @@ impl GuestTask {
41024135
caller: Caller,
41034136
callback: Option<CallbackFn>,
41044137
component_instance: RuntimeComponentInstanceIndex,
4138+
async_function: bool,
41054139
) -> Result<Self> {
41064140
let sync_call_set = state.push(WaitableSet::default())?;
41074141
let host_future_state = match &caller {
@@ -4136,6 +4170,7 @@ impl GuestTask {
41364170
exited: false,
41374171
threads: HashSet::new(),
41384172
host_future_state,
4173+
async_function,
41394174
})
41404175
}
41414176

@@ -4749,6 +4784,20 @@ impl ConcurrentState {
47494784
false
47504785
}
47514786
}
4787+
4788+
fn check_blocking(&mut self) -> Result<()> {
4789+
let task = self.guest_thread.unwrap().task;
4790+
self.check_blocking_for(task)
4791+
}
4792+
4793+
fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4794+
let task = self.get_mut(task).unwrap();
4795+
if !(task.async_function || task.returned_or_cancelled()) {
4796+
Err(Trap::CannotBlockSyncTask.into())
4797+
} else {
4798+
Ok(())
4799+
}
4800+
}
47524801
}
47534802

47544803
/// Provide a type hint to compiler about the shape of a parameter lower
@@ -4907,7 +4956,9 @@ pub(crate) fn prepare_call<T, R>(
49074956

49084957
let instance = handle.instance().id().get(store.0);
49094958
let options = &instance.component().env_component().options[options];
4910-
let task_return_type = instance.component().types()[ty].results;
4959+
let ty = &instance.component().types()[ty];
4960+
let async_function = ty.async_;
4961+
let task_return_type = ty.results;
49114962
let component_instance = raw_options.instance;
49124963
let callback = options.callback.map(|i| instance.runtime_callback(i));
49134964
let memory = options
@@ -4964,6 +5015,7 @@ pub(crate) fn prepare_call<T, R>(
49645015
) as CallbackFn
49655016
}),
49665017
component_instance,
5018+
async_function,
49675019
)?;
49685020
task.function_index = Some(handle.index());
49695021

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3093,6 +3093,10 @@ impl Instance {
30933093
address: u32,
30943094
count: u32,
30953095
) -> Result<ReturnCode> {
3096+
if !self.options(store.0, options).async_ {
3097+
store.0.concurrent_state_mut().check_blocking()?;
3098+
}
3099+
30963100
let address = usize::try_from(address).unwrap();
30973101
let count = usize::try_from(count).unwrap();
30983102
self.check_bounds(store.0, options, ty, address, count)?;
@@ -3315,6 +3319,10 @@ impl Instance {
33153319
address: u32,
33163320
count: u32,
33173321
) -> Result<ReturnCode> {
3322+
if !self.options(store.0, options).async_ {
3323+
store.0.concurrent_state_mut().check_blocking()?;
3324+
}
3325+
33183326
let address = usize::try_from(address).unwrap();
33193327
let count = usize::try_from(count).unwrap();
33203328
self.check_bounds(store.0, options, ty, address, count)?;
@@ -3686,6 +3694,10 @@ impl Instance {
36863694
async_: bool,
36873695
writer: u32,
36883696
) -> Result<ReturnCode> {
3697+
if !async_ {
3698+
store.concurrent_state_mut().check_blocking()?;
3699+
}
3700+
36893701
let (rep, state) =
36903702
get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, writer)?;
36913703
let id = TableId::<TransmitHandle>::new(rep);
@@ -3720,6 +3732,10 @@ impl Instance {
37203732
async_: bool,
37213733
reader: u32,
37223734
) -> Result<ReturnCode> {
3735+
if !async_ {
3736+
store.concurrent_state_mut().check_blocking()?;
3737+
}
3738+
37233739
let (rep, state) =
37243740
get_mut_by_index_from(self.id().get_mut(store).table_for_transmit(ty), ty, reader)?;
37253741
let id = TableId::<TransmitHandle>::new(rep);

0 commit comments

Comments
 (0)