Skip to content

Commit 1343adf

Browse files
Merge pull request #74 from HedgehogNSK/feature/rework_updatemanager
Feature/rework updatemanager
2 parents 170c120 + 2b594d7 commit 1343adf

15 files changed

+629
-161
lines changed

src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@ public class LongpollingUpdateTracker(ITelegramBot telegramBot)
1717
private int _isRunning = NotRunning;
1818
private CancellationTokenSource _cancellationTokenSource;
1919
UpdateType[] _trackedUpdateTypes = [];
20-
List<IObserver<Update>> _observers = new List<IObserver<Update>>();
20+
readonly List<IObserver<Update>> _observers = new List<IObserver<Update>>();
2121

22-
private IEnumerable<UpdateType> GetTrackingUpdateTypes()
23-
=> _trackedUpdateTypes;
2422
public void Set(IEnumerable<UpdateType> types)
2523
{
26-
if (types == null && _trackedUpdateTypes == null)
27-
return;
28-
2924
if (types == null)
3025
{
31-
_trackedUpdateTypes = null;
32-
_cancellationTokenSource?.Cancel();
26+
if (_trackedUpdateTypes != null)
27+
{
28+
_trackedUpdateTypes = null;
29+
_cancellationTokenSource?.Cancel();
30+
}
3331
return;
3432
}
3533

@@ -47,20 +45,28 @@ internal async Task RunUpdateSafe()
4745
_cancellationTokenSource = new CancellationTokenSource();
4846
await RunUpdate();
4947
}
50-
catch (Exception)
48+
catch (Exception ex)
5149
{
5250
// ignored
51+
ExceptionHelpers.ThrowIfFatal(ex);
5352
}
5453
finally
5554
{
56-
Volatile.Write(ref _isRunning, NotRunning);
55+
_cancellationTokenSource.Dispose();
5756
_cancellationTokenSource = null;
57+
58+
if (!_observers.Any())
59+
Volatile.Write(ref _isRunning, NotRunning);
60+
else
61+
RunUpdateTask();
5862
}
63+
void RunUpdateTask() => Task.Run(RunUpdateSafe);
5964
}
60-
65+
66+
int? offset = null; // Offset must be preserved for all errors except TaskCanceledException.
67+
// Using a local variable may cause duplicates if an exception occurs.
6168
internal async Task RunUpdate()
6269
{
63-
int? offset = null;
6470

6571
while (_observers.Count != 0)
6672
{
@@ -76,27 +82,26 @@ internal async Task RunUpdate()
7682
Timeout = 60,
7783

7884
// if there is a null value in the list, it means that all updates are allowed
79-
AllowedUpdates = GetTrackingUpdateTypes() ?? null
85+
AllowedUpdates = _trackedUpdateTypes ?? null
8086
};
87+
8188
var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token);
8289
if (!result.Any())
8390
{
8491
await Task.Delay(1000);
8592
continue;
8693
}
8794

88-
offset = result.Max(x => x.UpdateId) + 1;
8995
NotifyObservers(result);
96+
offset = result.Last().UpdateId + 1;
9097
}
9198
catch (TaskCanceledException)
9299
{
93-
// create new token and check observers
94100
offset = null;
95101
_cancellationTokenSource = new CancellationTokenSource();
96102
}
97103
catch (Exception exception)
98104
{
99-
// unexpected exception report them to the observers and cancel run update
100105
OnException(exception);
101106
throw;
102107
}
@@ -110,8 +115,27 @@ internal void NotifyObservers(Update[] updates)
110115
}
111116
internal void OnException(Exception exception)
112117
{
113-
for (int oid = 0; oid != _observers.Count; ++oid)
114-
_observers[oid].OnError(exception);
118+
IObserver<Update>[] current;
119+
lock (_observers)
120+
{
121+
current = _observers.ToArray(); // Caching current observers to prevent
122+
// notifying those who subscribed after an error occurred.
123+
_observers.Clear();
124+
}
125+
126+
for (int oid = 0; oid != current.Length; ++oid)
127+
{
128+
try
129+
{
130+
current[oid].OnError(exception);
131+
}
132+
catch (Exception ex)
133+
{
134+
// Ignore exceptions from observers without an error handler,
135+
// as it would break the process and propagate the exception to the outer scope.
136+
ExceptionHelpers.ThrowIfFatal(ex);
137+
}
138+
}
115139
}
116140
internal void Remove(IObserver<Update> observer)
117141
{
@@ -128,14 +152,12 @@ internal void Remove(IObserver<Update> observer)
128152
}
129153
public IDisposable Subscribe(IObserver<Update> observer)
130154
{
131-
if(observer==null)
155+
if (observer == null)
132156
throw new ArgumentNullException(nameof(observer));
157+
133158
lock (_observers)
134159
{
135-
if (!_observers.Contains(observer))
136-
{
137-
_observers.Add(observer);
138-
}
160+
_observers.Add(observer);
139161
}
140162

141163
if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning)

src/RxTelegram.Bot/Api/UpdateDistributor.cs

Lines changed: 59 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using RxTelegram.Bot.Interface.InlineMode;
44
using RxTelegram.Bot.Interface.Payments;
55
using RxTelegram.Bot.Interface.Setup;
6+
using RxTelegram.Bot.Utils.Rx;
67
using System;
78
using System.Collections.Generic;
89
using System.Linq;
@@ -17,11 +18,14 @@ namespace RxTelegram.Bot.Api;
1718

1819
public sealed class UpdateDistributor : IUpdateManager, IDisposable
1920
{
20-
private IObservable<Update> _tracker;
2121

2222
#region Observable Update properties
23-
private Dictionary<UpdateType, UpdateTypeInfo> _updateInfos = new Dictionary<UpdateType, UpdateTypeInfo>();
24-
private UpdateTypeInfo _update = new UpdateTypeInfo();
23+
private readonly Dictionary<UpdateType, UpdateTypeInfo> _updateInfos;
24+
private readonly UpdateTypeInfo _updateInfo;
25+
private readonly IEnumerable<UpdateType> _trackedTypes;
26+
private readonly ReactiveProperty<IObservable<Update>> _tracker;
27+
private bool _isDisposed = false;
28+
private readonly object _lock;
2529
public IObservable<CallbackQuery> CallbackQuery => Selector(UpdateType.CallbackQuery, _update => _update.CallbackQuery);
2630
public IObservable<Message> ChannelPost => Selector(UpdateType.ChannelPost, _update => _update.ChannelPost);
2731
public IObservable<ChatBoostUpdated> ChatBoost => Selector(UpdateType.ChatBoost, _update => _update.ChatBoost);
@@ -38,9 +42,7 @@ public sealed class UpdateDistributor : IUpdateManager, IDisposable
3842
public IObservable<PreCheckoutQuery> PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, _update => _update.PreCheckoutQuery);
3943
public IObservable<ChatBoostRemoved> RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, _update => _update.RemovedChatBoost);
4044
public IObservable<ShippingQuery> ShippingQuery => Selector(UpdateType.ShippingQuery, _update => _update.ShippingQuery);
41-
public IObservable<Update> Update => (IObservable<Update>)(_update.Observer ??= new UpdateSubject<Update>(x => x,
42-
onSubscribe: AddGeneralListener,
43-
onDispose: RemoveGeneralListener));
45+
public IObservable<Update> Update => Selector(null, _update => _update);
4446
#endregion
4547

4648
#if NETSTANDARD2_1
@@ -65,101 +67,85 @@ public sealed class UpdateDistributor : IUpdateManager, IDisposable
6567
#endif
6668
public UpdateDistributor(IObservable<Update> updateTracker)
6769
{
70+
_lock = new();
6871
_updateInfos = Enum.GetValues(typeof(UpdateType))
6972
.Cast<UpdateType>()
7073
.ToDictionary(x => x, _ => new UpdateTypeInfo());
71-
Set(updateTracker);
72-
}
7374

74-
private void AddGeneralListener()
75-
{
76-
++_update.Listeners;
75+
_updateInfo = new UpdateTypeInfo();
7776

78-
(_tracker as ITrackerSetup)?.Set(null);
77+
_trackedTypes = _updateInfos.Where(x => x.Value.Listeners != 0)
78+
.Select(x => x.Key);
7979

80-
_update.Subscription ??= _tracker.Subscribe(_update.Observer);
80+
_tracker = new ReactiveProperty<IObservable<Update>>(updateTracker);
81+
Set(updateTracker);
8182
}
82-
private void AddListener(UpdateType type)
83+
private void AddListener(UpdateType? type)
8384
{
84-
var updateType = _updateInfos[type];
85-
++updateType.Listeners;
86-
87-
UpdateTrackerTypes();
85+
lock (_lock)
86+
{
87+
var info = GetInfo(type);
88+
++info.Listeners;
8889

89-
updateType.Subscription ??= _tracker.Subscribe(updateType.Observer);
90+
if (info.Listeners != 1) return;
91+
UpdateTrackerTypes();
92+
}
9093
}
91-
private void RemoveGeneralListener()
92-
{
93-
--_update.Listeners;
94-
_update.Subscription?.Dispose();
95-
_update.Subscription = null;
94+
private UpdateTypeInfo GetInfo(UpdateType? type)
95+
=> type == null ? _updateInfo : _updateInfos[(UpdateType)type];
9696

97-
UpdateTrackerTypes();
98-
}
99-
private void RemoveListener(UpdateType type)
97+
private void RemoveListener(UpdateType? type)
10098
{
101-
var updateType = _updateInfos[type];
102-
--updateType.Listeners;
103-
_update.Subscription?.Dispose();
104-
_update.Subscription = null;
99+
lock (_lock)
100+
{
101+
var info = GetInfo(type);
102+
--info.Listeners;
103+
if (info.Listeners != 0) return;
105104

106-
UpdateTrackerTypes();
105+
UpdateTrackerTypes();
106+
}
107107
}
108-
public IObservable<T> Selector<T>(UpdateType updateType, Func<Update, T> propertySelector)
108+
public IObservable<T> Selector<T>(UpdateType? type, Func<Update, T> propertySelector)
109+
where T : class
109110
{
110-
var info = _updateInfos[updateType];
111-
if (info.Observer != null)
112-
return (IObservable<T>)info;
113-
114-
var subject = new UpdateSubject<T>(propertySelector,
115-
onSubscribe: () => AddListener(updateType),
116-
onDispose: () => RemoveListener(updateType));
117-
info.Observer = subject;
118-
info.Subscription = _tracker.Subscribe(info.Observer);
119-
return subject;
111+
return _tracker.Switch().Select(propertySelector)
112+
.Where(x => x != null)
113+
.DoOnSubscribe(() => AddListener(type))
114+
.Finally(() => RemoveListener(type));
120115
}
116+
121117
public void Set(IObservable<Update> tracker)
122118
{
123-
//Setup current tracker to listen all messages before change to a new one
124-
(_tracker as ITrackerSetup)?.Set(null);
125-
DisposeTrackerSubcription();
126-
_tracker = tracker;
119+
// Configure the current tracker to listen for all types of updates
120+
// before switching to a new one
121+
(_tracker.Current as ITrackerSetup)?.Set(null);
122+
123+
_tracker.OnNext(tracker);
127124
UpdateTrackerTypes();
128-
SubscribeToTracker();
129125
}
130-
public void DisposeTrackerSubcription()
131-
{
132-
_update.Subscription?.Dispose();
133-
foreach (var info in _updateInfos.Values)
134-
info.Subscription?.Dispose();
135-
}
136-
public void SubscribeToTracker()
126+
private void UpdateTrackerTypes()
137127
{
138-
if(_update.Observer!=null)
139-
_update.Subscription = _tracker.Subscribe(_update.Observer);
140-
foreach (var info in _updateInfos.Values.Where(x=>x.Observer!=null))
141-
info.Subscription = _tracker.Subscribe(info.Observer);
128+
if (_tracker.Current is not ITrackerSetup setup) return;
129+
130+
setup.Set(_updateInfo.Listeners != 0 || !_trackedTypes.Any() ?
131+
null : _trackedTypes);
142132
}
143-
private void UpdateTrackerTypes()
133+
134+
public void Dispose() => Dispose(true);
135+
void Dispose(bool explicitDisposing)
144136
{
145-
if (_tracker is not ITrackerSetup) return;
137+
if (_isDisposed) return;
146138

147-
IEnumerable<UpdateType> types = null;
148-
if (_update.Listeners == 0)
149-
{
150-
types = _updateInfos.Where(x => x.Value.Listeners != 0).Select(x => x.Key);
151-
if (!types.Any())
152-
types = null;
153-
}
154-
(_tracker as ITrackerSetup).Set(types);
139+
if (explicitDisposing)
140+
_tracker.Dispose();
141+
142+
_isDisposed = true;
155143
}
156144

157-
public void Dispose() => DisposeTrackerSubcription();
145+
~UpdateDistributor() => Dispose(false);
158146

159-
private class UpdateTypeInfo
147+
sealed private class UpdateTypeInfo
160148
{
161149
public int Listeners { get; set; } = 0;
162-
public IObserver<Update> Observer { get; set; } = null;
163-
public IDisposable Subscription { get; set; } = null;
164150
}
165151
}

src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/RxTelegram.Bot/Utils/Rx/CustomSubject.cs

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)