@@ -48,6 +48,7 @@ import (
48
48
49
49
const END_OF_TRANSMISSION = "\u0004 "
50
50
const ProcessExitedMsg = "Process exited"
51
+ const ProcessTimedOut = "Process timedOut"
51
52
52
53
// PtyHandler is what remotecommand expects from a pty
53
54
type PtyHandler interface {
@@ -185,23 +186,53 @@ func (sm *SessionMap) SetTerminalSessionStartTime(sessionId string) {
185
186
}
186
187
}
187
188
189
+ func (sm * SessionMap ) setAndSendSignal (sessionId string , session sockjs.Session ) {
190
+ sm .Lock .Lock ()
191
+ defer sm .Lock .Unlock ()
192
+ terminalSession , ok := sm .Sessions [sessionId ]
193
+ if ok && terminalSession .id == "" {
194
+ log .Printf ("handleTerminalSession: can't find session '%s'" , sessionId )
195
+ session .Close (http .StatusGone , fmt .Sprintf ("handleTerminalSession: can't find session '%s'" , sessionId ))
196
+ return
197
+ } else if ok {
198
+ terminalSession .sockJSSession = session
199
+ sm .Sessions [sessionId ] = terminalSession
200
+
201
+ select {
202
+ case terminalSession .bound <- nil :
203
+ log .Printf ("message sent on bound channel for sessionId : %s" , sessionId )
204
+ default :
205
+ // if a request from the front end is not received within a particular time frame, and no one is reading from the bound channel, we will ignore sending on the bound channel.
206
+ log .Printf ("skipping send on bound, channel receiver possibly timed out. sessionId: %s" , sessionId )
207
+ }
208
+
209
+ }
210
+ }
211
+
188
212
// Close shuts down the SockJS connection and sends the status code and reason to the client
189
213
// Can happen if the process exits or if there is an error starting up the process
190
214
// For now the status code is unused and reason is shown to the user (unless "")
191
215
func (sm * SessionMap ) Close (sessionId string , status uint32 , reason string ) {
216
+
192
217
sm .Lock .Lock ()
193
218
defer sm .Lock .Unlock ()
219
+
194
220
terminalSession := sm .Sessions [sessionId ]
221
+
195
222
if terminalSession .sockJSSession != nil {
223
+
196
224
err := terminalSession .sockJSSession .Close (status , reason )
197
225
if err != nil {
198
226
log .Println (err )
199
227
}
228
+
229
+ close (terminalSession .doneChan )
230
+
200
231
isErroredConnectionTermination := isConnectionClosedByError (status )
201
232
middleware .IncTerminalSessionRequestCounter (SessionTerminated , strconv .FormatBool (isErroredConnectionTermination ))
202
233
middleware .RecordTerminalSessionDurationMetrics (terminalSession .podName , terminalSession .namespace , terminalSession .clusterId , time .Since (terminalSession .startedOn ).Seconds ())
203
- close (terminalSession .doneChan )
204
234
terminalSession .contextCancelFunc ()
235
+ close (terminalSession .bound )
205
236
delete (sm .Sessions , sessionId )
206
237
}
207
238
@@ -219,10 +250,9 @@ var terminalSessions = SessionMap{Sessions: make(map[string]TerminalSession)}
219
250
// handleTerminalSession is Called by net/http for any new /api/sockjs connections
220
251
func handleTerminalSession (session sockjs.Session ) {
221
252
var (
222
- buf string
223
- err error
224
- msg TerminalMessage
225
- terminalSession TerminalSession
253
+ buf string
254
+ err error
255
+ msg TerminalMessage
226
256
)
227
257
228
258
if buf , err = session .Recv (); err != nil {
@@ -241,15 +271,8 @@ func handleTerminalSession(session sockjs.Session) {
241
271
return
242
272
}
243
273
244
- if terminalSession = terminalSessions .Get (msg .SessionID ); terminalSession .id == "" {
245
- log .Printf ("handleTerminalSession: can't find session '%s'" , msg .SessionID )
246
- session .Close (http .StatusGone , fmt .Sprintf ("handleTerminalSession: can't find session '%s'" , msg .SessionID ))
247
- return
248
- }
274
+ terminalSessions .setAndSendSignal (msg .SessionID , session )
249
275
250
- terminalSession .sockJSSession = session
251
- terminalSessions .Set (msg .SessionID , terminalSession )
252
- terminalSession .bound <- nil
253
276
}
254
277
255
278
type SocketConfig struct {
@@ -381,7 +404,6 @@ func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *
381
404
timedCtx , _ := context .WithTimeout (sessionCtx , 60 * time .Second )
382
405
select {
383
406
case <- session .bound :
384
- close (session .bound )
385
407
386
408
var err error
387
409
if isValidShell (validShells , request .Shell ) {
@@ -407,8 +429,7 @@ func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *
407
429
terminalSessions .Close (request .SessionId , 1 , ProcessExitedMsg )
408
430
case <- timedCtx .Done ():
409
431
// handle case when connection has not been initiated from FE side within particular time
410
- close (session .bound )
411
- terminalSessions .Close (request .SessionId , 1 , ProcessExitedMsg )
432
+ terminalSessions .Close (request .SessionId , 1 , ProcessTimedOut )
412
433
}
413
434
}
414
435
0 commit comments