Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 153 additions & 134 deletions src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,163 +8,182 @@
using System.Threading.Tasks;

namespace RxTelegram.Bot.Api;
public class LongpollingUpdateTracker(ITelegramBot telegramBot)
public class LongPollingUpdateTracker(ITelegramBot telegramBot)
: IObservable<Update>, ITrackerSetup
{
private readonly ITelegramBot _telegramBot = telegramBot;
private const int NotRunning = 0;
private const int Running = 1;
private int _isRunning = NotRunning;
private CancellationTokenSource _cancellationTokenSource;
UpdateType[] _trackedUpdateTypes = [];
readonly List<IObserver<Update>> _observers = new List<IObserver<Update>>();

public void Set(IEnumerable<UpdateType> types)
{
if (types == null)
{
if (_trackedUpdateTypes != null)
{
_trackedUpdateTypes = null;
_cancellationTokenSource?.Cancel();
}
return;
}
private readonly ITelegramBot _telegramBot = telegramBot;
private const int NotRunning = 0;
private const int Running = 1;
private int _isRunning = NotRunning;
private CancellationTokenSource _cancellationTokenSource;
private UpdateType[] _trackedUpdateTypes = [];
readonly List<IObserver<Update>> _observers = new List<IObserver<Update>>();

if (_trackedUpdateTypes == null || !types.SequenceEqual(_trackedUpdateTypes))
public void Set(IEnumerable<UpdateType> types)
{
_trackedUpdateTypes = types.ToArray();
_cancellationTokenSource?.Cancel();
if (types == null)
{
if (_trackedUpdateTypes != null)
{
_trackedUpdateTypes = null;
_cancellationTokenSource?.Cancel();
}
return;
}

var updateTypes = types as UpdateType[] ?? types.ToArray();
if (_trackedUpdateTypes == null || !updateTypes.SequenceEqual(_trackedUpdateTypes))
{
_trackedUpdateTypes = updateTypes;
_cancellationTokenSource?.Cancel();
}
}
}

internal async Task RunUpdateSafe()
{
try
internal async Task RunUpdateSafe()
{
_cancellationTokenSource = new CancellationTokenSource();
await RunUpdate();
try
{
_cancellationTokenSource = new CancellationTokenSource();
await RunUpdate();
}
catch (Exception ex)
{
// ignored
ExceptionHelpers.ThrowIfFatal(ex);
}
finally
{
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;

if (!_observers.Any())
{
Volatile.Write(ref _isRunning, NotRunning);
}
else
{
RunUpdateTask();
}
}
void RunUpdateTask() => Task.Run(RunUpdateSafe);
}
catch (Exception ex)

private int? _offset; // Offset must be preserved for all errors except TaskCanceledException.
// Using a local variable may cause duplicates if an exception occurs.
internal async Task RunUpdate()
{
// ignored
ExceptionHelpers.ThrowIfFatal(ex);

while (_observers.Count != 0)
{
try
{
// if the token already canceled before the first request reset token
if (_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource = new CancellationTokenSource();
}

var getUpdate = new GetUpdate
{
Offset = _offset,
Timeout = 60,

// if there is a null value in the list, it means that all updates are allowed
AllowedUpdates = _trackedUpdateTypes
};

var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token);
if (!result.Any())
{
await Task.Delay(1000);
continue;
}

NotifyObservers(result);

var lastId = result.Length - 1;
_offset = result[lastId].UpdateId + 1;
}
catch (TaskCanceledException)
{
_offset = null;
_cancellationTokenSource = new CancellationTokenSource();
}
catch (Exception exception)
{
OnException(exception);
throw;
}
}
}
finally
internal void NotifyObservers(Update[] updates)
{
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;

if (!_observers.Any())
Volatile.Write(ref _isRunning, NotRunning);
else
RunUpdateTask();
for (var uid = 0; uid != updates.Length; ++uid)
{
for (var oid = 0; oid != _observers.Count; ++oid)
{
_observers[oid].OnNext(updates[uid]);
}
}
}
void RunUpdateTask() => Task.Run(RunUpdateSafe);
}

int? offset = null; // Offset must be preserved for all errors except TaskCanceledException.
// Using a local variable may cause duplicates if an exception occurs.
internal async Task RunUpdate()
{

while (_observers.Count != 0)
internal void OnException(Exception exception)
{
try
{
// if the token already canceled before the first request reset token
if (_cancellationTokenSource.IsCancellationRequested)
_cancellationTokenSource = new CancellationTokenSource();

var getUpdate = new GetUpdate
IObserver<Update>[] current;
lock (_observers)
{
Offset = offset,
Timeout = 60,

// if there is a null value in the list, it means that all updates are allowed
AllowedUpdates = _trackedUpdateTypes ?? null
};
current = _observers.ToArray(); // Caching current observers to prevent
// notifying those who subscribed after an error occurred.
_observers.Clear();
}

var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token);
if (!result.Any())
for (var oid = 0; oid != current.Length; ++oid)
{
await Task.Delay(1000);
continue;
try
{
current[oid].OnError(exception);
}
catch (Exception ex)
{
// Ignore exceptions from observers without an error handler,
// as it would break the process and propagate the exception to the outer scope.
ExceptionHelpers.ThrowIfFatal(ex);
}
}

NotifyObservers(result);
offset = result.Last().UpdateId + 1;
}
catch (TaskCanceledException)
{
offset = null;
_cancellationTokenSource = new CancellationTokenSource();
}
catch (Exception exception)
{
OnException(exception);
throw;
}
}
}
internal void NotifyObservers(Update[] updates)
{
for (int uid = 0; uid != updates.Length; ++uid)
for (int oid = 0; oid != _observers.Count; ++oid)
_observers[oid].OnNext(updates[uid]);
}
internal void OnException(Exception exception)
{
IObserver<Update>[] current;
lock (_observers)
internal void Remove(IObserver<Update> observer)
{
current = _observers.ToArray(); // Caching current observers to prevent
// notifying those who subscribed after an error occurred.
_observers.Clear();
}
if (!_observers.Contains(observer))
{
return;
}

for (int oid = 0; oid != current.Length; ++oid)
{
try
{
current[oid].OnError(exception);
}
catch (Exception ex)
{
// Ignore exceptions from observers without an error handler,
// as it would break the process and propagate the exception to the outer scope.
ExceptionHelpers.ThrowIfFatal(ex);
}
}
}
internal void Remove(IObserver<Update> observer)
{
if (!_observers.Contains(observer))
return;
lock (_observers)
{
_observers.Remove(observer);
}

lock (_observers)
{
_observers.Remove(observer);
if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running)
{
_cancellationTokenSource?.Cancel();
}
}
public IDisposable Subscribe(IObserver<Update> observer)
{
if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}

if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running)
_cancellationTokenSource?.Cancel();
}
public IDisposable Subscribe(IObserver<Update> observer)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));
lock (_observers)
{
_observers.Add(observer);
}

lock (_observers)
{
_observers.Add(observer);
}
if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning)
{
Task.Run(RunUpdateSafe);
}

if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning)
{
Task.Run(RunUpdateSafe);
return new DisposableAction(() => Remove(observer));
}

return new DisposableAction(() => Remove(observer));
}
}
}
Loading
Loading