diff --git a/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs b/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs index 51d6c10..844df0a 100644 --- a/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs +++ b/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs @@ -8,163 +8,182 @@ using System.Threading.Tasks; namespace RxTelegram.Bot.Api; -public class LongpollingUpdateTracker(ITelegramBot telegramBot) +public class LongPollingUpdateTracker(ITelegramBot telegramBot) : IObservable, ITrackerSetup { - private readonly ITelegramBot _telegramBot = telegramBot; - private const int NotRunning = 0; - private const int Running = 1; - private int _isRunning = NotRunning; - private CancellationTokenSource _cancellationTokenSource; - UpdateType[] _trackedUpdateTypes = []; - readonly List> _observers = new List>(); - - public void Set(IEnumerable types) - { - if (types == null) - { - if (_trackedUpdateTypes != null) - { - _trackedUpdateTypes = null; - _cancellationTokenSource?.Cancel(); - } - return; - } + private readonly ITelegramBot _telegramBot = telegramBot; + private const int NotRunning = 0; + private const int Running = 1; + private int _isRunning = NotRunning; + private CancellationTokenSource _cancellationTokenSource; + private UpdateType[] _trackedUpdateTypes = []; + readonly List> _observers = new List>(); - if (_trackedUpdateTypes == null || !types.SequenceEqual(_trackedUpdateTypes)) + public void Set(IEnumerable types) { - _trackedUpdateTypes = types.ToArray(); - _cancellationTokenSource?.Cancel(); + if (types == null) + { + if (_trackedUpdateTypes != null) + { + _trackedUpdateTypes = null; + _cancellationTokenSource?.Cancel(); + } + return; + } + + var updateTypes = types as UpdateType[] ?? types.ToArray(); + if (_trackedUpdateTypes == null || !updateTypes.SequenceEqual(_trackedUpdateTypes)) + { + _trackedUpdateTypes = updateTypes; + _cancellationTokenSource?.Cancel(); + } } - } - internal async Task RunUpdateSafe() - { - try + internal async Task RunUpdateSafe() { - _cancellationTokenSource = new CancellationTokenSource(); - await RunUpdate(); + try + { + _cancellationTokenSource = new CancellationTokenSource(); + await RunUpdate(); + } + catch (Exception ex) + { + // ignored + ExceptionHelpers.ThrowIfFatal(ex); + } + finally + { + _cancellationTokenSource.Dispose(); + _cancellationTokenSource = null; + + if (!_observers.Any()) + { + Volatile.Write(ref _isRunning, NotRunning); + } + else + { + RunUpdateTask(); + } + } + void RunUpdateTask() => Task.Run(RunUpdateSafe); } - catch (Exception ex) + + private int? _offset; // Offset must be preserved for all errors except TaskCanceledException. + // Using a local variable may cause duplicates if an exception occurs. + internal async Task RunUpdate() { - // ignored - ExceptionHelpers.ThrowIfFatal(ex); + + while (_observers.Count != 0) + { + try + { + // if the token already canceled before the first request reset token + if (_cancellationTokenSource.IsCancellationRequested) + { + _cancellationTokenSource = new CancellationTokenSource(); + } + + var getUpdate = new GetUpdate + { + Offset = _offset, + Timeout = 60, + + // if there is a null value in the list, it means that all updates are allowed + AllowedUpdates = _trackedUpdateTypes + }; + + var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token); + if (!result.Any()) + { + await Task.Delay(1000); + continue; + } + + NotifyObservers(result); + + var lastId = result.Length - 1; + _offset = result[lastId].UpdateId + 1; + } + catch (TaskCanceledException) + { + _offset = null; + _cancellationTokenSource = new CancellationTokenSource(); + } + catch (Exception exception) + { + OnException(exception); + throw; + } + } } - finally + internal void NotifyObservers(Update[] updates) { - _cancellationTokenSource.Dispose(); - _cancellationTokenSource = null; - - if (!_observers.Any()) - Volatile.Write(ref _isRunning, NotRunning); - else - RunUpdateTask(); + for (var uid = 0; uid != updates.Length; ++uid) + { + for (var oid = 0; oid != _observers.Count; ++oid) + { + _observers[oid].OnNext(updates[uid]); + } + } } - void RunUpdateTask() => Task.Run(RunUpdateSafe); - } - - int? offset = null; // Offset must be preserved for all errors except TaskCanceledException. - // Using a local variable may cause duplicates if an exception occurs. - internal async Task RunUpdate() - { - - while (_observers.Count != 0) + internal void OnException(Exception exception) { - try - { - // if the token already canceled before the first request reset token - if (_cancellationTokenSource.IsCancellationRequested) - _cancellationTokenSource = new CancellationTokenSource(); - - var getUpdate = new GetUpdate + IObserver[] current; + lock (_observers) { - Offset = offset, - Timeout = 60, - - // if there is a null value in the list, it means that all updates are allowed - AllowedUpdates = _trackedUpdateTypes ?? null - }; + current = _observers.ToArray(); // Caching current observers to prevent + // notifying those who subscribed after an error occurred. + _observers.Clear(); + } - var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token); - if (!result.Any()) + for (var oid = 0; oid != current.Length; ++oid) { - await Task.Delay(1000); - continue; + try + { + current[oid].OnError(exception); + } + catch (Exception ex) + { + // Ignore exceptions from observers without an error handler, + // as it would break the process and propagate the exception to the outer scope. + ExceptionHelpers.ThrowIfFatal(ex); + } } - - NotifyObservers(result); - offset = result.Last().UpdateId + 1; - } - catch (TaskCanceledException) - { - offset = null; - _cancellationTokenSource = new CancellationTokenSource(); - } - catch (Exception exception) - { - OnException(exception); - throw; - } } - } - internal void NotifyObservers(Update[] updates) - { - for (int uid = 0; uid != updates.Length; ++uid) - for (int oid = 0; oid != _observers.Count; ++oid) - _observers[oid].OnNext(updates[uid]); - } - internal void OnException(Exception exception) - { - IObserver[] current; - lock (_observers) + internal void Remove(IObserver observer) { - current = _observers.ToArray(); // Caching current observers to prevent - // notifying those who subscribed after an error occurred. - _observers.Clear(); - } + if (!_observers.Contains(observer)) + { + return; + } - for (int oid = 0; oid != current.Length; ++oid) - { - try - { - current[oid].OnError(exception); - } - catch (Exception ex) - { - // Ignore exceptions from observers without an error handler, - // as it would break the process and propagate the exception to the outer scope. - ExceptionHelpers.ThrowIfFatal(ex); - } - } - } - internal void Remove(IObserver observer) - { - if (!_observers.Contains(observer)) - return; + lock (_observers) + { + _observers.Remove(observer); + } - lock (_observers) - { - _observers.Remove(observer); + if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running) + { + _cancellationTokenSource?.Cancel(); + } } + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } - if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running) - _cancellationTokenSource?.Cancel(); - } - public IDisposable Subscribe(IObserver observer) - { - if (observer == null) - throw new ArgumentNullException(nameof(observer)); + lock (_observers) + { + _observers.Add(observer); + } - lock (_observers) - { - _observers.Add(observer); - } + if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning) + { + Task.Run(RunUpdateSafe); + } - if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning) - { - Task.Run(RunUpdateSafe); + return new DisposableAction(() => Remove(observer)); } - - return new DisposableAction(() => Remove(observer)); - } -} \ No newline at end of file +} diff --git a/src/RxTelegram.Bot/Api/UpdateDistributor.cs b/src/RxTelegram.Bot/Api/UpdateDistributor.cs index adc9399..a38874e 100644 --- a/src/RxTelegram.Bot/Api/UpdateDistributor.cs +++ b/src/RxTelegram.Bot/Api/UpdateDistributor.cs @@ -19,31 +19,31 @@ namespace RxTelegram.Bot.Api; public sealed class UpdateDistributor : IUpdateManager, IDisposable { - #region Observable Update properties - private readonly Dictionary _updateInfos; - private readonly UpdateTypeInfo _updateInfo; - private readonly IEnumerable _trackedTypes; - private readonly ReactiveProperty> _tracker; - private bool _isDisposed = false; - private readonly object _lock; - public IObservable CallbackQuery => Selector(UpdateType.CallbackQuery, _update => _update.CallbackQuery); - public IObservable ChannelPost => Selector(UpdateType.ChannelPost, _update => _update.ChannelPost); - public IObservable ChatBoost => Selector(UpdateType.ChatBoost, _update => _update.ChatBoost); - public IObservable ChatJoinRequest => Selector(UpdateType.ChatJoinRequest, _update => _update.ChatJoinRequest); - public IObservable ChatMember => Selector(UpdateType.ChatMember, _update => _update.ChatMember); - public IObservable ChosenInlineResult => Selector(UpdateType.ChosenInlineResult, _update => _update.ChosenInlineResult); - public IObservable EditedChannelPost => Selector(UpdateType.EditedChannelPost, _update => _update.EditedChannelPost); - public IObservable EditedMessage => Selector(UpdateType.EditedMessage, _update => _update.EditedMessage); - public IObservable InlineQuery => Selector(UpdateType.InlineQuery, _update => _update.InlineQuery); - public IObservable Message => Selector(UpdateType.Message, _update => _update.Message); - public IObservable MyChatMember => Selector(UpdateType.MyChatMember, _update => _update.MyChatMember); - public IObservable Poll => Selector(UpdateType.Poll, _update => _update.Poll); - public IObservable PollAnswer => Selector(UpdateType.PollAnswer, _update => _update.PollAnswer); - public IObservable PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, _update => _update.PreCheckoutQuery); - public IObservable RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, _update => _update.RemovedChatBoost); - public IObservable ShippingQuery => Selector(UpdateType.ShippingQuery, _update => _update.ShippingQuery); - public IObservable Update => Selector(null, _update => _update); - #endregion + #region Observable Update properties + private readonly Dictionary _updateInfos; + private readonly UpdateTypeInfo _updateInfo; + private readonly IEnumerable _trackedTypes; + private readonly ReactiveProperty> _tracker; + private bool _isDisposed; + private readonly object _lock; + public IObservable CallbackQuery => Selector(UpdateType.CallbackQuery, update => update.CallbackQuery); + public IObservable ChannelPost => Selector(UpdateType.ChannelPost, update => update.ChannelPost); + public IObservable ChatBoost => Selector(UpdateType.ChatBoost, update => update.ChatBoost); + public IObservable ChatJoinRequest => Selector(UpdateType.ChatJoinRequest, update => update.ChatJoinRequest); + public IObservable ChatMember => Selector(UpdateType.ChatMember, update => update.ChatMember); + public IObservable ChosenInlineResult => Selector(UpdateType.ChosenInlineResult, update => update.ChosenInlineResult); + public IObservable EditedChannelPost => Selector(UpdateType.EditedChannelPost, update => update.EditedChannelPost); + public IObservable EditedMessage => Selector(UpdateType.EditedMessage, update => update.EditedMessage); + public IObservable InlineQuery => Selector(UpdateType.InlineQuery, update => update.InlineQuery); + public IObservable Message => Selector(UpdateType.Message, update => update.Message); + public IObservable MyChatMember => Selector(UpdateType.MyChatMember, update => update.MyChatMember); + public IObservable Poll => Selector(UpdateType.Poll, update => update.Poll); + public IObservable PollAnswer => Selector(UpdateType.PollAnswer, update => update.PollAnswer); + public IObservable PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, update => update.PreCheckoutQuery); + public IObservable RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, update => update.RemovedChatBoost); + public IObservable ShippingQuery => Selector(UpdateType.ShippingQuery, update => update.ShippingQuery); + public IObservable Update => Selector(null, update => update); + #endregion #if NETSTANDARD2_1 public IAsyncEnumerable CallbackQueryEnumerable() => CallbackQuery.ToAsyncEnumerable(); @@ -65,87 +65,106 @@ public sealed class UpdateDistributor : IUpdateManager, IDisposable public IAsyncEnumerable UpdateEnumerable() => Update.ToAsyncEnumerable(); #endif - public UpdateDistributor(IObservable updateTracker) - { - _lock = new(); - _updateInfos = Enum.GetValues(typeof(UpdateType)) - .Cast() - .ToDictionary(x => x, _ => new UpdateTypeInfo()); - - _updateInfo = new UpdateTypeInfo(); - - _trackedTypes = _updateInfos.Where(x => x.Value.Listeners != 0) - .Select(x => x.Key); - - _tracker = new ReactiveProperty>(updateTracker); - Set(updateTracker); - } - private void AddListener(UpdateType? type) - { - lock (_lock) + public UpdateDistributor(IObservable updateTracker) { - var info = GetInfo(type); - ++info.Listeners; + _lock = new(); + _updateInfos = Enum.GetValues(typeof(UpdateType)) + .Cast() + .ToDictionary(x => x, _ => new UpdateTypeInfo()); - if (info.Listeners != 1) return; - UpdateTrackerTypes(); + _updateInfo = new UpdateTypeInfo(); + + _trackedTypes = _updateInfos.Where(x => x.Value.Listeners != 0) + .Select(x => x.Key); + + _tracker = new ReactiveProperty>(updateTracker); + Set(updateTracker); + } + private void AddListener(UpdateType? type) + { + lock (_lock) + { + var info = GetInfo(type); + ++info.Listeners; + + if (info.Listeners != 1) + { + return; + } + + UpdateTrackerTypes(); + } } - } - private UpdateTypeInfo GetInfo(UpdateType? type) - => type == null ? _updateInfo : _updateInfos[(UpdateType)type]; + private UpdateTypeInfo GetInfo(UpdateType? type) + => type == null ? _updateInfo : _updateInfos[(UpdateType)type]; - private void RemoveListener(UpdateType? type) - { - lock (_lock) + private void RemoveListener(UpdateType? type) { - var info = GetInfo(type); - --info.Listeners; - if (info.Listeners != 0) return; + lock (_lock) + { + var info = GetInfo(type); + --info.Listeners; + if (info.Listeners != 0) + { + return; + } + + UpdateTrackerTypes(); + } + } - UpdateTrackerTypes(); + private IObservable Selector(UpdateType? type, Func propertySelector) + where T : class + { + return _tracker.Switch().Select(propertySelector) + .Where(x => x != null) + .DoOnSubscribe(() => AddListener(type)) + .Finally(() => RemoveListener(type)); + } + + public void Set(IObservable tracker) + { + // Configure the current tracker to listen for all types of updates + // before switching to a new one + (_tracker.Current as ITrackerSetup)?.Set(null); + + _tracker.OnNext(tracker); + UpdateTrackerTypes(); + } + private void UpdateTrackerTypes() + { + if (_tracker.Current is not ITrackerSetup setup) + { + return; + } + + lock (_lock) + { + setup.Set(_updateInfo.Listeners != 0 || !_trackedTypes.Any() ? + null : _trackedTypes); + } + } + + public void Dispose() => Dispose(true); + private void Dispose(bool explicitDisposing) + { + if (_isDisposed) + { + return; + } + + if (explicitDisposing) + { + _tracker.Dispose(); + } + + _isDisposed = true; + } + + ~UpdateDistributor() => Dispose(false); + + private sealed class UpdateTypeInfo + { + public int Listeners { get; set; } } - } - public IObservable Selector(UpdateType? type, Func propertySelector) - where T : class - { - return _tracker.Switch().Select(propertySelector) - .Where(x => x != null) - .DoOnSubscribe(() => AddListener(type)) - .Finally(() => RemoveListener(type)); - } - - public void Set(IObservable tracker) - { - // Configure the current tracker to listen for all types of updates - // before switching to a new one - (_tracker.Current as ITrackerSetup)?.Set(null); - - _tracker.OnNext(tracker); - UpdateTrackerTypes(); - } - private void UpdateTrackerTypes() - { - if (_tracker.Current is not ITrackerSetup setup) return; - - setup.Set(_updateInfo.Listeners != 0 || !_trackedTypes.Any() ? - null : _trackedTypes); - } - - public void Dispose() => Dispose(true); - void Dispose(bool explicitDisposing) - { - if (_isDisposed) return; - - if (explicitDisposing) - _tracker.Dispose(); - - _isDisposed = true; - } - - ~UpdateDistributor() => Dispose(false); - - sealed private class UpdateTypeInfo - { - public int Listeners { get; set; } = 0; - } -} \ No newline at end of file +} diff --git a/src/RxTelegram.Bot/TelegramBot.Builder.cs b/src/RxTelegram.Bot/TelegramBot.Builder.cs index 943c4c6..132ef76 100644 --- a/src/RxTelegram.Bot/TelegramBot.Builder.cs +++ b/src/RxTelegram.Bot/TelegramBot.Builder.cs @@ -4,39 +4,39 @@ namespace RxTelegram.Bot; -public partial class TelegramBot : BaseTelegramBot, ITelegramBot +public partial class TelegramBot { - public class Builder - { - private string _token; - private IObservable _tracker = null; - private IUpdateManager _updateManager = null; - public Builder() { } - public Builder(string token) : this() { _token = token; } - - public Builder SetToken(string token) - { - _token = token; - return this; - } - public Builder SetTracker(IObservable tracker) - { - _tracker = tracker; - return this; - } - public Builder SetManager(IUpdateManager updateManager) - { - _updateManager = updateManager; - return this; - } - public TelegramBot Build() + public class Builder { - var bot = new TelegramBot(_token); - _tracker ??= new LongpollingUpdateTracker(bot); - bot.Updates = _updateManager ?? new UpdateDistributor(_tracker); - bot.Updates.Set(_tracker); + private string _token; + private IObservable _tracker; + private IUpdateManager _updateManager; + public Builder() { } + public Builder(string token) : this() { _token = token; } + + public Builder SetToken(string token) + { + _token = token; + return this; + } + public Builder SetTracker(IObservable tracker) + { + _tracker = tracker; + return this; + } + public Builder SetManager(IUpdateManager updateManager) + { + _updateManager = updateManager; + return this; + } + public TelegramBot Build() + { + var bot = new TelegramBot(_token); + _tracker ??= new LongPollingUpdateTracker(bot); + bot.Updates = _updateManager ?? new UpdateDistributor(_tracker); + bot.Updates.Set(_tracker); - return bot; + return bot; + } } - } -} \ No newline at end of file +} diff --git a/src/RxTelegram.Bot/TelegramBot.cs b/src/RxTelegram.Bot/TelegramBot.cs index c09bfd4..e472e7f 100644 --- a/src/RxTelegram.Bot/TelegramBot.cs +++ b/src/RxTelegram.Bot/TelegramBot.cs @@ -38,7 +38,7 @@ public TelegramBot(string token) : this(new BotInfo(token)) public TelegramBot(BotInfo botInfo) : base(botInfo) { - var tracker = new LongpollingUpdateTracker(this); + var tracker = new LongPollingUpdateTracker(this); Updates = new UpdateDistributor(tracker); } diff --git a/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs b/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs index ec11ec8..b73b912 100644 --- a/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs +++ b/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs @@ -2,17 +2,9 @@ namespace RxTelegram.Bot.Utils.Rx; -sealed public class DisposableAction : IDisposable +public sealed class DisposableAction(Action action) : IDisposable { - static public DisposableAction Empty { get; } = new DisposableAction(() => { }); - private readonly Action action; - - public DisposableAction(Action action) - { - if (action == null) - throw new ArgumentNullException(nameof(action)); - this.action = action; - } - - public void Dispose() => action(); -} \ No newline at end of file + public static DisposableAction Empty { get; } = new DisposableAction(() => { }); + private readonly Action _action = action ?? throw new ArgumentNullException(nameof(action)); + public void Dispose() => _action(); +} diff --git a/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs b/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs index 4b7094c..5a39b85 100644 --- a/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs +++ b/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs @@ -2,25 +2,19 @@ namespace RxTelegram.Bot.Utils.Rx; -internal class DoOnDisposeObservable : IObservable +internal class DoOnDisposeObservable(IObservable source, Action onDispose) : IObservable { - private readonly IObservable _source; - private readonly Action _onDispose; + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Action _onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose)); - public DoOnDisposeObservable(IObservable source, Action onDispose) - { - this._source = source ?? throw new ArgumentNullException(nameof(source)); - this._onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose)); - } + public IDisposable Subscribe(IObserver observer) + { - public IDisposable Subscribe(IObserver observer) - { - - var subscription = _source.Subscribe(observer); - return new DisposableAction(() => - { - subscription.Dispose(); - _onDispose(); - }); - } -} \ No newline at end of file + var subscription = _source.Subscribe(observer); + return new DisposableAction(() => + { + subscription.Dispose(); + _onDispose(); + }); + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs b/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs index 82f2c10..3570a56 100644 --- a/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs +++ b/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs @@ -2,22 +2,16 @@ namespace RxTelegram.Bot.Utils.Rx; -internal class DoOnSubscribeObservable : IObservable +internal class DoOnSubscribeObservable(IObservable source, Action onSubscribe) : IObservable { - private readonly IObservable _source; - private readonly Action _onSbuscribe; + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Action _onSubscribe = onSubscribe ?? throw new ArgumentNullException(nameof(onSubscribe)); - public DoOnSubscribeObservable(IObservable source, Action onSubscribe) - { - this._source = source ?? throw new ArgumentNullException(nameof(source)); - this._onSbuscribe = onSubscribe ?? throw new ArgumentNullException(nameof(onSubscribe)); - } + public IDisposable Subscribe(IObserver observer) + { - public IDisposable Subscribe(IObserver observer) - { - - var subscription = _source.Subscribe(observer); - _onSbuscribe(); - return subscription; - } -} \ No newline at end of file + var subscription = _source.Subscribe(observer); + _onSubscribe(); + return subscription; + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs b/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs index 4c6c1db..ff3a6a9 100644 --- a/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs +++ b/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs @@ -5,10 +5,21 @@ namespace RxTelegram.Bot.Utils.Rx; internal static class ExceptionHelpers { - internal static void ThrowIfFatal(Exception ex) - { - if (ex is OutOfMemoryException || ex is StackOverflowException - || ex is ThreadAbortException) - throw ex; - } -} \ No newline at end of file + internal static void ThrowIfFatal(Exception ex) + { + if (ex is OutOfMemoryException || ex is StackOverflowException + || ex is ThreadAbortException) + { + throw ex; + } + } + + internal static void ThrowIfDisposed(this IDisposable disposable, Func isDisposed) + { + if (isDisposed()) + { + throw new ObjectDisposedException(disposable.GetType().FullName); + } + + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs b/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs index f4e1444..b59ae2a 100644 --- a/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs +++ b/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs @@ -3,75 +3,64 @@ namespace RxTelegram.Bot.Utils.Rx; -internal class FinallyObservable : IObservable +internal class FinallyObservable(IObservable source, Action onTerminate) : IObservable { - private readonly IObservable _source; - private readonly Action _onTerminate; + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Action _onTerminate = onTerminate ?? throw new ArgumentNullException(nameof(onTerminate)); - public FinallyObservable(IObservable source, Action onTerminate) - { - _source = source ?? throw new ArgumentNullException(nameof(source)); - _onTerminate = onTerminate ?? throw new ArgumentNullException(nameof(onTerminate)); - } - - public IDisposable Subscribe(IObserver observer) - { - if (observer == null) - throw new ArgumentNullException(nameof(observer)); - - var finallyObserver = new FinallyObserver(observer, _onTerminate); - var subscription = _source.Subscribe(finallyObserver); - - return new DisposableAction(() => finallyObserver.Dispose(subscription)); - } + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } - private class FinallyObserver : IObserver - { - private readonly IObserver _observer; - private readonly Action _onTerminate; - private int _terminated; + var finallyObserver = new FinallyObserver(observer, _onTerminate); + var subscription = _source.Subscribe(finallyObserver); - public FinallyObserver(IObserver observer, Action onTerminate) - { - _observer = observer; - _onTerminate = onTerminate; - } - public void Dispose(IDisposable subscription) - { - subscription.Dispose(); - Terminate(); + return new DisposableAction(() => finallyObserver.DisposeSubscription(subscription)); } - public void OnCompleted() + private sealed class FinallyObserver(IObserver observer, Action onTerminate) : IObserver { - _observer.OnCompleted(); - Terminate(); - } + private int _terminated; - public void OnError(Exception error) - { - try - { - _observer.OnError(error); - } - catch (Exception ex) - { - ExceptionHelpers.ThrowIfFatal(ex); - } - finally - { - Terminate(); - } - } + public void DisposeSubscription(IDisposable subscription) + { + subscription.Dispose(); + Terminate(); + } - public void OnNext(T value) => _observer.OnNext(value); + public void OnCompleted() + { + observer.OnCompleted(); + Terminate(); + } - private void Terminate() - { - if (Interlocked.Exchange(ref _terminated, 1) == 0) - { - _onTerminate(); - } + public void OnError(Exception error) + { + try + { + observer.OnError(error); + } + catch (Exception ex) + { + ExceptionHelpers.ThrowIfFatal(ex); + } + finally + { + Terminate(); + } + } + + public void OnNext(T value) => observer.OnNext(value); + + private void Terminate() + { + if (Interlocked.Exchange(ref _terminated, 1) == 0) + { + onTerminate(); + } + } } - } -} \ No newline at end of file +} diff --git a/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs b/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs index efa9fbf..adc491a 100644 --- a/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs +++ b/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs @@ -2,18 +2,18 @@ namespace RxTelegram.Bot.Utils.Rx; -static public class ObservableOperators +public static class ObservableOperators { - static public IObservable DoOnDispose(this IObservable source, Action onDispose) - => new DoOnDisposeObservable(source, onDispose); - static public IObservable DoOnSubscribe(this IObservable source, Action onSubscribe) - => new DoOnSubscribeObservable(source, onSubscribe); - static public IObservable Finally(this IObservable source, Action onTerminate) - => new FinallyObservable(source,onTerminate); - static public IObservable Select(this IObservable source, Func selector) - => new SelectObservable(source, selector); - static public IObservable Switch(this IObservable> source) - => new SwitchObservable(source); - static public IObservable Where(this IObservable source, Func predicate) - => new WhereObservable(source, predicate); -} \ No newline at end of file + public static IObservable DoOnDispose(this IObservable source, Action onDispose) + => new DoOnDisposeObservable(source, onDispose); + public static IObservable DoOnSubscribe(this IObservable source, Action onSubscribe) + => new DoOnSubscribeObservable(source, onSubscribe); + public static IObservable Finally(this IObservable source, Action onTerminate) + => new FinallyObservable(source, onTerminate); + public static IObservable Select(this IObservable source, Func selector) + => new SelectObservable(source, selector); + public static IObservable Switch(this IObservable> source) + => new SwitchObservable(source); + public static IObservable Where(this IObservable source, Func predicate) + => new WhereObservable(source, predicate); +} diff --git a/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs b/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs index c98d9ad..2ccec25 100644 --- a/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs +++ b/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs @@ -5,127 +5,135 @@ namespace RxTelegram.Bot.Utils.Rx; -public class ReactiveProperty : ISubject, IDisposable +public class ReactiveProperty() : ISubject, IDisposable { - static private readonly List> Terminated = []; - private readonly object _lock = new object(); - protected Exception _error; - private T _current; - public T Current - { - get => _current; - protected set { _current = value; HasValue = true; } - } - public bool HasValue { get; private set; } - private List> _observers; - public bool IsDisposed { get; protected set; } - public bool IsError => _error != null; - public Exception Error => _error; - public ReactiveProperty() - { - _error = null; - _observers = new List>(); - } - public ReactiveProperty(T initValue) : this() - { - Current = initValue; - } - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - void Dispose(bool explicitDisposing) - { - if (IsDisposed) return; + private static readonly List> Terminated = []; + private readonly object _lock = new object(); + private Exception _error; + private T _current; + public T Current + { + get => _current; + protected set { _current = value; HasValue = true; } + } + public bool HasValue { get; private set; } + private List> _observers = new(); + public bool IsDisposed { get; private set; } + public bool IsError => _error != null; + public Exception Error => _error; - if (explicitDisposing) - DisposeManaged(); + public ReactiveProperty(T initValue) : this() + { + Current = initValue; + } - IsDisposed = true; - } - protected virtual void DisposeManaged() - { - OnCompleted(); - Interlocked.Exchange(ref _observers, Terminated); - } - public void OnCompleted() - { - lock (_lock) + public void OnCompleted() { - ThrowIfDisposed(); - foreach (var observer in _observers) - observer.OnCompleted(); + lock (_lock) + { + ThrowIfDisposed(); + foreach (var observer in _observers) + { + observer.OnCompleted(); + } - _observers = Terminated; + _observers = Terminated; + } } - } - public void OnError(Exception error) - { - lock (_lock) + public void OnError(Exception error) { - ThrowIfDisposed(); - _error = error; - Current = default; - foreach (var observer in _observers) - { - try + lock (_lock) { - observer.OnError(error); + ThrowIfDisposed(); + _error = error; + Current = default; + foreach (var observer in _observers) + { + try + { + observer.OnError(error); + } + catch (Exception ex) + { + //ignore + // if observer doesn't have exception handler block + // it will lead to breaking process and throwing exception to outer scope + ExceptionHelpers.ThrowIfFatal(ex); + } + } + + _observers = Terminated; } - catch (Exception ex) + } + public void OnNext(T value) + { + lock (_lock) { - //ignore - // if observer doesn't have exception handler block - // it will lead to breaking process and throwing exception to outer scope - ExceptionHelpers.ThrowIfFatal(ex); - } - } + ThrowIfDisposed(); + Current = value; - _observers = Terminated; + foreach (var observer in _observers) + { + observer.OnNext(value); + } + } } - } - public void OnNext(T value) - { - lock (_lock) + public IDisposable Subscribe(IObserver observer) { - ThrowIfDisposed(); - Current = value; + lock (_lock) + { + ThrowIfDisposed(); + if (_observers == Terminated) + { + observer.OnCompleted(); + return DisposableAction.Empty; + } - foreach (var observer in _observers) - observer.OnNext(value); + _observers.Add(observer); + if (HasValue) + { + observer.OnNext(Current); + } + + return new DisposableAction(() => + { + lock (_lock) + { + _observers.Remove(observer); + } + }); + } } - } - public IDisposable Subscribe(IObserver observer) - { - lock (_lock) + + protected void ThrowIfDisposed() { - ThrowIfDisposed(); - if (_observers == Terminated) - { - observer.OnCompleted(); - return DisposableAction.Empty; - } + if (IsDisposed) + { + throw new ObjectDisposedException(nameof(ReactiveProperty)); + } + } - _observers.Add(observer); - if (HasValue) - observer.OnNext(Current); + ~ReactiveProperty() => Dispose(false); - return new DisposableAction(() => - { - lock (_lock) + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected void Dispose(bool explicitDisposing) + { + if (IsDisposed) { - _observers.Remove(observer); + return; } - }); - } - } - protected void ThrowIfDisposed() - { - if (IsDisposed) - throw new ObjectDisposedException(nameof(ReactiveProperty)); - } - ~ReactiveProperty() => Dispose(false); -} \ No newline at end of file + if (explicitDisposing) + { + OnCompleted(); + Interlocked.Exchange(ref _observers, Terminated); + } + + IsDisposed = true; + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs b/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs index 8638d25..c747e2e 100644 --- a/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs +++ b/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs @@ -1,51 +1,39 @@ using System; namespace RxTelegram.Bot.Utils.Rx; -internal class SelectObservable : IObservable +internal class SelectObservable(IObservable source, Func selector) : IObservable { - private readonly IObservable _source; - internal readonly Func _selector; + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Func _selector = selector ?? throw new ArgumentNullException(nameof(selector)); - public SelectObservable(IObservable source, Func selector) - { - this._source = source ?? throw new ArgumentNullException(nameof(source)); - this._selector = selector ?? throw new ArgumentNullException(nameof(selector)); - } - - public IDisposable Subscribe(IObserver observer) - { - if (observer == null) - throw new ArgumentNullException(nameof(observer)); - var selectObserver = new SelectObserver(observer, _selector); - return _source.Subscribe(selectObserver); - } - private class SelectObserver : IObserver - { - private readonly IObserver _observer; - private readonly Func _selector; - - public SelectObserver(IObserver observer, Func selector) + public IDisposable Subscribe(IObserver observer) { - this._observer = observer; - this._selector = selector; - } + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } - public void OnCompleted() => _observer.OnCompleted(); - public void OnError(Exception error) => _observer.OnError(error); - public void OnNext(T value) + var selectObserver = new SelectObserver(observer, _selector); + return _source.Subscribe(selectObserver); + } + private sealed class SelectObserver(IObserver observer, Func selector) : IObserver { - K result; - try - { - result = _selector(value); - } - catch (Exception ex) - { - _observer.OnError(ex); - return; - } + public void OnCompleted() => observer.OnCompleted(); + public void OnError(Exception error) => observer.OnError(error); + public void OnNext(TIn value) + { + TOut result; + try + { + result = selector(value); + } + catch (Exception ex) + { + observer.OnError(ex); + return; + } - _observer.OnNext(result); + observer.OnNext(result); + } } - } -} \ No newline at end of file +} diff --git a/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs b/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs index 0bbbee9..a7dc467 100644 --- a/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs +++ b/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs @@ -2,88 +2,98 @@ namespace RxTelegram.Bot.Utils.Rx; -internal class SwitchObservable : IObservable +internal class SwitchObservable(IObservable> source) : IObservable { - private readonly IObservable> _source; - - public SwitchObservable(IObservable> source) - { - _source = source ?? throw new ArgumentNullException(nameof(source)); - } - public IDisposable Subscribe(IObserver observer) - { - var switchObserver = new SwitchObserver(observer); - var stream = _source.Subscribe(switchObserver); - return new DisposableAction(() => - { - stream.Dispose(); - switchObserver.Dispose(); - }); - } - - private class SwitchObserver : IObserver>, IDisposable - { - private readonly object _lock = new(); - private IObserver _observer; - private IDisposable _subscription; - private bool _isDisposed = false; - public SwitchObserver(IObserver observer) - { - this._observer = observer ?? throw new ArgumentNullException(nameof(observer)); - } - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } + private readonly IObservable> _source = source ?? throw new ArgumentNullException(nameof(source)); - public void OnCompleted() + public IDisposable Subscribe(IObserver observer) { - lock (_lock) - { - if (_isDisposed) return; - _observer?.OnCompleted(); - Dispose(); - } + var switchObserver = new SwitchObserver(observer); + var stream = _source.Subscribe(switchObserver); + return new DisposableAction(() => + { + stream.Dispose(); + switchObserver.Dispose(); + }); } - public void OnError(Exception error) + private class SwitchObserver(IObserver observer) : IObserver>, IDisposable { - lock (_lock) - { - if (_isDisposed) return; - _observer?.OnError(error); - Dispose(); - } - } + private readonly object _lock = new(); + private IObserver _observer = observer ?? throw new ArgumentNullException(nameof(observer)); + private IDisposable _subscription; + private bool _isDisposed; - public void OnNext(IObservable stream) - { - if (stream == null) throw new ArgumentNullException(nameof(stream)); + public void OnCompleted() + { + lock (_lock) + { + if (_isDisposed) + { + return; + } - lock (_lock) - { - if (_isDisposed) - throw new ObjectDisposedException(nameof(SwitchObserver)); + _observer?.OnCompleted(); + Dispose(); + } + } - _subscription?.Dispose(); - _subscription = stream.Subscribe(_observer); - } - } - void Dispose(bool explicitDisposing) - { - if (_isDisposed) return; + public void OnError(Exception error) + { + lock (_lock) + { + if (_isDisposed) + { + return; + } - if (explicitDisposing) - { - _subscription?.Dispose(); - _subscription = null; - _observer = null; - } + _observer?.OnError(error); + Dispose(); + } + } - _isDisposed = true; - } + public void OnNext(IObservable stream) + { + if (stream == null) + { + throw new ArgumentNullException(nameof(stream)); + } + + lock (_lock) + { + if (_isDisposed) + { + throw new ObjectDisposedException(nameof(SwitchObserver)); + } + + _subscription?.Dispose(); + _subscription = stream.Subscribe(_observer); + } + } + + ~SwitchObserver() => Dispose(false); - ~SwitchObserver() => Dispose(false); - } -} \ No newline at end of file + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + void Dispose(bool explicitDisposing) + { + if (_isDisposed) + { + return; + } + + if (explicitDisposing) + { + _subscription?.Dispose(); + _subscription = null; + _observer = null; + } + + _isDisposed = true; + } + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs b/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs index ff1802c..3c56938 100644 --- a/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs +++ b/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs @@ -2,84 +2,100 @@ namespace RxTelegram.Bot.Utils.Rx; -internal class WhereObservable : IObservable +internal class WhereObservable(IObservable source, Func predicate) : IObservable { - private readonly IObservable _source; - internal readonly Func _predicate; + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Func _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); - public WhereObservable(IObservable source, Func predicate) - { - _source = source ?? throw new ArgumentNullException(nameof(source)); - _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); - } - public IDisposable Subscribe(IObserver observer) - { - var where = new WhereObserver(observer, _predicate); - var subscription = _source.Subscribe(where); - return new DisposableAction(() => + public IDisposable Subscribe(IObserver observer) { - subscription.Dispose(); - where.Dispose(); - }); - } - private class WhereObserver : IObserver, IDisposable - { - private readonly object _lock = new(); - private readonly Func _predicate; - private IObserver _observer; - private bool _isCompleted; - private bool _isDisposed; - - public WhereObserver(IObserver observer, Func predicate) - { - _observer = observer ?? throw new ArgumentNullException(nameof(observer)); - _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + var where = new WhereObserver(observer, _predicate); + var subscription = _source.Subscribe(where); + return new DisposableAction(() => + { + subscription.Dispose(); + where.Dispose(); + }); } - - public void OnCompleted() + private sealed class WhereObserver(IObserver observer, Func predicate) : IObserver, IDisposable { - if (_isCompleted || _isDisposed) return; - _isCompleted = true; - _observer?.OnCompleted(); - } + private readonly object _lock = new(); + private readonly Func _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + private IObserver _observer = observer ?? throw new ArgumentNullException(nameof(observer)); + private bool _isCompleted; + private bool _isDisposed; - public void OnError(Exception error) - { - lock (_lock) - { - if (_isCompleted || _isDisposed) return; - _isCompleted = true; - _observer?.OnError(error); - } - } + public void OnCompleted() + { + lock (_lock) + { + if (_isCompleted || _isDisposed) + { + return; + } - public void OnNext(T value) - { - lock (_lock) - { - if (_isCompleted || _isDisposed) return; + _isCompleted = true; + _observer?.OnCompleted(); + } + } - bool isPassed = false; - try + public void OnError(Exception error) { - isPassed = _predicate(value); + lock (_lock) + { + if (_isCompleted || _isDisposed) + { + return; + } + + _isCompleted = true; + _observer?.OnError(error); + } } - catch (Exception error) + + public void OnNext(T value) { - OnError(error); + lock (_lock) + { + if (_isCompleted || _isDisposed) + { + return; + } + + var isPassed = false; + try + { + isPassed = _predicate(value); + } + catch (Exception error) + { + OnError(error); + } + if (isPassed) + { + _observer.OnNext(value); + } + } } - if (isPassed) - _observer.OnNext(value); - } - } - public void Dispose() - { - lock (_lock) - { - _isDisposed = true; - _observer = null; - } + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + private void Dispose(bool explicitDisposing) + { + if (_isDisposed) return; + + lock (_lock) + { + if (explicitDisposing) + { + _observer = null; + } + _isDisposed = true; + } + } + ~WhereObserver() => Dispose(false); } - } -} \ No newline at end of file +}