Skip to content

Commit b9d775b

Browse files
perf: memory optimizations and prom metrics on terminal session exposed (#4909)
* memory optimizations and prom metrics on terminal session exposed * inc terminal counter before stream actually starts
1 parent e7f34f5 commit b9d775b

File tree

6 files changed

+99
-23
lines changed

6 files changed

+99
-23
lines changed

client/events/EventClient.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ func (impl *EventRESTClientImpl) sendEvent(event Event) (bool, error) {
260260
impl.logger.Errorw("error while UpdateJiraTransition request ", "err", err)
261261
return false, err
262262
}
263+
defer resp.Body.Close()
263264
impl.logger.Debugw("event completed", "event resp", resp)
264265
return true, err
265266
}

internal/middleware/instrument.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,16 @@ var DeploymentStatusCronDuration = promauto.NewHistogramVec(prometheus.Histogram
117117
Name: "deployment_status_cron_process_time",
118118
}, []string{"cronName"})
119119

120+
var TerminalSessionRequestCounter = promauto.NewCounterVec(prometheus.CounterOpts{
121+
Name: "initiate_terminal_session_request_counter",
122+
Help: "count of requests for initiated, established and closed terminal sessions",
123+
}, []string{"sessionAction", "isError"})
124+
125+
var TerminalSessionDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
126+
Name: "terminal_session_duration",
127+
Help: "duration of each terminal session",
128+
}, []string{"podName", "namespace", "clusterId"})
129+
120130
// prometheusMiddleware implements mux.MiddlewareFunc.
121131
func PrometheusMiddleware(next http.Handler) http.Handler {
122132
// prometheus.MustRegister(requestCounter)
@@ -134,3 +144,11 @@ func PrometheusMiddleware(next http.Handler) http.Handler {
134144
requestCounter.WithLabelValues(path, method, strconv.Itoa(d.Status())).Inc()
135145
})
136146
}
147+
148+
func IncTerminalSessionRequestCounter(sessionAction string, isError string) {
149+
TerminalSessionRequestCounter.WithLabelValues(sessionAction, isError).Inc()
150+
}
151+
152+
func RecordTerminalSessionDurationMetrics(podName, namespace, clusterId string, sessionDuration float64) {
153+
TerminalSessionDuration.WithLabelValues(podName, namespace, clusterId).Observe(sessionDuration)
154+
}

pkg/clusterTerminalAccess/UserTerminalAccessService.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func (impl *UserTerminalAccessServiceImpl) closeAndCleanTerminalSession(accessSe
393393
}
394394

395395
func (impl *UserTerminalAccessServiceImpl) closeSession(sessionId string) {
396-
impl.terminalSessionHandler.Close(sessionId, 1, "Process exited")
396+
impl.terminalSessionHandler.Close(sessionId, 1, terminal.ProcessExitedMsg)
397397
}
398398

399399
func (impl *UserTerminalAccessServiceImpl) extractMetadataString(request *models.UserTerminalSessionRequest) string {

pkg/k8s/K8sCommonService.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,12 @@ func (impl *K8sCommonServiceImpl) GetManifestsByBatch(ctx context.Context, reque
220220
defer cancel()
221221
go func() {
222222
ans := impl.getManifestsByBatch(ctx, requests)
223-
ch <- ans
223+
select {
224+
case <-ctx.Done():
225+
return
226+
default:
227+
ch <- ans
228+
}
224229
}()
225230
select {
226231
case ans := <-ch:

pkg/terminal/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package terminal
2+
3+
const (
4+
SessionTerminated = "SessionTerminated"
5+
SessionInitiating = "SessionInitiating"
6+
)

pkg/terminal/terminalSesion.go

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ import (
1919
"crypto/rand"
2020
"encoding/hex"
2121
"encoding/json"
22+
errors2 "errors"
2223
"fmt"
2324
"github.com/caarlos0/env"
2425
"github.com/devtron-labs/common-lib/utils/k8s"
26+
"github.com/devtron-labs/devtron/internal/middleware"
2527
"github.com/devtron-labs/devtron/pkg/argoApplication"
2628
"github.com/devtron-labs/devtron/pkg/cluster"
2729
"github.com/devtron-labs/devtron/pkg/cluster/repository"
@@ -31,6 +33,7 @@ import (
3133
"k8s.io/apimachinery/pkg/api/errors"
3234
"log"
3335
"net/http"
36+
"strconv"
3437
"strings"
3538
"sync"
3639
"time"
@@ -44,6 +47,7 @@ import (
4447
)
4548

4649
const END_OF_TRANSMISSION = "\u0004"
50+
const ProcessExitedMsg = "Process exited"
4751

4852
// PtyHandler is what remotecommand expects from a pty
4953
type PtyHandler interface {
@@ -54,11 +58,17 @@ type PtyHandler interface {
5458

5559
// TerminalSession implements PtyHandler (using a SockJS connection)
5660
type TerminalSession struct {
57-
id string
58-
bound chan error
59-
sockJSSession sockjs.Session
60-
sizeChan chan remotecommand.TerminalSize
61-
doneChan chan struct{}
61+
id string
62+
bound chan error
63+
sockJSSession sockjs.Session
64+
sizeChan chan remotecommand.TerminalSize
65+
doneChan chan struct{}
66+
context context.Context
67+
contextCancelFunc context.CancelFunc
68+
podName string
69+
namespace string
70+
clusterId string
71+
startedOn time.Time
6272
}
6373

6474
// TerminalMessage is the messaging protocol between ShellController and TerminalSession.
@@ -166,6 +176,15 @@ func (sm *SessionMap) Set(sessionId string, session TerminalSession) {
166176
sm.Sessions[sessionId] = session
167177
}
168178

179+
func (sm *SessionMap) SetTerminalSessionStartTime(sessionId string) {
180+
sm.Lock.Lock()
181+
defer sm.Lock.Unlock()
182+
if session, ok := sm.Sessions[sessionId]; ok {
183+
session.startedOn = time.Now()
184+
sm.Sessions[sessionId] = session
185+
}
186+
}
187+
169188
// Close shuts down the SockJS connection and sends the status code and reason to the client
170189
// Can happen if the process exits or if there is an error starting up the process
171190
// For now the status code is unused and reason is shown to the user (unless "")
@@ -178,11 +197,23 @@ func (sm *SessionMap) Close(sessionId string, status uint32, reason string) {
178197
if err != nil {
179198
log.Println(err)
180199
}
200+
isErroredConnectionTermination := isConnectionClosedByError(status)
201+
middleware.IncTerminalSessionRequestCounter(SessionTerminated, strconv.FormatBool(isErroredConnectionTermination))
202+
middleware.RecordTerminalSessionDurationMetrics(terminalSession.podName, terminalSession.namespace, terminalSession.clusterId, time.Since(terminalSession.startedOn).Seconds())
203+
close(terminalSession.doneChan)
204+
terminalSession.contextCancelFunc()
181205
delete(sm.Sessions, sessionId)
182206
}
183207

184208
}
185209

210+
func isConnectionClosedByError(status uint32) bool {
211+
if status == 2 {
212+
return true
213+
}
214+
return false
215+
}
216+
186217
var terminalSessions = SessionMap{Sessions: make(map[string]TerminalSession)}
187218

188219
// handleTerminalSession is Called by net/http for any new /api/sockjs connections
@@ -243,7 +274,7 @@ func CreateAttachHandler(path string) http.Handler {
243274

244275
// startProcess is called by handleAttach
245276
// Executed cmd in the container specified in request and connects it up with the ptyHandler (a session)
246-
func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
277+
func startProcess(ctx context.Context, k8sClient kubernetes.Interface, cfg *rest.Config,
247278
cmd []string, ptyHandler PtyHandler, sessionRequest *TerminalSessionRequest) error {
248279
namespace := sessionRequest.Namespace
249280
podName := sessionRequest.PodName
@@ -262,17 +293,18 @@ func startProcess(k8sClient kubernetes.Interface, cfg *rest.Config,
262293
TerminalSizeQueue: ptyHandler,
263294
Tty: true,
264295
}
265-
266-
err = execWithStreamOptions(exec, streamOptions)
296+
isErroredConnectionTermination := false
297+
middleware.IncTerminalSessionRequestCounter(SessionInitiating, strconv.FormatBool(isErroredConnectionTermination))
298+
terminalSessions.SetTerminalSessionStartTime(sessionRequest.SessionId)
299+
err = execWithStreamOptions(ctx, exec, streamOptions)
267300
if err != nil {
268301
return err
269302
}
270-
271303
return nil
272304
}
273305

274-
func execWithStreamOptions(exec remotecommand.Executor, streamOptions remotecommand.StreamOptions) error {
275-
return exec.Stream(streamOptions)
306+
func execWithStreamOptions(ctx context.Context, exec remotecommand.Executor, streamOptions remotecommand.StreamOptions) error {
307+
return exec.StreamWithContext(ctx, streamOptions)
276308
}
277309

278310
func getExecutor(k8sClient kubernetes.Interface, cfg *rest.Config, podName, namespace, containerName string, cmd []string, stdin bool, tty bool) (remotecommand.Executor, error) {
@@ -344,32 +376,39 @@ var validShells = []string{"bash", "sh", "powershell", "cmd"}
344376
// Waits for the SockJS connection to be opened by the client the session to be bound in handleTerminalSession
345377
func WaitForTerminal(k8sClient kubernetes.Interface, cfg *rest.Config, request *TerminalSessionRequest) {
346378

379+
session := terminalSessions.Get(request.SessionId)
380+
sessionCtx := session.context
381+
timedCtx, _ := context.WithTimeout(sessionCtx, 60*time.Second)
347382
select {
348-
case <-terminalSessions.Get(request.SessionId).bound:
349-
close(terminalSessions.Get(request.SessionId).bound)
383+
case <-session.bound:
384+
close(session.bound)
350385

351386
var err error
352387
if isValidShell(validShells, request.Shell) {
353388
cmd := []string{request.Shell}
354389

355-
err = startProcess(k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request)
390+
err = startProcess(sessionCtx, k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request)
356391
} else {
357392
// No Shell given or it was not valid: try some shells until one succeeds or all fail
358393
// FIXME: if the first Shell fails then the first keyboard event is lost
359394
for _, testShell := range validShells {
360395
cmd := []string{testShell}
361-
if err = startProcess(k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request); err == nil {
396+
if err = startProcess(sessionCtx, k8sClient, cfg, cmd, terminalSessions.Get(request.SessionId), request); err == nil || errors2.Is(err, context.Canceled) {
362397
break
363398
}
364399
}
365400
}
366401

367-
if err != nil {
402+
if err != nil && !errors2.Is(err, context.Canceled) {
368403
terminalSessions.Close(request.SessionId, 2, err.Error())
369404
return
370405
}
371406

372-
terminalSessions.Close(request.SessionId, 1, "Process exited")
407+
terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
408+
case <-timedCtx.Done():
409+
// handle case when connection has not been initiated from FE side within particular time
410+
close(session.bound)
411+
terminalSessions.Close(request.SessionId, 1, ProcessExitedMsg)
373412
}
374413
}
375414

@@ -432,10 +471,17 @@ func (impl *TerminalSessionHandlerImpl) GetTerminalSession(req *TerminalSessionR
432471
return statusCode, nil, err
433472
}
434473
req.SessionId = sessionID
474+
sessionCtx, cancelFunc := context.WithCancel(context.Background())
435475
terminalSessions.Set(sessionID, TerminalSession{
436-
id: sessionID,
437-
bound: make(chan error),
438-
sizeChan: make(chan remotecommand.TerminalSize),
476+
id: sessionID,
477+
bound: make(chan error),
478+
sizeChan: make(chan remotecommand.TerminalSize),
479+
doneChan: make(chan struct{}),
480+
context: sessionCtx,
481+
contextCancelFunc: cancelFunc,
482+
podName: req.PodName,
483+
namespace: req.Namespace,
484+
clusterId: strconv.Itoa(req.ClusterId),
439485
})
440486
config, client, err := impl.getClientConfig(req)
441487

@@ -559,7 +605,7 @@ func (impl *TerminalSessionHandlerImpl) RunCmdInRemotePod(req *TerminalSessionRe
559605
buf := &bytes.Buffer{}
560606
errBuf := &bytes.Buffer{}
561607
impl.logger.Debug("reached execWithStreamOptions method call")
562-
err = execWithStreamOptions(exec, remotecommand.StreamOptions{
608+
err = execWithStreamOptions(context.Background(), exec, remotecommand.StreamOptions{
563609
Stdout: buf,
564610
Stderr: errBuf,
565611
})

0 commit comments

Comments
 (0)