Skip to content
This repository was archived by the owner on Mar 26, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions doc/endpoints.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions glusterd2/commands/volumes/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ func (c *Command) Routes() route.Routes {
RequestType: utils.GetTypeString((*api.VolExpandReq)(nil)),
ResponseType: utils.GetTypeString((*api.VolumeExpandResp)(nil)),
HandlerFunc: volumeExpandHandler},
route.Route{
Name: "VolumeShrink",
Method: "POST",
Pattern: "/volumes/{volname}/shrink",
Version: 1,
RequestType: utils.GetTypeString((*api.VolShrinkReq)(nil)),
ResponseType: utils.GetTypeString((*api.VolumeShrinkResp)(nil)),
HandlerFunc: volumeShrinkHandler},
// TODO: Implmement volume reset as
// DELETE /volumes/{volname}/options
route.Route{
Expand Down Expand Up @@ -160,6 +168,7 @@ func (c *Command) RegisterStepFuncs() {
registerVolStopStepFuncs()
registerBricksStatusStepFuncs()
registerVolExpandStepFuncs()
registerVolShrinkStepFuncs()
registerVolOptionStepFuncs()
registerVolStatedumpFuncs()
}
30 changes: 30 additions & 0 deletions glusterd2/commands/volumes/volume-shrink-txn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package volumecommands

import (
"github.com/gluster/glusterd2/glusterd2/daemon"
"github.com/gluster/glusterd2/glusterd2/transaction"
rebalance "github.com/gluster/glusterd2/plugins/rebalance"
rebalanceapi "github.com/gluster/glusterd2/plugins/rebalance/api"
)

func startRebalance(c transaction.TxnCtx) error {
var rinfo rebalanceapi.RebalInfo
err := c.Get("rinfo", &rinfo)
if err != nil {
return err
}

rebalanceProcess, err := rebalance.NewRebalanceProcess(rinfo)
if err != nil {
return err
}

err = daemon.Start(rebalanceProcess, true, c.Logger())
if err != nil {
c.Logger().WithError(err).WithField(
"volume", rinfo.Volname).Error("Starting rebalance process failed")
return err
}

return err
}
219 changes: 219 additions & 0 deletions glusterd2/commands/volumes/volume-shrink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package volumecommands

import (
"errors"
"net/http"
"path/filepath"
"strings"

"github.com/gluster/glusterd2/glusterd2/events"
"github.com/gluster/glusterd2/glusterd2/gdctx"
restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils"
"github.com/gluster/glusterd2/glusterd2/transaction"
"github.com/gluster/glusterd2/glusterd2/volume"
"github.com/gluster/glusterd2/pkg/api"
customerror "github.com/gluster/glusterd2/pkg/errors"
rebalance "github.com/gluster/glusterd2/plugins/rebalance"
rebalanceapi "github.com/gluster/glusterd2/plugins/rebalance/api"

"github.com/gorilla/mux"
"github.com/pborman/uuid"
)

func registerVolShrinkStepFuncs() {
transaction.RegisterStepFunc(storeVolume, "vol-shrink.UpdateVolinfo")
transaction.RegisterStepFunc(notifyVolfileChange, "vol-shrink.NotifyClients")
transaction.RegisterStepFunc(startRebalance, "vol-shrink.StartRebalance")
}

func validateVolumeShrinkReq(req api.VolShrinkReq) error {
dupEntry := map[string]bool{}

for _, brick := range req.Bricks {
if dupEntry[brick.PeerID+filepath.Clean(brick.Path)] == true {
return customerror.ErrDuplicateBrickPath
}
dupEntry[brick.PeerID+filepath.Clean(brick.Path)] = true

}

return nil

}

func volumeShrinkHandler(w http.ResponseWriter, r *http.Request) {

ctx := r.Context()
logger := gdctx.GetReqLogger(ctx)

volname := mux.Vars(r)["volname"]

var req api.VolShrinkReq
if err := restutils.UnmarshalRequest(r, &req); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusUnprocessableEntity, err)
return
}

if err := validateVolumeShrinkReq(req); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, err)
return
}

txn, err := transaction.NewTxnWithLocks(ctx, volname)
if err != nil {
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
}
defer txn.Done()

volinfo, err := volume.GetVolume(volname)
if err != nil {
restutils.SendHTTPError(ctx, w, http.StatusNotFound, err)
return
}

for index := range req.Bricks {
for _, b := range req.Bricks {
isPresent := false
for _, brick := range volinfo.Subvols[index].Bricks {
if brick.PeerID.String() == b.PeerID && brick.Path == filepath.Clean(b.Path) {
isPresent = true
break
}
}
if !isPresent {
restutils.SendHTTPError(ctx, w, http.StatusBadRequest, "One or more bricks is not part of given volume")
return
}
}
}

switch volinfo.Type {
case volume.Distribute:
case volume.Replicate:
case volume.DistReplicate:
if len(req.Bricks)%volinfo.Subvols[0].ReplicaCount != 0 {
err := errors.New("wrong number of bricks to remove")
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}
default:
err := errors.New("not implemented: " + volinfo.Type.String())
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return

}

nodes, err := req.Nodes()
if err != nil {
logger.WithError(err).Error("could not prepare node list")
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}

txn.Steps = []*transaction.Step{
{
DoFunc: "vol-shrink.UpdateVolinfo",
Nodes: []uuid.UUID{gdctx.MyUUID},
},
{
DoFunc: "vol-shrink.NotifyClients",
Nodes: nodes,
},
{
DoFunc: "vol-shrink.StartRebalance",
Nodes: nodes,
},
}

decommissionedSubvols, err := findDecommissioned(req.Bricks, volinfo)
if err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}

// TODO: Find a better way to store information in the rebalance volfile.
volinfo.Metadata["distribute.decommissioned-bricks"] = strings.TrimSpace(decommissionedSubvols)

rinfo := rebalanceapi.RebalInfo{
Volname: volname,
RebalanceID: uuid.NewRandom(),
Cmd: rebalanceapi.CmdStartForce,
State: rebalanceapi.NotStarted,
CommitHash: rebalance.SetCommitHash(),
RebalStats: []rebalanceapi.RebalNodeStatus{},
}

if err := txn.Ctx.Set("rinfo", rinfo); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}

if err := txn.Ctx.Set("volinfo", volinfo); err != nil {
restutils.SendHTTPError(ctx, w, http.StatusInternalServerError, err)
return
}

if err = txn.Do(); err != nil {
logger.WithError(err).Error("remove bricks start transaction failed")
status, err := restutils.ErrToStatusCode(err)
restutils.SendHTTPError(ctx, w, status, err)
return
}
logger.WithField("volume-name", volinfo.Name).Info("volume shrink successful")
events.Broadcast(volume.NewEvent(volume.EventVolumeShrink, volinfo))
restutils.SendHTTPResponse(ctx, w, http.StatusOK, decommissionedSubvols)

}

func findDecommissioned(bricks []api.BrickReq, volinfo *volume.Volinfo) (string, error) {

brickSet := make(map[string]bool)
for _, brick := range bricks {
u := uuid.Parse(brick.PeerID)
if u == nil {
return "", errors.New("Invalid nodeid")
}
path, err := filepath.Abs(brick.Path)
if err != nil {
return "", err
}
brickSet[brick.PeerID+":"+path] = true
}

var subvolMap = make(map[string]int)
for _, subvol := range volinfo.Subvols {
for _, b := range subvol.Bricks {
if brickSet[b.PeerID.String()+":"+b.Path] {
if count, ok := subvolMap[subvol.Name]; !ok {
subvolMap[subvol.Name] = 1
} else {
subvolMap[subvol.Name] = count + 1
}
}
}
}

var base int
switch volinfo.Type {
case volume.Distribute:
base = 1
case volume.Replicate:
base = len(bricks)
case volume.DistReplicate:
base = volinfo.Subvols[0].ReplicaCount
default:
return "", errors.New("not implemented: " + volinfo.Type.String())
}

decommissioned := ""
for subvol, count := range subvolMap {
if count != base {
return "", errors.New("Wrong number of bricks in the subvolume")
}
decommissioned = decommissioned + subvol + " "
}

return decommissioned, nil
}
2 changes: 2 additions & 0 deletions glusterd2/volume/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
EventVolumeCreated Event = "volume.created"
// EventVolumeExpanded represents Volume Expand event
EventVolumeExpanded = "volume.expanded"
// EventVolumeShrink represents Volume Shrink event
EventVolumeShrink = "volume.shrink"
// EventVolumeStarted represents Volume Start event
EventVolumeStarted = "volume.started"
// EventVolumeStopped represents Volume Stop event
Expand Down
17 changes: 17 additions & 0 deletions pkg/api/brickutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,20 @@ func (req *VolExpandReq) Nodes() ([]uuid.UUID, error) {
}
return nodes, nil
}

// Nodes extracts list of Peer IDs from Volume Shrink request
func (req *VolShrinkReq) Nodes() ([]uuid.UUID, error) {
var nodesMap = make(map[string]bool)
var nodes []uuid.UUID
for _, brick := range req.Bricks {
if _, ok := nodesMap[brick.PeerID]; !ok {
nodesMap[brick.PeerID] = true
u := uuid.Parse(brick.PeerID)
if u == nil {
return nil, fmt.Errorf("Failed to parse peer ID: %s", brick.PeerID)
}
nodes = append(nodes, u)
}
}
return nodes, nil
}
5 changes: 5 additions & 0 deletions pkg/api/volume_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ type VolExpandReq struct {
Flags map[string]bool `json:"flags,omitempty"`
}

// VolShrinkReq represents a request to remove bricks from a volume
type VolShrinkReq struct {
Bricks []BrickReq `json:"bricks"`
}

// VolumeOption represents an option that is part of a profile
type VolumeOption struct {
Name string `json:"name"`
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/volume_resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type VolumeGetResp VolumeInfo
// VolumeExpandResp is the response sent for a volume expand request.
type VolumeExpandResp VolumeInfo

// VolumeShrinkResp is the response sent for a volume expand request.
type VolumeShrinkResp VolumeInfo

// VolumeStartResp is the response sent for a volume start request.
type VolumeStartResp VolumeInfo

Expand Down
2 changes: 1 addition & 1 deletion plugins/rebalance/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func createRebalanceInfo(volname string, req *rebalanceapi.StartReq) *rebalancea
RebalanceID: uuid.NewRandom(),
State: rebalanceapi.Started,
Cmd: getCmd(req),
CommitHash: setCommitHash(),
CommitHash: SetCommitHash(),
RebalStats: []rebalanceapi.RebalNodeStatus{},
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/rebalance/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
hash uint64
)

func setCommitHash() uint64 {
func SetCommitHash() uint64 {

/*
We need a commit hash that won't conflict with others we might have
Expand Down