Skip to content

Commit 42b1426

Browse files
authored
add some futures_core::stream::Stream impls for AsyncInputStream (#80)
* add some futures_core::stream::Stream impls for AsyncInputStream * fix: AsyncInputStream::ready needs to poll the same WaitFor and also make AsyncInputStream Send and Sync by switching the internal OnceCell to OnceLock. * no you cant, and its a fundamental misunderstanding to ask that q
1 parent 1eb7616 commit 42b1426

File tree

2 files changed

+146
-19
lines changed

2 files changed

+146
-19
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ json = ["dep:serde", "dep:serde_json"]
1818

1919
[dependencies]
2020
async-task.workspace = true
21+
futures-core.workspace = true
2122
http.workspace = true
2223
itoa.workspace = true
2324
pin-project-lite.workspace = true

src/io/streams.rs

Lines changed: 145 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,54 @@
11
use super::{AsyncPollable, AsyncRead, AsyncWrite};
2-
use std::cell::OnceCell;
3-
use std::io::Result;
2+
use crate::runtime::WaitFor;
3+
use std::future::{poll_fn, Future};
4+
use std::pin::Pin;
5+
use std::sync::{Mutex, OnceLock};
6+
use std::task::{Context, Poll};
47
use wasip2::io::streams::{InputStream, OutputStream, StreamError};
58

69
/// A wrapper for WASI's `InputStream` resource that provides implementations of `AsyncRead` and
710
/// `AsyncPollable`.
811
#[derive(Debug)]
912
pub struct AsyncInputStream {
13+
wait_for: Mutex<Option<Pin<Box<WaitFor>>>>,
1014
// Lazily initialized pollable, used for lifetime of stream to check readiness.
1115
// Field ordering matters: this child must be dropped before stream
12-
subscription: OnceCell<AsyncPollable>,
16+
subscription: OnceLock<AsyncPollable>,
1317
stream: InputStream,
1418
}
1519

1620
impl AsyncInputStream {
1721
/// Construct an `AsyncInputStream` from a WASI `InputStream` resource.
1822
pub fn new(stream: InputStream) -> Self {
1923
Self {
20-
subscription: OnceCell::new(),
24+
wait_for: Mutex::new(None),
25+
subscription: OnceLock::new(),
2126
stream,
2227
}
2328
}
24-
/// Await for read readiness.
25-
async fn ready(&self) {
29+
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
2630
// Lazily initialize the AsyncPollable
2731
let subscription = self
2832
.subscription
2933
.get_or_init(|| AsyncPollable::new(self.stream.subscribe()));
30-
// Wait on readiness
31-
subscription.wait_for().await;
34+
// Lazily initialize the WaitFor. Clear it after it becomes ready.
35+
let mut wait_for_slot = self.wait_for.lock().unwrap();
36+
let wait_for = wait_for_slot.get_or_insert_with(|| Box::pin(subscription.wait_for()));
37+
match wait_for.as_mut().poll(cx) {
38+
Poll::Pending => Poll::Pending,
39+
Poll::Ready(()) => {
40+
let _ = wait_for_slot.take();
41+
Poll::Ready(())
42+
}
43+
}
44+
}
45+
/// Await for read readiness.
46+
async fn ready(&self) {
47+
poll_fn(|cx| self.poll_ready(cx)).await
3248
}
3349
/// Asynchronously read from the input stream.
3450
/// This method is the same as [`AsyncRead::read`], but doesn't require a `&mut self`.
35-
pub async fn read(&self, buf: &mut [u8]) -> Result<usize> {
51+
pub async fn read(&self, buf: &mut [u8]) -> std::io::Result<usize> {
3652
let read = loop {
3753
self.ready().await;
3854
// Ideally, the ABI would be able to read directly into buf.
@@ -56,10 +72,40 @@ impl AsyncInputStream {
5672
buf[0..len].copy_from_slice(&read);
5773
Ok(len)
5874
}
75+
76+
/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
77+
/// items of `Result<Vec<u8>, std::io::Error>`. The returned byte vectors
78+
/// will be at most 8k. If you want to control chunk size, use
79+
/// `Self::into_stream_of`.
80+
pub fn into_stream(self) -> AsyncInputChunkStream {
81+
AsyncInputChunkStream {
82+
stream: self,
83+
chunk_size: 8 * 1024,
84+
}
85+
}
86+
87+
/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
88+
/// items of `Result<Vec<u8>, std::io::Error>`. The returned byte vectors
89+
/// will be at most the `chunk_size` argument specified.
90+
pub fn into_stream_of(self, chunk_size: usize) -> AsyncInputChunkStream {
91+
AsyncInputChunkStream {
92+
stream: self,
93+
chunk_size,
94+
}
95+
}
96+
97+
/// Use this `AsyncInputStream` as a `futures_core::stream::Stream` with
98+
/// items of `Result<u8, std::io::Error>`.
99+
pub fn into_bytestream(self) -> AsyncInputByteStream {
100+
AsyncInputByteStream {
101+
stream: self.into_stream(),
102+
buffer: std::io::Read::bytes(std::io::Cursor::new(Vec::new())),
103+
}
104+
}
59105
}
60106

61107
impl AsyncRead for AsyncInputStream {
62-
async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
108+
async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
63109
Self::read(self, buf).await
64110
}
65111

@@ -69,21 +115,102 @@ impl AsyncRead for AsyncInputStream {
69115
}
70116
}
71117

118+
/// Wrapper of `AsyncInputStream` that impls `futures_core::stream::Stream`
119+
/// with an item of `Result<Vec<u8>, std::io::Error>`
120+
pub struct AsyncInputChunkStream {
121+
stream: AsyncInputStream,
122+
chunk_size: usize,
123+
}
124+
125+
impl AsyncInputChunkStream {
126+
/// Extract the `AsyncInputStream` which backs this stream.
127+
pub fn into_inner(self) -> AsyncInputStream {
128+
self.stream
129+
}
130+
}
131+
132+
impl futures_core::stream::Stream for AsyncInputChunkStream {
133+
type Item = Result<Vec<u8>, std::io::Error>;
134+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
135+
match self.stream.poll_ready(cx) {
136+
Poll::Pending => Poll::Pending,
137+
Poll::Ready(()) => match self.stream.stream.read(self.chunk_size as u64) {
138+
Ok(r) if r.is_empty() => Poll::Pending,
139+
Ok(r) => Poll::Ready(Some(Ok(r))),
140+
Err(StreamError::LastOperationFailed(err)) => {
141+
Poll::Ready(Some(Err(std::io::Error::other(err.to_debug_string()))))
142+
}
143+
Err(StreamError::Closed) => Poll::Ready(None),
144+
},
145+
}
146+
}
147+
}
148+
149+
pin_project_lite::pin_project! {
150+
/// Wrapper of `AsyncInputStream` that impls
151+
/// `futures_core::stream::Stream` with item `Result<u8, std::io::Error>`.
152+
pub struct AsyncInputByteStream {
153+
#[pin]
154+
stream: AsyncInputChunkStream,
155+
buffer: std::io::Bytes<std::io::Cursor<Vec<u8>>>,
156+
}
157+
}
158+
159+
impl AsyncInputByteStream {
160+
/// Extract the `AsyncInputStream` which backs this stream, and any bytes
161+
/// read from the `AsyncInputStream` which have not yet been yielded by
162+
/// the byte stream.
163+
pub fn into_inner(self) -> (AsyncInputStream, Vec<u8>) {
164+
(
165+
self.stream.into_inner(),
166+
self.buffer
167+
.collect::<Result<Vec<u8>, std::io::Error>>()
168+
.expect("read of Cursor<Vec<u8>> is infallible"),
169+
)
170+
}
171+
}
172+
173+
impl futures_core::stream::Stream for AsyncInputByteStream {
174+
type Item = Result<u8, std::io::Error>;
175+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176+
let this = self.project();
177+
match this.buffer.next() {
178+
Some(byte) => Poll::Ready(Some(Ok(byte.expect("cursor on Vec<u8> is infallible")))),
179+
None => match futures_core::stream::Stream::poll_next(this.stream, cx) {
180+
Poll::Ready(Some(Ok(bytes))) => {
181+
let mut bytes = std::io::Read::bytes(std::io::Cursor::new(bytes));
182+
match bytes.next() {
183+
Some(Ok(byte)) => {
184+
*this.buffer = bytes;
185+
Poll::Ready(Some(Ok(byte)))
186+
}
187+
Some(Err(err)) => Poll::Ready(Some(Err(err))),
188+
None => Poll::Ready(None),
189+
}
190+
}
191+
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
192+
Poll::Ready(None) => Poll::Ready(None),
193+
Poll::Pending => Poll::Pending,
194+
},
195+
}
196+
}
197+
}
198+
72199
/// A wrapper for WASI's `output-stream` resource that provides implementations of `AsyncWrite` and
73200
/// `AsyncPollable`.
74201
#[derive(Debug)]
75202
pub struct AsyncOutputStream {
76203
// Lazily initialized pollable, used for lifetime of stream to check readiness.
77204
// Field ordering matters: this child must be dropped before stream
78-
subscription: OnceCell<AsyncPollable>,
205+
subscription: OnceLock<AsyncPollable>,
79206
stream: OutputStream,
80207
}
81208

82209
impl AsyncOutputStream {
83210
/// Construct an `AsyncOutputStream` from a WASI `OutputStream` resource.
84211
pub fn new(stream: OutputStream) -> Self {
85212
Self {
86-
subscription: OnceCell::new(),
213+
subscription: OnceLock::new(),
87214
stream,
88215
}
89216
}
@@ -104,7 +231,7 @@ impl AsyncOutputStream {
104231
/// a `std::io::Error` indicating either an error returned by the stream write
105232
/// using the debug string provided by the WASI error, or else that the,
106233
/// indicated by `std::io::ErrorKind::ConnectionReset`.
107-
pub async fn write(&self, buf: &[u8]) -> Result<usize> {
234+
pub async fn write(&self, buf: &[u8]) -> std::io::Result<usize> {
108235
// Loops at most twice.
109236
loop {
110237
match self.stream.check_write() {
@@ -145,7 +272,7 @@ impl AsyncOutputStream {
145272
/// the stream flush, using the debug string provided by the WASI error,
146273
/// or else that the stream is closed, indicated by
147274
/// `std::io::ErrorKind::ConnectionReset`.
148-
pub async fn flush(&self) -> Result<()> {
275+
pub async fn flush(&self) -> std::io::Result<()> {
149276
match self.stream.flush() {
150277
Ok(()) => {
151278
self.ready().await;
@@ -162,10 +289,10 @@ impl AsyncOutputStream {
162289
}
163290
impl AsyncWrite for AsyncOutputStream {
164291
// Required methods
165-
async fn write(&mut self, buf: &[u8]) -> Result<usize> {
292+
async fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
166293
Self::write(self, buf).await
167294
}
168-
async fn flush(&mut self) -> Result<()> {
295+
async fn flush(&mut self) -> std::io::Result<()> {
169296
Self::flush(self).await
170297
}
171298

@@ -180,11 +307,10 @@ pub(crate) async fn splice(
180307
reader: &AsyncInputStream,
181308
writer: &AsyncOutputStream,
182309
len: u64,
183-
) -> core::result::Result<u64, StreamError> {
310+
) -> Result<u64, StreamError> {
184311
// Wait for both streams to be ready.
185-
let r = reader.ready();
312+
reader.ready().await;
186313
writer.ready().await;
187-
r.await;
188314

189315
writer.stream.splice(&reader.stream, len)
190316
}

0 commit comments

Comments
 (0)