Skip to content

Commit 07c9b05

Browse files
authored
Support of heartbeat for websocket gateway (#864)
1 parent d2a1fc1 commit 07c9b05

File tree

19 files changed

+176
-214
lines changed

19 files changed

+176
-214
lines changed

services-api/src/main/java/io/scalecube/services/gateway/Gateway.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.scalecube.services.gateway;
22

33
import io.scalecube.services.Address;
4+
import io.scalecube.services.ServiceCall;
5+
import io.scalecube.services.registry.api.ServiceRegistry;
46

57
public interface Gateway {
68

@@ -21,9 +23,11 @@ public interface Gateway {
2123
/**
2224
* Starts gateway.
2325
*
26+
* @param call {@link ServiceCall} instance
27+
* @param serviceRegistry {@link ServiceRegistry} instance
2428
* @return gateway instance
2529
*/
26-
Gateway start();
30+
Gateway start(ServiceCall call, ServiceRegistry serviceRegistry);
2731

2832
/** Stops gateway. */
2933
void stop();

services-api/src/main/java/io/scalecube/services/gateway/GatewayOptions.java

Lines changed: 0 additions & 74 deletions
This file was deleted.

services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public interface ServiceTransport {
1818
/**
1919
* Provider for {@link ServerTransport}.
2020
*
21-
* @param serviceRegistry serviceRegistry
21+
* @param serviceRegistry {@link ServiceRegistry} instance
2222
* @return {@code ServerTransport} instance
2323
*/
2424
ServerTransport serverTransport(ServiceRegistry serviceRegistry);

services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGateway.java

Lines changed: 43 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,23 @@
44
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
55
import io.netty.handler.codec.http.cors.CorsHandler;
66
import io.scalecube.services.Address;
7+
import io.scalecube.services.ServiceCall;
78
import io.scalecube.services.exceptions.DefaultErrorMapper;
89
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
910
import io.scalecube.services.gateway.Gateway;
10-
import io.scalecube.services.gateway.GatewayOptions;
11+
import io.scalecube.services.registry.api.ServiceRegistry;
1112
import java.net.InetSocketAddress;
12-
import java.util.StringJoiner;
1313
import java.util.function.Consumer;
14+
import java.util.function.Function;
1415
import reactor.netty.DisposableServer;
1516
import reactor.netty.http.server.HttpServer;
1617
import reactor.netty.resources.LoopResources;
1718

1819
public class HttpGateway implements Gateway {
1920

20-
private final GatewayOptions options;
21+
private final String id;
22+
private final int port;
23+
private final Function<ServiceCall, ServiceCall> callFactory;
2124
private final ServiceProviderErrorMapper errorMapper;
2225
private final boolean corsEnabled;
2326
private final CorsConfigBuilder corsConfigBuilder;
@@ -26,28 +29,35 @@ public class HttpGateway implements Gateway {
2629
private LoopResources loopResources;
2730

2831
private HttpGateway(Builder builder) {
29-
this.options = builder.options;
32+
this.id = builder.id;
33+
this.port = builder.port;
34+
this.callFactory = builder.callFactory;
3035
this.errorMapper = builder.errorMapper;
3136
this.corsEnabled = builder.corsEnabled;
3237
this.corsConfigBuilder = builder.corsConfigBuilder;
3338
}
3439

3540
@Override
3641
public String id() {
37-
return options.id();
42+
return id;
3843
}
3944

4045
@Override
41-
public Gateway start() {
42-
HttpGatewayAcceptor gatewayAcceptor = new HttpGatewayAcceptor(options.call(), errorMapper);
43-
46+
public Gateway start(ServiceCall call, ServiceRegistry serviceRegistry) {
4447
loopResources =
45-
LoopResources.create(
46-
options.id() + ":" + options.port(), LoopResources.DEFAULT_IO_WORKER_COUNT, true);
48+
LoopResources.create(id + ":" + port, LoopResources.DEFAULT_IO_WORKER_COUNT, true);
4749

4850
try {
49-
prepareHttpServer(loopResources, options.port())
50-
.handle(gatewayAcceptor)
51+
HttpServer.create()
52+
.runOn(loopResources)
53+
.bindAddress(() -> new InetSocketAddress(port))
54+
.doOnConnection(
55+
connection -> {
56+
if (corsEnabled) {
57+
connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build()));
58+
}
59+
})
60+
.handle(new HttpGatewayAcceptor(callFactory.apply(call), errorMapper))
5161
.bind()
5262
.doOnSuccess(server -> this.server = server)
5363
.toFuture()
@@ -59,23 +69,6 @@ public Gateway start() {
5969
return this;
6070
}
6171

62-
private HttpServer prepareHttpServer(LoopResources loopResources, int port) {
63-
HttpServer httpServer = HttpServer.create();
64-
65-
if (loopResources != null) {
66-
httpServer = httpServer.runOn(loopResources);
67-
}
68-
69-
return httpServer
70-
.bindAddress(() -> new InetSocketAddress(port))
71-
.doOnConnection(
72-
connection -> {
73-
if (corsEnabled) {
74-
connection.addHandlerLast(new CorsHandler(corsConfigBuilder.build()));
75-
}
76-
});
77-
}
78-
7972
@Override
8073
public Address address() {
8174
InetSocketAddress address = (InetSocketAddress) server.address();
@@ -100,21 +93,11 @@ private void shutdownLoopResources(LoopResources loopResources) {
10093
}
10194
}
10295

103-
@Override
104-
public String toString() {
105-
return new StringJoiner(", ", HttpGateway.class.getSimpleName() + "[", "]")
106-
.add("options=" + options)
107-
.add("errorMapper=" + errorMapper)
108-
.add("corsEnabled=" + corsEnabled)
109-
.add("corsConfigBuilder=" + corsConfigBuilder)
110-
.add("server=" + server)
111-
.add("loopResources=" + loopResources)
112-
.toString();
113-
}
114-
11596
public static class Builder {
11697

117-
private GatewayOptions options;
98+
private String id = "http@" + Integer.toHexString(hashCode());
99+
private int port;
100+
private Function<ServiceCall, ServiceCall> callFactory = call -> call;
118101
private ServiceProviderErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
119102
private boolean corsEnabled = false;
120103
private CorsConfigBuilder corsConfigBuilder =
@@ -125,12 +108,26 @@ public static class Builder {
125108

126109
public Builder() {}
127110

128-
public GatewayOptions options() {
129-
return options;
111+
public String id() {
112+
return id;
113+
}
114+
115+
public Builder id(String id) {
116+
this.id = id;
117+
return this;
118+
}
119+
120+
public int port() {
121+
return port;
122+
}
123+
124+
public Builder port(int port) {
125+
this.port = port;
126+
return this;
130127
}
131128

132-
public Builder options(GatewayOptions options) {
133-
this.options = options;
129+
public Builder serviceCall(Function<ServiceCall, ServiceCall> operator) {
130+
callFactory = callFactory.andThen(operator);
134131
return this;
135132
}
136133

services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import io.scalecube.services.ServiceCall;
1515
import io.scalecube.services.api.ErrorData;
1616
import io.scalecube.services.api.ServiceMessage;
17-
import io.scalecube.services.exceptions.DefaultErrorMapper;
1817
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
1918
import io.scalecube.services.gateway.ReferenceCountUtil;
2019
import io.scalecube.services.transport.api.DataCodec;
@@ -37,10 +36,6 @@ public class HttpGatewayAcceptor
3736
private final ServiceCall serviceCall;
3837
private final ServiceProviderErrorMapper errorMapper;
3938

40-
HttpGatewayAcceptor(ServiceCall serviceCall) {
41-
this(serviceCall, DefaultErrorMapper.INSTANCE);
42-
}
43-
4439
HttpGatewayAcceptor(ServiceCall serviceCall, ServiceProviderErrorMapper errorMapper) {
4540
this.serviceCall = serviceCall;
4641
this.errorMapper = errorMapper;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.scalecube.services.gateway.websocket;
2+
3+
import io.scalecube.services.annotations.Service;
4+
import io.scalecube.services.annotations.ServiceMethod;
5+
import reactor.core.publisher.Mono;
6+
7+
@Service(HeartbeatService.NAMESPACE)
8+
public interface HeartbeatService {
9+
10+
String NAMESPACE = "v1/scalecube.websocket.heartbeat";
11+
12+
@ServiceMethod
13+
Mono<Long> ping(long value);
14+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.scalecube.services.gateway.websocket;
2+
3+
import reactor.core.publisher.Mono;
4+
5+
public class HeartbeatServiceImpl implements HeartbeatService {
6+
7+
@Override
8+
public Mono<Long> ping(long value) {
9+
return Mono.just(value);
10+
}
11+
}

0 commit comments

Comments
 (0)