-
Notifications
You must be signed in to change notification settings - Fork 277
Open
Description
/// Sends [inbound] as the only request message of a call to `/mpc/grpc` and
/// returns the single response payload.
///
/// The channel is taken from `chans`, keyed by the `addressKey` attribute of
/// [ctx]; a missing entry is created with `create` and kept for later calls.
/// The call timeout comes from `exec.schema().timeout`. [urn] is not used by
/// this method.
///
/// A [GrpcError] is translated by its status code:
///
/// * aborted, cancelled, unknown, internal, alreadyExists -> `Status.system`
/// * invalidArgument, dataLoss -> `Status.validate`
/// * deadlineExceeded -> `Status.timeout`
/// * unavailable -> `Status.netUnavailable`
/// * notFound, unimplemented -> `Status.notfound`
/// * outOfRange, resourceExhausted, unauthenticated, permissionDenied,
///   failedPrecondition -> `Status.unauthorized`
///
/// A [GrpcError] with any other code, and every non-gRPC error, propagates
/// unchanged. A [StateError] is thrown if the response stream closes without
/// a message or delivers more than one.
@override
Future<Uint8List> consume(
    Context ctx, String urn, Execution exec, Uint8List inbound) async {
  try {
    final uri = ctx.getAttribute(addressKey);
    final chan = chans.putIfAbsent(uri, () => create(uri));
    // Payloads are already raw bytes, so serialization is a pass-through and
    // deserialization only copies the received bytes into a Uint8List.
    final method = ClientMethod<Uint8List, Uint8List>(
      '/mpc/grpc',
      (v) => v,
      (v) => Uint8List.fromList(v),
    );
    final option = CallOptions(timeout: exec.schema().timeout, metadata: {});
    final call = chan.createCall<Uint8List, Uint8List>(
        method, Stream.value(inbound), option);
    // Wait for the response stream to close rather than taking `first`.
    // `first` stops listening as soon as the message arrives.
    // NOTE(review): in grpc-dart, cancelling the response subscription appears
    // to cancel the call, resetting the HTTP/2 stream before the trailers are
    // read; repeated resets are the likely reason the connection gets dropped.
    // Confirm against the grpc-dart version in use.
    return await call.response.single;
  } on GrpcError catch (e) {
    switch (e.code) {
      case StatusCode.aborted:
      case StatusCode.cancelled:
      case StatusCode.unknown:
      case StatusCode.internal:
      case StatusCode.alreadyExists:
        throw Status.system.err(r: e.toString());
      case StatusCode.invalidArgument:
      case StatusCode.dataLoss:
        throw Status.validate.err(r: e.toString());
      case StatusCode.deadlineExceeded:
        throw Status.timeout.err(r: e.toString());
      case StatusCode.unavailable:
        throw Status.netUnavailable.err(r: e.toString());
      case StatusCode.notFound:
      case StatusCode.unimplemented:
        throw Status.notfound.err(r: e.toString());
      case StatusCode.outOfRange:
      case StatusCode.resourceExhausted:
      case StatusCode.unauthenticated:
      case StatusCode.permissionDenied:
      case StatusCode.failedPrecondition:
        throw Status.unauthorized.err(r: e.toString());
    }
    // Codes without a mapping above keep their original error and stack.
    rethrow;
  }
}
final call = chan.createCall<Uint8List, Uint8List>(
method, Stream.value(inbound), option);
return await call.response.first;
The code above causes the connection to be dropped after some requests receive their responses, so the same connection cannot be reused many times. How can connections be reused gracefully?
Metadata
Metadata
Assignees
Labels
No labels