Skip to content

Commit 27a9d59

Browse files
authored
add TcpStream::connect (#96)
1 parent 42b1426 commit 27a9d59

File tree

3 files changed

+85
-30
lines changed

3 files changed

+85
-30
lines changed

src/net/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,29 @@
11
//! Async network abstractions.
22
3+
use std::io::{self, ErrorKind};
4+
use wasip2::sockets::network::ErrorCode;
5+
36
mod tcp_listener;
47
mod tcp_stream;
58

69
pub use tcp_listener::*;
710
pub use tcp_stream::*;
11+
12+
fn to_io_err(err: ErrorCode) -> io::Error {
13+
match err {
14+
ErrorCode::Unknown => ErrorKind::Other.into(),
15+
ErrorCode::AccessDenied => ErrorKind::PermissionDenied.into(),
16+
ErrorCode::NotSupported => ErrorKind::Unsupported.into(),
17+
ErrorCode::InvalidArgument => ErrorKind::InvalidInput.into(),
18+
ErrorCode::OutOfMemory => ErrorKind::OutOfMemory.into(),
19+
ErrorCode::Timeout => ErrorKind::TimedOut.into(),
20+
ErrorCode::WouldBlock => ErrorKind::WouldBlock.into(),
21+
ErrorCode::InvalidState => ErrorKind::InvalidData.into(),
22+
ErrorCode::AddressInUse => ErrorKind::AddrInUse.into(),
23+
ErrorCode::ConnectionRefused => ErrorKind::ConnectionRefused.into(),
24+
ErrorCode::ConnectionReset => ErrorKind::ConnectionReset.into(),
25+
ErrorCode::ConnectionAborted => ErrorKind::ConnectionAborted.into(),
26+
ErrorCode::ConcurrencyConflict => ErrorKind::AlreadyExists.into(),
27+
_ => ErrorKind::Other.into(),
28+
}
29+
}

src/net/tcp_listener.rs

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use wasip2::sockets::network::Ipv4SocketAddress;
2-
use wasip2::sockets::tcp::{ErrorCode, IpAddressFamily, IpSocketAddress, TcpSocket};
2+
use wasip2::sockets::tcp::{IpAddressFamily, IpSocketAddress, TcpSocket};
33

44
use crate::io;
55
use crate::iter::AsyncIterator;
6-
use std::io::ErrorKind;
76
use std::net::SocketAddr;
87

9-
use super::TcpStream;
8+
use super::{to_io_err, TcpStream};
109
use crate::runtime::AsyncPollable;
1110

1211
/// A TCP socket server, listening for connections.
@@ -81,29 +80,6 @@ impl<'a> AsyncIterator for Incoming<'a> {
8180
}
8281
}
8382

84-
pub(super) fn to_io_err(err: ErrorCode) -> io::Error {
85-
match err {
86-
wasip2::sockets::network::ErrorCode::Unknown => ErrorKind::Other.into(),
87-
wasip2::sockets::network::ErrorCode::AccessDenied => ErrorKind::PermissionDenied.into(),
88-
wasip2::sockets::network::ErrorCode::NotSupported => ErrorKind::Unsupported.into(),
89-
wasip2::sockets::network::ErrorCode::InvalidArgument => ErrorKind::InvalidInput.into(),
90-
wasip2::sockets::network::ErrorCode::OutOfMemory => ErrorKind::OutOfMemory.into(),
91-
wasip2::sockets::network::ErrorCode::Timeout => ErrorKind::TimedOut.into(),
92-
wasip2::sockets::network::ErrorCode::WouldBlock => ErrorKind::WouldBlock.into(),
93-
wasip2::sockets::network::ErrorCode::InvalidState => ErrorKind::InvalidData.into(),
94-
wasip2::sockets::network::ErrorCode::AddressInUse => ErrorKind::AddrInUse.into(),
95-
wasip2::sockets::network::ErrorCode::ConnectionRefused => {
96-
ErrorKind::ConnectionRefused.into()
97-
}
98-
wasip2::sockets::network::ErrorCode::ConnectionReset => ErrorKind::ConnectionReset.into(),
99-
wasip2::sockets::network::ErrorCode::ConnectionAborted => {
100-
ErrorKind::ConnectionAborted.into()
101-
}
102-
wasip2::sockets::network::ErrorCode::ConcurrencyConflict => ErrorKind::AlreadyExists.into(),
103-
_ => ErrorKind::Other.into(),
104-
}
105-
}
106-
10783
fn sockaddr_from_wasi(addr: IpSocketAddress) -> std::net::SocketAddr {
10884
use wasip2::sockets::network::Ipv6SocketAddress;
10985
match addr {

src/net/tcp_stream.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1+
use std::io::ErrorKind;
2+
use std::net::{SocketAddr, ToSocketAddrs};
3+
use wasip2::sockets::instance_network::instance_network;
4+
use wasip2::sockets::network::Ipv4SocketAddress;
5+
use wasip2::sockets::tcp::{IpAddressFamily, IpSocketAddress};
6+
use wasip2::sockets::tcp_create_socket::create_tcp_socket;
17
use wasip2::{
28
io::streams::{InputStream, OutputStream},
39
sockets::tcp::TcpSocket,
410
};
511

12+
use super::to_io_err;
613
use crate::io::{self, AsyncInputStream, AsyncOutputStream};
14+
use crate::runtime::AsyncPollable;
715

816
/// A TCP stream between a local and a remote socket.
917
pub struct TcpStream {
@@ -20,12 +28,61 @@ impl TcpStream {
2028
socket,
2129
}
2230
}
31+
32+
/// Opens a TCP connection to a remote host.
33+
///
34+
/// `addr` is an address of the remote host. Anything which implements the
35+
/// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
36+
/// yields multiple addresses, connect will be attempted with each of the
37+
/// addresses until a connection is successful. If none of the addresses
38+
/// result in a successful connection, the error returned from the last
39+
/// connection attempt (the last address) is returned.
40+
pub async fn connect(addr: impl ToSocketAddrs) -> io::Result<Self> {
41+
let addrs = addr.to_socket_addrs()?;
42+
let mut last_err = None;
43+
for addr in addrs {
44+
match TcpStream::connect_addr(addr).await {
45+
Ok(stream) => return Ok(stream),
46+
Err(e) => last_err = Some(e),
47+
}
48+
}
49+
50+
Err(last_err.unwrap_or_else(|| {
51+
io::Error::new(ErrorKind::InvalidInput, "could not resolve to any address")
52+
}))
53+
}
54+
55+
/// Establishes a connection to the specified `addr`.
56+
pub async fn connect_addr(addr: SocketAddr) -> io::Result<Self> {
57+
let family = match addr {
58+
SocketAddr::V4(_) => IpAddressFamily::Ipv4,
59+
SocketAddr::V6(_) => IpAddressFamily::Ipv6,
60+
};
61+
let socket = create_tcp_socket(family).map_err(to_io_err)?;
62+
let network = instance_network();
63+
64+
let remote_address = match addr {
65+
SocketAddr::V4(addr) => {
66+
let ip = addr.ip().octets();
67+
let address = (ip[0], ip[1], ip[2], ip[3]);
68+
let port = addr.port();
69+
IpSocketAddress::Ipv4(Ipv4SocketAddress { port, address })
70+
}
71+
SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpStream`"),
72+
};
73+
socket
74+
.start_connect(&network, remote_address)
75+
.map_err(to_io_err)?;
76+
let pollable = AsyncPollable::new(socket.subscribe());
77+
pollable.wait_for().await;
78+
let (input, output) = socket.finish_connect().map_err(to_io_err)?;
79+
80+
Ok(TcpStream::new(input, output, socket))
81+
}
82+
2383
/// Returns the socket address of the remote peer of this TCP connection.
2484
pub fn peer_addr(&self) -> io::Result<String> {
25-
let addr = self
26-
.socket
27-
.remote_address()
28-
.map_err(super::tcp_listener::to_io_err)?;
85+
let addr = self.socket.remote_address().map_err(to_io_err)?;
2986
Ok(format!("{addr:?}"))
3087
}
3188

0 commit comments

Comments
 (0)