Skip to content

Commit bcc7e2f

Browse files
committed
+ConcatMapEager, +UnicastAsyncEnumerable
1 parent 54bb3c7 commit bcc7e2f

File tree

9 files changed

+955
-5
lines changed

9 files changed

+955
-5
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ finally
4040
- `Create` - generate values via async push
4141
- `CombineLatest` - combines the latest items of the source async sequences via a function into results
4242
- `Concat` - concatenate multiple async sequences
43+
- `ConcatEager` - run multiple async sequences at once but relay elements in order similar to `Concat`
4344
- `Defer` - defer the creation of the actual `IAsyncEnumerable`
4445
- `Error` - signal an error
4546
- `Empty` - the async sequence ends without any values
@@ -62,6 +63,7 @@ finally
6263
- `Buffer` - collect some number of items into buffer(s) and emit those buffers
6364
- `Collect` - collect items into a custom collection and emit the collection at the end
6465
- `ConcatMap` - concatenate in order the inner async sequences mapped from the main sequence
66+
- `ConcatMapEager` - run the async sources at once but relay elements in order similar to `ConcatMap`
6567
- `ConcatWith` - concatenate in order with another async sequence
6668
- `Distinct` - makes sure only distinct elements get relayed
6769
- `DistinctUntilChanged` - relays an element only if it is distinct from the previous item
@@ -118,6 +120,7 @@ finally
118120

119121
- `MulticastAsyncEnumerable` - signals events to currently associated IAsyncEnumerator consumers (aka PublishSubject).
120122
- `ReplayAsyncEnumerable` - replays some or all items to its IAsyncEnumerator consumers (aka ReplaySubject).
123+
- `UnicastAsyncEnumerable` - buffers then replay items for an only consumer
121124

122125
### IAsyncConsumer
123126

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright (c) David Karnok & Contributors.
2+
// Licensed under the Apache 2.0 License.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using Xunit;
6+
using async_enumerable_dotnet;
7+
using System;
8+
9+
namespace async_enumerable_dotnet_test
10+
{
11+
public class ConcatMapEagerTest
12+
{
13+
[Fact]
14+
public async void Normal()
15+
{
16+
await AsyncEnumerable.Range(1, 5)
17+
.ConcatMapEager(v => AsyncEnumerable.Range(v * 10, 5))
18+
.AssertResult(
19+
10, 11, 12, 13, 14,
20+
20, 21, 22, 23, 24,
21+
30, 31, 32, 33, 34,
22+
40, 41, 42, 43, 44,
23+
50, 51, 52, 53, 54
24+
);
25+
}
26+
27+
[Fact]
28+
public async void Normal_Take()
29+
{
30+
await AsyncEnumerable.Range(1, 5)
31+
.ConcatMapEager(v => AsyncEnumerable.Range(v * 10, 5))
32+
.Take(7)
33+
.AssertResult(
34+
10, 11, 12, 13, 14,
35+
20, 21
36+
);
37+
}
38+
39+
[Fact]
40+
public async void Normal_Params()
41+
{
42+
await AsyncEnumerable.ConcatEager(
43+
AsyncEnumerable.Range(1, 5),
44+
AsyncEnumerable.Range(6, 5)
45+
)
46+
.AssertResult(
47+
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
48+
);
49+
}
50+
51+
[Fact]
52+
public async void Normal_Params_MaxConcurrency()
53+
{
54+
await AsyncEnumerable.ConcatEager(1,
55+
AsyncEnumerable.Range(1, 5),
56+
AsyncEnumerable.Range(6, 5)
57+
)
58+
.AssertResult(
59+
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
60+
);
61+
}
62+
63+
[Fact]
64+
public async void Normal_Params_MaxConcurrency_Prefetch()
65+
{
66+
await AsyncEnumerable.ConcatEager(1, 1,
67+
AsyncEnumerable.Range(1, 5),
68+
AsyncEnumerable.Range(6, 5)
69+
)
70+
.AssertResult(
71+
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
72+
);
73+
}
74+
75+
[Fact]
76+
public async void Nested_Normal()
77+
{
78+
await AsyncEnumerable.FromArray(
79+
AsyncEnumerable.Range(1, 5),
80+
AsyncEnumerable.Range(6, 5)
81+
)
82+
.ConcatEager()
83+
.AssertResult(
84+
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
85+
);
86+
}
87+
88+
[Fact]
89+
public async void Main_Error()
90+
{
91+
await AsyncEnumerable.Range(1, 5).ConcatWith(AsyncEnumerable.Error<int>(new InvalidOperationException()))
92+
.ConcatMapEager(v => AsyncEnumerable.Range(v * 10, 5))
93+
.AssertFailure(typeof(InvalidOperationException),
94+
10, 11, 12, 13, 14,
95+
20, 21, 22, 23, 24,
96+
30, 31, 32, 33, 34,
97+
40, 41, 42, 43, 44,
98+
50, 51, 52, 53, 54
99+
);
100+
}
101+
102+
[Fact]
103+
public async void Inner_Error()
104+
{
105+
await AsyncEnumerable.Range(1, 5)
106+
.ConcatMapEager(v => {
107+
var res = AsyncEnumerable.Range(v * 10, 5);
108+
if (v == 3)
109+
{
110+
res = res.ConcatWith(AsyncEnumerable.Error<int>(new InvalidOperationException()));
111+
}
112+
return res;
113+
})
114+
.AssertFailure(typeof(InvalidOperationException),
115+
10, 11, 12, 13, 14,
116+
20, 21, 22, 23, 24,
117+
30, 31, 32, 33, 34,
118+
40, 41, 42, 43, 44,
119+
50, 51, 52, 53, 54
120+
);
121+
}
122+
123+
[Fact]
124+
public async void MaxConcurrency_Prefetch_Matrix()
125+
{
126+
for (var concurrency = 1; concurrency < 7; concurrency++)
127+
{
128+
for (var prefetch = 1; prefetch < 7; prefetch++)
129+
{
130+
await AsyncEnumerable.Range(1, 5)
131+
.ConcatMapEager(v => AsyncEnumerable.Range(v * 10, 5), concurrency, prefetch)
132+
.AssertResult(
133+
10, 11, 12, 13, 14,
134+
20, 21, 22, 23, 24,
135+
30, 31, 32, 33, 34,
136+
40, 41, 42, 43, 44,
137+
50, 51, 52, 53, 54
138+
);
139+
}
140+
}
141+
}
142+
}
143+
}

async-enumerable-dotnet-test/CreateTest.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public async void Empty()
2323
}
2424

2525
[Fact]
26-
public async ValueTask Range()
26+
public async void Range()
2727
{
2828
var result = AsyncEnumerable.Create<int>(async e =>
2929
{
@@ -41,7 +41,15 @@ public async void Range_Loop()
4141
{
4242
for (var j = 0; j < 1000; j++)
4343
{
44-
await Range();
44+
var result = AsyncEnumerable.Create<int>(async e =>
45+
{
46+
for (var i = 0; i < 10 && !e.DisposeAsyncRequested; i++)
47+
{
48+
await e.Next(i);
49+
}
50+
});
51+
52+
await result.AssertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
4553
}
4654
}
4755

@@ -61,7 +69,7 @@ public async void Items_And_Error()
6169
}
6270

6371
[Fact]
64-
public async ValueTask Take()
72+
public async void Take()
6573
{
6674
await AsyncEnumerable.Create<int>(async e =>
6775
{
@@ -79,7 +87,15 @@ public async void Take_Loop()
7987
{
8088
for (var j = 0; j < 1000; j++)
8189
{
82-
await Take();
90+
await AsyncEnumerable.Create<int>(async e =>
91+
{
92+
for (var i = 0; i < 10 && !e.DisposeAsyncRequested; i++)
93+
{
94+
await e.Next(i);
95+
}
96+
})
97+
.Take(5)
98+
.AssertResult(0, 1, 2, 3, 4);
8399
}
84100
}
85101
}

async-enumerable-dotnet-test/TestHelper.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public static ValueTask AssertResult<T>(this IAsyncEnumerable<T> source, params
2323
// ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Global
2424
public static async ValueTask AssertResult<T>(this IAsyncEnumerator<T> source, params T[] values)
2525
{
26+
var main = default(Exception);
27+
var dispose = default(Exception);
2628
var idx = 0;
2729
try
2830
{
@@ -35,9 +37,33 @@ public static async ValueTask AssertResult<T>(this IAsyncEnumerator<T> source, p
3537

3638
Assert.True(values.Length == idx, "Source has less items than expected: " + values.Length + ", actual: " + idx);
3739
}
40+
catch (Exception ex)
41+
{
42+
main = ex;
43+
}
3844
finally
3945
{
40-
await source.DisposeAsync();
46+
try
47+
{
48+
await source.DisposeAsync();
49+
}
50+
catch (Exception ex)
51+
{
52+
dispose = ex;
53+
}
54+
}
55+
56+
if (main != null && dispose != null)
57+
{
58+
throw new AggregateException(main, dispose);
59+
}
60+
if (main != null)
61+
{
62+
throw main;
63+
}
64+
if (dispose != null)
65+
{
66+
throw dispose;
4167
}
4268
}
4369

0 commit comments

Comments
 (0)