Skip to content

Commit ed6544e

Browse files
committed
fix changelog conflicts
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
2 parents 1628eab + aec3fe9 commit ed6544e

36 files changed

+1662
-353
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3232

3333
### Changed
3434
- Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269)))
35-
35+
3636
### Dependencies
3737
- Update Apache Lucene from 10.1.0 to 10.2.1 ([#17961](https://github.com/opensearch-project/OpenSearch/pull/17961))
3838
- Bump `com.google.code.gson:gson` from 2.12.1 to 2.13.1 ([#17923](https://github.com/opensearch-project/OpenSearch/pull/17923), [#18266](https://github.com/opensearch-project/OpenSearch/pull/18266))

server/src/internalClusterTest/java/org/opensearch/cluster/ClusterInfoServiceIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.opensearch.index.IndexService;
5252
import org.opensearch.index.shard.IndexShard;
5353
import org.opensearch.index.store.Store;
54-
import org.opensearch.index.store.remote.filecache.FileCacheStats;
54+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
5555
import org.opensearch.indices.IndicesService;
5656
import org.opensearch.indices.SystemIndexDescriptor;
5757
import org.opensearch.plugins.ActionPlugin;
@@ -193,7 +193,7 @@ public void testClusterInfoServiceCollectsInformation() {
193193
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
194194
}
195195

196-
final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
196+
final Map<String, AggregateFileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
197197
assertNotNull(nodeFileCacheStats);
198198
assertThat("file cache is empty on non warm nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));
199199

@@ -227,12 +227,12 @@ public void testClusterInfoServiceCollectsFileCacheInformation() {
227227
infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
228228
ClusterInfo info = infoService.refresh();
229229
assertNotNull("info should not be null", info);
230-
final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
230+
final Map<String, AggregateFileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
231231
assertNotNull(nodeFileCacheStats);
232232
assertThat("file cache is enabled on both warm nodes", nodeFileCacheStats.size(), Matchers.equalTo(2));
233233

234-
for (FileCacheStats fileCacheStats : nodeFileCacheStats.values()) {
235-
assertThat("file cache is non empty", fileCacheStats.getTotal().getBytes(), greaterThan(0L));
234+
for (AggregateFileCacheStats aggregateFileCacheStats : nodeFileCacheStats.values()) {
235+
assertThat("file cache is non empty", aggregateFileCacheStats.getTotal().getBytes(), greaterThan(0L));
236236
}
237237
}
238238

server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
import org.apache.lucene.store.Directory;
1414
import org.apache.lucene.store.FilterDirectory;
15+
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
16+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
17+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
1518
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
1619
import org.opensearch.action.admin.indices.get.GetIndexRequest;
1720
import org.opensearch.action.admin.indices.get.GetIndexResponse;
@@ -27,6 +30,7 @@
2730
import org.opensearch.index.shard.IndexShard;
2831
import org.opensearch.index.store.CompositeDirectory;
2932
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
33+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
3034
import org.opensearch.index.store.remote.filecache.FileCache;
3135
import org.opensearch.index.store.remote.utils.FileTypeUtils;
3236
import org.opensearch.indices.IndicesService;
@@ -36,7 +40,9 @@
3640

3741
import java.util.Arrays;
3842
import java.util.HashSet;
43+
import java.util.Objects;
3944
import java.util.Set;
45+
import java.util.concurrent.ExecutionException;
4046
import java.util.stream.Collectors;
4147

4248
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -49,6 +55,7 @@
4955
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {
5056

5157
protected static final String INDEX_NAME = "test-idx-1";
58+
protected static final String INDEX_NAME_2 = "test-idx-2";
5259
protected static final int NUM_DOCS_IN_BULK = 1000;
5360

5461
/*
@@ -172,4 +179,82 @@ public void testWritableWarmBasic() throws Exception {
172179
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
173180
fileCache.prune();
174181
}
182+
183+
public void testFullFileAndFileCacheStats() throws ExecutionException, InterruptedException {
184+
185+
InternalTestCluster internalTestCluster = internalCluster();
186+
internalTestCluster.startClusterManagerOnlyNode();
187+
internalTestCluster.startDataAndWarmNodes(1);
188+
189+
Settings settings = Settings.builder()
190+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
191+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
192+
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
193+
.build();
194+
195+
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME_2).setSettings(settings).get());
196+
197+
// Verify from the cluster settings if the data locality is partial
198+
GetIndexResponse getIndexResponse = client().admin()
199+
.indices()
200+
.getIndex(new GetIndexRequest().indices(INDEX_NAME_2).includeDefaults(true))
201+
.get();
202+
203+
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME_2);
204+
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));
205+
206+
// Ingesting docs again before force merge
207+
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
208+
flushAndRefresh(INDEX_NAME_2);
209+
210+
// ensuring cluster is green
211+
ensureGreen();
212+
213+
SearchResponse searchResponse = client().prepareSearch(INDEX_NAME_2).setQuery(QueryBuilders.matchAllQuery()).get();
214+
// Asserting that search returns same number of docs as ingested
215+
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
216+
217+
// Ingesting docs again before force merge
218+
indexBulk(INDEX_NAME_2, NUM_DOCS_IN_BULK);
219+
flushAndRefresh(INDEX_NAME_2);
220+
221+
FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();
222+
223+
// TODO: Make these validation more robust, when SwitchableIndexInput is implemented.
224+
225+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
226+
227+
AggregateFileCacheStats fileCacheStats = nodesStatsResponse.getNodes()
228+
.stream()
229+
.filter(n -> n.getNode().isDataNode())
230+
.toList()
231+
.getFirst()
232+
.getFileCacheStats();
233+
234+
if (Objects.isNull(fileCacheStats)) {
235+
fail("File Cache Stats should not be null");
236+
}
237+
238+
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
239+
// leaks
240+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME_2)).get());
241+
fileCache.prune();
242+
243+
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
244+
int nonEmptyFileCacheNodes = 0;
245+
for (NodeStats stats : response.getNodes()) {
246+
AggregateFileCacheStats fcStats = stats.getFileCacheStats();
247+
if (Objects.isNull(fcStats) == false) {
248+
if (isFileCacheEmpty(fcStats) == false) {
249+
nonEmptyFileCacheNodes++;
250+
}
251+
}
252+
}
253+
assertEquals(0, nonEmptyFileCacheNodes);
254+
255+
}
256+
257+
private boolean isFileCacheEmpty(AggregateFileCacheStats stats) {
258+
return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
259+
}
175260
}

server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import org.opensearch.index.IndexNotFoundException;
4545
import org.opensearch.index.shard.ShardPath;
4646
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
47-
import org.opensearch.index.store.remote.filecache.FileCacheStats;
47+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
4848
import org.opensearch.monitor.fs.FsInfo;
4949
import org.opensearch.node.Node;
5050
import org.opensearch.repositories.fs.FsRepository;
@@ -711,7 +711,7 @@ private void assertIndexDirectoryDoesNotExist(String... indexNames) {
711711
private void assertAllNodesFileCacheEmpty() {
712712
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
713713
for (NodeStats stats : response.getNodes()) {
714-
FileCacheStats fcstats = stats.getFileCacheStats();
714+
AggregateFileCacheStats fcstats = stats.getFileCacheStats();
715715
if (fcstats != null) {
716716
assertTrue(isFileCacheEmpty(fcstats));
717717
}
@@ -722,7 +722,7 @@ private void assertNodesFileCacheNonEmpty(int numNodes) {
722722
NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
723723
int nonEmptyFileCacheNodes = 0;
724724
for (NodeStats stats : response.getNodes()) {
725-
FileCacheStats fcStats = stats.getFileCacheStats();
725+
AggregateFileCacheStats fcStats = stats.getFileCacheStats();
726726
if (stats.getNode().isWarmNode()) {
727727
if (!isFileCacheEmpty(fcStats)) {
728728
nonEmptyFileCacheNodes++;
@@ -735,7 +735,7 @@ private void assertNodesFileCacheNonEmpty(int numNodes) {
735735
assertEquals(numNodes, nonEmptyFileCacheNodes);
736736
}
737737

738-
private boolean isFileCacheEmpty(FileCacheStats stats) {
738+
private boolean isFileCacheEmpty(AggregateFileCacheStats stats) {
739739
return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
740740
}
741741

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import org.opensearch.index.SegmentReplicationRejectionStats;
5151
import org.opensearch.index.stats.IndexingPressureStats;
5252
import org.opensearch.index.stats.ShardIndexingPressureStats;
53-
import org.opensearch.index.store.remote.filecache.FileCacheStats;
53+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
5454
import org.opensearch.indices.NodeIndicesStats;
5555
import org.opensearch.ingest.IngestStats;
5656
import org.opensearch.monitor.fs.FsInfo;
@@ -143,7 +143,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
143143
private WeightedRoutingStats weightedRoutingStats;
144144

145145
@Nullable
146-
private FileCacheStats fileCacheStats;
146+
private AggregateFileCacheStats fileCacheStats;
147147

148148
@Nullable
149149
private TaskCancellationStats taskCancellationStats;
@@ -208,7 +208,7 @@ public NodeStats(StreamInput in) throws IOException {
208208
weightedRoutingStats = null;
209209
}
210210
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
211-
fileCacheStats = in.readOptionalWriteable(FileCacheStats::new);
211+
fileCacheStats = in.readOptionalWriteable(AggregateFileCacheStats::new);
212212
} else {
213213
fileCacheStats = null;
214214
}
@@ -277,7 +277,7 @@ public NodeStats(
277277
@Nullable SearchBackpressureStats searchBackpressureStats,
278278
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
279279
@Nullable WeightedRoutingStats weightedRoutingStats,
280-
@Nullable FileCacheStats fileCacheStats,
280+
@Nullable AggregateFileCacheStats fileCacheStats,
281281
@Nullable TaskCancellationStats taskCancellationStats,
282282
@Nullable SearchPipelineStats searchPipelineStats,
283283
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@@ -444,7 +444,7 @@ public WeightedRoutingStats getWeightedRoutingStats() {
444444
return weightedRoutingStats;
445445
}
446446

447-
public FileCacheStats getFileCacheStats() {
447+
public AggregateFileCacheStats getFileCacheStats() {
448448
return fileCacheStats;
449449
}
450450

server/src/main/java/org/opensearch/cluster/ClusterInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import org.opensearch.core.index.shard.ShardId;
4444
import org.opensearch.core.xcontent.ToXContentFragment;
4545
import org.opensearch.core.xcontent.XContentBuilder;
46-
import org.opensearch.index.store.remote.filecache.FileCacheStats;
46+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
4747

4848
import java.io.IOException;
4949
import java.util.Collections;
@@ -68,7 +68,7 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
6868
public static final ClusterInfo EMPTY = new ClusterInfo();
6969
final Map<ShardRouting, String> routingToDataPath;
7070
final Map<NodeAndPath, ReservedSpace> reservedSpace;
71-
final Map<String, FileCacheStats> nodeFileCacheStats;
71+
final Map<String, AggregateFileCacheStats> nodeFileCacheStats;
7272
private long avgTotalBytes;
7373
private long avgFreeByte;
7474

@@ -92,7 +92,7 @@ public ClusterInfo(
9292
final Map<String, Long> shardSizes,
9393
final Map<ShardRouting, String> routingToDataPath,
9494
final Map<NodeAndPath, ReservedSpace> reservedSpace,
95-
final Map<String, FileCacheStats> nodeFileCacheStats
95+
final Map<String, AggregateFileCacheStats> nodeFileCacheStats
9696
) {
9797
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
9898
this.shardSizes = shardSizes;
@@ -117,7 +117,7 @@ public ClusterInfo(StreamInput in) throws IOException {
117117
this.routingToDataPath = Collections.unmodifiableMap(routingMap);
118118
this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
119119
if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
120-
this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
120+
this.nodeFileCacheStats = in.readMap(StreamInput::readString, AggregateFileCacheStats::new);
121121
} else {
122122
this.nodeFileCacheStats = Map.of();
123123
}
@@ -242,7 +242,7 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
242242
/**
243243
* Returns a node id to file cache stats mapping for the nodes that have search roles assigned to it.
244244
*/
245-
public Map<String, FileCacheStats> getNodeFileCacheStats() {
245+
public Map<String, AggregateFileCacheStats> getNodeFileCacheStats() {
246246
return Collections.unmodifiableMap(this.nodeFileCacheStats);
247247
}
248248

server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
import org.opensearch.core.action.ActionListener;
5959
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
6060
import org.opensearch.index.store.StoreStats;
61-
import org.opensearch.index.store.remote.filecache.FileCacheStats;
61+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
6262
import org.opensearch.monitor.fs.FsInfo;
6363
import org.opensearch.threadpool.ThreadPool;
6464
import org.opensearch.transport.ReceiveTimeoutTransportException;
@@ -112,7 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
112112

113113
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
114114
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
115-
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
115+
private volatile Map<String, AggregateFileCacheStats> nodeFileCacheStats;
116116
private volatile IndicesStatsSummary indicesStatsSummary;
117117
// null if this node is not currently the cluster-manager
118118
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
import org.opensearch.common.settings.ClusterSettings;
4444
import org.opensearch.common.settings.Settings;
4545
import org.opensearch.core.common.unit.ByteSizeValue;
46+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
4647
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
47-
import org.opensearch.index.store.remote.filecache.FileCacheStats;
4848

4949
import java.util.List;
5050
import java.util.stream.Collectors;
@@ -280,7 +280,7 @@ private long calculateTotalAddressableSpace(RoutingNode node, RoutingAllocation
280280
ClusterInfo clusterInfo = allocation.clusterInfo();
281281
// TODO: Change the default value to 5 instead of 0
282282
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
283-
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
283+
final AggregateFileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
284284
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
285285
return (long) dataToFileCacheSizeRatio * nodeCacheSize;
286286
}
@@ -309,7 +309,7 @@ private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation)
309309
}
310310

311311
// Fail open if there are no file cache stats available
312-
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
312+
final AggregateFileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
313313
if (fileCacheStats == null) {
314314
if (logger.isTraceEnabled()) {
315315
logger.trace("unable to get file cache stats for node [{}], allowing allocation", node.nodeId());

server/src/main/java/org/opensearch/common/cache/stats/DefaultCacheStatsHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ private void resetHelper(Node current) {
111111

112112
@Override
113113
public long count() {
114-
// Include this here so caches don't have to create an entire CacheStats object to run count().
114+
// Include this here so caches don't have to create an entire AggregateRefCountedCacheStats object to run count().
115115
return statsRoot.getEntries();
116116
}
117117

server/src/main/java/org/opensearch/common/cache/stats/ImmutableCacheStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Objects;
2121

2222
/**
23-
* An immutable snapshot of CacheStats.
23+
* An immutable snapshot of AggregateRefCountedCacheStats.
2424
*
2525
* @opensearch.experimental
2626
*/

0 commit comments

Comments
 (0)