@@ -54,7 +54,7 @@ import (
54
54
"k8s.io/utils/clock"
55
55
)
56
56
57
- var (
57
+ const (
58
58
// longThrottleLatency defines threshold for logging requests. All requests being
59
59
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
60
60
// be logged.
@@ -676,21 +676,17 @@ func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) err
676
676
}
677
677
latency := time .Since (now )
678
678
679
- var message string
680
- switch {
681
- case len (retryInfo ) > 0 :
682
- message = fmt .Sprintf ("Waited for %v, %s - request: %s:%s" , latency , retryInfo , r .verb , r .URL ().String ())
683
- default :
684
- message = fmt .Sprintf ("Waited for %v due to client-side throttling, not priority and fairness, request: %s:%s" , latency , r .verb , r .URL ().String ())
685
- }
686
-
687
679
if latency > longThrottleLatency {
688
- klog .V (3 ).Info (message )
689
- }
690
- if latency > extraLongThrottleLatency {
691
- // If the rate limiter latency is very high, the log message should be printed at a higher log level,
692
- // but we use a throttled logger to prevent spamming.
693
- globalThrottledLogger .Infof ("%s" , message )
680
+ if retryInfo == "" {
681
+ retryInfo = "client-side throttling, not priority and fairness"
682
+ }
683
+ klog .FromContext (ctx ).V (3 ).Info ("Waited before sending request" , "delay" , latency , "reason" , retryInfo , "verb" , r .verb , "URL" , r .URL ())
684
+
685
+ if latency > extraLongThrottleLatency {
686
+ // If the rate limiter latency is very high, the log message should be printed at a higher log level,
687
+ // but we use a throttled logger to prevent spamming.
688
+ globalThrottledLogger .info (klog .FromContext (ctx ), "Waited before sending request" , "delay" , latency , "reason" , retryInfo , "verb" , r .verb , "URL" , r .URL ())
689
+ }
694
690
}
695
691
metrics .RateLimiterLatency .Observe (ctx , r .verb , r .finalURLTemplate (), latency )
696
692
@@ -702,7 +698,7 @@ func (r *Request) tryThrottle(ctx context.Context) error {
702
698
}
703
699
704
700
type throttleSettings struct {
705
- logLevel klog. Level
701
+ logLevel int
706
702
minLogInterval time.Duration
707
703
708
704
lastLogTime time.Time
@@ -727,9 +723,9 @@ var globalThrottledLogger = &throttledLogger{
727
723
},
728
724
}
729
725
730
- func (b * throttledLogger ) attemptToLog () ( klog.Level , bool ) {
726
+ func (b * throttledLogger ) attemptToLog (logger klog.Logger ) ( int , bool ) {
731
727
for _ , setting := range b .settings {
732
- if bool (klog .V (setting .logLevel ).Enabled ()) {
728
+ if bool (logger .V (setting .logLevel ).Enabled ()) {
733
729
// Return early without write locking if possible.
734
730
if func () bool {
735
731
setting .lock .RLock ()
@@ -751,9 +747,9 @@ func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
751
747
752
748
// Infof will write a log message at each logLevel specified by the receiver's throttleSettings
753
749
// as long as it hasn't written a log message more recently than minLogInterval.
754
- func (b * throttledLogger ) Infof ( message string , args ... interface {} ) {
755
- if logLevel , ok := b .attemptToLog (); ok {
756
- klog .V (logLevel ).Infof (message , args ... )
750
+ func (b * throttledLogger ) info ( logger klog. Logger , message string , kv ... any ) {
751
+ if logLevel , ok := b .attemptToLog (logger ); ok {
752
+ logger .V (logLevel ).Info (message , kv ... )
757
753
}
758
754
}
759
755
@@ -1000,7 +996,7 @@ func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (wa
1000
996
contentType := resp .Header .Get ("Content-Type" )
1001
997
mediaType , params , err := mime .ParseMediaType (contentType )
1002
998
if err != nil {
1003
- klog .V (4 ).Infof ("Unexpected content type from the server: %q: %v" , contentType , err )
999
+ klog .FromContext ( ctx ). V (4 ).Info ("Unexpected content type from the server" , "contentType" , contentType , "err" , err )
1004
1000
}
1005
1001
objectDecoder , streamingSerializer , framer , err := r .contentConfig .Negotiator .StreamDecoder (mediaType , params )
1006
1002
if err != nil {
@@ -1202,7 +1198,7 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
1202
1198
}()
1203
1199
1204
1200
if r .err != nil {
1205
- klog .V (4 ).Infof ("Error in request: %v " , r .err )
1201
+ klog .FromContext ( ctx ). V (4 ).Info ("Error in request" , "err " , r .err )
1206
1202
return r .err
1207
1203
}
1208
1204
@@ -1303,7 +1299,7 @@ func (r *Request) Do(ctx context.Context) Result {
1303
1299
result = r .transformResponse (ctx , resp , req )
1304
1300
})
1305
1301
if err != nil {
1306
- return Result {err : err }
1302
+ return Result {err : err , loggingCtx : context . WithoutCancel ( ctx ) }
1307
1303
}
1308
1304
if result .err == nil || len (result .body ) > 0 {
1309
1305
metrics .ResponseSize .Observe (ctx , r .verb , r .URL ().Host , float64 (len (result .body )))
@@ -1350,16 +1346,18 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
1350
1346
// 2. Apiserver sends back the headers and then part of the body
1351
1347
// 3. Apiserver closes connection.
1352
1348
// 4. client-go should catch this and return an error.
1353
- klog .V (2 ).Infof ("Stream error %#v when reading response body, may be caused by closed connection. " , err )
1349
+ klog .FromContext ( ctx ). V (2 ).Info ("Stream error when reading response body, may be caused by closed connection" , "err " , err )
1354
1350
streamErr := fmt .Errorf ("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %w" , err )
1355
1351
return Result {
1356
- err : streamErr ,
1352
+ err : streamErr ,
1353
+ loggingCtx : context .WithoutCancel (ctx ),
1357
1354
}
1358
1355
default :
1359
- klog .Errorf ( "Unexpected error when reading response body: %v" , err )
1356
+ klog .FromContext ( ctx ). Error ( err , "Unexpected error when reading response body" )
1360
1357
unexpectedErr := fmt .Errorf ("unexpected error when reading response body. Please retry. Original error: %w" , err )
1361
1358
return Result {
1362
- err : unexpectedErr ,
1359
+ err : unexpectedErr ,
1360
+ loggingCtx : context .WithoutCancel (ctx ),
1363
1361
}
1364
1362
}
1365
1363
}
@@ -1377,7 +1375,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
1377
1375
var err error
1378
1376
mediaType , params , err := mime .ParseMediaType (contentType )
1379
1377
if err != nil {
1380
- return Result {err : errors .NewInternalError (err )}
1378
+ return Result {err : errors .NewInternalError (err ), loggingCtx : context . WithoutCancel ( ctx ) }
1381
1379
}
1382
1380
decoder , err = r .contentConfig .Negotiator .Decoder (mediaType , params )
1383
1381
if err != nil {
@@ -1386,13 +1384,14 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
1386
1384
case resp .StatusCode == http .StatusSwitchingProtocols :
1387
1385
// no-op, we've been upgraded
1388
1386
case resp .StatusCode < http .StatusOK || resp .StatusCode > http .StatusPartialContent :
1389
- return Result {err : r .transformUnstructuredResponseError (resp , req , body )}
1387
+ return Result {err : r .transformUnstructuredResponseError (resp , req , body ), loggingCtx : context . WithoutCancel ( ctx ) }
1390
1388
}
1391
1389
return Result {
1392
1390
body : body ,
1393
1391
contentType : contentType ,
1394
1392
statusCode : resp .StatusCode ,
1395
1393
warnings : handleWarnings (ctx , resp .Header , r .warningHandler ),
1394
+ loggingCtx : context .WithoutCancel (ctx ),
1396
1395
}
1397
1396
}
1398
1397
}
@@ -1412,6 +1411,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
1412
1411
decoder : decoder ,
1413
1412
err : err ,
1414
1413
warnings : handleWarnings (ctx , resp .Header , r .warningHandler ),
1414
+ loggingCtx : context .WithoutCancel (ctx ),
1415
1415
}
1416
1416
}
1417
1417
@@ -1421,6 +1421,7 @@ func (r *Request) transformResponse(ctx context.Context, resp *http.Response, re
1421
1421
statusCode : resp .StatusCode ,
1422
1422
decoder : decoder ,
1423
1423
warnings : handleWarnings (ctx , resp .Header , r .warningHandler ),
1424
+ loggingCtx : context .WithoutCancel (ctx ),
1424
1425
}
1425
1426
}
1426
1427
@@ -1552,6 +1553,10 @@ type Result struct {
1552
1553
err error
1553
1554
statusCode int
1554
1555
1556
+ // Log calls in Result methods use the same context for logging as the
1557
+ // method which created the Result. This context has no cancellation.
1558
+ loggingCtx context.Context
1559
+
1555
1560
decoder runtime.Decoder
1556
1561
}
1557
1562
@@ -1656,7 +1661,11 @@ func (r Result) Error() error {
1656
1661
// to be backwards compatible with old servers that do not return a version, default to "v1"
1657
1662
out , _ , err := r .decoder .Decode (r .body , & schema.GroupVersionKind {Version : "v1" }, nil )
1658
1663
if err != nil {
1659
- klog .V (5 ).Infof ("body was not decodable (unable to check for Status): %v" , err )
1664
+ ctx := r .loggingCtx
1665
+ if ctx == nil {
1666
+ ctx = context .Background ()
1667
+ }
1668
+ klog .FromContext (ctx ).V (5 ).Info ("Body was not decodable (unable to check for Status)" , "err" , err )
1660
1669
return r .err
1661
1670
}
1662
1671
switch t := out .(type ) {
0 commit comments