Skip to content

Commit 081e579

Browse files
committed
Implemented TCK tests for all Publishers
JAVA-3536
1 parent 8628bd9 commit 081e579

14 files changed

+999
-57
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import org.bson.Document;
20+
import org.reactivestreams.Publisher;
21+
import org.reactivestreams.tck.PublisherVerification;
22+
import org.reactivestreams.tck.TestEnvironment;
23+
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.LongStream;
28+
29+
import static com.mongodb.reactivestreams.client.MongoFixture.DEFAULT_TIMEOUT_MILLIS;
30+
import static com.mongodb.reactivestreams.client.MongoFixture.PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS;
31+
import static com.mongodb.reactivestreams.client.MongoFixture.run;
32+
33+
public class AggregatePublisherVerification extends PublisherVerification<Document> {
34+
35+
public AggregatePublisherVerification() {
36+
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
37+
}
38+
39+
40+
@Override
41+
public Publisher<Document> createPublisher(final long elements) {
42+
assert (elements <= maxElementsFromPublisher());
43+
44+
MongoCollection<Document> collection = MongoFixture.getDefaultDatabase().getCollection("AggregationTest");
45+
run(collection.drop());
46+
if (elements > 0) {
47+
List<Document> documentList = LongStream.rangeClosed(1, elements).boxed()
48+
.map(i -> new Document("a", i)).collect(Collectors.toList());
49+
50+
run(collection.insertMany(documentList));
51+
}
52+
53+
return collection.aggregate(Collections.singletonList(Document.parse("{$match: {}}")));
54+
}
55+
56+
@Override
57+
public Publisher<Document> createFailedPublisher() {
58+
return null;
59+
}
60+
61+
@Override
62+
public long maxElementsFromPublisher() {
63+
return 100;
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import com.mongodb.client.model.changestream.ChangeStreamDocument;
20+
import org.bson.Document;
21+
import org.reactivestreams.Publisher;
22+
import org.reactivestreams.tck.PublisherVerification;
23+
import org.reactivestreams.tck.TestEnvironment;
24+
25+
import java.util.List;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.LongStream;
29+
30+
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
31+
import static com.mongodb.reactivestreams.client.MongoFixture.DEFAULT_TIMEOUT_MILLIS;
32+
import static com.mongodb.reactivestreams.client.MongoFixture.PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS;
33+
import static com.mongodb.reactivestreams.client.MongoFixture.run;
34+
35+
public class ChangeStreamPublisherVerification extends PublisherVerification<ChangeStreamDocument<Document>> {
36+
37+
public static final AtomicInteger COUNTER = new AtomicInteger();
38+
39+
public ChangeStreamPublisherVerification() {
40+
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
41+
}
42+
43+
@Override
44+
public Publisher<ChangeStreamDocument<Document>> createPublisher(final long elements) {
45+
assert (elements <= maxElementsFromPublisher());
46+
if (!isDiscoverableReplicaSet()) {
47+
notVerified();
48+
}
49+
50+
MongoCollection<Document> collection = MongoFixture.getDefaultDatabase()
51+
.getCollection("ChangeStreamTest" + COUNTER.getAndIncrement());
52+
53+
if (elements > 0) {
54+
MongoFixture.ObservableSubscriber<ChangeStreamDocument<Document>> observer =
55+
new MongoFixture.ObservableSubscriber<>(() -> run(collection.insertOne(Document.parse("{a: 1}"))));
56+
collection.watch().first().subscribe(observer);
57+
58+
ChangeStreamDocument<Document> changeDocument = observer.get().get(0);
59+
60+
// Limit the number of elements returned - this is essentially an infinite stream but to high will cause a OOM.
61+
long maxElements = elements > 10000 ? 10000 : elements;
62+
List<Document> documentList = LongStream.rangeClosed(1, maxElements).boxed()
63+
.map(i -> new Document("a", i)).collect(Collectors.toList());
64+
65+
run(collection.insertMany(documentList));
66+
67+
return collection.watch().startAfter(changeDocument.getResumeToken());
68+
}
69+
70+
return collection.watch();
71+
}
72+
73+
@Override
74+
public Publisher<ChangeStreamDocument<Document>> createFailedPublisher() {
75+
return null;
76+
}
77+
78+
@Override
79+
public long maxElementsFromPublisher() {
80+
return publisherUnableToSignalOnComplete();
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import org.bson.Document;
20+
import org.reactivestreams.Publisher;
21+
import org.reactivestreams.tck.PublisherVerification;
22+
import org.reactivestreams.tck.TestEnvironment;
23+
24+
import java.util.List;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.LongStream;
27+
28+
import static com.mongodb.reactivestreams.client.MongoFixture.DEFAULT_TIMEOUT_MILLIS;
29+
import static com.mongodb.reactivestreams.client.MongoFixture.PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS;
30+
import static com.mongodb.reactivestreams.client.MongoFixture.run;
31+
32+
public class DistinctPublisherVerification extends PublisherVerification<Integer> {
33+
34+
public DistinctPublisherVerification() {
35+
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
36+
}
37+
38+
39+
@Override
40+
public Publisher<Integer> createPublisher(final long elements) {
41+
assert (elements <= maxElementsFromPublisher());
42+
43+
MongoCollection<Document> collection = MongoFixture.getDefaultDatabase().getCollection("DistinctTest");
44+
run(collection.drop());
45+
if (elements > 0) {
46+
List<Document> documentList = LongStream.rangeClosed(1, elements).boxed()
47+
.map(i -> new Document("a", i)).collect(Collectors.toList());
48+
49+
run(collection.insertMany(documentList));
50+
}
51+
52+
return collection.distinct("a", Integer.class);
53+
}
54+
55+
@Override
56+
public Publisher<Integer> createFailedPublisher() {
57+
return null;
58+
}
59+
60+
@Override
61+
public long maxElementsFromPublisher() {
62+
return 100;
63+
}
64+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import org.bson.Document;
20+
import org.reactivestreams.Publisher;
21+
import org.reactivestreams.tck.PublisherVerification;
22+
import org.reactivestreams.tck.TestEnvironment;
23+
24+
import java.util.List;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.LongStream;
27+
28+
import static com.mongodb.reactivestreams.client.MongoFixture.DEFAULT_TIMEOUT_MILLIS;
29+
import static com.mongodb.reactivestreams.client.MongoFixture.PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS;
30+
import static com.mongodb.reactivestreams.client.MongoFixture.run;
31+
32+
public class FindPublisherVerification extends PublisherVerification<Document> {
33+
34+
public FindPublisherVerification() {
35+
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
36+
}
37+
38+
39+
@Override
40+
public Publisher<Document> createPublisher(final long elements) {
41+
assert (elements <= maxElementsFromPublisher());
42+
43+
MongoCollection<Document> collection = MongoFixture.getDefaultDatabase().getCollection("FindTest");
44+
run(collection.drop());
45+
if (elements > 0) {
46+
List<Document> documentList = LongStream.rangeClosed(1, elements).boxed()
47+
.map(i -> new Document("a", i)).collect(Collectors.toList());
48+
49+
run(collection.insertMany(documentList));
50+
}
51+
52+
return collection.find();
53+
}
54+
55+
@Override
56+
public Publisher<Document> createFailedPublisher() {
57+
return null;
58+
}
59+
60+
@Override
61+
public long maxElementsFromPublisher() {
62+
return 100;
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import org.bson.Document;
20+
import org.reactivestreams.Publisher;
21+
import org.reactivestreams.tck.PublisherVerification;
22+
import org.reactivestreams.tck.TestEnvironment;
23+
24+
import static com.mongodb.reactivestreams.client.MongoFixture.DEFAULT_TIMEOUT_MILLIS;
25+
import static com.mongodb.reactivestreams.client.MongoFixture.PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS;
26+
import static com.mongodb.reactivestreams.client.MongoFixture.run;
27+
28+
public class ListCollectionsPublisherVerification extends PublisherVerification<Document> {
29+
30+
public ListCollectionsPublisherVerification() {
31+
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
32+
}
33+
34+
35+
@Override
36+
public Publisher<Document> createPublisher(final long elements) {
37+
assert (elements <= maxElementsFromPublisher());
38+
39+
MongoDatabase database = MongoFixture.getDefaultDatabase();
40+
run(database.drop());
41+
42+
for (long i = 0; i < elements; i++) {
43+
run(database.createCollection("listCollectionTest" + i));
44+
}
45+
46+
return database.listCollections();
47+
}
48+
49+
@Override
50+
public Publisher<Document> createFailedPublisher() {
51+
return null;
52+
}
53+
54+
@Override
55+
public long maxElementsFromPublisher() {
56+
return 100;
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import com.mongodb.client.model.Filters;
20+
import org.bson.Document;
21+
import org.reactivestreams.Publisher;
22+
import org.reactivestreams.tck.PublisherVerification;
23+
import org.reactivestreams.tck.TestEnvironment;
24+
25+
import static com.mongodb.reactivestreams.client.MongoFixture.DEFAULT_TIMEOUT_MILLIS;
26+
import static com.mongodb.reactivestreams.client.MongoFixture.PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS;
27+
import static com.mongodb.reactivestreams.client.MongoFixture.cleanDatabases;
28+
import static com.mongodb.reactivestreams.client.MongoFixture.getDefaultDatabaseName;
29+
import static com.mongodb.reactivestreams.client.MongoFixture.getMongoClient;
30+
import static com.mongodb.reactivestreams.client.MongoFixture.run;
31+
32+
public class ListDatabasesPublisherVerification extends PublisherVerification<Document> {
33+
34+
public ListDatabasesPublisherVerification() {
35+
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
36+
}
37+
38+
39+
@Override
40+
public Publisher<Document> createPublisher(final long elements) {
41+
assert (elements <= maxElementsFromPublisher());
42+
43+
cleanDatabases();
44+
MongoClient client = getMongoClient();
45+
for (long i = 0; i < elements; i++) {
46+
run(client.getDatabase(getDefaultDatabaseName() + i).createCollection("test" + i));
47+
}
48+
49+
return client.listDatabases().filter(Filters.nin("name", "admin", "config", "local"));
50+
}
51+
52+
@Override
53+
public Publisher<Document> createFailedPublisher() {
54+
return null;
55+
}
56+
57+
@Override
58+
public long maxElementsFromPublisher() {
59+
return 5;
60+
}
61+
}

0 commit comments

Comments
 (0)