Skip to content

Commit be8bbe1

Browse files
authored
[GRPC] Map to proper GRPC status codes and achieve exception handling parity with HTTP (opensearch-project#18925)
* [GRPC] Add proper GRPC status code and error handling Signed-off-by: Karen Xu <karenxyr@gmail.com> * rename Http to Rest, group together some convertRestToGrpcStatus codes Signed-off-by: Karen Xu <karenxyr@gmail.com> * make final Signed-off-by: Karen Xu <karenxyr@gmail.com> --------- Signed-off-by: Karen Xu <karenxyr@gmail.com> Signed-off-by: Karen X <karenxyr@gmail.com>
1 parent ba296fb commit be8bbe1

File tree

17 files changed

+717
-29
lines changed

17 files changed

+717
-29
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5656
- Supporting Scripted Metric Aggregation when reducing aggregations in InternalValueCount and InternalAvg ([18411](https://github.com/opensearch-project/OpenSearch/pull18411)))
5757
- Support `search_after` numeric queries with Approximation Framework ([#18896](https://github.com/opensearch-project/OpenSearch/pull/18896))
5858
- Add skip_list parameter to Numeric Field Mappers (default false) ([#18889](https://github.com/opensearch-project/OpenSearch/pull/18889))
59+
- Map to proper GRPC status codes and achieve exception handling parity with HTTP APIs([#18925](https://github.com/opensearch-project/OpenSearch/pull/18925))
5960

6061
### Changed
6162
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

libs/core/src/main/java/org/opensearch/ExceptionsHelper.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,38 @@
7676
public final class ExceptionsHelper {
7777
private static final Logger logger = LogManager.getLogger(ExceptionsHelper.class);
7878

79+
/**
80+
* Shared error message constants for consistent error handling across HTTP and gRPC protocols.
81+
* These constants ensure that both REST API and gRPC API return identical error messages
82+
* for the same types of exceptions.
83+
*/
84+
public static final class ErrorMessages {
85+
/** Error message for invalid argument exceptions */
86+
public static final String INVALID_ARGUMENT = "Invalid argument";
87+
88+
/** Error message for JSON parsing failures */
89+
public static final String JSON_PARSE_FAILED = "Failed to parse JSON";
90+
91+
/** Error message for rate limiting / rejected execution */
92+
public static final String TOO_MANY_REQUESTS = "Too many requests";
93+
94+
/** Error message for JSON type coercion failures */
95+
public static final String JSON_COERCION_FAILED = "Incompatible JSON value";
96+
97+
/** Error message for content format issues */
98+
public static final String INVALID_CONTENT_FORMAT = "Invalid content format";
99+
100+
/** Error message for compression format issues */
101+
public static final String INVALID_COMPRESSION_FORMAT = "Invalid compression format";
102+
103+
/** Generic fallback error message for unknown exceptions */
104+
public static final String INTERNAL_FAILURE = "Internal failure";
105+
106+
private ErrorMessages() {
107+
// Utility class, no instances
108+
}
109+
}
110+
79111
// utility class: no ctor
80112
private ExceptionsHelper() {}
81113

@@ -117,16 +149,16 @@ public static String summaryMessage(Throwable t) {
117149
if (t instanceof OpenSearchException) {
118150
return getExceptionSimpleClassName(t) + "[" + t.getMessage() + "]";
119151
} else if (t instanceof IllegalArgumentException) {
120-
return "Invalid argument";
152+
return ErrorMessages.INVALID_ARGUMENT;
121153
} else if (t instanceof InputCoercionException) {
122-
return "Incompatible JSON value";
154+
return ErrorMessages.JSON_COERCION_FAILED;
123155
} else if (t instanceof JsonParseException) {
124-
return "Failed to parse JSON";
156+
return ErrorMessages.JSON_PARSE_FAILED;
125157
} else if (t instanceof OpenSearchRejectedExecutionException) {
126-
return "Too many requests";
158+
return ErrorMessages.TOO_MANY_REQUESTS;
127159
}
128160
}
129-
return "Internal failure";
161+
return ErrorMessages.INTERNAL_FAILURE;
130162
}
131163

132164
public static Throwable unwrapCause(Throwable t) {
@@ -149,6 +181,25 @@ public static Throwable unwrapCause(Throwable t) {
149181
return result;
150182
}
151183

184+
/**
185+
* Unwraps exception causes up to 10 levels looking for the first OpenSearchException.
186+
* This method is used by both HTTP and gRPC error handling to ensure consistent exception
187+
* unwrapping behavior across protocols.
188+
*
189+
* @param e The exception to unwrap
190+
* @return The first OpenSearchException found in the cause chain, or the original exception if none found
191+
*/
192+
public static Throwable unwrapToOpenSearchException(Throwable e) {
193+
Throwable t = e;
194+
for (int counter = 0; counter < 10 && t != null; counter++) {
195+
if (t instanceof OpenSearchException) {
196+
break;
197+
}
198+
t = t.getCause();
199+
}
200+
return t != null ? t : e;
201+
}
202+
152203
/**
153204
* @deprecated Don't swallow exceptions, allow them to propagate.
154205
*/

libs/core/src/main/java/org/opensearch/OpenSearchException.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -628,14 +628,8 @@ public static void generateFailureXContent(XContentBuilder builder, ToXContent.P
628628

629629
// Render the exception with a simple message
630630
if (detailed == false) {
631-
Throwable t = e;
632-
for (int counter = 0; counter < 10 && t != null; counter++) {
633-
if (t instanceof OpenSearchException) {
634-
break;
635-
}
636-
t = t.getCause();
637-
}
638-
builder.field(ERROR, ExceptionsHelper.summaryMessage(t != null ? t : e));
631+
Throwable unwrapped = ExceptionsHelper.unwrapToOpenSearchException(e);
632+
builder.field(ERROR, ExceptionsHelper.summaryMessage(unwrapped));
639633
return;
640634
}
641635

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/BulkRequestActionListener.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import org.opensearch.action.bulk.BulkResponse;
1414
import org.opensearch.core.action.ActionListener;
1515
import org.opensearch.transport.grpc.proto.response.document.bulk.BulkResponseProtoUtils;
16+
import org.opensearch.transport.grpc.util.GrpcErrorHandler;
1617

1718
import java.io.IOException;
1819

20+
import io.grpc.StatusRuntimeException;
1921
import io.grpc.stub.StreamObserver;
2022

2123
/**
@@ -49,7 +51,9 @@ public void onResponse(org.opensearch.action.bulk.BulkResponse response) {
4951
responseObserver.onNext(protoResponse);
5052
responseObserver.onCompleted();
5153
} catch (RuntimeException | IOException e) {
52-
responseObserver.onError(e);
54+
logger.error("Failed to convert bulk response to protobuf: " + e.getMessage());
55+
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
56+
responseObserver.onError(grpcError);
5357
}
5458
}
5559

@@ -61,7 +65,8 @@ public void onResponse(org.opensearch.action.bulk.BulkResponse response) {
6165
*/
6266
@Override
6367
public void onFailure(Exception e) {
64-
logger.error("BulkRequestActionListener failed to process bulk request:" + e.getMessage());
65-
responseObserver.onError(e);
68+
logger.error("BulkRequestActionListener failed to process bulk request: " + e.getMessage());
69+
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
70+
responseObserver.onError(grpcError);
6671
}
6772
}

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/SearchRequestActionListener.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import org.opensearch.action.search.SearchResponse;
1414
import org.opensearch.core.action.ActionListener;
1515
import org.opensearch.transport.grpc.proto.response.search.SearchResponseProtoUtils;
16+
import org.opensearch.transport.grpc.util.GrpcErrorHandler;
1617

1718
import java.io.IOException;
1819

20+
import io.grpc.StatusRuntimeException;
1921
import io.grpc.stub.StreamObserver;
2022

2123
/**
@@ -44,13 +46,16 @@ public void onResponse(SearchResponse response) {
4446
responseObserver.onNext(protoResponse);
4547
responseObserver.onCompleted();
4648
} catch (RuntimeException | IOException e) {
47-
responseObserver.onError(e);
49+
logger.error("Failed to convert search response to protobuf: " + e.getMessage());
50+
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
51+
responseObserver.onError(grpcError);
4852
}
4953
}
5054

5155
@Override
5256
public void onFailure(Exception e) {
53-
logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage());
54-
responseObserver.onError(e);
57+
logger.error("SearchRequestActionListener failed to process search request: " + e.getMessage());
58+
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
59+
responseObserver.onError(grpcError);
5560
}
5661
}

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/document/bulk/BulkItemResponseProtoUtils.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.transport.grpc.proto.response.document.common.DocWriteResponseProtoUtils;
2121
import org.opensearch.transport.grpc.proto.response.document.get.GetResultProtoUtils;
2222
import org.opensearch.transport.grpc.proto.response.exceptions.opensearchexception.OpenSearchExceptionProtoUtils;
23+
import org.opensearch.transport.grpc.util.RestToGrpcStatusConverter;
2324

2425
import java.io.IOException;
2526

@@ -52,8 +53,8 @@ public static Item toProto(BulkItemResponse response) throws IOException {
5253
DocWriteResponse docResponse = response.getResponse();
5354
responseItemBuilder = DocWriteResponseProtoUtils.toProto(docResponse);
5455

55-
// TODO set the GRPC status instead of HTTP Status
56-
responseItemBuilder.setStatus(docResponse.status().getStatus());
56+
int grpcStatusCode = RestToGrpcStatusConverter.getGrpcStatusCode(docResponse.status());
57+
responseItemBuilder.setStatus(grpcStatusCode);
5758
} else {
5859
BulkItemResponse.Failure failure = response.getFailure();
5960
responseItemBuilder = ResponseItem.newBuilder();
@@ -64,8 +65,8 @@ public static Item toProto(BulkItemResponse response) throws IOException {
6465
} else {
6566
responseItemBuilder.setId(ResponseItem.Id.newBuilder().setString(response.getId()).build());
6667
}
67-
// TODO set the GRPC status instead of HTTP Status
68-
responseItemBuilder.setStatus(failure.getStatus().getStatus());
68+
int grpcStatusCode = RestToGrpcStatusConverter.getGrpcStatusCode(failure.getStatus());
69+
responseItemBuilder.setStatus(grpcStatusCode);
6970

7071
ErrorCause errorCause = OpenSearchExceptionProtoUtils.generateThrowableProto(failure.getCause());
7172

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import org.opensearch.transport.client.Client;
1515
import org.opensearch.transport.grpc.listeners.BulkRequestActionListener;
1616
import org.opensearch.transport.grpc.proto.request.document.bulk.BulkRequestProtoUtils;
17+
import org.opensearch.transport.grpc.util.GrpcErrorHandler;
1718

19+
import io.grpc.StatusRuntimeException;
1820
import io.grpc.stub.StreamObserver;
1921

2022
/**
@@ -47,7 +49,8 @@ public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver<or
4749
client.bulk(bulkRequest, listener);
4850
} catch (RuntimeException e) {
4951
logger.error("DocumentServiceImpl failed to process bulk request, request=" + request + ", error=" + e.getMessage());
50-
responseObserver.onError(e);
52+
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
53+
responseObserver.onError(grpcError);
5154
}
5255
}
5356
}

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
import org.opensearch.transport.grpc.listeners.SearchRequestActionListener;
1616
import org.opensearch.transport.grpc.proto.request.search.SearchRequestProtoUtils;
1717
import org.opensearch.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
18+
import org.opensearch.transport.grpc.util.GrpcErrorHandler;
1819

1920
import java.io.IOException;
2021

22+
import io.grpc.StatusRuntimeException;
2123
import io.grpc.stub.StreamObserver;
2224

2325
/**
@@ -66,7 +68,8 @@ public void search(
6668
client.search(searchRequest, listener);
6769
} catch (RuntimeException | IOException e) {
6870
logger.error("SearchServiceImpl failed to process search request, request=" + request + ", error=" + e.getMessage());
69-
responseObserver.onError(e);
71+
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
72+
responseObserver.onError(grpcError);
7073
}
7174
}
7275
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport.grpc.util;
10+
11+
import com.fasterxml.jackson.core.JsonParseException;
12+
import com.fasterxml.jackson.core.exc.InputCoercionException;
13+
14+
import org.apache.logging.log4j.LogManager;
15+
import org.apache.logging.log4j.Logger;
16+
import org.opensearch.ExceptionsHelper;
17+
import org.opensearch.OpenSearchException;
18+
import org.opensearch.core.compress.NotCompressedException;
19+
import org.opensearch.core.compress.NotXContentException;
20+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
21+
22+
import java.io.IOException;
23+
import java.util.concurrent.TimeoutException;
24+
25+
import io.grpc.Status;
26+
import io.grpc.StatusRuntimeException;
27+
28+
/**
29+
* Converts exceptions to a GRPC StatusRuntimeException.
30+
*/
31+
public class GrpcErrorHandler {
32+
private static final Logger logger = LogManager.getLogger(GrpcErrorHandler.class);
33+
34+
private GrpcErrorHandler() {
35+
// Utility class, no instances
36+
}
37+
38+
/**
39+
* Converts an exception to an appropriate GRPC StatusRuntimeException.
40+
* Uses shared constants from {@link ExceptionsHelper.ErrorMessages} and {@link ExceptionsHelper#summaryMessage}
41+
* for exact parity with HTTP error handling.
42+
*
43+
* @param e The exception to convert
44+
* @return StatusRuntimeException with appropriate GRPC status and HTTP-identical error messages
45+
*/
46+
public static StatusRuntimeException convertToGrpcError(Exception e) {
47+
// ========== OpenSearch Business Logic Exceptions ==========
48+
// Custom OpenSearch exceptions which extend {@link OpenSearchException}.
49+
// Uses {@link RestToGrpcStatusConverter} for REST -> gRPC status mapping and
50+
// follows {@link OpenSearchException#generateFailureXContent} unwrapping logic
51+
if (e instanceof OpenSearchException) {
52+
return handleOpenSearchException((OpenSearchException) e);
53+
}
54+
55+
// ========== OpenSearch Core System Exceptions ==========
56+
// Low-level OpenSearch exceptions that don't extend OpenSearchException - include full details
57+
else if (e instanceof OpenSearchRejectedExecutionException) {
58+
return Status.RESOURCE_EXHAUSTED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
59+
} else if (e instanceof NotXContentException) {
60+
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
61+
} else if (e instanceof NotCompressedException) {
62+
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
63+
}
64+
65+
// ========== 3. Third-party Library Exceptions ==========
66+
// External library exceptions (Jackson JSON parsing) - include full details
67+
else if (e instanceof InputCoercionException) {
68+
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
69+
} else if (e instanceof JsonParseException) {
70+
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
71+
}
72+
73+
// ========== 4. Standard Java Exceptions ==========
74+
// Generic Java runtime exceptions - include full exception details for debugging
75+
else if (e instanceof IllegalArgumentException) {
76+
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
77+
} else if (e instanceof IllegalStateException) {
78+
return Status.FAILED_PRECONDITION.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
79+
} else if (e instanceof SecurityException) {
80+
return Status.PERMISSION_DENIED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
81+
} else if (e instanceof TimeoutException) {
82+
return Status.DEADLINE_EXCEEDED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
83+
} else if (e instanceof InterruptedException) {
84+
return Status.CANCELLED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
85+
} else if (e instanceof IOException) {
86+
return Status.INTERNAL.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
87+
}
88+
89+
// ========== 5. Unknown/Unmapped Exceptions ==========
90+
// Safety fallback for any unexpected exception to {@code Status.INTERNAL} with full debugging info
91+
else {
92+
logger.warn("Unmapped exception type: {}, treating as INTERNAL error", e.getClass().getSimpleName());
93+
return Status.INTERNAL.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
94+
}
95+
}
96+
97+
/**
98+
* Handles OpenSearch-specific exceptions by converting their HTTP status to GRPC status.
99+
* Uses {@link ExceptionsHelper#summaryMessage(Throwable)} for exact parity with HTTP error handling.
100+
*
101+
* Uses {@link ExceptionsHelper#unwrapToOpenSearchException(Throwable)} for shared unwrapping logic
102+
* with HTTP's {@link OpenSearchException#generateFailureXContent}.
103+
*
104+
* @param e The {@link OpenSearchException} to convert
105+
* @return StatusRuntimeException with mapped GRPC status and HTTP-identical error message
106+
*/
107+
private static StatusRuntimeException handleOpenSearchException(OpenSearchException e) {
108+
Status grpcStatus = RestToGrpcStatusConverter.convertRestToGrpcStatus(e.status());
109+
110+
Throwable unwrapped = ExceptionsHelper.unwrapToOpenSearchException(e);
111+
112+
String description = ExceptionsHelper.summaryMessage(unwrapped);
113+
return grpcStatus.withDescription(description).asRuntimeException();
114+
}
115+
116+
}

0 commit comments

Comments
 (0)