Skip to content

Commit ce883aa

Browse files
Allow fuse implementation to download CAS artifacts directly to the filecache. (#8222)
If the workspace is backed by tmpfs, we can't download to the workspace and link the file into the filecache.
1 parent 2bdd7f5 commit ce883aa

File tree

6 files changed

+190
-29
lines changed

6 files changed

+190
-29
lines changed

enterprise/server/remote_execution/filecache/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"//proto:remote_execution_go_proto",
1515
"//server/interfaces",
1616
"//server/metrics",
17+
"//server/remote_cache/digest",
1718
"//server/util/claims",
1819
"//server/util/disk",
1920
"//server/util/fastcopy",
@@ -41,6 +42,7 @@ go_test(
4142
"//server/util/disk",
4243
"//server/util/hash",
4344
"//server/util/log",
45+
"//server/util/status",
4446
"//server/util/testing/flags",
4547
"@com_github_stretchr_testify//assert",
4648
"@com_github_stretchr_testify//require",

enterprise/server/remote_execution/filecache/filecache.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8+
"hash"
89
"io/fs"
910
"os"
1011
"path/filepath"
@@ -15,6 +16,7 @@ import (
1516

1617
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
1718
"github.com/buildbuddy-io/buildbuddy/server/metrics"
19+
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
1820
"github.com/buildbuddy-io/buildbuddy/server/util/claims"
1921
"github.com/buildbuddy-io/buildbuddy/server/util/disk"
2022
"github.com/buildbuddy-io/buildbuddy/server/util/fastcopy"
@@ -442,6 +444,76 @@ func (c *fileCache) Write(ctx context.Context, node *repb.FileNode, b []byte) (n
442444
return n, nil
443445
}
444446

447+
type verifiedWriter struct {
448+
ctx context.Context
449+
fc *fileCache
450+
451+
csum hash.Hash
452+
node *repb.FileNode
453+
file *os.File
454+
}
455+
456+
func newVerifiedWriter(ctx context.Context, fc *fileCache, node *repb.FileNode, digestFunction repb.DigestFunction_Value, file *os.File) (*verifiedWriter, error) {
457+
csum, err := digest.HashForDigestType(digestFunction)
458+
if err != nil {
459+
return nil, err
460+
}
461+
return &verifiedWriter{
462+
ctx: ctx,
463+
fc: fc,
464+
csum: csum,
465+
node: node,
466+
file: file,
467+
}, nil
468+
}
469+
470+
func (v *verifiedWriter) Write(p []byte) (n int, err error) {
471+
if v.file == nil {
472+
return 0, status.FailedPreconditionError("writer is closed")
473+
}
474+
if _, err := v.csum.Write(p); err != nil {
475+
return 0, err
476+
}
477+
return v.file.Write(p)
478+
}
479+
480+
func (v *verifiedWriter) Commit() error {
481+
if v.file == nil {
482+
return status.FailedPreconditionError("writer is closed")
483+
}
484+
hashStr := fmt.Sprintf("%x", v.csum.Sum(nil))
485+
if v.node.GetDigest().GetHash() != hashStr {
486+
return status.DataLossErrorf("data checksum %q does not match expected checksum %q", hashStr, v.node.GetDigest().GetHash())
487+
}
488+
defer v.Close()
489+
return v.fc.AddFile(v.ctx, v.node, v.file.Name())
490+
}
491+
492+
func (v *verifiedWriter) Close() error {
493+
if v.file == nil {
494+
return nil
495+
}
496+
defer func() {
497+
if err := os.Remove(v.file.Name()); err != nil {
498+
log.Warningf("Failed to remove filecache temp file: %s", err)
499+
}
500+
v.file = nil
501+
}()
502+
return v.file.Close()
503+
}
504+
505+
func (c *fileCache) Writer(ctx context.Context, node *repb.FileNode, digestFunction repb.DigestFunction_Value) (interfaces.CommittedWriteCloser, error) {
506+
tmp, err := c.tempPath(node.GetDigest().GetHash())
507+
if err != nil {
508+
return nil, err
509+
}
510+
f, err := os.Create(tmp)
511+
if err != nil {
512+
return nil, status.InternalErrorf("filecache temp file creation failed: %s", err)
513+
}
514+
return newVerifiedWriter(ctx, c, node, digestFunction, f)
515+
}
516+
445517
func (c *fileCache) tempPath(name string) (string, error) {
446518
randStr, err := random.RandomString(10)
447519
if err != nil {

enterprise/server/remote_execution/filecache/filecache_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package filecache_test
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"io/fs"
78
"math/rand"
89
"os"
@@ -19,6 +20,7 @@ import (
1920
"github.com/buildbuddy-io/buildbuddy/server/util/disk"
2021
"github.com/buildbuddy-io/buildbuddy/server/util/hash"
2122
"github.com/buildbuddy-io/buildbuddy/server/util/log"
23+
"github.com/buildbuddy-io/buildbuddy/server/util/status"
2224
"github.com/buildbuddy-io/buildbuddy/server/util/testing/flags"
2325
"github.com/stretchr/testify/assert"
2426
"github.com/stretchr/testify/require"
@@ -35,7 +37,7 @@ func writeFile(t *testing.T, base string, path string, executable bool) {
3537
writeFileContent(t, base, path, content, executable)
3638
}
3739

38-
func writeFileContent(t *testing.T, base, path, content string, executable bool) {
40+
func writeFileContent(t *testing.T, base, path, content string, executable bool) string {
3941
mod := fs.FileMode(0644)
4042
if executable {
4143
mod = 0755
@@ -47,6 +49,7 @@ func writeFileContent(t *testing.T, base, path, content string, executable bool)
4749
if err := os.WriteFile(fullPath, []byte(content), mod); err != nil {
4850
t.Fatal(err)
4951
}
52+
return fullPath
5053
}
5154

5255
func fcRelativePath(group, h string) string {
@@ -430,6 +433,54 @@ func TestFileCacheEvictionAfterSubdirPrefixing(t *testing.T) {
430433
}
431434
}
432435

436+
func TestFileCacheWriter(t *testing.T) {
437+
ctx := context.Background()
438+
fcDir := testfs.MakeTempDir(t)
439+
// Create filecache
440+
fc, err := filecache.NewFileCache(fcDir, 100000, false)
441+
if err != nil {
442+
t.Fatal(err)
443+
}
444+
fc.WaitForDirectoryScanToComplete()
445+
446+
baseDir := testfs.MakeTempDir(t)
447+
448+
path := "my/fun/file"
449+
content := "hello"
450+
fullPath := writeFileContent(t, baseDir, path, content, false)
451+
d, err := digest.ComputeForFile(fullPath, repb.DigestFunction_BLAKE3)
452+
require.NoError(t, err)
453+
454+
node := &repb.FileNode{Digest: d}
455+
456+
w, err := fc.Writer(ctx, node, repb.DigestFunction_BLAKE3)
457+
require.NoError(t, err)
458+
_, err = w.Write([]byte("bad content"))
459+
require.NoError(t, err)
460+
err = w.Commit()
461+
require.Error(t, err)
462+
require.True(t, status.IsDataLossError(err))
463+
err = w.Close()
464+
require.NoError(t, err)
465+
466+
w, err = fc.Writer(ctx, node, repb.DigestFunction_BLAKE3)
467+
require.NoError(t, err)
468+
_, err = w.Write([]byte(content))
469+
require.NoError(t, err)
470+
err = w.Commit()
471+
require.NoError(t, err)
472+
err = w.Close()
473+
require.NoError(t, err)
474+
475+
f, err := fc.Open(ctx, node)
476+
require.NoError(t, err)
477+
rc, err := io.ReadAll(f)
478+
require.NoError(t, err)
479+
err = f.Close()
480+
require.NoError(t, err)
481+
require.Equal(t, content, string(rc))
482+
}
483+
433484
func BenchmarkFilecacheLink(b *testing.B) {
434485
ctx := context.TODO()
435486
flags.Set(b, "app.log_level", "warn")

enterprise/server/util/vfs_server/BUILD

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@ go_library(
1919
"@io_bazel_rules_go//go/platform:darwin_amd64": [
2020
"//enterprise/server/remote_execution/container",
2121
"//proto:remote_execution_go_proto",
22+
"//proto:resource_go_proto",
2223
"//proto:vfs_go_proto",
2324
"//server/cache/dirtools",
2425
"//server/environment",
26+
"//server/interfaces",
27+
"//server/remote_cache/cachetools",
2528
"//server/remote_cache/digest",
2629
"//server/util/alert",
30+
"//server/util/claims",
2731
"//server/util/grpc_server",
2832
"//server/util/log",
29-
"//server/util/random",
3033
"//server/util/status",
34+
"//third_party/singleflight",
3135
"@com_github_hanwen_go_fuse_v2//fs",
3236
"@com_github_hanwen_go_fuse_v2//fuse",
3337
"@org_golang_google_grpc//:grpc",
@@ -37,15 +41,19 @@ go_library(
3741
"@io_bazel_rules_go//go/platform:darwin_arm64": [
3842
"//enterprise/server/remote_execution/container",
3943
"//proto:remote_execution_go_proto",
44+
"//proto:resource_go_proto",
4045
"//proto:vfs_go_proto",
4146
"//server/cache/dirtools",
4247
"//server/environment",
48+
"//server/interfaces",
49+
"//server/remote_cache/cachetools",
4350
"//server/remote_cache/digest",
4451
"//server/util/alert",
52+
"//server/util/claims",
4553
"//server/util/grpc_server",
4654
"//server/util/log",
47-
"//server/util/random",
4855
"//server/util/status",
56+
"//third_party/singleflight",
4957
"@com_github_hanwen_go_fuse_v2//fs",
5058
"@com_github_hanwen_go_fuse_v2//fuse",
5159
"@org_golang_google_grpc//:grpc",
@@ -55,15 +63,19 @@ go_library(
5563
"@io_bazel_rules_go//go/platform:linux_amd64": [
5664
"//enterprise/server/remote_execution/container",
5765
"//proto:remote_execution_go_proto",
66+
"//proto:resource_go_proto",
5867
"//proto:vfs_go_proto",
5968
"//server/cache/dirtools",
6069
"//server/environment",
70+
"//server/interfaces",
71+
"//server/remote_cache/cachetools",
6172
"//server/remote_cache/digest",
6273
"//server/util/alert",
74+
"//server/util/claims",
6375
"//server/util/grpc_server",
6476
"//server/util/log",
65-
"//server/util/random",
6677
"//server/util/status",
78+
"//third_party/singleflight",
6779
"@com_github_hanwen_go_fuse_v2//fs",
6880
"@com_github_hanwen_go_fuse_v2//fuse",
6981
"@org_golang_google_grpc//:grpc",
@@ -74,15 +86,19 @@ go_library(
7486
"@io_bazel_rules_go//go/platform:linux_arm64": [
7587
"//enterprise/server/remote_execution/container",
7688
"//proto:remote_execution_go_proto",
89+
"//proto:resource_go_proto",
7790
"//proto:vfs_go_proto",
7891
"//server/cache/dirtools",
7992
"//server/environment",
93+
"//server/interfaces",
94+
"//server/remote_cache/cachetools",
8095
"//server/remote_cache/digest",
8196
"//server/util/alert",
97+
"//server/util/claims",
8298
"//server/util/grpc_server",
8399
"//server/util/log",
84-
"//server/util/random",
85100
"//server/util/status",
101+
"//third_party/singleflight",
86102
"@com_github_hanwen_go_fuse_v2//fs",
87103
"@com_github_hanwen_go_fuse_v2//fuse",
88104
"@org_golang_google_grpc//:grpc",

enterprise/server/util/vfs_server/vfs_server.go

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,21 @@ import (
1818
"github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_execution/container"
1919
"github.com/buildbuddy-io/buildbuddy/server/cache/dirtools"
2020
"github.com/buildbuddy-io/buildbuddy/server/environment"
21+
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
22+
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/cachetools"
2123
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
2224
"github.com/buildbuddy-io/buildbuddy/server/util/alert"
25+
"github.com/buildbuddy-io/buildbuddy/server/util/claims"
2326
"github.com/buildbuddy-io/buildbuddy/server/util/grpc_server"
2427
"github.com/buildbuddy-io/buildbuddy/server/util/log"
25-
"github.com/buildbuddy-io/buildbuddy/server/util/random"
2628
"github.com/buildbuddy-io/buildbuddy/server/util/status"
29+
"github.com/buildbuddy-io/buildbuddy/third_party/singleflight"
2730
"github.com/hanwen/go-fuse/v2/fuse"
2831
"google.golang.org/grpc"
2932
"google.golang.org/grpc/codes"
3033

3134
repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
35+
rspb "github.com/buildbuddy-io/buildbuddy/proto/resource"
3236
vfspb "github.com/buildbuddy-io/buildbuddy/proto/vfs"
3337
fusefs "github.com/hanwen/go-fuse/v2/fs"
3438
gstatus "google.golang.org/grpc/status"
@@ -277,9 +281,9 @@ type Server struct {
277281
nextId uint64
278282
nodes map[uint64]*fsNode
279283
internalTaskCtx context.Context
280-
fileFetcher *dirtools.BatchFileFetcher
281284
root *fsNode
282285
remoteInstanceName string
286+
digestFunction repb.DigestFunction_Value
283287
fileHandles map[uint64]*fileHandle
284288
}
285289

@@ -416,8 +420,9 @@ func (p *Server) Prepare(ctx context.Context, layout *container.FileSystemLayout
416420
p.mu.Lock()
417421
defer p.mu.Unlock()
418422
p.internalTaskCtx = ctx
423+
p.remoteInstanceName = layout.RemoteInstanceName
424+
p.digestFunction = layout.DigestFunction
419425
p.root = rootNode
420-
p.fileFetcher = dirtools.NewBatchFileFetcher(ctx, p.env, layout.RemoteInstanceName, layout.DigestFunction)
421426
return nil
422427
}
423428

@@ -658,36 +663,46 @@ func (p *Server) createFile(ctx context.Context, request *vfspb.CreateRequest, p
658663
return f, node, nil
659664
}
660665

666+
func groupIDStringFromContext(ctx context.Context) string {
667+
if c, err := claims.ClaimsFromContext(ctx); err == nil {
668+
return c.GroupID
669+
}
670+
return interfaces.AuthAnonymousUser
671+
}
672+
673+
var DownloadDeduper = singleflight.Group[string, struct{}]{}
674+
661675
func (p *Server) openCASFile(ctx context.Context, node *fsNode, flags uint32) (*os.File, error) {
662676
// If we can open the file directly from the file cache then use that.
663677
if f, err := p.env.GetFileCache().Open(ctx, node.fileNode); err == nil {
664678
return f, nil
665679
}
666680

667-
// If the file is not in the file cache, download it to the backing
668-
// directory and return a file handle to the backing file.
669-
localFileName, err := random.RandomString(16)
681+
dedupeKey := groupIDStringFromContext(ctx) + "-" + node.fileNode.GetDigest().GetHash()
682+
_, _, err := DownloadDeduper.Do(ctx, dedupeKey, func(ctx context.Context) (struct{}, error) {
683+
bsClient := p.env.GetByteStreamClient()
684+
rn := digest.NewResourceName(node.fileNode.GetDigest(), p.remoteInstanceName, rspb.CacheType_CAS, p.digestFunction)
685+
rn.SetCompressor(repb.Compressor_ZSTD)
686+
w, err := p.env.GetFileCache().Writer(p.taskCtx(), node.fileNode, p.digestFunction)
687+
if err != nil {
688+
return struct{}{}, err
689+
}
690+
defer w.Close()
691+
692+
if err := cachetools.GetBlob(p.taskCtx(), bsClient, rn, w); err != nil {
693+
return struct{}{}, err
694+
}
695+
696+
if err := w.Commit(); err != nil {
697+
return struct{}{}, err
698+
}
699+
return struct{}{}, nil
700+
})
670701
if err != nil {
671702
return nil, err
672703
}
673-
localFilePath := filepath.Join(p.workspacePath, localFileName)
674-
fileMap := dirtools.FileMap{
675-
digest.NewKey(node.fileNode.Digest): {&dirtools.FilePointer{
676-
FullPath: localFilePath,
677-
FileNode: node.fileNode,
678-
}},
679-
}
680-
if err := p.fileFetcher.FetchFiles(fileMap, &dirtools.DownloadTreeOpts{}); err != nil {
681-
return nil, err
682-
}
683-
f, err := os.OpenFile(localFilePath, int(flags), 0)
684-
if err != nil {
685-
return nil, syscallErrStatus(err)
686-
}
687-
node.mu.Lock()
688-
node.backingPath = localFilePath
689-
node.mu.Unlock()
690-
return f, nil
704+
705+
return p.env.GetFileCache().Open(ctx, node.fileNode)
691706
}
692707

693708
func (p *Server) Create(ctx context.Context, request *vfspb.CreateRequest) (*vfspb.CreateResponse, error) {

0 commit comments

Comments
 (0)