Skip to content

Commit 7e3f9a9

Browse files
committed
Add builder for aggregate $merge stage
JAVA-3319
1 parent dbaadb8 commit 7e3f9a9

File tree

6 files changed

+609
-2
lines changed

6 files changed

+609
-2
lines changed

docs/reference/content/builders/aggregation.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,27 @@ This example writes the pipeline to the `authors` collection:
216216
out("authors")
217217
```
218218

219+
### Merge
220+
221+
The [`$merge`]({{< docsref "reference/operator/aggregation/merge/" >}}) pipeline stage merges all documents into the specified
222+
collection. It must be the last stage in any aggregate pipeline:
223+
224+
This example merges the pipeline into the `authors` collection using the default options:
225+
226+
```java
227+
merge("authors")
228+
```
229+
230+
This example merges the pipeline into the `authors` collection using some non-default options:
231+
232+
```java
233+
merge(new MongoNamespace("reporting", customers"),
234+
new MergeOptions().uniqueIdentifier(Arrays.asList("date", "customerId"))
235+
.whenMatched(MergeOptions.WhenMatched.REPLACE)
236+
.whenNotMatched(MergeOptions.WhenNotMatched.INSERT))
237+
```
238+
239+
219240
### GraphLookup
220241
221242
The [`$graphLookup`]({{< docsref "reference/operator/aggregation/graphLookup/" >}}) pipeline stage performs a recursive search on a specified collection to match field A of one document to some field B of the other documents. For the matching documents, the stage repeats the search to match field A from the matching documents to the field B of the remaining documents until no new documents are encountered or until a specified depth. To each output document, `$graphLookup` adds a new array field that contains the traversal results of the search for that document.

driver-core/src/main/com/mongodb/client/model/Aggregates.java

Lines changed: 188 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
package com.mongodb.client.model;
1818

19+
import com.mongodb.MongoNamespace;
1920
import com.mongodb.lang.Nullable;
2021
import org.bson.BsonBoolean;
2122
import org.bson.BsonDocument;
2223
import org.bson.BsonDocumentWriter;
2324
import org.bson.BsonInt32;
2425
import org.bson.BsonString;
26+
import org.bson.BsonValue;
2527
import org.bson.codecs.configuration.CodecRegistry;
2628
import org.bson.conversions.Bson;
2729

@@ -411,7 +413,7 @@ public static Bson unwind(final String fieldName, final UnwindOptions unwindOpti
411413
}
412414

413415
/**
414-
* Creates a $out pipeline stage for the specified filter
416+
* Creates a $out pipeline stage that writes into the specified collection
415417
*
416418
* @param collectionName the collection name
417419
* @return the $out pipeline stage
@@ -422,6 +424,61 @@ public static Bson out(final String collectionName) {
422424
return new BsonDocument("$out", new BsonString(collectionName));
423425
}
424426

427+
/**
428+
* Creates a $merge pipeline stage that merges into the specified collection
429+
*
430+
* @param collectionName the name of the collection to merge into
431+
* @return the $merge pipeline stage
432+
* @since 3.11
433+
* @mongodb.driver.manual reference/operator/aggregation/merge/ $merge
434+
* @mongodb.server.release 4.2
435+
*/
436+
public static Bson merge(final String collectionName) {
437+
return merge(collectionName, new MergeOptions());
438+
}
439+
440+
/**
441+
* Creates a $merge pipeline stage that merges into the specified namespace
442+
*
443+
* @param namespace the namespace to merge into
444+
* @return the $merge pipeline stage
445+
* @since 3.11
446+
* @mongodb.driver.manual reference/operator/aggregation/merge/ $merge
447+
* @mongodb.server.release 4.2
448+
*/
449+
public static Bson merge(final MongoNamespace namespace) {
450+
return merge(namespace, new MergeOptions());
451+
}
452+
453+
/**
454+
* Creates a $merge pipeline stage that merges into the specified collection using the specified options.
455+
*
456+
* @param collectionName the name of the collection to merge into
457+
* @param options the merge options
458+
* @return the $merge pipeline stage
459+
* @since 3.11
460+
* @mongodb.driver.manual reference/operator/aggregation/merge/ $merge
461+
* @mongodb.server.release 4.2
462+
*/
463+
public static Bson merge(final String collectionName, final MergeOptions options) {
464+
return new MergeStage(new BsonString(collectionName), options);
465+
}
466+
467+
/**
468+
* Creates a $merge pipeline stage that merges into the specified namespace using the specified options.
469+
*
470+
* @param namespace the namespace to merge into
471+
* @param options the merge options
472+
* @return the $merge pipeline stage
473+
* @since 3.11
474+
* @mongodb.driver.manual reference/operator/aggregation/merge/ $merge
475+
* @mongodb.server.release 4.2
476+
*/
477+
public static Bson merge(final MongoNamespace namespace, final MergeOptions options) {
478+
return new MergeStage(new BsonDocument("db", new BsonString(namespace.getDatabaseName()))
479+
.append("coll", new BsonString(namespace.getCollectionName())), options);
480+
}
481+
425482
/**
426483
* Creates a $replaceRoot pipeline stage
427484
*
@@ -1132,6 +1189,136 @@ public String toString() {
11321189
}
11331190
}
11341191

1192+
private static class MergeStage implements Bson {
1193+
private final BsonValue intoValue;
1194+
private final MergeOptions options;
1195+
1196+
MergeStage(final BsonValue intoValue, final MergeOptions options) {
1197+
this.intoValue = intoValue;
1198+
this.options = options;
1199+
}
1200+
1201+
@Override
1202+
public <TDocument> BsonDocument toBsonDocument(final Class<TDocument> documentClass, final CodecRegistry codecRegistry) {
1203+
BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument());
1204+
writer.writeStartDocument();
1205+
writer.writeStartDocument("$merge");
1206+
writer.writeName("into");
1207+
if (intoValue.isString()) {
1208+
writer.writeString(intoValue.asString().getValue());
1209+
} else {
1210+
writer.writeStartDocument();
1211+
writer.writeString("db", intoValue.asDocument().getString("db").getValue());
1212+
writer.writeString("coll", intoValue.asDocument().getString("coll").getValue());
1213+
writer.writeEndDocument();
1214+
}
1215+
if (options.getUniqueIdentifier() != null) {
1216+
if (options.getUniqueIdentifier().size() == 1) {
1217+
writer.writeString("on", options.getUniqueIdentifier().get(0));
1218+
} else {
1219+
writer.writeStartArray("on");
1220+
for (String cur : options.getUniqueIdentifier()) {
1221+
writer.writeString(cur);
1222+
}
1223+
writer.writeEndArray();
1224+
}
1225+
}
1226+
if (options.getVariables() != null) {
1227+
writer.writeStartDocument("let");
1228+
1229+
for (Variable<?> variable : options.getVariables()) {
1230+
writer.writeName(variable.getName());
1231+
BuildersHelper.encodeValue(writer, variable.getValue(), codecRegistry);
1232+
}
1233+
1234+
writer.writeEndDocument();
1235+
}
1236+
1237+
if (options.getWhenMatched() != null) {
1238+
writer.writeName("whenMatched");
1239+
switch (options.getWhenMatched()) {
1240+
case REPLACE:
1241+
writer.writeString("replace");
1242+
break;
1243+
case KEEP_EXISTING:
1244+
writer.writeString("keepExisting");
1245+
break;
1246+
case MERGE:
1247+
writer.writeString("merge");
1248+
break;
1249+
case PIPELINE:
1250+
writer.writeStartArray();
1251+
for (Bson curStage : options.getWhenMatchedPipeline()) {
1252+
BuildersHelper.encodeValue(writer, curStage, codecRegistry);
1253+
}
1254+
writer.writeEndArray();
1255+
break;
1256+
case FAIL:
1257+
writer.writeString("fail");
1258+
break;
1259+
default:
1260+
throw new UnsupportedOperationException("Unexpected value: " + options.getWhenMatched());
1261+
}
1262+
}
1263+
if (options.getWhenNotMatched() != null) {
1264+
writer.writeName("whenNotMatched");
1265+
switch (options.getWhenNotMatched()) {
1266+
case INSERT:
1267+
writer.writeString("insert");
1268+
break;
1269+
case DISCARD:
1270+
writer.writeString("discard");
1271+
break;
1272+
case FAIL:
1273+
writer.writeString("fail");
1274+
break;
1275+
default:
1276+
throw new UnsupportedOperationException("Unexpected value: " + options.getWhenNotMatched());
1277+
}
1278+
}
1279+
writer.writeEndDocument();
1280+
writer.writeEndDocument();
1281+
return writer.getDocument();
1282+
}
1283+
1284+
@Override
1285+
public boolean equals(final Object o) {
1286+
if (this == o) {
1287+
return true;
1288+
}
1289+
if (o == null || getClass() != o.getClass()) {
1290+
return false;
1291+
}
1292+
1293+
MergeStage that = (MergeStage) o;
1294+
1295+
if (!intoValue.equals(that.intoValue)) {
1296+
return false;
1297+
}
1298+
if (!options.equals(that.options)) {
1299+
return false;
1300+
}
1301+
1302+
return true;
1303+
}
1304+
1305+
@Override
1306+
public int hashCode() {
1307+
int result = intoValue.hashCode();
1308+
result = 31 * result + options.hashCode();
1309+
return result;
1310+
}
1311+
1312+
@Override
1313+
public String toString() {
1314+
return "Stage{"
1315+
+ "name='$merge', "
1316+
+ ", into=" + intoValue
1317+
+ ", options=" + options
1318+
+ '}';
1319+
}
1320+
}
1321+
11351322
private Aggregates() {
11361323
}
11371324
}

0 commit comments

Comments
 (0)