Skip to content

Commit 3ac6746

Browse files
committed
feat: add debug spans for decoding requests
Closes: #1759
1 parent d312dcc commit 3ac6746

File tree

2 files changed

+70
-18
lines changed

2 files changed

+70
-18
lines changed

tonic/src/codec/compression.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,8 @@ impl CompressionEncoding {
158158
}
159159

160160
#[allow(missing_docs)]
161-
#[cfg(any(feature = "gzip", feature = "zstd"))]
162-
pub(crate) fn as_str(&self) -> &'static str {
163-
match self {
161+
pub(crate) const fn as_str(&self) -> &'static str {
162+
match *self {
164163
#[cfg(feature = "gzip")]
165164
CompressionEncoding::Gzip => "gzip",
166165
#[cfg(feature = "zstd")]
@@ -169,11 +168,11 @@ impl CompressionEncoding {
169168
}
170169

171170
#[cfg(any(feature = "gzip", feature = "zstd"))]
172-
pub(crate) fn into_header_value(self) -> http::HeaderValue {
171+
pub(crate) const fn into_header_value(self) -> http::HeaderValue {
173172
http::HeaderValue::from_static(self.as_str())
174173
}
175174

176-
pub(crate) fn encodings() -> &'static [Self] {
175+
pub(crate) const fn encodings() -> &'static [Self] {
177176
&[
178177
#[cfg(feature = "gzip")]
179178
CompressionEncoding::Gzip,

tonic/src/codec/decode.rs

+66-13
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,43 @@ impl<T> Unpin for Streaming<T> {}
3838

3939
#[derive(Debug, Clone)]
4040
enum State {
41-
ReadHeader,
41+
ReadHeader {
42+
span: Option<tracing::Span>,
43+
},
4244
ReadBody {
45+
span: tracing::Span,
4346
compression: Option<CompressionEncoding>,
4447
len: usize,
4548
},
46-
Error(Status),
49+
Error(Box<Status>),
50+
}
51+
52+
impl State {
53+
fn read_header() -> Self {
54+
Self::ReadHeader { span: None }
55+
}
56+
57+
fn read_body(compression: Option<CompressionEncoding>, len: usize) -> Self {
58+
let span = tracing::debug_span!(
59+
"read_body",
60+
compression = compression.map(|c| c.as_str()),
61+
compressed.bytes = compression.is_some().then_some(len),
62+
uncompressed.bytes = compression.is_none().then_some(len),
63+
);
64+
Self::ReadBody {
65+
span,
66+
compression,
67+
len,
68+
}
69+
}
70+
71+
fn span(&self) -> Option<&tracing::Span> {
72+
match self {
73+
Self::ReadHeader { span } => span.as_ref(),
74+
Self::ReadBody { span, .. } => Some(span),
75+
Self::Error(_) => None,
76+
}
77+
}
4778
}
4879

4980
#[derive(Debug, PartialEq, Eq)]
@@ -125,7 +156,7 @@ impl<T> Streaming<T> {
125156
.map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())))
126157
.map_err(|err| Status::map_error(err.into()))
127158
.boxed_unsync(),
128-
state: State::ReadHeader,
159+
state: State::read_header(),
129160
direction,
130161
buf: BytesMut::with_capacity(buffer_size),
131162
trailers: None,
@@ -142,7 +173,19 @@ impl StreamingInner {
142173
&mut self,
143174
buffer_settings: BufferSettings,
144175
) -> Result<Option<DecodeBuf<'_>>, Status> {
145-
if let State::ReadHeader = self.state {
176+
if let State::ReadHeader { span } = &mut self.state {
177+
if !self.buf.has_remaining() {
178+
return Ok(None);
179+
}
180+
181+
let span = span.get_or_insert_with(|| {
182+
tracing::debug_span!(
183+
"read_header",
184+
compression = tracing::field::Empty,
185+
body.bytes = tracing::field::Empty,
186+
)
187+
});
188+
let _guard = span.enter();
146189
if self.buf.remaining() < HEADER_SIZE {
147190
return Ok(None);
148191
}
@@ -151,7 +194,8 @@ impl StreamingInner {
151194
0 => None,
152195
1 => {
153196
{
154-
if self.encoding.is_some() {
197+
if let Some(ce) = self.encoding {
198+
span.record("compression", ce.as_str());
155199
self.encoding
156200
} else {
157201
// https://grpc.github.io/grpc/core/md_doc_compression.html
@@ -177,6 +221,7 @@ impl StreamingInner {
177221
};
178222

179223
let len = self.buf.get_u32() as usize;
224+
span.record("body.bytes", len);
180225
let limit = self
181226
.max_message_size
182227
.unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
@@ -191,14 +236,19 @@ impl StreamingInner {
191236
}
192237

193238
self.buf.reserve(len);
239+
drop(_guard);
194240

195-
self.state = State::ReadBody {
196-
compression: compression_encoding,
197-
len,
198-
}
241+
self.state = State::read_body(compression_encoding, len)
199242
}
200243

201-
if let State::ReadBody { len, compression } = self.state {
244+
if let State::ReadBody {
245+
len,
246+
span,
247+
compression,
248+
} = &self.state
249+
{
250+
let (len, compression) = (*len, *compression);
251+
let _guard = span.enter();
202252
// if we haven't read enough of the message then return and keep
203253
// reading
204254
if self.buf.remaining() < len || self.buf.len() < len {
@@ -228,6 +278,7 @@ impl StreamingInner {
228278
return Err(Status::new(Code::Internal, message));
229279
}
230280
let decompressed_len = self.decompress_buf.len();
281+
span.record("uncompressed.bytes", decompressed_len);
231282
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
232283
} else {
233284
DecodeBuf::new(&mut self.buf, len)
@@ -241,14 +292,16 @@ impl StreamingInner {
241292

242293
// Returns Some(()) if data was found or None if the loop in `poll_next` should break
243294
fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
295+
let _guard = self.state.span().map(|s| s.enter());
244296
let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) {
245297
Some(Ok(d)) => Some(d),
246298
Some(Err(status)) => {
247299
if self.direction == Direction::Request && status.code() == Code::Cancelled {
248300
return Poll::Ready(Ok(None));
249301
}
250302

251-
let _ = std::mem::replace(&mut self.state, State::Error(status.clone()));
303+
drop(_guard);
304+
let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone())));
252305
debug!("decoder inner stream error: {:?}", status);
253306
return Poll::Ready(Err(status));
254307
}
@@ -378,7 +431,7 @@ impl<T> Streaming<T> {
378431
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
379432
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
380433
Some(msg) => {
381-
self.inner.state = State::ReadHeader;
434+
self.inner.state = State::read_header();
382435
Ok(Some(msg))
383436
}
384437
None => Ok(None),
@@ -394,7 +447,7 @@ impl<T> Stream for Streaming<T> {
394447
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
395448
loop {
396449
if let State::Error(status) = &self.inner.state {
397-
return Poll::Ready(Some(Err(status.clone())));
450+
return Poll::Ready(Some(Err(*status.clone())));
398451
}
399452

400453
if let Some(item) = self.decode_chunk()? {

0 commit comments

Comments
 (0)