From 4be34b08051ee13cf942372e09c47e5cc730a576 Mon Sep 17 00:00:00 2001 From: Dariusz Doktorski Date: Thu, 27 Nov 2025 13:52:58 +0100 Subject: [PATCH 1/8] Add client connection handling via I/O threads --- Cargo.lock | 10 ++++- Cargo.toml | 3 +- src/connection.rs | 98 +++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 28 ++++++-------- 4 files changed, 120 insertions(+), 19 deletions(-) create mode 100644 src/connection.rs diff --git a/Cargo.lock b/Cargo.lock index c37b946..711314d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,10 +2,17 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "anyhow" +version = "1.0.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" + [[package]] name = "cairo-debugger" version = "0.1.0" dependencies = [ + "anyhow", "dap", "tracing", ] @@ -13,8 +20,7 @@ dependencies = [ [[package]] name = "dap" version = "0.4.1-alpha1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35c7fc89d334ab745ba679f94c7314c9b17ecdcd923c111df6206e9fd7729fa9" +source = "git+https://github.com/software-mansion-labs/dap-rs?branch=debugger-poc#4440a6fa4ffd26f88f1191ee2371a482fde2a539" dependencies = [ "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 9838fab..9d32b36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,5 +4,6 @@ version = "0.1.0" edition = "2024" [dependencies] -dap = "0.4.1-alpha1" +dap = { git = "https://github.com/software-mansion-labs/dap-rs", branch = "debugger-poc" } tracing = "0.1" +anyhow = "1.0" diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 0000000..b771145 --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,98 @@ +use std::io::{BufReader, BufWriter}; +use std::net::{TcpListener, TcpStream}; +use std::sync::mpsc; +use std::thread; + +use anyhow::Context; +use anyhow::Result; +use dap::base_message::Sendable; +use dap::errors::ServerError; +use dap::prelude::{Event, Request, ResponseBody, Server}; +use dap::server::{ServerReader, ServerWriter}; + +pub struct Connection { + inbound_rx: mpsc::Receiver, + outbound_tx: mpsc::Sender, +} + +impl Connection { + pub fn new() -> Result { + let tcp_listener = TcpListener::bind("127.0.0.1:0").map_err(ServerError::IoError)?; + let os_assigned_port = tcp_listener.local_addr()?.port(); + // Print it so that the client can read it. + println!("\nDEBUGGER PORT: {os_assigned_port}"); + + let (stream, _client_addr) = tcp_listener.accept().map_err(ServerError::IoError)?; + let input = BufReader::new(stream.try_clone()?); + let output = BufWriter::new(stream); + + let (server_reader, server_writer) = Server::new(input, output).split_server(); + + let (inbound_tx, inbound_rx) = mpsc::channel::(); + let (outbound_tx, outbound_rx) = mpsc::channel::(); + + run_reader_thread(server_reader, inbound_tx); + run_writer_thread(server_writer, outbound_rx); + + Ok(Self { inbound_rx, outbound_tx }) + } + + pub fn next_request(&self) -> Option { + self.inbound_rx.recv().ok() + } + + pub fn try_next_request(&self) -> Option { + self.inbound_rx.try_recv().ok() + } + + pub fn send_event(&self, event: Event) -> Result<()> { + self.outbound_tx + .send(Sendable::Event(event)) + .context("Sending event to outbound channel failed") + } + + pub fn send_success(&self, request: Request, body: ResponseBody) -> Result<()> { + self.outbound_tx + .send(Sendable::Response(request.success(body))) + .context("Sending success response to outbound channel failed") + } + + pub fn send_error(&self, request: Request, msg: &str) -> Result<()> { + self.outbound_tx + .send(Sendable::Response(request.error(msg))) + .context("Sending error response to outbound channel failed") + } +} + +fn run_reader_thread( + mut server_reader: ServerReader, + inbound_tx: mpsc::Sender, +) { + thread::spawn(move || { + while let Ok(Some(request)) = server_reader.poll_request() { + if inbound_tx.send(request).is_err() { + // TODO: Add error tracing + break; + } + } + }); +} + +fn run_writer_thread( + mut server_writer: ServerWriter, + outbound_rx: mpsc::Receiver, +) { + thread::spawn(move || { + while let Ok(msg) = outbound_rx.recv() { + match msg { + Sendable::Response(response) => { + server_writer.respond(response).expect("Failed to send response") + } + Sendable::Event(event) => { + server_writer.send_event(event).expect("Failed to send event") + } + Sendable::ReverseRequest(_) => unreachable!(), + } + } + }); +} diff --git a/src/lib.rs b/src/lib.rs index 82eb092..2023333 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ use std::io::{BufReader, BufWriter}; use std::net::{TcpListener, TcpStream}; +use connection::Connection; use dap::errors::ServerError; use dap::events::{Event, StoppedEventBody}; use dap::prelude::{Command, Request, ResponseBody, Server}; @@ -11,9 +12,11 @@ use dap::responses::{ use dap::types::{Breakpoint, Capabilities, Source, StackFrame, StoppedEventReason, Thread}; use tracing::trace; +mod connection; + // TODO: add vm, add handlers for requests. pub struct CairoDebugger { - server: Server, + connection: Connection, } enum ServerResponse { @@ -25,26 +28,19 @@ enum ServerResponse { impl CairoDebugger { pub fn connect() -> Result { - let tcp_listener = TcpListener::bind("127.0.0.1:0").map_err(ServerError::IoError)?; - let os_assigned_port = tcp_listener.local_addr().unwrap().port(); - // Print it so that the client can read it. - println!("\nDEBUGGER PORT: {os_assigned_port}"); - - let (stream, _client_addr) = tcp_listener.accept().map_err(ServerError::IoError)?; - let input = BufReader::new(stream.try_clone().unwrap()); - let output = BufWriter::new(stream); - Ok(Self { server: Server::new(input, output) }) + let connection = Connection::new()?; + Ok(Self { connection }) } pub fn run(&mut self) -> Result<(), ServerError> { - while let Some(req) = self.server.poll_request()? { + while let Some(req) = self.connection.next_request() { match handle_request(&req) { - ServerResponse::Success(body) => self.server.respond(req.success(body))?, - ServerResponse::Error(msg) => self.server.respond(req.error(&msg))?, - ServerResponse::Event(event) => self.server.send_event(event)?, + ServerResponse::Success(body) => self.connection.send_success(req, body)?, + ServerResponse::Error(msg) => self.connection.send_error(req, &msg)?, + ServerResponse::Event(event) => self.connection.send_event(event)?, ServerResponse::SuccessThenEvent(body, event) => { - self.server.respond(req.success(body))?; - self.server.send_event(event)?; + self.connection.send_success(req, body)?; + self.connection.send_event(event)?; } } } From 9a8365af4eeffa7f801e0b4545fecc09ca44890d Mon Sep 17 00:00:00 2001 From: Dariusz Doktorski Date: Thu, 27 Nov 2025 13:54:57 +0100 Subject: [PATCH 2/8] Use anyhow errors --- src/connection.rs | 4 ---- src/lib.rs | 11 ++++------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index b771145..567ca85 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -41,10 +41,6 @@ impl Connection { self.inbound_rx.recv().ok() } - pub fn try_next_request(&self) -> Option { - self.inbound_rx.try_recv().ok() - } - pub fn send_event(&self, event: Event) -> Result<()> { self.outbound_tx .send(Sendable::Event(event)) diff --git a/src/lib.rs b/src/lib.rs index 2023333..9bcecc4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,7 @@ -use std::io::{BufReader, BufWriter}; -use std::net::{TcpListener, TcpStream}; - +use anyhow::Result; use connection::Connection; -use dap::errors::ServerError; use dap::events::{Event, StoppedEventBody}; -use dap::prelude::{Command, Request, ResponseBody, Server}; +use dap::prelude::{Command, Request, ResponseBody}; use dap::responses::{ EvaluateResponse, ScopesResponse, SetBreakpointsResponse, StackTraceResponse, ThreadsResponse, VariablesResponse, @@ -27,12 +24,12 @@ enum ServerResponse { } impl CairoDebugger { - pub fn connect() -> Result { + pub fn connect() -> Result { let connection = Connection::new()?; Ok(Self { connection }) } - pub fn run(&mut self) -> Result<(), ServerError> { + pub fn run(&mut self) -> Result<()> { while let Some(req) = self.connection.next_request() { match handle_request(&req) { ServerResponse::Success(body) => self.connection.send_success(req, body)?, From 256230d4401e8854b92d59ff9f3bb14c5b244445 Mon Sep 17 00:00:00 2001 From: Dariusz Doktorski Date: Thu, 27 Nov 2025 13:58:50 +0100 Subject: [PATCH 3/8] Return result in `next_request` --- src/connection.rs | 4 ++-- src/lib.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 567ca85..f4103a1 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -37,8 +37,8 @@ impl Connection { Ok(Self { inbound_rx, outbound_tx }) } - pub fn next_request(&self) -> Option { - self.inbound_rx.recv().ok() + pub fn next_request(&self) -> Result { + self.inbound_rx.recv().context("Connection close") } pub fn send_event(&self, event: Event) -> Result<()> { diff --git a/src/lib.rs b/src/lib.rs index 9bcecc4..a3c7dd8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,7 +30,7 @@ impl CairoDebugger { } pub fn run(&mut self) -> Result<()> { - while let Some(req) = self.connection.next_request() { + while let Ok(req) = self.connection.next_request() { match handle_request(&req) { ServerResponse::Success(body) => self.connection.send_success(req, body)?, ServerResponse::Error(msg) => self.connection.send_error(req, &msg)?, From 7fccaeb5f65e31bf4bcfdeab878de2cab2bd1527 Mon Sep 17 00:00:00 2001 From: Dariusz Doktorski Date: Thu, 27 Nov 2025 14:00:13 +0100 Subject: [PATCH 4/8] Rename run -> spawn --- src/connection.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index f4103a1..4323383 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -31,8 +31,8 @@ impl Connection { let (inbound_tx, inbound_rx) = mpsc::channel::(); let (outbound_tx, outbound_rx) = mpsc::channel::(); - run_reader_thread(server_reader, inbound_tx); - run_writer_thread(server_writer, outbound_rx); + spawn_reader_thread(server_reader, inbound_tx); + spawn_writer_thread(server_writer, outbound_rx); Ok(Self { inbound_rx, outbound_tx }) } @@ -60,7 +60,7 @@ impl Connection { } } -fn run_reader_thread( +fn spawn_reader_thread( mut server_reader: ServerReader, inbound_tx: mpsc::Sender, ) { @@ -74,7 +74,7 @@ fn run_reader_thread( }); } -fn run_writer_thread( +fn spawn_writer_thread( mut server_writer: ServerWriter, outbound_rx: mpsc::Receiver, ) { From e4cbdd563540ccc8132afde5e5f38cab209f0542 Mon Sep 17 00:00:00 2001 From: Dariusz Doktorski Date: Thu, 27 Nov 2025 14:16:11 +0100 Subject: [PATCH 5/8] Implement `drop` for `Connection` --- src/connection.rs | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 4323383..9183062 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -2,6 +2,7 @@ use std::io::{BufReader, BufWriter}; use std::net::{TcpListener, TcpStream}; use std::sync::mpsc; use std::thread; +use std::thread::JoinHandle; use anyhow::Context; use anyhow::Result; @@ -13,6 +14,12 @@ use dap::server::{ServerReader, ServerWriter}; pub struct Connection { inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender, + io_threads: IoThreads, +} + +struct IoThreads { + pub reader: Option>, + pub writer: Option>, } impl Connection { @@ -31,10 +38,9 @@ impl Connection { let (inbound_tx, inbound_rx) = mpsc::channel::(); let (outbound_tx, outbound_rx) = mpsc::channel::(); - spawn_reader_thread(server_reader, inbound_tx); - spawn_writer_thread(server_writer, outbound_rx); + let io_threads = IoThreads::spawn(server_reader, server_writer, inbound_tx, outbound_rx); - Ok(Self { inbound_rx, outbound_tx }) + Ok(Self { inbound_rx, outbound_tx, io_threads }) } pub fn next_request(&self) -> Result { @@ -60,10 +66,31 @@ impl Connection { } } +impl IoThreads { + fn spawn( + server_reader: ServerReader, + server_writer: ServerWriter, + inbound_tx: mpsc::Sender, + outbound_rx: mpsc::Receiver, + ) -> Self { + Self { + reader: Some(spawn_reader_thread(server_reader, inbound_tx)), + writer: Some(spawn_writer_thread(server_writer, outbound_rx)), + } + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.io_threads.reader.take().map(|h| h.join()); + self.io_threads.writer.take().map(|h| h.join()); + } +} + fn spawn_reader_thread( mut server_reader: ServerReader, inbound_tx: mpsc::Sender, -) { +) -> JoinHandle<()> { thread::spawn(move || { while let Ok(Some(request)) = server_reader.poll_request() { if inbound_tx.send(request).is_err() { @@ -71,13 +98,13 @@ fn spawn_reader_thread( break; } } - }); + }) } fn spawn_writer_thread( mut server_writer: ServerWriter, outbound_rx: mpsc::Receiver, -) { +) -> JoinHandle<()> { thread::spawn(move || { while let Ok(msg) = outbound_rx.recv() { match msg { @@ -90,5 +117,5 @@ fn spawn_writer_thread( Sendable::ReverseRequest(_) => unreachable!(), } } - }); + }) } From 4bf190514fe6cf32a0089521db447e059265c3fe Mon Sep 17 00:00:00 2001 From: Dariusz Doktorski Date: Fri, 28 Nov 2025 09:18:21 +0100 Subject: [PATCH 6/8] Implement drop for `IoThreads` --- src/connection.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 9183062..c75c41b 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -14,7 +14,7 @@ use dap::server::{ServerReader, ServerWriter}; pub struct Connection { inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender, - io_threads: IoThreads, + _io_threads: IoThreads, } struct IoThreads { @@ -38,9 +38,11 @@ impl Connection { let (inbound_tx, inbound_rx) = mpsc::channel::(); let (outbound_tx, outbound_rx) = mpsc::channel::(); - let io_threads = IoThreads::spawn(server_reader, server_writer, inbound_tx, outbound_rx); - - Ok(Self { inbound_rx, outbound_tx, io_threads }) + Ok(Self { + inbound_rx, + outbound_tx, + _io_threads: IoThreads::spawn(server_reader, server_writer, inbound_tx, outbound_rx), + }) } pub fn next_request(&self) -> Result { @@ -80,10 +82,10 @@ impl IoThreads { } } -impl Drop for Connection { +impl Drop for IoThreads { fn drop(&mut self) { - self.io_threads.reader.take().map(|h| h.join()); - self.io_threads.writer.take().map(|h| h.join()); + self.reader.take().map(|h| h.join()); + self.writer.take().map(|h| h.join()); } } From f1ff55fa2c9ab1327f21478b3f2b6650d3336f07 Mon Sep 17 00:00:00 2001 From: Dariusz Doktorski Date: Fri, 28 Nov 2025 09:21:20 +0100 Subject: [PATCH 7/8] Add comment --- src/connection.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/connection.rs b/src/connection.rs index c75c41b..68c077b 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -14,6 +14,9 @@ use dap::server::{ServerReader, ServerWriter}; pub struct Connection { inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender, + + // NOTE: The order matters here. + // I/O threads must be dropped after the channels. _io_threads: IoThreads, } From b0ad092e33f9930784e6a439755b177e7cd341e6 Mon Sep 17 00:00:00 2001 From: Dariusz Doktorski Date: Fri, 28 Nov 2025 16:38:04 +0100 Subject: [PATCH 8/8] Address comments --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/connection.rs | 16 ++++++++-------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 711314d..e5832b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,7 +20,7 @@ dependencies = [ [[package]] name = "dap" version = "0.4.1-alpha1" -source = "git+https://github.com/software-mansion-labs/dap-rs?branch=debugger-poc#4440a6fa4ffd26f88f1191ee2371a482fde2a539" +source = "git+https://github.com/software-mansion-labs/dap-rs?rev=4440a6f#4440a6fa4ffd26f88f1191ee2371a482fde2a539" dependencies = [ "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 9d32b36..a1e33af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,6 @@ version = "0.1.0" edition = "2024" [dependencies] -dap = { git = "https://github.com/software-mansion-labs/dap-rs", branch = "debugger-poc" } +dap = { git = "https://github.com/software-mansion-labs/dap-rs", rev = "4440a6f" } tracing = "0.1" anyhow = "1.0" diff --git a/src/connection.rs b/src/connection.rs index 68c077b..bbf7398 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -15,16 +15,11 @@ pub struct Connection { inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender, - // NOTE: The order matters here. + // NOTE: The order of members matters here. // I/O threads must be dropped after the channels. _io_threads: IoThreads, } -struct IoThreads { - pub reader: Option>, - pub writer: Option>, -} - impl Connection { pub fn new() -> Result { let tcp_listener = TcpListener::bind("127.0.0.1:0").map_err(ServerError::IoError)?; @@ -49,7 +44,7 @@ impl Connection { } pub fn next_request(&self) -> Result { - self.inbound_rx.recv().context("Connection close") + self.inbound_rx.recv().context("Inbound connection closed") } pub fn send_event(&self, event: Event) -> Result<()> { @@ -71,6 +66,11 @@ impl Connection { } } +struct IoThreads { + pub reader: Option>, + pub writer: Option>, +} + impl IoThreads { fn spawn( server_reader: ServerReader, @@ -119,7 +119,7 @@ fn spawn_writer_thread( Sendable::Event(event) => { server_writer.send_event(event).expect("Failed to send event") } - Sendable::ReverseRequest(_) => unreachable!(), + Sendable::ReverseRequest(_) => unreachable!("Reverse requests are not supported"), } } })