Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 45 additions & 23 deletions src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@ public class LongpollingUpdateTracker(ITelegramBot telegramBot)
private int _isRunning = NotRunning;
private CancellationTokenSource _cancellationTokenSource;
UpdateType[] _trackedUpdateTypes = [];
List<IObserver<Update>> _observers = new List<IObserver<Update>>();
readonly List<IObserver<Update>> _observers = new List<IObserver<Update>>();

private IEnumerable<UpdateType> GetTrackingUpdateTypes()
=> _trackedUpdateTypes;
public void Set(IEnumerable<UpdateType> types)
{
if (types == null && _trackedUpdateTypes == null)
return;

if (types == null)
{
_trackedUpdateTypes = null;
_cancellationTokenSource?.Cancel();
if (_trackedUpdateTypes != null)
{
_trackedUpdateTypes = null;
_cancellationTokenSource?.Cancel();
}
return;
}

Expand All @@ -47,20 +45,28 @@ internal async Task RunUpdateSafe()
_cancellationTokenSource = new CancellationTokenSource();
await RunUpdate();
}
catch (Exception)
catch (Exception ex)
{
// ignored
ExceptionHelpers.ThrowIfFatal(ex);
}
finally
{
Volatile.Write(ref _isRunning, NotRunning);
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;

if (!_observers.Any())
Volatile.Write(ref _isRunning, NotRunning);
else
RunUpdateTask();
}
void RunUpdateTask() => Task.Run(RunUpdateSafe);
}


int? offset = null; // Offset must be preserved for all errors except TaskCanceledException.
// Using a local variable may cause duplicates if an exception occurs.
internal async Task RunUpdate()
{
int? offset = null;

while (_observers.Count != 0)
{
Expand All @@ -76,27 +82,26 @@ internal async Task RunUpdate()
Timeout = 60,

// if there is a null value in the list, it means that all updates are allowed
AllowedUpdates = GetTrackingUpdateTypes() ?? null
AllowedUpdates = _trackedUpdateTypes ?? null
};

var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token);
if (!result.Any())
{
await Task.Delay(1000);
continue;
}

offset = result.Max(x => x.UpdateId) + 1;
NotifyObservers(result);
offset = result.Last().UpdateId + 1;
}
catch (TaskCanceledException)
{
// create new token and check observers
offset = null;
_cancellationTokenSource = new CancellationTokenSource();
}
catch (Exception exception)
{
// unexpected exception report them to the observers and cancel run update
OnException(exception);
throw;
}
Expand All @@ -110,8 +115,27 @@ internal void NotifyObservers(Update[] updates)
}
internal void OnException(Exception exception)
{
for (int oid = 0; oid != _observers.Count; ++oid)
_observers[oid].OnError(exception);
IObserver<Update>[] current;
lock (_observers)
{
current = _observers.ToArray(); // Caching current observers to prevent
// notifying those who subscribed after an error occurred.
_observers.Clear();
}

for (int oid = 0; oid != current.Length; ++oid)
{
try
{
current[oid].OnError(exception);
}
catch (Exception ex)
{
// Ignore exceptions from observers without an error handler,
// as it would break the process and propagate the exception to the outer scope.
ExceptionHelpers.ThrowIfFatal(ex);
}
}
}
internal void Remove(IObserver<Update> observer)
{
Expand All @@ -128,14 +152,12 @@ internal void Remove(IObserver<Update> observer)
}
public IDisposable Subscribe(IObserver<Update> observer)
{
if(observer==null)
if (observer == null)
throw new ArgumentNullException(nameof(observer));

lock (_observers)
{
if (!_observers.Contains(observer))
{
_observers.Add(observer);
}
_observers.Add(observer);
}

if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning)
Expand Down
132 changes: 59 additions & 73 deletions src/RxTelegram.Bot/Api/UpdateDistributor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using RxTelegram.Bot.Interface.InlineMode;
using RxTelegram.Bot.Interface.Payments;
using RxTelegram.Bot.Interface.Setup;
using RxTelegram.Bot.Utils.Rx;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -17,11 +18,14 @@ namespace RxTelegram.Bot.Api;

public sealed class UpdateDistributor : IUpdateManager, IDisposable
{
private IObservable<Update> _tracker;

#region Observable Update properties
private Dictionary<UpdateType, UpdateTypeInfo> _updateInfos = new Dictionary<UpdateType, UpdateTypeInfo>();
private UpdateTypeInfo _update = new UpdateTypeInfo();
private readonly Dictionary<UpdateType, UpdateTypeInfo> _updateInfos;
private readonly UpdateTypeInfo _updateInfo;
private readonly IEnumerable<UpdateType> _trackedTypes;
private readonly ReactiveProperty<IObservable<Update>> _tracker;
private bool _isDisposed = false;
private readonly object _lock;
public IObservable<CallbackQuery> CallbackQuery => Selector(UpdateType.CallbackQuery, _update => _update.CallbackQuery);
public IObservable<Message> ChannelPost => Selector(UpdateType.ChannelPost, _update => _update.ChannelPost);
public IObservable<ChatBoostUpdated> ChatBoost => Selector(UpdateType.ChatBoost, _update => _update.ChatBoost);
Expand All @@ -38,9 +42,7 @@ public sealed class UpdateDistributor : IUpdateManager, IDisposable
public IObservable<PreCheckoutQuery> PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, _update => _update.PreCheckoutQuery);
public IObservable<ChatBoostRemoved> RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, _update => _update.RemovedChatBoost);
public IObservable<ShippingQuery> ShippingQuery => Selector(UpdateType.ShippingQuery, _update => _update.ShippingQuery);
public IObservable<Update> Update => (IObservable<Update>)(_update.Observer ??= new UpdateSubject<Update>(x => x,
onSubscribe: AddGeneralListener,
onDispose: RemoveGeneralListener));
public IObservable<Update> Update => Selector(null, _update => _update);
#endregion

#if NETSTANDARD2_1
Expand All @@ -65,101 +67,85 @@ public sealed class UpdateDistributor : IUpdateManager, IDisposable
#endif
public UpdateDistributor(IObservable<Update> updateTracker)
{
_lock = new();
_updateInfos = Enum.GetValues(typeof(UpdateType))
.Cast<UpdateType>()
.ToDictionary(x => x, _ => new UpdateTypeInfo());
Set(updateTracker);
}

private void AddGeneralListener()
{
++_update.Listeners;
_updateInfo = new UpdateTypeInfo();

(_tracker as ITrackerSetup)?.Set(null);
_trackedTypes = _updateInfos.Where(x => x.Value.Listeners != 0)
.Select(x => x.Key);

_update.Subscription ??= _tracker.Subscribe(_update.Observer);
_tracker = new ReactiveProperty<IObservable<Update>>(updateTracker);
Set(updateTracker);
}
private void AddListener(UpdateType type)
private void AddListener(UpdateType? type)
{
var updateType = _updateInfos[type];
++updateType.Listeners;

UpdateTrackerTypes();
lock (_lock)
{
var info = GetInfo(type);
++info.Listeners;

updateType.Subscription ??= _tracker.Subscribe(updateType.Observer);
if (info.Listeners != 1) return;
UpdateTrackerTypes();
}
}
private void RemoveGeneralListener()
{
--_update.Listeners;
_update.Subscription?.Dispose();
_update.Subscription = null;
private UpdateTypeInfo GetInfo(UpdateType? type)
=> type == null ? _updateInfo : _updateInfos[(UpdateType)type];

UpdateTrackerTypes();
}
private void RemoveListener(UpdateType type)
private void RemoveListener(UpdateType? type)
{
var updateType = _updateInfos[type];
--updateType.Listeners;
_update.Subscription?.Dispose();
_update.Subscription = null;
lock (_lock)
{
var info = GetInfo(type);
--info.Listeners;
if (info.Listeners != 0) return;

UpdateTrackerTypes();
UpdateTrackerTypes();
}
}
public IObservable<T> Selector<T>(UpdateType updateType, Func<Update, T> propertySelector)
public IObservable<T> Selector<T>(UpdateType? type, Func<Update, T> propertySelector)
where T : class
{
var info = _updateInfos[updateType];
if (info.Observer != null)
return (IObservable<T>)info;

var subject = new UpdateSubject<T>(propertySelector,
onSubscribe: () => AddListener(updateType),
onDispose: () => RemoveListener(updateType));
info.Observer = subject;
info.Subscription = _tracker.Subscribe(info.Observer);
return subject;
return _tracker.Switch().Select(propertySelector)
.Where(x => x != null)
.DoOnSubscribe(() => AddListener(type))
.Finally(() => RemoveListener(type));
}

public void Set(IObservable<Update> tracker)
{
//Setup current tracker to listen all messages before change to a new one
(_tracker as ITrackerSetup)?.Set(null);
DisposeTrackerSubcription();
_tracker = tracker;
// Configure the current tracker to listen for all types of updates
// before switching to a new one
(_tracker.Current as ITrackerSetup)?.Set(null);

_tracker.OnNext(tracker);
UpdateTrackerTypes();
SubscribeToTracker();
}
public void DisposeTrackerSubcription()
{
_update.Subscription?.Dispose();
foreach (var info in _updateInfos.Values)
info.Subscription?.Dispose();
}
public void SubscribeToTracker()
private void UpdateTrackerTypes()
{
if(_update.Observer!=null)
_update.Subscription = _tracker.Subscribe(_update.Observer);
foreach (var info in _updateInfos.Values.Where(x=>x.Observer!=null))
info.Subscription = _tracker.Subscribe(info.Observer);
if (_tracker.Current is not ITrackerSetup setup) return;

setup.Set(_updateInfo.Listeners != 0 || !_trackedTypes.Any() ?
null : _trackedTypes);
}
private void UpdateTrackerTypes()

public void Dispose() => Dispose(true);
void Dispose(bool explicitDisposing)
{
if (_tracker is not ITrackerSetup) return;
if (_isDisposed) return;

IEnumerable<UpdateType> types = null;
if (_update.Listeners == 0)
{
types = _updateInfos.Where(x => x.Value.Listeners != 0).Select(x => x.Key);
if (!types.Any())
types = null;
}
(_tracker as ITrackerSetup).Set(types);
if (explicitDisposing)
_tracker.Dispose();

_isDisposed = true;
}

public void Dispose() => DisposeTrackerSubcription();
~UpdateDistributor() => Dispose(false);

private class UpdateTypeInfo
sealed private class UpdateTypeInfo
{
public int Listeners { get; set; } = 0;
public IObserver<Update> Observer { get; set; } = null;
public IDisposable Subscription { get; set; } = null;
}
}
8 changes: 0 additions & 8 deletions src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs

This file was deleted.

55 changes: 0 additions & 55 deletions src/RxTelegram.Bot/Utils/Rx/CustomSubject.cs

This file was deleted.

Loading