diff --git a/.mockery.yaml b/.mockery.yaml index 60434ff4dd..cff1a1a4a6 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -9,8 +9,13 @@ packages: github.com/e2b-dev/infra/packages/shared/pkg/storage: interfaces: + StorageProvider: + config: + dir: packages/shared/pkg/storage + filename: mockstorageprovider_test.go + pkgname: storage StorageObjectProvider: config: - dir: packages/shared/pkg/storage/mocks - filename: mocks.go - pkgname: storagemocks + dir: packages/shared/pkg/storage + filename: mockstorageobjectprovider_test.go + pkgname: storage diff --git a/packages/orchestrator/benchmark_test.go b/packages/orchestrator/benchmark_test.go index 3b4c18fd7e..4989a97e66 100644 --- a/packages/orchestrator/benchmark_test.go +++ b/packages/orchestrator/benchmark_test.go @@ -243,6 +243,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) { sandboxes, templateCache, buildMetrics, + featureFlags, ) buildPath := filepath.Join(os.Getenv("LOCAL_TEMPLATE_STORAGE_BASE_PATH"), buildID, "rootfs.ext4") diff --git a/packages/orchestrator/cmd/build-template/main.go b/packages/orchestrator/cmd/build-template/main.go index 1b56d94986..50d652e9fe 100644 --- a/packages/orchestrator/cmd/build-template/main.go +++ b/packages/orchestrator/cmd/build-template/main.go @@ -199,6 +199,7 @@ func buildTemplate( sandboxes, templateCache, buildMetrics, + featureFlags, ) logger = logger. diff --git a/packages/orchestrator/internal/template/build/builder.go b/packages/orchestrator/internal/template/build/builder.go index 09f7830a01..724bafae28 100644 --- a/packages/orchestrator/internal/template/build/builder.go +++ b/packages/orchestrator/internal/template/build/builder.go @@ -32,6 +32,8 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/template/constants" artifactsregistry "github.com/e2b-dev/infra/packages/shared/pkg/artifacts-registry" "github.com/e2b-dev/infra/packages/shared/pkg/dockerhub" + "github.com/e2b-dev/infra/packages/shared/pkg/env" + featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" @@ -55,6 +57,9 @@ type Builder struct { sandboxes *sandbox.Map templateCache *sbxtemplate.Cache metrics *metrics.BuildMetrics + flags *featureflags.Client + + rootCachePath string } func NewBuilder( @@ -68,6 +73,7 @@ func NewBuilder( sandboxes *sandbox.Map, templateCache *sbxtemplate.Cache, buildMetrics *metrics.BuildMetrics, + flags *featureflags.Client, ) *Builder { return &Builder{ logger: logger, @@ -80,6 +86,8 @@ func NewBuilder( sandboxes: sandboxes, templateCache: templateCache, metrics: buildMetrics, + flags: flags, + rootCachePath: env.GetEnv("SHARED_CHUNK_CACHE_PATH", ""), } } @@ -196,13 +204,32 @@ func (b *Builder) Build(ctx context.Context, template storage.TemplateFiles, cfg return runBuild(ctx, logger, buildContext, b) } +func (b *Builder) useNFSCache(ctx context.Context) bool { + if b.rootCachePath == "" { + // can't enable cache if we don't have a cache path + return false + } + + flag, err := b.flags.BoolFlag(ctx, featureflags.BuildingFeatureFlagName) + if err != nil { + zap.L().Error("failed to get nfs cache feature flag", zap.Error(err)) + } + + return flag +} + func runBuild( ctx context.Context, userLogger *zap.Logger, bc buildcontext.BuildContext, builder *Builder, ) (*Result, error) { - index := cache.NewHashIndex(bc.CacheScope, builder.buildStorage, builder.templateStorage) + templateStorage := builder.templateStorage + if builder.useNFSCache(ctx) { + templateStorage = storage.NewCachedProvider(builder.rootCachePath, templateStorage) + } + + index := cache.NewHashIndex(bc.CacheScope, builder.buildStorage, templateStorage) layerExecutor := layer.NewLayerExecutor( bc, @@ -210,7 +237,7 @@ func runBuild( builder.templateCache, builder.proxy, builder.sandboxes, - builder.templateStorage, + templateStorage, builder.buildStorage, index, ) @@ -219,7 +246,7 @@ func runBuild( bc, builder.logger, builder.proxy, - builder.templateStorage, + templateStorage, builder.artifactRegistry, builder.dockerhubRepository, layerExecutor, @@ -261,7 +288,7 @@ func runBuild( postProcessingBuilder := finalize.New( bc, builder.sandboxFactory, - builder.templateStorage, + templateStorage, builder.proxy, layerExecutor, ) @@ -295,7 +322,7 @@ func runBuild( // Get the base rootfs size from the template files // This is the size of the rootfs after provisioning and before building the layers // (as they don't change the rootfs size) - rootfsSize, err := getRootfsSize(ctx, builder.templateStorage, lastLayerResult.Metadata.Template) + rootfsSize, err := getRootfsSize(ctx, templateStorage, lastLayerResult.Metadata.Template) if err != nil { return nil, fmt.Errorf("error getting rootfs size: %w", err) } diff --git a/packages/orchestrator/internal/template/server/main.go b/packages/orchestrator/internal/template/server/main.go index 236e5c5f5a..639edaa3f6 100644 --- a/packages/orchestrator/internal/template/server/main.go +++ b/packages/orchestrator/internal/template/server/main.go @@ -20,6 +20,7 @@ import ( artifactsregistry "github.com/e2b-dev/infra/packages/shared/pkg/artifacts-registry" "github.com/e2b-dev/infra/packages/shared/pkg/dockerhub" "github.com/e2b-dev/infra/packages/shared/pkg/env" + featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags" templatemanager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager" "github.com/e2b-dev/infra/packages/shared/pkg/limit" "github.com/e2b-dev/infra/packages/shared/pkg/storage" @@ -58,6 +59,7 @@ func New( templatePersistence storage.StorageProvider, limiter *limit.Limiter, info *service.ServiceInfo, + flags *featureflags.Client, ) (s *ServerStore, e error) { logger.Info("Initializing template manager") @@ -107,6 +109,7 @@ func New( sandboxes, templateCache, buildMetrics, + flags, ) store := &ServerStore{ diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 75cd38117b..42041eb8c6 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -416,6 +416,7 @@ func run(config cfg.Config) (success bool) { persistence, limiter, serviceInfo, + featureFlags, ) if err != nil { zap.L().Fatal("failed to create template manager", zap.Error(err)) diff --git a/packages/shared/pkg/feature-flags/flags.go b/packages/shared/pkg/feature-flags/flags.go index 74d09dd2af..f90dd46da6 100644 --- a/packages/shared/pkg/feature-flags/flags.go +++ b/packages/shared/pkg/feature-flags/flags.go @@ -49,6 +49,7 @@ var ( SandboxLifeCycleEventsWriteFlagName = newBoolFlag("sandbox-lifecycle-events-write", env.IsDevelopment()) SnapshotFeatureFlagName = newBoolFlag("use-nfs-for-snapshots", env.IsDevelopment()) TemplateFeatureFlagName = newBoolFlag("use-nfs-for-templates", env.IsDevelopment()) + BuildingFeatureFlagName = newBoolFlag("use-nfs-for-building-templates", env.IsDevelopment()) SandboxEventsPublishFlagName = newBoolFlag("sandbox-events-publish", env.IsDevelopment()) BestOfKPlacementAlgorithm = newBoolFlag("best-of-k-placement-algorithm", env.IsDevelopment()) BestOfKCanFit = newBoolFlag("best-of-k-can-fit", true) diff --git a/packages/shared/pkg/storage/mockstorageobjectprovider_test.go b/packages/shared/pkg/storage/mockstorageobjectprovider_test.go new file mode 100644 index 0000000000..744a3ddb2a --- /dev/null +++ b/packages/shared/pkg/storage/mockstorageobjectprovider_test.go @@ -0,0 +1,411 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package storage + +import ( + "context" + "io" + + mock "github.com/stretchr/testify/mock" +) + +// NewMockStorageObjectProvider creates a new instance of MockStorageObjectProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockStorageObjectProvider(t interface { + mock.TestingT + Cleanup(func()) +}) *MockStorageObjectProvider { + mock := &MockStorageObjectProvider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockStorageObjectProvider is an autogenerated mock type for the StorageObjectProvider type +type MockStorageObjectProvider struct { + mock.Mock +} + +type MockStorageObjectProvider_Expecter struct { + mock *mock.Mock +} + +func (_m *MockStorageObjectProvider) EXPECT() *MockStorageObjectProvider_Expecter { + return &MockStorageObjectProvider_Expecter{mock: &_m.Mock} +} + +// Delete provides a mock function for the type MockStorageObjectProvider +func (_mock *MockStorageObjectProvider) Delete(ctx context.Context) error { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Delete") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockStorageObjectProvider_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete' +type MockStorageObjectProvider_Delete_Call struct { + *mock.Call +} + +// Delete is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockStorageObjectProvider_Expecter) Delete(ctx interface{}) *MockStorageObjectProvider_Delete_Call { + return &MockStorageObjectProvider_Delete_Call{Call: _e.mock.On("Delete", ctx)} +} + +func (_c *MockStorageObjectProvider_Delete_Call) Run(run func(ctx context.Context)) *MockStorageObjectProvider_Delete_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockStorageObjectProvider_Delete_Call) Return(err error) *MockStorageObjectProvider_Delete_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockStorageObjectProvider_Delete_Call) RunAndReturn(run func(ctx context.Context) error) *MockStorageObjectProvider_Delete_Call { + _c.Call.Return(run) + return _c +} + +// ReadAt provides a mock function for the type MockStorageObjectProvider +func (_mock *MockStorageObjectProvider) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { + ret := _mock.Called(ctx, p, off) + + if len(ret) == 0 { + panic("no return value specified for ReadAt") + } + + var r0 int + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte, int64) (int, error)); ok { + return returnFunc(ctx, p, off) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte, int64) int); ok { + r0 = returnFunc(ctx, p, off) + } else { + r0 = ret.Get(0).(int) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte, int64) error); ok { + r1 = returnFunc(ctx, p, off) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockStorageObjectProvider_ReadAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadAt' +type MockStorageObjectProvider_ReadAt_Call struct { + *mock.Call +} + +// ReadAt is a helper method to define mock.On call +// - ctx context.Context +// - p []byte +// - off int64 +func (_e *MockStorageObjectProvider_Expecter) ReadAt(ctx interface{}, p interface{}, off interface{}) *MockStorageObjectProvider_ReadAt_Call { + return &MockStorageObjectProvider_ReadAt_Call{Call: _e.mock.On("ReadAt", ctx, p, off)} +} + +func (_c *MockStorageObjectProvider_ReadAt_Call) Run(run func(ctx context.Context, p []byte, off int64)) *MockStorageObjectProvider_ReadAt_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + var arg2 int64 + if args[2] != nil { + arg2 = args[2].(int64) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockStorageObjectProvider_ReadAt_Call) Return(n int, err error) *MockStorageObjectProvider_ReadAt_Call { + _c.Call.Return(n, err) + return _c +} + +func (_c *MockStorageObjectProvider_ReadAt_Call) RunAndReturn(run func(ctx context.Context, p []byte, off int64) (int, error)) *MockStorageObjectProvider_ReadAt_Call { + _c.Call.Return(run) + return _c +} + +// Size provides a mock function for the type MockStorageObjectProvider +func (_mock *MockStorageObjectProvider) Size(ctx context.Context) (int64, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Size") + } + + var r0 int64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) int64); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(int64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockStorageObjectProvider_Size_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Size' +type MockStorageObjectProvider_Size_Call struct { + *mock.Call +} + +// Size is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockStorageObjectProvider_Expecter) Size(ctx interface{}) *MockStorageObjectProvider_Size_Call { + return &MockStorageObjectProvider_Size_Call{Call: _e.mock.On("Size", ctx)} +} + +func (_c *MockStorageObjectProvider_Size_Call) Run(run func(ctx context.Context)) *MockStorageObjectProvider_Size_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockStorageObjectProvider_Size_Call) Return(n int64, err error) *MockStorageObjectProvider_Size_Call { + _c.Call.Return(n, err) + return _c +} + +func (_c *MockStorageObjectProvider_Size_Call) RunAndReturn(run func(ctx context.Context) (int64, error)) *MockStorageObjectProvider_Size_Call { + _c.Call.Return(run) + return _c +} + +// Write provides a mock function for the type MockStorageObjectProvider +func (_mock *MockStorageObjectProvider) Write(ctx context.Context, p []byte) (int, error) { + ret := _mock.Called(ctx, p) + + if len(ret) == 0 { + panic("no return value specified for Write") + } + + var r0 int + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) (int, error)); ok { + return returnFunc(ctx, p) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) int); ok { + r0 = returnFunc(ctx, p) + } else { + r0 = ret.Get(0).(int) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte) error); ok { + r1 = returnFunc(ctx, p) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockStorageObjectProvider_Write_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Write' +type MockStorageObjectProvider_Write_Call struct { + *mock.Call +} + +// Write is a helper method to define mock.On call +// - ctx context.Context +// - p []byte +func (_e *MockStorageObjectProvider_Expecter) Write(ctx interface{}, p interface{}) *MockStorageObjectProvider_Write_Call { + return &MockStorageObjectProvider_Write_Call{Call: _e.mock.On("Write", ctx, p)} +} + +func (_c *MockStorageObjectProvider_Write_Call) Run(run func(ctx context.Context, p []byte)) *MockStorageObjectProvider_Write_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockStorageObjectProvider_Write_Call) Return(n int, err error) *MockStorageObjectProvider_Write_Call { + _c.Call.Return(n, err) + return _c +} + +func (_c *MockStorageObjectProvider_Write_Call) RunAndReturn(run func(ctx context.Context, p []byte) (int, error)) *MockStorageObjectProvider_Write_Call { + _c.Call.Return(run) + return _c +} + +// WriteFromFileSystem provides a mock function for the type MockStorageObjectProvider +func (_mock *MockStorageObjectProvider) WriteFromFileSystem(ctx context.Context, path string) error { + ret := _mock.Called(ctx, path) + + if len(ret) == 0 { + panic("no return value specified for WriteFromFileSystem") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = returnFunc(ctx, path) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockStorageObjectProvider_WriteFromFileSystem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteFromFileSystem' +type MockStorageObjectProvider_WriteFromFileSystem_Call struct { + *mock.Call +} + +// WriteFromFileSystem is a helper method to define mock.On call +// - ctx context.Context +// - path string +func (_e *MockStorageObjectProvider_Expecter) WriteFromFileSystem(ctx interface{}, path interface{}) *MockStorageObjectProvider_WriteFromFileSystem_Call { + return &MockStorageObjectProvider_WriteFromFileSystem_Call{Call: _e.mock.On("WriteFromFileSystem", ctx, path)} +} + +func (_c *MockStorageObjectProvider_WriteFromFileSystem_Call) Run(run func(ctx context.Context, path string)) *MockStorageObjectProvider_WriteFromFileSystem_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockStorageObjectProvider_WriteFromFileSystem_Call) Return(err error) *MockStorageObjectProvider_WriteFromFileSystem_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockStorageObjectProvider_WriteFromFileSystem_Call) RunAndReturn(run func(ctx context.Context, path string) error) *MockStorageObjectProvider_WriteFromFileSystem_Call { + _c.Call.Return(run) + return _c +} + +// WriteTo provides a mock function for the type MockStorageObjectProvider +func (_mock *MockStorageObjectProvider) WriteTo(ctx context.Context, w io.Writer) (int64, error) { + ret := _mock.Called(ctx, w) + + if len(ret) == 0 { + panic("no return value specified for WriteTo") + } + + var r0 int64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, io.Writer) (int64, error)); ok { + return returnFunc(ctx, w) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, io.Writer) int64); ok { + r0 = returnFunc(ctx, w) + } else { + r0 = ret.Get(0).(int64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, io.Writer) error); ok { + r1 = returnFunc(ctx, w) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockStorageObjectProvider_WriteTo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteTo' +type MockStorageObjectProvider_WriteTo_Call struct { + *mock.Call +} + +// WriteTo is a helper method to define mock.On call +// - ctx context.Context +// - w io.Writer +func (_e *MockStorageObjectProvider_Expecter) WriteTo(ctx interface{}, w interface{}) *MockStorageObjectProvider_WriteTo_Call { + return &MockStorageObjectProvider_WriteTo_Call{Call: _e.mock.On("WriteTo", ctx, w)} +} + +func (_c *MockStorageObjectProvider_WriteTo_Call) Run(run func(ctx context.Context, w io.Writer)) *MockStorageObjectProvider_WriteTo_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 io.Writer + if args[1] != nil { + arg1 = args[1].(io.Writer) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockStorageObjectProvider_WriteTo_Call) Return(n int64, err error) *MockStorageObjectProvider_WriteTo_Call { + _c.Call.Return(n, err) + return _c +} + +func (_c *MockStorageObjectProvider_WriteTo_Call) RunAndReturn(run func(ctx context.Context, w io.Writer) (int64, error)) *MockStorageObjectProvider_WriteTo_Call { + _c.Call.Return(run) + return _c +} diff --git a/packages/shared/pkg/storage/mockstorageprovider_test.go b/packages/shared/pkg/storage/mockstorageprovider_test.go new file mode 100644 index 0000000000..d8e638e696 --- /dev/null +++ b/packages/shared/pkg/storage/mockstorageprovider_test.go @@ -0,0 +1,280 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package storage + +import ( + "context" + "time" + + mock "github.com/stretchr/testify/mock" +) + +// NewMockStorageProvider creates a new instance of MockStorageProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockStorageProvider(t interface { + mock.TestingT + Cleanup(func()) +}) *MockStorageProvider { + mock := &MockStorageProvider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockStorageProvider is an autogenerated mock type for the StorageProvider type +type MockStorageProvider struct { + mock.Mock +} + +type MockStorageProvider_Expecter struct { + mock *mock.Mock +} + +func (_m *MockStorageProvider) EXPECT() *MockStorageProvider_Expecter { + return &MockStorageProvider_Expecter{mock: &_m.Mock} +} + +// DeleteObjectsWithPrefix provides a mock function for the type MockStorageProvider +func (_mock *MockStorageProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error { + ret := _mock.Called(ctx, prefix) + + if len(ret) == 0 { + panic("no return value specified for DeleteObjectsWithPrefix") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = returnFunc(ctx, prefix) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockStorageProvider_DeleteObjectsWithPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteObjectsWithPrefix' +type MockStorageProvider_DeleteObjectsWithPrefix_Call struct { + *mock.Call +} + +// DeleteObjectsWithPrefix is a helper method to define mock.On call +// - ctx context.Context +// - prefix string +func (_e *MockStorageProvider_Expecter) DeleteObjectsWithPrefix(ctx interface{}, prefix interface{}) *MockStorageProvider_DeleteObjectsWithPrefix_Call { + return &MockStorageProvider_DeleteObjectsWithPrefix_Call{Call: _e.mock.On("DeleteObjectsWithPrefix", ctx, prefix)} +} + +func (_c *MockStorageProvider_DeleteObjectsWithPrefix_Call) Run(run func(ctx context.Context, prefix string)) *MockStorageProvider_DeleteObjectsWithPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockStorageProvider_DeleteObjectsWithPrefix_Call) Return(err error) *MockStorageProvider_DeleteObjectsWithPrefix_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockStorageProvider_DeleteObjectsWithPrefix_Call) RunAndReturn(run func(ctx context.Context, prefix string) error) *MockStorageProvider_DeleteObjectsWithPrefix_Call { + _c.Call.Return(run) + return _c +} + +// GetDetails provides a mock function for the type MockStorageProvider +func (_mock *MockStorageProvider) GetDetails() string { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for GetDetails") + } + + var r0 string + if returnFunc, ok := ret.Get(0).(func() string); ok { + r0 = returnFunc() + } else { + r0 = ret.Get(0).(string) + } + return r0 +} + +// MockStorageProvider_GetDetails_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDetails' +type MockStorageProvider_GetDetails_Call struct { + *mock.Call +} + +// GetDetails is a helper method to define mock.On call +func (_e *MockStorageProvider_Expecter) GetDetails() *MockStorageProvider_GetDetails_Call { + return &MockStorageProvider_GetDetails_Call{Call: _e.mock.On("GetDetails")} +} + +func (_c *MockStorageProvider_GetDetails_Call) Run(run func()) *MockStorageProvider_GetDetails_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStorageProvider_GetDetails_Call) Return(s string) *MockStorageProvider_GetDetails_Call { + _c.Call.Return(s) + return _c +} + +func (_c *MockStorageProvider_GetDetails_Call) RunAndReturn(run func() string) *MockStorageProvider_GetDetails_Call { + _c.Call.Return(run) + return _c +} + +// OpenObject provides a mock function for the type MockStorageProvider +func (_mock *MockStorageProvider) OpenObject(ctx context.Context, path string) (StorageObjectProvider, error) { + ret := _mock.Called(ctx, path) + + if len(ret) == 0 { + panic("no return value specified for OpenObject") + } + + var r0 StorageObjectProvider + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string) (StorageObjectProvider, error)); ok { + return returnFunc(ctx, path) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, string) StorageObjectProvider); ok { + r0 = returnFunc(ctx, path) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(StorageObjectProvider) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = returnFunc(ctx, path) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockStorageProvider_OpenObject_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OpenObject' +type MockStorageProvider_OpenObject_Call struct { + *mock.Call +} + +// OpenObject is a helper method to define mock.On call +// - ctx context.Context +// - path string +func (_e *MockStorageProvider_Expecter) OpenObject(ctx interface{}, path interface{}) *MockStorageProvider_OpenObject_Call { + return &MockStorageProvider_OpenObject_Call{Call: _e.mock.On("OpenObject", ctx, path)} +} + +func (_c *MockStorageProvider_OpenObject_Call) Run(run func(ctx context.Context, path string)) *MockStorageProvider_OpenObject_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockStorageProvider_OpenObject_Call) Return(storageObjectProvider StorageObjectProvider, err error) *MockStorageProvider_OpenObject_Call { + _c.Call.Return(storageObjectProvider, err) + return _c +} + +func (_c *MockStorageProvider_OpenObject_Call) RunAndReturn(run func(ctx context.Context, path string) (StorageObjectProvider, error)) *MockStorageProvider_OpenObject_Call { + _c.Call.Return(run) + return _c +} + +// UploadSignedURL provides a mock function for the type MockStorageProvider +func (_mock *MockStorageProvider) UploadSignedURL(ctx context.Context, path string, ttl time.Duration) (string, error) { + ret := _mock.Called(ctx, path, ttl) + + if len(ret) == 0 { + panic("no return value specified for UploadSignedURL") + } + + var r0 string + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string, time.Duration) (string, error)); ok { + return returnFunc(ctx, path, ttl) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, string, time.Duration) string); ok { + r0 = returnFunc(ctx, path, ttl) + } else { + r0 = ret.Get(0).(string) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, string, time.Duration) error); ok { + r1 = returnFunc(ctx, path, ttl) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockStorageProvider_UploadSignedURL_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UploadSignedURL' +type MockStorageProvider_UploadSignedURL_Call struct { + *mock.Call +} + +// UploadSignedURL is a helper method to define mock.On call +// - ctx context.Context +// - path string +// - ttl time.Duration +func (_e *MockStorageProvider_Expecter) UploadSignedURL(ctx interface{}, path interface{}, ttl interface{}) *MockStorageProvider_UploadSignedURL_Call { + return &MockStorageProvider_UploadSignedURL_Call{Call: _e.mock.On("UploadSignedURL", ctx, path, ttl)} +} + +func (_c *MockStorageProvider_UploadSignedURL_Call) Run(run func(ctx context.Context, path string, ttl time.Duration)) *MockStorageProvider_UploadSignedURL_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + var arg2 time.Duration + if args[2] != nil { + arg2 = args[2].(time.Duration) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockStorageProvider_UploadSignedURL_Call) Return(s string, err error) *MockStorageProvider_UploadSignedURL_Call { + _c.Call.Return(s, err) + return _c +} + +func (_c *MockStorageProvider_UploadSignedURL_Call) RunAndReturn(run func(ctx context.Context, path string, ttl time.Duration) (string, error)) *MockStorageProvider_UploadSignedURL_Call { + _c.Call.Return(run) + return _c +} diff --git a/packages/shared/pkg/storage/storage_cache.go b/packages/shared/pkg/storage/storage_cache.go index 3c57c8b6da..18136513c0 100644 --- a/packages/shared/pkg/storage/storage_cache.go +++ b/packages/shared/pkg/storage/storage_cache.go @@ -9,11 +9,13 @@ import ( "os" "path/filepath" "strconv" + "sync" "time" "github.com/google/uuid" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -23,8 +25,15 @@ import ( ) const ( - cacheFilePermissions = 0o600 - cacheDirPermissions = 0o700 + cacheFilePermissions = 0o600 + cacheDirPermissions = 0o700 + maxCacheWriterConcurrency = 10 +) + +var ( + cacheOpWriteTo = attribute.String("cache_op", "write_to") + cacheOpReadAt = attribute.String("cache_op", "read_at") + cacheOpSize = attribute.String("cache_op", "size") ) var ( @@ -48,18 +57,20 @@ var ( ) type CachedProvider struct { - rootPath string chunkSize int64 inner StorageProvider + rootPath string } var _ StorageProvider = (*CachedProvider)(nil) -func NewCachedProvider(rootPath string, inner StorageProvider) *CachedProvider { - return &CachedProvider{rootPath: rootPath, inner: inner, chunkSize: MemoryChunkSize} +func NewCachedProvider(cacheRootPath string, inner StorageProvider) *CachedProvider { + return &CachedProvider{rootPath: cacheRootPath, inner: inner, chunkSize: MemoryChunkSize} } func (c CachedProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error { + go c.deleteObjectsWithPrefix(prefix) + return c.inner.DeleteObjectsWithPrefix(ctx, prefix) } @@ -86,6 +97,16 @@ func (c CachedProvider) GetDetails() string { c.rootPath, c.inner.GetDetails()) } +func (c CachedProvider) deleteObjectsWithPrefix(prefix string) { + fullPrefix := filepath.Join(c.rootPath, prefix) + if err := os.RemoveAll(fullPrefix); err != nil { + zap.L().Error("failed to remove object with prefix", + zap.String("prefix", prefix), + zap.String("path", fullPrefix), + zap.Error(err)) + } +} + type CachedFileObjectProvider struct { path string chunkSize int64 @@ -94,32 +115,73 @@ type CachedFileObjectProvider struct { var _ StorageObjectProvider = (*CachedFileObjectProvider)(nil) -// WriteTo is used for very small files and we can check against their size to ensure the content is valid. -func (c *CachedFileObjectProvider) WriteTo(ctx context.Context, dst io.Writer) (int64, error) { +// WriteTo is used for very small files, and we can check against their size to ensure the content is valid. +func (c *CachedFileObjectProvider) WriteTo(ctx context.Context, dst io.Writer) (written int64, err error) { ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.WriteTo") defer span.End() if bytesRead, ok := c.copyFullFileFromCache(ctx, dst); ok { + cacheHits.Add(ctx, 1, metric.WithAttributes(cacheOpWriteTo)) return bytesRead, nil } + cacheMisses.Add(ctx, 1, metric.WithAttributes(cacheOpWriteTo)) return c.readAndCacheFullRemoteFile(ctx, dst) } -func (c *CachedFileObjectProvider) WriteFromFileSystem(ctx context.Context, path string) error { - return c.inner.WriteFromFileSystem(ctx, path) +func (c *CachedFileObjectProvider) WriteFromFileSystem(ctx context.Context, path string) (err error) { + ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.WriteFromFileSystem", + trace.WithAttributes(attribute.String("path", path))) + defer func() { + recordError(span, err) + span.End() + }() + + // write the file to the disk and the remote system at the same time. + // this opens the file twice, but the API makes it difficult to use a MultiWriter + + go func() { + if err := c.createCacheBlocksFromFile(context.WithoutCancel(ctx), path); err != nil { + zap.L().Error("failed to create cache blocks from file", + zap.String("path", path), + zap.Error(err), + ) + } + }() + + if err := c.inner.WriteFromFileSystem(ctx, path); err != nil { + return fmt.Errorf("failed to write to remote storage: %w", err) + } + + return nil } -func (c *CachedFileObjectProvider) Write(ctx context.Context, src []byte) (int, error) { - return c.inner.Write(ctx, src) +func (c *CachedFileObjectProvider) Write(ctx context.Context, src []byte) (num int, err error) { + ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.Write", trace.WithAttributes(attribute.Int("size", len(src)))) + defer func() { + recordError(span, err) + span.End() + }() + + num, err = c.writeCacheAndRemote(ctx, src) + if err != nil { + return 0, err + } else if num != len(src) { + return 0, fmt.Errorf("expected %d bytes, only got %d bytes", len(src), num) + } + + return num, nil } -func (c *CachedFileObjectProvider) ReadAt(ctx context.Context, buff []byte, offset int64) (int, error) { +func (c *CachedFileObjectProvider) ReadAt(ctx context.Context, buff []byte, offset int64) (readCount int, err error) { ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.ReadAt", trace.WithAttributes( attribute.Int64("offset", offset), attribute.Int("buff_len", len(buff)), )) - defer span.End() + defer func() { + recordError(span, err) + span.End() + }() if err := c.validateReadAtParams(int64(len(buff)), offset); err != nil { return 0, err @@ -131,11 +193,12 @@ func (c *CachedFileObjectProvider) ReadAt(ctx context.Context, buff []byte, offs readTimer := cacheReadTimerFactory.Begin() count, err := c.readAtFromCache(chunkPath, buff) if ignoreEOF(err) == nil { - cacheHits.Add(ctx, 1) + cacheHits.Add(ctx, 1, metric.WithAttributes(cacheOpReadAt)) readTimer.End(ctx, int64(count)) + span.SetAttributes(attribute.String("read-from", "local")) return count, err // return `err` in case it's io.EOF } - cacheMisses.Add(ctx, 1) + cacheMisses.Add(ctx, 1, metric.WithAttributes(cacheOpReadAt)) zap.L().Debug("failed to read cached chunk, falling back to remote read", zap.String("chunk_path", chunkPath), @@ -143,31 +206,25 @@ func (c *CachedFileObjectProvider) ReadAt(ctx context.Context, buff []byte, offs zap.Error(err)) // read remote file - readCount, err := c.inner.ReadAt(ctx, buff, offset) + readCount, err = c.inner.ReadAt(ctx, buff, offset) if err != nil { return 0, fmt.Errorf("failed to perform uncached read: %w", err) } - go func() { - c.writeChunkToCache(context.WithoutCancel(ctx), offset, chunkPath, buff[:readCount]) - }() + go func(count int) { + c.writeChunkToCache(context.WithoutCancel(ctx), offset, chunkPath, buff[:count]) + }(readCount) + span.SetAttributes(attribute.String("read-from", "remote")) return readCount, nil } -var ( - ErrOffsetUnaligned = errors.New("offset must be a multiple of chunk size") - ErrBufferTooSmall = errors.New("buffer is too small") - ErrMultipleChunks = errors.New("cannot read multiple chunks") - ErrBufferTooLarge = errors.New("buffer is too large") -) - func (c *CachedFileObjectProvider) Size(ctx context.Context) (int64, error) { if size, ok := c.readLocalSize(); ok { - cacheHits.Add(ctx, 1) + cacheHits.Add(ctx, 1, metric.WithAttributes(cacheOpSize)) return size, nil } - cacheMisses.Add(ctx, 1) + cacheMisses.Add(ctx, 1, metric.WithAttributes(cacheOpSize)) size, err := c.inner.Size(ctx) if err != nil { @@ -180,9 +237,65 @@ func (c *CachedFileObjectProvider) Size(ctx context.Context) (int64, error) { } func (c *CachedFileObjectProvider) Delete(ctx context.Context) error { + go func() { + if err := os.RemoveAll(c.path); ignoreFileMissingError(err) != nil { + zap.L().Error("error on cache delete", zap.String("path", c.path), zap.Error(err)) + } + }() + return c.inner.Delete(ctx) } +// writeCacheAndRemote simultaneously writes a full file to both local cache and the remote persistence store. It does +// not need to worry about race conditions, as the files will only exist on the local machine, and can't be generated +// in parallel on any other machines. +func (c *CachedFileObjectProvider) writeCacheAndRemote(ctx context.Context, src []byte) (size int, err error) { + ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.writeCacheAndRemote") + defer func() { + recordError(span, err) + span.End() + }() + + size, err = c.inner.Write(ctx, src) + if err != nil { + return 0, fmt.Errorf("failed to remote write from byte array: %w", err) + } + if size != len(src) { + zap.L().Warn("remote write didn't match data length", + zap.Int("expected_size", len(src)), + zap.Int("actual_size", size), + zap.String("root_path", c.path), + ) + } + + chunkSize := int(c.chunkSize) + for offset := 0; offset < size; offset += chunkSize { + // read from the source + offsetEnd := min(offset+chunkSize, size) + buf := src[offset:offsetEnd] + + go func(offset int, buf []byte) { + // write to the cache file + filename := c.makeChunkFilename(int64(offset)) + if err2 := os.WriteFile(filename, buf, cacheFilePermissions); err2 != nil { + safelyRemoveFile(filename) + zap.L().Warn("failed to write chunk file", + zap.String("filename", filename), + zap.Error(err2)) + } + }(offset, buf) + } + + return size, nil +} + +var ( + ErrOffsetUnaligned = errors.New("offset must be a multiple of chunk size") + ErrBufferTooSmall = errors.New("buffer is too small") + ErrMultipleChunks = errors.New("cannot read multiple chunks") + ErrBufferTooLarge = errors.New("buffer is too large") +) + func (c *CachedFileObjectProvider) readLocalSize() (int64, bool) { fname := c.sizeFilename() content, err := os.ReadFile(fname) @@ -225,6 +338,103 @@ func (c *CachedFileObjectProvider) sizeFilename() string { return filepath.Join(c.path, "size.txt") } +func (c *CachedFileObjectProvider) createCacheBlocksFromFile(ctx context.Context, inputPath string) (err error) { + ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.createCacheBlocksFromFile") + defer func() { + recordError(span, err) + span.End() + }() + + input, err := os.Open(inputPath) + if err != nil { + return fmt.Errorf("failed to open input file: %w", err) + } + defer cleanup("failed to close file", input.Close) + + stat, err := input.Stat() + if err != nil { + return fmt.Errorf("failed to stat input file: %w", err) + } + + totalSize := stat.Size() + var wg sync.WaitGroup + workers := make(chan struct{}, maxCacheWriterConcurrency) + for offset := int64(0); offset < totalSize; offset += c.chunkSize { + wg.Add(1) + go func(offset int64) { + defer wg.Done() + + // limit concurrency + workers <- struct{}{} + defer func() { <-workers }() + + if err := c.writeChunkFromFile(ctx, offset, input); err != nil { + zap.L().Error("failed to write chunk file", + zap.String("path", inputPath), + zap.Int64("offset", offset), + zap.Error(err)) + } + }(offset) + } + wg.Wait() + + return nil +} + +type offsetReader struct { + wrapped io.ReaderAt + offset int64 +} + +var _ io.Reader = (*offsetReader)(nil) + +func (r *offsetReader) Read(p []byte) (n int, err error) { + n, err = r.wrapped.ReadAt(p, r.offset) + r.offset += int64(n) + return +} + +func newOffsetReader(file *os.File, offset int64) *offsetReader { + return &offsetReader{file, offset} +} + +// writeChunkFromFile writes a piece of a local file. It does not need to worry about race conditions, as it will only +// be called when building templates, and templates cannot be built on multiple machines at the same time.x +func (c *CachedFileObjectProvider) writeChunkFromFile(ctx context.Context, offset int64, input *os.File) (err error) { + _, span := tracer.Start(ctx, "write chunk-from-file", trace.WithAttributes( + attribute.Int64("offset", offset), + )) + defer func() { + recordError(span, err) + span.End() + }() + + chunkPath := c.makeChunkFilename(offset) + span.SetAttributes(attribute.String("chunk_path", chunkPath)) + + output, err := os.OpenFile(chunkPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, cacheFilePermissions) + if err != nil { + return fmt.Errorf("failed to open file %s: %w", chunkPath, err) + } + defer cleanup("failed to close file", output.Close) + + offsetReader := newOffsetReader(input, offset) + if _, err := io.CopyN(output, offsetReader, c.chunkSize); ignoreEOF(err) != nil { + safelyRemoveFile(chunkPath) + return fmt.Errorf("failed to copy chunk: %w", err) + } + + return err // in case err == io.EOF +} + +func ignoreFileMissingError(err error) error { + if os.IsNotExist(err) { + return nil + } + + return err +} + func (c *CachedFileObjectProvider) writeLocalSize(size int64) { tempFilename := filepath.Join(c.path, fmt.Sprintf(".size.bin.%s", uuid.NewString())) @@ -432,3 +642,20 @@ func moveWithoutReplace(oldPath, newPath string) error { return nil } + +func safelyRemoveFile(path string) { + if err := os.Remove(path); ignoreFileMissingError(err) != nil { + zap.L().Warn("failed to remove file", + zap.String("path", path), + zap.Error(err)) + } +} + +func recordError(span trace.Span, err error) { + if err == nil { + return + } + + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) +} diff --git a/packages/shared/pkg/storage/storage_cache_test.go b/packages/shared/pkg/storage/storage_cache_test.go index 9c0487dbdb..b660c11b7b 100644 --- a/packages/shared/pkg/storage/storage_cache_test.go +++ b/packages/shared/pkg/storage/storage_cache_test.go @@ -3,17 +3,17 @@ package storage import ( "bytes" "context" + "crypto/rand" "io" "os" "path/filepath" "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - - storagemocks "github.com/e2b-dev/infra/packages/shared/pkg/storage/mocks" ) func TestCachedFileObjectProvider_MakeChunkFilename(t *testing.T) { @@ -22,11 +22,48 @@ func TestCachedFileObjectProvider_MakeChunkFilename(t *testing.T) { assert.Equal(t, "/a/b/c/000000000004-1024.bin", filename) } +func TestCachedProvider_DeleteObjectsWithPrefix(t *testing.T) { + inner := NewMockStorageProvider(t) + inner.EXPECT().DeleteObjectsWithPrefix(mock.Anything, mock.Anything).Return(nil) + + rootDir := t.TempDir() + buildID := uuid.NewString() + buildDir := filepath.Join(rootDir, buildID) + + filesToWrite := map[string]struct{}{ + "file-1.bin": {}, + "file-2.bin/chunk1.bin": {}, + "file-2.bin/chunk2.bin": {}, + } + + var err error + for fname := range filesToWrite { + full := filepath.Join(buildDir, fname) + dirname := filepath.Dir(full) + err = os.MkdirAll(dirname, 0o700) + require.NoError(t, err) + err := os.WriteFile(full, []byte{}, 0o600) + require.NoError(t, err) + } + + p := CachedProvider{inner: inner, chunkSize: MemoryChunkSize, rootPath: rootDir} + err = p.DeleteObjectsWithPrefix(t.Context(), buildID) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 20) + + for fname := range filesToWrite { + full := filepath.Join(buildDir, fname) + _, err := os.Stat(full) + require.ErrorIs(t, err, os.ErrNotExist) + } +} + func TestCachedFileObjectProvider_Size(t *testing.T) { t.Run("can be cached successfully", func(t *testing.T) { const expectedSize int64 = 1024 - inner := storagemocks.NewMockStorageObjectProvider(t) + inner := NewMockStorageObjectProvider(t) inner.EXPECT().Size(mock.Anything).Return(expectedSize, nil) c := CachedFileObjectProvider{path: t.TempDir(), inner: inner} @@ -48,7 +85,7 @@ func TestCachedFileObjectProvider_Size(t *testing.T) { }) } -func TestCachedFileObjectProvider_WriteTo(t *testing.T) { +func TestCachedFileObjectProvider_ReadAt(t *testing.T) { t.Run("read from cache when the file exists", func(t *testing.T) { tempDir := t.TempDir() @@ -72,7 +109,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { t.Run("consecutive ReadAt calls should cache", func(t *testing.T) { fakeData := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - fakeStorageObjectProvider := storagemocks.NewMockStorageObjectProvider(t) + fakeStorageObjectProvider := NewMockStorageObjectProvider(t) fakeStorageObjectProvider.EXPECT(). ReadAt(mock.Anything, mock.Anything, mock.Anything). @@ -113,7 +150,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { t.Run("WriteTo calls should read from cache", func(t *testing.T) { fakeData := []byte{1, 2, 3} - fakeStorageObjectProvider := storagemocks.NewMockStorageObjectProvider(t) + fakeStorageObjectProvider := NewMockStorageObjectProvider(t) fakeStorageObjectProvider.EXPECT(). WriteTo(mock.Anything, mock.Anything). RunAndReturn(func(_ context.Context, dst io.Writer) (int64, error) { @@ -144,6 +181,132 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(len(fakeData)), count) }) + + t.Run("WriteFromFileSystem should write to cache", func(t *testing.T) { + const megabyte = 1024 * 1024 + const fileSize = 11 * megabyte + const chunkSize = 2 * megabyte + + fakeData := generateBytes(t, fileSize) + + fakeStorageObjectProvider := NewMockStorageObjectProvider(t) + fakeStorageObjectProvider. + EXPECT(). + WriteFromFileSystem(mock.Anything, mock.Anything). + Return(nil) + + tempDir := t.TempDir() + c := CachedFileObjectProvider{ + path: tempDir, + chunkSize: chunkSize, + inner: fakeStorageObjectProvider, + } + + // create temp file + inputFile := filepath.Join(tempDir, "tempfile.bin") + err := os.WriteFile(inputFile, fakeData, 0o644) + require.NoError(t, err) + + // write file to object store + err = c.WriteFromFileSystem(t.Context(), inputFile) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 20) + + // ensure remote is not called + c.inner = nil + + // read bytes 4-6 MB + buffer := make([]byte, chunkSize) + read, err := c.ReadAt(t.Context(), buffer, 4*megabyte) // read 4-6 MB + require.NoError(t, err) + assert.Equal(t, fakeData[4*megabyte:6*megabyte], buffer) + assert.Equal(t, len(buffer), read) + + // read bytes 10-11 MB + buffer = make([]byte, chunkSize) + read, err = c.ReadAt(t.Context(), buffer, 10*megabyte) // read 10-11 MB + require.ErrorIs(t, err, io.EOF) + assert.Equal(t, megabyte, read) // short read + assert.Equal(t, fakeData[10*megabyte:], buffer[:read]) + + // verify all chunk files are len(file) == chunkSize + for offset := int64(0); offset < fileSize; offset += chunkSize { + fname := c.makeChunkFilename(offset) + info, err := os.Stat(fname) + require.NoError(t, err) + assert.Equal(t, min(chunkSize, fileSize-offset), info.Size()) + } + }) + + t.Run("ReadFrom should read from cache", func(t *testing.T) { + fakeData := []byte{1, 2, 3} + + fakeStorageObjectProvider := NewMockStorageObjectProvider(t) + fakeStorageObjectProvider.EXPECT(). + Write(mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, src []byte) (int, error) { + return len(src), nil + }) + + tempDir := t.TempDir() + c := CachedFileObjectProvider{ + path: tempDir, + chunkSize: 3, + inner: fakeStorageObjectProvider, + } + + read, err := c.Write(t.Context(), fakeData) + require.NoError(t, err) + assert.Equal(t, len(fakeData), read) + + time.Sleep(time.Millisecond * 20) + + buf := make([]byte, 3) + read2, err := c.ReadAt(t.Context(), buf, 0) + require.NoError(t, err) + assert.Equal(t, fakeData, buf) + assert.Equal(t, 3, read2) + }) + + t.Run("ReadFrom should handle multiple chunks at once", func(t *testing.T) { + fakeData := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + + fakeStorageObjectProvider := NewMockStorageObjectProvider(t) + fakeStorageObjectProvider.EXPECT(). + Write(mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, src []byte) (int, error) { + return len(src), nil + }) + + tempDir := t.TempDir() + c := CachedFileObjectProvider{ + path: tempDir, + chunkSize: 3, + inner: fakeStorageObjectProvider, + } + + // write the data to the cache + read64, err := c.Write(t.Context(), fakeData) + require.NoError(t, err) + assert.Equal(t, len(fakeData), read64) + + time.Sleep(time.Millisecond * 20) + + // get first chunk + buf := make([]byte, 3) + read, err := c.ReadAt(t.Context(), buf, 0) + require.NoError(t, err) + assert.Equal(t, fakeData[0:3], buf) + assert.Equal(t, 3, read) + + // get last chunk + buf = make([]byte, 1) + read, err = c.ReadAt(t.Context(), buf, 9) + require.NoError(t, err) + assert.Equal(t, fakeData[9:], buf) + assert.Equal(t, 1, read) + }) } func TestCachedFileObjectProvider_validateReadAtParams(t *testing.T) { @@ -256,3 +419,13 @@ func TestMoveWithoutReplace_Fail(t *testing.T) { _, err = os.Stat(src) assert.ErrorIs(t, err, os.ErrNotExist) } + +func generateBytes(t *testing.T, n int) []byte { + t.Helper() + + buf := make([]byte, n) + count, err := rand.Read(buf) + require.NoError(t, err) + require.Equal(t, n, count) + return buf +}