Skip to content

Commit 5c7a69e

Browse files
authored
chore(interop): Use http-body-util WithTrailers (#2134)
1 parent 66aaa5b commit 5c7a69e

File tree

2 files changed

+8
-37
lines changed

2 files changed

+8
-37
lines changed

interop/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pico-args = {version = "0.5", features = ["eq-separator"]}
1919
console = "0.15"
2020
http = "1"
2121
http-body = "1"
22+
http-body-util = "0.1"
2223
hyper = "1"
2324
prost = "0.13"
2425
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}

interop/src/server.rs

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use crate::pb::{self, *};
22
use async_stream::try_stream;
3-
use http::header::{HeaderName, HeaderValue};
4-
use http_body::Body as HttpBody;
3+
use http::header::{HeaderMap, HeaderName};
4+
use http_body_util::BodyExt;
55
use std::future::Future;
66
use std::pin::Pin;
77
use std::result::Result as StdResult;
8-
use std::task::{ready, Context, Poll};
8+
use std::task::{Context, Poll};
99
use std::time::Duration;
1010
use tokio_stream::StreamExt;
1111
use tonic::{body::Body, server::NamedService, Code, Request, Response, Status};
@@ -196,11 +196,12 @@ where
196196
fn call(&mut self, req: http::Request<Body>) -> Self::Future {
197197
let echo_header = req.headers().get("x-grpc-test-echo-initial").cloned();
198198

199+
let trailer_name = HeaderName::from_static("x-grpc-test-echo-trailing-bin");
199200
let echo_trailer = req
200201
.headers()
201-
.get("x-grpc-test-echo-trailing-bin")
202+
.get(&trailer_name)
202203
.cloned()
203-
.map(|v| (HeaderName::from_static("x-grpc-test-echo-trailing-bin"), v));
204+
.map(|v| HeaderMap::from_iter(std::iter::once((trailer_name, v))));
204205

205206
let call = self.inner.call(req);
206207

@@ -211,42 +212,11 @@ where
211212
res.headers_mut()
212213
.insert("x-grpc-test-echo-initial", echo_header);
213214
Ok(res
214-
.map(|b| MergeTrailers::new(b, echo_trailer))
215+
.map(|b| b.with_trailers(async move { echo_trailer.map(Ok) }))
215216
.map(Body::new))
216217
} else {
217218
Ok(res)
218219
}
219220
})
220221
}
221222
}
222-
223-
pub struct MergeTrailers<B> {
224-
inner: B,
225-
trailer: Option<(HeaderName, HeaderValue)>,
226-
}
227-
228-
impl<B> MergeTrailers<B> {
229-
pub fn new(inner: B, trailer: Option<(HeaderName, HeaderValue)>) -> Self {
230-
Self { inner, trailer }
231-
}
232-
}
233-
234-
impl<B: HttpBody + Unpin> HttpBody for MergeTrailers<B> {
235-
type Data = B::Data;
236-
type Error = B::Error;
237-
238-
fn poll_frame(
239-
self: Pin<&mut Self>,
240-
cx: &mut Context<'_>,
241-
) -> Poll<Option<StdResult<http_body::Frame<Self::Data>, Self::Error>>> {
242-
let this = self.get_mut();
243-
let mut frame = ready!(Pin::new(&mut this.inner).poll_frame(cx)?);
244-
if let (Some(trailers), Some((key, value))) = (
245-
frame.as_mut().and_then(|frame| frame.trailers_mut()),
246-
&this.trailer,
247-
) {
248-
trailers.insert(key.clone(), value.clone());
249-
}
250-
Poll::Ready(frame.map(Ok))
251-
}
252-
}

0 commit comments

Comments
 (0)