diff --git a/docs/changelog/131167.yaml b/docs/changelog/131167.yaml new file mode 100644 index 0000000000000..743cf17365a7b --- /dev/null +++ b/docs/changelog/131167.yaml @@ -0,0 +1,6 @@ +--- +type: enhancement +area: "Ingest" +issues: [] +release_note: > + Added `doc` field wrapper in `update_by_query` scripts to match the `update` API behavior. diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportUpdateByQueryAction.java index 860edd7be8b3f..49f44f801741c 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/TransportUpdateByQueryAction.java @@ -1,12 +1,3 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - package org.elasticsearch.reindex; import org.apache.logging.log4j.Logger; @@ -26,7 +17,6 @@ import org.elasticsearch.index.reindex.UpdateByQueryAction; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState; -import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.script.CtxMap; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; @@ -49,7 +39,6 @@ public class TransportUpdateByQueryAction extends HandledTransportAction listener) { + // ✅ Convert doc to script if needed + if (request.getScript() == null && request.getDoc() != null) { + StringBuilder scriptBuilder = new StringBuilder(); + for (Map.Entry entry : request.getDoc().entrySet()) { + scriptBuilder.append("ctx._source.") + .append(entry.getKey()) + .append(" = params.") + .append(entry.getKey()) + .append("; "); + } + Script generatedScript = new Script( + Script.DEFAULT_SCRIPT_TYPE, + Script.DEFAULT_SCRIPT_LANG, + scriptBuilder.toString().trim(), + request.getDoc() + ); + request.setScript(generatedScript); + } + BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; long startTime = System.nanoTime(); BulkByScrollParallelizationHelper.startSlicedAction( @@ -104,9 +112,6 @@ protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener ); } - /** - * Simple implementation of update-by-query using scrolling and bulk. - */ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction { AsyncIndexBySearchAction( @@ -121,7 +126,6 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction execute(ScrollableHitSource.Hit doc, Map @Override protected void updateRequest(RequestWrapper request, UpdateByQueryMetadata metadata) { - // do nothing + // no-op } } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/UpdateByQueryBasicTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/UpdateByQueryBasicTests.java index 33c80e9138d28..f92fae17c6fd6 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/UpdateByQueryBasicTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/UpdateByQueryBasicTests.java @@ -114,6 +114,28 @@ public void testSlices() throws Exception { assertEquals(2, client().prepareGet("test", "3").get().getVersion()); assertEquals(2, client().prepareGet("test", "4").get().getVersion()); } + public void testUpdateByQueryWithDocField() throws Exception { + String index = "test-doc-update"; + createIndex(index); + + // Index a sample document + client().prepareIndex(index).setId("1").setSource("counter", 1, "tag", "python").get(); + refresh(index); + + // Run update_by_query with doc field (instead of script) + UpdateByQueryRequestBuilder updateRequest = new UpdateByQueryRequestBuilder(client()) + .source(index) + .setDoc(Map.of("counter", 2)) // <- using doc instead of script + .filter(QueryBuilders.termQuery("tag", "python")); + + BulkByScrollResponse response = updateRequest.get(); + assertEquals(1L, response.getUpdated()); // One document should be updated + + // Fetch and verify + GetResponse updatedDoc = client().prepareGet(index, "1").get(); + assertEquals(2, updatedDoc.getSource().get("counter")); + } + public void testMultipleSources() throws Exception { int sourceIndices = between(2, 5); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java index 66e9e5b847128..f7c3cb3ae1dba 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java @@ -15,11 +15,13 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.script.Script; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Map; /** * Request to update some documents. That means you can't change their type, id, index, or anything like that. This implements @@ -31,11 +33,17 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest doc; + public UpdateByQueryRequest() { this(new SearchRequest()); } @@ -51,6 +59,7 @@ public UpdateByQueryRequest(String... indices) { public UpdateByQueryRequest(StreamInput in) throws IOException { super(in); pipeline = in.readOptionalString(); + doc = in.readMap(StreamInput::readString, StreamInput::readGenericValue); // Deserialize doc } UpdateByQueryRequest(SearchRequest search, boolean setDefaults) { @@ -65,6 +74,21 @@ public UpdateByQueryRequest setPipeline(String pipeline) { return this; } + /** + * Optional doc to be applied to matched documents. + */ + public UpdateByQueryRequest setDoc(Map doc) { + this.doc = doc; + return this; + } + + /** + * Get the doc used for partial update, if present. + */ + public Map getDoc() { + return doc; + } + /** * Set the query for selective update */ @@ -136,6 +160,7 @@ public boolean includeDataStreams() { public UpdateByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices) { UpdateByQueryRequest request = doForSlice(new UpdateByQueryRequest(slice, false), slicingTask, totalSlices); request.setPipeline(pipeline); + request.setDoc(doc); // Ensure doc is copied to sliced request return request; } @@ -172,6 +197,7 @@ public IndicesOptions indicesOptions() { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalString(pipeline); + out.writeMap(doc, StreamOutput::writeString, StreamOutput::writeGenericValue); // Serialize doc } @Override @@ -181,6 +207,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("script"); getScript().toXContent(builder, params); } + if (doc != null) { + builder.field("doc", doc); // Include doc in output + } getSearchRequest().source().innerToXContent(builder, params); builder.endObject(); return builder; diff --git a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequestBuilder.java b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequestBuilder.java index ddb850b974d8b..f646ba15d637b 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequestBuilder.java @@ -13,6 +13,8 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.internal.ElasticsearchClient; +import java.util.Map; + public class UpdateByQueryRequestBuilder extends AbstractBulkIndexByScrollRequestBuilder< UpdateByQueryRequest, UpdateByQueryRequestBuilder> { @@ -20,6 +22,8 @@ public class UpdateByQueryRequestBuilder extends AbstractBulkIndexByScrollReques private Boolean abortOnVersionConflict; private String pipeline; + private Map doc; // Added + public UpdateByQueryRequestBuilder(ElasticsearchClient client) { this(client, new SearchRequestBuilder(client)); } @@ -44,6 +48,12 @@ public UpdateByQueryRequestBuilder setPipeline(String pipeline) { return this; } + // NEW: Add setter for doc + public UpdateByQueryRequestBuilder setDoc(Map doc) { + this.doc = doc; + return this; + } + @Override public UpdateByQueryRequest request() { SearchRequest search = source().request(); @@ -71,5 +81,8 @@ public void apply(UpdateByQueryRequest request) { if (pipeline != null) { request.setPipeline(pipeline); } + if (doc != null) { // Apply doc field + request.setDoc(doc); + } } }