Skip to content

Commit 4997a34

Browse files
Refactor diagnostics and improve channel processing
- Changed severity levels for diagnostics in `.editorconfig` to `silent` for `IDE0301`, `IDE0305`, `IDE0306`, and `RCS1077`. - Simplified methods in `ChannelDbExtensions` by adding `Contract.EndContractBlock()` and using lambda expressions for channel writing. - Refactored `PipeResultsTo` and `PipeResultsToAsync` methods in `Transformer.cs` for better clarity and handling of transformations. - Added new asynchronous methods in `DataReaderExtensions` for converting `DbDataReader` to `IAsyncEnumerable<object[]>`. - Expanded test cases in `IDataReaderToChannelObjectArrayExtensionTests.cs` to cover additional scenarios and ensure proper null checks. - Updated `ToChannelExtensionsContractTests` to include asynchronous tests for null parameter checks.
1 parent 3b6fcd6 commit 4997a34

File tree

7 files changed

+286
-197
lines changed

7 files changed

+286
-197
lines changed

.editorconfig

+3
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,6 @@ dotnet_diagnostic.IDE0305.severity = silent
186186

187187
# IDE0306: Simplify collection initialization
188188
dotnet_diagnostic.IDE0306.severity = silent
189+
190+
# RCS1077: Optimize LINQ usage.
191+
dotnet_diagnostic.RCS1077.severity = silent

Source/Channel/ToChannel.Target.cs

+6-36
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ private static async ValueTask<long> ToChannelCore<T>(
1616
if (command is null) throw new ArgumentNullException(nameof(command));
1717
if (writer is null) throw new ArgumentNullException(nameof(writer));
1818
if (command.Connection is null) throw new InvalidOperationException("Command has no connection.");
19+
Contract.EndContractBlock();
1920

2021
if (!command.Connection.State.HasFlag(ConnectionState.Open))
2122
{
@@ -101,17 +102,11 @@ public static ValueTask<long> ToChannel(this IDataReader reader,
101102
ChannelWriter<object[]> target,
102103
bool complete,
103104
CancellationToken cancellationToken = default)
104-
{
105-
if (reader is null) throw new ArgumentNullException(nameof(reader));
106-
if (target is null) throw new ArgumentNullException(nameof(target));
107-
Contract.EndContractBlock();
108-
109-
return target.WriteAll(
105+
=> target.WriteAll(
110106
reader.AsEnumerable(),
111107
complete,
112108
false,
113109
cancellationToken);
114-
}
115110

116111
/// <summary>
117112
/// Iterates an <see cref="IDataReader"/> and writes each record as an array to the channel.
@@ -127,17 +122,11 @@ public static ValueTask<long> ToChannel(this IDataReader reader,
127122
bool complete,
128123
ArrayPool<object>? arrayPool,
129124
CancellationToken cancellationToken = default)
130-
{
131-
if (reader is null) throw new ArgumentNullException(nameof(reader));
132-
if (target is null) throw new ArgumentNullException(nameof(target));
133-
Contract.EndContractBlock();
134-
135-
return target.WriteAll(
125+
=> target.WriteAll(
136126
reader.AsEnumerable(arrayPool),
137127
complete,
138128
false,
139129
cancellationToken);
140-
}
141130

142131
/// <summary>
143132
/// Iterates an <see cref="IDataReader"/> through the transform function and writes each record to the channel.
@@ -154,18 +143,11 @@ public static ValueTask<long> ToChannel<T>(this IDataReader reader,
154143
bool complete,
155144
Func<IDataRecord, T> transform,
156145
CancellationToken cancellationToken = default)
157-
{
158-
if (reader is null) throw new ArgumentNullException(nameof(reader));
159-
if (target is null) throw new ArgumentNullException(nameof(target));
160-
if (transform is null) throw new ArgumentNullException(nameof(transform));
161-
Contract.EndContractBlock();
162-
163-
return target.WriteAll(
146+
=> target.WriteAll(
164147
reader.Select(transform, cancellationToken),
165148
complete,
166149
false,
167150
cancellationToken);
168-
}
169151

170152
/// <summary>
171153
/// Iterates an <see cref="IDataReader"/> mapping the results to classes of type <typeparamref name="T"/> and writes each record to the channel.
@@ -521,17 +503,11 @@ public static ValueTask<long> ToChannelAsync(this DbDataReader reader,
521503
ChannelWriter<object[]> target,
522504
bool complete,
523505
CancellationToken cancellationToken = default)
524-
{
525-
if (reader is null) throw new ArgumentNullException(nameof(reader));
526-
if (target is null) throw new ArgumentNullException(nameof(target));
527-
Contract.EndContractBlock();
528-
529-
return target.WriteAllAsync(
506+
=> target.WriteAllAsync(
530507
reader.AsAsyncEnumerable(cancellationToken),
531508
complete,
532509
false,
533510
cancellationToken);
534-
}
535511

536512
/// <summary>
537513
/// Asynchronously iterates an DbDataReader and writes each record as an array to the channel.
@@ -546,17 +522,11 @@ public static ValueTask<long> ToChannelAsync(this DbDataReader reader,
546522
bool complete,
547523
ArrayPool<object>? arrayPool,
548524
CancellationToken cancellationToken = default)
549-
{
550-
if (reader is null) throw new ArgumentNullException(nameof(reader));
551-
if (target is null) throw new ArgumentNullException(nameof(target));
552-
Contract.EndContractBlock();
553-
554-
return target.WriteAllAsync(
525+
=> target.WriteAllAsync(
555526
reader.AsAsyncEnumerable(arrayPool, cancellationToken),
556527
complete,
557528
false,
558529
cancellationToken);
559-
}
560530

561531
/// <summary>
562532
/// Asynchronously iterates an DbDataReader through the transform function and writes each record to the channel.

Source/Channel/Transformer.cs

+44-74
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
namespace Open.Database.Extensions;
1+
using System.Diagnostics;
2+
3+
namespace Open.Database.Extensions;
24

35
/// <summary>Constructs a <see cref="Transformer{T}"/>.</summary>
46
/// <param name="fieldMappingOverrides">An optional override map of field names to column names where the keys are the property names, and values are the column names.</param>
@@ -10,24 +12,27 @@ internal class Transformer<T>(IEnumerable<(string Field, string? Column)>? field
1012
/// Static utility for creating a Transformer <typeparamref name="T"/>.
1113
/// </summary>
1214
/// <param name="fieldMappingOverrides">An optional override map of field names to column names where the keys are the property names, and values are the column names.</param>
15+
[ExcludeFromCodeCoverage]
1316
public static new Transformer<T> Create(IEnumerable<(string Field, string? Column)>? fieldMappingOverrides = null)
1417
=> new(fieldMappingOverrides);
1518

1619
/// <summary>
17-
/// Transforms the results from the reader by first buffering the results and if/when the buffer size is reached, the results are transformed to a channel for reading.
20+
/// Transforms the results from the reader by first buffering the results and then the final results are transformed to the target channel for reading.
1821
/// </summary>
19-
/// <param name="reader">The reader to read from.</param>
20-
/// <param name="target">The target channel to write to.</param>
21-
/// <param name="complete">Will call complete when no more results are avaiable.</param>
22-
/// <param name="cancellationToken">The cancellation token.</param>
23-
/// <returns>The ChannelReader of the target.</returns>
24-
internal async ValueTask<long> PipeResultsTo(IDataReader reader, ChannelWriter<T> target, bool complete, CancellationToken cancellationToken)
22+
/// <remarks>
23+
/// This is necessary to absorb as much data as possible first and defer the transformation till later.
24+
/// </remarks>
25+
private ChannelWriter<object[]> PipeResultsToPrep(
26+
IDataReader reader,
27+
ChannelWriter<T> target,
28+
bool complete,
29+
CancellationToken cancellationToken)
2530
{
2631
if (reader is null) throw new ArgumentNullException(nameof(reader));
32+
if (target is null) throw new ArgumentNullException(nameof(target));
2733
Contract.EndContractBlock();
2834

2935
(string Name, int Ordinal)[] columns = reader.GetMatchingOrdinals(ColumnNames, true);
30-
var ordinals = columns.Select(m => m.Ordinal).ToImmutableArray();
3136
var names = columns.Select(m => m.Name).ToImmutableArray();
3237

3338
var processor = new Processor(this, names);
@@ -36,7 +41,7 @@ internal async ValueTask<long> PipeResultsTo(IDataReader reader, ChannelWriter<T
3641
Channel<object[]> channel = ChannelDbExtensions.CreateChannel<object[]>(MaxArrayBuffer, true);
3742
ChannelWriter<object[]> writer = channel.Writer;
3843

39-
ValueTask<long> piped = channel
44+
_ = channel
4045
.Reader
4146
.Transform(a =>
4247
{
@@ -49,26 +54,29 @@ internal async ValueTask<long> PipeResultsTo(IDataReader reader, ChannelWriter<T
4954
LocalPool.Return(a);
5055
}
5156
})
52-
.PipeTo(target, complete, cancellationToken);
53-
54-
if (complete)
55-
{
56-
_ = piped
57-
.AsTask()
58-
.ContinueWith(t =>
59-
{
60-
if (t.IsFaulted) target.Complete(t.Exception);
61-
else target.Complete();
62-
},
63-
CancellationToken.None,
64-
TaskContinuationOptions.ExecuteSynchronously,
65-
TaskScheduler.Current);
66-
}
57+
.PipeTo(target, complete, cancellationToken)
58+
.AsTask();
6759

68-
return await reader
69-
.ToChannel(writer, false, LocalPool, cancellationToken).ConfigureAwait(false);
60+
return writer;
7061
}
7162

63+
/// <summary>
64+
/// Transforms the results from the reader by first buffering the results and if/when the buffer size is reached, the results are transformed to a channel for reading.
65+
/// </summary>
66+
/// <param name="reader">The reader to read from.</param>
67+
/// <param name="target">The target channel to write to.</param>
68+
/// <param name="complete">Will call complete when no more results are avaiable.</param>
69+
/// <param name="cancellationToken">The cancellation token.</param>
70+
/// <returns>The ChannelReader of the target.</returns>
71+
internal ValueTask<long> PipeResultsTo(
72+
IDataReader reader,
73+
ChannelWriter<T> target,
74+
bool complete,
75+
CancellationToken cancellationToken)
76+
=> reader.ToChannel(
77+
PipeResultsToPrep(reader, target, complete, cancellationToken),
78+
true, LocalPool, cancellationToken);
79+
7280
#if NETSTANDARD2_0
7381
#else
7482
/// <summary>
@@ -79,53 +87,15 @@ internal async ValueTask<long> PipeResultsTo(IDataReader reader, ChannelWriter<T
7987
/// <param name="complete">Will call complete when no more results are avaiable.</param>
8088
/// <param name="cancellationToken">The cancellation token.</param>
8189
/// <returns>The ChannelReader of the target.</returns>
82-
internal ValueTask<long> PipeResultsToAsync(DbDataReader reader, ChannelWriter<T> target, bool complete, CancellationToken cancellationToken)
83-
{
84-
if (reader is null) throw new ArgumentNullException(nameof(reader));
85-
Contract.EndContractBlock();
86-
87-
(string Name, int Ordinal)[] columns = reader.GetMatchingOrdinals(ColumnNames, true);
88-
var ordinals = columns.Select(m => m.Ordinal).ToImmutableArray();
89-
var names = columns.Select(m => m.Name).ToImmutableArray();
90-
91-
var processor = new Processor(this, names);
92-
Func<object?[], T> transform = processor.Transform;
93-
94-
Channel<object[]> channel = ChannelDbExtensions.CreateChannel<object[]>(MaxArrayBuffer, true);
95-
ChannelWriter<object[]> writer = channel.Writer;
96-
97-
ValueTask<long> piped = channel
98-
.Reader
99-
.Transform(a =>
100-
{
101-
try
102-
{
103-
return transform(a);
104-
}
105-
finally
106-
{
107-
LocalPool.Return(a);
108-
}
109-
})
110-
.PipeTo(target, complete, cancellationToken);
111-
112-
if (complete)
113-
{
114-
_ = piped
115-
.AsTask()
116-
.ContinueWith(t =>
117-
{
118-
if (t.IsFaulted) target.Complete(t.Exception);
119-
else target.Complete();
120-
},
121-
CancellationToken.None,
122-
TaskContinuationOptions.ExecuteSynchronously,
123-
TaskScheduler.Current);
124-
}
125-
126-
return reader
127-
.ToChannelAsync(writer, false, LocalPool, cancellationToken);
128-
}
90+
[ExcludeFromCodeCoverage]
91+
internal ValueTask<long> PipeResultsToAsync(
92+
DbDataReader reader,
93+
ChannelWriter<T> target,
94+
bool complete,
95+
CancellationToken cancellationToken)
96+
=> reader.ToChannelAsync(
97+
PipeResultsToPrep(reader, target, complete, cancellationToken),
98+
true, LocalPool, cancellationToken);
12999
#endif
130100

131101
}

0 commit comments

Comments
 (0)