Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/blobstore/configuration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/blobstore/grpcclients",
"//pkg/blobstore/local",
"//pkg/blobstore/mirrored",
"//pkg/blobstore/quorum",
"//pkg/blobstore/readcaching",
"//pkg/blobstore/readfallback",
"//pkg/blobstore/replication",
Expand Down
30 changes: 30 additions & 0 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/local"
"github.com/buildbarn/bb-storage/pkg/blobstore/mirrored"
"github.com/buildbarn/bb-storage/pkg/blobstore/quorum"
"github.com/buildbarn/bb-storage/pkg/blobstore/readcaching"
"github.com/buildbarn/bb-storage/pkg/blobstore/readfallback"
"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
Expand Down Expand Up @@ -143,6 +144,35 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
BlobAccess: mirrored.NewMirroredBlobAccess(backendA.BlobAccess, backendB.BlobAccess, replicatorAToB, replicatorBToA),
DigestKeyFormat: backendA.DigestKeyFormat.Combine(backendB.DigestKeyFormat),
}, "mirrored", nil
case *pb.BlobAccessConfiguration_Quorum:
backends := make([]blobstore.BlobAccess, 0, len(backend.Quorum.Backends))
var combinedDigestKeyFormat *digest.KeyFormat

for _, b := range backend.Quorum.Backends {
backend, err := nc.NewNestedBlobAccess(b, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, backend.BlobAccess)
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}
}
if len(backends) == 0 {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create quorum blob access without any backends")
}
readQuorum := int(backend.Quorum.ReadQuorum)
writeQuorum := int(backend.Quorum.WriteQuorum)
if readQuorum + writeQuorum <= len(backends) {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Quorum blob access requires read_quorum + write_quorum > number of backends")
}
return BlobAccessInfo{
BlobAccess: quorum.NewQuorumBlobAccess(backends, readQuorum, writeQuorum),
DigestKeyFormat: *combinedDigestKeyFormat,
}, "quorum", nil
case *pb.BlobAccessConfiguration_Local:
digestKeyFormat := digest.KeyWithInstance
if !backend.Local.HierarchicalInstanceNames {
Expand Down
40 changes: 40 additions & 0 deletions pkg/blobstore/quorum/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "quorum",
srcs = ["quorum_blob_access.go"],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/quorum",
visibility = ["//visibility:public"],
deps = [
"//pkg/blobstore",
"//pkg/blobstore/buffer",
"//pkg/blobstore/replication",
"//pkg/blobstore/slicing",
"//pkg/digest",
"//pkg/random",
"//pkg/util",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
],
)

go_test(
name = "quorum_test",
srcs = ["quorum_blob_access_test.go"],
deps = [
":quorum",
"//internal/mock",
"//pkg/blobstore",
"//pkg/blobstore/buffer",
"//pkg/digest",
"//pkg/testutil",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_mock//gomock",
],
)
218 changes: 218 additions & 0 deletions pkg/blobstore/quorum/quorum_blob_access.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package quorum

import (
"context"
"sync"

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/random"
"github.com/buildbarn/bb-storage/pkg/util"
"github.com/prometheus/client_golang/prometheus"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
quorumBlobAccessPrometheusMetrics sync.Once

quorumBlobAccessFindMissingSynchronizations = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "buildbarn",
Subsystem: "blobstore",
Name: "quorum_blob_access_find_missing_synchronizations",
Help: "Number of blobs synchronized in FindMissing()",
Buckets: append([]float64{0}, prometheus.ExponentialBuckets(1.0, 2.0, 16)...),
},
[]string{"direction"})
)

type quorumBlobAccess struct {
backends []blobstore.BlobAccess
readQuorum int
writeQuorum int
generator random.ThreadSafeGenerator
}

// NewQuorumBlobAccess creates a BlobAccess that applies operations to a subset
// of storage backends, retrying on infrastructure errors. Read and write quorum
// size should be chosen so that they overlap by at least one backend.
// Note: Data is not replicated again after the original write.
func NewQuorumBlobAccess(backends []blobstore.BlobAccess, readQuorum, writeQuorum int) blobstore.BlobAccess {
quorumBlobAccessPrometheusMetrics.Do(func() {
prometheus.MustRegister(quorumBlobAccessFindMissingSynchronizations)
})

return &quorumBlobAccess{
backends: backends,
readQuorum: readQuorum,
writeQuorum: writeQuorum,
generator: random.FastThreadSafeGenerator,
}
}

func (ba *quorumBlobAccess) shuffledBackends() []blobstore.BlobAccess {
backends := make([]blobstore.BlobAccess, len(ba.backends))
copy(backends, ba.backends)

ba.generator.Shuffle(len(backends), func(i, j int) {
backends[i], backends[j] = backends[j], backends[i]
})

return backends
}

type getQuorumErrorHandler struct {
remainingBackends []blobstore.BlobAccess
remainingReads int
retry func(blobstore.BlobAccess) buffer.Buffer
}

func (eh *getQuorumErrorHandler) tryNextBackendOrError(err error) (buffer.Buffer, error) {
if len(eh.remainingBackends) > 0 {
nextBackend := eh.remainingBackends[0]
eh.remainingBackends = eh.remainingBackends[1:]
return eh.retry(nextBackend), nil
}
return nil, err
}

func (eh *getQuorumErrorHandler) OnError(err error) (buffer.Buffer, error) {
fallbackErr := status.Error(codes.Unavailable, "Too many backends unavailable")
if util.IsInfrastructureError(err) {
// I/O error. Try again on another backend.
return eh.tryNextBackendOrError(fallbackErr)

} else if status.Code(err) == codes.NotFound {
// Not found. Try again on another backend - if we haven't seen enough yet.
if eh.remainingReads <= 1 {
// Observed sufficient NotFounds. Return conclusive NotFound.
return nil, err
}
eh.remainingReads--

// Haven't been able to check enough backends. Can't conclude not found.
return eh.tryNextBackendOrError(fallbackErr)
}

return nil, err
}

func (eh getQuorumErrorHandler) Done() {}

func (ba *quorumBlobAccess) get(getter func(b blobstore.BlobAccess) buffer.Buffer) buffer.Buffer {
backends := ba.shuffledBackends()

backend := backends[0]
remainingBackends := backends[1:]

return buffer.WithErrorHandler(
getter(backend),
&getQuorumErrorHandler{
remainingBackends: remainingBackends,
remainingReads: ba.readQuorum,
retry: getter,
})
}

func (ba *quorumBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
return ba.get(func(b blobstore.BlobAccess) buffer.Buffer {
return b.Get(ctx, digest)
})
}

func (ba *quorumBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
return ba.get(func(b blobstore.BlobAccess) buffer.Buffer {
return b.GetFromComposite(ctx, parentDigest, childDigest, slicer)
})
}

func (ba *quorumBlobAccess) shuffledBackendQueue() <-chan blobstore.BlobAccess {
queue := make(chan blobstore.BlobAccess)
go func() error {
backends := ba.shuffledBackends()

for _, b := range backends {
queue <- b
}

close(queue)
return nil
}()
return queue
}

func (ba *quorumBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
// Store object in at least writeQuorum storage backends.
group, ctx := errgroup.WithContext(ctx)
backendQueue := ba.shuffledBackendQueue()

// Spawn writeQuorum writers. Each of these goroutines needs to succeed once.
for i := 0; i < ba.writeQuorum; i++ {
var b1 buffer.Buffer
if i == ba.writeQuorum-1 {
// Last writer, no need to clone buffer
b1 = b
} else {
b, b1 = b.CloneStream()
}

group.Go(func() error {
var err error
for backend := range backendQueue {
err = backend.Put(ctx, digest, b1)
if err == nil {
// Success
return nil
}
}
return err
})
}

return group.Wait()
}

func (ba *quorumBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
// Call FindMissing() on readQuorum backends.
group, ctx := errgroup.WithContext(ctx)
backendQueue := ba.shuffledBackendQueue()

results := make([]digest.Set, ba.readQuorum)
for i := 0; i < ba.readQuorum; i++ {
resultIdx := i
group.Go(func() error {
var err error
for backend := range backendQueue {
results[resultIdx], err = backend.FindMissing(ctx, digests)
if err == nil {
// Success
return nil
}
}
return err
})
}

if err := group.Wait(); err != nil {
return digest.EmptySet, err
}

// Find intersection of all results
missingFromAll := results[0]
for _, result := range results[1:] {
_, missingFromAll, _ = digest.GetDifferenceAndIntersection(missingFromAll, result)
}
return missingFromAll, nil
}

func (ba *quorumBlobAccess) GetCapabilities(ctx context.Context, instanceName digest.InstanceName) (*remoteexecution.ServerCapabilities, error) {
backends := ba.shuffledBackends()

return backends[0].GetCapabilities(ctx, instanceName)
}
Loading