diff --git a/FoundationDB.Client/Core/IFdbTransactionHandler.cs b/FoundationDB.Client/Core/IFdbTransactionHandler.cs index 32255efe3..3064f830b 100644 --- a/FoundationDB.Client/Core/IFdbTransactionHandler.cs +++ b/FoundationDB.Client/Core/IFdbTransactionHandler.cs @@ -52,6 +52,7 @@ public interface IFdbTransactionHandler : IDisposable void SetOption(FdbTransactionOption option, Slice data); /// Returns this transaction snapshot read version. + /// Token used to cancel the operation from the outside, if different than the cancellation token of the transaction itself Task GetReadVersionAsync(CancellationToken ct); /// Retrieves the database version number at which a given transaction was committed. diff --git a/FoundationDB.Client/FdbTransaction.Snapshot.cs b/FoundationDB.Client/FdbTransaction.Snapshot.cs index 6f117de1e..8a77d7e1a 100644 --- a/FoundationDB.Client/FdbTransaction.Snapshot.cs +++ b/FoundationDB.Client/FdbTransaction.Snapshot.cs @@ -101,7 +101,7 @@ public Task GetAsync(Slice key) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetAsync", $"Getting value for '{key.ToString()}'"); #endif - return m_parent.m_handler.GetAsync(key, snapshot: true, ct: m_parent.m_cancellation); + return m_parent.m_handler.GetAsync(key, snapshot: true, ct: CancellationToken.None); } public Task GetValuesAsync(Slice[] keys) @@ -116,7 +116,7 @@ public Task GetValuesAsync(Slice[] keys) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetValuesAsync", $"Getting batch of {keys.Length} values ..."); #endif - return m_parent.m_handler.GetValuesAsync(keys, snapshot: true, ct: m_parent.m_cancellation); + return m_parent.m_handler.GetValuesAsync(keys, snapshot: true, ct: CancellationToken.None); } public async Task GetKeyAsync(KeySelector selector) @@ -129,7 +129,7 @@ public async Task GetKeyAsync(KeySelector selector) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetKeyAsync", $"Getting key '{selector.ToString()}'"); #endif - var key = await m_parent.m_handler.GetKeyAsync(selector, snapshot: true, ct: m_parent.m_cancellation).ConfigureAwait(false); + var key = await m_parent.m_handler.GetKeyAsync(selector, snapshot: true, ct: CancellationToken.None).ConfigureAwait(false); // don't forget to truncate keys that would fall outside of the database's globalspace ! return m_parent.m_database.BoundCheck(key); @@ -149,7 +149,7 @@ public Task GetKeysAsync(KeySelector[] selectors) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetKeysCoreAsync", $"Getting batch of {selectors.Length} keys ..."); #endif - return m_parent.m_handler.GetKeysAsync(selectors, snapshot: true, ct: m_parent.m_cancellation); + return m_parent.m_handler.GetKeysAsync(selectors, snapshot: true, ct: CancellationToken.None); } public Task GetRangeAsync(KeySelector beginInclusive, KeySelector endExclusive, FdbRangeOptions options, int iteration) @@ -165,7 +165,7 @@ public Task GetRangeAsync(KeySelector beginInclusive, KeySelector // The iteration value is only needed when in iterator mode, but then it should start from 1 if (iteration == 0) iteration = 1; - return m_parent.m_handler.GetRangeAsync(beginInclusive, endExclusive, options, iteration, snapshot: true, ct: m_parent.m_cancellation); + return m_parent.m_handler.GetRangeAsync(beginInclusive, endExclusive, options, iteration, snapshot: true, ct: CancellationToken.None); } public FdbRangeQuery> GetRange(KeySelector beginInclusive, KeySelector endExclusive, FdbRangeOptions options) @@ -178,7 +178,7 @@ public FdbRangeQuery> GetRange(KeySelector beginInclu public Task GetAddressesForKeyAsync(Slice key) { EnsureCanRead(); - return m_parent.m_handler.GetAddressesForKeyAsync(key, ct: m_parent.m_cancellation); + return m_parent.m_handler.GetAddressesForKeyAsync(key, CancellationToken.None); } void IFdbReadOnlyTransaction.Cancel() diff --git a/FoundationDB.Client/FdbTransaction.cs b/FoundationDB.Client/FdbTransaction.cs index 62a6f7327..3a72c5139 100644 --- a/FoundationDB.Client/FdbTransaction.cs +++ b/FoundationDB.Client/FdbTransaction.cs @@ -93,6 +93,9 @@ public sealed partial class FdbTransaction : IFdbTransaction /// Random token (but constant per transaction retry) used to generate incomplete VersionStamps private ulong m_versionStampToken; + /// Used to cancel the transaction if the parent CTS fires + private CancellationTokenRegistration m_ctr; + #endregion #region Constructors... @@ -111,6 +114,15 @@ internal FdbTransaction(FdbDatabase db, FdbOperationContext context, int id, IFd m_readOnly = (mode & FdbTransactionMode.ReadOnly) != 0; m_handler = handler; + + if (m_cancellation.IsCancellationRequested) + { // already dead? + Cancel(explicitly: false); + } + else + { + m_ctr = m_cancellation.RegisterWithoutEC(CancellationHandler, this); + } } #endregion @@ -263,7 +275,7 @@ public Task GetReadVersionAsync() // can be called after the transaction has been committed EnsureCanRetry(); - return m_handler.GetReadVersionAsync(m_cancellation); + return m_handler.GetReadVersionAsync(CancellationToken.None); } /// Retrieves the database version number at which a given transaction was committed. @@ -386,7 +398,7 @@ public Task GetAsync(Slice key) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetAsync", $"Getting value for '{key.ToString()}'"); #endif - return m_handler.GetAsync(key, snapshot: false, ct: m_cancellation); + return m_handler.GetAsync(key, snapshot: false, ct: CancellationToken.None); } #endregion @@ -409,7 +421,7 @@ public Task GetValuesAsync(Slice[] keys) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetValuesAsync", $"Getting batch of {keys.Length} values ..."); #endif - return m_handler.GetValuesAsync(keys, snapshot: false, ct: m_cancellation); + return m_handler.GetValuesAsync(keys, snapshot: false, ct: CancellationToken.None); } #endregion @@ -439,7 +451,7 @@ public Task GetRangeAsync(KeySelector beginInclusive, KeySelector // The iteration value is only needed when in iterator mode, but then it should start from 1 if (iteration == 0) iteration = 1; - return m_handler.GetRangeAsync(beginInclusive, endExclusive, options, iteration, snapshot: false, ct: m_cancellation); + return m_handler.GetRangeAsync(beginInclusive, endExclusive, options, iteration, snapshot: false, ct: CancellationToken.None); } #endregion @@ -492,7 +504,7 @@ public async Task GetKeyAsync(KeySelector selector) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetKeyAsync", $"Getting key '{selector.ToString()}'"); #endif - var key = await m_handler.GetKeyAsync(selector, snapshot: false, ct: m_cancellation).ConfigureAwait(false); + var key = await m_handler.GetKeyAsync(selector, snapshot: false, ct: CancellationToken.None).ConfigureAwait(false); // don't forget to truncate keys that would fall outside of the database's globalspace ! return m_database.BoundCheck(key); @@ -520,7 +532,7 @@ public Task GetKeysAsync(KeySelector[] selectors) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetKeysAsync", $"Getting batch of {selectors.Length} keys ..."); #endif - return m_handler.GetKeysAsync(selectors, snapshot: false, ct: m_cancellation); + return m_handler.GetKeysAsync(selectors, snapshot: false, ct: CancellationToken.None); } #endregion @@ -727,7 +739,7 @@ public Task GetAddressesForKeyAsync(Slice key) if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetAddressesForKeyAsync", $"Getting addresses for key '{FdbKey.Dump(key)}'"); #endif - return m_handler.GetAddressesForKeyAsync(key, ct: m_cancellation); + return m_handler.GetAddressesForKeyAsync(key, CancellationToken.None); } #endregion @@ -750,7 +762,7 @@ public async Task CommitAsync() //TODO: need a STATE_COMMITTING ? try { - await m_handler.CommitAsync(m_cancellation).ConfigureAwait(false); + await m_handler.CommitAsync(CancellationToken.None).ConfigureAwait(false); if (Interlocked.CompareExchange(ref m_state, STATE_COMMITTED, STATE_READY) == STATE_READY) { @@ -817,7 +829,7 @@ public async Task OnErrorAsync(FdbError code) { EnsureCanRetry(); - await m_handler.OnErrorAsync(code, ct: m_cancellation).ConfigureAwait(false); + await m_handler.OnErrorAsync(code, CancellationToken.None).ConfigureAwait(false); // If fdb_transaction_on_error succeeds, that means that the transaction has been reset and is usable again var state = this.State; @@ -875,19 +887,47 @@ public void Reset() /// Rollback this transaction, and dispose it. It should not be used after that. public void Cancel() + { + Cancel(explicitly: true); + } + + private void Cancel(bool explicitly) { var state = Interlocked.CompareExchange(ref m_state, STATE_CANCELED, STATE_READY); if (state != STATE_READY) { - switch(state) + if (explicitly) { - case STATE_CANCELED: return; // already the case ! + switch (state) + { + case STATE_CANCELED: + { + return; // already the case! + } + case STATE_COMMITTED: + { + throw new InvalidOperationException("Cannot cancel transaction that has already been committed"); + } + case STATE_FAILED: + { + throw new InvalidOperationException("Cannot cancel transaction because it is in a failed state"); + } + case STATE_DISPOSED: + { + throw new ObjectDisposedException("FdbTransaction", "Cannot cancel transaction because it already has been disposed"); + } + default: + { + throw new InvalidOperationException(String.Format("Cannot cancel transaction because it is in unknown state {0}", state)); + } + } + } - case STATE_COMMITTED: throw new InvalidOperationException("Cannot cancel transaction that has already been committed"); - case STATE_FAILED: throw new InvalidOperationException("Cannot cancel transaction because it is in a failed state"); - case STATE_DISPOSED: throw new ObjectDisposedException("FdbTransaction", "Cannot cancel transaction because it already has been disposed"); - default: throw new InvalidOperationException($"Cannot cancel transaction because it is in unknown state {state}"); + if (state == STATE_CANCELED || state == STATE_DISPOSED) + { // it's too late + return; } + } if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "Cancel", "Canceling transaction..."); @@ -897,6 +937,16 @@ public void Cancel() if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "Cancel", "Transaction has been canceled"); } + private static readonly Action CancellationHandler = CancellationCallback; + + /// Handler called when the cancellation source of the transaction fires + private static void CancellationCallback(object state) + { + Contract.Requires(state != null); + var trans = (FdbTransaction) state; + trans.Cancel(explicitly: false); + } + #endregion #region IDisposable... @@ -1014,6 +1064,7 @@ public void Dispose() { try { + m_ctr.Dispose(); this.Database.UnregisterTransaction(this); m_cts.SafeCancelAndDispose(); diff --git a/FoundationDB.Client/FdbWatch.cs b/FoundationDB.Client/FdbWatch.cs index 95ff94606..0310b2266 100644 --- a/FoundationDB.Client/FdbWatch.cs +++ b/FoundationDB.Client/FdbWatch.cs @@ -82,10 +82,13 @@ public TaskAwaiter GetAwaiter() if (m_future != null) { +#if REFACTORING_IN_PROGRESS if (m_future.HasFlag(FdbFuture.Flags.DISPOSED)) { throw new ObjectDisposedException("Cannot await a watch that has already been disposed"); } + +#endif return m_future.Task.GetAwaiter(); } throw new InvalidOperationException("Cannot await an empty watch"); @@ -100,7 +103,11 @@ public void Cancel() /// Dispose the resources allocated by the watch. public void Dispose() { - m_future?.Dispose(); + if (m_future != null) + { + //TODO: what should be do? (=> cancel the future?) + //m_future.Dispose(); + } } public override string ToString() diff --git a/FoundationDB.Client/Native/FdbFuture.cs b/FoundationDB.Client/Native/FdbFuture.cs deleted file mode 100644 index af24f95c8..000000000 --- a/FoundationDB.Client/Native/FdbFuture.cs +++ /dev/null @@ -1,516 +0,0 @@ -#region BSD Licence -/* Copyright (c) 2013-2018, Doxense SAS -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - * Neither the name of Doxense nor the - names of its contributors may be used to endorse or promote products - derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY -DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -#endregion - -// enable this to help debug Futures -//#define DEBUG_FUTURES - -namespace FoundationDB.Client.Native -{ - using System; - using System.Collections.Concurrent; - using System.Collections.Generic; - using System.Diagnostics; - using System.Runtime.CompilerServices; - using System.Threading; - using System.Threading.Tasks; - using Doxense.Diagnostics.Contracts; - using FoundationDB.Client.Utils; - using JetBrains.Annotations; - - /// Helper class to create FDBFutures - internal static class FdbFuture - { - - public static class Flags - { - /// The future has completed (either success or failure) - public const int COMPLETED = 1; - - /// A completion/failure/cancellation has been posted on the thread pool - public const int HAS_POSTED_ASYNC_COMPLETION = 2; - - /// The future has been cancelled from an external source (manually, or via then CancellationTokeb) - public const int CANCELLED = 4; - - /// The resources allocated by this future have been released - public const int MEMORY_RELEASED = 8; - - /// The future has been constructed, and is listening for the callbacks - public const int READY = 64; - - /// Dispose has been called - public const int DISPOSED = 128; - } - - /// Create a new from an FDBFuture* pointer - /// Type of the result of the task - /// FDBFuture* pointer - /// Func that will be called to get the result once the future completes (and did not fail) - /// Optional cancellation token that can be used to cancel the future - /// Object that tracks the execution of the FDBFuture handle - [NotNull] - public static FdbFutureSingle FromHandle([NotNull] FutureHandle handle, [NotNull] Func selector, CancellationToken ct) - { - return new FdbFutureSingle(handle, selector, ct); - } - - /// Create a new from an array of FDBFuture* pointers - /// Type of the items of the arrayreturn by the task - /// Array of FDBFuture* pointers - /// Func that will be called for each future that complete (and did not fail) - /// Optional cancellation token that can be used to cancel the future - /// Object that tracks the execution of all the FDBFuture handles - [NotNull] - public static FdbFutureArray FromHandleArray([NotNull] FutureHandle[] handles, [NotNull] Func selector, CancellationToken ct) - { - return new FdbFutureArray(handles, selector, ct); - } - - /// Wrap a FDBFuture* pointer into a - /// Type of the result of the task - /// FDBFuture* pointer - /// Lambda that will be called once the future completes successfully, to extract the result from the future handle. - /// Optional cancellation token that can be used to cancel the future - /// Task that will either return the result of the continuation lambda, or an exception - public static Task CreateTaskFromHandle([NotNull] FutureHandle handle, [NotNull] Func continuation, CancellationToken ct) - { - return new FdbFutureSingle(handle, continuation, ct).Task; - } - - /// Wrap multiple handles into a single that returns an array of T - /// Type of the result of the task - /// Array of FDBFuture* pointers - /// Lambda that will be called once for each future that completes successfully, to extract the result from the future handle. - /// Optional cancellation token that can be used to cancel the future - /// Task that will either return all the results of the continuation lambdas, or an exception - /// If at least one future fails, the whole task will fail. - [ItemNotNull] - public static Task CreateTaskFromHandleArray([NotNull] FutureHandle[] handles, [NotNull] Func continuation, CancellationToken ct) - { - // Special case, because FdbFutureArray does not support empty arrays - //TODO: technically, there is no reason why FdbFutureArray would not accept an empty array. We should simplify this by handling the case in the ctor (we are already allocating something anyway...) - if (handles.Length == 0) return Task.FromResult(new T[0]); - - return new FdbFutureArray(handles, continuation, ct).Task; - } - - } - - /// Base class for all FDBFuture wrappers - /// Type of the Task's result - [DebuggerDisplay("Flags={m_flags}, State={this.Task.Status}")] - internal abstract class FdbFuture : TaskCompletionSource, IDisposable - { - - #region Private Members... - - /// Flags of the future (bit field of FLAG_xxx values) - private int m_flags; - - /// Future key in the callback dictionary - protected IntPtr m_key; - - /// Optionnal registration on the parent Cancellation Token - /// Is only valid if FLAG_HAS_CTR is set - protected CancellationTokenRegistration m_ctr; - - #endregion - - #region State Management... - - internal bool HasFlag(int flag) - { - return (Volatile.Read(ref m_flags) & flag) == flag; - } - - internal bool HasAnyFlags(int flags) - { - return (Volatile.Read(ref m_flags) & flags) != 0; - } - - protected void SetFlag(int flag) - { - var flags = m_flags; - Interlocked.MemoryBarrier(); - m_flags = flags | flag; - } - - protected bool TrySetFlag(int flag) - { - var wait = new SpinWait(); - while (true) - { - var flags = Volatile.Read(ref m_flags); - if ((flags & flag) != 0) - { - return false; - } - if (Interlocked.CompareExchange(ref m_flags, flags | flag, flags) == flags) - { - return true; - } - wait.SpinOnce(); - } - } - - protected bool TryCleanup() - { - // We try to cleanup the future handle as soon as possible, meaning as soon as we have the result, or an error, or a cancellation - - if (TrySetFlag(FdbFuture.Flags.COMPLETED)) - { - DoCleanup(); - return true; - } - return false; - } - - private void DoCleanup() - { - try - { - // unsubscribe from the parent cancellation token if there was one - UnregisterCancellationRegistration(); - - // ensure that the task always complete ! - // note: always defer the completion on the threadpool, because we don't want to dead lock here (we can be called by Dispose) - if (!this.Task.IsCompleted && TrySetFlag(FdbFuture.Flags.HAS_POSTED_ASYNC_COMPLETION)) - { - PostCancellationOnThreadPool(this); - } - - // The only surviving value after this would be a Task and an optional WorkItem on the ThreadPool that will signal it... - } - finally - { - CloseHandles(); - } - } - - /// Close all the handles managed by this future - protected abstract void CloseHandles(); - - /// Cancel all the handles managed by this future - protected abstract void CancelHandles(); - - /// Release all memory allocated by this future - protected abstract void ReleaseMemory(); - - /// Set the result of this future - /// Result of the future - /// If true, called from the network thread callback and will defer the operation on the ThreadPool. If false, may run the continuations inline. - protected void SetResult(T result, bool fromCallback) - { - if (!fromCallback) - { - this.TrySetResult(result); - } - else if (TrySetFlag(FdbFuture.Flags.HAS_POSTED_ASYNC_COMPLETION)) - { - PostCompletionOnThreadPool(this, result); - } - } - - /// Fault the future's Task - /// Error that will be the result of the task - /// If true, called from the network thread callback and will defer the operation on the ThreadPool. If false, may run the continuations inline. - protected void SetFaulted(Exception e, bool fromCallback) - { - if (!fromCallback) - { - this.TrySetException(e); - } - else if (TrySetFlag(FdbFuture.Flags.HAS_POSTED_ASYNC_COMPLETION)) - { - PostFailureOnThreadPool(this, e); - } - } - - /// Fault the future's Task - /// Error that will be the result of the task - /// If true, called from the network thread callback and will defer the operation on the ThreadPool. If false, may run the continuations inline. - protected void SetFaulted(IEnumerable errors, bool fromCallback) - { - if (!fromCallback) - { - this.TrySetException(errors); - } - else if (TrySetFlag(FdbFuture.Flags.HAS_POSTED_ASYNC_COMPLETION)) - { - PostFailureOnThreadPool(this, errors); - } - } - - /// Cancel the future's Task - /// If true, called from the network thread callback and will defer the operation on the ThreadPool. If false, may run the continuations inline. - protected void SetCanceled(bool fromCallback) - { - if (!fromCallback) - { - this.TrySetCanceled(); - } - else if (TrySetFlag(FdbFuture.Flags.HAS_POSTED_ASYNC_COMPLETION)) - { - PostCancellationOnThreadPool(this); - } - } - - /// Defer setting the result of a TaskCompletionSource on the ThreadPool - private static void PostCompletionOnThreadPool(TaskCompletionSource future, T result) - { - ThreadPool.UnsafeQueueUserWorkItem( - (_state) => - { - var prms = (Tuple, T>)_state; - prms.Item1.TrySetResult(prms.Item2); - }, - Tuple.Create(future, result) - ); - } - - /// Defer failing a TaskCompletionSource on the ThreadPool - private static void PostFailureOnThreadPool(TaskCompletionSource future, Exception error) - { - ThreadPool.UnsafeQueueUserWorkItem( - (_state) => - { - var prms = (Tuple, Exception>)_state; - prms.Item1.TrySetException(prms.Item2); - }, - Tuple.Create(future, error) - ); - } - - /// Defer failing a TaskCompletionSource on the ThreadPool - private static void PostFailureOnThreadPool(TaskCompletionSource future, IEnumerable errors) - { - ThreadPool.UnsafeQueueUserWorkItem( - (_state) => - { - var prms = (Tuple, IEnumerable>)_state; - prms.Item1.TrySetException(prms.Item2); - }, - Tuple.Create(future, errors) - ); - } - - /// Defer cancelling a TaskCompletionSource on the ThreadPool - private static void PostCancellationOnThreadPool(TaskCompletionSource future) - { - ThreadPool.UnsafeQueueUserWorkItem( - (_state) => ((TaskCompletionSource)_state).TrySetCanceled(), - future - ); - } - - #endregion - - #region Callbacks... - - /// List of all pending futures that have not yet completed - private static readonly ConcurrentDictionary> s_futures = new ConcurrentDictionary>(); - - /// Internal counter to generated a unique parameter value for each futures - private static long s_futureCounter; - - /// Register a future in the callback context and return the corresponding callback parameter - /// Future instance - /// Parameter that can be passed to FutureSetCallback and that uniquely identify this future. - /// The caller MUST call ClearCallbackHandler to ensure that the future instance is removed from the list - internal static IntPtr RegisterCallback([NotNull] FdbFuture future) - { - Contract.Requires(future != null); - - // generate a new unique id for this future, that will be use to lookup the future instance in the callback handler - long id = Interlocked.Increment(ref s_futureCounter); - var prm = new IntPtr(id); // note: we assume that we can only run in 64-bit mode, so it is safe to cast a long into an IntPtr - // critical region - try { } - finally - { - Volatile.Write(ref future.m_key, prm); -#if DEBUG_FUTURES - Contract.Assert(!s_futures.ContainsKey(prm)); -#endif - s_futures[prm.ToInt64()] = future; - Interlocked.Increment(ref DebugCounters.CallbackHandlesTotal); - Interlocked.Increment(ref DebugCounters.CallbackHandles); - } - return prm; - } - - /// Remove a future from the callback handler dictionary - /// Future that has just completed, or is being destroyed - internal static void UnregisterCallback([NotNull] FdbFuture future) - { - Contract.Requires(future != null); - - // critical region - try - { } - finally - { - var key = Interlocked.Exchange(ref future.m_key, IntPtr.Zero); - if (key != IntPtr.Zero) - { - FdbFuture _; - if (s_futures.TryRemove(key.ToInt64(), out _)) - { - Interlocked.Decrement(ref DebugCounters.CallbackHandles); - } - } - } - } - - internal static FdbFuture GetFutureFromCallbackParameter(IntPtr parameter) - { - FdbFuture future; - if (s_futures.TryGetValue(parameter.ToInt64(), out future)) - { - if (future != null && Volatile.Read(ref future.m_key) == parameter) - { - return future; - } -#if DEBUG_FUTURES - // If you breakpoint here, that means that a future callback fired but was not able to find a matching registration - // => either the FdbFuture was incorrectly disposed, or there is some problem in the callback dictionary - if (System.Diagnostics.Debugger.IsAttached) System.Diagnostics.Debugger.Break(); -#endif - } - return null; - } - - #endregion - - #region Cancellation... - - protected void RegisterForCancellation(CancellationToken ct) - { - //note: if the token is already cancelled, the callback handler will run inline and any exception would bubble up here - //=> this is not a problem because the ctor already has a try/catch that will clean up everything - m_ctr = ct.Register( - (_state) => { CancellationHandler(_state); }, - this, - false - ); - } - - protected void UnregisterCancellationRegistration() - { - // unsubscribe from the parent cancellation token if there was one - m_ctr.Dispose(); - m_ctr = default; - } - - private static void CancellationHandler(object state) - { - if (state is FdbFuture future) - { -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + ">.Cancel(0x" + future.m_handle.Handle.ToString("x") + ") was called on thread #" + Thread.CurrentThread.ManagedThreadId.ToString()); -#endif - future.Cancel(); - } - } - - #endregion - - /// Return true if the future has completed (successfully or not) - public bool IsReady => this.Task.IsCompleted; - - /// Make the Future awaitable - public TaskAwaiter GetAwaiter() - { - return this.Task.GetAwaiter(); - } - - /// Try to abort the task (if it is still running) - public void Cancel() - { - if (HasAnyFlags(FdbFuture.Flags.DISPOSED | FdbFuture.Flags.COMPLETED | FdbFuture.Flags.CANCELLED)) - { - return; - } - - if (TrySetFlag(FdbFuture.Flags.CANCELLED)) - { - bool fromCallback = Fdb.IsNetworkThread; - try - { - if (!this.Task.IsCompleted) - { - CancelHandles(); - SetCanceled(fromCallback); - } - } - finally - { - TryCleanup(); - } - } - } - - /// Free memory allocated by this future after it has completed. - /// This method provides no benefit to most application code, and should only be called when attempting to write thread-safe custom layers. - public void Clear() - { - if (HasFlag(FdbFuture.Flags.DISPOSED)) - { - return; - } - - if (!this.Task.IsCompleted) - { - throw new InvalidOperationException("Cannot release memory allocated by a future that has not yet completed"); - } - - if (TrySetFlag(FdbFuture.Flags.MEMORY_RELEASED)) - { - ReleaseMemory(); - } - } - - public void Dispose() - { - if (TrySetFlag(FdbFuture.Flags.DISPOSED)) - { - try - { - TryCleanup(); - } - finally - { - if (Volatile.Read(ref m_key) != IntPtr.Zero) UnregisterCallback(this); - } - } - GC.SuppressFinalize(this); - } - - } - -} diff --git a/FoundationDB.Client/Native/FdbFutureArray.cs b/FoundationDB.Client/Native/FdbFutureArray.cs deleted file mode 100644 index 5546179be..000000000 --- a/FoundationDB.Client/Native/FdbFutureArray.cs +++ /dev/null @@ -1,323 +0,0 @@ -#region BSD Licence -/* Copyright (c) 2013-2018, Doxense SAS -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - * Neither the name of Doxense nor the - names of its contributors may be used to endorse or promote products - derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY -DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -#endregion - -namespace FoundationDB.Client.Native -{ - using JetBrains.Annotations; - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Threading; - - /// FDBFuture[] wrapper - /// Type of result - internal sealed class FdbFutureArray : FdbFuture - { - // Wraps several FDBFuture* handles and return all the results at once - - #region Private Members... - - /// Value of the 'FDBFuture*' - private readonly FutureHandle[] m_handles; - - /// Counter of callbacks that still need to fire. - private int m_pending; - - /// Lambda used to extract the result of this FDBFuture - private readonly Func m_resultSelector; - - #endregion - - #region Constructors... - - internal FdbFutureArray([NotNull] FutureHandle[] handles, [NotNull] Func selector, CancellationToken ct) - { - if (handles == null) throw new ArgumentNullException(nameof(handles)); - if (handles.Length == 0) throw new ArgumentException("Handle array cannot be empty", nameof(handles)); - if (selector == null) throw new ArgumentNullException(nameof(selector)); - - m_handles = handles; - m_resultSelector = selector; - - bool abortAllHandles = false; - - try - { - if (ct.IsCancellationRequested) - { // already cancelled, we must abort everything - - SetFlag(FdbFuture.Flags.COMPLETED); - abortAllHandles = true; - m_resultSelector = null; - this.TrySetCanceled(); - return; - } - - // add this instance to the list of pending futures - var prm = RegisterCallback(this); - - foreach (var handle in handles) - { - - if (FdbNative.FutureIsReady(handle)) - { // this handle is already done - continue; - } - - Interlocked.Increment(ref m_pending); - - // register the callback handler - var err = FdbNative.FutureSetCallback(handle, CallbackHandler, prm); - if (Fdb.Failed(err)) - { // uhoh - Debug.WriteLine("Failed to set callback for Future<" + typeof(T).Name + "> 0x" + handle.Handle.ToString("x") + " !!!"); - throw Fdb.MapToException(err); - } - } - - // allow the callbacks to handle completion - TrySetFlag(FdbFuture.Flags.READY); - - if (Volatile.Read(ref m_pending) == 0) - { // all callbacks have already fired (or all handles were already completed) - UnregisterCallback(this); - HandleCompletion(fromCallback: false); - m_resultSelector = null; - abortAllHandles = true; - SetFlag(FdbFuture.Flags.COMPLETED); - } - else if (ct.CanBeCanceled) - { // register for cancellation (if needed) - RegisterForCancellation(ct); - } - } - catch - { - // this is bad news, since we are in the constructor, we need to clear everything - SetFlag(FdbFuture.Flags.DISPOSED); - - UnregisterCancellationRegistration(); - - UnregisterCallback(this); - - abortAllHandles = true; - - // this is technically not needed, but just to be safe... - this.TrySetCanceled(); - - throw; - } - finally - { - if (abortAllHandles) - { - CloseHandles(handles); - } - } - GC.KeepAlive(this); - } - - #endregion - - protected override void CloseHandles() - { - CloseHandles(m_handles); - } - - protected override void CancelHandles() - { - CancelHandles(m_handles); - } - - protected override void ReleaseMemory() - { - var handles = m_handles; - if (handles != null) - { - foreach (var handle in handles) - { - if (handle != null && !handle.IsClosed && !handle.IsInvalid) - { - //REVIEW: there is a possibility of a race condition with Dispoe() that could potentially call FutureDestroy(handle) at the same time (not verified) - FdbNative.FutureReleaseMemory(handle); - } - } - } - } - - private static void CloseHandles(FutureHandle[] handles) - { - if (handles != null) - { - foreach (var handle in handles) - { - if (handle != null) - { - //note: Dispose() will be a no-op if already called - handle.Dispose(); - } - } - } - } - - private static void CancelHandles(FutureHandle[] handles) - { - if (handles != null) - { - foreach (var handle in handles) - { - if (handle != null && !handle.IsClosed && !handle.IsInvalid) - { - //REVIEW: there is a possibility of a race condition with Dispoe() that could potentially call FutureDestroy(handle) at the same time (not verified) - FdbNative.FutureCancel(handle); - } - } - } - } - - /// Cached delegate of the future completion callback handler - private static readonly FdbNative.FdbFutureCallback CallbackHandler = FutureCompletionCallback; - - /// Handler called when a FDBFuture becomes ready - /// Handle on the future that became ready - /// Paramter to the callback (unused) - private static void FutureCompletionCallback(IntPtr futureHandle, IntPtr parameter) - { -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + ">.Callback(0x" + futureHandle.ToString("x") + ", " + parameter.ToString("x") + ") has fired on thread #" + Thread.CurrentThread.ManagedThreadId.ToString()); -#endif - - var future = (FdbFutureArray)GetFutureFromCallbackParameter(parameter); - - if (future != null && Interlocked.Decrement(ref future.m_pending) == 0) - { // the last future handle has fired, we can proceed to read all the results - - if (future.HasFlag(FdbFuture.Flags.READY)) - { - UnregisterCallback(future); - try - { - future.HandleCompletion(fromCallback: true); - } - catch(Exception) - { - //TODO ? - } - } - // else, the ctor will handle that - } - } - - /// Update the Task with the state of a ready Future - /// If true, the method is called from the network thread and must defer the continuations from the Thread Pool - /// True if we got a result, or false in case of error (or invalid state) - private void HandleCompletion(bool fromCallback) - { - if (HasAnyFlags(FdbFuture.Flags.DISPOSED | FdbFuture.Flags.COMPLETED)) - { - return; - } - -#if DEBUG_FUTURES - Debug.WriteLine("FutureArray<" + typeof(T).Name + ">.Callback(...) handling completion on thread #" + Thread.CurrentThread.ManagedThreadId.ToString()); -#endif - - try - { - UnregisterCancellationRegistration(); - - List errors = null; - bool cancellation = false; - var selector = m_resultSelector; - - var results = selector != null ? new T[m_handles.Length] : null; - - for (int i = 0; i < m_handles.Length; i++) - { - var handle = m_handles[i]; - - if (handle != null && !handle.IsClosed && !handle.IsInvalid) - { - FdbError err = FdbNative.FutureGetError(handle); - if (Fdb.Failed(err)) - { // it failed... - if (err != FdbError.OperationCancelled) - { // get the exception from the error code - var ex = Fdb.MapToException(err); - (errors ?? (errors = new List())).Add(ex); - } - else - { - cancellation = true; - break; - } - } - else - { // it succeeded... - // try to get the result... - if (selector != null) - { - //note: result selector will execute from network thread, but this should be our own code that only calls into some fdb_future_get_XXXX(), which should be safe... - results[i] = selector(handle); - } - } - } - } - - if (cancellation) - { // the transaction has been cancelled - SetCanceled(fromCallback); - } - else if (errors != null) - { // there was at least one error - SetFaulted(errors, fromCallback); - } - else - { // success - SetResult(results, fromCallback); - } - - } - catch (Exception e) - { // something went wrong - if (e is ThreadAbortException) - { - SetCanceled(fromCallback); - throw; - } - SetFaulted(e, fromCallback); - } - finally - { - TryCleanup(); - } - } - - } - -} diff --git a/FoundationDB.Client/Native/FdbFutureSingle.cs b/FoundationDB.Client/Native/FdbFutureSingle.cs deleted file mode 100644 index 7982fba29..000000000 --- a/FoundationDB.Client/Native/FdbFutureSingle.cs +++ /dev/null @@ -1,266 +0,0 @@ -#region BSD Licence -/* Copyright (c) 2013-2018, Doxense SAS -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - * Neither the name of Doxense nor the - names of its contributors may be used to endorse or promote products - derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY -DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -#endregion - -//#define DEBUG_FUTURES - -namespace FoundationDB.Client.Native -{ - using System; - using System.Threading; - using JetBrains.Annotations; - - /// FDBFuture wrapper - /// Type of result - internal sealed class FdbFutureSingle : FdbFuture - { - #region Private Members... - - /// Value of the 'FDBFuture*' - private readonly FutureHandle m_handle; - - /// Lambda used to extract the result of this FDBFuture - private readonly Func m_resultSelector; - - #endregion - - #region Constructors... - - internal FdbFutureSingle([NotNull] FutureHandle handle, [NotNull] Func selector, CancellationToken ct) - { - if (handle == null) throw new ArgumentNullException(nameof(handle)); - if (selector == null) throw new ArgumentNullException(nameof(selector)); - - m_handle = handle; - m_resultSelector = selector; - - try - { - - if (handle.IsInvalid) - { // it's dead, Jim ! - SetFlag(FdbFuture.Flags.COMPLETED); - m_resultSelector = null; - return; - } - - if (FdbNative.FutureIsReady(handle)) - { // either got a value or an error -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + "> 0x" + handle.Handle.ToString("x") + " was already ready"); -#endif - HandleCompletion(fromCallback: false); -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + "> 0x" + handle.Handle.ToString("x") + " completed inline"); -#endif - return; - } - - // register for cancellation (if needed) - if (ct.CanBeCanceled) - { - if (ct.IsCancellationRequested) - { // we have already been cancelled - -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + "> 0x" + handle.Handle.ToString("x") + " will complete later"); -#endif - - // Abort the future and simulate a Canceled task - SetFlag(FdbFuture.Flags.COMPLETED); - // note: we don't need to call fdb_future_cancel because fdb_future_destroy will take care of everything - handle.Dispose(); - // also, don't keep a reference on the callback because it won't be needed - m_resultSelector = null; - this.TrySetCanceled(); - return; - } - - // token still active - RegisterForCancellation(ct); - } - -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + "> 0x" + handle.Handle.ToString("x") + " will complete later"); -#endif - - TrySetFlag(FdbFuture.Flags.READY); - - // add this instance to the list of pending futures - var prm = RegisterCallback(this); - - // register the callback handler - var err = FdbNative.FutureSetCallback(handle, CallbackHandler, prm); - if (Fdb.Failed(err)) - { // uhoh -#if DEBUG_FUTURES - Debug.WriteLine("Failed to set callback for Future<" + typeof(T).Name + "> 0x" + handle.Handle.ToString("x") + " !!!"); -#endif - throw Fdb.MapToException(err); - } - } - catch - { - // this is bad news, since we are in the constructor, we need to clear everything - SetFlag(FdbFuture.Flags.DISPOSED); - UnregisterCancellationRegistration(); - UnregisterCallback(this); - - // kill the future handle - m_handle.Dispose(); - - // this is technically not needed, but just to be safe... - this.TrySetCanceled(); - - throw; - } - GC.KeepAlive(this); - } - - #endregion - - /// Cached delegate of the future completion callback handler - private static readonly FdbNative.FdbFutureCallback CallbackHandler = FutureCompletionCallback; - - /// Handler called when a FDBFuture becomes ready - /// Handle on the future that became ready - /// Paramter to the callback (unused) - private static void FutureCompletionCallback(IntPtr futureHandle, IntPtr parameter) - { -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + ">.Callback(0x" + futureHandle.ToString("x") + ", " + parameter.ToString("x") + ") has fired on thread #" + Thread.CurrentThread.ManagedThreadId.ToString()); -#endif - - var future = (FdbFutureSingle)GetFutureFromCallbackParameter(parameter); - if (future != null) - { - UnregisterCallback(future); - future.HandleCompletion(fromCallback: true); - } - } - - /// Update the Task with the state of a ready Future - /// If true, we are called from the network thread - /// True if we got a result, or false in case of error (or invalid state) - private void HandleCompletion(bool fromCallback) - { - // note: if fromCallback is true, we are running on the network thread - // this means that we have to signal the TCS from the threadpool, if not continuations on the task may run inline. - // this is very frequent when we are called with await, or ContinueWith(..., TaskContinuationOptions.ExecuteSynchronously) - - if (HasAnyFlags(FdbFuture.Flags.DISPOSED | FdbFuture.Flags.COMPLETED)) - { - return; - } - -#if DEBUG_FUTURES - var sw = Stopwatch.StartNew(); -#endif - try - { - var handle = m_handle; - if (handle != null && !handle.IsClosed && !handle.IsInvalid) - { - UnregisterCancellationRegistration(); - - FdbError err = FdbNative.FutureGetError(handle); - if (Fdb.Failed(err)) - { // it failed... -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + "> has FAILED: " + err); -#endif - if (err != FdbError.OperationCancelled) - { // get the exception from the error code - var ex = Fdb.MapToException(err); - SetFaulted(ex, fromCallback); - return; - } - //else: will be handle below - } - else - { // it succeeded... - // try to get the result... -#if DEBUG_FUTURES - Debug.WriteLine("Future<" + typeof(T).Name + "> has completed successfully"); -#endif - var selector = m_resultSelector; - if (selector != null) - { - //note: result selector will execute from network thread, but this should be our own code that only calls into some fdb_future_get_XXXX(), which should be safe... - var result = selector(handle); - SetResult(result, fromCallback); - return; - } - //else: it will be handled below - } - } - - // most probably the future was cancelled or we are shutting down... - SetCanceled(fromCallback); - } - catch (Exception e) - { // something went wrong - if (e is ThreadAbortException) - { - SetCanceled(fromCallback); - throw; - } - SetFaulted(e, fromCallback); - } - finally - { -#if DEBUG_FUTURES - sw.Stop(); - Debug.WriteLine("Future<" + typeof(T).Name + "> callback completed in " + sw.Elapsed.TotalMilliseconds.ToString() + " ms"); -#endif - TryCleanup(); - } - } - - protected override void CloseHandles() - { - var handle = m_handle; - if (handle != null) handle.Dispose(); - } - - protected override void CancelHandles() - { - var handle = m_handle; - //REVIEW: there is a possibility of a race condition with Dispose() that could potentially call FutureDestroy(handle) at the same time (not verified) - if (handle != null && !handle.IsClosed && !handle.IsInvalid) FdbNative.FutureCancel(handle); - } - - protected override void ReleaseMemory() - { - var handle = m_handle; - //REVIEW: there is a possibility of a race condition with Dispose() that could potentially call FutureDestroy(handle) at the same time (not verified) - if (handle != null && !handle.IsClosed && !handle.IsInvalid) FdbNative.FutureReleaseMemory(handle); - } - - } - -} diff --git a/FoundationDB.Client/Native/FdbNative.cs b/FoundationDB.Client/Native/FdbNative.cs index e7dea3a69..320fa48f4 100644 --- a/FoundationDB.Client/Native/FdbNative.cs +++ b/FoundationDB.Client/Native/FdbNative.cs @@ -34,10 +34,14 @@ namespace FoundationDB.Client.Native using System; using System.Collections.Generic; using System.IO; + using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Text; + using System.Threading; + using System.Threading.Tasks; using Doxense.Diagnostics.Contracts; + using FoundationDB.Client.Core; internal static unsafe class FdbNative { @@ -94,7 +98,7 @@ internal static class NativeMethods // Cluster [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl, CharSet = CharSet.Ansi, BestFitMapping = false, ThrowOnUnmappableChar = true)] - public static extern FutureHandle fdb_create_cluster([MarshalAs(UnmanagedType.LPStr)] string clusterFilePath); + public static extern IntPtr fdb_create_cluster([MarshalAs(UnmanagedType.LPStr)] string clusterFilePath); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] public static extern void fdb_cluster_destroy(IntPtr cluster); @@ -103,7 +107,7 @@ internal static class NativeMethods public static extern FdbError fdb_cluster_set_option(ClusterHandle cluster, FdbClusterOption option, byte* value, int valueLength); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl, CharSet = CharSet.Ansi, BestFitMapping = false, ThrowOnUnmappableChar = true)] - public static extern FutureHandle fdb_cluster_create_database(ClusterHandle cluster, [MarshalAs(UnmanagedType.LPStr)] string dbName, int dbNameLength); + public static extern IntPtr fdb_cluster_create_database(ClusterHandle cluster, [MarshalAs(UnmanagedType.LPStr)] string dbName, int dbNameLength); // Database @@ -128,19 +132,19 @@ internal static class NativeMethods public static extern void fdb_transaction_set_read_version(TransactionHandle handle, long version); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_get_read_version(TransactionHandle transaction); + public static extern IntPtr fdb_transaction_get_read_version(TransactionHandle transaction); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_get(TransactionHandle transaction, byte* keyName, int keyNameLength, bool snapshot); + public static extern IntPtr fdb_transaction_get(TransactionHandle transaction, byte* keyName, int keyNameLength, bool snapshot); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_get_addresses_for_key(TransactionHandle transaction, byte* keyName, int keyNameLength); + public static extern IntPtr fdb_transaction_get_addresses_for_key(TransactionHandle transaction, byte* keyName, int keyNameLength); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_get_key(TransactionHandle transaction, byte* keyName, int keyNameLength, bool orEqual, int offset, bool snapshot); + public static extern IntPtr fdb_transaction_get_key(TransactionHandle transaction, byte* keyName, int keyNameLength, bool orEqual, int offset, bool snapshot); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_get_range( + public static extern IntPtr fdb_transaction_get_range( TransactionHandle transaction, byte* beginKeyName, int beginKeyNameLength, bool beginOrEqual, int beginOffset, byte* endKeyName, int endKeyNameLength, bool endOrEqual, int endOffset, @@ -164,19 +168,19 @@ public static extern void fdb_transaction_clear_range( public static extern void fdb_transaction_atomic_op(TransactionHandle transaction, byte* keyName, int keyNameLength, byte* param, int paramLength, FdbMutationType operationType); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_commit(TransactionHandle transaction); + public static extern IntPtr fdb_transaction_commit(TransactionHandle transaction); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] public static extern FdbError fdb_transaction_get_committed_version(TransactionHandle transaction, out long version); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_get_versionstamp(TransactionHandle transaction); + public static extern IntPtr fdb_transaction_get_versionstamp(TransactionHandle transaction); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_watch(TransactionHandle transaction, byte* keyName, int keyNameLength); + public static extern IntPtr fdb_transaction_watch(TransactionHandle transaction, byte* keyName, int keyNameLength); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FutureHandle fdb_transaction_on_error(TransactionHandle transaction, FdbError error); + public static extern IntPtr fdb_transaction_on_error(TransactionHandle transaction, FdbError error); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] public static extern void fdb_transaction_reset(TransactionHandle transaction); @@ -193,43 +197,43 @@ public static extern void fdb_transaction_clear_range( public static extern void fdb_future_destroy(IntPtr future); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern void fdb_future_cancel(FutureHandle future); + public static extern void fdb_future_cancel(IntPtr future); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern void fdb_future_release_memory(FutureHandle future); + public static extern void fdb_future_release_memory(IntPtr future); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_block_until_ready(FutureHandle futureHandle); + public static extern FdbError fdb_future_block_until_ready(IntPtr futureHandle); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern bool fdb_future_is_ready(FutureHandle futureHandle); + public static extern bool fdb_future_is_ready(IntPtr futureHandle); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_get_error(FutureHandle futureHandle); + public static extern FdbError fdb_future_get_error(IntPtr futureHandle); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_set_callback(FutureHandle future, FdbFutureCallback callback, IntPtr callbackParameter); + public static extern FdbError fdb_future_set_callback(IntPtr future, FdbFutureCallback callback, IntPtr callbackParameter); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_get_version(FutureHandle future, out long version); + public static extern FdbError fdb_future_get_version(IntPtr future, out long version); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_get_key(FutureHandle future, out byte* key, out int keyLength); + public static extern FdbError fdb_future_get_key(IntPtr future, out byte* key, out int keyLength); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_get_cluster(FutureHandle future, out ClusterHandle cluster); + public static extern FdbError fdb_future_get_cluster(IntPtr future, out ClusterHandle cluster); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_get_database(FutureHandle future, out DatabaseHandle database); + public static extern FdbError fdb_future_get_database(IntPtr future, out DatabaseHandle database); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_get_value(FutureHandle future, out bool present, out byte* value, out int valueLength); + public static extern FdbError fdb_future_get_value(IntPtr future, out bool present, out byte* value, out int valueLength); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_get_string_array(FutureHandle future, out byte** strings, out int count); + public static extern FdbError fdb_future_get_string_array(IntPtr future, out byte** strings, out int count); [DllImport(FDB_C_DLL, CallingConvention = CallingConvention.Cdecl)] - public static extern FdbError fdb_future_get_keyvalue_array(FutureHandle future, out FdbKeyValue* kv, out int count, out bool more); + public static extern FdbError fdb_future_get_keyvalue_array(IntPtr future, out FdbKeyValue* kv, out int count, out bool more); } @@ -376,51 +380,54 @@ public static int GetMaxApiVersion() #region Futures... - public static bool FutureIsReady(FutureHandle futureHandle) + public static bool FutureIsReady(IntPtr futureHandle) { return NativeMethods.fdb_future_is_ready(futureHandle); } - public static void FutureDestroy(IntPtr futureHandle) + public static void FutureDestroy(IntPtr futureHandle, [CallerMemberName] string caller = null) { +#if DEBUG_FUTURES + Debug.WriteLine("Native.FutureDestroy(0x{0}) from {1}", (object)futureHandle.ToString("X"), caller); +#endif if (futureHandle != IntPtr.Zero) { NativeMethods.fdb_future_destroy(futureHandle); } } - public static void FutureCancel(FutureHandle futureHandle) + public static void FutureCancel(IntPtr futureHandle) { NativeMethods.fdb_future_cancel(futureHandle); } - public static void FutureReleaseMemory(FutureHandle futureHandle) + public static void FutureReleaseMemory(IntPtr futureHandle) { NativeMethods.fdb_future_release_memory(futureHandle); } - public static FdbError FutureGetError(FutureHandle future) + public static FdbError FutureGetError(IntPtr future) { return NativeMethods.fdb_future_get_error(future); } - public static FdbError FutureBlockUntilReady(FutureHandle future) + public static FdbError FutureBlockUntilReady(IntPtr future) { #if DEBUG_NATIVE_CALLS - Debug.WriteLine("calling fdb_future_block_until_ready(0x" + future.Handle.ToString("x") + ")..."); + Debug.WriteLine("calling fdb_future_block_until_ready(0x" + future.ToString("x") + ")..."); #endif var err = NativeMethods.fdb_future_block_until_ready(future); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_block_until_ready(0x" + future.Handle.ToString("x") + ") => err=" + err); + Debug.WriteLine("fdb_future_block_until_ready(0x" + future.ToString("x") + ") => err=" + err); #endif return err; } - public static FdbError FutureSetCallback(FutureHandle future, FdbFutureCallback callback, IntPtr callbackParameter) + public static FdbError FutureSetCallback(IntPtr future, FdbFutureCallback callback, IntPtr callbackParameter) { var err = NativeMethods.fdb_future_set_callback(future, callback, callbackParameter); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_set_callback(0x" + future.Handle.ToString("x") + ", 0x" + ptrCallback.ToString("x") + ") => err=" + err); + Debug.WriteLine("fdb_future_set_callback(0x" + future.ToString("x") + ", 0x" + callbackParameter.ToString("x") + ") => err=" + err); #endif return err; } @@ -457,12 +464,12 @@ public static FdbError StopNetwork() #region Clusters... - public static FutureHandle CreateCluster(string path) + public static IntPtr CreateCluster(string path) { var future = NativeMethods.fdb_create_cluster(path); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_create_cluster(" + path + ") => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_create_cluster(" + path + ") => 0x" + future.ToString("x")); #endif return future; @@ -481,11 +488,11 @@ public static FdbError ClusterSetOption(ClusterHandle cluster, FdbClusterOption return NativeMethods.fdb_cluster_set_option(cluster, option, value, valueLength); } - public static FdbError FutureGetCluster(FutureHandle future, out ClusterHandle cluster) + public static FdbError FutureGetCluster(IntPtr future, out ClusterHandle cluster) { var err = NativeMethods.fdb_future_get_cluster(future, out cluster); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_get_cluster(0x" + future.Handle.ToString("x") + ") => err=" + err + ", handle=0x" + cluster.Handle.ToString("x")); + Debug.WriteLine("fdb_future_get_cluster(0x" + future.ToString("x") + ") => err=" + err + ", handle=0x" + cluster.Handle.ToString("x")); #endif //TODO: check if err == Success ? return err; @@ -495,11 +502,11 @@ public static FdbError FutureGetCluster(FutureHandle future, out ClusterHandle c #region Databases... - public static FdbError FutureGetDatabase(FutureHandle future, out DatabaseHandle database) + public static FdbError FutureGetDatabase(IntPtr future, out DatabaseHandle database) { var err = NativeMethods.fdb_future_get_database(future, out database); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_get_database(0x" + future.Handle.ToString("x") + ") => err=" + err + ", handle=0x" + database.Handle.ToString("x")); + Debug.WriteLine("fdb_future_get_database(0x" + future.ToString("x") + ") => err=" + err + ", handle=0x" + database.Handle.ToString("x")); #endif //TODO: check if err == Success ? return err; @@ -518,10 +525,10 @@ public static void DatabaseDestroy(IntPtr handle) } } - public static FutureHandle ClusterCreateDatabase(ClusterHandle cluster, string name) + public static IntPtr ClusterCreateDatabase(ClusterHandle cluster, string name) { var future = NativeMethods.fdb_cluster_create_database(cluster, name, name == null ? 0 : name.Length); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS Debug.WriteLine("fdb_cluster_create_database(0x" + cluster.Handle.ToString("x") + ", name: '" + name + "') => 0x" + cluster.Handle.ToString("x")); #endif @@ -554,17 +561,17 @@ public static FdbError DatabaseCreateTransaction(DatabaseHandle database, out Tr return err; } - public static FutureHandle TransactionCommit(TransactionHandle transaction) + public static IntPtr TransactionCommit(TransactionHandle transaction) { var future = NativeMethods.fdb_transaction_commit(transaction); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_transaction_commit(0x" + transaction.Handle.ToString("x") + ") => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_transaction_commit(0x" + transaction.Handle.ToString("x") + ") => 0x" + future.ToString("x")); #endif return future; } - public static FutureHandle TransactionGetVersionStamp(TransactionHandle transaction) + public static IntPtr TransactionGetVersionStamp(TransactionHandle transaction) { var future = NativeMethods.fdb_transaction_get_versionstamp(transaction); Contract.Assert(future != null); @@ -574,27 +581,27 @@ public static FutureHandle TransactionGetVersionStamp(TransactionHandle transact return future; } - public static FutureHandle TransactionWatch(TransactionHandle transaction, Slice key) + public static IntPtr TransactionWatch(TransactionHandle transaction, Slice key) { if (key.IsNullOrEmpty) throw new ArgumentException("Key cannot be null or empty", "key"); fixed (byte* ptrKey = key.Array) { var future = NativeMethods.fdb_transaction_watch(transaction, ptrKey + key.Offset, key.Count); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_transaction_watch(0x" + transaction.Handle.ToString("x") + ", key: '" + FdbKey.Dump(key) + "') => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_transaction_watch(0x" + transaction.Handle.ToString("x") + ", key: '" + FdbKey.Dump(key) + "') => 0x" + future.ToString("x")); #endif return future; } } - public static FutureHandle TransactionOnError(TransactionHandle transaction, FdbError errorCode) + public static IntPtr TransactionOnError(TransactionHandle transaction, FdbError errorCode) { var future = NativeMethods.fdb_transaction_on_error(transaction, errorCode); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_transaction_on_error(0x" + transaction.Handle.ToString("x") + ", " + errorCode + ") => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_transaction_on_error(0x" + transaction.Handle.ToString("x") + ", " + errorCode + ") => 0x" + future.ToString("x")); #endif return future; } @@ -623,12 +630,12 @@ public static void TransactionSetReadVersion(TransactionHandle transaction, long NativeMethods.fdb_transaction_set_read_version(transaction, version); } - public static FutureHandle TransactionGetReadVersion(TransactionHandle transaction) + public static IntPtr TransactionGetReadVersion(TransactionHandle transaction) { var future = NativeMethods.fdb_transaction_get_read_version(transaction); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_transaction_get_read_version(0x" + transaction.Handle.ToString("x") + ") => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_transaction_get_read_version(0x" + transaction.Handle.ToString("x") + ") => 0x" + future.ToString("x")); #endif return future; } @@ -641,15 +648,15 @@ public static FdbError TransactionGetCommittedVersion(TransactionHandle transact return NativeMethods.fdb_transaction_get_committed_version(transaction, out version); } - public static FdbError FutureGetVersion(FutureHandle future, out long version) + public static FdbError FutureGetVersion(IntPtr future, out long version) { #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_get_version(0x" + future.Handle.ToString("x") + ")"); + Debug.WriteLine("fdb_future_get_version(0x" + future.ToString("x") + ")"); #endif return NativeMethods.fdb_future_get_version(future, out version); } - public static FutureHandle TransactionGet(TransactionHandle transaction, Slice key, bool snapshot) + public static IntPtr TransactionGet(TransactionHandle transaction, Slice key, bool snapshot) { if (key.IsNull) throw new ArgumentException("Key cannot be null", "key"); @@ -659,15 +666,15 @@ public static FutureHandle TransactionGet(TransactionHandle transaction, Slice k fixed (byte* ptrKey = key.Array) { var future = NativeMethods.fdb_transaction_get(transaction, ptrKey + key.Offset, key.Count, snapshot); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_transaction_get(0x" + transaction.Handle.ToString("x") + ", key: '" + FdbKey.Dump(key) + "', snapshot: " + snapshot + ") => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_transaction_get(0x" + transaction.Handle.ToString("x") + ", key: '" + FdbKey.Dump(key) + "', snapshot: " + snapshot + ") => 0x" + future.ToString("x")); #endif return future; } } - public static FutureHandle TransactionGetRange(TransactionHandle transaction, KeySelector begin, KeySelector end, int limit, int targetBytes, FdbStreamingMode mode, int iteration, bool snapshot, bool reverse) + public static IntPtr TransactionGetRange(TransactionHandle transaction, KeySelector begin, KeySelector end, int limit, int targetBytes, FdbStreamingMode mode, int iteration, bool snapshot, bool reverse) { fixed (byte* ptrBegin = begin.Key.Array) fixed (byte* ptrEnd = end.Key.Array) @@ -677,51 +684,51 @@ public static FutureHandle TransactionGetRange(TransactionHandle transaction, Ke ptrBegin + begin.Key.Offset, begin.Key.Count, begin.OrEqual, begin.Offset, ptrEnd + end.Key.Offset, end.Key.Count, end.OrEqual, end.Offset, limit, targetBytes, mode, iteration, snapshot, reverse); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_transaction_get_range(0x" + transaction.Handle.ToString("x") + ", begin: " + begin.PrettyPrint(FdbKey.PrettyPrintMode.Begin) + ", end: " + end.PrettyPrint(FdbKey.PrettyPrintMode.End) + ", " + snapshot + ") => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_transaction_get_range(0x" + transaction.Handle.ToString("x") + ", begin: " + begin.PrettyPrint(FdbKey.PrettyPrintMode.Begin) + ", end: " + end.PrettyPrint(FdbKey.PrettyPrintMode.End) + ", " + snapshot + ") => 0x" + future.ToString("x")); #endif return future; } } - public static FutureHandle TransactionGetKey(TransactionHandle transaction, KeySelector selector, bool snapshot) + public static IntPtr TransactionGetKey(TransactionHandle transaction, KeySelector selector, bool snapshot) { if (selector.Key.IsNull) throw new ArgumentException("Key cannot be null", "selector"); fixed (byte* ptrKey = selector.Key.Array) { var future = NativeMethods.fdb_transaction_get_key(transaction, ptrKey + selector.Key.Offset, selector.Key.Count, selector.OrEqual, selector.Offset, snapshot); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_transaction_get_key(0x" + transaction.Handle.ToString("x") + ", " + selector.ToString() + ", " + snapshot + ") => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_transaction_get_key(0x" + transaction.Handle.ToString("x") + ", " + selector.ToString() + ", " + snapshot + ") => 0x" + future.ToString("x")); #endif return future; } } - public static FutureHandle TransactionGetAddressesForKey(TransactionHandle transaction, Slice key) + public static IntPtr TransactionGetAddressesForKey(TransactionHandle transaction, Slice key) { if (key.IsNullOrEmpty) throw new ArgumentException("Key cannot be null or empty", "key"); fixed (byte* ptrKey = key.Array) { var future = NativeMethods.fdb_transaction_get_addresses_for_key(transaction, ptrKey + key.Offset, key.Count); - Contract.Assert(future != null); + Contract.Assert(future != IntPtr.Zero); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_transaction_get_addresses_for_key(0x" + transaction.Handle.ToString("x") + ", key: '" + FdbKey.Dump(key) + "') => 0x" + future.Handle.ToString("x")); + Debug.WriteLine("fdb_transaction_get_addresses_for_key(0x" + transaction.Handle.ToString("x") + ", key: '" + FdbKey.Dump(key) + "') => 0x" + future.ToString("x")); #endif return future; } } - public static FdbError FutureGetValue(FutureHandle future, out bool valuePresent, out Slice value) + public static FdbError FutureGetValue(IntPtr future, out bool valuePresent, out Slice value) { byte* ptr; int valueLength; var err = NativeMethods.fdb_future_get_value(future, out valuePresent, out ptr, out valueLength); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_get_value(0x" + future.Handle.ToString("x") + ") => err=" + err + ", present=" + valuePresent + ", valueLength=" + valueLength); + Debug.WriteLine("fdb_future_get_value(0x" + future.ToString("x") + ") => err=" + err + ", present=" + valuePresent + ", valueLength=" + valueLength); #endif if (ptr != null && valueLength >= 0) { @@ -736,13 +743,13 @@ public static FdbError FutureGetValue(FutureHandle future, out bool valuePresent return err; } - public static FdbError FutureGetKey(FutureHandle future, out Slice key) + public static FdbError FutureGetKey(IntPtr future, out Slice key) { byte* ptr; int keyLength; var err = NativeMethods.fdb_future_get_key(future, out ptr, out keyLength); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_get_key(0x" + future.Handle.ToString("x") + ") => err=" + err + ", keyLength=" + keyLength); + Debug.WriteLine("fdb_future_get_key(0x" + future.ToString("x") + ") => err=" + err + ", keyLength=" + keyLength); #endif // note: fdb_future_get_key is allowed to return NULL for the empty key (not to be confused with a key that has an empty value) @@ -759,7 +766,7 @@ public static FdbError FutureGetKey(FutureHandle future, out Slice key) return err; } - public static FdbError FutureGetKeyValueArray(FutureHandle future, out KeyValuePair[] result, out bool more) + public static FdbError FutureGetKeyValueArray(IntPtr future, out KeyValuePair[] result, out bool more) { result = null; @@ -768,7 +775,7 @@ public static FdbError FutureGetKeyValueArray(FutureHandle future, out KeyValueP var err = NativeMethods.fdb_future_get_keyvalue_array(future, out kvp, out count, out more); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_get_keyvalue_array(0x" + future.Handle.ToString("x") + ") => err=" + err + ", count=" + count + ", more=" + more); + Debug.WriteLine("fdb_future_get_keyvalue_array(0x" + future.ToString("x") + ") => err=" + err + ", count=" + count + ", more=" + more); #endif if (Fdb.Success(err)) @@ -829,7 +836,7 @@ public static FdbError FutureGetKeyValueArray(FutureHandle future, out KeyValueP return err; } - public static FdbError FutureGetStringArray(FutureHandle future, out string[] result) + public static FdbError FutureGetStringArray(IntPtr future, out string[] result) { result = null; @@ -838,7 +845,7 @@ public static FdbError FutureGetStringArray(FutureHandle future, out string[] re var err = NativeMethods.fdb_future_get_string_array(future, out strings, out count); #if DEBUG_NATIVE_CALLS - Debug.WriteLine("fdb_future_get_string_array(0x" + future.Handle.ToString("x") + ") => err=" + err + ", count=" + count); + Debug.WriteLine("fdb_future_get_string_array(0x" + future.ToString("x") + ") => err=" + err + ", count=" + count); #endif if (Fdb.Success(err)) @@ -865,7 +872,7 @@ public static FdbError FutureGetStringArray(FutureHandle future, out string[] re return err; } - public static FdbError FutureGetVersionStamp(FutureHandle future, out VersionStamp stamp) + public static FdbError FutureGetVersionStamp(IntPtr future, out VersionStamp stamp) { byte* ptr; int keyLength; @@ -945,6 +952,129 @@ public static FdbError TransactionAddConflictRange(TransactionHandle transaction #endregion + #region Global Future Context... + + internal static readonly GlobalNativeContext GlobalContext = new GlobalNativeContext(); + + internal sealed class GlobalNativeContext : FdbFutureContext + { + + public Task CreateClusterAsync(string clusterFile, CancellationToken ct) + { + return RunAsync( + (arg) => FdbNative.CreateCluster((string)arg), + clusterFile, + (h, _) => + { + ClusterHandle cluster; + var err = FdbNative.FutureGetCluster(h, out cluster); + if (err != FdbError.Success) + { + cluster.Dispose(); + throw Fdb.MapToException(err); + } + var handler = new FdbNativeCluster(cluster); + return (IFdbClusterHandler)handler; + }, + null, //unused + ct + ); + } + + public void WatchKeyAsync(ref FdbWatch watch, TransactionHandle handle, Slice key, CancellationToken ct) + { + throw new NotImplementedException(); +#if false + IntPtr h = IntPtr.Zero; + bool mustDispose = true; + try + { + IntPtr cookie = IntPtr.Zero; //TODO!! + + RuntimeHelpers.PrepareConstrainedRegions(); + try + { } + finally + { + h = FdbNative.TransactionWatch(handle, key); + } + Contract.Assert(h != IntPtr.Zero); + if (h == IntPtr.Zero) throw new InvalidOperationException("FIXME: failed to create a watch handle");//TODO: message? + + var f = new FdbWatchFuture(key, cookie, "WatchKeyAsync", null); + watch = new FdbWatch(f, key, Slice.Nil); + + if (FdbNative.FutureIsReady(h)) + { + f.OnReady(); + mustDispose = false; + return; + } + } + finally + { + if (mustDispose && h != IntPtr.Zero) + { + FdbNative.FutureDestroy(h); + } + } +#endif + } + + internal sealed class FdbWatchFuture : FdbFuture + { + private IntPtr m_handle; + + private readonly object m_lock = new object(); + + public FdbWatchFuture(Slice key, IntPtr cookie, string label, object state) + : base(cookie, label, state) + { + this.Key = key; + } + + public Slice Key { get; private set; } + + public override bool Visit(IntPtr handle) + { + Contract.Assert(handle == m_handle || m_handle == IntPtr.Zero); + return true; + } + + protected override void OnCancel() + { + throw new NotImplementedException(); + } + + public override void OnReady() + { + IntPtr handle = IntPtr.Zero; + try + { + handle = Interlocked.Exchange(ref m_handle, IntPtr.Zero); + if (handle == IntPtr.Zero) return; + + var err = FdbNative.FutureGetError(m_handle); + + if (err == FdbError.Success) + { + PublishResult(this.Key); + } + else + { + PublishError(null, err); + } + } + finally + { + if (handle != IntPtr.Zero) FdbNative.FutureDestroy(handle); + } + } + } + + } + +#endregion } } diff --git a/FoundationDB.Client/Native/FdbNativeCluster.cs b/FoundationDB.Client/Native/FdbNativeCluster.cs index 0443261f0..2067cf1e3 100644 --- a/FoundationDB.Client/Native/FdbNativeCluster.cs +++ b/FoundationDB.Client/Native/FdbNativeCluster.cs @@ -29,43 +29,27 @@ DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY namespace FoundationDB.Client.Native { using System; + using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Doxense.Diagnostics.Contracts; using FoundationDB.Client.Core; /// Wraps a native FDBCluster* handle - internal sealed class FdbNativeCluster : IFdbClusterHandler + internal sealed class FdbNativeCluster : FdbFutureContext, IFdbClusterHandler { - private readonly ClusterHandle m_handle; + //private readonly ClusterHandle m_handle; public FdbNativeCluster(ClusterHandle handle) + : base(handle) { - Contract.Requires(handle != null); - m_handle = handle; } public static Task CreateClusterAsync(string clusterFile, CancellationToken ct) { - var future = FdbNative.CreateCluster(clusterFile); - return FdbFuture.CreateTaskFromHandle(future, - (h) => - { - var err = FdbNative.FutureGetCluster(h, out ClusterHandle cluster); - if (err != FdbError.Success) - { - cluster.Dispose(); - throw Fdb.MapToException(err); - } - var handler = new FdbNativeCluster(cluster); - return (IFdbClusterHandler) handler; - }, - ct - ); + return FdbNative.GlobalContext.CreateClusterAsync(clusterFile, ct); } - internal ClusterHandle Handle => m_handle; - public bool IsInvalid => m_handle.IsInvalid; public bool IsClosed => m_handle.IsClosed; @@ -94,10 +78,10 @@ public Task OpenDatabaseAsync(string databaseName, Cancella { if (ct.IsCancellationRequested) return Task.FromCanceled(ct); - var future = FdbNative.ClusterCreateDatabase(m_handle, databaseName); - return FdbFuture.CreateTaskFromHandle( - future, - (h) => + return RunAsync( + (handle, state) => FdbNative.ClusterCreateDatabase(handle, state), + databaseName, + (h, state) => { var err = FdbNative.FutureGetDatabase(h, out DatabaseHandle database); if (err != FdbError.Success) @@ -105,18 +89,14 @@ public Task OpenDatabaseAsync(string databaseName, Cancella database.Dispose(); throw Fdb.MapToException(err); } - var handler = new FdbNativeDatabase(database); + var handler = new FdbNativeDatabase(database, (string)state); return (IFdbDatabaseHandler) handler; }, + databaseName, ct ); } - public void Dispose() - { - m_handle?.Dispose(); - } - } diff --git a/FoundationDB.Client/Native/FdbNativeDatabase.cs b/FoundationDB.Client/Native/FdbNativeDatabase.cs index a996996f8..37c9ecec4 100644 --- a/FoundationDB.Client/Native/FdbNativeDatabase.cs +++ b/FoundationDB.Client/Native/FdbNativeDatabase.cs @@ -37,43 +37,20 @@ namespace FoundationDB.Client.Native /// Wraps a native FDBDatabase* handle [DebuggerDisplay("Handle={m_handle}, Closed={m_handle.IsClosed}")] - internal sealed class FdbNativeDatabase : IFdbDatabaseHandler + internal sealed class FdbNativeDatabase : FdbFutureContext, IFdbDatabaseHandler { - /// Handle that wraps the native FDB_DATABASE* - private readonly DatabaseHandle m_handle; - -#if CAPTURE_STACKTRACES - private readonly StackTrace m_stackTrace; -#endif - - public FdbNativeDatabase(DatabaseHandle handle) - { - if (handle == null) throw new ArgumentNullException("handle"); - - m_handle = handle; -#if CAPTURE_STACKTRACES - m_stackTrace = new StackTrace(); -#endif - } - - //REVIEW: do we really need a destructor ? The handle is a SafeHandle, and will take care of itself... - ~FdbNativeDatabase() + public FdbNativeDatabase(DatabaseHandle handle, string name) + : base(handle) { -#if CAPTURE_STACKTRACES - Trace.WriteLine("A database handle (" + m_handle + ") was leaked by " + m_stackTrace); -#endif -#if DEBUG - // If you break here, that means that a native database handler was leaked by a FdbDatabase instance (or that the database instance was leaked) - if (Debugger.IsAttached) Debugger.Break(); -#endif - Dispose(false); + this.Name = name; } - public bool IsInvalid { get { return m_handle.IsInvalid; } } public bool IsClosed { get { return m_handle.IsClosed; } } + public string Name { get; private set; } + public void SetOption(FdbDatabaseOption option, Slice data) { Fdb.EnsureNotOnNetworkThread(); @@ -113,19 +90,6 @@ public IFdbTransactionHandler CreateTransaction(FdbOperationContext context) } } - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - private void Dispose(bool disposing) - { - if (disposing) - { - if (m_handle != null) m_handle.Dispose(); - } - } } } diff --git a/FoundationDB.Client/Native/FdbNativeTransaction.cs b/FoundationDB.Client/Native/FdbNativeTransaction.cs index 426348100..4d2b35cdf 100644 --- a/FoundationDB.Client/Native/FdbNativeTransaction.cs +++ b/FoundationDB.Client/Native/FdbNativeTransaction.cs @@ -44,50 +44,25 @@ namespace FoundationDB.Client.Native /// Wraps a native FDB_TRANSACTION handle [DebuggerDisplay("Handle={m_handle}, Size={m_payloadBytes}, Closed={m_handle.IsClosed}")] - internal class FdbNativeTransaction : IFdbTransactionHandler + internal class FdbNativeTransaction : FdbFutureContext, IFdbTransactionHandler { + private readonly FdbNativeDatabase m_database; - /// FDB_TRANSACTION* handle - private readonly TransactionHandle m_handle; /// Estimated current size of the transaction private int m_payloadBytes; -#if CAPTURE_STACKTRACES - private StackTrace m_stackTrace; -#endif - - public FdbNativeTransaction(FdbNativeDatabase db, TransactionHandle handle) + public FdbNativeTransaction([NotNull] FdbNativeDatabase db, [NotNull] TransactionHandle handle) + : base(handle) { if (db == null) throw new ArgumentNullException("db"); - if (handle == null) throw new ArgumentNullException("handle"); m_database = db; - m_handle = handle; -#if CAPTURE_STACKTRACES - m_stackTrace = new StackTrace(); -#endif - } - - //REVIEW: do we really need a destructor ? The handle is a SafeHandle, and will take care of itself... - ~FdbNativeTransaction() - { -#if CAPTURE_STACKTRACES - Trace.WriteLine("A transaction handle (" + m_handle + ", " + m_payloadBytes + " bytes written) was leaked by " + m_stackTrace); -#endif -#if DEBUG - // If you break here, that means that a native transaction handler was leaked by a FdbTransaction instance (or that the transaction instance was leaked) - if (Debugger.IsAttached) Debugger.Break(); -#endif - Dispose(false); } #region Properties... public bool IsClosed { get { return m_handle.IsClosed; } } - /// Native FDB_TRANSACTION* handle - public TransactionHandle Handle { get { return m_handle; } } - /// Database handler that owns this transaction public FdbNativeDatabase Database { get { return m_database; } } @@ -133,18 +108,20 @@ public void SetOption(FdbTransactionOption option, Slice data) public Task GetReadVersionAsync(CancellationToken ct) { - var future = FdbNative.TransactionGetReadVersion(m_handle); - return FdbFuture.CreateTaskFromHandle(future, - (h) => + return RunAsync( + (handle, state) => FdbNative.TransactionGetReadVersion(handle), + default(object), + (future, state) => { long version; - var err = FdbNative.FutureGetVersion(h, out version); + var err = FdbNative.FutureGetVersion(future, out version); #if DEBUG_TRANSACTIONS Debug.WriteLine("FdbTransaction[" + m_id + "].GetReadVersion() => err=" + err + ", version=" + version); #endif Fdb.DieOnError(err); return version; }, + default(object), ct ); } @@ -154,10 +131,8 @@ public void SetReadVersion(long version) FdbNative.TransactionSetReadVersion(m_handle, version); } - private static bool TryGetValueResult(FutureHandle h, out Slice result) + private static bool TryGetValueResult(IntPtr h, out Slice result) { - Contract.Requires(h != null); - bool present; var err = FdbNative.FutureGetValue(h, out present, out result); #if DEBUG_TRANSACTIONS @@ -167,22 +142,21 @@ private static bool TryGetValueResult(FutureHandle h, out Slice result) return present; } - private static Slice GetValueResultBytes(FutureHandle h) + private static Slice GetValueResultBytes(IntPtr h) { - Contract.Requires(h != null); - Slice result; - if (!TryGetValueResult(h, out result)) - { - return Slice.Nil; - } - return result; + return !TryGetValueResult(h, out result) ? Slice.Nil : result; } public Task GetAsync(Slice key, bool snapshot, CancellationToken ct) { - var future = FdbNative.TransactionGet(m_handle, key, snapshot); - return FdbFuture.CreateTaskFromHandle(future, (h) => GetValueResultBytes(h), ct); + return RunAsync( + (handle, state) => FdbNative.TransactionGet(handle, state.Item1, state.Item2), + (key, snapshot), + (future, state) => GetValueResultBytes(future), + null, + ct + ); } public Task GetValuesAsync(Slice[] keys, bool snapshot, CancellationToken ct) @@ -191,24 +165,24 @@ public Task GetValuesAsync(Slice[] keys, bool snapshot, CancellationTok if (keys.Length == 0) return Task.FromResult(Array.Empty()); - var futures = new FutureHandle[keys.Length]; - try - { - for (int i = 0; i < keys.Length; i++) - { - futures[i] = FdbNative.TransactionGet(m_handle, keys[i], snapshot); - } - } - catch - { - for (int i = 0; i < keys.Length; i++) + return RunAsync( + keys.Length, + (handle, state, futures) => { - if (futures[i] == null) break; - futures[i].Dispose(); - } - throw; - } - return FdbFuture.CreateTaskFromHandleArray(futures, (h) => GetValueResultBytes(h), ct); + var _keys = state.Item1; + var _snapshot = state.Item2; + for (int i = 0; i < _keys.Length; i++) + { + var h = FdbNative.TransactionGet(handle, _keys[i], _snapshot); + if (h == IntPtr.Zero) throw new FdbException(FdbError.OperationFailed); + futures[i] = h; + } + }, + (keys, snapshot), + (future, state) => GetValueResultBytes(future), + default(object), //TODO: buffer for the slices + ct + ); } /// Extract a chunk of result from a completed Future @@ -216,7 +190,7 @@ public Task GetValuesAsync(Slice[] keys, bool snapshot, CancellationTok /// Receives true if there are more result, or false if all results have been transmited /// Array of key/value pairs, or an exception [NotNull] - private static KeyValuePair[] GetKeyValueArrayResult(FutureHandle h, out bool more) + private static KeyValuePair[] GetKeyValueArrayResult(IntPtr h, out bool more) { KeyValuePair[] result; var err = FdbNative.FutureGetKeyValueArray(h, out result, out more); @@ -233,28 +207,26 @@ public Task GetRangeAsync(KeySelector begin, KeySelector end, Fdb Contract.Requires(options != null); bool reversed = options.Reverse ?? false; - var future = FdbNative.TransactionGetRange(m_handle, begin, end, options.Limit ?? 0, options.TargetBytes ?? 0, options.Mode ?? FdbStreamingMode.Iterator, iteration, snapshot, reversed); - return FdbFuture.CreateTaskFromHandle( - future, - (h) => + + return RunAsync( + (handle, _) => FdbNative.TransactionGetRange(handle, begin, end, options.Limit ?? 0, options.TargetBytes ?? 0, options.Mode ?? FdbStreamingMode.Iterator, iteration, snapshot, reversed), + default(object), //TODO: pass options & co? + (future, state) => { // TODO: quietly return if disposed - bool hasMore; - var chunk = GetKeyValueArrayResult(h, out hasMore); + var chunk = GetKeyValueArrayResult(future, out bool hasMore); return new FdbRangeChunk(hasMore, chunk, iteration, reversed); }, + default(object), //TODO: pass options & co? ct ); } - private static Slice GetKeyResult(FutureHandle h) + private static Slice GetKeyResult(IntPtr h) { - Contract.Requires(h != null); - - Slice result; - var err = FdbNative.FutureGetKey(h, out result); + var err = FdbNative.FutureGetKey(h, out Slice result); #if DEBUG_TRANSACTIONS Debug.WriteLine("FdbTransaction[].GetKeyResult() => err=" + err + ", result=" + result.ToString()); #endif @@ -264,10 +236,11 @@ private static Slice GetKeyResult(FutureHandle h) public Task GetKeyAsync(KeySelector selector, bool snapshot, CancellationToken ct) { - var future = FdbNative.TransactionGetKey(m_handle, selector, snapshot); - return FdbFuture.CreateTaskFromHandle( - future, - (h) => GetKeyResult(h), + return RunAsync( + (handle, state) => FdbNative.TransactionGetKey(handle, state.Selector, state.Snapshot), + (Selector: selector, Snapshot: snapshot), + (future, state) => GetKeyResult(future), + default(object), ct ); } @@ -276,25 +249,26 @@ public Task GetKeysAsync(KeySelector[] selectors, bool snapshot, Cancel { Contract.Requires(selectors != null); - var futures = new FutureHandle[selectors.Length]; - try - { - for (int i = 0; i < selectors.Length; i++) - { - futures[i] = FdbNative.TransactionGetKey(m_handle, selectors[i], snapshot); - } - } - catch - { - for (int i = 0; i < selectors.Length; i++) - { - if (futures[i] == null) break; - futures[i].Dispose(); - } - throw; - } - return FdbFuture.CreateTaskFromHandleArray(futures, (h) => GetKeyResult(h), ct); + if (selectors.Length == 0) return Task.FromResult(Array.Empty()); //REVIEW: PERF: maybe we could cache the emtpy array task? + return RunAsync( + selectors.Length, + (handle, state, futures) => + { + var _selectors = state.Selectors; + var _snapshot = state.Snapshot; + for (int i = 0; i < _selectors.Length; i++) + { + var h = FdbNative.TransactionGetKey(handle, _selectors[i], _snapshot); + if (h == IntPtr.Zero) throw new FdbException(FdbError.OperationFailed); + futures[i] = h; + } + }, + (Selectors: selectors, Snapshot: snapshot), + (future, state) => GetKeyResult(future), + default(object), //TODO: buffer for the slices + ct + ); } #endregion @@ -340,10 +314,8 @@ public void AddConflictRange(Slice beginKeyInclusive, Slice endKeyExclusive, Fdb } [NotNull] - private static string[] GetStringArrayResult(FutureHandle h) + private static string[] GetStringArrayResult(IntPtr h) { - Contract.Requires(h != null); - string[] result; var err = FdbNative.FutureGetStringArray(h, out result); #if DEBUG_TRANSACTIONS @@ -356,10 +328,11 @@ private static string[] GetStringArrayResult(FutureHandle h) public Task GetAddressesForKeyAsync(Slice key, CancellationToken ct) { - var future = FdbNative.TransactionGetAddressesForKey(m_handle, key); - return FdbFuture.CreateTaskFromHandle( - future, - (h) => GetStringArrayResult(h), + return RunAsync( + (handle, state) => FdbNative.TransactionGetAddressesForKey(handle, state), + key, + (future, state) => GetStringArrayResult(future), + default(object), ct ); } @@ -370,12 +343,12 @@ public Task GetAddressesForKeyAsync(Slice key, CancellationToken ct) public FdbWatch Watch(Slice key, CancellationToken ct) { - var future = FdbNative.TransactionWatch(m_handle, key); - return new FdbWatch( - FdbFuture.FromHandle(future, (h) => key, ct), - key, - Slice.Nil - ); + // a Watch will outlive the transaction, so we can attach it to the current FutureContext (which will be disposed once the transaction goes away) + // => we will store at them to the GlobalContext + + + + throw new NotImplementedException("FIXME: Future refactoring in progress! I owe you a beer (*) if I ever forget to remove this before committing! (*: if you come get it in person!)"); } #endregion @@ -384,8 +357,7 @@ public FdbWatch Watch(Slice key, CancellationToken ct) public long GetCommittedVersion() { - long version; - var err = FdbNative.TransactionGetCommittedVersion(m_handle, out version); + var err = FdbNative.TransactionGetCommittedVersion(m_handle, out long version); #if DEBUG_TRANSACTIONS Debug.WriteLine("FdbTransaction[" + m_id + "].GetCommittedVersion() => err=" + err + ", version=" + version); #endif @@ -395,11 +367,16 @@ public long GetCommittedVersion() public Task GetVersionStampAsync(CancellationToken ct) { - var future = FdbNative.TransactionGetVersionStamp(m_handle); - return FdbFuture.CreateTaskFromHandle(future, GetVersionStampResult, ct); + return RunAsync( + (handle, state) => FdbNative.TransactionGetVersionStamp(handle), + default(object), + (future, state) => GetVersionStampResult(future), + default(object), + ct + ); } - private static VersionStamp GetVersionStampResult(FutureHandle h) + private static VersionStamp GetVersionStampResult(IntPtr h) { Contract.Requires(h != null); var err = FdbNative.FutureGetVersionStamp(h, out VersionStamp stamp); @@ -421,14 +398,28 @@ private static VersionStamp GetVersionStampResult(FutureHandle h) /// As with other client/server databases, in some failure scenarios a client may be unable to determine whether a transaction succeeded. In these cases, CommitAsync() will throw CommitUnknownResult error. The OnErrorAsync() function treats this error as retryable, so retry loops that don’t check for CommitUnknownResult could execute the transaction twice. In these cases, you must consider the idempotence of the transaction. public Task CommitAsync(CancellationToken ct) { - var future = FdbNative.TransactionCommit(m_handle); - return FdbFuture.CreateTaskFromHandle(future, (h) => null, ct); + return RunAsync( + (handle, state) => FdbNative.TransactionCommit(handle), + default(object), + (future, state) => state, + default(object), //TODO:? + ct + ); } public Task OnErrorAsync(FdbError code, CancellationToken ct) { - var future = FdbNative.TransactionOnError(m_handle, code); - return FdbFuture.CreateTaskFromHandle(future, (h) => { ResetInternal(); return null; }, ct); + return RunAsync( + (handle, state) => FdbNative.TransactionOnError(handle, state), + code, + (h, state) => + { + ((FdbNativeTransaction)state).ResetInternal(); + return default(object); + }, + this, + ct + ); } public void Reset() @@ -449,25 +440,6 @@ private void ResetInternal() #endregion - #region IDisposable... - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - private void Dispose(bool disposing) - { - if (disposing) - { - // Dispose of the handle - if (!m_handle.IsClosed) m_handle.Dispose(); - } - } - - #endregion - } } diff --git a/FoundationDB.Client/Native/Futures/FdbFuture.cs b/FoundationDB.Client/Native/Futures/FdbFuture.cs new file mode 100644 index 000000000..4e944ae9c --- /dev/null +++ b/FoundationDB.Client/Native/Futures/FdbFuture.cs @@ -0,0 +1,121 @@ +#region BSD Licence +/* Copyright (c) 2013-2015, Doxense SAS +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Doxense nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#endregion + +// enable this to help debug Futures +#undef DEBUG_FUTURES + +using System.Diagnostics.Contracts; + +namespace FoundationDB.Client.Native +{ + using System; + using System.Diagnostics; + using System.Runtime.CompilerServices; + using System.Threading; + using System.Threading.Tasks; + + /// Base class for all FDBFuture wrappers + /// Type of the Task's result + [DebuggerDisplay("Label={Label}, Cookie={Cookie}, State={Task.Status}")] + internal abstract class FdbFuture : TaskCompletionSource, IFdbFuture + { + + #region Private Members... + + /// Optionnal registration on the parent Cancellation Token + /// Is only valid if FLAG_HAS_CTR is set + internal CancellationTokenRegistration m_ctr; + + protected FdbFuture(IntPtr cookie, string label, object state) + : base(state) + { + this.Cookie = cookie; + this.Label = label; + } + + public IntPtr Cookie { get; private set; } + + public string Label { get; private set; } + + #endregion + + #region Cancellation... + + #endregion + + public abstract bool Visit(IntPtr handle); + + public abstract void OnReady(); + + /// Return true if the future has completed (successfully or not) + public bool IsReady + { + get { return this.Task.IsCompleted; } + } + + /// Make the Future awaitable + public TaskAwaiter GetAwaiter() + { + return this.Task.GetAwaiter(); + } + + /// Try to abort the task (if it is still running) + public void Cancel() + { + if (this.Task.IsCanceled) return; + + OnCancel(); + } + + protected abstract void OnCancel(); + + internal void PublishResult(T result) + { + TrySetResult(result); + } + + internal void PublishError(Exception error, FdbError code) + { + if (error != null) + { + TrySetException(error); + } + else if (FdbFutureContext.ClassifyErrorSeverity(code) == FdbFutureContext.CATEGORY_CANCELLED) + { + TrySetCanceled(); + } + else + { + Contract.Assert(code != FdbError.Success); + TrySetException(Fdb.MapToException(code)); + } + } + + } + +} diff --git a/FoundationDB.Client/Native/Futures/FdbFutureArray.cs b/FoundationDB.Client/Native/Futures/FdbFutureArray.cs new file mode 100644 index 000000000..60897bf2c --- /dev/null +++ b/FoundationDB.Client/Native/Futures/FdbFutureArray.cs @@ -0,0 +1,195 @@ +#region BSD Licence +/* Copyright (c) 2013-2015, Doxense SAS +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Doxense nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#endregion + +namespace FoundationDB.Client.Native +{ + using JetBrains.Annotations; + using System; + using System.Diagnostics; + using System.Threading; + + /// FDBFuture[] wrapper + /// Type of result + internal sealed class FdbFutureArray : FdbFuture + { + // This future encapsulate multiple FDBFuture* handles and use ref-counting to detect when all the handles have fired + // The ref-counting is handled by the network thread, and invokation of future.OnReady() is deferred to the ThreadPool once the counter reaches zero + // The result array is computed once all FDBFuture are ready, from the ThreadPool. + // If at least one of the FDBFuture fails, the Task fails, using the most "serious" error found (ie: Non-Retryable > Cancelled > Retryable) + + #region Private Members... + + /// Encapsulated handles + // May contains IntPtr.Zero handles if there was a problem when setting up the callbacks. + // Atomically set to null by the first thread that needs to destroy all the handles + [CanBeNull] + private IntPtr[] m_handles; + + /// Number of handles that haven't fired yet + private int m_pending; + + /// Lambda used to extract the result of one handle + // the first argument is the FDBFuture handle that must be ready and not failed + // the second argument is a state that is passed by the caller. + [NotNull] + private readonly Func m_resultSelector; + + #endregion + + internal FdbFutureArray([NotNull] IntPtr[] handles, [NotNull] Func selector, object state, IntPtr cookie, string label) + : base(cookie, label, state) + { + m_handles = handles; + m_pending = handles.Length; + m_resultSelector = selector; + } + + public override bool Visit(IntPtr handle) + { + return 0 == Interlocked.Decrement(ref m_pending); + } + + public override void OnReady() + { + //README: + // - This callback will fire either from the ThreadPool (async ops) or inline form the ctor of the future (non-async ops, or ops that where served from some cache). + // - The method *MUST* dispose the future handle before returning, and *SHOULD* do so before signaling the task. + // => This is because continuations may run inline, and start new futures from there, while we still have our original future handle opened. + + IntPtr[] handles = null; + try + { + // make sure that nobody can destroy our handles while we are using them. + handles = Interlocked.Exchange(ref m_handles, null); + if (handles == null) return; // already disposed? + +#if DEBUG_FUTURES + Debug.WriteLine("FutureArray.{0}<{1}[]>.OnReady([{2}])", this.Label, typeof(T).Name, handles.Length); +#endif + + T[] results = new T[handles.Length]; + FdbError code = FdbError.Success; + int severity = 0; + Exception error = null; + + if (this.Task.IsCompleted) + { // task has already been handled by someone else + return; + } + + var state = this.Task.AsyncState; + for (int i = 0; i < results.Length; i++) + { + var handle = handles[i]; + var err = FdbNative.FutureGetError(handle); + if (err == FdbError.Success) + { + if (code != FdbError.Success) + { // there's been at least one error before, so there is no point in computing the result, it would be discarded anyway + continue; + } + + try + { + results[i] = m_resultSelector(handle, state); + } + catch (AccessViolationException e) + { // trouble in paradise! + +#if DEBUG_FUTURES + Debug.WriteLine("EPIC FAIL: " + e.ToString()); +#endif + + // => THIS IS VERY BAD! We have no choice but to terminate the process immediately, because any new call to any method to the binding may end up freezing the whole process (best case) or sending corrupted data to the cluster (worst case) + if (Debugger.IsAttached) Debugger.Break(); + + Environment.FailFast("FIXME: FDB done goofed!", e); + } + catch (Exception e) + { +#if DEBUG_FUTURES + Debug.WriteLine("FAIL: " + e.ToString()); +#endif + code = FdbError.InternalError; + error = e; + break; + } + } + else if (code != err) + { + int cur = FdbFutureContext.ClassifyErrorSeverity(err); + if (cur > severity) + { // error is more serious than before + severity = cur; + code = err; + } + } + } + + // since continuations may fire inline, make sure to release all the memory used by this handle first + FdbFutureContext.DestroyHandles(ref handles); + + if (code == FdbError.Success) + { + PublishResult(results); + } + else + { + PublishError(error, code); + } + } + catch (Exception e) + { // we must not blow up the TP or the parent, so make sure to propagate all exceptions to the task + TrySetException(e); + } + finally + { + if (handles != null) FdbFutureContext.DestroyHandles(ref handles); + GC.KeepAlive(this); + } + } + + protected override void OnCancel() + { + var handles = Volatile.Read(ref m_handles); + //TODO: we probably need locking to prevent concurrent destroy and cancel calls + if (handles != null) + { + foreach (var handle in handles) + { + if (handle != IntPtr.Zero) + { + FdbNative.FutureCancel(handle); + } + } + } + } + + } + +} \ No newline at end of file diff --git a/FoundationDB.Client/Native/Futures/FdbFutureContext.cs b/FoundationDB.Client/Native/Futures/FdbFutureContext.cs new file mode 100644 index 000000000..fb93b9e32 --- /dev/null +++ b/FoundationDB.Client/Native/Futures/FdbFutureContext.cs @@ -0,0 +1,756 @@ +#region BSD Licence +/* Copyright (c) 2013-2014, Doxense SAS +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Doxense nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#endregion + +// enable this to capture the stacktrace of the ctor, when troubleshooting leaked transaction handles +//#define CAPTURE_STACKTRACES + +namespace FoundationDB.Client.Native +{ + using JetBrains.Annotations; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Runtime.CompilerServices; + using System.Threading; + using System.Threading.Tasks; + using Doxense.Diagnostics.Contracts; + using Doxense.Threading.Tasks; + + internal class FdbFutureContext : IDisposable + { + + #region Private Constants... + + private const int FUTURE_COOKIE_SIZE = 32; + + private const int FUTURE_COOKIE_SHIFT = 0; + + private const ulong FUTURE_COOKIE_MASK = (1UL << FUTURE_COOKIE_SIZE) - 1; + + private const int CONTEXT_COOKIE_SIZE = 32; + + private const ulong CONTEXT_COOKIE_MASK = (1UL << CONTEXT_COOKIE_SIZE) - 1; + + private const int CONTEXT_COOKIE_SHIFT = FUTURE_COOKIE_SIZE; + + #endregion + + #region Static Stuff.... + + /// Counter used to generate the cookie values for each unique context + private static int s_globalCookieCounter; + + private static readonly Dictionary s_contexts = new Dictionary(); + + private static IntPtr MakeCallbackCookie(uint contextId, uint futureId) + { + ulong cookie = (contextId & CONTEXT_COOKIE_MASK) << CONTEXT_COOKIE_SHIFT; + cookie |= (futureId & FUTURE_COOKIE_MASK) << FUTURE_COOKIE_SHIFT; + return new IntPtr((long)cookie); + } + + private static uint GetContextIdFromCookie(IntPtr cookie) + { + return (uint) (((ulong) cookie.ToInt64() >> CONTEXT_COOKIE_SHIFT) & CONTEXT_COOKIE_MASK); + } + + private static uint GetFutureIdFromCookie(IntPtr cookie) + { + return (uint)(((ulong)cookie.ToInt64() >> FUTURE_COOKIE_SHIFT) & FUTURE_COOKIE_MASK); + } + + /// Delegate that will be called by fdb_c to notify us that a future as completed + /// It is important to make sure that this delegate will NOT be garbaged collected before the last future callback has fired! + private static readonly FdbNative.FdbFutureCallback GlobalCallback = FutureCallbackHandler; + + private static void FutureCallbackHandler(IntPtr handle, IntPtr cookie) + { + // cookie is the value that will help us find the corresponding context (upper 32 bits) and future within this context (lower 32 bits) that matches with this future handle. + +#if DEBUG_FUTURES + Debug.WriteLine("FutureCallbackHandler(0x{0}, {1:X8} | {2:X8}) called from {3} [{4}]", handle.ToString("X"), cookie.ToInt64() >> 32, cookie.ToInt64() & uint.MaxValue, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.Name); +#endif + bool fromNetworkThread = Fdb.IsNetworkThread; + + if (!fromNetworkThread) + { // most probably, we have been called inline from fdb_future_set_callback + // => The caller is holding a lock, so we have to defer to the ThreadPool and return as soon as possible! + try + { + ThreadPool.UnsafeQueueUserWorkItem( + (state) => + { + var args = (Tuple) state; + ProcessFutureCallback(args.Item1, args.Item2, false); + }, + Tuple.Create(handle, cookie) + ); + return; + } + catch (Exception) + { // unable to defer to the TP? + // we can't rethrow the exception if FDB_C is calling us (it won't know about it), + // so we will continue running inline. Hopefully this should never happen. + + // => eat the exception and continue + } + } + + ProcessFutureCallback(handle, cookie, fromNetworkThread); + } + + private static void ProcessFutureCallback(IntPtr handle, IntPtr cookie, bool fromNetworkThread) + { +#if DEBUG_FUTURES + Debug.WriteLine("ProcessFutureCallback(0x{0}, {1:X8} | {2:X8}, {3}) called from {4} [{5}]", handle.ToString("X"), cookie.ToInt64() >> 32, cookie.ToInt64() & uint.MaxValue, fromNetworkThread, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.Name); +#endif + // we are called by FDB_C, from the thread that runs the Event Loop + bool keepAlive = false; + try + { + // extract the upper 32 bits which contain the ID of the corresponding future context + uint contextId = GetContextIdFromCookie(cookie); + + FdbFutureContext context; + lock (s_contexts) // there will only be contentions on this lock if other a lot of threads are creating new contexts (ie: new transactions) + { + s_contexts.TryGetValue(contextId, out context); + } + + if (context != null) + { + //TODO: if the context is marked as "dead" we need to refcount the pending futures down to 0, and then remove the context from the list + + Contract.Assert(context.m_contextId == contextId); + bool purgeContext; + keepAlive = context.OnFutureReady(handle, cookie, fromNetworkThread, out purgeContext); + + if (purgeContext) + { // the context was disposed and saw the last pending future going by, we have to remove it from the list + lock (s_contexts) + { + s_contexts.Remove(contextId); + } + } + } + } + finally + { + if (!keepAlive) DestroyHandle(ref handle); + } + } + + #endregion + + private const int STATE_DEFAULT = 0; + + private const int STATE_DEAD = 1; + + // this flag must only be used under the lock + private int m_flags; + + /// Cookie for this context + /// Makes the 32-bits upper bits of the future callback parameter + private readonly uint m_contextId = (uint) Interlocked.Increment(ref s_globalCookieCounter); + + /// Counter used to generated the cookie for all futures created from this context + private int m_localCookieCounter; + + /// Dictionary used to store all the pending Futures for this context + /// All methods should take a lock on this instance before manipulating the state + private readonly Dictionary m_futures = new Dictionary(); + +#if CAPTURE_STACKTRACES + private readonly StackTrace m_stackTrace; +#endif + + #region Constructors... + + protected FdbFutureContext() + { + //REVIEW: is this a good idea to do this in the constructor? (we could start observing a context that hasn't been fully constructed yet + lock (s_contexts) + { + s_contexts[m_contextId] = this; + } +#if CAPTURE_STACKTRACES + m_stackTrace = new StackTrace(); +#endif + } + +#if NOT_NEEDED + //REVIEW: do we really need a destructor ? The handle is a SafeHandle, and will take care of itself... + ~FdbFutureContext() + { + if (!AppDomain.CurrentDomain.IsFinalizingForUnload()) + { +#if CAPTURE_STACKTRACES + Debug.WriteLine("A future context ({0}) was leaked by {1}", this, m_stackTrace); +#endif +#if DEBUG + // If you break here, that means that a native transaction handler was leaked by a FdbTransaction instance (or that the transaction instance was leaked) + if (Debugger.IsAttached) Debugger.Break(); +#endif + Dispose(false); + } + } +#endif + + #endregion + + #region IDisposable... + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + // + +#if DEBUG_FUTURES + Debug.WriteLine("Disposing context {0}#{1} with {2} pending future(s) ({3} total)", this.GetType().Name, m_contextId, m_futures.Count, m_localCookieCounter); +#endif + bool purge; + lock (m_futures) + { + if (m_flags == STATE_DEAD) + { // already dead! + return; + } + m_flags = STATE_DEAD; + purge = m_futures.Count == 0; + } + + if (purge) + { // no pending futures, we can remove ourselves from the global list + lock (s_contexts) + { + s_contexts.Remove(m_contextId); +#if DEBUG_FUTURES + Debug.WriteLine("Dumping all remaining contexts: {0}", s_contexts.Count); + foreach (var ctx in s_contexts) + { + Debug.WriteLine("- {0}#{1} : {2} ({3})", ctx.Value.GetType().Name, ctx.Key, ctx.Value.m_futures.Count, ctx.Value.m_localCookieCounter); + } +#endif + } + } + //else: we have to wait for all callbacks to fire. The last one will remove this context from the global list + } + } + + #endregion + + /// A callback has fire for a future handled by this context + private bool OnFutureReady(IntPtr handle, IntPtr cookie, bool fromNetworkThread, out bool purgeContext) + { + uint futureId = GetFutureIdFromCookie(cookie); + + purgeContext = false; + IFdbFuture future; + lock (m_futures) + { + if (m_flags == STATE_DEAD) + { // we are just waiting for all callbacks to fire + m_futures.Remove(futureId); + purgeContext = m_futures.Count == 0; + return false; + } + + m_futures.TryGetValue(futureId, out future); + } + + if (future != null && future.Cookie == cookie) + { + if (future.Visit(handle)) + { // future is ready to process all the results + + lock (m_futures) + { + m_futures.Remove(futureId); + } + + if (fromNetworkThread) + { + ThreadPool.UnsafeQueueUserWorkItem((state) => ((IFdbFuture)state).OnReady(), future); + //TODO: what happens if TP.UQUWI() fails? + } + else + { + future.OnReady(); + } + } + // else: expecting more handles + + // handles will be destroyed when the future completes + return true; + } + return false; + } + + /// Add a new future handle to this context + /// + /// Handle of the newly created future + /// Flag set to true if the future must be disposed by the caller (in case of error), or false if the future will be disposed by some other thread. + /// Method called when the future completes successfully + /// State that will be passed as the second argument to + /// TODO: remove this? + /// Type of future (name of the caller) + /// + protected Task RegisterFuture( + IntPtr handle, + ref bool mustDispose, + [NotNull] Func selector, + object state, + CancellationToken ct, + string label + ) + { + if (ct.IsCancellationRequested) return Task.FromCanceled(ct); + + FdbFutureSingle future = null; + IntPtr cookie = IntPtr.Zero; + uint futureId = (uint)Interlocked.Increment(ref m_localCookieCounter); + + try + { + cookie = MakeCallbackCookie(m_contextId, futureId); + + future = new FdbFutureSingle(handle, selector, state, cookie, label); + + if (FdbNative.FutureIsReady(handle)) + { // the result is already computed +#if DEBUG_FUTURES + Debug.WriteLine("FutureSingle.{0} 0x{1} already completed!", label, handle.ToString("X")); +#endif + cookie = IntPtr.Zero; + mustDispose = false; + future.OnReady(); + return future.Task; + } + + if (ct.CanBeCanceled) + { + if (ct.IsCancellationRequested) + { + future.TrySetCanceled(); + cookie = IntPtr.Zero; + return future.Task; + } + + // note that the cancellation handler can fire inline, but it will only mark the future as cancelled + // this means that we will still wait for the future callback to fire and set the task state in there. + future.m_ctr = RegisterForCancellation(future, ct); + } + + lock (m_futures) + { + m_futures[futureId] = future; + + // note: if the future just got ready, the callback will fire inline (as of v3.0) + // => if this happens, the callback defer the execution to the ThreadPool and returns immediately + var err = FdbNative.FutureSetCallback(handle, GlobalCallback, cookie); + if (!Fdb.Success(err)) + { // the callback will not fire, so we have to abort the future immediately + future.PublishError(null, err); + } + else + { + mustDispose = false; + } + } + return future.Task; + } + catch (Exception e) + { + if (future != null) + { + future.PublishError(e, FdbError.UnknownError); + return future.Task; + } + throw; + } + finally + { + if (mustDispose && cookie != IntPtr.Zero) + { // make sure that we never leak a failed future ! + lock (m_futures) + { + m_futures.Remove(futureId); + } + } + } + } + + /// Add a new future handle to this context + /// + /// Handles of the newly created future + /// Flag set to true if the future must be disposed by the caller (in case of error), or false if the future will be disposed by some other thread. + /// Method called when the future completes successfully + /// State that will be passed as the second argument to + /// TODO: remove this? + /// Type of future (name of the caller) + /// + protected Task RegisterFutures( + [NotNull] IntPtr[] handles, + ref bool mustDispose, + [NotNull] Func selector, + object state, + CancellationToken ct, + string label + ) + { + if (ct.IsCancellationRequested) return Task.FromCanceled(ct); + + FdbFutureArray future = null; + IntPtr cookie = IntPtr.Zero; + uint futureId = (uint) Interlocked.Increment(ref m_localCookieCounter); + try + { + cookie = MakeCallbackCookie(m_contextId, futureId); + + // make a copy because we may diverge from the caller if we partially fail to register the callbacks below + var tmp = new IntPtr[handles.Length]; + handles.CopyTo(tmp, 0); + future = new FdbFutureArray(tmp, selector, state, cookie, label); + + // check the case where all futures are already ready (served from cache?) + bool ready = true; + foreach (var handle in tmp) + { + if (!FdbNative.FutureIsReady(handle)) + { + ready = false; + break; + } + } + if (ready) + { +#if DEBUG_FUTURES + Debug.WriteLine("FutureArray.{0} [{1}] already completed!", label, tmp.Length); +#endif + cookie = IntPtr.Zero; + mustDispose = false; + future.OnReady(); + return future.Task; + } + + if (ct.CanBeCanceled) + { + future.m_ctr = RegisterForCancellation(future, ct); + if (future.Task.IsCompleted) + { // cancellation ran inline + future.TrySetCanceled(); + return future.Task; + } + } + + lock (m_futures) + { + m_futures[futureId] = future; + + // since the callbacks can fire inline, we have to make sure that we finish setting everything up under the lock + for (int i = 0; i < handles.Length; i++) + { + FdbError err = FdbNative.FutureSetCallback(handles[i], GlobalCallback, cookie); + if (Fdb.Success(err)) + { + handles[i] = IntPtr.Zero; + continue; + } + + // we have to cleanup everything, and mute this future + lock (m_futures) + { + m_futures.Remove(futureId); + for (int j = i + 1; j < handles.Length; j++) + { + tmp[j] = IntPtr.Zero; + } + } + + throw Fdb.MapToException(err); + } + } + mustDispose = false; + return future.Task; + } + catch (Exception e) + { + if (future != null) + { + future.PublishError(e, FdbError.UnknownError); + return future.Task; + } + throw; + } + finally + { + if (mustDispose && cookie != IntPtr.Zero) + { // make sure that we never leak a failed future ! + lock (m_futures) + { + m_futures.Remove(futureId); + } + } + + } + } + + /// Start a new async operation + /// Result of the operation + /// Lambda called to produce the future handle + /// Argument passed to . It will not be used after the handle has been constructed + /// Lambda called once the future completes (successfully) + /// State object passed to . It will be stored in the future has long as it is active. + /// Optional cancellation token used to cancel the task from an external source. + /// Optional label, used for logging and troubleshooting purpose (by default the name of the caller) + /// + protected Task RunAsync( + [NotNull] Func generator, + object argument, + [NotNull] Func selector, + object state, + CancellationToken ct, + [CallerMemberName] string label = null + ) + { + if (ct.IsCancellationRequested) return Task.FromCanceled(ct); + + bool mustDispose = true; + IntPtr h = IntPtr.Zero; + try + { + RuntimeHelpers.PrepareConstrainedRegions(); + try + { } + finally + { + h = generator(argument); + } + + return RegisterFuture(h, ref mustDispose, selector, state, ct, label); + } + finally + { + if (mustDispose && h != IntPtr.Zero) + { + FdbNative.FutureDestroy(h); + } + } + } + + internal static CancellationTokenRegistration RegisterForCancellation(IFdbFuture future, CancellationToken cancellationToken) + { + //note: if the token is already cancelled, the callback handler will run inline and any exception would bubble up here + //=> this is not a problem because the ctor already has a try/catch that will clean up everything + return cancellationToken.RegisterWithoutEC( + (_state) => { CancellationHandler(_state); }, + future + ); + } + + private static void CancellationHandler(object state) + { + var future = (IFdbFuture)state; + Contract.Assert(state != null); +#if DEBUG_FUTURES + Debug.WriteLine("CancellationHandler for " + future + " was called on thread #" + Thread.CurrentThread.ManagedThreadId.ToString()); +#endif + future.Cancel(); + } + + internal static void DestroyHandle(ref IntPtr handle) + { + if (handle != IntPtr.Zero) + { + FdbNative.FutureDestroy(handle); + handle = IntPtr.Zero; + } + } + + internal static void DestroyHandles(ref IntPtr[] handles) + { + if (handles != null) + { + foreach (var handle in handles) + { + if (handle != IntPtr.Zero) FdbNative.FutureDestroy(handle); + } + handles = null; + } + } + + internal const int CATEGORY_SUCCESS = 0; + internal const int CATEGORY_RETRYABLE = 1; + internal const int CATEGORY_CANCELLED = 2; + internal const int CATEGORY_FAILURE = 3; + + internal static int ClassifyErrorSeverity(FdbError error) + { + switch (error) + { + case FdbError.Success: + { + return CATEGORY_SUCCESS; + } + case FdbError.PastVersion: + case FdbError.FutureVersion: + case FdbError.NotCommitted: + case FdbError.CommitUnknownResult: + { + return CATEGORY_RETRYABLE; + } + + case FdbError.OperationCancelled: // happens if a future is cancelled (probably a watch) + case FdbError.TransactionCancelled: // happens if a transaction is cancelled (via its own parent CT, or via tr.Cancel()) + { + return CATEGORY_CANCELLED; + } + + default: + { + return CATEGORY_FAILURE; + } + } + } + } + + internal class FdbFutureContext : FdbFutureContext + where THandle : FdbSafeHandle + { + + protected readonly THandle m_handle; + + protected FdbFutureContext([NotNull] THandle handle) + { + if (handle == null) throw new ArgumentNullException("handle"); + m_handle = handle; + } + + public THandle Handle { [NotNull] get { return m_handle; } } + + protected override void Dispose(bool disposing) + { + try + { + base.Dispose(disposing); + } + finally + { + if (disposing) + { + lock (this.Handle) + { + if (!this.Handle.IsClosed) this.Handle.Dispose(); + } + } + } + } + + /// Start a new async operation + /// Type of the result of the operation + /// Type of the argument passed to the generator + /// Lambda called to produce the future handle + /// Argument passed to . It will not be used after the handle has been constructed + /// Lambda called once the future completes (successfully) + /// State object passed to . It will be stored in the future has long as it is active. + /// Optional cancellation token used to cancel the task from an external source. + /// Optional label, used for logging and troubleshooting purpose (by default the name of the caller) + /// + protected Task RunAsync( + [NotNull] Func generator, + TArg argument, + [NotNull] Func selector, + object state, + CancellationToken ct, + [CallerMemberName] string label = null + ) + { + if (ct.IsCancellationRequested) return Task.FromCanceled(ct); + + bool mustDispose = true; + IntPtr h = IntPtr.Zero; + try + { + lock (this.Handle) + { + if (this.Handle.IsClosed) throw new ObjectDisposedException(this.GetType().Name); + h = generator(m_handle, argument); + } + return RegisterFuture(h, ref mustDispose, selector, state, ct, label); + } + finally + { + if (mustDispose && h != IntPtr.Zero) + { + FdbNative.FutureDestroy(h); + } + } + } + + protected Task RunAsync( + int count, + Action generator, + TArg arg, + Func selector, + object state, + CancellationToken ct, + [CallerMemberName] string label = null + + ) + { + bool mustDispose = true; + var handles = new IntPtr[count]; + try + { + lock (this.Handle) + { + if (this.Handle.IsClosed) throw new ObjectDisposedException(this.GetType().Name); + generator(m_handle, arg, handles); + } + return RegisterFutures(handles, ref mustDispose, selector, state, ct, label); + } + catch + { + foreach (var future in handles) + { + if (future != IntPtr.Zero) FdbNative.FutureDestroy(future); + } + throw; + } + } + + } + +} diff --git a/FoundationDB.Client/Native/Futures/FdbFutureSingle.cs b/FoundationDB.Client/Native/Futures/FdbFutureSingle.cs new file mode 100644 index 000000000..d07fa8d66 --- /dev/null +++ b/FoundationDB.Client/Native/Futures/FdbFutureSingle.cs @@ -0,0 +1,159 @@ +#region BSD Licence +/* Copyright (c) 2013-2015, Doxense SAS +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Doxense nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#endregion + +namespace FoundationDB.Client.Native +{ + using System; + using System.Diagnostics; + using System.Runtime.ExceptionServices; + using System.Threading; + using Doxense.Diagnostics.Contracts; + using JetBrains.Annotations; + + /// FDBFuture wrapper + /// Type of result + internal sealed class FdbFutureSingle : FdbFuture + { + #region Private Members... + + /// Value of the 'FDBFuture*' + private IntPtr m_handle; + + /// Lambda used to extract the result of this FDBFuture + private readonly Func m_resultSelector; + + #endregion + + internal FdbFutureSingle(IntPtr handle, [NotNull] Func selector, object state, IntPtr cookie, string label) + : base(cookie, label, state) + { + if (handle == IntPtr.Zero) throw new ArgumentException("Invalid future handle", nameof(handle)); + if (selector == null) throw new ArgumentNullException(nameof(selector)); + + m_handle = handle; + m_resultSelector = selector; + } + + public override bool Visit(IntPtr handle) + { +#if DEBUG_FUTURES + Debug.WriteLine("FutureSingle.{0}<{1}>.Visit(0x{2})", this.Label, typeof(T).Name, handle.ToString("X8")); +#endif + Contract.Requires(handle == m_handle, this.Label); + return true; + } + + [HandleProcessCorruptedStateExceptions] // to be able to handle Access Violations and terminate the process + public override void OnReady() + { + IntPtr handle = IntPtr.Zero; + + //README: + // - This callback will fire either from the ThreadPool (async ops) or inline form the ctor of the future (non-async ops, or ops that where served from some cache). + // - The method *MUST* dispose the future handle before returning, and *SHOULD* do so before signaling the task. + // => This is because continuations may run inline, and start new futures from there, while we still have our original future handle opened. + + try + { + handle = Interlocked.Exchange(ref m_handle, IntPtr.Zero); + if (handle == IntPtr.Zero) return; // already disposed? + +#if DEBUG_FUTURES + Debug.WriteLine("FutureSingle.{0}<{1}>.OnReady(0x{2})", this.Label, typeof(T).Name, handle.ToString("X8")); +#endif + + if (this.Task.IsCompleted) + { // task has already been handled by someone else + return; + } + + var result = default(T); + var error = default(Exception); + + var code = FdbNative.FutureGetError(handle); + if (code == FdbError.Success) + { + try + { + result = m_resultSelector(handle, this.Task.AsyncState); + } + catch (AccessViolationException e) + { // trouble in paradise! + +#if DEBUG_FUTURES + Debug.WriteLine("EPIC FAIL: " + e.ToString()); +#endif + + // => THIS IS VERY BAD! We have no choice but to terminate the process immediately, because any new call to any method to the binding may end up freezing the whole process (best case) or sending corrupted data to the cluster (worst case) + if (Debugger.IsAttached) Debugger.Break(); + + Environment.FailFast("FIXME: FDB done goofed!", e); + } + catch (Exception e) + { +#if DEBUG_FUTURES + Debug.WriteLine("FAIL: " + e.ToString()); +#endif + code = FdbError.InternalError; + error = e; + } + } + + // since continuations may fire inline, make sure to release all the memory used by this handle first + FdbFutureContext.DestroyHandle(ref handle); + + if (code == FdbError.Success) + { + PublishResult(result); + } + else + { + PublishError(error, code); + } + } + catch (Exception e) + { // we must not blow up the TP or the parent, so make sure to propagate all exceptions to the task + TrySetException(e); + } + finally + { + if (handle != IntPtr.Zero) FdbFutureContext.DestroyHandle(ref handle); + GC.KeepAlive(this); + } + } + + protected override void OnCancel() + { + IntPtr handle = Volatile.Read(ref m_handle); + //TODO: we probably need locking to prevent concurrent destroy and cancel calls + if (handle != IntPtr.Zero) FdbNative.FutureCancel(handle); + } + + } + +} diff --git a/FoundationDB.Client/Native/Futures/IFdbFuture.cs b/FoundationDB.Client/Native/Futures/IFdbFuture.cs new file mode 100644 index 000000000..7ec3fc921 --- /dev/null +++ b/FoundationDB.Client/Native/Futures/IFdbFuture.cs @@ -0,0 +1,56 @@ +#region BSD Licence +/* Copyright (c) 2013-2015, Doxense SAS +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Doxense nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#endregion + +// enable this to help debug Futures +#undef DEBUG_FUTURES + +namespace FoundationDB.Client.Native +{ + using System; + + internal interface IFdbFuture + { + /// Unique identifier of this future + IntPtr Cookie { get; } + + /// Label of the future (usually the name of the operation) + string Label { get; } + + /// Cancel the future, if it hasen't completed yet + void Cancel(); + + /// Test if this was the last pending handle for this future, or not + /// Handle that completed + /// True if this was the last handle and can be called, or False if more handles need to fire first. + bool Visit(IntPtr handle); + + /// Called when all handles tracked by this future have fired + void OnReady(); + } + +} diff --git a/FoundationDB.Client/Native/Handles/FutureHandle.cs b/FoundationDB.Client/Native/Handles/FutureHandle.cs index 20eac078d..fc6f5fcb2 100644 --- a/FoundationDB.Client/Native/Handles/FutureHandle.cs +++ b/FoundationDB.Client/Native/Handles/FutureHandle.cs @@ -26,6 +26,8 @@ DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY */ #endregion +#if REFACTORED + namespace FoundationDB.Client.Native { using FoundationDB.Client.Utils; @@ -62,3 +64,5 @@ public override string ToString() } } + +#endif diff --git a/FoundationDB.Client/Shared/Async/AsyncCancellableMutex.cs b/FoundationDB.Client/Shared/Async/AsyncCancellableMutex.cs index 3de164053..9172d6fa7 100644 --- a/FoundationDB.Client/Shared/Async/AsyncCancellableMutex.cs +++ b/FoundationDB.Client/Shared/Async/AsyncCancellableMutex.cs @@ -34,6 +34,7 @@ namespace Doxense.Async using System.Diagnostics; using System.Threading; using System.Threading.Tasks; + using Doxense.Threading.Tasks; using JetBrains.Annotations; /// Implements a async mutex that supports cancellation @@ -83,7 +84,7 @@ public AsyncCancelableMutex(CancellationToken ct) { if (ct.CanBeCanceled) { - m_ctr = ct.Register(s_cancellationCallback, new WeakReference(this), useSynchronizationContext: false); + m_ctr = ct.RegisterWithoutEC(s_cancellationCallback, new WeakReference(this)); } GC.SuppressFinalize(this); } @@ -130,12 +131,12 @@ public bool Abort(bool async = false) private static void SetDefered(AsyncCancelableMutex mutex) { - ThreadPool.QueueUserWorkItem((state) => ((AsyncCancelableMutex)state).TrySetResult(null), mutex); + ThreadPool.UnsafeQueueUserWorkItem((state) => ((AsyncCancelableMutex)state).TrySetResult(null), mutex); } private static void CancelDefered(AsyncCancelableMutex mutex) { - ThreadPool.QueueUserWorkItem((state) => ((AsyncCancelableMutex)state).TrySetCanceled(), mutex); + ThreadPool.UnsafeQueueUserWorkItem((state) => ((AsyncCancelableMutex)state).TrySetCanceled(), mutex); } } diff --git a/FoundationDB.Client/Shared/Async/TaskHelpers.cs b/FoundationDB.Client/Shared/Async/TaskHelpers.cs index 3809615bd..d4d0c3b92 100644 --- a/FoundationDB.Client/Shared/Async/TaskHelpers.cs +++ b/FoundationDB.Client/Shared/Async/TaskHelpers.cs @@ -422,6 +422,36 @@ public static void Observe(Task task) } } + private delegate CancellationTokenRegistration RegisterWithoutECDelegate(ref CancellationToken ct, Action callback, object state); + private static readonly RegisterWithoutECDelegate RegisterWithoutECHandler = GetRegisterWithoutECDelegate(); + + [NotNull] + private static RegisterWithoutECDelegate GetRegisterWithoutECDelegate() + { + try + { + // CancellationToken.Register(..., useExecutionContext) is "private", and all the public version of Register pass true, which does costly allocations (capturing context, ...) + // There is however CancellationToken.InternalRegisterWithoutEC which is internal and pass false. + // => we will attempt to create a delegate to call the internal method - if possible - or fallback to the default version of Register, if this is not possible. + var method = typeof(CancellationToken).GetMethod("InternalRegisterWithoutEC", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic, null, new[] { typeof(Action), typeof(object) }, null); + if (method != null) + { + return (RegisterWithoutECDelegate)Delegate.CreateDelegate(typeof(RegisterWithoutECDelegate), null, method); + } + } + catch + { } + + return (ref CancellationToken token, Action callback, object state) => token.Register(callback, state); + } + + /// Version of CancellationToken.Register() that does not propagate the current ExecutionContext to the callback (faster, but unsafe!) + /// This should only be used with callbacks that do not execute user-provided code! + internal static CancellationTokenRegistration RegisterWithoutEC(this CancellationToken ct, [NotNull] Action callback, object state) + { + return RegisterWithoutECHandler(ref ct, callback, state); + } + /// Safely cancel a CancellationTokenSource /// CancellationTokenSource that needs to be cancelled public static void SafeCancel(this CancellationTokenSource source) diff --git a/FoundationDB.Client/Tuples/Encoding/TupleParser.cs b/FoundationDB.Client/Tuples/Encoding/TupleParser.cs index dd176b8dc..0a1212a01 100644 --- a/FoundationDB.Client/Tuples/Encoding/TupleParser.cs +++ b/FoundationDB.Client/Tuples/Encoding/TupleParser.cs @@ -938,8 +938,7 @@ public static string ParseAscii(Slice slice) if (slice.Count <= 2) return String.Empty; var decoded = UnescapeByteString(slice.Array, slice.Offset + 1, slice.Count - 2); - - return Encoding.Default.GetString(decoded.Array, decoded.Offset, decoded.Count); + return decoded.ToStringAscii(); } /// Parse a tuple segment containing a unicode string @@ -951,7 +950,7 @@ public static string ParseUnicode(Slice slice) if (slice.Count <= 2) return String.Empty; //TODO: check args var decoded = UnescapeByteString(slice.Array, slice.Offset + 1, slice.Count - 2); - return Encoding.UTF8.GetString(decoded.Array, decoded.Offset, decoded.Count); + return decoded.ToUnicode(); } /// Parse a tuple segment containing an embedded tuple diff --git a/FoundationDB.Client/Utils/TinyJsonParser.cs b/FoundationDB.Client/Utils/TinyJsonParser.cs index b67014911..f90b87304 100644 --- a/FoundationDB.Client/Utils/TinyJsonParser.cs +++ b/FoundationDB.Client/Utils/TinyJsonParser.cs @@ -344,7 +344,7 @@ internal static Dictionary ParseObject([NotNull] char[] chars, i if (token == Token.Eof) return null; // ensure we got an object - if (token != Token.MapBegin) throw new InvalidOperationException(String.Format("JSON object expected, but got a {0}", token)); + if (token != Token.MapBegin) throw new InvalidOperationException($"JSON object expected, but got a {token}"); var map = (Dictionary)parser.m_current; // ensure that there is nothing after the object @@ -371,8 +371,7 @@ public static List ParseArray(Slice data) [NotNull] internal static Dictionary GetMapField(Dictionary map, string field) { - object item; - return map != null && map.TryGetValue(field, out item) ? (Dictionary)item : s_missingMap; + return map != null && map.TryGetValue(field, out object item) ? (Dictionary)item : s_missingMap; } [NotNull] diff --git a/FoundationDB.Tests/TransactionFacts.cs b/FoundationDB.Tests/TransactionFacts.cs index 4fec664b9..729120ef9 100644 --- a/FoundationDB.Tests/TransactionFacts.cs +++ b/FoundationDB.Tests/TransactionFacts.cs @@ -260,7 +260,7 @@ public async Task Test_Cancelling_Transaction_During_Commit_Should_Abort_Task() // Writes about 5 MB of stuff in 100k chunks for (int i = 0; i < 50; i++) { - tr.Set(location.Keys.Encode(i), Slice.Random(rnd, 100 * 1000)); + tr.Set(location.Keys.Encode(i), Slice.Random(rnd, 100_000)); } // start commiting @@ -271,11 +271,7 @@ public async Task Test_Cancelling_Transaction_During_Commit_Should_Abort_Task() Assume.That(t.IsCompleted, Is.False, "Commit task already completed before having a chance to cancel"); tr.Cancel(); - await TestHelpers.AssertThrowsFdbErrorAsync( - () => t, - FdbError.TransactionCancelled, - "Cancelling a transaction that is writing to the server should fail the commit task" - ); + Assert.That(async () => await t, Throws.InstanceOf(), "Cancelling a transaction that is writing to the server should fail the commit task"); } } } @@ -2347,123 +2343,127 @@ public async Task Test_BadPractice_Future_Fuzzer() #endif const int R = 100; - using (var db = await OpenTestDatabaseAsync()) + try { - var location = db.Partition.ByKey("Fuzzer"); - - var rnd = new Random(); - int seed = rnd.Next(); - Log("Using random seeed {0}", seed); - rnd = new Random(seed); - - await db.WriteAsync((tr) => + using (var db = await OpenTestDatabaseAsync()) { - for (int i = 0; i < R; i++) + var location = db.Partition.ByKey("Fuzzer"); + var rnd = new Random(); + int seed = rnd.Next(); + Log("Using random seeed {0}", seed); + rnd = new Random(seed); + + await db.WriteAsync((tr) => { + for (int i = 0; i < R; i++) + { tr.Set(location.Keys.Encode(i), Slice.FromInt32(i)); - } - }, this.Cancellation); + } + }, this.Cancellation); - var start = DateTime.UtcNow; - Log("This test will run for {0} seconds", DURATION_SEC); + var start = DateTime.UtcNow; + Log("This test will run for {0} seconds", DURATION_SEC); - int time = 0; + int time = 0; - List m_alive = new List(); - var sb = new StringBuilder(); - while (DateTime.UtcNow - start < TimeSpan.FromSeconds(DURATION_SEC)) - { - switch (rnd.Next(10)) + var line = new StringBuilder(256); + + var alive = new List(100); + var lastCheck = start; + while (DateTime.UtcNow - start < TimeSpan.FromSeconds(DURATION_SEC)) { - case 0: + int x = rnd.Next(10); + + if (x == 0) { // start a new transaction - sb.Append('T'); + line.Append('T'); var tr = db.BeginTransaction(FdbTransactionMode.Default, this.Cancellation); - m_alive.Add(tr); - break; + alive.Add(tr); } - case 1: + else if (x == 1) { // drop a random transaction - if (m_alive.Count == 0) continue; - sb.Append('L'); - int p = rnd.Next(m_alive.Count); + if (alive.Count == 0) continue; + line.Append('L'); + int p = rnd.Next(alive.Count); - m_alive.RemoveAt(p); - //no dispose - break; + alive.RemoveAt(p); + //no dispose! } - case 2: + else if (x == 2) { // dispose a random transaction - if (m_alive.Count == 0) continue; - sb.Append('D'); - int p = rnd.Next(m_alive.Count); + if (alive.Count == 0) continue; + line.Append('D'); + int p = rnd.Next(alive.Count); - var tr = m_alive[p]; + var tr = alive[p]; + alive.RemoveAt(p); tr.Dispose(); - m_alive.RemoveAt(p); - break; } - case 3: - { // GC! - sb.Append('C'); + else if (x == 3) + { // get read version + line.Append('R'); var tr = db.BeginTransaction(FdbTransactionMode.ReadOnly, this.Cancellation); - m_alive.Add(tr); + alive.Add(tr); _ = await tr.GetReadVersionAsync(); - break; } - - case 4: - case 5: - case 6: - { // read a random value from a random transaction - sb.Append('G'); - if (m_alive.Count == 0) break; - int p = rnd.Next(m_alive.Count); - var tr = m_alive[p]; - - int x = rnd.Next(R); - try - { - _ = await tr.GetAsync(location.Keys.Encode(x)); + else + { + if (x % 2 == 0) + { // read a random value from a random transaction + if (alive.Count == 0) continue; + line.Append('G'); + int p = rnd.Next(alive.Count); + var tr = alive[p]; + + int k = rnd.Next(R); + try + { + await tr.GetAsync(location.Keys.Encode(x)); + } + catch (FdbException) + { + line.Append('!'); + alive.RemoveAt(p); + tr.Dispose(); + } } - catch (FdbException) - { - sb.Append('!'); + else + { // read a random value, but drop the task + if (alive.Count == 0) continue; + line.Append('g'); + int p = rnd.Next(alive.Count); + var tr = alive[p]; + + int k = rnd.Next(R); + var t = tr.GetAsync(location.Keys.Encode(k)).ContinueWith((_) => { var err = _.Exception; }, TaskContinuationOptions.OnlyOnFaulted); + // => t is not stored } - break; } - case 7: - case 8: - case 9: - { // read a random value, but drop the task - sb.Append('g'); - if (m_alive.Count == 0) break; - int p = rnd.Next(m_alive.Count); - var tr = m_alive[p]; - - int x = rnd.Next(R); - _ = tr.GetAsync(location.Keys.Encode(x)).ContinueWith((_) => sb.Append('!') /*BUGBUG: locking ?*/, TaskContinuationOptions.NotOnRanToCompletion); - // => t is not stored - break; + + if ((++time) % 10 == 0 && DateTime.UtcNow - lastCheck >= TimeSpan.FromSeconds(1)) + { + Log(line.ToString()); + line.Clear(); + Log("State: {0}", alive.Count); + //Log("Performing full GC"); + //GC.Collect(2); + //GC.WaitForPendingFinalizers(); + //GC.Collect(2); + lastCheck = DateTime.UtcNow; } - } - if ((time++) % 80 == 0) - { - Log(sb.ToString()); - Log("State: {0}", m_alive.Count); - sb.Clear(); - sb.Append('C'); - GC.Collect(); - GC.WaitForPendingFinalizers(); - GC.Collect(); + //await Task.Delay(1); + } + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); } - - GC.Collect(); - GC.WaitForPendingFinalizers(); - GC.Collect(); + } + finally + { + Log("Test methods completed!"); } } diff --git a/FoundationDB.Tests/Utils/SliceFacts.cs b/FoundationDB.Tests/Utils/SliceFacts.cs index ee1cb2078..8dec93914 100644 --- a/FoundationDB.Tests/Utils/SliceFacts.cs +++ b/FoundationDB.Tests/Utils/SliceFacts.cs @@ -43,6 +43,25 @@ namespace Doxense.Memory.Tests public class SliceFacts : FdbTest { +#if MEASURE + [TestFixtureTearDown] + public void DumpStats() + { + Log("# MemCopy:"); + for (int i = 0; i < SliceHelpers.CopyHistogram.Length; i++) + { + if (SliceHelpers.CopyHistogram[i] == 0) continue; + Log("# {0} : {1:N0} ({2:N1} ns, {3:N3} ns/byte)", i, SliceHelpers.CopyHistogram[i], SliceHelpers.CopyDurations[i] / SliceHelpers.CopyHistogram[i], SliceHelpers.CopyDurations[i] / (SliceHelpers.CopyHistogram[i] * i)); + } + Log("# MemCompare:"); + for (int i = 0; i < SliceHelpers.CompareHistogram.Length; i++) + { + if (SliceHelpers.CompareHistogram[i] == 0) continue; + Log("# {0} : {1:N0} ({2:N1} ns, {3:N3} ns/byte)", i, SliceHelpers.CompareHistogram[i], SliceHelpers.CompareDurations[i] / SliceHelpers.CompareHistogram[i], SliceHelpers.CompareDurations[i] / (SliceHelpers.CompareHistogram[i] * i)); + } + } +#endif + [Test] public void Test_Slice_Nil() { diff --git a/FoundationDb.Client.sln.DotSettings b/FoundationDb.Client.sln.DotSettings index 8493c51f0..287748a9d 100644 --- a/FoundationDb.Client.sln.DotSettings +++ b/FoundationDb.Client.sln.DotSettings @@ -77,4 +77,6 @@ True True True - True \ No newline at end of file + True + True + False \ No newline at end of file