Skip to content

Commit b259def

Browse files
committed
+ Sample, some tools
1 parent c0a382c commit b259def

File tree

6 files changed

+341
-1
lines changed

6 files changed

+341
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ finally
7474
- `Reduce` - combine elements with an accumulator and emit the last result
7575
- `Repeat` - repeatedly consume the entire source async sequence (up to a number of times and/or condition)
7676
- `Retry` - retry a failed async sequence (up to a number of times or based on condition)
77+
- `Sample` - periodically take the latest item from the source sequence and emit it
7778
- `Scan` - perform rolling aggregation by emitting intermediate results
7879
- `Single` - signals the only item of the async sequence, fails if the sequence has more than one item
7980
- `Skip` - skip the first specified number of items of the source async sequence
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System;
2+
using Xunit;
3+
using async_enumerable_dotnet;
4+
using System.Threading.Tasks;
5+
6+
namespace async_enumerable_dotnet_test
7+
{
8+
public class SampleTest
9+
{
10+
[Fact]
11+
public async void Normal()
12+
{
13+
var t = 200;
14+
if (Environment.GetEnvironmentVariable("CI") != null)
15+
{
16+
t = 2000;
17+
}
18+
19+
await AsyncEnumerable.Range(1, 5)
20+
.FlatMap(v =>
21+
AsyncEnumerable.Timer(TimeSpan.FromMilliseconds(t * v - t / 2))
22+
.Map(w => v)
23+
)
24+
.Sample(TimeSpan.FromMilliseconds(t * 2))
25+
.AssertResult(2, 4, 5);
26+
}
27+
28+
[Fact]
29+
public async void Last()
30+
{
31+
await AsyncEnumerable.Range(1, 5)
32+
.Sample(TimeSpan.FromMilliseconds(500))
33+
.AssertResult(5);
34+
}
35+
36+
[Fact]
37+
public async void Error()
38+
{
39+
await AsyncEnumerable.Error<int>(new InvalidOperationException())
40+
.Sample(TimeSpan.FromMilliseconds(500))
41+
.AssertFailure(typeof(InvalidOperationException));
42+
}
43+
}
44+
}

async-enumerable-dotnet/AsyncEnumerable.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,5 +1121,17 @@ public static IAsyncEnumerable<IList<T>> ToList<T>(this IAsyncEnumerable<T> sour
11211121
{
11221122
return Collect<T, IList<T>>(source, () => new List<T>(capacityHint), (a, b) => a.Add(b));
11231123
}
1124+
1125+
/// <summary>
1126+
/// Periodically take the latest item from the source async sequence and relay it.
1127+
/// </summary>
1128+
/// <typeparam name="T">The element type of the async sequence.</typeparam>
1129+
/// <param name="source">The source async sequence to sample.</param>
1130+
/// <param name="period">The sampling period.</param>
1131+
/// <returns>The new IAsyncEnumerable sequence.</returns>
1132+
public static IAsyncEnumerable<T> Sample<T>(this IAsyncEnumerable<T> source, TimeSpan period)
1133+
{
1134+
return new Sample<T>(source, period);
1135+
}
11241136
}
11251137
}

async-enumerable-dotnet/impl/ExceptionHelper.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ internal sealed class ExceptionHelper
1616
/// </summary>
1717
internal static readonly Exception Terminated = new TerminatedException();
1818

19+
/// <summary>
20+
/// Atomically aggregate the given exception into the target field
21+
/// or return false if the field contains the terminated exception indicator.
22+
/// </summary>
23+
/// <param name="field">The target field</param>
24+
/// <param name="ex">The exception to aggregate</param>
25+
/// <returns>True if successful, false if the field already has the terminated
26+
/// indicator.</returns>
1927
internal static bool AddException(ref Exception field, Exception ex)
2028
{
2129
for (; ;)
@@ -47,17 +55,41 @@ internal static bool AddException(ref Exception field, Exception ex)
4755
}
4856
}
4957

58+
/// <summary>
59+
/// Atomically swap in the terminated indicator and return the
60+
/// previous exception (may be null if none).
61+
/// </summary>
62+
/// <param name="field">The target field.</param>
63+
/// <returns>The last exception or null if no exceptions were in the field.</returns>
5064
internal static Exception Terminate(ref Exception field)
5165
{
5266
return Interlocked.Exchange(ref field, Terminated);
5367
}
5468

55-
internal sealed class TerminatedException : Exception
69+
/// <summary>
70+
/// An exception indicating a terminal state within an Exception field.
71+
/// </summary>
72+
sealed class TerminatedException : Exception
5673
{
5774
internal TerminatedException() : base("No further exceptions.")
5875
{
5976

6077
}
6178
}
79+
80+
/// <summary>
81+
/// If the given exception is of an AggregateException with
82+
/// only a single inner exception, extract it.
83+
/// </summary>
84+
/// <param name="ex">The exception to un-aggregate</param>
85+
/// <returns>The inner solo exception or <paramref name="ex"/>.</returns>
86+
internal static Exception Unaggregate(Exception ex)
87+
{
88+
if (ex is AggregateException g && g.InnerExceptions.Count == 1)
89+
{
90+
return g.InnerExceptions[0];
91+
}
92+
return ex;
93+
}
6294
}
6395
}

async-enumerable-dotnet/impl/ResumeHelper.cs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,84 @@ internal static void Clear<U>(ref TaskCompletionSource<U> resume)
5353
Interlocked.Exchange(ref resume, null);
5454
}
5555

56+
/// <summary>
57+
/// Create an action that takes a Task and sets the given
58+
/// TaskCompletionSource to the same state.
59+
/// </summary>
60+
/// <typeparam name="T">The element type of the source task</typeparam>
61+
/// <param name="tcs">The TaskCompletionSource to complete/fault based on the task.</param>
62+
/// <returns>The new action</returns>
63+
internal static Action<Task<T>> ResumeWith<T>(TaskCompletionSource<T> tcs)
64+
{
65+
return t =>
66+
{
67+
if (t.IsCanceled)
68+
{
69+
tcs.TrySetCanceled();
70+
}
71+
else if (t.IsFaulted)
72+
{
73+
tcs.TrySetException(t.Exception);
74+
}
75+
else
76+
{
77+
tcs.TrySetResult(t.Result);
78+
}
79+
};
80+
}
81+
82+
/// <summary>
83+
/// Create an action that takes a Task and sets the given
84+
/// TaskCompletionSource to the same state.
85+
/// </summary>
86+
/// <param name="tcs">The TaskCompletionSource to complete/fault based on the task.</param>
87+
/// <returns>The new action</returns>
88+
internal static Action<Task> ResumeWith(TaskCompletionSource<bool> tcs)
89+
{
90+
return t =>
91+
{
92+
if (t.IsCanceled)
93+
{
94+
tcs.TrySetCanceled();
95+
}
96+
else if (t.IsFaulted)
97+
{
98+
tcs.TrySetException(t.Exception);
99+
}
100+
else
101+
{
102+
tcs.TrySetResult(true); // by convention
103+
}
104+
};
105+
}
106+
107+
/// <summary>
108+
/// Terminates the given TaskCompletionSource if the ValueTask completed
109+
/// or adds a continuation to it which will set the completion state on
110+
/// The TCS.
111+
/// </summary>
112+
/// <param name="task">The task that will be completed.</param>
113+
/// <param name="tcs">The task completion source to terminate.</param>
114+
internal static void ResumeWhen(ValueTask task, TaskCompletionSource<bool> tcs)
115+
{
116+
if (task.IsCanceled)
117+
{
118+
tcs.TrySetCanceled();
119+
}
120+
else
121+
if (task.IsFaulted)
122+
{
123+
tcs.TrySetException(task.AsTask().Exception);
124+
}
125+
else
126+
if (task.IsCompleted)
127+
{
128+
tcs.TrySetResult(true); // by convention
129+
}
130+
else
131+
{
132+
task.AsTask().ContinueWith(ResumeWith(tcs));
133+
}
134+
}
56135
}
57136
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace async_enumerable_dotnet.impl
8+
{
9+
internal sealed class Sample<T> : IAsyncEnumerable<T>
10+
{
11+
readonly IAsyncEnumerable<T> source;
12+
13+
readonly TimeSpan period;
14+
15+
public Sample(IAsyncEnumerable<T> source, TimeSpan period)
16+
{
17+
this.source = source;
18+
this.period = period;
19+
}
20+
21+
public IAsyncEnumerator<T> GetAsyncEnumerator()
22+
{
23+
var en = new SampleEnumerator(source.GetAsyncEnumerator(), period);
24+
en.StartTimer();
25+
en.MoveNext();
26+
return en;
27+
}
28+
29+
internal sealed class SampleEnumerator : IAsyncEnumerator<T>
30+
{
31+
readonly IAsyncEnumerator<T> source;
32+
33+
readonly TimeSpan period;
34+
35+
readonly CancellationTokenSource cts;
36+
37+
int consumerWip;
38+
39+
TaskCompletionSource<bool> resume;
40+
41+
object latest;
42+
volatile bool done;
43+
Exception error;
44+
45+
long wip;
46+
47+
int disposeWip;
48+
49+
readonly TaskCompletionSource<bool> disposeTask;
50+
51+
public T Current { get; private set; }
52+
53+
static readonly object EmptyIndicator = new object();
54+
55+
public SampleEnumerator(IAsyncEnumerator<T> source, TimeSpan period)
56+
{
57+
this.source = source;
58+
this.period = period;
59+
this.disposeTask = new TaskCompletionSource<bool>();
60+
this.cts = new CancellationTokenSource();
61+
Volatile.Write(ref latest, EmptyIndicator);
62+
}
63+
64+
public ValueTask DisposeAsync()
65+
{
66+
cts.Cancel();
67+
if (Interlocked.Increment(ref disposeWip) == 1)
68+
{
69+
return source.DisposeAsync();
70+
}
71+
return new ValueTask(disposeTask.Task);
72+
}
73+
74+
public async ValueTask<bool> MoveNextAsync()
75+
{
76+
for (; ; )
77+
{
78+
bool d = done;
79+
var v = Interlocked.Exchange(ref latest, EmptyIndicator);
80+
81+
if (d && v == EmptyIndicator)
82+
{
83+
if (error != null)
84+
{
85+
throw error;
86+
}
87+
return false;
88+
}
89+
else if (v != EmptyIndicator)
90+
{
91+
Current = (T)v;
92+
return true;
93+
}
94+
95+
if (Volatile.Read(ref wip) == 0)
96+
{
97+
await ResumeHelper.Resume(ref resume).Task;
98+
}
99+
ResumeHelper.Clear(ref resume);
100+
Interlocked.Exchange(ref wip, 0);
101+
}
102+
}
103+
104+
internal void StartTimer()
105+
{
106+
Task.Delay(period, cts.Token)
107+
.ContinueWith(t => HandleTimer(t), cts.Token);
108+
}
109+
110+
void HandleTimer(Task timer)
111+
{
112+
Signal();
113+
StartTimer();
114+
}
115+
116+
void Signal()
117+
{
118+
if (Interlocked.Increment(ref wip) == 1)
119+
{
120+
ResumeHelper.Resume(ref resume).TrySetResult(true);
121+
}
122+
}
123+
124+
internal void MoveNext()
125+
{
126+
if (Interlocked.Increment(ref consumerWip) == 1)
127+
{
128+
do
129+
{
130+
if (Interlocked.Increment(ref disposeWip) == 1)
131+
{
132+
source.MoveNextAsync()
133+
.AsTask().ContinueWith(t => Handler(t));
134+
}
135+
else
136+
{
137+
break;
138+
}
139+
}
140+
while (Interlocked.Decrement(ref consumerWip) != 0);
141+
}
142+
}
143+
144+
void Handler(Task<bool> t)
145+
{
146+
if (Interlocked.Decrement(ref disposeWip) != 0)
147+
{
148+
ResumeHelper.ResumeWhen(source.DisposeAsync(), disposeTask);
149+
}
150+
else if (t.IsFaulted)
151+
{
152+
error = ExceptionHelper.Unaggregate(t.Exception);
153+
done = true;
154+
cts.Cancel();
155+
Signal();
156+
}
157+
else if (t.Result)
158+
{
159+
Interlocked.Exchange(ref latest, source.Current);
160+
// resumption will be triggered by the timer
161+
MoveNext();
162+
}
163+
else
164+
{
165+
done = true;
166+
cts.Cancel();
167+
Signal();
168+
}
169+
}
170+
}
171+
}
172+
}

0 commit comments

Comments
 (0)