@@ -174,7 +174,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
174
174
logger .Info ("Replication handle" , "ReplicationHandleName" , replicationHandle )
175
175
}
176
176
177
- replicationClient , err := r .getReplicationClient (vrcObj .Spec .Provisioner )
177
+ replicationClient , err := r .getReplicationClient (ctx , vrcObj .Spec .Provisioner )
178
178
if err != nil {
179
179
logger .Error (err , "Failed to get ReplicationClient" )
180
180
@@ -453,27 +453,25 @@ func getInfoReconcileInterval(parameters map[string]string, logger logr.Logger)
453
453
return scheduleTime / 2
454
454
}
455
455
456
- func (r * VolumeReplicationReconciler ) getReplicationClient (driverName string ) (grpcClient.VolumeReplication , error ) {
457
- conns := r .Connpool .GetByNodeID (driverName , "" )
458
-
459
- // Iterate through the connections and find the one that matches the driver name
460
- // provided in the VolumeReplication spec; so that corresponding
461
- // operations can be performed.
462
- for _ , v := range conns {
463
- for _ , cap := range v .Capabilities {
464
- // validate if VOLUME_REPLICATION capability is supported by the driver.
465
- if cap .GetVolumeReplication () == nil {
466
- continue
467
- }
456
+ func (r * VolumeReplicationReconciler ) getReplicationClient (ctx context.Context , driverName string ) (grpcClient.VolumeReplication , error ) {
457
+ conn , err := r .Connpool .GetLeaderByDriver (ctx , r .Client , driverName )
458
+ if err != nil {
459
+ return nil , fmt .Errorf ("no leader for the ControllerService of driver %q" , driverName )
460
+ }
468
461
469
- // validate of VOLUME_REPLICATION capability is enabled by the storage driver.
470
- if cap .GetVolumeReplication ().GetType () == identity .Capability_VolumeReplication_VOLUME_REPLICATION {
471
- return grpcClient .NewReplicationClient (v .Client , r .Timeout ), nil
472
- }
462
+ for _ , cap := range conn .Capabilities {
463
+ // validate if VOLUME_REPLICATION capability is supported by the driver.
464
+ if cap .GetVolumeReplication () == nil {
465
+ continue
466
+ }
467
+
468
+ // validate of VOLUME_REPLICATION capability is enabled by the storage driver.
469
+ if cap .GetVolumeReplication ().GetType () == identity .Capability_VolumeReplication_VOLUME_REPLICATION {
470
+ return grpcClient .NewReplicationClient (conn .Client , r .Timeout ), nil
473
471
}
474
472
}
475
473
476
- return nil , fmt .Errorf ("no connections for driver: %s" , driverName )
474
+ return nil , fmt .Errorf ("leading CSIAddonsNode %q for driver %q does not support VolumeReplication" , conn . Name , driverName )
477
475
478
476
}
479
477
0 commit comments