Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion internal/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,23 @@ gomock(
gomock(
name = "blobstore_sharding",
out = "blobstore_sharding.go",
interfaces = ["ShardPermuter"],
interfaces = ["ShardSelector"],
library = "//pkg/blobstore/sharding",
mockgen_model_library = "@org_uber_go_mock//mockgen/model",
mockgen_tool = "@org_uber_go_mock//mockgen",
package = "mock",
)

gomock(
name = "blobstore_legacy_sharding",
out = "blobstore_legacy_sharding.go",
interfaces = ["ShardPermuter"],
library = "//pkg/blobstore/sharding/legacy",
mockgen_model_library = "@org_uber_go_mock//mockgen/model",
mockgen_tool = "@org_uber_go_mock//mockgen",
package = "mock",
)

gomock(
name = "blobstore_slicing",
out = "blobstore_slicing.go",
Expand Down Expand Up @@ -357,6 +367,7 @@ go_library(
"aliases.go",
"auth.go",
"blobstore.go",
"blobstore_legacy_sharding.go",
"blobstore_local.go",
"blobstore_replication.go",
"blobstore_sharding.go",
Expand Down Expand Up @@ -391,6 +402,7 @@ go_library(
"//pkg/blobstore/buffer",
"//pkg/blobstore/local",
"//pkg/blobstore/sharding",
"//pkg/blobstore/sharding/legacy",
"//pkg/blobstore/slicing",
"//pkg/builder",
"//pkg/clock",
Expand Down
1 change: 1 addition & 0 deletions pkg/blobstore/configuration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//pkg/blobstore/readfallback",
"//pkg/blobstore/replication",
"//pkg/blobstore/sharding",
"//pkg/blobstore/sharding/legacy",
"//pkg/blockdevice",
"//pkg/capabilities",
"//pkg/clock",
Expand Down
94 changes: 73 additions & 21 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore/readcaching"
"github.com/buildbarn/bb-storage/pkg/blobstore/readfallback"
"github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
"github.com/buildbarn/bb-storage/pkg/blobstore/sharding/legacy"
"github.com/buildbarn/bb-storage/pkg/blockdevice"
"github.com/buildbarn/bb-storage/pkg/clock"
"github.com/buildbarn/bb-storage/pkg/digest"
Expand Down Expand Up @@ -85,41 +86,92 @@ func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *
DigestKeyFormat: slow.DigestKeyFormat,
}, "read_caching", nil
case *pb.BlobAccessConfiguration_Sharding:
backends := make([]blobstore.BlobAccess, 0, len(backend.Sharding.Shards))
weights := make([]uint32, 0, len(backend.Sharding.Shards))
var combinedDigestKeyFormat *digest.KeyFormat
for _, shard := range backend.Sharding.Shards {
if shard.Backend == nil {
// Drained backend.
backends = append(backends, nil)
} else {
// Undrained backend.
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
if backend.Sharding.Legacy != nil {
backends := make([]blobstore.BlobAccess, 0, len(backend.Sharding.Legacy.ShardOrder))
weights := make([]uint32, 0, len(backend.Sharding.Legacy.ShardOrder))
var combinedDigestKeyFormat *digest.KeyFormat
for _, key := range backend.Sharding.Legacy.ShardOrder {
shard, exists := backend.Sharding.Shards[key]
if !exists {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Legacy sharding blob access refers to non-existing key %s", key)
}
backends = append(backends, backend.BlobAccess)
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
if shard.Backend == nil {
// Drained backend
backends = append(backends, nil)
} else {
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
// Undrained backend
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, backend.BlobAccess)
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}
}

if shard.Weight == 0 {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Shards must have positive weights")
}
weights = append(weights, shard.Weight)
}

if combinedDigestKeyFormat == nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any undrained backends")
}
return BlobAccessInfo{
BlobAccess: legacy.NewShardingBlobAccess(
backends,
legacy.NewWeightedShardPermuter(weights),
backend.Sharding.Legacy.HashInitialization,
),
DigestKeyFormat: *combinedDigestKeyFormat,
}, "sharding", nil
}
backends := make([]sharding.ShardBackend, 0, len(backend.Sharding.Shards))
shards := make([]sharding.Shard, 0, len(backend.Sharding.Shards))
keys := make([]string, 0, len(backend.Sharding.Shards))
var combinedDigestKeyFormat *digest.KeyFormat
for key, shard := range backend.Sharding.Shards {
if shard.Backend == nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Shard '%s' has an undefined backend, drained backends are only allowed when running in Legacy mode", key)
}
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backends = append(backends, sharding.ShardBackend{Backend: backend.BlobAccess, Key: key})
if combinedDigestKeyFormat == nil {
combinedDigestKeyFormat = &backend.DigestKeyFormat
} else {
newDigestKeyFormat := combinedDigestKeyFormat.Combine(backend.DigestKeyFormat)
combinedDigestKeyFormat = &newDigestKeyFormat
}

if shard.Weight == 0 {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Shards must have positive weights")
}
weights = append(weights, shard.Weight)
shards = append(shards, sharding.Shard{
Key: key,
Weight: shard.Weight,
})
keys = append(keys, key)
}
if combinedDigestKeyFormat == nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any undrained backends")
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Cannot create sharding blob access without any backends")
}
shardSelector, err := sharding.NewRendezvousShardSelector(shards)
if err != nil {
return BlobAccessInfo{}, "", status.Errorf(codes.InvalidArgument, "Could not create rendezvous shard selector")
}
return BlobAccessInfo{
BlobAccess: sharding.NewShardingBlobAccess(
backends,
sharding.NewWeightedShardPermuter(weights),
backend.Sharding.HashInitialization),
shardSelector,
),
DigestKeyFormat: *combinedDigestKeyFormat,
}, "sharding", nil
case *pb.BlobAccessConfiguration_Mirrored:
Expand Down
8 changes: 3 additions & 5 deletions pkg/blobstore/sharding/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "sharding",
srcs = [
"shard_permuter.go",
"rendezvous_shard_selector.go",
"shard_selector.go",
"sharding_blob_access.go",
"weighted_shard_permuter.go",
],
importpath = "github.com/buildbarn/bb-storage/pkg/blobstore/sharding",
visibility = ["//visibility:public"],
Expand All @@ -16,21 +16,19 @@ go_library(
"//pkg/digest",
"//pkg/util",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_lazybeaver_xorshift//:xorshift",
"@org_golang_x_sync//errgroup",
],
)

go_test(
name = "sharding_test",
srcs = [
"rendezvous_shard_selector_test.go",
"sharding_blob_access_test.go",
"weighted_shard_permuter_test.go",
],
deps = [
":sharding",
"//internal/mock",
"//pkg/blobstore",
"//pkg/blobstore/buffer",
"//pkg/digest",
"//pkg/testutil",
Expand Down
25 changes: 25 additions & 0 deletions pkg/blobstore/sharding/integration/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
load("@rules_go//go:def.bzl", "go_test")

go_test(
name = "integration",
srcs = ["benchmarking_integration_test.go"],
data = ["//cmd/bb_storage"],
deps = [
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
"@rules_go//go/runfiles",
],
)

go_test(
name = "integration_test",
srcs = ["benchmarking_integration_test.go"],
deps = [
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
"@rules_go//go/runfiles",
],
)
Loading