Skip to content

Enhanced .close() in WebsocketGatewaClientTransport #906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.resolver.DefaultAddressResolverGroup;
import io.scalecube.services.Address;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.api.ServiceMessage;
Expand All @@ -26,7 +27,6 @@
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

public final class WebsocketGatewayClientTransport implements ClientChannel, ClientTransport {

Expand All @@ -39,10 +39,8 @@ public final class WebsocketGatewayClientTransport implements ClientChannel, Cli
private static final int CONNECT_TIMEOUT_MILLIS = (int) Duration.ofSeconds(5).toMillis();

private final GatewayClientCodec clientCodec;
private final LoopResources loopResources;
private final Duration keepAliveInterval;
private final Function<HttpClient, HttpClient> operator;
private final boolean ownsLoopResources;

private final AtomicLong sidCounter = new AtomicLong();
private final AtomicReference<WebsocketGatewayClientSession> clientSessionReference =
Expand All @@ -52,11 +50,6 @@ private WebsocketGatewayClientTransport(Builder builder) {
this.clientCodec = builder.clientCodec;
this.keepAliveInterval = builder.keepAliveInterval;
this.operator = builder.operator;
this.loopResources =
builder.loopResources == null
? LoopResources.create("websocket-gateway-client", 1, true)
: builder.loopResources;
this.ownsLoopResources = builder.loopResources == null;
}

@Override
Expand All @@ -70,7 +63,7 @@ public ClientChannel create(ServiceReference serviceReference) {
final HttpClient httpClient =
operator.apply(
HttpClient.create(ConnectionProvider.newConnection())
.runOn(loopResources)
.resolver(DefaultAddressResolverGroup.INSTANCE)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
.option(ChannelOption.TCP_NODELAY, true)
.headers(headers -> headers.set(HttpHeaderNames.CONTENT_TYPE, CONTENT_TYPE)));
Expand Down Expand Up @@ -192,15 +185,15 @@ private static Throwable getRootCause(Throwable throwable) {

@Override
public void close() {
if (ownsLoopResources) {
loopResources.dispose();
final var session = clientSessionReference.get();
if (session != null) {
session.close().doOnError(ex -> {}).subscribe();
}
}

public static class Builder {

private GatewayClientCodec clientCodec = CLIENT_CODEC;
private LoopResources loopResources;
private Duration keepAliveInterval = Duration.ZERO;
private Function<HttpClient, HttpClient> operator = client -> client;

Expand All @@ -211,11 +204,6 @@ public Builder clientCodec(GatewayClientCodec clientCodec) {
return this;
}

public Builder loopResources(LoopResources loopResources) {
this.loopResources = loopResources;
return this;
}

public Builder httpClient(UnaryOperator<HttpClient> operator) {
this.operator = this.operator.andThen(operator);
return this;
Expand Down