Skip to content

Commit 8a3c72d

Browse files
committed
JAVA-1994: Use already-retained AsyncConnection to asynchronously kill cursor, instead of checking out a new one while the existing one has not yet been released.
This avoids unnecessary pressure on the connection pool.
1 parent 37f5041 commit 8a3c72d

12 files changed

+103
-180
lines changed

driver-core/src/main/com/mongodb/operation/AggregateOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final Async
298298
@Override
299299
public AsyncBatchCursor<T> apply(final BsonDocument result) {
300300
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
301-
return new AsyncQueryBatchCursor<T>(queryResult, 0, batchSize != null ? batchSize : 0, decoder, source);
301+
return new AsyncQueryBatchCursor<T>(queryResult, 0, batchSize != null ? batchSize : 0, decoder, source, connection);
302302
}
303303
};
304304
}

driver-core/src/main/com/mongodb/operation/AsyncQueryBatchCursor.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ class AsyncQueryBatchCursor<T> implements AsyncBatchCursor<T> {
4747

4848
AsyncQueryBatchCursor(final QueryResult<T> firstBatch, final int limit, final int batchSize,
4949
final Decoder<T> decoder) {
50-
this(firstBatch, limit, batchSize, decoder, null);
50+
this(firstBatch, limit, batchSize, decoder, null, null);
5151
}
5252

5353
AsyncQueryBatchCursor(final QueryResult<T> firstBatch, final int limit, final int batchSize,
54-
final Decoder<T> decoder, final AsyncConnectionSource connectionSource) {
54+
final Decoder<T> decoder, final AsyncConnectionSource connectionSource, final AsyncConnection connection) {
5555
this.namespace = firstBatch.getNamespace();
5656
this.firstBatch = firstBatch;
5757
this.limit = limit;
@@ -60,6 +60,7 @@ class AsyncQueryBatchCursor<T> implements AsyncBatchCursor<T> {
6060
this.cursor = firstBatch.getCursor();
6161
if (this.cursor != null) {
6262
notNull("connectionSource", connectionSource);
63+
notNull("connection", connection);
6364
}
6465
if (connectionSource != null) {
6566
this.connectionSource = connectionSource.retain();
@@ -68,15 +69,15 @@ class AsyncQueryBatchCursor<T> implements AsyncBatchCursor<T> {
6869
}
6970
this.count += firstBatch.getResults().size();
7071
if (limitReached()) {
71-
killCursor();
72+
killCursor(connection);
7273
}
7374
}
7475

7576
@Override
7677
public void close() {
7778
if (!closed) {
7879
closed = true;
79-
killCursor();
80+
killCursor(null);
8081
}
8182
}
8283

@@ -133,32 +134,42 @@ public void onResult(final AsyncConnection connection, final Throwable t) {
133134
});
134135
}
135136

136-
private void killCursor() {
137+
private void killCursor(final AsyncConnection connection) {
137138
if (cursor != null) {
138139
final ServerCursor localCursor = cursor;
139140
final AsyncConnectionSource localConnectionSource = connectionSource;
140141
cursor = null;
141142
connectionSource = null;
142-
localConnectionSource.getConnection(new SingleResultCallback<AsyncConnection>() {
143-
@Override
144-
public void onResult(final AsyncConnection connection, final Throwable connectionException) {
145-
if (connection != null) {
146-
connection.killCursorAsync(namespace, singletonList(localCursor.getId()), new SingleResultCallback<Void>() {
147-
@Override
148-
public void onResult(final Void result, final Throwable t) {
149-
connection.release();
150-
localConnectionSource.release();
151-
}
152-
});
143+
if (connection != null) {
144+
connection.retain();
145+
killCursorAsynchronouslyAndReleaseConnectionAndSource(connection, localCursor, localConnectionSource);
146+
} else {
147+
localConnectionSource.getConnection(new SingleResultCallback<AsyncConnection>() {
148+
@Override
149+
public void onResult(final AsyncConnection connection, final Throwable connectionException) {
150+
if (connectionException == null) {
151+
killCursorAsynchronouslyAndReleaseConnectionAndSource(connection, localCursor, localConnectionSource);
152+
}
153153
}
154-
}
155-
});
154+
});
155+
}
156156
} else if (connectionSource != null) {
157157
connectionSource.release();
158158
connectionSource = null;
159159
}
160160
}
161161

162+
private void killCursorAsynchronouslyAndReleaseConnectionAndSource(final AsyncConnection connection, final ServerCursor localCursor,
163+
final AsyncConnectionSource localConnectionSource) {
164+
connection.killCursorAsync(namespace, singletonList(localCursor.getId()), new SingleResultCallback<Void>() {
165+
@Override
166+
public void onResult(final Void result, final Throwable t) {
167+
connection.release();
168+
localConnectionSource.release();
169+
}
170+
});
171+
}
172+
162173
private class QueryResultSingleResultCallback implements SingleResultCallback<QueryResult<T>> {
163174
private final AsyncConnection connection;
164175
private final SingleResultCallback<List<T>> callback;
@@ -181,7 +192,7 @@ public void onResult(final QueryResult<T> result, final Throwable t) {
181192
cursor = result.getCursor();
182193
count += result.getResults().size();
183194
if (limitReached()) {
184-
killCursor();
195+
killCursor(connection);
185196
}
186197
connection.release();
187198
if (result.getResults().isEmpty()) {

driver-core/src/main/com/mongodb/operation/DistinctOperation.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void call(final AsyncConnectionSource source, final AsyncConnection conne
141141
errorHandlingCallback(callback).onResult(null, t);
142142
} else {
143143
executeWrappedCommandProtocolAsync(binding, namespace.getDatabaseName(), getCommand(), createCommandDecoder(),
144-
connection, asyncTransformer(source, connection),
144+
connection, asyncTransformer(connection.getDescription()),
145145
releasingCallback(errorHandlingCallback(callback), source, connection));
146146
}
147147
}
@@ -167,13 +167,12 @@ public BatchCursor<T> apply(final BsonDocument result) {
167167
};
168168
}
169169

170-
private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final AsyncConnectionSource source,
171-
final AsyncConnection connection) {
170+
private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final ConnectionDescription connectionDescription) {
172171
return new Function<BsonDocument, AsyncBatchCursor<T>>() {
173172
@Override
174173
public AsyncBatchCursor<T> apply(final BsonDocument result) {
175-
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
176-
return new AsyncQueryBatchCursor<T>(queryResult, 0, 0, null, source);
174+
QueryResult<T> queryResult = createQueryResult(result, connectionDescription);
175+
return new AsyncQueryBatchCursor<T>(queryResult, 0, 0, null);
177176
}
178177
};
179178
}

driver-core/src/main/com/mongodb/operation/FindOperation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,9 @@ public void onResult(final QueryResult<T> result, final Throwable t) {
431431
if (t != null) {
432432
wrappedCallback.onResult(null, t);
433433
} else {
434-
wrappedCallback.onResult(new AsyncQueryBatchCursor<T>(result, limit, batchSize, decoder, source), null);
434+
wrappedCallback.onResult(new AsyncQueryBatchCursor<T>(result, limit, batchSize, decoder, source,
435+
connection),
436+
null);
435437
}
436438
}
437439
});

driver-core/src/main/com/mongodb/operation/FindOperationHelper.java

Lines changed: 0 additions & 112 deletions
This file was deleted.

driver-core/src/main/com/mongodb/operation/ListCollectionsOperation.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public void call(final AsyncConnectionSource source, final AsyncConnection conne
202202
source, connection);
203203
if (serverIsAtLeastVersionThreeDotZero(connection.getDescription())) {
204204
executeWrappedCommandProtocolAsync(binding, databaseName, getCommand(), createCommandDecoder(),
205-
connection, asyncTransformer(source),
205+
connection, asyncTransformer(source, connection),
206206
new SingleResultCallback<AsyncBatchCursor<T>>() {
207207
@Override
208208
public void onResult(final AsyncBatchCursor<T> result, final Throwable t) {
@@ -224,7 +224,7 @@ public void onResult(final QueryResult<BsonDocument> result, final Throwable t)
224224
} else {
225225
wrappedCallback.onResult(new ProjectingAsyncBatchCursor(
226226
new AsyncQueryBatchCursor<BsonDocument>(result, 0,
227-
batchSize, new BsonDocumentCodec(), source)
227+
batchSize, new BsonDocumentCodec(), source, connection)
228228
), null);
229229
}
230230
}
@@ -243,11 +243,12 @@ private MongoNamespace createNamespace() {
243243
return new MongoNamespace(databaseName, "$cmd.listCollections");
244244
}
245245

246-
private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final AsyncConnectionSource source) {
246+
private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final AsyncConnectionSource source,
247+
final AsyncConnection connection) {
247248
return new Function<BsonDocument, AsyncBatchCursor<T>>() {
248249
@Override
249250
public AsyncBatchCursor<T> apply(final BsonDocument result) {
250-
return cursorDocumentToAsyncBatchCursor(result.getDocument("cursor"), decoder, source, batchSize);
251+
return cursorDocumentToAsyncBatchCursor(result.getDocument("cursor"), decoder, source, connection, batchSize);
251252
}
252253
};
253254
}

driver-core/src/main/com/mongodb/operation/ListDatabasesOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final Async
136136
return new Function<BsonDocument, AsyncBatchCursor<T>>() {
137137
@Override
138138
public AsyncBatchCursor<T> apply(final BsonDocument result) {
139-
return new AsyncQueryBatchCursor<T>(createQueryResult(result, connection.getDescription()), 0, 0, decoder, source);
139+
return new AsyncQueryBatchCursor<T>(createQueryResult(result, connection.getDescription()), 0, 0, decoder, source,
140+
connection);
140141
}
141142
};
142143
}

driver-core/src/main/com/mongodb/operation/ListIndexesOperation.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public void call(final AsyncConnectionSource source, final AsyncConnection conne
166166
source, connection);
167167
if (serverIsAtLeastVersionThreeDotZero(connection.getDescription())) {
168168
executeWrappedCommandProtocolAsync(binding, namespace.getDatabaseName(), getCommand(), createCommandDecoder(),
169-
connection, asyncTransformer(source),
169+
connection, asyncTransformer(source, connection),
170170
new SingleResultCallback<AsyncBatchCursor<T>>() {
171171
@Override
172172
public void onResult(final AsyncBatchCursor<T> result,
@@ -190,7 +190,8 @@ public void onResult(final QueryResult<T> result, final Throwable t) {
190190
if (t != null) {
191191
wrappedCallback.onResult(null, t);
192192
} else {
193-
wrappedCallback.onResult(new AsyncQueryBatchCursor<T>(result, 0, batchSize, decoder, source),
193+
wrappedCallback.onResult(new AsyncQueryBatchCursor<T>(result, 0, batchSize, decoder, source,
194+
connection),
194195
null);
195196
}
196197
}
@@ -238,11 +239,12 @@ public BatchCursor<T> apply(final BsonDocument result) {
238239
};
239240
}
240241

241-
private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final AsyncConnectionSource source) {
242+
private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final AsyncConnectionSource source,
243+
final AsyncConnection connection) {
242244
return new Function<BsonDocument, AsyncBatchCursor<T>>() {
243245
@Override
244246
public AsyncBatchCursor<T> apply(final BsonDocument result) {
245-
return cursorDocumentToAsyncBatchCursor(result.getDocument("cursor"), decoder, source, batchSize);
247+
return cursorDocumentToAsyncBatchCursor(result.getDocument("cursor"), decoder, source, connection, batchSize);
246248
}
247249
};
248250
}

driver-core/src/main/com/mongodb/operation/OperationHelper.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ static <T> BatchCursor<T> cursorDocumentToBatchCursor(final BsonDocument cursorD
9494
}
9595

9696
static <T> AsyncBatchCursor<T> cursorDocumentToAsyncBatchCursor(final BsonDocument cursorDocument, final Decoder<T> decoder,
97-
final AsyncConnectionSource source, final int batchSize) {
97+
final AsyncConnectionSource source, final AsyncConnection connection,
98+
final int batchSize) {
9899
return new AsyncQueryBatchCursor<T>(OperationHelper.<T>cursorDocumentToQueryResult(cursorDocument,
99100
source.getServerDescription().getAddress()),
100-
0, batchSize, decoder, source);
101+
0, batchSize, decoder, source, connection);
101102
}
102103

103104

0 commit comments

Comments
 (0)