Skip to content

Commit 938bf8f

Browse files
authored
raw_path and query_string should be buffers (#21)
In the HttpConnectionScope, the raw_path and query_string attributes actually need to be buffers. Also improved reporting of ASGI failures. Signed-off-by: Stephen Belanger <[email protected]>
1 parent 75eb5db commit 938bf8f

File tree

3 files changed

+91
-42
lines changed

3 files changed

+91
-42
lines changed

src/asgi/http.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ impl<'py> IntoPyObject<'py> for HttpConnectionScope {
154154
dict.set_item("method", self.method.into_pyobject(py)?)?;
155155
dict.set_item("scheme", self.scheme)?;
156156
dict.set_item("path", self.path)?;
157-
dict.set_item("raw_path", self.raw_path)?;
158-
dict.set_item("query_string", self.query_string)?;
157+
dict.set_item("raw_path", self.raw_path.as_bytes())?;
158+
dict.set_item("query_string", self.query_string.as_bytes())?;
159159
dict.set_item("root_path", self.root_path)?;
160160
dict.set_item("headers", self.headers.into_pyobject(py)?)?;
161161
if let Some((host, port)) = self.client {
@@ -355,10 +355,7 @@ impl<'py> FromPyObject<'py> for HttpSendMessage {
355355
})
356356
}
357357
"http.response.body" => {
358-
let body: Vec<u8> = dict
359-
.get_item("body")?
360-
.ok_or_else(|| PyValueError::new_err("Missing 'body' key in HTTP response body message"))?
361-
.extract()?;
358+
let body: Vec<u8> = dict.get_item("body")?.map_or(Ok(vec![]), |v| v.extract())?;
362359

363360
let more_body: bool = dict
364361
.get_item("more_body")?
@@ -562,8 +559,8 @@ mod tests {
562559
);
563560
assert_eq!(dict_extract!(py_scope, "scheme", String), "http");
564561
assert_eq!(dict_extract!(py_scope, "path", String), "");
565-
assert_eq!(dict_extract!(py_scope, "raw_path", String), "");
566-
assert_eq!(dict_extract!(py_scope, "query_string", String), "");
562+
assert_eq!(dict_extract!(py_scope, "raw_path", Vec<u8>), b"");
563+
assert_eq!(dict_extract!(py_scope, "query_string", Vec<u8>), b"");
567564
assert_eq!(dict_extract!(py_scope, "root_path", String), "");
568565
assert_eq!(
569566
dict_extract!(py_scope, "headers", Vec<(String, String)>),

src/asgi/mod.rs

Lines changed: 82 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -237,25 +237,29 @@ impl Handler for Asgi {
237237
// Create response channel
238238
let (response_tx, response_rx) = oneshot::channel();
239239

240-
// Spawn task to collect response
241-
tokio::spawn(collect_response_messages(tx_receiver, response_tx));
242-
243240
// Submit the ASGI app call to Python event loop
244-
Python::attach(|py| {
241+
let future = Python::attach(|py| {
245242
let scope_py = scope.into_pyobject(py)?;
246243
let coro = self
247244
.app_function
248245
.call1(py, (scope_py, rx_receiver, tx_sender))?;
249246

250247
let asyncio = py.import("asyncio")?;
251-
asyncio.call_method1(
248+
let future = asyncio.call_method1(
252249
"run_coroutine_threadsafe",
253250
(coro, self.event_loop_handle.event_loop()),
254251
)?;
255252

256-
Ok::<(), HandlerError>(())
253+
Ok::<Py<PyAny>, HandlerError>(future.unbind())
257254
})?;
258255

256+
// Spawn task to collect response and monitor for Python exceptions
257+
tokio::spawn(collect_response_with_exception_handling(
258+
tx_receiver,
259+
response_tx,
260+
future,
261+
));
262+
259263
// Wait for response
260264
let (status, headers, body) = response_rx.await??;
261265

@@ -379,48 +383,92 @@ fn start_python_event_loop_thread(event_loop: Py<PyAny>) {
379383
});
380384
}
381385

382-
/// Collect ASGI response messages
383-
async fn collect_response_messages(
386+
/// Collect ASGI response messages while monitoring for Python exceptions
387+
async fn collect_response_with_exception_handling(
384388
mut tx_receiver: tokio::sync::mpsc::UnboundedReceiver<AcknowledgedMessage<HttpSendMessage>>,
385389
response_tx: oneshot::Sender<HttpResponseResult>,
390+
python_future: Py<PyAny>,
386391
) {
387392
let mut status = 500u16;
388393
let mut headers = Vec::new();
389394
let mut body = Vec::new();
390395
let mut response_started = false;
391396

392-
while let Some(ack_msg) = tx_receiver.recv().await {
393-
let AcknowledgedMessage { message, ack } = ack_msg;
394-
395-
match message {
396-
HttpSendMessage::HttpResponseStart {
397-
status: s,
398-
headers: h,
399-
..
400-
} => {
401-
status = s;
402-
headers = h;
403-
response_started = true;
397+
// Spawn a task to monitor the Python future for exceptions
398+
let future_clone = Python::attach(|py| python_future.clone_ref(py));
399+
let mut exception_handle = tokio::task::spawn_blocking(move || {
400+
Python::attach(|py| {
401+
let future_bound = future_clone.bind(py);
402+
// Wait for the future to complete (with 30 second timeout)
403+
match future_bound.call_method1("result", (30.0,)) {
404+
Ok(_) => None, // Success - no exception
405+
Err(e) => Some(e), // Exception occurred
404406
}
405-
HttpSendMessage::HttpResponseBody { body: b, more_body } => {
406-
if response_started {
407-
body.extend_from_slice(&b);
408-
if !more_body {
407+
})
408+
});
409+
410+
loop {
411+
tokio::select! {
412+
// Check for messages from the ASGI app
413+
msg = tx_receiver.recv() => {
414+
match msg {
415+
Some(ack_msg) => {
416+
let AcknowledgedMessage { message, ack } = ack_msg;
417+
418+
match message {
419+
HttpSendMessage::HttpResponseStart {
420+
status: s,
421+
headers: h,
422+
..
423+
} => {
424+
status = s;
425+
headers = h;
426+
response_started = true;
427+
}
428+
HttpSendMessage::HttpResponseBody { body: b, more_body } => {
429+
if response_started {
430+
body.extend_from_slice(&b);
431+
if !more_body {
432+
let _ = ack.send(());
433+
let _ = response_tx.send(Ok((status, headers, body)));
434+
return;
435+
}
436+
}
437+
}
438+
}
439+
409440
let _ = ack.send(());
410-
let _ = response_tx.send(Ok((status, headers, body)));
441+
}
442+
None => {
443+
// Channel closed without a complete response
444+
let _ = response_tx.send(Err(if response_started {
445+
HandlerError::ResponseInterrupted
446+
} else {
447+
HandlerError::NoResponse
448+
}));
449+
return;
450+
}
451+
}
452+
}
453+
// Check if the Python coroutine raised an exception
454+
exception_result = &mut exception_handle => {
455+
match exception_result {
456+
Ok(Some(py_err)) => {
457+
// Python exception occurred
458+
let _ = response_tx.send(Err(HandlerError::PythonError(py_err)));
459+
return;
460+
}
461+
Ok(None) => {
462+
// Python coroutine completed successfully
463+
// Continue waiting for response messages
464+
}
465+
Err(e) => {
466+
// Tokio task error
467+
let _ = response_tx.send(Err(HandlerError::TokioError(e.to_string())));
411468
return;
412469
}
413470
}
414471
}
415472
}
416-
417-
let _ = ack.send(());
418473
}
419-
420-
// If we got here, the channel closed without a complete response
421-
let _ = response_tx.send(Err(if response_started {
422-
HandlerError::ResponseInterrupted
423-
} else {
424-
HandlerError::NoResponse
425-
}));
426474
}

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,10 @@ pub enum HandlerError {
348348
/// Error when a lock is poisoned
349349
#[error("Lock poisoned: {0}")]
350350
LockPoisoned(String),
351+
352+
/// Error when a Tokio task fails
353+
#[error("Tokio task error: {0}")]
354+
TokioError(String),
351355
}
352356

353357
impl<T> From<std::sync::PoisonError<T>> for HandlerError {

0 commit comments

Comments
 (0)