diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index bb81589b0d0..d6c9e567da8 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -34,6 +34,7 @@
+
diff --git a/src/KurrentDB.Core.XUnit.Tests/KurrentDB.Core.XUnit.Tests.csproj b/src/KurrentDB.Core.XUnit.Tests/KurrentDB.Core.XUnit.Tests.csproj
index e56783c255c..b03a8afcf33 100644
--- a/src/KurrentDB.Core.XUnit.Tests/KurrentDB.Core.XUnit.Tests.csproj
+++ b/src/KurrentDB.Core.XUnit.Tests/KurrentDB.Core.XUnit.Tests.csproj
@@ -3,7 +3,7 @@
true
true
-
+
diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/ArchiveStorageTestsBase.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/ArchiveStorageTestsBase.cs
index bfd3e9a3f01..d51a751c079 100644
--- a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/ArchiveStorageTestsBase.cs
+++ b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/ArchiveStorageTestsBase.cs
@@ -20,6 +20,7 @@ namespace KurrentDB.Core.XUnit.Tests.Services.Archive.Storage;
public abstract class ArchiveStorageTestsBase : DirectoryPerTest {
private const string AwsRegion = "eu-west-1";
private const string AwsBucket = "archiver-unit-tests";
+ private const string GcpBucket = "archiver-unit-tests";
private const string ChunkPrefix = "chunk-";
private string ArchivePath => Path.Combine(Fixture.Directory, "archive");
@@ -44,6 +45,9 @@ protected IArchiveStorage CreateSut(StorageType storageType) {
Region = AwsRegion,
},
Azure = AzuriteHelpers.Options,
+ GCP = new () {
+ Bucket = GcpBucket,
+ },
},
archiveNamingStrategy);
diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/BlobStorageTests.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/BlobStorageTests.cs
index 420a7788b17..d287504667c 100644
--- a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/BlobStorageTests.cs
+++ b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/BlobStorageTests.cs
@@ -10,6 +10,7 @@
using KurrentDB.Core.Services.Archive;
using KurrentDB.Core.Services.Archive.Storage;
using KurrentDB.Core.Services.Archive.Storage.Azure;
+using KurrentDB.Core.Services.Archive.Storage.Gcp;
using KurrentDB.Core.Services.Archive.Storage.S3;
using Xunit;
@@ -20,6 +21,7 @@ namespace KurrentDB.Core.XUnit.Tests.Services.Archive.Storage;
public class BlobStorageTests : DirectoryPerTest {
private const string AwsRegion = "eu-west-1";
private const string AwsBucket = "archiver-unit-tests";
+ private const string GcpBucket = "archiver-unit-tests";
private string ArchivePath => Path.Combine(Fixture.Directory, "archive");
private string LocalPath => Path.Combine(Fixture.Directory, "local");
@@ -47,6 +49,11 @@ IBlobStorage CreateSut(StorageType storageType) {
storage = new AzureBlobStorage(AzuriteHelpers.Options);
AzuriteHelpers.ConfigureEnvironment();
break;
+ case StorageType.GCP:
+ storage = new GcpBlobStorage(new() {
+ Bucket = GcpBucket,
+ });
+ break;
default:
throw new NotImplementedException();
}
@@ -68,6 +75,7 @@ private async ValueTask CreateFile(string fileName, int fileSize) {
[Theory]
[StorageData.S3]
[StorageData.Azure]
+ [StorageData.GCP]
[StorageData.FileSystem]
public async Task can_read_file_entirely(StorageType storageType) {
var sut = CreateSut(storageType);
@@ -85,17 +93,19 @@ public async Task can_read_file_entirely(StorageType storageType) {
// read the uploaded file
using var buffer = Memory.AllocateExactly<byte>(fileSize);
- await sut.ReadAsync("output.file", buffer.Memory, offset: 0, CancellationToken.None);
+ var numRead = await sut.ReadAsync("output.file", buffer.Memory, offset: 0, CancellationToken.None);
// then
Assert.Equal(localContent, buffer.Span);
+ Assert.Equal(localContent.Length, numRead);
}
[Theory]
[StorageData.S3]
[StorageData.Azure]
+ [StorageData.GCP]
[StorageData.FileSystem]
- public async Task can_store_and_read_file_partially(StorageType storageType) {
+ public async Task can_read_file_partially(StorageType storageType) {
var sut = CreateSut(storageType);
// create a file and upload it
@@ -113,15 +123,72 @@ public async Task can_store_and_read_file_partially(StorageType storageType) {
var end = localContent.Length;
var length = end - start;
using var buffer = Memory.AllocateExactly<byte>(length);
- await sut.ReadAsync("output.file", buffer.Memory, offset: start, CancellationToken.None);
+ var numRead = await sut.ReadAsync("output.file", buffer.Memory, offset: start, CancellationToken.None);
// then
Assert.Equal(localContent.AsSpan(start..end), buffer.Span);
+ Assert.Equal(localContent.Length / 2, numRead);
+ }
+
+ [Theory]
+ [StorageData.S3]
+ [StorageData.Azure]
+ [StorageData.GCP]
+ [StorageData.FileSystem]
+ public async Task can_read_file_partially_and_past_end_of_file(StorageType storageType) {
+ var sut = CreateSut(storageType);
+
+ // create a file and upload it
+ string localPath;
+ await using (var fs = await CreateFile("local.file", fileSize: 1024)) {
+ await sut.StoreAsync(fs, "output.file", CancellationToken.None);
+ localPath = fs.Name;
+ }
+
+ // read the local file
+ var localContent = await File.ReadAllBytesAsync(localPath);
+
+ // read the uploaded file partially with a buffer that goes past the end of the file
+ var start = localContent.Length / 2;
+ using var buffer = Memory.AllocateExactly<byte>(localContent.Length);
+ var numRead = await sut.ReadAsync("output.file", buffer.Memory, offset: start, CancellationToken.None);
+
+ // then
+ Assert.Equal(localContent.AsSpan(start..), buffer.Span[..numRead]);
+ Assert.Equal(localContent.Length / 2, numRead);
+ }
+
+ [Theory]
+ [StorageData.S3]
+ [StorageData.Azure]
+ [StorageData.GCP]
+ [StorageData.FileSystem]
+ public async Task can_read_past_end_of_file(StorageType storageType) {
+ var sut = CreateSut(storageType);
+
+ // create a file and upload it
+ string localPath;
+ await using (var fs = await CreateFile("local.file", fileSize: 1024)) {
+ await sut.StoreAsync(fs, "output.file", CancellationToken.None);
+ localPath = fs.Name;
+ }
+
+ // read the local file
+ var localContent = await File.ReadAllBytesAsync(localPath);
+
+ // read past the end of the uploaded file
+ var start = localContent.Length;
+ using var buffer = Memory.AllocateExactly<byte>(localContent.Length);
+ var numRead = await sut.ReadAsync("output.file", buffer.Memory, offset: start, CancellationToken.None);
+
+ // then
+ Assert.Equal(0, numRead);
}
[Theory]
[StorageData.S3]
[StorageData.Azure]
+ [StorageData.GCP]
[StorageData.FileSystem]
public async Task can_retrieve_metadata(StorageType storageType) {
var sut = CreateSut(storageType);
@@ -142,6 +209,7 @@ public async Task can_retrieve_metadata(StorageType storageType) {
[Theory]
[StorageData.S3]
[StorageData.Azure]
+ [StorageData.GCP]
[StorageData.FileSystem]
public async Task read_missing_file_throws_FileNotFoundException(StorageType storageType) {
var sut = CreateSut(storageType);
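
Taken together, these cases pin down the read contract that every backend, including the new GCP one, must honour: ReadAsync reports how many bytes it actually copied, returns 0 once the offset is at or past the end of the object, and surfaces a missing object as FileNotFoundException. A minimal caller-side sketch of that contract, assuming IBlobStorage exposes exactly the members used in these tests (the helper name is illustrative, not part of this change):

static async Task<byte[]> ReadWholeBlobAsync(IBlobStorage storage, string name, CancellationToken ct) {
	// GetMetadataAsync throws FileNotFoundException when the object does not exist
	var metadata = await storage.GetMetadataAsync(name, ct);
	var result = new byte[metadata.Size];
	var totalRead = 0;
	while (totalRead < result.Length) {
		// ReadAsync returns the number of bytes actually read; 0 means the offset is at or past EOF
		var numRead = await storage.ReadAsync(name, result.AsMemory(totalRead), offset: totalRead, ct);
		if (numRead == 0)
			break;
		totalRead += numRead;
	}
	return result;
}
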
diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/GcpCliDirectoryNotFoundException.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/GcpCliDirectoryNotFoundException.cs
new file mode 100644
index 00000000000..4549d1cd2ee
--- /dev/null
+++ b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/GcpCliDirectoryNotFoundException.cs
@@ -0,0 +1,9 @@
+// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
+// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+
+using System.IO;
+
+namespace KurrentDB.Core.XUnit.Tests.Services.Archive.Storage;
+
+public sealed class GcpCliDirectoryNotFoundException(string path)
+ : DirectoryNotFoundException($"Directory '{path}' with config files for Google Cloud CLI doesn't exist");
diff --git a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/RemoteFileSystemTests.cs b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/RemoteFileSystemTests.cs
index 364d6efd012..d0f32a17aa2 100644
--- a/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/RemoteFileSystemTests.cs
+++ b/src/KurrentDB.Core.XUnit.Tests/Services/Archive/Storage/RemoteFileSystemTests.cs
@@ -25,6 +25,7 @@ public sealed class RemoteFileSystemTests : ArchiveStorageTestsBase
+ public sealed class GCPAttribute(params object[] args) : RemoteStorageDataAttribute(
+ StorageType.GCP,
+ args,
+ static (ref bool isSet) => CheckPrerequisites(ref isSet)) {
+ const string Symbol = "RUN_GCP_TESTS";
+
+ [Conditional(Symbol)]
+ private static void CheckPrerequisites(ref bool symbolSet) {
+ symbolSet = true;
+ const string gcpDirectoryNameLinux = ".config/gcloud";
+ const string gcpDirectoryNameWindows = "gcloud";
+ var homeDir = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
+
+ if (OperatingSystem.IsLinux())
+ homeDir = Path.Combine(homeDir, gcpDirectoryNameLinux);
+ else if (OperatingSystem.IsWindows())
+ homeDir = Path.Combine(homeDir, gcpDirectoryNameWindows);
+ else
+ throw new NotSupportedException();
+
+ if (!Directory.Exists(homeDir))
+ throw new GcpCliDirectoryNotFoundException(homeDir);
+ }
+ }
+
public sealed class FileSystemAttribute(params object[] args) : RemoteStorageDataAttribute(
StorageType.FileSystemDevelopmentOnly,
args,
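
The gating above leans on [Conditional]: unless the test build defines RUN_GCP_TESTS, the compiler removes the CheckPrerequisites call entirely, so the ref flag stays false and the GCP cases are skipped; when the symbol is defined, the check runs and fails fast with GcpCliDirectoryNotFoundException if the Google Cloud CLI configuration directory is missing. A self-contained sketch of the same pattern (the names below are illustrative, not the actual attribute plumbing):

using System.Diagnostics;

static class GcpGateSketch {
	public static bool IsEnabled() {
		var isSet = false;
		Check(ref isSet); // call site is removed entirely when RUN_GCP_TESTS is not defined
		return isSet;     // false => skip the GCP cases, true => prerequisites were verified
	}

	[Conditional("RUN_GCP_TESTS")]
	static void Check(ref bool symbolSet) {
		symbolSet = true;
		// probing for prerequisites (e.g. the gcloud config directory) would go here
	}
}
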
diff --git a/src/KurrentDB.Core/KurrentDB.Core.csproj b/src/KurrentDB.Core/KurrentDB.Core.csproj
index 12612139185..9d02c0bdc51 100644
--- a/src/KurrentDB.Core/KurrentDB.Core.csproj
+++ b/src/KurrentDB.Core/KurrentDB.Core.csproj
@@ -12,6 +12,7 @@
+ <PackageReference Include="Google.Cloud.Storage.V1" />
diff --git a/src/KurrentDB.Core/Services/Archive/ArchiveOptions.cs b/src/KurrentDB.Core/Services/Archive/ArchiveOptions.cs
index 8fe464cf7dd..cab6120a9f0 100644
--- a/src/KurrentDB.Core/Services/Archive/ArchiveOptions.cs
+++ b/src/KurrentDB.Core/Services/Archive/ArchiveOptions.cs
@@ -12,6 +12,7 @@ public class ArchiveOptions {
public FileSystemOptions FileSystem { get; init; } = new();
public S3Options S3 { get; init; } = new();
public AzureOptions Azure { get; init; } = new();
+ public GcpOptions GCP { get; init; } = new();
public RetentionOptions RetainAtLeast { get; init; } = new();
public void Validate() {
@@ -28,7 +29,7 @@ private void ValidateImpl() {
switch (StorageType) {
case StorageType.Unspecified:
- throw new InvalidConfigurationException("Please specify a StorageType (e.g. S3)");
+ throw new InvalidConfigurationException("Please specify a StorageType (e.g. S3, Azure, GCP)");
case StorageType.FileSystemDevelopmentOnly:
FileSystem.Validate();
break;
@@ -37,6 +38,9 @@ private void ValidateImpl() {
break;
case StorageType.Azure:
Azure.Validate();
+ break;
+ case StorageType.GCP:
+ GCP.Validate();
break;
default:
throw new InvalidConfigurationException("Unknown StorageType");
@@ -53,13 +57,14 @@ public enum StorageType {
FileSystemDevelopmentOnly,
S3,
Azure,
+ GCP,
}
public class FileSystemOptions {
public string Path { get; init; } = "";
public void Validate() {
- if (string.IsNullOrEmpty(Path))
+ if (string.IsNullOrWhiteSpace(Path))
throw new InvalidConfigurationException("Please provide a Path for the FileSystem archive");
}
}
@@ -69,10 +74,10 @@ public class S3Options {
public string Region { get; init; } = "";
public void Validate() {
- if (string.IsNullOrEmpty(Bucket))
+ if (string.IsNullOrWhiteSpace(Bucket))
throw new InvalidConfigurationException("Please provide a Bucket for the S3 archive");
- if (string.IsNullOrEmpty(Region))
+ if (string.IsNullOrWhiteSpace(Region))
throw new InvalidConfigurationException("Please provide a Region for the S3 archive");
}
}
@@ -161,6 +166,15 @@ public enum AuthenticationType {
}
}
+public class GcpOptions {
+ public string Bucket { get; init; } = "";
+
+ public void Validate() {
+ if (string.IsNullOrWhiteSpace(Bucket))
+ throw new InvalidConfigurationException("Please provide a Bucket for the GCP archive");
+ }
+}
+
// Local chunks are removed after they have passed beyond both criteria, so they
// must both be set to be useful.
public class RetentionOptions {
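
With the new StorageType.GCP member, a GCP archive needs nothing beyond a bucket name, and validation now rejects whitespace-only values just like the other backends. A hedged sketch of what a valid configuration object looks like in code (the bucket name is a placeholder):

var options = new ArchiveOptions {
	StorageType = StorageType.GCP,
	GCP = new GcpOptions { Bucket = "my-archive-bucket" },
};
options.Validate(); // throws InvalidConfigurationException if StorageType is unspecified or Bucket is blank
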
diff --git a/src/KurrentDB.Core/Services/Archive/Storage/ArchiveStorageFactory.cs b/src/KurrentDB.Core/Services/Archive/Storage/ArchiveStorageFactory.cs
index 7353d0e9d19..ef69a49587c 100644
--- a/src/KurrentDB.Core/Services/Archive/Storage/ArchiveStorageFactory.cs
+++ b/src/KurrentDB.Core/Services/Archive/Storage/ArchiveStorageFactory.cs
@@ -4,6 +4,7 @@
using System;
using KurrentDB.Core.Services.Archive.Naming;
using KurrentDB.Core.Services.Archive.Storage.Azure;
+using KurrentDB.Core.Services.Archive.Storage.Gcp;
using KurrentDB.Core.Services.Archive.Storage.S3;
namespace KurrentDB.Core.Services.Archive.Storage;
@@ -17,6 +18,7 @@ public static IArchiveStorage Create(ArchiveOptions options, IArchiveNamingStrat
StorageType.FileSystemDevelopmentOnly => new FileSystemBlobStorage(options.FileSystem),
StorageType.S3 => new S3BlobStorage(options.S3),
StorageType.Azure => new AzureBlobStorage(options.Azure),
+ StorageType.GCP => new GcpBlobStorage(options.GCP),
_ => throw new ArgumentOutOfRangeException(nameof(options.StorageType))
};
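
For completeness, resolving a GCP-backed archive through the factory looks like the sketch below; namingStrategy stands in for the archive naming strategy instance the caller already has (the parameter's exact type is truncated in the hunk header above), and options is the configuration object sketched earlier:

// illustrative only; assumes the Create signature shown (truncated) in this hunk
IArchiveStorage archive = ArchiveStorageFactory.Create(options, namingStrategy);
// with options.StorageType == StorageType.GCP, the GCP arm above supplies a GcpBlobStorage built from options.GCP
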
diff --git a/src/KurrentDB.Core/Services/Archive/Storage/Gcp/GcpBlobStorage.cs b/src/KurrentDB.Core/Services/Archive/Storage/Gcp/GcpBlobStorage.cs
new file mode 100644
index 00000000000..3ec8e7145ec
--- /dev/null
+++ b/src/KurrentDB.Core/Services/Archive/Storage/Gcp/GcpBlobStorage.cs
@@ -0,0 +1,119 @@
+// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
+// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
+
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Http.Headers;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using DotNext;
+using DotNext.Buffers;
+using DotNext.IO;
+using Google;
+using Google.Cloud.Storage.V1;
+using KurrentDB.Common.Exceptions;
+using Serilog;
+
+namespace KurrentDB.Core.Services.Archive.Storage.Gcp;
+
+public class GcpBlobStorage : IBlobStorage {
+ private readonly GcpOptions _options;
+ private readonly StorageClient _storageClient;
+
+ private static readonly ILogger Logger = Log.ForContext<GcpBlobStorage>();
+
+ public GcpBlobStorage(GcpOptions options) {
+ _options = options;
+
+ if (string.IsNullOrEmpty(options.Bucket))
+ throw new InvalidConfigurationException("Please specify an Archive GCP Bucket");
+
+ _storageClient = StorageClient.Create();
+ }
+
+ public async ValueTask<int> ReadAsync(string name, Memory<byte> buffer, long offset, CancellationToken ct) {
+ ArgumentOutOfRangeException.ThrowIfNegative(offset);
+
+ if (buffer.IsEmpty)
+ return 0;
+
+ var destination = StreamSource.AsSynchronousStream(new MemoryWriter(buffer));
+ try {
+ await _storageClient.DownloadObjectAsync(
+ bucket: _options.Bucket,
+ objectName: name,
+ destination: destination,
+ options: new DownloadObjectOptions {
+ Range = GetRange(offset, buffer.Length)
+ }, cancellationToken: ct);
+
+ return (int)destination.Length; // the cast is safe, because Stream.Length cannot be greater than Memory.Length
+ } catch (GoogleApiException ex) when (
+ ex.HttpStatusCode is HttpStatusCode.NotFound &&
+ ex.Error.ErrorResponseContent.StartsWith("No such object:")) {
+ throw new FileNotFoundException();
+ } catch (GoogleApiException ex) when (ex.HttpStatusCode is HttpStatusCode.RequestedRangeNotSatisfiable) {
+ return 0;
+ } catch (GoogleApiException ex) {
+ Logger.Error(ex, "Failed to read object '{name}' at offset: {offset}, length: {length}", name, offset, buffer.Length);
+ throw;
+ } finally {
+ await destination.DisposeAsync();
+ }
+ }
+
+ public async ValueTask StoreAsync(Stream readableStream, string name, CancellationToken ct) {
+ try {
+ await _storageClient.UploadObjectAsync(_options.Bucket, name, string.Empty, readableStream, cancellationToken: ct);
+ } catch (GoogleApiException ex) {
+ Logger.Error(ex, "Failed to store object '{name}'", name);
+ throw;
+ }
+ }
+
+ public async ValueTask<BlobMetadata> GetMetadataAsync(string name, CancellationToken token) {
+ try {
+ var obj = await _storageClient.GetObjectAsync(_options.Bucket, name, cancellationToken: token);
+ return new BlobMetadata(Size: long.CreateSaturating(obj.Size!.Value));
+ } catch (GoogleApiException ex) {
+ Logger.Error(ex, "Failed to fetch metadata for object '{name}'", name);
+ throw;
+ }
+ }
+
+ private static RangeHeaderValue GetRange(long offset, int length) => new(
+ from: offset,
+ to: offset + length - 1L);
+
+ [StructLayout(LayoutKind.Auto)]
+ private struct MemoryWriter(Memory<byte> output) : IReadOnlySpanConsumer<byte>, IFlushable {
+ void IReadOnlySpanConsumer<byte>.Invoke(ReadOnlySpan<byte> input) => Copy(input);
+
+ // We need to replace the default interface implementation because the .NET runtime
+ // boxes the struct when a default interface method is invoked on it
+ ValueTask ISupplier<ReadOnlyMemory<byte>, CancellationToken, ValueTask>.
+ Invoke(ReadOnlyMemory<byte> input, CancellationToken token) {
+ var task = ValueTask.CompletedTask;
+ try {
+ Copy(input.Span);
+ } catch (Exception e) {
+ task = ValueTask.FromException(e);
+ }
+
+ return task;
+ }
+
+ private void Copy(ReadOnlySpan<byte> input) {
+ input.CopyTo(output.Span);
+ output = output.Slice(input.Length);
+ }
+
+ readonly void IFlushable.Flush() {
+ // nothing to do
+ }
+
+ readonly Task IFlushable.FlushAsync(CancellationToken token) => Task.CompletedTask;
+ }
+}
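
Finally, a hedged end-to-end sketch of the new backend in isolation (object name and buffer size are placeholders). StorageClient.Create() authenticates through Application Default Credentials (for example, gcloud auth application-default login on a workstation, or the attached service account inside GCP), which is why GcpOptions carries only a bucket:

var storage = new GcpBlobStorage(new GcpOptions { Bucket = "archiver-unit-tests" });

// upload a local chunk (the names here are placeholders)
await using (var file = File.OpenRead("chunk-000000.000000"))
	await storage.StoreAsync(file, "chunk-000000.000000", CancellationToken.None);

// read it back from the start of the object
var buffer = new byte[64 * 1024];
var numRead = await storage.ReadAsync("chunk-000000.000000", buffer, offset: 0, CancellationToken.None);
// numRead is the byte count actually downloaded; 0 would mean the offset was at or past the end of the object
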