diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index bb81589b0d0..d6c9e567da8 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -34,6 +34,7 @@
+
diff --git a/src/KurrentDB.Core.XUnit.Tests/KurrentDB.Core.XUnit.Tests.csproj b/src/KurrentDB.Core.XUnit.Tests/KurrentDB.Core.XUnit.Tests.csproj
index e56783c255c..b03a8afcf33 100644
--- a/src/KurrentDB.Core.XUnit.Tests/KurrentDB.Core.XUnit.Tests.csproj
+++ b/src/KurrentDB.Core.XUnit.Tests/KurrentDB.Core.XUnit.Tests.csproj
@@ -3,7 +3,7 @@
 		true
 		true
-
+
diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/ArchiveStorageTestsBase.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/ArchiveStorageTestsBase.cs
index bfd3e9a3f01..d51a751c079 100644
--- a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/ArchiveStorageTestsBase.cs
+++ b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/ArchiveStorageTestsBase.cs
@@ -20,6 +20,7 @@ namespace KurrentDB.Core.XUnit.Tests.Services.Archive.Storage;
 public abstract class ArchiveStorageTestsBase : DirectoryPerTest {
 	private const string AwsRegion = "eu-west-1";
 	private const string AwsBucket = "archiver-unit-tests";
+	private const string GcpBucket = "archiver-unit-tests";
 	private const string ChunkPrefix = "chunk-";
 
 	private string ArchivePath => Path.Combine(Fixture.Directory, "archive");
@@ -44,6 +45,9 @@ protected IArchiveStorage CreateSut(StorageType storageType) {
 				Region = AwsRegion,
 			},
 			Azure = AzuriteHelpers.Options,
+			GCP = new () {
+				Bucket = GcpBucket,
+			},
 		},
 		archiveNamingStrategy);
diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/BlobStorageTests.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/BlobStorageTests.cs
index 420a7788b17..d287504667c 100644
--- a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/BlobStorageTests.cs
+++ b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/BlobStorageTests.cs
@@ -10,6 +10,7 @@
 using KurrentDB.Core.Services.Archive;
 using KurrentDB.Core.Services.Archive.Storage;
 using KurrentDB.Core.Services.Archive.Storage.Azure;
+using KurrentDB.Core.Services.Archive.Storage.Gcp;
 using KurrentDB.Core.Services.Archive.Storage.S3;
 using Xunit;
 
@@ -20,6 +21,7 @@ namespace KurrentDB.Core.XUnit.Tests.Services.Archive.Storage;
 public class BlobStorageTests : DirectoryPerTest {
 	private const string AwsRegion = "eu-west-1";
 	private const string AwsBucket = "archiver-unit-tests";
+	private const string GcpBucket = "archiver-unit-tests";
 
 	private string ArchivePath => Path.Combine(Fixture.Directory, "archive");
 	private string LocalPath => Path.Combine(Fixture.Directory, "local");
@@ -47,6 +49,11 @@ IBlobStorage CreateSut(StorageType storageType) {
 			storage = new AzureBlobStorage(AzuriteHelpers.Options);
 			AzuriteHelpers.ConfigureEnvironment();
 			break;
+		case StorageType.GCP:
+			storage = new GcpBlobStorage(new() {
+				Bucket = GcpBucket,
+			});
+			break;
 		default:
 			throw new NotImplementedException();
 	}
@@ -68,6 +75,7 @@ private async ValueTask CreateFile(string fileName, int fileSize) {
 	[Theory]
 	[StorageData.S3]
 	[StorageData.Azure]
+	[StorageData.GCP]
 	[StorageData.FileSystem]
 	public async Task can_read_file_entirely(StorageType storageType) {
 		var sut = CreateSut(storageType);
@@ -85,17 +93,19 @@ public async Task can_read_file_entirely(StorageType storageType) {
 		// read the uploaded file
 		using var buffer = Memory.AllocateExactly<byte>(fileSize);
-		await sut.ReadAsync("output.file", buffer.Memory, offset: 0, CancellationToken.None);
+		var numRead = await sut.ReadAsync("output.file", buffer.Memory, offset: 0, CancellationToken.None);
sut.ReadAsync("output.file", buffer.Memory, offset: 0, CancellationToken.None); // then Assert.Equal(localContent, buffer.Span); + Assert.Equal(localContent.Length, numRead); } [Theory] [StorageData.S3] [StorageData.Azure] + [StorageData.GCP] [StorageData.FileSystem] - public async Task can_store_and_read_file_partially(StorageType storageType) { + public async Task can_read_file_partially(StorageType storageType) { var sut = CreateSut(storageType); // create a file and upload it @@ -113,15 +123,72 @@ public async Task can_store_and_read_file_partially(StorageType storageType) { var end = localContent.Length; var length = end - start; using var buffer = Memory.AllocateExactly(length); - await sut.ReadAsync("output.file", buffer.Memory, offset: start, CancellationToken.None); + var numRead = await sut.ReadAsync("output.file", buffer.Memory, offset: start, CancellationToken.None); // then Assert.Equal(localContent.AsSpan(start..end), buffer.Span); + Assert.Equal(localContent.Length / 2, numRead); + } + + [Theory] + [StorageData.S3] + [StorageData.Azure] + [StorageData.GCP] + [StorageData.FileSystem] + public async Task can_read_file_partially_and_past_end_of_file(StorageType storageType) { + var sut = CreateSut(storageType); + + // create a file and upload it + string localPath; + await using (var fs = await CreateFile("local.file", fileSize: 1024)) { + await sut.StoreAsync(fs, "output.file", CancellationToken.None); + localPath = fs.Name; + } + + // read the local file + var localContent = await File.ReadAllBytesAsync(localPath); + + // read the uploaded file partially with a buffer that goes past the end of the file + var start = localContent.Length / 2; + using var buffer = Memory.AllocateExactly(localContent.Length); + var numRead = await sut.ReadAsync("output.file", buffer.Memory, offset: start, CancellationToken.None); + + // then + Assert.Equal(localContent.AsSpan(start..), buffer.Span[..numRead]); + Assert.Equal(localContent.Length / 2, numRead); + } + + [Theory] + [StorageData.S3] + [StorageData.Azure] + [StorageData.GCP] + [StorageData.FileSystem] + public async Task can_read_past_end_of_file(StorageType storageType) { + var sut = CreateSut(storageType); + + // create a file and upload it + string localPath; + await using (var fs = await CreateFile("local.file", fileSize: 1024)) { + await sut.StoreAsync(fs, "output.file", CancellationToken.None); + localPath = fs.Name; + } + + // read the local file + var localContent = await File.ReadAllBytesAsync(localPath); + + // read past the end of the uploaded file + var start = localContent.Length; + using var buffer = Memory.AllocateExactly(localContent.Length); + var numRead = await sut.ReadAsync("output.file", buffer.Memory, offset: start, CancellationToken.None); + + // then + Assert.Equal(0, numRead); } [Theory] [StorageData.S3] [StorageData.Azure] + [StorageData.GCP] [StorageData.FileSystem] public async Task can_retrieve_metadata(StorageType storageType) { var sut = CreateSut(storageType); @@ -142,6 +209,7 @@ public async Task can_retrieve_metadata(StorageType storageType) { [Theory] [StorageData.S3] [StorageData.Azure] + [StorageData.GCP] [StorageData.FileSystem] public async Task read_missing_file_throws_FileNotFoundException(StorageType storageType) { var sut = CreateSut(storageType); diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/GcpCliDirectoryNotFoundException.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/GcpCliDirectoryNotFoundException.cs new file mode 100644 index 00000000000..4549d1cd2ee 
diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/GcpCliDirectoryNotFoundException.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/GcpCliDirectoryNotFoundException.cs
new file mode 100644
index 00000000000..4549d1cd2ee
--- /dev/null
+++ b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/GcpCliDirectoryNotFoundException.cs
@@ -0,0 +1,9 @@
+// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
+// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+
+using System.IO;
+
+namespace KurrentDB.Core.XUnit.Tests.Services.Archive.Storage;
+
+public sealed class GcpCliDirectoryNotFoundException(string path)
+	: DirectoryNotFoundException($"Directory '{path}' with config files for Google Cloud CLI doesn't exist");
diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/RemoteFileSystemTests.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/RemoteFileSystemTests.cs
index 364d6efd012..d0f32a17aa2 100644
--- a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/RemoteFileSystemTests.cs
+++ b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/RemoteFileSystemTests.cs
@@ -25,6 +25,7 @@ public sealed class RemoteFileSystemTests : ArchiveStorageTestsBase CheckPrerequisites(ref isSet)) {
+	const string Symbol = "RUN_GCP_TESTS";
+
+	[Conditional(Symbol)]
+	private static void CheckPrerequisites(ref bool symbolSet) {
+		symbolSet = true;
+		const string gcpDirectoryNameLinux = ".config/gcloud";
+		const string gcpDirectoryNameWindows = "gcloud";
+		var homeDir = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
+
+		if (OperatingSystem.IsLinux())
+			homeDir = Path.Combine(homeDir, gcpDirectoryNameLinux);
+		else if (OperatingSystem.IsWindows())
+			homeDir = Path.Combine(homeDir, gcpDirectoryNameWindows);
+		else
+			throw new NotSupportedException();
+
+		if (!Directory.Exists(homeDir))
+			throw new GcpCliDirectoryNotFoundException(homeDir);
+	}
+}
+
 public sealed class FileSystemAttribute(params object[] args) : RemoteStorageDataAttribute(
 	StorageType.FileSystemDevelopmentOnly,
 	args,
diff --git a/src/KurrentDB.Core/KurrentDB.Core.csproj b/src/KurrentDB.Core/KurrentDB.Core.csproj
index 12612139185..9d02c0bdc51 100644
--- a/src/KurrentDB.Core/KurrentDB.Core.csproj
+++ b/src/KurrentDB.Core/KurrentDB.Core.csproj
@@ -12,6 +12,7 @@
+
diff --git a/src/KurrentDB.Core/Services/Archive/ArchiveOptions.cs b/src/KurrentDB.Core/Services/Archive/ArchiveOptions.cs
index 8fe464cf7dd..cab6120a9f0 100644
--- a/src/KurrentDB.Core/Services/Archive/ArchiveOptions.cs
+++ b/src/KurrentDB.Core/Services/Archive/ArchiveOptions.cs
@@ -12,6 +12,7 @@ public class ArchiveOptions {
 	public FileSystemOptions FileSystem { get; init; } = new();
 	public S3Options S3 { get; init; } = new();
 	public AzureOptions Azure { get; init; } = new();
+	public GcpOptions GCP { get; init; } = new();
 	public RetentionOptions RetainAtLeast { get; init; } = new();
 
 	public void Validate() {
@@ -28,7 +29,7 @@ private void ValidateImpl() {
 		switch (StorageType) {
 			case StorageType.Unspecified:
-				throw new InvalidConfigurationException("Please specify a StorageType (e.g. S3)");
+				throw new InvalidConfigurationException("Please specify a StorageType (e.g. S3, Azure, GCP)");
 			case StorageType.FileSystemDevelopmentOnly:
 				FileSystem.Validate();
 				break;
@@ -37,6 +38,9 @@ private void ValidateImpl() {
 				break;
 			case StorageType.Azure:
 				Azure.Validate();
+				break;
+			case StorageType.GCP:
+				GCP.Validate();
 				break;
 			default:
 				throw new InvalidConfigurationException("Unknown StorageType");
@@ -53,13 +57,14 @@ public enum StorageType {
 	FileSystemDevelopmentOnly,
 	S3,
 	Azure,
+	GCP,
 }
 
 public class FileSystemOptions {
 	public string Path { get; init; } = "";
 
 	public void Validate() {
-		if (string.IsNullOrEmpty(Path))
+		if (string.IsNullOrWhiteSpace(Path))
 			throw new InvalidConfigurationException("Please provide a Path for the FileSystem archive");
 	}
 }
@@ -69,10 +74,10 @@ public class S3Options {
 	public string Region { get; init; } = "";
 
 	public void Validate() {
-		if (string.IsNullOrEmpty(Bucket))
+		if (string.IsNullOrWhiteSpace(Bucket))
 			throw new InvalidConfigurationException("Please provide a Bucket for the S3 archive");
 
-		if (string.IsNullOrEmpty(Region))
+		if (string.IsNullOrWhiteSpace(Region))
 			throw new InvalidConfigurationException("Please provide a Region for the S3 archive");
 	}
 }
@@ -161,6 +166,15 @@ public enum AuthenticationType {
 	}
 }
 
+public class GcpOptions {
+	public string Bucket { get; init; } = "";
+
+	public void Validate() {
+		if (string.IsNullOrWhiteSpace(Bucket))
+			throw new InvalidConfigurationException("Please provide a Bucket for the GCP archive");
+	}
+}
+
 // Local chunks are removed after they have passed beyond both criteria, so they
 // must both be set to be useful.
 public class RetentionOptions {
diff --git a/src/KurrentDB.Core/Services/Archive/Storage/ArchiveStorageFactory.cs b/src/KurrentDB.Core/Services/Archive/Storage/ArchiveStorageFactory.cs
index 7353d0e9d19..ef69a49587c 100644
--- a/src/KurrentDB.Core/Services/Archive/Storage/ArchiveStorageFactory.cs
+++ b/src/KurrentDB.Core/Services/Archive/Storage/ArchiveStorageFactory.cs
@@ -4,6 +4,7 @@
 using System;
 using KurrentDB.Core.Services.Archive.Naming;
 using KurrentDB.Core.Services.Archive.Storage.Azure;
+using KurrentDB.Core.Services.Archive.Storage.Gcp;
 using KurrentDB.Core.Services.Archive.Storage.S3;
 
 namespace KurrentDB.Core.Services.Archive.Storage;
@@ -17,6 +18,7 @@ public static IArchiveStorage Create(ArchiveOptions options, IArchiveNamingStrat
 			StorageType.FileSystemDevelopmentOnly => new FileSystemBlobStorage(options.FileSystem),
 			StorageType.S3 => new S3BlobStorage(options.S3),
 			StorageType.Azure => new AzureBlobStorage(options.Azure),
+			StorageType.GCP => new GcpBlobStorage(options.GCP),
 			_ => throw new ArgumentOutOfRangeException(nameof(options.StorageType))
 		};
diff --git a/src/KurrentDB.Core/Services/Archive/Storage/Gcp/GcpBlobStorage.cs b/src/KurrentDB.Core/Services/Archive/Storage/Gcp/GcpBlobStorage.cs
new file mode 100644
index 00000000000..3ec8e7145ec
--- /dev/null
+++ b/src/KurrentDB.Core/Services/Archive/Storage/Gcp/GcpBlobStorage.cs
@@ -0,0 +1,119 @@
+// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
+// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Http.Headers;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using DotNext;
+using DotNext.Buffers;
+using DotNext.IO;
+using Google;
+using Google.Cloud.Storage.V1;
+using KurrentDB.Common.Exceptions;
+using Serilog;
+
+namespace KurrentDB.Core.Services.Archive.Storage.Gcp;
+
+public class GcpBlobStorage : IBlobStorage {
+	private readonly GcpOptions _options;
+	private readonly StorageClient _storageClient;
+
+	private static readonly ILogger Logger = Log.ForContext<GcpBlobStorage>();
+
+	public GcpBlobStorage(GcpOptions options) {
+		_options = options;
+
+		if (string.IsNullOrEmpty(options.Bucket))
+			throw new InvalidConfigurationException("Please specify an Archive GCP Bucket");
+
+		_storageClient = StorageClient.Create();
+	}
+
+	public async ValueTask<int> ReadAsync(string name, Memory<byte> buffer, long offset, CancellationToken ct) {
+		ArgumentOutOfRangeException.ThrowIfNegative(offset);
+
+		if (buffer.IsEmpty)
+			return 0;
+
+		var destination = StreamSource.AsSynchronousStream(new MemoryWriter(buffer));
+		try {
+			await _storageClient.DownloadObjectAsync(
+				bucket: _options.Bucket,
+				objectName: name,
+				destination: destination,
+				options: new DownloadObjectOptions {
+					Range = GetRange(offset, buffer.Length)
+				}, cancellationToken: ct);
+
+			return (int)destination.Length; // the cast is safe, because Stream.Length cannot be greater than Memory.Length
+		} catch (GoogleApiException ex) when (
+			ex.HttpStatusCode is HttpStatusCode.NotFound &&
+			ex.Error.ErrorResponseContent.StartsWith("No such object:")) {
+			throw new FileNotFoundException();
+		} catch (GoogleApiException ex) when (ex.HttpStatusCode is HttpStatusCode.RequestedRangeNotSatisfiable) {
+			return 0;
+		} catch (GoogleApiException ex) {
+			Logger.Error(ex, "Failed to read object '{name}' at offset: {offset}, length: {length}", name, offset, buffer.Length);
+			throw;
+		} finally {
+			await destination.DisposeAsync();
+		}
+	}
+
+	public async ValueTask StoreAsync(Stream readableStream, string name, CancellationToken ct) {
+		try {
+			await _storageClient.UploadObjectAsync(_options.Bucket, name, string.Empty, readableStream, cancellationToken: ct);
+		} catch (GoogleApiException ex) {
+			Logger.Error(ex, "Failed to store object '{name}'", name);
+			throw;
+		}
+	}
+
+	public async ValueTask<BlobMetadata> GetMetadataAsync(string name, CancellationToken token) {
+		try {
+			var obj = await _storageClient.GetObjectAsync(_options.Bucket, name, cancellationToken: token);
+			return new BlobMetadata(Size: long.CreateSaturating(obj.Size!.Value));
+		} catch (GoogleApiException ex) {
+			Logger.Error(ex, "Failed to fetch metadata for object '{name}'", name);
+			throw;
+		}
+	}
+
+	private static RangeHeaderValue GetRange(long offset, int length) => new(
+		from: offset,
+		to: offset + length - 1L);
+
+	[StructLayout(LayoutKind.Auto)]
+	private struct MemoryWriter(Memory<byte> output) : IReadOnlySpanConsumer<byte>, IFlushable {
+		void IReadOnlySpanConsumer<byte>.Invoke(ReadOnlySpan<byte> input) => Copy(input);
+
+		// We need to replace default interface implementation because .NET Runtime
+		// causes boxing when calling default impl on structs
+		ValueTask ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>.
+			Invoke(ReadOnlyMemory<byte> input, CancellationToken token) {
+			var task = ValueTask.CompletedTask;
+			try {
+				Copy(input.Span);
+			} catch (Exception e) {
+				task = ValueTask.FromException(e);
+			}
+
+			return task;
+		}
+
+		private void Copy(ReadOnlySpan<byte> input) {
+			input.CopyTo(output.Span);
+			output = output.Slice(input.Length);
+		}
+
+		readonly void IFlushable.Flush() {
+			// nothing to do
+		}
+
+		readonly Task IFlushable.FlushAsync(CancellationToken token) => Task.CompletedTask;
+	}
+}
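
To show how the new pieces fit together, a minimal usage sketch follows. It is illustrative only: the bucket and object names are placeholders, the metadata print is an assumption about BlobMetadata exposing its Size, and it relies on the parameterless StorageClient.Create() call above picking up Google Application Default Credentials, which appears to be why the RUN_GCP_TESTS prerequisite check looks for a local gcloud configuration directory.

using System;
using System.IO;
using System.Threading;
using KurrentDB.Core.Services.Archive;
using KurrentDB.Core.Services.Archive.Storage;
using KurrentDB.Core.Services.Archive.Storage.Gcp;

// The bucket and object names below are placeholders.
IBlobStorage storage = new GcpBlobStorage(new GcpOptions { Bucket = "my-archive-bucket" });

// The server-side path goes through the factory instead, once "StorageType: GCP" is
// configured and GcpOptions.Validate() has passed:
//   ArchiveStorageFactory.Create(new ArchiveOptions { StorageType = StorageType.GCP, GCP = ... }, namingStrategy)

// Upload a local file, then read back its metadata.
await using (var fs = File.OpenRead("example.file"))
	await storage.StoreAsync(fs, "example.file", CancellationToken.None);

var metadata = await storage.GetMetadataAsync("example.file", CancellationToken.None);
Console.WriteLine($"stored {metadata.Size} bytes");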