Skip to content

Commit 45f0335

Browse files
committed
JAVA-2063: Obey ordered property in WriteProtocol when write concern is unacknowledged
1 parent 2df2adf commit 45f0335

File tree

3 files changed

+80
-13
lines changed

3 files changed

+80
-13
lines changed

driver-core/src/main/com/mongodb/connection/WriteProtocol.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public WriteConcernResult execute(final InternalConnection connection) {
9191
sentCommandStartedEvent = true;
9292
}
9393
messageId = nextMessage.getId();
94-
if (shouldAcknowledge(encodingMetadata)) {
94+
if (shouldAcknowledge(encodingMetadata.getNextMessage())) {
9595
CommandMessage getLastErrorMessage = new CommandMessage(new MongoNamespace(getNamespace().getDatabaseName(),
9696
COMMAND_COLLECTION_NAME).getFullName(),
9797
createGetLastErrorCommandDocument(), false,
@@ -110,7 +110,7 @@ public WriteConcernResult execute(final InternalConnection connection) {
110110
bsonOutput.close();
111111
}
112112

113-
if (shouldAcknowledge(encodingMetadata)) {
113+
if (shouldAcknowledge(encodingMetadata.getNextMessage())) {
114114
ResponseBuffers responseBuffers = null;
115115
try {
116116
responseBuffers = connection.receiveMessage(messageId);
@@ -128,7 +128,7 @@ public WriteConcernResult execute(final InternalConnection connection) {
128128
}
129129
if (writeConcern.isAcknowledged()) {
130130
throw e;
131-
} else {
131+
} else if (ordered) {
132132
break;
133133
}
134134
} catch (RuntimeException e) {
@@ -184,7 +184,7 @@ public void executeAsync(final InternalConnection connection, final SingleResult
184184
ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(connection);
185185
RequestMessage requestMessage = createRequestMessage(getMessageSettings(connection.getDescription()));
186186
RequestMessage nextMessage = encodeMessage(requestMessage, bsonOutput);
187-
if (writeConcern.isAcknowledged()) {
187+
if (shouldAcknowledge(nextMessage)) {
188188
CommandMessage getLastErrorMessage = new CommandMessage(new MongoNamespace(getNamespace().getDatabaseName(),
189189
COMMAND_COLLECTION_NAME).getFullName(),
190190
createGetLastErrorCommandDocument(), false,
@@ -233,8 +233,8 @@ protected BsonDocument getBaseCommandDocument() {
233233
protected abstract String getCommandName();
234234

235235

236-
private boolean shouldAcknowledge(final RequestMessage.EncodingMetadata encodingMetadata) {
237-
return writeConcern.isAcknowledged() || (isOrdered() && encodingMetadata.getNextMessage() != null);
236+
private boolean shouldAcknowledge(final RequestMessage nextMessage) {
237+
return writeConcern.isAcknowledged() || (isOrdered() && nextMessage != null);
238238
}
239239

240240
private BsonDocument createGetLastErrorCommandDocument() {

driver-core/src/main/com/mongodb/connection/WriteResultCallback.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.WriteConcern;
21+
import com.mongodb.WriteConcernException;
2122
import com.mongodb.WriteConcernResult;
2223
import com.mongodb.async.SingleResultCallback;
2324
import org.bson.BsonDocument;
@@ -52,9 +53,19 @@ protected void callCallback(final BsonDocument result, final Throwable t) {
5253
callback.onResult(null, t);
5354
} else {
5455
try {
55-
WriteConcernResult writeConcernResult = ProtocolHelper.getWriteResult(result,
56-
connection.getDescription().getServerAddress());
57-
if (nextMessage != null) {
56+
WriteConcernResult writeConcernResult = null;
57+
boolean shouldWriteNextMessage = true;
58+
try {
59+
writeConcernResult = ProtocolHelper.getWriteResult(result, connection.getDescription().getServerAddress());
60+
} catch (WriteConcernException e) {
61+
if (writeConcern.isAcknowledged()) {
62+
throw e;
63+
}
64+
if (ordered) {
65+
shouldWriteNextMessage = false;
66+
}
67+
}
68+
if (shouldWriteNextMessage && nextMessage != null) {
5869
new GenericWriteProtocol(namespace, nextMessage, ordered, writeConcern).executeAsync(connection, callback);
5970
} else {
6071
callback.onResult(writeConcernResult, null);

driver-core/src/test/functional/com/mongodb/connection/WriteProtocolSpecification.groovy

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ import org.bson.BsonDocument
2828
import org.bson.BsonInt32
2929
import org.bson.BsonString
3030
import org.bson.codecs.BsonDocumentCodec
31+
import spock.lang.IgnoreIf
3132
import spock.lang.Shared
3233

3334
import static com.mongodb.ClusterFixture.getCredentialList
3435
import static com.mongodb.ClusterFixture.getPrimary
3536
import static com.mongodb.ClusterFixture.getSslSettings
37+
import static com.mongodb.ClusterFixture.isSharded
3638
import static com.mongodb.WriteConcern.ACKNOWLEDGED
3739
import static com.mongodb.WriteConcern.UNACKNOWLEDGED
3840
import static com.mongodb.connection.ProtocolTestHelper.execute
@@ -104,6 +106,7 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
104106
async << [false, true]
105107
}
106108

109+
@IgnoreIf({ isSharded() })
107110
def 'should execute split unacknowledged inserts'() {
108111
given:
109112
def binary = new BsonBinary(new byte[15000000])
@@ -150,7 +153,9 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
150153
where:
151154
async << [false, true]
152155
}
153-
def 'should ignore write errors on split unacknowledged inserts'() {
156+
157+
@IgnoreIf({ isSharded() })
158+
def 'should stop writing on write error when an ordered unacknowledged inserts must be split'() {
154159
given:
155160
def binary = new BsonBinary(new byte[15000000])
156161
def documentOne = new BsonDocument('_id', new BsonInt32(1)).append('b', binary)
@@ -166,14 +171,40 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
166171

167172
when:
168173
execute(protocol, connection, async)
174+
// force acknowledgement
175+
new CommandProtocol(getDatabaseName(), new BsonDocument('ping', new BsonInt32(1)),
176+
new NoOpFieldNameValidator(), new BsonDocumentCodec()).execute(connection)
169177

170178
then:
171179
getCollectionHelper().count() == 1
172180

173-
cleanup:
181+
where:
182+
async << [false, true]
183+
}
184+
185+
@IgnoreIf({ isSharded() })
186+
def 'should continue writing on write error when an unordered unacknowledged inserts must be split'() {
187+
given:
188+
def binary = new BsonBinary(new byte[15000000])
189+
def documentOne = new BsonDocument('_id', new BsonInt32(1)).append('b', binary)
190+
def documentTwo = new BsonDocument('_id', new BsonInt32(2)).append('b', binary)
191+
def documentThree = new BsonDocument('_id', new BsonInt32(3)).append('b', binary)
192+
def documentFour = new BsonDocument('_id', new BsonInt32(4)).append('b', binary)
193+
194+
def insertRequest = [new InsertRequest(documentOne), new InsertRequest(documentTwo),
195+
new InsertRequest(documentThree), new InsertRequest(documentFour)]
196+
def protocol = new InsertProtocol(getNamespace(), false, UNACKNOWLEDGED, insertRequest)
197+
198+
getCollectionHelper().insertDocuments(documentOne)
199+
200+
when:
201+
execute(protocol, connection, async)
174202
// force acknowledgement
175-
new CommandProtocol(getDatabaseName(), new BsonDocument('drop', new BsonString(getCollectionName())),
176-
new NoOpFieldNameValidator(), new BsonDocumentCodec()).execute(connection)
203+
new CommandProtocol(getDatabaseName(), new BsonDocument('ping', new BsonInt32(1)),
204+
new NoOpFieldNameValidator(), new BsonDocumentCodec()).execute(connection)
205+
206+
then:
207+
getCollectionHelper().count() == 4
177208

178209
where:
179210
async << [false, true]
@@ -203,4 +234,29 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
203234
where:
204235
async << [false, true]
205236
}
237+
238+
@IgnoreIf({ isSharded() })
239+
def 'should not report write errors on split unacknowledged inserts'() {
240+
given:
241+
def binary = new BsonBinary(new byte[15000000])
242+
def documentOne = new BsonDocument('_id', new BsonInt32(1)).append('b', binary)
243+
def documentTwo = new BsonDocument('_id', new BsonInt32(2)).append('b', binary)
244+
def documentThree = new BsonDocument('_id', new BsonInt32(3)).append('b', binary)
245+
def documentFour = new BsonDocument('_id', new BsonInt32(4)).append('b', binary)
246+
247+
def insertRequest = [new InsertRequest(documentOne), new InsertRequest(documentTwo),
248+
new InsertRequest(documentThree), new InsertRequest(documentFour)]
249+
def protocol = new InsertProtocol(getNamespace(), true, UNACKNOWLEDGED, insertRequest)
250+
251+
getCollectionHelper().insertDocuments(documentOne)
252+
253+
when:
254+
execute(protocol, connection, async)
255+
256+
then:
257+
getCollectionHelper().count() == 1
258+
259+
where:
260+
async << [false, true]
261+
}
206262
}

0 commit comments

Comments
 (0)