Skip to content

Commit a4bc4bf

Browse files
authored
[GRPC] Update GRPC APIs to use latest opensearch-protobufs:0.3.0 jar and various javadocs fixes (#17895)
Signed-off-by: Karen Xu <karenxyr@gmail.com>
1 parent 4d0ac04 commit a4bc4bf

27 files changed

+474
-296
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6666
- Bump `com.google.api.grpc:proto-google-iam-v1` from 1.33.0 to 1.49.1 ([#17811](https://github.com/opensearch-project/OpenSearch/pull/17811))
6767
- Bump `com.azure:azure-core` from 1.54.1 to 1.55.3 ([#17810](https://github.com/opensearch-project/OpenSearch/pull/17810))
6868
- Bump `org.apache.poi` version from 5.2.5 to 5.4.1 in /plugins/ingest-attachment ([#17887](https://github.com/opensearch-project/OpenSearch/pull/17887))
69+
- Bump `org.opensearch:protobufs` from 0.2.0 to 0.3.0 ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
6970

7071
### Changed
7172

plugins/transport-grpc/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ dependencies {
3535
implementation "io.grpc:grpc-stub:${versions.grpc}"
3636
implementation "io.grpc:grpc-util:${versions.grpc}"
3737
implementation "io.perfmark:perfmark-api:0.26.0"
38-
implementation "org.opensearch:protobufs:0.2.0"
38+
implementation "org.opensearch:protobufs:0.3.0"
3939
testImplementation project(':test:framework')
4040
}
4141

plugins/transport-grpc/licenses/protobufs-0.2.0.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
5e22ed37e4535c9c9cfeb8993f5294ba1201795c

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/ObjectMapProtoUtils.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import java.util.Map;
1717

1818
/**
19-
* Utility class for converting ObjectMap Protobuf type to a Java object.
19+
* Utility class for converting ObjectMap Protocol Buffer types to standard Java objects.
20+
* This class provides methods to transform Protocol Buffer representations of object maps
21+
* into their corresponding Java Map, List, and primitive type equivalents.
2022
*/
2123
public class ObjectMapProtoUtils {
2224

@@ -25,11 +27,12 @@ private ObjectMapProtoUtils() {
2527
}
2628

2729
/**
28-
* Converts a ObjectMap to Java POJO representation.
29-
* Similar to {@link XContentParser#map()}
30+
* Converts a Protocol Buffer ObjectMap to a Java Map representation.
31+
* Similar to {@link XContentParser#map()}, this method transforms the structured
32+
* Protocol Buffer data into a standard Java Map with appropriate value types.
3033
*
31-
* @param objectMap The generic protobuf objectMap to convert
32-
* @return A Protobuf builder .google.protobuf.Struct representation
34+
* @param objectMap The Protocol Buffer ObjectMap to convert
35+
* @return A Java Map containing the key-value pairs from the Protocol Buffer ObjectMap
3336
*/
3437
public static Map<String, Object> fromProto(ObjectMap objectMap) {
3538

@@ -43,11 +46,14 @@ public static Map<String, Object> fromProto(ObjectMap objectMap) {
4346
}
4447

4548
/**
46-
* Converts a ObjectMap.Value to Java POJO representation.
47-
* Similar to {@link XContentParser#map()}
49+
* Converts a Protocol Buffer ObjectMap.Value to an appropriate Java object representation.
50+
* This method handles various value types (numbers, strings, booleans, lists, nested maps)
51+
* and converts them to their Java equivalents.
4852
*
49-
* @param value The generic protobuf ObjectMap.Value to convert
50-
* @return A Protobuf builder .google.protobuf.Struct representation
53+
* @param value The Protocol Buffer ObjectMap.Value to convert
54+
* @return A Java object representing the value (could be a primitive type, String, List, or Map)
55+
* @throws UnsupportedOperationException if the value is null, which cannot be added to a Java map
56+
* @throws IllegalArgumentException if the value type cannot be converted
5157
*/
5258
public static Object fromProto(ObjectMap.Value value) {
5359
if (value.hasNullValue()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc.proto.request.common;
10+
11+
import org.opensearch.action.DocWriteRequest;
12+
import org.opensearch.protobufs.OpType;
13+
14+
/**
15+
* Utility class for converting OpType Protocol Buffers to OpenSearch DocWriteRequest.OpType objects.
16+
* This class handles the conversion of Protocol Buffer representations to their
17+
* corresponding OpenSearch operation type enumerations.
18+
*/
19+
public class OpTypeProtoUtils {
20+
21+
private OpTypeProtoUtils() {
22+
// Utility class, no instances
23+
}
24+
25+
/**
26+
* Converts a Protocol Buffer OpType to its corresponding OpenSearch DocWriteRequest.OpType.
27+
* Similar to {@link DocWriteRequest.OpType}.
28+
*
29+
* @param opType The Protocol Buffer OpType to convert
30+
* @return The corresponding OpenSearch DocWriteRequest.OpType
31+
* @throws UnsupportedOperationException if the operation type is not supported
32+
*/
33+
public static DocWriteRequest.OpType fromProto(OpType opType) {
34+
35+
switch (opType) {
36+
case OP_TYPE_CREATE:
37+
return DocWriteRequest.OpType.CREATE;
38+
case OP_TYPE_INDEX:
39+
return DocWriteRequest.OpType.INDEX;
40+
default:
41+
throw new UnsupportedOperationException("Invalid optype: " + opType);
42+
}
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc.proto.request.common;
10+
11+
import org.opensearch.action.support.WriteRequest;
12+
13+
/**
14+
* Utility class for converting Refresh Protocol Buffers to OpenSearch WriteRequest.RefreshPolicy values.
15+
* This class handles the conversion of Protocol Buffer refresh policy representations to their
16+
* corresponding OpenSearch refresh policy string values.
17+
*/
18+
public class RefreshProtoUtils {
19+
20+
private RefreshProtoUtils() {
21+
// Utility class, no instances
22+
}
23+
24+
/**
25+
* Converts a Protocol Buffer Refresh enum to its corresponding OpenSearch refresh policy string value.
26+
* This method maps the gRPC protocol buffer refresh policy values to the internal
27+
* OpenSearch WriteRequest.RefreshPolicy string values.
28+
*
29+
* @param refresh The Protocol Buffer Refresh enum to convert
30+
* @return The corresponding OpenSearch refresh policy string value
31+
*/
32+
public static String getRefreshPolicy(org.opensearch.protobufs.Refresh refresh) {
33+
switch (refresh) {
34+
case REFRESH_TRUE:
35+
return WriteRequest.RefreshPolicy.IMMEDIATE.getValue();
36+
case REFRESH_WAIT_FOR:
37+
return WriteRequest.RefreshPolicy.WAIT_UNTIL.getValue();
38+
case REFRESH_FALSE:
39+
case REFRESH_UNSPECIFIED:
40+
default:
41+
return WriteRequest.RefreshPolicy.NONE.getValue();
42+
}
43+
}
44+
}

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/document/bulk/ActiveShardCountProtoUtils.java

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.plugin.transport.grpc.proto.request.document.bulk;
1010

1111
import org.opensearch.action.support.ActiveShardCount;
12-
import org.opensearch.protobufs.BulkRequest;
1312
import org.opensearch.protobufs.WaitForActiveShards;
1413

1514
/**
@@ -23,7 +22,7 @@ public class ActiveShardCountProtoUtils {
2322
/**
2423
* Private constructor to prevent instantiation of utility class.
2524
*/
26-
protected ActiveShardCountProtoUtils() {
25+
private ActiveShardCountProtoUtils() {
2726
// Utility class, no instances
2827
}
2928

@@ -33,37 +32,25 @@ protected ActiveShardCountProtoUtils() {
3332
* the wait_for_active_shards parameter from the Protocol Buffer request and applies
3433
* the appropriate ActiveShardCount setting to the OpenSearch bulk request.
3534
*
36-
* @param bulkRequest The OpenSearch bulk request to modify
37-
* @param request The Protocol Buffer request containing the active shard count settings
38-
* @return The modified OpenSearch bulk request with updated active shard count settings
35+
* @param waitForActiveShards The protobuf object containing the active shard count
36+
* @return The modified bulk request
3937
*/
40-
public static org.opensearch.action.bulk.BulkRequest getActiveShardCount(
41-
org.opensearch.action.bulk.BulkRequest bulkRequest,
42-
BulkRequest request
43-
) {
44-
if (!request.hasWaitForActiveShards()) {
45-
return bulkRequest;
46-
}
47-
WaitForActiveShards waitForActiveShards = request.getWaitForActiveShards();
38+
public static ActiveShardCount parseProto(WaitForActiveShards waitForActiveShards) {
39+
4840
switch (waitForActiveShards.getWaitForActiveShardsCase()) {
4941
case WaitForActiveShards.WaitForActiveShardsCase.WAIT_FOR_ACTIVE_SHARD_OPTIONS:
5042
switch (waitForActiveShards.getWaitForActiveShardOptions()) {
5143
case WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED:
5244
throw new UnsupportedOperationException("No mapping for WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED");
5345
case WAIT_FOR_ACTIVE_SHARD_OPTIONS_ALL:
54-
bulkRequest.waitForActiveShards(ActiveShardCount.ALL);
55-
break;
46+
return ActiveShardCount.ALL;
5647
default:
57-
bulkRequest.waitForActiveShards(ActiveShardCount.DEFAULT);
58-
break;
48+
return ActiveShardCount.DEFAULT;
5949
}
60-
break;
6150
case WaitForActiveShards.WaitForActiveShardsCase.INT32_VALUE:
62-
bulkRequest.waitForActiveShards(waitForActiveShards.getInt32Value());
63-
break;
51+
return ActiveShardCount.from(waitForActiveShards.getInt32Value());
6452
default:
65-
throw new UnsupportedOperationException("No mapping for WAIT_FOR_ACTIVE_SHARD_OPTIONS_UNSPECIFIED");
53+
return ActiveShardCount.DEFAULT;
6654
}
67-
return bulkRequest;
6855
}
6956
}

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
import org.opensearch.index.seqno.SequenceNumbers;
2424
import org.opensearch.plugin.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
2525
import org.opensearch.plugin.transport.grpc.proto.request.common.ScriptProtoUtils;
26+
import org.opensearch.plugin.transport.grpc.proto.response.document.common.VersionTypeProtoUtils;
2627
import org.opensearch.protobufs.BulkRequest;
2728
import org.opensearch.protobufs.BulkRequestBody;
2829
import org.opensearch.protobufs.CreateOperation;
2930
import org.opensearch.protobufs.DeleteOperation;
3031
import org.opensearch.protobufs.IndexOperation;
32+
import org.opensearch.protobufs.OpType;
3133
import org.opensearch.protobufs.UpdateOperation;
3234
import org.opensearch.script.Script;
3335
import org.opensearch.search.fetch.subphase.FetchSourceContext;
@@ -110,7 +112,7 @@ public static DocWriteRequest<?>[] getDocWriteRequests(
110112
String id = null;
111113
String routing = valueOrDefault(defaultRouting, request.getRouting());
112114
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
113-
IndexOperation.OpType opType = null;
115+
OpType opType = null;
114116
long version = Versions.MATCH_ANY;
115117
VersionType versionType = VersionType.INTERNAL;
116118
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -226,17 +228,8 @@ public static IndexRequest buildCreateRequest(
226228
routing = createOperation.hasRouting() ? createOperation.getRouting() : routing;
227229
version = createOperation.hasVersion() ? createOperation.getVersion() : version;
228230
if (createOperation.hasVersionType()) {
229-
switch (createOperation.getVersionType()) {
230-
case VERSION_TYPE_EXTERNAL:
231-
versionType = VersionType.EXTERNAL;
232-
break;
233-
case VERSION_TYPE_EXTERNAL_GTE:
234-
versionType = VersionType.EXTERNAL_GTE;
235-
break;
236-
default:
237-
versionType = VersionType.INTERNAL;
238-
break;
239-
}
231+
versionType = VersionTypeProtoUtils.fromProto(createOperation.getVersionType());
232+
240233
}
241234
pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline;
242235
ifSeqNo = createOperation.hasIfSeqNo() ? createOperation.getIfSeqNo() : ifSeqNo;
@@ -276,7 +269,7 @@ public static IndexRequest buildCreateRequest(
276269
public static IndexRequest buildIndexRequest(
277270
IndexOperation indexOperation,
278271
byte[] document,
279-
IndexOperation.OpType opType,
272+
OpType opType,
280273
String index,
281274
String id,
282275
String routing,
@@ -293,17 +286,7 @@ public static IndexRequest buildIndexRequest(
293286
routing = indexOperation.hasRouting() ? indexOperation.getRouting() : routing;
294287
version = indexOperation.hasVersion() ? indexOperation.getVersion() : version;
295288
if (indexOperation.hasVersionType()) {
296-
switch (indexOperation.getVersionType()) {
297-
case VERSION_TYPE_EXTERNAL:
298-
versionType = VersionType.EXTERNAL;
299-
break;
300-
case VERSION_TYPE_EXTERNAL_GTE:
301-
versionType = VersionType.EXTERNAL_GTE;
302-
break;
303-
default:
304-
versionType = VersionType.INTERNAL;
305-
break;
306-
}
289+
versionType = VersionTypeProtoUtils.fromProto(indexOperation.getVersionType());
307290
}
308291
pipeline = indexOperation.hasPipeline() ? indexOperation.getPipeline() : pipeline;
309292
ifSeqNo = indexOperation.hasIfSeqNo() ? indexOperation.getIfSeqNo() : ifSeqNo;
@@ -326,7 +309,7 @@ public static IndexRequest buildIndexRequest(
326309
.routing(routing)
327310
.version(version)
328311
.versionType(versionType)
329-
.create(opType.equals(IndexOperation.OpType.OP_TYPE_CREATE))
312+
.create(opType.equals(OpType.OP_TYPE_CREATE))
330313
.setPipeline(pipeline)
331314
.setIfSeqNo(ifSeqNo)
332315
.setIfPrimaryTerm(ifPrimaryTerm)
@@ -487,17 +470,7 @@ public static DeleteRequest buildDeleteRequest(
487470
routing = deleteOperation.hasRouting() ? deleteOperation.getRouting() : routing;
488471
version = deleteOperation.hasVersion() ? deleteOperation.getVersion() : version;
489472
if (deleteOperation.hasVersionType()) {
490-
switch (deleteOperation.getVersionType()) {
491-
case VERSION_TYPE_EXTERNAL:
492-
versionType = VersionType.EXTERNAL;
493-
break;
494-
case VERSION_TYPE_EXTERNAL_GTE:
495-
versionType = VersionType.EXTERNAL_GTE;
496-
break;
497-
default:
498-
versionType = VersionType.INTERNAL;
499-
break;
500-
}
473+
versionType = VersionTypeProtoUtils.fromProto(deleteOperation.getVersionType());
501474
}
502475
ifSeqNo = deleteOperation.hasIfSeqNo() ? deleteOperation.getIfSeqNo() : ifSeqNo;
503476
ifPrimaryTerm = deleteOperation.hasIfPrimaryTerm() ? deleteOperation.getIfPrimaryTerm() : ifPrimaryTerm;

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
package org.opensearch.plugin.transport.grpc.proto.request.document.bulk;
1010

1111
import org.opensearch.action.bulk.BulkShardRequest;
12-
import org.opensearch.action.support.WriteRequest;
1312
import org.opensearch.plugin.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
13+
import org.opensearch.plugin.transport.grpc.proto.request.common.RefreshProtoUtils;
1414
import org.opensearch.protobufs.BulkRequest;
1515
import org.opensearch.rest.RestRequest;
1616
import org.opensearch.rest.action.document.RestBulkAction;
@@ -27,13 +27,13 @@ public class BulkRequestProtoUtils {
2727
/**
2828
* Private constructor to prevent instantiation of utility class.
2929
*/
30-
protected BulkRequestProtoUtils() {
30+
private BulkRequestProtoUtils() {
3131
// Utility class, no instances
3232
}
3333

3434
/**
3535
* Prepare the request for execution.
36-
* Similar to {@link RestBulkAction#prepareRequest(RestRequest, NodeClient)} ()}
36+
* Similar to {@link RestBulkAction#prepareRequest(RestRequest, NodeClient)}
3737
* Please ensure to keep both implementations consistent.
3838
*
3939
* @param request the request to execute
@@ -47,8 +47,9 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
4747
FetchSourceContext defaultFetchSourceContext = FetchSourceContextProtoUtils.parseFromProtoRequest(request);
4848
String defaultPipeline = request.hasPipeline() ? request.getPipeline() : null;
4949

50-
bulkRequest = ActiveShardCountProtoUtils.getActiveShardCount(bulkRequest, request);
51-
50+
if (request.hasWaitForActiveShards()) {
51+
bulkRequest.waitForActiveShards(ActiveShardCountProtoUtils.parseProto(request.getWaitForActiveShards()));
52+
}
5253
Boolean defaultRequireAlias = request.hasRequireAlias() ? request.getRequireAlias() : null;
5354

5455
if (request.hasTimeout()) {
@@ -57,7 +58,7 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
5758
bulkRequest.timeout(BulkShardRequest.DEFAULT_TIMEOUT);
5859
}
5960

60-
bulkRequest.setRefreshPolicy(getRefreshPolicy(request));
61+
bulkRequest.setRefreshPolicy(RefreshProtoUtils.getRefreshPolicy(request.getRefresh()));
6162

6263
// Note: batch_size is deprecated in OS 3.x. Add batch_size parameter when backporting to OS 2.x
6364
/*
@@ -80,26 +81,4 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
8081

8182
return bulkRequest;
8283
}
83-
84-
/**
85-
* Extracts the refresh policy from the bulk request.
86-
*
87-
* @param request The bulk request containing the refresh policy
88-
* @return The refresh policy as a string, or null if not specified
89-
*/
90-
public static String getRefreshPolicy(org.opensearch.protobufs.BulkRequest request) {
91-
if (!request.hasRefresh()) {
92-
return null;
93-
}
94-
switch (request.getRefresh()) {
95-
case REFRESH_TRUE:
96-
return WriteRequest.RefreshPolicy.IMMEDIATE.getValue();
97-
case REFRESH_WAIT_FOR:
98-
return WriteRequest.RefreshPolicy.WAIT_UNTIL.getValue();
99-
case REFRESH_FALSE:
100-
case REFRESH_UNSPECIFIED:
101-
default:
102-
return WriteRequest.RefreshPolicy.NONE.getValue();
103-
}
104-
}
10584
}

0 commit comments

Comments
 (0)