Skip to content

Commit 2df2adf

Browse files
committed
JAVA-2059: For generic command, legacy write, and legacy getMore protocols, properly publish command failed event on connection failure
1 parent 2ac1ecf commit 2df2adf

File tree

6 files changed

+199
-37
lines changed

6 files changed

+199
-37
lines changed

driver-core/src/main/com/mongodb/connection/CommandProtocol.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,10 @@ public T execute(final InternalConnection connection) {
103103
connection.getDescription().getServerAddress()));
104104
}
105105
long startTimeNanos = System.nanoTime();
106-
CommandMessage commandMessage = null;
106+
CommandMessage commandMessage = new CommandMessage(namespace.getFullName(), command, slaveOk, fieldNameValidator,
107+
ProtocolHelper.getMessageSettings(connection.getDescription()));
107108
try {
108-
commandMessage = sendMessage(connection);
109+
sendMessage(commandMessage, connection);
109110
ResponseBuffers responseBuffers = connection.receiveMessage(commandMessage.getId());
110111
ReplyMessage<BsonDocument> replyMessage;
111112
try {
@@ -176,11 +177,9 @@ private String getCommandName() {
176177
return commandName != null ? commandName : command.keySet().iterator().next();
177178
}
178179

179-
private CommandMessage sendMessage(final InternalConnection connection) {
180+
private void sendMessage(final CommandMessage message, final InternalConnection connection) {
180181
ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(connection);
181182
try {
182-
CommandMessage message = new CommandMessage(namespace.getFullName(), command, slaveOk, fieldNameValidator,
183-
ProtocolHelper.getMessageSettings(connection.getDescription()));
184183
int documentPosition = message.encodeWithMetadata(bsonOutput).getFirstDocumentPosition();
185184
if (commandListener != null) {
186185
ByteBufBsonDocument byteBufBsonDocument = createOne(bsonOutput, documentPosition);
@@ -199,7 +198,6 @@ private CommandMessage sendMessage(final InternalConnection connection) {
199198
}
200199

201200
connection.sendMessage(bsonOutput.getByteBuffers(), message.getId());
202-
return message;
203201
} finally {
204202
bsonOutput.close();
205203
}

driver-core/src/main/com/mongodb/connection/GetMoreProtocol.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ public QueryResult<T> execute(final InternalConnection connection) {
8080
connection.getDescription().getServerAddress()));
8181
}
8282
long startTimeNanos = System.nanoTime();
83-
GetMoreMessage message = null;
83+
GetMoreMessage message = new GetMoreMessage(namespace.getFullName(), cursorId, numberToReturn);
8484
QueryResult<T> queryResult = null;
8585
try {
86-
message = sendMessage(connection);
86+
sendMessage(message, connection);
8787
ResponseBuffers responseBuffers = connection.receiveMessage(message.getId());
8888
try {
8989
if (responseBuffers.getReplyHeader().isCursorNotFound()) {
@@ -149,17 +149,15 @@ public void setCommandListener(final CommandListener commandListener) {
149149
this.commandListener = commandListener;
150150
}
151151

152-
private GetMoreMessage sendMessage(final InternalConnection connection) {
152+
private void sendMessage(final GetMoreMessage message, final InternalConnection connection) {
153153
ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(connection);
154154
try {
155-
GetMoreMessage message = new GetMoreMessage(namespace.getFullName(), cursorId, numberToReturn);
156155
if (commandListener != null) {
157156
sendCommandStartedEvent(message, namespace.getDatabaseName(), COMMAND_NAME, asGetMoreCommandDocument(),
158157
connection.getDescription(), commandListener);
159158
}
160159
message.encode(bsonOutput);
161160
connection.sendMessage(bsonOutput.getByteBuffers(), message.getId());
162-
return message;
163161
} finally {
164162
bsonOutput.close();
165163
}

driver-core/src/main/com/mongodb/connection/WriteProtocol.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ public WriteConcernResult execute(final InternalConnection connection) {
111111
}
112112

113113
if (shouldAcknowledge(encodingMetadata)) {
114-
ResponseBuffers responseBuffers = connection.receiveMessage(messageId);
114+
ResponseBuffers responseBuffers = null;
115115
try {
116+
responseBuffers = connection.receiveMessage(messageId);
116117
ReplyMessage<BsonDocument> replyMessage = new ReplyMessage<BsonDocument>(responseBuffers, new BsonDocumentCodec(),
117118
messageId);
118119
writeConcernResult = ProtocolHelper.getWriteResult(replyMessage.getDocuments().get(0),
@@ -136,7 +137,9 @@ public WriteConcernResult execute(final InternalConnection connection) {
136137
}
137138
throw e;
138139
} finally {
139-
responseBuffers.close();
140+
if (responseBuffers != null) {
141+
responseBuffers.close();
142+
}
140143
}
141144
}
142145

driver-core/src/test/functional/com/mongodb/connection/TestCommandListener.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,27 +102,36 @@ public void eventsWereDelivered(final List<CommandEvent> expectedEvents) {
102102
CommandEvent actual = events.get(i);
103103
CommandEvent expected = expectedEvents.get(i);
104104

105-
assertEquals(expected.getClass(), actual.getClass());
106-
107105
if (actual instanceof CommandStartedEvent) {
108106
currentlyExpectedRequestId = actual.getRequestId();
109107
} else {
110108
assertEquals(currentlyExpectedRequestId, actual.getRequestId());
111109
}
112110

113-
assertEquals(expected.getConnectionDescription(), actual.getConnectionDescription());
111+
assertEventEquivalence(actual, expected);
112+
}
113+
}
114114

115-
assertEquals(expected.getCommandName(), actual.getCommandName());
115+
public void eventWasDelivered(final CommandEvent expectedEvent, final int index) {
116+
assertTrue(events.size() > index);
117+
assertEventEquivalence(events.get(index), expectedEvent);
118+
}
116119

117-
if (actual.getClass().equals(CommandStartedEvent.class)) {
118-
assertEquivalence((CommandStartedEvent) actual, (CommandStartedEvent) expected);
119-
} else if (actual.getClass().equals(CommandSucceededEvent.class)) {
120-
assertEquivalence((CommandSucceededEvent) actual, (CommandSucceededEvent) expected);
121-
} else if (actual.getClass().equals(CommandFailedEvent.class)) {
122-
assertEquivalence((CommandFailedEvent) actual, (CommandFailedEvent) expected);
123-
} else {
124-
throw new UnsupportedOperationException("Unsupported event type: " + actual.getClass());
125-
}
120+
private void assertEventEquivalence(final CommandEvent actual, final CommandEvent expected) {
121+
assertEquals(expected.getClass(), actual.getClass());
122+
123+
assertEquals(expected.getConnectionDescription(), actual.getConnectionDescription());
124+
125+
assertEquals(expected.getCommandName(), actual.getCommandName());
126+
127+
if (actual.getClass().equals(CommandStartedEvent.class)) {
128+
assertEquivalence((CommandStartedEvent) actual, (CommandStartedEvent) expected);
129+
} else if (actual.getClass().equals(CommandSucceededEvent.class)) {
130+
assertEquivalence((CommandSucceededEvent) actual, (CommandSucceededEvent) expected);
131+
} else if (actual.getClass().equals(CommandFailedEvent.class)) {
132+
assertEquivalence((CommandFailedEvent) actual, (CommandFailedEvent) expected);
133+
} else {
134+
throw new UnsupportedOperationException("Unsupported event type: " + actual.getClass());
126135
}
127136
}
128137

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2015 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package com.mongodb.connection
19+
20+
import com.mongodb.MongoNamespace
21+
import com.mongodb.MongoSocketReadException
22+
import com.mongodb.MongoSocketWriteException
23+
import com.mongodb.ServerAddress
24+
import com.mongodb.bulk.DeleteRequest
25+
import com.mongodb.event.CommandFailedEvent
26+
import com.mongodb.internal.validator.NoOpFieldNameValidator
27+
import org.bson.BsonDocument
28+
import org.bson.BsonInt32
29+
import org.bson.codecs.DocumentCodec
30+
import spock.lang.Shared
31+
import spock.lang.Specification
32+
33+
import static com.mongodb.WriteConcern.ACKNOWLEDGED
34+
import static com.mongodb.WriteConcern.UNACKNOWLEDGED
35+
import static com.mongodb.connection.ProtocolTestHelper.execute
36+
37+
class CommandEventOnConnectionFailureSpecification extends Specification {
38+
39+
@Shared
40+
def namespace = new MongoNamespace('test.test')
41+
private TestInternalConnection connection;
42+
43+
def setup() {
44+
connection = new TestInternalConnection(new ServerId(new ClusterId(), new ServerAddress()));
45+
}
46+
47+
def 'should publish failed command event when sendMessage throws exception'() {
48+
String commandName = protocolInfo[0]
49+
Protocol protocol = protocolInfo[1]
50+
51+
def commandListener = new TestCommandListener()
52+
protocol.commandListener = commandListener
53+
connection.enqueueSendMessageException(new MongoSocketWriteException('Failure', new ServerAddress(), new IOException()));
54+
55+
when:
56+
execute(protocol, connection, async)
57+
58+
then:
59+
def e = thrown(MongoSocketWriteException)
60+
commandListener.events.size() == 2
61+
commandListener.eventWasDelivered(new CommandFailedEvent(1, connection.getDescription(), commandName, 0, e), 1)
62+
63+
where:
64+
[protocolInfo, async] << [[['ping',
65+
new CommandProtocol('admin', new BsonDocument('ping', new BsonInt32(1)),
66+
new NoOpFieldNameValidator(), new DocumentCodec())],
67+
['killCursors',
68+
new KillCursorProtocol(namespace, [42L])],
69+
['getMore',
70+
new GetMoreProtocol(namespace, 42L, 1, new DocumentCodec())],
71+
['find',
72+
new QueryProtocol(namespace, 0, 1, 1, new BsonDocument(), new BsonDocument(), new DocumentCodec())],
73+
['delete',
74+
new DeleteCommandProtocol(namespace, true, ACKNOWLEDGED,
75+
[new DeleteRequest(new BsonDocument('_id', new BsonInt32(1)))])],
76+
['delete',
77+
new DeleteProtocol(namespace, true, ACKNOWLEDGED,
78+
[new DeleteRequest(new BsonDocument('_id', new BsonInt32(1)))])],
79+
['delete',
80+
new DeleteProtocol(namespace, true, UNACKNOWLEDGED,
81+
[new DeleteRequest(new BsonDocument('_id', new BsonInt32(1)))])],
82+
],
83+
[false]].combinations()
84+
}
85+
86+
def 'should publish failed command event when receiveMessage throws exception'() {
87+
String commandName = protocolInfo[0]
88+
Protocol protocol = protocolInfo[1]
89+
90+
def commandListener = new TestCommandListener()
91+
protocol.commandListener = commandListener
92+
connection.enqueueReceiveMessageException(new MongoSocketReadException('Failure', new ServerAddress(), new IOException()));
93+
94+
when:
95+
execute(protocol, connection, async)
96+
97+
then:
98+
def e = thrown(MongoSocketReadException)
99+
commandListener.events.size() == 2
100+
commandListener.eventWasDelivered(new CommandFailedEvent(1, connection.getDescription(), commandName, 0, e), 1)
101+
102+
where:
103+
[protocolInfo, async] << [[['ping',
104+
new CommandProtocol('admin', new BsonDocument('ping', new BsonInt32(1)),
105+
new NoOpFieldNameValidator(), new DocumentCodec())],
106+
['getMore',
107+
new GetMoreProtocol(namespace, 42L, 1, new DocumentCodec())],
108+
['find',
109+
new QueryProtocol(namespace, 0, 1, 1, new BsonDocument(), new BsonDocument(), new DocumentCodec())],
110+
['delete',
111+
new DeleteCommandProtocol(namespace, true, ACKNOWLEDGED,
112+
[new DeleteRequest(new BsonDocument('_id', new BsonInt32(1)))])],
113+
['delete',
114+
new DeleteProtocol(namespace, true, ACKNOWLEDGED,
115+
[new DeleteRequest(new BsonDocument('_id', new BsonInt32(1)))])],
116+
],
117+
[false]].combinations()
118+
}
119+
}

driver-core/src/test/unit/com/mongodb/connection/TestInternalConnection.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,16 @@
3030
import java.util.List;
3131

3232
class TestInternalConnection implements InternalConnection {
33+
34+
private static class Interaction {
35+
private ResponseBuffers responseBuffers;
36+
private RuntimeException receiveException;
37+
private RuntimeException sendException;
38+
}
39+
3340
private final ConnectionDescription description;
3441
private final BufferProvider bufferProvider;
35-
private final Deque<ResponseBuffers> replies;
42+
private final Deque<Interaction> replies;
3643
private final List<BsonInput> sent;
3744
private boolean opened;
3845
private boolean closed;
@@ -41,12 +48,26 @@ public TestInternalConnection(final ServerId serverId) {
4148
this.description = new ConnectionDescription(serverId);
4249
this.bufferProvider = new SimpleBufferProvider();
4350

44-
this.replies = new LinkedList<ResponseBuffers>();
51+
this.replies = new LinkedList<Interaction>();
4552
this.sent = new LinkedList<BsonInput>();
4653
}
4754

48-
public void enqueueReply(final ResponseBuffers buffers) {
49-
this.replies.add(buffers);
55+
public void enqueueReply(final ResponseBuffers responseBuffers) {
56+
Interaction interaction = new Interaction();
57+
interaction.responseBuffers = responseBuffers;
58+
replies.add(interaction);
59+
}
60+
61+
public void enqueueSendMessageException(final RuntimeException e) {
62+
Interaction interaction = new Interaction();
63+
interaction.sendException = e;
64+
replies.add(interaction);
65+
}
66+
67+
public void enqueueReceiveMessageException(final RuntimeException e) {
68+
Interaction interaction = new Interaction();
69+
interaction.receiveException = e;
70+
replies.add(interaction);
5071
}
5172

5273
public List<BsonInput> getSent() {
@@ -98,11 +119,16 @@ public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId
98119

99120
combined.flip();
100121

101-
ResponseBuffers nextToReceive = replies.removeFirst();
102-
ReplyHeader header = replaceResponseTo(nextToReceive.getReplyHeader(), lastRequestId);
103-
replies.addFirst(new ResponseBuffers(header, nextToReceive.getBodyByteBuffer()));
122+
Interaction interaction = replies.getFirst();
123+
if (interaction.responseBuffers != null) {
124+
ReplyHeader header = replaceResponseTo(interaction.responseBuffers.getReplyHeader(), lastRequestId);
125+
interaction.responseBuffers = (new ResponseBuffers(header, interaction.responseBuffers.getBodyByteBuffer()));
104126

105-
sent.add(new ByteBufferBsonInput(new ByteBufNIO(combined)));
127+
sent.add(new ByteBufferBsonInput(new ByteBufNIO(combined)));
128+
} else if (interaction.sendException != null) {
129+
replies.removeFirst();
130+
throw interaction.sendException;
131+
}
106132
}
107133

108134
private ReplyHeader replaceResponseTo(final ReplyHeader header, final int responseTo) {
@@ -128,13 +154,22 @@ public ResponseBuffers receiveMessage(final int responseTo) {
128154
throw new MongoException("Test was not setup properly as too many calls to receiveMessage occured.");
129155
}
130156

131-
return this.replies.remove();
157+
Interaction interaction = replies.removeFirst();
158+
if (interaction.responseBuffers != null) {
159+
return interaction.responseBuffers;
160+
} else {
161+
throw interaction.receiveException;
162+
}
132163
}
133164

134165
@Override
135166
public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequestId, final SingleResultCallback<Void> callback) {
136-
sendMessage(byteBuffers, lastRequestId);
137-
callback.onResult(null, null);
167+
try {
168+
sendMessage(byteBuffers, lastRequestId);
169+
callback.onResult(null, null);
170+
} catch (RuntimeException e) {
171+
callback.onResult(null, e);
172+
}
138173
}
139174

140175
@Override

0 commit comments

Comments
 (0)