From ac6b1c71a230fb6c69d089287b5f3d108ced2d26 Mon Sep 17 00:00:00 2001 From: xwwwx Date: Tue, 22 Apr 2025 13:13:34 +0800 Subject: [PATCH 1/5] Refactor Bulk Insert to Use Memory for Enhanced Flexibility and Efficiency --- ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs | 9 ++++----- ClickHouse.Client/Copy/Batch.cs | 4 ++-- ClickHouse.Client/Copy/ClickHouseBulkCopy.cs | 14 +++++++++----- .../Copy/Serializer/BatchSerializer.cs | 15 +++++---------- .../Copy/Serializer/IRowSerializer.cs | 3 ++- .../Copy/Serializer/RowBinarySerializer.cs | 3 ++- .../Serializer/RowBinaryWithDefaultsSerializer.cs | 3 ++- ClickHouse.Client/Utility/EnumerableExtensions.cs | 8 ++++---- 8 files changed, 30 insertions(+), 29 deletions(-) diff --git a/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs b/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs index 4742cc58..886cc4fd 100644 --- a/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs +++ b/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs @@ -449,9 +449,9 @@ public async Task ShouldThrowExceptionOnInnerException(double fraction) } [Test] - public async Task ShouldNotAffectSharedArrayPool() + public async Task ShouldNotAffectSharedMemoryPool() { - var targetTable = "test." + SanitizeTableName($"array_pool"); + var targetTable = "test." + SanitizeTableName($"memory_pool"); await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}"); await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (int Int32, str String, dt DateTime) ENGINE Null"); @@ -466,9 +466,8 @@ public async Task ShouldNotAffectSharedArrayPool() await bulkCopy.InitAsync(); await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 0, "a", DateTime.Now }, 100)); - var rentedArray = ArrayPool.Shared.Rent(poolSize); - Assert.DoesNotThrow(() => { rentedArray[0] = 1; }); - ArrayPool.Shared.Return(rentedArray); + using var rentedArray = MemoryPool.Shared.Rent(poolSize); + Assert.DoesNotThrow(() => { rentedArray.Memory.Span[0] = 1; }); } [Test] diff --git a/ClickHouse.Client/Copy/Batch.cs b/ClickHouse.Client/Copy/Batch.cs index 23b73021..9b977c41 100644 --- a/ClickHouse.Client/Copy/Batch.cs +++ b/ClickHouse.Client/Copy/Batch.cs @@ -7,7 +7,7 @@ namespace ClickHouse.Client.Copy; // Convenience argument collection internal struct Batch : IDisposable { - public object[][] Rows; + public IMemoryOwner> Rows; public int Size; public string Query; public ClickHouseType[] Types; @@ -16,7 +16,7 @@ public void Dispose() { if (Rows != null) { - ArrayPool.Shared.Return(Rows, true); + Rows.Dispose(); Rows = null; } } diff --git a/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs b/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs index e8441beb..be28e03e 100644 --- a/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs +++ b/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs @@ -117,7 +117,7 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken token) if (reader is null) throw new ArgumentNullException(nameof(reader)); - return WriteToServerAsync(reader.AsEnumerable(), token); + return WriteToServerAsync(reader.AsEnumerable().Select(r => new Memory(r)), token); } public Task WriteToServerAsync(DataTable table, CancellationToken token) @@ -125,13 +125,17 @@ public Task WriteToServerAsync(DataTable table, CancellationToken token) if (table is null) throw new ArgumentNullException(nameof(table)); - var rows = table.Rows.Cast().Select(r => r.ItemArray); + var rows = table.Rows.Cast().Select(r => new Memory(r.ItemArray)); return WriteToServerAsync(rows, token); } - public Task WriteToServerAsync(IEnumerable rows) => WriteToServerAsync(rows, CancellationToken.None); + public Task WriteToServerAsync(IEnumerable rows) => + WriteToServerAsync(rows.Select(r => new Memory(r)), CancellationToken.None); - public async Task WriteToServerAsync(IEnumerable rows, CancellationToken token) + public Task WriteToServerAsync(IEnumerable rows, CancellationToken token) => + WriteToServerAsync(rows.Select(r => new Memory(r)), token); + + public async Task WriteToServerAsync(IEnumerable> rows, CancellationToken token) { if (rows is null) throw new ArgumentNullException(nameof(rows)); @@ -201,7 +205,7 @@ public void Dispose() private static string GetColumnsExpression(IReadOnlyCollection columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns); - private IEnumerable IntoBatches(IEnumerable rows, string query, ClickHouseType[] types) + private IEnumerable IntoBatches(IEnumerable> rows, string query, ClickHouseType[] types) { foreach (var (batch, size) in rows.BatchRented(BatchSize)) { diff --git a/ClickHouse.Client/Copy/Serializer/BatchSerializer.cs b/ClickHouse.Client/Copy/Serializer/BatchSerializer.cs index 1a1e79cc..f10fe8c9 100644 --- a/ClickHouse.Client/Copy/Serializer/BatchSerializer.cs +++ b/ClickHouse.Client/Copy/Serializer/BatchSerializer.cs @@ -35,24 +35,19 @@ public void Serialize(Batch batch, Stream stream) using var writer = new ExtendedBinaryWriter(gzipStream); - object[] row = null; - int counter = 0; - var enumerator = batch.Rows.GetEnumerator(); + Memory row = null; + var enumerator = batch.Rows.Memory.Span.Slice(0, batch.Size).GetEnumerator(); try { while (enumerator.MoveNext()) { - row = (object[])enumerator.Current; - rowSerializer.Serialize(row, batch.Types, writer); - - counter++; - if (counter >= batch.Size) - break; // We've reached the batch size + row = enumerator.Current; + rowSerializer.Serialize(row.Span, batch.Types, writer); } } catch (Exception e) { - throw new ClickHouseBulkCopySerializationException(row, e); + throw new ClickHouseBulkCopySerializationException(row.ToArray(), e); } } } diff --git a/ClickHouse.Client/Copy/Serializer/IRowSerializer.cs b/ClickHouse.Client/Copy/Serializer/IRowSerializer.cs index a2d329c5..8b5b531d 100644 --- a/ClickHouse.Client/Copy/Serializer/IRowSerializer.cs +++ b/ClickHouse.Client/Copy/Serializer/IRowSerializer.cs @@ -1,3 +1,4 @@ +using System; using ClickHouse.Client.Formats; using ClickHouse.Client.Types; @@ -5,5 +6,5 @@ namespace ClickHouse.Client.Copy.Serializer; internal interface IRowSerializer { - void Serialize(object[] row, ClickHouseType[] types, ExtendedBinaryWriter writer); + void Serialize(Span row, ClickHouseType[] types, ExtendedBinaryWriter writer); } diff --git a/ClickHouse.Client/Copy/Serializer/RowBinarySerializer.cs b/ClickHouse.Client/Copy/Serializer/RowBinarySerializer.cs index 7d424d1b..8e647e92 100644 --- a/ClickHouse.Client/Copy/Serializer/RowBinarySerializer.cs +++ b/ClickHouse.Client/Copy/Serializer/RowBinarySerializer.cs @@ -1,3 +1,4 @@ +using System; using ClickHouse.Client.Formats; using ClickHouse.Client.Types; @@ -5,7 +6,7 @@ namespace ClickHouse.Client.Copy.Serializer; internal class RowBinarySerializer : IRowSerializer { - public void Serialize(object[] row, ClickHouseType[] types, ExtendedBinaryWriter writer) + public void Serialize(Span row, ClickHouseType[] types, ExtendedBinaryWriter writer) { for (int col = 0; col < row.Length; col++) { diff --git a/ClickHouse.Client/Copy/Serializer/RowBinaryWithDefaultsSerializer.cs b/ClickHouse.Client/Copy/Serializer/RowBinaryWithDefaultsSerializer.cs index f3220b63..e179b9e8 100644 --- a/ClickHouse.Client/Copy/Serializer/RowBinaryWithDefaultsSerializer.cs +++ b/ClickHouse.Client/Copy/Serializer/RowBinaryWithDefaultsSerializer.cs @@ -1,3 +1,4 @@ +using System; using ClickHouse.Client.Constraints; using ClickHouse.Client.Formats; using ClickHouse.Client.Types; @@ -7,7 +8,7 @@ namespace ClickHouse.Client.Copy.Serializer; // https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithdefaults internal class RowBinaryWithDefaultsSerializer : IRowSerializer { - public void Serialize(object[] row, ClickHouseType[] types, ExtendedBinaryWriter writer) + public void Serialize(Span row, ClickHouseType[] types, ExtendedBinaryWriter writer) { for (int col = 0; col < row.Length; col++) { diff --git a/ClickHouse.Client/Utility/EnumerableExtensions.cs b/ClickHouse.Client/Utility/EnumerableExtensions.cs index 271800ef..9235045c 100644 --- a/ClickHouse.Client/Utility/EnumerableExtensions.cs +++ b/ClickHouse.Client/Utility/EnumerableExtensions.cs @@ -23,20 +23,20 @@ public static void Deconstruct(this IList list, out T first, out T second, third = list[2]; } - public static IEnumerable<(T[], int)> BatchRented(this IEnumerable enumerable, int batchSize) + public static IEnumerable<(IMemoryOwner, int)> BatchRented(this IEnumerable enumerable, int batchSize) { - var array = ArrayPool.Shared.Rent(batchSize); + var array = MemoryPool.Shared.Rent(batchSize); int counter = 0; foreach (var item in enumerable) { - array[counter++] = item; + array.Memory.Span[counter++] = item; if (counter >= batchSize) { yield return (array, counter); counter = 0; - array = ArrayPool.Shared.Rent(batchSize); + array = MemoryPool.Shared.Rent(batchSize); } } if (counter > 0) From 8d5e37fc9a13f01236329da19741e44d3bc4b7d3 Mon Sep 17 00:00:00 2001 From: xwwwx Date: Wed, 23 Apr 2025 11:25:29 +0800 Subject: [PATCH 2/5] replace MemoryPool by ArrayPool --- ClickHouse.Client.Tests/TestUtilities.cs | 4 ++-- ClickHouse.Client/Copy/Batch.cs | 4 ++-- ClickHouse.Client/Copy/Serializer/BatchSerializer.cs | 2 +- ClickHouse.Client/Utility/EnumerableExtensions.cs | 8 ++++---- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ClickHouse.Client.Tests/TestUtilities.cs b/ClickHouse.Client.Tests/TestUtilities.cs index bc43026f..9fa98cf9 100644 --- a/ClickHouse.Client.Tests/TestUtilities.cs +++ b/ClickHouse.Client.Tests/TestUtilities.cs @@ -74,8 +74,8 @@ public static ClickHouseConnection GetTestClickHouseConnection(bool compression public static ClickHouseConnectionStringBuilder GetConnectionStringBuilder() { // Connection string must be provided pointing to a test ClickHouse server - var devConnectionString = Environment.GetEnvironmentVariable("CLICKHOUSE_CONNECTION") ?? - throw new InvalidOperationException("Must set CLICKHOUSE_CONNECTION environment variable pointing at ClickHouse server"); + var devConnectionString = "Host=localhost;Port=18123;User=default;Password=changeme;" ?? + throw new InvalidOperationException("Must set CLICKHOUSE_CONNECTION environment variable pointing at ClickHouse server"); return new ClickHouseConnectionStringBuilder(devConnectionString); } diff --git a/ClickHouse.Client/Copy/Batch.cs b/ClickHouse.Client/Copy/Batch.cs index 9b977c41..ba894135 100644 --- a/ClickHouse.Client/Copy/Batch.cs +++ b/ClickHouse.Client/Copy/Batch.cs @@ -7,7 +7,7 @@ namespace ClickHouse.Client.Copy; // Convenience argument collection internal struct Batch : IDisposable { - public IMemoryOwner> Rows; + public Memory[] Rows; public int Size; public string Query; public ClickHouseType[] Types; @@ -16,7 +16,7 @@ public void Dispose() { if (Rows != null) { - Rows.Dispose(); + ArrayPool>.Shared.Return(Rows, true); Rows = null; } } diff --git a/ClickHouse.Client/Copy/Serializer/BatchSerializer.cs b/ClickHouse.Client/Copy/Serializer/BatchSerializer.cs index f10fe8c9..1bd48234 100644 --- a/ClickHouse.Client/Copy/Serializer/BatchSerializer.cs +++ b/ClickHouse.Client/Copy/Serializer/BatchSerializer.cs @@ -36,7 +36,7 @@ public void Serialize(Batch batch, Stream stream) using var writer = new ExtendedBinaryWriter(gzipStream); Memory row = null; - var enumerator = batch.Rows.Memory.Span.Slice(0, batch.Size).GetEnumerator(); + var enumerator = batch.Rows.AsSpan(0, batch.Size).GetEnumerator(); try { while (enumerator.MoveNext()) diff --git a/ClickHouse.Client/Utility/EnumerableExtensions.cs b/ClickHouse.Client/Utility/EnumerableExtensions.cs index 9235045c..271800ef 100644 --- a/ClickHouse.Client/Utility/EnumerableExtensions.cs +++ b/ClickHouse.Client/Utility/EnumerableExtensions.cs @@ -23,20 +23,20 @@ public static void Deconstruct(this IList list, out T first, out T second, third = list[2]; } - public static IEnumerable<(IMemoryOwner, int)> BatchRented(this IEnumerable enumerable, int batchSize) + public static IEnumerable<(T[], int)> BatchRented(this IEnumerable enumerable, int batchSize) { - var array = MemoryPool.Shared.Rent(batchSize); + var array = ArrayPool.Shared.Rent(batchSize); int counter = 0; foreach (var item in enumerable) { - array.Memory.Span[counter++] = item; + array[counter++] = item; if (counter >= batchSize) { yield return (array, counter); counter = 0; - array = MemoryPool.Shared.Rent(batchSize); + array = ArrayPool.Shared.Rent(batchSize); } } if (counter > 0) From f67dfc2a46bf1fc8cd95c37a3e57ea423cd2084d Mon Sep 17 00:00:00 2001 From: xwwwx Date: Wed, 23 Apr 2025 11:32:24 +0800 Subject: [PATCH 3/5] Revert testUtilites --- ClickHouse.Client.Tests/TestUtilities.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ClickHouse.Client.Tests/TestUtilities.cs b/ClickHouse.Client.Tests/TestUtilities.cs index 9fa98cf9..bc43026f 100644 --- a/ClickHouse.Client.Tests/TestUtilities.cs +++ b/ClickHouse.Client.Tests/TestUtilities.cs @@ -74,8 +74,8 @@ public static ClickHouseConnection GetTestClickHouseConnection(bool compression public static ClickHouseConnectionStringBuilder GetConnectionStringBuilder() { // Connection string must be provided pointing to a test ClickHouse server - var devConnectionString = "Host=localhost;Port=18123;User=default;Password=changeme;" ?? - throw new InvalidOperationException("Must set CLICKHOUSE_CONNECTION environment variable pointing at ClickHouse server"); + var devConnectionString = Environment.GetEnvironmentVariable("CLICKHOUSE_CONNECTION") ?? + throw new InvalidOperationException("Must set CLICKHOUSE_CONNECTION environment variable pointing at ClickHouse server"); return new ClickHouseConnectionStringBuilder(devConnectionString); } From 983e0a366f40780903242250ad09d19cd3b1dfb4 Mon Sep 17 00:00:00 2001 From: xwwwx Date: Wed, 23 Apr 2025 13:14:08 +0800 Subject: [PATCH 4/5] ShouldInsertDataReader/ShouldInsertDataTable --- .../BulkCopy/BulkCopyTests.cs | 82 ++++++++++++++++++- 1 file changed, 78 insertions(+), 4 deletions(-) diff --git a/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs b/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs index 886cc4fd..ae252f1e 100644 --- a/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs +++ b/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs @@ -1,6 +1,7 @@ using System; using System.Buffers; using System.Collections.Generic; +using System.Data; using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; @@ -449,9 +450,9 @@ public async Task ShouldThrowExceptionOnInnerException(double fraction) } [Test] - public async Task ShouldNotAffectSharedMemoryPool() + public async Task ShouldNotAffectSharedArrayPool() { - var targetTable = "test." + SanitizeTableName($"memory_pool"); + var targetTable = "test." + SanitizeTableName($"Array_pool"); await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}"); await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (int Int32, str String, dt DateTime) ENGINE Null"); @@ -466,8 +467,9 @@ public async Task ShouldNotAffectSharedMemoryPool() await bulkCopy.InitAsync(); await bulkCopy.WriteToServerAsync(Enumerable.Repeat(new object[] { 0, "a", DateTime.Now }, 100)); - using var rentedArray = MemoryPool.Shared.Rent(poolSize); - Assert.DoesNotThrow(() => { rentedArray.Memory.Span[0] = 1; }); + var rentedArray = ArrayPool>.Shared.Rent(poolSize); + Assert.DoesNotThrow(() => { rentedArray[0] = new Memory(); }); + ArrayPool>.Shared.Return(rentedArray); } [Test] @@ -495,5 +497,77 @@ public async Task ShouldInsertJson() Assert.That(reader.GetValue(0), Is.EqualTo(jsonObject).UsingPropertiesComparer()); } } + + [Test] + public async Task ShouldInsertDataReader() + { + var targetTable = "test." + SanitizeTableName($"bulk_datareader"); + await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (int Int32, str String) ENGINE Memory"); + + using var bulkCopy = new ClickHouseBulkCopy(connection) + { + DestinationTableName = targetTable, + }; + + var dataTable = new DataTable(); + dataTable.Columns.Add(new DataColumn("int", typeof(int))); + dataTable.Columns.Add(new DataColumn("str", typeof(string))); + dataTable.Rows.Add(new object[] { 1, "a" }); + dataTable.Rows.Add(new object[] { 2, "b" }); + dataTable.Rows.Add(new object[] { 3, "c" }); + + + await bulkCopy.InitAsync(); + await bulkCopy.WriteToServerAsync(dataTable.CreateDataReader()); + + Assert.That(bulkCopy.RowsWritten, Is.EqualTo(3)); + await using var reader = await connection.ExecuteReaderAsync($"SELECT * from {targetTable}"); + Assert.That(reader.Read(), Is.True); + Assert.That(reader.GetInt32(0), Is.EqualTo(dataTable.Rows[0]["int"])); + Assert.That(reader.GetString(1), Is.EqualTo(dataTable.Rows[0]["str"])); + Assert.That(reader.Read(), Is.True); + Assert.That(reader.GetInt32(0), Is.EqualTo(dataTable.Rows[1]["int"])); + Assert.That(reader.GetString(1), Is.EqualTo(dataTable.Rows[1]["str"])); + Assert.That(reader.Read(), Is.True); + Assert.That(reader.GetInt32(0), Is.EqualTo(dataTable.Rows[2]["int"])); + Assert.That(reader.GetString(1), Is.EqualTo(dataTable.Rows[2]["str"])); + } + + [Test] + public async Task ShouldInsertDataTable() + { + var targetTable = "test." + SanitizeTableName($"bulk_dataTable"); + await connection.ExecuteStatementAsync($"DROP TABLE IF EXISTS {targetTable}"); + await connection.ExecuteStatementAsync($"CREATE TABLE IF NOT EXISTS {targetTable} (int Int32, str String) ENGINE Memory"); + + using var bulkCopy = new ClickHouseBulkCopy(connection) + { + DestinationTableName = targetTable, + }; + + var dataTable = new DataTable(); + dataTable.Columns.Add(new DataColumn("int", typeof(int))); + dataTable.Columns.Add(new DataColumn("str", typeof(string))); + dataTable.Rows.Add(new object[] { 1, "a" }); + dataTable.Rows.Add(new object[] { 2, "b" }); + dataTable.Rows.Add(new object[] { 3, "c" }); + + + await bulkCopy.InitAsync(); + await bulkCopy.WriteToServerAsync(dataTable, CancellationToken.None); + + Assert.That(bulkCopy.RowsWritten, Is.EqualTo(3)); + await using var reader = await connection.ExecuteReaderAsync($"SELECT * from {targetTable}"); + Assert.That(reader.Read(), Is.True); + Assert.That(reader.GetInt32(0), Is.EqualTo(dataTable.Rows[0]["int"])); + Assert.That(reader.GetString(1), Is.EqualTo(dataTable.Rows[0]["str"])); + Assert.That(reader.Read(), Is.True); + Assert.That(reader.GetInt32(0), Is.EqualTo(dataTable.Rows[1]["int"])); + Assert.That(reader.GetString(1), Is.EqualTo(dataTable.Rows[1]["str"])); + Assert.That(reader.Read(), Is.True); + Assert.That(reader.GetInt32(0), Is.EqualTo(dataTable.Rows[2]["int"])); + Assert.That(reader.GetString(1), Is.EqualTo(dataTable.Rows[2]["str"])); + } } From 554b1fc442e6c17d94b98c160226f888ba20d5d6 Mon Sep 17 00:00:00 2001 From: xwwwx Date: Wed, 23 Apr 2025 13:21:55 +0800 Subject: [PATCH 5/5] replace await using by using --- ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs b/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs index ae252f1e..bedce69a 100644 --- a/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs +++ b/ClickHouse.Client.Tests/BulkCopy/BulkCopyTests.cs @@ -522,7 +522,7 @@ public async Task ShouldInsertDataReader() await bulkCopy.WriteToServerAsync(dataTable.CreateDataReader()); Assert.That(bulkCopy.RowsWritten, Is.EqualTo(3)); - await using var reader = await connection.ExecuteReaderAsync($"SELECT * from {targetTable}"); + using var reader = await connection.ExecuteReaderAsync($"SELECT * from {targetTable}"); Assert.That(reader.Read(), Is.True); Assert.That(reader.GetInt32(0), Is.EqualTo(dataTable.Rows[0]["int"])); Assert.That(reader.GetString(1), Is.EqualTo(dataTable.Rows[0]["str"])); @@ -558,7 +558,7 @@ public async Task ShouldInsertDataTable() await bulkCopy.WriteToServerAsync(dataTable, CancellationToken.None); Assert.That(bulkCopy.RowsWritten, Is.EqualTo(3)); - await using var reader = await connection.ExecuteReaderAsync($"SELECT * from {targetTable}"); + using var reader = await connection.ExecuteReaderAsync($"SELECT * from {targetTable}"); Assert.That(reader.Read(), Is.True); Assert.That(reader.GetInt32(0), Is.EqualTo(dataTable.Rows[0]["int"])); Assert.That(reader.GetString(1), Is.EqualTo(dataTable.Rows[0]["str"]));