-
Notifications
You must be signed in to change notification settings - Fork 65
Description
Current implementation of try_write always creates a TCP segment of exactly the same size as was passed into it:
Lines 317 to 335 in 9ce3d39
| fn try_write(&self, buf: &[u8]) -> Result<usize> { | |
| if buf.remaining() == 0 { | |
| return Ok(0); | |
| } | |
| if self.is_shutdown { | |
| return Err(io::Error::new(io::ErrorKind::BrokenPipe, "Broken pipe")); | |
| } | |
| World::current(|world| { | |
| let bytes = Bytes::copy_from_slice(buf); | |
| let len = bytes.len(); | |
| let seq = self.seq(world)?; | |
| self.send(world, Segment::Data(seq, bytes))?; | |
| Ok(len) | |
| }) | |
| } |
In real TCP there is no guarantee that write boundaries are the same on the receiver as on the sender. The network may have MTU limit lower than the write size, resulting in segmentation. It is also possible that some data is buffered inside the TCP socket and then concatenated with the next write.
Both segmentation and concatenation can reveal problems with buffering in encryption, compression/decompression and parsing code.
I don't suggest simulating set_nodelay or network MTU, but simply randomly split and concatenate the data passed into writer. Receiver then does not need to do anything special.
The reason for this is that I wanted to use turmoil to test async-compression crate and streaming compressed stream over the network. This does not reveal any problems, but I am still not sure if async-compression is bug free because turmoil does not introduce enough chaos into this:
use std::net::{IpAddr, Ipv4Addr};
use std::time::Duration;
use async_compression::tokio::bufread::DeflateDecoder;
use async_compression::tokio::write::DeflateEncoder;
use proptest::prelude::*;
mod utils;
use turmoil;
use turmoil::net::{TcpListener, TcpStream};
use tokio::io::BufReader;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
const PORT: u16 = 1234;
proptest! {
#[test]
fn test_turmoil(input in proptest::collection::vec(0u8..=255, 5000..=10000)) {
let mut sim = turmoil::Builder::new()
.simulation_duration(Duration::from_secs(60))
.tcp_capacity(10)
.min_message_latency(Duration::from_millis(1))
.max_message_latency(Duration::from_millis(10)).build();
let input_len = input.len();
sim.host("server", move || {
let input = input.clone();
async move {
let listener = TcpListener::bind((IpAddr::from(Ipv4Addr::UNSPECIFIED), PORT)).await?;
let (stream, _addr) = listener.accept().await?;
let mut stream = DeflateEncoder::new(stream);
let _ = stream.write_all(&input).await;
stream.flush().await;
let _ = stream.write_all(&input).await;
eprintln!("Wrote {} bytes", input.len());
stream.shutdown().await?;
Ok(())
}
});
sim.client("client", async move {
let stream = TcpStream::connect(("server", PORT)).await?;
let stream = BufReader::new(stream);
let mut stream = DeflateDecoder::new(stream);
let mut buffer = [0; 123];
let mut total_read = 0;
while let x = stream.read(&mut buffer[..]).await? {
eprintln!("Read {x}");
total_read += x;
if x == 0 {
break;
}
}
assert_eq!(total_read, input_len * 2);
Ok(())
});
sim.run().unwrap();
}
}I was also not able to write a lot of data into the stream because of the "socket buffer full" error which is an existing issue:
#168