Skip to content

Commit 5db5b74

Browse files
authored
Add client connection handling via I/O threads (#27)
* Add client connection handling via I/O threads * Use anyhow errors * Return result in `next_request` * Rename run -> spawn * Implement `drop` for `Connection` * Implement drop for `IoThreads` * Add comment * Address comments
1 parent 13d163b commit 5db5b74

File tree

4 files changed

+152
-26
lines changed

4 files changed

+152
-26
lines changed

Cargo.lock

Lines changed: 8 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ version = "0.1.0"
44
edition = "2024"
55

66
[dependencies]
7-
dap = "0.4.1-alpha1"
7+
dap = { git = "https://github.com/software-mansion-labs/dap-rs", rev = "4440a6f" }
88
tracing = "0.1"
9+
anyhow = "1.0"

src/connection.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use std::io::{BufReader, BufWriter};
2+
use std::net::{TcpListener, TcpStream};
3+
use std::sync::mpsc;
4+
use std::thread;
5+
use std::thread::JoinHandle;
6+
7+
use anyhow::Context;
8+
use anyhow::Result;
9+
use dap::base_message::Sendable;
10+
use dap::errors::ServerError;
11+
use dap::prelude::{Event, Request, ResponseBody, Server};
12+
use dap::server::{ServerReader, ServerWriter};
13+
14+
pub struct Connection {
15+
inbound_rx: mpsc::Receiver<Request>,
16+
outbound_tx: mpsc::Sender<Sendable>,
17+
18+
// NOTE: The order of members matters here.
19+
// I/O threads must be dropped after the channels.
20+
_io_threads: IoThreads,
21+
}
22+
23+
impl Connection {
24+
pub fn new() -> Result<Self> {
25+
let tcp_listener = TcpListener::bind("127.0.0.1:0").map_err(ServerError::IoError)?;
26+
let os_assigned_port = tcp_listener.local_addr()?.port();
27+
// Print it so that the client can read it.
28+
println!("\nDEBUGGER PORT: {os_assigned_port}");
29+
30+
let (stream, _client_addr) = tcp_listener.accept().map_err(ServerError::IoError)?;
31+
let input = BufReader::new(stream.try_clone()?);
32+
let output = BufWriter::new(stream);
33+
34+
let (server_reader, server_writer) = Server::new(input, output).split_server();
35+
36+
let (inbound_tx, inbound_rx) = mpsc::channel::<Request>();
37+
let (outbound_tx, outbound_rx) = mpsc::channel::<Sendable>();
38+
39+
Ok(Self {
40+
inbound_rx,
41+
outbound_tx,
42+
_io_threads: IoThreads::spawn(server_reader, server_writer, inbound_tx, outbound_rx),
43+
})
44+
}
45+
46+
pub fn next_request(&self) -> Result<Request> {
47+
self.inbound_rx.recv().context("Inbound connection closed")
48+
}
49+
50+
pub fn send_event(&self, event: Event) -> Result<()> {
51+
self.outbound_tx
52+
.send(Sendable::Event(event))
53+
.context("Sending event to outbound channel failed")
54+
}
55+
56+
pub fn send_success(&self, request: Request, body: ResponseBody) -> Result<()> {
57+
self.outbound_tx
58+
.send(Sendable::Response(request.success(body)))
59+
.context("Sending success response to outbound channel failed")
60+
}
61+
62+
pub fn send_error(&self, request: Request, msg: &str) -> Result<()> {
63+
self.outbound_tx
64+
.send(Sendable::Response(request.error(msg)))
65+
.context("Sending error response to outbound channel failed")
66+
}
67+
}
68+
69+
struct IoThreads {
70+
pub reader: Option<JoinHandle<()>>,
71+
pub writer: Option<JoinHandle<()>>,
72+
}
73+
74+
impl IoThreads {
75+
fn spawn(
76+
server_reader: ServerReader<TcpStream>,
77+
server_writer: ServerWriter<TcpStream>,
78+
inbound_tx: mpsc::Sender<Request>,
79+
outbound_rx: mpsc::Receiver<Sendable>,
80+
) -> Self {
81+
Self {
82+
reader: Some(spawn_reader_thread(server_reader, inbound_tx)),
83+
writer: Some(spawn_writer_thread(server_writer, outbound_rx)),
84+
}
85+
}
86+
}
87+
88+
impl Drop for IoThreads {
89+
fn drop(&mut self) {
90+
self.reader.take().map(|h| h.join());
91+
self.writer.take().map(|h| h.join());
92+
}
93+
}
94+
95+
fn spawn_reader_thread(
96+
mut server_reader: ServerReader<TcpStream>,
97+
inbound_tx: mpsc::Sender<Request>,
98+
) -> JoinHandle<()> {
99+
thread::spawn(move || {
100+
while let Ok(Some(request)) = server_reader.poll_request() {
101+
if inbound_tx.send(request).is_err() {
102+
// TODO: Add error tracing
103+
break;
104+
}
105+
}
106+
})
107+
}
108+
109+
fn spawn_writer_thread(
110+
mut server_writer: ServerWriter<TcpStream>,
111+
outbound_rx: mpsc::Receiver<Sendable>,
112+
) -> JoinHandle<()> {
113+
thread::spawn(move || {
114+
while let Ok(msg) = outbound_rx.recv() {
115+
match msg {
116+
Sendable::Response(response) => {
117+
server_writer.respond(response).expect("Failed to send response")
118+
}
119+
Sendable::Event(event) => {
120+
server_writer.send_event(event).expect("Failed to send event")
121+
}
122+
Sendable::ReverseRequest(_) => unreachable!("Reverse requests are not supported"),
123+
}
124+
}
125+
})
126+
}

src/lib.rs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
use std::io::{BufReader, BufWriter};
2-
use std::net::{TcpListener, TcpStream};
3-
4-
use dap::errors::ServerError;
1+
use anyhow::Result;
2+
use connection::Connection;
53
use dap::events::{Event, StoppedEventBody};
6-
use dap::prelude::{Command, Request, ResponseBody, Server};
4+
use dap::prelude::{Command, Request, ResponseBody};
75
use dap::responses::{
86
EvaluateResponse, ScopesResponse, SetBreakpointsResponse, StackTraceResponse, ThreadsResponse,
97
VariablesResponse,
108
};
119
use dap::types::{Breakpoint, Capabilities, Source, StackFrame, StoppedEventReason, Thread};
1210
use tracing::trace;
1311

12+
mod connection;
13+
1414
// TODO: add vm, add handlers for requests.
1515
pub struct CairoDebugger {
16-
server: Server<TcpStream, TcpStream>,
16+
connection: Connection,
1717
}
1818

1919
enum ServerResponse {
@@ -24,27 +24,20 @@ enum ServerResponse {
2424
}
2525

2626
impl CairoDebugger {
27-
pub fn connect() -> Result<Self, ServerError> {
28-
let tcp_listener = TcpListener::bind("127.0.0.1:0").map_err(ServerError::IoError)?;
29-
let os_assigned_port = tcp_listener.local_addr().unwrap().port();
30-
// Print it so that the client can read it.
31-
println!("\nDEBUGGER PORT: {os_assigned_port}");
32-
33-
let (stream, _client_addr) = tcp_listener.accept().map_err(ServerError::IoError)?;
34-
let input = BufReader::new(stream.try_clone().unwrap());
35-
let output = BufWriter::new(stream);
36-
Ok(Self { server: Server::new(input, output) })
27+
pub fn connect() -> Result<Self> {
28+
let connection = Connection::new()?;
29+
Ok(Self { connection })
3730
}
3831

39-
pub fn run(&mut self) -> Result<(), ServerError> {
40-
while let Some(req) = self.server.poll_request()? {
32+
pub fn run(&mut self) -> Result<()> {
33+
while let Ok(req) = self.connection.next_request() {
4134
match handle_request(&req) {
42-
ServerResponse::Success(body) => self.server.respond(req.success(body))?,
43-
ServerResponse::Error(msg) => self.server.respond(req.error(&msg))?,
44-
ServerResponse::Event(event) => self.server.send_event(event)?,
35+
ServerResponse::Success(body) => self.connection.send_success(req, body)?,
36+
ServerResponse::Error(msg) => self.connection.send_error(req, &msg)?,
37+
ServerResponse::Event(event) => self.connection.send_event(event)?,
4538
ServerResponse::SuccessThenEvent(body, event) => {
46-
self.server.respond(req.success(body))?;
47-
self.server.send_event(event)?;
39+
self.connection.send_success(req, body)?;
40+
self.connection.send_event(event)?;
4841
}
4942
}
5043
}

0 commit comments

Comments
 (0)