Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/RxTelegram.Bot/Api/ITrackerSetup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using RxTelegram.Bot.Interface.BaseTypes.Enums;
using System.Collections.Generic;

namespace RxTelegram.Bot.Api;

public interface ITrackerSetup
{
void Set(IEnumerable<UpdateType> types);
}
6 changes: 6 additions & 0 deletions src/RxTelegram.Bot/Api/IUpdateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ namespace RxTelegram.Bot.Api;

public interface IUpdateManager
{
/// <summary>
/// Allows to set custom updates tracker
/// </summary>
/// <param name="tracker"></param>
void Set(IObservable<Update> tracker);

/// <summary>
/// Updates of all Types.
/// </summary>
Expand Down
189 changes: 189 additions & 0 deletions src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
using RxTelegram.Bot.Interface.BaseTypes.Enums;
using RxTelegram.Bot.Interface.Setup;
using RxTelegram.Bot.Utils.Rx;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace RxTelegram.Bot.Api;
public class LongPollingUpdateTracker(ITelegramBot telegramBot)
: IObservable<Update>, ITrackerSetup
{
private readonly ITelegramBot _telegramBot = telegramBot;
private const int NotRunning = 0;
private const int Running = 1;
private int _isRunning = NotRunning;
private CancellationTokenSource _cancellationTokenSource;
private UpdateType[] _trackedUpdateTypes = [];
readonly List<IObserver<Update>> _observers = new List<IObserver<Update>>();

public void Set(IEnumerable<UpdateType> types)
{
if (types == null)
{
if (_trackedUpdateTypes != null)
{
_trackedUpdateTypes = null;
_cancellationTokenSource?.Cancel();
}
return;
}

var updateTypes = types as UpdateType[] ?? types.ToArray();
if (_trackedUpdateTypes == null || !updateTypes.SequenceEqual(_trackedUpdateTypes))
{
_trackedUpdateTypes = updateTypes;
_cancellationTokenSource?.Cancel();
}
}

internal async Task RunUpdateSafe()
{
try
{
_cancellationTokenSource = new CancellationTokenSource();
await RunUpdate();
}
catch (Exception ex)
{
// ignored
ExceptionHelpers.ThrowIfFatal(ex);
}
finally
{
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;

if (!_observers.Any())
{
Volatile.Write(ref _isRunning, NotRunning);
}
else
{
RunUpdateTask();
}
}
void RunUpdateTask() => Task.Run(RunUpdateSafe);
}

private int? _offset; // Offset must be preserved for all errors except TaskCanceledException.
// Using a local variable may cause duplicates if an exception occurs.
internal async Task RunUpdate()
{

while (_observers.Count != 0)
{
try
{
// if the token already canceled before the first request reset token
if (_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource = new CancellationTokenSource();
}

var getUpdate = new GetUpdate
{
Offset = _offset,
Timeout = 60,

// if there is a null value in the list, it means that all updates are allowed
AllowedUpdates = _trackedUpdateTypes
};

var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token);
if (!result.Any())
{
await Task.Delay(1000);
continue;
}

NotifyObservers(result);

var lastId = result.Length - 1;
_offset = result[lastId].UpdateId + 1;
}
catch (TaskCanceledException)
{
_offset = null;
_cancellationTokenSource = new CancellationTokenSource();
}
Comment on lines +107 to +111
Copy link

Copilot AI Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a new CancellationTokenSource without disposing the previous one can lead to resource leaks. The old token source should be disposed before creating a new one.

Copilot uses AI. Check for mistakes.
catch (Exception exception)
{
OnException(exception);
throw;
Copy link

Copilot AI Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After calling OnException which clears all observers, throwing the exception will not be handled by any observer. The throw statement should be removed since OnException already notifies observers of the error.

Suggested change
throw;
// Exception already handled by notifying observers; do not rethrow.

Copilot uses AI. Check for mistakes.
}
}
}
internal void NotifyObservers(Update[] updates)
{
for (var uid = 0; uid != updates.Length; ++uid)
{
for (var oid = 0; oid != _observers.Count; ++oid)
{
_observers[oid].OnNext(updates[uid]);
}
}
}
internal void OnException(Exception exception)
{
IObserver<Update>[] current;
lock (_observers)
{
current = _observers.ToArray(); // Caching current observers to prevent
// notifying those who subscribed after an error occurred.
_observers.Clear();
}

for (var oid = 0; oid != current.Length; ++oid)
{
try
{
current[oid].OnError(exception);
}
catch (Exception ex)
{
// Ignore exceptions from observers without an error handler,
// as it would break the process and propagate the exception to the outer scope.
ExceptionHelpers.ThrowIfFatal(ex);
}
}
}
internal void Remove(IObserver<Update> observer)
{
if (!_observers.Contains(observer))
{
return;
}

lock (_observers)
{
_observers.Remove(observer);
}

if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running)
{
_cancellationTokenSource?.Cancel();
}
}
public IDisposable Subscribe(IObserver<Update> observer)
{
if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}

lock (_observers)
{
_observers.Add(observer);
}

if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning)
{
Task.Run(RunUpdateSafe);
}

return new DisposableAction(() => Remove(observer));
}
}
170 changes: 170 additions & 0 deletions src/RxTelegram.Bot/Api/UpdateDistributor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
using RxTelegram.Bot.Interface.BaseTypes;
using RxTelegram.Bot.Interface.BaseTypes.Enums;
using RxTelegram.Bot.Interface.InlineMode;
using RxTelegram.Bot.Interface.Payments;
using RxTelegram.Bot.Interface.Setup;
using RxTelegram.Bot.Utils.Rx;
using System;
using System.Collections.Generic;
using System.Linq;

#if NETSTANDARD2_1

using RxTelegram.Bot.Utils;

#endif

namespace RxTelegram.Bot.Api;

public sealed class UpdateDistributor : IUpdateManager, IDisposable
{

#region Observable Update properties
private readonly Dictionary<UpdateType, UpdateTypeInfo> _updateInfos;
private readonly UpdateTypeInfo _updateInfo;
private readonly IEnumerable<UpdateType> _trackedTypes;
private readonly ReactiveProperty<IObservable<Update>> _tracker;
private bool _isDisposed;
private readonly object _lock;
public IObservable<CallbackQuery> CallbackQuery => Selector(UpdateType.CallbackQuery, update => update.CallbackQuery);
public IObservable<Message> ChannelPost => Selector(UpdateType.ChannelPost, update => update.ChannelPost);
public IObservable<ChatBoostUpdated> ChatBoost => Selector(UpdateType.ChatBoost, update => update.ChatBoost);
public IObservable<ChatJoinRequest> ChatJoinRequest => Selector(UpdateType.ChatJoinRequest, update => update.ChatJoinRequest);
public IObservable<ChatMemberUpdated> ChatMember => Selector(UpdateType.ChatMember, update => update.ChatMember);
public IObservable<ChosenInlineResult> ChosenInlineResult => Selector(UpdateType.ChosenInlineResult, update => update.ChosenInlineResult);
public IObservable<Message> EditedChannelPost => Selector(UpdateType.EditedChannelPost, update => update.EditedChannelPost);
public IObservable<Message> EditedMessage => Selector(UpdateType.EditedMessage, update => update.EditedMessage);
public IObservable<InlineQuery> InlineQuery => Selector(UpdateType.InlineQuery, update => update.InlineQuery);
public IObservable<Message> Message => Selector(UpdateType.Message, update => update.Message);
public IObservable<ChatMemberUpdated> MyChatMember => Selector(UpdateType.MyChatMember, update => update.MyChatMember);
public IObservable<Poll> Poll => Selector(UpdateType.Poll, update => update.Poll);
public IObservable<PollAnswer> PollAnswer => Selector(UpdateType.PollAnswer, update => update.PollAnswer);
public IObservable<PreCheckoutQuery> PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, update => update.PreCheckoutQuery);
public IObservable<ChatBoostRemoved> RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, update => update.RemovedChatBoost);
public IObservable<ShippingQuery> ShippingQuery => Selector(UpdateType.ShippingQuery, update => update.ShippingQuery);
public IObservable<Update> Update => Selector(null, update => update);
#endregion

#if NETSTANDARD2_1
public IAsyncEnumerable<CallbackQuery> CallbackQueryEnumerable() => CallbackQuery.ToAsyncEnumerable();
public IAsyncEnumerable<Message> ChannelPostEnumerable() => ChannelPost.ToAsyncEnumerable();
public IAsyncEnumerable<ChatBoostUpdated> ChatBoostEnumerable() => ChatBoost.ToAsyncEnumerable();
public IAsyncEnumerable<ChatJoinRequest> ChatJoinRequestEnumerable() => ChatJoinRequest.ToAsyncEnumerable();
public IAsyncEnumerable<ChatMemberUpdated> ChatMemberEnumerable() => ChatMember.ToAsyncEnumerable();
public IAsyncEnumerable<ChosenInlineResult> ChosenInlineResultEnumerable() => ChosenInlineResult.ToAsyncEnumerable();
public IAsyncEnumerable<Message> EditedChannelPostEnumerable() => EditedChannelPost.ToAsyncEnumerable();
public IAsyncEnumerable<Message> EditedMessageEnumerable() => EditedMessage.ToAsyncEnumerable();
public IAsyncEnumerable<InlineQuery> InlineQueryEnumerable() => InlineQuery.ToAsyncEnumerable();
public IAsyncEnumerable<Message> MessageEnumerable() => Message.ToAsyncEnumerable();
public IAsyncEnumerable<ChatMemberUpdated> MyChatMemberEnumerable() => MyChatMember.ToAsyncEnumerable();
public IAsyncEnumerable<ShippingQuery> ShippingQueryEnumerable() => ShippingQuery.ToAsyncEnumerable();
public IAsyncEnumerable<PollAnswer> PollAnswerEnumerable() => PollAnswer.ToAsyncEnumerable();
public IAsyncEnumerable<Poll> PollEnumerable() => Poll.ToAsyncEnumerable();
public IAsyncEnumerable<PreCheckoutQuery> PreCheckoutQueryEnumerable() => PreCheckoutQuery.ToAsyncEnumerable();
public IAsyncEnumerable<ChatBoostRemoved> RemovedChatBoostEnumerable() => RemovedChatBoost.ToAsyncEnumerable();
public IAsyncEnumerable<Update> UpdateEnumerable() => Update.ToAsyncEnumerable();

#endif
public UpdateDistributor(IObservable<Update> updateTracker)
{
_lock = new();
_updateInfos = Enum.GetValues(typeof(UpdateType))
.Cast<UpdateType>()
.ToDictionary(x => x, _ => new UpdateTypeInfo());

_updateInfo = new UpdateTypeInfo();

_trackedTypes = _updateInfos.Where(x => x.Value.Listeners != 0)
.Select(x => x.Key);
Comment on lines +77 to +78
Copy link

Copilot AI Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _trackedTypes enumerable is evaluated at construction time when all Listeners are 0, resulting in an empty collection. This should be evaluated lazily or updated when listeners change to reflect the current state.

Copilot uses AI. Check for mistakes.

_tracker = new ReactiveProperty<IObservable<Update>>(updateTracker);
Set(updateTracker);
}
private void AddListener(UpdateType? type)
{
lock (_lock)
{
var info = GetInfo(type);
++info.Listeners;

if (info.Listeners != 1)
{
return;
}

UpdateTrackerTypes();
}
}
private UpdateTypeInfo GetInfo(UpdateType? type)
=> type == null ? _updateInfo : _updateInfos[(UpdateType)type];

private void RemoveListener(UpdateType? type)
{
lock (_lock)
{
var info = GetInfo(type);
--info.Listeners;
if (info.Listeners != 0)
{
return;
}

UpdateTrackerTypes();
}
}

private IObservable<T> Selector<T>(UpdateType? type, Func<Update, T> propertySelector)
where T : class
{
return _tracker.Switch().Select(propertySelector)
.Where(x => x != null)
.DoOnSubscribe(() => AddListener(type))
.Finally(() => RemoveListener(type));
}

public void Set(IObservable<Update> tracker)
{
// Configure the current tracker to listen for all types of updates
// before switching to a new one
(_tracker.Current as ITrackerSetup)?.Set(null);

_tracker.OnNext(tracker);
UpdateTrackerTypes();
}
private void UpdateTrackerTypes()
{
if (_tracker.Current is not ITrackerSetup setup)
{
return;
}

lock (_lock)
{
setup.Set(_updateInfo.Listeners != 0 || !_trackedTypes.Any() ?
null : _trackedTypes);
}
}

public void Dispose() => Dispose(true);
private void Dispose(bool explicitDisposing)
{
if (_isDisposed)
{
return;
}

if (explicitDisposing)
{
_tracker.Dispose();
}

_isDisposed = true;
}

~UpdateDistributor() => Dispose(false);

private sealed class UpdateTypeInfo
{
public int Listeners { get; set; }
}
}
2 changes: 2 additions & 0 deletions src/RxTelegram.Bot/Api/UpdateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

public class UpdateManager : IUpdateManager
{
public void Set(IObservable<Update> tracker) => throw new NotImplementedException();

public IObservable<Update> Update => _update;

public IObservable<Message> Message => _message;
Expand Down Expand Up @@ -113,7 +115,7 @@
{
try
{
_cancellationTokenSource = new CancellationTokenSource();

Check warning on line 118 in src/RxTelegram.Bot/Api/UpdateManager.cs

View workflow job for this annotation

GitHub Actions / sonarqube

Dispose '_cancellationTokenSource' when it is no longer needed. (https://rules.sonarsource.com/csharp/RSPEC-2930)
await RunUpdate();
}
catch (Exception)
Expand All @@ -138,7 +140,7 @@
// if the token already canceled before the first request reset token
if (_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource = new CancellationTokenSource();

Check warning on line 143 in src/RxTelegram.Bot/Api/UpdateManager.cs

View workflow job for this annotation

GitHub Actions / sonarqube

Dispose '_cancellationTokenSource' when it is no longer needed. (https://rules.sonarsource.com/csharp/RSPEC-2930)
}

var getUpdate = new GetUpdate
Expand Down Expand Up @@ -166,7 +168,7 @@
{
// create new token and check observers
offset = null;
_cancellationTokenSource = new CancellationTokenSource();

Check warning on line 171 in src/RxTelegram.Bot/Api/UpdateManager.cs

View workflow job for this annotation

GitHub Actions / sonarqube

Dispose '_cancellationTokenSource' when it is no longer needed. (https://rules.sonarsource.com/csharp/RSPEC-2930)
}
catch (Exception exception)
{
Expand Down
Loading
Loading