44 changes: 17 additions & 27 deletions cmd/controller/main.go
@@ -15,7 +15,6 @@
package main

import (
"context"
"flag"
"os"
"time"
@@ -35,7 +34,7 @@ import (
resourcegraphdefinitionctrl "github.com/kro-run/kro/pkg/controller/resourcegraphdefinition"
"github.com/kro-run/kro/pkg/dynamiccontroller"
"github.com/kro-run/kro/pkg/graph"
//+kubebuilder:scaffold:imports
// +kubebuilder:scaffold:imports
)

var (
@@ -48,7 +47,7 @@ func init() {

utilruntime.Must(xv1alpha1.AddToScheme(scheme))
utilruntime.Must(extv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
// +kubebuilder:scaffold:scheme
}

type customLevelEnabler struct {
@@ -74,9 +73,9 @@ func main() {
rateLimit int
burstLimit int
// reconciler parameters
resyncPeriod int
queueMaxRetries int
shutdownTimeout int
resyncPeriod int
queueMaxRetries int
gracefulShutdownTimeout time.Duration
// var dynamicControllerDefaultResyncPeriod int
logLevel int
qps float64
@@ -93,6 +92,8 @@ func main() {
"leader election. By default it will try to use the namespace of the service account mounted"+
" to the controller pod.")
flag.BoolVar(&allowCRDDeletion, "allow-crd-deletion", false, "allow kro to delete CRDs")
flag.DurationVar(&gracefulShutdownTimeout, "graceful-shutdown-timeout", 60*time.Second,
"maximum duration to wait for the controller manager to gracefully shutdown")
flag.IntVar(&resourceGraphDefinitionConcurrentReconciles,
"resource-graph-definition-concurrent-reconciles", 1,
"The number of resource graph definition reconciles to run in parallel",
@@ -117,8 +118,6 @@ func main() {
"interval at which the controller will re-list resources even with no changes, in seconds")
flag.IntVar(&queueMaxRetries, "dynamic-controller-default-queue-max-retries", 20,
"maximum number of times an item in the queue will be retried before being dropped")
flag.IntVar(&shutdownTimeout, "dynamic-controller-default-shutdown-timeout", 60,
"maximum duration to wait for the controller to gracefully shutdown, in seconds")

Contributor:
We should probably call this out in the release notes since it's breaking, especially given we've also changed the name of the parameter in the Helm chart.

@a-hilaly do we have a good process to flag something that should be highlighted in the release notes?

Contributor Author:
If we used something like labels and conventional commits, we could make it so that something like fix!: myfix marks the breaking change with a ! (see https://www.conventionalcommits.org/en/v1.0.0/#summary), but I don't know if that's too restrictive. Apart from that, it would be good to mark the change manually with a label and then be able to generate release notes from it.

Member:
Regarding "do we have a good process to flag something that should be highlighted in the release notes?": not really, so far we have flagged things manually. We do use https://github.com/softprops/action-gh-release for GitHub releases, and at first glance it looks like they parse conventional commits.

Contributor Author:
I marked it right now and will pay attention on the next release. If the notes get generated correctly, all good, and then it should just become part of the contributor guide.
// log level flags
flag.IntVar(&logLevel, "log-level", 10, "The log level verbosity. 0 is the least verbose, 5 is the most verbose.")
// qps and burst
@@ -152,6 +151,7 @@ func main() {
Metrics: metricsserver.Options{
BindAddress: metricsAddr,
},
GracefulShutdownTimeout: &gracefulShutdownTimeout,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "6f0f64a5.kro.run",
@@ -176,7 +176,6 @@

dc := dynamiccontroller.NewDynamicController(rootLogger, dynamiccontroller.Config{
Workers: dynamicControllerConcurrentReconciles,
ShutdownTimeout: time.Duration(shutdownTimeout) * time.Second,
ResyncPeriod: time.Duration(resyncPeriod) * time.Second,
QueueMaxRetries: queueMaxRetries,
MinRetryDelay: minRetryDelay,
@@ -205,14 +204,12 @@ func main() {
os.Exit(1)
}

go func() {
err := dc.Run(context.Background())
if err != nil {
setupLog.Error(err, "dynamic controller failed to run")
}
}()
if err := mgr.Add(dc); err != nil {
setupLog.Error(err, "unable to add dynamic controller to manager")
os.Exit(1)
}

//+kubebuilder:scaffold:builder
// +kubebuilder:scaffold:builder

if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
@@ -224,15 +221,8 @@ func main() {
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()

go func() {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}()

<-ctx.Done()

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
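A note on the change above (editorial, not part of the diff): the dynamic controller is no longer started in its own goroutine; it is handed to the controller-runtime manager via mgr.Add, so the manager owns its lifecycle and the new GracefulShutdownTimeout bounds its shutdown as well. A minimal sketch of the contract mgr.Add expects, assuming only controller-runtime's public manager package; the waiter type here is invented for illustration:

package example

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// waiter stands in for a component like the dynamic controller: anything with a
// Start(ctx) error method satisfies manager.Runnable and can be added to the manager.
type waiter struct{}

func (w *waiter) Start(ctx context.Context) error {
	<-ctx.Done() // block until the manager begins shutting down
	return nil   // return once our own cleanup is finished
}

var _ manager.Runnable = &waiter{}

func addRunnable(mgr ctrl.Manager) error {
	// The manager starts the runnable alongside the registered reconcilers and,
	// during shutdown, waits up to GracefulShutdownTimeout for Start to return.
	return mgr.Add(&waiter{})
}

Tying back to the review thread above: if the project adopts conventional commits, a commit message for this rename could flag the break roughly like the following (illustrative only; the flag and Helm value names are the ones changed in this PR):

feat!: replace dynamic-controller-default-shutdown-timeout with graceful-shutdown-timeout

BREAKING CHANGE: the --dynamic-controller-default-shutdown-timeout flag (seconds) is removed.
Use --graceful-shutdown-timeout (a Go duration such as 60s) instead, and rename the Helm value
config.dynamicControllerDefaultShutdownTimeout to config.gracefulShutdownTimeout.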
8 changes: 4 additions & 4 deletions helm/templates/deployment.yaml
@@ -59,8 +59,8 @@ spec:
value: {{ .Values.config.dynamicControllerDefaultResyncPeriod | quote }}
- name: KRO_DYNAMIC_CONTROLLER_DEFAULT_QUEUE_MAX_RETRIES
value: {{ .Values.config.dynamicControllerDefaultQueueMaxRetries | quote }}
- name: KRO_DYNAMIC_CONTROLLER_DEFAULT_SHUTDOWN_TIMEOUT
value: {{ .Values.config.dynamicControllerDefaultShutdownTimeout | quote }}
- name: KRO_GRACEFUL_SHUTDOWN_TIMEOUT
value: {{ .Values.config.gracefulShutdownTimeout | quote }}
- name: KRO_CLIENT_QPS
value: {{ .Values.config.clientQps | quote }}
- name: KRO_CLIENT_BURST
@@ -79,12 +79,12 @@ spec:
- "$(KRO_DYNAMIC_CONTROLLER_CONCURRENT_RECONCILES)"
- --log-level
- "$(KRO_LOG_LEVEL)"
- --graceful-shutdown-timeout
- "$(KRO_GRACEFUL_SHUTDOWN_TIMEOUT)"
- --dynamic-controller-default-resync-period
- "$(KRO_DYNAMIC_CONTROLLER_DEFAULT_RESYNC_PERIOD)"
- --dynamic-controller-default-queue-max-retries
- "$(KRO_DYNAMIC_CONTROLLER_DEFAULT_QUEUE_MAX_RETRIES)"
- --dynamic-controller-default-shutdown-timeout
- "$(KRO_DYNAMIC_CONTROLLER_DEFAULT_SHUTDOWN_TIMEOUT)"
- --client-qps
- "$(KRO_CLIENT_QPS)"
- --client-burst
4 changes: 2 additions & 2 deletions helm/values.yaml
@@ -112,14 +112,14 @@ config:
healthProbeBindAddress: :8079
# The number of resource graph definition reconciles to run in parallel
resourceGraphDefinitionConcurrentReconciles: 1
# The maximum duration to wait for the controller manager to gracefully shutdown
gracefulShutdownTimeout: 60s
# The number of dynamic controller reconciles to run in parallel
dynamicControllerConcurrentReconciles: 1
# The interval at which the controller will re-list resources even with no changes, in seconds
dynamicControllerDefaultResyncPeriod: 36000
# The maximum number of times an item in the queue will be retried before being dropped
dynamicControllerDefaultQueueMaxRetries: 20
# The maximum duration to wait for the controller to gracefully shutdown, in seconds
dynamicControllerDefaultShutdownTimeout: 60
# The log level verbosity. 0 is the least verbose, 5 is the most verbose
logLevel: 3

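A note on the value format: gracefulShutdownTimeout is now fed to a flag registered with flag.DurationVar (see cmd/controller/main.go above), so it must be a Go duration string such as 60s or 2m rather than a bare number of seconds. A minimal standalone sketch of how such a flag behaves:

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	var gracefulShutdownTimeout time.Duration
	flag.DurationVar(&gracefulShutdownTimeout, "graceful-shutdown-timeout", 60*time.Second,
		"maximum duration to wait for the controller manager to gracefully shutdown")
	// --graceful-shutdown-timeout=2m parses to 2*time.Minute; a bare "60" with no
	// unit fails to parse ("time: missing unit in duration").
	flag.Parse()
	fmt.Println(gracefulShutdownTimeout)
}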
46 changes: 32 additions & 14 deletions pkg/dynamiccontroller/dynamic_controller.go
@@ -94,10 +94,6 @@ type Config struct {
// NOTE(a-hilaly): I'm not very sure how useful is this, i'm trying to avoid
// situations where reconcile errors exhaust the queue.
QueueMaxRetries int
// ShutdownTimeout is the maximum duration to wait for the controller to
// gracefully shutdown. We ideally want to avoid forceful shutdowns, giving
// the controller enough time to finish processing any pending items.
ShutdownTimeout time.Duration
// MinRetryDelay is the minimum delay before retrying an item in the queue
MinRetryDelay time.Duration
// MaxRetryDelay is the maximum delay before retrying an item in the queue
@@ -209,7 +205,7 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool {
}

// Run starts the DynamicController.
func (dc *DynamicController) Run(ctx context.Context) error {
func (dc *DynamicController) Start(ctx context.Context) error {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()

@@ -224,17 +220,40 @@ func (dc *DynamicController) Run(ctx context.Context) error {
// Spin up workers.
//
// TODO(a-hilaly): Allow for dynamic scaling of workers.
var wg sync.WaitGroup
for i := 0; i < dc.config.Workers; i++ {
go wait.UntilWithContext(ctx, dc.worker, time.Second)
wg.Add(1)
go func() {
defer wg.Done()
wait.UntilWithContext(ctx, dc.worker, time.Second)
}()
}
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
dc.log.Info("Received shutdown signal, shutting down dynamic controller queue")
dc.queue.ShutDown()
}()

wg.Wait()
dc.log.Info("All workers have stopped")

<-ctx.Done()
return dc.gracefulShutdown(dc.config.ShutdownTimeout)
// when shutting down, the context given to Start is already closed,
// and the expectation is that we block until the graceful shutdown is complete.
return dc.shutdown(context.Background())
}

// worker processes items from the queue.
func (dc *DynamicController) worker(ctx context.Context) {
for dc.processNextWorkItem(ctx) {
for {
select {
case <-ctx.Done():
dc.log.Info("Dynamic controller worker received shutdown signal, stopping")
return
default:
dc.processNextWorkItem(ctx)
}
}
}

@@ -321,15 +340,14 @@ func (dc *DynamicController) syncFunc(ctx context.Context, oi ObjectIdentifiers)
return err
}

// gracefulShutdown performs a graceful shutdown of the controller.
func (dc *DynamicController) gracefulShutdown(timeout time.Duration) error {
// shutdown performs a graceful shutdown of the controller.
func (dc *DynamicController) shutdown(ctx context.Context) error {
dc.log.Info("Starting graceful shutdown")

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

var wg sync.WaitGroup
dc.informers.Range(func(key, value interface{}) bool {
k := key.(schema.GroupVersionResource)
dc.log.V(1).Info("Shutting down informer", "gvr", k.String())
wg.Add(1)
go func(informer *informerWrapper) {
defer wg.Done()
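The shutdown sequence above leans on client-go workqueue semantics: once the context is cancelled, the extra goroutine calls queue.ShutDown(), which makes any worker blocked in queue.Get() return with shutdown=true, so the workers drain and the WaitGroup completes before shutdown() closes the informers. A minimal sketch of that pattern, independent of kro's types:

package example

import (
	"context"
	"fmt"
	"sync"

	"k8s.io/client-go/util/workqueue"
)

func runWorkers(ctx context.Context, workers int) {
	queue := workqueue.New()

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				item, shutdown := queue.Get()
				if shutdown {
					return // queue.ShutDown() was called: drain and exit
				}
				fmt.Println("processing", item)
				queue.Done(item)
			}
		}()
	}

	// Unblock the workers once the context is cancelled.
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
		queue.ShutDown()
	}()

	wg.Wait()
}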
6 changes: 2 additions & 4 deletions pkg/dynamiccontroller/dynamic_controller_test.go
@@ -67,7 +67,6 @@ func TestNewDynamicController(t *testing.T) {
Workers: 2,
ResyncPeriod: 10 * time.Hour,
QueueMaxRetries: 20,
ShutdownTimeout: 60 * time.Second,
MinRetryDelay: 200 * time.Millisecond,
MaxRetryDelay: 1000 * time.Second,
RateLimit: 10,
@@ -90,7 +89,6 @@ func TestRegisterAndUnregisterGVK(t *testing.T) {
Workers: 1,
ResyncPeriod: 1 * time.Second,
QueueMaxRetries: 5,
ShutdownTimeout: 5 * time.Second,
MinRetryDelay: 200 * time.Millisecond,
MaxRetryDelay: 1000 * time.Second,
RateLimit: 10,
@@ -102,12 +100,12 @@ func TestRegisterAndUnregisterGVK(t *testing.T) {
gvr := schema.GroupVersionResource{Group: "test", Version: "v1", Resource: "tests"}

// Create a context with cancel for running the controller
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

// Start the controller in a goroutine
go func() {
err := dc.Run(ctx)
err := dc.Start(ctx)
require.NoError(t, err)
}()

Expand Down
27 changes: 13 additions & 14 deletions test/integration/environment/setup.go
@@ -16,13 +16,15 @@ package environment

import (
"context"
"errors"
"fmt"
"io"
"path/filepath"
"time"

"github.com/go-logr/logr"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -49,21 +51,22 @@ type Environment struct {
ClientSet *kroclient.Set
CRDManager kroclient.CRDClient
GraphBuilder *graph.Builder
ManagerResult chan error
}

type ControllerConfig struct {
AllowCRDDeletion bool
ReconcileConfig ctrlinstance.ReconcileConfig
}

func New(controllerConfig ControllerConfig) (*Environment, error) {
func New(ctx context.Context, controllerConfig ControllerConfig) (*Environment, error) {
env := &Environment{
ControllerConfig: controllerConfig,
}

// Setup logging
logf.SetLogger(zap.New(zap.WriteTo(io.Discard), zap.UseDevMode(true)))
env.context, env.cancel = context.WithCancel(context.Background())
env.context, env.cancel = context.WithCancel(ctx)

env.TestEnv = &envtest.Environment{
CRDDirectoryPaths: []string{
@@ -139,21 +142,13 @@ func (e *Environment) setupController() error {
Workers: 3,
ResyncPeriod: 60 * time.Second,
QueueMaxRetries: 20,
ShutdownTimeout: 60 * time.Second,
MinRetryDelay: 200 * time.Millisecond,
MaxRetryDelay: 1000 * time.Second,
RateLimit: 10,
BurstLimit: 100,
},
e.ClientSet.Dynamic())

go func() {
err := dc.Run(e.context)
if err != nil {
panic(fmt.Sprintf("failed to run dynamic controller: %v", err))
}
}()

rgReconciler := ctrlresourcegraphdefinition.NewResourceGraphDefinitionReconciler(
e.ClientSet,
e.ControllerConfig.AllowCRDDeletion,
@@ -169,19 +164,23 @@ func (e *Environment) setupController() error {
// Disable the metrics server
BindAddress: "0",
},
GracefulShutdownTimeout: ptr.To(30 * time.Second),
})
if err != nil {
return fmt.Errorf("creating manager: %w", err)
}

if err := e.CtrlManager.Add(dc); err != nil {
return fmt.Errorf("adding dynamic controller to manager: %w", err)
}

if err = rgReconciler.SetupWithManager(e.CtrlManager); err != nil {
return fmt.Errorf("setting up reconciler: %w", err)
}

e.ManagerResult = make(chan error, 1)
go func() {
if err := e.CtrlManager.Start(e.context); err != nil {
panic(fmt.Sprintf("failed to start manager: %v", err))
}
e.ManagerResult <- e.CtrlManager.Start(e.context)
}()

return nil
@@ -190,7 +189,7 @@ func (e *Environment) setupController() error {
func (e *Environment) Stop() error {
e.cancel()
time.Sleep(1 * time.Second)
return e.TestEnv.Stop()
return errors.Join(e.TestEnv.Stop(), <-e.ManagerResult)
}

func noopLogger() logr.Logger {
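A sketch of how a test suite consumes the updated setup, assuming Go 1.24's testing.T.Context (which is cancelled automatically when the test finishes) and that the import paths follow the repo layout; both import paths and the suite name below are assumptions, not taken from this diff:

package example_test

import (
	"testing"

	ctrlinstance "github.com/kro-run/kro/pkg/controller/instance"
	"github.com/kro-run/kro/test/integration/environment"
)

func TestExample(t *testing.T) {
	env, err := environment.New(t.Context(), environment.ControllerConfig{
		AllowCRDDeletion: true,
		ReconcileConfig:  ctrlinstance.ReconcileConfig{},
	})
	if err != nil {
		t.Fatalf("setting up environment: %v", err)
	}

	// Stop cancels the environment's context, then joins the envtest teardown
	// error with the manager's Start result from the ManagerResult channel.
	t.Cleanup(func() {
		if err := env.Stop(); err != nil {
			t.Errorf("stopping environment: %v", err)
		}
	})

	// ... exercise env.ClientSet, env.CRDManager, env.GraphBuilder, etc.
}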
2 changes: 1 addition & 1 deletion test/integration/suites/ackekscluster/suite_test.go
@@ -43,7 +43,7 @@ func TestEKSCluster(t *testing.T) {
RegisterFailHandler(Fail)
BeforeSuite(func() {
var err error
env, err = environment.New(
env, err = environment.New(t.Context(),
environment.ControllerConfig{
AllowCRDDeletion: true,
ReconcileConfig: ctrlinstance.ReconcileConfig{
2 changes: 1 addition & 1 deletion test/integration/suites/core/setup_test.go
@@ -33,7 +33,7 @@ func TestCore(t *testing.T) {
RegisterFailHandler(Fail)
BeforeSuite(func() {
var err error
env, err = environment.New(
env, err = environment.New(t.Context(),
environment.ControllerConfig{
AllowCRDDeletion: true,
ReconcileConfig: ctrlinstance.ReconcileConfig{
2 changes: 1 addition & 1 deletion test/integration/suites/deploymentservice/suite_test.go
@@ -44,7 +44,7 @@ func TestDeploymentservice(t *testing.T) {
RegisterFailHandler(Fail)
BeforeSuite(func() {
var err error
env, err = environment.New(
env, err = environment.New(t.Context(),
environment.ControllerConfig{
AllowCRDDeletion: true,
ReconcileConfig: ctrlinstance.ReconcileConfig{