Skip to content

Commit 2ac1ecf

Browse files
committed
Test asynchronous execution of WriteCommandProtocol and WriteProtocol
1 parent ed95a50 commit 2ac1ecf

File tree

3 files changed

+141
-22
lines changed

3 files changed

+141
-22
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2015 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.connection;
18+
19+
import com.mongodb.ClusterFixture;
20+
import com.mongodb.async.FutureResultCallback;
21+
22+
import static java.util.concurrent.TimeUnit.SECONDS;
23+
24+
final class ProtocolTestHelper {
25+
public static <T> T execute(final Protocol<T> protocol, final InternalConnection connection, final boolean async) throws Throwable{
26+
if (async) {
27+
final FutureResultCallback<T> futureResultCallback = new FutureResultCallback<T>();
28+
protocol.executeAsync(connection, futureResultCallback);
29+
return futureResultCallback.get(ClusterFixture.TIMEOUT, SECONDS);
30+
} else {
31+
return protocol.execute(connection);
32+
}
33+
}
34+
35+
private ProtocolTestHelper() {}
36+
}

driver-core/src/test/functional/com/mongodb/connection/WriteCommandProtocolSpecification.groovy

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import static com.mongodb.ClusterFixture.getPrimary
3838
import static com.mongodb.ClusterFixture.getSslSettings
3939
import static com.mongodb.ClusterFixture.serverVersionAtLeast
4040
import static com.mongodb.WriteConcern.ACKNOWLEDGED
41+
import static com.mongodb.connection.ProtocolTestHelper.execute
4142

4243
@IgnoreIf({ !serverVersionAtLeast([2, 6, 0]) })
4344
class WriteCommandProtocolSpecification extends OperationFunctionalSpecification {
@@ -47,7 +48,7 @@ class WriteCommandProtocolSpecification extends OperationFunctionalSpecification
4748

4849
def setupSpec() {
4950
connection = new InternalStreamConnectionFactory(new NettyStreamFactory(SocketSettings.builder().build(), getSslSettings()),
50-
getCredentialList(), new NoOpConnectionListener())
51+
getCredentialList(), new NoOpConnectionListener())
5152
.create(new ServerId(new ClusterId(), getPrimary()))
5253
connection.open();
5354
}
@@ -62,13 +63,17 @@ class WriteCommandProtocolSpecification extends OperationFunctionalSpecification
6263

6364
def insertRequest = [new InsertRequest(document)]
6465
def protocol = new InsertCommandProtocol(getNamespace(), true, ACKNOWLEDGED, null, insertRequest)
66+
6567
when:
66-
def result = protocol.execute(connection)
68+
def result = execute(protocol, connection, async)
6769

6870
then:
6971
result.insertedCount == 1
7072
result.upserts == []
7173
collectionHelper.find(document, new BsonDocumentCodec()).first() == document
74+
75+
where:
76+
async << [false, true]
7277
}
7378

7479
def 'should insert documents'() {
@@ -78,22 +83,25 @@ class WriteCommandProtocolSpecification extends OperationFunctionalSpecification
7883
def protocol = new InsertCommandProtocol(getNamespace(), true, ACKNOWLEDGED, null, requests)
7984

8085
when:
81-
protocol.execute(connection)
86+
execute(protocol, connection, async)
8287

8388
then:
8489
collectionHelper.count() == 2
90+
91+
where:
92+
async << [false, true]
8593
}
8694

8795
def 'should throw exception'() {
8896
given:
8997
def protocol = new InsertCommandProtocol(getNamespace(), false, ACKNOWLEDGED, false,
90-
[new InsertRequest(new BsonDocument('_id', new BsonInt32(1))),
91-
new InsertRequest(new BsonDocument('_id', new BsonInt32(2)))]
98+
[new InsertRequest(new BsonDocument('_id', new BsonInt32(1))),
99+
new InsertRequest(new BsonDocument('_id', new BsonInt32(2)))]
92100
)
93101
protocol.execute(connection)
94102

95103
when:
96-
protocol.execute(connection) // now do it again
104+
execute(protocol, connection, async)
97105

98106
then:
99107
def e = thrown(MongoBulkWriteException)
@@ -112,6 +120,9 @@ class WriteCommandProtocolSpecification extends OperationFunctionalSpecification
112120
code == 11000
113121
message != null
114122
}
123+
124+
where:
125+
async << [false, true]
115126
}
116127

117128
@Category(Slow)
@@ -133,11 +144,14 @@ class WriteCommandProtocolSpecification extends OperationFunctionalSpecification
133144
def protocol = new InsertCommandProtocol(getNamespace(), true, ACKNOWLEDGED, null, insertList)
134145

135146
when:
136-
def result = protocol.execute(connection)
147+
def result = execute(protocol, connection, async)
137148

138149
then:
139150
result.insertedCount == 4
140151
documents.size() == collectionHelper.count()
152+
153+
where:
154+
async << [false, true]
141155
}
142156

143157
@Category(Slow)
@@ -164,11 +178,14 @@ class WriteCommandProtocolSpecification extends OperationFunctionalSpecification
164178
def protocol = new InsertCommandProtocol(getNamespace(), true, ACKNOWLEDGED, null, insertList)
165179

166180
when:
167-
protocol.execute(connection)
181+
execute(protocol, connection, async)
168182

169183
then:
170184
def exception = thrown(MongoBulkWriteException)
171185
exception.writeResult.insertedCount == 1
186+
187+
where:
188+
async << [false, true]
172189
}
173190

174191
@Category(Slow)
@@ -198,7 +215,7 @@ class WriteCommandProtocolSpecification extends OperationFunctionalSpecification
198215
def protocol = new InsertCommandProtocol(getNamespace(), false, ACKNOWLEDGED, null, insertList)
199216

200217
when:
201-
protocol.execute(connection)
218+
execute(protocol, connection, async)
202219

203220
then:
204221
def e = thrown(MongoBulkWriteException)
@@ -207,26 +224,33 @@ class WriteCommandProtocolSpecification extends OperationFunctionalSpecification
207224
e.writeErrors[0].index == 0
208225
e.writeErrors[1].index == 2
209226
e.writeErrors[2].index == 3
227+
228+
where:
229+
async << [false, true]
210230
}
211231

212232
def 'should upsert items'() {
213233
given:
214234
def protocol = new UpdateCommandProtocol(getNamespace(), true, ACKNOWLEDGED, null,
215-
[new UpdateRequest(new BsonDocument('_id', new BsonInt32(1)),
216-
new BsonDocument('$set', new BsonDocument('x', new BsonInt32(1))),
217-
WriteRequest.Type.UPDATE)
218-
.upsert(true),
219-
new UpdateRequest(new BsonDocument('_id', new BsonInt32(2)),
220-
new BsonDocument('$set', new BsonDocument('x', new BsonInt32(2))),
221-
WriteRequest.Type.UPDATE)
222-
.upsert(true)]
235+
[new UpdateRequest(new BsonDocument('_id', new BsonInt32(1)),
236+
new BsonDocument('$set', new BsonDocument('x', new BsonInt32(1))),
237+
WriteRequest.Type.UPDATE)
238+
.upsert(true),
239+
new UpdateRequest(new BsonDocument('_id', new BsonInt32(2)),
240+
new BsonDocument('$set', new BsonDocument('x', new BsonInt32(2))),
241+
WriteRequest.Type.UPDATE)
242+
.upsert(true)]
223243
);
224244

225245
when:
226-
def result = protocol.execute(connection);
246+
def result = execute(protocol, connection, async)
247+
227248

228249
then:
229250
result.matchedCount == 0;
230251
result.upserts == [new BulkWriteUpsert(0, new BsonInt32(1)), new BulkWriteUpsert(1, new BsonInt32(2))]
252+
253+
where:
254+
async << [false, true]
231255
}
232256
}

driver-core/src/test/functional/com/mongodb/connection/WriteProtocolSpecification.groovy

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import static com.mongodb.ClusterFixture.getPrimary
3535
import static com.mongodb.ClusterFixture.getSslSettings
3636
import static com.mongodb.WriteConcern.ACKNOWLEDGED
3737
import static com.mongodb.WriteConcern.UNACKNOWLEDGED
38+
import static com.mongodb.connection.ProtocolTestHelper.execute
3839

3940
class WriteProtocolSpecification extends OperationFunctionalSpecification {
4041
@Shared
@@ -65,7 +66,7 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
6566
getCollectionHelper().insertDocuments(documentOne)
6667

6768
when:
68-
protocol.execute(connection)
69+
execute(protocol, connection, async)
6970

7071
then:
7172
getCollectionHelper().find(new BsonDocumentCodec()) == [documentOne]
@@ -74,6 +75,9 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
7475
// force acknowledgement
7576
new CommandProtocol(getDatabaseName(), new BsonDocument('drop', new BsonString(getCollectionName())),
7677
new NoOpFieldNameValidator(), new BsonDocumentCodec()).execute(connection)
78+
79+
where:
80+
async << [false, true]
7781
}
7882

7983
def 'should report write errors on acknowledged inserts'() {
@@ -90,13 +94,62 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
9094
getCollectionHelper().insertDocuments(documentOne)
9195

9296
when:
93-
protocol.execute(connection)
97+
execute(protocol, connection, async)
9498

9599
then:
96100
thrown(DuplicateKeyException)
97101
getCollectionHelper().count() == 1
102+
103+
where:
104+
async << [false, true]
105+
}
106+
107+
def 'should execute split unacknowledged inserts'() {
108+
given:
109+
def binary = new BsonBinary(new byte[15000000])
110+
def documentOne = new BsonDocument('_id', new BsonInt32(1)).append('b', binary)
111+
def documentTwo = new BsonDocument('_id', new BsonInt32(2)).append('b', binary)
112+
def documentThree = new BsonDocument('_id', new BsonInt32(3)).append('b', binary)
113+
def documentFour = new BsonDocument('_id', new BsonInt32(4)).append('b', binary)
114+
115+
def insertRequest = [new InsertRequest(documentOne), new InsertRequest(documentTwo),
116+
new InsertRequest(documentThree), new InsertRequest(documentFour)]
117+
def protocol = new InsertProtocol(getNamespace(), true, UNACKNOWLEDGED, insertRequest)
118+
119+
when:
120+
execute(protocol, connection, async)
121+
// force acknowledgement
122+
new CommandProtocol(getDatabaseName(), new BsonDocument('ping', new BsonInt32(1)),
123+
new NoOpFieldNameValidator(), new BsonDocumentCodec()).execute(connection)
124+
125+
then:
126+
getCollectionHelper().count() == 4
127+
128+
where:
129+
async << [false, true]
98130
}
99131

132+
def 'should execute split acknowledged inserts'() {
133+
given:
134+
def binary = new BsonBinary(new byte[15000000])
135+
def documentOne = new BsonDocument('_id', new BsonInt32(1)).append('b', binary)
136+
def documentTwo = new BsonDocument('_id', new BsonInt32(2)).append('b', binary)
137+
def documentThree = new BsonDocument('_id', new BsonInt32(3)).append('b', binary)
138+
def documentFour = new BsonDocument('_id', new BsonInt32(4)).append('b', binary)
139+
140+
def insertRequest = [new InsertRequest(documentOne), new InsertRequest(documentTwo),
141+
new InsertRequest(documentThree), new InsertRequest(documentFour)]
142+
def protocol = new InsertProtocol(getNamespace(), true, ACKNOWLEDGED, insertRequest)
143+
144+
when:
145+
execute(protocol, connection, async)
146+
147+
then:
148+
getCollectionHelper().count() == 4
149+
150+
where:
151+
async << [false, true]
152+
}
100153
def 'should ignore write errors on split unacknowledged inserts'() {
101154
given:
102155
def binary = new BsonBinary(new byte[15000000])
@@ -112,7 +165,7 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
112165
getCollectionHelper().insertDocuments(documentOne)
113166

114167
when:
115-
protocol.execute(connection)
168+
execute(protocol, connection, async)
116169

117170
then:
118171
getCollectionHelper().count() == 1
@@ -121,6 +174,9 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
121174
// force acknowledgement
122175
new CommandProtocol(getDatabaseName(), new BsonDocument('drop', new BsonString(getCollectionName())),
123176
new NoOpFieldNameValidator(), new BsonDocumentCodec()).execute(connection)
177+
178+
where:
179+
async << [false, true]
124180
}
125181

126182
def 'should report write errors on split acknowledged inserts'() {
@@ -138,10 +194,13 @@ class WriteProtocolSpecification extends OperationFunctionalSpecification {
138194
getCollectionHelper().insertDocuments(documentOne)
139195

140196
when:
141-
protocol.execute(connection)
197+
execute(protocol, connection, async)
142198

143199
then:
144200
thrown(DuplicateKeyException)
145201
getCollectionHelper().count() == 1
202+
203+
where:
204+
async << [false, true]
146205
}
147206
}

0 commit comments

Comments
 (0)