@@ -19,9 +19,11 @@ import (
19
19
"crypto/rand"
20
20
"encoding/hex"
21
21
"encoding/json"
22
+ errors2 "errors"
22
23
"fmt"
23
24
"github.com/caarlos0/env"
24
25
"github.com/devtron-labs/common-lib/utils/k8s"
26
+ "github.com/devtron-labs/devtron/internal/middleware"
25
27
"github.com/devtron-labs/devtron/pkg/argoApplication"
26
28
"github.com/devtron-labs/devtron/pkg/cluster"
27
29
"github.com/devtron-labs/devtron/pkg/cluster/repository"
@@ -31,6 +33,7 @@ import (
31
33
"k8s.io/apimachinery/pkg/api/errors"
32
34
"log"
33
35
"net/http"
36
+ "strconv"
34
37
"strings"
35
38
"sync"
36
39
"time"
@@ -44,6 +47,7 @@ import (
44
47
)
45
48
46
49
const END_OF_TRANSMISSION = "\u0004 "
50
+ const ProcessExitedMsg = "Process exited"
47
51
48
52
// PtyHandler is what remotecommand expects from a pty
49
53
type PtyHandler interface {
@@ -54,11 +58,17 @@ type PtyHandler interface {
54
58
55
59
// TerminalSession implements PtyHandler (using a SockJS connection)
56
60
type TerminalSession struct {
57
- id string
58
- bound chan error
59
- sockJSSession sockjs.Session
60
- sizeChan chan remotecommand.TerminalSize
61
- doneChan chan struct {}
61
+ id string
62
+ bound chan error
63
+ sockJSSession sockjs.Session
64
+ sizeChan chan remotecommand.TerminalSize
65
+ doneChan chan struct {}
66
+ context context.Context
67
+ contextCancelFunc context.CancelFunc
68
+ podName string
69
+ namespace string
70
+ clusterId string
71
+ startedOn time.Time
62
72
}
63
73
64
74
// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
@@ -166,6 +176,15 @@ func (sm *SessionMap) Set(sessionId string, session TerminalSession) {
166
176
sm .Sessions [sessionId ] = session
167
177
}
168
178
179
+ func (sm * SessionMap ) SetTerminalSessionStartTime (sessionId string ) {
180
+ sm .Lock .Lock ()
181
+ defer sm .Lock .Unlock ()
182
+ if session , ok := sm .Sessions [sessionId ]; ok {
183
+ session .startedOn = time .Now ()
184
+ sm .Sessions [sessionId ] = session
185
+ }
186
+ }
187
+
169
188
// Close shuts down the SockJS connection and sends the status code and reason to the client
170
189
// Can happen if the process exits or if there is an error starting up the process
171
190
// For now the status code is unused and reason is shown to the user (unless "")
@@ -178,11 +197,23 @@ func (sm *SessionMap) Close(sessionId string, status uint32, reason string) {
178
197
if err != nil {
179
198
log .Println (err )
180
199
}
200
+ isErroredConnectionTermination := isConnectionClosedByError (status )
201
+ middleware .IncTerminalSessionRequestCounter (SessionTerminated , strconv .FormatBool (isErroredConnectionTermination ))
202
+ middleware .RecordTerminalSessionDurationMetrics (terminalSession .podName , terminalSession .namespace , terminalSession .clusterId , time .Since (terminalSession .startedOn ).Seconds ())
203
+ close (terminalSession .doneChan )
204
+ terminalSession .contextCancelFunc ()
181
205
delete (sm .Sessions , sessionId )
182
206
}
183
207
184
208
}
185
209
210
+ func isConnectionClosedByError (status uint32 ) bool {
211
+ if status == 2 {
212
+ return true
213
+ }
214
+ return false
215
+ }
216
+
186
217
var terminalSessions = SessionMap {Sessions : make (map [string ]TerminalSession )}
187
218
188
219
// handleTerminalSession is Called by net/http for any new /api/sockjs connections
@@ -243,7 +274,7 @@ func CreateAttachHandler(path string) http.Handler {
243
274
244
275
// startProcess is called by handleAttach
245
276
// Executed cmd in the container specified in request and connects it up with the ptyHandler (a session)
246
- func startProcess (k8sClient kubernetes.Interface , cfg * rest.Config ,
277
+ func startProcess (ctx context. Context , k8sClient kubernetes.Interface , cfg * rest.Config ,
247
278
cmd []string , ptyHandler PtyHandler , sessionRequest * TerminalSessionRequest ) error {
248
279
namespace := sessionRequest .Namespace
249
280
podName := sessionRequest .PodName
@@ -262,17 +293,18 @@ func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
262
293
TerminalSizeQueue : ptyHandler ,
263
294
Tty : true ,
264
295
}
265
-
266
- err = execWithStreamOptions (exec , streamOptions )
296
+ isErroredConnectionTermination := false
297
+ middleware .IncTerminalSessionRequestCounter (SessionInitiating , strconv .FormatBool (isErroredConnectionTermination ))
298
+ terminalSessions .SetTerminalSessionStartTime (sessionRequest .SessionId )
299
+ err = execWithStreamOptions (ctx , exec , streamOptions )
267
300
if err != nil {
268
301
return err
269
302
}
270
-
271
303
return nil
272
304
}
273
305
274
- func execWithStreamOptions (exec remotecommand.Executor , streamOptions remotecommand.StreamOptions ) error {
275
- return exec .Stream ( streamOptions )
306
+ func execWithStreamOptions (ctx context. Context , exec remotecommand.Executor , streamOptions remotecommand.StreamOptions ) error {
307
+ return exec .StreamWithContext ( ctx , streamOptions )
276
308
}
277
309
278
310
func getExecutor (k8sClient kubernetes.Interface , cfg * rest.Config , podName , namespace , containerName string , cmd []string , stdin bool , tty bool ) (remotecommand.Executor , error ) {
@@ -344,32 +376,39 @@ var validShells = []string{"bash", "sh", "powershell", "cmd"}
344
376
// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
345
377
func WaitForTerminal (k8sClient kubernetes.Interface , cfg * rest.Config , request * TerminalSessionRequest ) {
346
378
379
+ session := terminalSessions .Get (request .SessionId )
380
+ sessionCtx := session .context
381
+ timedCtx , _ := context .WithTimeout (sessionCtx , 60 * time .Second )
347
382
select {
348
- case <- terminalSessions . Get ( request . SessionId ) .bound :
349
- close (terminalSessions . Get ( request . SessionId ) .bound )
383
+ case <- session .bound :
384
+ close (session .bound )
350
385
351
386
var err error
352
387
if isValidShell (validShells , request .Shell ) {
353
388
cmd := []string {request .Shell }
354
389
355
- err = startProcess (k8sClient , cfg , cmd , terminalSessions .Get (request .SessionId ), request )
390
+ err = startProcess (sessionCtx , k8sClient , cfg , cmd , terminalSessions .Get (request .SessionId ), request )
356
391
} else {
357
392
// No Shell given or it was not valid: try some shells until one succeeds or all fail
358
393
// FIXME: if the first Shell fails then the first keyboard event is lost
359
394
for _ , testShell := range validShells {
360
395
cmd := []string {testShell }
361
- if err = startProcess (k8sClient , cfg , cmd , terminalSessions .Get (request .SessionId ), request ); err == nil {
396
+ if err = startProcess (sessionCtx , k8sClient , cfg , cmd , terminalSessions .Get (request .SessionId ), request ); err == nil || errors2 . Is ( err , context . Canceled ) {
362
397
break
363
398
}
364
399
}
365
400
}
366
401
367
- if err != nil {
402
+ if err != nil && ! errors2 . Is ( err , context . Canceled ) {
368
403
terminalSessions .Close (request .SessionId , 2 , err .Error ())
369
404
return
370
405
}
371
406
372
- terminalSessions .Close (request .SessionId , 1 , "Process exited" )
407
+ terminalSessions .Close (request .SessionId , 1 , ProcessExitedMsg )
408
+ case <- timedCtx .Done ():
409
+ // handle case when connection has not been initiated from FE side within particular time
410
+ close (session .bound )
411
+ terminalSessions .Close (request .SessionId , 1 , ProcessExitedMsg )
373
412
}
374
413
}
375
414
@@ -432,10 +471,17 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR
432
471
return statusCode , nil , err
433
472
}
434
473
req .SessionId = sessionID
474
+ sessionCtx , cancelFunc := context .WithCancel (context .Background ())
435
475
terminalSessions .Set (sessionID , TerminalSession {
436
- id : sessionID ,
437
- bound : make (chan error ),
438
- sizeChan : make (chan remotecommand.TerminalSize ),
476
+ id : sessionID ,
477
+ bound : make (chan error ),
478
+ sizeChan : make (chan remotecommand.TerminalSize ),
479
+ doneChan : make (chan struct {}),
480
+ context : sessionCtx ,
481
+ contextCancelFunc : cancelFunc ,
482
+ podName : req .PodName ,
483
+ namespace : req .Namespace ,
484
+ clusterId : strconv .Itoa (req .ClusterId ),
439
485
})
440
486
config , client , err := impl .getClientConfig (req )
441
487
@@ -559,7 +605,7 @@ func (impl *TerminalSessionHandlerImpl) RunCmdInRemotePod(req *TerminalSessionRe
559
605
buf := & bytes.Buffer {}
560
606
errBuf := & bytes.Buffer {}
561
607
impl .logger .Debug ("reached execWithStreamOptions method call" )
562
- err = execWithStreamOptions (exec , remotecommand.StreamOptions {
608
+ err = execWithStreamOptions (context . Background (), exec , remotecommand.StreamOptions {
563
609
Stdout : buf ,
564
610
Stderr : errBuf ,
565
611
})
0 commit comments