Skip to content

Better handling of Future handles #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7e33bb8
Reimpleted the mapping between Future handles and FdbFuture<T> classe…
KrzysFR Jan 27, 2015
6c3c00a
A bit of refactoring and cleanup
KrzysFR Jan 28, 2015
9024f98
Merge branch 'master' into futures_ng
KrzysFR Feb 11, 2015
988df11
Moved future related files into a subfolder
KrzysFR Feb 11, 2015
6da27bd
Future handles are now back to an IntPtr so that can't be compared wi…
KrzysFR Feb 12, 2015
0b5ae13
Added #if DEBUG_FUTURES to on/off custom logging in all the future ca…
KrzysFR Feb 12, 2015
b030e29
Detect when the main future callback is not called from the network t…
KrzysFR Feb 13, 2015
940dade
Switched over to the refac_tuples branch
KrzysFR Feb 16, 2015
79552df
Added the CancellationToken.RegisterWithoutEC() hack to not pay the c…
KrzysFR Feb 16, 2015
990b053
No custom CancellationToken for regular read operations on Snapshotte…
KrzysFR Feb 16, 2015
e845b03
FutureContext: make sure to remove the future from the context when i…
KrzysFR Feb 16, 2015
8eafc11
Merge branch 'refac_tuples' into futures_ng
KrzysFR Feb 20, 2015
fe67fa5
Slice: simplified SliceHelper to always call memmove and memcmp
KrzysFR Feb 23, 2015
4b72de6
Slice: fixed cases where count = 0
KrzysFR Feb 23, 2015
27eade2
Slice: made sure that the x64 code generated by the JIT is the fastes…
KrzysFR Feb 23, 2015
580ea33
Slice: using a singleton for Encoding.UTF8 + use direct string ctors …
KrzysFR Feb 23, 2015
9ec2c56
Merge branch 'refac_tuples' into futures_ng
KrzysFR Feb 28, 2015
f312f55
Merge branch 'refac_tuples' into futures_ng
KrzysFR Mar 6, 2015
1e0af93
Merge branch 'refac_tuples' into futures_ng
KrzysFR Mar 6, 2015
df01e4d
Merge branch 'refac_tuples' into futures_ng
KrzysFR Mar 6, 2015
08a9d60
Merge branch 'refac_tuples' into futures_ng
KrzysFR Mar 13, 2015
a35a15f
Merge branch 'master' into futures_ng
KrzysFR Apr 28, 2018
94baef7
Merge remote-tracking branch 'origin/master' into futures_ng
KrzysFR Jun 2, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions FoundationDB.Client/Core/IFdbTransactionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public interface IFdbTransactionHandler : IDisposable
void SetOption(FdbTransactionOption option, Slice data);

/// <summary>Returns this transaction snapshot read version.</summary>
/// <param name="ct">Token used to cancel the operation from the outside, if different than the cancellation token of the transaction itself</param>
Task<long> GetReadVersionAsync(CancellationToken ct);

/// <summary>Retrieves the database version number at which a given transaction was committed.</summary>
Expand Down
12 changes: 6 additions & 6 deletions FoundationDB.Client/FdbTransaction.Snapshot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Task<Slice> GetAsync(Slice key)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetAsync", $"Getting value for '{key.ToString()}'");
#endif

return m_parent.m_handler.GetAsync(key, snapshot: true, ct: m_parent.m_cancellation);
return m_parent.m_handler.GetAsync(key, snapshot: true, ct: CancellationToken.None);
}

public Task<Slice[]> GetValuesAsync(Slice[] keys)
Expand All @@ -116,7 +116,7 @@ public Task<Slice[]> GetValuesAsync(Slice[] keys)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetValuesAsync", $"Getting batch of {keys.Length} values ...");
#endif

return m_parent.m_handler.GetValuesAsync(keys, snapshot: true, ct: m_parent.m_cancellation);
return m_parent.m_handler.GetValuesAsync(keys, snapshot: true, ct: CancellationToken.None);
}

public async Task<Slice> GetKeyAsync(KeySelector selector)
Expand All @@ -129,7 +129,7 @@ public async Task<Slice> GetKeyAsync(KeySelector selector)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetKeyAsync", $"Getting key '{selector.ToString()}'");
#endif

var key = await m_parent.m_handler.GetKeyAsync(selector, snapshot: true, ct: m_parent.m_cancellation).ConfigureAwait(false);
var key = await m_parent.m_handler.GetKeyAsync(selector, snapshot: true, ct: CancellationToken.None).ConfigureAwait(false);

// don't forget to truncate keys that would fall outside of the database's globalspace !
return m_parent.m_database.BoundCheck(key);
Expand All @@ -149,7 +149,7 @@ public Task<Slice[]> GetKeysAsync(KeySelector[] selectors)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetKeysCoreAsync", $"Getting batch of {selectors.Length} keys ...");
#endif

return m_parent.m_handler.GetKeysAsync(selectors, snapshot: true, ct: m_parent.m_cancellation);
return m_parent.m_handler.GetKeysAsync(selectors, snapshot: true, ct: CancellationToken.None);
}

public Task<FdbRangeChunk> GetRangeAsync(KeySelector beginInclusive, KeySelector endExclusive, FdbRangeOptions options, int iteration)
Expand All @@ -165,7 +165,7 @@ public Task<FdbRangeChunk> GetRangeAsync(KeySelector beginInclusive, KeySelector
// The iteration value is only needed when in iterator mode, but then it should start from 1
if (iteration == 0) iteration = 1;

return m_parent.m_handler.GetRangeAsync(beginInclusive, endExclusive, options, iteration, snapshot: true, ct: m_parent.m_cancellation);
return m_parent.m_handler.GetRangeAsync(beginInclusive, endExclusive, options, iteration, snapshot: true, ct: CancellationToken.None);
}

public FdbRangeQuery<KeyValuePair<Slice, Slice>> GetRange(KeySelector beginInclusive, KeySelector endExclusive, FdbRangeOptions options)
Expand All @@ -178,7 +178,7 @@ public FdbRangeQuery<KeyValuePair<Slice, Slice>> GetRange(KeySelector beginInclu
public Task<string[]> GetAddressesForKeyAsync(Slice key)
{
EnsureCanRead();
return m_parent.m_handler.GetAddressesForKeyAsync(key, ct: m_parent.m_cancellation);
return m_parent.m_handler.GetAddressesForKeyAsync(key, CancellationToken.None);
}

void IFdbReadOnlyTransaction.Cancel()
Expand Down
81 changes: 66 additions & 15 deletions FoundationDB.Client/FdbTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public sealed partial class FdbTransaction : IFdbTransaction
/// <summary>Random token (but constant per transaction retry) used to generate incomplete VersionStamps</summary>
private ulong m_versionStampToken;

/// <summary>Used to cancel the transaction if the parent CTS fires</summary>
private CancellationTokenRegistration m_ctr;

#endregion

#region Constructors...
Expand All @@ -111,6 +114,15 @@ internal FdbTransaction(FdbDatabase db, FdbOperationContext context, int id, IFd

m_readOnly = (mode & FdbTransactionMode.ReadOnly) != 0;
m_handler = handler;

if (m_cancellation.IsCancellationRequested)
{ // already dead?
Cancel(explicitly: false);
}
else
{
m_ctr = m_cancellation.RegisterWithoutEC(CancellationHandler, this);
}
}

#endregion
Expand Down Expand Up @@ -263,7 +275,7 @@ public Task<long> GetReadVersionAsync()
// can be called after the transaction has been committed
EnsureCanRetry();

return m_handler.GetReadVersionAsync(m_cancellation);
return m_handler.GetReadVersionAsync(CancellationToken.None);
}

/// <summary>Retrieves the database version number at which a given transaction was committed.</summary>
Expand Down Expand Up @@ -386,7 +398,7 @@ public Task<Slice> GetAsync(Slice key)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetAsync", $"Getting value for '{key.ToString()}'");
#endif

return m_handler.GetAsync(key, snapshot: false, ct: m_cancellation);
return m_handler.GetAsync(key, snapshot: false, ct: CancellationToken.None);
}

#endregion
Expand All @@ -409,7 +421,7 @@ public Task<Slice[]> GetValuesAsync(Slice[] keys)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetValuesAsync", $"Getting batch of {keys.Length} values ...");
#endif

return m_handler.GetValuesAsync(keys, snapshot: false, ct: m_cancellation);
return m_handler.GetValuesAsync(keys, snapshot: false, ct: CancellationToken.None);
}

#endregion
Expand Down Expand Up @@ -439,7 +451,7 @@ public Task<FdbRangeChunk> GetRangeAsync(KeySelector beginInclusive, KeySelector
// The iteration value is only needed when in iterator mode, but then it should start from 1
if (iteration == 0) iteration = 1;

return m_handler.GetRangeAsync(beginInclusive, endExclusive, options, iteration, snapshot: false, ct: m_cancellation);
return m_handler.GetRangeAsync(beginInclusive, endExclusive, options, iteration, snapshot: false, ct: CancellationToken.None);
}

#endregion
Expand Down Expand Up @@ -492,7 +504,7 @@ public async Task<Slice> GetKeyAsync(KeySelector selector)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetKeyAsync", $"Getting key '{selector.ToString()}'");
#endif

var key = await m_handler.GetKeyAsync(selector, snapshot: false, ct: m_cancellation).ConfigureAwait(false);
var key = await m_handler.GetKeyAsync(selector, snapshot: false, ct: CancellationToken.None).ConfigureAwait(false);

// don't forget to truncate keys that would fall outside of the database's globalspace !
return m_database.BoundCheck(key);
Expand Down Expand Up @@ -520,7 +532,7 @@ public Task<Slice[]> GetKeysAsync(KeySelector[] selectors)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetKeysAsync", $"Getting batch of {selectors.Length} keys ...");
#endif

return m_handler.GetKeysAsync(selectors, snapshot: false, ct: m_cancellation);
return m_handler.GetKeysAsync(selectors, snapshot: false, ct: CancellationToken.None);
}

#endregion
Expand Down Expand Up @@ -727,7 +739,7 @@ public Task<string[]> GetAddressesForKeyAsync(Slice key)
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "GetAddressesForKeyAsync", $"Getting addresses for key '{FdbKey.Dump(key)}'");
#endif

return m_handler.GetAddressesForKeyAsync(key, ct: m_cancellation);
return m_handler.GetAddressesForKeyAsync(key, CancellationToken.None);
}

#endregion
Expand All @@ -750,7 +762,7 @@ public async Task CommitAsync()
//TODO: need a STATE_COMMITTING ?
try
{
await m_handler.CommitAsync(m_cancellation).ConfigureAwait(false);
await m_handler.CommitAsync(CancellationToken.None).ConfigureAwait(false);

if (Interlocked.CompareExchange(ref m_state, STATE_COMMITTED, STATE_READY) == STATE_READY)
{
Expand Down Expand Up @@ -817,7 +829,7 @@ public async Task OnErrorAsync(FdbError code)
{
EnsureCanRetry();

await m_handler.OnErrorAsync(code, ct: m_cancellation).ConfigureAwait(false);
await m_handler.OnErrorAsync(code, CancellationToken.None).ConfigureAwait(false);

// If fdb_transaction_on_error succeeds, that means that the transaction has been reset and is usable again
var state = this.State;
Expand Down Expand Up @@ -875,19 +887,47 @@ public void Reset()

/// <summary>Rollback this transaction, and dispose it. It should not be used after that.</summary>
public void Cancel()
{
Cancel(explicitly: true);
}

private void Cancel(bool explicitly)
{
var state = Interlocked.CompareExchange(ref m_state, STATE_CANCELED, STATE_READY);
if (state != STATE_READY)
{
switch(state)
if (explicitly)
{
case STATE_CANCELED: return; // already the case !
switch (state)
{
case STATE_CANCELED:
{
return; // already the case!
}
case STATE_COMMITTED:
{
throw new InvalidOperationException("Cannot cancel transaction that has already been committed");
}
case STATE_FAILED:
{
throw new InvalidOperationException("Cannot cancel transaction because it is in a failed state");
}
case STATE_DISPOSED:
{
throw new ObjectDisposedException("FdbTransaction", "Cannot cancel transaction because it already has been disposed");
}
default:
{
throw new InvalidOperationException(String.Format("Cannot cancel transaction because it is in unknown state {0}", state));
}
}
}

case STATE_COMMITTED: throw new InvalidOperationException("Cannot cancel transaction that has already been committed");
case STATE_FAILED: throw new InvalidOperationException("Cannot cancel transaction because it is in a failed state");
case STATE_DISPOSED: throw new ObjectDisposedException("FdbTransaction", "Cannot cancel transaction because it already has been disposed");
default: throw new InvalidOperationException($"Cannot cancel transaction because it is in unknown state {state}");
if (state == STATE_CANCELED || state == STATE_DISPOSED)
{ // it's too late
return;
}

}

if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "Cancel", "Canceling transaction...");
Expand All @@ -897,6 +937,16 @@ public void Cancel()
if (Logging.On && Logging.IsVerbose) Logging.Verbose(this, "Cancel", "Transaction has been canceled");
}

private static readonly Action<object> CancellationHandler = CancellationCallback;

/// <summary>Handler called when the cancellation source of the transaction fires</summary>
private static void CancellationCallback(object state)
{
Contract.Requires(state != null);
var trans = (FdbTransaction) state;
trans.Cancel(explicitly: false);
}

#endregion

#region IDisposable...
Expand Down Expand Up @@ -1014,6 +1064,7 @@ public void Dispose()
{
try
{
m_ctr.Dispose();
this.Database.UnregisterTransaction(this);
m_cts.SafeCancelAndDispose();

Expand Down
9 changes: 8 additions & 1 deletion FoundationDB.Client/FdbWatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,13 @@ public TaskAwaiter<Slice> GetAwaiter()

if (m_future != null)
{
#if REFACTORING_IN_PROGRESS
if (m_future.HasFlag(FdbFuture.Flags.DISPOSED))
{
throw new ObjectDisposedException("Cannot await a watch that has already been disposed");
}

#endif
return m_future.Task.GetAwaiter();
}
throw new InvalidOperationException("Cannot await an empty watch");
Expand All @@ -100,7 +103,11 @@ public void Cancel()
/// <summary>Dispose the resources allocated by the watch.</summary>
public void Dispose()
{
m_future?.Dispose();
if (m_future != null)
{
//TODO: what should be do? (=> cancel the future?)
//m_future.Dispose();
}
}

public override string ToString()
Expand Down
Loading