Skip to content

Commit 2b594d7

Browse files
committed
Added Finally observable operator.
Replaced `DoOnDispose` with `Finally` in `UpdateDistributor` to fix subscriber tracking.
1 parent 41e6e93 commit 2b594d7

File tree

4 files changed

+82
-3
lines changed

4 files changed

+82
-3
lines changed

src/RxTelegram.Bot/Api/UpdateDistributor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public IObservable<T> Selector<T>(UpdateType? type, Func<Update, T> propertySele
111111
return _tracker.Switch().Select(propertySelector)
112112
.Where(x => x != null)
113113
.DoOnSubscribe(() => AddListener(type))
114-
.DoOnDispose(() => RemoveListener(type));
114+
.Finally(() => RemoveListener(type));
115115
}
116116

117117
public void Set(IObservable<Update> tracker)

src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ internal class DoOnDisposeObservable<T> : IObservable<T>
77
private readonly IObservable<T> _source;
88
private readonly Action _onDispose;
99

10-
public DoOnDisposeObservable(IObservable<T> source, Action onTerminate)
10+
public DoOnDisposeObservable(IObservable<T> source, Action onDispose)
1111
{
1212
this._source = source ?? throw new ArgumentNullException(nameof(source));
13-
this._onDispose = onTerminate ?? throw new ArgumentNullException(nameof(onTerminate));
13+
this._onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose));
1414
}
1515

1616
public IDisposable Subscribe(IObserver<T> observer)
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
using System;
2+
using System.Threading;
3+
4+
namespace RxTelegram.Bot.Utils.Rx;
5+
6+
internal class FinallyObservable<T> : IObservable<T>
7+
{
8+
private readonly IObservable<T> _source;
9+
private readonly Action _onTerminate;
10+
11+
public FinallyObservable(IObservable<T> source, Action onTerminate)
12+
{
13+
_source = source ?? throw new ArgumentNullException(nameof(source));
14+
_onTerminate = onTerminate ?? throw new ArgumentNullException(nameof(onTerminate));
15+
}
16+
17+
public IDisposable Subscribe(IObserver<T> observer)
18+
{
19+
if (observer == null)
20+
throw new ArgumentNullException(nameof(observer));
21+
22+
var finallyObserver = new FinallyObserver(observer, _onTerminate);
23+
var subscription = _source.Subscribe(finallyObserver);
24+
25+
return new DisposableAction(() => finallyObserver.Dispose(subscription));
26+
}
27+
28+
private class FinallyObserver : IObserver<T>
29+
{
30+
private readonly IObserver<T> _observer;
31+
private readonly Action _onTerminate;
32+
private int _terminated;
33+
34+
public FinallyObserver(IObserver<T> observer, Action onTerminate)
35+
{
36+
_observer = observer;
37+
_onTerminate = onTerminate;
38+
}
39+
public void Dispose(IDisposable subscription)
40+
{
41+
subscription.Dispose();
42+
Terminate();
43+
}
44+
45+
public void OnCompleted()
46+
{
47+
_observer.OnCompleted();
48+
Terminate();
49+
}
50+
51+
public void OnError(Exception error)
52+
{
53+
try
54+
{
55+
_observer.OnError(error);
56+
}
57+
catch (Exception ex)
58+
{
59+
ExceptionHelpers.ThrowIfFatal(ex);
60+
}
61+
finally
62+
{
63+
Terminate();
64+
}
65+
}
66+
67+
public void OnNext(T value) => _observer.OnNext(value);
68+
69+
private void Terminate()
70+
{
71+
if (Interlocked.Exchange(ref _terminated, 1) == 0)
72+
{
73+
_onTerminate();
74+
}
75+
}
76+
}
77+
}

src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ static public IObservable<T> DoOnDispose<T>(this IObservable<T> source, Action o
88
=> new DoOnDisposeObservable<T>(source, onDispose);
99
static public IObservable<T> DoOnSubscribe<T>(this IObservable<T> source, Action onSubscribe)
1010
=> new DoOnSubscribeObservable<T>(source, onSubscribe);
11+
static public IObservable<T> Finally<T>(this IObservable<T> source, Action onTerminate)
12+
=> new FinallyObservable<T>(source,onTerminate);
1113
static public IObservable<KOut> Select<TIn, KOut>(this IObservable<TIn> source, Func<TIn, KOut> selector)
1214
=> new SelectObservable<TIn, KOut>(source, selector);
1315
static public IObservable<T> Switch<T>(this IObservable<IObservable<T>> source)

0 commit comments

Comments
 (0)