Skip to content

Commit 731b7b7

Browse files
committed
+ Replay, fix style & headers
1 parent 379b6e2 commit 731b7b7

File tree

15 files changed

+334
-190
lines changed

15 files changed

+334
-190
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ finally
4848
- `FromObservable` - convert an `IObservable` into an `IAsyncEnumerable`
4949
- `Interval` - periodically signal an ever increasing number
5050
- `Just` - emit a single constant value
51+
- `Merge` - run multiple sources at once and merge their items into a single async sequence
5152
- `Never` - the async sequence never produces any items and never terminates
5253
- `Range` - emit a range of numbers
5354
- `Timer` - emit zero after some time delay
@@ -72,10 +73,13 @@ finally
7273
- `Last` - signals the last item of the async sequence
7374
- `Latest` - runs the source async sequence as fast as it can and samples it with the frequency of the consumer
7475
- `Map` - transform one source value into some other value
76+
- `MergeWith` - run two async sources at once and merge their items into a single async sequence
7577
- `OnErrorResumeNext` - if the main source fails, switch to an alternative source
7678
- `Prefetch` - run the source async sequence to prefetch items for a slow consumer
79+
- `Publish` - consume an async sequence once while multicasting its items to intermediate consumers for the duration of a function.
7780
- `Reduce` - combine elements with an accumulator and emit the last result
7881
- `Repeat` - repeatedly consume the entire source async sequence (up to a number of times and/or condition)
82+
- `Replay` - consume an async sequence once, caching some or all of its items and multicasting them to intermediate consumers for the duration of a function.
7983
- `Retry` - retry a failed async sequence (up to a number of times or based on condition)
8084
- `Sample` - periodically take the latest item from the source sequence and emit it
8185
- `Scan` - perform rolling aggregation by emitting intermediate results

async-enumerable-dotnet-benchmark/Program.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
using async_enumerable_dotnet;
66
using System;
7-
using System.Threading.Tasks;
87

98
namespace async_enumerable_dotnet_benchmark
109
{
@@ -15,14 +14,14 @@ class Program
1514
/// <summary>
1615
/// Don't worry about this program yet. I'm using it to
1716
/// diagnose await hangs and internal state that is otherwise
18-
/// hard (or I don't know how) to debug as an Xunit test.
17+
/// hard (or I don't know how) to debug as an XUnit test.
1918
/// </summary>
2019
/// <param name="args"></param>
2120
// ReSharper disable once UnusedParameter.Local
2221
// ReSharper disable once ArrangeTypeMemberModifiers
2322
static void Main(string[] args)
2423
{
25-
for (int i = 0; i < 100000; i++)
24+
for (var i = 0; i < 100000; i++)
2625
{
2726
if (i % 100 == 0)
2827
{

async-enumerable-dotnet-test/LicenseHeader.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,17 @@ private static void VisitSources(string path)
5252
var found = false;
5353

5454
var ci = Environment.GetEnvironmentVariable("CI") != null;
55-
55+
56+
var sb = new StringBuilder();
57+
5658
foreach (var entry in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
5759
{
58-
if (entry.Contains("AssemblyInfo")
59-
|| entry.Contains("Temporary")
60-
|| entry.Contains("/obj/"))
60+
var entryForward = entry.Replace("\\", "/");
61+
if (entryForward.Contains("AssemblyInfo")
62+
|| entryForward.Contains("Temporary")
63+
|| entryForward.Contains("/obj/")
64+
|| entryForward.Contains("/Debug/")
65+
|| entryForward.Contains("/Release/"))
6166
{
6267
continue;
6368
}
@@ -69,7 +74,7 @@ private static void VisitSources(string path)
6974
}
7075
if (!text.StartsWith(HeaderLines))
7176
{
72-
Console.WriteLine("Missing header: " + entry);
77+
sb.Append(entry).Append("\r\n");
7378
found = true;
7479
if (!ci)
7580
{
@@ -80,7 +85,7 @@ private static void VisitSources(string path)
8085

8186
if (found)
8287
{
83-
throw new InvalidOperationException("Missing header found and added. Please rebuild the project of " + path);
88+
throw new InvalidOperationException("Missing header found and added. Please rebuild the project of " + path + "\r\n" + sb);
8489
}
8590
}
8691
}

async-enumerable-dotnet-test/MergeTest.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ await AsyncEnumerable.Merge(
7272
[Fact]
7373
public async void Push()
7474
{
75-
for (int i = 0; i < 10; i++)
75+
for (var i = 0; i < 10; i++)
7676
{
7777
var push = new MulticastAsyncEnumerable<int>();
7878

@@ -84,7 +84,7 @@ public async void Push()
8484

8585
var t = Task.Run(async () =>
8686
{
87-
for (int j = 0; j < 100_000; j++)
87+
for (var j = 0; j < 100_000; j++)
8888
{
8989
await push.Next(j);
9090
}
@@ -104,7 +104,7 @@ public async void Push()
104104
[Fact]
105105
public async void Multicast_Merge()
106106
{
107-
for (int i = 0; i < 100000; i++)
107+
for (var i = 0; i < 100000; i++)
108108
{
109109
await AsyncEnumerable.Range(1, 5)
110110
.Publish(a => a.Take(3).MergeWith(a.Skip(3)))

async-enumerable-dotnet-test/MergeWithTest.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// See LICENSE file in the project root for full license information.
44

55
using Xunit;
6-
using System.Threading.Tasks;
76
using async_enumerable_dotnet;
87

98
namespace async_enumerable_dotnet_test

async-enumerable-dotnet-test/PublishTest.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
// See LICENSE file in the project root for full license information.
44

55
using Xunit;
6-
using System.Threading.Tasks;
76
using async_enumerable_dotnet;
7+
using System;
88

99
namespace async_enumerable_dotnet_test
1010
{
@@ -81,5 +81,15 @@ await AsyncEnumerable.Range(1, 5)
8181
)
8282
.AssertResult(1, 2, 3, 4, 5);
8383
}
84+
85+
86+
[Fact]
87+
public async void Handler_Crash()
88+
{
89+
await AsyncEnumerable.Range(1, 5)
90+
.Publish<int, int>(v => throw new InvalidOperationException())
91+
.AssertFailure(typeof(InvalidOperationException));
92+
}
93+
8494
}
8595
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright (c) David Karnok & Contributors.
2+
// Licensed under the Apache 2.0 License.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using Xunit;
6+
using async_enumerable_dotnet;
7+
using System;
8+
9+
namespace async_enumerable_dotnet_test
10+
{
11+
public class ReplayTest
12+
{
13+
[Fact]
14+
public async void All_Direct()
15+
{
16+
await AsyncEnumerable.Range(1, 5)
17+
.Replay(v => v)
18+
.AssertResult(1, 2, 3, 4, 5);
19+
}
20+
21+
[Fact]
22+
public async void All_Simple()
23+
{
24+
await AsyncEnumerable.Range(1, 5)
25+
.Replay(v => v.Map(w => w + 1))
26+
.AssertResult(2, 3, 4, 5, 6);
27+
}
28+
29+
[Fact]
30+
public async void All_Take()
31+
{
32+
await AsyncEnumerable.Range(1, 5)
33+
.Replay(v => v.Take(3))
34+
.AssertResult(1, 2, 3);
35+
}
36+
37+
[Fact]
38+
public async void All_Recombine()
39+
{
40+
await AsyncEnumerable.Range(1, 5)
41+
.Replay(v => v.Take(3).ConcatWith(v.Skip(3)))
42+
.AssertResult(1, 2, 3, 4, 5);
43+
}
44+
45+
[Fact]
46+
public async void All_Twice()
47+
{
48+
await AsyncEnumerable.Range(1, 5)
49+
.Replay(v => v.ConcatWith(v))
50+
.AssertResult(1, 2, 3, 4, 5, 1, 2, 3, 4, 5);
51+
}
52+
53+
[Fact]
54+
public async void All_Handler_Crash()
55+
{
56+
await AsyncEnumerable.Range(1, 5)
57+
.Replay<int, int>(v => throw new InvalidOperationException())
58+
.AssertFailure(typeof(InvalidOperationException));
59+
}
60+
}
61+
}

async-enumerable-dotnet-test/UnitTest1.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// See LICENSE file in the project root for full license information.
44

55
using Xunit;
6-
using System.Threading.Tasks;
76
using async_enumerable_dotnet;
87

98
namespace async_enumerable_dotnet_test

async-enumerable-dotnet/AsyncEnumerable.cs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,15 +1414,33 @@ public static IAsyncEnumerable<TSource> MergeWith<TSource>(this IAsyncEnumerable
14141414
public static IAsyncEnumerable<TSource> Merge<TSource>(params IAsyncEnumerable<TSource>[] sources)
14151415
{
14161416
RequireNonNull(sources, nameof(sources));
1417-
if (sources.Length == 0)
1417+
switch (sources.Length)
14181418
{
1419-
return Empty<TSource>();
1419+
case 0:
1420+
return Empty<TSource>();
1421+
case 1:
1422+
return sources[0];
1423+
default:
1424+
return new Merge<TSource>(sources);
14201425
}
1421-
if (sources.Length == 1)
1422-
{
1423-
return sources[0];
1424-
}
1425-
return new Merge<TSource>(sources);
1426+
}
1427+
1428+
/// <summary>
1429+
/// Shares and multicasts the source async sequence, caching some or
1430+
/// all of its items, for the duration
1431+
/// of a function and relays items from the returned async sequence.
1432+
/// </summary>
1433+
/// <typeparam name="TSource">The element type of the source.</typeparam>
1434+
/// <typeparam name="TResult">The result type.</typeparam>
1435+
/// <param name="source">The source async sequence to multicast.</param>
1436+
/// <param name="func">The function to transform the sequence without
1437+
/// consuming it multiple times.</param>
1438+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1439+
public static IAsyncEnumerable<TResult> Replay<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<IAsyncEnumerable<TSource>, IAsyncEnumerable<TResult>> func)
1440+
{
1441+
RequireNonNull(source, nameof(source));
1442+
RequireNonNull(func, nameof(func));
1443+
return new Replay<TSource, TResult>(source, func);
14261444
}
14271445
}
14281446
}

async-enumerable-dotnet/async-enumerable-dotnet.csproj

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,25 @@
66
<GenerateDocumentationFile>true</GenerateDocumentationFile>
77
<LangVersion>latest</LangVersion>
88
<PackageId>akarnokd.async-enumerable-dotnet</PackageId>
9-
<Version>0.0.1.5</Version>
9+
<Version>0.0.1.6</Version>
1010
<Authors>David Karnok</Authors>
1111
<Company />
12-
<AssemblyVersion>0.0.1.5</AssemblyVersion>
13-
<FileVersion>0.0.1.5</FileVersion>
12+
<AssemblyVersion>0.0.1.6</AssemblyVersion>
13+
<FileVersion>0.0.1.6</FileVersion>
1414
<PackageTags>async, concurrency, async-enumerable, operators, async-sequence</PackageTags>
1515
<RepositoryUrl>https://github.com/akarnokd/async-enumerable-dotnet</RepositoryUrl>
1616
<PackageProjectUrl>https://github.com/akarnokd/async-enumerable-dotnet#getting-started</PackageProjectUrl>
1717
<Description>Experimental operators for the upcoming C# 8 IAsyncEnumerables.</Description>
1818
<Copyright>(C) David Karnok</Copyright>
1919
<PackageLicenseUrl>https://www.apache.org/licenses/LICENSE-2.0</PackageLicenseUrl>
20-
<PackageReleaseNotes>- Internal notification mechanism reworked.
21-
- Fixed a few Current/DisposeAsync races
22-
- Renamed certain type arguments to conform the C# naming conventions.</PackageReleaseNotes>
20+
<PackageReleaseNotes>Bugfixes
21+
- FlatMap dispose now awaits the dispose of the inner sources.
22+
23+
New operators:
24+
- Merge
25+
- MergeWith
26+
- Publish
27+
- Replay</PackageReleaseNotes>
2328
<RepositoryType>Github</RepositoryType>
2429
<Product>Async Enumerable operators for .NET</Product>
2530
</PropertyGroup>

0 commit comments

Comments
 (0)