Skip to content

Commit b2bda82

Browse files
committed
Add primary store size to snapshot status result for shallow V2 snapshots
Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
1 parent 397cbb1 commit b2bda82

25 files changed

+219
-69
lines changed

server/src/internalClusterTest/java/org/opensearch/snapshots/RemoteIndexSnapshotStatusApiIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ private void assertShallowV2SnapshotStatus(SnapshotStatus snapshotStatus, boolea
278278
if (hasIndexFilter) {
279279
assertEquals(0, snapshotStatus.getStats().getTotalSize());
280280
} else {
281-
// TODO: after adding primary store size at the snapshot level, total size here should be > 0
281+
assertTrue(snapshotStatus.getStats().getTotalSize() > 0L);
282282
}
283283
// assert that total and incremental values of file count and size_in_bytes are 0 at index and shard levels
284284
assertEquals(0, snapshotStatus.getStats().getTotalFileCount());

server/src/internalClusterTest/java/org/opensearch/snapshots/SnapshotStatusApisIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@
5252
import org.opensearch.common.util.io.IOUtils;
5353
import org.opensearch.core.common.Strings;
5454
import org.opensearch.core.common.unit.ByteSizeUnit;
55-
import org.opensearch.repositories.IndexId;
5655
import org.opensearch.core.rest.RestStatus;
5756
import org.opensearch.index.IndexNotFoundException;
57+
import org.opensearch.repositories.IndexId;
5858
import org.opensearch.repositories.blobstore.BlobStoreRepository;
5959
import org.opensearch.threadpool.ThreadPool;
6060

@@ -70,8 +70,8 @@
7070
import java.util.concurrent.TimeUnit;
7171
import java.util.stream.Collectors;
7272

73-
import static org.opensearch.test.OpenSearchIntegTestCase.resolvePath;
7473
import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API;
74+
import static org.opensearch.test.OpenSearchIntegTestCase.resolvePath;
7575
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
7676
import static org.hamcrest.Matchers.equalTo;
7777
import static org.hamcrest.Matchers.greaterThan;

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.action.admin.cluster.snapshots.status;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.cluster.SnapshotsInProgress;
3637
import org.opensearch.cluster.SnapshotsInProgress.State;
3738
import org.opensearch.common.Nullable;
@@ -86,6 +87,8 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
8687

8788
private SnapshotStats stats;
8889

90+
private final long initialTotalSizeInBytes;
91+
8992
@Nullable
9093
private final Boolean includeGlobalState;
9194

@@ -96,7 +99,12 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
9699
includeGlobalState = in.readOptionalBoolean();
97100
final long startTime = in.readLong();
98101
final long time = in.readLong();
99-
updateShardStats(startTime, time);
102+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
103+
initialTotalSizeInBytes = in.readOptionalLong();
104+
} else {
105+
initialTotalSizeInBytes = 0L;
106+
}
107+
updateShardStats(startTime, time, initialTotalSizeInBytes);
100108
}
101109

102110
SnapshotStatus(
@@ -105,15 +113,18 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
105113
List<SnapshotIndexShardStatus> shards,
106114
Boolean includeGlobalState,
107115
long startTime,
108-
long time
116+
long time,
117+
long initialTotalSizeInBytes
109118
) {
110119
this.snapshot = Objects.requireNonNull(snapshot);
111120
this.state = Objects.requireNonNull(state);
112121
this.shards = Objects.requireNonNull(shards);
113122
this.includeGlobalState = includeGlobalState;
114123
shardsStats = new SnapshotShardsStats(shards);
115124
assert time >= 0 : "time must be >= 0 but received [" + time + "]";
116-
updateShardStats(startTime, time);
125+
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
126+
assert initialTotalSizeInBytes >= 0 : "initialTotalSizeInBytes must be >= 0 but received [" + initialTotalSizeInBytes + "]";
127+
updateShardStats(startTime, time, initialTotalSizeInBytes);
117128
}
118129

119130
private SnapshotStatus(
@@ -123,7 +134,8 @@ private SnapshotStatus(
123134
Map<String, SnapshotIndexStatus> indicesStatus,
124135
SnapshotShardsStats shardsStats,
125136
SnapshotStats stats,
126-
Boolean includeGlobalState
137+
Boolean includeGlobalState,
138+
long initialTotalSizeInBytes
127139
) {
128140
this.snapshot = snapshot;
129141
this.state = state;
@@ -132,6 +144,7 @@ private SnapshotStatus(
132144
this.shardsStats = shardsStats;
133145
this.stats = stats;
134146
this.includeGlobalState = includeGlobalState;
147+
this.initialTotalSizeInBytes = initialTotalSizeInBytes;
135148
}
136149

137150
/**
@@ -204,6 +217,9 @@ public void writeTo(StreamOutput out) throws IOException {
204217
out.writeOptionalBoolean(includeGlobalState);
205218
out.writeLong(stats.getStartTime());
206219
out.writeLong(stats.getTime());
220+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
221+
out.writeOptionalLong(initialTotalSizeInBytes);
222+
}
207223
}
208224

209225
@Override
@@ -224,6 +240,7 @@ public SnapshotStats getStats() {
224240
private static final String STATE = "state";
225241
private static final String INDICES = "indices";
226242
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";
243+
private static final String INITIAL_TOTAL_SIZE_IN_BYTES = "initial_total_size_in_bytes";
227244

228245
@Override
229246
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -235,6 +252,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
235252
if (includeGlobalState != null) {
236253
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
237254
}
255+
if (initialTotalSizeInBytes != 0) {
256+
builder.field(INITIAL_TOTAL_SIZE_IN_BYTES, initialTotalSizeInBytes);
257+
}
238258
builder.field(SnapshotShardsStats.Fields.SHARDS_STATS, shardsStats, params);
239259
builder.field(SnapshotStats.Fields.STATS, stats, params);
240260
builder.startObject(INDICES);
@@ -256,6 +276,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
256276
String uuid = (String) parsedObjects[i++];
257277
String rawState = (String) parsedObjects[i++];
258278
Boolean includeGlobalState = (Boolean) parsedObjects[i++];
279+
Long initialTotalSizeInBytes = (Long) parsedObjects[i++];
259280
SnapshotStats stats = ((SnapshotStats) parsedObjects[i++]);
260281
SnapshotShardsStats shardsStats = ((SnapshotShardsStats) parsedObjects[i++]);
261282
@SuppressWarnings("unchecked")
@@ -276,7 +297,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
276297
shards.addAll(index.getShards().values());
277298
}
278299
}
279-
return new SnapshotStatus(snapshot, state, shards, indicesStatus, shardsStats, stats, includeGlobalState);
300+
return new SnapshotStatus(
301+
snapshot,
302+
state,
303+
shards,
304+
indicesStatus,
305+
shardsStats,
306+
stats,
307+
includeGlobalState,
308+
initialTotalSizeInBytes
309+
);
280310
}
281311
);
282312
static {
@@ -285,6 +315,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
285315
PARSER.declareString(constructorArg(), new ParseField(UUID));
286316
PARSER.declareString(constructorArg(), new ParseField(STATE));
287317
PARSER.declareBoolean(optionalConstructorArg(), new ParseField(INCLUDE_GLOBAL_STATE));
318+
PARSER.declareLong(optionalConstructorArg(), new ParseField(INITIAL_TOTAL_SIZE_IN_BYTES));
288319
PARSER.declareField(
289320
constructorArg(),
290321
SnapshotStats::fromXContent,
@@ -299,8 +330,8 @@ public static SnapshotStatus fromXContent(XContentParser parser) throws IOExcept
299330
return PARSER.parse(parser, null);
300331
}
301332

302-
private void updateShardStats(long startTime, long time) {
303-
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
333+
private void updateShardStats(long startTime, long time, long initialTotalSizeInBytes) {
334+
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, initialTotalSizeInBytes, 0);
304335
shardsStats = new SnapshotShardsStats(shards);
305336
for (SnapshotIndexShardStatus shard : shards) {
306337
// BWC: only update timestamps when we did not get a start time from an old node

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ private void buildResponse(
297297
Collections.unmodifiableList(shardStatusBuilder),
298298
entry.includeGlobalState(),
299299
entry.startTime(),
300-
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)
300+
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L),
301+
0L
301302
)
302303
);
303304
}
@@ -344,7 +345,7 @@ private void loadRepositoryData(
344345
boolean isShallowV2Snapshot = snapshotInfo.getPinnedTimestamp() > 0;
345346
long initialSnapshotTotalSize = 0;
346347
if (isShallowV2Snapshot && request.indices().length == 0) {
347-
// TODO: add primary store size in bytes at the snapshot level
348+
initialSnapshotTotalSize = snapshotInfo.getSnapshotSizeInBytes();
348349
}
349350

350351
for (Map.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatuses.entrySet()) {
@@ -377,7 +378,8 @@ private void loadRepositoryData(
377378
snapshotInfo.includeGlobalState(),
378379
startTime,
379380
// Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0
380-
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime
381+
(endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime,
382+
initialSnapshotTotalSize
381383
)
382384
);
383385
}

server/src/main/java/org/opensearch/cluster/ClusterInfo.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,13 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
6969
final Map<ShardRouting, String> routingToDataPath;
7070
final Map<NodeAndPath, ReservedSpace> reservedSpace;
7171
final Map<String, FileCacheStats> nodeFileCacheStats;
72+
final long primaryStoreSize;
73+
7274
private long avgTotalBytes;
7375
private long avgFreeByte;
7476

7577
protected ClusterInfo() {
76-
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
78+
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), 0L);
7779
}
7880

7981
/**
@@ -84,6 +86,7 @@ protected ClusterInfo() {
8486
* @param shardSizes a shardkey to size in bytes mapping per shard.
8587
* @param routingToDataPath the shard routing to datapath mapping
8688
* @param reservedSpace reserved space per shard broken down by node and data path
89+
* @param primaryStoreSize total size in bytes for all the primary shards
8790
* @see #shardIdentifierFromRouting
8891
*/
8992
public ClusterInfo(
@@ -92,14 +95,16 @@ public ClusterInfo(
9295
final Map<String, Long> shardSizes,
9396
final Map<ShardRouting, String> routingToDataPath,
9497
final Map<NodeAndPath, ReservedSpace> reservedSpace,
95-
final Map<String, FileCacheStats> nodeFileCacheStats
98+
final Map<String, FileCacheStats> nodeFileCacheStats,
99+
final long primaryStoreSize
96100
) {
97101
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
98102
this.shardSizes = shardSizes;
99103
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
100104
this.routingToDataPath = routingToDataPath;
101105
this.reservedSpace = reservedSpace;
102106
this.nodeFileCacheStats = nodeFileCacheStats;
107+
this.primaryStoreSize = primaryStoreSize;
103108
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
104109
}
105110

@@ -121,6 +126,11 @@ public ClusterInfo(StreamInput in) throws IOException {
121126
} else {
122127
this.nodeFileCacheStats = Map.of();
123128
}
129+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
130+
this.primaryStoreSize = in.readOptionalLong();
131+
} else {
132+
this.primaryStoreSize = 0L;
133+
}
124134

125135
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
126136
}
@@ -166,6 +176,9 @@ public void writeTo(StreamOutput out) throws IOException {
166176
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
167177
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
168178
}
179+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
180+
out.writeOptionalLong(this.primaryStoreSize);
181+
}
169182
}
170183

171184
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -220,6 +233,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
220233
}
221234
}
222235
builder.endArray(); // end "reserved_sizes"
236+
builder.field("primary_store_size", this.primaryStoreSize);
223237
return builder;
224238
}
225239

@@ -246,6 +260,13 @@ public Map<String, FileCacheStats> getNodeFileCacheStats() {
246260
return Collections.unmodifiableMap(this.nodeFileCacheStats);
247261
}
248262

263+
/**
264+
* Returns the total size in bytes for all the primary shards
265+
*/
266+
public long getPrimaryStoreSize() {
267+
return primaryStoreSize;
268+
}
269+
249270
/**
250271
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
251272
*/

server/src/main/java/org/opensearch/cluster/InternalClusterInfoService.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
115115
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
116116
private volatile IndicesStatsSummary indicesStatsSummary;
117117
// null if this node is not currently the cluster-manager
118+
119+
private volatile long primaryStoreSize;
118120
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
119121
private volatile boolean enabled;
120122
private volatile TimeValue fetchTimeout;
@@ -127,6 +129,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
127129
this.mostAvailableSpaceUsages = Map.of();
128130
this.nodeFileCacheStats = Map.of();
129131
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
132+
this.primaryStoreSize = 0L;
130133
this.threadPool = threadPool;
131134
this.client = client;
132135
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
@@ -213,7 +216,8 @@ public ClusterInfo getClusterInfo() {
213216
indicesStatsSummary.shardSizes,
214217
indicesStatsSummary.shardRoutingToDataPath,
215218
indicesStatsSummary.reservedSpace,
216-
nodeFileCacheStats
219+
nodeFileCacheStats,
220+
primaryStoreSize
217221
);
218222
}
219223

@@ -305,8 +309,13 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
305309
final Map<String, Long> shardSizeByIdentifierBuilder = new HashMap<>();
306310
final Map<ShardRouting, String> dataPathByShardRoutingBuilder = new HashMap<>();
307311
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders = new HashMap<>();
308-
buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders);
309-
312+
primaryStoreSize = buildShardLevelInfo(
313+
logger,
314+
stats,
315+
shardSizeByIdentifierBuilder,
316+
dataPathByShardRoutingBuilder,
317+
reservedSpaceBuilders
318+
);
310319
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> rsrvdSpace = new HashMap<>();
311320
reservedSpaceBuilders.forEach((nodeAndPath, builder) -> rsrvdSpace.put(nodeAndPath, builder.build()));
312321

@@ -366,13 +375,14 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
366375
listeners.add(clusterInfoConsumer);
367376
}
368377

369-
static void buildShardLevelInfo(
378+
static long buildShardLevelInfo(
370379
Logger logger,
371380
ShardStats[] stats,
372381
final Map<String, Long> shardSizes,
373382
final Map<ShardRouting, String> newShardRoutingToDataPath,
374383
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceByShard
375384
) {
385+
long currentPrimaryStoreSize = 0L;
376386
for (ShardStats s : stats) {
377387
final ShardRouting shardRouting = s.getShardRouting();
378388
newShardRoutingToDataPath.put(shardRouting, s.getDataPath());
@@ -382,6 +392,9 @@ static void buildShardLevelInfo(
382392
continue;
383393
}
384394
final long size = storeStats.sizeInBytes();
395+
if (shardRouting.primary()) {
396+
currentPrimaryStoreSize += size;
397+
}
385398
final long reserved = storeStats.getReservedSize().getBytes();
386399

387400
final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
@@ -396,6 +409,7 @@ static void buildShardLevelInfo(
396409
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
397410
}
398411
}
412+
return currentPrimaryStoreSize;
399413
}
400414

401415
static void fillDiskUsagePerNode(

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,6 +1208,7 @@ protected Node(
12081208
clusterModule.getIndexNameExpressionResolver(),
12091209
repositoryService,
12101210
transportService,
1211+
clusterInfoService,
12111212
actionModule.getActionFilters(),
12121213
remoteStorePinnedTimestampService
12131214
);

0 commit comments

Comments
 (0)