Skip to content

Enable failure store for log data streams #131261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131261.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131261
summary: Enable failure store for logs-*-* data streams
area: Data streams
type: feature
issues:
- 131105
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -740,6 +741,73 @@ public void testIgnoreDynamicBeyondLimit() throws Exception {
assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty());
}

@SuppressWarnings("unchecked")
public void testFailureStoreWithInvalidFieldType() throws Exception {
    String streamName = "logs-app-with-failure-store";
    createDataStream(client, streamName);

    // A well-formed document: "message" is a string, as the logs mappings expect.
    indexDoc(client, streamName, """
        {
          "@timestamp": "2023-11-30T12:00:00Z",
          "message": "This is a valid message"
        }
        """);

    // invalid document (message as an object instead of string)
    indexDoc(client, streamName, """
        {
          "@timestamp": "2023-11-30T12:01:00Z",
          "message": {
            "nested": "This should fail because message should be a string"
          }
        }
        """);

    refreshAllIndices();

    // Fetch the data stream metadata and confirm the failure store is enabled
    // and has received a backing index for the rejected document.
    Request getDataStream = new Request("GET", "/_data_stream/" + streamName);
    Map<String, Object> response = entityAsMap(client.performRequest(getDataStream));
    List<Map<String, Object>> allStreams = (List<Map<String, Object>>) response.get("data_streams");
    Map<String, Object> streamInfo = allStreams.get(0);
    Map<String, Object> failureStore = (Map<String, Object>) streamInfo.get("failure_store");
    assertNotNull(failureStore);
    assertThat(failureStore.get("enabled"), is(true));
    List<Map<String, Object>> fsIndices = (List<Map<String, Object>>) failureStore.get("indices");

    assertThat(fsIndices, not(empty()));
    String fsIndexName = (String) fsIndices.get(0).get("index_name");
    // Failure store indices follow the ".fs-<data stream>-<generation>" naming scheme.
    assertThat(fsIndexName, matchesRegex("\\.fs-" + streamName + "-.*"));

    // query the failure store index
    Request searchRequest = new Request("GET", "/" + fsIndexName + "/_search");
    searchRequest.setJsonEntity("""
        {
          "query": {
            "match_all": {}
          }
        }
        """);
    Map<String, Object> searchResponse = entityAsMap(client.performRequest(searchRequest));
    Map<String, Object> hitsSection = (Map<String, Object>) searchResponse.get("hits");
    List<Map<String, Object>> searchHits = (List<Map<String, Object>>) hitsSection.get("hits");

    // Verify the failed document is in the failure store
    assertThat(searchHits.size(), is(1));
    Map<String, Object> storedDoc = (Map<String, Object>) searchHits.get(0).get("_source");
    // Failure store entries wrap the original document under "document.source"...
    Map<String, Object> capturedDocument = (Map<String, Object>) storedDoc.get("document");
    assertNotNull(capturedDocument);
    Map<String, Object> originalSource = (Map<String, Object>) capturedDocument.get("source");
    assertNotNull(originalSource);
    Map<String, Object> messageField = (Map<String, Object>) originalSource.get("message");
    assertNotNull(messageField);
    assertThat(messageField.get("nested"), equalTo("This should fail because message should be a string"));
    // ...and record why indexing was rejected under "error".
    Map<String, Object> errorInfo = (Map<String, Object>) storedDoc.get("error");
    assertNotNull(errorInfo);
    assertThat(errorInfo.get("type"), equalTo("document_parsing_exception"));
    String reason = (String) errorInfo.get("message");
    assertThat(reason, containsString("failed to parse field [message] of type [match_only_text] in document with id"));
}

@Override
protected String indexTemplateName() {
return "logs";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
---
# Setup: requires a cluster version where logs-*-* data streams get a failure
# store by default, then creates one data stream matching the built-in logs template.
setup:
  - requires:
      cluster_features: [ "gte_v9.1.99" ]
      reason: "failure store became enabled by default for log data streams in 9.2.0"

  - do:
      indices.create_data_stream:
        name: logs-app-default
---
teardown:
  - do:
      indices.delete_data_stream:
        name: logs-app-default
        # tolerate the data stream already being gone
        ignore: 404

---
"Test logs-*-* data streams have failure store enabled by default":
  # index a valid document (string message)
  - do:
      index:
        index: logs-app-default
        refresh: true
        body:
          '@timestamp': '2023-01-01T12:00:00Z'
          host:
            name: 'server-01'
          severity: 'INFO'
          message: "Application started successfully"
  - match: { result: created }

  # the failure store is enabled by default but has no indices yet,
  # since nothing has failed so far
  - do:
      indices.get_data_stream:
        name: logs-app-default
  - match: { data_streams.0.name: logs-app-default }
  - length: { data_streams.0.indices: 1 }
  - match: { data_streams.0.failure_store.enabled: true }
  - length: { data_streams.0.failure_store.indices: 0 }

  # index a document with (object message, causing a mapping conflict)
  - do:
      index:
        index: logs-app-default
        refresh: true
        body:
          '@timestamp': '2023-01-01T12:01:00Z'
          host:
            name: 'server-02'
          severity: 'ERROR'
          message:
            struct:
              value: 42
  # the request succeeds and reports that the document was redirected
  # to the failure store instead of being rejected
  - match: { result: 'created' }
  - match: { failure_store: used}

  # a failure index has now been allocated for the data stream
  - do:
      indices.get_data_stream:
        name: logs-app-default
  - length: { data_streams.0.failure_store.indices: 1 }

  # the ::data selector only sees the one valid document
  - do:
      search:
        index: logs-app-default::data
        body:
          query:
            match_all: {}
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._source.severity: "INFO" }
  - match: { hits.hits.0._source.message: "Application started successfully" }

  # the ::failures selector holds the rejected document, wrapped with its
  # original source under "document.source" and the rejection cause under "error"
  - do:
      search:
        index: logs-app-default::failures
        body:
          query:
            match_all: {}
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._source.document.source.message.struct.value: 42 }
  - match: { hits.hits.0._source.error.type: "document_parsing_exception" }
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ public void testDSXpackUsage() throws Exception {
assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(0));
assertThat(failureStoreStats.get("failure_indices_count"), equalTo(0));
assertBusy(() -> {
Map<?, ?> logsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/logs").get("index_templates")).get(0);
assertThat(logsTemplate, notNullValue());
assertThat(logsTemplate.get("name"), equalTo("logs"));
assertThat(((Map<?, ?>) logsTemplate.get("index_template")).get("data_stream"), notNullValue());
Map<?, ?> syntheticsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/synthetics").get("index_templates")).get(0);
assertThat(syntheticsTemplate, notNullValue());
assertThat(syntheticsTemplate.get("name"), equalTo("synthetics"));
assertThat(((Map<?, ?>) syntheticsTemplate.get("index_template")).get("data_stream"), notNullValue());
});
putFailureStoreTemplate();

// Create a data stream
Request indexRequest = new Request("POST", "/logs-mysql-default/_doc");
Request indexRequest = new Request("POST", "/synthetics-myapp-default/_doc");
indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
client().performRequest(indexRequest);

// Roll over the data stream
Request rollover = new Request("POST", "/logs-mysql-default/_rollover");
Request rollover = new Request("POST", "/synthetics-myapp-default/_rollover");
client().performRequest(rollover);

// Create failure store data stream
Expand All @@ -105,10 +105,10 @@ public void testDSXpackUsage() throws Exception {
assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(1));
assertThat(failureStoreStats.get("failure_indices_count"), equalTo(1));

// Enable the failure store for logs-mysql-default using the cluster setting...
// Enable the failure store for synthetics-myapp-default using the cluster setting...
updateClusterSettings(
Settings.builder()
.put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "logs-mysql-default")
.put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "synthetics-myapp-default")
.build()
);
// ...and assert that it counts towards effectively_enabled_count but not explicitly_enabled_count:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
},
"default_pipeline": "logs@default-pipeline"
}
},
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
},
"_meta": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class StackTemplateRegistry extends IndexTemplateRegistry {

// The stack template registry version. This number must be incremented when we make changes
// to built-in templates.
public static final int REGISTRY_VERSION = 16;
public static final int REGISTRY_VERSION = 17;

public static final String TEMPLATE_VERSION_VARIABLE = "xpack.stack.template.version";
public static final Setting<Boolean> STACK_TEMPLATES_ENABLED = Setting.boolSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,14 @@ setup:
data_stream.namespace: "namespace1"

- do:
catch: bad_request
index:
index: logs-dataset0-namespace1
body:
"@timestamp": "2020-01-01"
data_stream.type: "metrics"
data_stream.dataset: "dataset0"
data_stream.namespace: "namespace1"
- match: { failure_store: used }

- do:
catch: bad_request
Expand Down
Loading