diff --git a/src/RxTelegram.Bot/Api/ITrackerSetup.cs b/src/RxTelegram.Bot/Api/ITrackerSetup.cs new file mode 100644 index 0000000..0943df7 --- /dev/null +++ b/src/RxTelegram.Bot/Api/ITrackerSetup.cs @@ -0,0 +1,9 @@ +using RxTelegram.Bot.Interface.BaseTypes.Enums; +using System.Collections.Generic; + +namespace RxTelegram.Bot.Api; + +public interface ITrackerSetup +{ + void Set(IEnumerable types); +} diff --git a/src/RxTelegram.Bot/Api/IUpdateManager.cs b/src/RxTelegram.Bot/Api/IUpdateManager.cs index 56a611d..7020685 100644 --- a/src/RxTelegram.Bot/Api/IUpdateManager.cs +++ b/src/RxTelegram.Bot/Api/IUpdateManager.cs @@ -12,6 +12,12 @@ namespace RxTelegram.Bot.Api; public interface IUpdateManager { + /// + /// Allows to set custom updates tracker + /// + /// + void Set(IObservable tracker); + /// /// Updates of all Types. /// diff --git a/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs b/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs new file mode 100644 index 0000000..844df0a --- /dev/null +++ b/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs @@ -0,0 +1,189 @@ +using RxTelegram.Bot.Interface.BaseTypes.Enums; +using RxTelegram.Bot.Interface.Setup; +using RxTelegram.Bot.Utils.Rx; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace RxTelegram.Bot.Api; +public class LongPollingUpdateTracker(ITelegramBot telegramBot) + : IObservable, ITrackerSetup +{ + private readonly ITelegramBot _telegramBot = telegramBot; + private const int NotRunning = 0; + private const int Running = 1; + private int _isRunning = NotRunning; + private CancellationTokenSource _cancellationTokenSource; + private UpdateType[] _trackedUpdateTypes = []; + readonly List> _observers = new List>(); + + public void Set(IEnumerable types) + { + if (types == null) + { + if (_trackedUpdateTypes != null) + { + _trackedUpdateTypes = null; + _cancellationTokenSource?.Cancel(); + } + return; + } + + var updateTypes = types as UpdateType[] ?? types.ToArray(); + if (_trackedUpdateTypes == null || !updateTypes.SequenceEqual(_trackedUpdateTypes)) + { + _trackedUpdateTypes = updateTypes; + _cancellationTokenSource?.Cancel(); + } + } + + internal async Task RunUpdateSafe() + { + try + { + _cancellationTokenSource = new CancellationTokenSource(); + await RunUpdate(); + } + catch (Exception ex) + { + // ignored + ExceptionHelpers.ThrowIfFatal(ex); + } + finally + { + _cancellationTokenSource.Dispose(); + _cancellationTokenSource = null; + + if (!_observers.Any()) + { + Volatile.Write(ref _isRunning, NotRunning); + } + else + { + RunUpdateTask(); + } + } + void RunUpdateTask() => Task.Run(RunUpdateSafe); + } + + private int? _offset; // Offset must be preserved for all errors except TaskCanceledException. + // Using a local variable may cause duplicates if an exception occurs. + internal async Task RunUpdate() + { + + while (_observers.Count != 0) + { + try + { + // if the token already canceled before the first request reset token + if (_cancellationTokenSource.IsCancellationRequested) + { + _cancellationTokenSource = new CancellationTokenSource(); + } + + var getUpdate = new GetUpdate + { + Offset = _offset, + Timeout = 60, + + // if there is a null value in the list, it means that all updates are allowed + AllowedUpdates = _trackedUpdateTypes + }; + + var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token); + if (!result.Any()) + { + await Task.Delay(1000); + continue; + } + + NotifyObservers(result); + + var lastId = result.Length - 1; + _offset = result[lastId].UpdateId + 1; + } + catch (TaskCanceledException) + { + _offset = null; + _cancellationTokenSource = new CancellationTokenSource(); + } + catch (Exception exception) + { + OnException(exception); + throw; + } + } + } + internal void NotifyObservers(Update[] updates) + { + for (var uid = 0; uid != updates.Length; ++uid) + { + for (var oid = 0; oid != _observers.Count; ++oid) + { + _observers[oid].OnNext(updates[uid]); + } + } + } + internal void OnException(Exception exception) + { + IObserver[] current; + lock (_observers) + { + current = _observers.ToArray(); // Caching current observers to prevent + // notifying those who subscribed after an error occurred. + _observers.Clear(); + } + + for (var oid = 0; oid != current.Length; ++oid) + { + try + { + current[oid].OnError(exception); + } + catch (Exception ex) + { + // Ignore exceptions from observers without an error handler, + // as it would break the process and propagate the exception to the outer scope. + ExceptionHelpers.ThrowIfFatal(ex); + } + } + } + internal void Remove(IObserver observer) + { + if (!_observers.Contains(observer)) + { + return; + } + + lock (_observers) + { + _observers.Remove(observer); + } + + if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running) + { + _cancellationTokenSource?.Cancel(); + } + } + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + lock (_observers) + { + _observers.Add(observer); + } + + if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning) + { + Task.Run(RunUpdateSafe); + } + + return new DisposableAction(() => Remove(observer)); + } +} diff --git a/src/RxTelegram.Bot/Api/UpdateDistributor.cs b/src/RxTelegram.Bot/Api/UpdateDistributor.cs new file mode 100644 index 0000000..a38874e --- /dev/null +++ b/src/RxTelegram.Bot/Api/UpdateDistributor.cs @@ -0,0 +1,170 @@ +using RxTelegram.Bot.Interface.BaseTypes; +using RxTelegram.Bot.Interface.BaseTypes.Enums; +using RxTelegram.Bot.Interface.InlineMode; +using RxTelegram.Bot.Interface.Payments; +using RxTelegram.Bot.Interface.Setup; +using RxTelegram.Bot.Utils.Rx; +using System; +using System.Collections.Generic; +using System.Linq; + +#if NETSTANDARD2_1 + +using RxTelegram.Bot.Utils; + +#endif + +namespace RxTelegram.Bot.Api; + +public sealed class UpdateDistributor : IUpdateManager, IDisposable +{ + + #region Observable Update properties + private readonly Dictionary _updateInfos; + private readonly UpdateTypeInfo _updateInfo; + private readonly IEnumerable _trackedTypes; + private readonly ReactiveProperty> _tracker; + private bool _isDisposed; + private readonly object _lock; + public IObservable CallbackQuery => Selector(UpdateType.CallbackQuery, update => update.CallbackQuery); + public IObservable ChannelPost => Selector(UpdateType.ChannelPost, update => update.ChannelPost); + public IObservable ChatBoost => Selector(UpdateType.ChatBoost, update => update.ChatBoost); + public IObservable ChatJoinRequest => Selector(UpdateType.ChatJoinRequest, update => update.ChatJoinRequest); + public IObservable ChatMember => Selector(UpdateType.ChatMember, update => update.ChatMember); + public IObservable ChosenInlineResult => Selector(UpdateType.ChosenInlineResult, update => update.ChosenInlineResult); + public IObservable EditedChannelPost => Selector(UpdateType.EditedChannelPost, update => update.EditedChannelPost); + public IObservable EditedMessage => Selector(UpdateType.EditedMessage, update => update.EditedMessage); + public IObservable InlineQuery => Selector(UpdateType.InlineQuery, update => update.InlineQuery); + public IObservable Message => Selector(UpdateType.Message, update => update.Message); + public IObservable MyChatMember => Selector(UpdateType.MyChatMember, update => update.MyChatMember); + public IObservable Poll => Selector(UpdateType.Poll, update => update.Poll); + public IObservable PollAnswer => Selector(UpdateType.PollAnswer, update => update.PollAnswer); + public IObservable PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, update => update.PreCheckoutQuery); + public IObservable RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, update => update.RemovedChatBoost); + public IObservable ShippingQuery => Selector(UpdateType.ShippingQuery, update => update.ShippingQuery); + public IObservable Update => Selector(null, update => update); + #endregion + +#if NETSTANDARD2_1 + public IAsyncEnumerable CallbackQueryEnumerable() => CallbackQuery.ToAsyncEnumerable(); + public IAsyncEnumerable ChannelPostEnumerable() => ChannelPost.ToAsyncEnumerable(); + public IAsyncEnumerable ChatBoostEnumerable() => ChatBoost.ToAsyncEnumerable(); + public IAsyncEnumerable ChatJoinRequestEnumerable() => ChatJoinRequest.ToAsyncEnumerable(); + public IAsyncEnumerable ChatMemberEnumerable() => ChatMember.ToAsyncEnumerable(); + public IAsyncEnumerable ChosenInlineResultEnumerable() => ChosenInlineResult.ToAsyncEnumerable(); + public IAsyncEnumerable EditedChannelPostEnumerable() => EditedChannelPost.ToAsyncEnumerable(); + public IAsyncEnumerable EditedMessageEnumerable() => EditedMessage.ToAsyncEnumerable(); + public IAsyncEnumerable InlineQueryEnumerable() => InlineQuery.ToAsyncEnumerable(); + public IAsyncEnumerable MessageEnumerable() => Message.ToAsyncEnumerable(); + public IAsyncEnumerable MyChatMemberEnumerable() => MyChatMember.ToAsyncEnumerable(); + public IAsyncEnumerable ShippingQueryEnumerable() => ShippingQuery.ToAsyncEnumerable(); + public IAsyncEnumerable PollAnswerEnumerable() => PollAnswer.ToAsyncEnumerable(); + public IAsyncEnumerable PollEnumerable() => Poll.ToAsyncEnumerable(); + public IAsyncEnumerable PreCheckoutQueryEnumerable() => PreCheckoutQuery.ToAsyncEnumerable(); + public IAsyncEnumerable RemovedChatBoostEnumerable() => RemovedChatBoost.ToAsyncEnumerable(); + public IAsyncEnumerable UpdateEnumerable() => Update.ToAsyncEnumerable(); + +#endif + public UpdateDistributor(IObservable updateTracker) + { + _lock = new(); + _updateInfos = Enum.GetValues(typeof(UpdateType)) + .Cast() + .ToDictionary(x => x, _ => new UpdateTypeInfo()); + + _updateInfo = new UpdateTypeInfo(); + + _trackedTypes = _updateInfos.Where(x => x.Value.Listeners != 0) + .Select(x => x.Key); + + _tracker = new ReactiveProperty>(updateTracker); + Set(updateTracker); + } + private void AddListener(UpdateType? type) + { + lock (_lock) + { + var info = GetInfo(type); + ++info.Listeners; + + if (info.Listeners != 1) + { + return; + } + + UpdateTrackerTypes(); + } + } + private UpdateTypeInfo GetInfo(UpdateType? type) + => type == null ? _updateInfo : _updateInfos[(UpdateType)type]; + + private void RemoveListener(UpdateType? type) + { + lock (_lock) + { + var info = GetInfo(type); + --info.Listeners; + if (info.Listeners != 0) + { + return; + } + + UpdateTrackerTypes(); + } + } + + private IObservable Selector(UpdateType? type, Func propertySelector) + where T : class + { + return _tracker.Switch().Select(propertySelector) + .Where(x => x != null) + .DoOnSubscribe(() => AddListener(type)) + .Finally(() => RemoveListener(type)); + } + + public void Set(IObservable tracker) + { + // Configure the current tracker to listen for all types of updates + // before switching to a new one + (_tracker.Current as ITrackerSetup)?.Set(null); + + _tracker.OnNext(tracker); + UpdateTrackerTypes(); + } + private void UpdateTrackerTypes() + { + if (_tracker.Current is not ITrackerSetup setup) + { + return; + } + + lock (_lock) + { + setup.Set(_updateInfo.Listeners != 0 || !_trackedTypes.Any() ? + null : _trackedTypes); + } + } + + public void Dispose() => Dispose(true); + private void Dispose(bool explicitDisposing) + { + if (_isDisposed) + { + return; + } + + if (explicitDisposing) + { + _tracker.Dispose(); + } + + _isDisposed = true; + } + + ~UpdateDistributor() => Dispose(false); + + private sealed class UpdateTypeInfo + { + public int Listeners { get; set; } + } +} diff --git a/src/RxTelegram.Bot/Api/UpdateManager.cs b/src/RxTelegram.Bot/Api/UpdateManager.cs index cdc386d..51a6740 100644 --- a/src/RxTelegram.Bot/Api/UpdateManager.cs +++ b/src/RxTelegram.Bot/Api/UpdateManager.cs @@ -16,6 +16,8 @@ namespace RxTelegram.Bot.Api; public class UpdateManager : IUpdateManager { + public void Set(IObservable tracker) => throw new NotImplementedException(); + public IObservable Update => _update; public IObservable Message => _message; diff --git a/src/RxTelegram.Bot/TelegramBot.Builder.cs b/src/RxTelegram.Bot/TelegramBot.Builder.cs new file mode 100644 index 0000000..132ef76 --- /dev/null +++ b/src/RxTelegram.Bot/TelegramBot.Builder.cs @@ -0,0 +1,42 @@ +using System; +using RxTelegram.Bot.Api; +using RxTelegram.Bot.Interface.Setup; + +namespace RxTelegram.Bot; + +public partial class TelegramBot +{ + public class Builder + { + private string _token; + private IObservable _tracker; + private IUpdateManager _updateManager; + public Builder() { } + public Builder(string token) : this() { _token = token; } + + public Builder SetToken(string token) + { + _token = token; + return this; + } + public Builder SetTracker(IObservable tracker) + { + _tracker = tracker; + return this; + } + public Builder SetManager(IUpdateManager updateManager) + { + _updateManager = updateManager; + return this; + } + public TelegramBot Build() + { + var bot = new TelegramBot(_token); + _tracker ??= new LongPollingUpdateTracker(bot); + bot.Updates = _updateManager ?? new UpdateDistributor(_tracker); + bot.Updates.Set(_tracker); + + return bot; + } + } +} diff --git a/src/RxTelegram.Bot/TelegramBot.cs b/src/RxTelegram.Bot/TelegramBot.cs index 7a8c1e5..e472e7f 100644 --- a/src/RxTelegram.Bot/TelegramBot.cs +++ b/src/RxTelegram.Bot/TelegramBot.cs @@ -30,15 +30,19 @@ namespace RxTelegram.Bot; -public class TelegramBot : BaseTelegramBot, ITelegramBot +public partial class TelegramBot : BaseTelegramBot, ITelegramBot { public TelegramBot(string token) : this(new BotInfo(token)) { } - public TelegramBot(BotInfo botInfo) : base(botInfo) => Updates = new UpdateManager(this); + public TelegramBot(BotInfo botInfo) : base(botInfo) + { + var tracker = new LongPollingUpdateTracker(this); + Updates = new UpdateDistributor(tracker); + } - public IUpdateManager Updates { get; } + public IUpdateManager Updates { get; private set; } /// /// Use this method to get basic info about a file and prepare it for downloading. diff --git a/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs b/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs new file mode 100644 index 0000000..b73b912 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs @@ -0,0 +1,10 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +public sealed class DisposableAction(Action action) : IDisposable +{ + public static DisposableAction Empty { get; } = new DisposableAction(() => { }); + private readonly Action _action = action ?? throw new ArgumentNullException(nameof(action)); + public void Dispose() => _action(); +} diff --git a/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs b/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs new file mode 100644 index 0000000..5a39b85 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs @@ -0,0 +1,20 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class DoOnDisposeObservable(IObservable source, Action onDispose) : IObservable +{ + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Action _onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose)); + + public IDisposable Subscribe(IObserver observer) + { + + var subscription = _source.Subscribe(observer); + return new DisposableAction(() => + { + subscription.Dispose(); + _onDispose(); + }); + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs b/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs new file mode 100644 index 0000000..3570a56 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs @@ -0,0 +1,17 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class DoOnSubscribeObservable(IObservable source, Action onSubscribe) : IObservable +{ + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Action _onSubscribe = onSubscribe ?? throw new ArgumentNullException(nameof(onSubscribe)); + + public IDisposable Subscribe(IObserver observer) + { + + var subscription = _source.Subscribe(observer); + _onSubscribe(); + return subscription; + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs b/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs new file mode 100644 index 0000000..ff3a6a9 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading; + +namespace RxTelegram.Bot.Utils.Rx; + +internal static class ExceptionHelpers +{ + internal static void ThrowIfFatal(Exception ex) + { + if (ex is OutOfMemoryException || ex is StackOverflowException + || ex is ThreadAbortException) + { + throw ex; + } + } + + internal static void ThrowIfDisposed(this IDisposable disposable, Func isDisposed) + { + if (isDisposed()) + { + throw new ObjectDisposedException(disposable.GetType().FullName); + } + + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs b/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs new file mode 100644 index 0000000..b59ae2a --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs @@ -0,0 +1,66 @@ +using System; +using System.Threading; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class FinallyObservable(IObservable source, Action onTerminate) : IObservable +{ + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Action _onTerminate = onTerminate ?? throw new ArgumentNullException(nameof(onTerminate)); + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var finallyObserver = new FinallyObserver(observer, _onTerminate); + var subscription = _source.Subscribe(finallyObserver); + + return new DisposableAction(() => finallyObserver.DisposeSubscription(subscription)); + } + + private sealed class FinallyObserver(IObserver observer, Action onTerminate) : IObserver + { + private int _terminated; + + public void DisposeSubscription(IDisposable subscription) + { + subscription.Dispose(); + Terminate(); + } + + public void OnCompleted() + { + observer.OnCompleted(); + Terminate(); + } + + public void OnError(Exception error) + { + try + { + observer.OnError(error); + } + catch (Exception ex) + { + ExceptionHelpers.ThrowIfFatal(ex); + } + finally + { + Terminate(); + } + } + + public void OnNext(T value) => observer.OnNext(value); + + private void Terminate() + { + if (Interlocked.Exchange(ref _terminated, 1) == 0) + { + onTerminate(); + } + } + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/Interfaces/ISubject.cs b/src/RxTelegram.Bot/Utils/Rx/Interfaces/ISubject.cs new file mode 100644 index 0000000..24db1d7 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/Interfaces/ISubject.cs @@ -0,0 +1,6 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx.Interfaces; + +public interface ISubject : IObserver, IObservable { } +public interface ISubject : ISubject { } \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs b/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs new file mode 100644 index 0000000..adc491a --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs @@ -0,0 +1,19 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +public static class ObservableOperators +{ + public static IObservable DoOnDispose(this IObservable source, Action onDispose) + => new DoOnDisposeObservable(source, onDispose); + public static IObservable DoOnSubscribe(this IObservable source, Action onSubscribe) + => new DoOnSubscribeObservable(source, onSubscribe); + public static IObservable Finally(this IObservable source, Action onTerminate) + => new FinallyObservable(source, onTerminate); + public static IObservable Select(this IObservable source, Func selector) + => new SelectObservable(source, selector); + public static IObservable Switch(this IObservable> source) + => new SwitchObservable(source); + public static IObservable Where(this IObservable source, Func predicate) + => new WhereObservable(source, predicate); +} diff --git a/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs b/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs new file mode 100644 index 0000000..155cad2 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs @@ -0,0 +1,139 @@ +using RxTelegram.Bot.Utils.Rx.Interfaces; +using System; +using System.Collections.Generic; +using System.Threading; + +namespace RxTelegram.Bot.Utils.Rx; + +public class ReactiveProperty() : ISubject, IDisposable +{ + private static readonly List> Terminated = []; + private readonly object _lock = new object(); + private Exception _error; + private T _current; + public T Current + { + get => _current; + protected set { _current = value; HasValue = true; } + } + public bool HasValue { get; private set; } + private List> _observers = new(); + public bool IsDisposed { get; private set; } + public bool IsError => _error != null; + public Exception Error => _error; + + public ReactiveProperty(T initValue) : this() + { + Current = initValue; + } + + public void OnCompleted() + { + lock (_lock) + { + ThrowIfDisposed(); + foreach (var observer in _observers) + { + observer.OnCompleted(); + } + + _observers = Terminated; + } + } + public void OnError(Exception error) + { + lock (_lock) + { + ThrowIfDisposed(); + _error = error; + Current = default; + foreach (var observer in _observers) + { + try + { + observer.OnError(error); + } + catch (Exception ex) + { + //ignore + // if observer doesn't have exception handler block + // it will lead to breaking process and throwing exception to outer scope + ExceptionHelpers.ThrowIfFatal(ex); + } + } + + _observers = Terminated; + } + } + public void OnNext(T value) + { + lock (_lock) + { + ThrowIfDisposed(); + Current = value; + + foreach (var observer in _observers) + { + observer.OnNext(value); + } + } + } + public IDisposable Subscribe(IObserver observer) + { + lock (_lock) + { + ThrowIfDisposed(); + if (_observers == Terminated) + { + observer.OnCompleted(); + return DisposableAction.Empty; + } + + _observers.Add(observer); + if (HasValue) + { + observer.OnNext(Current); + } + + return new DisposableAction(() => + { + lock (_lock) + { + _observers.Remove(observer); + } + }); + } + } + + protected void ThrowIfDisposed() + { + if (IsDisposed) + { + throw new ObjectDisposedException(nameof(ReactiveProperty)); + } + } + + ~ReactiveProperty() => Dispose(false); + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool explicitDisposing) + { + if (IsDisposed) + { + return; + } + + if (explicitDisposing) + { + OnCompleted(); + Interlocked.Exchange(ref _observers, Terminated); + } + + IsDisposed = true; + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs b/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs new file mode 100644 index 0000000..c747e2e --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs @@ -0,0 +1,39 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; +internal class SelectObservable(IObservable source, Func selector) : IObservable +{ + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Func _selector = selector ?? throw new ArgumentNullException(nameof(selector)); + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var selectObserver = new SelectObserver(observer, _selector); + return _source.Subscribe(selectObserver); + } + private sealed class SelectObserver(IObserver observer, Func selector) : IObserver + { + public void OnCompleted() => observer.OnCompleted(); + public void OnError(Exception error) => observer.OnError(error); + public void OnNext(TIn value) + { + TOut result; + try + { + result = selector(value); + } + catch (Exception ex) + { + observer.OnError(ex); + return; + } + + observer.OnNext(result); + } + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs b/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs new file mode 100644 index 0000000..5a7b4e7 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs @@ -0,0 +1,99 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class SwitchObservable(IObservable> source) : IObservable +{ + private readonly IObservable> _source = source ?? throw new ArgumentNullException(nameof(source)); + + public IDisposable Subscribe(IObserver observer) + { + var switchObserver = new SwitchObserver(observer); + var stream = _source.Subscribe(switchObserver); + return new DisposableAction(() => + { + stream.Dispose(); + switchObserver.Dispose(); + }); + } + + private sealed class SwitchObserver(IObserver observer) : IObserver>, IDisposable + { + private readonly object _lock = new(); + private IObserver _observer = observer ?? throw new ArgumentNullException(nameof(observer)); + private IDisposable _subscription; + private bool _isDisposed; + + public void OnCompleted() + { + lock (_lock) + { + if (_isDisposed) + { + return; + } + + _observer?.OnCompleted(); + Dispose(); + } + } + + public void OnError(Exception error) + { + lock (_lock) + { + if (_isDisposed) + { + return; + } + + _observer?.OnError(error); + Dispose(); + } + } + + public void OnNext(IObservable stream) + { + if (stream == null) + { + throw new ArgumentNullException(nameof(stream)); + } + + lock (_lock) + { + if (_isDisposed) + { + throw new ObjectDisposedException(nameof(SwitchObserver)); + } + + _subscription?.Dispose(); + _subscription = stream.Subscribe(_observer); + } + } + + ~SwitchObserver() => Dispose(false); + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + private void Dispose(bool explicitDisposing) + { + if (_isDisposed) + { + return; + } + + if (explicitDisposing) + { + _subscription?.Dispose(); + _subscription = null; + _observer = null; + } + + _isDisposed = true; + } + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs b/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs new file mode 100644 index 0000000..3c56938 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs @@ -0,0 +1,101 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class WhereObservable(IObservable source, Func predicate) : IObservable +{ + private readonly IObservable _source = source ?? throw new ArgumentNullException(nameof(source)); + private readonly Func _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + + public IDisposable Subscribe(IObserver observer) + { + var where = new WhereObserver(observer, _predicate); + var subscription = _source.Subscribe(where); + return new DisposableAction(() => + { + subscription.Dispose(); + where.Dispose(); + }); + } + private sealed class WhereObserver(IObserver observer, Func predicate) : IObserver, IDisposable + { + private readonly object _lock = new(); + private readonly Func _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + private IObserver _observer = observer ?? throw new ArgumentNullException(nameof(observer)); + private bool _isCompleted; + private bool _isDisposed; + + public void OnCompleted() + { + lock (_lock) + { + if (_isCompleted || _isDisposed) + { + return; + } + + _isCompleted = true; + _observer?.OnCompleted(); + } + } + + public void OnError(Exception error) + { + lock (_lock) + { + if (_isCompleted || _isDisposed) + { + return; + } + + _isCompleted = true; + _observer?.OnError(error); + } + } + + public void OnNext(T value) + { + lock (_lock) + { + if (_isCompleted || _isDisposed) + { + return; + } + + var isPassed = false; + try + { + isPassed = _predicate(value); + } + catch (Exception error) + { + OnError(error); + } + if (isPassed) + { + _observer.OnNext(value); + } + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + private void Dispose(bool explicitDisposing) + { + if (_isDisposed) return; + + lock (_lock) + { + if (explicitDisposing) + { + _observer = null; + } + _isDisposed = true; + } + } + ~WhereObserver() => Dispose(false); + } +}