@@ -48,6 +48,7 @@ type Manager struct {
 	clusterpediaclient crdclientset.Interface
 	informerFactory    externalversions.SharedInformerFactory
 
+	shardingName string
 	queue   workqueue.RateLimitingInterface
 	storage storage.StorageFactory
 	clusterlister clusterlister.PediaClusterLister
@@ -62,14 +63,15 @@ type Manager struct {
 
 var _ kubestatemetrics.ClusterMetricsWriterListGetter = &Manager{}
 
-func NewManager(client crdclientset.Interface, storage storage.StorageFactory, syncConfig clustersynchro.ClusterSyncConfig) *Manager {
+func NewManager(client crdclientset.Interface, storage storage.StorageFactory, syncConfig clustersynchro.ClusterSyncConfig, shardingName string) *Manager {
 	factory := externalversions.NewSharedInformerFactory(client, 0)
 	clusterinformer := factory.Cluster().V1alpha2().PediaClusters()
 	clusterSyncResourcesInformer := factory.Cluster().V1alpha2().ClusterSyncResources()
 
 	manager := &Manager{
 		informerFactory:    factory,
 		clusterpediaclient: client,
+		shardingName:       shardingName,
 
 		storage:       storage,
 		clusterlister: clusterinformer.Lister(),
@@ -166,7 +168,9 @@ func (manager *Manager) addCluster(obj interface{}) {
 func (manager *Manager) updateCluster(older, newer interface{}) {
 	oldObj := older.(*clusterv1alpha2.PediaCluster)
 	newObj := newer.(*clusterv1alpha2.PediaCluster)
-	if newObj.DeletionTimestamp.IsZero() && equality.Semantic.DeepEqual(oldObj.Spec, newObj.Spec) {
+	if newObj.DeletionTimestamp.IsZero() &&
+		equality.Semantic.DeepEqual(oldObj.Spec, newObj.Spec) &&
+		oldObj.Status.ShardingName == newObj.Status.ShardingName {
 		return
 	}
 
@@ -183,6 +187,17 @@ func (manager *Manager) enqueue(obj interface{}) {
 		return
 	}
 
+	if cluster, ok := obj.(*clusterv1alpha2.PediaCluster); ok {
+		currentSharding := cluster.Status.ShardingName
+		if cluster.Spec.ShardingName != manager.shardingName {
+			if currentSharding == nil || *currentSharding != manager.shardingName {
+				return
+			}
+		} else if currentSharding != nil && *currentSharding != manager.shardingName {
+			return
+		}
+	}
+
 	manager.queue.Add(key)
 }
 
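The enqueue filter above decides whether this manager instance should process a cluster event at all. The following standalone sketch restates that decision as a pure predicate so the ownership rules are easier to read; the function and parameter names (shouldEnqueue, specSharding, statusSharding) are illustrative and not part of this PR:

package main

import "fmt"

// shouldEnqueue mirrors the filter in Manager.enqueue: a manager only handles a
// cluster assigned to it (spec) or one it still owns (status) and must release.
func shouldEnqueue(specSharding string, statusSharding *string, managerSharding string) bool {
	if specSharding != managerSharding {
		// Not assigned here: act only if this manager still holds the cluster.
		return statusSharding != nil && *statusSharding == managerSharding
	}
	// Assigned here: act unless another shard still holds the cluster.
	return statusSharding == nil || *statusSharding == managerSharding
}

func main() {
	a, b := "shard-a", "shard-b"
	fmt.Println(shouldEnqueue("shard-a", nil, a)) // true: newly assigned, unowned
	fmt.Println(shouldEnqueue("shard-a", &a, a))  // true: already owned by this shard
	fmt.Println(shouldEnqueue("shard-b", &a, a))  // true: moved away, must be released
	fmt.Println(shouldEnqueue("shard-b", &b, a))  // false: owned by another shard
	fmt.Println(shouldEnqueue("shard-b", nil, a)) // false: never assigned here
}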
@@ -256,6 +271,17 @@ func (manager *Manager) processNextCluster() (continued bool) {
 
 // if err returned is not nil, cluster will be requeued
 func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster) controller.Result {
+	if cluster.Status.ShardingName == nil && cluster.Spec.ShardingName != manager.shardingName {
+		return controller.NoRequeueResult
+	}
+
+	if cluster.Status.ShardingName != nil && *cluster.Status.ShardingName != manager.shardingName {
+		return controller.NoRequeueResult
+	}
+	// After the above filtering, the cluster is in one of the following states:
+	// 1. spec.sharding == manager.shardingName and status.sharding == nil
+	// 2. spec.sharding == manager.shardingName and status.sharding != nil and status.sharding == manager.shardingName
+	// 3. spec.sharding != manager.shardingName and status.sharding != nil and status.sharding == manager.shardingName
 	if !cluster.DeletionTimestamp.IsZero() {
 		klog.InfoS("remove cluster", "cluster", cluster.Name)
 		if err := manager.removeCluster(cluster.Name); err != nil {
@@ -286,6 +312,20 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)
 		}
 	}
 
+	if cluster.Spec.ShardingName != manager.shardingName {
+		// status.sharding == manager.shardingName
+		manager.stopClusterSynchro(cluster.Name)
+
+		if err := manager.UpdateClusterShardingStatus(context.TODO(), cluster.Name, nil); err != nil {
+			klog.ErrorS(err, "Failed to remove cluster shardingName status", "cluster", cluster.Name)
+			return controller.RequeueResult(defaultRetryNum)
+		}
+
+		return controller.NoRequeueResult
+	}
+
+	cluster.Status.ShardingName = &manager.shardingName
+
 	manager.synchrolock.RLock()
 	synchro := manager.synchros[cluster.Name]
 	manager.synchrolock.RUnlock()
@@ -387,6 +427,11 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)
 		return controller.NoRequeueResult
 	}
 
+	if err := manager.UpdateClusterShardingStatus(context.TODO(), cluster.Name, &manager.shardingName); err != nil {
+		klog.ErrorS(err, "Failed to update cluster shardingName status", "cluster", cluster.Name)
+		return controller.RequeueResult(defaultRetryNum)
+	}
+
 	manager.synchroWaitGroup.StartWithChannel(manager.stopCh, synchro.Run)
 
 	manager.synchrolock.Lock()
@@ -398,6 +443,17 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)
 	return controller.NoRequeueResult
 }
 
+func (manager *Manager) stopClusterSynchro(name string) {
+	manager.synchrolock.Lock()
+	synchro := manager.synchros[name]
+	delete(manager.synchros, name)
+	manager.synchrolock.Unlock()
+
+	if synchro != nil {
+		synchro.Shutdown(true)
+	}
+}
+
 func (manager *Manager) removeCluster(name string) error {
 	manager.synchrolock.Lock()
 	synchro := manager.synchros[name]
@@ -532,6 +588,12 @@ func (manager *Manager) updateClusterStatus(ctx context.Context, name string, up
 	})
 }
 
+func (manager *Manager) UpdateClusterShardingStatus(ctx context.Context, name string, shardingName *string) error {
+	return manager.updateClusterStatus(ctx, name, func(clusterStatus *clusterv1alpha2.ClusterStatus) {
+		clusterStatus.ShardingName = shardingName
+	})
+}
+
 func buildClusterConfig(cluster *clusterv1alpha2.PediaCluster) (*rest.Config, error) {
 	if len(cluster.Spec.Kubeconfig) != 0 {
 		clientconfig, err := clientcmd.NewClientConfigFromBytes(cluster.Spec.Kubeconfig)
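Throughout the diff, Spec.ShardingName is compared as a plain string while Status.ShardingName is dereferenced as a pointer. A minimal sketch of the field shapes this usage implies, inferred from the code above rather than copied from the clusterpedia v1alpha2 API types:

// Inferred from usage in this diff; the real API types may differ.
// spec names the desired shard; status records the shard that currently owns
// the cluster, with nil meaning "not owned by any shard" (distinct from "").
type clusterSpecSketch struct {
	ShardingName string
}

type clusterStatusSketch struct {
	ShardingName *string
}

Keeping the status field a pointer is what lets reconcileCluster release ownership with UpdateClusterShardingStatus(ctx, name, nil) and claim it with &manager.shardingName.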