-
Notifications
You must be signed in to change notification settings - Fork 25.4k
Restrict Indexing To Child Streams When Streams Is Enabled #132011
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
c0f1a12
f8fa32b
34082ef
2457383
a2eab2a
1a861cd
5faf7fb
1c4225b
fbfd61b
78cf0ef
5e1d615
387b4e3
d54b7b9
e5581aa
a7e6f7a
3569947
34e944e
3e82b35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# Changelog entry for PR 132011.
pr: 132011
summary: Restrict Indexing To Child Streams When Streams Is Enabled
area: Data streams
type: enhancement
issues: []
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
---
"Check User Can't Write To Substream Directly":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  # Bulk-indexing straight into a child stream must be rejected per item.
  - do:
      bulk:
        body: |
          { "index": { "_index": "logs.foo" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check User Can't Write To Substream Directly With Single Doc":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  # A single-document index request to the child stream fails the whole request.
  - do:
      catch: bad_request
      index:
        index: logs.foo
        id: "1"
        body:
          foo: bar
  - match: { error.type: "illegal_argument_exception" }
  - match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }

---
"Check Bulk Index With Reroute Processor To Substream Is Rejected":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  # A reroute processor may not redirect documents into a child stream.
  - do:
      ingest.put_pipeline:
        id: "reroute-to-logs-foo"
        body:
          processors:
            - reroute:
                destination: "logs.foo"
  - do:
      indices.create:
        index: "bad-index"
        body:
          settings:
            index.default_pipeline: "reroute-to-logs-foo"
  - do:
      bulk:
        body: |
          { "index": { "_index": "bad-index" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [reroute-to-logs-foo] tried to reroute this document from index [bad-index] to index [logs.foo]. Reroute history: bad-index" }

---
"Check Bulk Index With Script Processor To Substream Is Rejected":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  # Rewriting ctx._index in a script processor is treated like a reroute and rejected.
  - do:
      ingest.put_pipeline:
        id: "script-to-logs-foo"
        body:
          processors:
            - script:
                source: "ctx._index = 'logs.foo'"
  - do:
      indices.create:
        index: "bad-index-script"
        body:
          settings:
            index.default_pipeline: "script-to-logs-foo"
  - do:
      bulk:
        body: |
          { "index": { "_index": "bad-index-script" } }
          { "foo": "bar" }
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: "illegal_argument_exception" }
  - match: { items.0.index.error.reason: "Pipelines can't re-route documents to child streams, but pipeline [script-to-logs-foo] tried to reroute this document from index [bad-index-script] to index [logs.foo]. Reroute history: bad-index-script" }

---
"Check Delete By Query Directly On Substream After Reroute Succeeds":
  - do:
      streams.logs_enable: { }
  - is_true: acknowledged

  - do:
      streams.status: { }
  - is_true: logs.enabled

  # Writing via the parent stream (which legitimately reroutes to the child) succeeds...
  - do:
      ingest.put_pipeline:
        id: "reroute-to-logs-foo-success"
        body:
          processors:
            - reroute:
                destination: "logs.foo"
  - do:
      indices.create:
        index: "logs"
        body:
          settings:
            index.default_pipeline: "reroute-to-logs-foo-success"
  - do:
      bulk:
        refresh: true
        body: |
          { "index": { "_index": "logs" } }
          { "foo": "bar", "baz": "qux" }
  - match: { errors: false }
  - match: { items.0.index.status: 201 }

  # ...and non-indexing operations (delete_by_query, search) on the child stream stay allowed.
  - do:
      delete_by_query:
        index: logs.foo
        refresh: true
        body:
          query:
            match:
              foo: "bar"
  - match: { deleted: 1 }
  - match: { total: 1 }

  - do:
      search:
        index: logs.foo
        body:
          query:
            match_all: {}
  - match: { hits.total.value: 0 }
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,9 @@ | |
import org.elasticsearch.xcontent.ToXContent; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.Iterator; | ||
import java.util.Objects; | ||
|
||
/** | ||
* A response of a bulk execution. Holding a response for each item responding (in order) of the | ||
|
@@ -166,4 +168,19 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params | |
return builder.startArray(ITEMS); | ||
}), Iterators.forArray(responses), Iterators.<ToXContent>single((builder, p) -> builder.endArray().endObject())); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we adding this for completeness sake or are we using it somewhere? Checking bulk response equality seems like something that could be unintentionally expensive for a large or complicated request. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahhh this was needed to fix a unit test which relied on a |
||
return o == this | ||
|| (o instanceof BulkResponse that | ||
&& tookInMillis == that.tookInMillis | ||
&& ingestTookInMillis == that.ingestTookInMillis | ||
&& Arrays.equals(responses, that.responses) | ||
&& Objects.equals(incrementalState, that.incrementalState)); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(Arrays.hashCode(responses), tookInMillis, ingestTookInMillis, incrementalState); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
import org.elasticsearch.cluster.project.ProjectResolver; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.common.streams.StreamType; | ||
import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
import org.elasticsearch.core.Assertions; | ||
import org.elasticsearch.core.Releasable; | ||
|
@@ -396,8 +397,35 @@ private void applyPipelinesAndDoInternalExecute( | |
ActionListener<BulkResponse> listener | ||
) throws IOException { | ||
final long relativeStartTimeNanos = relativeTimeNanos(); | ||
if (applyPipelines(task, bulkRequest, executor, listener) == false) { | ||
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos); | ||
|
||
// Validate child stream writes before processing pipelines | ||
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); | ||
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); | ||
|
||
for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) { | ||
for (int i = 0; i < bulkRequest.requests.size(); i++) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're iterating using the locally available bulk request, but you're dereferencing the bulk request from the request modifier to get the documents. This assumes that the request modifier will never make any changes to its internal state. I think we can avoid that kind of snag if we iterate over the request items and check each stream type per document instead of iterating over the request items multiple times for each eventual stream type. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I looked around where we use bulk request modifier in other places and it's actually an iterable itself. In the IngestService we just iterate over it like a regular iterable, and maintain a slot counter separately. Might read more clearly if we do that here too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have switched this to use the bulk modifier as an iterator and moved the stream types to be the inner iterator so best of both worlds :-) |
||
DocWriteRequest<?> req = bulkRequestModifier.bulkRequest.requests.get(i); | ||
|
||
if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) { | ||
IllegalArgumentException e = new IllegalArgumentException( | ||
"Direct writes to child streams are prohibited. Index directly into the [" | ||
+ streamType.getStreamName() | ||
+ "] stream instead" | ||
); | ||
Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); | ||
lukewhiting marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (Boolean.TRUE.equals(failureStoreEnabled)) { | ||
bulkRequestModifier.markItemForFailureStore(i, req.index(), e); | ||
} else { | ||
bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED); | ||
lukewhiting marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
} | ||
|
||
var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); | ||
|
||
if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) { | ||
szybia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.common.streams; | ||
|
||
import org.elasticsearch.cluster.metadata.ProjectMetadata; | ||
import org.elasticsearch.cluster.metadata.StreamsMetadata; | ||
|
||
import java.util.Arrays; | ||
import java.util.EnumSet; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
public enum StreamType { | ||
|
||
LOGS("logs"); | ||
szybia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private final String streamName; | ||
|
||
StreamType(String streamName) { | ||
this.streamName = streamName; | ||
} | ||
|
||
public String getStreamName() { | ||
return streamName; | ||
} | ||
|
||
public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) { | ||
lukewhiting marked this conversation as resolved.
Show resolved
Hide resolved
|
||
StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); | ||
return switch (this) { | ||
case LOGS -> metadata.isLogsEnabled(); | ||
}; | ||
} | ||
|
||
public boolean matchesStreamPrefix(String indexName) { | ||
lukewhiting marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (indexName == null) { | ||
return false; | ||
} | ||
return indexName.startsWith(streamName + "."); | ||
} | ||
|
||
public static Set<StreamType> getEnabledStreamTypesForProject(ProjectMetadata projectMetadata) { | ||
return Arrays.stream(values()) | ||
.filter(t -> t.streamTypeIsEnabled(projectMetadata)) | ||
.collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); | ||
} | ||
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.