@@ -38,12 +38,43 @@ impl<T> Unpin for Streaming<T> {}
38
38
39
39
#[ derive( Debug , Clone ) ]
40
40
enum State {
41
- ReadHeader ,
41
+ ReadHeader {
42
+ span : Option < tracing:: Span > ,
43
+ } ,
42
44
ReadBody {
45
+ span : tracing:: Span ,
43
46
compression : Option < CompressionEncoding > ,
44
47
len : usize ,
45
48
} ,
46
- Error ( Status ) ,
49
+ Error ( Box < Status > ) ,
50
+ }
51
+
52
+ impl State {
53
+ fn read_header ( ) -> Self {
54
+ Self :: ReadHeader { span : None }
55
+ }
56
+
57
+ fn read_body ( compression : Option < CompressionEncoding > , len : usize ) -> Self {
58
+ let span = tracing:: debug_span!(
59
+ "read_body" ,
60
+ body. compression = compression. map( |c| c. as_str( ) ) . unwrap_or( "none" ) ,
61
+ body. bytes. compressed = compression. is_some( ) . then_some( len) ,
62
+ body. bytes. uncompressed = compression. is_none( ) . then_some( len) ,
63
+ ) ;
64
+ Self :: ReadBody {
65
+ span,
66
+ compression,
67
+ len,
68
+ }
69
+ }
70
+
71
+ fn span ( & self ) -> Option < & tracing:: Span > {
72
+ match self {
73
+ Self :: ReadHeader { span } => span. as_ref ( ) ,
74
+ Self :: ReadBody { span, .. } => Some ( span) ,
75
+ Self :: Error ( _) => None ,
76
+ }
77
+ }
47
78
}
48
79
49
80
#[ derive( Debug , PartialEq , Eq ) ]
@@ -125,7 +156,7 @@ impl<T> Streaming<T> {
125
156
. map_frame ( |frame| frame. map_data ( |mut buf| buf. copy_to_bytes ( buf. remaining ( ) ) ) )
126
157
. map_err ( |err| Status :: map_error ( err. into ( ) ) )
127
158
. boxed_unsync ( ) ,
128
- state : State :: ReadHeader ,
159
+ state : State :: read_header ( ) ,
129
160
direction,
130
161
buf : BytesMut :: with_capacity ( buffer_size) ,
131
162
trailers : None ,
@@ -142,7 +173,19 @@ impl StreamingInner {
142
173
& mut self ,
143
174
buffer_settings : BufferSettings ,
144
175
) -> Result < Option < DecodeBuf < ' _ > > , Status > {
145
- if let State :: ReadHeader = self . state {
176
+ if let State :: ReadHeader { span } = & mut self . state {
177
+ if !self . buf . has_remaining ( ) {
178
+ return Ok ( None ) ;
179
+ }
180
+
181
+ let span = span. get_or_insert_with ( || {
182
+ tracing:: debug_span!(
183
+ "read_header" ,
184
+ body. compression = "none" ,
185
+ body. bytes = tracing:: field:: Empty ,
186
+ )
187
+ } ) ;
188
+ let _guard = span. enter ( ) ;
146
189
if self . buf . remaining ( ) < HEADER_SIZE {
147
190
return Ok ( None ) ;
148
191
}
@@ -151,7 +194,8 @@ impl StreamingInner {
151
194
0 => None ,
152
195
1 => {
153
196
{
154
- if self . encoding . is_some ( ) {
197
+ if let Some ( ce) = self . encoding {
198
+ span. record ( "body.compression" , ce. as_str ( ) ) ;
155
199
self . encoding
156
200
} else {
157
201
// https://grpc.github.io/grpc/core/md_doc_compression.html
@@ -177,6 +221,7 @@ impl StreamingInner {
177
221
} ;
178
222
179
223
let len = self . buf . get_u32 ( ) as usize ;
224
+ span. record ( "body.bytes" , len) ;
180
225
let limit = self
181
226
. max_message_size
182
227
. unwrap_or ( DEFAULT_MAX_RECV_MESSAGE_SIZE ) ;
@@ -191,14 +236,19 @@ impl StreamingInner {
191
236
}
192
237
193
238
self . buf . reserve ( len) ;
239
+ drop ( _guard) ;
194
240
195
- self . state = State :: ReadBody {
196
- compression : compression_encoding,
197
- len,
198
- }
241
+ self . state = State :: read_body ( compression_encoding, len)
199
242
}
200
243
201
- if let State :: ReadBody { len, compression } = self . state {
244
+ if let State :: ReadBody {
245
+ len,
246
+ span,
247
+ compression,
248
+ } = & self . state
249
+ {
250
+ let ( len, compression) = ( * len, * compression) ;
251
+ let _guard = span. enter ( ) ;
202
252
// if we haven't read enough of the message then return and keep
203
253
// reading
204
254
if self . buf . remaining ( ) < len || self . buf . len ( ) < len {
@@ -228,6 +278,7 @@ impl StreamingInner {
228
278
return Err ( Status :: new ( Code :: Internal , message) ) ;
229
279
}
230
280
let decompressed_len = self . decompress_buf . len ( ) ;
281
+ span. record ( "body.bytes.uncompressed" , decompressed_len) ;
231
282
DecodeBuf :: new ( & mut self . decompress_buf , decompressed_len)
232
283
} else {
233
284
DecodeBuf :: new ( & mut self . buf , len)
@@ -241,14 +292,16 @@ impl StreamingInner {
241
292
242
293
// Returns Some(()) if data was found or None if the loop in `poll_next` should break
243
294
fn poll_frame ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < Option < ( ) > , Status > > {
295
+ let _guard = self . state . span ( ) . map ( |s| s. enter ( ) ) ;
244
296
let chunk = match ready ! ( Pin :: new( & mut self . body) . poll_frame( cx) ) {
245
297
Some ( Ok ( d) ) => Some ( d) ,
246
298
Some ( Err ( status) ) => {
247
299
if self . direction == Direction :: Request && status. code ( ) == Code :: Cancelled {
248
300
return Poll :: Ready ( Ok ( None ) ) ;
249
301
}
250
302
251
- let _ = std:: mem:: replace ( & mut self . state , State :: Error ( status. clone ( ) ) ) ;
303
+ drop ( _guard) ;
304
+ let _ = std:: mem:: replace ( & mut self . state , State :: Error ( Box :: new ( status. clone ( ) ) ) ) ;
252
305
debug ! ( "decoder inner stream error: {:?}" , status) ;
253
306
return Poll :: Ready ( Err ( status) ) ;
254
307
}
@@ -378,7 +431,7 @@ impl<T> Streaming<T> {
378
431
match self . inner . decode_chunk ( self . decoder . buffer_settings ( ) ) ? {
379
432
Some ( mut decode_buf) => match self . decoder . decode ( & mut decode_buf) ? {
380
433
Some ( msg) => {
381
- self . inner . state = State :: ReadHeader ;
434
+ self . inner . state = State :: read_header ( ) ;
382
435
Ok ( Some ( msg) )
383
436
}
384
437
None => Ok ( None ) ,
@@ -394,7 +447,7 @@ impl<T> Stream for Streaming<T> {
394
447
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
395
448
loop {
396
449
if let State :: Error ( status) = & self . inner . state {
397
- return Poll :: Ready ( Some ( Err ( status. clone ( ) ) ) ) ;
450
+ return Poll :: Ready ( Some ( Err ( * status. clone ( ) ) ) ) ;
398
451
}
399
452
400
453
if let Some ( item) = self . decode_chunk ( ) ? {
0 commit comments