diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 47b519e..1902a20 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -24,10 +24,9 @@ import ( "github.com/go-logr/logr" - mchandler "github.com/multicluster-runtime/multicluster-runtime/pkg/handler" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -35,15 +34,18 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" mccontroller "github.com/multicluster-runtime/multicluster-runtime/pkg/controller" + mchandler "github.com/multicluster-runtime/multicluster-runtime/pkg/handler" mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager" mcreconcile "github.com/multicluster-runtime/multicluster-runtime/pkg/reconcile" mcsource "github.com/multicluster-runtime/multicluster-runtime/pkg/source" + mcworkqueue "github.com/multicluster-runtime/multicluster-runtime/pkg/workqueue" ) // project represents other forms that we can use to @@ -465,6 +467,23 @@ func (blder *TypedBuilder[request]) doController(r reconcile.TypedReconciler[req if ctrlOptions.Reconciler == nil { ctrlOptions.Reconciler = r } + if ctrlOptions.NewQueue == nil { + ctrlOptions.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { + if ptr.Deref(blder.mgr.GetControllerOptions().UsePriorityQueue, false) { + return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { + o.Log = blder.mgr.GetLogger().WithValues("controller", controllerName) + o.RateLimiter = rateLimiter + }) + } + return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{ + Name: controllerName, + DelayingQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[request]{ + Name: controllerName, + Queue: mcworkqueue.NewClusterFair[request](), + }), + }) + } + } // Retrieve the GVK from the object we're reconciling // to pre-populate logger information, and to optionally generate a default name. diff --git a/pkg/workqueue/cluster.go b/pkg/workqueue/cluster.go new file mode 100644 index 0000000..353e32f --- /dev/null +++ b/pkg/workqueue/cluster.go @@ -0,0 +1,32 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + mcreconcile "github.com/multicluster-runtime/multicluster-runtime/pkg/reconcile" +) + +// ClusterFair is a queue that ensures items are dequeued fairly across different +// clusters of cluster-aware requests. +type ClusterFair[request mcreconcile.ClusterAware[request]] TypedFair[request] + +// NewClusterFair creates a new ClusterFair instance. +func NewClusterFair[request mcreconcile.ClusterAware[request]]() *TypedFair[request] { + return NewTypedFair[request](func(r request) string { + return r.Cluster() + }) +} diff --git a/pkg/workqueue/fair.go b/pkg/workqueue/fair.go new file mode 100644 index 0000000..3e8848e --- /dev/null +++ b/pkg/workqueue/fair.go @@ -0,0 +1,185 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "container/list" + "sync" + + "k8s.io/client-go/util/workqueue" +) + +var _ workqueue.TypedInterface[any] = &TypedFair[any]{} + +// TypedFair is a queue that ensures items are dequeued fairly across different +// fairness keys while maintaining FIFO order within each key. +type TypedFair[T comparable] struct { + mu sync.Mutex + entries map[string]*queueEntry[T] + activeList *list.List + keyFunc FairnessKeyFunc[T] +} + +// Fair is a queue that ensures items are dequeued fairly across different +// fairness keys while maintaining FIFO order within each key. +type Fair TypedFair[any] + +// FairnessKeyFunc is a function that returns a string key for a given item. +// Items with different keys are dequeued fairly. +type FairnessKeyFunc[T comparable] func(T) string + +// NewFair creates a new Fair instance. +func NewFair(keyFunc FairnessKeyFunc[any]) *Fair { + return (*Fair)(NewTypedFair[any](keyFunc)) +} + +// NewTypedFair creates a new TypedFair instance. +func NewTypedFair[T comparable](keyFunc FairnessKeyFunc[T]) *TypedFair[T] { + return &TypedFair[T]{ + entries: make(map[string]*queueEntry[T]), + activeList: list.New(), + keyFunc: keyFunc, + } +} + +type queueEntry[T comparable] struct { + queue workqueue.TypedInterface[T] + activeElem *list.Element // Reference to the element in activeList +} + +// Add inserts an item into the queue under its fairness key. +func (q *TypedFair[T]) Add(item T) { + key := q.keyFunc(item) + + q.mu.Lock() + defer q.mu.Unlock() + + entry, exists := q.entries[key] + if !exists { + entry = &queueEntry[T]{ + queue: workqueue.NewTyped[T](), + } + q.entries[key] = entry + } + + entry.queue.Add(item) + + // If the queue was previously empty, add to activeList + if entry.queue.Len() == 1 && entry.activeElem == nil { + entry.activeElem = q.activeList.PushBack(key) + } +} + +// Get retrieves the next item from the queue, ensuring fairness across keys. +func (q *TypedFair[T]) Get() (item T, shutdown bool) { + q.mu.Lock() + defer q.mu.Unlock() + + var elem *list.Element + for elem = q.activeList.Front(); elem != nil; elem = elem.Next() { + key := elem.Value.(string) + entry := q.entries[key] + + if entry.queue.Len() == 0 { + q.activeList.Remove(elem) + entry.activeElem = nil + continue + } + + item, shutdown = entry.queue.Get() + if shutdown { + continue + } + + // Check if the queue is now empty and update activeList + if entry.queue.Len() == 0 { + q.activeList.Remove(elem) + entry.activeElem = nil + } else { + // Move to back to maintain round-robin order + q.activeList.MoveToBack(elem) + } + + return item, false + } + + var zero T + return zero, true +} + +// Done marks the processing of an item as complete. +func (q *TypedFair[T]) Done(item T) { + key := q.keyFunc(item) + + q.mu.Lock() + defer q.mu.Unlock() + + if entry, exists := q.entries[key]; exists { + entry.queue.Done(item) + } +} + +// Len returns the total number of items across all keys. +func (q *TypedFair[T]) Len() int { + q.mu.Lock() + defer q.mu.Unlock() + + total := 0 + for _, entry := range q.entries { + total += entry.queue.Len() + } + return total +} + +// ShutDown terminates the queue and all sub-queues. +func (q *TypedFair[T]) ShutDown() { + q.mu.Lock() + defer q.mu.Unlock() + + for _, entry := range q.entries { + entry.queue.ShutDown() + } +} + +// ShuttingDown checks if all sub-queues are shutting down. +func (q *TypedFair[T]) ShuttingDown() bool { + q.mu.Lock() + defer q.mu.Unlock() + + for _, entry := range q.entries { + if !entry.queue.ShuttingDown() { + return false + } + } + return true +} + +// ShutDownWithDrain terminates the queue and all sub-queues, draining all. +func (q *TypedFair[T]) ShutDownWithDrain() { + q.mu.Lock() + defer q.mu.Unlock() + + var wg sync.WaitGroup + for _, entry := range q.entries { + wg.Add(1) + go func(entry *queueEntry[T]) { + defer wg.Done() + entry.queue.ShutDownWithDrain() + }(entry) + } + wg.Wait() +} diff --git a/providers/kind/provider.go b/providers/kind/provider.go index 5ef34df..6ccc365 100644 --- a/providers/kind/provider.go +++ b/providers/kind/provider.go @@ -24,6 +24,7 @@ import ( "time" "github.com/go-logr/logr" + mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager" "github.com/multicluster-runtime/multicluster-runtime/pkg/multicluster" "k8s.io/apimachinery/pkg/util/sets" diff --git a/providers/namespace/provider.go b/providers/namespace/provider.go index b3ef306..dde94a3 100644 --- a/providers/namespace/provider.go +++ b/providers/namespace/provider.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/go-logr/logr" + mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager" corev1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime"