Skip to content

CBG-4712 /_sgcollect_info endpoint to work with non default port #7601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 24, 2025
Merged
19 changes: 16 additions & 3 deletions rest/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@ func (h *handler) handleSetLogging() error {

func (h *handler) handleSGCollectStatus() error {
status := "stopped"
if sgcollectInstance.IsRunning() {
if h.server.sgcollect.IsRunning() {
status = "running"
}

Expand All @@ -1647,7 +1647,7 @@ func (h *handler) handleSGCollectStatus() error {
}

func (h *handler) handleSGCollectCancel() error {
err := sgcollectInstance.Stop()
err := h.server.sgcollect.Stop()
if err != nil {
return base.HTTPErrorf(http.StatusBadRequest, "Error stopping sgcollect_info: %v", err)
}
Expand Down Expand Up @@ -1676,11 +1676,24 @@ func (h *handler) handleSGCollect() error {
// Populate username and password used by sgcollect_info script for talking to Sync Gateway.
params.syncGatewayUsername, params.syncGatewayPassword = h.getBasicAuth()

addr, err := h.server.getServerAddr(adminServer)
if err != nil {
return base.HTTPErrorf(http.StatusInternalServerError, "Error getting admin server address: %v", err)
}
if h.server.Config.API.HTTPS.TLSCertPath != "" {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
params.adminURL = addr

zipFilename := sgcollectFilename()

logFilePath := h.server.Config.Logging.LogFilePath

if err := sgcollectInstance.Start(logFilePath, h.serialNumber, zipFilename, params); err != nil {
ctx := base.CorrelationIDLogCtx(context.WithoutCancel(h.ctx()), fmt.Sprintf("SGCollect-%03d", h.serialNumber))

if err := h.server.sgcollect.Start(ctx, logFilePath, zipFilename, params); err != nil {
return base.HTTPErrorf(http.StatusInternalServerError, "Error running sgcollect_info: %v", err)
}

Expand Down
3 changes: 3 additions & 0 deletions rest/server_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type ServerContext struct {
DatabaseInitManager *DatabaseInitManager // Manages database initialization (index creation and readiness) independent of database stop/start/reload, when using persistent config
ActiveReplicationsCounter
invalidDatabaseConfigTracking invalidDatabaseConfigs
// handle sgcollect processes for a given Server
sgcollect *sgCollect
}

type ActiveReplicationsCounter struct {
Expand Down Expand Up @@ -163,6 +165,7 @@ func NewServerContext(ctx context.Context, config *StartupConfig, persistentConf
BootstrapContext: &bootstrapContext{sgVersion: *base.ProductVersion},
hasStarted: make(chan struct{}),
_httpServers: map[serverType]*serverInfo{},
sgcollect: newSGCollect(ctx),
}
sc.invalidDatabaseConfigTracking = invalidDatabaseConfigs{
dbNames: map[string]*invalidConfigInfo{},
Expand Down
200 changes: 135 additions & 65 deletions rest/sgcollect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path/filepath"
"regexp"
"runtime"
"slices"
"sync/atomic"
"time"

Expand All @@ -35,14 +36,6 @@ var (
ErrSGCollectInfoNotRunning = errors.New("not running")

validateTicketPattern = regexp.MustCompile(`\d{1,7}`)

sgPath, sgCollectPath, sgCollectPathErr = sgCollectPaths()
sgcollectInstance = sgCollect{
status: base.Ptr(sgStopped),
sgPath: sgPath,
sgCollectPath: sgCollectPath,
pathError: sgCollectPathErr,
}
)

const (
Expand All @@ -52,31 +45,43 @@ const (
defaultSGUploadHost = "https://uploads.couchbase.com"
)

// sgCollectOutputStream handles stderr/stdout from a running sgcollect process.
type sgCollectOutputStream struct {
stdoutPipeWriter io.WriteCloser // Pipe writer for stdout
stderrPipeWriter io.WriteCloser // Pipe writer for stderr
stderrPipeReader io.Reader // Pipe reader for stderr
stdoutPipeReader io.Reader // Pipe reader for stdout
stdoutDoneChan chan struct{} // Channel to signal stdout processing completion
stderrDoneChan chan struct{} // Channel to signal stderr processing completion
}

// sgCollect manages the state of a running sgcollect_info process.
type sgCollect struct {
cancel context.CancelFunc
status *uint32
sgPath string
sgCollectPath string
pathError error
context context.Context
cancel context.CancelFunc // Function to cancel a running sgcollect_info process, set when status == sgRunning
status *uint32
sgPath string // Path to the Sync Gateway executable
sgCollectPath []string // Path to the sgcollect_info executable
sgCollectPathErr error // Error if sgcollect_info path could not be determined
stdout io.Writer // test seam, is nil in production
stderr io.Writer // test seam, is nil in production
}

// Start will attempt to start sgcollect_info, if another is not already running.
func (sg *sgCollect) Start(logFilePath string, ctxSerialNumber uint64, zipFilename string, params sgCollectOptions) error {
func (sg *sgCollect) Start(ctx context.Context, logFilePath string, zipFilename string, params sgCollectOptions) error {
if atomic.LoadUint32(sg.status) == sgRunning {
return ErrSGCollectInfoAlreadyRunning
}

// Return error if there is any failure while obtaining sgCollectPaths.
if sg.pathError != nil {
return sg.pathError
if sg.sgCollectPathErr != nil {
return sg.sgCollectPathErr
}

if params.OutputDirectory == "" {
// If no output directory specified, default to the configured LogFilePath
if logFilePath != "" {
params.OutputDirectory = logFilePath
base.DebugfCtx(sg.context, base.KeyAdmin, "sgcollect_info: no output directory specified, using LogFilePath: %v", params.OutputDirectory)
base.DebugfCtx(ctx, base.KeyAdmin, "sgcollect_info: no output directory specified, using LogFilePath: %v", params.OutputDirectory)
} else {
// If LogFilePath is not set, and DefaultLogFilePath is not set via a service script, error out.
return errors.New("no output directory or LogFilePath specified")
Expand All @@ -90,69 +95,46 @@ func (sg *sgCollect) Start(logFilePath string, ctxSerialNumber uint64, zipFilena

zipPath := filepath.Join(params.OutputDirectory, zipFilename)

args := params.Args()
args = append(args, "--sync-gateway-executable", sgPath)
args = append(args, zipPath)
cmdline := slices.Clone(sg.sgCollectPath)
cmdline = append(cmdline, params.Args()...)
cmdline = append(cmdline, "--sync-gateway-executable", sg.sgPath)
cmdline = append(cmdline, zipPath)

ctx := base.CorrelationIDLogCtx(context.Background(), fmt.Sprintf("SGCollect-%03d", ctxSerialNumber))
ctx, sg.cancel = context.WithCancel(ctx)
cmd := exec.CommandContext(ctx, cmdline[0], cmdline[1:]...)

sg.context, sg.cancel = context.WithCancel(ctx)
cmd := exec.CommandContext(sg.context, sgCollectPath, args...)

// Send command stderr/stdout to pipes
stderrPipeReader, stderrPipeWriter := io.Pipe()
cmd.Stderr = stderrPipeWriter
stdoutPipeReader, stdoutpipeWriter := io.Pipe()
cmd.Stdout = stdoutpipeWriter
outStream := newSGCollectOutputStream(ctx, sg.stdout, sg.stderr)
cmd.Stdout = outStream.stdoutPipeWriter
cmd.Stderr = outStream.stderrPipeWriter

if err := cmd.Start(); err != nil {
outStream.Close(ctx)
return err
}

atomic.StoreUint32(sg.status, sgRunning)
startTime := time.Now()
base.InfofCtx(sg.context, base.KeyAdmin, "sgcollect_info started with args: %v", base.UD(args))

// Stream sgcollect_info stderr to warn logs
go func() {
scanner := bufio.NewScanner(stderrPipeReader)
for scanner.Scan() {
base.InfofCtx(sg.context, base.KeyAll, "sgcollect_info: %v", scanner.Text())
}
if err := scanner.Err(); err != nil {
base.ErrorfCtx(sg.context, "sgcollect_info: unexpected error: %v", err)
}
}()

// Stream sgcollect_info stdout to debug logs
go func() {
scanner := bufio.NewScanner(stdoutPipeReader)
for scanner.Scan() {
base.InfofCtx(sg.context, base.KeyAll, "sgcollect_info: %v", scanner.Text())
}
if err := scanner.Err(); err != nil {
base.ErrorfCtx(sg.context, "sgcollect_info: unexpected error: %v", err)
}
}()
base.InfofCtx(ctx, base.KeyAdmin, "sgcollect_info started with output zip: %v", base.UD(zipPath))

go func() {
// Blocks until command finishes
err := cmd.Wait()
outStream.Close(ctx)

atomic.StoreUint32(sg.status, sgStopped)
duration := time.Since(startTime)

if err != nil {
if err.Error() == "signal: killed" {
base.InfofCtx(sg.context, base.KeyAdmin, "sgcollect_info cancelled after %v", duration)
base.InfofCtx(ctx, base.KeyAdmin, "sgcollect_info cancelled after %v", duration)
return
}

base.ErrorfCtx(sg.context, "sgcollect_info failed after %v with reason: %v. Check warning level logs for more information.", duration, err)
base.ErrorfCtx(ctx, "sgcollect_info failed after %v with reason: %v. Check warning level logs for more information.", duration, err)
return
}

base.InfofCtx(sg.context, base.KeyAdmin, "sgcollect_info finished successfully after %v", duration)
base.InfofCtx(ctx, base.KeyAdmin, "sgcollect_info finished successfully after %v", duration)
}()

return nil
Expand Down Expand Up @@ -190,6 +172,7 @@ type sgCollectOptions struct {
// We'll set them from the request's basic auth.
syncGatewayUsername string
syncGatewayPassword string
adminURL string // URL to the Sync Gateway admin API.
}

// validateOutputDirectory will check that the given path exists, and is a directory.
Expand All @@ -212,6 +195,80 @@ func validateOutputDirectory(dir string) error {
return nil
}

// newSGCollectOutputStream creates an instance to monitor stdout and stderr. Stdout is logged at Debug and Stderr at Info. extraStdout and extraStderr are optional writers used for testing only.
func newSGCollectOutputStream(ctx context.Context, extraStdout io.Writer, extraStderr io.Writer) *sgCollectOutputStream {
stderrPipeReader, stderrPipeWriter := io.Pipe()
stdoutPipeReader, stdoutPipeWriter := io.Pipe()
o := &sgCollectOutputStream{
stdoutPipeWriter: stdoutPipeWriter,
stderrPipeWriter: stderrPipeWriter,
stderrPipeReader: stderrPipeReader,
stdoutPipeReader: stdoutPipeReader,
stdoutDoneChan: make(chan struct{}),
stderrDoneChan: make(chan struct{}),
}
go func() {
defer close(o.stderrDoneChan)
scanner := bufio.NewScanner(stderrPipeReader)
for scanner.Scan() {
text := scanner.Text()
base.InfofCtx(ctx, base.KeyAll, "sgcollect_info: %v", text)
if extraStderr != nil {
_, err := extraStderr.Write([]byte(text + "\n"))
if err != nil {
base.ErrorfCtx(ctx, "sgcollect_info: failed to write to stderr pipe: %v", err)
}
}
}
if err := scanner.Err(); err != nil {
base.ErrorfCtx(ctx, "sgcollect_info: unexpected error: %v", err)
}
}()

// Stream sgcollect_info stdout to debug logs
go func() {
defer close(o.stdoutDoneChan)
scanner := bufio.NewScanner(stdoutPipeReader)
for scanner.Scan() {
text := scanner.Text()
base.InfofCtx(ctx, base.KeyAll, "sgcollect_info: %v", text)
if extraStdout != nil {
_, err := extraStdout.Write([]byte(text + "\n"))
if err != nil {
base.ErrorfCtx(ctx, "sgcollect_info: failed to write to stdout pipe: %v", err)
}
}
}
if err := scanner.Err(); err != nil {
base.ErrorfCtx(ctx, "sgcollect_info: unexpected error: %v", err)
}
}()
return o
}

// Close the output streams, required to close goroutines when sgCollectOutputStream is created.
func (o *sgCollectOutputStream) Close(ctx context.Context) {
err := o.stderrPipeWriter.Close()
if err != nil {
base.WarnfCtx(ctx, "sgcollect_info: failed to close stderr pipe writer: %v", err)
}
err = o.stdoutPipeWriter.Close()
if err != nil {
base.WarnfCtx(ctx, "sgcollect_info: failed to close stdout pipe writer: %v", err)
}
// Wait for the goroutines to finish processing the output streams, or exit after 5 seconds.
select {
case <-o.stdoutDoneChan:
case <-time.After(5 * time.Second):
base.WarnfCtx(ctx, "sgcollect_info: timed out waiting for stdout processing to finish")
}
select {
case <-o.stderrDoneChan:
case <-time.After(5 * time.Second):
base.WarnfCtx(ctx, "sgcollect_info: timed out waiting for stderr processing to finish")
}
}

// Validate ensures the options are OK to use in sgcollect_info.
func (c *sgCollectOptions) Validate() error {

Expand Down Expand Up @@ -297,23 +354,26 @@ func (c *sgCollectOptions) Args() []string {
if c.KeepZip {
args = append(args, "--keep-zip")
}

if c.adminURL != "" {
args = append(args, "--sync-gateway-url", c.adminURL)
}
return args
}

// sgCollectPaths attempts to return the absolute paths to Sync Gateway and to sgcollect_info binaries.
func sgCollectPaths() (sgBinary, sgCollectBinary string, err error) {
// sgCollectPaths attempts to return the absolute paths to Sync Gateway and to sgcollect_info binaries. Returns an error if either cannot be found.
//
// The sgcollect_info return value is allowed to be a list of strings for testing, where is it , or an error if not.
func sgCollectPaths(ctx context.Context) (sgBinary string, sgCollect []string, err error) {
sgBinary, err = os.Executable()
if err != nil {
return "", "", err
return "", nil, err
}

sgBinary, err = filepath.Abs(sgBinary)
if err != nil {
return "", "", err
return "", nil, err
}

logCtx := context.TODO() // this is global variable at init, we can't pass it in easily
hasBinDir := true
sgCollectPath := filepath.Join("tools", "sgcollect_info")

Expand All @@ -324,14 +384,15 @@ func sgCollectPaths() (sgBinary, sgCollectBinary string, err error) {
}

for {
var sgCollectBinary string
if hasBinDir {
sgCollectBinary = filepath.Join(filepath.Dir(filepath.Dir(sgBinary)), sgCollectPath)
} else {
sgCollectBinary = filepath.Join(filepath.Dir(sgBinary), sgCollectPath)
}

// Check sgcollect_info exists at the path we guessed.
base.DebugfCtx(logCtx, base.KeyAdmin, "Checking sgcollect_info binary exists at: %v", sgCollectBinary)
base.DebugfCtx(ctx, base.KeyAdmin, "Checking sgcollect_info binary exists at: %v", sgCollectBinary)
_, err = os.Stat(sgCollectBinary)
if err != nil {

Expand All @@ -341,10 +402,10 @@ func sgCollectPaths() (sgBinary, sgCollectBinary string, err error) {
continue
}

return "", "", err
return "", nil, err
}

return sgBinary, sgCollectBinary, nil
return sgBinary, []string{sgCollectBinary}, nil
}
}

Expand All @@ -371,3 +432,12 @@ func sgcollectFilename() string {

return filename
}

// newSGCollect creates a new sgCollect instance.
func newSGCollect(ctx context.Context) *sgCollect {
sgCollectInstance := sgCollect{
status: base.Ptr(sgStopped),
}
sgCollectInstance.sgPath, sgCollectInstance.sgCollectPath, sgCollectInstance.sgCollectPathErr = sgCollectPaths(ctx)
return &sgCollectInstance
}
Loading
Loading