diff --git a/deploy/kubernetes/storageclass.yaml b/deploy/kubernetes/storageclass.yaml index b377741..d816d8a 100644 --- a/deploy/kubernetes/storageclass.yaml +++ b/deploy/kubernetes/storageclass.yaml @@ -20,6 +20,7 @@ parameters: distr_ndcs: "1" distr_npcs: "1" lvol_priority_class: "0" + cluster_id: "aaaabbbbcccc" reclaimPolicy: Delete volumeBindingMode: Immediate allowVolumeExpansion: true diff --git a/pkg/spdk/controllerserver.go b/pkg/spdk/controllerserver.go index 60fc909..5a3d403 100644 --- a/pkg/spdk/controllerserver.go +++ b/pkg/spdk/controllerserver.go @@ -47,12 +47,17 @@ const ( type controllerServer struct { *csicommon.DefaultControllerServer volumeLocks *util.VolumeLocks - spdkNode *util.NodeNVMf } type spdkVolume struct { - lvolID string - poolName string + clusterID string + lvolID string + poolName string +} + +type spdkSnapshot struct { + clusterID string + snapshotID string } func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { @@ -60,13 +65,24 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol unlock := cs.volumeLocks.Lock(volumeID) defer unlock() - csiVolume, err := cs.createVolume(ctx, req) + clusterID, ok := req.GetParameters()["cluster_id"] + if !ok { + klog.Errorf("failed to get cluster_id from parameters") + return nil, status.Error(codes.Internal, "failed to get cluster_id from parameters") + } + + sbClient, err := util.NewsimplyBlockClient(clusterID) + if err != nil { + return nil, err + } + + csiVolume, err := cs.createVolume(ctx, req, sbClient) if err != nil { klog.Errorf("failed to create volume, volumeID: %s err: %v", volumeID, err) return nil, status.Error(codes.Internal, err.Error()) } - volumeInfo, err := cs.publishVolume(csiVolume.GetVolumeId()) + volumeInfo, err := cs.publishVolume(csiVolume.GetVolumeId(), sbClient) if err != nil { klog.Errorf("failed to publish volume, volumeID: %s err: %v", volumeID, err) 
cs.deleteVolume(csiVolume.GetVolumeId()) //nolint:errcheck // we can do little @@ -154,14 +170,20 @@ func (cs *controllerServer) CreateSnapshot(_ context.Context, req *csi.CreateSna klog.Errorf("failed to get spdk volume, volumeID: %s err: %v", volumeID, err) return nil, err } - snapshotID, err := cs.spdkNode.CreateSnapshot(spdkVol.lvolID, snapshotName) + sbclient, err := util.NewsimplyBlockClient(spdkVol.clusterID) + if err != nil { + klog.Errorf("failed to create spdk client: %v", err) + return nil, status.Error(codes.Internal, err.Error()) + } + + snapshotID, err := sbclient.CreateSnapshot(spdkVol.lvolID, snapshotName) klog.Infof("CreateSnapshot : snapshotID=%s", snapshotID) if err != nil { klog.Errorf("failed to create snapshot, volumeID: %s snapshotName: %s err: %v", volumeID, snapshotName, err) return nil, status.Error(codes.Internal, err.Error()) } - volSize, err := cs.spdkNode.GetVolumeSize(spdkVol.lvolID) + volSize, err := sbclient.GetVolumeSize(spdkVol.lvolID) klog.Infof("CreateSnapshot : volSize=%s", volSize) if err != nil { klog.Errorf("failed to get volume info, volumeID: %s err: %v", volumeID, err) @@ -188,13 +210,24 @@ func (cs *controllerServer) CreateSnapshot(_ context.Context, req *csi.CreateSna func (cs *controllerServer) DeleteSnapshot(_ context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { snapshotID := req.GetSnapshotId() + snapshot, err := getSnapshot(snapshotID) + if err != nil { + klog.Errorf("failed to get spdk snapshot, snapshotID: %s err: %v", snapshotID, err) + return nil, err + } + sbclient, err := util.NewsimplyBlockClient(snapshot.clusterID) + if err != nil { + klog.Errorf("failed to create spdk client: %v", err) + return nil, status.Error(codes.Internal, err.Error()) + } + klog.Infof("snapshotID=%s", snapshotID) unlock := cs.volumeLocks.Lock(snapshotID) defer unlock() klog.Infof("Deleting Snapshot : snapshotID=%s", snapshotID) - err := cs.spdkNode.DeleteSnapshot(snapshotID) + err = 
sbclient.DeleteSnapshot(snapshot.snapshotID) if err != nil { klog.Errorf("failed to delete snapshot, snapshotID: %s err: %v", snapshotID, err) return nil, status.Error(codes.Internal, err.Error()) @@ -294,8 +327,8 @@ func prepareCreateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest, s return &createVolReq, nil } -func (cs *controllerServer) getExistingVolume(name, poolName string, vol *csi.Volume) (*csi.Volume, error) { - volumeID, err := cs.spdkNode.GetVolume(name, poolName) +func (cs *controllerServer) getExistingVolume(name, poolName string, sbclient *util.NodeNVMf, vol *csi.Volume) (*csi.Volume, error) { + volumeID, err := sbclient.GetVolume(name, poolName) if err == nil { vol.VolumeId = fmt.Sprintf("%s:%s", poolName, volumeID) klog.V(5).Info("volume already exists", vol.GetVolumeId()) @@ -304,7 +337,7 @@ func (cs *controllerServer) getExistingVolume(name, poolName string, vol *csi.Vo return nil, err } -func (cs *controllerServer) createVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.Volume, error) { +func (cs *controllerServer) createVolume(ctx context.Context, req *csi.CreateVolumeRequest, sbclient *util.NodeNVMf) (*csi.Volume, error) { size := req.GetCapacityRange().GetRequiredBytes() if size == 0 { klog.Warningln("invalid volume size, resize to 1G") @@ -319,7 +352,7 @@ func (cs *controllerServer) createVol klog.V(5).Info("provisioning volume from SDK node..") poolName := req.GetParameters()["pool_name"] - existingVolume, err := cs.getExistingVolume(req.GetName(), poolName, &vol) + existingVolume, err := cs.getExistingVolume(req.GetName(), poolName, sbclient, &vol) if err == nil { return existingVolume, nil } @@ -339,44 +372,62 @@ func (cs *controllerServer) createVolume(ctx context.Context, req *csi.CreateVol return nil, err } - volumeID, err := cs.spdkNode.CreateVolume(createVolReq) + volumeID, err := sbclient.CreateVolume(createVolReq) if err != nil { klog.Errorf("error creating simplyBlock 
volume: %v", err) return nil, err } - vol.VolumeId = fmt.Sprintf("%s:%s", poolName, volumeID) + vol.VolumeId = fmt.Sprintf("%s:%s:%s", sbclient.Client.ClusterID, poolName, volumeID) klog.V(5).Info("successfully created volume from Simplyblock with Volume ID: ", vol.GetVolumeId()) return &vol, nil } func getSPDKVol(csiVolumeID string) (*spdkVolume, error) { - // extract spdkNodeName and spdkLvolID from csiVolumeID - // csiVolumeID: node001:8e2dcb9d-3a79-4362-965e-fdb0cd3f4b8d + // extract clusterID, poolID and spdkLvolID from csiVolumeID + // csiVolumeID: 8ffac363-0c46-4714-a71b-f9c0b58a1269:pool01:8e2dcb9d-3a79-4362-965e-fdb0cd3f4b8d + // ClusterID: 8ffac363-0c46-4714-a71b-f9c0b58a1269 // spdkNodeName: node001 // spdklvolID: 8e2dcb9d-3a79-4362-965e-fdb0cd3f4b8d ids := strings.Split(csiVolumeID, ":") - if len(ids) == 2 { + if len(ids) == 3 { return &spdkVolume{ - poolName: ids[0], - lvolID: ids[1], + clusterID: ids[0], + poolName: ids[1], + lvolID: ids[2], + }, nil + } + return nil, fmt.Errorf("missing clusterID or poolName in volume: %s", csiVolumeID) +} + +func getSnapshot(csiSnapshotID string) (*spdkSnapshot, error) { + // extract clusterID and snapshotID from csiSnapshotID + // csiSnapshotID: 8ffac363-0c46-4714-a71b-f9c0b58a1269:8e2dcb9d-3a79-4362-965e-fdb0cd3f4b8d + // ClusterID: 8ffac363-0c46-4714-a71b-f9c0b58a1269 + // snapshotID: 8e2dcb9d-3a79-4362-965e-fdb0cd3f4b8d + + ids := strings.Split(csiSnapshotID, ":") + if len(ids) == 2 { + return &spdkSnapshot{ + clusterID: ids[0], + snapshotID: ids[1], + }, nil + } - return nil, fmt.Errorf("missing poolName in volume: %s", csiVolumeID) + return nil, fmt.Errorf("missing clusterID in csiSnapshotID: %s", csiSnapshotID) } -func (cs *controllerServer) publishVolume(volumeID string) (map[string]string, error) { +func (cs *controllerServer) publishVolume(volumeID string, sbclient *util.NodeNVMf) (map[string]string, error) { spdkVol, err := getSPDKVol(volumeID) if err != nil { return nil, err } - err = 
cs.spdkNode.PublishVolume(spdkVol.lvolID) + err = sbclient.PublishVolume(spdkVol.lvolID) if err != nil { return nil, err } - volumeInfo, err := cs.spdkNode.VolumeInfo(spdkVol.lvolID) + volumeInfo, err := sbclient.VolumeInfo(spdkVol.lvolID) if err != nil { cs.unpublishVolume(volumeID) //nolint:errcheck // we can do little return nil, err @@ -389,7 +440,11 @@ func (cs *controllerServer) deleteVolume(volumeID string) error { if err != nil { return err } - return cs.spdkNode.DeleteVolume(spdkVol.lvolID) + sbclient, err := util.NewsimplyBlockClient(spdkVol.clusterID) + if err != nil { + return err + } + return sbclient.DeleteVolume(spdkVol.lvolID) } func (cs *controllerServer) unpublishVolume(volumeID string) error { @@ -397,7 +452,11 @@ func (cs *controllerServer) unpublishVolume(volumeID string) error { if err != nil { return err } - return cs.spdkNode.UnpublishVolume(spdkVol.lvolID) + sbclient, err := util.NewsimplyBlockClient(spdkVol.clusterID) + if err != nil { + return err + } + return sbclient.UnpublishVolume(spdkVol.lvolID) } func (cs *controllerServer) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { @@ -407,7 +466,13 @@ func (cs *controllerServer) ControllerExpandVolume(_ context.Context, req *csi.C if err != nil { return nil, err } - _, err = cs.spdkNode.ResizeVolume(spdkVol.lvolID, updatedSize) + + sbclient, err := util.NewsimplyBlockClient(spdkVol.clusterID) + if err != nil { + return nil, err + } + + _, err = sbclient.ResizeVolume(spdkVol.lvolID, updatedSize) if err != nil { klog.Errorf("failed to resize lvol, LVolID: %s err: %v", spdkVol.lvolID, err) return nil, err @@ -418,11 +483,29 @@ func (cs *controllerServer) ControllerExpandVolume(_ context.Context, req *csi.C }, nil } +// ListSnapshots lists all snapshots across all clusters func (cs *controllerServer) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { - entries, err := 
cs.spdkNode.ListSnapshots() + + var entries []*util.SnapshotResp + clusters, err := ListClusters() if err != nil { return nil, err } + + for _, clusterID := range clusters { + sbclient, err := util.NewsimplyBlockClient(clusterID) + if err != nil { + klog.Errorf("failed to create spdk client: %v", err) + return nil, status.Error(codes.Internal, err.Error()) + } + + snapshotEntries, err := sbclient.ListSnapshots() + if err != nil { + return nil, err + } + entries = append(entries, snapshotEntries...) + } + var vca []*csi.ListSnapshotsResponse_Entry for _, entry := range entries { dt, err := strconv.ParseInt(entry.CreatedAt, 10, 64) @@ -451,33 +534,18 @@ func (cs *controllerServer) ListSnapshots(_ context.Context, _ *csi.ListSnapshot }, nil } -func NewsimplyBlockClient() (*util.NodeNVMf, error) { - // get spdk node configs, see deploy/kubernetes/config-map.yaml - var config struct { - Simplybk struct { - UUID string `json:"uuid"` - IP string `json:"ip"` - } `json:"simplybk"` - } - configFile := util.FromEnv("SPDKCSI_CONFIG", "/etc/spdkcsi-config/config.json") - err := util.ParseJSONFile(configFile, &config) - if err != nil { - return nil, err - } - - var secret struct { - Simplybk struct { - Secret string `json:"secret"` - } `json:"simplybk"` - } +func ListClusters() (clusters []string, err error) { + var secrets map[string]string secretFile := util.FromEnv("SPDKCSI_SECRET", "/etc/spdkcsi-secret/secret.json") - err = util.ParseJSONFile(secretFile, &secret) + err = util.ParseJSONFile(secretFile, &secrets) if err != nil { - return nil, err + klog.Errorf("failed to parse secret file: %v", err) + return } - klog.Infof("spdk node created: url=%s", config.Simplybk.IP) - - return util.NewNVMf(config.Simplybk.UUID, config.Simplybk.IP, secret.Simplybk.Secret), nil + for clusterID := range secrets { + clusters = append(clusters, clusterID) + } + return } // func (cs *controllerServer) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { 
@@ -524,7 +592,13 @@ func (cs *controllerServer) ControllerGetVolume(_ context.Context, req *csi.Cont return nil, err } - volumeInfo, err := cs.spdkNode.VolumeInfo(spdkVol.lvolID) + sbclient, err := util.NewsimplyBlockClient(spdkVol.clusterID) + if err != nil { + klog.Errorf("failed to create spdk client: %v", err) + return nil, status.Error(codes.Internal, err.Error()) + } + + volumeInfo, err := sbclient.VolumeInfo(spdkVol.lvolID) if err != nil { klog.Errorf("failed to get spdkVol for %s: %v", volumeID, err) @@ -562,44 +636,7 @@ func newControllerServer(d *csicommon.CSIDriver) (*controllerServer, error) { DefaultControllerServer: csicommon.NewDefaultControllerServer(d), volumeLocks: util.NewVolumeLocks(), } - - spdkNode, err := NewsimplyBlockClient() - if err != nil { - klog.Errorf("failed to create spdk node %v", err.Error()) - return nil, errors.New("no valid spdk node found") - } - - server.spdkNode = spdkNode return &server, nil - - // create spdk nodes - // for i := range config.Nodes { - // node := &config.Nodes[i] - // tokenFound := false - // // find secret per node - // for j := range secret.Tokens { - // token := &secret.Tokens[j] - // if token.Name == node.Name { - // tokenFound = true - // spdkNode, err := util.NewSpdkNode(node.URL, token.UserName, token.Password, node.TargetType, node.TargetAddr) - // if err != nil { - // klog.Errorf("failed to create spdk node %s: %s", node.Name, err.Error()) - // } else { - // klog.Infof("spdk node created: name=%s, url=%s", node.Name, node.URL) - // server.spdkNodes[node.Name] = spdkNode - // } - // break - // } - // } - // if !tokenFound { - // klog.Errorf("failed to find secret for spdk node %s", node.Name) - // } - // } - // if len(server.spdkNodes) == 0 { - // return nil, fmt.Errorf("no valid spdk node found") - // } - - // return &server, nil } func (cs *controllerServer) handleVolumeContentSource(req *csi.CreateVolumeRequest, poolName string, vol *csi.Volume, sizeMiB int64) (*csi.Volume, error) { @@ -618,11 
+655,22 @@ func (cs *controllerServer) handleSnapshotSource(snapshot *csi.VolumeContentSour if snapshot == nil { return nil, nil } - snapshotID := snapshot.GetSnapshotId() - klog.Infof("CreateSnapshot : snapshotID=%s", snapshotID) + csiSnapshotID := snapshot.GetSnapshotId() + sbSnapshot, err := getSnapshot(csiSnapshotID) + if err != nil { + klog.Errorf("failed to get spdk snapshot, csiSnapshotID: %s err: %v", csiSnapshotID, err) + return nil, err + } + sbclient, err := util.NewsimplyBlockClient(sbSnapshot.clusterID) + if err != nil { + klog.Errorf("failed to create spdk client: %v", err) + return nil, status.Error(codes.Internal, err.Error()) + } + + klog.Infof("CreateSnapshot : snapshotID=%s", sbSnapshot.snapshotID) snapshotName := req.GetName() newSize := fmt.Sprintf("%dM", sizeMiB) - volumeID, err := cs.spdkNode.CloneSnapshot(snapshotID, snapshotName, newSize) + volumeID, err := sbclient.CloneSnapshot(sbSnapshot.snapshotID, snapshotName, newSize) if err != nil { klog.Errorf("error creating simplyBlock volume: %v", err) return nil, err @@ -647,16 +695,22 @@ func (cs *controllerServer) handleVolumeSource(srcVolume *csi.VolumeContentSourc klog.Errorf("failed to get spdk volume, srcVolumeID: %s err: %v", srcVolumeID, err) return nil, err } - klog.Infof("CreateSnapshot : poolName=%s", poolName) - snapshotID, err := cs.spdkNode.CreateSnapshot(spdkVol.lvolID, snapshotName) - klog.Infof("CreateSnapshot : snapshotID=%s", snapshotID) + sbclient, err := util.NewsimplyBlockClient(spdkVol.clusterID) + if err != nil { + klog.Errorf("failed to create spdk client: %v", err) + return nil, status.Error(codes.Internal, err.Error()) + } + + klog.Infof("CreateSnapshot: clusterID=%s poolName=%s", sbclient.Client.ClusterID, poolName) + snapshotID, err := sbclient.CreateSnapshot(spdkVol.lvolID, snapshotName) + klog.Infof("CreatedSnapshot: clusterID=%s snapshotID=%s", sbclient.Client.ClusterID, snapshotID) if err != nil { klog.Errorf("failed to create snapshot, srcVolumeID: %s 
snapshotName: %s err: %v", srcVolumeID, snapshotName, err) return nil, status.Error(codes.Internal, err.Error()) } newSize := fmt.Sprintf("%dM", sizeMiB) klog.Infof("CloneSnapshot : snapshotName=%s", snapshotName) - volumeID, err := cs.spdkNode.CloneSnapshot(snapshotID, snapshotName, newSize) + volumeID, err := sbclient.CloneSnapshot(snapshotID, snapshotName, newSize) if err != nil { klog.Errorf("error creating simplyBlock volume: %v", err) return nil, err diff --git a/pkg/spdk/controllerserver_test.go b/pkg/spdk/controllerserver_test.go deleted file mode 100644 index de77a6c..0000000 --- a/pkg/spdk/controllerserver_test.go +++ /dev/null @@ -1,373 +0,0 @@ -package spdk - -// import ( -// "context" -// "errors" -// "fmt" -// "os" -// "sync" -// "sync/atomic" -// "testing" - -// "github.com/container-storage-interface/spec/lib/go/csi" - -// csicommon "github.com/spdk/spdk-csi/pkg/csi-common" -// "github.com/spdk/spdk-csi/pkg/util" -// ) - -// func TestNvmeofVolume(t *testing.T) { -// testVolume("nvme-tcp", t) -// } - -// func TestNvmeofIdempotency(t *testing.T) { -// testIdempotency("nvme-tcp", t) -// } - -// func TestNvmeofConcurrency(t *testing.T) { -// testConcurrency("nvme-tcp", t) -// } - -// func TestIscsiVolume(t *testing.T) { -// testVolume("iscsi", t) -// } - -// func TestIscsiIdempotency(t *testing.T) { -// testIdempotency("iscsi", t) -// } - -// func TestIscsiConcurrency(t *testing.T) { -// testConcurrency("iscsi", t) -// } - -// func testVolume(targetType string, t *testing.T) { -// cs, err := createTestController(targetType) -// if err != nil { -// t.Fatal(err) -// } - -// const volumeName = "test-volume" -// const volumeSize = 256 * 1024 * 1024 - -// // create volume#1 -// volumeID1, err := createTestVolume(cs, volumeName, volumeSize) -// if err != nil { -// t.Fatal(err) -// } -// // delete volume#1 -// err = deleteTestVolume(cs, volumeID1) -// if err != nil { -// t.Fatal(err) -// } - -// // create volume#2 with same name -// volumeID2, err := 
createTestVolume(cs, volumeName, volumeSize) -// if err != nil { -// t.Fatal(err) -// } -// // delete volume#2 -// err = deleteTestVolume(cs, volumeID2) -// if err != nil { -// t.Fatal(err) -// } - -// // make sure volumeID1 != volumeID2 -// if volumeID1 == volumeID2 { -// t.Fatal("volume id should be different") -// } - -// // make sure we didn't leave any garbage in lvstores -// // if !verifyLVSS(cs, lvss) { -// // t.Fatal("lvstore status doesn't match") -// // } -// } - -// func testIdempotency(targetType string, t *testing.T) { -// cs, err := createTestController(targetType) -// if err != nil { -// t.Fatal(err) -// } - -// const volumeName = "test-volume-idem" -// const requestCount = 50 -// const volumeSize = 256 * 1024 * 1024 - -// volumeID, err := createSameVolumeInParallel(cs, volumeName, requestCount, volumeSize) -// if err != nil { -// t.Fatal(err) -// } -// err = deleteSameVolumeInParallel(cs, volumeID, requestCount) -// if err != nil { -// t.Fatal(err) -// } - -// // if !verifyLVSS(cs, lvss) { -// // t.Fatal("lvstore status doesn't match") -// // } -// } - -// func testConcurrency(targetType string, t *testing.T) { -// cs, err := createTestController(targetType) -// if err != nil { -// t.Fatal(err) -// } - -// // count * size cannot exceed storage capacity -// const volumeNamePrefix = "test-volume-con-" -// const volumeCount = 20 -// const volumeSize = 16 * 1024 * 1024 - -// // create/delete multiple volumes in parallel -// var wg sync.WaitGroup -// var errCount int32 -// for i := 0; i < volumeCount; i++ { -// volumeName := fmt.Sprintf("%s%d", volumeNamePrefix, i) -// wg.Add(1) -// go func(volumeNameLocal string) { -// defer wg.Done() - -// volumeIDLocal, errLocal := createTestVolume(cs, volumeNameLocal, volumeSize) -// if errLocal != nil { -// t.Logf("createTestVolume failed: %s", errLocal) -// atomic.AddInt32(&errCount, 1) -// } - -// errLocal = deleteTestVolume(cs, volumeIDLocal) -// if errLocal != nil { -// t.Logf("deleteTestVolume failed: %s", 
errLocal) -// atomic.AddInt32(&errCount, 1) -// } -// }(volumeName) -// } -// wg.Wait() -// if errCount != 0 { -// t.Fatal("concurrency test failed") -// } - -// // if !verifyLVSS(cs, lvss) { -// // t.Fatal("lvstore status doesn't match") -// // } -// } - -// func createTestController(targetType string) (cs *controllerServer, err error) { -// err = createConfigFiles(targetType) -// if err != nil { -// return nil, err -// } -// defer func() { -// os.Remove(os.Getenv("SPDKCSI_CONFIG")) -// os.Remove(os.Getenv("SPDKCSI_SECRET")) -// }() - -// cd := csicommon.NewCSIDriver("test-driver", "test-version", "test-node") -// cs, err = newControllerServer(cd) -// if err != nil { -// return nil, err -// } - -// // lvss, err = getLVSS(cs) -// // if err != nil { -// // return nil, err -// // } - -// return cs, nil -// } - -// func createConfigFiles(targetType string) error { -// configFile, err := os.CreateTemp("", "spdkcsi-config*.json") -// if err != nil { -// return err -// } -// defer configFile.Close() -// var config string -// switch targetType { -// case "nvme-tcp": -// config = ` -// { -// "nodes": [ -// { -// "name": "localhost", -// "rpcURL": "http://127.0.0.1:9009", -// "targetType": "nvme-tcp", -// "targetAddr": "127.0.0.1" -// } -// ] -// }` -// case "iscsi": -// config = ` -// { -// "nodes": [ -// { -// "name": "localhost", -// "rpcURL": "http://127.0.0.1:9009", -// "targetType": "iscsi", -// "targetAddr": "127.0.0.1" -// } -// ] -// }` -// } -// _, err = configFile.WriteString(config) -// if err != nil { -// os.Remove(configFile.Name()) -// return err -// } -// os.Setenv("SPDKCSI_CONFIG", configFile.Name()) - -// secretFile, err := os.CreateTemp("", "spdkcsi-secret*.json") -// if err != nil { -// os.Remove(configFile.Name()) -// return err -// } -// defer secretFile.Close() -// //nolint:gosec // only for test -// secret := ` -// { -// "rpcTokens": [ -// { -// "name": "localhost", -// "username": "spdkcsiuser", -// "password": "spdkcsipass" -// } -// ] -// }` -// 
_, err = secretFile.WriteString(secret) -// if err != nil { -// os.Remove(configFile.Name()) -// os.Remove(secretFile.Name()) -// return err -// } -// os.Setenv("SPDKCSI_SECRET", secretFile.Name()) - -// return nil -// } - -// func createTestVolume(cs *controllerServer, name string, size int64) (string, error) { -// reqCreate := csi.CreateVolumeRequest{ -// Name: name, -// CapacityRange: &csi.CapacityRange{RequiredBytes: size}, -// } - -// resp, err := cs.CreateVolume(context.TODO(), &reqCreate) -// if err != nil { -// return "", err -// } - -// volumeID := resp.GetVolume().GetVolumeId() -// if volumeID == "" { -// return "", fmt.Errorf("empty volume id") -// } - -// return volumeID, nil -// } - -// func deleteTestVolume(cs *controllerServer, volumeID string) error { -// reqDelete := csi.DeleteVolumeRequest{VolumeId: volumeID} -// _, err := cs.DeleteVolume(context.TODO(), &reqDelete) -// return err -// } - -// func createSameVolumeInParallel(cs *controllerServer, name string, count int, size int64) (string, error) { -// var wg sync.WaitGroup -// var errCount int32 - -// // issue multiple create requests to create *same* volume in parallel -// volumeID := make([]string, count) -// reqCreate := csi.CreateVolumeRequest{ -// Name: name, -// CapacityRange: &csi.CapacityRange{RequiredBytes: size}, -// } -// for i := 0; i < count; i++ { -// wg.Add(1) -// go func(i int, wg *sync.WaitGroup) { -// defer wg.Done() -// for { -// resp, errLocal := cs.CreateVolume(context.TODO(), &reqCreate) -// if errors.Is(errLocal, errVolumeInCreation) { -// continue -// } -// if errLocal != nil { -// atomic.AddInt32(&errCount, 1) -// } -// volumeID[i] = resp.GetVolume().GetVolumeId() -// break -// } -// }(i, &wg) -// } -// wg.Wait() - -// if atomic.LoadInt32(&errCount) != 0 { -// return "", fmt.Errorf("some cs.CreateVolume failed") -// } - -// // verify all returned volume ids are not empty and identical -// if volumeID[0] == "" { -// return "", fmt.Errorf("empty volume id") -// } -// for i 
:= 1; i < count; i++ { -// if volumeID[i] != volumeID[0] { -// return "", fmt.Errorf("volume id mismatch") -// } -// } - -// return volumeID[0], nil -// } - -// func deleteSameVolumeInParallel(cs *controllerServer, volumeID string, count int) error { -// var wg sync.WaitGroup -// var errCount int32 - -// // issue delete requests to *same* volume in parallel -// reqDelete := csi.DeleteVolumeRequest{VolumeId: volumeID} -// for i := 0; i < count; i++ { -// wg.Add(1) -// go func(wg *sync.WaitGroup) { -// defer wg.Done() -// _, err := cs.DeleteVolume(context.TODO(), &reqDelete) -// if err != nil { -// atomic.AddInt32(&errCount, 1) -// } -// }(&wg) -// } -// wg.Wait() - -// if atomic.LoadInt32(&errCount) != 0 { -// return fmt.Errorf("some cs.DeleteVolume failed") -// } -// return nil -// } - -// func getLVSS(cs *controllerServer) ([][]util.LvStore, error) { -// var lvss [][]util.LvStore -// for _, spdkNode := range cs.spdkNodes { -// lvs, err := spdkNode.LvStores() -// if err != nil { -// return nil, err -// } -// lvss = append(lvss, lvs) -// } -// return lvss, nil -// } - -// func verifyLVSS(cs *controllerServer, lvss [][]util.LvStore) bool { -// lvssNow, err := getLVSS(cs) -// if err != nil { -// return false -// } - -// // deep compare lvss and lvssNow -// if len(lvss) != len(lvssNow) { -// return false -// } -// for i := 0; i < len(lvss); i++ { -// lvs := lvss[i] -// lvsNow := lvssNow[i] -// if len(lvs) != len(lvsNow) { -// return false -// } -// for j := 0; j < len(lvs); j++ { -// if lvs[j] != lvsNow[j] { -// return false -// } -// } -// } -// return true -// } diff --git a/pkg/spdk/nodeserver.go b/pkg/spdk/nodeserver.go index 769acd1..1da51ad 100644 --- a/pkg/spdk/nodeserver.go +++ b/pkg/spdk/nodeserver.go @@ -45,7 +45,6 @@ type nodeServer struct { xpuConnClient *grpc.ClientConn xpuTargetType string kvmPciBridges int - spdkNode *util.NodeNVMf } func newNodeServer(d *csicommon.CSIDriver) (*nodeServer, error) { @@ -124,13 +123,7 @@ func newNodeServer(d 
*csicommon.CSIDriver) (*nodeServer, error) { ns.xpuConnClient = xpuConnClient ns.xpuTargetType = xpuTargetType - ns.spdkNode, err = NewsimplyBlockClient() - if err != nil { - klog.Error("failed to create simplyblock client", err) - return nil, err - } - - go util.MonitorConnection(ns.spdkNode) + go util.MonitorConnection() return ns, nil } @@ -160,15 +153,8 @@ func (ns *nodeServer) NodeStageVolume(_ context.Context, req *csi.NodeStageVolum var initiator util.SpdkCsiInitiator vc := req.GetVolumeContext() - // if ns.xpuConnClient != nil && ns.xpuTargetType != "" { - // vc["stagingParentPath"] = stagingParentPath - // initiator, err = util.NewSpdkCsiXpuInitiator(vc, ns.xpuConnClient, ns.xpuTargetType, ns.kvmPciBridges) - // } else { - // initiator, err = util.NewSpdkCsiInitiator(req.GetVolumeContext()) - // } - vc["stagingParentPath"] = stagingParentPath - initiator, err = util.NewSpdkCsiInitiator(vc, ns.spdkNode) + initiator, err = util.NewSpdkCsiInitiator(vc) if err != nil { klog.Errorf("failed to create spdk initiator, volumeID: %s err: %v", volumeID, err) return nil, status.Error(codes.Internal, err.Error()) @@ -219,7 +205,7 @@ func (ns *nodeServer) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageV klog.Errorf("failed to lookup volume context, volumeID: %s err: %v", volumeID, err) return nil, status.Error(codes.Internal, err.Error()) } - initiator, err := util.NewSpdkCsiInitiator(volumeContext, ns.spdkNode) + initiator, err := util.NewSpdkCsiInitiator(volumeContext) if err != nil { klog.Errorf("failed to create spdk initiator, volumeID: %s err: %v", volumeID, err) return nil, status.Error(codes.Internal, err.Error()) diff --git a/pkg/util/initiator.go b/pkg/util/initiator.go index 2db411c..268f7e8 100644 --- a/pkg/util/initiator.go +++ b/pkg/util/initiator.go @@ -32,6 +32,17 @@ import ( "k8s.io/klog" ) +const ( + // DevDiskByID is the path to the device file under /dev/disk/by-id + DevDiskByID = "/dev/disk/by-id/*%s*" + + // TargetTypeNVMf is the target 
type for NVMe over Fabrics + TargetTypeNVMf = "tcp" + + // TargetTypeISCSI is the target type for cache + TargetTypeCache = "cache" +) + // SpdkCsiInitiator defines interface for NVMeoF/iSCSI initiator // - Connect initiates target connection and returns local block device filename // e.g., /dev/disk/by-id/nvme-SPDK_Controller1_SPDK00000000000001 @@ -43,41 +54,7 @@ type SpdkCsiInitiator interface { Disconnect() error } -const DevDiskByID = "/dev/disk/by-id/*%s*" - - -func NewSpdkCsiInitiator(volumeContext map[string]string, spdkNode *NodeNVMf) (SpdkCsiInitiator, error) { - targetType := strings.ToLower(volumeContext["targetType"]) - switch targetType { - case "rdma", "tcp": - var connections []connectionInfo - err := json.Unmarshal([]byte(volumeContext["connections"]), &connections) - if err != nil { - return nil, fmt.Errorf("failed to unmarshall connections. Error: %v", err.Error()) - } - return &initiatorNVMf{ - // see util/nvmf.go VolumeInfo() - targetType: volumeContext["targetType"], - connections: connections, - nqn: volumeContext["nqn"], - reconnectDelay: volumeContext["reconnectDelay"], - nrIoQueues: volumeContext["nrIoQueues"], - ctrlLossTmo: volumeContext["ctrlLossTmo"], - model: volumeContext["model"], - client: *spdkNode.client, - }, nil - case "cache": - return &initiatorCache{ - lvol: volumeContext["uuid"], - model: volumeContext["model"], - client: *spdkNode.client, - }, nil - default: - return nil, fmt.Errorf("unknown initiator: %s", targetType) - } -} - -// NVMf initiator implementation +// initiatorNVMf is an implementation of NVMf tcp initiator type initiatorNVMf struct { targetType string connections []connectionInfo @@ -86,13 +63,13 @@ type initiatorNVMf struct { nrIoQueues string ctrlLossTmo string model string - client RPCClient } +// initiatorCache is an implementation of NVMf cache initiator type initiatorCache struct { lvol string model string - client RPCClient + client RPCClient // TODO: support multi cluster for cache } type 
cachingNodeList struct { @@ -100,17 +77,11 @@ type cachingNodeList struct { UUID string `json:"id"` } -type LVolCachingNodeConnect struct { +type lVolCachingNodeConnect struct { LvolID string `json:"lvol_id"` } -type Subsystem struct { - Name string `json:"Name"` - NQN string `json:"NQN"` - Paths []Path `json:"Paths"` -} - -type Path struct { +type path struct { Name string `json:"Name"` Transport string `json:"Transport"` Address string `json:"Address"` @@ -118,8 +89,14 @@ type Path struct { ANAState string `json:"ANAState"` } -type SubsystemResponse struct { - Subsystems []Subsystem `json:"Subsystems"` +type subsystem struct { + Name string `json:"Name"` + NQN string `json:"NQN"` + Paths []path `json:"Paths"` +} + +type subsystemResponse struct { + Subsystems []subsystem `json:"Subsystems"` } type NodeInfo struct { @@ -128,16 +105,80 @@ type NodeInfo struct { Status string `json:"status"` } -type NVMeDeviceInfo struct { - DevicePath string - SerialNumber string +type nvmeDeviceInfo struct { + devicePath string + serialNumber string +} + +// NewsimplyBlockClient create a new Simplyblock client +// should be called for every CSI driver operation +func NewsimplyBlockClient(clusterID string) (*NodeNVMf, error) { + // get spdk node configs, see deploy/kubernetes/config-map.yaml + var config struct { + Simplybk struct { + IP string `json:"ip"` + } `json:"simplybk"` + } + configFile := FromEnv("SPDKCSI_CONFIG", "/etc/spdkcsi-config/config.json") + err := ParseJSONFile(configFile, &config) + if err != nil { + return nil, err + } + + var secrets map[string]string + secretFile := FromEnv("SPDKCSI_SECRET", "/etc/spdkcsi-secret/secret.json") + err = ParseJSONFile(secretFile, &secrets) + if err != nil { + return nil, err + } + + secret, ok := secrets[clusterID] + if !ok { + return nil, fmt.Errorf("failed to find secret for clusterID %s", clusterID) + } + + klog.Infof("Simplyblock client created for ClusterID:%s url=%s", clusterID, config.Simplybk.IP) + return 
NewNVMf(clusterID, config.Simplybk.IP, secret), nil +} + +// NewSpdkCsiInitiator creates a new SpdkCsiInitiator based on the target type +func NewSpdkCsiInitiator(volumeContext map[string]string) (SpdkCsiInitiator, error) { + targetType := strings.ToLower(volumeContext["targetType"]) + switch targetType { + case TargetTypeNVMf: + var connections []connectionInfo + + err := json.Unmarshal([]byte(volumeContext["connections"]), &connections) + if err != nil { + return nil, fmt.Errorf("failed to unmarshall connections. Error: %v", err.Error()) + } + + return &initiatorNVMf{ + targetType: volumeContext["targetType"], + connections: connections, + nqn: volumeContext["nqn"], + reconnectDelay: volumeContext["reconnectDelay"], + nrIoQueues: volumeContext["nrIoQueues"], + ctrlLossTmo: volumeContext["ctrlLossTmo"], + model: volumeContext["model"], + }, nil + + case "cache": + return &initiatorCache{ + lvol: volumeContext["uuid"], + model: volumeContext["model"], + }, nil + + default: + return nil, fmt.Errorf("unknown initiator: %s", targetType) + } } func (cache *initiatorCache) Connect() (string, error) { // get the hostname hostname, err := os.Hostname() if err != nil { - os.Exit(1) + panic(err) } hostname = strings.Split(hostname, ".")[0] klog.Info("hostname: ", hostname) @@ -167,7 +208,7 @@ func (cache *initiatorCache) Connect() (string, error) { } var resp interface{} - req := LVolCachingNodeConnect{ + req := lVolCachingNodeConnect{ LvolID: cache.lvol, } klog.Info("connecting caching node: ", cnode.Hostname, " with lvol: ", cache.lvol) @@ -230,7 +271,7 @@ func (cache *initiatorCache) Disconnect() error { continue } klog.Info("disconnect caching node: ", cnode.Hostname, "with lvol: ", cache.lvol) - req := LVolCachingNodeConnect{ + req := lVolCachingNodeConnect{ LvolID: cache.lvol, } resp, err := cache.client.CallSBCLI("PUT", "/cachingnode/disconnect/"+cnode.UUID, req) @@ -378,7 +419,7 @@ func execWithTimeout(cmdLine []string, timeout int) error { } func 
disconnectDevicePath(devicePath string) error { - var paths []Path + var paths []path realPath, err := filepath.EvalSymlinks(devicePath) if err != nil { @@ -392,10 +433,10 @@ func disconnectDevicePath(devicePath string) error { for _, host := range subsystems { for _, subsystem := range host.Subsystems { - for _, path := range subsystem.Paths { - paths = append(paths, Path{ - Name: path.Name, - ANAState: path.ANAState, + for _, p := range subsystem.Paths { + paths = append(paths, path{ + Name: p.Name, + ANAState: p.ANAState, }) } } @@ -420,7 +461,7 @@ func disconnectDevicePath(devicePath string) error { return nil } -func getNVMeDeviceInfos() ([]NVMeDeviceInfo, error) { +func getNVMeDeviceInfos() ([]nvmeDeviceInfo, error) { cmd := exec.Command("nvme", "list", "-o", "json") output, err := cmd.Output() if err != nil { @@ -438,25 +479,25 @@ func getNVMeDeviceInfos() ([]NVMeDeviceInfo, error) { return nil, fmt.Errorf("failed to unmarshal nvme list output: %v", err) } - var devices []NVMeDeviceInfo + var devices []nvmeDeviceInfo for _, dev := range response.Devices { - devices = append(devices, NVMeDeviceInfo{ - DevicePath: dev.DevicePath, - SerialNumber: dev.SerialNumber, + devices = append(devices, nvmeDeviceInfo{ + devicePath: dev.DevicePath, + serialNumber: dev.SerialNumber, }) } return devices, nil } -func getSubsystemsForDevice(devicePath string) ([]SubsystemResponse, error) { +func getSubsystemsForDevice(devicePath string) ([]subsystemResponse, error) { cmd := exec.Command("nvme", "list-subsys", "-o", "json", devicePath) output, err := cmd.Output() if err != nil { return nil, fmt.Errorf("failed to execute nvme list-subsys: %v", err) } - var subsystems []SubsystemResponse + var subsystems []subsystemResponse if err := json.Unmarshal(output, &subsystems); err != nil { return nil, fmt.Errorf("failed to unmarshal nvme list-subsys output: %v", err) } @@ -464,12 +505,12 @@ func getSubsystemsForDevice(devicePath string) ([]SubsystemResponse, error) { return subsystems, 
nil } -func getLvolIDFromNQN(nqn string) string { +func getLvolIDFromNQN(nqn string) (clusterID, lvolID string) { parts := strings.Split(nqn, ":lvol:") if len(parts) > 1 { - return parts[1] + return parts[0], parts[1] } - return "" + return "", "" } func parseAddress(address string) string { @@ -482,39 +523,36 @@ func parseAddress(address string) string { return "" } -func reconnectSubsystems(spdkNode *NodeNVMf) error { +func reconnectSubsystems() error { devices, err := getNVMeDeviceInfos() if err != nil { return fmt.Errorf("failed to get NVMe device paths: %v", err) } for _, device := range devices { - subsystems, err := getSubsystemsForDevice(device.DevicePath) + subsystems, err := getSubsystemsForDevice(device.devicePath) if err != nil { - klog.Errorf("failed to get subsystems for device %s: %v", device.DevicePath, err) + klog.Errorf("failed to get subsystems for device %s: %v", device.devicePath, err) continue } for _, host := range subsystems { for _, subsystem := range host.Subsystems { - lvolID := getLvolIDFromNQN(subsystem.NQN) + clusterID, lvolID := getLvolIDFromNQN(subsystem.NQN) if lvolID == "" { continue } if len(subsystem.Paths) == 1 { - confirm := confirmSubsystemStillSinglePath(&subsystem, device.DevicePath) + confirm := confirmSubsystemStillSinglePath(&subsystem, device.devicePath) if !confirm { continue } for _, path := range subsystem.Paths { - if path.State == "connecting" && device.SerialNumber == "single" { - if err := checkOnlineNode(spdkNode, lvolID, path); err != nil { - klog.Errorf("failed to reconnect subsystem for lvolID %s: %v", lvolID, err) - } - } else if (path.ANAState == "optimized" || path.ANAState == "non-optimized") && device.SerialNumber == "ha" { - if err := checkOnlineNode(spdkNode, lvolID, path); err != nil { + if path.State == "connecting" && device.serialNumber == "single" || + ((path.ANAState == "optimized" || path.ANAState == "non-optimized") && device.serialNumber == "ha") { + if err := checkOnlineNode(clusterID, lvolID, 
path); err != nil { klog.Errorf("failed to reconnect subsystem for lvolID %s: %v", lvolID, err) } } @@ -526,8 +564,13 @@ func reconnectSubsystems(spdkNode *NodeNVMf) error { return nil } -func checkOnlineNode(spdkNode *NodeNVMf, lvolID string, path Path) error { - nodeInfo, err := fetchNodeInfo(spdkNode, lvolID) +func checkOnlineNode(clusterID, lvolID string, path path) error { + sbcClient, err := NewsimplyBlockClient(clusterID) + if err != nil { + return fmt.Errorf("failed to create SPDK client: %w", err) + } + + nodeInfo, err := fetchNodeInfo(sbcClient, lvolID) if err != nil { return fmt.Errorf("failed to fetch node info: %w", err) } @@ -537,12 +580,12 @@ func checkOnlineNode(spdkNode *NodeNVMf, lvolID string, path Path) error { continue } - if !isNodeOnline(spdkNode, nodeID) { + if !isNodeOnline(sbcClient, nodeID) { klog.Infof("Node %s is not yet online", nodeID) continue } - connections, err := fetchLvolConnection(spdkNode, lvolID) + connections, err := fetchLvolConnection(sbcClient, lvolID) if err != nil { klog.Errorf("Failed to get lvol connection: %v", err) continue @@ -593,7 +636,7 @@ func shouldConnectToNode(anaState, currentNodeID, targetNodeID string) bool { } func fetchNodeInfo(spdkNode *NodeNVMf, lvolID string) (*NodeInfo, error) { - resp, err := spdkNode.client.CallSBCLI("GET", "/lvol/"+lvolID, nil) + resp, err := spdkNode.Client.CallSBCLI("GET", "/lvol/"+lvolID, nil) if err != nil { return nil, fmt.Errorf("failed to fetch node info: %v", err) } @@ -611,7 +654,7 @@ func fetchNodeInfo(spdkNode *NodeNVMf, lvolID string) (*NodeInfo, error) { } func isNodeOnline(spdkNode *NodeNVMf, nodeID string) bool { - resp, err := spdkNode.client.CallSBCLI("GET", "/storagenode/"+nodeID, nil) + resp, err := spdkNode.Client.CallSBCLI("GET", "/storagenode/"+nodeID, nil) if err != nil { klog.Errorf("failed to fetch node status for node %s: %v", nodeID, err) return false @@ -626,7 +669,7 @@ func isNodeOnline(spdkNode *NodeNVMf, nodeID string) bool { } func 
fetchLvolConnection(spdkNode *NodeNVMf, lvolID string) ([]*LvolConnectResp, error) { - resp, err := spdkNode.client.CallSBCLI("GET", "/lvol/connect/"+lvolID, nil) + resp, err := spdkNode.Client.CallSBCLI("GET", "/lvol/connect/"+lvolID, nil) if err != nil { return nil, fmt.Errorf("failed to fetch connection: %v", err) } @@ -654,7 +697,7 @@ func connectViaNVMe(conn *LvolConnectResp, ctrlLossTmo int) error { return nil } -func disconnectViaNVMe(path Path) error { +func disconnectViaNVMe(path path) error { cmd := []string{ "nvme", "disconnect", "-d", path.Name, } @@ -665,7 +708,7 @@ func disconnectViaNVMe(path Path) error { return nil } -func confirmSubsystemStillSinglePath(subsystem *Subsystem, devicePath string) bool { +func confirmSubsystemStillSinglePath(subsystem *subsystem, devicePath string) bool { for i := 0; i < 5; i++ { recheck, err := getSubsystemsForDevice(devicePath) if err != nil { @@ -695,15 +738,11 @@ func confirmSubsystemStillSinglePath(subsystem *Subsystem, devicePath string) bo return true } -func MonitorConnection(spdkNode *NodeNVMf) { - +// MonitorConnection monitors the connection to the SPDK node and reconnects if necessary +// TODO: make this monitoring multiple connections +func MonitorConnection() { for { - if spdkNode.client == nil { - klog.Errorf("RPC client is not initialized") - continue - } - - if err := reconnectSubsystems(spdkNode); err != nil { + if err := reconnectSubsystems(); err != nil { klog.Errorf("Error: %v\n", err) continue } diff --git a/pkg/util/jsonrpc.go b/pkg/util/jsonrpc.go index d2fa8af..234f259 100644 --- a/pkg/util/jsonrpc.go +++ b/pkg/util/jsonrpc.go @@ -128,6 +128,7 @@ type BDev struct { LvolSize int64 `json:"size"` } +// RPCClient holds the connection information to the SimplyBlock Cluster type RPCClient struct { ClusterID string ClusterIP string @@ -135,6 +136,7 @@ type RPCClient struct { HTTPClient *http.Client } +// CSIPoolsResp is the response of /pool/get_pools type CSIPoolsResp struct { FreeClusters int64 
`json:"free_clusters"` ClusterSize int64 `json:"cluster_size"` @@ -143,6 +145,7 @@ type CSIPoolsResp struct { UUID string `json:"uuid"` } +// SnapshotResp is the response of /snapshot type SnapshotResp struct { Name string `json:"snapshot_name"` UUID string `json:"uuid"` @@ -155,10 +158,18 @@ type SnapshotResp struct { } `json:"lvol"` } +// CreateLVolData is the response for /lvol type CreateVolResp struct { LVols []string `json:"lvols"` } +// ResizeVolReq is the request for /lvol/resize +type ResizeVolReq struct { + LvolID string `json:"lvol_id"` + NewSize int64 `json:"size"` +} + +// Error represents SBCLI's common error response type Error struct { Code int `json:"code"` Message string `json:"message"` @@ -168,6 +179,7 @@ func (client *RPCClient) info() string { return client.ClusterID } +// lvStores returns all available logical volume stores func (client *RPCClient) lvStores() ([]LvStore, error) { var result []CSIPoolsResp @@ -213,7 +225,7 @@ func (client *RPCClient) createVolume(params *CreateLVolData) (string, error) { return lvolID, err } -// get a volume and return a BDev,, lvsName/lvolName +// getVolume gets a volume and return a BDev,, lvsName/lvolName func (client *RPCClient) getVolume(lvolID string) (*BDev, error) { var result []BDev @@ -235,6 +247,7 @@ func (client *RPCClient) getVolume(lvolID string) (*BDev, error) { return &result[0], err } +// listVolumes returns all volumes func (client *RPCClient) listVolumes() ([]*BDev, error) { var results []*BDev @@ -253,7 +266,7 @@ func (client *RPCClient) listVolumes() ([]*BDev, error) { return results, nil } -// get a volume and return a BDev +// getVolumeInfo gets a volume along with its connection info func (client *RPCClient) getVolumeInfo(lvolID string) (map[string]string, error) { var result []*LvolConnectResp @@ -300,6 +313,7 @@ func (client *RPCClient) getVolumeInfo(lvolID string) (map[string]string, error) }, nil } +// deleteVolume deletes a volume func (client *RPCClient) deleteVolume(lvolID 
string) error { _, err := client.CallSBCLI("DELETE", "/lvol/"+lvolID, nil) if errorMatches(err, ErrJSONNoSuchDevice) { @@ -309,11 +323,7 @@ func (client *RPCClient) deleteVolume(lvolID string) error { return err } -type ResizeVolReq struct { - LvolID string `json:"lvol_id"` - NewSize int64 `json:"size"` -} - +// resizeVolume resizes a volume func (client *RPCClient) resizeVolume(lvolID string, size int64) (bool, error) { params := ResizeVolReq{ LvolID: lvolID, @@ -331,6 +341,7 @@ func (client *RPCClient) resizeVolume(lvolID string, size int64) (bool, error) { return result, nil } +// cloneSnapshot clones a snapshot func (client *RPCClient) cloneSnapshot(snapshotID, cloneName, newSize string) (string, error) { params := struct { SnapshotID string `json:"snapshot_id"` @@ -360,6 +371,7 @@ func (client *RPCClient) cloneSnapshot(snapshotID, cloneName, newSize string) (s return lvolID, err } +// listSnapshots returns all snapshots func (client *RPCClient) listSnapshots() ([]*SnapshotResp, error) { var results []*SnapshotResp @@ -378,16 +390,7 @@ func (client *RPCClient) listSnapshots() ([]*SnapshotResp, error) { return results, nil } -func (client *RPCClient) deleteSnapshot(snapshotID string) error { - _, err := client.CallSBCLI("DELETE", "/snapshot/"+snapshotID, nil) - - if errorMatches(err, ErrJSONNoSuchDevice) { - err = ErrJSONNoSuchDevice // may happen in concurrency - } - - return err -} - +// snapshot creates a snapshot func (client *RPCClient) snapshot(lvolID, snapShotName string) (string, error) { params := struct { LvolName string `json:"lvol_id"` @@ -412,6 +415,18 @@ func (client *RPCClient) snapshot(lvolID, snapShotName string) (string, error) { return snapshotID, err } +// deleteSnapshot deletes a snapshot +func (client *RPCClient) deleteSnapshot(snapshotID string) error { + _, err := client.CallSBCLI("DELETE", "/snapshot/"+snapshotID, nil) + + if errorMatches(err, ErrJSONNoSuchDevice) { + err = ErrJSONNoSuchDevice // may happen in concurrency + } + + return 
err +} + +// CallSBCLI is a generic function to call the SimplyBlock API func (client *RPCClient) CallSBCLI(method, path string, args interface{}) (interface{}, error) { data := []byte(`{}`) var err error @@ -468,6 +483,7 @@ func (client *RPCClient) CallSBCLI(method, path string, args interface{}) (inter return response.Results, nil } +// errorMatches checks if the error message from the full error func errorMatches(errFull, errJSON error) bool { if errFull == nil { return false diff --git a/pkg/util/nvmf.go b/pkg/util/nvmf.go index b7368fb..c85c87a 100644 --- a/pkg/util/nvmf.go +++ b/pkg/util/nvmf.go @@ -26,42 +26,31 @@ import ( ) type NodeNVMf struct { - client *RPCClient - - clusterID string - clusterIP string - clusterSecret string + Client *RPCClient } -// func newNVMf(client *rpcClient, targetType, targetAddr string) *nodeNVMf { -// config.Simplybk.Uuid, config.Simplybk.Ip, secret.Simplybk.Secret +// NewNVMf creates a new NVMf client func NewNVMf(clusterID, clusterIP, clusterSecret string) *NodeNVMf { client := RPCClient{ - ClusterID: clusterID, - ClusterIP: clusterIP, - ClusterSecret: clusterSecret, HTTPClient: &http.Client{Timeout: cfgRPCTimeoutSeconds * time.Second}, } return &NodeNVMf{ - client: &client, - clusterID: clusterID, - clusterIP: clusterIP, - clusterSecret: clusterSecret, + Client: &client, } } func (node *NodeNVMf) Info() string { - return node.client.info() + return node.Client.info() } func (node *NodeNVMf) LvStores() ([]LvStore, error) { - return node.client.lvStores() + return node.Client.lvStores() } // VolumeInfo returns a string:string map containing information necessary // for CSI node(initiator) to connect to this target and identify the disk. 
func (node *NodeNVMf) VolumeInfo(lvolID string) (map[string]string, error) { - lvol, err := node.client.getVolumeInfo(lvolID) + lvol, err := node.Client.getVolumeInfo(lvolID) if err != nil { return nil, err } @@ -94,7 +83,7 @@ type CreateLVolData struct { // CreateVolume creates a logical volume and returns volume ID func (node *NodeNVMf) CreateVolume(params *CreateLVolData) (string, error) { - lvolID, err := node.client.createVolume(params) + lvolID, err := node.Client.createVolume(params) if err != nil { return "", err } @@ -104,15 +93,16 @@ func (node *NodeNVMf) CreateVolume(params *CreateLVolData) (string, error) { // GetVolume returns the volume id of the given volume name and lvstore name. return error if not found. func (node *NodeNVMf) GetVolume(lvolName, poolName string) (string, error) { - lvol, err := node.client.getVolume(fmt.Sprintf("%s/%s", poolName, lvolName)) + lvol, err := node.Client.getVolume(fmt.Sprintf("%s/%s", poolName, lvolName)) if err != nil { return "", err } return lvol.UUID, err } +// GetVolumeSize returns the size of the volume func (node *NodeNVMf) GetVolumeSize(lvolID string) (string, error) { - lvol, err := node.client.getVolume(lvolID) + lvol, err := node.Client.getVolume(lvolID) if err != nil { return "", err } @@ -121,22 +111,24 @@ func (node *NodeNVMf) GetVolumeSize(lvolID string) (string, error) { return size, err } +// ListVolumes returns a list of volumes func (node *NodeNVMf) ListVolumes() ([]*BDev, error) { - return node.client.listVolumes() + return node.Client.listVolumes() } // ResizeVolume resizes a volume func (node *NodeNVMf) ResizeVolume(lvolID string, newSize int64) (bool, error) { - return node.client.resizeVolume(lvolID, newSize) + return node.Client.resizeVolume(lvolID, newSize) } // ListSnapshots returns a list of snapshots func (node *NodeNVMf) ListSnapshots() ([]*SnapshotResp, error) { - return node.client.listSnapshots() + return node.Client.listSnapshots() } +// CloneSnapshot clones a snapshot to a new volume 
func (node *NodeNVMf) CloneSnapshot(snapshotID, cloneName, newSize string) (string, error) { - lvolID, err := node.client.cloneSnapshot(snapshotID, cloneName, newSize) + lvolID, err := node.Client.cloneSnapshot(snapshotID, cloneName, newSize) if err != nil { return "", err } @@ -144,8 +136,9 @@ func (node *NodeNVMf) CloneSnapshot(snapshotID, cloneName, newSize string) (stri return lvolID, nil } +// CreateSnapshot creates a snapshot of a volume func (node *NodeNVMf) CreateSnapshot(lvolID, snapshotName string) (string, error) { - snapshotID, err := node.client.snapshot(lvolID, snapshotName) + snapshotID, err := node.Client.snapshot(lvolID, snapshotName) if err != nil { return "", err } @@ -155,7 +148,7 @@ func (node *NodeNVMf) CreateSnapshot(lvolID, snapshotName string) (string, error // DeleteVolume deletes a volume func (node *NodeNVMf) DeleteVolume(lvolID string) error { - err := node.client.deleteVolume(lvolID) + err := node.Client.deleteVolume(lvolID) if err != nil { return err } @@ -165,7 +158,7 @@ func (node *NodeNVMf) DeleteVolume(lvolID string) error { // DeleteSnapshot deletes a snapshot func (node *NodeNVMf) DeleteSnapshot(snapshotID string) error { - err := node.client.deleteSnapshot(snapshotID) + err := node.Client.deleteSnapshot(snapshotID) if err != nil { return err } @@ -175,7 +168,7 @@ func (node *NodeNVMf) DeleteSnapshot(snapshotID string) error { // PublishVolume exports a volume through NVMf target func (node *NodeNVMf) PublishVolume(lvolID string) error { - _, err := node.client.CallSBCLI("GET", "/lvol/"+lvolID, nil) + _, err := node.Client.CallSBCLI("GET", "/lvol/"+lvolID, nil) if err != nil { return err } @@ -183,8 +176,9 @@ func (node *NodeNVMf) PublishVolume(lvolID string) error { return nil } +// UnpublishVolume unexports a volume through NVMf target func (node *NodeNVMf) UnpublishVolume(lvolID string) error { - _, err := node.client.CallSBCLI("GET", "/lvol/"+lvolID, nil) + _, err := node.Client.CallSBCLI("GET", "/lvol/"+lvolID, nil) if err 
!= nil { return err } diff --git a/pkg/util/nvmf_test.go b/pkg/util/nvmf_test.go deleted file mode 100644 index 329134c..0000000 --- a/pkg/util/nvmf_test.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Copyright (c) Arm Limited and Contributors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import "testing" - -func TestNewNVMf(t *testing.T) { - clusterID := "clusterID" - clusterIP := "clusterIP" - clusterSecret := "clusterSecret" - - node := NewNVMf(clusterID, clusterIP, clusterSecret) - - if node == nil { - t.Fatal("NewNVMf returned nil") - } - - if node.clusterID != clusterID { - t.Errorf("Expected clusterID %s, but got %s", clusterID, node.clusterID) - } - - if node.clusterIP != clusterIP { - t.Errorf("Expected clusterIP %s, but got %s", clusterIP, node.clusterIP) - } - - if node.clusterSecret != clusterSecret { - t.Errorf("Expected clusterSecret %s, but got %s", clusterSecret, node.clusterSecret) - } -}