Skip to content

Commit c8027a1

Browse files
authored
fix(codec): Cancelled client streaming handling (#1315)
This PR fixes how client side streaming is handled on the server side and improves overall source error matching. Fixes: - Correctly, detect h2 codes when its wrapped in a hyper error. - Cancelled requests from the client side during client streaming requests correctly return EOF (`None` from `Streaming::message()`) Closes #848
1 parent dcb0026 commit c8027a1

File tree

2 files changed

+24
-7
lines changed

2 files changed

+24
-7
lines changed

tonic/src/codec/decode.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ enum State {
4747
Error,
4848
}
4949

50-
#[derive(Debug)]
50+
#[derive(Debug, PartialEq, Eq)]
5151
enum Direction {
5252
Request,
5353
Response(StatusCode),
@@ -232,6 +232,10 @@ impl StreamingInner {
232232
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
233233
Some(Ok(d)) => Some(d),
234234
Some(Err(e)) => {
235+
if self.direction == Direction::Request && e.code() == Code::Cancelled {
236+
return Poll::Ready(Ok(None));
237+
}
238+
235239
let _ = std::mem::replace(&mut self.state, State::Error);
236240
let err: crate::Error = e.into();
237241
debug!("decoder inner stream error: {:?}", err);

tonic/src/status.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -361,8 +361,17 @@ impl Status {
361361
// FIXME: bubble this into `transport` and expose generic http2 reasons.
362362
#[cfg(feature = "transport")]
363363
fn from_h2_error(err: Box<h2::Error>) -> Status {
364+
let code = Self::code_from_h2(&err);
365+
366+
let mut status = Self::new(code, format!("h2 protocol error: {}", err));
367+
status.source = Some(Arc::new(*err));
368+
status
369+
}
370+
371+
#[cfg(feature = "transport")]
372+
fn code_from_h2(err: &h2::Error) -> Code {
364373
// See https://github.com/grpc/grpc/blob/3977c30/doc/PROTOCOL-HTTP2.md#errors
365-
let code = match err.reason() {
374+
match err.reason() {
366375
Some(h2::Reason::NO_ERROR)
367376
| Some(h2::Reason::PROTOCOL_ERROR)
368377
| Some(h2::Reason::INTERNAL_ERROR)
@@ -376,11 +385,7 @@ impl Status {
376385
Some(h2::Reason::INADEQUATE_SECURITY) => Code::PermissionDenied,
377386

378387
_ => Code::Unknown,
379-
};
380-
381-
let mut status = Self::new(code, format!("h2 protocol error: {}", err));
382-
status.source = Some(Arc::new(*err));
383-
status
388+
}
384389
}
385390

386391
#[cfg(feature = "transport")]
@@ -416,6 +421,14 @@ impl Status {
416421
if err.is_timeout() || err.is_connect() {
417422
return Some(Status::unavailable(err.to_string()));
418423
}
424+
425+
if let Some(h2_err) = err.source().and_then(|e| e.downcast_ref::<h2::Error>()) {
426+
let code = Status::code_from_h2(&h2_err);
427+
let status = Self::new(code, format!("h2 protocol error: {}", err));
428+
429+
return Some(status);
430+
}
431+
419432
None
420433
}
421434

0 commit comments

Comments
 (0)