Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions examples/task_ventilator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod async_helpers;

use rand::Rng;
use std::error::Error;
use std::io::Read;
use std::io::{BufRead, BufReader};

use zeromq::{Socket, SocketSend};

Expand All @@ -17,7 +17,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
sink.connect("tcp://127.0.0.1:5558").await?;

println!("Press Enter when the workers are ready: ");
let _ = std::io::stdin().bytes().next();
let stdin = std::io::stdin();
let mut reader = BufReader::new(stdin);
let mut line = String::new();
let _ = reader.read_line(&mut line);
println!("Sending tasks to workers…");

// The first message is "0" and signals start of batch
Expand Down
1 change: 0 additions & 1 deletion src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl SocketRecv for DealerSocket {
}
Some((_peer_id, Ok(_))) => {
// Ignore non-message frames
continue;
}
Some((_peer_id, Err(e))) => {
// Handle potential errors from the fair queue
Expand Down
4 changes: 2 additions & 2 deletions src/fair_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ mod test {
let result = Pin::new(&mut fair_queue).poll_next(&mut cx);
match result {
Poll::Pending => {} // Expected with noop_waker
other => panic!("Expected Pending, got: {:#?}", other),
other @ Poll::Ready(_) => panic!("Expected Pending, got: {:#?}", other),
}
}

Expand Down Expand Up @@ -373,7 +373,7 @@ mod test {
}
}
Poll::Ready(None) => break,
Poll::Pending => continue,
Poll::Pending => {}
}
}

Expand Down
6 changes: 1 addition & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,7 @@ pub trait Socket: Sized + Send {
/// Unbinds all bound endpoints, blocking until finished.
async fn unbind_all(&mut self) -> Vec<ZmqError> {
let mut errs = Vec::new();
let endpoints: Vec<_> = self
.binds()
.iter()
.map(|(endpoint, _)| endpoint.clone())
.collect();
let endpoints: Vec<_> = self.binds().keys().cloned().collect();
for endpoint in endpoints {
if let Err(err) = self.unbind(endpoint).await {
errs.push(err);
Expand Down
1 change: 0 additions & 1 deletion src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl SocketRecv for PullSocket {
}
Some((_peer_id, Ok(_msg))) => {
// Ignore non-message frames (Command, Greeting) as PULL sockets are designed to only receive actual messages, not internal protocol frames.
continue;
}
Some((peer_id, Err(e))) => {
self.backend.peer_disconnected(&peer_id);
Expand Down
1 change: 0 additions & 1 deletion src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ impl SocketRecv for RepSocket {
}
Message::Greeting(_) | Message::Command(_) => {
// Ignore non-message frames. REP sockets should only process actual messages.
continue;
}
},
Some((peer_id, Err(e))) => {
Expand Down
15 changes: 6 additions & 9 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,12 @@ impl SocketSend for ReqSocket {
})
}
};
match self.backend.peers.get_mut(&next_peer_id) {
Some(mut peer) => {
self.backend.round_robin.push(next_peer_id.clone());
message.push_front(Bytes::new());
peer.send_queue.send(Message::Message(message)).await?;
self.current_request = Some(next_peer_id);
return Ok(());
}
None => continue,
if let Some(mut peer) = self.backend.peers.get_mut(&next_peer_id) {
self.backend.round_robin.push(next_peer_id.clone());
message.push_front(Bytes::new());
peer.send_queue.send(Message::Message(message)).await?;
self.current_request = Some(next_peer_id);
return Ok(());
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,11 @@ impl SocketRecv for RouterSocket {
// todo: Log or handle other message types if needed
// We could take an approach of using `tracing` and have that be an optional feature
// tracing::warn!("Received unimplemented message type: {:?}", msg);
continue;
}
Some((peer_id, Err(_e))) => {
self.backend.peer_disconnected(&peer_id);
// We could take an approach of using `tracing` and have that be an optional feature
// tracing::error!("Error receiving message from peer {}: {:?}", peer_id, e);
continue;
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
Expand Down
1 change: 0 additions & 1 deletion src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ impl SocketRecv for SubSocket {
Some((_peer_id, Ok(_msg))) => {
// Ignore non-message frames. SUB sockets are designed to only receive actual messages,
// not internal protocol frames like commands or greetings.
continue;
}
Some((peer_id, Err(e))) => {
self.backend.peer_disconnected(&peer_id);
Expand Down
1 change: 0 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ pub(crate) async fn connect_forever(endpoint: Endpoint) -> ZmqResult<(FramedIo,
std::f64::consts::E.powf(try_num as f64 / 3.0) + rng.gen_range(0.0f64..0.1f64)
};
async_rt::task::sleep(std::time::Duration::from_secs_f64(delay)).await;
continue;
}
Err(e) => return Err(e),
}
Expand Down
Loading