Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ linters-settings:
depguard:
rules:
everything:
list-mode: lax
allow:
- go.opentelemetry.io/otel/semconv/v1.27.0
deny:
- pkg: go.opentelemetry.io/otel/semconv
desc: Use "go.opentelemetry.io/otel/semconv/v1.27.0" instead.

- pkg: io/ioutil
desc: >
Use the "io" and "os" packages instead.
Expand Down Expand Up @@ -93,6 +99,13 @@ linters-settings:
alias: apierrors
no-unaliased: true

spancheck:
checks: [end, record-error]
extra-start-span-signatures:
- 'github.com/crunchydata/postgres-operator/internal/tracing.Start:opentelemetry'
ignore-check-signatures:
- 'tracing.Escape'

issues:
exclude-generated: strict
exclude-rules:
Expand Down
89 changes: 65 additions & 24 deletions cmd/postgres-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ package main

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"unicode"

"go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand All @@ -31,12 +33,11 @@ import (
"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/internal/registration"
"github.com/crunchydata/postgres-operator/internal/tracing"
"github.com/crunchydata/postgres-operator/internal/upgradecheck"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)

var versionString string

// assertNoError panics when err is not nil.
func assertNoError(err error) {
if err != nil {
Expand All @@ -58,8 +59,8 @@ func initLogging() {

//+kubebuilder:rbac:groups="coordination.k8s.io",resources="leases",verbs={get,create,update,watch}

func initManager() (runtime.Options, error) {
log := logging.FromContext(context.Background())
func initManager(ctx context.Context) (runtime.Options, error) {
log := logging.FromContext(ctx)

options := runtime.Options{}
options.Cache.SyncPeriod = initialize.Pointer(time.Hour)
Expand Down Expand Up @@ -120,45 +121,67 @@ func initManager() (runtime.Options, error) {
}

func main() {
// This context is canceled by SIGINT, SIGTERM, or by calling shutdown.
ctx, shutdown := context.WithCancel(runtime.SignalHandler())

otelFlush, err := initOpenTelemetry()
assertNoError(err)
defer otelFlush()
running, stopRunning := context.WithCancel(context.Background())
defer stopRunning()

initVersion()
initLogging()

log := logging.FromContext(ctx)
log := logging.FromContext(running)
log.V(1).Info("debug flag set to true")

// Start a goroutine that waits for SIGINT or SIGTERM.
{
signals := []os.Signal{os.Interrupt, syscall.SIGTERM}
receive := make(chan os.Signal, len(signals))
signal.Notify(receive, signals...)
go func() {
// Wait for a signal then immediately restore the default signal handlers.
// After this, a SIGHUP, SIGINT, or SIGTERM causes the program to exit.
// - https://pkg.go.dev/os/signal#hdr-Default_behavior_of_signals_in_Go_programs
s := <-receive
signal.Stop(receive)

log.Info("received signal from OS", "signal", s.String())
stopRunning()
}()
}

features := feature.NewGate()
assertNoError(features.Set(os.Getenv("PGO_FEATURE_GATES")))

ctx = feature.NewContext(ctx, features)
running = feature.NewContext(running, features)
log.Info("feature gates",
// These are set by the user
"PGO_FEATURE_GATES", feature.ShowAssigned(ctx),
"PGO_FEATURE_GATES", feature.ShowAssigned(running),
// These are enabled, including features that are on by default
"enabled", feature.ShowEnabled(ctx))
"enabled", feature.ShowEnabled(running))

// Initialize OpenTelemetry and flush data when there is a panic.
otelFinish, err := initOpenTelemetry(running)
assertNoError(err)
defer func(ctx context.Context) { _ = otelFinish(ctx) }(running)

tracing.SetDefaultTracer(tracing.New("github.com/CrunchyData/postgres-operator"))

cfg, err := runtime.GetConfig()
assertNoError(err)

cfg.UserAgent = userAgent
cfg.Wrap(otelTransportWrapper())

// TODO(controller-runtime): Set config.WarningHandler instead after v0.19.0.
// Configure client-go to suppress warnings when warning headers are encountered. This prevents
// warnings from being logged over and over again during reconciliation (e.g. this will suppress
// deprecation warnings when using an older version of a resource for backwards compatibility).
rest.SetDefaultWarningHandler(rest.NoWarnings{})

k8s, err := kubernetes.NewDiscoveryRunner(cfg)
assertNoError(err)
assertNoError(k8s.Read(ctx))
assertNoError(k8s.Read(running))

log.Info("Connected to Kubernetes", "api", k8s.Version().String(), "openshift", k8s.IsOpenShift())
log.Info("connected to Kubernetes", "api", k8s.Version().String(), "openshift", k8s.IsOpenShift())

options, err := initManager()
options, err := initManager(running)
assertNoError(err)

// Add to the Context that Manager passes to Reconciler.Start, Runnable.Start,
Expand All @@ -174,7 +197,7 @@ func main() {
assertNoError(err)
assertNoError(mgr.Add(k8s))

registrar, err := registration.NewRunner(os.Getenv("RSA_KEY"), os.Getenv("TOKEN_PATH"), shutdown)
registrar, err := registration.NewRunner(os.Getenv("RSA_KEY"), os.Getenv("TOKEN_PATH"), stopRunning)
assertNoError(err)
assertNoError(mgr.Add(registrar))
token, _ := registrar.CheckToken()
Expand Down Expand Up @@ -212,10 +235,29 @@ func main() {
assertNoError(mgr.AddHealthzCheck("health", healthz.Ping))
assertNoError(mgr.AddReadyzCheck("check", healthz.Ping))

log.Info("starting controller runtime manager and will wait for signal to exit")
// Start the manager and wait for its context to be canceled.
stopped := make(chan error, 1)
go func() { stopped <- mgr.Start(running) }()
<-running.Done()

// Set a deadline for graceful termination.
log.Info("shutting down")
stopping, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

// Wait for the manager to return or the deadline to pass.
select {
case err = <-stopped:
case <-stopping.Done():
err = stopping.Err()
}

assertNoError(mgr.Start(ctx))
log.Info("signal received, exiting")
// Flush any telemetry with the remaining time we have.
if err = errors.Join(err, otelFinish(stopping)); err != nil {
log.Error(err, "shutdown failed")
} else {
log.Info("shutdown complete")
}
}

// addControllersToManager adds all PostgreSQL Operator controllers to the provided controller
Expand All @@ -226,7 +268,6 @@ func addControllersToManager(mgr runtime.Manager, log logging.Logger, reg regist
Owner: postgrescluster.ControllerName,
Recorder: mgr.GetEventRecorderFor(postgrescluster.ControllerName),
Registration: reg,
Tracer: otel.Tracer(postgrescluster.ControllerName),
}

if err := pgReconciler.SetupWithManager(mgr); err != nil {
Expand Down
17 changes: 10 additions & 7 deletions cmd/postgres-operator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"reflect"
"testing"
"time"
Expand All @@ -14,8 +15,10 @@ import (
)

func TestInitManager(t *testing.T) {
ctx := context.Background()

t.Run("Defaults", func(t *testing.T) {
options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)

if assert.Check(t, options.Cache.SyncPeriod != nil) {
Expand Down Expand Up @@ -48,7 +51,7 @@ func TestInitManager(t *testing.T) {
t.Run("Invalid", func(t *testing.T) {
t.Setenv("PGO_CONTROLLER_LEASE_NAME", "INVALID_NAME")

options, err := initManager()
options, err := initManager(ctx)
assert.ErrorContains(t, err, "PGO_CONTROLLER_LEASE_NAME")
assert.ErrorContains(t, err, "invalid")

Expand All @@ -59,7 +62,7 @@ func TestInitManager(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
t.Setenv("PGO_CONTROLLER_LEASE_NAME", "valid-name")

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.Assert(t, options.LeaderElection == true)
assert.Equal(t, options.LeaderElectionNamespace, "test-namespace")
Expand All @@ -70,7 +73,7 @@ func TestInitManager(t *testing.T) {
t.Run("PGO_TARGET_NAMESPACE", func(t *testing.T) {
t.Setenv("PGO_TARGET_NAMESPACE", "some-such")

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.Assert(t, cmp.Len(options.Cache.DefaultNamespaces, 1),
"expected only one configured namespace")
Expand All @@ -81,7 +84,7 @@ func TestInitManager(t *testing.T) {
t.Run("PGO_TARGET_NAMESPACES", func(t *testing.T) {
t.Setenv("PGO_TARGET_NAMESPACES", "some-such,another-one")

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.Assert(t, cmp.Len(options.Cache.DefaultNamespaces, 2),
"expect two configured namespaces")
Expand All @@ -95,7 +98,7 @@ func TestInitManager(t *testing.T) {
for _, v := range []string{"-3", "0", "3.14"} {
t.Setenv("PGO_WORKERS", v)

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.DeepEqual(t, options.Controller.GroupKindConcurrency,
map[string]int{
Expand All @@ -107,7 +110,7 @@ func TestInitManager(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
t.Setenv("PGO_WORKERS", "19")

options, err := initManager()
options, err := initManager(ctx)
assert.NilError(t, err)
assert.DeepEqual(t, options.Controller.GroupKindConcurrency,
map[string]int{
Expand Down
Loading
Loading