Skip to content

Commit 068d1b4

Browse files
Merge pull request #75 from HedgehogNSK/feature/rework_updatemanager
Feature/rework updatemanager Fixes for Sonar and Config issues for PR #71
2 parents 1343adf + bce88b6 commit 068d1b4

14 files changed

+723
-683
lines changed

src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs

Lines changed: 153 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -8,163 +8,182 @@
88
using System.Threading.Tasks;
99

1010
namespace RxTelegram.Bot.Api;
11-
public class LongpollingUpdateTracker(ITelegramBot telegramBot)
11+
public class LongPollingUpdateTracker(ITelegramBot telegramBot)
1212
: IObservable<Update>, ITrackerSetup
1313
{
14-
private readonly ITelegramBot _telegramBot = telegramBot;
15-
private const int NotRunning = 0;
16-
private const int Running = 1;
17-
private int _isRunning = NotRunning;
18-
private CancellationTokenSource _cancellationTokenSource;
19-
UpdateType[] _trackedUpdateTypes = [];
20-
readonly List<IObserver<Update>> _observers = new List<IObserver<Update>>();
21-
22-
public void Set(IEnumerable<UpdateType> types)
23-
{
24-
if (types == null)
25-
{
26-
if (_trackedUpdateTypes != null)
27-
{
28-
_trackedUpdateTypes = null;
29-
_cancellationTokenSource?.Cancel();
30-
}
31-
return;
32-
}
14+
private readonly ITelegramBot _telegramBot = telegramBot;
15+
private const int NotRunning = 0;
16+
private const int Running = 1;
17+
private int _isRunning = NotRunning;
18+
private CancellationTokenSource _cancellationTokenSource;
19+
private UpdateType[] _trackedUpdateTypes = [];
20+
readonly List<IObserver<Update>> _observers = new List<IObserver<Update>>();
3321

34-
if (_trackedUpdateTypes == null || !types.SequenceEqual(_trackedUpdateTypes))
22+
public void Set(IEnumerable<UpdateType> types)
3523
{
36-
_trackedUpdateTypes = types.ToArray();
37-
_cancellationTokenSource?.Cancel();
24+
if (types == null)
25+
{
26+
if (_trackedUpdateTypes != null)
27+
{
28+
_trackedUpdateTypes = null;
29+
_cancellationTokenSource?.Cancel();
30+
}
31+
return;
32+
}
33+
34+
var updateTypes = types as UpdateType[] ?? types.ToArray();
35+
if (_trackedUpdateTypes == null || !updateTypes.SequenceEqual(_trackedUpdateTypes))
36+
{
37+
_trackedUpdateTypes = updateTypes;
38+
_cancellationTokenSource?.Cancel();
39+
}
3840
}
39-
}
4041

41-
internal async Task RunUpdateSafe()
42-
{
43-
try
42+
internal async Task RunUpdateSafe()
4443
{
45-
_cancellationTokenSource = new CancellationTokenSource();
46-
await RunUpdate();
44+
try
45+
{
46+
_cancellationTokenSource = new CancellationTokenSource();
47+
await RunUpdate();
48+
}
49+
catch (Exception ex)
50+
{
51+
// ignored
52+
ExceptionHelpers.ThrowIfFatal(ex);
53+
}
54+
finally
55+
{
56+
_cancellationTokenSource.Dispose();
57+
_cancellationTokenSource = null;
58+
59+
if (!_observers.Any())
60+
{
61+
Volatile.Write(ref _isRunning, NotRunning);
62+
}
63+
else
64+
{
65+
RunUpdateTask();
66+
}
67+
}
68+
void RunUpdateTask() => Task.Run(RunUpdateSafe);
4769
}
48-
catch (Exception ex)
70+
71+
private int? _offset; // Offset must be preserved for all errors except TaskCanceledException.
72+
// Using a local variable may cause duplicates if an exception occurs.
73+
internal async Task RunUpdate()
4974
{
50-
// ignored
51-
ExceptionHelpers.ThrowIfFatal(ex);
75+
76+
while (_observers.Count != 0)
77+
{
78+
try
79+
{
80+
// if the token already canceled before the first request reset token
81+
if (_cancellationTokenSource.IsCancellationRequested)
82+
{
83+
_cancellationTokenSource = new CancellationTokenSource();
84+
}
85+
86+
var getUpdate = new GetUpdate
87+
{
88+
Offset = _offset,
89+
Timeout = 60,
90+
91+
// if there is a null value in the list, it means that all updates are allowed
92+
AllowedUpdates = _trackedUpdateTypes
93+
};
94+
95+
var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token);
96+
if (!result.Any())
97+
{
98+
await Task.Delay(1000);
99+
continue;
100+
}
101+
102+
NotifyObservers(result);
103+
104+
var lastId = result.Length - 1;
105+
_offset = result[lastId].UpdateId + 1;
106+
}
107+
catch (TaskCanceledException)
108+
{
109+
_offset = null;
110+
_cancellationTokenSource = new CancellationTokenSource();
111+
}
112+
catch (Exception exception)
113+
{
114+
OnException(exception);
115+
throw;
116+
}
117+
}
52118
}
53-
finally
119+
internal void NotifyObservers(Update[] updates)
54120
{
55-
_cancellationTokenSource.Dispose();
56-
_cancellationTokenSource = null;
57-
58-
if (!_observers.Any())
59-
Volatile.Write(ref _isRunning, NotRunning);
60-
else
61-
RunUpdateTask();
121+
for (var uid = 0; uid != updates.Length; ++uid)
122+
{
123+
for (var oid = 0; oid != _observers.Count; ++oid)
124+
{
125+
_observers[oid].OnNext(updates[uid]);
126+
}
127+
}
62128
}
63-
void RunUpdateTask() => Task.Run(RunUpdateSafe);
64-
}
65-
66-
int? offset = null; // Offset must be preserved for all errors except TaskCanceledException.
67-
// Using a local variable may cause duplicates if an exception occurs.
68-
internal async Task RunUpdate()
69-
{
70-
71-
while (_observers.Count != 0)
129+
internal void OnException(Exception exception)
72130
{
73-
try
74-
{
75-
// if the token already canceled before the first request reset token
76-
if (_cancellationTokenSource.IsCancellationRequested)
77-
_cancellationTokenSource = new CancellationTokenSource();
78-
79-
var getUpdate = new GetUpdate
131+
IObserver<Update>[] current;
132+
lock (_observers)
80133
{
81-
Offset = offset,
82-
Timeout = 60,
83-
84-
// if there is a null value in the list, it means that all updates are allowed
85-
AllowedUpdates = _trackedUpdateTypes ?? null
86-
};
134+
current = _observers.ToArray(); // Caching current observers to prevent
135+
// notifying those who subscribed after an error occurred.
136+
_observers.Clear();
137+
}
87138

88-
var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token);
89-
if (!result.Any())
139+
for (var oid = 0; oid != current.Length; ++oid)
90140
{
91-
await Task.Delay(1000);
92-
continue;
141+
try
142+
{
143+
current[oid].OnError(exception);
144+
}
145+
catch (Exception ex)
146+
{
147+
// Ignore exceptions from observers without an error handler,
148+
// as it would break the process and propagate the exception to the outer scope.
149+
ExceptionHelpers.ThrowIfFatal(ex);
150+
}
93151
}
94-
95-
NotifyObservers(result);
96-
offset = result.Last().UpdateId + 1;
97-
}
98-
catch (TaskCanceledException)
99-
{
100-
offset = null;
101-
_cancellationTokenSource = new CancellationTokenSource();
102-
}
103-
catch (Exception exception)
104-
{
105-
OnException(exception);
106-
throw;
107-
}
108152
}
109-
}
110-
internal void NotifyObservers(Update[] updates)
111-
{
112-
for (int uid = 0; uid != updates.Length; ++uid)
113-
for (int oid = 0; oid != _observers.Count; ++oid)
114-
_observers[oid].OnNext(updates[uid]);
115-
}
116-
internal void OnException(Exception exception)
117-
{
118-
IObserver<Update>[] current;
119-
lock (_observers)
153+
internal void Remove(IObserver<Update> observer)
120154
{
121-
current = _observers.ToArray(); // Caching current observers to prevent
122-
// notifying those who subscribed after an error occurred.
123-
_observers.Clear();
124-
}
155+
if (!_observers.Contains(observer))
156+
{
157+
return;
158+
}
125159

126-
for (int oid = 0; oid != current.Length; ++oid)
127-
{
128-
try
129-
{
130-
current[oid].OnError(exception);
131-
}
132-
catch (Exception ex)
133-
{
134-
// Ignore exceptions from observers without an error handler,
135-
// as it would break the process and propagate the exception to the outer scope.
136-
ExceptionHelpers.ThrowIfFatal(ex);
137-
}
138-
}
139-
}
140-
internal void Remove(IObserver<Update> observer)
141-
{
142-
if (!_observers.Contains(observer))
143-
return;
160+
lock (_observers)
161+
{
162+
_observers.Remove(observer);
163+
}
144164

145-
lock (_observers)
146-
{
147-
_observers.Remove(observer);
165+
if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running)
166+
{
167+
_cancellationTokenSource?.Cancel();
168+
}
148169
}
170+
public IDisposable Subscribe(IObserver<Update> observer)
171+
{
172+
if (observer == null)
173+
{
174+
throw new ArgumentNullException(nameof(observer));
175+
}
149176

150-
if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running)
151-
_cancellationTokenSource?.Cancel();
152-
}
153-
public IDisposable Subscribe(IObserver<Update> observer)
154-
{
155-
if (observer == null)
156-
throw new ArgumentNullException(nameof(observer));
177+
lock (_observers)
178+
{
179+
_observers.Add(observer);
180+
}
157181

158-
lock (_observers)
159-
{
160-
_observers.Add(observer);
161-
}
182+
if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning)
183+
{
184+
Task.Run(RunUpdateSafe);
185+
}
162186

163-
if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning)
164-
{
165-
Task.Run(RunUpdateSafe);
187+
return new DisposableAction(() => Remove(observer));
166188
}
167-
168-
return new DisposableAction(() => Remove(observer));
169-
}
170-
}
189+
}

0 commit comments

Comments
 (0)