Skip to content

Commit 89079b1

Browse files
jstewart148rozza
authored andcommitted
Avoid calling batch cursor when the cursor is closed
JAVA-3710
1 parent 1893c78 commit 89079b1

File tree

4 files changed

+11
-5
lines changed

4 files changed

+11
-5
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AbstractSubscription.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ public void request(final long n) {
7979
+ "argument is <= 0."));
8080
return;
8181
}
82+
if (isTerminated()) {
83+
return;
84+
}
8285

8386
boolean requestData = false;
8487
synchronized (this) {
@@ -142,7 +145,7 @@ void onError(final Throwable t) {
142145
throw MongoException.fromThrowableNonNull(t1);
143146
}
144147
} else {
145-
throw MongoException.fromThrowableNonNull(t);
148+
throw new MongoException("Subscription has already been terminated", t);
146149
}
147150
}
148151

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoIterableSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ void postTerminate() {
7474
void requestMoreData() {
7575
boolean mustRead = false;
7676
synchronized (this) {
77-
if (!isReading && !isTerminated() && batchCursor != null) {
77+
if (!isReading && !isTerminated() && batchCursor != null && !batchCursor.isClosed()) {
7878
isReading = true;
7979
mustRead = true;
8080
}

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/FlatteningSingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,8 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
265265

266266
then:
267267
def ex = thrown(MongoException)
268-
ex.message == 'exception calling onComplete'
268+
ex.message == 'Subscription has already been terminated'
269+
ex.cause.cause.message == 'exception calling onComplete'
269270
subscriber.assertTerminalEvent()
270271
subscriber.assertNoErrors()
271272
}
@@ -298,7 +299,8 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
298299

299300
then:
300301
def ex = thrown(MongoException)
301-
ex.message == 'exception calling onError'
302+
ex.message == 'Subscription has already been terminated'
303+
ex.cause.cause.message == 'exception calling onError'
302304
subscriber.assertTerminalEvent()
303305
subscriber.assertErrored()
304306
}

driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/SingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
302302
def ex = thrown(MongoException)
303303
subscriber.assertNoErrors()
304304
subscriber.assertTerminalEvent()
305-
ex.message == 'exception calling onComplete'
305+
ex.message == 'Subscription has already been terminated'
306+
ex.cause.cause.message == 'exception calling onComplete'
306307
}
307308

308309
def 'should throw the exception if calling onError raises one'() {

0 commit comments

Comments
 (0)