diff --git a/Cargo.toml b/Cargo.toml index d945c12..5f419db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,13 +21,14 @@ appveyor = { repository = "lipanski/mockito", branch = "master", service = "gith assert-json-diff = "2.0" bytes = "1" colored = { version = ">=2.0, <=3", optional = true } -futures-util = { version = "0.3", default-features = false } +futures-core = "0.3" http = "1" http-body = "1" http-body-util = "0.1" hyper = "1" hyper-util = { version = "0.1", features = ["server-auto", "tokio"] } log = "0.4" +pin-project-lite = "0.2" rand = "0.9" regex = "1.7" serde_json = "1.0" diff --git a/src/response.rs b/src/response.rs index 051e668..6515ef4 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,7 +1,7 @@ use crate::error::Error; use crate::Request; use bytes::Bytes; -use futures_util::Stream; +use futures_core::Stream; use http::{HeaderMap, StatusCode}; use std::fmt; use std::io; diff --git a/src/server.rs b/src/server.rs index cfac36e..e7afe8a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,10 +4,9 @@ use crate::response::{Body as ResponseBody, ChunkedStream, Header}; use crate::ServerGuard; use crate::{Error, ErrorKind, Matcher, Mock}; use bytes::Bytes; -use futures_util::{TryStream, TryStreamExt}; +use futures_core::Stream; use http::{Request as HttpRequest, Response, StatusCode}; use http_body::{Body as HttpBody, Frame, SizeHint}; -use http_body_util::{BodyExt, StreamBody}; use hyper::body::Incoming; use hyper::service::service_fn; use hyper_util::rt::{TokioExecutor, TokioIo}; @@ -20,7 +19,7 @@ use std::ops::Drop; use std::pin::Pin; use std::str::FromStr; use std::sync::{mpsc, Arc, RwLock}; -use std::task::{ready, Context, Poll}; +use std::task::{Context, Poll}; use std::thread; use tokio::net::TcpListener; use tokio::runtime; @@ -462,23 +461,17 @@ impl fmt::Display for Server { type BoxError = Box; -enum Body { - Once(Option), - Wrap(http_body_util::combinators::UnsyncBoxBody), +pin_project_lite::pin_project! { + #[project = BodyProj] + enum Body { + Once {inner: Option}, + ChunkedStream { #[pin] inner: ChunkedStream }, + } } impl Body { fn empty() -> Self { - Self::Once(None) - } - - fn from_data_stream(stream: S) -> Self - where - S: TryStream + Send + 'static, - S::Error: Into, - { - let body = StreamBody::new(stream.map_ok(Frame::data).map_err(Into::into)).boxed_unsync(); - Self::Wrap(body) + Self::Once { inner: None } } } @@ -487,38 +480,46 @@ impl From for Body { if bytes.is_empty() { Self::empty() } else { - Self::Once(Some(bytes)) + Self::Once { inner: Some(bytes) } } } } +impl From for Body { + fn from(stream: ChunkedStream) -> Self { + Self::ChunkedStream { inner: stream } + } +} + impl HttpBody for Body { type Data = Bytes; type Error = BoxError; fn poll_frame( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { - match self.as_mut().get_mut() { - Self::Once(val) => Poll::Ready(Ok(val.take().map(Frame::data)).transpose()), - Self::Wrap(body) => Poll::Ready(ready!(Pin::new(body).poll_frame(cx))), + match self.project() { + BodyProj::Once { inner } => Poll::Ready(Ok(inner.take().map(Frame::data)).transpose()), + BodyProj::ChunkedStream { inner } => { + inner.poll_next(cx).map_ok(Frame::data).map_err(Into::into) + } } } fn size_hint(&self) -> SizeHint { match self { - Self::Once(None) => SizeHint::with_exact(0), - Self::Once(Some(bytes)) => SizeHint::with_exact(bytes.len() as u64), - Self::Wrap(body) => body.size_hint(), + Self::Once { inner: None } => SizeHint::with_exact(0), + Self::Once { inner: Some(bytes) } => SizeHint::with_exact(bytes.len() as u64), + Self::ChunkedStream { .. } => SizeHint::new(), } } fn is_end_stream(&self) -> bool { match self { - Self::Once(None) => true, - Self::Once(Some(bytes)) => bytes.is_empty(), - Self::Wrap(body) => body.is_end_stream(), + Self::Once { inner: None } => true, + Self::Once { inner: Some(bytes) } => bytes.is_empty(), + Self::ChunkedStream { .. } => false, } } } @@ -582,7 +583,7 @@ fn respond_with_mock(request: Request, mock: &RemoteMock) -> Result { let stream = ChunkedStream::new(Arc::clone(body_fn))?; - Body::from_data_stream(stream) + Body::from(stream) } ResponseBody::FnWithRequest(body_fn) => { let bytes = body_fn(&request);