From 5c158ef858f46cde30ef4aa4f7824350839c3563 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakob=20M=C3=B6ller?=
Date: Thu, 31 Jul 2025 15:35:29 +0200
Subject: [PATCH] fix: make sure that dynamic controller informer queues and
 workers are shut down correctly
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, the informer queue of the dynamic controller and the manager
were shut down immediately (killed). This is problematic because a
reconcile loop could be interrupted in the middle of a run even during an
otherwise graceful shutdown.

This change fixes that by adding the dynamic controller to the manager's
runnables instead of leaking a goroutine, and by fixing how the manager is
stopped (previously the manager itself was run in a goroutine).
Additionally, the queue of the dynamic controller is now shut down
properly and we wait for the workers to finish processing. Note that if
this still takes longer than the graceful shutdown period, the manager and
the binary are forcefully terminated.

Because the shutdown timeout is now handled globally within the manager
and no longer individually for the dynamic controller,
`--dynamic-controller-default-shutdown-timeout` and
`KRO_DYNAMIC_CONTROLLER_DEFAULT_SHUTDOWN_TIMEOUT` have been replaced with
`--graceful-shutdown-timeout` and `KRO_GRACEFUL_SHUTDOWN_TIMEOUT`.

Signed-off-by: Jakob Möller
---
 cmd/controller/main.go                        | 50 ++++++++-----------
 helm/templates/deployment.yaml                |  8 +--
 helm/values.yaml                              |  4 +-
 pkg/dynamiccontroller/dynamic_controller.go   | 46 +++++++++++------
 .../dynamic_controller_test.go                |  6 +--
 test/integration/environment/setup.go         | 27 +++++-----
 .../suites/ackekscluster/suite_test.go        |  2 +-
 test/integration/suites/core/setup_test.go    |  2 +-
 .../suites/deploymentservice/suite_test.go    |  2 +-
 .../suites/networkingstack/suite_test.go      |  2 +-
 10 files changed, 77 insertions(+), 72 deletions(-)
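
For context, a minimal, self-contained sketch of the controller-runtime
pattern this change moves to: a Runnable registered with mgr.Add blocks in
Start(ctx) until the manager cancels ctx, drains its own work queue, and
only then returns, while the manager waits up to GracefulShutdownTimeout
for all runnables to finish before exiting. The exampleRunnable type and
its worker loop below are illustrative only (they are not kro's actual
DynamicController); mgr.Add, manager.Options.GracefulShutdownTimeout, and
the workqueue ShutDown semantics are the real controller-runtime and
client-go APIs.

	package main

	import (
		"context"
		"sync"
		"time"

		"k8s.io/client-go/util/workqueue"
		ctrl "sigs.k8s.io/controller-runtime"
		"sigs.k8s.io/controller-runtime/pkg/manager"
	)

	// exampleRunnable stands in for the dynamic controller: it owns a
	// workqueue and a worker pool and only returns from Start once the
	// queue is shut down and every worker has exited.
	type exampleRunnable struct {
		queue   workqueue.TypedRateLimitingInterface[string]
		workers int
	}

	func (r *exampleRunnable) Start(ctx context.Context) error {
		var wg sync.WaitGroup
		for i := 0; i < r.workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for {
					// Get blocks until an item is available or ShutDown is called.
					item, shutdown := r.queue.Get()
					if shutdown {
						return
					}
					// ... process the item here ...
					r.queue.Done(item)
				}
			}()
		}
		<-ctx.Done()       // manager started shutting down
		r.queue.ShutDown() // let the workers drain and exit
		wg.Wait()          // block until all workers are done
		return nil
	}

	func main() {
		timeout := 60 * time.Second
		mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{
			// The manager waits this long for all runnables to return
			// before giving up and exiting.
			GracefulShutdownTimeout: &timeout,
		})
		if err != nil {
			panic(err)
		}
		if err := mgr.Add(&exampleRunnable{
			queue:   workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
			workers: 2,
		}); err != nil {
			panic(err)
		}
		// Start blocks until the signal context is cancelled and all
		// runnables have returned (or the timeout expires).
		if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
			panic(err)
		}
	}

The dynamic controller's Start method, the shared GracefulShutdownTimeout,
and the single blocking mgr.Start call in cmd/controller/main.go follow
this shape.
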
"+ "Enabling this will ensure there is only one active controller manager.") flag.BoolVar(&allowCRDDeletion, "allow-crd-deletion", false, "allow kro to delete CRDs") + flag.DurationVar(&gracefulShutdownTimeout, "graceful-shutdown-timeout", 60*time.Second, + "maximum duration to wait for the controller manager to gracefully shutdown") flag.IntVar(&resourceGraphDefinitionConcurrentReconciles, "resource-graph-definition-concurrent-reconciles", 1, "The number of resource graph definition reconciles to run in parallel", @@ -112,8 +113,6 @@ func main() { "interval at which the controller will re list resources even with no changes, in seconds") flag.IntVar(&queueMaxRetries, "dynamic-controller-default-queue-max-retries", 20, "maximum number of retries for an item in the queue will be retried before being dropped") - flag.IntVar(&shutdownTimeout, "dynamic-controller-default-shutdown-timeout", 60, - "maximum duration to wait for the controller to gracefully shutdown, in seconds") // log level flags flag.IntVar(&logLevel, "log-level", 10, "The log level verbosity. 0 is the least verbose, 5 is the most verbose.") // qps and burst @@ -147,9 +146,10 @@ func main() { Metrics: metricsserver.Options{ BindAddress: metricsAddr, }, - HealthProbeBindAddress: probeAddr, - LeaderElection: enableLeaderElection, - LeaderElectionID: "6f0f64a5.kro.run", + GracefulShutdownTimeout: &gracefulShutdownTimeout, + HealthProbeBindAddress: probeAddr, + LeaderElection: enableLeaderElection, + LeaderElectionID: "6f0f64a5.kro.run", // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily // when the Manager ends. This requires the binary to immediately end when the // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly @@ -170,7 +170,6 @@ func main() { dc := dynamiccontroller.NewDynamicController(rootLogger, dynamiccontroller.Config{ Workers: dynamicControllerConcurrentReconciles, - ShutdownTimeout: time.Duration(shutdownTimeout) * time.Second, ResyncPeriod: time.Duration(resyncPeriod) * time.Second, QueueMaxRetries: queueMaxRetries, MinRetryDelay: minRetryDelay, @@ -199,14 +198,12 @@ func main() { os.Exit(1) } - go func() { - err := dc.Run(context.Background()) - if err != nil { - setupLog.Error(err, "dynamic controller failed to run") - } - }() + if err := mgr.Add(dc); err != nil { + setupLog.Error(err, "unable to add dynamic controller to manager") + os.Exit(1) + } - //+kubebuilder:scaffold:builder + // +kubebuilder:scaffold:builder if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") @@ -218,15 +215,8 @@ func main() { os.Exit(1) } - ctx := ctrl.SetupSignalHandler() - - go func() { - if err := mgr.Start(ctx); err != nil { - setupLog.Error(err, "problem running manager") - os.Exit(1) - } - }() - - <-ctx.Done() - + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + setupLog.Error(err, "problem running manager") + os.Exit(1) + } } diff --git a/helm/templates/deployment.yaml b/helm/templates/deployment.yaml index 1f04e18ff..f6fa7af51 100644 --- a/helm/templates/deployment.yaml +++ b/helm/templates/deployment.yaml @@ -59,8 +59,8 @@ spec: value: {{ .Values.config.dynamicControllerDefaultResyncPeriod | quote }} - name: KRO_DYNAMIC_CONTROLLER_DEFAULT_QUEUE_MAX_RETRIES value: {{ .Values.config.dynamicControllerDefaultQueueMaxRetries | quote }} - - name: KRO_DYNAMIC_CONTROLLER_DEFAULT_SHUTDOWN_TIMEOUT - value: {{ .Values.config.dynamicControllerDefaultShutdownTimeout | quote }} + - name: 
diff --git a/helm/values.yaml b/helm/values.yaml
index c98864d3c..792d9eb4c 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -108,14 +108,14 @@ config:
   healthProbeBindAddress: :8079
   # The number of resource graph definition reconciles to run in parallel
   resourceGraphDefinitionConcurrentReconciles: 1
+  # The maximum duration to wait for the controller manager to gracefully shutdown
+  gracefulShutdownTimeout: 60s
   # The number of dynamic controller reconciles to run in parallel
   dynamicControllerConcurrentReconciles: 1
   # The interval at which the controller will re list resources even with no changes, in seconds
   dynamicControllerDefaultResyncPeriod: 36000
   # The maximum number of retries for an item in the queue will be retried before being dropped
   dynamicControllerDefaultQueueMaxRetries: 20
-  # The maximum duration to wait for the controller to gracefully shutdown, in seconds
-  dynamicControllerDefaultShutdownTimeout: 60
   # The log level verbosity. 0 is the least verbose, 5 is the most verbose
   logLevel: 3
 
diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go
index 1c33e5f0a..034f99fa9 100644
--- a/pkg/dynamiccontroller/dynamic_controller.go
+++ b/pkg/dynamiccontroller/dynamic_controller.go
@@ -94,10 +94,6 @@ type Config struct {
 	// NOTE(a-hilaly): I'm not very sure how useful is this, i'm trying to avoid
 	// situations where reconcile errors exhaust the queue.
 	QueueMaxRetries int
-	// ShutdownTimeout is the maximum duration to wait for the controller to
-	// gracefully shutdown. We ideally want to avoid forceful shutdowns, giving
-	// the controller enough time to finish processing any pending items.
-	ShutdownTimeout time.Duration
 	// MinRetryDelay is the minimum delay before retrying an item in the queue
 	MinRetryDelay time.Duration
 	// MaxRetryDelay is the maximum delay before retrying an item in the queue
@@ -209,7 +205,7 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool {
 }
 
 // Run starts the DynamicController.
-func (dc *DynamicController) Run(ctx context.Context) error {
+func (dc *DynamicController) Start(ctx context.Context) error {
 	defer utilruntime.HandleCrash()
 	defer dc.queue.ShutDown()
 
@@ -224,17 +220,40 @@ func (dc *DynamicController) Run(ctx context.Context) error {
 	// Spin up workers.
 	//
 	// TODO(a-hilaly): Allow for dynamic scaling of workers.
+	var wg sync.WaitGroup
 	for i := 0; i < dc.config.Workers; i++ {
-		go wait.UntilWithContext(ctx, dc.worker, time.Second)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			wait.UntilWithContext(ctx, dc.worker, time.Second)
+		}()
 	}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		<-ctx.Done()
+		dc.log.Info("Received shutdown signal, shutting down dynamic controller queue")
+		dc.queue.ShutDown()
+	}()
+
+	wg.Wait()
+	dc.log.Info("All workers have stopped")
 
-	<-ctx.Done()
-	return dc.gracefulShutdown(dc.config.ShutdownTimeout)
+	// when shutting down, the context given to Start is already closed,
+	// and the expectation is that we block until the graceful shutdown is complete.
+	return dc.shutdown(context.Background())
 }
 
 // worker processes items from the queue.
 func (dc *DynamicController) worker(ctx context.Context) {
-	for dc.processNextWorkItem(ctx) {
+	for {
+		select {
+		case <-ctx.Done():
+			dc.log.Info("Dynamic controller worker received shutdown signal, stopping")
+			return
+		default:
+			dc.processNextWorkItem(ctx)
+		}
 	}
 }
 
@@ -321,15 +340,14 @@ func (dc *DynamicController) syncFunc(ctx context.Context, oi ObjectIdentifiers)
 	return err
 }
 
-// gracefulShutdown performs a graceful shutdown of the controller.
-func (dc *DynamicController) gracefulShutdown(timeout time.Duration) error {
+// shutdown performs a graceful shutdown of the controller.
+func (dc *DynamicController) shutdown(ctx context.Context) error {
 	dc.log.Info("Starting graceful shutdown")
 
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
-
 	var wg sync.WaitGroup
 	dc.informers.Range(func(key, value interface{}) bool {
+		k := key.(schema.GroupVersionResource)
+		dc.log.V(1).Info("Shutting down informer", "gvr", k.String())
 		wg.Add(1)
 		go func(informer *informerWrapper) {
 			defer wg.Done()
diff --git a/pkg/dynamiccontroller/dynamic_controller_test.go b/pkg/dynamiccontroller/dynamic_controller_test.go
index 1167da0d3..7c020784d 100644
--- a/pkg/dynamiccontroller/dynamic_controller_test.go
+++ b/pkg/dynamiccontroller/dynamic_controller_test.go
@@ -67,7 +67,6 @@ func TestNewDynamicController(t *testing.T) {
 		Workers:         2,
 		ResyncPeriod:    10 * time.Hour,
 		QueueMaxRetries: 20,
-		ShutdownTimeout: 60 * time.Second,
 		MinRetryDelay:   200 * time.Millisecond,
 		MaxRetryDelay:   1000 * time.Second,
 		RateLimit:       10,
@@ -90,7 +89,6 @@ func TestRegisterAndUnregisterGVK(t *testing.T) {
 		Workers:         1,
 		ResyncPeriod:    1 * time.Second,
 		QueueMaxRetries: 5,
-		ShutdownTimeout: 5 * time.Second,
 		MinRetryDelay:   200 * time.Millisecond,
 		MaxRetryDelay:   1000 * time.Second,
 		RateLimit:       10,
@@ -102,12 +100,12 @@ func TestRegisterAndUnregisterGVK(t *testing.T) {
 	gvr := schema.GroupVersionResource{Group: "test", Version: "v1", Resource: "tests"}
 
 	// Create a context with cancel for running the controller
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(t.Context())
 	defer cancel()
 
 	// Start the controller in a goroutine
 	go func() {
-		err := dc.Run(ctx)
+		err := dc.Start(ctx)
 		require.NoError(t, err)
 	}()
 
"sigs.k8s.io/controller-runtime/pkg/envtest" @@ -49,6 +51,7 @@ type Environment struct { ClientSet *kroclient.Set CRDManager kroclient.CRDClient GraphBuilder *graph.Builder + ManagerResult chan error } type ControllerConfig struct { @@ -56,14 +59,14 @@ type ControllerConfig struct { ReconcileConfig ctrlinstance.ReconcileConfig } -func New(controllerConfig ControllerConfig) (*Environment, error) { +func New(ctx context.Context, controllerConfig ControllerConfig) (*Environment, error) { env := &Environment{ ControllerConfig: controllerConfig, } // Setup logging logf.SetLogger(zap.New(zap.WriteTo(io.Discard), zap.UseDevMode(true))) - env.context, env.cancel = context.WithCancel(context.Background()) + env.context, env.cancel = context.WithCancel(ctx) env.TestEnv = &envtest.Environment{ CRDDirectoryPaths: []string{ @@ -139,7 +142,6 @@ func (e *Environment) setupController() error { Workers: 3, ResyncPeriod: 60 * time.Second, QueueMaxRetries: 20, - ShutdownTimeout: 60 * time.Second, MinRetryDelay: 200 * time.Millisecond, MaxRetryDelay: 1000 * time.Second, RateLimit: 10, @@ -147,13 +149,6 @@ func (e *Environment) setupController() error { }, e.ClientSet.Dynamic()) - go func() { - err := dc.Run(e.context) - if err != nil { - panic(fmt.Sprintf("failed to run dynamic controller: %v", err)) - } - }() - rgReconciler := ctrlresourcegraphdefinition.NewResourceGraphDefinitionReconciler( e.ClientSet, e.ControllerConfig.AllowCRDDeletion, @@ -169,19 +164,23 @@ func (e *Environment) setupController() error { // Disable the metrics server BindAddress: "0", }, + GracefulShutdownTimeout: ptr.To(30 * time.Second), }) if err != nil { return fmt.Errorf("creating manager: %w", err) } + if err := e.CtrlManager.Add(dc); err != nil { + return fmt.Errorf("adding dynamic controller to manager: %w", err) + } + if err = rgReconciler.SetupWithManager(e.CtrlManager); err != nil { return fmt.Errorf("setting up reconciler: %w", err) } + e.ManagerResult = make(chan error, 1) go func() { - if err := e.CtrlManager.Start(e.context); err != nil { - panic(fmt.Sprintf("failed to start manager: %v", err)) - } + e.ManagerResult <- e.CtrlManager.Start(e.context) }() return nil @@ -190,7 +189,7 @@ func (e *Environment) setupController() error { func (e *Environment) Stop() error { e.cancel() time.Sleep(1 * time.Second) - return e.TestEnv.Stop() + return errors.Join(e.TestEnv.Stop(), <-e.ManagerResult) } func noopLogger() logr.Logger { diff --git a/test/integration/suites/ackekscluster/suite_test.go b/test/integration/suites/ackekscluster/suite_test.go index 21590cfc2..baad6441e 100644 --- a/test/integration/suites/ackekscluster/suite_test.go +++ b/test/integration/suites/ackekscluster/suite_test.go @@ -43,7 +43,7 @@ func TestEKSCluster(t *testing.T) { RegisterFailHandler(Fail) BeforeSuite(func() { var err error - env, err = environment.New( + env, err = environment.New(t.Context(), environment.ControllerConfig{ AllowCRDDeletion: true, ReconcileConfig: ctrlinstance.ReconcileConfig{ diff --git a/test/integration/suites/core/setup_test.go b/test/integration/suites/core/setup_test.go index a5c8522d6..11087da3a 100644 --- a/test/integration/suites/core/setup_test.go +++ b/test/integration/suites/core/setup_test.go @@ -33,7 +33,7 @@ func TestCore(t *testing.T) { RegisterFailHandler(Fail) BeforeSuite(func() { var err error - env, err = environment.New( + env, err = environment.New(t.Context(), environment.ControllerConfig{ AllowCRDDeletion: true, ReconcileConfig: ctrlinstance.ReconcileConfig{ diff --git 
diff --git a/test/integration/suites/deploymentservice/suite_test.go b/test/integration/suites/deploymentservice/suite_test.go
index 6d5694180..df43bcf39 100644
--- a/test/integration/suites/deploymentservice/suite_test.go
+++ b/test/integration/suites/deploymentservice/suite_test.go
@@ -44,7 +44,7 @@ func TestDeploymentservice(t *testing.T) {
 	RegisterFailHandler(Fail)
 	BeforeSuite(func() {
 		var err error
-		env, err = environment.New(
+		env, err = environment.New(t.Context(),
 			environment.ControllerConfig{
 				AllowCRDDeletion: true,
 				ReconcileConfig: ctrlinstance.ReconcileConfig{
diff --git a/test/integration/suites/networkingstack/suite_test.go b/test/integration/suites/networkingstack/suite_test.go
index 1f50b31b6..4f8292fb1 100644
--- a/test/integration/suites/networkingstack/suite_test.go
+++ b/test/integration/suites/networkingstack/suite_test.go
@@ -42,7 +42,7 @@ func TestNetworkingStack(t *testing.T) {
 	RegisterFailHandler(Fail)
 	BeforeSuite(func() {
 		var err error
-		env, err = environment.New(
+		env, err = environment.New(t.Context(),
 			environment.ControllerConfig{
 				AllowCRDDeletion: true,
 				ReconcileConfig: ctrlinstance.ReconcileConfig{