-
Notifications
You must be signed in to change notification settings - Fork 5
Rework updatemanager #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
1e87c5c
1e195b1
7884122
170c120
bfcb929
e32fdf7
78946f8
4d687f2
b58887a
41e6e93
2b594d7
1343adf
80421a7
bce88b6
068d1b4
be72545
da89c6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| using RxTelegram.Bot.Interface.BaseTypes.Enums; | ||
| using System.Collections.Generic; | ||
|
|
||
| namespace RxTelegram.Bot.Api; | ||
|
|
||
| public interface ITrackerSetup | ||
| { | ||
| void Set(IEnumerable<UpdateType> types); | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,189 @@ | ||||||
| using RxTelegram.Bot.Interface.BaseTypes.Enums; | ||||||
| using RxTelegram.Bot.Interface.Setup; | ||||||
| using RxTelegram.Bot.Utils.Rx; | ||||||
| using System; | ||||||
| using System.Collections.Generic; | ||||||
| using System.Linq; | ||||||
| using System.Threading; | ||||||
| using System.Threading.Tasks; | ||||||
|
|
||||||
| namespace RxTelegram.Bot.Api; | ||||||
| public class LongPollingUpdateTracker(ITelegramBot telegramBot) | ||||||
| : IObservable<Update>, ITrackerSetup | ||||||
| { | ||||||
| private readonly ITelegramBot _telegramBot = telegramBot; | ||||||
| private const int NotRunning = 0; | ||||||
| private const int Running = 1; | ||||||
| private int _isRunning = NotRunning; | ||||||
| private CancellationTokenSource _cancellationTokenSource; | ||||||
| private UpdateType[] _trackedUpdateTypes = []; | ||||||
| readonly List<IObserver<Update>> _observers = new List<IObserver<Update>>(); | ||||||
|
|
||||||
| public void Set(IEnumerable<UpdateType> types) | ||||||
| { | ||||||
| if (types == null) | ||||||
| { | ||||||
| if (_trackedUpdateTypes != null) | ||||||
| { | ||||||
| _trackedUpdateTypes = null; | ||||||
| _cancellationTokenSource?.Cancel(); | ||||||
| } | ||||||
| return; | ||||||
| } | ||||||
|
|
||||||
| var updateTypes = types as UpdateType[] ?? types.ToArray(); | ||||||
| if (_trackedUpdateTypes == null || !updateTypes.SequenceEqual(_trackedUpdateTypes)) | ||||||
| { | ||||||
| _trackedUpdateTypes = updateTypes; | ||||||
| _cancellationTokenSource?.Cancel(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| internal async Task RunUpdateSafe() | ||||||
| { | ||||||
| try | ||||||
| { | ||||||
| _cancellationTokenSource = new CancellationTokenSource(); | ||||||
| await RunUpdate(); | ||||||
| } | ||||||
| catch (Exception ex) | ||||||
| { | ||||||
| // ignored | ||||||
| ExceptionHelpers.ThrowIfFatal(ex); | ||||||
| } | ||||||
| finally | ||||||
| { | ||||||
| _cancellationTokenSource.Dispose(); | ||||||
| _cancellationTokenSource = null; | ||||||
|
|
||||||
| if (!_observers.Any()) | ||||||
| { | ||||||
| Volatile.Write(ref _isRunning, NotRunning); | ||||||
| } | ||||||
| else | ||||||
| { | ||||||
| RunUpdateTask(); | ||||||
| } | ||||||
| } | ||||||
| void RunUpdateTask() => Task.Run(RunUpdateSafe); | ||||||
| } | ||||||
|
|
||||||
| private int? _offset; // Offset must be preserved for all errors except TaskCanceledException. | ||||||
| // Using a local variable may cause duplicates if an exception occurs. | ||||||
| internal async Task RunUpdate() | ||||||
| { | ||||||
|
|
||||||
| while (_observers.Count != 0) | ||||||
| { | ||||||
| try | ||||||
| { | ||||||
| // if the token already canceled before the first request reset token | ||||||
| if (_cancellationTokenSource.IsCancellationRequested) | ||||||
| { | ||||||
| _cancellationTokenSource = new CancellationTokenSource(); | ||||||
| } | ||||||
|
|
||||||
| var getUpdate = new GetUpdate | ||||||
| { | ||||||
| Offset = _offset, | ||||||
| Timeout = 60, | ||||||
|
|
||||||
| // if there is a null value in the list, it means that all updates are allowed | ||||||
| AllowedUpdates = _trackedUpdateTypes | ||||||
| }; | ||||||
|
|
||||||
| var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token); | ||||||
| if (!result.Any()) | ||||||
| { | ||||||
| await Task.Delay(1000); | ||||||
| continue; | ||||||
| } | ||||||
|
|
||||||
| NotifyObservers(result); | ||||||
|
|
||||||
| var lastId = result.Length - 1; | ||||||
| _offset = result[lastId].UpdateId + 1; | ||||||
| } | ||||||
| catch (TaskCanceledException) | ||||||
| { | ||||||
| _offset = null; | ||||||
| _cancellationTokenSource = new CancellationTokenSource(); | ||||||
| } | ||||||
| catch (Exception exception) | ||||||
| { | ||||||
| OnException(exception); | ||||||
| throw; | ||||||
|
||||||
| throw; | |
| // Exception already handled by notifying observers; do not rethrow. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| using RxTelegram.Bot.Interface.BaseTypes; | ||
| using RxTelegram.Bot.Interface.BaseTypes.Enums; | ||
| using RxTelegram.Bot.Interface.InlineMode; | ||
| using RxTelegram.Bot.Interface.Payments; | ||
| using RxTelegram.Bot.Interface.Setup; | ||
| using RxTelegram.Bot.Utils.Rx; | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
|
|
||
| #if NETSTANDARD2_1 | ||
|
|
||
| using RxTelegram.Bot.Utils; | ||
|
|
||
| #endif | ||
|
|
||
| namespace RxTelegram.Bot.Api; | ||
|
|
||
| public sealed class UpdateDistributor : IUpdateManager, IDisposable | ||
| { | ||
|
|
||
| #region Observable Update properties | ||
| private readonly Dictionary<UpdateType, UpdateTypeInfo> _updateInfos; | ||
| private readonly UpdateTypeInfo _updateInfo; | ||
| private readonly IEnumerable<UpdateType> _trackedTypes; | ||
| private readonly ReactiveProperty<IObservable<Update>> _tracker; | ||
| private bool _isDisposed; | ||
| private readonly object _lock; | ||
| public IObservable<CallbackQuery> CallbackQuery => Selector(UpdateType.CallbackQuery, update => update.CallbackQuery); | ||
| public IObservable<Message> ChannelPost => Selector(UpdateType.ChannelPost, update => update.ChannelPost); | ||
| public IObservable<ChatBoostUpdated> ChatBoost => Selector(UpdateType.ChatBoost, update => update.ChatBoost); | ||
| public IObservable<ChatJoinRequest> ChatJoinRequest => Selector(UpdateType.ChatJoinRequest, update => update.ChatJoinRequest); | ||
| public IObservable<ChatMemberUpdated> ChatMember => Selector(UpdateType.ChatMember, update => update.ChatMember); | ||
| public IObservable<ChosenInlineResult> ChosenInlineResult => Selector(UpdateType.ChosenInlineResult, update => update.ChosenInlineResult); | ||
| public IObservable<Message> EditedChannelPost => Selector(UpdateType.EditedChannelPost, update => update.EditedChannelPost); | ||
| public IObservable<Message> EditedMessage => Selector(UpdateType.EditedMessage, update => update.EditedMessage); | ||
| public IObservable<InlineQuery> InlineQuery => Selector(UpdateType.InlineQuery, update => update.InlineQuery); | ||
| public IObservable<Message> Message => Selector(UpdateType.Message, update => update.Message); | ||
| public IObservable<ChatMemberUpdated> MyChatMember => Selector(UpdateType.MyChatMember, update => update.MyChatMember); | ||
| public IObservable<Poll> Poll => Selector(UpdateType.Poll, update => update.Poll); | ||
| public IObservable<PollAnswer> PollAnswer => Selector(UpdateType.PollAnswer, update => update.PollAnswer); | ||
| public IObservable<PreCheckoutQuery> PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, update => update.PreCheckoutQuery); | ||
| public IObservable<ChatBoostRemoved> RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, update => update.RemovedChatBoost); | ||
| public IObservable<ShippingQuery> ShippingQuery => Selector(UpdateType.ShippingQuery, update => update.ShippingQuery); | ||
| public IObservable<Update> Update => Selector(null, update => update); | ||
| #endregion | ||
|
|
||
| #if NETSTANDARD2_1 | ||
| public IAsyncEnumerable<CallbackQuery> CallbackQueryEnumerable() => CallbackQuery.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<Message> ChannelPostEnumerable() => ChannelPost.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<ChatBoostUpdated> ChatBoostEnumerable() => ChatBoost.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<ChatJoinRequest> ChatJoinRequestEnumerable() => ChatJoinRequest.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<ChatMemberUpdated> ChatMemberEnumerable() => ChatMember.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<ChosenInlineResult> ChosenInlineResultEnumerable() => ChosenInlineResult.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<Message> EditedChannelPostEnumerable() => EditedChannelPost.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<Message> EditedMessageEnumerable() => EditedMessage.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<InlineQuery> InlineQueryEnumerable() => InlineQuery.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<Message> MessageEnumerable() => Message.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<ChatMemberUpdated> MyChatMemberEnumerable() => MyChatMember.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<ShippingQuery> ShippingQueryEnumerable() => ShippingQuery.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<PollAnswer> PollAnswerEnumerable() => PollAnswer.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<Poll> PollEnumerable() => Poll.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<PreCheckoutQuery> PreCheckoutQueryEnumerable() => PreCheckoutQuery.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<ChatBoostRemoved> RemovedChatBoostEnumerable() => RemovedChatBoost.ToAsyncEnumerable(); | ||
| public IAsyncEnumerable<Update> UpdateEnumerable() => Update.ToAsyncEnumerable(); | ||
|
|
||
| #endif | ||
| public UpdateDistributor(IObservable<Update> updateTracker) | ||
| { | ||
| _lock = new(); | ||
| _updateInfos = Enum.GetValues(typeof(UpdateType)) | ||
| .Cast<UpdateType>() | ||
| .ToDictionary(x => x, _ => new UpdateTypeInfo()); | ||
|
|
||
| _updateInfo = new UpdateTypeInfo(); | ||
|
|
||
| _trackedTypes = _updateInfos.Where(x => x.Value.Listeners != 0) | ||
| .Select(x => x.Key); | ||
|
Comment on lines
+77
to
+78
|
||
|
|
||
| _tracker = new ReactiveProperty<IObservable<Update>>(updateTracker); | ||
| Set(updateTracker); | ||
| } | ||
| private void AddListener(UpdateType? type) | ||
| { | ||
| lock (_lock) | ||
| { | ||
| var info = GetInfo(type); | ||
| ++info.Listeners; | ||
|
|
||
| if (info.Listeners != 1) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| UpdateTrackerTypes(); | ||
| } | ||
| } | ||
| private UpdateTypeInfo GetInfo(UpdateType? type) | ||
| => type == null ? _updateInfo : _updateInfos[(UpdateType)type]; | ||
|
|
||
| private void RemoveListener(UpdateType? type) | ||
| { | ||
| lock (_lock) | ||
| { | ||
| var info = GetInfo(type); | ||
| --info.Listeners; | ||
| if (info.Listeners != 0) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| UpdateTrackerTypes(); | ||
| } | ||
| } | ||
|
|
||
| private IObservable<T> Selector<T>(UpdateType? type, Func<Update, T> propertySelector) | ||
| where T : class | ||
| { | ||
| return _tracker.Switch().Select(propertySelector) | ||
| .Where(x => x != null) | ||
| .DoOnSubscribe(() => AddListener(type)) | ||
| .Finally(() => RemoveListener(type)); | ||
| } | ||
|
|
||
| public void Set(IObservable<Update> tracker) | ||
| { | ||
| // Configure the current tracker to listen for all types of updates | ||
| // before switching to a new one | ||
| (_tracker.Current as ITrackerSetup)?.Set(null); | ||
|
|
||
| _tracker.OnNext(tracker); | ||
| UpdateTrackerTypes(); | ||
| } | ||
| private void UpdateTrackerTypes() | ||
| { | ||
| if (_tracker.Current is not ITrackerSetup setup) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| lock (_lock) | ||
| { | ||
| setup.Set(_updateInfo.Listeners != 0 || !_trackedTypes.Any() ? | ||
| null : _trackedTypes); | ||
| } | ||
| } | ||
|
|
||
| public void Dispose() => Dispose(true); | ||
| private void Dispose(bool explicitDisposing) | ||
| { | ||
| if (_isDisposed) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| if (explicitDisposing) | ||
| { | ||
| _tracker.Dispose(); | ||
| } | ||
|
|
||
| _isDisposed = true; | ||
| } | ||
|
|
||
| ~UpdateDistributor() => Dispose(false); | ||
|
|
||
| private sealed class UpdateTypeInfo | ||
| { | ||
| public int Listeners { get; set; } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating a new CancellationTokenSource without disposing the previous one can lead to resource leaks. The old token source should be disposed before creating a new one.