
Commit f49ca69

fix conflicts

Merge: 2 parents 58e444a + fe22c29

File tree

92 files changed: +2697, -629 lines


.github/workflows/wrapper.yml

Lines changed: 1 addition & 1 deletion
@@ -8,4 +8,4 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
-      - uses: gradle/actions/wrapper-validation@v3
+      - uses: gradle/actions/wrapper-validation@v4

CHANGELOG.md

Lines changed: 9 additions & 1 deletion
@@ -31,12 +31,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Upgrade crypto kms plugin dependencies for AWS SDK v2.x. ([#18268](https://github.com/opensearch-project/OpenSearch/pull/18268))
 - Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
 - [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))
+- Optimize gRPC perf by passing by reference ([#18303](https://github.com/opensearch-project/OpenSearch/pull/18303))
 - Added File Cache Stats - Involves Block level as well as full file level stats ([#17538](https://github.com/opensearch-project/OpenSearch/issues/17479))
+- Added File Cache Pinning ([#17617](https://github.com/opensearch-project/OpenSearch/issues/13648))
+- Support consumer reset in Resume API for pull-based ingestion. This PR includes a breaking change for the experimental pull-based ingestion feature. ([#18332](https://github.com/opensearch-project/OpenSearch/pull/18332))
 - Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351))

 ### Changed
 - Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269)))
-
+- Change implementation for `percentiles` aggregation for latency improvement [#18124](https://github.com/opensearch-project/OpenSearch/pull/18124)
+
@@ -54,6 +58,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Bump `lycheeverse/lychee-action` from 2.4.0 to 2.4.1 ([#18264](https://github.com/opensearch-project/OpenSearch/pull/18264))
 - Bump `com.maxmind.geoip2:geoip2` from 4.2.1 to 4.3.0 ([#18263](https://github.com/opensearch-project/OpenSearch/pull/18263))
 - Bump `com.azure:azure-json` from 1.3.0 to 1.5.0 ([#18335](https://github.com/opensearch-project/OpenSearch/pull/18335))
+- Bump `org.jline:jline` from 3.29.0 to 3.30.3 ([#18368](https://github.com/opensearch-project/OpenSearch/pull/18368))
+- Bump `com.nimbusds:oauth2-oidc-sdk` from 11.23.1 to 11.25 ([#18369](https://github.com/opensearch-project/OpenSearch/pull/18369))
+- Bump `gradle/actions` from 3 to 4 ([#18371](https://github.com/opensearch-project/OpenSearch/pull/18371))

 ### Deprecated

@@ -70,6 +77,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Avoid NPE if on SnapshotInfo if 'shallow' boolean not present ([#18187](https://github.com/opensearch-project/OpenSearch/issues/18187))
 - Fix 'system call filter not installed' caused when network.host: 0.0.0.0 ([#18309](https://github.com/opensearch-project/OpenSearch/pull/18309))
 - Fix MatrixStatsAggregator reuse when mode parameter changes ([#18242](https://github.com/opensearch-project/OpenSearch/issues/18242))
+- Replace the deprecated construction method of TopScoreDocCollectorManager with the new method ([#18395](https://github.com/opensearch-project/OpenSearch/pull/18395))

 ### Security


CODE_OF_CONDUCT.md

Lines changed: 1 addition & 1 deletion
@@ -22,4 +22,4 @@ This code of conduct applies to all spaces provided by the OpenSource project in
 * Advocating for or encouraging any of the above behaviors.
 * Enforcement and Reporting Code of Conduct Issues:

-Instances of abusive, harassing, or otherwise unacceptable behavior may be reported. [Contact us](mailto:opensource-codeofconduct@amazon.com). All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances.
+Instances of abusive, harassing, or otherwise unacceptable behavior may be reported. [Contact us](mailto:conduct@opensearch.foundation). All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances.

MAINTAINERS.md

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
 | Binlong Gao | [gaobinlong](https://github.com/gaobinlong) | Amazon |
 | Gaurav Bafna | [gbbafna](https://github.com/gbbafna) | Amazon |
 | Jay Deng | [jed326](https://github.com/jed326) | Amazon |
+| Ke Wei | [kkewwei](https://github.com/kkewwei) | ByteDance |
 | Kunal Kotwani | [kotwanikunal](https://github.com/kotwanikunal) | Amazon |
 | Varun Bansal | [linuxpi](https://github.com/linuxpi) | Amazon |
 | Marc Handalian | [mch2](https://github.com/mch2) | Amazon |

README.md

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@

 ## Code of Conduct

-This project has adopted the [Amazon Open Source Code of Conduct](CODE_OF_CONDUCT.md). For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq), or contact [opensource-codeofconduct@amazon.com](mailto:opensource-codeofconduct@amazon.com) with any additional questions or comments.
+The project's [Code of Conduct](CODE_OF_CONDUCT.md) outlines our expectations for all participants in our community, based on the [OpenSearch Code of Conduct](https://opensearch.org/code-of-conduct/). Please contact [conduct@opensearch.foundation](mailto:conduct@opensearch.foundation) with any additional questions or comments.

 ## Security
 If you discover a potential security issue in this project we ask that you notify OpenSearch Security directly via email to security@opensearch.org. Please do **not** create a public GitHub issue.

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ protobuf = "3.25.5"
 jakarta_annotation = "1.3.5"
 google_http_client = "1.44.1"
 google_auth = "1.29.0"
-tdigest = "3.3"
+tdigest = "3.3" # Warning: Before updating tdigest, ensure its serialization code for MergingDigest hasn't changed
 hdrhistogram = "2.2.2"
 grpc = "1.68.2"
 json_smart = "2.5.2"
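
The pinned `tdigest` artifact backs the `percentiles` aggregation's `MergingDigest`, whose binary form is persisted and compared across versions, which is why the new comment warns against casual bumps. A minimal round-trip check, assuming the t-digest 3.3 API (`byteSize`, `asBytes`, and the static `MergingDigest.fromBytes`), could look like:

import java.nio.ByteBuffer;

import com.tdunning.math.stats.MergingDigest;

public class MergingDigestRoundTrip {
    public static void main(String[] args) {
        // Build a digest at compression 100 and feed it some samples.
        MergingDigest digest = new MergingDigest(100.0);
        for (int i = 0; i < 10_000; i++) {
            digest.add(Math.random());
        }

        // Serialize, then deserialize from the same bytes.
        ByteBuffer buffer = ByteBuffer.allocate(digest.byteSize());
        digest.asBytes(buffer);
        buffer.flip();
        MergingDigest restored = MergingDigest.fromBytes(buffer);

        // A format change in MergingDigest would surface here as a
        // failed read or a diverging quantile.
        System.out.println("p50 before: " + digest.quantile(0.5));
        System.out.println("p50 after:  " + restored.quantile(0.5));
    }
}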

modules/parent-join/src/main/java/org/opensearch/join/query/ParentChildInnerHitContextBuilder.java

Lines changed: 1 addition & 1 deletion
@@ -175,7 +175,7 @@ public TopDocsAndMaxScore topDocs(SearchHit hit) throws IOException {
                 maxScoreCollector = new MaxScoreCollector();
             }
         } else {
-            topDocsCollector = new TopScoreDocCollectorManager(topN, null, Integer.MAX_VALUE, false).newCollector();
+            topDocsCollector = new TopScoreDocCollectorManager(topN, null, Integer.MAX_VALUE).newCollector();
             maxScoreCollector = new MaxScoreCollector();
         }
         for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
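
For context: Lucene 10 deprecates the four-argument `TopScoreDocCollectorManager` constructor, whose trailing boolean indicated concurrency support, in favor of the three-argument form `(numHits, after, totalHitsThreshold)`. A standalone sketch of the new form, assuming a populated `Directory` (as above, `Integer.MAX_VALUE` requests exact hit counts):

import java.io.IOException;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopScoreDocCollectorManager;
import org.apache.lucene.store.Directory;

class TopDocsExample {
    // Collect the top 10 hits using the non-deprecated constructor:
    // (numHits, after, totalHitsThreshold) -- no supportsConcurrency flag.
    static TopDocs topTen(Directory dir) throws IOException {
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            TopScoreDocCollectorManager manager = new TopScoreDocCollectorManager(10, null, Integer.MAX_VALUE);
            return searcher.search(new MatchAllDocsQuery(), manager);
        }
    }
}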

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 2 additions & 2 deletions
@@ -85,7 +85,7 @@ public void testKafkaIngestion_RewindByTimeStamp() {
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
             .put("ingestion_source.type", "kafka")
-            .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
+            .put("ingestion_source.pointer.init.reset", "reset_by_timestamp")
             // 1739459500000 is the timestamp of the first message
             // 1739459800000 is the timestamp of the second message
             // by resetting to 1739459600000, only the second message will be ingested
@@ -115,7 +115,7 @@ public void testKafkaIngestion_RewindByOffset() {
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
             .put("ingestion_source.type", "kafka")
-            .put("ingestion_source.pointer.init.reset", "rewind_by_offset")
+            .put("ingestion_source.pointer.init.reset", "reset_by_offset")
             .put("ingestion_source.pointer.init.reset.value", "1")
             .put("ingestion_source.param.topic", "test")
             .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 10 additions & 0 deletions
@@ -15,6 +15,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
+import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
 import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
 import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
 import org.opensearch.action.pagination.PageParams;
@@ -176,6 +177,15 @@ protected ResumeIngestionResponse resumeIngestion(String indexName) throws Execu
         return client().admin().indices().resumeIngestion(Requests.resumeIngestionRequest(indexName)).get();
     }

+    protected ResumeIngestionResponse resumeIngestion(
+        String index,
+        int shard,
+        ResumeIngestionRequest.ResetSettings.ResetMode mode,
+        String value
+    ) throws ExecutionException, InterruptedException {
+        return client().admin().indices().resumeIngestion(Requests.resumeIngestionRequest(index, shard, mode, value)).get();
+    }
+
     protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
         createIndexWithDefaultSettings(indexName, numShards, numReplicas, 1);
     }
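
The new overload targets a single shard and pairs a reset mode with a value whose meaning depends on the mode (a Kafka offset, or an epoch-millisecond timestamp). Usage as exercised by the tests in the next file, shown here as a sketch:

// Resume shard 0 from offset 2, skipping past a bad message.
ResumeIngestionResponse byOffset = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "2");

// Resume shard 0 from the first message at or after timestamp 102.
ResumeIngestionResponse byTimestamp = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP, "102");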

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 127 additions & 0 deletions
@@ -12,6 +12,7 @@

 import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse;
+import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest;
 import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse;
 import org.opensearch.action.admin.indices.streamingingestion.state.GetIngestionStateResponse;
 import org.opensearch.action.pagination.PageParams;
@@ -498,6 +499,132 @@ public void testClusterWriteBlock() throws Exception {
         waitForSearchableDocs(4, Arrays.asList(nodeB, nodeC));
     }

+    public void testOffsetUpdateOnBlockErrorPolicy() throws Exception {
+        // setup nodes and index using block strategy
+        // produce one invalid message to block the processor
+        produceData("1", "name1", "21");
+        produceData("{\"_op_type\":\"invalid\",\"_source\":{\"name\":\"name4\", \"age\": 25}}");
+        produceData("2", "name2", "22");
+        produceData("3", "name3", "24");
+        produceData("4", "name4", "24");
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.error_strategy", "block")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.internal_queue_size", "1000")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
+
+        ensureGreen(indexName);
+        // expect only 1 document to be successfully indexed
+        waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
+
+        // pause ingestion
+        PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
+        assertTrue(pauseResponse.isAcknowledged());
+        assertTrue(pauseResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getFailedShards() == 0
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
+        });
+        // revalidate that only 1 document is visible
+        waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
+
+        // update offset to skip past the invalid message
+        ResumeIngestionResponse resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "2");
+        assertTrue(resumeResponse.isAcknowledged());
+        assertTrue(resumeResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return Arrays.stream(ingestionState.getShardStates())
+                .allMatch(
+                    state -> state.isPollerPaused() == false
+                        && (state.pollerState().equalsIgnoreCase("polling") || state.pollerState().equalsIgnoreCase("processing"))
+                );
+        });
+
+        // validate remaining messages are successfully indexed
+        waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
+        PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+            .getPollingIngestStats();
+        assertThat(stats.getConsumerStats().totalDuplicateMessageSkippedCount(), is(0L));
+    }
+
+    public void testConsumerResetByTimestamp() throws Exception {
+        produceData("1", "name1", "21", 100, "index");
+        produceData("2", "name2", "22", 105, "index");
+        produceData("3", "name3", "24", 110, "index");
+        produceData("4", "name4", "24", 120, "index");
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.error_strategy", "drop")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.internal_queue_size", "1000")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
+
+        ensureGreen(indexName);
+        waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
+
+        // expect error response since ingestion not yet paused
+        ResumeIngestionResponse resumeResponse = resumeIngestion(
+            indexName,
+            0,
+            ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP,
+            "100"
+        );
+        assertTrue(resumeResponse.isAcknowledged());
+        assertFalse(resumeResponse.isShardsAcknowledged());
+        assertEquals(1, resumeResponse.getShardFailures().length);
+
+        // pause ingestion
+        PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
+        assertTrue(pauseResponse.isAcknowledged());
+        assertTrue(pauseResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getFailedShards() == 0
+                && Arrays.stream(ingestionState.getShardStates())
+                    .allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
+        });
+
+        // reset consumer by a timestamp after first message was produced
+        resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP, "102");
+        assertTrue(resumeResponse.isAcknowledged());
+        assertTrue(resumeResponse.isShardsAcknowledged());
+        waitForState(() -> {
+            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+                .getPollingIngestStats();
+            return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 3;
+        });
+    }
+
     private void verifyRemoteStoreEnabled(String node) {
         GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
         String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");