From 888815e6e969773299144cdca934eb163b3f6bac Mon Sep 17 00:00:00 2001 From: ayushmaheshwari Date: Mon, 22 Apr 2024 15:25:30 +0530 Subject: [PATCH 1/8] closing channel after write operation --- pkg/terminal/terminalSesion.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index eb59eefbcf..783a5e3a82 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -250,6 +250,7 @@ func handleTerminalSession(session sockjs.Session) { terminalSession.sockJSSession = session terminalSessions.Set(msg.SessionID, terminalSession) terminalSession.bound <- nil + close(terminalSession.bound) } type SocketConfig struct { @@ -381,7 +382,6 @@ func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request * timedCtx, _ := context.WithTimeout(sessionCtx, 60*time.Second) select { case <-session.bound: - close(session.bound) var err error if isValidShell(validShells, request.Shell) { @@ -407,7 +407,6 @@ func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request * terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg) case <-timedCtx.Done(): // handle case when connection has not been initiated from FE side within particular time - close(session.bound) terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg) } } From ac6dfd0898b9e7efa684ba212b9f4e9fffac4925 Mon Sep 17 00:00:00 2001 From: ayushmaheshwari Date: Mon, 22 Apr 2024 16:02:09 +0530 Subject: [PATCH 2/8] removing close --- pkg/terminal/terminalSesion.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index 783a5e3a82..601b3d86da 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -250,7 +250,6 @@ func handleTerminalSession(session sockjs.Session) { terminalSession.sockJSSession = session terminalSessions.Set(msg.SessionID, terminalSession) terminalSession.bound <- nil - close(terminalSession.bound) } type SocketConfig struct { From dd80e20b2730d6992db65f208162bdcfd23c1aea Mon Sep 17 00:00:00 2001 From: ayushmaheshwari Date: Tue, 23 Apr 2024 16:12:18 +0530 Subject: [PATCH 3/8] using buffered channel --- pkg/terminal/terminalSesion.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index 601b3d86da..1c5636186c 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -472,7 +472,7 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR sessionCtx, cancelFunc := context.WithCancel(context.Background()) terminalSessions.Set(sessionID, TerminalSession{ id: sessionID, - bound: make(chan error), + bound: make(chan error, 1), sizeChan: make(chan remotecommand.TerminalSize), doneChan: make(chan struct{}), context: sessionCtx, From 1d5a3ef24d9cf233f42b6f48cbfe58f697ada0b4 Mon Sep 17 00:00:00 2001 From: ayushmaheshwari Date: Tue, 30 Apr 2024 11:50:37 +0530 Subject: [PATCH 4/8] wip: making done channel bufferred --- pkg/terminal/terminalSesion.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index 1c5636186c..5ceced7582 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -193,6 +193,7 @@ func (sm *SessionMap) Close(sessionId string, status uint32, reason string) { defer sm.Lock.Unlock() terminalSession := sm.Sessions[sessionId] if terminalSession.sockJSSession != nil { + terminalSession.doneChan <- struct{}{} err := terminalSession.sockJSSession.Close(status, reason) if err != nil { log.Println(err) @@ -474,7 +475,7 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR id: sessionID, bound: make(chan error, 1), sizeChan: make(chan remotecommand.TerminalSize), - doneChan: make(chan struct{}), + doneChan: make(chan struct{}, 1), context: sessionCtx, contextCancelFunc: cancelFunc, podName: req.PodName, From db4bb39e1961a3f51bdfba790fe04ff1ad271d42 Mon Sep 17 00:00:00 2001 From: ayushmaheshwari Date: Tue, 30 Apr 2024 18:35:06 +0530 Subject: [PATCH 5/8] terminal racecondition and deadlock fix --- pkg/terminal/terminalSesion.go | 51 +++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index 5ceced7582..e8a7dbbac3 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -48,6 +48,7 @@ import ( const END_OF_TRANSMISSION = "\u0004" const ProcessExitedMsg = "Process exited" +const ProcessTimedOut = "Process timedOut" // PtyHandler is what remotecommand expects from a pty type PtyHandler interface { @@ -185,24 +186,50 @@ func (sm *SessionMap) SetTerminalSessionStartTime(sessionId string) { } } +func (sm *SessionMap) setAndSendSignal(sessionId string, session sockjs.Session) { + sm.Lock.Lock() + defer sm.Lock.Unlock() + terminalSession, ok := sm.Sessions[sessionId] + if ok && terminalSession.id == "" { + log.Printf("handleTerminalSession: can't find session '%s'", sessionId) + session.Close(http.StatusGone, fmt.Sprintf("handleTerminalSession: can't find session '%s'", sessionId)) + return + } else if ok { + terminalSession.sockJSSession = session + sm.Sessions[sessionId] = terminalSession + terminalSession.bound <- nil + } +} + // Close shuts down the SockJS connection and sends the status code and reason to the client // Can happen if the process exits or if there is an error starting up the process // For now the status code is unused and reason is shown to the user (unless "") func (sm *SessionMap) Close(sessionId string, status uint32, reason string) { + sm.Lock.Lock() defer sm.Lock.Unlock() + terminalSession := sm.Sessions[sessionId] + if terminalSession.sockJSSession != nil { - terminalSession.doneChan <- struct{}{} + err := terminalSession.sockJSSession.Close(status, reason) if err != nil { log.Println(err) } + + select { + case terminalSession.doneChan <- struct{}{}: + close(terminalSession.doneChan) + default: + log.Printf("no message sent on done channel, sessionId: %v", sessionId) + } + isErroredConnectionTermination := isConnectionClosedByError(status) middleware.IncTerminalSessionRequestCounter(SessionTerminated, strconv.FormatBool(isErroredConnectionTermination)) middleware.RecordTerminalSessionDurationMetrics(terminalSession.podName, terminalSession.namespace, terminalSession.clusterId, time.Since(terminalSession.startedOn).Seconds()) - close(terminalSession.doneChan) terminalSession.contextCancelFunc() + close(terminalSession.bound) delete(sm.Sessions, sessionId) } @@ -220,10 +247,9 @@ var terminalSessions = SessionMap{Sessions: make(map[string]TerminalSession)} // handleTerminalSession is Called by net/http for any new /api/sockjs connections func handleTerminalSession(session sockjs.Session) { var ( - buf string - err error - msg TerminalMessage - terminalSession TerminalSession + buf string + err error + msg TerminalMessage ) if buf, err = session.Recv(); err != nil { @@ -242,15 +268,8 @@ func handleTerminalSession(session sockjs.Session) { return } - if terminalSession = terminalSessions.Get(msg.SessionID); terminalSession.id == "" { - log.Printf("handleTerminalSession: can't find session '%s'", msg.SessionID) - session.Close(http.StatusGone, fmt.Sprintf("handleTerminalSession: can't find session '%s'", msg.SessionID)) - return - } + terminalSessions.setAndSendSignal(msg.SessionID, session) - terminalSession.sockJSSession = session - terminalSessions.Set(msg.SessionID, terminalSession) - terminalSession.bound <- nil } type SocketConfig struct { @@ -407,7 +426,7 @@ func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request * terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg) case <-timedCtx.Done(): // handle case when connection has not been initiated from FE side within particular time - terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg) + terminalSessions.Close(request.SessionId, 1, ProcessTimedOut) } } @@ -475,7 +494,7 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR id: sessionID, bound: make(chan error, 1), sizeChan: make(chan remotecommand.TerminalSize), - doneChan: make(chan struct{}, 1), + doneChan: make(chan struct{}), context: sessionCtx, contextCancelFunc: cancelFunc, podName: req.PodName, From 341d6298ecce5dfaf474f4d9fe2885bb8aa87b98 Mon Sep 17 00:00:00 2001 From: ayushmaheshwari Date: Tue, 30 Apr 2024 18:37:45 +0530 Subject: [PATCH 6/8] wire run --- env_gen.md | 4 ++-- wire_gen.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/env_gen.md b/env_gen.md index ea8c8de083..7b220f4793 100644 --- a/env_gen.md +++ b/env_gen.md @@ -196,9 +196,9 @@ | REQ_CI_MEM | 3G | | | RESOURCE_LIST_FOR_REPLICAS | Deployment,Rollout,StatefulSet,ReplicaSet | | | RESOURCE_LIST_FOR_REPLICAS_BATCH_SIZE | 5 | | - | REVISION_HISTORY_LIMIT_DEVTRON_APP | 0 | | + | REVISION_HISTORY_LIMIT_DEVTRON_APP | 1 | | | REVISION_HISTORY_LIMIT_EXTERNAL_HELM_APP | 0 | | - | REVISION_HISTORY_LIMIT_HELM_APP | 0 | | + | REVISION_HISTORY_LIMIT_HELM_APP | 1 | | | RUNTIME_CONFIG_LOCAL_DEV | false | | | RUN_HELM_INSTALL_IN_ASYNC_MODE_HELM_APPS | false | | | SCOPED_VARIABLE_ENABLED | false | | diff --git a/wire_gen.go b/wire_gen.go index 6d697c7670..5a60b944d1 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -1,6 +1,6 @@ // Code generated by Wire. DO NOT EDIT. -//go:generate go run -mod=mod github.com/google/wire/cmd/wire +//go:generate go run github.com/google/wire/cmd/wire //go:build !wireinject // +build !wireinject From b49c54a8fff954dba9d3f21d44366d91c38f0ffb Mon Sep 17 00:00:00 2001 From: ayushmaheshwari Date: Tue, 30 Apr 2024 19:34:40 +0530 Subject: [PATCH 7/8] removing done send call --- pkg/terminal/terminalSesion.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index e8a7dbbac3..815a6fca93 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -218,12 +218,7 @@ func (sm *SessionMap) Close(sessionId string, status uint32, reason string) { log.Println(err) } - select { - case terminalSession.doneChan <- struct{}{}: - close(terminalSession.doneChan) - default: - log.Printf("no message sent on done channel, sessionId: %v", sessionId) - } + close(terminalSession.doneChan) isErroredConnectionTermination := isConnectionClosedByError(status) middleware.IncTerminalSessionRequestCounter(SessionTerminated, strconv.FormatBool(isErroredConnectionTermination)) From ab4be9e3d3be05147cb964ea7999f34826be1666 Mon Sep 17 00:00:00 2001 From: ayushmaheshwari Date: Wed, 1 May 2024 11:56:30 +0530 Subject: [PATCH 8/8] updating bound channel send function --- pkg/terminal/terminalSesion.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/terminal/terminalSesion.go b/pkg/terminal/terminalSesion.go index 815a6fca93..cc75bc0c08 100644 --- a/pkg/terminal/terminalSesion.go +++ b/pkg/terminal/terminalSesion.go @@ -197,7 +197,15 @@ func (sm *SessionMap) setAndSendSignal(sessionId string, session sockjs.Session) } else if ok { terminalSession.sockJSSession = session sm.Sessions[sessionId] = terminalSession - terminalSession.bound <- nil + + select { + case terminalSession.bound <- nil: + log.Printf("message sent on bound channel for sessionId : %s", sessionId) + default: + // if a request from the front end is not received within a particular time frame, and no one is reading from the bound channel, we will ignore sending on the bound channel. + log.Printf("skipping send on bound, channel receiver possibly timed out. sessionId: %s", sessionId) + } + } } @@ -487,7 +495,7 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR sessionCtx, cancelFunc := context.WithCancel(context.Background()) terminalSessions.Set(sessionID, TerminalSession{ id: sessionID, - bound: make(chan error, 1), + bound: make(chan error), sizeChan: make(chan remotecommand.TerminalSize), doneChan: make(chan struct{}), context: sessionCtx,